# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Nominatim.
# Copyright (C) 2021 by the Nominatim developer community.
# For a full list of authors see the git log.
""" Database helper functions for the indexer.
"""
import logging

import psycopg2
from psycopg2.extras import wait_select

# psycopg2 emits different exceptions pre and post 2.8. Detect if the new error
# module is available and adapt the error handling accordingly.
try:
    import psycopg2.errors # pylint: disable=no-name-in-module,import-error
    __has_psycopg2_errors__ = True
except ModuleNotFoundError:
    # Older psycopg2: deadlocks must be recognised through
    # psycopg2.extensions and the SQLSTATE code instead. This is why the
    # plain `import psycopg2` above is needed even when this import fails.
    __has_psycopg2_errors__ = False

# getLogger() without a name returns the root logger.
LOG = logging.getLogger()
class DeadlockHandler:
    """ Context manager that catches deadlock exceptions and calls
        the given handler function. All other exceptions are passed on
        unchanged.

        A deadlock is recognised as psycopg2.errors.DeadlockDetected when
        that module is available. Otherwise it is recognised as
        psycopg2.extensions.TransactionRollbackError with SQLSTATE '40P01'.
    """

    def __init__(self, handler):
        # Zero-argument callable invoked when a deadlock is caught.
        self.handler = handler

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if __has_psycopg2_errors__:
            if exc_type is psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
                self.handler()
                # Returning True suppresses the exception.
                return True
        elif exc_type is psycopg2.extensions.TransactionRollbackError:
            # '40P01' is the PostgreSQL SQLSTATE for deadlock_detected.
            if exc_value.pgcode == '40P01':
                self.handler()
                return True
        # Any other exception (or none) is propagated normally.
        return False
48 """ A single non-blocking database connection.
51 def __init__(self, dsn):
52 self.current_query = None
53 self.current_params = None
61 """ Close all open connections. Does not wait for pending requests.
63 if self.conn is not None:
70 """ (Re)connect to the database. Creates an asynchronous connection
71 with JIT and parallel processing disabled. If a connection was
72 already open, it is closed and a new connection established.
73 The caller must ensure that no query is pending before reconnecting.
77 # Use a dict to hand in the parameters because async is a reserved
79 self.conn = psycopg2.connect(**{'dsn' : self.dsn, 'async' : True})
82 self.cursor = self.conn.cursor()
83 # Disable JIT and parallel workers as they are known to cause problems.
84 # Update pg_settings instead of using SET because it does not yield
85 # errors on older versions of Postgres where the settings are not
88 """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost';
89 UPDATE pg_settings SET setting = 0
90 WHERE name = 'max_parallel_workers_per_gather';""")
def _deadlock_handler(self):
    """ Retry the query that was interrupted by a deadlock.

        Logs the parameters of the failed query, then executes the
        remembered query and parameters again on the same cursor.
    """
    LOG.info("Deadlock detected (params = %s), retry.", str(self.current_params))
    self.cursor.execute(self.current_query, self.current_params)
98 """ Block until any pending operation is done.
101 with DeadlockHandler(self._deadlock_handler):
102 wait_select(self.conn)
103 self.current_query = None
def perform(self, sql, args=None):
    """ Send SQL query to the server. Returns immediately without
        waiting for the result.

        The query text and its parameters are remembered in
        `current_query` and `current_params`, so they can be executed
        again if the query runs into a deadlock.

        Parameters:
            sql: SQL statement to execute.
            args: Optional query parameters passed through to
                  `cursor.execute()`.
    """
    self.current_query = sql
    self.current_params = args
    self.cursor.execute(sql, args)
115 """ File descriptor to wait for. (Makes this class select()able.)
117 return self.conn.fileno()
120 """ Check if the connection is available for a new query.
122 Also checks if the previous query has run into a deadlock.
123 If so, then the previous query is repeated.
125 if self.current_query is None:
128 with DeadlockHandler(self._deadlock_handler):
129 if self.conn.poll() == psycopg2.extensions.POLL_OK:
130 self.current_query = None