X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/c77877a93401dd2f87e3caefb7aa6f04d05f7c95..6430371d7d033f7b4562a1dda6055f9887534db0:/nominatim/indexer/indexer.py diff --git a/nominatim/indexer/indexer.py b/nominatim/indexer/indexer.py index d86303c4..7b826d96 100644 --- a/nominatim/indexer/indexer.py +++ b/nominatim/indexer/indexer.py @@ -1,149 +1,163 @@ """ Main work horse for indexing (computing addresses) the database. """ -# pylint: disable=C0111 import logging import select import psycopg2 -from .progress import ProgressLogger -from ..db.async_connection import DBConnection +from nominatim.indexer.progress import ProgressLogger +from nominatim.indexer import runners +from nominatim.db.async_connection import DBConnection LOG = logging.getLogger() -class RankRunner: - """ Returns SQL commands for indexing one rank within the placex table. + +class Indexer: + """ Main indexing routine. """ - def __init__(self, rank): - self.rank = rank + def __init__(self, dsn, num_threads): + self.dsn = dsn + self.num_threads = num_threads + self.conn = None + self.threads = [] - def name(self): - return "rank {}".format(self.rank) - def sql_count_objects(self): - return """SELECT count(*) FROM placex - WHERE rank_address = {} and indexed_status > 0 - """.format(self.rank) + def _setup_connections(self): + self.conn = psycopg2.connect(self.dsn) + self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)] - def sql_get_objects(self): - return """SELECT place_id FROM placex - WHERE indexed_status > 0 and rank_address = {} - ORDER BY geometry_sector""".format(self.rank) - @staticmethod - def sql_index_place(ids): - return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\ - .format(','.join((str(i) for i in ids))) + def _close_connections(self): + if self.conn: + self.conn.close() + self.conn = None + for thread in self.threads: + thread.close() + self.threads = [] -class InterpolationRunner: - """ Returns SQL commands for indexing the address interpolation table - location_property_osmline. - """ - @staticmethod - def name(): - return "interpolation lines (location_property_osmline)" - - @staticmethod - def sql_count_objects(): - return """SELECT count(*) FROM location_property_osmline - WHERE indexed_status > 0""" - - @staticmethod - def sql_get_objects(): - return """SELECT place_id FROM location_property_osmline - WHERE indexed_status > 0 - ORDER BY geometry_sector""" - - @staticmethod - def sql_index_place(ids): - return """UPDATE location_property_osmline - SET indexed_status = 0 WHERE place_id IN ({})"""\ - .format(','.join((str(i) for i in ids))) - -class BoundaryRunner: - """ Returns SQL commands for indexing the administrative boundaries - of a certain rank. - """ + def index_full(self, analyse=True): + """ Index the complete database. This will first index boudnaries + followed by all other objects. When `analyse` is True, then the + database will be analysed at the appropriate places to + ensure that database statistics are updated. + """ + with psycopg2.connect(self.dsn) as conn: + conn.autocommit = True - def __init__(self, rank): - self.rank = rank + if analyse: + def _analyse(): + with conn.cursor() as cur: + cur.execute('ANALYSE') + else: + def _analyse(): + pass - def name(self): - return "boundaries rank {}".format(self.rank) + self.index_by_rank(0, 4) + _analyse() - def sql_count_objects(self): - return """SELECT count(*) FROM placex - WHERE indexed_status > 0 - AND rank_search = {} - AND class = 'boundary' and type = 'administrative'""".format(self.rank) + self.index_boundaries(0, 30) + _analyse() - def sql_get_objects(self): - return """SELECT place_id FROM placex - WHERE indexed_status > 0 and rank_search = {} - and class = 'boundary' and type = 'administrative' - ORDER BY partition, admin_level""".format(self.rank) + self.index_by_rank(5, 25) + _analyse() - @staticmethod - def sql_index_place(ids): - return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\ - .format(','.join((str(i) for i in ids))) + self.index_by_rank(26, 30) + _analyse() -class Indexer: - """ Main indexing routine. - """ + self.index_postcodes() + _analyse() - def __init__(self, dsn, num_threads): - self.conn = psycopg2.connect(dsn) - self.threads = [DBConnection(dsn) for _ in range(num_threads)] def index_boundaries(self, minrank, maxrank): + """ Index only administrative boundaries within the given rank range. + """ LOG.warning("Starting indexing boundaries using %s threads", - len(self.threads)) + self.num_threads) - for rank in range(max(minrank, 5), min(maxrank, 26)): - self.index(BoundaryRunner(rank)) + self._setup_connections() + + try: + for rank in range(max(minrank, 4), min(maxrank, 26)): + self._index(runners.BoundaryRunner(rank)) + finally: + self._close_connections() def index_by_rank(self, minrank, maxrank): - """ Run classic indexing by rank. + """ Index all entries of placex in the given rank range (inclusive) + in order of their address rank. + + When rank 30 is requested then also interpolations and + places with address rank 0 will be indexed. """ maxrank = min(maxrank, 30) LOG.warning("Starting indexing rank (%i to %i) using %i threads", - minrank, maxrank, len(self.threads)) + minrank, maxrank, self.num_threads) + + self._setup_connections() + + try: + for rank in range(max(1, minrank), maxrank): + self._index(runners.RankRunner(rank)) + + if maxrank == 30: + self._index(runners.RankRunner(0)) + self._index(runners.InterpolationRunner(), 20) + self._index(runners.RankRunner(30), 20) + else: + self._index(runners.RankRunner(maxrank)) + finally: + self._close_connections() + + + def index_postcodes(self): + """Index the entries ofthe location_postcode table. + """ + LOG.warning("Starting indexing postcodes using %s threads", self.num_threads) + + self._setup_connections() + + try: + self._index(runners.PostcodeRunner(), 20) + finally: + self._close_connections() + + def update_status_table(self): + """ Update the status in the status table to 'indexed'. + """ + conn = psycopg2.connect(self.dsn) - for rank in range(max(1, minrank), maxrank): - self.index(RankRunner(rank)) + try: + with conn.cursor() as cur: + cur.execute('UPDATE import_status SET indexed = true') - if maxrank == 30: - self.index(RankRunner(0)) - self.index(InterpolationRunner(), 20) - self.index(RankRunner(30), 20) - else: - self.index(RankRunner(maxrank)) + conn.commit() + finally: + conn.close() - def index(self, obj, batch=1): - """ Index a single rank or table. `obj` describes the SQL to use + def _index(self, runner, batch=1): + """ Index a single rank or table. `runner` describes the SQL to use for indexing. `batch` describes the number of objects that should be processed with a single SQL statement """ - LOG.warning("Starting %s (using batch size %s)", obj.name(), batch) + LOG.warning("Starting %s (using batch size %s)", runner.name(), batch) cur = self.conn.cursor() - cur.execute(obj.sql_count_objects()) + cur.execute(runner.sql_count_objects()) total_tuples = cur.fetchone()[0] LOG.debug("Total number of rows: %i", total_tuples) cur.close() - progress = ProgressLogger(obj.name(), total_tuples) + progress = ProgressLogger(runner.name(), total_tuples) if total_tuples > 0: cur = self.conn.cursor(name='places') - cur.execute(obj.sql_get_objects()) + cur.execute(runner.sql_get_objects()) next_thread = self.find_free_thread() while True: @@ -154,7 +168,7 @@ class Indexer: LOG.debug("Processing places: %s", str(places)) thread = next(next_thread) - thread.perform(obj.sql_index_place(places)) + thread.perform(runner.sql_index_place(places)) progress.add(len(places)) cur.close()