2 Tests for the functionality providing a non-blocking query interface towards PostgreSQL.
4 from contextlib import closing
5 import concurrent.futures
9 from psycopg2.extras import wait_select
11 from nominatim.db.async_connection import DBConnection, DeadlockHandler
16 with closing(DBConnection('dbname=' + temp_db)) as c:
# Opens two separate psycopg2 connections to the database named by `temp_db`
# and yields one cursor from each, so that a test can drive two sessions
# side by side.
# NOTE(review): this excerpt omits some original lines (any decorator and
# whatever follows the yield are not visible) - presumably a pytest fixture
# that closes both connections on teardown; confirm against the full file.
21 def simple_conns(temp_db):
22 conn1 = psycopg2.connect('dbname=' + temp_db)
23 conn2 = psycopg2.connect('dbname=' + temp_db)
25 yield conn1.cursor(), conn2.cursor()
# Sends a CREATE TABLE statement through the connection under test with
# `perform()` and then calls `temp_db_conn.table_exists('foo')`.
# NOTE(review): the visible `table_exists('foo')` call is not wrapped in an
# `assert`, so as shown its result is discarded and cannot fail the test -
# confirm whether `assert temp_db_conn.table_exists('foo')` was intended.
# NOTE(review): some original lines of this test are not visible in this
# excerpt (presumably connecting and waiting for the query) - confirm.
31 def test_simple_query(conn, temp_db_conn):
34 conn.perform('CREATE TABLE foo (id INT)')
37 temp_db_conn.table_exists('foo')
# Submits a query that sleeps for one second on the server (`pg_sleep(1)`)
# and asserts that `conn.is_done()` is false afterwards, i.e. that
# `perform()` came back without the query being reported as finished.
# NOTE(review): lines before and after the visible statements are missing
# from this excerpt (presumably connect/wait calls) - confirm.
40 def test_wait_for_query(conn):
43 conn.perform('SELECT pg_sleep(1)')
45 assert not conn.is_done()
# Submits an invalid query (`SELECT efasfjsea`) and then opens a
# `pytest.raises(psycopg2.ProgrammingError)` block.
# NOTE(review): the body of the `with` is not visible in this excerpt;
# presumably it is the call that waits for the query result and is expected
# to raise - confirm against the full file.
50 def test_bad_query(conn):
53 conn.perform('SELECT efasfjsea')
55 with pytest.raises(psycopg2.ProgrammingError):
# Builds its own DBConnection with `ignore_sql_errors=True` (closed on exit
# through `closing`) and submits an invalid query (`SELECT efasfjsea`).
# NOTE(review): presumably the point is that no exception surfaces when SQL
# errors are ignored; the statements that would show this are not visible
# in this excerpt - confirm.
59 def test_bad_query_ignore(temp_db):
60 with closing(DBConnection('dbname=' + temp_db, ignore_sql_errors=True)) as conn:
63 conn.perform('SELECT efasfjsea')
# Helper that enters a DeadlockHandler whose callback appends 1 to
# `detector` every time the handler invokes it, so the caller can count
# invocations by looking at the length of `detector`.
# NOTE(review): the body of the `with` is not visible in this excerpt;
# presumably it executes `sql` on `cur` - confirm.
68 def exec_with_deadlock(cur, sql, detector):
69 with DeadlockHandler(lambda *args: detector.append(1)):
# Provokes a deadlock between the two sessions supplied by `simple_conns`
# and checks that exactly one entry ends up in `deadlock_check`.
# NOTE(review): several original lines are missing from this excerpt (the
# definition of `deadlock_check`, the tails of the two multi-line calls, the
# body of the `while` loop and any try/finally structure) - read the
# comments below against the full file.
73 def test_deadlock(simple_conns):
74 cur1, cur2 = simple_conns
# Set up a two-row table and commit it on cur1's connection.
76 cur1.execute("""CREATE TABLE t1 (id INT PRIMARY KEY, t TEXT);
77 INSERT into t1 VALUES (1, 'a'), (2, 'b')""")
78 cur1.connection.commit()
# Each cursor updates a different row; neither update is committed in the
# visible lines, so (presumably) both sessions keep their row locked.
80 cur1.execute("UPDATE t1 SET t = 'x' WHERE id = 1")
81 cur2.execute("UPDATE t1 SET t = 'x' WHERE id = 2")
83 # This is the tricky part of the test. The first SQL command runs into
84 # a lock and blocks, so we have to run it in a separate thread. When the
85 # second deadlocking SQL statement is issued, PostgreSQL will abort one of
86 # the two transactions that cause the deadlock. There is no way to tell
87 # which one of the two. Therefore wrap both in a DeadlockHandler and
88 # expect that exactly one of the two triggers.
89 with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
# cur2 goes after the row that cur1 updated above (id 1); this is submitted
# to the single worker thread.
92 future = executor.submit(exec_with_deadlock, cur2,
93 "UPDATE t1 SET t = 'y' WHERE id = 1",
# Loop until the executor reports the submitted call as running, so that the
# second statement is only issued once the first one has been started.
96 while not future.running():
# cur1 now goes after the row that cur2 updated above (id 2), from the
# calling thread.
100 exec_with_deadlock(cur1, "UPDATE t1 SET t = 'y' WHERE id = 2",
103 # Whatever happens, make sure the deadlock gets resolved.
104 cur1.connection.rollback()
# Exactly one entry is expected in `deadlock_check`.
# NOTE(review): presumably the same list is handed to both
# exec_with_deadlock calls as `detector` - the lines showing this are missing.
108 assert len(deadlock_check) == 1