]> git.openstreetmap.org Git - nominatim.git/blob - test/bdd/steps/nominatim_environment.py
explicit export for functions in flex-base
[nominatim.git] / test / bdd / steps / nominatim_environment.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 from pathlib import Path
8 import sys
9 import tempfile
10
11 import psycopg2
12 import psycopg2.extras
13
14 sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..').resolve()))
15
16 from nominatim import cli
17 from nominatim.config import Configuration
18 from nominatim.db.connection import Connection
19 from nominatim.tools import refresh
20 from nominatim.tokenizer import factory as tokenizer_factory
21 from steps.utils import run_script
22
23 class NominatimEnvironment:
24     """ Collects all functions for the execution of Nominatim functions.
25     """
26
27     def __init__(self, config):
28         self.build_dir = Path(config['BUILDDIR']).resolve()
29         self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
30         self.db_host = config['DB_HOST']
31         self.db_port = config['DB_PORT']
32         self.db_user = config['DB_USER']
33         self.db_pass = config['DB_PASS']
34         self.template_db = config['TEMPLATE_DB']
35         self.test_db = config['TEST_DB']
36         self.api_test_db = config['API_TEST_DB']
37         self.api_test_file = config['API_TEST_FILE']
38         self.tokenizer = config['TOKENIZER']
39         self.import_style = config['STYLE']
40         self.server_module_path = config['SERVER_MODULE_PATH']
41         self.reuse_template = not config['REMOVE_TEMPLATE']
42         self.keep_scenario_db = config['KEEP_TEST_DB']
43         self.code_coverage_path = config['PHPCOV']
44         self.code_coverage_id = 1
45
46         self.default_config = Configuration(None).get_os_env()
47         self.test_env = None
48         self.template_db_done = False
49         self.api_db_done = False
50         self.website_dir = None
51
52     def connect_database(self, dbname):
53         """ Return a connection to the database with the given name.
54             Uses configured host, user and port.
55         """
56         dbargs = {'database': dbname}
57         if self.db_host:
58             dbargs['host'] = self.db_host
59         if self.db_port:
60             dbargs['port'] = self.db_port
61         if self.db_user:
62             dbargs['user'] = self.db_user
63         if self.db_pass:
64             dbargs['password'] = self.db_pass
65         conn = psycopg2.connect(connection_factory=Connection, **dbargs)
66         return conn
67
68     def next_code_coverage_file(self):
69         """ Generate the next name for a coverage file.
70         """
71         fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
72         self.code_coverage_id += 1
73
74         return fn.resolve()
75
76     def write_nominatim_config(self, dbname):
77         """ Set up a custom test configuration that connects to the given
78             database. This sets up the environment variables so that they can
79             be picked up by dotenv and creates a project directory with the
80             appropriate website scripts.
81         """
82         dsn = 'pgsql:dbname={}'.format(dbname)
83         if self.db_host:
84             dsn += ';host=' + self.db_host
85         if self.db_port:
86             dsn += ';port=' + self.db_port
87         if self.db_user:
88             dsn += ';user=' + self.db_user
89         if self.db_pass:
90             dsn += ';password=' + self.db_pass
91
92         if self.website_dir is not None \
93            and self.test_env is not None \
94            and dsn == self.test_env['NOMINATIM_DATABASE_DSN']:
95             return # environment already set uo
96
97         self.test_env = dict(self.default_config)
98         self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
99         self.test_env['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
100         self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
101         self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
102         self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
103         self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
104         self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
105         self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
106         self.test_env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = str((self.build_dir / 'module').resolve())
107         self.test_env['NOMINATIM_OSM2PGSQL_BINARY'] = str((self.build_dir / 'osm2pgsql' / 'osm2pgsql').resolve())
108         if self.tokenizer is not None:
109             self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
110         if self.import_style is not None:
111             self.test_env['NOMINATIM_IMPORT_STYLE'] = self.import_style
112
113         if self.server_module_path:
114             self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
115         else:
116             # avoid module being copied into the temporary environment
117             self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = str((self.build_dir / 'module').resolve())
118
119         if self.website_dir is not None:
120             self.website_dir.cleanup()
121
122         self.website_dir = tempfile.TemporaryDirectory()
123
124         try:
125             conn = self.connect_database(dbname)
126         except:
127             conn = False
128         refresh.setup_website(Path(self.website_dir.name) / 'website',
129                               self.get_test_config(), conn)
130
131
132     def get_test_config(self):
133         cfg = Configuration(Path(self.website_dir.name), environ=self.test_env)
134         cfg.set_libdirs(module=self.build_dir / 'module',
135                         osm2pgsql=self.build_dir / 'osm2pgsql' / 'osm2pgsql')
136         return cfg
137
138     def get_libpq_dsn(self):
139         dsn = self.test_env['NOMINATIM_DATABASE_DSN']
140
141         def quote_param(param):
142             key, val = param.split('=')
143             val = val.replace('\\', '\\\\').replace("'", "\\'")
144             if ' ' in val:
145                 val = "'" + val + "'"
146             return key + '=' + val
147
148         if dsn.startswith('pgsql:'):
149             # Old PHP DSN format. Convert before returning.
150             return ' '.join([quote_param(p) for p in dsn[6:].split(';')])
151
152         return dsn
153
154
155     def db_drop_database(self, name):
156         """ Drop the database with the given name.
157         """
158         conn = self.connect_database('postgres')
159         conn.set_isolation_level(0)
160         cur = conn.cursor()
161         cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
162         conn.close()
163
164     def setup_template_db(self):
165         """ Setup a template database that already contains common test data.
166             Having a template database speeds up tests considerably but at
167             the price that the tests sometimes run with stale data.
168         """
169         if self.template_db_done:
170             return
171
172         self.template_db_done = True
173
174         self.write_nominatim_config(self.template_db)
175
176         if not self._reuse_or_drop_db(self.template_db):
177             try:
178                 # execute nominatim import on an empty file to get the right tables
179                 with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
180                     fd.write(b'<osm version="0.6"></osm>')
181                     fd.flush()
182                     self.run_nominatim('import', '--osm-file', fd.name,
183                                                  '--osm2pgsql-cache', '1',
184                                                  '--ignore-errors',
185                                                  '--offline', '--index-noanalyse')
186             except:
187                 self.db_drop_database(self.template_db)
188                 raise
189
190         self.run_nominatim('refresh', '--functions')
191
192
193     def setup_api_db(self):
194         """ Setup a test against the API test database.
195         """
196         self.write_nominatim_config(self.api_test_db)
197
198         if not self.api_db_done:
199             self.api_db_done = True
200
201             if not self._reuse_or_drop_db(self.api_test_db):
202                 testdata = (Path(__file__) / '..' / '..' / '..' / 'testdb').resolve()
203                 self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata)
204                 simp_file = Path(self.website_dir.name) / 'secondary_importance.sql.gz'
205                 simp_file.symlink_to(testdata / 'secondary_importance.sql.gz')
206
207                 try:
208                     self.run_nominatim('import', '--osm-file', str(self.api_test_file))
209                     self.run_nominatim('add-data', '--tiger-data', str(testdata / 'tiger'))
210                     self.run_nominatim('freeze')
211
212                     if self.tokenizer == 'legacy':
213                         phrase_file = str(testdata / 'specialphrases_testdb.sql')
214                         run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
215                     else:
216                         csv_path = str(testdata / 'full_en_phrases_test.csv')
217                         self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
218                 except:
219                     self.db_drop_database(self.api_test_db)
220                     raise
221
222         tokenizer_factory.get_tokenizer_for_db(self.get_test_config())
223
224
225     def setup_unknown_db(self):
226         """ Setup a test against a non-existing database.
227         """
228         # The tokenizer needs an existing database to function.
229         # So start with the usual database
230         class _Context:
231             db = None
232
233         context = _Context()
234         self.setup_db(context)
235         tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
236
237         # Then drop the DB again
238         self.teardown_db(context, force_drop=True)
239
240     def setup_db(self, context):
241         """ Setup a test against a fresh, empty test database.
242         """
243         self.setup_template_db()
244         conn = self.connect_database(self.template_db)
245         conn.set_isolation_level(0)
246         cur = conn.cursor()
247         cur.execute('DROP DATABASE IF EXISTS {}'.format(self.test_db))
248         cur.execute('CREATE DATABASE {} TEMPLATE = {}'.format(self.test_db, self.template_db))
249         conn.close()
250         self.write_nominatim_config(self.test_db)
251         context.db = self.connect_database(self.test_db)
252         context.db.autocommit = True
253         psycopg2.extras.register_hstore(context.db, globally=False)
254
255     def teardown_db(self, context, force_drop=False):
256         """ Remove the test database, if it exists.
257         """
258         if hasattr(context, 'db'):
259             context.db.close()
260
261         if force_drop or not self.keep_scenario_db:
262             self.db_drop_database(self.test_db)
263
264     def _reuse_or_drop_db(self, name):
265         """ Check for the existance of the given DB. If reuse is enabled,
266             then the function checks for existance and returns True if the
267             database is already there. Otherwise an existing database is
268             dropped and always false returned.
269         """
270         if self.reuse_template:
271             conn = self.connect_database('postgres')
272             with conn.cursor() as cur:
273                 cur.execute('select count(*) from pg_database where datname = %s',
274                             (name,))
275                 if cur.fetchone()[0] == 1:
276                     return True
277             conn.close()
278         else:
279             self.db_drop_database(name)
280
281         return False
282
283     def reindex_placex(self, db):
284         """ Run the indexing step until all data in the placex has
285             been processed. Indexing during updates can produce more data
286             to index under some circumstances. That is why indexing may have
287             to be run multiple times.
288         """
289         with db.cursor() as cur:
290             while True:
291                 self.run_nominatim('index')
292
293                 cur.execute("SELECT 'a' FROM placex WHERE indexed_status != 0 LIMIT 1")
294                 if cur.rowcount == 0:
295                     return
296
297     def run_nominatim(self, *cmdline):
298         """ Run the nominatim command-line tool via the library.
299         """
300         if self.website_dir is not None:
301             cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
302
303         cli.nominatim(module_dir='',
304                       osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
305                       cli_args=cmdline,
306                       phpcgi_path='',
307                       environ=self.test_env)
308
309
310     def copy_from_place(self, db):
311         """ Copy data from place to the placex and location_property_osmline
312             tables invoking the appropriate triggers.
313         """
314         self.run_nominatim('refresh', '--functions', '--no-diff-updates')
315
316         with db.cursor() as cur:
317             cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
318                                                name, admin_level, address,
319                                                extratags, geometry)
320                              SELECT osm_type, osm_id, class, type,
321                                     name, admin_level, address,
322                                     extratags, geometry
323                                FROM place
324                                WHERE not (class='place' and type='houses' and osm_type='W')""")
325             cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
326                              SELECT osm_id, address, geometry
327                                FROM place
328                               WHERE class='place' and type='houses'
329                                     and osm_type='W'
330                                     and ST_GeometryType(geometry) = 'ST_LineString'""")