1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 from pathlib import Path
12 from asgi_lifespan import LifespanManager
16 import psycopg2.extras
18 sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..').resolve()))
20 from nominatim import cli
21 from nominatim.config import Configuration
22 from nominatim.db.connection import Connection
23 from nominatim.tools import refresh
24 from nominatim.tokenizer import factory as tokenizer_factory
25 from steps.utils import run_script
27 class NominatimEnvironment:
28 """ Collects all functions for the execution of Nominatim functions.
31 def __init__(self, config):
32 self.build_dir = Path(config['BUILDDIR']).resolve()
33 self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
34 self.db_host = config['DB_HOST']
35 self.db_port = config['DB_PORT']
36 self.db_user = config['DB_USER']
37 self.db_pass = config['DB_PASS']
38 self.template_db = config['TEMPLATE_DB']
39 self.test_db = config['TEST_DB']
40 self.api_test_db = config['API_TEST_DB']
41 self.api_test_file = config['API_TEST_FILE']
42 self.tokenizer = config['TOKENIZER']
43 self.import_style = config['STYLE']
44 self.server_module_path = config['SERVER_MODULE_PATH']
45 self.reuse_template = not config['REMOVE_TEMPLATE']
46 self.keep_scenario_db = config['KEEP_TEST_DB']
47 self.code_coverage_path = config['PHPCOV']
48 self.code_coverage_id = 1
50 self.default_config = Configuration(None).get_os_env()
52 self.template_db_done = False
53 self.api_db_done = False
54 self.website_dir = None
56 self.api_engine = None
57 if config['API_ENGINE'] != 'php':
58 if not hasattr(self, f"create_api_request_func_{config['API_ENGINE']}"):
59 raise RuntimeError(f"Unknown API engine '{config['API_ENGINE']}'")
60 self.api_engine = getattr(self, f"create_api_request_func_{config['API_ENGINE']}")()
62 def connect_database(self, dbname):
63 """ Return a connection to the database with the given name.
64 Uses configured host, user and port.
66 dbargs = {'database': dbname}
68 dbargs['host'] = self.db_host
70 dbargs['port'] = self.db_port
72 dbargs['user'] = self.db_user
74 dbargs['password'] = self.db_pass
75 conn = psycopg2.connect(connection_factory=Connection, **dbargs)
78 def next_code_coverage_file(self):
79 """ Generate the next name for a coverage file.
81 fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
82 self.code_coverage_id += 1
86 def write_nominatim_config(self, dbname):
87 """ Set up a custom test configuration that connects to the given
88 database. This sets up the environment variables so that they can
89 be picked up by dotenv and creates a project directory with the
90 appropriate website scripts.
92 dsn = 'pgsql:dbname={}'.format(dbname)
94 dsn += ';host=' + self.db_host
96 dsn += ';port=' + self.db_port
98 dsn += ';user=' + self.db_user
100 dsn += ';password=' + self.db_pass
102 self.test_env = dict(self.default_config)
103 self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
104 self.test_env['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
105 self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
106 self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
107 self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
108 self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
109 self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
110 self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
111 self.test_env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = str((self.build_dir / 'module').resolve())
112 self.test_env['NOMINATIM_OSM2PGSQL_BINARY'] = str((self.build_dir / 'osm2pgsql' / 'osm2pgsql').resolve())
113 if self.tokenizer is not None:
114 self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
115 if self.import_style is not None:
116 self.test_env['NOMINATIM_IMPORT_STYLE'] = self.import_style
118 if self.server_module_path:
119 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
121 # avoid module being copied into the temporary environment
122 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = str((self.build_dir / 'module').resolve())
124 if self.website_dir is not None:
125 self.website_dir.cleanup()
127 self.website_dir = tempfile.TemporaryDirectory()
130 conn = self.connect_database(dbname)
133 refresh.setup_website(Path(self.website_dir.name) / 'website',
134 self.get_test_config(), conn)
137 def get_test_config(self):
138 cfg = Configuration(Path(self.website_dir.name), environ=self.test_env)
139 cfg.set_libdirs(module=self.build_dir / 'module',
140 osm2pgsql=self.build_dir / 'osm2pgsql' / 'osm2pgsql')
143 def get_libpq_dsn(self):
144 dsn = self.test_env['NOMINATIM_DATABASE_DSN']
146 def quote_param(param):
147 key, val = param.split('=')
148 val = val.replace('\\', '\\\\').replace("'", "\\'")
150 val = "'" + val + "'"
151 return key + '=' + val
153 if dsn.startswith('pgsql:'):
154 # Old PHP DSN format. Convert before returning.
155 return ' '.join([quote_param(p) for p in dsn[6:].split(';')])
160 def db_drop_database(self, name):
161 """ Drop the database with the given name.
163 conn = self.connect_database('postgres')
164 conn.set_isolation_level(0)
166 cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
169 def setup_template_db(self):
170 """ Setup a template database that already contains common test data.
171 Having a template database speeds up tests considerably but at
172 the price that the tests sometimes run with stale data.
174 if self.template_db_done:
177 self.template_db_done = True
179 self.write_nominatim_config(self.template_db)
181 if not self._reuse_or_drop_db(self.template_db):
183 # execute nominatim import on an empty file to get the right tables
184 with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
185 fd.write(b'<osm version="0.6"></osm>')
187 self.run_nominatim('import', '--osm-file', fd.name,
188 '--osm2pgsql-cache', '1',
190 '--offline', '--index-noanalyse')
192 self.db_drop_database(self.template_db)
195 self.run_nominatim('refresh', '--functions')
198 def setup_api_db(self):
199 """ Setup a test against the API test database.
201 self.write_nominatim_config(self.api_test_db)
203 if not self.api_db_done:
204 self.api_db_done = True
206 if not self._reuse_or_drop_db(self.api_test_db):
207 testdata = (Path(__file__) / '..' / '..' / '..' / 'testdb').resolve()
208 self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata)
209 simp_file = Path(self.website_dir.name) / 'secondary_importance.sql.gz'
210 simp_file.symlink_to(testdata / 'secondary_importance.sql.gz')
213 self.run_nominatim('import', '--osm-file', str(self.api_test_file))
214 self.run_nominatim('add-data', '--tiger-data', str(testdata / 'tiger'))
215 self.run_nominatim('freeze')
217 if self.tokenizer == 'legacy':
218 phrase_file = str(testdata / 'specialphrases_testdb.sql')
219 run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
221 csv_path = str(testdata / 'full_en_phrases_test.csv')
222 self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
224 self.db_drop_database(self.api_test_db)
227 tokenizer_factory.get_tokenizer_for_db(self.get_test_config())
230 def setup_unknown_db(self):
231 """ Setup a test against a non-existing database.
233 # The tokenizer needs an existing database to function.
234 # So start with the usual database
239 self.setup_db(context)
240 tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
242 # Then drop the DB again
243 self.teardown_db(context, force_drop=True)
245 def setup_db(self, context):
246 """ Setup a test against a fresh, empty test database.
248 self.setup_template_db()
249 conn = self.connect_database(self.template_db)
250 conn.set_isolation_level(0)
252 cur.execute('DROP DATABASE IF EXISTS {}'.format(self.test_db))
253 cur.execute('CREATE DATABASE {} TEMPLATE = {}'.format(self.test_db, self.template_db))
255 self.write_nominatim_config(self.test_db)
256 context.db = self.connect_database(self.test_db)
257 context.db.autocommit = True
258 psycopg2.extras.register_hstore(context.db, globally=False)
260 def teardown_db(self, context, force_drop=False):
261 """ Remove the test database, if it exists.
263 if hasattr(context, 'db'):
266 if force_drop or not self.keep_scenario_db:
267 self.db_drop_database(self.test_db)
269 def _reuse_or_drop_db(self, name):
270 """ Check for the existance of the given DB. If reuse is enabled,
271 then the function checks for existance and returns True if the
272 database is already there. Otherwise an existing database is
273 dropped and always false returned.
275 if self.reuse_template:
276 conn = self.connect_database('postgres')
277 with conn.cursor() as cur:
278 cur.execute('select count(*) from pg_database where datname = %s',
280 if cur.fetchone()[0] == 1:
284 self.db_drop_database(name)
288 def reindex_placex(self, db):
289 """ Run the indexing step until all data in the placex has
290 been processed. Indexing during updates can produce more data
291 to index under some circumstances. That is why indexing may have
292 to be run multiple times.
294 with db.cursor() as cur:
296 self.run_nominatim('index')
298 cur.execute("SELECT 'a' FROM placex WHERE indexed_status != 0 LIMIT 1")
299 if cur.rowcount == 0:
302 def run_nominatim(self, *cmdline):
303 """ Run the nominatim command-line tool via the library.
305 if self.website_dir is not None:
306 cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
308 cli.nominatim(module_dir='',
309 osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
312 environ=self.test_env)
315 def copy_from_place(self, db):
316 """ Copy data from place to the placex and location_property_osmline
317 tables invoking the appropriate triggers.
319 self.run_nominatim('refresh', '--functions', '--no-diff-updates')
321 with db.cursor() as cur:
322 cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
323 name, admin_level, address,
325 SELECT osm_type, osm_id, class, type,
326 name, admin_level, address,
329 WHERE not (class='place' and type='houses' and osm_type='W')""")
330 cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
331 SELECT osm_id, address, geometry
333 WHERE class='place' and type='houses'
335 and ST_GeometryType(geometry) = 'ST_LineString'""")
338 def create_api_request_func_starlette(self):
339 import nominatim.server.starlette.server
341 async def _request(endpoint, params, project_dir, environ):
342 app = nominatim.server.starlette.server.get_application(project_dir, environ)
344 async with LifespanManager(app):
345 async with httpx.AsyncClient(app=app, base_url="http://nominatim.test") as client:
346 response = await client.get(f"/{endpoint}", params=params)
348 return response.text, response.status_code
353 def create_api_request_func_sanic(self):
354 import nominatim.server.sanic.server
356 async def _request(endpoint, params, project_dir, environ):
357 app = nominatim.server.sanic.server.get_application(project_dir, environ)
359 _, response = await app.asgi_client.get(f"/{endpoint}", params=params)
361 return response.text, response.status_code
366 def create_api_request_func_falcon(self):
367 import nominatim.server.falcon.server
368 import falcon.testing
370 async def _request(endpoint, params, project_dir, environ):
371 app = nominatim.server.falcon.server.get_application(project_dir, environ)
373 async with falcon.testing.ASGIConductor(app) as conductor:
374 response = await conductor.get(f"/{endpoint}", params=params)
376 return response.text, response.status_code