git.openstreetmap.org Git - nominatim.git/blobdiff - test/python/test_db_connection.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / test / python / test_db_connection.py
index dcbfb8bf2d9d26125d28fd78166328fe091764df..41978e59135513682cb04bc088d00338df78c499 100644 (file)
@@ -2,34 +2,51 @@
 Tests for specialised connection and cursor classes.
 """
 import pytest
 Tests for specialised connection and cursor classes.
 """
 import pytest
+import psycopg2
 
 from nominatim.db.connection import connect, get_pg_env
 
 @pytest.fixture
 
 from nominatim.db.connection import connect, get_pg_env
 
 @pytest.fixture
-def db(temp_db):
-    with connect('dbname=' + temp_db) as conn:
+def db(dsn):
+    with connect(dsn) as conn:
         yield conn
 
 
         yield conn
 
 
-def test_connection_table_exists(db, temp_db_cursor):
-    assert db.table_exists('foobar') == False
+def test_connection_table_exists(db, table_factory):
+    assert not db.table_exists('foobar')
 
 
-    temp_db_cursor.execute('CREATE TABLE foobar (id INT)')
+    table_factory('foobar')
 
 
-    assert db.table_exists('foobar') == True
+    assert db.table_exists('foobar')
 
 
 
 
-def test_connection_index_exists(db, temp_db_cursor):
-    assert db.index_exists('some_index') == False
+def test_connection_index_exists(db, table_factory, temp_db_cursor):
+    assert not db.index_exists('some_index')
 
 
-    temp_db_cursor.execute('CREATE TABLE foobar (id INT)')
+    table_factory('foobar')
     temp_db_cursor.execute('CREATE INDEX some_index ON foobar(id)')
 
     temp_db_cursor.execute('CREATE INDEX some_index ON foobar(id)')
 
-    assert db.index_exists('some_index') == True
-    assert db.index_exists('some_index', table='foobar') == True
-    assert db.index_exists('some_index', table='bar') == False
+    assert db.index_exists('some_index')
+    assert db.index_exists('some_index', table='foobar')
+    assert not db.index_exists('some_index', table='bar')
 
 
 
 
+def test_drop_table_existing(db, table_factory):
+    table_factory('dummy')
+    assert db.table_exists('dummy')
+
+    db.drop_table('dummy')
+    assert not db.table_exists('dummy')
+
+
+def test_drop_table_non_existsing(db):
+    db.drop_table('dfkjgjriogjigjgjrdghehtre')
+
+
+def test_drop_table_non_existing_force(db):
+    with pytest.raises(psycopg2.ProgrammingError, match='.*does not exist.*'):
+        db.drop_table('dfkjgjriogjigjgjrdghehtre', if_exists=False)
+
 def test_connection_server_version_tuple(db):
     ver = db.server_version_tuple()
 
 def test_connection_server_version_tuple(db):
     ver = db.server_version_tuple()
 
@@ -38,9 +55,7 @@ def test_connection_server_version_tuple(db):
     assert ver[0] > 8
 
 
     assert ver[0] > 8
 
 
-def test_connection_postgis_version_tuple(db, temp_db_cursor):
-    temp_db_cursor.execute('CREATE EXTENSION postgis')
-
+def test_connection_postgis_version_tuple(db, temp_db_with_extensions):
     ver = db.postgis_version_tuple()
 
     assert isinstance(ver, tuple)
     ver = db.postgis_version_tuple()
 
     assert isinstance(ver, tuple)
@@ -48,8 +63,8 @@ def test_connection_postgis_version_tuple(db, temp_db_cursor):
     assert ver[0] >= 2
 
 
     assert ver[0] >= 2
 
 
-def test_cursor_scalar(db, temp_db_cursor):
-    temp_db_cursor.execute('CREATE TABLE dummy (id INT)')
+def test_cursor_scalar(db, table_factory):
+    table_factory('dummy')
 
     with db.cursor() as cur:
         assert cur.scalar('SELECT count(*) FROM dummy') == 0
 
     with db.cursor() as cur:
         assert cur.scalar('SELECT count(*) FROM dummy') == 0
@@ -61,6 +76,14 @@ def test_cursor_scalar_many_rows(db):
             cur.scalar('SELECT * FROM pg_tables')
 
 
             cur.scalar('SELECT * FROM pg_tables')
 
 
+def test_cursor_scalar_no_rows(db, table_factory):
+    table_factory('dummy')
+
+    with db.cursor() as cur:
+        with pytest.raises(RuntimeError):
+            cur.scalar('SELECT id FROM dummy')
+
+
 def test_get_pg_env_add_variable(monkeypatch):
     monkeypatch.delenv('PGPASSWORD', raising=False)
     env = get_pg_env('user=fooF')
 def test_get_pg_env_add_variable(monkeypatch):
     monkeypatch.delenv('PGPASSWORD', raising=False)
     env = get_pg_env('user=fooF')