3 from pathlib import Path
10 SRC_DIR = Path(__file__) / '..' / '..' / '..'
12 # always test against the source
13 sys.path.insert(0, str(SRC_DIR.resolve()))
15 from nominatim.config import Configuration
16 from nominatim.db import connection
17 from nominatim.db.sql_preprocessor import SQLPreprocessor
19 class _TestingCursor(psycopg2.extras.DictCursor):
20 """ Extension to the DictCursor class that provides execution
21 short-cuts that simplify writing assertions.
24 def scalar(self, sql, params=None):
25 """ Execute a query with a single return value and return this value.
26 Raises an assertion when not exactly one row is returned.
28 self.execute(sql, params)
29 assert self.rowcount == 1
30 return self.fetchone()[0]
32 def row_set(self, sql, params=None):
33 """ Execute a query and return the result as a set of tuples.
35 self.execute(sql, params)
36 if self.rowcount == 1:
37 return set(tuple(self.fetchone()))
39 return set((tuple(row) for row in self))
41 def table_exists(self, table):
42 """ Check that a table with the given name exists in the database.
44 num = self.scalar("""SELECT count(*) FROM pg_tables
45 WHERE tablename = %s""", (table, ))
48 def table_rows(self, table):
49 """ Return the number of rows in the given table.
51 return self.scalar('SELECT count(*) FROM ' + table)
55 def temp_db(monkeypatch):
56 """ Create an empty database for the test. The database name is also
57 exported into NOMINATIM_DATABASE_DSN.
59 name = 'test_nominatim_python_unittest'
60 conn = psycopg2.connect(database='postgres')
62 conn.set_isolation_level(0)
63 with conn.cursor() as cur:
64 cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
65 cur.execute('CREATE DATABASE {}'.format(name))
69 monkeypatch.setenv('NOMINATIM_DATABASE_DSN' , 'dbname=' + name)
73 conn = psycopg2.connect(database='postgres')
75 conn.set_isolation_level(0)
76 with conn.cursor() as cur:
77 cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
84 return 'dbname=' + temp_db
88 def temp_db_with_extensions(temp_db):
89 conn = psycopg2.connect(database=temp_db)
90 with conn.cursor() as cur:
91 cur.execute('CREATE EXTENSION hstore; CREATE EXTENSION postgis;')
98 def temp_db_conn(temp_db):
99 """ Connection to the test database.
101 with connection.connect('dbname=' + temp_db) as conn:
106 def temp_db_cursor(temp_db):
107 """ Connection and cursor towards the test database. The connection will
108 be in auto-commit mode.
110 conn = psycopg2.connect('dbname=' + temp_db)
111 conn.set_isolation_level(0)
112 with conn.cursor(cursor_factory=_TestingCursor) as cur:
118 def table_factory(temp_db_cursor):
119 def mk_table(name, definition='id INT', content=None):
120 temp_db_cursor.execute('CREATE TABLE {} ({})'.format(name, definition))
121 if content is not None:
122 if not isinstance(content, str):
123 content = '),('.join([str(x) for x in content])
124 temp_db_cursor.execute("INSERT INTO {} VALUES ({})".format(name, content))
131 return Configuration(None, SRC_DIR.resolve() / 'settings')
135 return SRC_DIR.resolve()
138 def tmp_phplib_dir():
139 with tempfile.TemporaryDirectory() as phpdir:
140 (Path(phpdir) / 'admin').mkdir()
145 def status_table(temp_db_conn):
146 """ Create an empty version of the status table and
147 the status logging table.
149 with temp_db_conn.cursor() as cur:
150 cur.execute("""CREATE TABLE import_status (
151 lastimportdate timestamp with time zone NOT NULL,
155 cur.execute("""CREATE TABLE import_osmosis_log (
163 temp_db_conn.commit()
167 def place_table(temp_db_with_extensions, temp_db_conn):
168 """ Create an empty version of the place table.
170 with temp_db_conn.cursor() as cur:
171 cur.execute("""CREATE TABLE place (
172 osm_id int8 NOT NULL,
173 osm_type char(1) NOT NULL,
177 admin_level smallint,
180 geometry Geometry(Geometry,4326) NOT NULL)""")
181 temp_db_conn.commit()
185 def place_row(place_table, temp_db_cursor):
186 """ A factory for rows in the place table. The table is created as a
187 prerequisite to the fixture.
189 idseq = itertools.count(1001)
190 def _insert(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
191 admin_level=None, address=None, extratags=None, geom=None):
192 temp_db_cursor.execute("INSERT INTO place VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
193 (osm_id or next(idseq), osm_type, cls, typ, names,
194 admin_level, address, extratags,
195 geom or 'SRID=4326;POINT(0 0)'))
200 def placex_table(temp_db_with_extensions, temp_db_conn):
201 """ Create an empty version of the place table.
203 with temp_db_conn.cursor() as cur:
204 cur.execute("""CREATE TABLE placex (
206 parent_place_id BIGINT,
207 linked_place_id BIGINT,
209 indexed_date TIMESTAMP,
210 geometry_sector INTEGER,
211 rank_address SMALLINT,
212 rank_search SMALLINT,
214 indexed_status SMALLINT,
220 admin_level smallint,
223 geometry Geometry(Geometry,4326),
225 country_code varchar(2),
228 centroid GEOMETRY(Geometry, 4326))""")
229 temp_db_conn.commit()
233 def osmline_table(temp_db_with_extensions, temp_db_conn):
234 with temp_db_conn.cursor() as cur:
235 cur.execute("""CREATE TABLE location_property_osmline (
238 parent_place_id BIGINT,
239 geometry_sector INTEGER,
240 indexed_date TIMESTAMP,
244 indexed_status SMALLINT,
246 interpolationtype TEXT,
249 country_code VARCHAR(2))""")
250 temp_db_conn.commit()
254 def word_table(temp_db, temp_db_conn):
255 with temp_db_conn.cursor() as cur:
256 cur.execute("""CREATE TABLE word (
262 country_code varchar(2),
263 search_name_count INTEGER,
265 temp_db_conn.commit()
269 def osm2pgsql_options(temp_db):
270 return dict(osm2pgsql='echo',
272 osm2pgsql_style='style.file',
274 dsn='dbname=' + temp_db,
276 tablespaces=dict(slim_data='', slim_index='',
277 main_data='', main_index=''))
280 def sql_preprocessor(temp_db_conn, tmp_path, def_config, monkeypatch, table_factory):
281 monkeypatch.setenv('NOMINATIM_DATABASE_MODULE_PATH', '.')
282 table_factory('country_name', 'partition INT', (0, 1, 2))
283 return SQLPreprocessor(temp_db_conn, def_config, tmp_path)