4 from pathlib import Path
11 SRC_DIR = Path(__file__) / '..' / '..' / '..'
13 # always test against the source
14 sys.path.insert(0, str(SRC_DIR.resolve()))
16 from nominatim.config import Configuration
17 from nominatim.db import connection
18 from nominatim.db.sql_preprocessor import SQLPreprocessor
19 from nominatim.db import properties
21 import dummy_tokenizer
class _TestingCursor(psycopg2.extras.DictCursor):
    """ Extension to the DictCursor class that provides execution
        short-cuts that simplify writing assertions.
    """

    def scalar(self, sql, params=None):
        """ Execute a query with a single return value and return this value.
            Raises an assertion when not exactly one row is returned.
        """
        self.execute(sql, params)
        assert self.rowcount == 1
        return self.fetchone()[0]

    def row_set(self, sql, params=None):
        """ Execute a query and return the result as a set of tuples.
            Duplicate rows collapse into a single entry and the row
            order is lost.
        """
        self.execute(sql, params)

        return {tuple(row) for row in self}

    def table_exists(self, table):
        """ Check that a table with the given name exists in the database.
            Returns True when pg_tables lists at least one table of
            that name, False otherwise.
        """
        num = self.scalar("""SELECT count(*) FROM pg_tables
                             WHERE tablename = %s""", (table, ))
        # The count must be handed back to the caller; without the return
        # the check would always evaluate to None.
        return num > 0

    def table_rows(self, table):
        """ Return the number of rows in the given table.
        """
        # The table name is pasted into the statement verbatim, so it must
        # come from the test code itself and never from external input.
        return self.scalar('SELECT count(*) FROM ' + table)
def temp_db(monkeypatch):
    """ Create an empty database for the test. The database name is also
        exported into NOMINATIM_DATABASE_DSN.

        Yields the name of the database. When the generator is resumed
        after the yield, the database is dropped again.
    """
    name = 'test_nominatim_python_unittest'

    def _run_on_admin_db(*statements):
        # CREATE/DROP DATABASE must be issued through the maintenance
        # database. Isolation level 0 switches the connection to
        # auto-commit mode.
        conn = psycopg2.connect(database='postgres')
        try:
            conn.set_isolation_level(0)
            with conn.cursor() as cur:
                for statement in statements:
                    cur.execute(statement)
        finally:
            # Always release the connection, even when a statement fails.
            conn.close()

    _run_on_admin_db('DROP DATABASE IF EXISTS {}'.format(name),
                     'CREATE DATABASE {}'.format(name))

    monkeypatch.setenv('NOMINATIM_DATABASE_DSN', 'dbname=' + name)

    # Hand the database name to the consumer; everything below is teardown.
    yield name

    _run_on_admin_db('DROP DATABASE IF EXISTS {}'.format(name))
86 return 'dbname=' + temp_db
def temp_db_with_extensions(temp_db):
    """ Install the hstore and postgis extensions in the test database.

        Returns the name of the database, i.e. the value received
        as `temp_db`.
    """
    conn = psycopg2.connect(database=temp_db)
    try:
        with conn.cursor() as cur:
            cur.execute('CREATE EXTENSION hstore; CREATE EXTENSION postgis;')
        # The connection is not in auto-commit mode, so the extensions only
        # persist once the transaction is committed explicitly.
        conn.commit()
    finally:
        conn.close()

    return temp_db
def temp_db_conn(temp_db):
    """ Connection to the test database.

        Yields the connection object; the surrounding context manager
        is left again when the generator is resumed after the yield.
    """
    with connection.connect('dbname=' + temp_db) as conn:
        yield conn
def temp_db_cursor(temp_db):
    """ Connection and cursor towards the test database. The connection will
        be in auto-commit mode.

        Yields a _TestingCursor. Cursor and connection are closed when
        the generator is resumed after the yield.
    """
    conn = psycopg2.connect('dbname=' + temp_db)
    try:
        conn.set_isolation_level(0)
        with conn.cursor(cursor_factory=_TestingCursor) as cur:
            yield cur
    finally:
        # Leaving the cursor context only closes the cursor; the connection
        # has to be released separately.
        conn.close()
def table_factory(temp_db_cursor):
    """ Return a function that creates tables in the test database.

        The returned function takes the table name, the column definition
        (default: a single 'id INT' column) and optional content. Content
        is either a ready-made string that is placed between the
        parentheses of the VALUES clause, or an iterable whose items
        are each inserted as a single-column row.
    """
    def mk_table(name, definition='id INT', content=None):
        temp_db_cursor.execute('CREATE TABLE {} ({})'.format(name, definition))

        if content is None:
            return

        if not isinstance(content, str):
            rows = [str(row) for row in content]
            if not rows:
                # "VALUES ()" is not valid SQL; an empty iterable simply
                # means that the table stays empty.
                return
            # Joining with '),(' makes every item its own row once the
            # outer parentheses are added below.
            content = '),('.join(rows)

        temp_db_cursor.execute("INSERT INTO {} VALUES ({})".format(name, content))

    return mk_table
133 cfg = Configuration(None, SRC_DIR.resolve() / 'settings')
134 cfg.set_libdirs(module='.', osm2pgsql='.',
135 php=SRC_DIR / 'lib-php',
136 sql=SRC_DIR / 'lib-sql',
137 data=SRC_DIR / 'data')
142 return SRC_DIR.resolve()
145 def tmp_phplib_dir():
146 with tempfile.TemporaryDirectory() as phpdir:
147 (Path(phpdir) / 'admin').mkdir()
def property_table(table_factory):
    """ Create an empty nominatim_properties table consisting of the
        two text columns 'property' and 'value'.
    """
    columns = ('property TEXT', 'value TEXT')
    table_factory('nominatim_properties', ', '.join(columns))
157 def status_table(temp_db_conn):
158 """ Create an empty version of the status table and
159 the status logging table.
161 with temp_db_conn.cursor() as cur:
162 cur.execute("""CREATE TABLE import_status (
163 lastimportdate timestamp with time zone NOT NULL,
167 cur.execute("""CREATE TABLE import_osmosis_log (
175 temp_db_conn.commit()
179 def place_table(temp_db_with_extensions, temp_db_conn):
180 """ Create an empty version of the place table.
182 with temp_db_conn.cursor() as cur:
183 cur.execute("""CREATE TABLE place (
184 osm_id int8 NOT NULL,
185 osm_type char(1) NOT NULL,
189 admin_level smallint,
192 geometry Geometry(Geometry,4326) NOT NULL)""")
193 temp_db_conn.commit()
def place_row(place_table, temp_db_cursor):
    """ A factory for rows in the place table. The table is created as a
        prerequisite to the fixture.

        Returns a function that inserts one row per call. All columns
        have defaults: the OSM id is taken from a counter starting at
        1001 when no (or a falsy) id is given and the geometry falls
        back to a point at 0/0 in SRID 4326.
    """
    idseq = itertools.count(1001)

    def _insert(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
                admin_level=None, address=None, extratags=None, geom=None):
        # The counter only advances when an id actually has to be generated.
        row = (osm_id or next(idseq), osm_type, cls, typ, names,
               admin_level, address, extratags,
               geom or 'SRID=4326;POINT(0 0)')
        temp_db_cursor.execute("INSERT INTO place VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
                               row)

    return _insert
212 def placex_table(temp_db_with_extensions, temp_db_conn):
213 """ Create an empty version of the place table.
215 with temp_db_conn.cursor() as cur:
216 cur.execute("""CREATE TABLE placex (
218 parent_place_id BIGINT,
219 linked_place_id BIGINT,
221 indexed_date TIMESTAMP,
222 geometry_sector INTEGER,
223 rank_address SMALLINT,
224 rank_search SMALLINT,
226 indexed_status SMALLINT,
232 admin_level smallint,
235 geometry Geometry(Geometry,4326),
237 country_code varchar(2),
240 centroid GEOMETRY(Geometry, 4326))""")
241 temp_db_conn.commit()
245 def osmline_table(temp_db_with_extensions, temp_db_conn):
246 with temp_db_conn.cursor() as cur:
247 cur.execute("""CREATE TABLE location_property_osmline (
250 parent_place_id BIGINT,
251 geometry_sector INTEGER,
252 indexed_date TIMESTAMP,
256 indexed_status SMALLINT,
258 interpolationtype TEXT,
261 country_code VARCHAR(2))""")
262 temp_db_conn.commit()
266 def word_table(temp_db, temp_db_conn):
267 with temp_db_conn.cursor() as cur:
268 cur.execute("""CREATE TABLE word (
274 country_code varchar(2),
275 search_name_count INTEGER,
277 temp_db_conn.commit()
281 def osm2pgsql_options(temp_db):
282 return dict(osm2pgsql='echo',
284 osm2pgsql_style='style.file',
286 dsn='dbname=' + temp_db,
288 tablespaces=dict(slim_data='', slim_index='',
289 main_data='', main_index=''))
def sql_preprocessor(temp_db_conn, tmp_path, monkeypatch, table_factory):
    """ Return an SQLPreprocessor for the test database connection.

        Before the preprocessor is built, a country_name table holding
        the partitions 0, 1 and 2 is created. The configuration is read
        from the settings directory of the source tree and its 'sql'
        library directory is set to tmp_path.
    """
    partitions = (0, 1, 2)
    table_factory('country_name', 'partition INT', partitions)

    settings_dir = SRC_DIR.resolve() / 'settings'
    config = Configuration(None, settings_dir)

    libdirs = {
        'module': '.',
        'osm2pgsql': '.',
        'php': SRC_DIR / 'lib-php',
        'sql': tmp_path,
        'data': SRC_DIR / 'data',
    }
    config.set_libdirs(**libdirs)

    return SQLPreprocessor(temp_db_conn, config)
302 def tokenizer_mock(monkeypatch, property_table, temp_db_conn, dsn):
303 """ Sets up the configuration so that the test dummy tokenizer will be
306 monkeypatch.setenv('NOMINATIM_TOKENIZER', 'dummy')
308 def _import_dummy(module, *args, **kwargs):
309 return dummy_tokenizer
311 monkeypatch.setattr(importlib, "import_module", _import_dummy)
312 properties.set_property(temp_db_conn, 'tokenizer', 'dummy')