4 from pathlib import Path
11 SRC_DIR = Path(__file__) / '..' / '..' / '..'
13 # always test against the source
14 sys.path.insert(0, str(SRC_DIR.resolve()))
16 from nominatim.config import Configuration
17 from nominatim.db import connection
18 from nominatim.db.sql_preprocessor import SQLPreprocessor
19 from nominatim.db import properties
21 import dummy_tokenizer
class _TestingCursor(psycopg2.extras.DictCursor):
    """ Extension to the DictCursor class that provides execution
        short-cuts that simplify writing assertions.
    """

    def scalar(self, sql, params=None):
        """ Execute a query with a single return value and return this value.
            Raises an assertion when not exactly one row is returned.
        """
        self.execute(sql, params)
        assert self.rowcount == 1
        return self.fetchone()[0]

    def row_set(self, sql, params=None):
        """ Execute a query and return the result as a set of tuples.
        """
        self.execute(sql, params)

        return {tuple(row) for row in self}

    def table_exists(self, table):
        """ Check that a table with the given name exists in the database.
            Returns True when pg_tables lists exactly one table with
            that name and False otherwise.
        """
        num = self.scalar("""SELECT count(*) FROM pg_tables
                             WHERE tablename = %s""", (table, ))
        # Hand the verdict back to the caller; without it the check
        # would always evaluate to None.
        return num == 1

    def table_rows(self, table):
        """ Return the number of rows in the given table.
        """
        # The table name is spliced into the statement as-is, so it has
        # to be an identifier chosen by the test itself.
        return self.scalar('SELECT count(*) FROM ' + table)
def temp_db(monkeypatch):
    """ Create an empty database for the test. The database name is also
        exported into NOMINATIM_DATABASE_DSN.

        Written as a generator: the name of the database is yielded once
        the database exists and the database is dropped again when the
        generator is resumed after the test.
    """
    name = 'test_nominatim_python_unittest'

    conn = psycopg2.connect(database='postgres')
    try:
        # Isolation level 0 puts the connection into auto-commit mode.
        conn.set_isolation_level(0)
        with conn.cursor() as cur:
            cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
            cur.execute('CREATE DATABASE {}'.format(name))
    finally:
        # Do not keep a connection to the maintenance database open
        # while the test is running.
        conn.close()

    monkeypatch.setenv('NOMINATIM_DATABASE_DSN', 'dbname=' + name)

    yield name

    # Teardown: remove the test database again.
    conn = psycopg2.connect(database='postgres')
    try:
        conn.set_isolation_level(0)
        with conn.cursor() as cur:
            cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
    finally:
        conn.close()
86 return 'dbname=' + temp_db
def temp_db_with_extensions(temp_db):
    """ Install the hstore and postgis extensions in the test database.

        `temp_db` is the name of the test database. The same name is
        returned so that the result can be used like `temp_db` itself.
    """
    conn = psycopg2.connect(database=temp_db)
    try:
        with conn.cursor() as cur:
            cur.execute('CREATE EXTENSION hstore; CREATE EXTENSION postgis;')
        # Leaving the cursor block does not end the transaction; without
        # an explicit commit the extensions would be rolled back when the
        # connection goes away.
        conn.commit()
    finally:
        conn.close()

    return temp_db
def temp_db_conn(temp_db):
    """ Connection to the test database.

        The connection is opened with nominatim's own `connection.connect()`
        and yielded from inside its context manager, so the context is
        left again when the generator is resumed after the test.
    """
    with connection.connect('dbname=' + temp_db) as conn:
        yield conn
def temp_db_cursor(temp_db):
    """ Connection and cursor towards the test database. The connection will
        be in auto-commit mode.

        Yields a `_TestingCursor`. Cursor and connection are closed when
        the generator is resumed after the test.
    """
    conn = psycopg2.connect('dbname=' + temp_db)
    try:
        conn.set_isolation_level(0)
        with conn.cursor(cursor_factory=_TestingCursor) as cur:
            yield cur
    finally:
        # The cursor block only closes the cursor, the connection needs
        # to be released separately.
        conn.close()
def table_factory(temp_db_cursor):
    """ Return a function that creates tables in the test database.

        The returned function takes the `name` of the table, the column
        `definition` (a single integer column `id` by default) and
        optionally `content`, a sequence of row tuples that is inserted
        into the new table. All statements run on `temp_db_cursor`.
    """
    def mk_table(name, definition='id INT', content=None):
        temp_db_cursor.execute('CREATE TABLE {} ({})'.format(name, definition))
        if content is not None:
            psycopg2.extras.execute_values(
                temp_db_cursor, "INSERT INTO {} VALUES %s".format(name), content)

    return mk_table
132 cfg = Configuration(None, SRC_DIR.resolve() / 'settings')
133 cfg.set_libdirs(module='.', osm2pgsql='.',
134 php=SRC_DIR / 'lib-php',
135 sql=SRC_DIR / 'lib-sql',
136 data=SRC_DIR / 'data')
141 return SRC_DIR.resolve()
def tmp_phplib_dir():
    """ Provide a temporary directory that contains an empty
        subdirectory `admin`.

        Yields the directory as a Path. The directory and its content
        are removed when the generator is resumed or closed.
    """
    with tempfile.TemporaryDirectory() as phpdir:
        (Path(phpdir) / 'admin').mkdir()

        # Hand the directory out while it still exists; leaving the
        # with-block deletes it.
        yield Path(phpdir)
def property_table(table_factory):
    """ Create an empty table `nominatim_properties` with the two text
        columns `property` and `value`.
    """
    columns = 'property TEXT, value TEXT'
    table_factory('nominatim_properties', columns)
156 def status_table(temp_db_conn):
157 """ Create an empty version of the status table and
158 the status logging table.
160 with temp_db_conn.cursor() as cur:
161 cur.execute("""CREATE TABLE import_status (
162 lastimportdate timestamp with time zone NOT NULL,
166 cur.execute("""CREATE TABLE import_osmosis_log (
174 temp_db_conn.commit()
def place_table(temp_db_with_extensions, temp_db_conn):
    """ Create an empty version of the place table.

        The table is created through a cursor of `temp_db_conn` and the
        statement is committed on that connection.
    """
    # NOTE(review): only part of the column list is visible in this view
    # (place_row inserts nine values per row) - confirm the definition
    # against the complete file before changing it.
    # `temp_db_with_extensions` is presumably requested for the Geometry
    # type used below - TODO confirm.
    with temp_db_conn.cursor() as cur:
        cur.execute("""CREATE TABLE place (
                           osm_id int8 NOT NULL,
                           osm_type char(1) NOT NULL,
                           admin_level smallint,
                           geometry Geometry(Geometry,4326) NOT NULL)""")
    temp_db_conn.commit()
def place_row(place_table, temp_db_cursor):
    """ A factory for rows in the place table. The table is created as a
        prerequisite to the fixture.

        Returns a function that inserts one row per call through
        `temp_db_cursor`. When no `osm_id` is given, ids are handed out
        from a counter starting at 1001. When no `geom` is given, the
        point (0 0) in SRID 4326 is used.
    """
    idseq = itertools.count(1001)

    def _insert(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
                admin_level=None, address=None, extratags=None, geom=None):
        temp_db_cursor.execute("INSERT INTO place VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
                               (osm_id or next(idseq), osm_type, cls, typ, names,
                                admin_level, address, extratags,
                                geom or 'SRID=4326;POINT(0 0)'))

    return _insert
def placex_table(temp_db_with_extensions, temp_db_conn):
    """ Create an empty version of the placex table.

        The table is created through a cursor of `temp_db_conn` and the
        statement is committed on that connection.
    """
    # NOTE(review): the column list appears to be only partially visible
    # in this view - confirm the definition against the complete file
    # before changing it.
    with temp_db_conn.cursor() as cur:
        cur.execute("""CREATE TABLE placex (
                           parent_place_id BIGINT,
                           linked_place_id BIGINT,
                           indexed_date TIMESTAMP,
                           geometry_sector INTEGER,
                           rank_address SMALLINT,
                           rank_search SMALLINT,
                           indexed_status SMALLINT,
                           admin_level smallint,
                           geometry Geometry(Geometry,4326),
                           country_code varchar(2),
                           centroid GEOMETRY(Geometry, 4326))""")
    temp_db_conn.commit()
def osmline_table(temp_db_with_extensions, temp_db_conn):
    """ Create an empty version of the location_property_osmline table.

        The table is created through a cursor of `temp_db_conn` and the
        statement is committed on that connection.
    """
    # NOTE(review): the column list appears to be only partially visible
    # in this view - confirm the definition against the complete file
    # before changing it.
    with temp_db_conn.cursor() as cur:
        cur.execute("""CREATE TABLE location_property_osmline (
                           parent_place_id BIGINT,
                           geometry_sector INTEGER,
                           indexed_date TIMESTAMP,
                           indexed_status SMALLINT,
                           interpolationtype TEXT,
                           country_code VARCHAR(2))""")
    temp_db_conn.commit()
265 def word_table(temp_db, temp_db_conn):
266 with temp_db_conn.cursor() as cur:
267 cur.execute("""CREATE TABLE word (
273 country_code varchar(2),
274 search_name_count INTEGER,
276 temp_db_conn.commit()
def osm2pgsql_options(temp_db):
    """ Return a dictionary of osm2pgsql options that point to the test
        database `temp_db`. The osm2pgsql binary is set to `echo` and
        all tablespace settings are empty strings.
    """
    tablespace_keys = ('slim_data', 'slim_index', 'main_data', 'main_index')

    return {
        'osm2pgsql': 'echo',
        'osm2pgsql_style': 'style.file',
        'dsn': 'dbname=' + temp_db,
        'tablespaces': {key: '' for key in tablespace_keys},
    }
def sql_preprocessor(temp_db_conn, tmp_path, monkeypatch, table_factory):
    """ Return an SQLPreprocessor working on the test database.

        A table `country_name` with the partitions 0, 1 and 2 is created
        first. The configuration handed to the preprocessor uses
        `tmp_path` as directory for SQL files; the other directories
        point into the source tree.
    """
    partitions = tuple((num, ) for num in range(3))
    table_factory('country_name', 'partition INT', partitions)

    config = Configuration(None, SRC_DIR.resolve() / 'settings')
    libdirs = {
        'module': '.',
        'osm2pgsql': '.',
        'php': SRC_DIR / 'lib-php',
        'sql': tmp_path,
        'data': SRC_DIR / 'data',
    }
    config.set_libdirs(**libdirs)

    return SQLPreprocessor(temp_db_conn, config)
def tokenizer_mock(monkeypatch, property_table, temp_db_conn, tmp_path):
    """ Prepare the environment so that the tokenizer factory ends up
        with the test dummy tokenizer, and return a function that
        creates a new dummy tokenizer on each call.

        NOMINATIM_TOKENIZER and the `tokenizer` property in the database
        are both set to 'dummy'. `importlib.import_module` is patched via
        monkeypatch to return the dummy tokenizer module for any request.
    """
    def _create_tokenizer():
        return dummy_tokenizer.DummyTokenizer(None, None)

    def _load_dummy(module, *_args, **_kwargs):
        # The requested module name is ignored on purpose.
        return dummy_tokenizer

    monkeypatch.setenv('NOMINATIM_TOKENIZER', 'dummy')
    monkeypatch.setattr(importlib, 'import_module', _load_dummy)
    properties.set_property(temp_db_conn, 'tokenizer', 'dummy')

    return _create_tokenizer