1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 from pathlib import Path
12 from psycopg import sql as pysql
14 from nominatim_db import cli
15 from nominatim_db.config import Configuration
16 from nominatim_db.db.connection import Connection, register_hstore, execute_scalar
17 from nominatim_db.tools import refresh
18 from nominatim_db.tokenizer import factory as tokenizer_factory
19 from steps.utils import run_script
21 class NominatimEnvironment:
22 """ Collects all functions for the execution of Nominatim functions.
25 def __init__(self, config):
26 self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
27 self.db_host = config['DB_HOST']
28 self.db_port = config['DB_PORT']
29 self.db_user = config['DB_USER']
30 self.db_pass = config['DB_PASS']
31 self.template_db = config['TEMPLATE_DB']
32 self.test_db = config['TEST_DB']
33 self.api_test_db = config['API_TEST_DB']
34 self.api_test_file = config['API_TEST_FILE']
35 self.tokenizer = config['TOKENIZER']
36 self.import_style = config['STYLE']
37 self.server_module_path = config['SERVER_MODULE_PATH']
38 self.reuse_template = not config['REMOVE_TEMPLATE']
39 self.keep_scenario_db = config['KEEP_TEST_DB']
41 self.default_config = Configuration(None).get_os_env()
43 self.template_db_done = False
44 self.api_db_done = False
45 self.website_dir = None
47 if not hasattr(self, f"create_api_request_func_{config['API_ENGINE']}"):
48 raise RuntimeError(f"Unknown API engine '{config['API_ENGINE']}'")
49 self.api_engine = getattr(self, f"create_api_request_func_{config['API_ENGINE']}")()
51 if self.tokenizer == 'legacy' and self.server_module_path is None:
52 raise RuntimeError("You must set -DSERVER_MODULE_PATH when testing the legacy tokenizer.")
54 def connect_database(self, dbname):
55 """ Return a connection to the database with the given name.
56 Uses configured host, user and port.
58 dbargs = {'dbname': dbname, 'row_factory': psycopg.rows.dict_row}
60 dbargs['host'] = self.db_host
62 dbargs['port'] = self.db_port
64 dbargs['user'] = self.db_user
66 dbargs['password'] = self.db_pass
67 return psycopg.connect(**dbargs)
70 def write_nominatim_config(self, dbname):
71 """ Set up a custom test configuration that connects to the given
72 database. This sets up the environment variables so that they can
73 be picked up by dotenv and creates a project directory with the
74 appropriate website scripts.
76 if dbname.startswith('sqlite:'):
77 dsn = 'sqlite:dbname={}'.format(dbname[7:])
79 dsn = 'pgsql:dbname={}'.format(dbname)
81 dsn += ';host=' + self.db_host
83 dsn += ';port=' + self.db_port
85 dsn += ';user=' + self.db_user
87 dsn += ';password=' + self.db_pass
89 self.test_env = dict(self.default_config)
90 self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
91 self.test_env['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
92 self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
93 self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
94 self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
95 self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
96 self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
97 self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
98 if self.tokenizer is not None:
99 self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
100 if self.import_style is not None:
101 self.test_env['NOMINATIM_IMPORT_STYLE'] = self.import_style
103 if self.server_module_path:
104 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
106 if self.website_dir is not None:
107 self.website_dir.cleanup()
109 self.website_dir = tempfile.TemporaryDirectory()
112 def get_test_config(self):
113 cfg = Configuration(Path(self.website_dir.name), environ=self.test_env)
114 cfg.set_libdirs(module=self.server_module_path)
117 def get_libpq_dsn(self):
118 dsn = self.test_env['NOMINATIM_DATABASE_DSN']
120 def quote_param(param):
121 key, val = param.split('=')
122 val = val.replace('\\', '\\\\').replace("'", "\\'")
124 val = "'" + val + "'"
125 return key + '=' + val
127 if dsn.startswith('pgsql:'):
128 # Old PHP DSN format. Convert before returning.
129 return ' '.join([quote_param(p) for p in dsn[6:].split(';')])
134 def db_drop_database(self, name):
135 """ Drop the database with the given name.
137 with self.connect_database('postgres') as conn:
138 conn.autocommit = True
139 conn.execute(pysql.SQL('DROP DATABASE IF EXISTS')
140 + pysql.Identifier(name))
142 def setup_template_db(self):
143 """ Setup a template database that already contains common test data.
144 Having a template database speeds up tests considerably but at
145 the price that the tests sometimes run with stale data.
147 if self.template_db_done:
150 self.template_db_done = True
152 self.write_nominatim_config(self.template_db)
154 if not self._reuse_or_drop_db(self.template_db):
156 # execute nominatim import on an empty file to get the right tables
157 with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
158 fd.write(b'<osm version="0.6"></osm>')
160 self.run_nominatim('import', '--osm-file', fd.name,
161 '--osm2pgsql-cache', '1',
163 '--offline', '--index-noanalyse')
165 self.db_drop_database(self.template_db)
168 self.run_nominatim('refresh', '--functions')
171 def setup_api_db(self):
172 """ Setup a test against the API test database.
174 self.write_nominatim_config(self.api_test_db)
176 if self.api_test_db.startswith('sqlite:'):
179 if not self.api_db_done:
180 self.api_db_done = True
182 if not self._reuse_or_drop_db(self.api_test_db):
183 testdata = (Path(__file__) / '..' / '..' / '..' / 'testdb').resolve()
184 self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata)
185 simp_file = Path(self.website_dir.name) / 'secondary_importance.sql.gz'
186 simp_file.symlink_to(testdata / 'secondary_importance.sql.gz')
189 self.run_nominatim('import', '--osm-file', str(self.api_test_file))
190 self.run_nominatim('add-data', '--tiger-data', str(testdata / 'tiger'))
191 self.run_nominatim('freeze')
193 if self.tokenizer == 'legacy':
194 phrase_file = str(testdata / 'specialphrases_testdb.sql')
195 run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
197 csv_path = str(testdata / 'full_en_phrases_test.csv')
198 self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
200 self.db_drop_database(self.api_test_db)
203 tokenizer_factory.get_tokenizer_for_db(self.get_test_config())
206 def setup_unknown_db(self):
207 """ Setup a test against a non-existing database.
209 # The tokenizer needs an existing database to function.
210 # So start with the usual database
215 self.setup_db(context)
216 tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
218 # Then drop the DB again
219 self.teardown_db(context, force_drop=True)
221 def setup_db(self, context):
222 """ Setup a test against a fresh, empty test database.
224 self.setup_template_db()
225 with self.connect_database(self.template_db) as conn:
226 conn.autocommit = True
227 conn.execute(pysql.SQL('DROP DATABASE IF EXISTS')
228 + pysql.Identifier(self.test_db))
229 conn.execute(pysql.SQL('CREATE DATABASE {} TEMPLATE = {}').format(
230 pysql.Identifier(self.test_db),
231 pysql.Identifier(self.template_db)))
233 self.write_nominatim_config(self.test_db)
234 context.db = self.connect_database(self.test_db)
235 context.db.autocommit = True
236 register_hstore(context.db)
238 def teardown_db(self, context, force_drop=False):
239 """ Remove the test database, if it exists.
241 if hasattr(context, 'db'):
244 if force_drop or not self.keep_scenario_db:
245 self.db_drop_database(self.test_db)
247 def _reuse_or_drop_db(self, name):
248 """ Check for the existence of the given DB. If reuse is enabled,
249 then the function checks for existnce and returns True if the
250 database is already there. Otherwise an existing database is
251 dropped and always false returned.
253 if self.reuse_template:
254 with self.connect_database('postgres') as conn:
255 num = execute_scalar(conn,
256 'select count(*) from pg_database where datname = %s',
261 self.db_drop_database(name)
266 def reindex_placex(self, db):
267 """ Run the indexing step until all data in the placex has
268 been processed. Indexing during updates can produce more data
269 to index under some circumstances. That is why indexing may have
270 to be run multiple times.
272 self.run_nominatim('index')
275 def run_nominatim(self, *cmdline):
276 """ Run the nominatim command-line tool via the library.
278 if self.website_dir is not None:
279 cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
281 cli.nominatim(module_dir=self.server_module_path,
284 environ=self.test_env)
287 def copy_from_place(self, db):
288 """ Copy data from place to the placex and location_property_osmline
289 tables invoking the appropriate triggers.
291 self.run_nominatim('refresh', '--functions', '--no-diff-updates')
293 with db.cursor() as cur:
294 cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
295 name, admin_level, address,
297 SELECT osm_type, osm_id, class, type,
298 name, admin_level, address,
301 WHERE not (class='place' and type='houses' and osm_type='W')""")
302 cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
303 SELECT osm_id, address, geometry
305 WHERE class='place' and type='houses'
307 and ST_GeometryType(geometry) = 'ST_LineString'""")
310 def create_api_request_func_starlette(self):
311 import nominatim_api.server.starlette.server
312 from asgi_lifespan import LifespanManager
315 async def _request(endpoint, params, project_dir, environ, http_headers):
316 app = nominatim_api.server.starlette.server.get_application(project_dir, environ)
318 async with LifespanManager(app):
319 async with httpx.AsyncClient(app=app, base_url="http://nominatim.test") as client:
320 response = await client.get(f"/{endpoint}", params=params,
321 headers=http_headers)
323 return response.text, response.status_code
328 def create_api_request_func_falcon(self):
329 import nominatim_api.server.falcon.server
330 import falcon.testing
332 async def _request(endpoint, params, project_dir, environ, http_headers):
333 app = nominatim_api.server.falcon.server.get_application(project_dir, environ)
335 async with falcon.testing.ASGIConductor(app) as conductor:
336 response = await conductor.get(f"/{endpoint}", params=params,
337 headers=http_headers)
339 return response.text, response.status_code