1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 from pathlib import Path
12 import psycopg2.extras
14 sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..').resolve()))
16 from nominatim import cli
17 from nominatim.config import Configuration
18 from nominatim.db.connection import _Connection
19 from nominatim.tools import refresh
20 from nominatim.tokenizer import factory as tokenizer_factory
21 from steps.utils import run_script
23 class NominatimEnvironment:
24 """ Collects all functions for the execution of Nominatim functions.
27 def __init__(self, config):
28 self.build_dir = Path(config['BUILDDIR']).resolve()
29 self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
30 self.db_host = config['DB_HOST']
31 self.db_port = config['DB_PORT']
32 self.db_user = config['DB_USER']
33 self.db_pass = config['DB_PASS']
34 self.template_db = config['TEMPLATE_DB']
35 self.test_db = config['TEST_DB']
36 self.api_test_db = config['API_TEST_DB']
37 self.api_test_file = config['API_TEST_FILE']
38 self.tokenizer = config['TOKENIZER']
39 self.server_module_path = config['SERVER_MODULE_PATH']
40 self.reuse_template = not config['REMOVE_TEMPLATE']
41 self.keep_scenario_db = config['KEEP_TEST_DB']
42 self.code_coverage_path = config['PHPCOV']
43 self.code_coverage_id = 1
45 self.default_config = Configuration(None, self.src_dir / 'settings').get_os_env()
47 self.template_db_done = False
48 self.api_db_done = False
49 self.website_dir = None
51 def connect_database(self, dbname):
52 """ Return a connection to the database with the given name.
53 Uses configured host, user and port.
55 dbargs = {'database': dbname}
57 dbargs['host'] = self.db_host
59 dbargs['port'] = self.db_port
61 dbargs['user'] = self.db_user
63 dbargs['password'] = self.db_pass
64 conn = psycopg2.connect(connection_factory=_Connection, **dbargs)
67 def next_code_coverage_file(self):
68 """ Generate the next name for a coverage file.
70 fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
71 self.code_coverage_id += 1
75 def write_nominatim_config(self, dbname):
76 """ Set up a custom test configuration that connects to the given
77 database. This sets up the environment variables so that they can
78 be picked up by dotenv and creates a project directory with the
79 appropriate website scripts.
81 dsn = 'pgsql:dbname={}'.format(dbname)
83 dsn += ';host=' + self.db_host
85 dsn += ';port=' + self.db_port
87 dsn += ';user=' + self.db_user
89 dsn += ';password=' + self.db_pass
91 if self.website_dir is not None \
92 and self.test_env is not None \
93 and dsn == self.test_env['NOMINATIM_DATABASE_DSN']:
94 return # environment already set uo
96 self.test_env = dict(self.default_config)
97 self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
98 self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
99 self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
100 self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
101 self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
102 self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
103 self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
104 self.test_env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = str((self.build_dir / 'module').resolve())
105 self.test_env['NOMINATIM_OSM2PGSQL_BINARY'] = str((self.build_dir / 'osm2pgsql' / 'osm2pgsql').resolve())
106 self.test_env['NOMINATIM_NOMINATIM_TOOL'] = str((self.build_dir / 'nominatim').resolve())
107 if self.tokenizer is not None:
108 self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
110 if self.server_module_path:
111 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
113 # avoid module being copied into the temporary environment
114 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = str((self.build_dir / 'module').resolve())
116 if self.website_dir is not None:
117 self.website_dir.cleanup()
119 self.website_dir = tempfile.TemporaryDirectory()
122 conn = self.connect_database(dbname)
125 refresh.setup_website(Path(self.website_dir.name) / 'website',
126 self.get_test_config(), conn)
129 def get_test_config(self):
130 cfg = Configuration(Path(self.website_dir.name), self.src_dir / 'settings',
131 environ=self.test_env)
132 cfg.set_libdirs(module=self.build_dir / 'module',
133 osm2pgsql=self.build_dir / 'osm2pgsql' / 'osm2pgsql',
134 php=self.src_dir / 'lib-php',
135 sql=self.src_dir / 'lib-sql',
136 data=self.src_dir / 'data')
139 def get_libpq_dsn(self):
140 dsn = self.test_env['NOMINATIM_DATABASE_DSN']
142 def quote_param(param):
143 key, val = param.split('=')
144 val = val.replace('\\', '\\\\').replace("'", "\\'")
146 val = "'" + val + "'"
147 return key + '=' + val
149 if dsn.startswith('pgsql:'):
150 # Old PHP DSN format. Convert before returning.
151 return ' '.join([quote_param(p) for p in dsn[6:].split(';')])
156 def db_drop_database(self, name):
157 """ Drop the database with the given name.
159 conn = self.connect_database('postgres')
160 conn.set_isolation_level(0)
162 cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
165 def setup_template_db(self):
166 """ Setup a template database that already contains common test data.
167 Having a template database speeds up tests considerably but at
168 the price that the tests sometimes run with stale data.
170 if self.template_db_done:
173 self.template_db_done = True
175 if self._reuse_or_drop_db(self.template_db):
178 self.write_nominatim_config(self.template_db)
181 # execute nominatim import on an empty file to get the right tables
182 with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
183 fd.write(b'<osm version="0.6"></osm>')
185 self.run_nominatim('import', '--osm-file', fd.name,
186 '--osm2pgsql-cache', '1',
188 '--offline', '--index-noanalyse')
190 self.db_drop_database(self.template_db)
194 def setup_api_db(self):
195 """ Setup a test against the API test database.
197 self.write_nominatim_config(self.api_test_db)
199 if not self.api_db_done:
200 self.api_db_done = True
202 if not self._reuse_or_drop_db(self.api_test_db):
203 testdata = Path('__file__') / '..' / '..' / 'testdb'
204 self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata.resolve())
207 self.run_nominatim('import', '--osm-file', str(self.api_test_file))
208 self.run_nominatim('add-data', '--tiger-data', str((testdata / 'tiger').resolve()))
209 self.run_nominatim('freeze')
211 if self.tokenizer == 'legacy':
212 phrase_file = str((testdata / 'specialphrases_testdb.sql').resolve())
213 run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
215 csv_path = str((testdata / 'full_en_phrases_test.csv').resolve())
216 self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
218 self.db_drop_database(self.api_test_db)
221 tokenizer_factory.get_tokenizer_for_db(self.get_test_config())
224 def setup_unknown_db(self):
225 """ Setup a test against a non-existing database.
227 # The tokenizer needs an existing database to function.
228 # So start with the usual database
233 self.setup_db(context)
234 tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
236 # Then drop the DB again
237 self.teardown_db(context, force_drop=True)
239 def setup_db(self, context):
240 """ Setup a test against a fresh, empty test database.
242 self.setup_template_db()
243 conn = self.connect_database(self.template_db)
244 conn.set_isolation_level(0)
246 cur.execute('DROP DATABASE IF EXISTS {}'.format(self.test_db))
247 cur.execute('CREATE DATABASE {} TEMPLATE = {}'.format(self.test_db, self.template_db))
249 self.write_nominatim_config(self.test_db)
250 context.db = self.connect_database(self.test_db)
251 context.db.autocommit = True
252 psycopg2.extras.register_hstore(context.db, globally=False)
254 def teardown_db(self, context, force_drop=False):
255 """ Remove the test database, if it exists.
257 if hasattr(context, 'db'):
260 if force_drop or not self.keep_scenario_db:
261 self.db_drop_database(self.test_db)
263 def _reuse_or_drop_db(self, name):
264 """ Check for the existance of the given DB. If reuse is enabled,
265 then the function checks for existance and returns True if the
266 database is already there. Otherwise an existing database is
267 dropped and always false returned.
269 if self.reuse_template:
270 conn = self.connect_database('postgres')
271 with conn.cursor() as cur:
272 cur.execute('select count(*) from pg_database where datname = %s',
274 if cur.fetchone()[0] == 1:
278 self.db_drop_database(name)
282 def reindex_placex(self, db):
283 """ Run the indexing step until all data in the placex has
284 been processed. Indexing during updates can produce more data
285 to index under some circumstances. That is why indexing may have
286 to be run multiple times.
288 with db.cursor() as cur:
290 self.run_nominatim('index')
292 cur.execute("SELECT 'a' FROM placex WHERE indexed_status != 0 LIMIT 1")
293 if cur.rowcount == 0:
296 def run_nominatim(self, *cmdline):
297 """ Run the nominatim command-line tool via the library.
299 if self.website_dir is not None:
300 cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
302 cli.nominatim(module_dir='',
303 osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
304 phplib_dir=str(self.src_dir / 'lib-php'),
305 sqllib_dir=str(self.src_dir / 'lib-sql'),
306 data_dir=str(self.src_dir / 'data'),
307 config_dir=str(self.src_dir / 'settings'),
310 environ=self.test_env)
313 def copy_from_place(self, db):
314 """ Copy data from place to the placex and location_property_osmline
315 tables invoking the appropriate triggers.
317 self.run_nominatim('refresh', '--functions', '--no-diff-updates')
319 with db.cursor() as cur:
320 cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
321 name, admin_level, address,
323 SELECT osm_type, osm_id, class, type,
324 name, admin_level, address,
327 WHERE not (class='place' and type='houses' and osm_type='W')""")
328 cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
329 SELECT osm_id, address, geometry
331 WHERE class='place' and type='houses'
333 and ST_GeometryType(geometry) = 'ST_LineString'""")