X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/11c0dd235b8d6ee9afea569a482803cbd74e6f09..828da6a4254db001c05e3e3c2074dd791e33d8a5:/nominatim/nominatim.py?ds=sidebyside diff --git a/nominatim/nominatim.py b/nominatim/nominatim.py old mode 100644 new mode 100755 index 75ddef2f..b29bf343 --- a/nominatim/nominatim.py +++ b/nominatim/nominatim.py @@ -1,4 +1,4 @@ -#! /usr/bin/env python +#! /usr/bin/env python3 #----------------------------------------------------------------------------- # nominatim - [description] #----------------------------------------------------------------------------- @@ -35,9 +35,14 @@ import select log = logging.getLogger() def make_connection(options, asynchronous=False): - return psycopg2.connect(dbname=options.dbname, user=options.user, - password=options.password, host=options.host, - port=options.port, async_=asynchronous) + params = {'dbname' : options.dbname, + 'user' : options.user, + 'password' : options.password, + 'host' : options.host, + 'port' : options.port, + 'async' : asynchronous} + + return psycopg2.connect(**params) class RankRunner(object): @@ -100,23 +105,52 @@ class InterpolationRunner(object): class DBConnection(object): - """ A signle non-blocking database connection. + """ A single non-blocking database connection. """ def __init__(self, options): + self.current_query = None + self.current_params = None + + self.conn = None + self.connect() + + def connect(self): + if self.conn is not None: + self.cursor.close() + self.conn.close() + self.conn = make_connection(options, asynchronous=True) self.wait() self.cursor = self.conn.cursor() - - self.current_query = None - self.current_params = None + # Disable JIT and parallel workers as they are known to cause problems. + # Update pg_settings instead of using SET because it does not yield + # errors on older versions of Postgres where the settings are not + # implemented. + self.perform( + """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost'; + UPDATE pg_settings SET setting = 0 + WHERE name = 'max_parallel_workers_per_gather';""") + self.wait() def wait(self): """ Block until any pending operation is done. """ - wait_select(self.conn) - self.current_query = None + while True: + try: + wait_select(self.conn) + self.current_query = None + return + except psycopg2.extensions.TransactionRollbackError as e: + if e.pgcode == '40P01': + log.info("Deadlock detected (params = {}), retry." + .format(self.current_params)) + self.cursor.execute(self.current_query, self.current_params) + else: + raise + except psycopg2.errors.DeadlockDetected: + self.cursor.execute(self.current_query, self.current_params) def perform(self, sql, args=None): """ Send SQL query to the server. Returns immediately without @@ -146,10 +180,12 @@ class DBConnection(object): return True except psycopg2.extensions.TransactionRollbackError as e: if e.pgcode == '40P01': - log.info("Deadlock detected, retry.") + log.info("Deadlock detected (params = {}), retry.".format(self.current_params)) self.cursor.execute(self.current_query, self.current_params) else: raise + except psycopg2.errors.DeadlockDetected: + self.cursor.execute(self.current_query, self.current_params) return False @@ -167,7 +203,7 @@ class Indexer(object): def run(self): """ Run indexing over the entire database. """ - log.info("Starting indexing rank ({} to {}) using {} threads".format( + log.warning("Starting indexing rank ({} to {}) using {} threads".format( self.minrank, self.maxrank, len(self.threads))) for rank in range(self.minrank, self.maxrank): @@ -175,13 +211,14 @@ class Indexer(object): if self.maxrank == 30: self.index(InterpolationRunner()) - self.index(RankRunner(30)) + + self.index(RankRunner(self.maxrank)) def index(self, obj): """ Index a single rank or table. `obj` describes the SQL to use for indexing. """ - log.info("Starting {}".format(obj.name())) + log.warning("Starting {}".format(obj.name())) cur = self.conn.cursor(name='main') cur.execute(obj.sql_index_sectors()) @@ -200,6 +237,9 @@ class Indexer(object): sector_sql = obj.sql_sector_places() index_sql = obj.sql_index_place() min_grouped_tuples = total_tuples - len(self.threads) * 1000 + + next_info = 100 if log.isEnabledFor(logging.INFO) else total_tuples + 1 + for r in cur: sector = r[0] @@ -221,6 +261,16 @@ class Indexer(object): thread.perform(index_sql, (place_id,)) done_tuples += 1 + if done_tuples >= next_info: + now = datetime.now() + done_time = (now - rank_start_time).total_seconds() + tuples_per_sec = done_tuples / done_time + log.info("Done {} in {} @ {:.3f} per second - {} ETA (seconds): {:.2f}" + .format(done_tuples, int(done_time), + tuples_per_sec, obj.name(), + (total_tuples - done_tuples)/tuples_per_sec)) + next_info += int(tuples_per_sec) + pcur.close() if do_all: @@ -234,7 +284,7 @@ class Indexer(object): rank_end_time = datetime.now() diff_seconds = (rank_end_time-rank_start_time).total_seconds() - log.info("Done {}/{} in {} @ {} per second - FINISHED {}\n".format( + log.warning("Done {}/{} in {} @ {:.3f} per second - FINISHED {}\n".format( done_tuples, total_tuples, int(diff_seconds), done_tuples/diff_seconds, obj.name())) @@ -243,15 +293,27 @@ class Indexer(object): sending a query. """ ready = self.threads + command_stat = 0 while True: for thread in ready: if thread.is_done(): + command_stat += 1 yield thread - ready, _, _ = select.select(self.threads, [], []) + # refresh the connections occasionaly to avoid potential + # memory leaks in Postgresql. + if command_stat > 100000: + for t in self.threads: + while not t.is_done(): + t.wait() + t.connect() + command_stat = 0 + ready = self.threads + else: + ready, _, _ = select.select(self.threads, [], []) - assert(False, "Unreachable code") + assert False, "Unreachable code" def nominatim_arg_parser():