"""
Main work horse for indexing (computing addresses) the database.
"""
-from typing import cast, List, Any
+from typing import cast, List, Any, Optional
import logging
import time
LOG = logging.getLogger()
+
class Indexer:
""" Main indexing routine.
"""
self.tokenizer = tokenizer
self.num_threads = num_threads
-
def has_pending(self) -> bool:
""" Check if any data still needs indexing.
This function must only be used after the import has finished.
cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
return cur.rowcount > 0
-
async def index_full(self, analyse: bool = True) -> None:
""" Index the complete database. This will first index boundaries
followed by all other objects. When `analyse` is True, then the
if not self.has_pending():
break
-
async def index_boundaries(self, minrank: int, maxrank: int) -> int:
""" Index only administrative boundaries within the given rank range.
"""
LOG.warning("Starting indexing boundaries using %s threads",
self.num_threads)
+ minrank = max(minrank, 4)
+ maxrank = min(maxrank, 25)
+
+        # Precompute the number of rows to process for each rank
+ with connect(self.dsn) as conn:
+ hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
+ if hstore_info is None:
+ raise RuntimeError('Hstore extension is requested but not installed.')
+ psycopg.types.hstore.register_hstore(hstore_info)
+
+ with conn.cursor() as cur:
+ cur = conn.execute(""" SELECT rank_search, count(*)
+ FROM placex
+ WHERE rank_search between %s and %s
+ AND class = 'boundary' and type = 'administrative'
+ AND indexed_status > 0
+ GROUP BY rank_search""",
+ (minrank, maxrank))
+ total_tuples = {row.rank_search: row.count for row in cur}
+
with self.tokenizer.name_analyzer() as analyzer:
- for rank in range(max(minrank, 4), min(maxrank, 26)):
- total += await self._index(runners.BoundaryRunner(rank, analyzer))
+ for rank in range(minrank, maxrank + 1):
+ total += await self._index(runners.BoundaryRunner(rank, analyzer),
+ total_tuples=total_tuples.get(rank, 0))
return total
LOG.warning("Starting indexing rank (%i to %i) using %i threads",
minrank, maxrank, self.num_threads)
+        # Precompute the number of rows to process for each rank
+ with connect(self.dsn) as conn:
+ hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")
+ if hstore_info is None:
+ raise RuntimeError('Hstore extension is requested but not installed.')
+ psycopg.types.hstore.register_hstore(hstore_info)
+
+ with conn.cursor() as cur:
+ cur = conn.execute(""" SELECT rank_address, count(*)
+ FROM placex
+ WHERE rank_address between %s and %s
+ AND indexed_status > 0
+ GROUP BY rank_address""",
+ (minrank, maxrank))
+ total_tuples = {row.rank_address: row.count for row in cur}
+
with self.tokenizer.name_analyzer() as analyzer:
for rank in range(max(1, minrank), maxrank + 1):
if rank >= 30:
batch = 5
else:
batch = 1
- total += await self._index(runners.RankRunner(rank, analyzer), batch)
+ total += await self._index(runners.RankRunner(rank, analyzer),
+ batch=batch, total_tuples=total_tuples.get(rank, 0))
if maxrank == 30:
total += await self._index(runners.RankRunner(0, analyzer))
- total += await self._index(runners.InterpolationRunner(analyzer), 20)
+ total += await self._index(runners.InterpolationRunner(analyzer), batch=20)
return total
-
async def index_postcodes(self) -> int:
"""Index the entries of the location_postcode table.
"""
LOG.warning("Starting indexing postcodes using %s threads", self.num_threads)
- return await self._index(runners.PostcodeRunner(), 20)
-
+ return await self._index(runners.PostcodeRunner(), batch=20)
def update_status_table(self) -> None:
""" Update the status in the status table to 'indexed'.
conn.commit()
- async def _index(self, runner: runners.Runner, batch: int = 1) -> int:
+ async def _index(self, runner: runners.Runner, batch: int = 1,
+ total_tuples: Optional[int] = None) -> int:
""" Index a single rank or table. `runner` describes the SQL to use
for indexing. `batch` describes the number of objects that
- should be processed with a single SQL statement
+ should be processed with a single SQL statement.
+
+ `total_tuples` may contain the total number of rows to process.
+ When not supplied, the value will be computed using the
+            appropriate runner function.
"""
LOG.warning("Starting %s (using batch size %s)", runner.name(), batch)
- total_tuples = self._prepare_indexing(runner)
+ if total_tuples is None:
+ total_tuples = self._prepare_indexing(runner)
progress = ProgressLogger(runner.name(), total_tuples)
if total_tuples > 0:
async with await psycopg.AsyncConnection.connect(
- self.dsn, row_factory=psycopg.rows.dict_row) as aconn,\
+ self.dsn, row_factory=psycopg.rows.dict_row) as aconn, \
QueryPool(self.dsn, self.num_threads, autocommit=True) as pool:
fetcher_time = 0.0
tstart = time.time()
return progress.done()
-
def _prepare_indexing(self, runner: runners.Runner) -> int:
with connect(self.dsn) as conn:
hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")