1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim.
4 # Copyright (C) 2021 by the Nominatim developer community.
5 # For a full list of authors see the git log.
6 """ Database helper functions for the indexer.
10 from psycopg2.extras import wait_select
12 LOG = logging.getLogger()
15 """ A single non-blocking database connection.
18 def __init__(self, dsn):
19 self.current_query = None
20 self.current_params = None
28 """ (Re)connect to the database. Creates an asynchronous connection
29 with JIT and parallel processing disabled. If a connection was
30 already open, it is closed and a new connection established.
31 The caller must ensure that no query is pending before reconnecting.
33 if self.conn is not None:
37 # Use a dict to hand in the parameters because async is a reserved
39 self.conn = psycopg2.connect(**{'dsn' : self.dsn, 'async' : True})
42 self.cursor = self.conn.cursor()
43 # Disable JIT and parallel workers as they are known to cause problems.
44 # Update pg_settings instead of using SET because it does not yield
45 # errors on older versions of Postgres where the settings are not
48 """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost';
49 UPDATE pg_settings SET setting = 0
50 WHERE name = 'max_parallel_workers_per_gather';""")
54 """ Block until any pending operation is done.
58 wait_select(self.conn)
59 self.current_query = None
61 except psycopg2.extensions.TransactionRollbackError as error:
62 if error.pgcode == '40P01':
63 LOG.info("Deadlock detected (params = %s), retry.",
64 str(self.current_params))
65 self.cursor.execute(self.current_query, self.current_params)
68 except psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
69 self.cursor.execute(self.current_query, self.current_params)
def perform(self, sql, args=None):
    """ Hand an SQL statement to this connection's cursor for execution.

        The statement and its parameters are stored in `current_query`
        and `current_params` before the cursor is invoked; the deadlock
        handlers in this class re-issue exactly these stored values.
        Nothing is returned. NOTE(review): the connection is opened with
        `async` set, so the call presumably does not wait for the server
        to finish -- confirm against the psycopg2 documentation.

        sql  -- statement to execute.
        args -- optional parameters passed through to `cursor.execute()`.
    """
    # Record the request first so that it is available for a retry even
    # if the execute call below raises.
    self.current_query, self.current_params = sql, args
    self.cursor.execute(self.current_query, self.current_params)
80 """ File descriptor to wait for. (Makes this class select()able.)
82 return self.conn.fileno()
85 """ Check if the connection is available for a new query.
87 Also checks if the previous query has run into a deadlock.
88 If so, then the previous query is repeated.
90 if self.current_query is None:
94 if self.conn.poll() == psycopg2.extensions.POLL_OK:
95 self.current_query = None
97 except psycopg2.extensions.TransactionRollbackError as error:
98 if error.pgcode == '40P01':
99 LOG.info("Deadlock detected (params = %s), retry.", str(self.current_params))
100 self.cursor.execute(self.current_query, self.current_params)
103 except psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
104 self.cursor.execute(self.current_query, self.current_params)