X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/3c186f80304b5b66795e9ef3cb9edb8f343c9416..6e908fb81c5a1d66e80d71c7b4676cacb1d8e801:/nominatim/indexer/indexer.py?ds=inline diff --git a/nominatim/indexer/indexer.py b/nominatim/indexer/indexer.py index 61971497..06c05e1d 100644 --- a/nominatim/indexer/indexer.py +++ b/nominatim/indexer/indexer.py @@ -119,13 +119,37 @@ class PostcodeRunner: WHERE place_id IN ({}) """.format(','.join((str(i) for i in ids))) + +def _analyse_db_if(conn, condition): + if condition: + with conn.cursor() as cur: + cur.execute('ANALYSE') + + class Indexer: """ Main indexing routine. """ def __init__(self, dsn, num_threads): - self.conn = psycopg2.connect(dsn) - self.threads = [DBConnection(dsn) for _ in range(num_threads)] + self.dsn = dsn + self.num_threads = num_threads + self.conn = None + self.threads = [] + + + def _setup_connections(self): + self.conn = psycopg2.connect(self.dsn) + self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)] + + + def _close_connections(self): + if self.conn: + self.conn.close() + self.conn = None + + for thread in self.threads: + thread.close() + self.threads = [] def index_full(self, analyse=True): @@ -134,34 +158,41 @@ class Indexer: database will be analysed at the appropriate places to ensure that database statistics are updated. """ - self.index_by_rank(0, 4) - self._analyse_db_if(analyse) + conn = psycopg2.connect(self.dsn) + conn.autocommit = True - self.index_boundaries(0, 30) - self._analyse_db_if(analyse) + try: + self.index_by_rank(0, 4) + _analyse_db_if(conn, analyse) - self.index_by_rank(5, 25) - self._analyse_db_if(analyse) + self.index_boundaries(0, 30) + _analyse_db_if(conn, analyse) - self.index_by_rank(26, 30) - self._analyse_db_if(analyse) + self.index_by_rank(5, 25) + _analyse_db_if(conn, analyse) - self.index_postcodes() - self._analyse_db_if(analyse) + self.index_by_rank(26, 30) + _analyse_db_if(conn, analyse) + + self.index_postcodes() + _analyse_db_if(conn, analyse) + finally: + conn.close() - def _analyse_db_if(self, condition): - if condition: - with self.conn.cursor() as cur: - cur.execute('ANALYSE') def index_boundaries(self, minrank, maxrank): """ Index only administrative boundaries within the given rank range. """ LOG.warning("Starting indexing boundaries using %s threads", - len(self.threads)) + self.num_threads) + + self._setup_connections() - for rank in range(max(minrank, 4), min(maxrank, 26)): - self.index(BoundaryRunner(rank)) + try: + for rank in range(max(minrank, 4), min(maxrank, 26)): + self.index(BoundaryRunner(rank)) + finally: + self._close_connections() def index_by_rank(self, minrank, maxrank): """ Index all entries of placex in the given rank range (inclusive) @@ -172,30 +203,48 @@ class Indexer: """ maxrank = min(maxrank, 30) LOG.warning("Starting indexing rank (%i to %i) using %i threads", - minrank, maxrank, len(self.threads)) + minrank, maxrank, self.num_threads) - for rank in range(max(1, minrank), maxrank): - self.index(RankRunner(rank)) + self._setup_connections() - if maxrank == 30: - self.index(RankRunner(0)) - self.index(InterpolationRunner(), 20) - self.index(RankRunner(30), 20) - else: - self.index(RankRunner(maxrank)) + try: + for rank in range(max(1, minrank), maxrank): + self.index(RankRunner(rank)) + + if maxrank == 30: + self.index(RankRunner(0)) + self.index(InterpolationRunner(), 20) + self.index(RankRunner(30), 20) + else: + self.index(RankRunner(maxrank)) + finally: + self._close_connections() def index_postcodes(self): """Index the entries ofthe location_postcode table. """ - self.index(PostcodeRunner(), 20) + LOG.warning("Starting indexing postcodes using %s threads", self.num_threads) + + self._setup_connections() + + try: + self.index(PostcodeRunner(), 20) + finally: + self._close_connections() def update_status_table(self): """ Update the status in the status table to 'indexed'. """ - with self.conn.cursor() as cur: - cur.execute('UPDATE import_status SET indexed = true') - self.conn.commit() + conn = psycopg2.connect(self.dsn) + + try: + with conn.cursor() as cur: + cur.execute('UPDATE import_status SET indexed = true') + + conn.commit() + finally: + conn.close() def index(self, obj, batch=1): """ Index a single rank or table. `obj` describes the SQL to use