# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Nominatim.
# Copyright (C) 2021 by the Nominatim developer community.
# For a full list of authors see the git log.
""" Database helper functions for the indexer.
"""
import logging

import psycopg2
from psycopg2.extras import wait_select
# Module-wide logger. getLogger() without a name returns the root logger,
# so messages follow whatever handlers/levels the application set up there.
LOG = logging.getLogger()
def make_connection(options, asynchronous=False):
    """ Create a psycopg2 connection from the given options.

        `options` must provide the attributes `dbname`, `user`,
        `password`, `host` and `port`; they are passed on unchanged.
        `asynchronous` is forwarded as psycopg2's `async` connection
        parameter.

        Returns whatever `psycopg2.connect()` returns.
    """
    # The parameters are handed in through a dict because 'async' is a
    # reserved word in Python 3 and cannot be spelled as a keyword argument.
    params = {'dbname' : options.dbname,
              'user' : options.user,
              'password' : options.password,
              'host' : options.host,
              'port' : options.port,
              'async' : asynchronous}

    return psycopg2.connect(**params)
27 """ A single non-blocking database connection.
30 def __init__(self, options):
31 self.current_query = None
32 self.current_params = None
33 self.options = options
40 """ (Re)connect to the database. Creates an asynchronous connection
41 with JIT and parallel processing disabled. If a connection was
42 already open, it is closed and a new connection established.
43 The caller must ensure that no query is pending before reconnecting.
45 if self.conn is not None:
49 self.conn = make_connection(self.options, asynchronous=True)
52 self.cursor = self.conn.cursor()
53 # Disable JIT and parallel workers as they are known to cause problems.
54 # Update pg_settings instead of using SET because it does not yield
55 # errors on older versions of Postgres where the settings are not
58 """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost';
59 UPDATE pg_settings SET setting = 0
60 WHERE name = 'max_parallel_workers_per_gather';""")
64 """ Block until any pending operation is done.
68 wait_select(self.conn)
69 self.current_query = None
71 except psycopg2.extensions.TransactionRollbackError as error:
72 if error.pgcode == '40P01':
73 LOG.info("Deadlock detected (params = %s), retry.",
74 str(self.current_params))
75 self.cursor.execute(self.current_query, self.current_params)
78 except psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
79 self.cursor.execute(self.current_query, self.current_params)
def perform(self, sql, args=None):
    """ Send SQL query to the server. Returns immediately without
        blocking.

        `sql` is the statement to run and `args` its optional parameters;
        both are passed to the cursor's `execute()` as they are.
    """
    # Remember the statement and its parameters before sending them, so
    # that the same query can be executed again later (the deadlock
    # handling in this file re-runs current_query with current_params).
    self.current_query = sql
    self.current_params = args
    self.cursor.execute(sql, args)
90 """ File descriptor to wait for. (Makes this class select()able.)
92 return self.conn.fileno()
95 """ Check if the connection is available for a new query.
97 Also checks if the previous query has run into a deadlock.
98 If so, then the previous query is repeated.
100 if self.current_query is None:
104 if self.conn.poll() == psycopg2.extensions.POLL_OK:
105 self.current_query = None
107 except psycopg2.extensions.TransactionRollbackError as error:
108 if error.pgcode == '40P01':
109 LOG.info("Deadlock detected (params = %s), retry.", str(self.current_params))
110 self.cursor.execute(self.current_query, self.current_params)
113 except psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
114 self.cursor.execute(self.current_query, self.current_params)