git.openstreetmap.org Git - nominatim.git/blob - test/python/test_db_async_connection.py
Merge pull request #2326 from lonvia/wokerpool-for-tiger-data
[nominatim.git] / test / python / test_db_async_connection.py
1 """
2 Tests for function providing a non-blocking query interface towards PostgreSQL.
3 """
4 from contextlib import closing
5 import concurrent.futures
6
7 import pytest
8 import psycopg2
9 from psycopg2.extras import wait_select
10
11 from nominatim.db.async_connection import DBConnection, DeadlockHandler
12
13
@pytest.fixture
def conn(temp_db):
    """ Provide a DBConnection object for the temporary test database.

        The fixture does not call connect() itself, the tests do that.
        The connection object is closed once the test has finished.
    """
    connection = DBConnection('dbname=' + temp_db)
    try:
        yield connection
    finally:
        connection.close()
18
19
@pytest.fixture
def simple_conns(temp_db):
    """ Provide one cursor each for two independent psycopg2 connections
        to the temporary test database.

        Both connections are closed when the test is done. Using closing()
        ensures that the first connection is also released when opening the
        second one fails and that the second one is still closed when
        closing the first raises.
    """
    dsn = 'dbname=' + temp_db

    with closing(psycopg2.connect(dsn)) as conn1, \
         closing(psycopg2.connect(dsn)) as conn2:
        yield conn1.cursor(), conn2.cursor()
29
30
def test_simple_query(conn, temp_db_conn):
    """ A statement sent with perform() has taken effect once wait()
        returns.
    """
    conn.connect()

    conn.perform('CREATE TABLE foo (id INT)')
    conn.wait()

    # The result of the check must be asserted. A bare call would let the
    # test pass even when the table was never created.
    assert temp_db_conn.table_exists('foo')
38
39
def test_wait_for_query(conn):
    """ A query that is still executing is not reported as done and can
        be waited for.
    """
    conn.connect()
    conn.perform('SELECT pg_sleep(1)')

    # The statement sleeps for a second, so it is expected to be still
    # in progress when checked right after it was sent.
    still_running = not conn.is_done()
    assert still_running

    conn.wait()
48
49
def test_bad_query(conn):
    """ An invalid SQL statement makes wait() raise a ProgrammingError.
    """
    conn.connect()
    conn.perform('SELECT efasfjsea')

    # Callable form of pytest.raises(): pytest invokes conn.wait() itself
    # and checks for the expected exception.
    pytest.raises(psycopg2.ProgrammingError, conn.wait)
57
58
def test_bad_query_ignore(temp_db):
    """ With ignore_sql_errors enabled, waiting for an invalid SQL
        statement does not raise.
    """
    lenient_conn = DBConnection('dbname=' + temp_db, ignore_sql_errors=True)
    try:
        lenient_conn.connect()
        lenient_conn.perform('SELECT efasfjsea')
        lenient_conn.wait()
    finally:
        # Always release the connection, just like closing() would.
        lenient_conn.close()
66
67
def exec_with_deadlock(cur, sql, detector):
    """ Execute `sql` on the cursor `cur` inside a DeadlockHandler.

        The callback given to the handler appends a marker to the list
        `detector` each time it is called.
    """
    def _record_deadlock(*args):
        detector.append(1)

    with DeadlockHandler(_record_deadlock):
        cur.execute(sql)
71
72
def test_deadlock(simple_conns):
    """ When two transactions deadlock each other, exactly one of the
        two DeadlockHandler callbacks is triggered.
    """
    cur1, cur2 = simple_conns

    cur1.execute("""CREATE TABLE t1 (id INT PRIMARY KEY, t TEXT);
                    INSERT into t1 VALUES (1, 'a'), (2, 'b')""")
    cur1.connection.commit()

    # Each of the two open transactions updates a different row first.
    cur1.execute("UPDATE t1 SET t = 'x' WHERE id = 1")
    cur2.execute("UPDATE t1 SET t = 'x' WHERE id = 2")

    # This is the tricky part of the test. The first SQL command runs into
    # a lock and blocks, so we have to run it in a separate thread. When the
    # second deadlocking SQL statement is issued, PostgreSQL will abort one of
    # the two transactions that cause the deadlock. There is no way to tell
    # which one of the two. Therefore wrap both in a DeadlockHandler and
    # expect that exactly one of the two triggers.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        deadlock_check = []
        try:
            future = executor.submit(exec_with_deadlock, cur2,
                                     "UPDATE t1 SET t = 'y' WHERE id = 1",
                                     deadlock_check)

            # Wait for the worker thread to pick up the statement. A future
            # that has already finished (e.g. because the statement failed
            # immediately) never reports running() again, so also stop on
            # done() to avoid looping forever. Poll with a short timeout
            # instead of spinning at full speed.
            while not (future.running() or future.done()):
                concurrent.futures.wait([future], timeout=0.01)

            exec_with_deadlock(cur1, "UPDATE t1 SET t = 'y' WHERE id = 2",
                               deadlock_check)
        finally:
            # Whatever happens, make sure the deadlock gets resolved.
            cur1.connection.rollback()

        # Re-raises any exception that occurred in the worker thread.
        future.result()

        assert len(deadlock_check) == 1
109
110