git.openstreetmap.org Git - nominatim.git/commitdiff
use generator for thread choice
author: Sarah Hoffmann <lonvia@denofr.de>
Sun, 19 Jan 2020 13:51:15 +0000 (14:51 +0100)
committer: Sarah Hoffmann <lonvia@denofr.de>
Fri, 24 Jan 2020 21:06:30 +0000 (22:06 +0100)
nominatim/nominatim.py

index 023b4b23b9ff2fa6614e29ad6130cbd82269c5b8..6b25cf5c0623217c7cddebf3de468f2b0ec69658 100644 (file)
@@ -56,6 +56,7 @@ class IndexingThread(object):
         self.wait()
 
         self.current_query = None
+        self.current_params = None
 
     def wait(self):
         wait_select(self.conn)
@@ -63,8 +64,12 @@ class IndexingThread(object):
 
     def perform(self, sql, args=None):
         self.current_query = sql
+        self.current_params = args  # kept so a deadlock retry in is_done() re-sends the same parameters
         self.cursor.execute(sql, args)
 
+    def fileno(self):  # lets the thread object itself be registered with self.poll
+        return self.conn.fileno()
+
     def is_done(self):
         if self.current_query is None:
             return True
@@ -78,7 +83,7 @@ class IndexingThread(object):
                 raise RuntimeError("Postgres exception has no error code")
             if e.pgcode == '40P01':
                 log.info("Deadlock detected, retry.")
-                self.cursor.execute(self.sql)
+                self.cursor.execute(self.current_query, self.current_params)
             else:
                 raise
 
@@ -95,8 +100,7 @@ class Indexer(object):
         for i in range(options.threads):
             t = IndexingThread(i, options)
             self.threads.append(t)
-            self.poll.register(t.conn.fileno(), select.EPOLLIN)
-        self.next_thread = 0
+            self.poll.register(t, select.EPOLLIN)
 
     def run(self):
         log.info("Starting indexing rank ({} to {}) using {} threads".format(
@@ -123,6 +127,7 @@ class Indexer(object):
 
         cur.scroll(0, mode='absolute')
 
+        next_thread = self.find_free_thread()
         done_tuples = 0
         rank_start_time = datetime.now()
         for r in cur:
@@ -141,7 +146,7 @@ class Indexer(object):
             for place in pcur:
                 place_id = place[0]
                 log.debug("Processing place {}".format(place_id))
-                thread = self.find_free_thread()
+                thread = next(next_thread)
 
                 thread.perform(obj.sql_index_place(), (place_id,))
                 done_tuples += 1
@@ -164,12 +169,19 @@ class Indexer(object):
                  done_tuples/diff_seconds, obj.name()))
 
     def find_free_thread(self):
-        while True:
-            for t in self.threads:
-                if t.is_done():
-                    return t
+        thread_lookup = { t.fileno() : t for t in self.threads}
 
-            self.poll.poll()
+        done_fids = [ t.fileno() for t in self.threads ]
+
+        while True:
+            for fid in done_fids:
+                thread = thread_lookup[fid]
+                if thread.is_done():
+                    yield thread
+                else:
+                    print("not good", fid)
+
+            done_fids = [ x[0] for x in self.poll.poll()]
 
         assert(False, "Unreachable code")