]> git.openstreetmap.org Git - nominatim.git/blobdiff - test/python/conftest.py
Merge pull request #2269 from lonvia/fix-actions
[nominatim.git] / test / python / conftest.py
index 2e81e91997ae7a902727013a1f1593088fba0a9f..871365d90214f0515365179644d6ffe69e8a0100 100644 (file)
@@ -5,6 +5,7 @@ from pathlib import Path
 import psycopg2
 import psycopg2.extras
 import pytest
 import psycopg2
 import psycopg2.extras
 import pytest
+import tempfile
 
 SRC_DIR = Path(__file__) / '..' / '..' / '..'
 
 
 SRC_DIR = Path(__file__) / '..' / '..' / '..'
 
@@ -13,6 +14,7 @@ sys.path.insert(0, str(SRC_DIR.resolve()))
 
 from nominatim.config import Configuration
 from nominatim.db import connection
 
 from nominatim.config import Configuration
 from nominatim.db import connection
+from nominatim.db.sql_preprocessor import SQLPreprocessor
 
 class _TestingCursor(psycopg2.extras.DictCursor):
     """ Extension to the DictCursor class that provides execution
 
 class _TestingCursor(psycopg2.extras.DictCursor):
     """ Extension to the DictCursor class that provides execution
@@ -36,6 +38,19 @@ class _TestingCursor(psycopg2.extras.DictCursor):
 
         return set((tuple(row) for row in self))
 
 
         return set((tuple(row) for row in self))
 
+    def table_exists(self, table):
+        """ Check that a table with the given name exists in the database.
+        """
+        num = self.scalar("""SELECT count(*) FROM pg_tables
+                             WHERE tablename = %s""", (table, ))
+        return num == 1
+
+    def table_rows(self, table):
+        """ Return the number of rows in the given table.
+        """
+        return self.scalar('SELECT count(*) FROM ' + table)
+
+
 @pytest.fixture
 def temp_db(monkeypatch):
     """ Create an empty database for the test. The database name is also
 @pytest.fixture
 def temp_db(monkeypatch):
     """ Create an empty database for the test. The database name is also
@@ -63,6 +78,12 @@ def temp_db(monkeypatch):
 
     conn.close()
 
 
     conn.close()
 
+
+@pytest.fixture
+def dsn(temp_db):
+    return 'dbname=' + temp_db
+
+
 @pytest.fixture
 def temp_db_with_extensions(temp_db):
     conn = psycopg2.connect(database=temp_db)
 @pytest.fixture
 def temp_db_with_extensions(temp_db):
     conn = psycopg2.connect(database=temp_db)
@@ -77,9 +98,8 @@ def temp_db_with_extensions(temp_db):
 def temp_db_conn(temp_db):
     """ Connection to the test database.
     """
 def temp_db_conn(temp_db):
     """ Connection to the test database.
     """
-    conn = connection.connect('dbname=' + temp_db)
-    yield conn
-    conn.close()
+    with connection.connect('dbname=' + temp_db) as conn:
+        yield conn
 
 
 @pytest.fixture
 
 
 @pytest.fixture
@@ -94,14 +114,37 @@ def temp_db_cursor(temp_db):
     conn.close()
 
 
     conn.close()
 
 
+@pytest.fixture
+def table_factory(temp_db_cursor):
+    def mk_table(name, definition='id INT', content=None):
+        temp_db_cursor.execute('CREATE TABLE {} ({})'.format(name, definition))
+        if content is not None:
+            if not isinstance(content, str):
+                content = '),('.join([str(x) for x in content])
+            temp_db_cursor.execute("INSERT INTO {} VALUES ({})".format(name, content))
+
+    return mk_table
+
+
 @pytest.fixture
 def def_config():
     return Configuration(None, SRC_DIR.resolve() / 'settings')
 
 @pytest.fixture
 def def_config():
     return Configuration(None, SRC_DIR.resolve() / 'settings')
 
+@pytest.fixture
+def src_dir():
+    return SRC_DIR.resolve()
+
+@pytest.fixture
+def tmp_phplib_dir():
+    with tempfile.TemporaryDirectory() as phpdir:
+        (Path(phpdir) / 'admin').mkdir()
+
+        yield Path(phpdir)
 
 @pytest.fixture
 def status_table(temp_db_conn):
 
 @pytest.fixture
 def status_table(temp_db_conn):
-    """ Create an empty version of the status table.
+    """ Create an empty version of the status table and
+        the status logging table.
     """
     with temp_db_conn.cursor() as cur:
         cur.execute("""CREATE TABLE import_status (
     """
     with temp_db_conn.cursor() as cur:
         cur.execute("""CREATE TABLE import_status (
@@ -109,6 +152,14 @@ def status_table(temp_db_conn):
                            sequence_id integer,
                            indexed boolean
                        )""")
                            sequence_id integer,
                            indexed boolean
                        )""")
+        cur.execute("""CREATE TABLE import_osmosis_log (
+                           batchend timestamp,
+                           batchseq integer,
+                           batchsize bigint,
+                           starttime timestamp,
+                           endtime timestamp,
+                           event text
+                           )""")
     temp_db_conn.commit()
 
 
     temp_db_conn.commit()
 
 
@@ -141,6 +192,92 @@ def place_row(place_table, temp_db_cursor):
         temp_db_cursor.execute("INSERT INTO place VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
                                (osm_id or next(idseq), osm_type, cls, typ, names,
                                 admin_level, address, extratags,
         temp_db_cursor.execute("INSERT INTO place VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
                                (osm_id or next(idseq), osm_type, cls, typ, names,
                                 admin_level, address, extratags,
-                                geom or 'SRID=4326;POINT(0 0 )'))
+                                geom or 'SRID=4326;POINT(0 0)'))
 
     return _insert
 
     return _insert
+
+@pytest.fixture
+def placex_table(temp_db_with_extensions, temp_db_conn):
+    """ Create an empty version of the place table.
+    """
+    with temp_db_conn.cursor() as cur:
+        cur.execute("""CREATE TABLE placex (
+                           place_id BIGINT,
+                           parent_place_id BIGINT,
+                           linked_place_id BIGINT,
+                           importance FLOAT,
+                           indexed_date TIMESTAMP,
+                           geometry_sector INTEGER,
+                           rank_address SMALLINT,
+                           rank_search SMALLINT,
+                           partition SMALLINT,
+                           indexed_status SMALLINT,
+                           osm_id int8,
+                           osm_type char(1),
+                           class text,
+                           type text,
+                           name hstore,
+                           admin_level smallint,
+                           address hstore,
+                           extratags hstore,
+                           geometry Geometry(Geometry,4326),
+                           wikipedia TEXT,
+                           country_code varchar(2),
+                           housenumber TEXT,
+                           postcode TEXT,
+                           centroid GEOMETRY(Geometry, 4326))""")
+    temp_db_conn.commit()
+
+
+@pytest.fixture
+def osmline_table(temp_db_with_extensions, temp_db_conn):
+    with temp_db_conn.cursor() as cur:
+        cur.execute("""CREATE TABLE location_property_osmline (
+                           place_id BIGINT,
+                           osm_id BIGINT,
+                           parent_place_id BIGINT,
+                           geometry_sector INTEGER,
+                           indexed_date TIMESTAMP,
+                           startnumber INTEGER,
+                           endnumber INTEGER,
+                           partition SMALLINT,
+                           indexed_status SMALLINT,
+                           linegeo GEOMETRY,
+                           interpolationtype TEXT,
+                           address HSTORE,
+                           postcode TEXT,
+                           country_code VARCHAR(2))""")
+    temp_db_conn.commit()
+
+
+@pytest.fixture
+def word_table(temp_db, temp_db_conn):
+    with temp_db_conn.cursor() as cur:
+        cur.execute("""CREATE TABLE word (
+                           word_id INTEGER,
+                           word_token text,
+                           word text,
+                           class text,
+                           type text,
+                           country_code varchar(2),
+                           search_name_count INTEGER,
+                           operator TEXT)""")
+    temp_db_conn.commit()
+
+
+@pytest.fixture
+def osm2pgsql_options(temp_db):
+    return dict(osm2pgsql='echo',
+                osm2pgsql_cache=10,
+                osm2pgsql_style='style.file',
+                threads=1,
+                dsn='dbname=' + temp_db,
+                flatnode_file='',
+                tablespaces=dict(slim_data='', slim_index='',
+                                 main_data='', main_index=''))
+
+@pytest.fixture
+def sql_preprocessor(temp_db_conn, tmp_path, def_config, monkeypatch, table_factory):
+    monkeypatch.setenv('NOMINATIM_DATABASE_MODULE_PATH', '.')
+    table_factory('country_name', 'partition INT', (0, 1, 2))
+    return SQLPreprocessor(temp_db_conn, def_config, tmp_path)