1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
from pathlib import Path
import tempfile

import psycopg
from psycopg import sql as pysql

from nominatim_db import cli
from nominatim_db.config import Configuration
from nominatim_db.db.connection import Connection, register_hstore, execute_scalar
from nominatim_db.tools import refresh
from nominatim_db.tokenizer import factory as tokenizer_factory
from steps.utils import run_script
class NominatimEnvironment:
    """ Collects all functions for the execution of Nominatim functions.

        An instance keeps the database and tokenizer settings of a test
        run and offers helpers to create, configure and remove the
        template database, the API test database and the per-scenario
        test database.
    """

    def __init__(self, config):
        """ Read the test settings from the mapping `config`.

            Raises a RuntimeError when an unknown API engine is requested
            or when the legacy tokenizer is selected without a server
            module path.
        """
        self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
        self.db_host = config['DB_HOST']
        self.db_port = config['DB_PORT']
        self.db_user = config['DB_USER']
        self.db_pass = config['DB_PASS']
        self.template_db = config['TEMPLATE_DB']
        self.test_db = config['TEST_DB']
        self.api_test_db = config['API_TEST_DB']
        self.api_test_file = config['API_TEST_FILE']
        self.tokenizer = config['TOKENIZER']
        self.import_style = config['STYLE']
        self.server_module_path = config['SERVER_MODULE_PATH']
        self.reuse_template = not config['REMOVE_TEMPLATE']
        self.keep_scenario_db = config['KEEP_TEST_DB']

        self.default_config = Configuration(None).get_os_env()
        # Filled in by write_nominatim_config().
        self.test_env = None
        self.template_db_done = False
        self.api_db_done = False
        self.website_dir = None

        # For any engine other than 'php' the request function is built by
        # the method named create_api_request_func_<engine>.
        self.api_engine = None
        engine = config['API_ENGINE']
        if engine != 'php':
            factory = getattr(self, f"create_api_request_func_{engine}", None)
            if factory is None:
                raise RuntimeError(f"Unknown API engine '{engine}'")
            self.api_engine = factory()

        if self.tokenizer == 'legacy' and self.server_module_path is None:
            raise RuntimeError("You must set -DSERVER_MODULE_PATH when testing the legacy tokenizer.")

    def _connection_params(self):
        """ Return (keyword, value) pairs for those of host, port, user
            and password that are configured, in that order.
        """
        candidates = (('host', self.db_host),
                      ('port', self.db_port),
                      ('user', self.db_user),
                      ('password', self.db_pass))
        return [(key, value) for key, value in candidates if value]

    def connect_database(self, dbname):
        """ Return a connection to the database with the given name.
            Uses configured host, user and port.
        """
        dbargs = {'dbname': dbname, 'row_factory': psycopg.rows.dict_row}
        dbargs.update(self._connection_params())
        return psycopg.connect(**dbargs)

    def write_nominatim_config(self, dbname):
        """ Set up a custom test configuration that connects to the given
            database. This sets up the environment variables so that they can
            be picked up by dotenv and creates a project directory with the
            appropriate website scripts.
        """
        if dbname.startswith('sqlite:'):
            dsn = 'sqlite:dbname={}'.format(dbname[7:])
        else:
            dsn = 'pgsql:dbname={}'.format(dbname)
        dsn += ''.join(f';{key}={value}' for key, value in self._connection_params())

        self.test_env = dict(self.default_config)
        self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
        self.test_env['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
        self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
        self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
        self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
        self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
        self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
        self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
        if self.tokenizer is not None:
            self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
        if self.import_style is not None:
            # Overrides the 'full' default set above.
            self.test_env['NOMINATIM_IMPORT_STYLE'] = self.import_style

        if self.server_module_path:
            self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path

        # Every configuration gets a fresh project directory.
        if self.website_dir is not None:
            self.website_dir.cleanup()
        self.website_dir = tempfile.TemporaryDirectory()

        # The database may legitimately be unreachable at this point
        # (setup_api_db() calls this with 'sqlite:' names, too), so a
        # failed connection is not an error here.
        # NOTE(review): assumes setup_website() accepts a falsy connection
        # - confirm against nominatim_db.tools.refresh.
        try:
            conn = self.connect_database(dbname)
        except Exception:
            conn = None

        try:
            refresh.setup_website(Path(self.website_dir.name) / 'website',
                                  self.get_test_config(), conn)
        finally:
            if conn is not None:
                conn.close()

    def get_test_config(self):
        """ Return a Configuration for the current project directory and
            test environment. Requires a prior write_nominatim_config().
        """
        cfg = Configuration(Path(self.website_dir.name), environ=self.test_env)
        cfg.set_libdirs(module=self.server_module_path)
        return cfg

    def get_libpq_dsn(self):
        """ Return the configured database DSN as a libpq connection
            string. A DSN in the old 'pgsql:key=val;key=val' format is
            converted, anything else is returned unchanged.
        """
        dsn = self.test_env['NOMINATIM_DATABASE_DSN']

        def quote_param(param):
            # Split on the first '=' only, the value may contain more.
            key, val = param.split('=', 1)
            val = val.replace('\\', '\\\\').replace("'", "\\'")
            # libpq requires quotes around empty values and values
            # containing spaces.
            if not val or ' ' in val:
                val = "'" + val + "'"
            return key + '=' + val

        if dsn.startswith('pgsql:'):
            # Old PHP DSN format. Convert before returning.
            return ' '.join(quote_param(p) for p in dsn[6:].split(';'))

        return dsn

    def db_drop_database(self, name):
        """ Drop the database with the given name.
        """
        with self.connect_database('postgres') as conn:
            # DROP DATABASE cannot run inside a transaction block.
            conn.autocommit = True
            conn.execute(pysql.SQL('DROP DATABASE IF EXISTS {}')
                         .format(pysql.Identifier(name)))

    def setup_template_db(self):
        """ Setup a template database that already contains common test data.
            Having a template database speeds up tests considerably but at
            the price that the tests sometimes run with stale data.
        """
        if self.template_db_done:
            return

        self.template_db_done = True

        self.write_nominatim_config(self.template_db)

        if not self._reuse_or_drop_db(self.template_db):
            try:
                # execute nominatim import on an empty file to get the right tables
                with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
                    fd.write(b'<osm version="0.6"></osm>')
                    # The import reads the file by name, so the content
                    # must be on disk before it starts.
                    fd.flush()
                    # NOTE(review): the '--ignore-errors' argument was
                    # reconstructed from a line missing in the reviewed
                    # listing - confirm.
                    self.run_nominatim('import', '--osm-file', fd.name,
                                       '--osm2pgsql-cache', '1',
                                       '--ignore-errors',
                                       '--offline', '--index-noanalyse')
            except BaseException:
                # Do not leave a half-built template behind.
                self.db_drop_database(self.template_db)
                raise

        self.run_nominatim('refresh', '--functions')

    def setup_api_db(self):
        """ Setup a test against the API test database.
        """
        self.write_nominatim_config(self.api_test_db)

        if self.api_test_db.startswith('sqlite:'):
            return

        if not self.api_db_done:
            self.api_db_done = True

            if not self._reuse_or_drop_db(self.api_test_db):
                testdata = (Path(__file__) / '..' / '..' / '..' / 'testdb').resolve()
                self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata)
                simp_file = Path(self.website_dir.name) / 'secondary_importance.sql.gz'
                simp_file.symlink_to(testdata / 'secondary_importance.sql.gz')

                try:
                    self.run_nominatim('import', '--osm-file', str(self.api_test_file))
                    self.run_nominatim('add-data', '--tiger-data', str(testdata / 'tiger'))
                    self.run_nominatim('freeze')

                    if self.tokenizer == 'legacy':
                        phrase_file = str(testdata / 'specialphrases_testdb.sql')
                        run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
                    else:
                        csv_path = str(testdata / 'full_en_phrases_test.csv')
                        self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
                except BaseException:
                    # Do not leave a half-imported database behind.
                    self.db_drop_database(self.api_test_db)
                    raise

        tokenizer_factory.get_tokenizer_for_db(self.get_test_config())

    def setup_unknown_db(self):
        """ Setup a test against a non-existing database.
        """
        # The tokenizer needs an existing database to function.
        # So start with the usual database
        class _Context:
            db = None

        context = _Context()
        self.setup_db(context)
        tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)

        # Then drop the DB again
        self.teardown_db(context, force_drop=True)

    def setup_db(self, context):
        """ Setup a test against a fresh, empty test database.

            The new connection to the test database is stored in
            `context.db`.
        """
        self.setup_template_db()
        with self.connect_database(self.template_db) as conn:
            conn.autocommit = True
            conn.execute(pysql.SQL('DROP DATABASE IF EXISTS {}')
                         .format(pysql.Identifier(self.test_db)))
            conn.execute(pysql.SQL('CREATE DATABASE {} TEMPLATE = {}').format(
                pysql.Identifier(self.test_db),
                pysql.Identifier(self.template_db)))

        self.write_nominatim_config(self.test_db)
        context.db = self.connect_database(self.test_db)
        context.db.autocommit = True
        register_hstore(context.db)

    def teardown_db(self, context, force_drop=False):
        """ Remove the test database, if it exists.

            The database is kept when keeping scenario databases was
            requested, unless `force_drop` is set.
        """
        if hasattr(context, 'db'):
            # An open connection would block dropping the database.
            context.db.close()

        if force_drop or not self.keep_scenario_db:
            self.db_drop_database(self.test_db)

    def _reuse_or_drop_db(self, name):
        """ Check for the existence of the given DB. If reuse is enabled,
            then the function checks for existence and returns True if the
            database is already there. Otherwise an existing database is
            dropped and always false returned.
        """
        if not self.reuse_template:
            self.db_drop_database(name)
            return False

        with self.connect_database('postgres') as conn:
            num = execute_scalar(conn,
                                 'select count(*) from pg_database where datname = %s',
                                 (name,))
        return num == 1

    def reindex_placex(self, db):
        """ Run the indexing step so that pending data in placex gets
            processed. Indexing during updates can produce more data
            to index under some circumstances. That is why indexing may have
            to be run multiple times.

            This method issues one 'index' command; `db` is currently
            not used.
        """
        self.run_nominatim('index')

    def run_nominatim(self, *cmdline):
        """ Run the nominatim command-line tool via the library.

            When a project directory exists, it is handed to the tool
            via '--project-dir'.
        """
        if self.website_dir is not None:
            cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]

        # NOTE(review): the 'osm2pgsql_path' and 'cli_args' arguments were
        # reconstructed from lines missing in the reviewed listing -
        # confirm against nominatim_db.cli.nominatim().
        cli.nominatim(module_dir=self.server_module_path,
                      osm2pgsql_path=None,
                      cli_args=cmdline,
                      environ=self.test_env)

    def copy_from_place(self, db):
        """ Copy data from place to the placex and location_property_osmline
            tables invoking the appropriate triggers.
        """
        self.run_nominatim('refresh', '--functions', '--no-diff-updates')

        # NOTE(review): the 'extratags, geometry' column lists, the
        # 'FROM place' clauses and the osm_type='W' condition of the second
        # statement were reconstructed from lines missing in the reviewed
        # listing - confirm.
        with db.cursor() as cur:
            cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
                                               name, admin_level, address,
                                               extratags, geometry)
                             SELECT osm_type, osm_id, class, type,
                                    name, admin_level, address,
                                    extratags, geometry
                               FROM place
                               WHERE not (class='place' and type='houses' and osm_type='W')""")
            cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
                             SELECT osm_id, address, geometry
                               FROM place
                              WHERE class='place' and type='houses'
                                    and osm_type='W'
                                    and ST_GeometryType(geometry) = 'ST_LineString'""")

    def create_api_request_func_starlette(self):
        """ Return an async function that sends a GET request to a
            Starlette instance of the API and returns the tuple
            (response text, status code).
        """
        import nominatim_api.server.starlette.server
        from asgi_lifespan import LifespanManager
        import httpx

        async def _request(endpoint, params, project_dir, environ, http_headers):
            app = nominatim_api.server.starlette.server.get_application(project_dir, environ)

            async with LifespanManager(app):
                # Explicit transport: the 'app=' shortcut of AsyncClient
                # is deprecated in newer httpx releases.
                transport = httpx.ASGITransport(app=app)
                async with httpx.AsyncClient(transport=transport,
                                             base_url="http://nominatim.test") as client:
                    response = await client.get(f"/{endpoint}", params=params,
                                                headers=http_headers)

            return response.text, response.status_code

        return _request

    def create_api_request_func_falcon(self):
        """ Return an async function that sends a GET request to a
            Falcon instance of the API and returns the tuple
            (response text, status code).
        """
        import nominatim_api.server.falcon.server
        import falcon.testing

        async def _request(endpoint, params, project_dir, environ, http_headers):
            app = nominatim_api.server.falcon.server.get_application(project_dir, environ)

            async with falcon.testing.ASGIConductor(app) as conductor:
                response = await conductor.get(f"/{endpoint}", params=params,
                                               headers=http_headers)

            return response.text, response.status_code

        return _request