In the future, the BDD tests will simply set up the required
test database themselves. As with the template database, it
is not reimported when it already exists unless that is explicitly
forced.

This currently makes most of the API tests fail because they still
point to old test data.
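
For illustration, a minimal sketch of how this configuration block is meant to be consumed from behave's `environment.py`. The `userconfig` name, the `before_all` hook and the merge with `context.config.userdata` are assumptions about the surrounding test harness, not part of this change; the sketch only shows how a run such as `behave -DREMOVE_TEMPLATE=1` would force the databases to be re-imported:

    # environment.py (sketch, wiring assumed)
    from pathlib import Path

    from steps.nominatim_environment import NominatimEnvironment

    TEST_BASE_DIR = Path(__file__) / '..' / '..'

    # Defaults mirroring the config block in this change.
    userconfig = {
        'BUILDDIR' : (TEST_BASE_DIR / '..' / 'build').resolve(),
        'REMOVE_TEMPLATE' : False,
        'KEEP_TEST_DB' : False,
        'DB_HOST' : None,
        'TEMPLATE_DB' : 'test_template_nominatim',
        'TEST_DB' : 'test_nominatim',
        'API_TEST_DB' : 'test_api_nominatim',
        'API_TEST_FILE' : (TEST_BASE_DIR / 'testdb' / 'apidb-test-data.pbf').resolve(),
        'SERVER_MODULE_PATH' : None,
        'PHPCOV' : False, # set to output directory to enable code coverage
    }

    def before_all(context):
        # Values passed on the command line via `behave -D NAME=value` end up
        # in context.config.userdata and override the defaults above.
        config = dict(userconfig)
        config.update(context.config.userdata)
        context.nominatim = NominatimEnvironment(config)
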
from steps.geometry_factory import GeometryFactory
from steps.nominatim_environment import NominatimEnvironment
+TEST_BASE_DIR = Path(__file__) / '..' / '..'
+
- 'BUILDDIR' : (Path(__file__) / '..' / '..' / '..' / 'build').resolve(),
+ 'BUILDDIR' : (TEST_BASE_DIR / '..' / 'build').resolve(),
'REMOVE_TEMPLATE' : False,
'KEEP_TEST_DB' : False,
'DB_HOST' : None,
'TEMPLATE_DB' : 'test_template_nominatim',
'TEST_DB' : 'test_nominatim',
'API_TEST_DB' : 'test_api_nominatim',
+ 'API_TEST_FILE' : (TEST_BASE_DIR / 'testdb' / 'apidb-test-data.pbf').resolve(),
'SERVER_MODULE_PATH' : None,
'PHPCOV' : False, # set to output directory to enable code coverage
}
self.template_db = config['TEMPLATE_DB']
self.test_db = config['TEST_DB']
self.api_test_db = config['API_TEST_DB']
+ self.api_test_file = config['API_TEST_FILE']
self.server_module_path = config['SERVER_MODULE_PATH']
self.reuse_template = not config['REMOVE_TEMPLATE']
self.keep_scenario_db = config['KEEP_TEST_DB']
self.code_coverage_path = config['PHPCOV']
self.code_coverage_id = 1
self.template_db_done = False
+ self.api_db_done = False
self.website_dir = None
def connect_database(self, dbname):
self.template_db_done = True
- if self.reuse_template:
- # check that the template is there
- conn = self.connect_database('postgres')
- cur = conn.cursor()
- cur.execute('select count(*) from pg_database where datname = %s',
- (self.template_db,))
- if cur.fetchone()[0] == 1:
- return
- conn.close()
- else:
- # just in case... make sure a previous table has been dropped
- self.db_drop_database(self.template_db)
+ if self._reuse_or_drop_db(self.template_db):
+ return
try:
# call the first part of database setup
"""
self.write_nominatim_config(self.api_test_db)
"""
self.write_nominatim_config(self.api_test_db)
+        if self.api_db_done:
+            return
+
+        self.api_db_done = True
+
+        if self._reuse_or_drop_db(self.api_test_db):
+            return
+
+        # Tiger house number data and the wikipedia importance dump are
+        # expected in the testdb directory next to the API test data file.
+        testdata = Path(__file__) / '..' / '..' / '..' / 'testdb'
+        self.test_env['NOMINATIM_TIGER_DATA_PATH'] = str((testdata / 'tiger').resolve())
+        self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata.resolve())
+
+        try:
+            self.run_setup_script('all', 'import-tiger-data',
+                                  osm_file=self.api_test_file)
+        except:
+            # Don't leave a half-imported database behind.
+            self.db_drop_database(self.api_test_db)
+            raise
+
+
def setup_unknown_db(self):
""" Setup a test against a non-existing database.
"""
if not self.keep_scenario_db:
self.db_drop_database(self.test_db)
+ def _reuse_or_drop_db(self, name):
+        """ Check for the existence of the given DB. If reuse is enabled,
+            the function checks for its existence and returns True if the
+            database is already there. Otherwise any existing database is
+            dropped and False is always returned.
+ """
+        if self.reuse_template:
+            conn = self.connect_database('postgres')
+            with conn.cursor() as cur:
+                cur.execute('select count(*) from pg_database where datname = %s',
+                            (name,))
+                db_exists = cur.fetchone()[0] == 1
+            conn.close()
+            if db_exists:
+                return True
+        else:
+            self.db_drop_database(name)
+
+ return False
+
def reindex_placex(self, db):
""" Run the indexing step until all data in the placex has
been processed. Indexing during updates can produce more data