1 from pathlib import Path
8 sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..').resolve()))
10 from nominatim import cli
11 from nominatim.config import Configuration
12 from nominatim.tools import refresh
13 from nominatim.tokenizer import factory as tokenizer_factory
14 from steps.utils import run_script
16 class NominatimEnvironment:
17 """ Collects all functions for the execution of Nominatim functions.
20 def __init__(self, config):
21 self.build_dir = Path(config['BUILDDIR']).resolve()
22 self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
23 self.db_host = config['DB_HOST']
24 self.db_port = config['DB_PORT']
25 self.db_user = config['DB_USER']
26 self.db_pass = config['DB_PASS']
27 self.template_db = config['TEMPLATE_DB']
28 self.test_db = config['TEST_DB']
29 self.api_test_db = config['API_TEST_DB']
30 self.api_test_file = config['API_TEST_FILE']
31 self.server_module_path = config['SERVER_MODULE_PATH']
32 self.reuse_template = not config['REMOVE_TEMPLATE']
33 self.keep_scenario_db = config['KEEP_TEST_DB']
34 self.code_coverage_path = config['PHPCOV']
35 self.code_coverage_id = 1
37 self.default_config = Configuration(None, self.src_dir / 'settings').get_os_env()
39 self.template_db_done = False
40 self.api_db_done = False
41 self.website_dir = None
43 def connect_database(self, dbname):
44 """ Return a connection to the database with the given name.
45 Uses configured host, user and port.
47 dbargs = {'database': dbname}
49 dbargs['host'] = self.db_host
51 dbargs['port'] = self.db_port
53 dbargs['user'] = self.db_user
55 dbargs['password'] = self.db_pass
56 conn = psycopg2.connect(**dbargs)
59 def next_code_coverage_file(self):
60 """ Generate the next name for a coverage file.
62 fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
63 self.code_coverage_id += 1
67 def write_nominatim_config(self, dbname):
68 """ Set up a custom test configuration that connects to the given
69 database. This sets up the environment variables so that they can
70 be picked up by dotenv and creates a project directory with the
71 appropriate website scripts.
73 dsn = 'pgsql:dbname={}'.format(dbname)
75 dsn += ';host=' + self.db_host
77 dsn += ';port=' + self.db_port
79 dsn += ';user=' + self.db_user
81 dsn += ';password=' + self.db_pass
83 if self.website_dir is not None \
84 and self.test_env is not None \
85 and dsn == self.test_env['NOMINATIM_DATABASE_DSN']:
86 return # environment already set uo
88 self.test_env = dict(self.default_config)
89 self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
90 self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
91 self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
92 self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
93 self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
94 self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
95 self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
96 self.test_env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = str((self.build_dir / 'module').resolve())
97 self.test_env['NOMINATIM_OSM2PGSQL_BINARY'] = str((self.build_dir / 'osm2pgsql' / 'osm2pgsql').resolve())
98 self.test_env['NOMINATIM_NOMINATIM_TOOL'] = str((self.build_dir / 'nominatim').resolve())
100 if self.server_module_path:
101 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
103 # avoid module being copied into the temporary environment
104 self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = str((self.build_dir / 'module').resolve())
106 if self.website_dir is not None:
107 self.website_dir.cleanup()
109 self.website_dir = tempfile.TemporaryDirectory()
110 refresh.setup_website(Path(self.website_dir.name) / 'website',
111 self.get_test_config())
114 def get_test_config(self):
115 cfg = Configuration(Path(self.website_dir.name), self.src_dir / 'settings',
116 environ=self.test_env)
117 cfg.set_libdirs(module=self.build_dir / 'module',
118 osm2pgsql=self.build_dir / 'osm2pgsql' / 'osm2pgsql',
119 php=self.src_dir / 'lib-php',
120 sql=self.src_dir / 'lib-sql',
121 data=self.src_dir / 'data')
124 def get_libpq_dsn(self):
125 dsn = self.test_env['NOMINATIM_DATABASE_DSN']
127 def quote_param(param):
128 key, val = param.split('=')
129 val = val.replace('\\', '\\\\').replace("'", "\\'")
131 val = "'" + val + "'"
132 return key + '=' + val
134 if dsn.startswith('pgsql:'):
135 # Old PHP DSN format. Convert before returning.
136 return ' '.join([quote_param(p) for p in dsn[6:].split(';')])
141 def db_drop_database(self, name):
142 """ Drop the database with the given name.
144 conn = self.connect_database('postgres')
145 conn.set_isolation_level(0)
147 cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
150 def setup_template_db(self):
151 """ Setup a template database that already contains common test data.
152 Having a template database speeds up tests considerably but at
153 the price that the tests sometimes run with stale data.
155 if self.template_db_done:
158 self.template_db_done = True
160 if self._reuse_or_drop_db(self.template_db):
163 self.write_nominatim_config(self.template_db)
166 # execute nominatim import on an empty file to get the right tables
167 with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
168 fd.write(b'<osm version="0.6"></osm>')
170 self.run_nominatim('import', '--osm-file', fd.name,
171 '--osm2pgsql-cache', '1',
174 self.db_drop_database(self.template_db)
178 def setup_api_db(self):
179 """ Setup a test against the API test database.
181 self.write_nominatim_config(self.api_test_db)
183 if not self.api_db_done:
184 self.api_db_done = True
186 if not self._reuse_or_drop_db(self.api_test_db):
187 testdata = Path('__file__') / '..' / '..' / 'testdb'
188 self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata.resolve())
191 self.run_nominatim('import', '--osm-file', str(self.api_test_file))
192 self.run_nominatim('add-data', '--tiger-data', str((testdata / 'tiger').resolve()))
193 self.run_nominatim('freeze')
195 phrase_file = str((testdata / 'specialphrases_testdb.sql').resolve())
196 run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
198 self.db_drop_database(self.api_test_db)
201 tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
204 def setup_unknown_db(self):
205 """ Setup a test against a non-existing database.
207 # The tokenizer needs an existing database to function.
208 # So start with the usual database
213 self.setup_db(context)
214 tokenizer_factory.create_tokenizer(self.get_test_config(), init_db=False)
216 # Then drop the DB again
217 self.teardown_db(context, force_drop=True)
219 def setup_db(self, context):
220 """ Setup a test against a fresh, empty test database.
222 self.setup_template_db()
223 self.write_nominatim_config(self.test_db)
224 conn = self.connect_database(self.template_db)
225 conn.set_isolation_level(0)
227 cur.execute('DROP DATABASE IF EXISTS {}'.format(self.test_db))
228 cur.execute('CREATE DATABASE {} TEMPLATE = {}'.format(self.test_db, self.template_db))
230 context.db = self.connect_database(self.test_db)
231 context.db.autocommit = True
232 psycopg2.extras.register_hstore(context.db, globally=False)
234 def teardown_db(self, context, force_drop=False):
235 """ Remove the test database, if it exists.
237 if hasattr(context, 'db'):
240 if force_drop or not self.keep_scenario_db:
241 self.db_drop_database(self.test_db)
243 def _reuse_or_drop_db(self, name):
244 """ Check for the existance of the given DB. If reuse is enabled,
245 then the function checks for existance and returns True if the
246 database is already there. Otherwise an existing database is
247 dropped and always false returned.
249 if self.reuse_template:
250 conn = self.connect_database('postgres')
251 with conn.cursor() as cur:
252 cur.execute('select count(*) from pg_database where datname = %s',
254 if cur.fetchone()[0] == 1:
258 self.db_drop_database(name)
262 def reindex_placex(self, db):
263 """ Run the indexing step until all data in the placex has
264 been processed. Indexing during updates can produce more data
265 to index under some circumstances. That is why indexing may have
266 to be run multiple times.
268 with db.cursor() as cur:
270 self.run_nominatim('index')
272 cur.execute("SELECT 'a' FROM placex WHERE indexed_status != 0 LIMIT 1")
273 if cur.rowcount == 0:
276 def run_nominatim(self, *cmdline):
277 """ Run the nominatim command-line tool via the library.
279 if self.website_dir is not None:
280 cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]
282 cli.nominatim(module_dir='',
283 osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
284 phplib_dir=str(self.src_dir / 'lib-php'),
285 sqllib_dir=str(self.src_dir / 'lib-sql'),
286 data_dir=str(self.src_dir / 'data'),
287 config_dir=str(self.src_dir / 'settings'),
290 environ=self.test_env)
293 def copy_from_place(self, db):
294 """ Copy data from place to the placex and location_property_osmline
295 tables invoking the appropriate triggers.
297 self.run_nominatim('refresh', '--functions', '--no-diff-updates')
299 with db.cursor() as cur:
300 cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
301 name, admin_level, address,
303 SELECT osm_type, osm_id, class, type,
304 name, admin_level, address,
307 WHERE not (class='place' and type='houses' and osm_type='W')""")
308 cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
309 SELECT osm_id, address, geometry
311 WHERE class='place' and type='houses'
313 and ST_GeometryType(geometry) = 'ST_LineString'""")