# SPDX-License-Identifier: GPL-2.0-only
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2022 by the Nominatim developer community.
# For a full list of authors see the git log.
from pathlib import Path
import sys
import tempfile

import psycopg2
import psycopg2.extras

# Make the Nominatim sources (four directory levels up) importable.
sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..').resolve()))

from nominatim import cli
from nominatim.config import Configuration
from nominatim.db.connection import Connection
from nominatim.tools import refresh
from nominatim.tokenizer import factory as tokenizer_factory
from steps.utils import run_script

class NominatimEnvironment:
    """ Collects all functions for the execution of Nominatim functions.

        An instance keeps the database and path settings of a test run,
        creates and drops the databases used by the tests and runs the
        Nominatim command-line tool against them.
    """

    def __init__(self, config):
        """ Read the settings of the test run from `config`.

            `config` must support lookup with the string keys used below
            (presumably the behave userdata - confirm against the caller).

            Raises a RuntimeError when an API engine other than 'php' is
            requested for which no `create_api_request_func_<engine>`
            method exists.
        """
        self.build_dir = Path(config['BUILDDIR']).resolve()
        self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
        self.db_host = config['DB_HOST']
        self.db_port = config['DB_PORT']
        self.db_user = config['DB_USER']
        self.db_pass = config['DB_PASS']
        self.template_db = config['TEMPLATE_DB']
        self.test_db = config['TEST_DB']
        self.api_test_db = config['API_TEST_DB']
        self.api_test_file = config['API_TEST_FILE']
        self.tokenizer = config['TOKENIZER']
        self.import_style = config['STYLE']
        self.server_module_path = config['SERVER_MODULE_PATH']
        self.reuse_template = not config['REMOVE_TEMPLATE']
        self.keep_scenario_db = config['KEEP_TEST_DB']
        self.code_coverage_path = config['PHPCOV']
        self.code_coverage_id = 1

        self.default_config = Configuration(None).get_os_env()
        # Filled in by write_nominatim_config(). Defined here so that the
        # attribute always exists.
        self.test_env = None
        self.template_db_done = False
        self.api_db_done = False
        self.website_dir = None

        self.api_engine = None
        engine = config['API_ENGINE']
        if engine != 'php':
            factory = getattr(self, f"create_api_request_func_{engine}", None)
            if factory is None:
                raise RuntimeError(f"Unknown API engine '{engine}'")
            self.api_engine = factory()

    def connect_database(self, dbname):
        """ Return a connection to the database with the given name.
            Uses configured host, user and port.
        """
        dbargs = {'database': dbname}
        optional = {'host': self.db_host, 'port': self.db_port,
                    'user': self.db_user, 'password': self.db_pass}
        # Only hand over the parameters that have actually been configured.
        dbargs.update({key: val for key, val in optional.items() if val})

        return psycopg2.connect(connection_factory=Connection, **dbargs)

    def next_code_coverage_file(self):
        """ Generate the next name for a coverage file.

            Returns the absolute path of the file and advances the
            internal counter, so every call yields a new name.
        """
        fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
        self.code_coverage_id += 1

        return fn.resolve()

    def write_nominatim_config(self, dbname):
        """ Set up a custom test configuration that connects to the given
            database. This sets up the environment variables so that they can
            be picked up by dotenv and creates a project directory with the
            appropriate website scripts.
        """
        dsn_parts = ['pgsql:dbname={}'.format(dbname)]
        for key, val in (('host', self.db_host), ('port', self.db_port),
                         ('user', self.db_user), ('password', self.db_pass)):
            # Unset parameters are left out of the DSN.
            if val:
                dsn_parts.append('{}={}'.format(key, val))
        dsn = ';'.join(dsn_parts)

        self.test_env = dict(self.default_config)
        self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
        self.test_env['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
        self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
        self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
        self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
        self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
        self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
        self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
        self.test_env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = str((self.build_dir / 'module').resolve())
        self.test_env['NOMINATIM_OSM2PGSQL_BINARY'] = str((self.build_dir / 'osm2pgsql' / 'osm2pgsql').resolve())
        if self.tokenizer is not None:
            self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
        if self.import_style is not None:
            self.test_env['NOMINATIM_IMPORT_STYLE'] = self.import_style

        if self.server_module_path:
            self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
        else:
            # avoid module being copied into the temporary environment
            self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = str((self.build_dir / 'module').resolve())

        if self.website_dir is not None:
            self.website_dir.cleanup()

        self.website_dir = tempfile.TemporaryDirectory()

        # The configuration is also written for databases that have not
        # been created yet (setup_template_db() calls this function before
        # the import), so a failing connection is not an error here.
        # NOTE(review): assumes refresh.setup_website() accepts None in
        # place of a connection - confirm.
        try:
            conn = self.connect_database(dbname)
        except psycopg2.Error:
            conn = None

        try:
            refresh.setup_website(Path(self.website_dir.name) / 'website',
                                  self.get_test_config(), conn)
        finally:
            if conn is not None:
                conn.close()

    def get_test_config(self):
        """ Return a Configuration object for the current project directory
            and test environment. Only usable after write_nominatim_config()
            has been called.
        """
        cfg = Configuration(Path(self.website_dir.name), environ=self.test_env)
        cfg.set_libdirs(module=self.build_dir / 'module',
                        osm2pgsql=self.build_dir / 'osm2pgsql' / 'osm2pgsql')
        return cfg

    def get_libpq_dsn(self):
        """ Return the DSN of the current test environment in the
            keyword/value format of libpq. A DSN without the 'pgsql:'
            prefix is returned unchanged.
        """
        dsn = self.test_env['NOMINATIM_DATABASE_DSN']

        def quote_param(param):
            # Split at the first '=' only, the value may contain more.
            key, val = param.split('=', 1)
            val = val.replace('\\', '\\\\').replace("'", "\\'")
            return key + "='" + val + "'"

        if dsn.startswith('pgsql:'):
            # Old PHP DSN format. Convert before returning.
            return ' '.join(quote_param(p) for p in dsn[6:].split(';'))

        return dsn

    def db_drop_database(self, name):
        """ Drop the database with the given name.
        """
        conn = self.connect_database('postgres')
        try:
            # DROP DATABASE must not be executed inside a transaction.
            conn.set_isolation_level(0)
            with conn.cursor() as cur:
                cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
        finally:
            conn.close()

    def setup_template_db(self):
        """ Setup a template database that already contains common test data.
            Having a template database speeds up tests considerably but at
            the price that the tests sometimes run with stale data.
        """
        if self.template_db_done:
            return

        self.template_db_done = True

        self.write_nominatim_config(self.template_db)

        if not self._reuse_or_drop_db(self.template_db):
            try:
                # execute nominatim import on an empty file to get the right tables
                with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
                    fd.write(b'<osm version="0.6"></osm>')
                    # The import opens the file by name, so the content
                    # must be on disk before it starts.
                    fd.flush()
                    self.run_nominatim('import', '--osm-file', fd.name,
                                       '--osm2pgsql-cache', '1',
                                       '--offline', '--index-noanalyse')
            except BaseException:
                # Do not leave a half-built template behind for later runs.
                self.db_drop_database(self.template_db)
                raise

        self.run_nominatim('refresh', '--functions')

    def setup_api_db(self):
        """ Setup a test against the API test database.
        """
        self.write_nominatim_config(self.api_test_db)

        if not self.api_db_done:
            self.api_db_done = True

            if not self._reuse_or_drop_db(self.api_test_db):
                testdata = (Path(__file__) / '..' / '..' / '..' / 'testdb').resolve()
                self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata)
                simp_file = Path(self.website_dir.name) / 'secondary_importance.sql.gz'
                simp_file.symlink_to(testdata / 'secondary_importance.sql.gz')

                try:
                    self.run_nominatim('import', '--osm-file', str(self.api_test_file))
                    self.run_nominatim('add-data', '--tiger-data', str(testdata / 'tiger'))
                    self.run_nominatim('freeze')

                    if self.tokenizer == 'legacy':
                        phrase_file = str(testdata / 'specialphrases_testdb.sql')
                        run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
                    else:
                        csv_path = str(testdata / 'full_en_phrases_test.csv')
                        self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
                except BaseException:
                    # Do not leave a partially imported database behind.
                    self.db_drop_database(self.api_test_db)
                    raise

        tokenizer_factory.get_tokenizer_for_db(self.get_test_config())

    def setup_unknown_db(self):
        """ Setup a test against a non-existing database.
        """
        # The tokenizer needs an existing database to function.
        # So start with the usual database
        class _Context:
            """ Throw-away stand-in for the scenario context. """
            db = None

        context = _Context()
        self.setup_db(context)
        tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)

        # Then drop the DB again
        self.teardown_db(context, force_drop=True)

    def setup_db(self, context):
        """ Setup a test against a fresh, empty test database.

            The database is created from the template database. An open
            connection to it is stored in `context.db`.
        """
        self.setup_template_db()
        conn = self.connect_database(self.template_db)
        try:
            # CREATE/DROP DATABASE must not be executed inside a transaction.
            conn.set_isolation_level(0)
            with conn.cursor() as cur:
                cur.execute('DROP DATABASE IF EXISTS {}'.format(self.test_db))
                cur.execute('CREATE DATABASE {} TEMPLATE = {}'.format(self.test_db, self.template_db))
        finally:
            conn.close()

        self.write_nominatim_config(self.test_db)
        context.db = self.connect_database(self.test_db)
        context.db.autocommit = True
        psycopg2.extras.register_hstore(context.db, globally=False)

    def teardown_db(self, context, force_drop=False):
        """ Remove the test database, if it exists.

            The database is kept when `keep_scenario_db` is set, unless
            `force_drop` is true.
        """
        # Close our own connection first, an open connection would
        # prevent the database from being dropped.
        db = getattr(context, 'db', None)
        if db is not None:
            db.close()

        if force_drop or not self.keep_scenario_db:
            self.db_drop_database(self.test_db)

    def _reuse_or_drop_db(self, name):
        """ Check for the existence of the given DB. If reuse is enabled,
            then the function checks for existence and returns True if the
            database is already there. Otherwise an existing database is
            dropped and always False returned.
        """
        if not self.reuse_template:
            self.db_drop_database(name)
            return False

        conn = self.connect_database('postgres')
        try:
            with conn.cursor() as cur:
                cur.execute('select count(*) from pg_database where datname = %s',
                            (name,))
                return cur.fetchone()[0] == 1
        finally:
            conn.close()

    def reindex_placex(self, db):
        """ Run the indexing step until all data in the placex has
            been processed. Indexing during updates can produce more data
            to index under some circumstances. That is why indexing may have
            to be run multiple times.
        """
        with db.cursor() as cur:
            while True:
                self.run_nominatim('index')

                cur.execute("SELECT 'a' FROM placex WHERE indexed_status != 0 LIMIT 1")
                if cur.rowcount == 0:
                    return

    def run_nominatim(self, *cmdline):
        """ Run the nominatim command-line tool via the library.

            When a project directory has been set up, it is handed to the
            tool with '--project-dir'.
        """
        if self.website_dir is not None:
            cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]

        # NOTE(review): 'cli_args' is assumed to be the keyword under which
        # cli.nominatim() takes the command line; confirm, and check whether
        # further keyword arguments are expected.
        cli.nominatim(module_dir='',
                      osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
                      cli_args=cmdline,
                      environ=self.test_env)

    def copy_from_place(self, db):
        """ Copy data from place to the placex and location_property_osmline
            tables invoking the appropriate triggers.
        """
        self.run_nominatim('refresh', '--functions', '--no-diff-updates')

        with db.cursor() as cur:
            # Everything except interpolation ways goes into placex.
            cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
                                               name, admin_level, address,
                                               extratags, geometry)
                             SELECT osm_type, osm_id, class, type,
                                    name, admin_level, address,
                                    extratags, geometry
                               FROM place
                               WHERE not (class='place' and type='houses' and osm_type='W')""")
            # Interpolation ways go into location_property_osmline.
            cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
                             SELECT osm_id, address, geometry
                               FROM place
                              WHERE class='place' and type='houses'
                                    and osm_type='W'
                                    and ST_GeometryType(geometry) = 'ST_LineString'""")

    def create_api_request_func_starlette(self):
        """ Return an async function that sends a request to the API
            through the starlette server application.

            The returned function takes the endpoint, the request
            parameters, the project directory, the environment and the
            HTTP headers and returns the tuple (response text, status code).
        """
        # Imported here so that the packages are only required when
        # this engine is actually used.
        import nominatim.server.starlette.server
        from asgi_lifespan import LifespanManager
        import httpx

        async def _request(endpoint, params, project_dir, environ, http_headers):
            app = nominatim.server.starlette.server.get_application(project_dir, environ)

            async with LifespanManager(app):
                async with httpx.AsyncClient(app=app, base_url="http://nominatim.test") as client:
                    response = await client.get(f"/{endpoint}", params=params,
                                                headers=http_headers)

            return response.text, response.status_code

        return _request

    def create_api_request_func_falcon(self):
        """ Return an async function that sends a request to the API
            through the falcon server application.

            The returned function takes the endpoint, the request
            parameters, the project directory, the environment and the
            HTTP headers and returns the tuple (response text, status code).
        """
        # Imported here so that the packages are only required when
        # this engine is actually used.
        import nominatim.server.falcon.server
        import falcon.testing

        async def _request(endpoint, params, project_dir, environ, http_headers):
            app = nominatim.server.falcon.server.get_application(project_dir, environ)

            async with falcon.testing.ASGIConductor(app) as conductor:
                response = await conductor.get(f"/{endpoint}", params=params,
                                               headers=http_headers)

            return response.text, response.status_code

        return _request