1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """ Non-blocking database connections.
9 from typing import Callable, Any, Optional, Iterator, Sequence
15 from psycopg2.extras import wait_select
17 # psycopg2 emits different exceptions pre and post 2.8. Detect if the new error
18 # module is available and adapt the error handling accordingly.
20 import psycopg2.errors # pylint: disable=no-name-in-module,import-error
21 __has_psycopg2_errors__ = True
23 __has_psycopg2_errors__ = False
25 from ..typing import T_cursor, Query
27 LOG = logging.getLogger()
29 class DeadlockHandler:
30 """ Context manager that catches deadlock exceptions and calls
31 the given handler function. All other exceptions are passed on
35 def __init__(self, handler: Callable[[], None], ignore_sql_errors: bool = False) -> None:
36 self.handler = handler
37 self.ignore_sql_errors = ignore_sql_errors
39 def __enter__(self) -> 'DeadlockHandler':
42 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> bool:
43 if __has_psycopg2_errors__:
44 if exc_type == psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
47 elif exc_type == psycopg2.extensions.TransactionRollbackError \
48 and exc_value.pgcode == '40P01':
52 if self.ignore_sql_errors and isinstance(exc_value, psycopg2.Error):
53 LOG.info("SQL error ignored: %s", exc_value)
60 """ A single non-blocking database connection.
63 def __init__(self, dsn: str,
64 cursor_factory: Optional[Callable[..., T_cursor]] = None,
65 ignore_sql_errors: bool = False) -> None:
68 self.current_query: Optional[Query] = None
69 self.current_params: Optional[Sequence[Any]] = None
70 self.ignore_sql_errors = ignore_sql_errors
72 self.conn: Optional['psycopg2._psycopg.connection'] = None
73 self.cursor: Optional['psycopg2._psycopg.cursor'] = None
74 self.connect(cursor_factory=cursor_factory)
76 def close(self) -> None:
77 """ Close all open connections. Does not wait for pending requests.
79 if self.conn is not None:
80 if self.cursor is not None:
87 def connect(self, cursor_factory: Optional[Callable[..., T_cursor]] = None) -> None:
88 """ (Re)connect to the database. Creates an asynchronous connection
89 with JIT and parallel processing disabled. If a connection was
90 already open, it is closed and a new connection established.
91 The caller must ensure that no query is pending before reconnecting.
95 # Use a dict to hand in the parameters because 'async' is a reserved word in Python 3.
97 self.conn = psycopg2.connect(**{'dsn': self.dsn, 'async': True}) # type: ignore
101 if cursor_factory is not None:
102 self.cursor = self.conn.cursor(cursor_factory=cursor_factory)
104 self.cursor = self.conn.cursor()
105 # Disable JIT and parallel workers as they are known to cause problems.
106 # Update pg_settings instead of using SET because it does not yield
107 # errors on older versions of Postgres where the settings are not implemented.
110 """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost';
111 UPDATE pg_settings SET setting = 0
112 WHERE name = 'max_parallel_workers_per_gather';""")
115 def _deadlock_handler(self) -> None:
116 LOG.info("Deadlock detected (params = %s), retry.", str(self.current_params))
117 assert self.cursor is not None
118 assert self.current_query is not None
119 assert self.current_params is not None
121 self.cursor.execute(self.current_query, self.current_params)
123 def wait(self) -> None:
124 """ Block until any pending operation is done.
127 with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
128 wait_select(self.conn)
129 self.current_query = None
132 def perform(self, sql: Query, args: Optional[Sequence[Any]] = None) -> None:
133 """ Send SQL query to the server. Returns immediately without blocking.
136 assert self.cursor is not None
137 self.current_query = sql
138 self.current_params = args
139 self.cursor.execute(sql, args)
141 def fileno(self) -> int:
142 """ File descriptor to wait for. (Makes this class select()able.)
144 assert self.conn is not None
145 return self.conn.fileno()
147 def is_done(self) -> bool:
148 """ Check if the connection is available for a new query.
150 Also checks if the previous query has run into a deadlock.
151 If so, then the previous query is repeated.
153 assert self.conn is not None
155 if self.current_query is None:
158 with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
159 if self.conn.poll() == psycopg2.extensions.POLL_OK:
160 self.current_query = None
167 """ A pool of asynchronous database connections.
169 The pool may be used as a context manager.
171 REOPEN_CONNECTIONS_AFTER = 100000
173 def __init__(self, dsn: str, pool_size: int, ignore_sql_errors: bool = False) -> None:
174 self.threads = [DBConnection(dsn, ignore_sql_errors=ignore_sql_errors)
175 for _ in range(pool_size)]
176 self.free_workers = self._yield_free_worker()
180 def finish_all(self) -> None:
181 """ Wait for all connections to finish.
183 for thread in self.threads:
184 while not thread.is_done():
187 self.free_workers = self._yield_free_worker()
189 def close(self) -> None:
190 """ Close all connections and clear the pool.
192 for thread in self.threads:
195 self.free_workers = iter([])
198 def next_free_worker(self) -> DBConnection:
199 """ Get the next free connection.
201 return next(self.free_workers)
204 def _yield_free_worker(self) -> Iterator[DBConnection]:
213 if command_stat > self.REOPEN_CONNECTIONS_AFTER:
214 self._reconnect_threads()
219 _, ready, _ = select.select([], self.threads, [])
220 self.wait_time += time.time() - tstart
223 def _reconnect_threads(self) -> None:
224 for thread in self.threads:
225 while not thread.is_done():
230 def __enter__(self) -> 'WorkerPool':
234 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: