]> git.openstreetmap.org Git - nominatim.git/blobdiff - test/python/db/test_connection.py
release 4.5.0.post7
[nominatim.git] / test / python / db / test_connection.py
index a89d92896fa91044b07575f8e636504e794a5688..a8b5d677ce22e43f8e9b8e69ff222dc08202e560 100644 (file)
@@ -1,60 +1,83 @@
-# SPDX-License-Identifier: GPL-2.0-only
+# SPDX-License-Identifier: GPL-3.0-or-later
 #
 # This file is part of Nominatim. (https://nominatim.org)
 #
-# Copyright (C) 2022 by the Nominatim developer community.
+# Copyright (C) 2024 by the Nominatim developer community.
 # For a full list of authors see the git log.
 """
-Tests for specialised conenction and cursor classes.
+Tests for specialised connection and cursor classes.
 """
 import pytest
-import psycopg2
+import psycopg
 
-from nominatim.db.connection import connect, get_pg_env
+import nominatim_db.db.connection as nc
 
 @pytest.fixture
 def db(dsn):
-    with connect(dsn) as conn:
+    with nc.connect(dsn) as conn:
         yield conn
 
 
 def test_connection_table_exists(db, table_factory):
-    assert not db.table_exists('foobar')
+    assert not nc.table_exists(db, 'foobar')
 
     table_factory('foobar')
 
-    assert db.table_exists('foobar')
+    assert nc.table_exists(db, 'foobar')
 
 
+def test_has_column_no_table(db):
+    assert not nc.table_has_column(db, 'sometable', 'somecolumn')
+
+
+@pytest.mark.parametrize('name,result', [('tram', True), ('car', False)])
+def test_has_column(db, table_factory, name, result):
+    table_factory('stuff', 'tram TEXT')
+
+    assert nc.table_has_column(db, 'stuff', name) == result
+
 def test_connection_index_exists(db, table_factory, temp_db_cursor):
-    assert not db.index_exists('some_index')
+    assert not nc.index_exists(db, 'some_index')
 
     table_factory('foobar')
     temp_db_cursor.execute('CREATE INDEX some_index ON foobar(id)')
 
-    assert db.index_exists('some_index')
-    assert db.index_exists('some_index', table='foobar')
-    assert not db.index_exists('some_index', table='bar')
+    assert nc.index_exists(db, 'some_index')
+    assert nc.index_exists(db, 'some_index', table='foobar')
+    assert not nc.index_exists(db, 'some_index', table='bar')
 
 
 def test_drop_table_existing(db, table_factory):
     table_factory('dummy')
-    assert db.table_exists('dummy')
+    assert nc.table_exists(db, 'dummy')
+
+    nc.drop_tables(db, 'dummy')
+    assert not nc.table_exists(db, 'dummy')
+
+
+def test_drop_table_non_existing(db):
+    nc.drop_tables(db, 'dfkjgjriogjigjgjrdghehtre')
+
+
+def test_drop_many_tables(db, table_factory):
+    tables = [f'table{n}' for n in range(5)]
 
-    db.drop_table('dummy')
-    assert not db.table_exists('dummy')
+    for t in tables:
+        table_factory(t)
+        assert nc.table_exists(db, t)
 
+    nc.drop_tables(db, *tables)
 
-def test_drop_table_non_existsing(db):
-    db.drop_table('dfkjgjriogjigjgjrdghehtre')
+    for t in tables:
+        assert not nc.table_exists(db, t)
 
 
 def test_drop_table_non_existing_force(db):
-    with pytest.raises(psycopg2.ProgrammingError, match='.*does not exist.*'):
-        db.drop_table('dfkjgjriogjigjgjrdghehtre', if_exists=False)
+    with pytest.raises(psycopg.ProgrammingError, match='.*does not exist.*'):
+        nc.drop_tables(db, 'dfkjgjriogjigjgjrdghehtre', if_exists=False)
 
 def test_connection_server_version_tuple(db):
-    ver = db.server_version_tuple()
+    ver = nc.server_version_tuple(db)
 
     assert isinstance(ver, tuple)
     assert len(ver) == 2
@@ -62,7 +85,7 @@ def test_connection_server_version_tuple(db):
 
 
 def test_connection_postgis_version_tuple(db, temp_db_with_extensions):
-    ver = db.postgis_version_tuple()
+    ver = nc.postgis_version_tuple(db)
 
     assert isinstance(ver, tuple)
     assert len(ver) == 2
@@ -72,27 +95,24 @@ def test_connection_postgis_version_tuple(db, temp_db_with_extensions):
 def test_cursor_scalar(db, table_factory):
     table_factory('dummy')
 
-    with db.cursor() as cur:
-        assert cur.scalar('SELECT count(*) FROM dummy') == 0
+    assert nc.execute_scalar(db, 'SELECT count(*) FROM dummy') == 0
 
 
 def test_cursor_scalar_many_rows(db):
-    with db.cursor() as cur:
-        with pytest.raises(RuntimeError):
-            cur.scalar('SELECT * FROM pg_tables')
+    with pytest.raises(RuntimeError, match='Query did not return a single row.'):
+        nc.execute_scalar(db, 'SELECT * FROM pg_tables')
 
 
 def test_cursor_scalar_no_rows(db, table_factory):
     table_factory('dummy')
 
-    with db.cursor() as cur:
-        with pytest.raises(RuntimeError):
-            cur.scalar('SELECT id FROM dummy')
+    with pytest.raises(RuntimeError, match='Query did not return a single row.'):
+        nc.execute_scalar(db, 'SELECT id FROM dummy')
 
 
 def test_get_pg_env_add_variable(monkeypatch):
     monkeypatch.delenv('PGPASSWORD', raising=False)
-    env = get_pg_env('user=fooF')
+    env = nc.get_pg_env('user=fooF')
 
     assert env['PGUSER'] == 'fooF'
     assert 'PGPASSWORD' not in env
@@ -100,12 +120,12 @@ def test_get_pg_env_add_variable(monkeypatch):
 
 def test_get_pg_env_overwrite_variable(monkeypatch):
     monkeypatch.setenv('PGUSER', 'some default')
-    env = get_pg_env('user=overwriter')
+    env = nc.get_pg_env('user=overwriter')
 
     assert env['PGUSER'] == 'overwriter'
 
 
 def test_get_pg_env_ignore_unknown():
-    env = get_pg_env('client_encoding=stuff', base_env={})
+    env = nc.get_pg_env('client_encoding=stuff', base_env={})
 
     assert env == {}