3 from pathlib import Path
9 SRC_DIR = Path(__file__) / '..' / '..' / '..'
11 # always test against the source
12 sys.path.insert(0, str(SRC_DIR.resolve()))
14 from nominatim.config import Configuration
15 from nominatim.db import connection
16 from nominatim.db.sql_preprocessor import SQLPreprocessor
18 class _TestingCursor(psycopg2.extras.DictCursor):
19 """ Extension to the DictCursor class that provides execution
20 short-cuts that simplify writing assertions.
23 def scalar(self, sql, params=None):
24 """ Execute a query with a single return value and return this value.
25 Raises an assertion when not exactly one row is returned.
27 self.execute(sql, params)
28 assert self.rowcount == 1
29 return self.fetchone()[0]
31 def row_set(self, sql, params=None):
32 """ Execute a query and return the result as a set of tuples.
34 self.execute(sql, params)
35 if self.rowcount == 1:
36 return set(tuple(self.fetchone()))
38 return set((tuple(row) for row in self))
40 def table_exists(self, table):
41 """ Check that a table with the given name exists in the database.
43 num = self.scalar("""SELECT count(*) FROM pg_tables
44 WHERE tablename = %s""", (table, ))
47 def table_rows(self, table):
48 """ Return the number of rows in the given table.
50 return self.scalar('SELECT count(*) FROM ' + table)
54 def temp_db(monkeypatch):
55 """ Create an empty database for the test. The database name is also
56 exported into NOMINATIM_DATABASE_DSN.
58 name = 'test_nominatim_python_unittest'
59 conn = psycopg2.connect(database='postgres')
61 conn.set_isolation_level(0)
62 with conn.cursor() as cur:
63 cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
64 cur.execute('CREATE DATABASE {}'.format(name))
68 monkeypatch.setenv('NOMINATIM_DATABASE_DSN' , 'dbname=' + name)
72 conn = psycopg2.connect(database='postgres')
74 conn.set_isolation_level(0)
75 with conn.cursor() as cur:
76 cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
83 return 'dbname=' + temp_db
87 def temp_db_with_extensions(temp_db):
88 conn = psycopg2.connect(database=temp_db)
89 with conn.cursor() as cur:
90 cur.execute('CREATE EXTENSION hstore; CREATE EXTENSION postgis;')
97 def temp_db_conn(temp_db):
98 """ Connection to the test database.
100 with connection.connect('dbname=' + temp_db) as conn:
105 def temp_db_cursor(temp_db):
106 """ Connection and cursor towards the test database. The connection will
107 be in auto-commit mode.
109 conn = psycopg2.connect('dbname=' + temp_db)
110 conn.set_isolation_level(0)
111 with conn.cursor(cursor_factory=_TestingCursor) as cur:
117 def table_factory(temp_db_cursor):
118 def mk_table(name, definition='id INT', content=None):
119 temp_db_cursor.execute('CREATE TABLE {} ({})'.format(name, definition))
120 if content is not None:
121 if not isinstance(content, str):
122 content = '),('.join([str(x) for x in content])
123 temp_db_cursor.execute("INSERT INTO {} VALUES ({})".format(name, content))
130 return Configuration(None, SRC_DIR.resolve() / 'settings')
134 return SRC_DIR.resolve()
137 def status_table(temp_db_conn):
138 """ Create an empty version of the status table and
139 the status logging table.
141 with temp_db_conn.cursor() as cur:
142 cur.execute("""CREATE TABLE import_status (
143 lastimportdate timestamp with time zone NOT NULL,
147 cur.execute("""CREATE TABLE import_osmosis_log (
155 temp_db_conn.commit()
159 def place_table(temp_db_with_extensions, temp_db_conn):
160 """ Create an empty version of the place table.
162 with temp_db_conn.cursor() as cur:
163 cur.execute("""CREATE TABLE place (
164 osm_id int8 NOT NULL,
165 osm_type char(1) NOT NULL,
169 admin_level smallint,
172 geometry Geometry(Geometry,4326) NOT NULL)""")
173 temp_db_conn.commit()
177 def place_row(place_table, temp_db_cursor):
178 """ A factory for rows in the place table. The table is created as a
179 prerequisite to the fixture.
181 idseq = itertools.count(1001)
182 def _insert(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
183 admin_level=None, address=None, extratags=None, geom=None):
184 temp_db_cursor.execute("INSERT INTO place VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
185 (osm_id or next(idseq), osm_type, cls, typ, names,
186 admin_level, address, extratags,
187 geom or 'SRID=4326;POINT(0 0)'))
192 def placex_table(temp_db_with_extensions, temp_db_conn):
193 """ Create an empty version of the place table.
195 with temp_db_conn.cursor() as cur:
196 cur.execute("""CREATE TABLE placex (
198 parent_place_id BIGINT,
199 linked_place_id BIGINT,
201 indexed_date TIMESTAMP,
202 geometry_sector INTEGER,
203 rank_address SMALLINT,
204 rank_search SMALLINT,
206 indexed_status SMALLINT,
212 admin_level smallint,
215 geometry Geometry(Geometry,4326),
217 country_code varchar(2),
220 centroid GEOMETRY(Geometry, 4326))""")
221 temp_db_conn.commit()
225 def osmline_table(temp_db_with_extensions, temp_db_conn):
226 with temp_db_conn.cursor() as cur:
227 cur.execute("""CREATE TABLE location_property_osmline (
230 parent_place_id BIGINT,
231 geometry_sector INTEGER,
232 indexed_date TIMESTAMP,
236 indexed_status SMALLINT,
238 interpolationtype TEXT,
241 country_code VARCHAR(2))""")
242 temp_db_conn.commit()
246 def word_table(temp_db, temp_db_conn):
247 with temp_db_conn.cursor() as cur:
248 cur.execute("""CREATE TABLE word (
254 country_code varchar(2),
255 search_name_count INTEGER,
257 temp_db_conn.commit()
261 def osm2pgsql_options(temp_db):
262 return dict(osm2pgsql='echo',
264 osm2pgsql_style='style.file',
266 dsn='dbname=' + temp_db,
268 tablespaces=dict(slim_data='', slim_index='',
269 main_data='', main_index=''))
272 def sql_preprocessor(temp_db_conn, tmp_path, def_config, monkeypatch, table_factory):
273 monkeypatch.setenv('NOMINATIM_DATABASE_MODULE_PATH', '.')
274 table_factory('country_name', 'partition INT', (0, 1, 2))
275 return SQLPreprocessor(temp_db_conn, def_config, tmp_path)