git.openstreetmap.org Git - nominatim.git/blob - test/bdd/steps/nominatim_environment.py
Add tests for the new SPWikiLoader and SPCsvLoader
[nominatim.git] / test / bdd / steps / nominatim_environment.py
1 from pathlib import Path
2 import sys
3 import tempfile
4
5 import psycopg2
6 import psycopg2.extras
7
8 sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..').resolve()))
9
10 from nominatim import cli
11 from nominatim.config import Configuration
12 from nominatim.tools import refresh
13 from nominatim.tokenizer import factory as tokenizer_factory
14 from steps.utils import run_script
15
16 class NominatimEnvironment:
17     """ Collects all functions for the execution of Nominatim functions.
18     """
19
20     def __init__(self, config):
21         self.build_dir = Path(config['BUILDDIR']).resolve()
22         self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
23         self.db_host = config['DB_HOST']
24         self.db_port = config['DB_PORT']
25         self.db_user = config['DB_USER']
26         self.db_pass = config['DB_PASS']
27         self.template_db = config['TEMPLATE_DB']
28         self.test_db = config['TEST_DB']
29         self.api_test_db = config['API_TEST_DB']
30         self.api_test_file = config['API_TEST_FILE']
31         self.tokenizer = config['TOKENIZER']
32         self.server_module_path = config['SERVER_MODULE_PATH']
33         self.reuse_template = not config['REMOVE_TEMPLATE']
34         self.keep_scenario_db = config['KEEP_TEST_DB']
35         self.code_coverage_path = config['PHPCOV']
36         self.code_coverage_id = 1
37
38         self.default_config = Configuration(None, self.src_dir / 'settings').get_os_env()
39         self.test_env = None
40         self.template_db_done = False
41         self.api_db_done = False
42         self.website_dir = None
43
44     def connect_database(self, dbname):
45         """ Return a connection to the database with the given name.
46             Uses configured host, user and port.
47         """
48         dbargs = {'database': dbname}
49         if self.db_host:
50             dbargs['host'] = self.db_host
51         if self.db_port:
52             dbargs['port'] = self.db_port
53         if self.db_user:
54             dbargs['user'] = self.db_user
55         if self.db_pass:
56             dbargs['password'] = self.db_pass
57         conn = psycopg2.connect(**dbargs)
58         return conn
59
60     def next_code_coverage_file(self):
61         """ Generate the next name for a coverage file.
62         """
63         fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
64         self.code_coverage_id += 1
65
66         return fn.resolve()
67
68     def write_nominatim_config(self, dbname):
69         """ Set up a custom test configuration that connects to the given
70             database. This sets up the environment variables so that they can
71             be picked up by dotenv and creates a project directory with the
72             appropriate website scripts.
73         """
74         dsn = 'pgsql:dbname={}'.format(dbname)
75         if self.db_host:
76             dsn += ';host=' + self.db_host
77         if self.db_port:
78             dsn += ';port=' + self.db_port
79         if self.db_user:
80             dsn += ';user=' + self.db_user
81         if self.db_pass:
82             dsn += ';password=' + self.db_pass
83
84         if self.website_dir is not None \
85            and self.test_env is not None \
86            and dsn == self.test_env['NOMINATIM_DATABASE_DSN']:
87             return # environment already set uo
88
89         self.test_env = dict(self.default_config)
90         self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
91         self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
92         self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
93         self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
94         self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
95         self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
96         self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
97         self.test_env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = str((self.build_dir / 'module').resolve())
98         self.test_env['NOMINATIM_OSM2PGSQL_BINARY'] = str((self.build_dir / 'osm2pgsql' / 'osm2pgsql').resolve())
99         self.test_env['NOMINATIM_NOMINATIM_TOOL'] = str((self.build_dir / 'nominatim').resolve())
100         if self.tokenizer is not None:
101             self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
102
103         if self.server_module_path:
104             self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
105         else:
106             # avoid module being copied into the temporary environment
107             self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = str((self.build_dir / 'module').resolve())
108
109         if self.website_dir is not None:
110             self.website_dir.cleanup()
111
112         self.website_dir = tempfile.TemporaryDirectory()
113         refresh.setup_website(Path(self.website_dir.name) / 'website',
114                               self.get_test_config())
115
116
117     def get_test_config(self):
118         cfg = Configuration(Path(self.website_dir.name), self.src_dir / 'settings',
119                             environ=self.test_env)
120         cfg.set_libdirs(module=self.build_dir / 'module',
121                         osm2pgsql=self.build_dir / 'osm2pgsql' / 'osm2pgsql',
122                         php=self.src_dir / 'lib-php',
123                         sql=self.src_dir / 'lib-sql',
124                         data=self.src_dir / 'data')
125         return cfg
126
127     def get_libpq_dsn(self):
128         dsn = self.test_env['NOMINATIM_DATABASE_DSN']
129
130         def quote_param(param):
131             key, val = param.split('=')
132             val = val.replace('\\', '\\\\').replace("'", "\\'")
133             if ' ' in val:
134                 val = "'" + val + "'"
135             return key + '=' + val
136
137         if dsn.startswith('pgsql:'):
138             # Old PHP DSN format. Convert before returning.
139             return ' '.join([quote_param(p) for p in dsn[6:].split(';')])
140
141         return dsn
142
143
144     def db_drop_database(self, name):
145         """ Drop the database with the given name.
146         """
147         conn = self.connect_database('postgres')
148         conn.set_isolation_level(0)
149         cur = conn.cursor()
150         cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
151         conn.close()
152
153     def setup_template_db(self):
154         """ Setup a template database that already contains common test data.
155             Having a template database speeds up tests considerably but at
156             the price that the tests sometimes run with stale data.
157         """
158         if self.template_db_done:
159             return
160
161         self.template_db_done = True
162
163         if self._reuse_or_drop_db(self.template_db):
164             return
165
166         self.write_nominatim_config(self.template_db)
167
168         try:
169             # execute nominatim import on an empty file to get the right tables
170             with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
171                 fd.write(b'<osm version="0.6"></osm>')
172                 fd.flush()
173                 self.run_nominatim('import', '--osm-file', fd.name,
174                                              '--osm2pgsql-cache', '1',
175                                              '--ignore-errors')
176         except:
177             self.db_drop_database(self.template_db)
178             raise
179
180
181     def setup_api_db(self):
182         """ Setup a test against the API test database.
183         """
184         self.write_nominatim_config(self.api_test_db)
185
186         if not self.api_db_done:
187             self.api_db_done = True
188
189             if not self._reuse_or_drop_db(self.api_test_db):
190                 testdata = Path('__file__') / '..' / '..' / 'testdb'
191                 self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata.resolve())
192
193                 try:
194                     self.run_nominatim('import', '--osm-file', str(self.api_test_file))
195                     if self.tokenizer != 'legacy_icu':
196                         self.run_nominatim('add-data', '--tiger-data', str((testdata / 'tiger').resolve()))
197                     self.run_nominatim('freeze')
198
199                     if self.tokenizer != 'legacy_icu':
200                         phrase_file = str((testdata / 'specialphrases_testdb.sql').resolve())
201                         run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
202                     else:
203                         csv_path = str((testdata / 'full_en_phrases_test.csv').resolve())
204                         self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
205                 except:
206                     self.db_drop_database(self.api_test_db)
207                     raise
208
209         tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
210
211
212     def setup_unknown_db(self):
213         """ Setup a test against a non-existing database.
214         """
215         # The tokenizer needs an existing database to function.
216         # So start with the usual database
217         class _Context:
218             db = None
219
220         context = _Context()
221         self.setup_db(context)
222         tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
223
224         # Then drop the DB again
225         self.teardown_db(context, force_drop=True)
226
227     def setup_db(self, context):
228         """ Setup a test against a fresh, empty test database.
229         """
230         self.setup_template_db()
231         self.write_nominatim_config(self.test_db)
232         conn = self.connect_database(self.template_db)
233         conn.set_isolation_level(0)
234         cur = conn.cursor()
235         cur.execute('DROP DATABASE IF EXISTS {}'.format(self.test_db))
236         cur.execute('CREATE DATABASE {} TEMPLATE = {}'.format(self.test_db, self.template_db))
237         conn.close()
238         context.db = self.connect_database(self.test_db)
239         context.db.autocommit = True
240         psycopg2.extras.register_hstore(context.db, globally=False)
241
242     def teardown_db(self, context, force_drop=False):
243         """ Remove the test database, if it exists.
244         """
245         if hasattr(context, 'db'):
246             context.db.close()
247
248         if force_drop or not self.keep_scenario_db:
249             self.db_drop_database(self.test_db)
250
251     def _reuse_or_drop_db(self, name):
252         """ Check for the existance of the given DB. If reuse is enabled,
253             then the function checks for existance and returns True if the
254             database is already there. Otherwise an existing database is
255             dropped and always false returned.
256         """
257         if self.reuse_template:
258             conn = self.connect_database('postgres')
259             with conn.cursor() as cur:
260                 cur.execute('select count(*) from pg_database where datname = %s',
261                             (name,))
262                 if cur.fetchone()[0] == 1:
263                     return True
264             conn.close()
265         else:
266             self.db_drop_database(name)
267
268         return False
269
270     def reindex_placex(self, db):
271         """ Run the indexing step until all data in the placex has
272             been processed. Indexing during updates can produce more data
273             to index under some circumstances. That is why indexing may have
274             to be run multiple times.
275         """
276         with db.cursor() as cur:
277             while True:
278                 self.run_nominatim('index')
279
280                 cur.execute("SELECT 'a' FROM placex WHERE indexed_status != 0 LIMIT 1")
281                 if cur.rowcount == 0:
282                     return
283
284     def run_nominatim(self, *cmdline):
285         """ Run the nominatim command-line tool via the library.
286         """
287         if self.website_dir is not None:
288             cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
289
290         cli.nominatim(module_dir='',
291                       osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
292                       phplib_dir=str(self.src_dir / 'lib-php'),
293                       sqllib_dir=str(self.src_dir / 'lib-sql'),
294                       data_dir=str(self.src_dir / 'data'),
295                       config_dir=str(self.src_dir / 'settings'),
296                       cli_args=cmdline,
297                       phpcgi_path='',
298                       environ=self.test_env)
299
300
    def copy_from_place(self, db):
        """ Copy data from place to the placex and location_property_osmline
            tables invoking the appropriate triggers.

            `db` is the connection to the database to work on. It must
            provide cursors that can be used as context managers.
        """
        # Run 'refresh --functions' before the data is copied.
        # NOTE(review): presumably this (re)installs the SQL functions and
        # triggers the inserts below rely on - confirm.
        self.run_nominatim('refresh', '--functions', '--no-diff-updates')

        with db.cursor() as cur:
            # Everything goes into placex, except rows with osm_type 'W',
            # class 'place' and type 'houses'.
            cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
                                               name, admin_level, address,
                                               extratags, geometry)
                             SELECT osm_type, osm_id, class, type,
                                    name, admin_level, address,
                                    extratags, geometry
                               FROM place
                               WHERE not (class='place' and type='houses' and osm_type='W')""")
            # The excluded rows go into location_property_osmline instead,
            # but only when their geometry is a linestring.
            cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
                             SELECT osm_id, address, geometry
                               FROM place
                              WHERE class='place' and type='houses'
                                    and osm_type='W'
                                    and ST_GeometryType(geometry) = 'ST_LineString'""")