]> git.openstreetmap.org Git - nominatim.git/blob - test/bdd/steps/nominatim_environment.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / test / bdd / steps / nominatim_environment.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 from pathlib import Path
8 import tempfile
9
10 import psycopg
11 from psycopg import sql as pysql
12
13 from nominatim_db import cli
14 from nominatim_db.config import Configuration
15 from nominatim_db.db.connection import register_hstore, execute_scalar
16 from nominatim_db.tokenizer import factory as tokenizer_factory
17
18
19 class NominatimEnvironment:
20     """ Collects all functions for the execution of Nominatim functions.
21     """
22
23     def __init__(self, config):
24         self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
25         self.db_host = config['DB_HOST']
26         self.db_port = config['DB_PORT']
27         self.db_user = config['DB_USER']
28         self.db_pass = config['DB_PASS']
29         self.template_db = config['TEMPLATE_DB']
30         self.test_db = config['TEST_DB']
31         self.api_test_db = config['API_TEST_DB']
32         self.api_test_file = config['API_TEST_FILE']
33         self.tokenizer = config['TOKENIZER']
34         self.import_style = config['STYLE']
35         self.reuse_template = not config['REMOVE_TEMPLATE']
36         self.keep_scenario_db = config['KEEP_TEST_DB']
37
38         self.default_config = Configuration(None).get_os_env()
39         self.test_env = None
40         self.template_db_done = False
41         self.api_db_done = False
42         self.website_dir = None
43
44         if not hasattr(self, f"create_api_request_func_{config['API_ENGINE']}"):
45             raise RuntimeError(f"Unknown API engine '{config['API_ENGINE']}'")
46         self.api_engine = getattr(self, f"create_api_request_func_{config['API_ENGINE']}")()
47
48     def connect_database(self, dbname):
49         """ Return a connection to the database with the given name.
50             Uses configured host, user and port.
51         """
52         dbargs = {'dbname': dbname, 'row_factory': psycopg.rows.dict_row}
53         if self.db_host:
54             dbargs['host'] = self.db_host
55         if self.db_port:
56             dbargs['port'] = self.db_port
57         if self.db_user:
58             dbargs['user'] = self.db_user
59         if self.db_pass:
60             dbargs['password'] = self.db_pass
61         return psycopg.connect(**dbargs)
62
63     def write_nominatim_config(self, dbname):
64         """ Set up a custom test configuration that connects to the given
65             database. This sets up the environment variables so that they can
66             be picked up by dotenv and creates a project directory with the
67             appropriate website scripts.
68         """
69         if dbname.startswith('sqlite:'):
70             dsn = 'sqlite:dbname={}'.format(dbname[7:])
71         else:
72             dsn = 'pgsql:dbname={}'.format(dbname)
73         if self.db_host:
74             dsn += ';host=' + self.db_host
75         if self.db_port:
76             dsn += ';port=' + self.db_port
77         if self.db_user:
78             dsn += ';user=' + self.db_user
79         if self.db_pass:
80             dsn += ';password=' + self.db_pass
81
82         self.test_env = dict(self.default_config)
83         self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
84         self.test_env['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
85         self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
86         self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
87         self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
88         self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
89         self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
90         self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
91         if self.tokenizer is not None:
92             self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
93         if self.import_style is not None:
94             self.test_env['NOMINATIM_IMPORT_STYLE'] = self.import_style
95
96         if self.website_dir is not None:
97             self.website_dir.cleanup()
98
99         self.website_dir = tempfile.TemporaryDirectory()
100
101     def get_test_config(self):
102         cfg = Configuration(Path(self.website_dir.name), environ=self.test_env)
103         return cfg
104
105     def get_libpq_dsn(self):
106         dsn = self.test_env['NOMINATIM_DATABASE_DSN']
107
108         def quote_param(param):
109             key, val = param.split('=')
110             val = val.replace('\\', '\\\\').replace("'", "\\'")
111             if ' ' in val:
112                 val = "'" + val + "'"
113             return key + '=' + val
114
115         if dsn.startswith('pgsql:'):
116             # Old PHP DSN format. Convert before returning.
117             return ' '.join([quote_param(p) for p in dsn[6:].split(';')])
118
119         return dsn
120
121     def db_drop_database(self, name):
122         """ Drop the database with the given name.
123         """
124         with self.connect_database('postgres') as conn:
125             conn.autocommit = True
126             conn.execute(pysql.SQL('DROP DATABASE IF EXISTS')
127                          + pysql.Identifier(name))
128
129     def setup_template_db(self):
130         """ Setup a template database that already contains common test data.
131             Having a template database speeds up tests considerably but at
132             the price that the tests sometimes run with stale data.
133         """
134         if self.template_db_done:
135             return
136
137         self.template_db_done = True
138
139         self.write_nominatim_config(self.template_db)
140
141         if not self._reuse_or_drop_db(self.template_db):
142             try:
143                 # execute nominatim import on an empty file to get the right tables
144                 with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
145                     fd.write(b'<osm version="0.6"></osm>')
146                     fd.flush()
147                     self.run_nominatim('import', '--osm-file', fd.name,
148                                                  '--osm2pgsql-cache', '1',
149                                                  '--ignore-errors',
150                                                  '--offline', '--index-noanalyse')
151             except:  # noqa: E722
152                 self.db_drop_database(self.template_db)
153                 raise
154
155         self.run_nominatim('refresh', '--functions')
156
157     def setup_api_db(self):
158         """ Setup a test against the API test database.
159         """
160         self.write_nominatim_config(self.api_test_db)
161
162         if self.api_test_db.startswith('sqlite:'):
163             return
164
165         if not self.api_db_done:
166             self.api_db_done = True
167
168             if not self._reuse_or_drop_db(self.api_test_db):
169                 testdata = (Path(__file__) / '..' / '..' / '..' / 'testdb').resolve()
170                 self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata)
171                 simp_file = Path(self.website_dir.name) / 'secondary_importance.sql.gz'
172                 simp_file.symlink_to(testdata / 'secondary_importance.sql.gz')
173
174                 try:
175                     self.run_nominatim('import', '--osm-file', str(self.api_test_file))
176                     self.run_nominatim('add-data', '--tiger-data', str(testdata / 'tiger'))
177                     self.run_nominatim('freeze')
178
179                     csv_path = str(testdata / 'full_en_phrases_test.csv')
180                     self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
181                 except:  # noqa: E722
182                     self.db_drop_database(self.api_test_db)
183                     raise
184
185         tokenizer_factory.get_tokenizer_for_db(self.get_test_config())
186
187     def setup_unknown_db(self):
188         """ Setup a test against a non-existing database.
189         """
190         # The tokenizer needs an existing database to function.
191         # So start with the usual database
192         class _Context:
193             db = None
194
195         context = _Context()
196         self.setup_db(context)
197         tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
198
199         # Then drop the DB again
200         self.teardown_db(context, force_drop=True)
201
202     def setup_db(self, context):
203         """ Setup a test against a fresh, empty test database.
204         """
205         self.setup_template_db()
206         with self.connect_database(self.template_db) as conn:
207             conn.autocommit = True
208             conn.execute(pysql.SQL('DROP DATABASE IF EXISTS')
209                          + pysql.Identifier(self.test_db))
210             conn.execute(pysql.SQL('CREATE DATABASE {} TEMPLATE = {}').format(
211                            pysql.Identifier(self.test_db),
212                            pysql.Identifier(self.template_db)))
213
214         self.write_nominatim_config(self.test_db)
215         context.db = self.connect_database(self.test_db)
216         context.db.autocommit = True
217         register_hstore(context.db)
218
219     def teardown_db(self, context, force_drop=False):
220         """ Remove the test database, if it exists.
221         """
222         if hasattr(context, 'db'):
223             context.db.close()
224
225         if force_drop or not self.keep_scenario_db:
226             self.db_drop_database(self.test_db)
227
228     def _reuse_or_drop_db(self, name):
229         """ Check for the existence of the given DB. If reuse is enabled,
230             then the function checks for existnce and returns True if the
231             database is already there. Otherwise an existing database is
232             dropped and always false returned.
233         """
234         if self.reuse_template:
235             with self.connect_database('postgres') as conn:
236                 num = execute_scalar(conn,
237                                      'select count(*) from pg_database where datname = %s',
238                                      (name,))
239                 if num == 1:
240                     return True
241         else:
242             self.db_drop_database(name)
243
244         return False
245
    def reindex_placex(self, db):
        """ Run the indexing step by calling the 'index' command of
            nominatim once.

            Indexing during updates can produce more data to index under
            some circumstances. That is why indexing may have to be run
            multiple times until all data in placex has been processed.

            The parameter `db` is not used by this function.
        """
        self.run_nominatim('index')
253
254     def run_nominatim(self, *cmdline):
255         """ Run the nominatim command-line tool via the library.
256         """
257         if self.website_dir is not None:
258             cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
259
260         cli.nominatim(cli_args=cmdline,
261                       environ=self.test_env)
262
263     def copy_from_place(self, db):
264         """ Copy data from place to the placex and location_property_osmline
265             tables invoking the appropriate triggers.
266         """
267         self.run_nominatim('refresh', '--functions', '--no-diff-updates')
268
269         with db.cursor() as cur:
270             cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
271                                                name, admin_level, address,
272                                                extratags, geometry)
273                              SELECT osm_type, osm_id, class, type,
274                                     name, admin_level, address,
275                                     extratags, geometry
276                                FROM place
277                                WHERE not (class='place' and type='houses' and osm_type='W')""")
278             cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
279                              SELECT osm_id, address, geometry
280                                FROM place
281                               WHERE class='place' and type='houses'
282                                     and osm_type='W'
283                                     and ST_GeometryType(geometry) = 'ST_LineString'""")
284
285     def create_api_request_func_starlette(self):
286         import nominatim_api.server.starlette.server
287         from asgi_lifespan import LifespanManager
288         import httpx
289
290         async def _request(endpoint, params, project_dir, environ, http_headers):
291             app = nominatim_api.server.starlette.server.get_application(project_dir, environ)
292
293             async with LifespanManager(app):
294                 async with httpx.AsyncClient(app=app, base_url="http://nominatim.test") as client:
295                     response = await client.get(f"/{endpoint}", params=params,
296                                                 headers=http_headers)
297
298             return response.text, response.status_code
299
300         return _request
301
302     def create_api_request_func_falcon(self):
303         import nominatim_api.server.falcon.server
304         import falcon.testing
305
306         async def _request(endpoint, params, project_dir, environ, http_headers):
307             app = nominatim_api.server.falcon.server.get_application(project_dir, environ)
308
309             async with falcon.testing.ASGIConductor(app) as conductor:
310                 response = await conductor.get(f"/{endpoint}", params=params,
311                                                headers=http_headers)
312
313             return response.text, response.status_code
314
315         return _request