X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/6c0d6d31788fb18940da0126733cd018e098e24e..49083c2597f7abcf8e6b73de9662029c22807481:/nominatim/nominatim.py?ds=sidebyside diff --git a/nominatim/nominatim.py b/nominatim/nominatim.py old mode 100644 new mode 100755 index 6b25cf5c..b20673d2 --- a/nominatim/nominatim.py +++ b/nominatim/nominatim.py @@ -1,4 +1,4 @@ -#! /usr/bin/env python +#! /usr/bin/env python3 #----------------------------------------------------------------------------- # nominatim - [description] #----------------------------------------------------------------------------- @@ -28,226 +28,186 @@ import sys import re import getpass from datetime import datetime -import psycopg2 -from psycopg2.extras import wait_select import select -log = logging.getLogger() - -def make_connection(options, asynchronous=False): - return psycopg2.connect(dbname=options.dbname, user=options.user, - password=options.password, host=options.host, - port=options.port, async_=asynchronous) - -class IndexingThread(object): - - def __init__(self, thread_num, options): - log.debug("Creating thread {}".format(thread_num)) - self.thread_num = thread_num - self.conn = make_connection(options, asynchronous=True) - self.wait() - - self.cursor = self.conn.cursor() - self.perform("SET lc_messages TO 'C'") - self.wait() - self.perform(InterpolationRunner.prepare()) - self.wait() - self.perform(RankRunner.prepare()) - self.wait() - - self.current_query = None - self.current_params = None - - def wait(self): - wait_select(self.conn) - self.current_query = None - - def perform(self, sql, args=None): - self.current_query = sql - self.current_params = args - self.cursor.execute(sql, args) - - def fileno(self): - return self.conn.fileno() - - def is_done(self): - if self.current_query is None: - return True - - try: - if self.conn.poll() == psycopg2.extensions.POLL_OK: - self.current_query = None - return True - except psycopg2.extensions.TransactionRollbackError as e: - if e.pgcode is None: - raise RuntimeError("Postgres exception has no error code") - if e.pgcode == '40P01': - log.info("Deadlock detected, retry.") - self.cursor.execute(self.current_query, self.current_params) - else: - raise - - +from indexer.progress import ProgressLogger +from indexer.db import DBConnection, make_connection -class Indexer(object): - - def __init__(self, options): - self.options = options - self.conn = make_connection(options) +log = logging.getLogger() - self.threads = [] - self.poll = select.poll() - for i in range(options.threads): - t = IndexingThread(i, options) - self.threads.append(t) - self.poll.register(t, select.EPOLLIN) +class RankRunner(object): + """ Returns SQL commands for indexing one rank within the placex table. + """ - def run(self): - log.info("Starting indexing rank ({} to {}) using {} threads".format( - self.options.minrank, self.options.maxrank, - self.options.threads)) + def __init__(self, rank): + self.rank = rank - for rank in range(self.options.minrank, 30): - self.index(RankRunner(rank)) + def name(self): + return "rank {}".format(self.rank) - if self.options.maxrank >= 30: - self.index(InterpolationRunner()) - self.index(RankRunner(30)) + def sql_count_objects(self): + return """SELECT count(*) FROM placex + WHERE rank_address = {} and indexed_status > 0 + """.format(self.rank) - def index(self, obj): - log.info("Starting {}".format(obj.name())) + def sql_get_objects(self): + return """SELECT place_id FROM placex + WHERE indexed_status > 0 and rank_address = {} + ORDER BY geometry_sector""".format(self.rank) - cur = self.conn.cursor(name="main") - cur.execute(obj.sql_index_sectors()) + def sql_index_place(self, ids): + return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\ + .format(','.join((str(i) for i in ids))) - total_tuples = 0 - for r in cur: - total_tuples += r[1] - log.debug("Total number of rows; {}".format(total_tuples)) - cur.scroll(0, mode='absolute') +class InterpolationRunner(object): + """ Returns SQL commands for indexing the address interpolation table + location_property_osmline. + """ - next_thread = self.find_free_thread() - done_tuples = 0 - rank_start_time = datetime.now() - for r in cur: - sector = r[0] + def name(self): + return "interpolation lines (location_property_osmline)" - # Should we do the remaining ones together? - do_all = total_tuples - done_tuples < len(self.threads) * 1000 + def sql_count_objects(self): + return """SELECT count(*) FROM location_property_osmline + WHERE indexed_status > 0""" - pcur = self.conn.cursor(name='places') + def sql_get_objects(self): + return """SELECT place_id FROM location_property_osmline + WHERE indexed_status > 0 + ORDER BY geometry_sector""" - if do_all: - pcur.execute(obj.sql_nosector_places()) - else: - pcur.execute(obj.sql_sector_places(), (sector, )) + def sql_index_place(self, ids): + return """UPDATE location_property_osmline + SET indexed_status = 0 WHERE place_id IN ({})"""\ + .format(','.join((str(i) for i in ids))) - for place in pcur: - place_id = place[0] - log.debug("Processing place {}".format(place_id)) - thread = next(next_thread) +class BoundaryRunner(object): + """ Returns SQL commands for indexing the administrative boundaries + of a certain rank. + """ - thread.perform(obj.sql_index_place(), (place_id,)) - done_tuples += 1 + def __init__(self, rank): + self.rank = rank - pcur.close() + def name(self): + return "boundaries rank {}".format(self.rank) - if do_all: - break + def sql_count_objects(self): + return """SELECT count(*) FROM placex + WHERE indexed_status > 0 + AND rank_search = {} + AND class = 'boundary' and type = 'administrative'""".format(self.rank) - cur.close() + def sql_get_objects(self): + return """SELECT place_id FROM placex + WHERE indexed_status > 0 and rank_search = {} + and class = 'boundary' and type = 'administrative' + ORDER BY partition, admin_level""".format(self.rank) - for t in self.threads: - t.wait() + def sql_index_place(self, ids): + return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\ + .format(','.join((str(i) for i in ids))) - rank_end_time = datetime.now() - diff_seconds = (rank_end_time-rank_start_time).total_seconds() +class Indexer(object): + """ Main indexing routine. + """ - log.info("Done {} in {} @ {} per second - FINISHED {}\n".format( - done_tuples, int(diff_seconds), - done_tuples/diff_seconds, obj.name())) + def __init__(self, options): + self.minrank = max(1, options.minrank) + self.maxrank = min(30, options.maxrank) + self.conn = make_connection(options) + self.threads = [DBConnection(options) for i in range(options.threads)] - def find_free_thread(self): - thread_lookup = { t.fileno() : t for t in self.threads} + def index_boundaries(self): + log.warning("Starting indexing boundaries using {} threads".format( + len(self.threads))) - done_fids = [ t.fileno() for t in self.threads ] + for rank in range(max(self.minrank, 5), min(self.maxrank, 26)): + self.index(BoundaryRunner(rank)) - while True: - for fid in done_fids: - thread = thread_lookup[fid] - if thread.is_done(): - yield thread - else: - print("not good", fid) + def index_by_rank(self): + """ Run classic indexing by rank. + """ + log.warning("Starting indexing rank ({} to {}) using {} threads".format( + self.minrank, self.maxrank, len(self.threads))) - done_fids = [ x[0] for x in self.poll.poll()] + for rank in range(max(1, self.minrank), self.maxrank): + self.index(RankRunner(rank)) - assert(False, "Unreachable code") + if self.maxrank == 30: + self.index(RankRunner(0)) + self.index(InterpolationRunner(), 20) + self.index(RankRunner(self.maxrank), 20) + else: + self.index(RankRunner(self.maxrank)) -class RankRunner(object): + def index(self, obj, batch=1): + """ Index a single rank or table. `obj` describes the SQL to use + for indexing. `batch` describes the number of objects that + should be processed with a single SQL statement + """ + log.warning("Starting %s (using batch size %s)", obj.name(), batch) - def __init__(self, rank): - self.rank = rank + cur = self.conn.cursor() + cur.execute(obj.sql_count_objects()) - def name(self): - return "rank {}".format(self.rank) + total_tuples = cur.fetchone()[0] + log.debug("Total number of rows: {}".format(total_tuples)) - @classmethod - def prepare(cls): - return """PREPARE rnk_index AS - UPDATE placex - SET indexed_status = 0 WHERE place_id = $1""" + cur.close() - def sql_index_sectors(self): - return """SELECT geometry_sector, count(*) FROM placex - WHERE rank_search = {} and indexed_status > 0 - GROUP BY geometry_sector - ORDER BY geometry_sector""".format(self.rank) + progress = ProgressLogger(obj.name(), total_tuples) - def sql_nosector_places(self): - return """SELECT place_id FROM placex - WHERE indexed_status > 0 and rank_search = {} - ORDER BY geometry_sector""".format(self.rank) + if total_tuples > 0: + cur = self.conn.cursor(name='places') + cur.execute(obj.sql_get_objects()) - def sql_sector_places(self): - return """SELECT place_id FROM placex - WHERE indexed_status > 0 and geometry_sector = %s - ORDER BY geometry_sector""" + next_thread = self.find_free_thread() + while True: + places = [p[0] for p in cur.fetchmany(batch)] + if len(places) == 0: + break - def sql_index_place(self): - return "EXECUTE rnk_index(%s)" + log.debug("Processing places: {}".format(places)) + thread = next(next_thread) + thread.perform(obj.sql_index_place(places)) + progress.add(len(places)) -class InterpolationRunner(object): + cur.close() - def name(self): - return "interpolation lines (location_property_osmline)" + for t in self.threads: + t.wait() - @classmethod - def prepare(cls): - return """PREPARE ipl_index AS - UPDATE location_property_osmline - SET indexed_status = 0 WHERE place_id = $1""" + progress.done() - def sql_index_sectors(self): - return """SELECT geometry_sector, count(*) FROM location_property_osmline - WHERE indexed_status > 0 - GROUP BY geometry_sector - ORDER BY geometry_sector""" + def find_free_thread(self): + """ Generator that returns the next connection that is free for + sending a query. + """ + ready = self.threads + command_stat = 0 - def sql_nosector_places(self): - return """SELECT place_id FROM location_property_osmline - WHERE indexed_status > 0 - ORDER BY geometry_sector""" + while True: + for thread in ready: + if thread.is_done(): + command_stat += 1 + yield thread - def sql_sector_places(self): - return """SELECT place_id FROM location_property_osmline - WHERE indexed_status > 0 and geometry_sector = %s - ORDER BY geometry_sector""" + # refresh the connections occasionaly to avoid potential + # memory leaks in Postgresql. + if command_stat > 100000: + for t in self.threads: + while not t.is_done(): + t.wait() + t.connect() + command_stat = 0 + ready = self.threads + else: + ready, _, _ = select.select(self.threads, [], []) - def sql_index_place(self): - return "EXECUTE ipl_index(%s)" + assert False, "Unreachable code" def nominatim_arg_parser(): @@ -256,7 +216,7 @@ def nominatim_arg_parser(): def h(s): return re.sub("\s\s+" , " ", s) - p = ArgumentParser(description=__doc__, + p = ArgumentParser(description="Indexing tool for Nominatim.", formatter_class=RawDescriptionHelpFormatter) p.add_argument('-d', '--database', @@ -274,6 +234,9 @@ def nominatim_arg_parser(): p.add_argument('-P', '--port', dest='port', action='store', help='PostgreSQL server port') + p.add_argument('-b', '--boundary-only', + dest='boundary_only', action='store_true', + help='Only index administrative boundaries (ignores min/maxrank).') p.add_argument('-r', '--minrank', dest='minrank', type=int, metavar='RANK', default=0, help='Minimum/starting rank.') @@ -301,4 +264,7 @@ if __name__ == '__main__': password = getpass.getpass("Database password: ") options.password = password - Indexer(options).run() + if options.boundary_only: + Indexer(options).index_boundaries() + else: + Indexer(options).index_by_rank()