1 from pathlib import Path
8 sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..').resolve()))
10 from nominatim import cli
11 from nominatim.config import Configuration
12 from nominatim.tools import refresh
13 from nominatim.tokenizer import factory as tokenizer_factory
14 from steps.utils import run_script
class NominatimEnvironment:
    """ Collects all functions for the execution of Nominatim functions.

        The object keeps the settings for the test databases, writes the
        environment and project directory needed to run Nominatim against
        one of them and offers helpers to create, fill and drop them.
    """

    def __init__(self, config):
        """ Read the test settings from `config`, a mapping that provides
            the keys looked up below.
        """
        self.build_dir = Path(config['BUILDDIR']).resolve()
        self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
        self.db_host = config['DB_HOST']
        self.db_port = config['DB_PORT']
        self.db_user = config['DB_USER']
        self.db_pass = config['DB_PASS']
        self.template_db = config['TEMPLATE_DB']
        self.test_db = config['TEST_DB']
        self.api_test_db = config['API_TEST_DB']
        self.api_test_file = config['API_TEST_FILE']
        self.tokenizer = config['TOKENIZER']
        self.server_module_path = config['SERVER_MODULE_PATH']
        self.reuse_template = not config['REMOVE_TEMPLATE']
        self.keep_scenario_db = config['KEEP_TEST_DB']
        self.code_coverage_path = config['PHPCOV']
        self.code_coverage_id = 1

        self.default_config = Configuration(None, self.src_dir / 'settings').get_os_env()
        # Environment for the currently configured database. It is filled
        # by write_nominatim_config(), which also tests it against None.
        self.test_env = None
        self.template_db_done = False
        self.api_db_done = False
        self.website_dir = None

    def _connection_params(self):
        """ Return the configured connection settings as a list of
            (name, value) pairs. Settings without a value are left out.
        """
        settings = (('host', self.db_host),
                    ('port', self.db_port),
                    ('user', self.db_user),
                    ('password', self.db_pass))
        return [(key, value) for key, value in settings if value]

    def connect_database(self, dbname):
        """ Return a connection to the database with the given name.
            Uses configured host, user and port.
        """
        dbargs = {'database': dbname}
        dbargs.update(self._connection_params())
        return psycopg2.connect(**dbargs)

    def next_code_coverage_file(self):
        """ Generate the next name for a coverage file.

            Files are numbered consecutively inside the configured coverage
            directory. The counter is advanced with every call.
        """
        fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
        self.code_coverage_id += 1

        return fn.resolve()

    def write_nominatim_config(self, dbname):
        """ Set up a custom test configuration that connects to the given
            database. This sets up the environment variables so that they can
            be picked up by dotenv and creates a project directory with the
            appropriate website scripts.
        """
        dsn = 'pgsql:dbname={}'.format(dbname)
        for key, value in self._connection_params():
            # format() instead of '+' so that a numeric port is accepted, too
            dsn += ';{}={}'.format(key, value)

        if self.website_dir is not None \
           and self.test_env is not None \
           and dsn == self.test_env['NOMINATIM_DATABASE_DSN']:
            return # environment already set up

        self.test_env = dict(self.default_config)
        self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
        self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
        self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
        self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
        self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
        self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
        self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
        self.test_env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = str((self.build_dir / 'module').resolve())
        self.test_env['NOMINATIM_OSM2PGSQL_BINARY'] = str((self.build_dir / 'osm2pgsql' / 'osm2pgsql').resolve())
        self.test_env['NOMINATIM_NOMINATIM_TOOL'] = str((self.build_dir / 'nominatim').resolve())
        if self.tokenizer is not None:
            self.test_env['NOMINATIM_TOKENIZER'] = self.tokenizer

        if self.server_module_path:
            self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
        else:
            # avoid module being copied into the temporary environment
            self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = str((self.build_dir / 'module').resolve())

        # Every configuration gets a fresh project directory.
        if self.website_dir is not None:
            self.website_dir.cleanup()

        self.website_dir = tempfile.TemporaryDirectory()
        refresh.setup_website(Path(self.website_dir.name) / 'website',
                              self.get_test_config())

    def get_test_config(self):
        """ Return a Configuration object for the current project directory
            and test environment. write_nominatim_config() must have been
            called before.
        """
        cfg = Configuration(Path(self.website_dir.name), self.src_dir / 'settings',
                            environ=self.test_env)
        cfg.set_libdirs(module=self.build_dir / 'module',
                        osm2pgsql=self.build_dir / 'osm2pgsql' / 'osm2pgsql',
                        php=self.src_dir / 'lib-php',
                        sql=self.src_dir / 'lib-sql',
                        data=self.src_dir / 'data')
        return cfg

    def get_libpq_dsn(self):
        """ Return the DSN of the current test environment in the
            'key=value key=value' form. A DSN in the old PHP format
            ('pgsql:key=value;key=value') is converted, anything else is
            returned unchanged.
        """
        dsn = self.test_env['NOMINATIM_DATABASE_DSN']

        def quote_param(param):
            # Split at the first '=' only. The value (think of passwords)
            # may contain further '=' characters.
            key, val = param.split('=', 1)
            # Backslashes and single quotes are escaped with a backslash,
            # then the value is wrapped in single quotes.
            val = val.replace('\\', '\\\\').replace("'", "\\'")
            return "{}='{}'".format(key, val)

        if dsn.startswith('pgsql:'):
            # Old PHP DSN format. Convert before returning.
            # Empty segments (e.g. from a trailing ';') are skipped.
            params = dsn[len('pgsql:'):].split(';')
            return ' '.join(quote_param(p) for p in params if p)

        return dsn

    def db_drop_database(self, name):
        """ Drop the database with the given name.
        """
        conn = self.connect_database('postgres')
        try:
            # Level 0 is autocommit. DROP DATABASE must not run inside
            # a transaction block.
            conn.set_isolation_level(0)
            with conn.cursor() as cur:
                cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
        finally:
            conn.close()

    def setup_template_db(self):
        """ Setup a template database that already contains common test data.
            Having a template database speeds up tests considerably but at
            the price that the tests sometimes run with stale data.
        """
        if self.template_db_done:
            return

        self.template_db_done = True

        if self._reuse_or_drop_db(self.template_db):
            return

        self.write_nominatim_config(self.template_db)

        try:
            # execute nominatim import on an empty file to get the right tables
            with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
                fd.write(b'<osm version="0.6"></osm>')
                # The importer opens the file by name, so the content has
                # to be on disk before it runs.
                fd.flush()
                # NOTE(review): an empty file imports no rows at all.
                # '--ignore-errors' is expected to keep the import from
                # aborting because of that - confirm against the CLI.
                self.run_nominatim('import', '--osm-file', fd.name,
                                   '--osm2pgsql-cache', '1',
                                   '--ignore-errors')
        except BaseException:
            # Never leave a half-built template behind, then report the error.
            self.db_drop_database(self.template_db)
            raise

    def setup_api_db(self):
        """ Setup a test against the API test database.

            The database is imported on first use unless an existing one
            may be reused. A database whose import failed is dropped again.
        """
        self.write_nominatim_config(self.api_test_db)

        if not self.api_db_done:
            self.api_db_done = True

            if not self._reuse_or_drop_db(self.api_test_db):
                # NOTE(review): '__file__' is a quoted literal here, so the
                # path is resolved against the current working directory and
                # not against the location of this module - confirm that
                # this is intended before changing it.
                testdata = Path('__file__') / '..' / '..' / 'testdb'
                self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata.resolve())

                try:
                    self.run_nominatim('import', '--osm-file', str(self.api_test_file))
                    if self.tokenizer != 'legacy_icu':
                        self.run_nominatim('add-data', '--tiger-data', str((testdata / 'tiger').resolve()))
                    self.run_nominatim('freeze')

                    if self.tokenizer != 'legacy_icu':
                        phrase_file = str((testdata / 'specialphrases_testdb.sql').resolve())
                        run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
                    else:
                        # XXX Temporary use the wiki while there is no CSV import
                        self.test_env['NOMINATIM_LANGUAGES'] = 'en'
                        self.run_nominatim('special-phrases', '--import-from-wiki')
                        del self.test_env['NOMINATIM_LANGUAGES']
                except BaseException:
                    self.db_drop_database(self.api_test_db)
                    raise

        tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)

    def setup_unknown_db(self):
        """ Setup a test against a non-existing database.
        """
        # The tokenizer needs an existing database to function.
        # So start with the usual database
        class _Context:
            """ Minimal stand-in that only carries the `db` attribute. """
            db = None

        context = _Context()
        self.setup_db(context)
        tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)

        # Then drop the DB again
        self.teardown_db(context, force_drop=True)

    def setup_db(self, context):
        """ Setup a test against a fresh, empty test database.

            The database is created as a copy of the template database.
            An open connection to it is stored in `context.db`.
        """
        self.setup_template_db()
        self.write_nominatim_config(self.test_db)
        conn = self.connect_database(self.template_db)
        try:
            # Level 0 is autocommit, see db_drop_database().
            conn.set_isolation_level(0)
            with conn.cursor() as cur:
                cur.execute('DROP DATABASE IF EXISTS {}'.format(self.test_db))
                cur.execute('CREATE DATABASE {} TEMPLATE = {}'.format(self.test_db, self.template_db))
        finally:
            conn.close()

        context.db = self.connect_database(self.test_db)
        context.db.autocommit = True
        psycopg2.extras.register_hstore(context.db, globally=False)

    def teardown_db(self, context, force_drop=False):
        """ Remove the test database, if it exists.

            The connection held by `context` is closed first. The database
            is kept when keeping scenario databases was requested, unless
            `force_drop` is set.
        """
        scenario_db = getattr(context, 'db', None)
        if scenario_db is not None:
            scenario_db.close()

        if force_drop or not self.keep_scenario_db:
            self.db_drop_database(self.test_db)

    def _reuse_or_drop_db(self, name):
        """ Check for the existence of the given DB. If reuse is enabled,
            then the function checks for existence and returns True if the
            database is already there. Otherwise an existing database is
            dropped and always false returned.
        """
        if not self.reuse_template:
            self.db_drop_database(name)
            return False

        conn = self.connect_database('postgres')
        try:
            with conn.cursor() as cur:
                cur.execute('select count(*) from pg_database where datname = %s',
                            (name,))
                return cur.fetchone()[0] == 1
        finally:
            # close the connection whatever the outcome of the check
            conn.close()

    def reindex_placex(self, db):
        """ Run the indexing step until all data in the placex has
            been processed. Indexing during updates can produce more data
            to index under some circumstances. That is why indexing may have
            to be run multiple times.
        """
        with db.cursor() as cur:
            while True:
                self.run_nominatim('index')

                cur.execute("SELECT 'a' FROM placex WHERE indexed_status != 0 LIMIT 1")
                if cur.rowcount == 0:
                    break

    def run_nominatim(self, *cmdline):
        """ Run the nominatim command-line tool via the library.

            When a project directory has been set up, it is handed in
            through the '--project-dir' parameter.
        """
        if self.website_dir is not None:
            cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]

        # NOTE(review): 'cli_args' and 'phpcgi_path' follow the keyword
        # interface of nominatim.cli.nominatim() - confirm against the
        # library version in use.
        cli.nominatim(module_dir='',
                      osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
                      phplib_dir=str(self.src_dir / 'lib-php'),
                      sqllib_dir=str(self.src_dir / 'lib-sql'),
                      data_dir=str(self.src_dir / 'data'),
                      config_dir=str(self.src_dir / 'settings'),
                      cli_args=cmdline,
                      phpcgi_path='',
                      environ=self.test_env)

    def copy_from_place(self, db):
        """ Copy data from place to the placex and location_property_osmline
            tables invoking the appropriate triggers.
        """
        self.run_nominatim('refresh', '--functions', '--no-diff-updates')

        with db.cursor() as cur:
            # Everything except house number interpolation ways goes to placex.
            cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
                                               name, admin_level, address,
                                               extratags, geometry)
                             SELECT osm_type, osm_id, class, type,
                                    name, admin_level, address,
                                    extratags, geometry
                               FROM place
                               WHERE not (class='place' and type='houses' and osm_type='W')""")
            # The remaining ways go to the interpolation table when they
            # are line strings.
            cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
                             SELECT osm_id, address, geometry
                               FROM place
                              WHERE class='place' and type='houses'
                                    and osm_type='W'
                                    and ST_GeometryType(geometry) = 'ST_LineString'""")