1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim.
4 # Copyright (C) 2020 Sarah Hoffmann
8 from psycopg2.extras import wait_select
10 log = logging.getLogger()
12 def make_connection(options, asynchronous=False):
13 params = {'dbname' : options.dbname,
14 'user' : options.user,
15 'password' : options.password,
16 'host' : options.host,
17 'port' : options.port,
18 'async' : asynchronous}
20 return psycopg2.connect(**params)
22 class DBConnection(object):
23 """ A single non-blocking database connection.
26 def __init__(self, options):
27 self.current_query = None
28 self.current_params = None
29 self.options = options
35 """ (Re)connect to the database. Creates an asynchronous connection
36 with JIT and parallel processing disabled. If a connection was
37 already open, it is closed and a new connection established.
38 The caller must ensure that no query is pending before reconnecting.
40 if self.conn is not None:
44 self.conn = make_connection(self.options, asynchronous=True)
47 self.cursor = self.conn.cursor()
48 # Disable JIT and parallel workers as they are known to cause problems.
49 # Update pg_settings instead of using SET because it does not yield
50 # errors on older versions of Postgres where the settings are not
53 """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost';
54 UPDATE pg_settings SET setting = 0
55 WHERE name = 'max_parallel_workers_per_gather';""")
59 """ Block until any pending operation is done.
63 wait_select(self.conn)
64 self.current_query = None
66 except psycopg2.extensions.TransactionRollbackError as e:
67 if e.pgcode == '40P01':
68 log.info("Deadlock detected (params = {}), retry."
69 .format(self.current_params))
70 self.cursor.execute(self.current_query, self.current_params)
73 except psycopg2.errors.DeadlockDetected:
74 self.cursor.execute(self.current_query, self.current_params)
76 def perform(self, sql, args=None):
77 """ Send SQL query to the server. Returns immediately without
80 self.current_query = sql
81 self.current_params = args
82 self.cursor.execute(sql, args)
85 """ File descriptor to wait for. (Makes this class select()able.)
87 return self.conn.fileno()
90 """ Check if the connection is available for a new query.
92 Also checks if the previous query has run into a deadlock.
93 If so, then the previous query is repeated.
95 if self.current_query is None:
99 if self.conn.poll() == psycopg2.extensions.POLL_OK:
100 self.current_query = None
102 except psycopg2.extensions.TransactionRollbackError as e:
103 if e.pgcode == '40P01':
104 log.info("Deadlock detected (params = {}), retry.".format(self.current_params))
105 self.cursor.execute(self.current_query, self.current_params)
108 except psycopg2.errors.DeadlockDetected:
109 self.cursor.execute(self.current_query, self.current_params)