-def make_connection(options, asynchronous=False):
- return psycopg2.connect(dbname=options.dbname, user=options.user,
- password=options.password, host=options.host,
- port=options.port, async_=asynchronous)
-
-class IndexingThread(object):
-
- def __init__(self, thread_num, options):
- log.debug("Creating thread {}".format(thread_num))
- self.thread_num = thread_num
- self.conn = make_connection(options, asynchronous=True)
- self.wait()
-
- self.cursor = self.conn.cursor()
- self.perform("SET lc_messages TO 'C'")
- self.wait()
-
- self.current_query = None
-
- def wait(self):
- wait_select(self.conn)
- self.current_query = None
-
- def perform(self, sql, args=None):
- self.current_query = sql
- self.cursor.execute(sql, args)
-
- def is_done(self):
- if self.current_query is None:
- return True
-
- try:
- if self.conn.poll() == psycopg2.extensions.POLL_OK:
- self.current_query = None
- return True
- except psycopg2.extensions.TransactionRollbackError as e:
- if e.pgcode is None:
- raise RuntimeError("Postgres exception has no error code")
- if e.pgcode == '40P01':
- log.info("Deadlock detected, retry.")
- self.cursor.execute(sql)
- else:
- raise