X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/7e51aa4cef831885c72ac1284ba570987484fa3d..5469d02d03a8cc7a9e8aec8302668c50eb19b50c:/nominatim/nominatim.py diff --git a/nominatim/nominatim.py b/nominatim/nominatim.py old mode 100644 new mode 100755 index 610e3de2..0db0777d --- a/nominatim/nominatim.py +++ b/nominatim/nominatim.py @@ -1,4 +1,4 @@ -#! /usr/bin/env python +#! /usr/bin/env python3 #----------------------------------------------------------------------------- # nominatim - [description] #----------------------------------------------------------------------------- @@ -35,33 +35,133 @@ import select log = logging.getLogger() def make_connection(options, asynchronous=False): - return psycopg2.connect(dbname=options.dbname, user=options.user, - password=options.password, host=options.host, - port=options.port, async_=asynchronous) + params = {'dbname' : options.dbname, + 'user' : options.user, + 'password' : options.password, + 'host' : options.host, + 'port' : options.port, + 'async' : asynchronous} -class IndexingThread(object): + return psycopg2.connect(**params) + + +class RankRunner(object): + """ Returns SQL commands for indexing one rank within the placex table. + """ + + def __init__(self, rank): + self.rank = rank + + def name(self): + return "rank {}".format(self.rank) + + def sql_index_sectors(self): + return """SELECT geometry_sector, count(*) FROM placex + WHERE rank_search = {} and indexed_status > 0 + GROUP BY geometry_sector + ORDER BY geometry_sector""".format(self.rank) + + def sql_nosector_places(self): + return """SELECT place_id FROM placex + WHERE indexed_status > 0 and rank_search = {} + ORDER BY geometry_sector""".format(self.rank) + + def sql_sector_places(self): + return """SELECT place_id FROM placex + WHERE indexed_status > 0 and rank_search = {} + and geometry_sector = %s""".format(self.rank) + + def sql_index_place(self): + return "UPDATE placex SET indexed_status = 0 WHERE place_id = %s" + + +class InterpolationRunner(object): + """ Returns SQL commands for indexing the address interpolation table + location_property_osmline. + """ + + def name(self): + return "interpolation lines (location_property_osmline)" + + def sql_index_sectors(self): + return """SELECT geometry_sector, count(*) FROM location_property_osmline + WHERE indexed_status > 0 + GROUP BY geometry_sector + ORDER BY geometry_sector""" + + def sql_nosector_places(self): + return """SELECT place_id FROM location_property_osmline + WHERE indexed_status > 0 + ORDER BY geometry_sector""" + + def sql_sector_places(self): + return """SELECT place_id FROM location_property_osmline + WHERE indexed_status > 0 and geometry_sector = %s + ORDER BY geometry_sector""" + + def sql_index_place(self): + return """UPDATE location_property_osmline + SET indexed_status = 0 WHERE place_id = %s""" + + +class DBConnection(object): + """ A single non-blocking database connection. + """ + + def __init__(self, options): + self.current_query = None + self.current_params = None + + self.conn = None + self.connect() + + def connect(self): + if self.conn is not None: + self.cursor.close() + self.conn.close() - def __init__(self, thread_num, options): - log.debug("Creating thread {}".format(thread_num)) - self.thread_num = thread_num self.conn = make_connection(options, asynchronous=True) self.wait() self.cursor = self.conn.cursor() - self.perform("SET lc_messages TO 'C'") - self.wait() - - self.current_query = None def wait(self): - wait_select(self.conn) - self.current_query = None + """ Block until any pending operation is done. + """ + while True: + try: + wait_select(self.conn) + self.current_query = None + return + except psycopg2.extensions.TransactionRollbackError as e: + if e.pgcode == '40P01': + log.info("Deadlock detected (params = {}), retry." + .format(self.current_params)) + self.cursor.execute(self.current_query, self.current_params) + else: + raise + except psycopg2.errors.DeadlockDetected: + self.cursor.execute(self.current_query, self.current_params) def perform(self, sql, args=None): + """ Send SQL query to the server. Returns immediately without + blocking. + """ self.current_query = sql + self.current_params = args self.cursor.execute(sql, args) + def fileno(self): + """ File descriptor to wait for. (Makes this class select()able.) + """ + return self.conn.fileno() + def is_done(self): + """ Check if the connection is available for a new query. + + Also checks if the previous query has run into a deadlock. + If so, then the previous query is repeated. + """ if self.current_query is None: return True @@ -70,46 +170,48 @@ class IndexingThread(object): self.current_query = None return True except psycopg2.extensions.TransactionRollbackError as e: - if e.pgcode is None: - raise RuntimeError("Postgres exception has no error code") if e.pgcode == '40P01': - log.info("Deadlock detected, retry.") - self.cursor.execute(sql) + log.info("Deadlock detected (params = {}), retry.".format(self.current_params)) + self.cursor.execute(self.current_query, self.current_params) else: raise + except psycopg2.errors.DeadlockDetected: + self.cursor.execute(self.current_query, self.current_params) + return False class Indexer(object): + """ Main indexing routine. + """ def __init__(self, options): - self.options = options + self.minrank = max(0, options.minrank) + self.maxrank = min(30, options.maxrank) self.conn = make_connection(options) - - self.threads = [] - self.poll = select.poll() - for i in range(options.threads): - t = IndexingThread(i, options) - self.threads.append(t) - self.poll.register(t.conn.fileno(), select.EPOLLIN) - self.next_thread = 0 + self.threads = [DBConnection(options) for i in range(options.threads)] def run(self): - log.info("Starting indexing rank ({} to {}) using {} threads".format( - self.options.minrank, self.options.maxrank, - self.options.threads)) + """ Run indexing over the entire database. + """ + log.warning("Starting indexing rank ({} to {}) using {} threads".format( + self.minrank, self.maxrank, len(self.threads))) - for rank in range(self.options.minrank, 30): + for rank in range(self.minrank, self.maxrank): self.index(RankRunner(rank)) - if self.options.maxrank >= 30: + if self.maxrank == 30: self.index(InterpolationRunner()) - self.index(RankRunner(30)) + + self.index(RankRunner(self.maxrank)) def index(self, obj): - log.info("Starting {}".format(obj.name())) + """ Index a single rank or table. `obj` describes the SQL to use + for indexing. + """ + log.warning("Starting {}".format(obj.name())) - cur = self.conn.cursor(name="main") + cur = self.conn.cursor(name='main') cur.execute(obj.sql_index_sectors()) total_tuples = 0 @@ -119,29 +221,47 @@ class Indexer(object): cur.scroll(0, mode='absolute') + next_thread = self.find_free_thread() done_tuples = 0 rank_start_time = datetime.now() + + sector_sql = obj.sql_sector_places() + index_sql = obj.sql_index_place() + min_grouped_tuples = total_tuples - len(self.threads) * 1000 + + next_info = 100 if log.isEnabledFor(logging.INFO) else total_tuples + 1 + for r in cur: sector = r[0] # Should we do the remaining ones together? - do_all = total_tuples - done_tuples < len(self.threads) * 1000 + do_all = done_tuples > min_grouped_tuples pcur = self.conn.cursor(name='places') if do_all: pcur.execute(obj.sql_nosector_places()) else: - pcur.execute(obj.sql_sector_places(), (sector, )) + pcur.execute(sector_sql, (sector, )) for place in pcur: place_id = place[0] log.debug("Processing place {}".format(place_id)) - thread = self.find_free_thread() + thread = next(next_thread) - thread.perform(obj.sql_index_place(), (place_id,)) + thread.perform(index_sql, (place_id,)) done_tuples += 1 + if done_tuples >= next_info: + now = datetime.now() + done_time = (now - rank_start_time).total_seconds() + tuples_per_sec = done_tuples / done_time + log.info("Done {} in {} @ {:.3f} per second - {} ETA (seconds): {:.2f}" + .format(done_tuples, int(done_time), + tuples_per_sec, obj.name(), + (total_tuples - done_tuples)/tuples_per_sec)) + next_info += int(tuples_per_sec) + pcur.close() if do_all: @@ -155,72 +275,36 @@ class Indexer(object): rank_end_time = datetime.now() diff_seconds = (rank_end_time-rank_start_time).total_seconds() - log.info("Done {} in {} @ {} per second - FINISHED {}\n".format( - done_tuples, int(diff_seconds), + log.warning("Done {}/{} in {} @ {:.3f} per second - FINISHED {}\n".format( + done_tuples, total_tuples, int(diff_seconds), done_tuples/diff_seconds, obj.name())) def find_free_thread(self): - while True: - for t in self.threads: - if t.is_done(): - return t - - self.poll.poll() - - assert(False, "Unreachable code") - -class RankRunner(object): - - def __init__(self, rank): - self.rank = rank - - def name(self): - return "rank {}".format(self.rank) + """ Generator that returns the next connection that is free for + sending a query. + """ + ready = self.threads + command_stat = 0 - def sql_index_sectors(self): - return """SELECT geometry_sector, count(*) FROM placex - WHERE rank_search = {} and indexed_status > 0 - GROUP BY geometry_sector - ORDER BY geometry_sector""".format(self.rank) - - def sql_nosector_places(self): - return """SELECT place_id FROM placex - WHERE indexed_status > 0 and rank_search = {} - ORDER BY geometry_sector""".format(self.rank) - - def sql_sector_places(self): - return """SELECT place_id FROM placex - WHERE indexed_status > 0 and geometry_sector = %s - ORDER BY geometry_sector""" - - def sql_index_place(self): - return "UPDATE placex SET indexed_status = 0 WHERE place_id = %s" - - -class InterpolationRunner(object): - - def name(self): - return "interpolation lines (location_property_osmline)" - - def sql_index_sectors(self): - return """SELECT geometry_sector, count(*) FROM location_property_osmline - WHERE indexed_status > 0 - GROUP BY geometry_sector - ORDER BY geometry_sector""" - - def sql_nosector_places(self): - return """SELECT place_id FROM location_property_osmline - WHERE indexed_status > 0 - ORDER BY geometry_sector""" - - def sql_sector_places(self): - return """SELECT place_id FROM location_property_osmline - WHERE indexed_status > 0 and geometry_sector = %s - ORDER BY geometry_sector""" + while True: + for thread in ready: + if thread.is_done(): + command_stat += 1 + yield thread + + # refresh the connections occasionaly to avoid potential + # memory leaks in Postgresql. + if command_stat > 100000: + for t in self.threads: + while not t.is_done(): + wait_select(t.conn) + t.connect() + command_stat = 0 + ready = self.threads + else: + ready, _, _ = select.select(self.threads, [], []) - def sql_index_place(self): - return """UPDATE location_property_osmline - SET indexed_status = 0 WHERE place_id = %s""" + assert False, "Unreachable code" def nominatim_arg_parser(): @@ -229,7 +313,7 @@ def nominatim_arg_parser(): def h(s): return re.sub("\s\s+" , " ", s) - p = ArgumentParser(description=__doc__, + p = ArgumentParser(description="Indexing tool for Nominatim.", formatter_class=RawDescriptionHelpFormatter) p.add_argument('-d', '--database',