1 from pathlib import Path
8 sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..').resolve()))
10 from nominatim.config import Configuration
11 from nominatim.tools import refresh
12 from steps.utils import run_script
14 class NominatimEnvironment:
15 """ Collects all functions for the execution of Nominatim functions.
18 def __init__(self, config):
19 self.build_dir = Path(config['BUILDDIR']).resolve()
20 self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
21 self.db_host = config['DB_HOST']
22 self.db_port = config['DB_PORT']
23 self.db_user = config['DB_USER']
24 self.db_pass = config['DB_PASS']
25 self.template_db = config['TEMPLATE_DB']
26 self.test_db = config['TEST_DB']
27 self.api_test_db = config['API_TEST_DB']
28 self.api_test_file = config['API_TEST_FILE']
29 self.server_module_path = config['SERVER_MODULE_PATH']
30 self.reuse_template = not config['REMOVE_TEMPLATE']
31 self.keep_scenario_db = config['KEEP_TEST_DB']
32 self.code_coverage_path = config['PHPCOV']
33 self.code_coverage_id = 1
35 self.default_config = Configuration(None, self.src_dir / 'settings').get_os_env()
37 self.template_db_done = False
38 self.api_db_done = False
39 self.website_dir = None
41 def connect_database(self, dbname):
42 """ Return a connection to the database with the given name.
43 Uses configured host, user and port.
45 dbargs = {'database': dbname}
47 dbargs['host'] = self.db_host
49 dbargs['port'] = self.db_port
51 dbargs['user'] = self.db_user
53 dbargs['password'] = self.db_pass
54 conn = psycopg2.connect(**dbargs)
57 def next_code_coverage_file(self):
58 """ Generate the next name for a coverage file.
60 fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
61 self.code_coverage_id += 1
65 def write_nominatim_config(self, dbname):
66 """ Set up a custom test configuration that connects to the given
67 database. This sets up the environment variables so that they can
68 be picked up by dotenv and creates a project directory with the
69 appropriate website scripts.
71 dsn = 'pgsql:dbname={}'.format(dbname)
73 dsn += ';host=' + self.db_host
75 dsn += ';port=' + self.db_port
77 dsn += ';user=' + self.db_user
79 dsn += ';password=' + self.db_pass
81 if self.website_dir is not None \
82 and self.test_env is not None \
83 and dsn == self.test_env['NOMINATIM_DATABASE_DSN']:
84 return # environment already set uo
86 self.test_env = dict(self.default_config)
87 self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
88 self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
89 self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
90 self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
91 self.test_env['NOMINATIM_DATADIR'] = self.src_dir / 'data'
92 self.test_env['NOMINATIM_SQLDIR'] = self.src_dir / 'lib-sql'
93 self.test_env['NOMINATIM_CONFIGDIR'] = self.src_dir / 'settings'
94 self.test_env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = self.build_dir / 'module'
95 self.test_env['NOMINATIM_OSM2PGSQL_BINARY'] = self.build_dir / 'osm2pgsql' / 'osm2pgsql'
96 self.test_env['NOMINATIM_NOMINATIM_TOOL'] = self.build_dir / 'nominatim'
98 if self.server_module_path:
99 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
101 # avoid module being copied into the temporary environment
102 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.build_dir / 'module'
104 if self.website_dir is not None:
105 self.website_dir.cleanup()
107 self.website_dir = tempfile.TemporaryDirectory()
108 cfg = Configuration(None, self.src_dir / 'settings', environ=self.test_env)
109 refresh.setup_website(Path(self.website_dir.name) / 'website', self.src_dir / 'lib-php', cfg)
112 def db_drop_database(self, name):
113 """ Drop the database with the given name.
115 conn = self.connect_database('postgres')
116 conn.set_isolation_level(0)
118 cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
121 def setup_template_db(self):
122 """ Setup a template database that already contains common test data.
123 Having a template database speeds up tests considerably but at
124 the price that the tests sometimes run with stale data.
126 if self.template_db_done:
129 self.template_db_done = True
131 if self._reuse_or_drop_db(self.template_db):
135 # call the first part of database setup
136 self.write_nominatim_config(self.template_db)
137 self.run_setup_script('create-db', 'setup-db')
138 # remove external data to speed up indexing for tests
139 conn = self.connect_database(self.template_db)
141 cur.execute("""select tablename from pg_tables
142 where tablename in ('gb_postcode', 'us_postcode')""")
144 conn.cursor().execute('TRUNCATE TABLE {}'.format(t[0]))
148 # execute osm2pgsql import on an empty file to get the right tables
149 with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
150 fd.write(b'<osm version="0.6"></osm>')
152 self.run_setup_script('import-data',
156 'create-partition-tables',
157 'create-partition-functions',
159 'create-search-indices',
161 osm2pgsql_cache='200')
163 self.db_drop_database(self.template_db)
167 def setup_api_db(self):
168 """ Setup a test against the API test database.
170 self.write_nominatim_config(self.api_test_db)
175 self.api_db_done = True
177 if self._reuse_or_drop_db(self.api_test_db):
180 testdata = Path('__file__') / '..' / '..' / 'testdb'
181 self.test_env['NOMINATIM_TIGER_DATA_PATH'] = str((testdata / 'tiger').resolve())
182 self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata.resolve())
185 self.run_setup_script('all', osm_file=self.api_test_file)
186 self.run_setup_script('import-tiger-data')
187 self.run_setup_script('drop')
189 phrase_file = str((testdata / 'specialphrases_testdb.sql').resolve())
190 run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
192 self.db_drop_database(self.api_test_db)
196 def setup_unknown_db(self):
197 """ Setup a test against a non-existing database.
199 self.write_nominatim_config('UNKNOWN_DATABASE_NAME')
201 def setup_db(self, context):
202 """ Setup a test against a fresh, empty test database.
204 self.setup_template_db()
205 self.write_nominatim_config(self.test_db)
206 conn = self.connect_database(self.template_db)
207 conn.set_isolation_level(0)
209 cur.execute('DROP DATABASE IF EXISTS {}'.format(self.test_db))
210 cur.execute('CREATE DATABASE {} TEMPLATE = {}'.format(self.test_db, self.template_db))
212 context.db = self.connect_database(self.test_db)
213 context.db.autocommit = True
214 psycopg2.extras.register_hstore(context.db, globally=False)
216 def teardown_db(self, context):
217 """ Remove the test database, if it exists.
222 if not self.keep_scenario_db:
223 self.db_drop_database(self.test_db)
225 def _reuse_or_drop_db(self, name):
226 """ Check for the existance of the given DB. If reuse is enabled,
227 then the function checks for existance and returns True if the
228 database is already there. Otherwise an existing database is
229 dropped and always false returned.
231 if self.reuse_template:
232 conn = self.connect_database('postgres')
233 with conn.cursor() as cur:
234 cur.execute('select count(*) from pg_database where datname = %s',
236 if cur.fetchone()[0] == 1:
240 self.db_drop_database(name)
244 def reindex_placex(self, db):
245 """ Run the indexing step until all data in the placex has
246 been processed. Indexing during updates can produce more data
247 to index under some circumstances. That is why indexing may have
248 to be run multiple times.
250 with db.cursor() as cur:
252 self.run_update_script('index')
254 cur.execute("SELECT 'a' FROM placex WHERE indexed_status != 0 LIMIT 1")
255 if cur.rowcount == 0:
258 def run_setup_script(self, *args, **kwargs):
259 """ Run the Nominatim setup script with the given arguments.
261 self.run_nominatim_script('setup', *args, **kwargs)
263 def run_update_script(self, *args, **kwargs):
264 """ Run the Nominatim update script with the given arguments.
266 self.run_nominatim_script('update', *args, **kwargs)
268 def run_nominatim_script(self, script, *args, **kwargs):
269 """ Run one of the Nominatim utility scripts with the given arguments.
271 cmd = ['/usr/bin/env', 'php', '-Cq']
272 cmd.append((Path(self.src_dir) / 'lib-php' / 'admin' / '{}.php'.format(script)).resolve())
273 cmd.extend(['--' + x for x in args])
274 for k, v in kwargs.items():
275 cmd.extend(('--' + k.replace('_', '-'), str(v)))
277 if self.website_dir is not None:
278 cwd = self.website_dir.name
282 run_script(cmd, cwd=cwd, env=self.test_env)
284 def copy_from_place(self, db):
285 """ Copy data from place to the placex and location_property_osmline
286 tables invoking the appropriate triggers.
288 self.run_setup_script('create-functions', 'create-partition-functions')
290 with db.cursor() as cur:
291 cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
292 name, admin_level, address,
294 SELECT osm_type, osm_id, class, type,
295 name, admin_level, address,
298 WHERE not (class='place' and type='houses' and osm_type='W')""")
299 cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
300 SELECT osm_id, address, geometry
302 WHERE class='place' and type='houses'
304 and ST_GeometryType(geometry) = 'ST_LineString'""")