1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 from pathlib import Path
11 from psycopg import sql as pysql
13 from nominatim_db import cli
14 from nominatim_db.config import Configuration
15 from nominatim_db.db.connection import register_hstore, execute_scalar
16 from nominatim_db.tokenizer import factory as tokenizer_factory
19 class NominatimEnvironment:
20 """ Collects all functions for the execution of Nominatim functions.
23 def __init__(self, config):
24 self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
25 self.db_host = config['DB_HOST']
26 self.db_port = config['DB_PORT']
27 self.db_user = config['DB_USER']
28 self.db_pass = config['DB_PASS']
29 self.template_db = config['TEMPLATE_DB']
30 self.test_db = config['TEST_DB']
31 self.api_test_db = config['API_TEST_DB']
32 self.api_test_file = config['API_TEST_FILE']
33 self.tokenizer = config['TOKENIZER']
34 self.import_style = config['STYLE']
35 self.reuse_template = not config['REMOVE_TEMPLATE']
36 self.keep_scenario_db = config['KEEP_TEST_DB']
38 self.default_config = Configuration(None).get_os_env()
40 self.template_db_done = False
41 self.api_db_done = False
42 self.website_dir = None
44 if not hasattr(self, f"create_api_request_func_{config['API_ENGINE']}"):
45 raise RuntimeError(f"Unknown API engine '{config['API_ENGINE']}'")
46 self.api_engine = getattr(self, f"create_api_request_func_{config['API_ENGINE']}")()
48 def connect_database(self, dbname):
49 """ Return a connection to the database with the given name.
50 Uses configured host, user and port.
52 dbargs = {'dbname': dbname, 'row_factory': psycopg.rows.dict_row}
54 dbargs['host'] = self.db_host
56 dbargs['port'] = self.db_port
58 dbargs['user'] = self.db_user
60 dbargs['password'] = self.db_pass
61 return psycopg.connect(**dbargs)
63 def write_nominatim_config(self, dbname):
64 """ Set up a custom test configuration that connects to the given
65 database. This sets up the environment variables so that they can
66 be picked up by dotenv and creates a project directory with the
67 appropriate website scripts.
69 if dbname.startswith('sqlite:'):
70 dsn = 'sqlite:dbname={}'.format(dbname[7:])
72 dsn = 'pgsql:dbname={}'.format(dbname)
74 dsn += ';host=' + self.db_host
76 dsn += ';port=' + self.db_port
78 dsn += ';user=' + self.db_user
80 dsn += ';password=' + self.db_pass
82 self.test_env = dict(self.default_config)
83 self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
84 self.test_env['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
85 self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
86 self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
87 self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
88 self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
89 self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
90 self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
91 if self.tokenizer is not None:
92 self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
93 if self.import_style is not None:
94 self.test_env['NOMINATIM_IMPORT_STYLE'] = self.import_style
96 if self.website_dir is not None:
97 self.website_dir.cleanup()
99 self.website_dir = tempfile.TemporaryDirectory()
101 def get_test_config(self):
102 cfg = Configuration(Path(self.website_dir.name), environ=self.test_env)
105 def get_libpq_dsn(self):
106 dsn = self.test_env['NOMINATIM_DATABASE_DSN']
108 def quote_param(param):
109 key, val = param.split('=')
110 val = val.replace('\\', '\\\\').replace("'", "\\'")
112 val = "'" + val + "'"
113 return key + '=' + val
115 if dsn.startswith('pgsql:'):
116 # Old PHP DSN format. Convert before returning.
117 return ' '.join([quote_param(p) for p in dsn[6:].split(';')])
121 def db_drop_database(self, name):
122 """ Drop the database with the given name.
124 with self.connect_database('postgres') as conn:
125 conn.autocommit = True
126 conn.execute(pysql.SQL('DROP DATABASE IF EXISTS')
127 + pysql.Identifier(name))
129 def setup_template_db(self):
130 """ Setup a template database that already contains common test data.
131 Having a template database speeds up tests considerably but at
132 the price that the tests sometimes run with stale data.
134 if self.template_db_done:
137 self.template_db_done = True
139 self.write_nominatim_config(self.template_db)
141 if not self._reuse_or_drop_db(self.template_db):
143 # execute nominatim import on an empty file to get the right tables
144 with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
145 fd.write(b'<osm version="0.6"></osm>')
147 self.run_nominatim('import', '--osm-file', fd.name,
148 '--osm2pgsql-cache', '1',
150 '--offline', '--index-noanalyse')
152 self.db_drop_database(self.template_db)
155 self.run_nominatim('refresh', '--functions')
157 def setup_api_db(self):
158 """ Setup a test against the API test database.
160 self.write_nominatim_config(self.api_test_db)
162 if self.api_test_db.startswith('sqlite:'):
165 if not self.api_db_done:
166 self.api_db_done = True
168 if not self._reuse_or_drop_db(self.api_test_db):
169 testdata = (Path(__file__) / '..' / '..' / '..' / 'testdb').resolve()
170 self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata)
171 simp_file = Path(self.website_dir.name) / 'secondary_importance.sql.gz'
172 simp_file.symlink_to(testdata / 'secondary_importance.sql.gz')
175 self.run_nominatim('import', '--osm-file', str(self.api_test_file))
176 self.run_nominatim('add-data', '--tiger-data', str(testdata / 'tiger'))
177 self.run_nominatim('freeze')
179 csv_path = str(testdata / 'full_en_phrases_test.csv')
180 self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
182 self.db_drop_database(self.api_test_db)
185 tokenizer_factory.get_tokenizer_for_db(self.get_test_config())
187 def setup_unknown_db(self):
188 """ Setup a test against a non-existing database.
190 # The tokenizer needs an existing database to function.
191 # So start with the usual database
196 self.setup_db(context)
197 tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
199 # Then drop the DB again
200 self.teardown_db(context, force_drop=True)
202 def setup_db(self, context):
203 """ Setup a test against a fresh, empty test database.
205 self.setup_template_db()
206 with self.connect_database(self.template_db) as conn:
207 conn.autocommit = True
208 conn.execute(pysql.SQL('DROP DATABASE IF EXISTS')
209 + pysql.Identifier(self.test_db))
210 conn.execute(pysql.SQL('CREATE DATABASE {} TEMPLATE = {}').format(
211 pysql.Identifier(self.test_db),
212 pysql.Identifier(self.template_db)))
214 self.write_nominatim_config(self.test_db)
215 context.db = self.connect_database(self.test_db)
216 context.db.autocommit = True
217 register_hstore(context.db)
219 def teardown_db(self, context, force_drop=False):
220 """ Remove the test database, if it exists.
222 if hasattr(context, 'db'):
225 if force_drop or not self.keep_scenario_db:
226 self.db_drop_database(self.test_db)
228 def _reuse_or_drop_db(self, name):
229 """ Check for the existence of the given DB. If reuse is enabled,
230 then the function checks for existnce and returns True if the
231 database is already there. Otherwise an existing database is
232 dropped and always false returned.
234 if self.reuse_template:
235 with self.connect_database('postgres') as conn:
236 num = execute_scalar(conn,
237 'select count(*) from pg_database where datname = %s',
242 self.db_drop_database(name)
246 def reindex_placex(self, db):
247 """ Run the indexing step until all data in the placex has
248 been processed. Indexing during updates can produce more data
249 to index under some circumstances. That is why indexing may have
250 to be run multiple times.
252 self.run_nominatim('index')
254 def run_nominatim(self, *cmdline):
255 """ Run the nominatim command-line tool via the library.
257 if self.website_dir is not None:
258 cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
260 cli.nominatim(osm2pgsql_path=None,
262 environ=self.test_env)
264 def copy_from_place(self, db):
265 """ Copy data from place to the placex and location_property_osmline
266 tables invoking the appropriate triggers.
268 self.run_nominatim('refresh', '--functions', '--no-diff-updates')
270 with db.cursor() as cur:
271 cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
272 name, admin_level, address,
274 SELECT osm_type, osm_id, class, type,
275 name, admin_level, address,
278 WHERE not (class='place' and type='houses' and osm_type='W')""")
279 cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
280 SELECT osm_id, address, geometry
282 WHERE class='place' and type='houses'
284 and ST_GeometryType(geometry) = 'ST_LineString'""")
286 def create_api_request_func_starlette(self):
287 import nominatim_api.server.starlette.server
288 from asgi_lifespan import LifespanManager
291 async def _request(endpoint, params, project_dir, environ, http_headers):
292 app = nominatim_api.server.starlette.server.get_application(project_dir, environ)
294 async with LifespanManager(app):
295 async with httpx.AsyncClient(app=app, base_url="http://nominatim.test") as client:
296 response = await client.get(f"/{endpoint}", params=params,
297 headers=http_headers)
299 return response.text, response.status_code
303 def create_api_request_func_falcon(self):
304 import nominatim_api.server.falcon.server
305 import falcon.testing
307 async def _request(endpoint, params, project_dir, environ, http_headers):
308 app = nominatim_api.server.falcon.server.get_application(project_dir, environ)
310 async with falcon.testing.ASGIConductor(app) as conductor:
311 response = await conductor.get(f"/{endpoint}", params=params,
312 headers=http_headers)
314 return response.text, response.status_code