from pathlib import Path
import sys
import tempfile

import psycopg2
import psycopg2.extras

# The project root must be on the import path before the project-local
# imports below can be resolved.
sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..').resolve()))

from nominatim import cli
from nominatim.config import Configuration
from nominatim.db.connection import _Connection
from nominatim.tools import refresh
from nominatim.tokenizer import factory as tokenizer_factory
from steps.utils import run_script
class NominatimEnvironment:
    """ Collects all functions for the execution of Nominatim functions.
    """

    def __init__(self, config):
        """ Read the test settings from `config`, a mapping that provides
            the keys accessed below (BUILDDIR, DB_HOST, TEMPLATE_DB, ...).
        """
        self.build_dir = Path(config['BUILDDIR']).resolve()
        self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
        self.db_host = config['DB_HOST']
        self.db_port = config['DB_PORT']
        self.db_user = config['DB_USER']
        self.db_pass = config['DB_PASS']
        self.template_db = config['TEMPLATE_DB']
        self.test_db = config['TEST_DB']
        self.api_test_db = config['API_TEST_DB']
        self.api_test_file = config['API_TEST_FILE']
        self.tokenizer = config['TOKENIZER']
        self.server_module_path = config['SERVER_MODULE_PATH']
        self.reuse_template = not config['REMOVE_TEMPLATE']
        self.keep_scenario_db = config['KEEP_TEST_DB']
        self.code_coverage_path = config['PHPCOV']
        self.code_coverage_id = 1

        self.default_config = Configuration(None, self.src_dir / 'settings').get_os_env()
        # Environment handed to the Nominatim tools. Filled in by
        # write_nominatim_config(); must exist beforehand because
        # write_nominatim_config() and run_nominatim() read it.
        self.test_env = None
        self.template_db_done = False
        self.api_db_done = False
        self.website_dir = None

    def _connection_settings(self):
        """ Return (name, value) pairs for all connection settings that
            are configured, i.e. that are neither None nor empty.
        """
        settings = (('host', self.db_host), ('port', self.db_port),
                    ('user', self.db_user), ('password', self.db_pass))
        return [(key, value) for key, value in settings if value]

    def connect_database(self, dbname):
        """ Return a connection to the database with the given name.
            Uses configured host, user and port.
        """
        dbargs = {'database': dbname}
        # Settings that are not configured are left to the driver defaults.
        dbargs.update(self._connection_settings())
        return psycopg2.connect(connection_factory=_Connection, **dbargs)

    def next_code_coverage_file(self):
        """ Generate the next name for a coverage file.

            Returns the resolved path of the file and advances the
            internal counter, so that every call yields a new name.
        """
        fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
        self.code_coverage_id += 1

        return fn.resolve()

    def write_nominatim_config(self, dbname):
        """ Set up a custom test configuration that connects to the given
            database. This sets up the environment variables so that they can
            be picked up by dotenv and creates a project directory with the
            appropriate website scripts.
        """
        dsn = 'pgsql:dbname={}'.format(dbname)
        for key, value in self._connection_settings():
            # format() also copes with a port that is given as a number.
            dsn += ';{}={}'.format(key, value)

        if self.website_dir is not None \
           and self.test_env is not None \
           and dsn == self.test_env['NOMINATIM_DATABASE_DSN']:
            return # environment already set up

        self.test_env = dict(self.default_config)
        self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
        self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
        self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
        self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
        self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
        self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
        self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
        self.test_env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = str((self.build_dir / 'module').resolve())
        self.test_env['NOMINATIM_OSM2PGSQL_BINARY'] = str((self.build_dir / 'osm2pgsql' / 'osm2pgsql').resolve())
        self.test_env['NOMINATIM_NOMINATIM_TOOL'] = str((self.build_dir / 'nominatim').resolve())
        if self.tokenizer is not None:
            self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer

        if self.server_module_path:
            self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
        else:
            # avoid module being copied into the temporary environment
            self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = str((self.build_dir / 'module').resolve())

        if self.website_dir is not None:
            self.website_dir.cleanup()

        self.website_dir = tempfile.TemporaryDirectory()

        # The database may not exist yet: setup_template_db() calls this
        # function before the import has created it.
        # NOTE(review): assumes refresh.setup_website() accepts a falsy
        # value in place of a connection - confirm against its definition.
        try:
            conn = self.connect_database(dbname)
        except psycopg2.Error:
            conn = None

        try:
            refresh.setup_website(Path(self.website_dir.name) / 'website',
                                  self.get_test_config(), conn)
        finally:
            # The connection is not used after this point. Release it
            # explicitly so that no session keeps the database busy.
            if conn is not None:
                conn.close()

    def get_test_config(self):
        """ Return a Configuration for the current project directory that
            uses the test environment and the library directories of the
            source and build tree.
        """
        cfg = Configuration(Path(self.website_dir.name), self.src_dir / 'settings',
                            environ=self.test_env)
        cfg.set_libdirs(module=self.build_dir / 'module',
                        osm2pgsql=self.build_dir / 'osm2pgsql' / 'osm2pgsql',
                        php=self.src_dir / 'lib-php',
                        sql=self.src_dir / 'lib-sql',
                        data=self.src_dir / 'data')
        return cfg

    def get_libpq_dsn(self):
        """ Return the DSN of the test environment in libpq format.

            A DSN in the old PHP format ('pgsql:key=value;key=value') is
            converted into a space-separated 'key=value' string. Any other
            DSN is returned unchanged. Raises ValueError when a segment of
            a PHP DSN has no '='.
        """
        dsn = self.test_env['NOMINATIM_DATABASE_DSN']

        def quote_param(param):
            # Split on the first '=' only, the value may contain more.
            key, val = param.split('=', 1)
            val = val.replace('\\', '\\\\').replace("'", "\\'")
            # Empty values and values with spaces need to be quoted.
            if not val or ' ' in val:
                val = "'" + val + "'"
            return key + '=' + val

        if dsn.startswith('pgsql:'):
            # Old PHP DSN format. Convert before returning.
            # Empty segments (for example after a trailing ';') are skipped.
            return ' '.join([quote_param(p) for p in dsn[6:].split(';') if p])

        return dsn

    def db_drop_database(self, name):
        """ Drop the database with the given name.
        """
        conn = self.connect_database('postgres')
        try:
            # DROP DATABASE cannot run inside a transaction block.
            conn.set_isolation_level(0)
            with conn.cursor() as cur:
                cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
        finally:
            conn.close()

    def setup_template_db(self):
        """ Setup a template database that already contains common test data.
            Having a template database speeds up tests considerably but at
            the price that the tests sometimes run with stale data.
        """
        if self.template_db_done:
            return

        self.template_db_done = True

        if self._reuse_or_drop_db(self.template_db):
            return

        self.write_nominatim_config(self.template_db)

        try:
            # execute nominatim import on an empty file to get the right tables
            with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
                fd.write(b'<osm version="0.6"></osm>')
                # Make sure the content is on disk before the import reads it.
                fd.flush()
                # NOTE(review): the end of this argument list was not part
                # of the reviewed extract; '--ignore-errors' is assumed
                # because an empty file is imported - confirm.
                self.run_nominatim('import', '--osm-file', fd.name,
                                   '--osm2pgsql-cache', '1',
                                   '--ignore-errors')
        except BaseException:
            # Do not leave a half-built template behind for the next run.
            self.db_drop_database(self.template_db)
            raise

    def setup_api_db(self):
        """ Setup a test against the API test database.
        """
        self.write_nominatim_config(self.api_test_db)

        if not self.api_db_done:
            self.api_db_done = True

            if not self._reuse_or_drop_db(self.api_test_db):
                # NOTE(review): '__file__' is a string literal here, so the
                # path resolves relative to the working directory and not
                # relative to this module - confirm that this is intended.
                testdata = Path('__file__') / '..' / '..' / 'testdb'
                self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata.resolve())

                try:
                    self.run_nominatim('import', '--osm-file', str(self.api_test_file))
                    if self.tokenizer != 'legacy_icu':
                        self.run_nominatim('add-data', '--tiger-data', str((testdata / 'tiger').resolve()))
                    self.run_nominatim('freeze')

                    if self.tokenizer != 'legacy_icu':
                        phrase_file = str((testdata / 'specialphrases_testdb.sql').resolve())
                        run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
                    else:
                        # XXX Temporarily use the wiki while there is no CSV
                        # import available.
                        self.test_env['NOMINATIM_LANGUAGES'] = 'en'
                        try:
                            self.run_nominatim('special-phrases', '--import-from-wiki')
                        finally:
                            del self.test_env['NOMINATIM_LANGUAGES']
                except BaseException:
                    # Do not keep a partially imported database around.
                    self.db_drop_database(self.api_test_db)
                    raise

        tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)

    def setup_unknown_db(self):
        """ Setup a test against a non-existing database.
        """
        # The tokenizer needs an existing database to function.
        # So start with the usual database
        class _Context:
            # Minimal stand-in for the test context, setup_db() and
            # teardown_db() only use the 'db' attribute.
            db = None

        context = _Context()
        self.setup_db(context)
        tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)

        # Then drop the DB again
        self.teardown_db(context, force_drop=True)

    def setup_db(self, context):
        """ Setup a test against a fresh, empty test database.

            The database is created as a copy of the template database and
            an autocommit connection to it is stored in `context.db`.
        """
        self.setup_template_db()
        conn = self.connect_database(self.template_db)
        try:
            # CREATE/DROP DATABASE cannot run inside a transaction block.
            conn.set_isolation_level(0)
            with conn.cursor() as cur:
                cur.execute('DROP DATABASE IF EXISTS {}'.format(self.test_db))
                cur.execute('CREATE DATABASE {} TEMPLATE = {}'.format(self.test_db, self.template_db))
        finally:
            conn.close()

        self.write_nominatim_config(self.test_db)
        context.db = self.connect_database(self.test_db)
        context.db.autocommit = True
        psycopg2.extras.register_hstore(context.db, globally=False)

    def teardown_db(self, context, force_drop=False):
        """ Remove the test database, if it exists.

            The connection in `context.db` is closed first. The database is
            kept when keeping scenario databases was requested, unless
            `force_drop` is set.
        """
        db = getattr(context, 'db', None)
        if db is not None:
            db.close()

        if force_drop or not self.keep_scenario_db:
            self.db_drop_database(self.test_db)

    def _reuse_or_drop_db(self, name):
        """ Check for the existence of the given DB. If reuse is enabled,
            then the function checks for existence and returns True if the
            database is already there. Otherwise an existing database is
            dropped and always false returned.
        """
        if not self.reuse_template:
            self.db_drop_database(name)
            return False

        conn = self.connect_database('postgres')
        try:
            with conn.cursor() as cur:
                cur.execute('select count(*) from pg_database where datname = %s',
                            (name,))
                return cur.fetchone()[0] == 1
        finally:
            # Close the connection on every path, including the early return.
            conn.close()

    def reindex_placex(self, db):
        """ Run the indexing step until all data in the placex has
            been processed. Indexing during updates can produce more data
            to index under some circumstances. That is why indexing may have
            to be run multiple times.
        """
        with db.cursor() as cur:
            while True:
                self.run_nominatim('index')

                cur.execute("SELECT 'a' FROM placex WHERE indexed_status != 0 LIMIT 1")
                if cur.rowcount == 0:
                    return

    def run_nominatim(self, *cmdline):
        """ Run the nominatim command-line tool via the library.

            When a project directory has been set up, it is appended to the
            command line as '--project-dir'.
        """
        cmdline = list(cmdline)
        if self.website_dir is not None:
            cmdline += ['--project-dir', self.website_dir.name]

        # NOTE(review): the 'cli_args' and 'phpcgi_path' arguments were not
        # part of the reviewed extract - confirm the names against the
        # signature of nominatim.cli.nominatim().
        cli.nominatim(module_dir='',
                      osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
                      phplib_dir=str(self.src_dir / 'lib-php'),
                      sqllib_dir=str(self.src_dir / 'lib-sql'),
                      data_dir=str(self.src_dir / 'data'),
                      config_dir=str(self.src_dir / 'settings'),
                      cli_args=cmdline,
                      phpcgi_path='',
                      environ=self.test_env)

    def copy_from_place(self, db):
        """ Copy data from place to the placex and location_property_osmline
            tables invoking the appropriate triggers.
        """
        self.run_nominatim('refresh', '--functions', '--no-diff-updates')

        # NOTE(review): both statements were truncated in the reviewed
        # extract. The 'extratags, geometry' columns, the FROM clauses and
        # the osm_type filter of the second statement are assumed - confirm.
        with db.cursor() as cur:
            cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
                                               name, admin_level, address,
                                               extratags, geometry)
                             SELECT osm_type, osm_id, class, type,
                                    name, admin_level, address,
                                    extratags, geometry
                               FROM place
                               WHERE not (class='place' and type='houses' and osm_type='W')""")
            cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
                             SELECT osm_id, address, geometry
                               FROM place
                              WHERE class='place' and type='houses'
                                    and osm_type='W'
                                    and ST_GeometryType(geometry) = 'ST_LineString'""")