]> git.openstreetmap.org Git - nominatim.git/blobdiff - nominatim/db/async_connection.py
Merge pull request #2305 from lonvia/tokenizer
[nominatim.git] / nominatim / db / async_connection.py
index 45e83664663ba835419db49304209b9da6491d35..a4f554965ab9144a89bb609296ee80ca82049ac3 100644 (file)
@@ -9,37 +9,77 @@ import logging
 import psycopg2
 from psycopg2.extras import wait_select
 
 import psycopg2
 from psycopg2.extras import wait_select
 
+# psycopg2 emits different exceptions pre and post 2.8. Detect if the new error
+# module is available and adapt the error handling accordingly.
+try:
+    import psycopg2.errors # pylint: disable=no-name-in-module,import-error
+    __has_psycopg2_errors__ = True
+except ImportError:
+    __has_psycopg2_errors__ = False
+
 LOG = logging.getLogger()
 
 LOG = logging.getLogger()
 
+class DeadlockHandler:
+    """ Context manager that catches deadlock exceptions and calls
+        the given handler function. All other exceptions are passed on
+        normally.
+
+        The handler is invoked without arguments. When a deadlock was
+        recognised, the exception is suppressed and execution continues
+        after the `with` block.
+    """
+
+    def __init__(self, handler):
+        # Zero-argument callable that is run when a deadlock is caught.
+        self.handler = handler
+
+    def __enter__(self):
+        # Nothing to set up; the `with` statement does not bind a value.
+        pass
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        """ Call the handler and suppress the exception (return True) when
+            the block was left because of a deadlock. Return False in all
+            other cases, including a normal exit without exception, so that
+            any other exception propagates to the caller.
+        """
+        # NOTE(review): both branches compare the exception class with `==`,
+        # so only the exact class matches, not subclasses - confirm that
+        # this is intended.
+        if __has_psycopg2_errors__:
+            # psycopg2.errors could be imported: deadlocks have their own
+            # exception class.
+            if exc_type == psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
+                self.handler()
+                return True
+        else:
+            # No psycopg2.errors module: a deadlock shows up as a transaction
+            # rollback error and is identified by the error code '40P01'.
+            if exc_type == psycopg2.extensions.TransactionRollbackError:
+                if exc_value.pgcode == '40P01':
+                    self.handler()
+                    return True
+        return False
+
+
 class DBConnection:
     """ A single non-blocking database connection.
     """
 
 class DBConnection:
     """ A single non-blocking database connection.
     """
 
-    def __init__(self, dsn):
+    def __init__(self, dsn, cursor_factory=None):
         self.current_query = None
         self.current_params = None
         self.dsn = dsn
 
         self.conn = None
         self.cursor = None
         self.current_query = None
         self.current_params = None
         self.dsn = dsn
 
         self.conn = None
         self.cursor = None
-        self.connect()
+        self.connect(cursor_factory=cursor_factory)
+
+    def close(self):
+        """ Close all open connections. Does not wait for pending requests.
+
+            Closes the cursor and then the connection if a connection
+            exists, and sets `self.conn` to None afterwards. Calling this
+            function without an open connection is a no-op.
+        """
+        if self.conn is not None:
+            # NOTE(review): assumes that self.cursor is set whenever
+            # self.conn is set - confirm for the case where connecting
+            # fails before the cursor has been created.
+            self.cursor.close()
+            self.conn.close()
+
+        # Only the connection reference is reset here; self.cursor keeps
+        # pointing to the (closed) cursor.
+        self.conn = None
 
 
-    def connect(self):
+    def connect(self, cursor_factory=None):
         """ (Re)connect to the database. Creates an asynchronous connection
             with JIT and parallel processing disabled. If a connection was
             already open, it is closed and a new connection established.
             The caller must ensure that no query is pending before reconnecting.
         """
         """ (Re)connect to the database. Creates an asynchronous connection
             with JIT and parallel processing disabled. If a connection was
             already open, it is closed and a new connection established.
             The caller must ensure that no query is pending before reconnecting.
         """
-        if self.conn is not None:
-            self.cursor.close()
-            self.conn.close()
+        self.close()
 
         # Use a dict to hand in the parameters because async is a reserved
         # word in Python3.
         self.conn = psycopg2.connect(**{'dsn' : self.dsn, 'async' : True})
         self.wait()
 
 
         # Use a dict to hand in the parameters because async is a reserved
         # word in Python3.
         self.conn = psycopg2.connect(**{'dsn' : self.dsn, 'async' : True})
         self.wait()
 
-        self.cursor = self.conn.cursor()
+        self.cursor = self.conn.cursor(cursor_factory=cursor_factory)
         # Disable JIT and parallel workers as they are known to cause problems.
         # Update pg_settings instead of using SET because it does not yield
         # errors on older versions of Postgres where the settings are not
         # Disable JIT and parallel workers as they are known to cause problems.
         # Update pg_settings instead of using SET because it does not yield
         # errors on older versions of Postgres where the settings are not
@@ -50,23 +90,18 @@ class DBConnection:
                    WHERE name = 'max_parallel_workers_per_gather';""")
         self.wait()
 
                    WHERE name = 'max_parallel_workers_per_gather';""")
         self.wait()
 
+    def _deadlock_handler(self):
+        """ Log the deadlock together with the parameters of the current
+            query and send the current query with the same parameters
+            to the cursor again.
+        """
+        LOG.info("Deadlock detected (params = %s), retry.", str(self.current_params))
+        self.cursor.execute(self.current_query, self.current_params)
+
     def wait(self):
         """ Block until any pending operation is done.
         """
         while True:
     def wait(self):
         """ Block until any pending operation is done.
         """
         while True:
-            try:
+            with DeadlockHandler(self._deadlock_handler):
                 wait_select(self.conn)
                 self.current_query = None
                 return
                 wait_select(self.conn)
                 self.current_query = None
                 return
-            except psycopg2.extensions.TransactionRollbackError as error:
-                if error.pgcode == '40P01':
-                    LOG.info("Deadlock detected (params = %s), retry.",
-                             str(self.current_params))
-                    self.cursor.execute(self.current_query, self.current_params)
-                else:
-                    raise
-            except psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
-                self.cursor.execute(self.current_query, self.current_params)
 
     def perform(self, sql, args=None):
         """ Send SQL query to the server. Returns immediately without
 
     def perform(self, sql, args=None):
         """ Send SQL query to the server. Returns immediately without
@@ -90,17 +125,9 @@ class DBConnection:
         if self.current_query is None:
             return True
 
         if self.current_query is None:
             return True
 
-        try:
+        with DeadlockHandler(self._deadlock_handler):
             if self.conn.poll() == psycopg2.extensions.POLL_OK:
                 self.current_query = None
                 return True
             if self.conn.poll() == psycopg2.extensions.POLL_OK:
                 self.current_query = None
                 return True
-        except psycopg2.extensions.TransactionRollbackError as error:
-            if error.pgcode == '40P01':
-                LOG.info("Deadlock detected (params = %s), retry.", str(self.current_params))
-                self.cursor.execute(self.current_query, self.current_params)
-            else:
-                raise
-        except psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
-            self.cursor.execute(self.current_query, self.current_params)
 
         return False
 
         return False