-import logging
-import os
from pathlib import Path
-import subprocess
+import sys
import tempfile
import psycopg2
import psycopg2.extras
-LOG = logging.getLogger(__name__)
+sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..').resolve()))
+
+from nominatim.config import Configuration
+from steps.utils import run_script
class NominatimEnvironment:
""" Collects all functions for the execution of Nominatim functions.
self.template_db = config['TEMPLATE_DB']
self.test_db = config['TEST_DB']
self.api_test_db = config['API_TEST_DB']
+ self.api_test_file = config['API_TEST_FILE']
self.server_module_path = config['SERVER_MODULE_PATH']
self.reuse_template = not config['REMOVE_TEMPLATE']
self.keep_scenario_db = config['KEEP_TEST_DB']
self.code_coverage_path = config['PHPCOV']
self.code_coverage_id = 1
- self.test_env = None
+ self.default_config = Configuration(None, self.src_dir / 'settings').get_os_env()
+ self.test_env = None
self.template_db_done = False
+ self.api_db_done = False
self.website_dir = None
def connect_database(self, dbname):
and dsn == self.test_env['NOMINATIM_DATABASE_DSN']:
return # environment already set uo
- self.test_env = os.environ
+ self.test_env = dict(self.default_config)
self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
+ self.test_env['NOMINATIM_DATADIR'] = self.src_dir
+ self.test_env['NOMINATIM_BINDIR'] = self.src_dir / 'utils'
+ self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.build_dir / 'module'
+ self.test_env['NOMINATIM_OSM2PGSQL_BINARY'] = self.build_dir / 'osm2pgsql' / 'osm2pgsql'
+ self.test_env['NOMINATIM_NOMINATIM_TOOL'] = self.build_dir / 'nominatim'
if self.server_module_path:
self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
self.template_db_done = True
- if self.reuse_template:
- # check that the template is there
- conn = self.connect_database('postgres')
- cur = conn.cursor()
- cur.execute('select count(*) from pg_database where datname = %s',
- (self.template_db,))
- if cur.fetchone()[0] == 1:
- return
- conn.close()
- else:
- # just in case... make sure a previous table has been dropped
- self.db_drop_database(self.template_db)
+ if self._reuse_or_drop_db(self.template_db):
+ return
try:
# call the first part of database setup
"""
self.write_nominatim_config(self.api_test_db)
+ if self.api_db_done:
+ return
+
+ self.api_db_done = True
+
+ if self._reuse_or_drop_db(self.api_test_db):
+ return
+
+ testdata = Path('__file__') / '..' / '..' / 'testdb'
+ self.test_env['NOMINATIM_TIGER_DATA_PATH'] = str((testdata / 'tiger').resolve())
+ self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata.resolve())
+
+ try:
+ self.run_setup_script('all', osm_file=self.api_test_file)
+ self.run_setup_script('import-tiger-data')
+
+ phrase_file = str((testdata / 'specialphrases_testdb.sql').resolve())
+ run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
+ except:
+ self.db_drop_database(self.api_test_db)
+ raise
+
+
def setup_unknown_db(self):
""" Setup a test against a non-existing database.
"""
cur.execute('CREATE DATABASE {} TEMPLATE = {}'.format(self.test_db, self.template_db))
conn.close()
context.db = self.connect_database(self.test_db)
+ context.db.autocommit = True
psycopg2.extras.register_hstore(context.db, globally=False)
def teardown_db(self, context):
if not self.keep_scenario_db:
self.db_drop_database(self.test_db)
def _reuse_or_drop_db(self, name):
    """ Check for the existence of the given DB.

        If reuse is enabled (`self.reuse_template`), the function only
        looks the database up in `pg_database` and returns True when it
        is already there and False when it is missing. Nothing is
        dropped in this mode.

        If reuse is disabled, `self.db_drop_database(name)` is called
        and False is always returned.

        `name` is the name of the database to check for or drop.
    """
    if not self.reuse_template:
        # Reuse disabled: get rid of any left-over database first.
        self.db_drop_database(name)
        return False

    conn = self.connect_database('postgres')
    try:
        with conn.cursor() as cur:
            cur.execute('select count(*) from pg_database where datname = %s',
                        (name,))
            return cur.fetchone()[0] == 1
    finally:
        # Close the connection on every exit path. Previously the
        # "database exists" result returned before close() was reached,
        # leaving the connection open.
        conn.close()
+
def reindex_placex(self, db):
    """ Run the indexing step repeatedly until placex has no rows left
        to process.

        A single indexing run can leave new rows to be indexed under
        some circumstances, so the 'index' update script is invoked
        again for as long as placex still contains a row with a
        non-zero indexed_status.

        `db` is the database connection used for the status check.
    """
    pending_check = "SELECT 'a' FROM placex WHERE indexed_status != 0 LIMIT 1"

    with db.cursor() as cur:
        work_left = True
        while work_left:
            # Always index first, then look whether anything remains.
            self.run_update_script('index')

            cur.execute(pending_check)
            work_left = cur.rowcount != 0
+
def run_setup_script(self, *args, **kwargs):
""" Run the Nominatim setup script with the given arguments.
"""
""" Run one of the Nominatim utility scripts with the given arguments.
"""
cmd = ['/usr/bin/env', 'php', '-Cq']
- cmd.append((Path(self.build_dir) / 'utils' / '{}.php'.format(script)).resolve())
+ cmd.append((Path(self.src_dir) / 'lib-php' / 'admin' / '{}.php'.format(script)).resolve())
cmd.extend(['--' + x for x in args])
for k, v in kwargs.items():
cmd.extend(('--' + k.replace('_', '-'), str(v)))
if self.website_dir is not None:
cwd = self.website_dir.name
else:
- cwd = self.build_dir
-
- proc = subprocess.Popen(cmd, cwd=cwd, env=self.test_env,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- (outp, outerr) = proc.communicate()
- outerr = outerr.decode('utf-8').replace('\\n', '\n')
- LOG.debug("run_nominatim_script: %s\n%s\n%s", cmd, outp, outerr)
- assert (proc.returncode == 0), "Script '%s' failed:\n%s\n%s\n" % (script, outp, outerr)
+ cwd = None
+
+ run_script(cmd, cwd=cwd, env=self.test_env)
+
def copy_from_place(self, db):
    """ Copy data from place to the placex and location_property_osmline
        tables invoking the appropriate triggers.

        The setup script is first run with 'create-functions' and
        'create-partition-functions'; afterwards both INSERT statements
        are executed on a single cursor of `db`.
    """
    # Everything from place except the class='place', type='houses'
    # rows with osm_type 'W' goes into placex.
    fill_placex = """INSERT INTO placex (osm_type, osm_id, class, type,
                                         name, admin_level, address,
                                         extratags, geometry)
                     SELECT osm_type, osm_id, class, type,
                            name, admin_level, address,
                            extratags, geometry
                     FROM place
                     WHERE not (class='place' and type='houses' and osm_type='W')"""
    # Those excluded rows go into location_property_osmline instead,
    # restricted to line string geometries.
    fill_osmline = """INSERT INTO location_property_osmline (osm_id, address, linegeo)
                      SELECT osm_id, address, geometry
                      FROM place
                      WHERE class='place' and type='houses'
                            and osm_type='W'
                            and ST_GeometryType(geometry) = 'ST_LineString'"""

    self.run_setup_script('create-functions', 'create-partition-functions')

    with db.cursor() as cur:
        for statement in (fill_placex, fill_osmline):
            cur.execute(statement)