1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
import tempfile
from pathlib import Path

import psycopg
from psycopg import sql as pysql

from nominatim_db import cli
from nominatim_db.config import Configuration
from nominatim_db.db.connection import register_hstore, execute_scalar
from nominatim_db.tokenizer import factory as tokenizer_factory
class NominatimEnvironment:
    """ Collects all functions for the execution of Nominatim functions.
    """

    def __init__(self, config):
        """ Read the test settings from `config` and set up the initial state.

            `config` is a mapping that must provide the keys DB_HOST, DB_PORT,
            DB_USER, DB_PASS, TEMPLATE_DB, TEST_DB, API_TEST_DB, API_TEST_FILE,
            TOKENIZER, STYLE, REMOVE_TEMPLATE, KEEP_TEST_DB and API_ENGINE.

            Raises a RuntimeError when API_ENGINE names an engine for which
            no `create_api_request_func_<engine>` method exists.
        """
        self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
        self.db_host = config['DB_HOST']
        self.db_port = config['DB_PORT']
        self.db_user = config['DB_USER']
        self.db_pass = config['DB_PASS']
        self.template_db = config['TEMPLATE_DB']
        self.test_db = config['TEST_DB']
        self.api_test_db = config['API_TEST_DB']
        self.api_test_file = config['API_TEST_FILE']
        self.tokenizer = config['TOKENIZER']
        self.import_style = config['STYLE']
        self.reuse_template = not config['REMOVE_TEMPLATE']
        self.keep_scenario_db = config['KEEP_TEST_DB']

        self.default_config = Configuration(None).get_os_env()
        # Filled in by write_nominatim_config(). Initialised here so that
        # readers of the attribute never hit an AttributeError.
        self.test_env = None
        self.template_db_done = False
        self.api_db_done = False
        self.website_dir = None

        engine = config['API_ENGINE']
        factory = getattr(self, f"create_api_request_func_{engine}", None)
        if factory is None:
            raise RuntimeError(f"Unknown API engine '{engine}'")
        self.api_engine = factory()

    def connect_database(self, dbname):
        """ Return a connection to the database with the given name.
            Uses configured host, user and port. Settings that are
            not configured (None or empty) are left out so that the
            driver falls back to its own defaults.
        """
        dbargs = {'dbname': dbname, 'row_factory': psycopg.rows.dict_row}
        optional = (('host', self.db_host), ('port', self.db_port),
                    ('user', self.db_user), ('password', self.db_pass))
        dbargs.update({key: value for key, value in optional if value})

        return psycopg.connect(**dbargs)

    def write_nominatim_config(self, dbname):
        """ Set up a custom test configuration that connects to the given
            database. This sets up the environment variables so that they can
            be picked up by dotenv and creates a fresh temporary project
            directory. A previously created project directory is removed.

            A `dbname` starting with 'sqlite:' yields an SQLite DSN,
            anything else a PostgreSQL DSN.
        """
        if dbname.startswith('sqlite:'):
            dsn = 'sqlite:dbname={}'.format(dbname[7:])
        else:
            dsn = 'pgsql:dbname={}'.format(dbname)

        # Only add connection parameters that are actually configured.
        if self.db_host:
            dsn += f';host={self.db_host}'
        if self.db_port:
            dsn += f';port={self.db_port}'
        if self.db_user:
            dsn += f';user={self.db_user}'
        if self.db_pass:
            dsn += f';password={self.db_pass}'

        # Work on a copy so that the OS defaults stay untouched.
        self.test_env = dict(self.default_config)
        self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
        self.test_env['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
        self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
        self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
        self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
        self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
        self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
        self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
        if self.tokenizer is not None:
            self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
        if self.import_style is not None:
            self.test_env['NOMINATIM_IMPORT_STYLE'] = self.import_style

        if self.website_dir is not None:
            self.website_dir.cleanup()

        self.website_dir = tempfile.TemporaryDirectory()

    def get_test_config(self):
        """ Return a Configuration object for the current project directory
            and test environment. Requires that write_nominatim_config()
            has been called before.
        """
        return Configuration(Path(self.website_dir.name), environ=self.test_env)

    def get_libpq_dsn(self):
        """ Return the DSN of the current test environment in a form that
            libpq understands. A DSN in the old 'pgsql:' format is converted
            into a space-separated list of quoted key=value pairs, any other
            DSN is returned unchanged.
        """
        dsn = self.test_env['NOMINATIM_DATABASE_DSN']

        def quote_param(param):
            # Split on the first '=' only, the value may contain more of them.
            key, val = param.split('=', 1)
            val = val.replace('\\', '\\\\').replace("'", "\\'")
            return key + "='" + val + "'"

        if dsn.startswith('pgsql:'):
            # Old PHP DSN format. Convert before returning.
            return ' '.join([quote_param(p) for p in dsn[6:].split(';')])

        return dsn

    def db_drop_database(self, name):
        """ Drop the database with the given name.
        """
        with self.connect_database('postgres') as conn:
            # DROP DATABASE cannot run inside a transaction block.
            conn.autocommit = True
            conn.execute(pysql.SQL('DROP DATABASE IF EXISTS {}')
                              .format(pysql.Identifier(name)))

    def setup_template_db(self):
        """ Setup a template database that already contains common test data.
            Having a template database speeds up tests considerably but at
            the price that the tests sometimes run with stale data.

            The work is only done on the first call.
        """
        if self.template_db_done:
            return

        self.template_db_done = True

        self.write_nominatim_config(self.template_db)

        if not self._reuse_or_drop_db(self.template_db):
            try:
                # execute nominatim import on an empty file to get the right tables
                with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
                    fd.write(b'<osm version="0.6"></osm>')
                    # The import opens the file by name, so the content
                    # must have reached the disk by now.
                    fd.flush()
                    # NOTE(review): '--ignore-errors' was restored from the
                    # upstream file; presumably the import rejects an empty
                    # data set without it. Confirm.
                    self.run_nominatim('import', '--osm-file', fd.name,
                                       '--osm2pgsql-cache', '1',
                                       '--ignore-errors',
                                       '--offline', '--index-noanalyse')
            except BaseException:
                # Never leave a half-built template behind, it would be
                # picked up for reuse by the next test run.
                self.db_drop_database(self.template_db)
                raise

        self.run_nominatim('refresh', '--functions')

    def setup_api_db(self):
        """ Setup a test against the API test database.

            For an SQLite test database only the configuration is written.
            A PostgreSQL database is imported on the first call unless an
            existing one can be reused.
        """
        self.write_nominatim_config(self.api_test_db)

        if self.api_test_db.startswith('sqlite:'):
            return

        if not self.api_db_done:
            self.api_db_done = True

            if not self._reuse_or_drop_db(self.api_test_db):
                testdata = (Path(__file__) / '..' / '..' / '..' / 'testdb').resolve()
                self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata)
                simp_file = Path(self.website_dir.name) / 'secondary_importance.sql.gz'
                simp_file.symlink_to(testdata / 'secondary_importance.sql.gz')

                try:
                    self.run_nominatim('import', '--osm-file', str(self.api_test_file))
                    self.run_nominatim('add-data', '--tiger-data', str(testdata / 'tiger'))
                    self.run_nominatim('freeze')

                    csv_path = str(testdata / 'full_en_phrases_test.csv')
                    self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
                except BaseException:
                    # Remove the incomplete database so that it is not reused.
                    self.db_drop_database(self.api_test_db)
                    raise

        tokenizer_factory.get_tokenizer_for_db(self.get_test_config())

    def setup_unknown_db(self):
        """ Setup a test against a non-existing database.
        """
        # The tokenizer needs an existing database to function.
        # So start with the usual database
        class _Context:
            db = None

        context = _Context()
        self.setup_db(context)
        tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)

        # Then drop the DB again
        self.teardown_db(context, force_drop=True)

    def setup_db(self, context):
        """ Setup a test against a fresh, empty test database.

            The database is created from the template database. An open
            autocommit connection to it is saved in `context.db`.
        """
        self.setup_template_db()
        with self.connect_database(self.template_db) as conn:
            conn.autocommit = True
            conn.execute(pysql.SQL('DROP DATABASE IF EXISTS {}')
                              .format(pysql.Identifier(self.test_db)))
            conn.execute(pysql.SQL('CREATE DATABASE {} TEMPLATE = {}')
                              .format(pysql.Identifier(self.test_db),
                                      pysql.Identifier(self.template_db)))

        self.write_nominatim_config(self.test_db)
        context.db = self.connect_database(self.test_db)
        context.db.autocommit = True
        register_hstore(context.db)

    def teardown_db(self, context, force_drop=False):
        """ Remove the test database, if it exists.

            A connection saved in `context.db` is closed first. The database
            is kept when keeping scenario databases is configured, unless
            `force_drop` is set.
        """
        db = getattr(context, 'db', None)
        if db is not None:
            db.close()

        if force_drop or not self.keep_scenario_db:
            self.db_drop_database(self.test_db)

    def _reuse_or_drop_db(self, name):
        """ Check for the existence of the given DB. If reuse is enabled,
            then the function checks for existence and returns True if the
            database is already there. Otherwise an existing database is
            dropped and always False returned.
        """
        if not self.reuse_template:
            self.db_drop_database(name)
            return False

        with self.connect_database('postgres') as conn:
            num = execute_scalar(conn,
                                 'select count(*) from pg_database where datname = %s',
                                 (name,))

        return num == 1

    def reindex_placex(self, db):
        """ Run the indexing step for the data in placex.
            Indexing during updates can produce more data
            to index under some circumstances. That is why indexing may have
            to be run multiple times. This function runs a single pass of
            'nominatim index'; the `db` parameter is not used.
        """
        self.run_nominatim('index')

    def run_nominatim(self, *cmdline):
        """ Run the nominatim command-line tool via the library.

            When a project directory has been created, it is handed in
            through the '--project-dir' parameter.
        """
        if self.website_dir is not None:
            cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]

        cli.nominatim(cli_args=cmdline,
                      environ=self.test_env)

    def copy_from_place(self, db):
        """ Copy data from place to the placex and location_property_osmline
            tables invoking the appropriate triggers.

            `db` is an open connection to the test database.
        """
        self.run_nominatim('refresh', '--functions', '--no-diff-updates')

        # NOTE(review): the `extratags` column and the osm_type='W' condition
        # were restored from the upstream file; confirm against the schema.
        with db.cursor() as cur:
            cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
                                               name, admin_level, address,
                                               extratags, geometry)
                             SELECT osm_type, osm_id, class, type,
                                    name, admin_level, address,
                                    extratags, geometry
                               FROM place
                               WHERE not (class='place' and type='houses' and osm_type='W')""")
            cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
                             SELECT osm_id, address, geometry
                               FROM place
                              WHERE class='place' and type='houses'
                                    and osm_type='W'
                                    and ST_GeometryType(geometry) = 'ST_LineString'""")

    def create_api_request_func_starlette(self):
        """ Return a coroutine function that sends a GET request to the
            Starlette version of the API application and returns a tuple
            of response text and HTTP status code.
        """
        import nominatim_api.server.starlette.server
        from asgi_lifespan import LifespanManager
        import httpx

        async def _request(endpoint, params, project_dir, environ, http_headers):
            app = nominatim_api.server.starlette.server.get_application(project_dir, environ)

            async with LifespanManager(app):
                # Recent HTTPX versions no longer accept the 'app' shortcut.
                transport = httpx.ASGITransport(app=app)
                async with httpx.AsyncClient(transport=transport,
                                             base_url="http://nominatim.test") as client:
                    response = await client.get(f"/{endpoint}", params=params,
                                                headers=http_headers)

            return response.text, response.status_code

        return _request

    def create_api_request_func_falcon(self):
        """ Return a coroutine function that sends a GET request to the
            Falcon version of the API application and returns a tuple
            of response text and HTTP status code.
        """
        import nominatim_api.server.falcon.server
        import falcon.testing

        async def _request(endpoint, params, project_dir, environ, http_headers):
            app = nominatim_api.server.falcon.server.get_application(project_dir, environ)

            async with falcon.testing.ASGIConductor(app) as conductor:
                response = await conductor.get(f"/{endpoint}", params=params,
                                               headers=http_headers)

            return response.text, response.status_code

        return _request