]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/indexer/db.py
only index larger batches for rank 30
[nominatim.git] / nominatim / indexer / db.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim.
4 # Copyright (C) 2020 Sarah Hoffmann
5
6 import logging
7 import psycopg2
8 from psycopg2.extras import wait_select
9
10 log = logging.getLogger()
11
12 def make_connection(options, asynchronous=False):
13     params = {'dbname' : options.dbname,
14               'user' : options.user,
15               'password' : options.password,
16               'host' : options.host,
17               'port' : options.port,
18               'async' : asynchronous}
19
20     return psycopg2.connect(**params)
21
22 class DBConnection(object):
23     """ A single non-blocking database connection.
24     """
25
26     def __init__(self, options):
27         self.current_query = None
28         self.current_params = None
29         self.options = options
30
31         self.conn = None
32         self.connect()
33
34     def connect(self):
35         """ (Re)connect to the database. Creates an asynchronous connection
36             with JIT and parallel processing disabled. If a connection was
37             already open, it is closed and a new connection established.
38             The caller must ensure that no query is pending before reconnecting.
39         """
40         if self.conn is not None:
41             self.cursor.close()
42             self.conn.close()
43
44         self.conn = make_connection(self.options, asynchronous=True)
45         self.wait()
46
47         self.cursor = self.conn.cursor()
48         # Disable JIT and parallel workers as they are known to cause problems.
49         # Update pg_settings instead of using SET because it does not yield
50         # errors on older versions of Postgres where the settings are not
51         # implemented.
52         self.perform(
53             """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost';
54                 UPDATE pg_settings SET setting = 0 
55                    WHERE name = 'max_parallel_workers_per_gather';""")
56         self.wait()
57
58     def wait(self):
59         """ Block until any pending operation is done.
60         """
61         while True:
62             try:
63                 wait_select(self.conn)
64                 self.current_query = None
65                 return
66             except psycopg2.extensions.TransactionRollbackError as e:
67                 if e.pgcode == '40P01':
68                     log.info("Deadlock detected (params = {}), retry."
69                               .format(self.current_params))
70                     self.cursor.execute(self.current_query, self.current_params)
71                 else:
72                     raise
73             except psycopg2.errors.DeadlockDetected:
74                 self.cursor.execute(self.current_query, self.current_params)
75
76     def perform(self, sql, args=None):
77         """ Send SQL query to the server. Returns immediately without
78             blocking.
79         """
80         self.current_query = sql
81         self.current_params = args
82         self.cursor.execute(sql, args)
83
84     def fileno(self):
85         """ File descriptor to wait for. (Makes this class select()able.)
86         """
87         return self.conn.fileno()
88
89     def is_done(self):
90         """ Check if the connection is available for a new query.
91
92             Also checks if the previous query has run into a deadlock.
93             If so, then the previous query is repeated.
94         """
95         if self.current_query is None:
96             return True
97
98         try:
99             if self.conn.poll() == psycopg2.extensions.POLL_OK:
100                 self.current_query = None
101                 return True
102         except psycopg2.extensions.TransactionRollbackError as e:
103             if e.pgcode == '40P01':
104                 log.info("Deadlock detected (params = {}), retry.".format(self.current_params))
105                 self.cursor.execute(self.current_query, self.current_params)
106             else:
107                 raise
108         except psycopg2.errors.DeadlockDetected:
109             self.cursor.execute(self.current_query, self.current_params)
110
111         return False
112