1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 from pathlib import Path
12 from psycopg import sql as pysql
14 from nominatim_db import cli
15 from nominatim_db.config import Configuration
16 from nominatim_db.db.connection import Connection, register_hstore, execute_scalar
17 from nominatim_db.tools import refresh
18 from nominatim_db.tokenizer import factory as tokenizer_factory
19 from steps.utils import run_script
21 class NominatimEnvironment:
22 """ Collects all functions for the execution of Nominatim functions.
25 def __init__(self, config):
26 self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
27 self.db_host = config['DB_HOST']
28 self.db_port = config['DB_PORT']
29 self.db_user = config['DB_USER']
30 self.db_pass = config['DB_PASS']
31 self.template_db = config['TEMPLATE_DB']
32 self.test_db = config['TEST_DB']
33 self.api_test_db = config['API_TEST_DB']
34 self.api_test_file = config['API_TEST_FILE']
35 self.tokenizer = config['TOKENIZER']
36 self.import_style = config['STYLE']
37 self.server_module_path = config['SERVER_MODULE_PATH']
38 self.reuse_template = not config['REMOVE_TEMPLATE']
39 self.keep_scenario_db = config['KEEP_TEST_DB']
41 self.default_config = Configuration(None).get_os_env()
43 self.template_db_done = False
44 self.api_db_done = False
45 self.website_dir = None
47 self.api_engine = None
48 if config['API_ENGINE'] != 'php':
49 if not hasattr(self, f"create_api_request_func_{config['API_ENGINE']}"):
50 raise RuntimeError(f"Unknown API engine '{config['API_ENGINE']}'")
51 self.api_engine = getattr(self, f"create_api_request_func_{config['API_ENGINE']}")()
53 if self.tokenizer == 'legacy' and self.server_module_path is None:
54 raise RuntimeError("You must set -DSERVER_MODULE_PATH when testing the legacy tokenizer.")
56 def connect_database(self, dbname):
57 """ Return a connection to the database with the given name.
58 Uses configured host, user and port.
60 dbargs = {'dbname': dbname, 'row_factory': psycopg.rows.dict_row}
62 dbargs['host'] = self.db_host
64 dbargs['port'] = self.db_port
66 dbargs['user'] = self.db_user
68 dbargs['password'] = self.db_pass
69 return psycopg.connect(**dbargs)
72 def write_nominatim_config(self, dbname):
73 """ Set up a custom test configuration that connects to the given
74 database. This sets up the environment variables so that they can
75 be picked up by dotenv and creates a project directory with the
76 appropriate website scripts.
78 if dbname.startswith('sqlite:'):
79 dsn = 'sqlite:dbname={}'.format(dbname[7:])
81 dsn = 'pgsql:dbname={}'.format(dbname)
83 dsn += ';host=' + self.db_host
85 dsn += ';port=' + self.db_port
87 dsn += ';user=' + self.db_user
89 dsn += ';password=' + self.db_pass
91 self.test_env = dict(self.default_config)
92 self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
93 self.test_env['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
94 self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
95 self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
96 self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
97 self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
98 self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
99 self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
100 if self.tokenizer is not None:
101 self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
102 if self.import_style is not None:
103 self.test_env['NOMINATIM_IMPORT_STYLE'] = self.import_style
105 if self.server_module_path:
106 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
108 if self.website_dir is not None:
109 self.website_dir.cleanup()
111 self.website_dir = tempfile.TemporaryDirectory()
114 def get_test_config(self):
115 cfg = Configuration(Path(self.website_dir.name), environ=self.test_env)
116 cfg.set_libdirs(module=self.server_module_path)
119 def get_libpq_dsn(self):
120 dsn = self.test_env['NOMINATIM_DATABASE_DSN']
122 def quote_param(param):
123 key, val = param.split('=')
124 val = val.replace('\\', '\\\\').replace("'", "\\'")
126 val = "'" + val + "'"
127 return key + '=' + val
129 if dsn.startswith('pgsql:'):
130 # Old PHP DSN format. Convert before returning.
131 return ' '.join([quote_param(p) for p in dsn[6:].split(';')])
136 def db_drop_database(self, name):
137 """ Drop the database with the given name.
139 with self.connect_database('postgres') as conn:
140 conn.autocommit = True
141 conn.execute(pysql.SQL('DROP DATABASE IF EXISTS')
142 + pysql.Identifier(name))
144 def setup_template_db(self):
145 """ Setup a template database that already contains common test data.
146 Having a template database speeds up tests considerably but at
147 the price that the tests sometimes run with stale data.
149 if self.template_db_done:
152 self.template_db_done = True
154 self.write_nominatim_config(self.template_db)
156 if not self._reuse_or_drop_db(self.template_db):
158 # execute nominatim import on an empty file to get the right tables
159 with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
160 fd.write(b'<osm version="0.6"></osm>')
162 self.run_nominatim('import', '--osm-file', fd.name,
163 '--osm2pgsql-cache', '1',
165 '--offline', '--index-noanalyse')
167 self.db_drop_database(self.template_db)
170 self.run_nominatim('refresh', '--functions')
173 def setup_api_db(self):
174 """ Setup a test against the API test database.
176 self.write_nominatim_config(self.api_test_db)
178 if self.api_test_db.startswith('sqlite:'):
181 if not self.api_db_done:
182 self.api_db_done = True
184 if not self._reuse_or_drop_db(self.api_test_db):
185 testdata = (Path(__file__) / '..' / '..' / '..' / 'testdb').resolve()
186 self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata)
187 simp_file = Path(self.website_dir.name) / 'secondary_importance.sql.gz'
188 simp_file.symlink_to(testdata / 'secondary_importance.sql.gz')
191 self.run_nominatim('import', '--osm-file', str(self.api_test_file))
192 self.run_nominatim('add-data', '--tiger-data', str(testdata / 'tiger'))
193 self.run_nominatim('freeze')
195 if self.tokenizer == 'legacy':
196 phrase_file = str(testdata / 'specialphrases_testdb.sql')
197 run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
199 csv_path = str(testdata / 'full_en_phrases_test.csv')
200 self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
202 self.db_drop_database(self.api_test_db)
205 tokenizer_factory.get_tokenizer_for_db(self.get_test_config())
208 def setup_unknown_db(self):
209 """ Setup a test against a non-existing database.
211 # The tokenizer needs an existing database to function.
212 # So start with the usual database
217 self.setup_db(context)
218 tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
220 # Then drop the DB again
221 self.teardown_db(context, force_drop=True)
223 def setup_db(self, context):
224 """ Setup a test against a fresh, empty test database.
226 self.setup_template_db()
227 with self.connect_database(self.template_db) as conn:
228 conn.autocommit = True
229 conn.execute(pysql.SQL('DROP DATABASE IF EXISTS')
230 + pysql.Identifier(self.test_db))
231 conn.execute(pysql.SQL('CREATE DATABASE {} TEMPLATE = {}').format(
232 pysql.Identifier(self.test_db),
233 pysql.Identifier(self.template_db)))
235 self.write_nominatim_config(self.test_db)
236 context.db = self.connect_database(self.test_db)
237 context.db.autocommit = True
238 register_hstore(context.db)
240 def teardown_db(self, context, force_drop=False):
241 """ Remove the test database, if it exists.
243 if hasattr(context, 'db'):
246 if force_drop or not self.keep_scenario_db:
247 self.db_drop_database(self.test_db)
249 def _reuse_or_drop_db(self, name):
250 """ Check for the existence of the given DB. If reuse is enabled,
251 then the function checks for existnce and returns True if the
252 database is already there. Otherwise an existing database is
253 dropped and always false returned.
255 if self.reuse_template:
256 with self.connect_database('postgres') as conn:
257 num = execute_scalar(conn,
258 'select count(*) from pg_database where datname = %s',
263 self.db_drop_database(name)
268 def reindex_placex(self, db):
269 """ Run the indexing step until all data in the placex has
270 been processed. Indexing during updates can produce more data
271 to index under some circumstances. That is why indexing may have
272 to be run multiple times.
274 self.run_nominatim('index')
277 def run_nominatim(self, *cmdline):
278 """ Run the nominatim command-line tool via the library.
280 if self.website_dir is not None:
281 cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
283 cli.nominatim(module_dir=self.server_module_path,
286 environ=self.test_env)
289 def copy_from_place(self, db):
290 """ Copy data from place to the placex and location_property_osmline
291 tables invoking the appropriate triggers.
293 self.run_nominatim('refresh', '--functions', '--no-diff-updates')
295 with db.cursor() as cur:
296 cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
297 name, admin_level, address,
299 SELECT osm_type, osm_id, class, type,
300 name, admin_level, address,
303 WHERE not (class='place' and type='houses' and osm_type='W')""")
304 cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
305 SELECT osm_id, address, geometry
307 WHERE class='place' and type='houses'
309 and ST_GeometryType(geometry) = 'ST_LineString'""")
312 def create_api_request_func_starlette(self):
313 import nominatim_api.server.starlette.server
314 from asgi_lifespan import LifespanManager
317 async def _request(endpoint, params, project_dir, environ, http_headers):
318 app = nominatim_api.server.starlette.server.get_application(project_dir, environ)
320 async with LifespanManager(app):
321 async with httpx.AsyncClient(app=app, base_url="http://nominatim.test") as client:
322 response = await client.get(f"/{endpoint}", params=params,
323 headers=http_headers)
325 return response.text, response.status_code
330 def create_api_request_func_falcon(self):
331 import nominatim_api.server.falcon.server
332 import falcon.testing
334 async def _request(endpoint, params, project_dir, environ, http_headers):
335 app = nominatim_api.server.falcon.server.get_application(project_dir, environ)
337 async with falcon.testing.ASGIConductor(app) as conductor:
338 response = await conductor.get(f"/{endpoint}", params=params,
339 headers=http_headers)
341 return response.text, response.status_code