1 from pathlib import Path
8 sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..').resolve()))
10 from nominatim import cli
11 from nominatim.config import Configuration
12 from nominatim.tools import refresh
13 from steps.utils import run_script
15 class NominatimEnvironment:
16 """ Collects all functions for the execution of Nominatim functions.
19 def __init__(self, config):
20 self.build_dir = Path(config['BUILDDIR']).resolve()
21 self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
22 self.db_host = config['DB_HOST']
23 self.db_port = config['DB_PORT']
24 self.db_user = config['DB_USER']
25 self.db_pass = config['DB_PASS']
26 self.template_db = config['TEMPLATE_DB']
27 self.test_db = config['TEST_DB']
28 self.api_test_db = config['API_TEST_DB']
29 self.api_test_file = config['API_TEST_FILE']
30 self.server_module_path = config['SERVER_MODULE_PATH']
31 self.reuse_template = not config['REMOVE_TEMPLATE']
32 self.keep_scenario_db = config['KEEP_TEST_DB']
33 self.code_coverage_path = config['PHPCOV']
34 self.code_coverage_id = 1
36 self.default_config = Configuration(None, self.src_dir / 'settings').get_os_env()
38 self.template_db_done = False
39 self.api_db_done = False
40 self.website_dir = None
42 def connect_database(self, dbname):
43 """ Return a connection to the database with the given name.
44 Uses configured host, user and port.
46 dbargs = {'database': dbname}
48 dbargs['host'] = self.db_host
50 dbargs['port'] = self.db_port
52 dbargs['user'] = self.db_user
54 dbargs['password'] = self.db_pass
55 conn = psycopg2.connect(**dbargs)
58 def next_code_coverage_file(self):
59 """ Generate the next name for a coverage file.
61 fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
62 self.code_coverage_id += 1
66 def write_nominatim_config(self, dbname):
67 """ Set up a custom test configuration that connects to the given
68 database. This sets up the environment variables so that they can
69 be picked up by dotenv and creates a project directory with the
70 appropriate website scripts.
72 dsn = 'pgsql:dbname={}'.format(dbname)
74 dsn += ';host=' + self.db_host
76 dsn += ';port=' + self.db_port
78 dsn += ';user=' + self.db_user
80 dsn += ';password=' + self.db_pass
82 if self.website_dir is not None \
83 and self.test_env is not None \
84 and dsn == self.test_env['NOMINATIM_DATABASE_DSN']:
85 return # environment already set uo
87 self.test_env = dict(self.default_config)
88 self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
89 self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
90 self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
91 self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
92 self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
93 self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
94 self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
95 self.test_env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = str((self.build_dir / 'module').resolve())
96 self.test_env['NOMINATIM_OSM2PGSQL_BINARY'] = str((self.build_dir / 'osm2pgsql' / 'osm2pgsql').resolve())
97 self.test_env['NOMINATIM_NOMINATIM_TOOL'] = str((self.build_dir / 'nominatim').resolve())
99 if self.server_module_path:
100 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
102 # avoid module being copied into the temporary environment
103 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = str((self.build_dir / 'module').resolve())
105 if self.website_dir is not None:
106 self.website_dir.cleanup()
108 self.website_dir = tempfile.TemporaryDirectory()
109 cfg = Configuration(None, self.src_dir / 'settings', environ=self.test_env)
110 refresh.setup_website(Path(self.website_dir.name) / 'website', self.src_dir / 'lib-php', cfg)
113 def db_drop_database(self, name):
114 """ Drop the database with the given name.
116 conn = self.connect_database('postgres')
117 conn.set_isolation_level(0)
119 cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
122 def setup_template_db(self):
123 """ Setup a template database that already contains common test data.
124 Having a template database speeds up tests considerably but at
125 the price that the tests sometimes run with stale data.
127 if self.template_db_done:
130 self.template_db_done = True
132 if self._reuse_or_drop_db(self.template_db):
136 # call the first part of database setup
137 self.write_nominatim_config(self.template_db)
138 self.run_setup_script('create-db', 'setup-db')
139 # remove external data to speed up indexing for tests
140 conn = self.connect_database(self.template_db)
142 cur.execute("""select tablename from pg_tables
143 where tablename in ('gb_postcode', 'us_postcode')""")
145 conn.cursor().execute('TRUNCATE TABLE {}'.format(t[0]))
149 # execute osm2pgsql import on an empty file to get the right tables
150 with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
151 fd.write(b'<osm version="0.6"></osm>')
153 self.run_setup_script('import-data',
157 'create-partition-tables',
158 'create-partition-functions',
160 'create-search-indices',
162 osm2pgsql_cache='200')
164 self.db_drop_database(self.template_db)
168 def setup_api_db(self):
169 """ Setup a test against the API test database.
171 self.write_nominatim_config(self.api_test_db)
176 self.api_db_done = True
178 if self._reuse_or_drop_db(self.api_test_db):
181 testdata = Path('__file__') / '..' / '..' / 'testdb'
182 self.test_env['NOMINATIM_TIGER_DATA_PATH'] = str((testdata / 'tiger').resolve())
183 self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata.resolve())
186 self.run_nominatim('import', '--osm-file', str(self.api_test_file))
187 self.run_setup_script('import-tiger-data')
188 self.run_nominatim('freeze')
190 phrase_file = str((testdata / 'specialphrases_testdb.sql').resolve())
191 run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
193 self.db_drop_database(self.api_test_db)
197 def setup_unknown_db(self):
198 """ Setup a test against a non-existing database.
200 self.write_nominatim_config('UNKNOWN_DATABASE_NAME')
202 def setup_db(self, context):
203 """ Setup a test against a fresh, empty test database.
205 self.setup_template_db()
206 self.write_nominatim_config(self.test_db)
207 conn = self.connect_database(self.template_db)
208 conn.set_isolation_level(0)
210 cur.execute('DROP DATABASE IF EXISTS {}'.format(self.test_db))
211 cur.execute('CREATE DATABASE {} TEMPLATE = {}'.format(self.test_db, self.template_db))
213 context.db = self.connect_database(self.test_db)
214 context.db.autocommit = True
215 psycopg2.extras.register_hstore(context.db, globally=False)
217 def teardown_db(self, context):
218 """ Remove the test database, if it exists.
223 if not self.keep_scenario_db:
224 self.db_drop_database(self.test_db)
226 def _reuse_or_drop_db(self, name):
227 """ Check for the existance of the given DB. If reuse is enabled,
228 then the function checks for existance and returns True if the
229 database is already there. Otherwise an existing database is
230 dropped and always false returned.
232 if self.reuse_template:
233 conn = self.connect_database('postgres')
234 with conn.cursor() as cur:
235 cur.execute('select count(*) from pg_database where datname = %s',
237 if cur.fetchone()[0] == 1:
241 self.db_drop_database(name)
245 def reindex_placex(self, db):
246 """ Run the indexing step until all data in the placex has
247 been processed. Indexing during updates can produce more data
248 to index under some circumstances. That is why indexing may have
249 to be run multiple times.
251 with db.cursor() as cur:
253 self.run_nominatim('index')
255 cur.execute("SELECT 'a' FROM placex WHERE indexed_status != 0 LIMIT 1")
256 if cur.rowcount == 0:
259 def run_nominatim(self, *cmdline):
260 """ Run the nominatim command-line tool via the library.
262 cli.nominatim(module_dir='',
263 osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
264 phplib_dir=str(self.src_dir / 'lib-php'),
265 sqllib_dir=str(self.src_dir / 'lib-sql'),
266 data_dir=str(self.src_dir / 'data'),
267 config_dir=str(self.src_dir / 'settings'),
270 environ=self.test_env)
272 def run_setup_script(self, *args, **kwargs):
273 """ Run the Nominatim setup script with the given arguments.
275 self.run_nominatim_script('setup', *args, **kwargs)
277 def run_update_script(self, *args, **kwargs):
278 """ Run the Nominatim update script with the given arguments.
280 self.run_nominatim_script('update', *args, **kwargs)
282 def run_nominatim_script(self, script, *args, **kwargs):
283 """ Run one of the Nominatim utility scripts with the given arguments.
285 cmd = ['/usr/bin/env', 'php', '-Cq']
286 cmd.append((Path(self.src_dir) / 'lib-php' / 'admin' / '{}.php'.format(script)).resolve())
287 cmd.extend(['--' + x for x in args])
288 for k, v in kwargs.items():
289 cmd.extend(('--' + k.replace('_', '-'), str(v)))
291 if self.website_dir is not None:
292 cwd = self.website_dir.name
296 run_script(cmd, cwd=cwd, env=self.test_env)
298 def copy_from_place(self, db):
299 """ Copy data from place to the placex and location_property_osmline
300 tables invoking the appropriate triggers.
302 self.run_nominatim('refresh', '--functions', '--no-diff-updates')
304 with db.cursor() as cur:
305 cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
306 name, admin_level, address,
308 SELECT osm_type, osm_id, class, type,
309 name, admin_level, address,
312 WHERE not (class='place' and type='houses' and osm_type='W')""")
313 cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
314 SELECT osm_id, address, geometry
316 WHERE class='place' and type='houses'
318 and ST_GeometryType(geometry) = 'ST_LineString'""")