1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
import sys
import tempfile
from pathlib import Path

import psycopg2
import psycopg2.extras

sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..').resolve()))

from nominatim import cli
from nominatim.config import Configuration
from nominatim.db.connection import Connection
from nominatim.tools import refresh
from nominatim.tokenizer import factory as tokenizer_factory
from steps.utils import run_script
class NominatimEnvironment:
    """ Collects all functions for the execution of Nominatim functions.

        The object remembers the test settings, creates the databases
        needed by the tests and maintains a temporary project directory
        together with the environment handed to the Nominatim
        command-line tool.
    """

    def __init__(self, config):
        """ Read the test settings from `config`, a mapping that must
            provide all the keys accessed below.
        """
        self.build_dir = Path(config['BUILDDIR']).resolve()
        self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
        self.db_host = config['DB_HOST']
        self.db_port = config['DB_PORT']
        self.db_user = config['DB_USER']
        self.db_pass = config['DB_PASS']
        self.template_db = config['TEMPLATE_DB']
        self.test_db = config['TEST_DB']
        self.api_test_db = config['API_TEST_DB']
        self.api_test_file = config['API_TEST_FILE']
        self.tokenizer = config['TOKENIZER']
        self.import_style = config['STYLE']
        self.server_module_path = config['SERVER_MODULE_PATH']
        self.reuse_template = not config['REMOVE_TEMPLATE']
        self.keep_scenario_db = config['KEEP_TEST_DB']
        self.code_coverage_path = config['PHPCOV']
        self.code_coverage_id = 1

        self.default_config = Configuration(None, self.src_dir / 'settings').get_os_env()
        # Environment for Nominatim runs. It is only filled in by
        # write_nominatim_config() but must exist before, because that
        # function checks it against None.
        self.test_env = None
        self.template_db_done = False
        self.api_db_done = False
        self.website_dir = None

    def _connection_params(self):
        """ Return the configured connection settings as a list of
            (name, value) tuples. Settings that are unset or empty
            are left out.
        """
        candidates = (('host', self.db_host),
                      ('port', self.db_port),
                      ('user', self.db_user),
                      ('password', self.db_pass))
        return [(key, value) for key, value in candidates if value]

    def connect_database(self, dbname):
        """ Return a connection to the database with the given name.
            Uses configured host, user and port.
        """
        dbargs = {'database': dbname}
        dbargs.update(self._connection_params())
        return psycopg2.connect(connection_factory=Connection, **dbargs)

    def next_code_coverage_file(self):
        """ Generate the next name for a coverage file.

            Returns the resolved path of the file and advances the
            internal counter, so that every call yields a new name.
        """
        fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
        self.code_coverage_id += 1

        return fn.resolve()

    def write_nominatim_config(self, dbname):
        """ Set up a custom test configuration that connects to the given
            database. This sets up the environment variables so that they can
            be picked up by dotenv and creates a project directory with the
            appropriate website scripts.
        """
        # Formatting instead of concatenating also copes with a
        # port that is given as a number.
        dsn_parts = ['pgsql:dbname={}'.format(dbname)]
        dsn_parts.extend('{}={}'.format(key, value)
                         for key, value in self._connection_params())
        dsn = ';'.join(dsn_parts)

        if self.website_dir is not None \
           and self.test_env is not None \
           and dsn == self.test_env['NOMINATIM_DATABASE_DSN']:
            return # environment already set up

        env = dict(self.default_config)
        env['NOMINATIM_DATABASE_DSN'] = dsn
        env['NOMINATIM_LANGUAGES'] = 'en,de,fr,ja'
        env['NOMINATIM_FLATNODE_FILE'] = ''
        env['NOMINATIM_IMPORT_STYLE'] = 'full'
        env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
        env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
        env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
        env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
        env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = str((self.build_dir / 'module').resolve())
        env['NOMINATIM_OSM2PGSQL_BINARY'] = \
            str((self.build_dir / 'osm2pgsql' / 'osm2pgsql').resolve())
        if self.tokenizer is not None:
            env['NOMINATIM_TOKENIZER'] = self.tokenizer
        if self.import_style is not None:
            env['NOMINATIM_IMPORT_STYLE'] = self.import_style

        if self.server_module_path:
            env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
        else:
            # avoid module being copied into the temporary environment
            env['NOMINATIM_DATABASE_MODULE_PATH'] = str((self.build_dir / 'module').resolve())
        self.test_env = env

        if self.website_dir is not None:
            self.website_dir.cleanup()

        self.website_dir = tempfile.TemporaryDirectory()

        # The database may not exist yet: setup_template_db() writes the
        # configuration before the import creates the database.
        try:
            conn = self.connect_database(dbname)
        except psycopg2.Error:
            # NOTE(review): assumes that setup_website() accepts a false
            # value in place of a connection - confirm against
            # nominatim.tools.refresh.
            conn = False

        try:
            refresh.setup_website(Path(self.website_dir.name) / 'website',
                                  self.get_test_config(), conn)
        finally:
            if conn is not False:
                conn.close()

    def get_test_config(self):
        """ Return a Configuration for the current project directory and
            test environment. write_nominatim_config() must have been
            called before, so that both exist.
        """
        cfg = Configuration(Path(self.website_dir.name), self.src_dir / 'settings',
                            environ=self.test_env)
        cfg.set_libdirs(module=self.build_dir / 'module',
                        osm2pgsql=self.build_dir / 'osm2pgsql' / 'osm2pgsql',
                        php=self.src_dir / 'lib-php',
                        sql=self.src_dir / 'lib-sql',
                        data=self.src_dir / 'data')
        return cfg

    def get_libpq_dsn(self):
        """ Return the DSN of the current test environment in
            'key=value' format. A DSN with a 'pgsql:' prefix is converted,
            any other DSN is returned unchanged.
        """
        dsn = self.test_env['NOMINATIM_DATABASE_DSN']

        def quote_param(param):
            # Split on the first '=' only. The value itself may contain
            # further equal signs, for example in a password.
            key, _, val = param.partition('=')
            val = val.replace('\\', '\\\\').replace("'", "\\'")
            return "{}='{}'".format(key, val)

        if dsn.startswith('pgsql:'):
            # Old PHP DSN format. Convert before returning.
            return ' '.join(quote_param(p) for p in dsn[6:].split(';'))

        return dsn

    def db_drop_database(self, name):
        """ Drop the database with the given name.
        """
        conn = self.connect_database('postgres')
        try:
            # DROP DATABASE must not run inside a transaction block.
            # Isolation level 0 is psycopg2's autocommit mode.
            conn.set_isolation_level(0)
            with conn.cursor() as cur:
                cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
        finally:
            conn.close()

    def setup_template_db(self):
        """ Setup a template database that already contains common test data.
            Having a template database speeds up tests considerably but at
            the price that the tests sometimes run with stale data.
        """
        if self.template_db_done:
            return

        self.template_db_done = True

        self.write_nominatim_config(self.template_db)

        if not self._reuse_or_drop_db(self.template_db):
            try:
                # execute nominatim import on an empty file to get the right tables
                with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
                    fd.write(b'<osm version="0.6"></osm>')
                    # The import opens the file by name. Make sure the
                    # content has left the write buffer before.
                    fd.flush()
                    # NOTE(review): '--ignore-errors' is assumed to belong
                    # to this call - confirm against the import command.
                    self.run_nominatim('import', '--osm-file', fd.name,
                                       '--osm2pgsql-cache', '1',
                                       '--ignore-errors',
                                       '--offline', '--index-noanalyse')
            except BaseException:
                # Never leave a half-imported template behind. It might
                # be reused by a later run.
                self.db_drop_database(self.template_db)
                raise

        self.run_nominatim('refresh', '--functions')

    def setup_api_db(self):
        """ Setup a test against the API test database.
        """
        self.write_nominatim_config(self.api_test_db)

        if not self.api_db_done:
            self.api_db_done = True

            if not self._reuse_or_drop_db(self.api_test_db):
                self._import_api_db()

        tokenizer_factory.get_tokenizer_for_db(self.get_test_config())

    def _import_api_db(self):
        """ Import the API test data into the API test database.
            The database is dropped again when any of the steps fails.
        """
        testdata = (Path(__file__) / '..' / '..' / '..' / 'testdb').resolve()
        self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata)
        simp_file = Path(self.website_dir.name) / 'secondary_importance.sql.gz'
        simp_file.symlink_to(testdata / 'secondary_importance.sql.gz')

        try:
            self.run_nominatim('import', '--osm-file', str(self.api_test_file))
            self.run_nominatim('add-data', '--tiger-data', str(testdata / 'tiger'))
            self.run_nominatim('freeze')

            if self.tokenizer == 'legacy':
                phrase_file = str(testdata / 'specialphrases_testdb.sql')
                run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
            else:
                csv_path = str(testdata / 'full_en_phrases_test.csv')
                self.run_nominatim('special-phrases', '--import-from-csv', csv_path)
        except BaseException:
            # Do not keep an incomplete database that might be reused.
            self.db_drop_database(self.api_test_db)
            raise

    def setup_unknown_db(self):
        """ Setup a test against a non-existing database.
        """
        # The tokenizer needs an existing database to function.
        # So start with the usual database
        class _Context:
            db = None

        context = _Context()
        self.setup_db(context)
        tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)

        # Then drop the DB again
        self.teardown_db(context, force_drop=True)

    def setup_db(self, context):
        """ Setup a test against a fresh, empty test database.

            The database is created from the template database. An open
            connection to it is saved in `context.db`.
        """
        self.setup_template_db()
        conn = self.connect_database(self.template_db)
        try:
            # CREATE/DROP DATABASE must not run inside a transaction block.
            conn.set_isolation_level(0)
            with conn.cursor() as cur:
                cur.execute('DROP DATABASE IF EXISTS {}'.format(self.test_db))
                cur.execute('CREATE DATABASE {} TEMPLATE = {}'.format(self.test_db,
                                                                      self.template_db))
        finally:
            conn.close()

        self.write_nominatim_config(self.test_db)
        context.db = self.connect_database(self.test_db)
        context.db.autocommit = True
        psycopg2.extras.register_hstore(context.db, globally=False)

    def teardown_db(self, context, force_drop=False):
        """ Remove the test database, if it exists.

            Closes the connection saved in `context.db`, if there is one.
            The database is kept when keeping of scenario databases was
            requested, unless `force_drop` is set.
        """
        db = getattr(context, 'db', None)
        if db is not None:
            db.close()

        if force_drop or not self.keep_scenario_db:
            self.db_drop_database(self.test_db)

    def _reuse_or_drop_db(self, name):
        """ Check for the existence of the given DB. If reuse is enabled,
            then the function checks for existence and returns True if the
            database is already there. Otherwise an existing database is
            dropped and always false returned.
        """
        if not self.reuse_template:
            self.db_drop_database(name)
            return False

        conn = self.connect_database('postgres')
        try:
            with conn.cursor() as cur:
                cur.execute('select count(*) from pg_database where datname = %s',
                            (name,))
                return cur.fetchone()[0] == 1
        finally:
            # Close the connection on every path, also when the
            # database was found.
            conn.close()

    def reindex_placex(self, db):
        """ Run the indexing step until all data in the placex has
            been processed. Indexing during updates can produce more data
            to index under some circumstances. That is why indexing may have
            to be run multiple times.
        """
        with db.cursor() as cur:
            while True:
                self.run_nominatim('index')

                cur.execute("SELECT 'a' FROM placex WHERE indexed_status != 0 LIMIT 1")
                if cur.rowcount == 0:
                    return

    def run_nominatim(self, *cmdline):
        """ Run the nominatim command-line tool via the library.

            The arguments are handed in as command-line parameters. When
            a project directory has been set up, it is appended to
            the parameters.
        """
        if self.website_dir is not None:
            cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]

        # NOTE(review): 'phpcgi_path' is assumed to be expected by
        # cli.nominatim() - confirm against the library.
        cli.nominatim(module_dir='',
                      osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
                      phplib_dir=str(self.src_dir / 'lib-php'),
                      sqllib_dir=str(self.src_dir / 'lib-sql'),
                      data_dir=str(self.src_dir / 'data'),
                      config_dir=str(self.src_dir / 'settings'),
                      cli_args=cmdline,
                      phpcgi_path='',
                      environ=self.test_env)

    def copy_from_place(self, db):
        """ Copy data from place to the placex and location_property_osmline
            tables invoking the appropriate triggers.
        """
        self.run_nominatim('refresh', '--functions', '--no-diff-updates')

        with db.cursor() as cur:
            # Everything except house number interpolation ways goes
            # into placex.
            # NOTE(review): the columns 'extratags' and 'geometry' are
            # assumed to complete the column list - confirm against the
            # table definitions.
            cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
                                               name, admin_level, address,
                                               extratags, geometry)
                             SELECT osm_type, osm_id, class, type,
                                    name, admin_level, address,
                                    extratags, geometry
                               FROM place
                               WHERE not (class='place' and type='houses' and osm_type='W')""")
            # The interpolation ways excluded above go into their own table.
            cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
                             SELECT osm_id, address, geometry
                               FROM place
                              WHERE class='place' and type='houses'
                                    and osm_type='W'
                                    and ST_GeometryType(geometry) = 'ST_LineString'""")