+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2024 by the Nominatim developer community.
+# For a full list of authors see the git log.
import itertools
import sys
from pathlib import Path
-import psycopg2
-import psycopg2.extras
+import psycopg
+from psycopg import sql as pysql
import pytest
-SRC_DIR = Path(__file__) / '..' / '..' / '..'
-
# always test against the source
-sys.path.insert(0, str(SRC_DIR.resolve()))
-
-from nominatim.config import Configuration
-from nominatim.db import connection
-from nominatim.db.sql_preprocessor import SQLPreprocessor
-
-class _TestingCursor(psycopg2.extras.DictCursor):
- """ Extension to the DictCursor class that provides execution
- short-cuts that simplify writing assertions.
- """
-
- def scalar(self, sql, params=None):
- """ Execute a query with a single return value and return this value.
- Raises an assertion when not exactly one row is returned.
- """
- self.execute(sql, params)
- assert self.rowcount == 1
- return self.fetchone()[0]
+SRC_DIR = (Path(__file__) / '..' / '..' / '..').resolve()
+sys.path.insert(0, str(SRC_DIR / 'src'))
- def row_set(self, sql, params=None):
- """ Execute a query and return the result as a set of tuples.
- """
- self.execute(sql, params)
- if self.rowcount == 1:
- return set(tuple(self.fetchone()))
+from nominatim_db.config import Configuration
+from nominatim_db.db import connection
+from nominatim_db.db.sql_preprocessor import SQLPreprocessor
+import nominatim_db.tokenizer.factory
- return set((tuple(row) for row in self))
+import dummy_tokenizer
+import mocks
+from cursor import CursorForTesting
- def table_exists(self, table):
- """ Check that a table with the given name exists in the database.
- """
- num = self.scalar("""SELECT count(*) FROM pg_tables
- WHERE tablename = %s""", (table, ))
- return num == 1
- def table_rows(self, table):
- """ Return the number of rows in the given table.
- """
- return self.scalar('SELECT count(*) FROM ' + table)
+@pytest.fixture
+def src_dir():
+ return SRC_DIR
@pytest.fixture
def temp_db(monkeypatch):
    """ Create an empty database for the test. The database name is also
        exported into NOMINATIM_DATABASE_DSN.
"""
name = 'test_nominatim_python_unittest'
- conn = psycopg2.connect(database='postgres')
- conn.set_isolation_level(0)
- with conn.cursor() as cur:
- cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
- cur.execute('CREATE DATABASE {}'.format(name))
+ with psycopg.connect(dbname='postgres', autocommit=True) as conn:
+ with conn.cursor() as cur:
+ cur.execute(pysql.SQL('DROP DATABASE IF EXISTS') + pysql.Identifier(name))
+ cur.execute(pysql.SQL('CREATE DATABASE') + pysql.Identifier(name))
- conn.close()
+ monkeypatch.setenv('NOMINATIM_DATABASE_DSN', 'dbname=' + name)
- monkeypatch.setenv('NOMINATIM_DATABASE_DSN' , 'dbname=' + name)
+ with psycopg.connect(dbname=name) as conn:
+ with conn.cursor() as cur:
+ cur.execute('CREATE EXTENSION hstore')
yield name
- conn = psycopg2.connect(database='postgres')
-
- conn.set_isolation_level(0)
- with conn.cursor() as cur:
- cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
-
- conn.close()
+ with psycopg.connect(dbname='postgres', autocommit=True) as conn:
+ with conn.cursor() as cur:
+ cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
@pytest.fixture
@pytest.fixture
def temp_db_with_extensions(temp_db):
- conn = psycopg2.connect(database=temp_db)
- with conn.cursor() as cur:
- cur.execute('CREATE EXTENSION hstore; CREATE EXTENSION postgis;')
- conn.commit()
- conn.close()
+ with psycopg.connect(dbname=temp_db) as conn:
+ with conn.cursor() as cur:
+ cur.execute('CREATE EXTENSION postgis')
return temp_db
@pytest.fixture
def temp_db_conn(temp_db):
""" Connection to the test database.
"""
- with connection.connect('dbname=' + temp_db) as conn:
+ with connection.connect('', autocommit=True, dbname=temp_db) as conn:
+ connection.register_hstore(conn)
yield conn
""" Connection and cursor towards the test database. The connection will
be in auto-commit mode.
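+
+        Assuming CursorForTesting keeps the assertion helpers of the removed
+        _TestingCursor (for instance table_exists()), a test can simply write:
+            assert temp_db_cursor.table_exists('place')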
"""
- conn = psycopg2.connect('dbname=' + temp_db)
- conn.set_isolation_level(0)
- with conn.cursor(cursor_factory=_TestingCursor) as cur:
- yield cur
- conn.close()
+ with psycopg.connect(dbname=temp_db, autocommit=True,
+                      cursor_factory=CursorForTesting) as conn:
+ connection.register_hstore(conn)
+ with conn.cursor() as cur:
+ yield cur
@pytest.fixture
-def table_factory(temp_db_cursor):
+def table_factory(temp_db_conn):
+ """ A fixture that creates new SQL tables, potentially filled with
+ content.
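+
+        Content, when given, must be a sequence of row tuples matching the
+        table definition, e.g. (table name and rows purely illustrative):
+            table_factory('dummy', 'id INT, name TEXT', [(1, 'a'), (2, 'b')])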
+ """
def mk_table(name, definition='id INT', content=None):
- temp_db_cursor.execute('CREATE TABLE {} ({})'.format(name, definition))
- if content is not None:
- if not isinstance(content, str):
- content = '),('.join([str(x) for x in content])
- temp_db_cursor.execute("INSERT INTO {} VALUES ({})".format(name, content))
+ with psycopg.ClientCursor(temp_db_conn) as cur:
+ cur.execute('CREATE TABLE {} ({})'.format(name, definition))
+ if content:
+ sql = pysql.SQL("INSERT INTO {} VALUES ({})")\
+ .format(pysql.Identifier(name),
+ pysql.SQL(',').join([pysql.Placeholder() for _ in range(len(content[0]))]))
+ cur.executemany(sql, content)
return mk_table
@pytest.fixture
def def_config():
- return Configuration(None, SRC_DIR.resolve() / 'settings')
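+    """ Return a Nominatim configuration with default settings and the
+        library paths stubbed out for testing.
+    """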
+ cfg = Configuration(None)
+ cfg.set_libdirs(module='.', osm2pgsql='.')
+ return cfg
+
@pytest.fixture
-def src_dir():
- return SRC_DIR.resolve()
+def project_env(tmp_path):
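+    """ Return a configuration rooted in a fresh project directory under
+        tmp_path, with the library paths stubbed out.
+    """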
+ projdir = tmp_path / 'project'
+ projdir.mkdir()
+ cfg = Configuration(projdir)
+ cfg.set_libdirs(module='.', osm2pgsql='.')
+ return cfg
+
+
+@pytest.fixture
+def property_table(table_factory, temp_db_conn):
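+    """ Create an empty nominatim_properties table and return a mock
+        accessor for it.
+    """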
+ table_factory('nominatim_properties', 'property TEXT, value TEXT')
+
+ return mocks.MockPropertyTable(temp_db_conn)
+
@pytest.fixture
-def status_table(temp_db_conn):
+def status_table(table_factory):
""" Create an empty version of the status table and
the status logging table.
"""
- with temp_db_conn.cursor() as cur:
- cur.execute("""CREATE TABLE import_status (
- lastimportdate timestamp with time zone NOT NULL,
- sequence_id integer,
- indexed boolean
- )""")
- cur.execute("""CREATE TABLE import_osmosis_log (
- batchend timestamp,
- batchseq integer,
- batchsize bigint,
- starttime timestamp,
- endtime timestamp,
- event text
- )""")
- temp_db_conn.commit()
+ table_factory('import_status',
+ """lastimportdate timestamp with time zone NOT NULL,
+ sequence_id integer,
+ indexed boolean""")
+ table_factory('import_osmosis_log',
+ """batchend timestamp,
+ batchseq integer,
+ batchsize bigint,
+ starttime timestamp,
+ endtime timestamp,
+ event text""")
@pytest.fixture
-def place_table(temp_db_with_extensions, temp_db_conn):
+def place_table(temp_db_with_extensions, table_factory):
""" Create an empty version of the place table.
"""
- with temp_db_conn.cursor() as cur:
- cur.execute("""CREATE TABLE place (
- osm_id int8 NOT NULL,
- osm_type char(1) NOT NULL,
- class text NOT NULL,
- type text NOT NULL,
- name hstore,
- admin_level smallint,
- address hstore,
- extratags hstore,
- geometry Geometry(Geometry,4326) NOT NULL)""")
- temp_db_conn.commit()
+ table_factory('place',
+ """osm_id int8 NOT NULL,
+ osm_type char(1) NOT NULL,
+ class text NOT NULL,
+ type text NOT NULL,
+ name hstore,
+ admin_level smallint,
+ address hstore,
+ extratags hstore,
+ geometry Geometry(Geometry,4326) NOT NULL""")
@pytest.fixture
def placex_table(temp_db_with_extensions, temp_db_conn):
""" Create an empty version of the place table.
"""
- with temp_db_conn.cursor() as cur:
- cur.execute("""CREATE TABLE placex (
- place_id BIGINT,
- parent_place_id BIGINT,
- linked_place_id BIGINT,
- importance FLOAT,
- indexed_date TIMESTAMP,
- geometry_sector INTEGER,
- rank_address SMALLINT,
- rank_search SMALLINT,
- partition SMALLINT,
- indexed_status SMALLINT,
- osm_id int8,
- osm_type char(1),
- class text,
- type text,
- name hstore,
- admin_level smallint,
- address hstore,
- extratags hstore,
- geometry Geometry(Geometry,4326),
- wikipedia TEXT,
- country_code varchar(2),
- housenumber TEXT,
- postcode TEXT,
- centroid GEOMETRY(Geometry, 4326))""")
- temp_db_conn.commit()
+ return mocks.MockPlacexTable(temp_db_conn)
@pytest.fixture
-def osmline_table(temp_db_with_extensions, temp_db_conn):
- with temp_db_conn.cursor() as cur:
- cur.execute("""CREATE TABLE location_property_osmline (
- place_id BIGINT,
- osm_id BIGINT,
- parent_place_id BIGINT,
- geometry_sector INTEGER,
- indexed_date TIMESTAMP,
- startnumber INTEGER,
- endnumber INTEGER,
- partition SMALLINT,
- indexed_status SMALLINT,
- linegeo GEOMETRY,
- interpolationtype TEXT,
- address HSTORE,
- postcode TEXT,
- country_code VARCHAR(2))""")
- temp_db_conn.commit()
+def osmline_table(temp_db_with_extensions, table_factory):
+ table_factory('location_property_osmline',
+ """place_id BIGINT,
+ osm_id BIGINT,
+ parent_place_id BIGINT,
+ geometry_sector INTEGER,
+ indexed_date TIMESTAMP,
+ startnumber INTEGER,
+ endnumber INTEGER,
+ partition SMALLINT,
+ indexed_status SMALLINT,
+ linegeo GEOMETRY,
+ interpolationtype TEXT,
+ address HSTORE,
+ postcode TEXT,
+ country_code VARCHAR(2)""")
@pytest.fixture
-def word_table(temp_db, temp_db_conn):
- with temp_db_conn.cursor() as cur:
- cur.execute("""CREATE TABLE word (
- word_id INTEGER,
- word_token text,
- word text,
- class text,
- type text,
- country_code varchar(2),
- search_name_count INTEGER,
- operator TEXT)""")
- temp_db_conn.commit()
+def sql_preprocessor_cfg(tmp_path, table_factory, temp_db_with_extensions):
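+    """ Configuration for SQL preprocessor tests, using tmp_path as the SQL
+        source directory and providing a minimal country_name table.
+    """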
+ table_factory('country_name', 'partition INT', ((0, ), (1, ), (2, )))
+ cfg = Configuration(None)
+ cfg.set_libdirs(module='.', osm2pgsql='.', sql=tmp_path)
+ return cfg
@pytest.fixture
-def osm2pgsql_options(temp_db):
- return dict(osm2pgsql='echo',
- osm2pgsql_cache=10,
- osm2pgsql_style='style.file',
- threads=1,
- dsn='dbname=' + temp_db,
- flatnode_file='',
- tablespaces=dict(slim_data='', slim_index='',
- main_data='', main_index=''))
+def sql_preprocessor(sql_preprocessor_cfg, temp_db_conn):
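+    """ A SQLPreprocessor instance bound to the test database. """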
+ return SQLPreprocessor(temp_db_conn, sql_preprocessor_cfg)
+
@pytest.fixture
-def sql_preprocessor(temp_db_conn, tmp_path, def_config, monkeypatch, table_factory):
- monkeypatch.setenv('NOMINATIM_DATABASE_MODULE_PATH', '.')
- table_factory('country_name', 'partition INT', (0, 1, 2))
- return SQLPreprocessor(temp_db_conn, def_config, tmp_path)
+def tokenizer_mock(monkeypatch, property_table):
+ """ Sets up the configuration so that the test dummy tokenizer will be
+ loaded when the tokenizer factory is used. Also returns a factory
+ with which a new dummy tokenizer may be created.
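+
+        For example (illustrative test function):
+            def test_something(tokenizer_mock):
+                tok = tokenizer_mock()   # a fresh DummyTokenizer instance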
+ """
+ monkeypatch.setenv('NOMINATIM_TOKENIZER', 'dummy')
+
+ def _import_dummy(*args, **kwargs):
+ return dummy_tokenizer
+
+ monkeypatch.setattr(nominatim_db.tokenizer.factory,
+ "_import_tokenizer", _import_dummy)
+ property_table.set('tokenizer', 'dummy')
+
+ def _create_tokenizer():
+ return dummy_tokenizer.DummyTokenizer(None, None)
+
+ return _create_tokenizer