# Module-wide logger. getLogger() called without a name returns the root logger.
LOG = logging.getLogger()
+
class Indexer:
""" Main indexing routine.
"""
self.tokenizer = tokenizer
self.num_threads = num_threads
-
def has_pending(self) -> bool:
""" Check if any data still needs indexing.
This function must only be used after the import has finished.
cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
return cur.rowcount > 0
-
async def index_full(self, analyse: bool = True) -> None:
""" Index the complete database. This will first index boundaries
followed by all other objects. When `analyse` is True, then the
if not self.has_pending():
break
-
async def index_boundaries(self, minrank: int, maxrank: int) -> int:
""" Index only administrative boundaries within the given rank range.
"""
(minrank, maxrank))
total_tuples = {row.rank_address: row.count for row in cur}
-
with self.tokenizer.name_analyzer() as analyzer:
for rank in range(max(1, minrank), maxrank + 1):
if rank >= 30:
return total
-
async def index_postcodes(self) -> int:
    """ Index the rows of the location_postcode table.

        Creates a `runners.PostcodeRunner` and passes it to `self._index`
        with a batch size of 20. The integer result of `_index` is
        returned unchanged.
    """
    postcode_runner = runners.PostcodeRunner()
    return await self._index(postcode_runner, batch=20)
-
def update_status_table(self) -> None:
""" Update the status in the status table to 'indexed'.
"""
if total_tuples > 0:
async with await psycopg.AsyncConnection.connect(
- self.dsn, row_factory=psycopg.rows.dict_row) as aconn,\
+ self.dsn, row_factory=psycopg.rows.dict_row) as aconn, \
QueryPool(self.dsn, self.num_threads, autocommit=True) as pool:
fetcher_time = 0.0
tstart = time.time()
return progress.done()
-
def _prepare_indexing(self, runner: runners.Runner) -> int:
with connect(self.dsn) as conn:
hstore_info = psycopg.types.TypeInfo.fetch(conn, "hstore")