X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/2323923becb127d01636a6eadda33f95a1e80379..aaabb46f20bd6189caebf02c1f1a925cbd08a263:/nominatim/nominatim.py diff --git a/nominatim/nominatim.py b/nominatim/nominatim.py index a8221737..b20673d2 100755 --- a/nominatim/nominatim.py +++ b/nominatim/nominatim.py @@ -28,25 +28,13 @@ import sys import re import getpass from datetime import datetime -import psycopg2 -from psycopg2.extras import wait_select import select from indexer.progress import ProgressLogger +from indexer.db import DBConnection, make_connection log = logging.getLogger() -def make_connection(options, asynchronous=False): - params = {'dbname' : options.dbname, - 'user' : options.user, - 'password' : options.password, - 'host' : options.host, - 'port' : options.port, - 'async' : asynchronous} - - return psycopg2.connect(**params) - - class RankRunner(object): """ Returns SQL commands for indexing one rank within the placex table. """ @@ -59,16 +47,17 @@ class RankRunner(object): def sql_count_objects(self): return """SELECT count(*) FROM placex - WHERE rank_search = {} and indexed_status > 0 + WHERE rank_address = {} and indexed_status > 0 """.format(self.rank) def sql_get_objects(self): return """SELECT place_id FROM placex - WHERE indexed_status > 0 and rank_search = {} + WHERE indexed_status > 0 and rank_address = {} ORDER BY geometry_sector""".format(self.rank) - def sql_index_place(self): - return "UPDATE placex SET indexed_status = 0 WHERE place_id = %s" + def sql_index_place(self, ids): + return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\ + .format(','.join((str(i) for i in ids))) class InterpolationRunner(object): @@ -88,126 +77,77 @@ class InterpolationRunner(object): WHERE indexed_status > 0 ORDER BY geometry_sector""" - def sql_index_place(self): + def sql_index_place(self, ids): return """UPDATE location_property_osmline - SET indexed_status = 0 WHERE place_id = %s""" + SET indexed_status = 0 WHERE place_id IN ({})"""\ + .format(','.join((str(i) for i in ids))) - -class DBConnection(object): - """ A single non-blocking database connection. +class BoundaryRunner(object): + """ Returns SQL commands for indexing the administrative boundaries + of a certain rank. """ - def __init__(self, options): - self.current_query = None - self.current_params = None - - self.conn = None - self.connect() - - def connect(self): - if self.conn is not None: - self.cursor.close() - self.conn.close() - - self.conn = make_connection(options, asynchronous=True) - self.wait() - - self.cursor = self.conn.cursor() - # Disable JIT and parallel workers as they are known to cause problems. - # Update pg_settings instead of using SET because it does not yield - # errors on older versions of Postgres where the settings are not - # implemented. - self.perform( - """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost'; - UPDATE pg_settings SET setting = 0 - WHERE name = 'max_parallel_workers_per_gather';""") - self.wait() - - def wait(self): - """ Block until any pending operation is done. - """ - while True: - try: - wait_select(self.conn) - self.current_query = None - return - except psycopg2.extensions.TransactionRollbackError as e: - if e.pgcode == '40P01': - log.info("Deadlock detected (params = {}), retry." - .format(self.current_params)) - self.cursor.execute(self.current_query, self.current_params) - else: - raise - except psycopg2.errors.DeadlockDetected: - self.cursor.execute(self.current_query, self.current_params) - - def perform(self, sql, args=None): - """ Send SQL query to the server. Returns immediately without - blocking. - """ - self.current_query = sql - self.current_params = args - self.cursor.execute(sql, args) - - def fileno(self): - """ File descriptor to wait for. (Makes this class select()able.) - """ - return self.conn.fileno() + def __init__(self, rank): + self.rank = rank - def is_done(self): - """ Check if the connection is available for a new query. + def name(self): + return "boundaries rank {}".format(self.rank) - Also checks if the previous query has run into a deadlock. - If so, then the previous query is repeated. - """ - if self.current_query is None: - return True - - try: - if self.conn.poll() == psycopg2.extensions.POLL_OK: - self.current_query = None - return True - except psycopg2.extensions.TransactionRollbackError as e: - if e.pgcode == '40P01': - log.info("Deadlock detected (params = {}), retry.".format(self.current_params)) - self.cursor.execute(self.current_query, self.current_params) - else: - raise - except psycopg2.errors.DeadlockDetected: - self.cursor.execute(self.current_query, self.current_params) + def sql_count_objects(self): + return """SELECT count(*) FROM placex + WHERE indexed_status > 0 + AND rank_search = {} + AND class = 'boundary' and type = 'administrative'""".format(self.rank) - return False + def sql_get_objects(self): + return """SELECT place_id FROM placex + WHERE indexed_status > 0 and rank_search = {} + and class = 'boundary' and type = 'administrative' + ORDER BY partition, admin_level""".format(self.rank) + def sql_index_place(self, ids): + return "UPDATE placex SET indexed_status = 0 WHERE place_id IN ({})"\ + .format(','.join((str(i) for i in ids))) class Indexer(object): """ Main indexing routine. """ def __init__(self, options): - self.minrank = max(0, options.minrank) + self.minrank = max(1, options.minrank) self.maxrank = min(30, options.maxrank) self.conn = make_connection(options) self.threads = [DBConnection(options) for i in range(options.threads)] - def run(self): - """ Run indexing over the entire database. + def index_boundaries(self): + log.warning("Starting indexing boundaries using {} threads".format( + len(self.threads))) + + for rank in range(max(self.minrank, 5), min(self.maxrank, 26)): + self.index(BoundaryRunner(rank)) + + def index_by_rank(self): + """ Run classic indexing by rank. """ log.warning("Starting indexing rank ({} to {}) using {} threads".format( self.minrank, self.maxrank, len(self.threads))) - for rank in range(self.minrank, self.maxrank): + for rank in range(max(1, self.minrank), self.maxrank): self.index(RankRunner(rank)) if self.maxrank == 30: - self.index(InterpolationRunner()) + self.index(RankRunner(0)) + self.index(InterpolationRunner(), 20) + self.index(RankRunner(self.maxrank), 20) + else: + self.index(RankRunner(self.maxrank)) - self.index(RankRunner(self.maxrank)) - - def index(self, obj): + def index(self, obj, batch=1): """ Index a single rank or table. `obj` describes the SQL to use - for indexing. + for indexing. `batch` describes the number of objects that + should be processed with a single SQL statement """ - log.warning("Starting {}".format(obj.name())) + log.warning("Starting %s (using batch size %s)", obj.name(), batch) cur = self.conn.cursor() cur.execute(obj.sql_count_objects()) @@ -217,24 +157,28 @@ class Indexer(object): cur.close() - next_thread = self.find_free_thread() progress = ProgressLogger(obj.name(), total_tuples) - cur = self.conn.cursor(name='places') - cur.execute(obj.sql_get_objects()) + if total_tuples > 0: + cur = self.conn.cursor(name='places') + cur.execute(obj.sql_get_objects()) - for place in cur: - place_id = place[0] - log.debug("Processing place {}".format(place_id)) - thread = next(next_thread) + next_thread = self.find_free_thread() + while True: + places = [p[0] for p in cur.fetchmany(batch)] + if len(places) == 0: + break - thread.perform(obj.sql_index_place(), (place_id,)) - progress.add() + log.debug("Processing places: {}".format(places)) + thread = next(next_thread) - cur.close() + thread.perform(obj.sql_index_place(places)) + progress.add(len(places)) + + cur.close() - for t in self.threads: - t.wait() + for t in self.threads: + t.wait() progress.done() @@ -290,6 +234,9 @@ def nominatim_arg_parser(): p.add_argument('-P', '--port', dest='port', action='store', help='PostgreSQL server port') + p.add_argument('-b', '--boundary-only', + dest='boundary_only', action='store_true', + help='Only index administrative boundaries (ignores min/maxrank).') p.add_argument('-r', '--minrank', dest='minrank', type=int, metavar='RANK', default=0, help='Minimum/starting rank.') @@ -317,4 +264,7 @@ if __name__ == '__main__': password = getpass.getpass("Database password: ") options.password = password - Indexer(options).run() + if options.boundary_only: + Indexer(options).index_boundaries() + else: + Indexer(options).index_by_rank()