git.openstreetmap.org Git - nominatim.git/blobdiff - test/python/conftest.py
US TIGER data 2021 released
[nominatim.git] / test / python / conftest.py
index d92df5c5efdecf7ca89db3767fea55562b391741..2fc9772671069c06f337c0d24ca410b1c5a093b6 100644 (file)
@@ -1,38 +1,25 @@
+import itertools
 import sys
 from pathlib import Path
 
 import psycopg2
-import psycopg2.extras
 import pytest
 
-SRC_DIR = Path(__file__) / '..' / '..' / '..'
+SRC_DIR = (Path(__file__) / '..' / '..' / '..').resolve()
 
 # always test against the source
 sys.path.insert(0, str(SRC_DIR.resolve()))
 
 from nominatim.config import Configuration
+from nominatim.db import connection
+from nominatim.db.sql_preprocessor import SQLPreprocessor
+import nominatim.tokenizer.factory
+import nominatim.cli
 
-class _TestingCursor(psycopg2.extras.DictCursor):
-    """ Extension to the DictCursor class that provides execution
-        short-cuts that simplify writing assertions.
-    """
-
-    def scalar(self, sql, params=None):
-        """ Execute a query with a single return value and return this value.
-            Raises an assertion when not exactly one row is returned.
-        """
-        self.execute(sql, params)
-        assert self.rowcount == 1
-        return self.fetchone()[0]
-
-    def row_set(self, sql, params=None):
-        """ Execute a query and return the result as a set of tuples.
-        """
-        self.execute(sql, params)
-        if self.rowcount == 1:
-            return set(tuple(self.fetchone()))
+import dummy_tokenizer
+import mocks
+from cursor import CursorForTesting
 
-        return set((tuple(row) for row in self))
 
 @pytest.fixture
 def temp_db(monkeypatch):
@@ -40,29 +27,49 @@ def temp_db(monkeypatch):
         exported into NOMINATIM_DATABASE_DSN.
     """
     name = 'test_nominatim_python_unittest'
-    with psycopg2.connect(database='postgres') as conn:
-        conn.set_isolation_level(0)
-        with conn.cursor() as cur:
-            cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
-            cur.execute('CREATE DATABASE {}'.format(name))
+    conn = psycopg2.connect(database='postgres')
+
+    conn.set_isolation_level(0)
+    with conn.cursor() as cur:
+        cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
+        cur.execute('CREATE DATABASE {}'.format(name))
 
-    monkeypatch.setenv('NOMINATIM_DATABASE_DSN' , 'dbname=' + name)
+    conn.close()
+
+    monkeypatch.setenv('NOMINATIM_DATABASE_DSN', 'dbname=' + name)
 
     yield name
 
-    with psycopg2.connect(database='postgres') as conn:
-        conn.set_isolation_level(0)
-        with conn.cursor() as cur:
-            cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
+    conn = psycopg2.connect(database='postgres')
+
+    conn.set_isolation_level(0)
+    with conn.cursor() as cur:
+        cur.execute('DROP DATABASE IF EXISTS {}'.format(name))
+
+    conn.close()
+
+
+@pytest.fixture
+def dsn(temp_db):
+    return 'dbname=' + temp_db
+
+
+@pytest.fixture
+def temp_db_with_extensions(temp_db):
+    conn = psycopg2.connect(database=temp_db)
+    with conn.cursor() as cur:
+        cur.execute('CREATE EXTENSION hstore; CREATE EXTENSION postgis;')
+    conn.commit()
+    conn.close()
 
+    return temp_db
 
 @pytest.fixture
 def temp_db_conn(temp_db):
     """ Connection to the test database.
     """
-    conn = psycopg2.connect(database=temp_db)
-    yield conn
-    conn.close()
+    with connection.connect('dbname=' + temp_db) as conn:
+        yield conn
 
 
 @pytest.fixture
@@ -72,11 +79,178 @@ def temp_db_cursor(temp_db):
     """
     conn = psycopg2.connect('dbname=' + temp_db)
     conn.set_isolation_level(0)
-    with conn.cursor(cursor_factory=_TestingCursor) as cur:
+    with conn.cursor(cursor_factory=CursorForTesting) as cur:
         yield cur
     conn.close()
 
 
+@pytest.fixture
+def table_factory(temp_db_cursor):
+    """ A fixture that creates new SQL tables, potentially filled with
+        content.
+    """
+    def mk_table(name, definition='id INT', content=None):
+        temp_db_cursor.execute('CREATE TABLE {} ({})'.format(name, definition))
+        if content is not None:
+            temp_db_cursor.execute_values("INSERT INTO {} VALUES %s".format(name), content)
+
+    return mk_table
+
+
 @pytest.fixture
 def def_config():
-    return Configuration(None, SRC_DIR.resolve() / 'settings')
+    cfg = Configuration(None, SRC_DIR.resolve() / 'settings')
+    cfg.set_libdirs(module='.', osm2pgsql='.',
+                    php=SRC_DIR / 'lib-php',
+                    sql=SRC_DIR / 'lib-sql',
+                    data=SRC_DIR / 'data')
+    return cfg
+
+
+@pytest.fixture
+def src_dir():
+    return SRC_DIR.resolve()
+
+
+@pytest.fixture
+def cli_call():
+    def _call_nominatim(*args):
+        return nominatim.cli.nominatim(module_dir='MODULE NOT AVAILABLE',
+                                       osm2pgsql_path='OSM2PGSQL NOT AVAILABLE',
+                                       phplib_dir=str(SRC_DIR / 'lib-php'),
+                                       data_dir=str(SRC_DIR / 'data'),
+                                       phpcgi_path='/usr/bin/php-cgi',
+                                       sqllib_dir=str(SRC_DIR / 'lib-sql'),
+                                       config_dir=str(SRC_DIR / 'settings'),
+                                       cli_args=args)
+
+    return _call_nominatim
+
+
+@pytest.fixture
+def property_table(table_factory, temp_db_conn):
+    table_factory('nominatim_properties', 'property TEXT, value TEXT')
+
+    return mocks.MockPropertyTable(temp_db_conn)
+
+
+@pytest.fixture
+def status_table(table_factory):
+    """ Create an empty version of the status table and
+        the status logging table.
+    """
+    table_factory('import_status',
+                  """lastimportdate timestamp with time zone NOT NULL,
+                     sequence_id integer,
+                     indexed boolean""")
+    table_factory('import_osmosis_log',
+                  """batchend timestamp,
+                     batchseq integer,
+                     batchsize bigint,
+                     starttime timestamp,
+                     endtime timestamp,
+                     event text""")
+
+
+@pytest.fixture
+def place_table(temp_db_with_extensions, table_factory):
+    """ Create an empty version of the place table.
+    """
+    table_factory('place',
+                  """osm_id int8 NOT NULL,
+                     osm_type char(1) NOT NULL,
+                     class text NOT NULL,
+                     type text NOT NULL,
+                     name hstore,
+                     admin_level smallint,
+                     address hstore,
+                     extratags hstore,
+                     geometry Geometry(Geometry,4326) NOT NULL""")
+
+
+@pytest.fixture
+def place_row(place_table, temp_db_cursor):
+    """ A factory for rows in the place table. The table is created as a
+        prerequisite to the fixture.
+    """
+    psycopg2.extras.register_hstore(temp_db_cursor)
+    idseq = itertools.count(1001)
+    def _insert(osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
+                admin_level=None, address=None, extratags=None, geom=None):
+        temp_db_cursor.execute("INSERT INTO place VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
+                               (osm_id or next(idseq), osm_type, cls, typ, names,
+                                admin_level, address, extratags,
+                                geom or 'SRID=4326;POINT(0 0)'))
+
+    return _insert
+
+@pytest.fixture
+def placex_table(temp_db_with_extensions, temp_db_conn):
+    """ Create an empty version of the placex table.
+    """
+    return mocks.MockPlacexTable(temp_db_conn)
+
+
+@pytest.fixture
+def osmline_table(temp_db_with_extensions, table_factory):
+    table_factory('location_property_osmline',
+                  """place_id BIGINT,
+                     osm_id BIGINT,
+                     parent_place_id BIGINT,
+                     geometry_sector INTEGER,
+                     indexed_date TIMESTAMP,
+                     startnumber INTEGER,
+                     endnumber INTEGER,
+                     partition SMALLINT,
+                     indexed_status SMALLINT,
+                     linegeo GEOMETRY,
+                     interpolationtype TEXT,
+                     address HSTORE,
+                     postcode TEXT,
+                     country_code VARCHAR(2)""")
+
+
+@pytest.fixture
+def word_table(temp_db_conn):
+    return mocks.MockWordTable(temp_db_conn)
+
+
+@pytest.fixture
+def osm2pgsql_options(temp_db):
+    return dict(osm2pgsql='echo',
+                osm2pgsql_cache=10,
+                osm2pgsql_style='style.file',
+                threads=1,
+                dsn='dbname=' + temp_db,
+                flatnode_file='',
+                tablespaces=dict(slim_data='', slim_index='',
+                                 main_data='', main_index=''))
+
+@pytest.fixture
+def sql_preprocessor(temp_db_conn, tmp_path, table_factory):
+    table_factory('country_name', 'partition INT', ((0, ), (1, ), (2, )))
+    cfg = Configuration(None, SRC_DIR.resolve() / 'settings')
+    cfg.set_libdirs(module='.', osm2pgsql='.', php=SRC_DIR / 'lib-php',
+                    sql=tmp_path, data=SRC_DIR / 'data')
+
+    return SQLPreprocessor(temp_db_conn, cfg)
+
+
+@pytest.fixture
+def tokenizer_mock(monkeypatch, property_table):
+    """ Sets up the configuration so that the test dummy tokenizer will be
+        loaded when the tokenizer factory is used. Also returns a factory
+        with which a new dummy tokenizer may be created.
+    """
+    monkeypatch.setenv('NOMINATIM_TOKENIZER', 'dummy')
+
+    def _import_dummy(*args, **kwargs):
+        return dummy_tokenizer
+
+    monkeypatch.setattr(nominatim.tokenizer.factory, "_import_tokenizer", _import_dummy)
+    property_table.set('tokenizer', 'dummy')
+
+    def _create_tokenizer():
+        return dummy_tokenizer.DummyTokenizer(None, None)
+
+    return _create_tokenizer