]> git.openstreetmap.org Git - nominatim.git/blobdiff - test/python/conftest.py
Merge pull request #3552 from lonvia/drop-some-migrations
[nominatim.git] / test / python / conftest.py
index d16dceffccb1008a1fe6082ab0a5e9d04cd2a439..a25ff8ec5046766416be3bcb87091dff8ff90fd2 100644 (file)
@@ -1,52 +1,34 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2024 by the Nominatim developer community.
+# For a full list of authors see the git log.
 import itertools
 import sys
 from pathlib import Path
 
-import psycopg2
-import psycopg2.extras
+import psycopg
+from psycopg import sql as pysql
 import pytest
 
-SRC_DIR = Path(__file__) / '..' / '..' / '..'
-
 # always test against the source
-sys.path.insert(0, str(SRC_DIR.resolve()))
-
-from nominatim.config import Configuration
-from nominatim.db import connection
-
-class _TestingCursor(psycopg2.extras.DictCursor):
-    """ Extension to the DictCursor class that provides execution
-        short-cuts that simplify writing assertions.
-    """
+SRC_DIR = (Path(__file__) / '..' / '..' / '..').resolve()
+sys.path.insert(0, str(SRC_DIR / 'src'))
 
-    def scalar(self, sql, params=None):
-        """ Execute a query with a single return value and return this value.
-            Raises an assertion when not exactly one row is returned.
-        """
-        self.execute(sql, params)
-        assert self.rowcount == 1
-        return self.fetchone()[0]
+from nominatim_db.config import Configuration
+from nominatim_db.db import connection
+from nominatim_db.db.sql_preprocessor import SQLPreprocessor
+import nominatim_db.tokenizer.factory
 
-    def row_set(self, sql, params=None):
-        """ Execute a query and return the result as a set of tuples.
-        """
-        self.execute(sql, params)
-        if self.rowcount == 1:
-            return set(tuple(self.fetchone()))
+import dummy_tokenizer
+import mocks
+from cursor import CursorForTesting
 
-        return set((tuple(row) for row in self))
 
-    def table_exists(self, table):
-        """ Check that a table with the given name exists in the database.
-        """
-        num = self.scalar("""SELECT count(*) FROM pg_tables
-                             WHERE tablename = %s""", (table, ))
-        return num == 1
-
-    def table_rows(self, table):
-        """ Return the number of rows in the given table.
-        """
-        return self.scalar('SELECT count(*) FROM ' + table)
+@pytest.fixture
+def src_dir():
+    return SRC_DIR
 
 
 @pytest.fixture
@@ -55,26 +37,23 @@ def temp_db(monkeypatch):
         exported into NOMINATIM_DATABASE_DSN.
     """
     name = 'test_nominatim_python_unittest'
-    conn = psycopg2.connect(database='postgres')
 
-    conn.set_isolation_level(0)
-    with conn.cursor() as cur:
-        cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
-        cur.execute('CREATE DATABASE {}'.format(name))
+    with psycopg.connect(dbname='postgres', autocommit=True) as conn:
+        with conn.cursor() as cur:
+            cur.execute(pysql.SQL('DROP DATABASE IF EXISTS') + pysql.Identifier(name))
+            cur.execute(pysql.SQL('CREATE DATABASE') + pysql.Identifier(name))
 
-    conn.close()
+    monkeypatch.setenv('NOMINATIM_DATABASE_DSN', 'dbname=' + name)
 
-    monkeypatch.setenv('NOMINATIM_DATABASE_DSN' , 'dbname=' + name)
+    with psycopg.connect(dbname=name) as conn:
+        with conn.cursor() as cur:
+            cur.execute('CREATE EXTENSION hstore')
 
     yield name
 
-    conn = psycopg2.connect(database='postgres')
-
-    conn.set_isolation_level(0)
-    with conn.cursor() as cur:
-        cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
-
-    conn.close()
+    with psycopg.connect(dbname='postgres', autocommit=True) as conn:
+        with conn.cursor() as cur:
+            cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
 
 
 @pytest.fixture
@@ -84,11 +63,9 @@ def dsn(temp_db):
 
 @pytest.fixture
 def temp_db_with_extensions(temp_db):
-    conn = psycopg2.connect(database=temp_db)
-    with conn.cursor() as cur:
-        cur.execute('CREATE EXTENSION hstore; CREATE EXTENSION postgis;')
-    conn.commit()
-    conn.close()
+    with psycopg.connect(dbname=temp_db) as conn:
+        with conn.cursor() as cur:
+            cur.execute('CREATE EXTENSION postgis')
 
     return temp_db
 
@@ -96,7 +73,8 @@ def temp_db_with_extensions(temp_db):
 def temp_db_conn(temp_db):
     """ Connection to the test database.
     """
-    with connection.connect('dbname=' + temp_db) as conn:
+    with connection.connect('', autocommit=True, dbname=temp_db) as conn:
+        connection.register_hstore(conn)
         yield conn
 
 
@@ -105,71 +83,84 @@ def temp_db_cursor(temp_db):
     """ Connection and cursor towards the test database. The connection will
         be in auto-commit mode.
     """
-    conn = psycopg2.connect('dbname=' + temp_db)
-    conn.set_isolation_level(0)
-    with conn.cursor(cursor_factory=_TestingCursor) as cur:
-        yield cur
-    conn.close()
+    with psycopg.connect(dbname=temp_db, autocommit=True, cursor_factory=CursorForTesting) as conn:
+        connection.register_hstore(conn)
+        with conn.cursor() as cur:
+            yield cur
 
 
 @pytest.fixture
-def table_factory(temp_db_cursor):
+def table_factory(temp_db_conn):
+    """ A fixture that creates new SQL tables, potentially filled with
+        content.
+    """
     def mk_table(name, definition='id INT', content=None):
-        temp_db_cursor.execute('CREATE TABLE {} ({})'.format(name, definition))
-        if content is not None:
-            if not isinstance(content, str):
-                content = '),('.join([str(x) for x in content])
-            temp_db_cursor.execute("INSERT INTO {} VALUES ({})".format(name, content))
+        with psycopg.ClientCursor(temp_db_conn) as cur:
+            cur.execute('CREATE TABLE {} ({})'.format(name, definition))
+            if content:
+                sql = pysql.SQL("INSERT INTO {} VALUES ({})")\
+                           .format(pysql.Identifier(name),
+                                   pysql.SQL(',').join([pysql.Placeholder() for _ in range(len(content[0]))]))
+                cur.executemany(sql , content)
 
     return mk_table
 
 
 @pytest.fixture
 def def_config():
-    return Configuration(None, SRC_DIR.resolve() / 'settings')
+    cfg = Configuration(None)
+    cfg.set_libdirs(osm2pgsql=None)
+    return cfg
+
 
 @pytest.fixture
-def src_dir():
-    return SRC_DIR.resolve()
+def project_env(tmp_path):
+    projdir = tmp_path / 'project'
+    projdir.mkdir()
+    cfg = Configuration(projdir)
+    cfg.set_libdirs(osm2pgsql=None)
+    return cfg
+
 
 @pytest.fixture
-def status_table(temp_db_conn):
+def property_table(table_factory, temp_db_conn):
+    table_factory('nominatim_properties', 'property TEXT, value TEXT')
+
+    return mocks.MockPropertyTable(temp_db_conn)
+
+
+@pytest.fixture
+def status_table(table_factory):
     """ Create an empty version of the status table and
         the status logging table.
     """
-    with temp_db_conn.cursor() as cur:
-        cur.execute("""CREATE TABLE import_status (
-                           lastimportdate timestamp with time zone NOT NULL,
-                           sequence_id integer,
-                           indexed boolean
-                       )""")
-        cur.execute("""CREATE TABLE import_osmosis_log (
-                           batchend timestamp,
-                           batchseq integer,
-                           batchsize bigint,
-                           starttime timestamp,
-                           endtime timestamp,
-                           event text
-                           )""")
-    temp_db_conn.commit()
+    table_factory('import_status',
+                  """lastimportdate timestamp with time zone NOT NULL,
+                     sequence_id integer,
+                     indexed boolean""")
+    table_factory('import_osmosis_log',
+                  """batchend timestamp,
+                     batchseq integer,
+                     batchsize bigint,
+                     starttime timestamp,
+                     endtime timestamp,
+                     event text""")
 
 
 @pytest.fixture
-def place_table(temp_db_with_extensions, temp_db_conn):
+def place_table(temp_db_with_extensions, table_factory):
     """ Create an empty version of the place table.
     """
-    with temp_db_conn.cursor() as cur:
-        cur.execute("""CREATE TABLE place (
-                           osm_id int8 NOT NULL,
-                           osm_type char(1) NOT NULL,
-                           class text NOT NULL,
-                           type text NOT NULL,
-                           name hstore,
-                           admin_level smallint,
-                           address hstore,
-                           extratags hstore,
-                           geometry Geometry(Geometry,4326) NOT NULL)""")
-    temp_db_conn.commit()
+    table_factory('place',
+                  """osm_id int8 NOT NULL,
+                     osm_type char(1) NOT NULL,
+                     class text NOT NULL,
+                     type text NOT NULL,
+                     name hstore,
+                     admin_level smallint,
+                     address hstore,
+                     extratags hstore,
+                     geometry Geometry(Geometry,4326) NOT NULL""")
 
 
 @pytest.fixture
@@ -191,78 +182,57 @@ def place_row(place_table, temp_db_cursor):
 def placex_table(temp_db_with_extensions, temp_db_conn):
     """ Create an empty version of the place table.
     """
-    with temp_db_conn.cursor() as cur:
-        cur.execute("""CREATE TABLE placex (
-                           place_id BIGINT,
-                           parent_place_id BIGINT,
-                           linked_place_id BIGINT,
-                           importance FLOAT,
-                           indexed_date TIMESTAMP,
-                           geometry_sector INTEGER,
-                           rank_address SMALLINT,
-                           rank_search SMALLINT,
-                           partition SMALLINT,
-                           indexed_status SMALLINT,
-                           osm_id int8,
-                           osm_type char(1),
-                           class text,
-                           type text,
-                           name hstore,
-                           admin_level smallint,
-                           address hstore,
-                           extratags hstore,
-                           geometry Geometry(Geometry,4326),
-                           wikipedia TEXT,
-                           country_code varchar(2),
-                           housenumber TEXT,
-                           postcode TEXT,
-                           centroid GEOMETRY(Geometry, 4326))""")
-    temp_db_conn.commit()
+    return mocks.MockPlacexTable(temp_db_conn)
+
+
+@pytest.fixture
+def osmline_table(temp_db_with_extensions, table_factory):
+    table_factory('location_property_osmline',
+                  """place_id BIGINT,
+                     osm_id BIGINT,
+                     parent_place_id BIGINT,
+                     geometry_sector INTEGER,
+                     indexed_date TIMESTAMP,
+                     startnumber INTEGER,
+                     endnumber INTEGER,
+                     partition SMALLINT,
+                     indexed_status SMALLINT,
+                     linegeo GEOMETRY,
+                     interpolationtype TEXT,
+                     address HSTORE,
+                     postcode TEXT,
+                     country_code VARCHAR(2)""")
 
 
 @pytest.fixture
-def osmline_table(temp_db_with_extensions, temp_db_conn):
-    with temp_db_conn.cursor() as cur:
-        cur.execute("""CREATE TABLE location_property_osmline (
-                           place_id BIGINT,
-                           osm_id BIGINT,
-                           parent_place_id BIGINT,
-                           geometry_sector INTEGER,
-                           indexed_date TIMESTAMP,
-                           startnumber INTEGER,
-                           endnumber INTEGER,
-                           partition SMALLINT,
-                           indexed_status SMALLINT,
-                           linegeo GEOMETRY,
-                           interpolationtype TEXT,
-                           address HSTORE,
-                           postcode TEXT,
-                           country_code VARCHAR(2))""")
-    temp_db_conn.commit()
+def sql_preprocessor_cfg(tmp_path, table_factory, temp_db_with_extensions):
+    table_factory('country_name', 'partition INT', ((0, ), (1, ), (2, )))
+    cfg = Configuration(None)
+    cfg.set_libdirs(osm2pgsql=None, sql=tmp_path)
+    return cfg
 
 
 @pytest.fixture
-def word_table(temp_db, temp_db_conn):
-    with temp_db_conn.cursor() as cur:
-        cur.execute("""CREATE TABLE word (
-                           word_id INTEGER,
-                           word_token text,
-                           word text,
-                           class text,
-                           type text,
-                           country_code varchar(2),
-                           search_name_count INTEGER,
-                           operator TEXT)""")
-    temp_db_conn.commit()
+def sql_preprocessor(sql_preprocessor_cfg, temp_db_conn):
+    return SQLPreprocessor(temp_db_conn, sql_preprocessor_cfg)
 
 
 @pytest.fixture
-def osm2pgsql_options(temp_db):
-    return dict(osm2pgsql='echo',
-                osm2pgsql_cache=10,
-                osm2pgsql_style='style.file',
-                threads=1,
-                dsn='dbname=' + temp_db,
-                flatnode_file='',
-                tablespaces=dict(slim_data='', slim_index='',
-                                 main_data='', main_index=''))
+def tokenizer_mock(monkeypatch, property_table):
+    """ Sets up the configuration so that the test dummy tokenizer will be
+        loaded when the tokenizer factory is used. Also returns a factory
+        with which a new dummy tokenizer may be created.
+    """
+    monkeypatch.setenv('NOMINATIM_TOKENIZER', 'dummy')
+
+    def _import_dummy(*args, **kwargs):
+        return dummy_tokenizer
+
+    monkeypatch.setattr(nominatim_db.tokenizer.factory,
+                        "_import_tokenizer", _import_dummy)
+    property_table.set('tokenizer', 'dummy')
+
+    def _create_tokenizer():
+        return dummy_tokenizer.DummyTokenizer(None, None)
+
+    return _create_tokenizer