from nominatim import cli
from nominatim.config import Configuration
from nominatim.tools import refresh
+from nominatim.tokenizer import factory as tokenizer_factory
from steps.utils import run_script
class NominatimEnvironment:
self.website_dir.cleanup()
self.website_dir = tempfile.TemporaryDirectory()
- cfg = Configuration(None, self.src_dir / 'settings', environ=self.test_env)
- refresh.setup_website(Path(self.website_dir.name) / 'website', self.src_dir / 'lib-php', cfg)
+ refresh.setup_website(Path(self.website_dir.name) / 'website',
+ self.get_test_config())
+
+
+ def get_test_config(self):
+ cfg = Configuration(Path(self.website_dir.name), self.src_dir / 'settings',
+ environ=self.test_env)
+ cfg.set_libdirs(module=self.build_dir / 'module',
+ osm2pgsql=self.build_dir / 'osm2pgsql' / 'osm2pgsql',
+ php=self.src_dir / 'lib-php',
+ sql=self.src_dir / 'lib-sql',
+ data=self.src_dir / 'data')
+ return cfg
+
+ def get_libpq_dsn(self):
+ dsn = self.test_env['NOMINATIM_DATABASE_DSN']
+
+ def quote_param(param):
+ key, val = param.split('=')
+ val = val.replace('\\', '\\\\').replace("'", "\\'")
+ if ' ' in val:
+ val = "'" + val + "'"
+ return key + '=' + val
+
+ if dsn.startswith('pgsql:'):
+ # Old PHP DSN format. Convert before returning.
+ return ' '.join([quote_param(p) for p in dsn[6:].split(';')])
+
+ return dsn
def db_drop_database(self, name):
if self._reuse_or_drop_db(self.template_db):
return
- try:
- # call the first part of database setup
- self.write_nominatim_config(self.template_db)
- self.run_setup_script('create-db', 'setup-db')
- # remove external data to speed up indexing for tests
- conn = self.connect_database(self.template_db)
- cur = conn.cursor()
- cur.execute("""select tablename from pg_tables
- where tablename in ('gb_postcode', 'us_postcode')""")
- for t in cur:
- conn.cursor().execute('TRUNCATE TABLE {}'.format(t[0]))
- conn.commit()
- conn.close()
+ self.write_nominatim_config(self.template_db)
- # execute osm2pgsql import on an empty file to get the right tables
+ try:
+ # execute nominatim import on an empty file to get the right tables
with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
fd.write(b'<osm version="0.6"></osm>')
fd.flush()
- self.run_setup_script('import-data',
- 'ignore-errors',
- 'create-functions',
- 'create-tables',
- 'create-partition-tables',
- 'create-partition-functions',
- 'load-data',
- 'create-search-indices',
- osm_file=fd.name,
- osm2pgsql_cache='200')
+ self.run_nominatim('import', '--osm-file', fd.name,
+ '--osm2pgsql-cache', '1',
+ '--ignore-errors')
except:
self.db_drop_database(self.template_db)
raise
"""
self.write_nominatim_config(self.api_test_db)
- if self.api_db_done:
- return
+ if not self.api_db_done:
+ self.api_db_done = True
- self.api_db_done = True
+ if not self._reuse_or_drop_db(self.api_test_db):
+ testdata = Path('__file__') / '..' / '..' / 'testdb'
+ self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata.resolve())
- if self._reuse_or_drop_db(self.api_test_db):
- return
+ try:
+ self.run_nominatim('import', '--osm-file', str(self.api_test_file))
+ self.run_nominatim('add-data', '--tiger-data', str((testdata / 'tiger').resolve()))
+ self.run_nominatim('freeze')
- testdata = Path('__file__') / '..' / '..' / 'testdb'
- self.test_env['NOMINATIM_TIGER_DATA_PATH'] = str((testdata / 'tiger').resolve())
- self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata.resolve())
+ phrase_file = str((testdata / 'specialphrases_testdb.sql').resolve())
+ run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
+ except:
+ self.db_drop_database(self.api_test_db)
+ raise
- try:
- self.run_nominatim('import', '--osm-file', str(self.api_test_file))
- self.run_setup_script('import-tiger-data')
- self.run_nominatim('freeze')
-
- phrase_file = str((testdata / 'specialphrases_testdb.sql').resolve())
- run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
- except:
- self.db_drop_database(self.api_test_db)
- raise
+ tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
def setup_unknown_db(self):
""" Setup a test against a non-existing database.
"""
- self.write_nominatim_config('UNKNOWN_DATABASE_NAME')
+ # The tokenizer needs an existing database to function.
+ # So start with the usual database
+ class _Context:
+ db = None
+
+ context = _Context()
+ self.setup_db(context)
+ tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
+
+ # Then drop the DB again
+ self.teardown_db(context, force_drop=True)
def setup_db(self, context):
""" Setup a test against a fresh, empty test database.
context.db.autocommit = True
psycopg2.extras.register_hstore(context.db, globally=False)
- def teardown_db(self, context):
+ def teardown_db(self, context, force_drop=False):
""" Remove the test database, if it exists.
"""
- if 'db' in context:
+ if hasattr(context, 'db'):
context.db.close()
- if not self.keep_scenario_db:
+ if force_drop or not self.keep_scenario_db:
self.db_drop_database(self.test_db)
def _reuse_or_drop_db(self, name):
def run_nominatim(self, *cmdline):
""" Run the nominatim command-line tool via the library.
"""
+ if self.website_dir is not None:
+ cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
+
cli.nominatim(module_dir='',
osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
phplib_dir=str(self.src_dir / 'lib-php'),
phpcgi_path='',
environ=self.test_env)
- def run_setup_script(self, *args, **kwargs):
- """ Run the Nominatim setup script with the given arguments.
- """
- self.run_nominatim_script('setup', *args, **kwargs)
-
- def run_update_script(self, *args, **kwargs):
- """ Run the Nominatim update script with the given arguments.
- """
- self.run_nominatim_script('update', *args, **kwargs)
-
- def run_nominatim_script(self, script, *args, **kwargs):
- """ Run one of the Nominatim utility scripts with the given arguments.
- """
- cmd = ['/usr/bin/env', 'php', '-Cq']
- cmd.append((Path(self.src_dir) / 'lib-php' / 'admin' / '{}.php'.format(script)).resolve())
- cmd.extend(['--' + x for x in args])
- for k, v in kwargs.items():
- cmd.extend(('--' + k.replace('_', '-'), str(v)))
-
- if self.website_dir is not None:
- cwd = self.website_dir.name
- else:
- cwd = None
-
- run_script(cmd, cwd=cwd, env=self.test_env)
def copy_from_place(self, db):
""" Copy data from place to the placex and location_property_osmline