+# SPDX-License-Identifier: GPL-2.0-only
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2022 by the Nominatim developer community.
+# For a full list of authors see the git log.
"""
Main work horse for indexing (computing addresses) the database.
"""
import logging
-import select
import time
import psycopg2.extras
from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners
-from nominatim.db.async_connection import DBConnection
+from nominatim.db.async_connection import DBConnection, WorkerPool
from nominatim.db.connection import connect
LOG = logging.getLogger()
self.conn.wait()
self.close()
-class WorkerPool:
- """ A pool of asynchronous database connections.
-
- The pool may be used as a context manager.
- """
- REOPEN_CONNECTIONS_AFTER = 100000
-
- def __init__(self, dsn, pool_size):
- self.threads = [DBConnection(dsn) for _ in range(pool_size)]
- self.free_workers = self._yield_free_worker()
- self.wait_time = 0
-
-
- def finish_all(self):
- """ Wait for all connection to finish.
- """
- for thread in self.threads:
- while not thread.is_done():
- thread.wait()
-
- self.free_workers = self._yield_free_worker()
-
- def close(self):
- """ Close all connections and clear the pool.
- """
- for thread in self.threads:
- thread.close()
- self.threads = []
- self.free_workers = None
-
-
- def next_free_worker(self):
- """ Get the next free connection.
- """
- return next(self.free_workers)
-
-
- def _yield_free_worker(self):
- ready = self.threads
- command_stat = 0
- while True:
- for thread in ready:
- if thread.is_done():
- command_stat += 1
- yield thread
-
- if command_stat > self.REOPEN_CONNECTIONS_AFTER:
- for thread in self.threads:
- while not thread.is_done():
- thread.wait()
- thread.connect()
- ready = self.threads
- command_stat = 0
- else:
- tstart = time.time()
- _, ready, _ = select.select([], self.threads, [])
- self.wait_time += time.time() - tstart
-
-
- def __enter__(self):
- return self
-
-
- def __exit__(self, exc_type, exc_value, traceback):
- self.finish_all()
- self.close()
-
class Indexer:
""" Main indexing routine.
self.num_threads = num_threads
+ def has_pending(self):
+ """ Check if any data still needs indexing.
+ This function must only be used after the import has finished.
+ Otherwise it will be very expensive.
+ """
+ with connect(self.dsn) as conn:
+ with conn.cursor() as cur:
+ cur.execute("SELECT 'a' FROM placex WHERE indexed_status > 0 LIMIT 1")
+ return cur.rowcount > 0
+
+
def index_full(self, analyse=True):
- """ Index the complete database. This will first index boudnaries
+ """ Index the complete database. This will first index boundaries
followed by all other objects. When `analyse` is True, then the
database will be analysed at the appropriate places to
ensure that database statistics are updated.
with connect(self.dsn) as conn:
conn.autocommit = True
- if analyse:
- def _analyze():
+ def _analyze():
+ if analyse:
with conn.cursor() as cur:
cur.execute('ANALYZE')
- else:
- def _analyze():
- pass
self.index_by_rank(0, 4)
_analyze()
minrank, maxrank, self.num_threads)
with self.tokenizer.name_analyzer() as analyzer:
- for rank in range(max(1, minrank), maxrank):
- self._index(runners.RankRunner(rank, analyzer))
+ for rank in range(max(1, minrank), maxrank + 1):
+ self._index(runners.RankRunner(rank, analyzer), 20 if rank == 30 else 1)
if maxrank == 30:
self._index(runners.RankRunner(0, analyzer))
self._index(runners.InterpolationRunner(analyzer), 20)
- self._index(runners.RankRunner(30, analyzer), 20)
- else:
- self._index(runners.RankRunner(maxrank, analyzer))
def index_postcodes(self):
# And insert the curent batch
for idx in range(0, len(places), batch):
- part = places[idx:idx+batch]
+ part = places[idx:idx + batch]
LOG.debug("Processing places: %s", str(part))
runner.index_places(pool.next_free_worker(), part)
progress.add(len(part))