1 from pathlib import Path
8 sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..').resolve()))
10 from nominatim.config import Configuration
11 from steps.utils import run_script
13 class NominatimEnvironment:
14 """ Collects all functions for the execution of Nominatim functions.
17 def __init__(self, config):
18 self.build_dir = Path(config['BUILDDIR']).resolve()
19 self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
20 self.db_host = config['DB_HOST']
21 self.db_port = config['DB_PORT']
22 self.db_user = config['DB_USER']
23 self.db_pass = config['DB_PASS']
24 self.template_db = config['TEMPLATE_DB']
25 self.test_db = config['TEST_DB']
26 self.api_test_db = config['API_TEST_DB']
27 self.api_test_file = config['API_TEST_FILE']
28 self.server_module_path = config['SERVER_MODULE_PATH']
29 self.reuse_template = not config['REMOVE_TEMPLATE']
30 self.keep_scenario_db = config['KEEP_TEST_DB']
31 self.code_coverage_path = config['PHPCOV']
32 self.code_coverage_id = 1
34 self.default_config = Configuration(None, self.src_dir / 'settings').get_os_env()
36 self.template_db_done = False
37 self.api_db_done = False
38 self.website_dir = None
40 def connect_database(self, dbname):
41 """ Return a connection to the database with the given name.
42 Uses configured host, user and port.
44 dbargs = {'database': dbname}
46 dbargs['host'] = self.db_host
48 dbargs['port'] = self.db_port
50 dbargs['user'] = self.db_user
52 dbargs['password'] = self.db_pass
53 conn = psycopg2.connect(**dbargs)
56 def next_code_coverage_file(self):
57 """ Generate the next name for a coverage file.
59 fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
60 self.code_coverage_id += 1
64 def write_nominatim_config(self, dbname):
65 """ Set up a custom test configuration that connects to the given
66 database. This sets up the environment variables so that they can
67 be picked up by dotenv and creates a project directory with the
68 appropriate website scripts.
70 dsn = 'pgsql:dbname={}'.format(dbname)
72 dsn += ';host=' + self.db_host
74 dsn += ';port=' + self.db_port
76 dsn += ';user=' + self.db_user
78 dsn += ';password=' + self.db_pass
80 if self.website_dir is not None \
81 and self.test_env is not None \
82 and dsn == self.test_env['NOMINATIM_DATABASE_DSN']:
83 return # environment already set uo
85 self.test_env = dict(self.default_config)
86 self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
87 self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
88 self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
89 self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
90 self.test_env['NOMINATIM_DATADIR'] = self.src_dir
91 self.test_env['NOMINATIM_SQLDIR'] = self.src_dir / 'lib-sql'
92 self.test_env['NOMINATIM_CONFIGDIR'] = self.src_dir / 'settings'
93 self.test_env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = self.build_dir / 'module'
94 self.test_env['NOMINATIM_OSM2PGSQL_BINARY'] = self.build_dir / 'osm2pgsql' / 'osm2pgsql'
95 self.test_env['NOMINATIM_NOMINATIM_TOOL'] = self.build_dir / 'nominatim'
97 if self.server_module_path:
98 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
100 # avoid module being copied into the temporary environment
101 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.build_dir / 'module'
103 if self.website_dir is not None:
104 self.website_dir.cleanup()
106 self.website_dir = tempfile.TemporaryDirectory()
107 self.run_setup_script('setup-website')
110 def db_drop_database(self, name):
111 """ Drop the database with the given name.
113 conn = self.connect_database('postgres')
114 conn.set_isolation_level(0)
116 cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
119 def setup_template_db(self):
120 """ Setup a template database that already contains common test data.
121 Having a template database speeds up tests considerably but at
122 the price that the tests sometimes run with stale data.
124 if self.template_db_done:
127 self.template_db_done = True
129 if self._reuse_or_drop_db(self.template_db):
133 # call the first part of database setup
134 self.write_nominatim_config(self.template_db)
135 self.run_setup_script('create-db', 'setup-db')
136 # remove external data to speed up indexing for tests
137 conn = self.connect_database(self.template_db)
139 cur.execute("""select tablename from pg_tables
140 where tablename in ('gb_postcode', 'us_postcode')""")
142 conn.cursor().execute('TRUNCATE TABLE {}'.format(t[0]))
146 # execute osm2pgsql import on an empty file to get the right tables
147 with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
148 fd.write(b'<osm version="0.6"></osm>')
150 self.run_setup_script('import-data',
154 'create-partition-tables',
155 'create-partition-functions',
157 'create-search-indices',
159 osm2pgsql_cache='200')
161 self.db_drop_database(self.template_db)
165 def setup_api_db(self):
166 """ Setup a test against the API test database.
168 self.write_nominatim_config(self.api_test_db)
173 self.api_db_done = True
175 if self._reuse_or_drop_db(self.api_test_db):
178 testdata = Path('__file__') / '..' / '..' / 'testdb'
179 self.test_env['NOMINATIM_TIGER_DATA_PATH'] = str((testdata / 'tiger').resolve())
180 self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata.resolve())
183 self.run_setup_script('all', osm_file=self.api_test_file)
184 self.run_setup_script('import-tiger-data')
186 phrase_file = str((testdata / 'specialphrases_testdb.sql').resolve())
187 run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
189 self.db_drop_database(self.api_test_db)
193 def setup_unknown_db(self):
194 """ Setup a test against a non-existing database.
196 self.write_nominatim_config('UNKNOWN_DATABASE_NAME')
198 def setup_db(self, context):
199 """ Setup a test against a fresh, empty test database.
201 self.setup_template_db()
202 self.write_nominatim_config(self.test_db)
203 conn = self.connect_database(self.template_db)
204 conn.set_isolation_level(0)
206 cur.execute('DROP DATABASE IF EXISTS {}'.format(self.test_db))
207 cur.execute('CREATE DATABASE {} TEMPLATE = {}'.format(self.test_db, self.template_db))
209 context.db = self.connect_database(self.test_db)
210 context.db.autocommit = True
211 psycopg2.extras.register_hstore(context.db, globally=False)
213 def teardown_db(self, context):
214 """ Remove the test database, if it exists.
219 if not self.keep_scenario_db:
220 self.db_drop_database(self.test_db)
222 def _reuse_or_drop_db(self, name):
223 """ Check for the existance of the given DB. If reuse is enabled,
224 then the function checks for existance and returns True if the
225 database is already there. Otherwise an existing database is
226 dropped and always false returned.
228 if self.reuse_template:
229 conn = self.connect_database('postgres')
230 with conn.cursor() as cur:
231 cur.execute('select count(*) from pg_database where datname = %s',
233 if cur.fetchone()[0] == 1:
237 self.db_drop_database(name)
241 def reindex_placex(self, db):
242 """ Run the indexing step until all data in the placex has
243 been processed. Indexing during updates can produce more data
244 to index under some circumstances. That is why indexing may have
245 to be run multiple times.
247 with db.cursor() as cur:
249 self.run_update_script('index')
251 cur.execute("SELECT 'a' FROM placex WHERE indexed_status != 0 LIMIT 1")
252 if cur.rowcount == 0:
255 def run_setup_script(self, *args, **kwargs):
256 """ Run the Nominatim setup script with the given arguments.
258 self.run_nominatim_script('setup', *args, **kwargs)
260 def run_update_script(self, *args, **kwargs):
261 """ Run the Nominatim update script with the given arguments.
263 self.run_nominatim_script('update', *args, **kwargs)
265 def run_nominatim_script(self, script, *args, **kwargs):
266 """ Run one of the Nominatim utility scripts with the given arguments.
268 cmd = ['/usr/bin/env', 'php', '-Cq']
269 cmd.append((Path(self.src_dir) / 'lib-php' / 'admin' / '{}.php'.format(script)).resolve())
270 cmd.extend(['--' + x for x in args])
271 for k, v in kwargs.items():
272 cmd.extend(('--' + k.replace('_', '-'), str(v)))
274 if self.website_dir is not None:
275 cwd = self.website_dir.name
279 run_script(cmd, cwd=cwd, env=self.test_env)
281 def copy_from_place(self, db):
282 """ Copy data from place to the placex and location_property_osmline
283 tables invoking the appropriate triggers.
285 self.run_setup_script('create-functions', 'create-partition-functions')
287 with db.cursor() as cur:
288 cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
289 name, admin_level, address,
291 SELECT osm_type, osm_id, class, type,
292 name, admin_level, address,
295 WHERE not (class='place' and type='houses' and osm_type='W')""")
296 cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
297 SELECT osm_id, address, geometry
299 WHERE class='place' and type='houses'
301 and ST_GeometryType(geometry) = 'ST_LineString'""")