]> git.openstreetmap.org Git - nominatim.git/blob - test/python/test_db_async_connection.py
only instantiate indexer once for replication
[nominatim.git] / test / python / test_db_async_connection.py
1 """
2 Tests for function providing a non-blocking query interface towards PostgreSQL.
3 """
4 from contextlib import closing
5 import concurrent.futures
6
7 import pytest
8 import psycopg2
9
10 from nominatim.db.async_connection import DBConnection, DeadlockHandler
11
12
13 @pytest.fixture
14 def conn(temp_db):
15     with closing(DBConnection('dbname=' + temp_db)) as connection:
16         yield connection
17
18
19 @pytest.fixture
20 def simple_conns(temp_db):
21     conn1 = psycopg2.connect('dbname=' + temp_db)
22     conn2 = psycopg2.connect('dbname=' + temp_db)
23
24     yield conn1.cursor(), conn2.cursor()
25
26     conn1.close()
27     conn2.close()
28
29
30 def test_simple_query(conn, temp_db_conn):
31     conn.connect()
32
33     conn.perform('CREATE TABLE foo (id INT)')
34     conn.wait()
35
36     temp_db_conn.table_exists('foo')
37
38
39 def test_wait_for_query(conn):
40     conn.connect()
41
42     conn.perform('SELECT pg_sleep(1)')
43
44     assert not conn.is_done()
45
46     conn.wait()
47
48
49 def test_bad_query(conn):
50     conn.connect()
51
52     conn.perform('SELECT efasfjsea')
53
54     with pytest.raises(psycopg2.ProgrammingError):
55         conn.wait()
56
57
58 def test_bad_query_ignore(temp_db):
59     with closing(DBConnection('dbname=' + temp_db, ignore_sql_errors=True)) as conn:
60         conn.connect()
61
62         conn.perform('SELECT efasfjsea')
63
64         conn.wait()
65
66
67 def exec_with_deadlock(cur, sql, detector):
68     with DeadlockHandler(lambda *args: detector.append(1)):
69         cur.execute(sql)
70
71
72 def test_deadlock(simple_conns):
73     cur1, cur2 = simple_conns
74
75     cur1.execute("""CREATE TABLE t1 (id INT PRIMARY KEY, t TEXT);
76                     INSERT into t1 VALUES (1, 'a'), (2, 'b')""")
77     cur1.connection.commit()
78
79     cur1.execute("UPDATE t1 SET t = 'x' WHERE id = 1")
80     cur2.execute("UPDATE t1 SET t = 'x' WHERE id = 2")
81
82     # This is the tricky part of the test. The first SQL command runs into
83     # a lock and blocks, so we have to run it in a separate thread. When the
84     # second deadlocking SQL statement is issued, Postgresql will abort one of
85     # the two transactions that cause the deadlock. There is no way to tell
86     # which one of the two. Therefore wrap both in a DeadlockHandler and
87     # expect that exactly one of the two triggers.
88     with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
89         deadlock_check = []
90         try:
91             future = executor.submit(exec_with_deadlock, cur2,
92                                      "UPDATE t1 SET t = 'y' WHERE id = 1",
93                                      deadlock_check)
94
95             while not future.running():
96                 pass
97
98
99             exec_with_deadlock(cur1, "UPDATE t1 SET t = 'y' WHERE id = 2",
100                                deadlock_check)
101         finally:
102             # Whatever happens, make sure the deadlock gets resolved.
103             cur1.connection.rollback()
104
105         future.result()
106
107         assert len(deadlock_check) == 1