fetch place info asynchronously
diff --git a/nominatim/indexer/indexer.py b/nominatim/indexer/indexer.py
index 41535af8f2ae47bc7753d2cba1280fc617d4581a..d685e83a1546097366faa41fe8bc580ad5b4e660 100644
--- a/nominatim/indexer/indexer.py
+++ b/nominatim/indexer/indexer.py
@@ -3,6 +3,7 @@ Main work horse for indexing (computing addresses) the database.
 """
 import logging
 import select
+import time
 
 import psycopg2.extras
 
@@ -183,6 +184,8 @@ class Indexer:
                 total_tuples = cur.scalar(runner.sql_count_objects())
                 LOG.debug("Total number of rows: %i", total_tuples)
 
+                # Fetch the hstore OIDs manually here because register_hstore()
+                # cannot look them up on the asynchronous connection used below.
                 hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid")
                 hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid")
 
@@ -190,11 +193,14 @@ class Indexer:
 
             progress = ProgressLogger(runner.name(), total_tuples)
 
+            fetcher_wait = 0
+            pool_wait = 0
+
             if total_tuples > 0:
                 with conn.cursor(name='places') as cur:
                     cur.execute(runner.sql_get_objects())
 
-                    fetcher = DBConnection(self.dsn)
+                    fetcher = DBConnection(self.dsn, cursor_factory=psycopg2.extras.DictCursor)
                     psycopg2.extras.register_hstore(fetcher.conn,
                                                     oid=hstore_oid,
                                                     array_oid=hstore_array_oid)
@@ -203,7 +209,9 @@ class Indexer:
                         places = self._fetch_next_batch(cur, fetcher, runner)
                         while places is not None:
                             if not places:
+                                t0 = time.time()
                                 fetcher.wait()
+                                fetcher_wait += time.time() - t0
                                 places = fetcher.cursor.fetchall()
 
                             # asynchronously get the next batch
@@ -211,7 +219,9 @@ class Indexer:
 
                             # And insert the current batch
                             for idx in range(0, len(places), batch):
+                                t0 = time.time()
                                 worker = pool.next_free_worker()
+                                pool_wait += time.time() - t0
                                 part = places[idx:idx+batch]
                                 LOG.debug("Processing places: %s", str(part))
                                 runner.index_places(worker, part)
@@ -227,10 +237,11 @@ class Indexer:
                 conn.commit()
 
         progress.done()
+        LOG.warning("Wait time: fetcher: %ss, pool: %ss", fetcher_wait, pool_wait)
 
 
     def _fetch_next_batch(self, cur, fetcher, runner):
-        ids = cur.fetchmany(1000)
+        ids = cur.fetchmany(100)
 
         if not ids:
             return None
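
A note on the hstore handling above: psycopg2's register_hstore() normally runs its own catalogue query to find the hstore OIDs, which it cannot do on an asynchronous connection, so the diff looks them up on the synchronous connection and passes them in explicitly. Below is a minimal sketch of that pattern using plain psycopg2 (the DSN is an assumption for illustration; Nominatim's DBConnection wrapper is left out):

    import psycopg2
    import psycopg2.extras

    DSN = 'dbname=nominatim'   # assumed connection string, for illustration only

    # Look up the OIDs on an ordinary synchronous connection first.
    sync_conn = psycopg2.connect(DSN)
    with sync_conn.cursor() as cur:
        cur.execute("SELECT 'hstore'::regtype::oid, 'hstore[]'::regtype::oid")
        hstore_oid, hstore_array_oid = cur.fetchone()

    # Open the asynchronous connection and wait until it is ready to use.
    async_conn = psycopg2.connect(DSN, async_=1)
    psycopg2.extras.wait_select(async_conn)

    # With explicit OIDs, register_hstore() needs no query of its own,
    # so it can be called on the asynchronous connection as well.
    psycopg2.extras.register_hstore(async_conn,
                                    oid=hstore_oid,
                                    array_oid=hstore_array_oid)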