3 from pathlib import Path
# Root of the source tree: the grandparent of the directory that contains
# this file. The path is kept unresolved here; uses that need an absolute
# path call .resolve() themselves.
SRC_DIR = Path(__file__) / '..' / '..' / '..'
# always test against the source
# (inserting at position 0 gives the source tree priority on the import path)
sys.path.insert(0, str(SRC_DIR.resolve()))
15 from nominatim.config import Configuration
16 from nominatim.db import connection
17 from nominatim.db.sql_preprocessor import SQLPreprocessor
class _TestingCursor(psycopg2.extras.DictCursor):
    """ Extension to the DictCursor class that provides execution
        short-cuts that simplify writing assertions.
    """

    def scalar(self, sql, params=None):
        """ Execute a query with a single return value and return this value.
            Raises an assertion when not exactly one row is returned.
        """
        self.execute(sql, params)
        assert self.rowcount == 1
        return self.fetchone()[0]

    def row_set(self, sql, params=None):
        """ Execute a query and return the result as a set of tuples.
        """
        self.execute(sql, params)

        return {tuple(row) for row in self}

    def table_exists(self, table):
        """ Check that a table with the given name exists in the database.
            Returns True when pg_tables lists at least one table of that
            name, False otherwise.
        """
        num = self.scalar("""SELECT count(*) FROM pg_tables
                             WHERE tablename = %s""", (table, ))
        # The count must be handed back to the caller, otherwise the check
        # always evaluates to None.
        return num > 0

    def table_rows(self, table):
        """ Return the number of rows in the given table.
        """
        # The table name is pasted into the statement verbatim; only use
        # this helper with names controlled by the test itself.
        return self.scalar('SELECT count(*) FROM ' + table)
def temp_db(monkeypatch):
    """ Create an empty database for the test. The database name is also
        exported into NOMINATIM_DATABASE_DSN.

        Yields the name of the database. When the generator is resumed,
        the database is dropped again.
    """
    name = 'test_nominatim_python_unittest'

    def _run_on_postgres(*statements):
        """ Run the statements on the 'postgres' maintenance database and
            always close the connection afterwards.
        """
        conn = psycopg2.connect(database='postgres')
        try:
            # Isolation level 0 puts the connection into auto-commit mode,
            # so the CREATE/DROP DATABASE statements are not wrapped in a
            # transaction.
            conn.set_isolation_level(0)
            with conn.cursor() as cur:
                for statement in statements:
                    cur.execute(statement)
        finally:
            conn.close()

    # Setup: start from a clean slate in case a previous run left the
    # database behind.
    _run_on_postgres('DROP DATABASE IF EXISTS {}'.format(name),
                     'CREATE DATABASE {}'.format(name))

    monkeypatch.setenv('NOMINATIM_DATABASE_DSN', 'dbname=' + name)

    # Dependants use the value as a database name ('dbname=' + temp_db).
    yield name

    # Teardown.
    _run_on_postgres('DROP DATABASE IF EXISTS {}'.format(name))
82 return 'dbname=' + temp_db
def temp_db_with_extensions(temp_db):
    """ Install the hstore and postgis extensions in the test database.

        Returns the name of the test database, so that the result can be
        used the same way as `temp_db`.
    """
    conn = psycopg2.connect(database=temp_db)
    try:
        with conn.cursor() as cur:
            cur.execute('CREATE EXTENSION hstore; CREATE EXTENSION postgis;')
        # This connection is not in auto-commit mode: without an explicit
        # commit the extensions would be rolled back when it goes away.
        conn.commit()
    finally:
        conn.close()

    return temp_db
def temp_db_conn(temp_db):
    """ Connection to the test database.

        Yields the connection object; leaving the generator exits the
        context manager returned by `connection.connect()`.
    """
    with connection.connect('dbname=' + temp_db) as conn:
        # Dependants call .cursor() and .commit() on the yielded value.
        yield conn
def temp_db_cursor(temp_db):
    """ Connection and cursor towards the test database. The connection will
        be in auto-commit mode.

        Yields a `_TestingCursor`. The underlying connection is closed
        once the generator is finished.
    """
    conn = psycopg2.connect('dbname=' + temp_db)
    try:
        conn.set_isolation_level(0)
        with conn.cursor(cursor_factory=_TestingCursor) as cur:
            yield cur
    finally:
        # Do not leave the connection open: an open session would block
        # dropping the test database later on.
        conn.close()
def table_factory(temp_db_cursor):
    """ Return a function that creates (and optionally fills) a table
        through the given cursor.
    """
    def mk_table(name, definition='id INT', content=None):
        """ Create table `name` with the column definition `definition`.

            `content` may either be a string, which is pasted between the
            parentheses of the VALUES clause as is, or an iterable with one
            item per row. Each item is converted with str() and becomes a
            single-value row. With `content=None` no rows are inserted.
        """
        temp_db_cursor.execute('CREATE TABLE {} ({})'.format(name, definition))
        if content is not None:
            if not isinstance(content, str):
                # Joining with '),(' turns (0, 1) into "0),(1", which the
                # statement below wraps into "VALUES (0),(1)".
                content = '),('.join([str(x) for x in content])
            temp_db_cursor.execute("INSERT INTO {} VALUES ({})".format(name, content))

    # Hand out the factory; without this the caller only ever gets None.
    return mk_table
129 cfg = Configuration(None, SRC_DIR.resolve() / 'settings')
130 cfg.set_libdirs(module='.', osm2pgsql='.',
131 php=SRC_DIR / 'lib-php',
132 sql=SRC_DIR / 'lib-sql',
133 data=SRC_DIR / 'data')
138 return SRC_DIR.resolve()
def tmp_phplib_dir():
    """ Provide a temporary directory that contains an empty 'admin'
        subdirectory.

        Yields the directory as a Path. The directory and its content are
        removed when the generator is finished.
    """
    with tempfile.TemporaryDirectory() as phpdir:
        phplib = Path(phpdir)
        (phplib / 'admin').mkdir()

        # The directory only exists inside the with-block, so it has to be
        # handed out from here.
        yield phplib
def property_table(table_factory):
    """ Create the nominatim_properties table without any rows, using
        the given table factory.
    """
    table_name = 'nominatim_properties'
    columns = 'property TEXT, value TEXT'
    table_factory(table_name, columns)
153 def status_table(temp_db_conn):
154 """ Create an empty version of the status table and
155 the status logging table.
157 with temp_db_conn.cursor() as cur:
158 cur.execute("""CREATE TABLE import_status (
159 lastimportdate timestamp with time zone NOT NULL,
163 cur.execute("""CREATE TABLE import_osmosis_log (
171 temp_db_conn.commit()
175 def place_table(temp_db_with_extensions, temp_db_conn):
176 """ Create an empty version of the place table.
178 with temp_db_conn.cursor() as cur:
179 cur.execute("""CREATE TABLE place (
180 osm_id int8 NOT NULL,
181 osm_type char(1) NOT NULL,
185 admin_level smallint,
188 geometry Geometry(Geometry,4326) NOT NULL)""")
189 temp_db_conn.commit()
def place_row(place_table, temp_db_cursor):
    """ A factory for rows in the place table. The table is created as a
        prerequisite to the fixture.

        Returns a function that inserts one row per call through
        `temp_db_cursor`.
    """
    # Source of OSM ids for rows that are created without an explicit id.
    idseq = itertools.count(1001)

    def _insert(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
                admin_level=None, address=None, extratags=None, geom=None):
        """ Insert a single row into the place table. A missing `osm_id`
            is replaced by the next number of a running sequence, a missing
            `geom` by a point at 0 0.
        """
        temp_db_cursor.execute("INSERT INTO place VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
                               (osm_id or next(idseq), osm_type, cls, typ, names,
                                admin_level, address, extratags,
                                geom or 'SRID=4326;POINT(0 0)'))

    # Hand out the factory; without this the caller only ever gets None.
    return _insert
208 def placex_table(temp_db_with_extensions, temp_db_conn):
209 """ Create an empty version of the place table.
211 with temp_db_conn.cursor() as cur:
212 cur.execute("""CREATE TABLE placex (
214 parent_place_id BIGINT,
215 linked_place_id BIGINT,
217 indexed_date TIMESTAMP,
218 geometry_sector INTEGER,
219 rank_address SMALLINT,
220 rank_search SMALLINT,
222 indexed_status SMALLINT,
228 admin_level smallint,
231 geometry Geometry(Geometry,4326),
233 country_code varchar(2),
236 centroid GEOMETRY(Geometry, 4326))""")
237 temp_db_conn.commit()
241 def osmline_table(temp_db_with_extensions, temp_db_conn):
242 with temp_db_conn.cursor() as cur:
243 cur.execute("""CREATE TABLE location_property_osmline (
246 parent_place_id BIGINT,
247 geometry_sector INTEGER,
248 indexed_date TIMESTAMP,
252 indexed_status SMALLINT,
254 interpolationtype TEXT,
257 country_code VARCHAR(2))""")
258 temp_db_conn.commit()
262 def word_table(temp_db, temp_db_conn):
263 with temp_db_conn.cursor() as cur:
264 cur.execute("""CREATE TABLE word (
270 country_code varchar(2),
271 search_name_count INTEGER,
273 temp_db_conn.commit()
def osm2pgsql_options(temp_db):
    """ Build an options dictionary for osm2pgsql that points to the
        test database. The 'osm2pgsql' entry is set to 'echo' and all
        tablespace entries are empty strings.
    """
    tablespace_keys = ('slim_data', 'slim_index', 'main_data', 'main_index')
    tablespaces = {key: '' for key in tablespace_keys}

    return {
        'osm2pgsql': 'echo',
        'osm2pgsql_style': 'style.file',
        'dsn': 'dbname=' + temp_db,
        'tablespaces': tablespaces,
    }
def sql_preprocessor(temp_db_conn, tmp_path, monkeypatch, table_factory):
    """ Set up a SQLPreprocessor on the test database connection.

        Before the preprocessor is created, NOMINATIM_DATABASE_MODULE_PATH
        is set to '.' and a country_name table with the partitions 0, 1
        and 2 is created. The configuration takes its SQL files from
        `tmp_path`.
    """
    monkeypatch.setenv('NOMINATIM_DATABASE_MODULE_PATH', '.')

    partitions = (0, 1, 2)
    table_factory('country_name', 'partition INT', partitions)

    settings_dir = SRC_DIR.resolve() / 'settings'
    config = Configuration(None, settings_dir)
    config.set_libdirs(module='.',
                       osm2pgsql='.',
                       php=SRC_DIR / 'lib-php',
                       sql=tmp_path,
                       data=SRC_DIR / 'data')

    preprocessor = SQLPreprocessor(temp_db_conn, config)
    return preprocessor