1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 from pathlib import Path
13 import psycopg2.extras
15 sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..'/ 'src').resolve()))
17 from nominatim_db import cli
18 from nominatim_core.config import Configuration
19 from nominatim_core.db.connection import Connection
20 from nominatim_db.tools import refresh
21 from nominatim_db.tokenizer import factory as tokenizer_factory
22 from steps.utils import run_script
24 class NominatimEnvironment:
25 """ Collects all functions for the execution of Nominatim functions.
28 def __init__(self, config):
29 self.build_dir = Path(config['BUILDDIR']).resolve()
30 self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
31 self.db_host = config['DB_HOST']
32 self.db_port = config['DB_PORT']
33 self.db_user = config['DB_USER']
34 self.db_pass = config['DB_PASS']
35 self.template_db = config['TEMPLATE_DB']
36 self.test_db = config['TEST_DB']
37 self.api_test_db = config['API_TEST_DB']
38 self.api_test_file = config['API_TEST_FILE']
39 self.tokenizer = config['TOKENIZER']
40 self.import_style = config['STYLE']
41 self.server_module_path = config['SERVER_MODULE_PATH']
42 self.reuse_template = not config['REMOVE_TEMPLATE']
43 self.keep_scenario_db = config['KEEP_TEST_DB']
44 self.code_coverage_path = config['PHPCOV']
45 self.code_coverage_id = 1
47 self.default_config = Configuration(None).get_os_env()
49 self.template_db_done = False
50 self.api_db_done = False
51 self.website_dir = None
53 self.api_engine = None
54 if config['API_ENGINE'] != 'php':
55 if not hasattr(self, f"create_api_request_func_{config['API_ENGINE']}"):
56 raise RuntimeError(f"Unknown API engine '{config['API_ENGINE']}'")
57 self.api_engine = getattr(self, f"create_api_request_func_{config['API_ENGINE']}")()
59 def connect_database(self, dbname):
60 """ Return a connection to the database with the given name.
61 Uses configured host, user and port.
63 dbargs = {'database': dbname}
65 dbargs['host'] = self.db_host
67 dbargs['port'] = self.db_port
69 dbargs['user'] = self.db_user
71 dbargs['password'] = self.db_pass
72 conn = psycopg2.connect(connection_factory=Connection, **dbargs)
75 def next_code_coverage_file(self):
76 """ Generate the next name for a coverage file.
78 fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
79 self.code_coverage_id += 1
83 def write_nominatim_config(self, dbname):
84 """ Set up a custom test configuration that connects to the given
85 database. This sets up the environment variables so that they can
86 be picked up by dotenv and creates a project directory with the
87 appropriate website scripts.
89 if dbname.startswith('sqlite:'):
90 dsn = 'sqlite:dbname={}'.format(dbname[7:])
92 dsn = 'pgsql:dbname={}'.format(dbname)
94 dsn += ';host=' + self.db_host
96 dsn += ';port=' + self.db_port
98 dsn += ';user=' + self.db_user
100 dsn += ';password=' + self.db_pass
102 self.test_env = dict(self.default_config)
103 self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
104 self.test_env['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
105 self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
106 self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
107 self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
108 self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
109 self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
110 self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
111 self.test_env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = str((self.build_dir / 'module').resolve())
112 self.test_env['NOMINATIM_OSM2PGSQL_BINARY'] = str((self.build_dir / 'osm2pgsql' / 'osm2pgsql').resolve())
113 if self.tokenizer is not None:
114 self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
115 if self.import_style is not None:
116 self.test_env['NOMINATIM_IMPORT_STYLE'] = self.import_style
118 if self.server_module_path:
119 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
121 # avoid module being copied into the temporary environment
122 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = str((self.build_dir / 'module').resolve())
124 if self.website_dir is not None:
125 self.website_dir.cleanup()
127 self.website_dir = tempfile.TemporaryDirectory()
130 conn = self.connect_database(dbname)
133 refresh.setup_website(Path(self.website_dir.name) / 'website',
134 self.get_test_config(), conn)
137 def get_test_config(self):
138 cfg = Configuration(Path(self.website_dir.name), environ=self.test_env)
139 cfg.set_libdirs(module=self.build_dir / 'module',
140 osm2pgsql=self.build_dir / 'osm2pgsql' / 'osm2pgsql')
143 def get_libpq_dsn(self):
144 dsn = self.test_env['NOMINATIM_DATABASE_DSN']
146 def quote_param(param):
147 key, val = param.split('=')
148 val = val.replace('\\', '\\\\').replace("'", "\\'")
150 val = "'" + val + "'"
151 return key + '=' + val
153 if dsn.startswith('pgsql:'):
154 # Old PHP DSN format. Convert before returning.
155 return ' '.join([quote_param(p) for p in dsn[6:].split(';')])
160 def db_drop_database(self, name):
161 """ Drop the database with the given name.
163 conn = self.connect_database('postgres')
164 conn.set_isolation_level(0)
166 cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
169 def setup_template_db(self):
170 """ Setup a template database that already contains common test data.
171 Having a template database speeds up tests considerably but at
172 the price that the tests sometimes run with stale data.
174 if self.template_db_done:
177 self.template_db_done = True
179 self.write_nominatim_config(self.template_db)
181 if not self._reuse_or_drop_db(self.template_db):
183 # execute nominatim import on an empty file to get the right tables
184 with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
185 fd.write(b'<osm version="0.6"></osm>')
187 self.run_nominatim('import', '--osm-file', fd.name,
188 '--osm2pgsql-cache', '1',
190 '--offline', '--index-noanalyse')
192 self.db_drop_database(self.template_db)
195 self.run_nominatim('refresh', '--functions')
198 def setup_api_db(self):
199 """ Setup a test against the API test database.
201 self.write_nominatim_config(self.api_test_db)
203 if self.api_test_db.startswith('sqlite:'):
206 if not self.api_db_done:
207 self.api_db_done = True
209 if not self._reuse_or_drop_db(self.api_test_db):
210 testdata = (Path(__file__) / '..' / '..' / '..' / 'testdb').resolve()
211 self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata)
212 simp_file = Path(self.website_dir.name) / 'secondary_importance.sql.gz'
213 simp_file.symlink_to(testdata / 'secondary_importance.sql.gz')
216 self.run_nominatim('import', '--osm-file', str(self.api_test_file))
217 self.run_nominatim('add-data', '--tiger-data', str(testdata / 'tiger'))
218 self.run_nominatim('freeze')
220 if self.tokenizer == 'legacy':
221 phrase_file = str(testdata / 'specialphrases_testdb.sql')
222 run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
224 csv_path = str(testdata / 'full_en_phrases_test.csv')
225 self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
227 self.db_drop_database(self.api_test_db)
230 tokenizer_factory.get_tokenizer_for_db(self.get_test_config())
233 def setup_unknown_db(self):
234 """ Setup a test against a non-existing database.
236 # The tokenizer needs an existing database to function.
237 # So start with the usual database
242 self.setup_db(context)
243 tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
245 # Then drop the DB again
246 self.teardown_db(context, force_drop=True)
248 def setup_db(self, context):
249 """ Setup a test against a fresh, empty test database.
251 self.setup_template_db()
252 conn = self.connect_database(self.template_db)
253 conn.set_isolation_level(0)
255 cur.execute('DROP DATABASE IF EXISTS {}'.format(self.test_db))
256 cur.execute('CREATE DATABASE {} TEMPLATE = {}'.format(self.test_db, self.template_db))
258 self.write_nominatim_config(self.test_db)
259 context.db = self.connect_database(self.test_db)
260 context.db.autocommit = True
261 psycopg2.extras.register_hstore(context.db, globally=False)
263 def teardown_db(self, context, force_drop=False):
264 """ Remove the test database, if it exists.
266 if hasattr(context, 'db'):
269 if force_drop or not self.keep_scenario_db:
270 self.db_drop_database(self.test_db)
272 def _reuse_or_drop_db(self, name):
273 """ Check for the existence of the given DB. If reuse is enabled,
274 then the function checks for existnce and returns True if the
275 database is already there. Otherwise an existing database is
276 dropped and always false returned.
278 if self.reuse_template:
279 conn = self.connect_database('postgres')
280 with conn.cursor() as cur:
281 cur.execute('select count(*) from pg_database where datname = %s',
283 if cur.fetchone()[0] == 1:
287 self.db_drop_database(name)
291 def reindex_placex(self, db):
292 """ Run the indexing step until all data in the placex has
293 been processed. Indexing during updates can produce more data
294 to index under some circumstances. That is why indexing may have
295 to be run multiple times.
297 with db.cursor() as cur:
299 self.run_nominatim('index')
301 cur.execute("SELECT 'a' FROM placex WHERE indexed_status != 0 LIMIT 1")
302 if cur.rowcount == 0:
305 def run_nominatim(self, *cmdline):
306 """ Run the nominatim command-line tool via the library.
308 if self.website_dir is not None:
309 cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
311 cli.nominatim(module_dir='',
312 osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
314 environ=self.test_env)
317 def copy_from_place(self, db):
318 """ Copy data from place to the placex and location_property_osmline
319 tables invoking the appropriate triggers.
321 self.run_nominatim('refresh', '--functions', '--no-diff-updates')
323 with db.cursor() as cur:
324 cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
325 name, admin_level, address,
327 SELECT osm_type, osm_id, class, type,
328 name, admin_level, address,
331 WHERE not (class='place' and type='houses' and osm_type='W')""")
332 cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
333 SELECT osm_id, address, geometry
335 WHERE class='place' and type='houses'
337 and ST_GeometryType(geometry) = 'ST_LineString'""")
340 def create_api_request_func_starlette(self):
341 import nominatim_api.server.starlette.server
342 from asgi_lifespan import LifespanManager
345 async def _request(endpoint, params, project_dir, environ, http_headers):
346 app = nominatim_api.server.starlette.server.get_application(project_dir, environ)
348 async with LifespanManager(app):
349 async with httpx.AsyncClient(app=app, base_url="http://nominatim.test") as client:
350 response = await client.get(f"/{endpoint}", params=params,
351 headers=http_headers)
353 return response.text, response.status_code
358 def create_api_request_func_falcon(self):
359 import nominatim_api.server.falcon.server
360 import falcon.testing
362 async def _request(endpoint, params, project_dir, environ, http_headers):
363 app = nominatim_api.server.falcon.server.get_application(project_dir, environ)
365 async with falcon.testing.ASGIConductor(app) as conductor:
366 response = await conductor.get(f"/{endpoint}", params=params,
367 headers=http_headers)
369 return response.text, response.status_code