git.openstreetmap.org Git - nominatim.git/commitdiff
Merge pull request #3510 from lonvia/indexing-precompute-count
author Sarah Hoffmann <lonvia@denofr.de>
Mon, 12 Aug 2024 21:55:19 +0000 (23:55 +0200)
committer GitHub <noreply@github.com>
Mon, 12 Aug 2024 21:55:19 +0000 (23:55 +0200)
Indexing: precompute counts of affected rows

src/nominatim_db/indexer/indexer.py

index 9680e5a9db14d382e84f385c7ffeaf6cb1c3dd13..9d42922b8e4f09372bc395dbcf209b41e7890030 100644 (file)
@@ -7,7 +7,7 @@
 """
 Main work horse for indexing (computing addresses) the database.
 """
-from typing import cast, List, Any
+from typing import cast, List, Any, Optional
 import logging
 import time
 
@@ -83,9 +83,30 @@ class Indexer:
         LOG.warning("Starting indexing boundaries using %s threads",
                     self.num_threads)
 
+        minrank = max(minrank, 4)
+        maxrank = min(maxrank, 25)
+
+        # Precompute the number of rows to process for each rank
+        with connect(self.dsn) as conn:
+            hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
+            if hstore_info is None:
+                raise RuntimeError('Hstore extension is requested but not installed.')
+            psycopg.types.hstore.register_hstore(hstore_info)
+
+            with conn.cursor() as cur:
+                cur = conn.execute(""" SELECT rank_search, count(*)
+                                       FROM placex
+                                       WHERE rank_search between %s and %s
+                                             AND class = 'boundary' and type = 'administrative'
+                                             AND indexed_status > 0
+                                       GROUP BY rank_search""",
+                                   (minrank, maxrank))
+                total_tuples = {row.rank_search: row.count for row in cur}
+
         with self.tokenizer.name_analyzer() as analyzer:
-            for rank in range(max(minrank, 4), min(maxrank, 26)):
-                total += await self._index(runners.BoundaryRunner(rank, analyzer))
+            for rank in range(minrank, maxrank + 1):
+                total += await self._index(runners.BoundaryRunner(rank, analyzer),
+                                           total_tuples=total_tuples.get(rank, 0))
 
         return total
 
@@ -101,6 +122,23 @@ class Indexer:
         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                     minrank, maxrank, self.num_threads)
 
+        # Precompute the number of rows to process for each rank
+        with connect(self.dsn) as conn:
+            hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
+            if hstore_info is None:
+                raise RuntimeError('Hstore extension is requested but not installed.')
+            psycopg.types.hstore.register_hstore(hstore_info)
+
+            with conn.cursor() as cur:
+                cur = conn.execute(""" SELECT rank_address, count(*)
+                                       FROM placex
+                                       WHERE rank_address between %s and %s
+                                             AND indexed_status > 0
+                                       GROUP BY rank_address""",
+                                   (minrank, maxrank))
+                total_tuples = {row.rank_address: row.count for row in cur}
+
+
         with self.tokenizer.name_analyzer() as analyzer:
             for rank in range(max(1, minrank), maxrank + 1):
                 if rank >= 30:
@@ -109,11 +147,12 @@ class Indexer:
                     batch = 5
                 else:
                     batch = 1
-                total += await self._index(runners.RankRunner(rank, analyzer), batch)
+                total += await self._index(runners.RankRunner(rank, analyzer),
+                                           batch=batch, total_tuples=total_tuples.get(rank, 0))
 
             if maxrank == 30:
                 total += await self._index(runners.RankRunner(0, analyzer))
-                total += await self._index(runners.InterpolationRunner(analyzer), 20)
+                total += await self._index(runners.InterpolationRunner(analyzer), batch=20)
 
         return total
 
@@ -123,7 +162,7 @@ class Indexer:
         """
         LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
 
-        return await self._index(runners.PostcodeRunner(), 20)
+        return await self._index(runners.PostcodeRunner(), batch=20)
 
 
     def update_status_table(self) -> None:
@@ -135,14 +174,20 @@ class Indexer:
 
             conn.commit()
 
-    async def _index(self, runner: runners.Runner, batch: int = 1) -> int:
+    async def _index(self, runner: runners.Runner, batch: int = 1,
+                     total_tuples: Optional[int] = None) -> int:
         """ Index a single rank or table. `runner` describes the SQL to use
             for indexing. `batch` describes the number of objects that
-            should be processed with a single SQL statement
+            should be processed with a single SQL statement.
+
+            `total_tuples` may contain the total number of rows to process.
+            When not supplied, the value will be computed using the
+            appropriate runner function.
         """
         LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
 
-        total_tuples = self._prepare_indexing(runner)
+        if total_tuples is None:
+            total_tuples = self._prepare_indexing(runner)
 
         progress = ProgressLogger(runner.name(), total_tuples)