# For a full list of authors see the git log.
from pathlib import Path
import importlib
-import sys
import tempfile
-import psycopg2
-import psycopg2.extras
-
-sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..'/ 'src').resolve()))
+import psycopg
+from psycopg import sql as pysql
from nominatim_db import cli
from nominatim_db.config import Configuration
-from nominatim_db.db.connection import Connection
+from nominatim_db.db.connection import Connection, register_hstore, execute_scalar
from nominatim_db.tools import refresh
from nominatim_db.tokenizer import factory as tokenizer_factory
from steps.utils import run_script
"""
def __init__(self, config):
- self.build_dir = Path(config['BUILDDIR']).resolve()
self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
self.db_host = config['DB_HOST']
self.db_port = config['DB_PORT']
self.server_module_path = config['SERVER_MODULE_PATH']
self.reuse_template = not config['REMOVE_TEMPLATE']
self.keep_scenario_db = config['KEEP_TEST_DB']
- self.code_coverage_path = config['PHPCOV']
- self.code_coverage_id = 1
self.default_config = Configuration(None).get_os_env()
self.test_env = None
raise RuntimeError(f"Unknown API engine '{config['API_ENGINE']}'")
self.api_engine = getattr(self, f"create_api_request_func_{config['API_ENGINE']}")()
+ if self.tokenizer == 'legacy' and self.server_module_path is None:
+ raise RuntimeError("You must set -DSERVER_MODULE_PATH when testing the legacy tokenizer.")
+
def connect_database(self, dbname):
""" Return a connection to the database with the given name.
Uses configured host, user and port.
"""
- dbargs = {'database': dbname}
+ dbargs = {'dbname': dbname, 'row_factory': psycopg.rows.dict_row}
if self.db_host:
dbargs['host'] = self.db_host
if self.db_port:
dbargs['user'] = self.db_user
if self.db_pass:
dbargs['password'] = self.db_pass
- conn = psycopg2.connect(connection_factory=Connection, **dbargs)
- return conn
+ return psycopg.connect(**dbargs)
- def next_code_coverage_file(self):
- """ Generate the next name for a coverage file.
- """
- fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
- self.code_coverage_id += 1
-
- return fn.resolve()
def write_nominatim_config(self, dbname):
""" Set up a custom test configuration that connects to the given
self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
- self.test_env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = str((self.build_dir / 'module').resolve())
- self.test_env['NOMINATIM_OSM2PGSQL_BINARY'] = str((self.build_dir / 'osm2pgsql' / 'osm2pgsql').resolve())
if self.tokenizer is not None:
self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer
if self.import_style is not None:
if self.server_module_path:
self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
- else:
- # avoid module being copied into the temporary environment
- self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = str((self.build_dir / 'module').resolve())
if self.website_dir is not None:
self.website_dir.cleanup()
self.website_dir = tempfile.TemporaryDirectory()
- try:
- conn = self.connect_database(dbname)
- except:
- conn = False
- refresh.setup_website(Path(self.website_dir.name) / 'website',
- self.get_test_config(), conn)
-
def get_test_config(self):
cfg = Configuration(Path(self.website_dir.name), environ=self.test_env)
- cfg.set_libdirs(module=self.build_dir / 'module',
- osm2pgsql=self.build_dir / 'osm2pgsql' / 'osm2pgsql')
+ cfg.set_libdirs(module=self.server_module_path)
return cfg
def get_libpq_dsn(self):
def db_drop_database(self, name):
""" Drop the database with the given name.
"""
- conn = self.connect_database('postgres')
- conn.set_isolation_level(0)
- cur = conn.cursor()
- cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
- conn.close()
+ with self.connect_database('postgres') as conn:
+ conn.autocommit = True
+ conn.execute(pysql.SQL('DROP DATABASE IF EXISTS')
+ + pysql.Identifier(name))
def setup_template_db(self):
""" Setup a template database that already contains common test data.
""" Setup a test against a fresh, empty test database.
"""
self.setup_template_db()
- conn = self.connect_database(self.template_db)
- conn.set_isolation_level(0)
- cur = conn.cursor()
- cur.execute('DROP DATABASE IF EXISTS {}'.format(self.test_db))
- cur.execute('CREATE DATABASE {} TEMPLATE = {}'.format(self.test_db, self.template_db))
- conn.close()
+ with self.connect_database(self.template_db) as conn:
+ conn.autocommit = True
+ conn.execute(pysql.SQL('DROP DATABASE IF EXISTS')
+ + pysql.Identifier(self.test_db))
+ conn.execute(pysql.SQL('CREATE DATABASE {} TEMPLATE = {}').format(
+ pysql.Identifier(self.test_db),
+ pysql.Identifier(self.template_db)))
+
self.write_nominatim_config(self.test_db)
context.db = self.connect_database(self.test_db)
context.db.autocommit = True
- psycopg2.extras.register_hstore(context.db, globally=False)
+ register_hstore(context.db)
def teardown_db(self, context, force_drop=False):
""" Remove the test database, if it exists.
dropped and always false returned.
"""
if self.reuse_template:
- conn = self.connect_database('postgres')
- with conn.cursor() as cur:
- cur.execute('select count(*) from pg_database where datname = %s',
- (name,))
- if cur.fetchone()[0] == 1:
+ with self.connect_database('postgres') as conn:
+ num = execute_scalar(conn,
+ 'select count(*) from pg_database where datname = %s',
+ (name,))
+ if num == 1:
return True
- conn.close()
else:
self.db_drop_database(name)
return False
+
def reindex_placex(self, db):
""" Run the indexing step until all data in the placex has
been processed. Indexing during updates can produce more data
to index under some circumstances. That is why indexing may have
to be run multiple times.
"""
- with db.cursor() as cur:
- while True:
- self.run_nominatim('index')
+ self.run_nominatim('index')
- cur.execute("SELECT 'a' FROM placex WHERE indexed_status != 0 LIMIT 1")
- if cur.rowcount == 0:
- return
def run_nominatim(self, *cmdline):
""" Run the nominatim command-line tool via the library.
if self.website_dir is not None:
cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
- cli.nominatim(module_dir='',
- osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
+ cli.nominatim(module_dir=self.server_module_path,
+ osm2pgsql_path=None,
cli_args=cmdline,
environ=self.test_env)