]> git.openstreetmap.org Git - nominatim.git/blob - test/bdd/steps/nominatim_environment.py
88a4f11c73ebc5ec135b0443660cce3f14753731
[nominatim.git] / test / bdd / steps / nominatim_environment.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 from pathlib import Path
8 import importlib
9 import tempfile
10
11 import psycopg
12 from psycopg import sql as pysql
13
14 from nominatim_db import cli
15 from nominatim_db.config import Configuration
16 from nominatim_db.db.connection import Connection, register_hstore, execute_scalar
17 from nominatim_db.tools import refresh
18 from nominatim_db.tokenizer import factory as tokenizer_factory
19 from steps.utils import run_script
20
class NominatimEnvironment:
    """ Collects all functions for the execution of Nominatim functions.

        The object keeps the database connection parameters, the names
        of the template/test/API databases and the environment used for
        running Nominatim commands. A temporary project directory is
        (re)created by write_nominatim_config() for every configuration.
    """

    def __init__(self, config):
        """ Read the test settings from `config`, a mapping that is
            indexed with the upper-case setting names used below.

            Raises a RuntimeError when an unknown API engine is requested
            or when the legacy tokenizer is selected without a server
            module path.
        """
        self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
        self.db_host = config['DB_HOST']
        self.db_port = config['DB_PORT']
        self.db_user = config['DB_USER']
        self.db_pass = config['DB_PASS']
        self.template_db = config['TEMPLATE_DB']
        self.test_db = config['TEST_DB']
        self.api_test_db = config['API_TEST_DB']
        self.api_test_file = config['API_TEST_FILE']
        self.tokenizer = config['TOKENIZER']
        self.import_style = config['STYLE']
        self.server_module_path = config['SERVER_MODULE_PATH']
        self.reuse_template = not config['REMOVE_TEMPLATE']
        self.keep_scenario_db = config['KEEP_TEST_DB']
        self.code_coverage_path = config['PHPCOV']
        self.code_coverage_id = 1

        self.default_config = Configuration(None).get_os_env()
        self.test_env = None
        self.template_db_done = False
        self.api_db_done = False
        self.website_dir = None

        # For engines other than 'php' a request function is created by the
        # matching create_api_request_func_<engine>() method of this class.
        self.api_engine = None
        engine = config['API_ENGINE']
        if engine != 'php':
            factory = getattr(self, f"create_api_request_func_{engine}", None)
            if factory is None:
                raise RuntimeError(f"Unknown API engine '{engine}'")
            self.api_engine = factory()

        if self.tokenizer == 'legacy' and self.server_module_path is None:
            raise RuntimeError("You must set -DSERVER_MODULE_PATH when testing the legacy tokenizer.")

    def connect_database(self, dbname):
        """ Return a connection to the database with the given name.
            Uses configured host, user and port. Rows are returned
            as dictionaries.
        """
        dbargs = {'dbname': dbname, 'row_factory': psycopg.rows.dict_row}
        if self.db_host:
            dbargs['host'] = self.db_host
        if self.db_port:
            dbargs['port'] = self.db_port
        if self.db_user:
            dbargs['user'] = self.db_user
        if self.db_pass:
            dbargs['password'] = self.db_pass
        return psycopg.connect(**dbargs)

    def next_code_coverage_file(self):
        """ Generate the next name for a coverage file.

            Returns the resolved path of a file named after the current
            six-digit coverage counter and increments the counter.
        """
        fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
        self.code_coverage_id += 1

        return fn.resolve()

    def write_nominatim_config(self, dbname):
        """ Set up a custom test configuration that connects to the given
            database. This sets up the environment variables so that they can
            be picked up by dotenv and creates a project directory with the
            appropriate website scripts.

            A `dbname` starting with 'sqlite:' results in a sqlite DSN,
            anything else in a pgsql DSN.
        """
        if dbname.startswith('sqlite:'):
            dsn = 'sqlite:dbname={}'.format(dbname[7:])
        else:
            dsn = 'pgsql:dbname={}'.format(dbname)
        if self.db_host:
            dsn += ';host=' + self.db_host
        if self.db_port:
            # str() so that a numeric port does not break the concatenation.
            dsn += ';port=' + str(self.db_port)
        if self.db_user:
            dsn += ';user=' + self.db_user
        if self.db_pass:
            dsn += ';password=' + self.db_pass

        self.test_env = dict(self.default_config)
        self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
        self.test_env['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
        self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
        self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
        self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
        self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
        self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
        self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
        if self.tokenizer is not None:
            self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
        if self.import_style is not None:
            # Overrides the 'full' default set above.
            self.test_env['NOMINATIM_IMPORT_STYLE'] = self.import_style

        if self.server_module_path:
            self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path

        # Every configuration gets its own fresh project directory.
        if self.website_dir is not None:
            self.website_dir.cleanup()

        self.website_dir = tempfile.TemporaryDirectory()

        # Connecting is best effort: when it fails, the website is set up
        # with `False` in place of a connection. Only ordinary errors are
        # swallowed, so that Ctrl-C still interrupts the test run.
        try:
            conn = self.connect_database(dbname)
        except Exception:
            conn = False

        try:
            refresh.setup_website(Path(self.website_dir.name) / 'website',
                                  self.get_test_config(), conn)
        finally:
            if conn:
                conn.close()

    def get_test_config(self):
        """ Return a Configuration for the current project directory and
            test environment. Requires that write_nominatim_config() has
            been called before.
        """
        cfg = Configuration(Path(self.website_dir.name), environ=self.test_env)
        cfg.set_libdirs(module=self.server_module_path)
        return cfg

    def get_libpq_dsn(self):
        """ Return the DSN of the current test environment in a form
            suitable for libpq. A DSN in the old PHP format ('pgsql:...')
            is converted into a space-separated key=value string, any
            other DSN is returned unchanged.
        """
        dsn = self.test_env['NOMINATIM_DATABASE_DSN']

        def quote_param(param):
            # Split on the first '=' only, the value itself may contain '='.
            key, _, val = param.partition('=')
            val = val.replace('\\', '\\\\').replace("'", "\\'")
            # Empty values and values with spaces must be quoted.
            if not val or ' ' in val:
                val = "'" + val + "'"
            return key + '=' + val

        if dsn.startswith('pgsql:'):
            # Old PHP DSN format. Convert before returning.
            return ' '.join(quote_param(p) for p in dsn[6:].split(';') if p)

        return dsn

    def db_drop_database(self, name):
        """ Drop the database with the given name.
        """
        with self.connect_database('postgres') as conn:
            # DROP DATABASE cannot be executed inside a transaction block.
            conn.autocommit = True
            conn.execute(pysql.SQL('DROP DATABASE IF EXISTS {}')
                              .format(pysql.Identifier(name)))

    def setup_template_db(self):
        """ Setup a template database that already contains common test data.
            Having a template database speeds up tests considerably but at
            the price that the tests sometimes run with stale data.
        """
        if self.template_db_done:
            return

        self.template_db_done = True

        self.write_nominatim_config(self.template_db)

        if not self._reuse_or_drop_db(self.template_db):
            try:
                # execute nominatim import on an empty file to get the right tables
                with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
                    fd.write(b'<osm version="0.6"></osm>')
                    fd.flush()
                    self.run_nominatim('import', '--osm-file', fd.name,
                                       '--osm2pgsql-cache', '1',
                                       '--ignore-errors',
                                       '--offline', '--index-noanalyse')
            except BaseException:
                # Never leave a half-imported template behind, whatever
                # the reason for the abort. The error is passed on.
                self.db_drop_database(self.template_db)
                raise

        self.run_nominatim('refresh', '--functions')

    def setup_api_db(self):
        """ Setup a test against the API test database.

            For sqlite databases only the configuration is written.
            Otherwise the database is imported on first use, unless an
            existing one may be reused.
        """
        self.write_nominatim_config(self.api_test_db)

        if self.api_test_db.startswith('sqlite:'):
            return

        if not self.api_db_done:
            self.api_db_done = True

            if not self._reuse_or_drop_db(self.api_test_db):
                testdata = (Path(__file__) / '..' / '..' / '..' / 'testdb').resolve()
                self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata)
                simp_file = Path(self.website_dir.name) / 'secondary_importance.sql.gz'
                simp_file.symlink_to(testdata / 'secondary_importance.sql.gz')

                try:
                    self.run_nominatim('import', '--osm-file', str(self.api_test_file))
                    self.run_nominatim('add-data', '--tiger-data', str(testdata / 'tiger'))
                    self.run_nominatim('freeze')

                    if self.tokenizer == 'legacy':
                        phrase_file = str(testdata / 'specialphrases_testdb.sql')
                        run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
                    else:
                        csv_path = str(testdata / 'full_en_phrases_test.csv')
                        self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
                except BaseException:
                    # Remove the incomplete database before passing on
                    # the error, so that it is not reused later.
                    self.db_drop_database(self.api_test_db)
                    raise

        tokenizer_factory.get_tokenizer_for_db(self.get_test_config())

    def setup_unknown_db(self):
        """ Setup a test against a non-existing database.
        """
        # The tokenizer needs an existing database to function.
        # So start with the usual database
        class _Context:
            db = None

        context = _Context()
        self.setup_db(context)
        tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)

        # Then drop the DB again
        self.teardown_db(context, force_drop=True)

    def setup_db(self, context):
        """ Setup a test against a fresh, empty test database.

            The test database is recreated from the template database and
            an autocommit connection to it is saved in `context.db`.
        """
        self.setup_template_db()
        with self.connect_database(self.template_db) as conn:
            conn.autocommit = True
            conn.execute(pysql.SQL('DROP DATABASE IF EXISTS {}')
                              .format(pysql.Identifier(self.test_db)))
            conn.execute(pysql.SQL('CREATE DATABASE {} TEMPLATE = {}').format(
                           pysql.Identifier(self.test_db),
                           pysql.Identifier(self.template_db)))

        self.write_nominatim_config(self.test_db)
        context.db = self.connect_database(self.test_db)
        context.db.autocommit = True
        register_hstore(context.db)

    def teardown_db(self, context, force_drop=False):
        """ Remove the test database, if it exists.

            Closes the connection in `context.db` when there is one. The
            database is kept when keeping scenario databases was requested,
            unless `force_drop` is set.
        """
        # The attribute may exist but still be None when no connection
        # has been opened for the context.
        conn = getattr(context, 'db', None)
        if conn is not None:
            conn.close()

        if force_drop or not self.keep_scenario_db:
            self.db_drop_database(self.test_db)

    def _reuse_or_drop_db(self, name):
        """ Check for the existence of the given DB. If reuse is enabled,
            then the function checks for existence and returns True if the
            database is already there. Otherwise an existing database is
            dropped and always false returned.
        """
        if self.reuse_template:
            with self.connect_database('postgres') as conn:
                num = execute_scalar(conn,
                                     'select count(*) from pg_database where datname = %s',
                                     (name,))
                if num == 1:
                    return True
        else:
            self.db_drop_database(name)

        return False

    def reindex_placex(self, db):
        """ Run the indexing step by executing the 'index' command once.
            The `db` parameter is currently not used.

            NOTE(review): indexing during updates can produce more data
            to index under some circumstances, so multiple runs may be
            needed - confirm that the 'index' command takes care of that.
        """
        self.run_nominatim('index')

    def run_nominatim(self, *cmdline):
        """ Run the nominatim command-line tool via the library.

            When a project directory has been set up, it is handed in
            with the '--project-dir' parameter.
        """
        cli_args = list(cmdline)
        if self.website_dir is not None:
            cli_args += ['--project-dir', self.website_dir.name]

        cli.nominatim(module_dir=self.server_module_path,
                      osm2pgsql_path=None,
                      cli_args=cli_args,
                      environ=self.test_env)

    def copy_from_place(self, db):
        """ Copy data from place to the placex and location_property_osmline
            tables invoking the appropriate triggers.
        """
        self.run_nominatim('refresh', '--functions', '--no-diff-updates')

        with db.cursor() as cur:
            # Everything except ways of type place=houses goes into placex.
            cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
                                               name, admin_level, address,
                                               extratags, geometry)
                             SELECT osm_type, osm_id, class, type,
                                    name, admin_level, address,
                                    extratags, geometry
                               FROM place
                               WHERE not (class='place' and type='houses' and osm_type='W')""")
            # place=houses ways with a line geometry go into
            # location_property_osmline.
            cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
                             SELECT osm_id, address, geometry
                               FROM place
                              WHERE class='place' and type='houses'
                                    and osm_type='W'
                                    and ST_GeometryType(geometry) = 'ST_LineString'""")

    def create_api_request_func_starlette(self):
        """ Return an async function that sends a GET request to the
            starlette version of the API and returns a tuple of
            response text and HTTP status code.
        """
        import nominatim_api.server.starlette.server
        from asgi_lifespan import LifespanManager
        import httpx

        async def _request(endpoint, params, project_dir, environ, http_headers):
            app = nominatim_api.server.starlette.server.get_application(project_dir, environ)

            async with LifespanManager(app):
                # The 'app' shortcut parameter of AsyncClient is no longer
                # available in newer versions of httpx. Hand in the
                # application through an explicit ASGI transport instead.
                transport = httpx.ASGITransport(app=app)
                async with httpx.AsyncClient(transport=transport,
                                             base_url="http://nominatim.test") as client:
                    response = await client.get(f"/{endpoint}", params=params,
                                                headers=http_headers)

            return response.text, response.status_code

        return _request

    def create_api_request_func_falcon(self):
        """ Return an async function that sends a GET request to the
            falcon version of the API and returns a tuple of
            response text and HTTP status code.
        """
        import nominatim_api.server.falcon.server
        import falcon.testing

        async def _request(endpoint, params, project_dir, environ, http_headers):
            app = nominatim_api.server.falcon.server.get_application(project_dir, environ)

            async with falcon.testing.ASGIConductor(app) as conductor:
                response = await conductor.get(f"/{endpoint}", params=params,
                                               headers=http_headers)

            return response.text, response.status_code

        return _request
362
363
364