self.wait()
self.current_query = None
+ self.current_params = None
def wait(self):
    """Wait on this worker's database connection.

    Passes ``self.conn`` to ``wait_select``.
    """
    # NOTE(review): presumably psycopg2's wait_select, which blocks until
    # the pending asynchronous operation has finished - confirm against
    # the file's imports.
    wait_select(self.conn)
def perform(self, sql, args=None):
    """Hand a statement to the cursor and remember it as the current query.

    The SQL text and its parameters are both stored on the instance before
    the statement is executed, so that the identical statement (with the
    same parameters) can be issued again later.

    Args:
        sql: statement passed to ``self.cursor.execute``.
        args: parameters for the statement; ``None`` when it takes none.
    """
    self.current_query = sql
    self.current_params = args
    self.cursor.execute(sql, args)
def fileno(self):
    """Return the file descriptor of this worker's connection.

    Providing ``fileno()`` lets the object itself be passed to
    ``poll.register()`` instead of a raw descriptor.
    """
    return self.conn.fileno()
def is_done(self):
if self.current_query is None:
return True
raise RuntimeError("Postgres exception has no error code")
if e.pgcode == '40P01':
log.info("Deadlock detected, retry.")
- self.cursor.execute(self.sql)
+ self.cursor.execute(self.current_query, self.current_params)
else:
raise
for i in range(options.threads):
t = IndexingThread(i, options)
self.threads.append(t)
- self.poll.register(t.conn.fileno(), select.EPOLLIN)
- self.next_thread = 0
+ self.poll.register(t, select.EPOLLIN)
def run(self):
log.info("Starting indexing rank ({} to {}) using {} threads".format(
cur.scroll(0, mode='absolute')
+ next_thread = self.find_free_thread()
done_tuples = 0
rank_start_time = datetime.now()
for r in cur:
for place in pcur:
place_id = place[0]
log.debug("Processing place {}".format(place_id))
- thread = self.find_free_thread()
+ thread = next(next_thread)
thread.perform(obj.sql_index_place(), (place_id,))
done_tuples += 1
done_tuples/diff_seconds, obj.name()))
def find_free_thread(self):
    """Generate worker threads that are ready to take a new query.

    On the first pass every thread in ``self.threads`` is checked. After
    that, only threads whose file descriptors are reported by
    ``self.poll.poll()`` are checked again. The generator never finishes
    on its own; callers pull the next free thread with ``next()``.

    Yields:
        A thread from ``self.threads`` whose ``is_done()`` returned true.
    """
    # One fileno() call per thread; the mapping doubles as the initial
    # candidate list.
    threads_by_fd = {t.fileno(): t for t in self.threads}
    candidate_fds = list(threads_by_fd)

    while True:
        for fd in candidate_fds:
            worker = threads_by_fd[fd]
            # A reported descriptor does not guarantee that the query has
            # finished. Unfinished threads are skipped silently and get
            # checked again once a later poll reports them.
            if worker.is_done():
                yield worker

        # poll() yields (fd, event) pairs; only the descriptor is needed.
        candidate_fds = [fd for fd, _event in self.poll.poll()]