1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 from pathlib import Path
13 from psycopg import sql as pysql
15 sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..'/ 'src').resolve()))
17 from nominatim_db import cli
18 from nominatim_db.config import Configuration
19 from nominatim_db.db.connection import Connection, register_hstore, execute_scalar
20 from nominatim_db.tools import refresh
21 from nominatim_db.tokenizer import factory as tokenizer_factory
22 from steps.utils import run_script
24 class NominatimEnvironment:
25 """ Collects all functions for the execution of Nominatim functions.
28 def __init__(self, config):
29 self.build_dir = Path(config['BUILDDIR']).resolve()
30 self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
31 self.db_host = config['DB_HOST']
32 self.db_port = config['DB_PORT']
33 self.db_user = config['DB_USER']
34 self.db_pass = config['DB_PASS']
35 self.template_db = config['TEMPLATE_DB']
36 self.test_db = config['TEST_DB']
37 self.api_test_db = config['API_TEST_DB']
38 self.api_test_file = config['API_TEST_FILE']
39 self.tokenizer = config['TOKENIZER']
40 self.import_style = config['STYLE']
41 self.server_module_path = config['SERVER_MODULE_PATH']
42 self.reuse_template = not config['REMOVE_TEMPLATE']
43 self.keep_scenario_db = config['KEEP_TEST_DB']
44 self.code_coverage_path = config['PHPCOV']
45 self.code_coverage_id = 1
47 self.default_config = Configuration(None).get_os_env()
49 self.template_db_done = False
50 self.api_db_done = False
51 self.website_dir = None
53 self.api_engine = None
54 if config['API_ENGINE'] != 'php':
55 if not hasattr(self, f"create_api_request_func_{config['API_ENGINE']}"):
56 raise RuntimeError(f"Unknown API engine '{config['API_ENGINE']}'")
57 self.api_engine = getattr(self, f"create_api_request_func_{config['API_ENGINE']}")()
59 def connect_database(self, dbname):
60 """ Return a connection to the database with the given name.
61 Uses configured host, user and port.
63 dbargs = {'dbname': dbname, 'row_factory': psycopg.rows.dict_row}
65 dbargs['host'] = self.db_host
67 dbargs['port'] = self.db_port
69 dbargs['user'] = self.db_user
71 dbargs['password'] = self.db_pass
72 return psycopg.connect(**dbargs)
74 def next_code_coverage_file(self):
75 """ Generate the next name for a coverage file.
77 fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
78 self.code_coverage_id += 1
82 def write_nominatim_config(self, dbname):
83 """ Set up a custom test configuration that connects to the given
84 database. This sets up the environment variables so that they can
85 be picked up by dotenv and creates a project directory with the
86 appropriate website scripts.
88 if dbname.startswith('sqlite:'):
89 dsn = 'sqlite:dbname={}'.format(dbname[7:])
91 dsn = 'pgsql:dbname={}'.format(dbname)
93 dsn += ';host=' + self.db_host
95 dsn += ';port=' + self.db_port
97 dsn += ';user=' + self.db_user
99 dsn += ';password=' + self.db_pass
101 self.test_env = dict(self.default_config)
102 self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
103 self.test_env['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
104 self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
105 self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
106 self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
107 self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
108 self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
109 self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
110 self.test_env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = str((self.build_dir / 'module').resolve())
111 self.test_env['NOMINATIM_OSM2PGSQL_BINARY'] = str((self.build_dir / 'osm2pgsql' / 'osm2pgsql').resolve())
112 if self.tokenizer is not None:
113 self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
114 if self.import_style is not None:
115 self.test_env['NOMINATIM_IMPORT_STYLE'] = self.import_style
117 if self.server_module_path:
118 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
120 # avoid module being copied into the temporary environment
121 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = str((self.build_dir / 'module').resolve())
123 if self.website_dir is not None:
124 self.website_dir.cleanup()
126 self.website_dir = tempfile.TemporaryDirectory()
129 conn = self.connect_database(dbname)
132 refresh.setup_website(Path(self.website_dir.name) / 'website',
133 self.get_test_config(), conn)
138 def get_test_config(self):
139 cfg = Configuration(Path(self.website_dir.name), environ=self.test_env)
140 cfg.set_libdirs(module=self.build_dir / 'module',
141 osm2pgsql=self.build_dir / 'osm2pgsql' / 'osm2pgsql')
144 def get_libpq_dsn(self):
145 dsn = self.test_env['NOMINATIM_DATABASE_DSN']
147 def quote_param(param):
148 key, val = param.split('=')
149 val = val.replace('\\', '\\\\').replace("'", "\\'")
151 val = "'" + val + "'"
152 return key + '=' + val
154 if dsn.startswith('pgsql:'):
155 # Old PHP DSN format. Convert before returning.
156 return ' '.join([quote_param(p) for p in dsn[6:].split(';')])
161 def db_drop_database(self, name):
162 """ Drop the database with the given name.
164 with self.connect_database('postgres') as conn:
165 conn.autocommit = True
166 conn.execute(pysql.SQL('DROP DATABASE IF EXISTS')
167 + pysql.Identifier(name))
169 def setup_template_db(self):
170 """ Setup a template database that already contains common test data.
171 Having a template database speeds up tests considerably but at
172 the price that the tests sometimes run with stale data.
174 if self.template_db_done:
177 self.template_db_done = True
179 self.write_nominatim_config(self.template_db)
181 if not self._reuse_or_drop_db(self.template_db):
183 # execute nominatim import on an empty file to get the right tables
184 with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
185 fd.write(b'<osm version="0.6"></osm>')
187 self.run_nominatim('import', '--osm-file', fd.name,
188 '--osm2pgsql-cache', '1',
190 '--offline', '--index-noanalyse')
192 self.db_drop_database(self.template_db)
195 self.run_nominatim('refresh', '--functions')
198 def setup_api_db(self):
199 """ Setup a test against the API test database.
201 self.write_nominatim_config(self.api_test_db)
203 if self.api_test_db.startswith('sqlite:'):
206 if not self.api_db_done:
207 self.api_db_done = True
209 if not self._reuse_or_drop_db(self.api_test_db):
210 testdata = (Path(__file__) / '..' / '..' / '..' / 'testdb').resolve()
211 self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata)
212 simp_file = Path(self.website_dir.name) / 'secondary_importance.sql.gz'
213 simp_file.symlink_to(testdata / 'secondary_importance.sql.gz')
216 self.run_nominatim('import', '--osm-file', str(self.api_test_file))
217 self.run_nominatim('add-data', '--tiger-data', str(testdata / 'tiger'))
218 self.run_nominatim('freeze')
220 if self.tokenizer == 'legacy':
221 phrase_file = str(testdata / 'specialphrases_testdb.sql')
222 run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
224 csv_path = str(testdata / 'full_en_phrases_test.csv')
225 self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
227 self.db_drop_database(self.api_test_db)
230 tokenizer_factory.get_tokenizer_for_db(self.get_test_config())
233 def setup_unknown_db(self):
234 """ Setup a test against a non-existing database.
236 # The tokenizer needs an existing database to function.
237 # So start with the usual database
242 self.setup_db(context)
243 tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
245 # Then drop the DB again
246 self.teardown_db(context, force_drop=True)
248 def setup_db(self, context):
249 """ Setup a test against a fresh, empty test database.
251 self.setup_template_db()
252 with self.connect_database(self.template_db) as conn:
253 conn.autocommit = True
254 conn.execute(pysql.SQL('DROP DATABASE IF EXISTS')
255 + pysql.Identifier(self.test_db))
256 conn.execute(pysql.SQL('CREATE DATABASE {} TEMPLATE = {}').format(
257 pysql.Identifier(self.test_db),
258 pysql.Identifier(self.template_db)))
260 self.write_nominatim_config(self.test_db)
261 context.db = self.connect_database(self.test_db)
262 context.db.autocommit = True
263 register_hstore(context.db)
265 def teardown_db(self, context, force_drop=False):
266 """ Remove the test database, if it exists.
268 if hasattr(context, 'db'):
271 if force_drop or not self.keep_scenario_db:
272 self.db_drop_database(self.test_db)
274 def _reuse_or_drop_db(self, name):
275 """ Check for the existence of the given DB. If reuse is enabled,
276 then the function checks for existnce and returns True if the
277 database is already there. Otherwise an existing database is
278 dropped and always false returned.
280 if self.reuse_template:
281 with self.connect_database('postgres') as conn:
282 num = execute_scalar(conn,
283 'select count(*) from pg_database where datname = %s',
288 self.db_drop_database(name)
293 def reindex_placex(self, db):
294 """ Run the indexing step until all data in the placex has
295 been processed. Indexing during updates can produce more data
296 to index under some circumstances. That is why indexing may have
297 to be run multiple times.
299 self.run_nominatim('index')
302 def run_nominatim(self, *cmdline):
303 """ Run the nominatim command-line tool via the library.
305 if self.website_dir is not None:
306 cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
308 cli.nominatim(module_dir='',
309 osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
311 environ=self.test_env)
314 def copy_from_place(self, db):
315 """ Copy data from place to the placex and location_property_osmline
316 tables invoking the appropriate triggers.
318 self.run_nominatim('refresh', '--functions', '--no-diff-updates')
320 with db.cursor() as cur:
321 cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
322 name, admin_level, address,
324 SELECT osm_type, osm_id, class, type,
325 name, admin_level, address,
328 WHERE not (class='place' and type='houses' and osm_type='W')""")
329 cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
330 SELECT osm_id, address, geometry
332 WHERE class='place' and type='houses'
334 and ST_GeometryType(geometry) = 'ST_LineString'""")
337 def create_api_request_func_starlette(self):
338 import nominatim_api.server.starlette.server
339 from asgi_lifespan import LifespanManager
342 async def _request(endpoint, params, project_dir, environ, http_headers):
343 app = nominatim_api.server.starlette.server.get_application(project_dir, environ)
345 async with LifespanManager(app):
346 async with httpx.AsyncClient(app=app, base_url="http://nominatim.test") as client:
347 response = await client.get(f"/{endpoint}", params=params,
348 headers=http_headers)
350 return response.text, response.status_code
355 def create_api_request_func_falcon(self):
356 import nominatim_api.server.falcon.server
357 import falcon.testing
359 async def _request(endpoint, params, project_dir, environ, http_headers):
360 app = nominatim_api.server.falcon.server.get_application(project_dir, environ)
362 async with falcon.testing.ASGIConductor(app) as conductor:
363 response = await conductor.get(f"/{endpoint}", params=params,
364 headers=http_headers)
366 return response.text, response.status_code