1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Tests for function providing a non-blocking query interface towards PostgreSQL.
10 from contextlib import closing
11 import concurrent.futures
16 from nominatim_db.db.async_connection import DBConnection, DeadlockHandler
21 with closing(DBConnection('dbname=' + temp_db)) as connection:
26 def simple_conns(temp_db):
27 conn1 = psycopg2.connect('dbname=' + temp_db)
28 conn2 = psycopg2.connect('dbname=' + temp_db)
30 yield conn1.cursor(), conn2.cursor()
36 def test_simple_query(conn, temp_db_conn):
39 conn.perform('CREATE TABLE foo (id INT)')
42 temp_db_conn.table_exists('foo')
45 def test_wait_for_query(conn):
48 conn.perform('SELECT pg_sleep(1)')
50 assert not conn.is_done()
55 def test_bad_query(conn):
58 conn.perform('SELECT efasfjsea')
60 with pytest.raises(psycopg2.ProgrammingError):
64 def test_bad_query_ignore(temp_db):
65 with closing(DBConnection('dbname=' + temp_db, ignore_sql_errors=True)) as conn:
68 conn.perform('SELECT efasfjsea')
73 def exec_with_deadlock(cur, sql, detector):
74 with DeadlockHandler(lambda *args: detector.append(1)):
78 def test_deadlock(simple_conns):
79 cur1, cur2 = simple_conns
81 cur1.execute("""CREATE TABLE t1 (id INT PRIMARY KEY, t TEXT);
82 INSERT into t1 VALUES (1, 'a'), (2, 'b')""")
83 cur1.connection.commit()
85 cur1.execute("UPDATE t1 SET t = 'x' WHERE id = 1")
86 cur2.execute("UPDATE t1 SET t = 'x' WHERE id = 2")
88 # This is the tricky part of the test. The first SQL command runs into
89 # a lock and blocks, so we have to run it in a separate thread. When the
90 # second deadlocking SQL statement is issued, Postgresql will abort one of
91 # the two transactions that cause the deadlock. There is no way to tell
92 # which one of the two. Therefore wrap both in a DeadlockHandler and
93 # expect that exactly one of the two triggers.
94 with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
97 future = executor.submit(exec_with_deadlock, cur2,
98 "UPDATE t1 SET t = 'y' WHERE id = 1",
101 while not future.running():
105 exec_with_deadlock(cur1, "UPDATE t1 SET t = 'y' WHERE id = 2",
108 # Whatever happens, make sure the deadlock gets resolved.
109 cur1.connection.rollback()
113 assert len(deadlock_check) == 1