From 11c0dd235b8d6ee9afea569a482803cbd74e6f09 Mon Sep 17 00:00:00 2001 From: Sarah Hoffmann Date: Mon, 20 Jan 2020 22:19:33 +0100 Subject: [PATCH] clean up and document script --- nominatim/nominatim.py | 181 +++++++++++++++++++++-------------------- 1 file changed, 94 insertions(+), 87 deletions(-) diff --git a/nominatim/nominatim.py b/nominatim/nominatim.py index f525b744..75ddef2f 100644 --- a/nominatim/nominatim.py +++ b/nominatim/nominatim.py @@ -39,38 +39,104 @@ def make_connection(options, asynchronous=False): password=options.password, host=options.host, port=options.port, async_=asynchronous) -class IndexingThread(object): - def __init__(self, thread_num, options): - log.debug("Creating thread {}".format(thread_num)) - self.thread_num = thread_num +class RankRunner(object): + """ Returns SQL commands for indexing one rank within the placex table. + """ + + def __init__(self, rank): + self.rank = rank + + def name(self): + return "rank {}".format(self.rank) + + def sql_index_sectors(self): + return """SELECT geometry_sector, count(*) FROM placex + WHERE rank_search = {} and indexed_status > 0 + GROUP BY geometry_sector + ORDER BY geometry_sector""".format(self.rank) + + def sql_nosector_places(self): + return """SELECT place_id FROM placex + WHERE indexed_status > 0 and rank_search = {} + ORDER BY geometry_sector""".format(self.rank) + + def sql_sector_places(self): + return """SELECT place_id FROM placex + WHERE indexed_status > 0 and rank_search = {} + and geometry_sector = %s""".format(self.rank) + + def sql_index_place(self): + return "UPDATE placex SET indexed_status = 0 WHERE place_id = %s" + + +class InterpolationRunner(object): + """ Returns SQL commands for indexing the address interpolation table + location_property_osmline. + """ + + def name(self): + return "interpolation lines (location_property_osmline)" + + def sql_index_sectors(self): + return """SELECT geometry_sector, count(*) FROM location_property_osmline + WHERE indexed_status > 0 + GROUP BY geometry_sector + ORDER BY geometry_sector""" + + def sql_nosector_places(self): + return """SELECT place_id FROM location_property_osmline + WHERE indexed_status > 0 + ORDER BY geometry_sector""" + + def sql_sector_places(self): + return """SELECT place_id FROM location_property_osmline + WHERE indexed_status > 0 and geometry_sector = %s + ORDER BY geometry_sector""" + + def sql_index_place(self): + return """UPDATE location_property_osmline + SET indexed_status = 0 WHERE place_id = %s""" + + +class DBConnection(object): + """ A signle non-blocking database connection. + """ + + def __init__(self, options): self.conn = make_connection(options, asynchronous=True) self.wait() self.cursor = self.conn.cursor() - self.perform("SET lc_messages TO 'C'") - self.wait() - self.perform(InterpolationRunner.prepare()) - self.wait() - self.perform(RankRunner.prepare()) - self.wait() self.current_query = None self.current_params = None def wait(self): + """ Block until any pending operation is done. + """ wait_select(self.conn) self.current_query = None def perform(self, sql, args=None): + """ Send SQL query to the server. Returns immediately without + blocking. + """ self.current_query = sql self.current_params = args self.cursor.execute(sql, args) def fileno(self): + """ File descriptor to wait for. (Makes this class select()able.) + """ return self.conn.fileno() def is_done(self): + """ Check if the connection is available for a new query. + + Also checks if the previous query has run into a deadlock. + If so, then the previous query is repeated. + """ if self.current_query is None: return True @@ -79,40 +145,42 @@ class IndexingThread(object): self.current_query = None return True except psycopg2.extensions.TransactionRollbackError as e: - if e.pgcode is None: - raise RuntimeError("Postgres exception has no error code") if e.pgcode == '40P01': log.info("Deadlock detected, retry.") self.cursor.execute(self.current_query, self.current_params) else: raise + return False class Indexer(object): + """ Main indexing routine. + """ def __init__(self, options): - self.options = options + self.minrank = max(0, options.minrank) + self.maxrank = min(30, options.maxrank) self.conn = make_connection(options) - - self.threads = [] - for i in range(options.threads): - t = IndexingThread(i, options) - self.threads.append(t) + self.threads = [DBConnection(options) for i in range(options.threads)] def run(self): + """ Run indexing over the entire database. + """ log.info("Starting indexing rank ({} to {}) using {} threads".format( - self.options.minrank, self.options.maxrank, - self.options.threads)) + self.minrank, self.maxrank, len(self.threads))) - for rank in range(self.options.minrank, min(self.options.maxrank, 30)): + for rank in range(self.minrank, self.maxrank): self.index(RankRunner(rank)) - if self.options.maxrank >= 30: + if self.maxrank == 30: self.index(InterpolationRunner()) self.index(RankRunner(30)) def index(self, obj): + """ Index a single rank or table. `obj` describes the SQL to use + for indexing. + """ log.info("Starting {}".format(obj.name())) cur = self.conn.cursor(name='main') @@ -171,6 +239,9 @@ class Indexer(object): done_tuples/diff_seconds, obj.name())) def find_free_thread(self): + """ Generator that returns the next connection that is free for + sending a query. + """ ready = self.threads while True: @@ -182,70 +253,6 @@ class Indexer(object): assert(False, "Unreachable code") -class RankRunner(object): - - def __init__(self, rank): - self.rank = rank - - def name(self): - return "rank {}".format(self.rank) - - @classmethod - def prepare(cls): - return """PREPARE rnk_index AS - UPDATE placex - SET indexed_status = 0 WHERE place_id = $1""" - - def sql_index_sectors(self): - return """SELECT geometry_sector, count(*) FROM placex - WHERE rank_search = {} and indexed_status > 0 - GROUP BY geometry_sector - ORDER BY geometry_sector""".format(self.rank) - - def sql_nosector_places(self): - return """SELECT place_id FROM placex - WHERE indexed_status > 0 and rank_search = {} - ORDER BY geometry_sector""".format(self.rank) - - def sql_sector_places(self): - return """SELECT place_id FROM placex - WHERE indexed_status > 0 and rank_search = {} - and geometry_sector = %s""".format(self.rank) - - def sql_index_place(self): - return "EXECUTE rnk_index(%s)" - - -class InterpolationRunner(object): - - def name(self): - return "interpolation lines (location_property_osmline)" - - @classmethod - def prepare(cls): - return """PREPARE ipl_index AS - UPDATE location_property_osmline - SET indexed_status = 0 WHERE place_id = $1""" - - def sql_index_sectors(self): - return """SELECT geometry_sector, count(*) FROM location_property_osmline - WHERE indexed_status > 0 - GROUP BY geometry_sector - ORDER BY geometry_sector""" - - def sql_nosector_places(self): - return """SELECT place_id FROM location_property_osmline - WHERE indexed_status > 0 - ORDER BY geometry_sector""" - - def sql_sector_places(self): - return """SELECT place_id FROM location_property_osmline - WHERE indexed_status > 0 and geometry_sector = %s - ORDER BY geometry_sector""" - - def sql_index_place(self): - return "EXECUTE ipl_index(%s)" - def nominatim_arg_parser(): """ Setup the command-line parser for the tool. @@ -253,7 +260,7 @@ def nominatim_arg_parser(): def h(s): return re.sub("\s\s+" , " ", s) - p = ArgumentParser(description=__doc__, + p = ArgumentParser(description="Indexing tool for Nominatim.", formatter_class=RawDescriptionHelpFormatter) p.add_argument('-d', '--database', -- 2.39.5