1 from pathlib import Path
8 sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..').resolve()))
10 from nominatim import cli
11 from nominatim.config import Configuration
12 from nominatim.tools import refresh
13 from steps.utils import run_script
class NominatimEnvironment:
    """ Collects all functions for the execution of Nominatim functions.

        The environment keeps track of the databases used by the tests
        (template, per-scenario and API database), creates the project
        directory with the website scripts and runs the nominatim
        command-line tool against the database currently configured.
    """

    def __init__(self, config):
        """ Set up the environment from the test settings in `config`,
            a mapping that must contain the keys read below.
        """
        self.build_dir = Path(config['BUILDDIR']).resolve()
        self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
        self.db_host = config['DB_HOST']
        self.db_port = config['DB_PORT']
        self.db_user = config['DB_USER']
        self.db_pass = config['DB_PASS']
        self.template_db = config['TEMPLATE_DB']
        self.test_db = config['TEST_DB']
        self.api_test_db = config['API_TEST_DB']
        self.api_test_file = config['API_TEST_FILE']
        self.server_module_path = config['SERVER_MODULE_PATH']
        self.reuse_template = not config['REMOVE_TEMPLATE']
        self.keep_scenario_db = config['KEEP_TEST_DB']
        self.code_coverage_path = config['PHPCOV']
        self.code_coverage_id = 1

        self.default_config = Configuration(None, self.src_dir / 'settings').get_os_env()
        # Environment of the currently configured database. Remains None
        # until write_nominatim_config() has been called for the first time.
        self.test_env = None
        self.template_db_done = False
        self.api_db_done = False
        self.website_dir = None

    def connect_database(self, dbname):
        """ Return a connection to the database with the given name.
            Uses configured host, user and port.
        """
        dbargs = {'database': dbname}
        # Only hand over the parameters that are actually configured.
        if self.db_host:
            dbargs['host'] = self.db_host
        if self.db_port:
            dbargs['port'] = self.db_port
        if self.db_user:
            dbargs['user'] = self.db_user
        if self.db_pass:
            dbargs['password'] = self.db_pass

        return psycopg2.connect(**dbargs)

    def next_code_coverage_file(self):
        """ Generate the next name for a coverage file.

            Returns the absolute path of the file and advances the
            internal counter, so that every call yields a new name.
        """
        fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
        self.code_coverage_id += 1

        return fn.resolve()

    def write_nominatim_config(self, dbname):
        """ Set up a custom test configuration that connects to the given
            database. This sets up the environment variables so that they can
            be picked up by dotenv and creates a project directory with the
            appropriate website scripts.
        """
        dsn = 'pgsql:dbname={}'.format(dbname)
        # Unset parameters are left out. format() also copes with a
        # port that was configured as a number.
        if self.db_host:
            dsn += ';host={}'.format(self.db_host)
        if self.db_port:
            dsn += ';port={}'.format(self.db_port)
        if self.db_user:
            dsn += ';user={}'.format(self.db_user)
        if self.db_pass:
            dsn += ';password={}'.format(self.db_pass)

        if self.website_dir is not None \
           and self.test_env is not None \
           and dsn == self.test_env['NOMINATIM_DATABASE_DSN']:
            return # environment already set up

        self.test_env = dict(self.default_config)
        self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
        self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
        self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
        self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
        self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
        self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
        self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
        self.test_env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = str((self.build_dir / 'module').resolve())
        self.test_env['NOMINATIM_OSM2PGSQL_BINARY'] = str((self.build_dir / 'osm2pgsql' / 'osm2pgsql').resolve())
        self.test_env['NOMINATIM_NOMINATIM_TOOL'] = str((self.build_dir / 'nominatim').resolve())

        if self.server_module_path:
            self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
        else:
            # avoid module being copied into the temporary environment
            self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = str((self.build_dir / 'module').resolve())

        # Throw away the project directory of the previous configuration.
        if self.website_dir is not None:
            self.website_dir.cleanup()

        self.website_dir = tempfile.TemporaryDirectory()
        cfg = Configuration(None, self.src_dir / 'settings', environ=self.test_env)
        cfg.lib_dir.php = self.src_dir / 'lib-php'
        refresh.setup_website(Path(self.website_dir.name) / 'website', cfg)

    def get_libpq_dsn(self):
        """ Return the DSN of the current test configuration in a form
            that can be handed to libpq.
        """
        dsn = self.test_env['NOMINATIM_DATABASE_DSN']

        def quote_param(param):
            # Split on the first '=' only, the value may contain more.
            key, val = param.split('=', 1)
            val = val.replace('\\', '\\\\').replace("'", "\\'")
            return key + "='" + val + "'"

        if dsn.startswith('pgsql:'):
            # Old PHP DSN format. Convert before returning.
            return ' '.join(quote_param(p) for p in dsn[6:].split(';'))

        return dsn

    def db_drop_database(self, name):
        """ Drop the database with the given name.
        """
        conn = self.connect_database('postgres')
        try:
            # Level 0 switches the connection to autocommit, so that the
            # statement is not wrapped into a transaction.
            conn.set_isolation_level(0)
            with conn.cursor() as cur:
                cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
        finally:
            conn.close()

    def setup_template_db(self):
        """ Setup a template database that already contains common test data.
            Having a template database speeds up tests considerably but at
            the price that the tests sometimes run with stale data.
        """
        if self.template_db_done:
            return

        self.template_db_done = True

        if self._reuse_or_drop_db(self.template_db):
            return

        self.write_nominatim_config(self.template_db)

        try:
            # execute nominatim import on an empty file to get the right tables
            with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
                fd.write(b'<osm version="0.6"></osm>')
                # The importer opens the file by name, so the content
                # must have left the write buffer by then.
                fd.flush()
                # NOTE(review): '--ignore-errors' is assumed to be needed
                # for importing a file without data - confirm.
                self.run_nominatim('import', '--osm-file', fd.name,
                                   '--osm2pgsql-cache', '1',
                                   '--ignore-errors')
        except BaseException:
            # Also clean up on interruption: a half-built template would
            # otherwise be picked up for reuse by the next test run.
            self.db_drop_database(self.template_db)
            raise

    def setup_api_db(self):
        """ Setup a test against the API test database.
        """
        self.write_nominatim_config(self.api_test_db)

        if self.api_db_done:
            return

        self.api_db_done = True

        if self._reuse_or_drop_db(self.api_test_db):
            return

        # NOTE(review): '__file__' is a string literal here, so this path is
        # resolved relative to the current working directory and not to the
        # location of this module. Kept as is - confirm the intended location.
        testdata = Path('__file__') / '..' / '..' / 'testdb'
        self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata.resolve())

        try:
            self.run_nominatim('import', '--osm-file', str(self.api_test_file))
            self.run_nominatim('add-data', '--tiger-data', str((testdata / 'tiger').resolve()))
            self.run_nominatim('freeze')

            phrase_file = str((testdata / 'specialphrases_testdb.sql').resolve())
            run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
        except BaseException:
            # Never leave a partially imported database behind for reuse.
            self.db_drop_database(self.api_test_db)
            raise

    def setup_unknown_db(self):
        """ Setup a test against a non-existing database.
        """
        self.write_nominatim_config('UNKNOWN_DATABASE_NAME')

    def setup_db(self, context):
        """ Setup a test against a fresh, empty test database.

            The database is created as a copy of the template database.
            A connection to it is saved in `context.db`.
        """
        self.setup_template_db()
        self.write_nominatim_config(self.test_db)

        conn = self.connect_database(self.template_db)
        try:
            # Autocommit, so that the statements run outside a transaction.
            conn.set_isolation_level(0)
            with conn.cursor() as cur:
                cur.execute('DROP DATABASE IF EXISTS {}'.format(self.test_db))
                cur.execute('CREATE DATABASE {} TEMPLATE = {}'.format(self.test_db, self.template_db))
        finally:
            conn.close()

        context.db = self.connect_database(self.test_db)
        context.db.autocommit = True
        psycopg2.extras.register_hstore(context.db, globally=False)

    def teardown_db(self, context):
        """ Remove the test database, if it exists.

            The connection saved in `context.db` is closed first, so that
            no session is left open on the database that is to be dropped.
        """
        conn = getattr(context, 'db', None)
        if conn is not None:
            conn.close()

        if not self.keep_scenario_db:
            self.db_drop_database(self.test_db)

    def _reuse_or_drop_db(self, name):
        """ Check for the existence of the given DB. If reuse is enabled,
            then the function checks for existence and returns True if the
            database is already there. Otherwise an existing database is
            dropped and always false returned.
        """
        if not self.reuse_template:
            self.db_drop_database(name)
            return False

        conn = self.connect_database('postgres')
        try:
            with conn.cursor() as cur:
                cur.execute('select count(*) from pg_database where datname = %s',
                            (name,))
                return cur.fetchone()[0] == 1
        finally:
            conn.close()

    def reindex_placex(self, db):
        """ Run the indexing step until all data in the placex has
            been processed. Indexing during updates can produce more data
            to index under some circumstances. That is why indexing may have
            to be run multiple times.
        """
        with db.cursor() as cur:
            while True:
                self.run_nominatim('index')

                cur.execute("SELECT 'a' FROM placex WHERE indexed_status != 0 LIMIT 1")
                if cur.rowcount == 0:
                    return

    def run_nominatim(self, *cmdline):
        """ Run the nominatim command-line tool via the library.

            `cmdline` are the arguments for the tool. The project directory
            of the current test configuration is appended when available.
        """
        if self.website_dir is not None:
            cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]

        # NOTE(review): phpcgi_path is handed in empty like module_dir on the
        # assumption that the library expects the parameter - confirm.
        cli.nominatim(module_dir='',
                      osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
                      phplib_dir=str(self.src_dir / 'lib-php'),
                      sqllib_dir=str(self.src_dir / 'lib-sql'),
                      data_dir=str(self.src_dir / 'data'),
                      config_dir=str(self.src_dir / 'settings'),
                      cli_args=cmdline,
                      phpcgi_path='',
                      environ=self.test_env)

    def copy_from_place(self, db):
        """ Copy data from place to the placex and location_property_osmline
            tables invoking the appropriate triggers.
        """
        self.run_nominatim('refresh', '--functions', '--no-diff-updates')

        with db.cursor() as cur:
            # NOTE(review): the column list is assumed to end with
            # extratags and geometry - confirm against the table layout.
            cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
                                               name, admin_level, address,
                                               extratags, geometry)
                             SELECT osm_type, osm_id, class, type,
                                    name, admin_level, address,
                                    extratags, geometry
                               FROM place
                               WHERE not (class='place' and type='houses' and osm_type='W')""")
            # The ways left out above go into the table for address lines.
            cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
                             SELECT osm_id, address, geometry
                               FROM place
                              WHERE class='place' and type='houses'
                                    and osm_type='W'
                                    and ST_GeometryType(geometry) = 'ST_LineString'""")