1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
from pathlib import Path
import tempfile

import psycopg
from psycopg import sql as pysql

from nominatim_db import cli
from nominatim_db.config import Configuration
from nominatim_db.db.connection import Connection, register_hstore, execute_scalar
from nominatim_db.tools import refresh
from nominatim_db.tokenizer import factory as tokenizer_factory
from steps.utils import run_script


class NominatimEnvironment:
    """ Collects all functions for the execution of Nominatim functions.

        The environment keeps track of the databases used by the tests
        (template, per-scenario test and API test database), of the
        temporary project directory and of the environment variables
        handed to the Nominatim command-line tool.
    """

    def __init__(self, config):
        """ Read the test setup from `config`, a mapping with the keys
            DB_HOST, DB_PORT, DB_USER, DB_PASS, TEMPLATE_DB, TEST_DB,
            API_TEST_DB, API_TEST_FILE, TOKENIZER, STYLE,
            SERVER_MODULE_PATH, REMOVE_TEMPLATE, KEEP_TEST_DB, PHPCOV
            and API_ENGINE.

            Raises a RuntimeError when the API engine is unknown or when
            the legacy tokenizer is requested without a module path.
        """
        self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
        self.db_host = config['DB_HOST']
        self.db_port = config['DB_PORT']
        self.db_user = config['DB_USER']
        self.db_pass = config['DB_PASS']
        self.template_db = config['TEMPLATE_DB']
        self.test_db = config['TEST_DB']
        self.api_test_db = config['API_TEST_DB']
        self.api_test_file = config['API_TEST_FILE']
        self.tokenizer = config['TOKENIZER']
        self.import_style = config['STYLE']
        self.server_module_path = config['SERVER_MODULE_PATH']
        self.reuse_template = not config['REMOVE_TEMPLATE']
        self.keep_scenario_db = config['KEEP_TEST_DB']
        self.code_coverage_path = config['PHPCOV']
        self.code_coverage_id = 1

        self.default_config = Configuration(None).get_os_env()
        # Filled in by write_nominatim_config(). Initialised here so that
        # the attribute always exists.
        self.test_env = None
        self.template_db_done = False
        self.api_db_done = False
        self.website_dir = None

        self.api_engine = None
        engine = config['API_ENGINE']
        if engine != 'php':
            # Python engines are created by a factory method of this class.
            factory_name = f"create_api_request_func_{engine}"
            if not hasattr(self, factory_name):
                raise RuntimeError(f"Unknown API engine '{engine}'")
            self.api_engine = getattr(self, factory_name)()

        if self.tokenizer == 'legacy' and self.server_module_path is None:
            raise RuntimeError("You must set -DSERVER_MODULE_PATH when testing the legacy tokenizer.")

    def connect_database(self, dbname):
        """ Return a connection to the database with the given name.
            Uses configured host, user and port.
        """
        dbargs = {'dbname': dbname, 'row_factory': psycopg.rows.dict_row}
        # Only hand over the parameters that were actually configured.
        if self.db_host:
            dbargs['host'] = self.db_host
        if self.db_port:
            dbargs['port'] = self.db_port
        if self.db_user:
            dbargs['user'] = self.db_user
        if self.db_pass:
            dbargs['password'] = self.db_pass
        return psycopg.connect(**dbargs)

    def next_code_coverage_file(self):
        """ Generate the next name for a coverage file.

            Returns the resolved path and advances the internal counter.
        """
        fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
        self.code_coverage_id += 1

        return fn.resolve()

    def write_nominatim_config(self, dbname):
        """ Set up a custom test configuration that connects to the given
            database. This sets up the environment variables so that they can
            be picked up by dotenv and creates a project directory with the
            appropriate website scripts.
        """
        if dbname.startswith('sqlite:'):
            dsn = 'sqlite:dbname={}'.format(dbname[7:])
        else:
            dsn = 'pgsql:dbname={}'.format(dbname)
        if self.db_host:
            dsn += ';host=' + self.db_host
        if self.db_port:
            dsn += ';port=' + self.db_port
        if self.db_user:
            dsn += ';user=' + self.db_user
        if self.db_pass:
            dsn += ';password=' + self.db_pass

        self.test_env = dict(self.default_config)
        self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
        self.test_env['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
        self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
        self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
        self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
        self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
        self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
        self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
        if self.tokenizer is not None:
            self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
        if self.import_style is not None:
            self.test_env['NOMINATIM_IMPORT_STYLE'] = self.import_style

        if self.server_module_path:
            self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path

        if self.website_dir is not None:
            self.website_dir.cleanup()

        self.website_dir = tempfile.TemporaryDirectory()

        # The database may not exist yet (the template database is only
        # created after the configuration is written) or may be an SQLite
        # file, so connecting is strictly best effort.
        # NOTE(review): assumes setup_website() accepts a missing
        # connection - confirm against nominatim_db.tools.refresh.
        try:
            conn = self.connect_database(dbname)
        except Exception:
            conn = None
        try:
            refresh.setup_website(Path(self.website_dir.name) / 'website',
                                  self.get_test_config(), conn)
        finally:
            if conn is not None:
                conn.close()

    def get_test_config(self):
        """ Return a Configuration for the current project directory and
            test environment.
        """
        cfg = Configuration(Path(self.website_dir.name), environ=self.test_env)
        cfg.set_libdirs(module=self.server_module_path)
        return cfg

    def get_libpq_dsn(self):
        """ Return the DSN of the current test environment in a form
            suitable for libpq. DSNs in the old 'pgsql:' format are
            converted, anything else is returned unchanged.
        """
        dsn = self.test_env['NOMINATIM_DATABASE_DSN']

        def quote_param(param):
            # Split on the first '=' only, values may contain '=' themselves.
            key, val = param.split('=', 1)
            val = val.replace('\\', '\\\\').replace("'", "\\'")
            # libpq requires quotes around empty values and values with spaces.
            if not val or ' ' in val:
                val = "'" + val + "'"
            return key + '=' + val

        if dsn.startswith('pgsql:'):
            # Old PHP DSN format. Convert before returning.
            return ' '.join([quote_param(p) for p in dsn[6:].split(';')])

        return dsn

    def db_drop_database(self, name):
        """ Drop the database with the given name.
        """
        with self.connect_database('postgres') as conn:
            # DROP DATABASE cannot run inside a transaction block.
            conn.autocommit = True
            conn.execute(pysql.SQL('DROP DATABASE IF EXISTS {}')
                              .format(pysql.Identifier(name)))

    def setup_template_db(self):
        """ Setup a template database that already contains common test data.
            Having a template database speeds up tests considerably but at
            the price that the tests sometimes run with stale data.
        """
        if self.template_db_done:
            return

        self.template_db_done = True

        self.write_nominatim_config(self.template_db)

        if not self._reuse_or_drop_db(self.template_db):
            try:
                # execute nominatim import on an empty file to get the right tables
                with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
                    fd.write(b'<osm version="0.6"></osm>')
                    # The import opens the file by name, so the content
                    # must have reached the disk.
                    fd.flush()
                    # NOTE(review): '--ignore-errors' was restored from
                    # context - confirm against version control.
                    self.run_nominatim('import', '--osm-file', fd.name,
                                       '--osm2pgsql-cache', '1',
                                       '--ignore-errors',
                                       '--offline', '--index-noanalyse')
            except BaseException:
                # Never leave a half-imported template behind.
                self.db_drop_database(self.template_db)
                raise

        self.run_nominatim('refresh', '--functions')

    def setup_api_db(self):
        """ Setup a test against the API test database.
        """
        self.write_nominatim_config(self.api_test_db)

        if self.api_test_db.startswith('sqlite:'):
            # SQLite databases are used as they are.
            return

        if not self.api_db_done:
            self.api_db_done = True

            if not self._reuse_or_drop_db(self.api_test_db):
                testdata = (Path(__file__) / '..' / '..' / '..' / 'testdb').resolve()
                self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata)
                simp_file = Path(self.website_dir.name) / 'secondary_importance.sql.gz'
                simp_file.symlink_to(testdata / 'secondary_importance.sql.gz')

                try:
                    self.run_nominatim('import', '--osm-file', str(self.api_test_file))
                    self.run_nominatim('add-data', '--tiger-data', str(testdata / 'tiger'))
                    self.run_nominatim('freeze')

                    if self.tokenizer == 'legacy':
                        phrase_file = str(testdata / 'specialphrases_testdb.sql')
                        run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
                    else:
                        csv_path = str(testdata / 'full_en_phrases_test.csv')
                        self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
                except BaseException:
                    # Never leave a half-imported database behind.
                    self.db_drop_database(self.api_test_db)
                    raise

        tokenizer_factory.get_tokenizer_for_db(self.get_test_config())

    def setup_unknown_db(self):
        """ Setup a test against a non-existing database.
        """
        # The tokenizer needs an existing database to function.
        # So start with the usual database
        class _Context:
            db = None

        context = _Context()
        self.setup_db(context)
        tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)

        # Then drop the DB again
        self.teardown_db(context, force_drop=True)

    def setup_db(self, context):
        """ Setup a test against a fresh, empty test database.

            The open connection to the new database is saved in `context.db`.
        """
        self.setup_template_db()
        with self.connect_database(self.template_db) as conn:
            conn.autocommit = True
            conn.execute(pysql.SQL('DROP DATABASE IF EXISTS {}')
                              .format(pysql.Identifier(self.test_db)))
            conn.execute(pysql.SQL('CREATE DATABASE {} TEMPLATE = {}').format(
                           pysql.Identifier(self.test_db),
                           pysql.Identifier(self.template_db)))

        self.write_nominatim_config(self.test_db)
        context.db = self.connect_database(self.test_db)
        context.db.autocommit = True
        register_hstore(context.db)

    def teardown_db(self, context, force_drop=False):
        """ Remove the test database, if it exists.

            An open connection in `context.db` is closed first. The database
            is kept when keeping scenario databases was requested, unless
            `force_drop` is set.
        """
        db = getattr(context, 'db', None)
        if db is not None:
            db.close()

        if force_drop or not self.keep_scenario_db:
            self.db_drop_database(self.test_db)

    def _reuse_or_drop_db(self, name):
        """ Check for the existence of the given DB. If reuse is enabled,
            then the function checks for existence and returns True if the
            database is already there. Otherwise an existing database is
            dropped and always false returned.
        """
        if self.reuse_template:
            with self.connect_database('postgres') as conn:
                num = execute_scalar(conn,
                                     'select count(*) from pg_database where datname = %s',
                                     (name,))
                if num == 1:
                    return True
        else:
            self.db_drop_database(name)

        return False

    def reindex_placex(self, db):
        """ Run the indexing step until all data in the placex has
            been processed. Indexing during updates can produce more data
            to index under some circumstances. That is why indexing may have
            to be run multiple times.
        """
        self.run_nominatim('index')

    def run_nominatim(self, *cmdline):
        """ Run the nominatim command-line tool via the library.
        """
        if self.website_dir is not None:
            cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]

        # NOTE(review): 'osm2pgsql_path=None' was restored from context -
        # confirm against the signature of cli.nominatim().
        cli.nominatim(module_dir=self.server_module_path,
                      osm2pgsql_path=None,
                      cli_args=cmdline,
                      environ=self.test_env)

    def copy_from_place(self, db):
        """ Copy data from place to the placex and location_property_osmline
            tables invoking the appropriate triggers.
        """
        self.run_nominatim('refresh', '--functions', '--no-diff-updates')

        # NOTE(review): the 'extratags, geometry' columns, the FROM clauses
        # and the osm_type filter of the second statement were restored
        # from context - confirm against version control.
        with db.cursor() as cur:
            cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
                                               name, admin_level, address,
                                               extratags, geometry)
                             SELECT osm_type, osm_id, class, type,
                                    name, admin_level, address,
                                    extratags, geometry
                               FROM place
                               WHERE not (class='place' and type='houses' and osm_type='W')""")
            cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
                             SELECT osm_id, address, geometry
                               FROM place
                              WHERE class='place' and type='houses'
                                    and osm_type='W'
                                    and ST_GeometryType(geometry) = 'ST_LineString'""")

    def create_api_request_func_starlette(self):
        """ Return an async function that sends a request to the
            Starlette version of the API and returns a tuple of
            response text and HTTP status code.
        """
        import nominatim_api.server.starlette.server
        from asgi_lifespan import LifespanManager
        import httpx

        async def _request(endpoint, params, project_dir, environ, http_headers):
            app = nominatim_api.server.starlette.server.get_application(project_dir, environ)

            async with LifespanManager(app):
                async with httpx.AsyncClient(app=app, base_url="http://nominatim.test") as client:
                    response = await client.get(f"/{endpoint}", params=params,
                                                headers=http_headers)

            return response.text, response.status_code

        return _request

    def create_api_request_func_falcon(self):
        """ Return an async function that sends a request to the
            Falcon version of the API and returns a tuple of
            response text and HTTP status code.
        """
        import nominatim_api.server.falcon.server
        import falcon.testing

        async def _request(endpoint, params, project_dir, environ, http_headers):
            app = nominatim_api.server.falcon.server.get_application(project_dir, environ)

            async with falcon.testing.ASGIConductor(app) as conductor:
                response = await conductor.get(f"/{endpoint}", params=params,
                                               headers=http_headers)

            return response.text, response.status_code

        return _request