git.openstreetmap.org Git - nominatim.git/blob - test/python/db/test_connection.py
Merge pull request #3487 from lonvia/port-to-psycopg3
[nominatim.git] / test / python / db / test_connection.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tests for specialised connection and cursor classes.
9 """
10 import pytest
11 import psycopg
12
13 import nominatim_db.db.connection as nc
14
@pytest.fixture
def db(dsn):
    """Yield a connection opened with ``nc.connect()`` for the given DSN.

    The connection is used as a context manager, so its exit handler runs
    once the test that requested this fixture has finished.
    """
    with nc.connect(dsn) as connection:
        yield connection
19
20
def test_connection_table_exists(db, table_factory):
    """``table_exists()`` is falsy before ``foobar`` is set up, truthy after."""
    found_before = nc.table_exists(db, 'foobar')
    assert not found_before

    table_factory('foobar')

    found_after = nc.table_exists(db, 'foobar')
    assert found_after
27
28
def test_has_column_no_table(db):
    """``table_has_column()`` is falsy for a table this test never creates."""
    column_found = nc.table_has_column(db, 'sometable', 'somecolumn')

    assert not column_found
31
32
@pytest.mark.parametrize('name,result', [('tram', True), ('car', False)])
def test_has_column(db, table_factory, name, result):
    """``table_has_column()`` on ``stuff`` matches the expected result.

    The table is set up with the column definition ``tram TEXT``; the
    parametrised column name is then looked up and compared to ``result``.
    """
    table_factory('stuff', 'tram TEXT')

    column_found = nc.table_has_column(db, 'stuff', name)

    assert column_found == result
38
def test_connection_index_exists(db, table_factory, temp_db_cursor):
    """``index_exists()`` tracks index creation and honours the table filter."""
    # Nothing has created the index yet.
    found_initially = nc.index_exists(db, 'some_index')
    assert not found_initially

    table_factory('foobar')
    temp_db_cursor.execute('CREATE INDEX some_index ON foobar(id)')

    # Found by name alone and when restricted to the owning table ...
    found_by_name = nc.index_exists(db, 'some_index')
    assert found_by_name
    found_on_owner = nc.index_exists(db, 'some_index', table='foobar')
    assert found_on_owner
    # ... but not when a different table name is given.
    found_on_other = nc.index_exists(db, 'some_index', table='bar')
    assert not found_on_other
48
49
def test_drop_table_existing(db, table_factory):
    """``drop_tables()`` removes a table that was present beforehand."""
    table_factory('dummy')
    present_before = nc.table_exists(db, 'dummy')
    assert present_before

    nc.drop_tables(db, 'dummy')
    present_after = nc.table_exists(db, 'dummy')
    assert not present_after
56
57
def test_drop_table_non_existing(db):
    """``drop_tables()`` with default arguments must not raise here.

    The table name is never created by this test; the test passes as long
    as the call completes without an exception.
    """
    nc.drop_tables(db, 'dfkjgjriogjigjgjrdghehtre')
60
61
def test_drop_many_tables(db, table_factory):
    """``drop_tables()`` accepts several names and removes all of them."""
    names = [f'table{n}' for n in range(5)]

    # Set up each table and confirm it is visible before dropping.
    for name in names:
        table_factory(name)
        assert nc.table_exists(db, name)

    # All names are handed over in a single call.
    nc.drop_tables(db, *names)

    for name in names:
        assert not nc.table_exists(db, name)
73
74
def test_drop_table_non_existing_force(db):
    """``drop_tables(..., if_exists=False)`` raises for the unknown table.

    A ``psycopg.ProgrammingError`` whose message matches
    ``.*does not exist.*`` is expected.
    """
    expected_failure = pytest.raises(psycopg.ProgrammingError,
                                     match='.*does not exist.*')

    with expected_failure:
        nc.drop_tables(db, 'dfkjgjriogjigjgjrdghehtre', if_exists=False)
78
def test_connection_server_version_tuple(db):
    """``server_version_tuple()`` returns a 2-tuple whose first item is > 8."""
    version = nc.server_version_tuple(db)

    assert isinstance(version, tuple)
    assert len(version) == 2

    first_component = version[0]
    assert first_component > 8
85
86
def test_connection_postgis_version_tuple(db, temp_db_with_extensions):
    """``postgis_version_tuple()`` returns a 2-tuple whose first item is >= 2.

    ``temp_db_with_extensions`` is requested but not used directly in the
    body; presumably it prepares the database with the needed extensions
    (defined outside this file - confirm in the fixture definition).
    """
    version = nc.postgis_version_tuple(db)

    assert isinstance(version, tuple)
    assert len(version) == 2

    first_component = version[0]
    assert first_component >= 2
93
94
def test_cursor_scalar(db, table_factory):
    """``execute_scalar()`` returns 0 for a row count on the fresh table."""
    table_factory('dummy')

    row_count = nc.execute_scalar(db, 'SELECT count(*) FROM dummy')

    assert row_count == 0
99
100
def test_cursor_scalar_many_rows(db):
    """``execute_scalar()`` raises ``RuntimeError`` for the ``pg_tables`` query.

    The query selects every row of ``pg_tables``; the expected message
    pattern is 'Query did not return a single row.'.
    """
    expected_failure = pytest.raises(RuntimeError,
                                     match='Query did not return a single row.')

    with expected_failure:
        nc.execute_scalar(db, 'SELECT * FROM pg_tables')
104
105
def test_cursor_scalar_no_rows(db, table_factory):
    """``execute_scalar()`` raises ``RuntimeError`` when selecting from ``dummy``.

    No rows are inserted by this test after the table is set up; the
    expected message pattern is 'Query did not return a single row.'.
    """
    table_factory('dummy')

    expected_failure = pytest.raises(RuntimeError,
                                     match='Query did not return a single row.')

    with expected_failure:
        nc.execute_scalar(db, 'SELECT id FROM dummy')
111
112
def test_get_pg_env_add_variable(monkeypatch):
    """``get_pg_env()`` sets ``PGUSER`` from the DSN and adds no password."""
    # Make sure a password from the outer environment cannot leak in.
    monkeypatch.delenv('PGPASSWORD', raising=False)

    pg_env = nc.get_pg_env('user=fooF')

    assert pg_env['PGUSER'] == 'fooF'
    assert 'PGPASSWORD' not in pg_env
119
120
def test_get_pg_env_overwrite_variable(monkeypatch):
    """The DSN user replaces a ``PGUSER`` already set in the environment."""
    monkeypatch.setenv('PGUSER', 'some default')

    pg_env = nc.get_pg_env('user=overwriter')

    assert pg_env['PGUSER'] == 'overwriter'
126
127
def test_get_pg_env_ignore_unknown():
    """A DSN holding only ``client_encoding`` adds nothing to an empty base."""
    pg_env = nc.get_pg_env('client_encoding=stuff', base_env={})

    assert pg_env == {}