1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 from pathlib import Path
12 import psycopg2.extras
14 sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..').resolve()))
16 from nominatim import cli
17 from nominatim.config import Configuration
18 from nominatim.db.connection import _Connection
19 from nominatim.tools import refresh
20 from nominatim.tokenizer import factory as tokenizer_factory
21 from steps.utils import run_script
23 class NominatimEnvironment:
24 """ Collects all functions for the execution of Nominatim functions.
27 def __init__(self, config):
28 self.build_dir = Path(config['BUILDDIR']).resolve()
29 self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
30 self.db_host = config['DB_HOST']
31 self.db_port = config['DB_PORT']
32 self.db_user = config['DB_USER']
33 self.db_pass = config['DB_PASS']
34 self.template_db = config['TEMPLATE_DB']
35 self.test_db = config['TEST_DB']
36 self.api_test_db = config['API_TEST_DB']
37 self.api_test_file = config['API_TEST_FILE']
38 self.tokenizer = config['TOKENIZER']
39 self.server_module_path = config['SERVER_MODULE_PATH']
40 self.reuse_template = not config['REMOVE_TEMPLATE']
41 self.keep_scenario_db = config['KEEP_TEST_DB']
42 self.code_coverage_path = config['PHPCOV']
43 self.code_coverage_id = 1
45 self.default_config = Configuration(None, self.src_dir / 'settings').get_os_env()
47 self.template_db_done = False
48 self.api_db_done = False
49 self.website_dir = None
51 def connect_database(self, dbname):
52 """ Return a connection to the database with the given name.
53 Uses configured host, user and port.
55 dbargs = {'database': dbname}
57 dbargs['host'] = self.db_host
59 dbargs['port'] = self.db_port
61 dbargs['user'] = self.db_user
63 dbargs['password'] = self.db_pass
64 conn = psycopg2.connect(connection_factory=_Connection, **dbargs)
67 def next_code_coverage_file(self):
68 """ Generate the next name for a coverage file.
70 fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
71 self.code_coverage_id += 1
75 def write_nominatim_config(self, dbname):
76 """ Set up a custom test configuration that connects to the given
77 database. This sets up the environment variables so that they can
78 be picked up by dotenv and creates a project directory with the
79 appropriate website scripts.
81 dsn = 'pgsql:dbname={}'.format(dbname)
83 dsn += ';host=' + self.db_host
85 dsn += ';port=' + self.db_port
87 dsn += ';user=' + self.db_user
89 dsn += ';password=' + self.db_pass
91 if self.website_dir is not None \
92 and self.test_env is not None \
93 and dsn == self.test_env['NOMINATIM_DATABASE_DSN']:
94 return # environment already set uo
96 self.test_env = dict(self.default_config)
97 self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
98 self.test_env['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
99 self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
100 self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
101 self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
102 self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
103 self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
104 self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
105 self.test_env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = str((self.build_dir / 'module').resolve())
106 self.test_env['NOMINATIM_OSM2PGSQL_BINARY'] = str((self.build_dir / 'osm2pgsql' / 'osm2pgsql').resolve())
107 self.test_env['NOMINATIM_NOMINATIM_TOOL'] = str((self.build_dir / 'nominatim').resolve())
108 if self.tokenizer is not None:
109 self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
111 if self.server_module_path:
112 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
114 # avoid module being copied into the temporary environment
115 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = str((self.build_dir / 'module').resolve())
117 if self.website_dir is not None:
118 self.website_dir.cleanup()
120 self.website_dir = tempfile.TemporaryDirectory()
123 conn = self.connect_database(dbname)
126 refresh.setup_website(Path(self.website_dir.name) / 'website',
127 self.get_test_config(), conn)
130 def get_test_config(self):
131 cfg = Configuration(Path(self.website_dir.name), self.src_dir / 'settings',
132 environ=self.test_env)
133 cfg.set_libdirs(module=self.build_dir / 'module',
134 osm2pgsql=self.build_dir / 'osm2pgsql' / 'osm2pgsql',
135 php=self.src_dir / 'lib-php',
136 sql=self.src_dir / 'lib-sql',
137 data=self.src_dir / 'data')
140 def get_libpq_dsn(self):
141 dsn = self.test_env['NOMINATIM_DATABASE_DSN']
143 def quote_param(param):
144 key, val = param.split('=')
145 val = val.replace('\\', '\\\\').replace("'", "\\'")
147 val = "'" + val + "'"
148 return key + '=' + val
150 if dsn.startswith('pgsql:'):
151 # Old PHP DSN format. Convert before returning.
152 return ' '.join([quote_param(p) for p in dsn[6:].split(';')])
157 def db_drop_database(self, name):
158 """ Drop the database with the given name.
160 conn = self.connect_database('postgres')
161 conn.set_isolation_level(0)
163 cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
166 def setup_template_db(self):
167 """ Setup a template database that already contains common test data.
168 Having a template database speeds up tests considerably but at
169 the price that the tests sometimes run with stale data.
171 if self.template_db_done:
174 self.template_db_done = True
176 self.write_nominatim_config(self.template_db)
178 if not self._reuse_or_drop_db(self.template_db):
180 # execute nominatim import on an empty file to get the right tables
181 with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
182 fd.write(b'<osm version="0.6"></osm>')
184 self.run_nominatim('import', '--osm-file', fd.name,
185 '--osm2pgsql-cache', '1',
187 '--offline', '--index-noanalyse')
189 self.db_drop_database(self.template_db)
192 self.run_nominatim('refresh', '--functions')
195 def setup_api_db(self):
196 """ Setup a test against the API test database.
198 self.write_nominatim_config(self.api_test_db)
200 if not self.api_db_done:
201 self.api_db_done = True
203 if not self._reuse_or_drop_db(self.api_test_db):
204 testdata = Path('__file__') / '..' / '..' / 'testdb'
205 self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata.resolve())
208 self.run_nominatim('import', '--osm-file', str(self.api_test_file))
209 self.run_nominatim('add-data', '--tiger-data', str((testdata / 'tiger').resolve()))
210 self.run_nominatim('freeze')
212 if self.tokenizer == 'legacy':
213 phrase_file = str((testdata / 'specialphrases_testdb.sql').resolve())
214 run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
216 csv_path = str((testdata / 'full_en_phrases_test.csv').resolve())
217 self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
219 self.db_drop_database(self.api_test_db)
222 tokenizer_factory.get_tokenizer_for_db(self.get_test_config())
225 def setup_unknown_db(self):
226 """ Setup a test against a non-existing database.
228 # The tokenizer needs an existing database to function.
229 # So start with the usual database
234 self.setup_db(context)
235 tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
237 # Then drop the DB again
238 self.teardown_db(context, force_drop=True)
240 def setup_db(self, context):
241 """ Setup a test against a fresh, empty test database.
243 self.setup_template_db()
244 conn = self.connect_database(self.template_db)
245 conn.set_isolation_level(0)
247 cur.execute('DROP DATABASE IF EXISTS {}'.format(self.test_db))
248 cur.execute('CREATE DATABASE {} TEMPLATE = {}'.format(self.test_db, self.template_db))
250 self.write_nominatim_config(self.test_db)
251 context.db = self.connect_database(self.test_db)
252 context.db.autocommit = True
253 psycopg2.extras.register_hstore(context.db, globally=False)
255 def teardown_db(self, context, force_drop=False):
256 """ Remove the test database, if it exists.
258 if hasattr(context, 'db'):
261 if force_drop or not self.keep_scenario_db:
262 self.db_drop_database(self.test_db)
264 def _reuse_or_drop_db(self, name):
265 """ Check for the existance of the given DB. If reuse is enabled,
266 then the function checks for existance and returns True if the
267 database is already there. Otherwise an existing database is
268 dropped and always false returned.
270 if self.reuse_template:
271 conn = self.connect_database('postgres')
272 with conn.cursor() as cur:
273 cur.execute('select count(*) from pg_database where datname = %s',
275 if cur.fetchone()[0] == 1:
279 self.db_drop_database(name)
283 def reindex_placex(self, db):
284 """ Run the indexing step until all data in the placex has
285 been processed. Indexing during updates can produce more data
286 to index under some circumstances. That is why indexing may have
287 to be run multiple times.
289 with db.cursor() as cur:
291 self.run_nominatim('index')
293 cur.execute("SELECT 'a' FROM placex WHERE indexed_status != 0 LIMIT 1")
294 if cur.rowcount == 0:
297 def run_nominatim(self, *cmdline):
298 """ Run the nominatim command-line tool via the library.
300 if self.website_dir is not None:
301 cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
303 cli.nominatim(module_dir='',
304 osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
305 phplib_dir=str(self.src_dir / 'lib-php'),
306 sqllib_dir=str(self.src_dir / 'lib-sql'),
307 data_dir=str(self.src_dir / 'data'),
308 config_dir=str(self.src_dir / 'settings'),
311 environ=self.test_env)
314 def copy_from_place(self, db):
315 """ Copy data from place to the placex and location_property_osmline
316 tables invoking the appropriate triggers.
318 self.run_nominatim('refresh', '--functions', '--no-diff-updates')
320 with db.cursor() as cur:
321 cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
322 name, admin_level, address,
324 SELECT osm_type, osm_id, class, type,
325 name, admin_level, address,
328 WHERE not (class='place' and type='houses' and osm_type='W')""")
329 cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
330 SELECT osm_id, address, geometry
332 WHERE class='place' and type='houses'
334 and ST_GeometryType(geometry) = 'ST_LineString'""")