From 602728895e7c35ed1f434acb8af6acfaa66033f3 Mon Sep 17 00:00:00 2001 From: Sarah Hoffmann Date: Thu, 29 Apr 2021 21:57:43 +0200 Subject: [PATCH] indexer: fetch ids in batches --- nominatim/indexer/indexer.py | 46 ++++++++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/nominatim/indexer/indexer.py b/nominatim/indexer/indexer.py index d0c6ea0c..41535af8 100644 --- a/nominatim/indexer/indexer.py +++ b/nominatim/indexer/indexer.py @@ -183,6 +183,9 @@ class Indexer: total_tuples = cur.scalar(runner.sql_count_objects()) LOG.debug("Total number of rows: %i", total_tuples) + hstore_oid = cur.scalar("SELECT 'hstore'::regtype::oid") + hstore_array_oid = cur.scalar("SELECT 'hstore[]'::regtype::oid") + conn.commit() progress = ProgressLogger(runner.name(), total_tuples) @@ -191,20 +194,49 @@ class Indexer: with conn.cursor(name='places') as cur: cur.execute(runner.sql_get_objects()) + fetcher = DBConnection(self.dsn) + psycopg2.extras.register_hstore(fetcher.conn, + oid=hstore_oid, + array_oid=hstore_array_oid) + with WorkerPool(self.dsn, self.num_threads) as pool: - while True: - places = cur.fetchmany(batch) + places = self._fetch_next_batch(cur, fetcher, runner) + while places is not None: if not places: - break + fetcher.wait() + places = fetcher.cursor.fetchall() + + # asynchronously get the next batch + next_places = self._fetch_next_batch(cur, fetcher, runner) - LOG.debug("Processing places: %s", str(places)) - worker = pool.next_free_worker() + # And insert the current batch + for idx in range(0, len(places), batch): + worker = pool.next_free_worker() + part = places[idx:idx+batch] + LOG.debug("Processing places: %s", str(part)) + runner.index_places(worker, part) + progress.add(len(part)) - runner.index_places(worker, places) - progress.add(len(places)) + places = next_places pool.finish_all() + fetcher.wait() + fetcher.close() + conn.commit() progress.done() + + + def _fetch_next_batch(self, cur, fetcher, runner): + ids = 
cur.fetchmany(1000) + + if not ids: + return None + + if not hasattr(runner, 'get_place_details'): + return ids + + runner.get_place_details(fetcher, ids) + return [] -- 2.39.5