X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/e1b096cf8cf1ac6904c9adf0963f0e1756ca65ea..6e908fb81c5a1d66e80d71c7b4676cacb1d8e801:/nominatim/indexer/indexer.py diff --git a/nominatim/indexer/indexer.py b/nominatim/indexer/indexer.py index 6e0ed60f..06c05e1d 100644 --- a/nominatim/indexer/indexer.py +++ b/nominatim/indexer/indexer.py @@ -61,8 +61,8 @@ class InterpolationRunner: @staticmethod def sql_index_place(ids): return """UPDATE location_property_osmline - SET indexed_status = 0 WHERE place_id IN ({})"""\ - .format(','.join((str(i) for i in ids))) + SET indexed_status = 0 WHERE place_id IN ({}) + """.format(','.join((str(i) for i in ids))) class BoundaryRunner: """ Returns SQL commands for indexing the administrative boundaries @@ -79,57 +79,172 @@ class BoundaryRunner: return """SELECT count(*) FROM placex WHERE indexed_status > 0 AND rank_search = {} - AND class = 'boundary' and type = 'administrative'""".format(self.rank) + AND class = 'boundary' and type = 'administrative' + """.format(self.rank) def sql_get_objects(self): return """SELECT place_id FROM placex WHERE indexed_status > 0 and rank_search = {} and class = 'boundary' and type = 'administrative' - ORDER BY partition, admin_level""".format(self.rank) + ORDER BY partition, admin_level + """.format(self.rank) @staticmethod def sql_index_place(ids): return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\ .format(','.join((str(i) for i in ids))) + +class PostcodeRunner: + """ Provides the SQL commands for indexing the location_postcode table. 
+ """ + + @staticmethod + def name(): + return "postcodes (location_postcode)" + + @staticmethod + def sql_count_objects(): + return 'SELECT count(*) FROM location_postcode WHERE indexed_status > 0' + + @staticmethod + def sql_get_objects(): + return """SELECT place_id FROM location_postcode + WHERE indexed_status > 0 + ORDER BY country_code, postcode""" + + @staticmethod + def sql_index_place(ids): + return """UPDATE location_postcode SET indexed_status = 0 + WHERE place_id IN ({}) + """.format(','.join((str(i) for i in ids))) + + +def _analyse_db_if(conn, condition): + if condition: + with conn.cursor() as cur: + cur.execute('ANALYSE') + + class Indexer: """ Main indexing routine. """ def __init__(self, dsn, num_threads): - self.conn = psycopg2.connect(dsn) - self.threads = [DBConnection(dsn) for _ in range(num_threads)] + self.dsn = dsn + self.num_threads = num_threads + self.conn = None + self.threads = [] + + + def _setup_connections(self): + self.conn = psycopg2.connect(self.dsn) + self.threads = [DBConnection(self.dsn) for _ in range(self.num_threads)] + + + def _close_connections(self): + if self.conn: + self.conn.close() + self.conn = None + + for thread in self.threads: + thread.close() + self.threads = [] + + + def index_full(self, analyse=True): + """ Index the complete database. This will first index boundaries + followed by all other objects. When `analyse` is True, then the + database will be analysed at the appropriate places to + ensure that database statistics are updated. 
+ """ + conn = psycopg2.connect(self.dsn) + conn.autocommit = True + + try: + self.index_by_rank(0, 4) + _analyse_db_if(conn, analyse) + + self.index_boundaries(0, 30) + _analyse_db_if(conn, analyse) + + self.index_by_rank(5, 25) + _analyse_db_if(conn, analyse) + + self.index_by_rank(26, 30) + _analyse_db_if(conn, analyse) + + self.index_postcodes() + _analyse_db_if(conn, analyse) + finally: + conn.close() + def index_boundaries(self, minrank, maxrank): + """ Index only administrative boundaries within the given rank range. + """ LOG.warning("Starting indexing boundaries using %s threads", - len(self.threads)) + self.num_threads) + + self._setup_connections() - for rank in range(max(minrank, 4), min(maxrank, 26)): - self.index(BoundaryRunner(rank)) + try: + for rank in range(max(minrank, 4), min(maxrank, 26)): + self.index(BoundaryRunner(rank)) + finally: + self._close_connections() def index_by_rank(self, minrank, maxrank): - """ Run classic indexing by rank. + """ Index all entries of placex in the given rank range (inclusive) + in order of their address rank. + + When rank 30 is requested then also interpolations and + places with address rank 0 will be indexed. """ maxrank = min(maxrank, 30) LOG.warning("Starting indexing rank (%i to %i) using %i threads", - minrank, maxrank, len(self.threads)) + minrank, maxrank, self.num_threads) + + self._setup_connections() + + try: + for rank in range(max(1, minrank), maxrank): + self.index(RankRunner(rank)) + + if maxrank == 30: + self.index(RankRunner(0)) + self.index(InterpolationRunner(), 20) + self.index(RankRunner(30), 20) + else: + self.index(RankRunner(maxrank)) + finally: + self._close_connections() + + + def index_postcodes(self): + """Index the entries of the location_postcode table. 
+ """ + LOG.warning("Starting indexing postcodes using %s threads", self.num_threads) - for rank in range(max(1, minrank), maxrank): - self.index(RankRunner(rank)) + self._setup_connections() - if maxrank == 30: - self.index(RankRunner(0)) - self.index(InterpolationRunner(), 20) - self.index(RankRunner(30), 20) - else: - self.index(RankRunner(maxrank)) + try: + self.index(PostcodeRunner(), 20) + finally: + self._close_connections() def update_status_table(self): """ Update the status in the status table to 'indexed'. """ - with self.conn.cursor() as cur: - cur.execute('UPDATE import_status SET indexed = true') - self.conn.commit() + conn = psycopg2.connect(self.dsn) + + try: + with conn.cursor() as cur: + cur.execute('UPDATE import_status SET indexed = true') + + conn.commit() + finally: + conn.close() def index(self, obj, batch=1): """ Index a single rank or table. `obj` describes the SQL to use