X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/c4f22a42eba499ed1854c5967b2e3932fe9b3896..HEAD:/test/python/db/test_connection.py?ds=sidebyside diff --git a/test/python/db/test_connection.py b/test/python/db/test_connection.py index a89d9289..a8b5d677 100644 --- a/test/python/db/test_connection.py +++ b/test/python/db/test_connection.py @@ -1,60 +1,83 @@ -# SPDX-License-Identifier: GPL-2.0-only +# SPDX-License-Identifier: GPL-3.0-or-later # # This file is part of Nominatim. (https://nominatim.org) # -# Copyright (C) 2022 by the Nominatim developer community. +# Copyright (C) 2024 by the Nominatim developer community. # For a full list of authors see the git log. """ -Tests for specialised conenction and cursor classes. +Tests for specialised connection and cursor classes. """ import pytest -import psycopg2 +import psycopg -from nominatim.db.connection import connect, get_pg_env +import nominatim_db.db.connection as nc @pytest.fixture def db(dsn): - with connect(dsn) as conn: + with nc.connect(dsn) as conn: yield conn def test_connection_table_exists(db, table_factory): - assert not db.table_exists('foobar') + assert not nc.table_exists(db, 'foobar') table_factory('foobar') - assert db.table_exists('foobar') + assert nc.table_exists(db, 'foobar') +def test_has_column_no_table(db): + assert not nc.table_has_column(db, 'sometable', 'somecolumn') + + +@pytest.mark.parametrize('name,result', [('tram', True), ('car', False)]) +def test_has_column(db, table_factory, name, result): + table_factory('stuff', 'tram TEXT') + + assert nc.table_has_column(db, 'stuff', name) == result + def test_connection_index_exists(db, table_factory, temp_db_cursor): - assert not db.index_exists('some_index') + assert not nc.index_exists(db, 'some_index') table_factory('foobar') temp_db_cursor.execute('CREATE INDEX some_index ON foobar(id)') - assert db.index_exists('some_index') - assert db.index_exists('some_index', table='foobar') - assert not db.index_exists('some_index', table='bar') + assert nc.index_exists(db, 'some_index') + assert nc.index_exists(db, 'some_index', table='foobar') + assert not nc.index_exists(db, 'some_index', table='bar') def test_drop_table_existing(db, table_factory): table_factory('dummy') - assert db.table_exists('dummy') + assert nc.table_exists(db, 'dummy') + + nc.drop_tables(db, 'dummy') + assert not nc.table_exists(db, 'dummy') + + +def test_drop_table_non_existing(db): + nc.drop_tables(db, 'dfkjgjriogjigjgjrdghehtre') + + +def test_drop_many_tables(db, table_factory): + tables = [f'table{n}' for n in range(5)] - db.drop_table('dummy') - assert not db.table_exists('dummy') + for t in tables: + table_factory(t) + assert nc.table_exists(db, t) + nc.drop_tables(db, *tables) -def test_drop_table_non_existsing(db): - db.drop_table('dfkjgjriogjigjgjrdghehtre') + for t in tables: + assert not nc.table_exists(db, t) def test_drop_table_non_existing_force(db): - with pytest.raises(psycopg2.ProgrammingError, match='.*does not exist.*'): - db.drop_table('dfkjgjriogjigjgjrdghehtre', if_exists=False) + with pytest.raises(psycopg.ProgrammingError, match='.*does not exist.*'): + nc.drop_tables(db, 'dfkjgjriogjigjgjrdghehtre', if_exists=False) def test_connection_server_version_tuple(db): - ver = db.server_version_tuple() + ver = nc.server_version_tuple(db) assert isinstance(ver, tuple) assert len(ver) == 2 @@ -62,7 +85,7 @@ def test_connection_server_version_tuple(db): def test_connection_postgis_version_tuple(db, temp_db_with_extensions): - ver = db.postgis_version_tuple() + ver = nc.postgis_version_tuple(db) assert isinstance(ver, tuple) assert len(ver) == 2 @@ -72,27 +95,24 @@ def test_connection_postgis_version_tuple(db, temp_db_with_extensions): def test_cursor_scalar(db, table_factory): table_factory('dummy') - with db.cursor() as cur: - assert cur.scalar('SELECT count(*) FROM dummy') == 0 + assert nc.execute_scalar(db, 'SELECT count(*) FROM dummy') == 0 def test_cursor_scalar_many_rows(db): - with db.cursor() as cur: - with pytest.raises(RuntimeError): - cur.scalar('SELECT * FROM pg_tables') + with pytest.raises(RuntimeError, match='Query did not return a single row.'): + nc.execute_scalar(db, 'SELECT * FROM pg_tables') def test_cursor_scalar_no_rows(db, table_factory): table_factory('dummy') - with db.cursor() as cur: - with pytest.raises(RuntimeError): - cur.scalar('SELECT id FROM dummy') + with pytest.raises(RuntimeError, match='Query did not return a single row.'): + nc.execute_scalar(db, 'SELECT id FROM dummy') def test_get_pg_env_add_variable(monkeypatch): monkeypatch.delenv('PGPASSWORD', raising=False) - env = get_pg_env('user=fooF') + env = nc.get_pg_env('user=fooF') assert env['PGUSER'] == 'fooF' assert 'PGPASSWORD' not in env @@ -100,12 +120,12 @@ def test_get_pg_env_add_variable(monkeypatch): def test_get_pg_env_overwrite_variable(monkeypatch): monkeypatch.setenv('PGUSER', 'some default') - env = get_pg_env('user=overwriter') + env = nc.get_pg_env('user=overwriter') assert env['PGUSER'] == 'overwriter' def test_get_pg_env_ignore_unknown(): - env = get_pg_env('client_encoding=stuff', base_env={}) + env = nc.get_pg_env('client_encoding=stuff', base_env={}) assert env == {}