1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 from pathlib import Path
12 from psycopg import sql as pysql
14 from nominatim_db import cli
15 from nominatim_db.config import Configuration
16 from nominatim_db.db.connection import Connection, register_hstore, execute_scalar
17 from nominatim_db.tools import refresh
18 from nominatim_db.tokenizer import factory as tokenizer_factory
19 from steps.utils import run_script
21 class NominatimEnvironment:
22 """ Collects all functions for the execution of Nominatim functions.
25 def __init__(self, config):
26 self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
27 self.db_host = config['DB_HOST']
28 self.db_port = config['DB_PORT']
29 self.db_user = config['DB_USER']
30 self.db_pass = config['DB_PASS']
31 self.template_db = config['TEMPLATE_DB']
32 self.test_db = config['TEST_DB']
33 self.api_test_db = config['API_TEST_DB']
34 self.api_test_file = config['API_TEST_FILE']
35 self.tokenizer = config['TOKENIZER']
36 self.import_style = config['STYLE']
37 self.reuse_template = not config['REMOVE_TEMPLATE']
38 self.keep_scenario_db = config['KEEP_TEST_DB']
40 self.default_config = Configuration(None).get_os_env()
42 self.template_db_done = False
43 self.api_db_done = False
44 self.website_dir = None
46 if not hasattr(self, f"create_api_request_func_{config['API_ENGINE']}"):
47 raise RuntimeError(f"Unknown API engine '{config['API_ENGINE']}'")
48 self.api_engine = getattr(self, f"create_api_request_func_{config['API_ENGINE']}")()
50 def connect_database(self, dbname):
51 """ Return a connection to the database with the given name.
52 Uses configured host, user and port.
54 dbargs = {'dbname': dbname, 'row_factory': psycopg.rows.dict_row}
56 dbargs['host'] = self.db_host
58 dbargs['port'] = self.db_port
60 dbargs['user'] = self.db_user
62 dbargs['password'] = self.db_pass
63 return psycopg.connect(**dbargs)
66 def write_nominatim_config(self, dbname):
67 """ Set up a custom test configuration that connects to the given
68 database. This sets up the environment variables so that they can
69 be picked up by dotenv and creates a project directory with the
70 appropriate website scripts.
72 if dbname.startswith('sqlite:'):
73 dsn = 'sqlite:dbname={}'.format(dbname[7:])
75 dsn = 'pgsql:dbname={}'.format(dbname)
77 dsn += ';host=' + self.db_host
79 dsn += ';port=' + self.db_port
81 dsn += ';user=' + self.db_user
83 dsn += ';password=' + self.db_pass
85 self.test_env = dict(self.default_config)
86 self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
87 self.test_env['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
88 self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
89 self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
90 self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
91 self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
92 self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
93 self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
94 if self.tokenizer is not None:
95 self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
96 if self.import_style is not None:
97 self.test_env['NOMINATIM_IMPORT_STYLE'] = self.import_style
99 if self.website_dir is not None:
100 self.website_dir.cleanup()
102 self.website_dir = tempfile.TemporaryDirectory()
105 def get_test_config(self):
106 cfg = Configuration(Path(self.website_dir.name), environ=self.test_env)
109 def get_libpq_dsn(self):
110 dsn = self.test_env['NOMINATIM_DATABASE_DSN']
112 def quote_param(param):
113 key, val = param.split('=')
114 val = val.replace('\\', '\\\\').replace("'", "\\'")
116 val = "'" + val + "'"
117 return key + '=' + val
119 if dsn.startswith('pgsql:'):
120 # Old PHP DSN format. Convert before returning.
121 return ' '.join([quote_param(p) for p in dsn[6:].split(';')])
126 def db_drop_database(self, name):
127 """ Drop the database with the given name.
129 with self.connect_database('postgres') as conn:
130 conn.autocommit = True
131 conn.execute(pysql.SQL('DROP DATABASE IF EXISTS')
132 + pysql.Identifier(name))
134 def setup_template_db(self):
135 """ Setup a template database that already contains common test data.
136 Having a template database speeds up tests considerably but at
137 the price that the tests sometimes run with stale data.
139 if self.template_db_done:
142 self.template_db_done = True
144 self.write_nominatim_config(self.template_db)
146 if not self._reuse_or_drop_db(self.template_db):
148 # execute nominatim import on an empty file to get the right tables
149 with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
150 fd.write(b'<osm version="0.6"></osm>')
152 self.run_nominatim('import', '--osm-file', fd.name,
153 '--osm2pgsql-cache', '1',
155 '--offline', '--index-noanalyse')
157 self.db_drop_database(self.template_db)
160 self.run_nominatim('refresh', '--functions')
163 def setup_api_db(self):
164 """ Setup a test against the API test database.
166 self.write_nominatim_config(self.api_test_db)
168 if self.api_test_db.startswith('sqlite:'):
171 if not self.api_db_done:
172 self.api_db_done = True
174 if not self._reuse_or_drop_db(self.api_test_db):
175 testdata = (Path(__file__) / '..' / '..' / '..' / 'testdb').resolve()
176 self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata)
177 simp_file = Path(self.website_dir.name) / 'secondary_importance.sql.gz'
178 simp_file.symlink_to(testdata / 'secondary_importance.sql.gz')
181 self.run_nominatim('import', '--osm-file', str(self.api_test_file))
182 self.run_nominatim('add-data', '--tiger-data', str(testdata / 'tiger'))
183 self.run_nominatim('freeze')
185 csv_path = str(testdata / 'full_en_phrases_test.csv')
186 self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
188 self.db_drop_database(self.api_test_db)
191 tokenizer_factory.get_tokenizer_for_db(self.get_test_config())
194 def setup_unknown_db(self):
195 """ Setup a test against a non-existing database.
197 # The tokenizer needs an existing database to function.
198 # So start with the usual database
203 self.setup_db(context)
204 tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
206 # Then drop the DB again
207 self.teardown_db(context, force_drop=True)
209 def setup_db(self, context):
210 """ Setup a test against a fresh, empty test database.
212 self.setup_template_db()
213 with self.connect_database(self.template_db) as conn:
214 conn.autocommit = True
215 conn.execute(pysql.SQL('DROP DATABASE IF EXISTS')
216 + pysql.Identifier(self.test_db))
217 conn.execute(pysql.SQL('CREATE DATABASE {} TEMPLATE = {}').format(
218 pysql.Identifier(self.test_db),
219 pysql.Identifier(self.template_db)))
221 self.write_nominatim_config(self.test_db)
222 context.db = self.connect_database(self.test_db)
223 context.db.autocommit = True
224 register_hstore(context.db)
226 def teardown_db(self, context, force_drop=False):
227 """ Remove the test database, if it exists.
229 if hasattr(context, 'db'):
232 if force_drop or not self.keep_scenario_db:
233 self.db_drop_database(self.test_db)
235 def _reuse_or_drop_db(self, name):
236 """ Check for the existence of the given DB. If reuse is enabled,
237 then the function checks for existnce and returns True if the
238 database is already there. Otherwise an existing database is
239 dropped and always false returned.
241 if self.reuse_template:
242 with self.connect_database('postgres') as conn:
243 num = execute_scalar(conn,
244 'select count(*) from pg_database where datname = %s',
249 self.db_drop_database(name)
254 def reindex_placex(self, db):
255 """ Run the indexing step until all data in the placex has
256 been processed. Indexing during updates can produce more data
257 to index under some circumstances. That is why indexing may have
258 to be run multiple times.
260 self.run_nominatim('index')
263 def run_nominatim(self, *cmdline):
264 """ Run the nominatim command-line tool via the library.
266 if self.website_dir is not None:
267 cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
269 cli.nominatim(module_dir=None,
272 environ=self.test_env)
275 def copy_from_place(self, db):
276 """ Copy data from place to the placex and location_property_osmline
277 tables invoking the appropriate triggers.
279 self.run_nominatim('refresh', '--functions', '--no-diff-updates')
281 with db.cursor() as cur:
282 cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
283 name, admin_level, address,
285 SELECT osm_type, osm_id, class, type,
286 name, admin_level, address,
289 WHERE not (class='place' and type='houses' and osm_type='W')""")
290 cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
291 SELECT osm_id, address, geometry
293 WHERE class='place' and type='houses'
295 and ST_GeometryType(geometry) = 'ST_LineString'""")
298 def create_api_request_func_starlette(self):
299 import nominatim_api.server.starlette.server
300 from asgi_lifespan import LifespanManager
303 async def _request(endpoint, params, project_dir, environ, http_headers):
304 app = nominatim_api.server.starlette.server.get_application(project_dir, environ)
306 async with LifespanManager(app):
307 async with httpx.AsyncClient(app=app, base_url="http://nominatim.test") as client:
308 response = await client.get(f"/{endpoint}", params=params,
309 headers=http_headers)
311 return response.text, response.status_code
316 def create_api_request_func_falcon(self):
317 import nominatim_api.server.falcon.server
318 import falcon.testing
320 async def _request(endpoint, params, project_dir, environ, http_headers):
321 app = nominatim_api.server.falcon.server.get_application(project_dir, environ)
323 async with falcon.testing.ASGIConductor(app) as conductor:
324 response = await conductor.get(f"/{endpoint}", params=params,
325 headers=http_headers)
327 return response.text, response.status_code