2 Tests for function providing a non-blocking query interface towards PostgreSQL.
4 from contextlib import closing
5 import concurrent.futures
9 from psycopg2.extras import wait_select
11 from nominatim.db.async_connection import DBConnection, DeadlockHandler
16 with closing(DBConnection('dbname=' + temp_db)) as c:
21 def simple_conns(temp_db):
22 conn1 = psycopg2.connect('dbname=' + temp_db)
23 conn2 = psycopg2.connect('dbname=' + temp_db)
25 yield conn1.cursor(), conn2.cursor()
31 def test_simple_query(conn, temp_db_conn):
34 conn.perform('CREATE TABLE foo (id INT)')
37 temp_db_conn.table_exists('foo')
40 def test_wait_for_query(conn):
43 conn.perform('SELECT pg_sleep(1)')
45 assert not conn.is_done()
50 def test_bad_query(conn):
53 conn.perform('SELECT efasfjsea')
55 with pytest.raises(psycopg2.ProgrammingError):
59 def exec_with_deadlock(cur, sql, detector):
60 with DeadlockHandler(lambda *args: detector.append(1)):
64 def test_deadlock(simple_conns):
65 print(psycopg2.__version__)
66 cur1, cur2 = simple_conns
68 cur1.execute("""CREATE TABLE t1 (id INT PRIMARY KEY, t TEXT);
69 INSERT into t1 VALUES (1, 'a'), (2, 'b')""")
70 cur1.connection.commit()
72 cur1.execute("UPDATE t1 SET t = 'x' WHERE id = 1")
73 cur2.execute("UPDATE t1 SET t = 'x' WHERE id = 2")
75 # This is the tricky part of the test. The first SQL command runs into
76 # a lock and blocks, so we have to run it in a separate thread. When the
77 # second deadlocking SQL statement is issued, Postgresql will abort one of
78 # the two transactions that cause the deadlock. There is no way to tell
79 # which one of the two. Therefore wrap both in a DeadlockHandler and
80 # expect that exactly one of the two triggers.
81 with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
84 future = executor.submit(exec_with_deadlock, cur2,
85 "UPDATE t1 SET t = 'y' WHERE id = 1",
88 while not future.running():
92 exec_with_deadlock(cur1, "UPDATE t1 SET t = 'y' WHERE id = 2",
95 # Whatever happens, make sure the deadlock gets resolved.
96 cur1.connection.rollback()
100 assert len(deadlock_check) == 1