def __init__(self, options):
    """ Set up the wrapper and open the database connection.

        `options` is handed unchanged to `make_connection()` every time
        a connection is (re-)established.
    """
    self.current_query = None
    self.current_params = None
    # Keep the options on the instance so that connect() does not have to
    # rely on a module-level name that happens to be called `options`.
    self.options = options

    self.conn = None
    self.cursor = None
    self.connect()

def connect(self):
    """ (Re-)open the asynchronous connection and create a fresh cursor.

        An already existing cursor and connection are closed first. The
        function waits for the connection to be established and for the
        session settings below to be applied before it returns.
    """
    if self.conn is not None:
        # There is no cursor yet when a previous connect() failed before
        # the cursor could be created.
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
        self.conn.close()

    self.conn = make_connection(self.options, asynchronous=True)
    self.wait()

    self.cursor = self.conn.cursor()
    # Disable JIT and parallel workers as they are known to cause problems.
    # Update pg_settings instead of using SET because it does not yield
    # errors on older versions of Postgres where the settings are not
    # implemented.
    self.perform(
        """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost';
            UPDATE pg_settings SET setting = 0
               WHERE name = 'max_parallel_workers_per_gather';""")
    self.wait()
-
def wait(self):
    """ Block until any pending operation is done.

        When the pending query was aborted because of a deadlock
        (SQLSTATE 40P01), it is sent again with the same parameters and
        waited for anew. Any other error is passed on to the caller.
    """
    while True:
        try:
            wait_select(self.conn)
        except psycopg2.extensions.TransactionRollbackError as err:
            # psycopg2.errors.DeadlockDetected derives from
            # TransactionRollbackError, so this single handler covers
            # deadlocks on all psycopg2 versions. A separate handler for
            # the subclass would never be reached.
            if err.pgcode != '40P01':
                raise
            log.info("Deadlock detected (params = {}), retry."
                     .format(self.current_params))
            self.cursor.execute(self.current_query, self.current_params)
        else:
            # Operation finished, nothing left that could be retried.
            self.current_query = None
            return
-
def perform(self, sql, args=None):
    """ Hand an SQL statement to the cursor for execution.

        The statement and its parameters are stored on the instance
        before the statement is passed to the cursor. The call does not
        wait for the result.
    """
    self.current_query, self.current_params = sql, args
    self.cursor.execute(self.current_query, self.current_params)
-
def fileno(self):
    """ Return the file descriptor of the underlying connection, so that
        objects of this class can be handed directly to select().
    """
    connection = self.conn
    return connection.fileno()