3 from pathlib import Path
10 SRC_DIR = Path(__file__) / '..' / '..' / '..'
12 # always test against the source
13 sys.path.insert(0, str(SRC_DIR.resolve()))
15 from nominatim.config import Configuration
16 from nominatim.db import connection
17 from nominatim.db.sql_preprocessor import SQLPreprocessor
19 class _TestingCursor(psycopg2.extras.DictCursor):
20 """ Extension to the DictCursor class that provides execution
21 short-cuts that simplify writing assertions.
24 def scalar(self, sql, params=None):
25 """ Execute a query with a single return value and return this value.
26 Raises an assertion when not exactly one row is returned.
28 self.execute(sql, params)
29 assert self.rowcount == 1
30 return self.fetchone()[0]
32 def row_set(self, sql, params=None):
33 """ Execute a query and return the result as a set of tuples.
35 self.execute(sql, params)
37 return set((tuple(row) for row in self))
39 def table_exists(self, table):
40 """ Check that a table with the given name exists in the database.
42 num = self.scalar("""SELECT count(*) FROM pg_tables
43 WHERE tablename = %s""", (table, ))
46 def table_rows(self, table):
47 """ Return the number of rows in the given table.
49 return self.scalar('SELECT count(*) FROM ' + table)
53 def temp_db(monkeypatch):
54 """ Create an empty database for the test. The database name is also
55 exported into NOMINATIM_DATABASE_DSN.
57 name = 'test_nominatim_python_unittest'
58 conn = psycopg2.connect(database='postgres')
60 conn.set_isolation_level(0)
61 with conn.cursor() as cur:
62 cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
63 cur.execute('CREATE DATABASE {}'.format(name))
67 monkeypatch.setenv('NOMINATIM_DATABASE_DSN' , 'dbname=' + name)
71 conn = psycopg2.connect(database='postgres')
73 conn.set_isolation_level(0)
74 with conn.cursor() as cur:
75 cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
82 return 'dbname=' + temp_db
86 def temp_db_with_extensions(temp_db):
87 conn = psycopg2.connect(database=temp_db)
88 with conn.cursor() as cur:
89 cur.execute('CREATE EXTENSION hstore; CREATE EXTENSION postgis;')
96 def temp_db_conn(temp_db):
97 """ Connection to the test database.
99 with connection.connect('dbname=' + temp_db) as conn:
104 def temp_db_cursor(temp_db):
105 """ Connection and cursor towards the test database. The connection will
106 be in auto-commit mode.
108 conn = psycopg2.connect('dbname=' + temp_db)
109 conn.set_isolation_level(0)
110 with conn.cursor(cursor_factory=_TestingCursor) as cur:
116 def table_factory(temp_db_cursor):
117 def mk_table(name, definition='id INT', content=None):
118 temp_db_cursor.execute('CREATE TABLE {} ({})'.format(name, definition))
119 if content is not None:
120 if not isinstance(content, str):
121 content = '),('.join([str(x) for x in content])
122 temp_db_cursor.execute("INSERT INTO {} VALUES ({})".format(name, content))
129 return Configuration(None, SRC_DIR.resolve() / 'settings')
133 return SRC_DIR.resolve()
136 def tmp_phplib_dir():
137 with tempfile.TemporaryDirectory() as phpdir:
138 (Path(phpdir) / 'admin').mkdir()
143 def status_table(temp_db_conn):
144 """ Create an empty version of the status table and
145 the status logging table.
147 with temp_db_conn.cursor() as cur:
148 cur.execute("""CREATE TABLE import_status (
149 lastimportdate timestamp with time zone NOT NULL,
153 cur.execute("""CREATE TABLE import_osmosis_log (
161 temp_db_conn.commit()
165 def place_table(temp_db_with_extensions, temp_db_conn):
166 """ Create an empty version of the place table.
168 with temp_db_conn.cursor() as cur:
169 cur.execute("""CREATE TABLE place (
170 osm_id int8 NOT NULL,
171 osm_type char(1) NOT NULL,
175 admin_level smallint,
178 geometry Geometry(Geometry,4326) NOT NULL)""")
179 temp_db_conn.commit()
183 def place_row(place_table, temp_db_cursor):
184 """ A factory for rows in the place table. The table is created as a
185 prerequisite to the fixture.
187 idseq = itertools.count(1001)
188 def _insert(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
189 admin_level=None, address=None, extratags=None, geom=None):
190 temp_db_cursor.execute("INSERT INTO place VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
191 (osm_id or next(idseq), osm_type, cls, typ, names,
192 admin_level, address, extratags,
193 geom or 'SRID=4326;POINT(0 0)'))
198 def placex_table(temp_db_with_extensions, temp_db_conn):
199 """ Create an empty version of the place table.
201 with temp_db_conn.cursor() as cur:
202 cur.execute("""CREATE TABLE placex (
204 parent_place_id BIGINT,
205 linked_place_id BIGINT,
207 indexed_date TIMESTAMP,
208 geometry_sector INTEGER,
209 rank_address SMALLINT,
210 rank_search SMALLINT,
212 indexed_status SMALLINT,
218 admin_level smallint,
221 geometry Geometry(Geometry,4326),
223 country_code varchar(2),
226 centroid GEOMETRY(Geometry, 4326))""")
227 temp_db_conn.commit()
231 def osmline_table(temp_db_with_extensions, temp_db_conn):
232 with temp_db_conn.cursor() as cur:
233 cur.execute("""CREATE TABLE location_property_osmline (
236 parent_place_id BIGINT,
237 geometry_sector INTEGER,
238 indexed_date TIMESTAMP,
242 indexed_status SMALLINT,
244 interpolationtype TEXT,
247 country_code VARCHAR(2))""")
248 temp_db_conn.commit()
252 def word_table(temp_db, temp_db_conn):
253 with temp_db_conn.cursor() as cur:
254 cur.execute("""CREATE TABLE word (
260 country_code varchar(2),
261 search_name_count INTEGER,
263 temp_db_conn.commit()
267 def osm2pgsql_options(temp_db):
268 return dict(osm2pgsql='echo',
270 osm2pgsql_style='style.file',
272 dsn='dbname=' + temp_db,
274 tablespaces=dict(slim_data='', slim_index='',
275 main_data='', main_index=''))
278 def sql_preprocessor(temp_db_conn, tmp_path, def_config, monkeypatch, table_factory):
279 monkeypatch.setenv('NOMINATIM_DATABASE_MODULE_PATH', '.')
280 table_factory('country_name', 'partition INT', (0, 1, 2))
281 return SQLPreprocessor(temp_db_conn, def_config, tmp_path)