4 from pathlib import Path
11 SRC_DIR = Path(__file__) / '..' / '..' / '..'
13 # always test against the source
14 sys.path.insert(0, str(SRC_DIR.resolve()))
16 from nominatim.config import Configuration
17 from nominatim.db import connection
18 from nominatim.db.sql_preprocessor import SQLPreprocessor
19 from nominatim.db import properties
20 import nominatim.tokenizer.factory
22 import dummy_tokenizer
class _TestingCursor(psycopg2.extras.DictCursor):
    """ Extension to the DictCursor class that provides execution
        short-cuts that simplify writing assertions.
    """

    def scalar(self, sql, params=None):
        """ Execute a query with a single return value and return this value.
            Raises an assertion when not exactly one row is returned.
        """
        self.execute(sql, params)
        assert self.rowcount == 1
        return self.fetchone()[0]

    def row_set(self, sql, params=None):
        """ Execute a query and return the result as a set of tuples.
        """
        self.execute(sql, params)

        return {tuple(row) for row in self}

    def table_exists(self, table):
        """ Check that a table with the given name exists in the database.
            Returns True when pg_tables lists at least one table with
            that name and False otherwise.
        """
        num = self.scalar("""SELECT count(*) FROM pg_tables
                             WHERE tablename = %s""", (table, ))
        # Hand the result back to the caller; without an explicit return
        # the check would always evaluate to None.
        return num > 0

    def table_rows(self, table):
        """ Return the number of rows in the given table.
        """
        # The table name is concatenated into the statement as is, so only
        # trusted identifiers must be passed in.
        return self.scalar('SELECT count(*) FROM ' + table)
def temp_db(monkeypatch):
    """ Create an empty database for the test. The database name is also
        exported into NOMINATIM_DATABASE_DSN.

        Yields the name of the database. Once the consumer is done, the
        database is dropped again.
    """
    # NOTE(review): the generator form relies on the pytest fixture
    # decorator, which is not visible in this excerpt -- confirm.
    name = 'test_nominatim_python_unittest'

    conn = psycopg2.connect(database='postgres')
    try:
        # Isolation level 0 is auto-commit: CREATE/DROP DATABASE cannot be
        # executed inside a transaction block.
        conn.set_isolation_level(0)
        with conn.cursor() as cur:
            cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
            cur.execute('CREATE DATABASE {}'.format(name))
    finally:
        conn.close()

    monkeypatch.setenv('NOMINATIM_DATABASE_DSN', 'dbname=' + name)

    # Everything above is setup, everything below is teardown.
    yield name

    conn = psycopg2.connect(database='postgres')
    try:
        conn.set_isolation_level(0)
        with conn.cursor() as cur:
            cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
    finally:
        conn.close()
88 return 'dbname=' + temp_db
def temp_db_with_extensions(temp_db):
    """ Install the hstore and postgis extensions in the test database.

        Returns the name of the database, i.e. the value of temp_db.
    """
    conn = psycopg2.connect(database=temp_db)
    try:
        with conn.cursor() as cur:
            cur.execute('CREATE EXTENSION hstore; CREATE EXTENSION postgis;')
        # Leaving the cursor block does not commit; without an explicit
        # commit the extensions are rolled back when the connection goes away.
        conn.commit()
    finally:
        conn.close()

    return temp_db
def temp_db_conn(temp_db):
    """ Connection to the test database.

        Yields the connection object; the context manager returned by
        connection.connect() is left once the consumer is done.
    """
    # NOTE(review): the generator form relies on the pytest fixture
    # decorator, which is not visible in this excerpt -- confirm.
    with connection.connect('dbname=' + temp_db) as conn:
        yield conn
def temp_db_cursor(temp_db):
    """ Connection and cursor towards the test database. The connection will
        be in auto-commit mode.

        Yields a _TestingCursor. The connection behind the cursor is closed
        once the consumer is done.
    """
    # NOTE(review): the generator form relies on the pytest fixture
    # decorator, which is not visible in this excerpt -- confirm.
    conn = psycopg2.connect('dbname=' + temp_db)
    try:
        conn.set_isolation_level(0)
        with conn.cursor(cursor_factory=_TestingCursor) as cur:
            yield cur
    finally:
        conn.close()
def table_factory(temp_db_cursor):
    """ Return a function that creates a table in the test database.

        The function takes the table name, the column definition
        (default: a single 'id INT' column) and optionally an iterable of
        row tuples that are inserted right after the table was created.
    """
    def mk_table(name, definition='id INT', content=None):
        temp_db_cursor.execute('CREATE TABLE {} ({})'.format(name, definition))
        if content is not None:
            psycopg2.extras.execute_values(
                temp_db_cursor, "INSERT INTO {} VALUES %s".format(name), content)

    # The factory function is the value consumers work with.
    return mk_table
134 cfg = Configuration(None, SRC_DIR.resolve() / 'settings')
135 cfg.set_libdirs(module='.', osm2pgsql='.',
136 php=SRC_DIR / 'lib-php',
137 sql=SRC_DIR / 'lib-sql',
138 data=SRC_DIR / 'data')
143 return SRC_DIR.resolve()
def tmp_phplib_dir():
    """ Provide a temporary directory that contains an empty 'admin'
        subdirectory.

        Yields the directory as a Path. The directory and its content are
        removed once the consumer is done.
    """
    # NOTE(review): the generator form relies on the pytest fixture
    # decorator, which is not visible in this excerpt -- confirm.
    with tempfile.TemporaryDirectory() as phpdir:
        (Path(phpdir) / 'admin').mkdir()

        # Hand the directory out while the context is still open; leaving
        # the with-block deletes it.
        yield Path(phpdir)
def property_table(table_factory):
    """ Create the nominatim_properties table with two text columns,
        'property' and 'value'.
    """
    table_name = 'nominatim_properties'
    columns = 'property TEXT, value TEXT'
    table_factory(table_name, columns)
# Creates the import_status and import_osmosis_log tables through a cursor
# on temp_db_conn and commits the transaction afterwards.
# NOTE(review): the column lists of both CREATE TABLE statements are only
# partly visible in this excerpt -- check the full definitions before editing.
158 def status_table(temp_db_conn):
159 """ Create an empty version of the status table and
160 the status logging table.
162 with temp_db_conn.cursor() as cur:
163 cur.execute("""CREATE TABLE import_status (
164 lastimportdate timestamp with time zone NOT NULL,
168 cur.execute("""CREATE TABLE import_osmosis_log (
176 temp_db_conn.commit()
# Creates the place table through a cursor on temp_db_conn and commits.
# temp_db_with_extensions is requested as a prerequisite only: the geometry
# column uses the Geometry type, which presumably comes from the postgis
# extension installed there -- confirm.
# NOTE(review): the column list is only partly visible in this excerpt.
180 def place_table(temp_db_with_extensions, temp_db_conn):
181 """ Create an empty version of the place table.
183 with temp_db_conn.cursor() as cur:
184 cur.execute("""CREATE TABLE place (
185 osm_id int8 NOT NULL,
186 osm_type char(1) NOT NULL,
190 admin_level smallint,
193 geometry Geometry(Geometry,4326) NOT NULL)""")
194 temp_db_conn.commit()
def place_row(place_table, temp_db_cursor):
    """ A factory for rows in the place table. The table is created as a
        prerequisite to the fixture.

        The returned function inserts one row per call. All columns have
        defaults: when no osm_id is given, ids are handed out from a
        counter starting at 1001, and when no geom is given, the point
        'SRID=4326;POINT(0 0)' is used.
    """
    idseq = itertools.count(1001)

    def _insert(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
                admin_level=None, address=None, extratags=None, geom=None):
        temp_db_cursor.execute("INSERT INTO place VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
                               (osm_id or next(idseq), osm_type, cls, typ, names,
                                admin_level, address, extratags,
                                geom or 'SRID=4326;POINT(0 0)'))

    # The insert function is the factory promised by the docstring.
    return _insert
def placex_table(temp_db_with_extensions, temp_db_conn):
    """ Return a mocks.MockPlacexTable object for the placex table,
        constructed with the connection to the test database.
    """
    placex = mocks.MockPlacexTable(temp_db_conn)
    return placex
# Creates the location_property_osmline table through a cursor on
# temp_db_conn and commits. temp_db_with_extensions is requested as a
# prerequisite only; its value is not used here.
# NOTE(review): the column list is only partly visible in this excerpt.
220 def osmline_table(temp_db_with_extensions, temp_db_conn):
221 with temp_db_conn.cursor() as cur:
222 cur.execute("""CREATE TABLE location_property_osmline (
225 parent_place_id BIGINT,
226 geometry_sector INTEGER,
227 indexed_date TIMESTAMP,
231 indexed_status SMALLINT,
233 interpolationtype TEXT,
236 country_code VARCHAR(2))""")
237 temp_db_conn.commit()
def word_table(temp_db_conn):
    """ Return a mocks.MockWordTable object constructed with the
        connection to the test database.
    """
    words = mocks.MockWordTable(temp_db_conn)
    return words
def osm2pgsql_options(temp_db):
    """ Return a dictionary of osm2pgsql options that point to the test
        database. 'echo' takes the place of the osm2pgsql binary and all
        tablespace settings are empty strings.
    """
    tablespace_keys = ('slim_data', 'slim_index', 'main_data', 'main_index')

    return {
        'osm2pgsql': 'echo',
        'osm2pgsql_style': 'style.file',
        'dsn': 'dbname=' + temp_db,
        'tablespaces': {key: '' for key in tablespace_keys},
    }
def sql_preprocessor(temp_db_conn, tmp_path, monkeypatch, table_factory):
    """ Return an SQLPreprocessor for the test database connection.

        A country_name table with the partitions 0, 1 and 2 is created
        first. The configuration is read from the settings directory of
        the source tree, while its SQL library directory is set to tmp_path.
    """
    partitions = tuple((num, ) for num in range(3))
    table_factory('country_name', 'partition INT', partitions)

    config = Configuration(None, SRC_DIR.resolve() / 'settings')
    config.set_libdirs(module='.',
                       osm2pgsql='.',
                       php=SRC_DIR / 'lib-php',
                       sql=tmp_path,
                       data=SRC_DIR / 'data')

    return SQLPreprocessor(temp_db_conn, config)
def tokenizer_mock(monkeypatch, property_table, temp_db_conn, tmp_path):
    """ Make the tokenizer factory load the test dummy tokenizer.

        NOMINATIM_TOKENIZER is set to 'dummy', the factory's
        _import_tokenizer function is replaced by one that always returns
        the dummy_tokenizer module, and the 'tokenizer' property in the
        database is set to 'dummy'. Returns a function that creates a new
        dummy tokenizer on each call.
    """
    def _fake_import(module, *args, **kwargs):
        # Whatever module is asked for, hand out the dummy.
        return dummy_tokenizer

    def _new_tokenizer():
        return dummy_tokenizer.DummyTokenizer(None, None)

    monkeypatch.setenv('NOMINATIM_TOKENIZER', 'dummy')
    monkeypatch.setattr(nominatim.tokenizer.factory, "_import_tokenizer", _fake_import)
    properties.set_property(temp_db_conn, 'tokenizer', 'dummy')

    return _new_tokenizer