-import psycopg2
-from psycopg2.extras import wait_select
-import threading
-from queue import Queue
-
-log = logging.getLogger()
-
-def make_connection(options, asynchronous=False):
- return psycopg2.connect(dbname=options.dbname, user=options.user,
- password=options.password, host=options.host,
- port=options.port, async_=asynchronous)
-
-class IndexingThread(threading.Thread):
-
- def __init__(self, queue, barrier, options):
- super().__init__()
- self.conn = make_connection(options)
- self.conn.autocommit = True
-
- self.cursor = self.conn.cursor()
- self.perform("SET lc_messages TO 'C'")
- self.perform(InterpolationRunner.prepare())
- self.perform(RankRunner.prepare())
- self.queue = queue
- self.barrier = barrier
-
- def run(self):
- sql = None
- while True:
- item = self.queue.get()
- if item is None:
- break
- elif isinstance(item, str):
- sql = item
- self.barrier.wait()
- else:
- self.perform(sql, (item,))
-
- def perform(self, sql, args=None):
- while True:
- try:
- self.cursor.execute(sql, args)
- return
- except psycopg2.extensions.TransactionRollbackError as e:
- if e.pgcode is None:
- raise RuntimeError("Postgres exception has no error code")
- if e.pgcode == '40P01':
- log.info("Deadlock detected, retry.")
- else:
- raise