]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/db/async_connection.py
85b844312ba271a638be8ef084dbe34a9274889d
[nominatim.git] / nominatim / db / async_connection.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim.
4 # Copyright (C) 2021 by the Nominatim developer community.
5 # For a full list of authors see the git log.
6 """ Database helper functions for the indexer.
7 """
8 import logging
9 import psycopg2
10 from psycopg2.extras import wait_select
11
12 LOG = logging.getLogger()
13
14 def make_connection(options, asynchronous=False):
15     """ Create a psycopg2 connection from the given options.
16     """
17     params = {'dbname' : options.dbname,
18               'user' : options.user,
19               'password' : options.password,
20               'host' : options.host,
21               'port' : options.port,
22               'async' : asynchronous}
23
24     return psycopg2.connect(**params)
25
26 class DBConnection:
27     """ A single non-blocking database connection.
28     """
29
30     def __init__(self, options):
31         self.current_query = None
32         self.current_params = None
33         self.options = options
34
35         self.conn = None
36         self.cursor = None
37         self.connect()
38
39     def connect(self):
40         """ (Re)connect to the database. Creates an asynchronous connection
41             with JIT and parallel processing disabled. If a connection was
42             already open, it is closed and a new connection established.
43             The caller must ensure that no query is pending before reconnecting.
44         """
45         if self.conn is not None:
46             self.cursor.close()
47             self.conn.close()
48
49         self.conn = make_connection(self.options, asynchronous=True)
50         self.wait()
51
52         self.cursor = self.conn.cursor()
53         # Disable JIT and parallel workers as they are known to cause problems.
54         # Update pg_settings instead of using SET because it does not yield
55         # errors on older versions of Postgres where the settings are not
56         # implemented.
57         self.perform(
58             """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost';
59                 UPDATE pg_settings SET setting = 0
60                    WHERE name = 'max_parallel_workers_per_gather';""")
61         self.wait()
62
63     def wait(self):
64         """ Block until any pending operation is done.
65         """
66         while True:
67             try:
68                 wait_select(self.conn)
69                 self.current_query = None
70                 return
71             except psycopg2.extensions.TransactionRollbackError as error:
72                 if error.pgcode == '40P01':
73                     LOG.info("Deadlock detected (params = %s), retry.",
74                              str(self.current_params))
75                     self.cursor.execute(self.current_query, self.current_params)
76                 else:
77                     raise
78             except psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
79                 self.cursor.execute(self.current_query, self.current_params)
80
81     def perform(self, sql, args=None):
82         """ Send SQL query to the server. Returns immediately without
83             blocking.
84         """
85         self.current_query = sql
86         self.current_params = args
87         self.cursor.execute(sql, args)
88
89     def fileno(self):
90         """ File descriptor to wait for. (Makes this class select()able.)
91         """
92         return self.conn.fileno()
93
94     def is_done(self):
95         """ Check if the connection is available for a new query.
96
97             Also checks if the previous query has run into a deadlock.
98             If so, then the previous query is repeated.
99         """
100         if self.current_query is None:
101             return True
102
103         try:
104             if self.conn.poll() == psycopg2.extensions.POLL_OK:
105                 self.current_query = None
106                 return True
107         except psycopg2.extensions.TransactionRollbackError as error:
108             if error.pgcode == '40P01':
109                 LOG.info("Deadlock detected (params = %s), retry.", str(self.current_params))
110                 self.cursor.execute(self.current_query, self.current_params)
111             else:
112                 raise
113         except psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
114             self.cursor.execute(self.current_query, self.current_params)
115
116         return False