3 from pathlib import Path
# Location of the source tree, expressed as this file's path followed by
# three parent steps. The path is kept unresolved here; users call
# resolve() where an absolute path is needed.
SRC_DIR = Path(__file__).joinpath('..', '..', '..')

# always test against the source, so it goes first on the module search path
sys.path.insert(0, str(SRC_DIR.resolve()))
15 from nominatim.config import Configuration
16 from nominatim.db import connection
17 from nominatim.db.sql_preprocessor import SQLPreprocessor
class _TestingCursor(psycopg2.extras.DictCursor):
    """ Extension to the DictCursor class that provides execution
        short-cuts that simplify writing assertions.
    """

    def scalar(self, sql, params=None):
        """ Execute a query with a single return value and return this value.
            Raises an assertion when not exactly one row is returned.
        """
        self.execute(sql, params)
        assert self.rowcount == 1
        return self.fetchone()[0]

    def row_set(self, sql, params=None):
        """ Execute a query and return the result as a set of tuples.
        """
        self.execute(sql, params)
        # Rows are converted to plain tuples so that they are hashable.
        return {tuple(row) for row in self}

    def table_exists(self, table):
        """ Check that a table with the given name exists in the database.
            Returns True when pg_tables lists exactly one table of that name.
        """
        num = self.scalar("""SELECT count(*) FROM pg_tables
                             WHERE tablename = %s""", (table, ))
        return num == 1

    def table_rows(self, table):
        """ Return the number of rows in the given table.
        """
        # The table name is pasted into the statement verbatim, so it must
        # be a trusted identifier supplied by the test itself.
        return self.scalar('SELECT count(*) FROM ' + table)
def temp_db(monkeypatch):
    """ Create an empty database for the test. The database name is also
        exported into NOMINATIM_DATABASE_DSN.

        Yields the name of the database. When the generator is resumed
        after the test, the database is dropped again.
    """
    name = 'test_nominatim_python_unittest'

    def _run_on_postgres_db(*statements):
        # Database creation and removal are issued from the 'postgres'
        # database. Isolation level 0 puts the connection in auto-commit
        # mode, which CREATE/DROP DATABASE need because they cannot run
        # inside a transaction block.
        conn = psycopg2.connect(database='postgres')
        try:
            conn.set_isolation_level(0)
            with conn.cursor() as cur:
                for sql in statements:
                    cur.execute(sql)
        finally:
            # Always release the connection, also when a statement fails.
            conn.close()

    # Start from a clean slate in case an earlier run left the database behind.
    _run_on_postgres_db('DROP DATABASE IF EXISTS {}'.format(name),
                        'CREATE DATABASE {}'.format(name))

    monkeypatch.setenv('NOMINATIM_DATABASE_DSN', 'dbname=' + name)

    yield name

    _run_on_postgres_db('DROP DATABASE IF EXISTS {}'.format(name))
82 return 'dbname=' + temp_db
def temp_db_with_extensions(temp_db):
    """ Install the hstore and postgis extensions in the test database.
        Returns the name of the database that was passed in.
    """
    conn = psycopg2.connect(database=temp_db)
    try:
        with conn.cursor() as cur:
            cur.execute('CREATE EXTENSION hstore; CREATE EXTENSION postgis;')
        # The connection is not in auto-commit mode, so the extensions only
        # become permanent once the transaction is committed.
        conn.commit()
    finally:
        conn.close()

    return temp_db
def temp_db_conn(temp_db):
    """ Connection to the test database.

        Yields the connection object obtained from connection.connect().
        The surrounding context manager is left when the generator is
        resumed after the test.
    """
    with connection.connect('dbname=' + temp_db) as conn:
        yield conn
def temp_db_cursor(temp_db):
    """ Connection and cursor towards the test database. The connection will
        be in auto-commit mode.

        Yields a _TestingCursor. Cursor and connection are closed when the
        generator is resumed after the test.
    """
    conn = psycopg2.connect('dbname=' + temp_db)
    try:
        conn.set_isolation_level(0)
        with conn.cursor(cursor_factory=_TestingCursor) as cur:
            yield cur
    finally:
        # Close the connection even when the test or the setup failed.
        conn.close()
def table_factory(temp_db_cursor):
    """ Return a function that creates a table through the given cursor.

        The returned function takes the table name, the column definition
        (default: a single 'id INT' column) and optional content. Content
        may be a ready-made string that is placed between the parentheses
        of the VALUES clause, or an iterable of values that are inserted
        as one single-column row each.
    """
    def mk_table(name, definition='id INT', content=None):
        temp_db_cursor.execute('CREATE TABLE {} ({})'.format(name, definition))
        if content is not None:
            if not isinstance(content, str):
                # (0, 1, 2) becomes '0),(1),(2', which the surrounding
                # parentheses of the statement complete to (0),(1),(2).
                content = '),('.join(str(x) for x in content)
            temp_db_cursor.execute("INSERT INTO {} VALUES ({})".format(name, content))

    return mk_table
129 cfg = Configuration(None, SRC_DIR.resolve() / 'settings')
130 cfg.set_libdirs(module='.', osm2pgsql='.',
131 php=SRC_DIR / 'lib-php',
132 sql=SRC_DIR / 'lib-sql',
133 data=SRC_DIR / 'data')
138 return SRC_DIR.resolve()
def tmp_phplib_dir():
    """ Create a temporary directory that contains an empty 'admin'
        subdirectory and yield its path.

        The directory and its content are removed when the generator is
        resumed.
    """
    with tempfile.TemporaryDirectory() as phpdir:
        phplib = Path(phpdir)
        (phplib / 'admin').mkdir()

        yield phplib
148 def status_table(temp_db_conn):
149 """ Create an empty version of the status table and
150 the status logging table.
152 with temp_db_conn.cursor() as cur:
153 cur.execute("""CREATE TABLE import_status (
154 lastimportdate timestamp with time zone NOT NULL,
158 cur.execute("""CREATE TABLE import_osmosis_log (
166 temp_db_conn.commit()
170 def place_table(temp_db_with_extensions, temp_db_conn):
171 """ Create an empty version of the place table.
173 with temp_db_conn.cursor() as cur:
174 cur.execute("""CREATE TABLE place (
175 osm_id int8 NOT NULL,
176 osm_type char(1) NOT NULL,
180 admin_level smallint,
183 geometry Geometry(Geometry,4326) NOT NULL)""")
184 temp_db_conn.commit()
def place_row(place_table, temp_db_cursor):
    """ A factory for rows in the place table. The table is created as a
        prerequisite to the fixture.

        Returns a function that inserts one row per call. Rows without an
        explicit osm_id receive consecutive ids starting at 1001; rows
        without a geometry are placed at 'SRID=4326;POINT(0 0)'.
    """
    idseq = itertools.count(1001)

    def _insert(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
                admin_level=None, address=None, extratags=None, geom=None):
        temp_db_cursor.execute("INSERT INTO place VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
                               (osm_id or next(idseq), osm_type, cls, typ, names,
                                admin_level, address, extratags,
                                geom or 'SRID=4326;POINT(0 0)'))

    return _insert
203 def placex_table(temp_db_with_extensions, temp_db_conn):
    """ Create an empty version of the placex table.
206 with temp_db_conn.cursor() as cur:
207 cur.execute("""CREATE TABLE placex (
209 parent_place_id BIGINT,
210 linked_place_id BIGINT,
212 indexed_date TIMESTAMP,
213 geometry_sector INTEGER,
214 rank_address SMALLINT,
215 rank_search SMALLINT,
217 indexed_status SMALLINT,
223 admin_level smallint,
226 geometry Geometry(Geometry,4326),
228 country_code varchar(2),
231 centroid GEOMETRY(Geometry, 4326))""")
232 temp_db_conn.commit()
236 def osmline_table(temp_db_with_extensions, temp_db_conn):
237 with temp_db_conn.cursor() as cur:
238 cur.execute("""CREATE TABLE location_property_osmline (
241 parent_place_id BIGINT,
242 geometry_sector INTEGER,
243 indexed_date TIMESTAMP,
247 indexed_status SMALLINT,
249 interpolationtype TEXT,
252 country_code VARCHAR(2))""")
253 temp_db_conn.commit()
257 def word_table(temp_db, temp_db_conn):
258 with temp_db_conn.cursor() as cur:
259 cur.execute("""CREATE TABLE word (
265 country_code varchar(2),
266 search_name_count INTEGER,
268 temp_db_conn.commit()
272 def osm2pgsql_options(temp_db):
273 return dict(osm2pgsql='echo',
275 osm2pgsql_style='style.file',
277 dsn='dbname=' + temp_db,
279 tablespaces=dict(slim_data='', slim_index='',
280 main_data='', main_index=''))
def sql_preprocessor(temp_db_conn, tmp_path, monkeypatch, table_factory):
    """ Return an SQLPreprocessor that works on the test database.

        Before the preprocessor is built, NOMINATIM_DATABASE_MODULE_PATH is
        set to '.' and a 'country_name' table with the partitions 0, 1
        and 2 is created. The configuration takes its SQL files from
        tmp_path; all other directories point into the source tree.
    """
    monkeypatch.setenv('NOMINATIM_DATABASE_MODULE_PATH', '.')
    table_factory('country_name', 'partition INT', (0, 1, 2))

    settings_dir = SRC_DIR.resolve() / 'settings'
    config = Configuration(None, settings_dir)

    libdirs = dict(module='.',
                   osm2pgsql='.',
                   php=SRC_DIR / 'lib-php',
                   sql=tmp_path,
                   data=SRC_DIR / 'data')
    config.set_libdirs(**libdirs)

    return SQLPreprocessor(temp_db_conn, config)