X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/ffc2d82b0ed150d52a718dc563f9399062e579a7..6ce6f62b8ef7cec8b5950293516845e319dd5f06:/nominatim/indexer/indexer.py diff --git a/nominatim/indexer/indexer.py b/nominatim/indexer/indexer.py index d0c6ea0c..d685e83a 100644 --- a/nominatim/indexer/indexer.py +++ b/nominatim/indexer/indexer.py @@ -3,6 +3,7 @@ Main work horse for indexing (computing addresses) the database. """ import logging import select +import time import psycopg2.extras @@ -183,28 +184,70 @@ class Indexer: total_tuples = cur.scalar(runner.sql_count_objects()) LOG.debug("Total number of rows: %i", total_tuples) + # need to fetch those manually because register_hstore cannot + # fetch them on an asynchronous connection below. + hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid") + hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid") + conn.commit() progress = ProgressLogger(runner.name(), total_tuples) + fetcher_wait = 0 + pool_wait = 0 + if total_tuples > 0: with conn.cursor(name='places') as cur: cur.execute(runner.sql_get_objects()) + fetcher = DBConnection(self.dsn, cursor_factory=psycopg2.extras.DictCursor) + psycopg2.extras.register_hstore(fetcher.conn, + oid=hstore_oid, + array_oid=hstore_array_oid) + with WorkerPool(self.dsn, self.num_threads) as pool: - while True: - places = cur.fetchmany(batch) + places = self._fetch_next_batch(cur, fetcher, runner) + while places is not None: if not places: - break - - LOG.debug("Processing places: %s", str(places)) - worker = pool.next_free_worker() - - runner.index_places(worker, places) - progress.add(len(places)) + t0 = time.time() + fetcher.wait() + fetcher_wait += time.time() - t0 + places = fetcher.cursor.fetchall() + + # asynchronously get the next batch + next_places = self._fetch_next_batch(cur, fetcher, runner) + + # And insert the current batch + for idx in range(0, len(places), batch): + t0 = time.time() + worker = pool.next_free_worker() + pool_wait += 
time.time() - t0 + part = places[idx:idx+batch] + LOG.debug("Processing places: %s", str(part)) + runner.index_places(worker, part) + progress.add(len(part)) + + places = next_places pool.finish_all() + fetcher.wait() + fetcher.close() + conn.commit() progress.done() + LOG.warning("Wait time: fetcher: {}s, pool: {}s".format(fetcher_wait, pool_wait)) + + + def _fetch_next_batch(self, cur, fetcher, runner): + ids = cur.fetchmany(100) + + if not ids: + return None + + if not hasattr(runner, 'get_place_details'): + return ids + + runner.get_place_details(fetcher, ids) + return []