1 from pathlib import Path
8 sys.path.insert(1, str((Path(__file__) / '..' / '..' / '..' / '..').resolve()))
10 from nominatim import cli
11 from nominatim.config import Configuration
12 from nominatim.tools import refresh
13 from steps.utils import run_script
class NominatimEnvironment:
    """ Collects all functions for the execution of Nominatim functions.

        The object keeps track of the databases used by the tests and of
        the temporary project directory the command-line tool runs in.
    """

    def __init__(self, config):
        """ Read the test setup from `config`, a mapping that must provide
            the keys BUILDDIR, DB_HOST, DB_PORT, DB_USER, DB_PASS,
            TEMPLATE_DB, TEST_DB, API_TEST_DB, API_TEST_FILE,
            SERVER_MODULE_PATH, REMOVE_TEMPLATE, KEEP_TEST_DB and PHPCOV.
        """
        self.build_dir = Path(config['BUILDDIR']).resolve()
        self.src_dir = (Path(__file__) / '..' / '..' / '..' / '..').resolve()
        self.db_host = config['DB_HOST']
        self.db_port = config['DB_PORT']
        self.db_user = config['DB_USER']
        self.db_pass = config['DB_PASS']
        self.template_db = config['TEMPLATE_DB']
        self.test_db = config['TEST_DB']
        self.api_test_db = config['API_TEST_DB']
        self.api_test_file = config['API_TEST_FILE']
        self.server_module_path = config['SERVER_MODULE_PATH']
        self.reuse_template = not config['REMOVE_TEMPLATE']
        self.keep_scenario_db = config['KEEP_TEST_DB']
        self.code_coverage_path = config['PHPCOV']
        self.code_coverage_id = 1

        self.default_config = Configuration(None, self.src_dir / 'settings').get_os_env()
        # Filled in by write_nominatim_config(). Initialised here because
        # several methods read the attribute.
        self.test_env = None
        self.template_db_done = False
        self.api_db_done = False
        self.website_dir = None

    def connect_database(self, dbname):
        """ Return a connection to the database with the given name.
            Uses configured host, user and port.
        """
        dbargs = {'database': dbname}
        # Parameters that are not configured are left out completely.
        for key, value in (('host', self.db_host), ('port', self.db_port),
                           ('user', self.db_user), ('password', self.db_pass)):
            if value:
                dbargs[key] = value

        return psycopg2.connect(**dbargs)

    def next_code_coverage_file(self):
        """ Generate the next name for a coverage file.

            Returns the absolute path of the file and advances the
            internal counter, so that each call yields a new name.
        """
        fn = Path(self.code_coverage_path) / "{:06d}.cov".format(self.code_coverage_id)
        self.code_coverage_id += 1

        return fn.resolve()

    def _php_dsn(self, dbname):
        """ Return the PHP-style DSN ('pgsql:key=value;...') for the given
            database. Connection settings that are unset are skipped.
        """
        parts = ['pgsql:dbname={}'.format(dbname)]
        for key, value in (('host', self.db_host), ('port', self.db_port),
                           ('user', self.db_user), ('password', self.db_pass)):
            if value:
                # format() also copes with a port given as integer
                parts.append('{}={}'.format(key, value))

        return ';'.join(parts)

    def write_nominatim_config(self, dbname):
        """ Set up a custom test configuration that connects to the given
            database. This sets up the environment variables so that they can
            be picked up by dotenv and creates a project directory with the
            appropriate website scripts.
        """
        dsn = self._php_dsn(dbname)

        if self.website_dir is not None \
           and self.test_env is not None \
           and dsn == self.test_env['NOMINATIM_DATABASE_DSN']:
            return # environment already set up

        self.test_env = dict(self.default_config)
        self.test_env['NOMINATIM_DATABASE_DSN'] = dsn
        self.test_env['NOMINATIM_FLATNODE_FILE'] = ''
        self.test_env['NOMINATIM_IMPORT_STYLE'] = 'full'
        self.test_env['NOMINATIM_USE_US_TIGER_DATA'] = 'yes'
        self.test_env['NOMINATIM_DATADIR'] = str((self.src_dir / 'data').resolve())
        self.test_env['NOMINATIM_SQLDIR'] = str((self.src_dir / 'lib-sql').resolve())
        self.test_env['NOMINATIM_CONFIGDIR'] = str((self.src_dir / 'settings').resolve())
        self.test_env['NOMINATIM_DATABASE_MODULE_SRC_PATH'] = str((self.build_dir / 'module').resolve())
        self.test_env['NOMINATIM_OSM2PGSQL_BINARY'] = str((self.build_dir / 'osm2pgsql' / 'osm2pgsql').resolve())
        self.test_env['NOMINATIM_NOMINATIM_TOOL'] = str((self.build_dir / 'nominatim').resolve())

        if self.server_module_path:
            self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = self.server_module_path
        else:
            # avoid module being copied into the temporary environment
            self.test_env['NOMINATIM_DATABASE_MODULE_PATH'] = str((self.build_dir / 'module').resolve())

        # Throw away the project directory of the previous configuration.
        if self.website_dir is not None:
            self.website_dir.cleanup()

        self.website_dir = tempfile.TemporaryDirectory()
        cfg = Configuration(None, self.src_dir / 'settings', environ=self.test_env)
        refresh.setup_website(Path(self.website_dir.name) / 'website', self.src_dir / 'lib-php', cfg)

    def get_libpq_dsn(self):
        """ Return the DSN of the current test configuration in the
            keyword/value format of libpq.

            A DSN in the old PHP format ('pgsql:key=value;...') is
            converted, anything else is returned unchanged.
        """
        dsn = self.test_env['NOMINATIM_DATABASE_DSN']

        def quote_param(param):
            # Split at the first '=' only, the value may contain further ones.
            key, val = param.split('=', 1)
            val = val.replace('\\', '\\\\').replace("'", "\\'")
            # Values are always quoted: libpq accepts that for any value.
            return key + "='" + val + "'"

        if dsn.startswith('pgsql:'):
            # Old PHP DSN format. Convert before returning.
            return ' '.join([quote_param(p) for p in dsn[6:].split(';')])

        return dsn

    def db_drop_database(self, name):
        """ Drop the database with the given name.
        """
        conn = self.connect_database('postgres')
        try:
            # DROP DATABASE cannot be executed inside a transaction block,
            # isolation level 0 switches the connection to autocommit.
            conn.set_isolation_level(0)
            with conn.cursor() as cur:
                cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
        finally:
            conn.close()

    def setup_template_db(self):
        """ Setup a template database that already contains common test data.
            Having a template database speeds up tests considerably but at
            the price that the tests sometimes run with stale data.
        """
        if self.template_db_done:
            return

        self.template_db_done = True

        if self._reuse_or_drop_db(self.template_db):
            return

        self.write_nominatim_config(self.template_db)

        try:
            # execute nominatim import on an empty file to get the right tables
            with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.xml') as fd:
                fd.write(b'<osm version="0.6"></osm>')
                # The import reads the file by name, so the content must
                # have left the write buffer by then.
                fd.flush()
                # NOTE(review): '--ignore-errors' is needed because the file
                # contains no data - confirm against the import command.
                self.run_nominatim('import', '--osm-file', fd.name,
                                   '--osm2pgsql-cache', '1',
                                   '--ignore-errors')
        except BaseException:
            # Do not leave a half-built template behind for the next run.
            self.db_drop_database(self.template_db)
            raise

    def setup_api_db(self):
        """ Setup a test against the API test database.
        """
        self.write_nominatim_config(self.api_test_db)

        if self.api_db_done:
            return

        self.api_db_done = True

        if self._reuse_or_drop_db(self.api_test_db):
            return

        # The path used to be built from the literal string '__file__' and
        # thus depended on the current working directory.
        # NOTE(review): assumes that the test data lives in the directory
        # 'testdb' three levels above this file - confirm.
        testdata = (Path(__file__) / '..' / '..' / '..' / 'testdb').resolve()
        self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata)

        try:
            self.run_nominatim('import', '--osm-file', str(self.api_test_file))
            self.run_nominatim('add-data', '--tiger-data', str((testdata / 'tiger').resolve()))
            self.run_nominatim('freeze')

            phrase_file = str((testdata / 'specialphrases_testdb.sql').resolve())
            run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
        except BaseException:
            # Do not leave a half-imported database behind for the next run.
            self.db_drop_database(self.api_test_db)
            raise

    def setup_unknown_db(self):
        """ Setup a test against a non-existing database.
        """
        self.write_nominatim_config('UNKNOWN_DATABASE_NAME')

    def setup_db(self, context):
        """ Setup a test against a fresh, empty test database.

            The database is created as a copy of the template database.
            A connection to it is saved in `context.db`.
        """
        self.setup_template_db()
        self.write_nominatim_config(self.test_db)
        conn = self.connect_database(self.template_db)
        try:
            # DROP/CREATE DATABASE must run outside of a transaction block.
            conn.set_isolation_level(0)
            with conn.cursor() as cur:
                cur.execute('DROP DATABASE IF EXISTS {}'.format(self.test_db))
                cur.execute('CREATE DATABASE {} TEMPLATE = {}'.format(self.test_db, self.template_db))
        finally:
            conn.close()

        context.db = self.connect_database(self.test_db)
        context.db.autocommit = True
        psycopg2.extras.register_hstore(context.db, globally=False)

    def teardown_db(self, context):
        """ Remove the test database, if it exists.
        """
        # The connection opened by setup_db() has to go first, a database
        # cannot be dropped while somebody is connected to it.
        scenario_db = getattr(context, 'db', None)
        if scenario_db is not None:
            scenario_db.close()

        if not self.keep_scenario_db:
            self.db_drop_database(self.test_db)

    def _reuse_or_drop_db(self, name):
        """ Check for the existence of the given DB. If reuse is enabled,
            then the function checks for existence and returns True if the
            database is already there. Otherwise an existing database is
            dropped and always false returned.
        """
        if self.reuse_template:
            conn = self.connect_database('postgres')
            try:
                with conn.cursor() as cur:
                    cur.execute('select count(*) from pg_database where datname = %s',
                                (name,))
                    if cur.fetchone()[0] == 1:
                        return True
            finally:
                conn.close()
        else:
            self.db_drop_database(name)

        return False

    def reindex_placex(self, db):
        """ Run the indexing step until all data in the placex has
            been processed. Indexing during updates can produce more data
            to index under some circumstances. That is why indexing may have
            to be run multiple times.
        """
        with db.cursor() as cur:
            while True:
                self.run_nominatim('index')

                cur.execute("SELECT 'a' FROM placex WHERE indexed_status != 0 LIMIT 1")
                if cur.rowcount == 0:
                    return

    def run_nominatim(self, *cmdline):
        """ Run the nominatim command-line tool via the library.

            `cmdline` contains the command-line arguments. When a project
            directory has been set up, it is added to the arguments.
        """
        if self.website_dir is not None:
            cmdline = list(cmdline) + ['--project-dir', self.website_dir.name]

        cli.nominatim(module_dir='',
                      osm2pgsql_path=str(self.build_dir / 'osm2pgsql' / 'osm2pgsql'),
                      phplib_dir=str(self.src_dir / 'lib-php'),
                      sqllib_dir=str(self.src_dir / 'lib-sql'),
                      data_dir=str(self.src_dir / 'data'),
                      config_dir=str(self.src_dir / 'settings'),
                      cli_args=cmdline,
                      # NOTE(review): left empty like module_dir - confirm
                      # that cli.nominatim() expects this argument.
                      phpcgi_path='',
                      environ=self.test_env)

    def copy_from_place(self, db):
        """ Copy data from place to the placex and location_property_osmline
            tables invoking the appropriate triggers.
        """
        self.run_nominatim('refresh', '--functions', '--no-diff-updates')

        # NOTE(review): the last two columns of the first statement
        # (extratags, geometry) were restored from the table layout known
        # from upstream - confirm.
        with db.cursor() as cur:
            # Everything except house number interpolation ways ...
            cur.execute("""INSERT INTO placex (osm_type, osm_id, class, type,
                                               name, admin_level, address,
                                               extratags, geometry)
                             SELECT osm_type, osm_id, class, type,
                                    name, admin_level, address,
                                    extratags, geometry
                               FROM place
                               WHERE not (class='place' and type='houses' and osm_type='W')""")
            # ... which go into their own table, as long as they are lines.
            cur.execute("""INSERT INTO location_property_osmline (osm_id, address, linegeo)
                             SELECT osm_id, address, geometry
                               FROM place
                              WHERE class='place' and type='houses'
                                    and osm_type='W'
                                    and ST_GeometryType(geometry) = 'ST_LineString'""")