git.openstreetmap.org Git - nominatim.git/blobdiff - src/nominatim_db/indexer/indexer.py
fix style issue found by flake8
[nominatim.git] / src / nominatim_db / indexer / indexer.py
index 9680e5a9db14d382e84f385c7ffeaf6cb1c3dd13..d467efbd6e27b7dbaa207ee042d2ea3ac2cadcf9 100644 (file)
@@ -7,7 +7,7 @@
 """
 Main work horse for indexing (computing addresses) the database.
 """
-from typing import cast, List, Any
+from typing import cast, List, Any, Optional
 import logging
 import time
 
@@ -21,6 +21,7 @@ from . import runners
 
 LOG = logging.getLogger()
 
+
 class Indexer:
     """ Main indexing routine.
     """
@@ -30,7 +31,6 @@ class Indexer:
         self.tokenizer = tokenizer
         self.num_threads = num_threads
 
-
     def has_pending(self) -> bool:
         """ Check if any data still needs indexing.
             This function must only be used after the import has finished.
@@ -41,7 +41,6 @@ class Indexer:
                 cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
                 return cur.rowcount > 0
 
-
     async def index_full(self, analyse: bool = True) -> None:
         """ Index the complete database. This will first index boundaries
             followed by all other objects. When `analyse` is True, then the
@@ -75,7 +74,6 @@ class Indexer:
                 if not self.has_pending():
                     break
 
-
     async def index_boundaries(self, minrank: int, maxrank: int) -> int:
         """ Index only administrative boundaries within the given rank range.
         """
@@ -83,9 +81,30 @@ class Indexer:
         LOG.warning("Starting indexing boundaries using %s threads",
                     self.num_threads)
 
+        minrank = max(minrank, 4)
+        maxrank = min(maxrank, 25)
+
+        # Precompute the number of rows to process for each rank
+        with connect(self.dsn) as conn:
+            hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
+            if hstore_info is None:
+                raise RuntimeError('Hstore extension is requested but not installed.')
+            psycopg.types.hstore.register_hstore(hstore_info)
+
+            with conn.cursor() as cur:
+                cur = conn.execute(""" SELECT rank_search, count(*)
+                                       FROM placex
+                                       WHERE rank_search between %s and %s
+                                             AND class = 'boundary' and type = 'administrative'
+                                             AND indexed_status > 0
+                                       GROUP BY rank_search""",
+                                   (minrank, maxrank))
+                total_tuples = {row.rank_search: row.count for row in cur}
+
         with self.tokenizer.name_analyzer() as analyzer:
-            for rank in range(max(minrank, 4), min(maxrank, 26)):
-                total += await self._index(runners.BoundaryRunner(rank, analyzer))
+            for rank in range(minrank, maxrank + 1):
+                total += await self._index(runners.BoundaryRunner(rank, analyzer),
+                                           total_tuples=total_tuples.get(rank, 0))
 
         return total
 
@@ -101,6 +120,22 @@ class Indexer:
         LOG.warning("Starting indexing rank (%i to %i) using %i threads",
                     minrank, maxrank, self.num_threads)
 
+        # Precompute the number of rows to process for each rank
+        with connect(self.dsn) as conn:
+            hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
+            if hstore_info is None:
+                raise RuntimeError('Hstore extension is requested but not installed.')
+            psycopg.types.hstore.register_hstore(hstore_info)
+
+            with conn.cursor() as cur:
+                cur = conn.execute(""" SELECT rank_address, count(*)
+                                       FROM placex
+                                       WHERE rank_address between %s and %s
+                                             AND indexed_status > 0
+                                       GROUP BY rank_address""",
+                                   (minrank, maxrank))
+                total_tuples = {row.rank_address: row.count for row in cur}
+
         with self.tokenizer.name_analyzer() as analyzer:
             for rank in range(max(1, minrank), maxrank + 1):
                 if rank >= 30:
@@ -109,22 +144,21 @@ class Indexer:
                     batch = 5
                 else:
                     batch = 1
-                total += await self._index(runners.RankRunner(rank, analyzer), batch)
+                total += await self._index(runners.RankRunner(rank, analyzer),
+                                           batch=batch, total_tuples=total_tuples.get(rank, 0))
 
             if maxrank == 30:
                 total += await self._index(runners.RankRunner(0, analyzer))
-                total += await self._index(runners.InterpolationRunner(analyzer), 20)
+                total += await self._index(runners.InterpolationRunner(analyzer), batch=20)
 
         return total
 
-
     async def index_postcodes(self) -> int:
         """Index the entries of the location_postcode table.
         """
         LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
 
-        return await self._index(runners.PostcodeRunner(), 20)
-
+        return await self._index(runners.PostcodeRunner(), batch=20)
 
     def update_status_table(self) -> None:
         """ Update the status in the status table to 'indexed'.
@@ -135,20 +169,26 @@ class Indexer:
 
             conn.commit()
 
-    async def _index(self, runner: runners.Runner, batch: int = 1) -> int:
+    async def _index(self, runner: runners.Runner, batch: int = 1,
+                     total_tuples: Optional[int] = None) -> int:
         """ Index a single rank or table. `runner` describes the SQL to use
             for indexing. `batch` describes the number of objects that
-            should be processed with a single SQL statement
+            should be processed with a single SQL statement.
+
+            `total_tuples` may contain the total number of rows to process.
+            When not supplied, the value will be computed using the
+            appropriate runner function.
         """
         LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
 
-        total_tuples = self._prepare_indexing(runner)
+        if total_tuples is None:
+            total_tuples = self._prepare_indexing(runner)
 
         progress = ProgressLogger(runner.name(), total_tuples)
 
         if total_tuples > 0:
             async with await psycopg.AsyncConnection.connect(
-                                 self.dsn, row_factory=psycopg.rows.dict_row) as aconn,\
+                                 self.dsn, row_factory=psycopg.rows.dict_row) as aconn, \
                        QueryPool(self.dsn, self.num_threads, autocommit=True) as pool:
                 fetcher_time = 0.0
                 tstart = time.time()
@@ -179,7 +219,6 @@ class Indexer:
 
         return progress.done()
 
-
     def _prepare_indexing(self, runner: runners.Runner) -> int:
         with connect(self.dsn) as conn:
             hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")