git.openstreetmap.org Git - nominatim.git/blob - test/python/db/test_connection.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / test / python / db / test_connection.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tests for specialised connection and cursor classes.
9 """
10 import pytest
11 import psycopg
12
13 import nominatim_db.db.connection as nc
14
15
@pytest.fixture
def db(dsn):
    """ Yield a connection opened with nc.connect() on the given DSN.

        The connection is used as a context manager, so the context is
        left again once the requesting test has finished.
    """
    with nc.connect(dsn) as connection:
        yield connection
20
21
def test_connection_table_exists(db, table_factory):
    """ table_exists() must be falsy for 'foobar' before the table
        factory was called for it and truthy afterwards.
    """
    table_name = 'foobar'

    exists_before = nc.table_exists(db, table_name)
    assert not exists_before

    table_factory(table_name)

    exists_after = nc.table_exists(db, table_name)
    assert exists_after
28
29
def test_has_column_no_table(db):
    """ table_has_column() must be falsy for a table name that this
        test never sets up.
    """
    column_found = nc.table_has_column(db, 'sometable', 'somecolumn')

    assert not column_found
32
33
@pytest.mark.parametrize('name,result', [('tram', True), ('car', False)])
def test_has_column(db, table_factory, name, result):
    """ Compare table_has_column() against the expected result for a
        column name the table factory was given ('tram') and one it
        was not given ('car').
    """
    table_factory('stuff', 'tram TEXT')

    column_found = nc.table_has_column(db, 'stuff', name)

    assert column_found == result
39
40
def test_connection_index_exists(db, table_factory, temp_db_cursor):
    """ index_exists() must only report the index after it has been
        created and, when a table is given, only for table 'foobar'.
    """
    index_name = 'some_index'

    # Nothing has been created yet.
    assert not nc.index_exists(db, index_name)

    table_factory('foobar')
    temp_db_cursor.execute('CREATE INDEX some_index ON foobar(id)')

    # Lookup without a table, with the owning table, with another table name.
    found_any_table = nc.index_exists(db, index_name)
    assert found_any_table
    found_on_owner = nc.index_exists(db, index_name, table='foobar')
    assert found_on_owner
    found_on_other = nc.index_exists(db, index_name, table='bar')
    assert not found_on_other
50
51
def test_drop_table_existing(db, table_factory):
    """ After drop_tables() was called for a table that table_exists()
        reported before, table_exists() must be falsy for it.
    """
    table_name = 'dummy'

    table_factory(table_name)
    present_before = nc.table_exists(db, table_name)
    assert present_before

    nc.drop_tables(db, table_name)
    present_after = nc.table_exists(db, table_name)
    assert not present_after
58
59
def test_drop_table_non_existing(db):
    """ Calling drop_tables() with default arguments for a table name
        that this test never sets up must not raise.
    """
    missing_table = 'dfkjgjriogjigjgjrdghehtre'

    nc.drop_tables(db, missing_table)
62
63
def test_drop_many_tables(db, table_factory):
    """ A single drop_tables() call with five table names must leave
        none of them reported by table_exists().
    """
    tables = [f'table{n}' for n in range(5)]

    # Set up and verify each table in turn before dropping them together.
    for name in tables:
        table_factory(name)
        assert nc.table_exists(db, name)

    nc.drop_tables(db, *tables)

    assert not any(nc.table_exists(db, name) for name in tables)
75
76
def test_drop_table_non_existing_force(db):
    """ With if_exists=False, drop_tables() for a table name that this
        test never sets up must raise a psycopg.ProgrammingError whose
        message matches 'does not exist'.
    """
    missing_table = 'dfkjgjriogjigjgjrdghehtre'
    expected_failure = pytest.raises(psycopg.ProgrammingError,
                                     match='.*does not exist.*')

    with expected_failure:
        nc.drop_tables(db, missing_table, if_exists=False)
80
81
def test_connection_server_version_tuple(db):
    """ server_version_tuple() must return a 2-tuple whose first
        element is greater than 8.
    """
    version = nc.server_version_tuple(db)

    assert isinstance(version, tuple)
    assert len(version) == 2

    first_part = version[0]
    assert first_part > 8
88
89
def test_connection_postgis_version_tuple(db, temp_db_with_extensions):
    """ postgis_version_tuple() must return a 2-tuple whose first
        element is at least 2.
    """
    version = nc.postgis_version_tuple(db)

    assert isinstance(version, tuple)
    assert len(version) == 2

    first_part = version[0]
    assert first_part >= 2
96
97
def test_cursor_scalar(db, table_factory):
    """ execute_scalar() must return 0 for a count(*) over the table
        'dummy' directly after the table factory was called for it.
    """
    table_factory('dummy')

    row_count = nc.execute_scalar(db, 'SELECT count(*) FROM dummy')

    assert row_count == 0
102
103
def test_cursor_scalar_many_rows(db):
    """ execute_scalar() must raise a RuntimeError for the query
        'SELECT * FROM pg_tables'.
    """
    expected_failure = pytest.raises(RuntimeError,
                                     match='Query did not return a single row.')

    with expected_failure:
        nc.execute_scalar(db, 'SELECT * FROM pg_tables')
107
108
def test_cursor_scalar_no_rows(db, table_factory):
    """ execute_scalar() must raise a RuntimeError when selecting ids
        from the table 'dummy' directly after the table factory was
        called for it.
    """
    table_factory('dummy')
    expected_failure = pytest.raises(RuntimeError,
                                     match='Query did not return a single row.')

    with expected_failure:
        nc.execute_scalar(db, 'SELECT id FROM dummy')
114
115
def test_get_pg_env_add_variable(monkeypatch):
    """ get_pg_env() must put the DSN's user into PGUSER and must not
        contain PGPASSWORD when the environment does not have one.
    """
    # Make sure a password from the caller's environment cannot leak in.
    monkeypatch.delenv('PGPASSWORD', raising=False)

    pg_env = nc.get_pg_env('user=fooF')

    assert pg_env['PGUSER'] == 'fooF'
    assert 'PGPASSWORD' not in pg_env
122
123
def test_get_pg_env_overwrite_variable(monkeypatch):
    """ The user from the DSN must replace a PGUSER value that is
        already set in the environment.
    """
    monkeypatch.setenv('PGUSER', 'some default')

    pg_env = nc.get_pg_env('user=overwriter')

    assert pg_env['PGUSER'] == 'overwriter'
129
130
def test_get_pg_env_ignore_unknown():
    """ With an empty base environment, a DSN that only sets
        client_encoding must yield an empty environment.
    """
    pg_env = nc.get_pg_env('client_encoding=stuff', base_env={})

    assert pg_env == {}
134     assert env == {}