1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 from pathlib import Path
12 from psycopg import sql as pysql
14 from nominatim_db import cli
15 from nominatim_db.config import Configuration
16 from nominatim_db.db.connection import Connection, register_hstore, execute_scalar
17 from nominatim_db.tools import refresh
18 from nominatim_db.tokenizer import factory as tokenizer_factory
19 from steps.utils import run_script
21 class NominatimEnvironment:
22 """ Collects all functions for the execution of Nominatim functions.
25 def __init__(self, config):
26 self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
27 self.db_host = config['DB_HOST']
28 self.db_port = config['DB_PORT']
29 self.db_user = config['DB_USER']
30 self.db_pass = config['DB_PASS']
31 self.template_db = config['TEMPLATE_DB']
32 self.test_db = config['TEST_DB']
33 self.api_test_db = config['API_TEST_DB']
34 self.api_test_file = config['API_TEST_FILE']
35 self.tokenizer = config['TOKENIZER']
36 self.import_style = config['STYLE']
37 self.reuse_template = not config['REMOVE_TEMPLATE']
38 self.keep_scenario_db = config['KEEP_TEST_DB']
40 self.default_config = Configuration(None).get_os_env()
42 self.template_db_done = False
43 self.api_db_done = False
44 self.website_dir = None
46 if not hasattr(self, f"create_api_request_func_{config['API_ENGINE']}"):
47 raise RuntimeError(f"Unknown API engine '{config['API_ENGINE']}'")
48 self.api_engine = getattr(self, f"create_api_request_func_{config['API_ENGINE']}")()
50 def connect_database(self, dbname):
51 """ Return a connection to the database with the given name.
52 Uses configured host, user and port.
54 dbargs = {'dbname': dbname, 'row_factory': psycopg.rows.dict_row}
56 dbargs['host'] = self.db_host
58 dbargs['port'] = self.db_port
60 dbargs['user'] = self.db_user
62 dbargs['password'] = self.db_pass
63 return psycopg.connect(**dbargs)
66 def write_nominatim_config(self, dbname):
67 """ Set up a custom test configuration that connects to the given
68 database. This sets up the environment variables so that they can
69 be picked up by dotenv and creates a project directory with the
70 appropriate website scripts.
72 if dbname.startswith('sqlite:'):
73 dsn = 'sqlite:dbname={}'.format(dbname[7:])
75 dsn = 'pgsql:dbname={}'.format(dbname)
77 dsn += ';host=' + self.db_host
79 dsn += ';port=' + self.db_port
81 dsn += ';user=' + self.db_user
83 dsn += ';password=' + self.db_pass
85 self.test_env = dict(self.default_config)
86 self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
87 self.test_env['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
88 self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
89 self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
90 self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
91 self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
92 self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
93 self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
94 if self.tokenizer is not None:
95 self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
96 if self.import_style is not None:
97 self.test_env['NOMINATIM_IMPORT_STYLE'] = self.import_style
99 if self.website_dir is not None:
100 self.website_dir.cleanup()
102 self.website_dir = tempfile.TemporaryDirectory()
105 def get_test_config(self):
106 cfg = Configuration(Path(self.website_dir.name), environ=self.test_env)
109 def get_libpq_dsn(self):
110 dsn = self.test_env['NOMINATIM_DATABASE_DSN']
112 def quote_param(param):
113 key, val = param.split('=')
114 val = val.replace('\\', '\\\\').replace("'", "\\'")
116 val = "'" + val + "'"
117 return key + '=' + val
119 if dsn.startswith('pgsql:'):
120 # Old PHP DSN format. Convert before returning.
121 return ' '.join([quote_param(p) for p in dsn[6:].split(';')])
126 def db_drop_database(self, name):
127 """ Drop the database with the given name.
129 with self.connect_database('postgres') as conn:
130 conn.autocommit = True
131 conn.execute(pysql.SQL('DROP DATABASE IF EXISTS')
132 + pysql.Identifier(name))
134 def setup_template_db(self):
135 """ Setup a template database that already contains common test data.
136 Having a template database speeds up tests considerably but at
137 the price that the tests sometimes run with stale data.
139 if self.template_db_done:
142 self.template_db_done = True
144 self.write_nominatim_config(self.template_db)
146 if not self._reuse_or_drop_db(self.template_db):
148 # execute nominatim import on an empty file to get the right tables
149 with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
150 fd.write(b'<osm version="0.6"></osm>')
152 self.run_nominatim('import', '--osm-file', fd.name,
153 '--osm2pgsql-cache', '1',
155 '--offline', '--index-noanalyse')
157 self.db_drop_database(self.template_db)
160 self.run_nominatim('refresh', '--functions')
163 def setup_api_db(self):
164 """ Setup a test against the API test database.
166 self.write_nominatim_config(self.api_test_db)
168 if self.api_test_db.startswith('sqlite:'):
171 if not self.api_db_done:
172 self.api_db_done = True
174 if not self._reuse_or_drop_db(self.api_test_db):
175 testdata = (Path(__file__) / '..' / '..' / '..' / 'testdb').resolve()
176 self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata)
177 simp_file = Path(self.website_dir.name) / 'secondary_importance.sql.gz'
178 simp_file.symlink_to(testdata / 'secondary_importance.sql.gz')
181 self.run_nominatim('import', '--osm-file', str(self.api_test_file))
182 self.run_nominatim('add-data', '--tiger-data', str(testdata / 'tiger'))
183 self.run_nominatim('freeze')
185 csv_path = str(testdata / 'full_en_phrases_test.csv')
186 self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
188 self.db_drop_database(self.api_test_db)
191 tokenizer_factory.get_tokenizer_for_db(self.get_test_config())
194 def setup_unknown_db(self):
195 """ Setup a test against a non-existing database.
197 # The tokenizer needs an existing database to function.
198 # So start with the usual database
203 self.setup_db(context)
204 tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
206 # Then drop the DB again
207 self.teardown_db(context, force_drop=True)
209 def setup_db(self, context):
210 """ Setup a test against a fresh, empty test database.
212 self.setup_template_db()
213 with self.connect_database(self.template_db) as conn:
214 conn.autocommit = True
215 conn.execute(pysql.SQL('DROP DATABASE IF EXISTS')
216 + pysql.Identifier(self.test_db))
217 conn.execute(pysql.SQL('CREATE DATABASE {} TEMPLATE = {}').format(
218 pysql.Identifier(self.test_db),
219 pysql.Identifier(self.template_db)))
221 self.write_nominatim_config(self.test_db)
222 context.db = self.connect_database(self.test_db)
223 context.db.autocommit = True
224 register_hstore(context.db)
226 def teardown_db(self, context, force_drop=False):
227 """ Remove the test database, if it exists.
229 if hasattr(context, 'db'):
232 if force_drop or not self.keep_scenario_db:
233 self.db_drop_database(self.test_db)
235 def _reuse_or_drop_db(self, name):
236 """ Check for the existence of the given DB. If reuse is enabled,
237 then the function checks for existnce and returns True if the
238 database is already there. Otherwise an existing database is
239 dropped and always false returned.
241 if self.reuse_template:
242 with self.connect_database('postgres') as conn:
243 num = execute_scalar(conn,
244 'select count(*) from pg_database where datname = %s',
249 self.db_drop_database(name)
254 def reindex_placex(self, db):
255 """ Run the indexing step until all data in the placex has
256 been processed. Indexing during updates can produce more data
257 to index under some circumstances. That is why indexing may have
258 to be run multiple times.
260 self.run_nominatim('index')
263 def run_nominatim(self, *cmdline):
264 """ Run the nominatim command-line tool via the library.
266 if self.website_dir is not None:
267 cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
269 cli.nominatim(osm2pgsql_path=None,
271 environ=self.test_env)
274 def copy_from_place(self, db):
275 """ Copy data from place to the placex and location_property_osmline
276 tables invoking the appropriate triggers.
278 self.run_nominatim('refresh', '--functions', '--no-diff-updates')
280 with db.cursor() as cur:
281 cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
282 name, admin_level, address,
284 SELECT osm_type, osm_id, class, type,
285 name, admin_level, address,
288 WHERE not (class='place' and type='houses' and osm_type='W')""")
289 cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
290 SELECT osm_id, address, geometry
292 WHERE class='place' and type='houses'
294 and ST_GeometryType(geometry) = 'ST_LineString'""")
297 def create_api_request_func_starlette(self):
298 import nominatim_api.server.starlette.server
299 from asgi_lifespan import LifespanManager
302 async def _request(endpoint, params, project_dir, environ, http_headers):
303 app = nominatim_api.server.starlette.server.get_application(project_dir, environ)
305 async with LifespanManager(app):
306 async with httpx.AsyncClient(app=app, base_url="http://nominatim.test") as client:
307 response = await client.get(f"/{endpoint}", params=params,
308 headers=http_headers)
310 return response.text, response.status_code
315 def create_api_request_func_falcon(self):
316 import nominatim_api.server.falcon.server
317 import falcon.testing
319 async def _request(endpoint, params, project_dir, environ, http_headers):
320 app = nominatim_api.server.falcon.server.get_application(project_dir, environ)
322 async with falcon.testing.ASGIConductor(app) as conductor:
323 response = await conductor.get(f"/{endpoint}", params=params,
324 headers=http_headers)
326 return response.text, response.status_code