From fc50eb8688754b8eae4abf5e22aa921593b97935 Mon Sep 17 00:00:00 2001
From: Sarah Hoffmann
Date: Fri, 14 Aug 2020 10:33:58 +0200
Subject: [PATCH] nominatim: move DBConnection class into its own file

The deadlock handling shared by wait() and is_done() is folded into a
single helper. The separate handler for psycopg2.errors.DeadlockDetected
is dropped: that exception is a subclass of TransactionRollbackError and
was therefore never reached.
---
 nominatim/indexer/db.py | 121 ++++++++++++++++++++++++++++++++++++++++
 nominatim/nominatim.py  | 100 +--------------------------------
 2 files changed, 122 insertions(+), 99 deletions(-)
 create mode 100644 nominatim/indexer/db.py

diff --git a/nominatim/indexer/db.py b/nominatim/indexer/db.py
new file mode 100644
index 00000000..037c3fb2
--- /dev/null
+++ b/nominatim/indexer/db.py
@@ -0,0 +1,121 @@
+# SPDX-License-Identifier: GPL-2.0-only
+#
+# This file is part of Nominatim.
+# Copyright (C) 2020 Sarah Hoffmann
+""" Asynchronous database connection handling for the indexer.
+"""
+
+import logging
+import psycopg2
+from psycopg2.extras import wait_select
+
+log = logging.getLogger()
+
+def make_connection(options, asynchronous=False):
+    """ Open a psycopg2 connection with the dbname, user, password, host
+        and port attributes of `options`. When `asynchronous` is true,
+        the connection is requested in asynchronous mode.
+    """
+    params = {'dbname' : options.dbname,
+              'user' : options.user,
+              'password' : options.password,
+              'host' : options.host,
+              'port' : options.port,
+              'async' : asynchronous}
+
+    return psycopg2.connect(**params)
+
+class DBConnection(object):
+    """ A single non-blocking database connection.
+    """
+
+    def __init__(self, options):
+        self.current_query = None
+        self.current_params = None
+        self.options = options
+
+        self.conn = None
+        self.cursor = None
+        self.connect()
+
+    def connect(self):
+        """ (Re)connect to the database. Creates an asynchronous connection
+            with JIT and parallel processing disabled. If a connection was
+            already open, it is closed and a new connection established.
+            The caller must ensure that no query is pending before reconnecting.
+        """
+        if self.conn is not None:
+            self.cursor.close()
+            self.conn.close()
+
+        self.conn = make_connection(self.options, asynchronous=True)
+        self.wait()
+
+        self.cursor = self.conn.cursor()
+        # Disable JIT and parallel workers as they are known to cause problems.
+        # Update pg_settings instead of using SET because it does not yield
+        # errors on older versions of Postgres where the settings are not
+        # implemented.
+        self.perform(
+            """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost';
+                UPDATE pg_settings SET setting = 0
+                   WHERE name = 'max_parallel_workers_per_gather';""")
+        self.wait()
+
+    def _retry_on_deadlock(self, error):
+        """ Send the current query again if `error` reports a deadlock
+            (SQLSTATE 40P01). Any other rollback error is raised again.
+
+            psycopg2.errors.DeadlockDetected needs no handler of its own:
+            it is a subclass of TransactionRollbackError.
+        """
+        if error.pgcode != '40P01':
+            raise error
+
+        log.info("Deadlock detected (params = {}), retry."
+                 .format(self.current_params))
+        self.cursor.execute(self.current_query, self.current_params)
+
+    def wait(self):
+        """ Block until any pending operation is done. A query that has
+            run into a deadlock is repeated.
+        """
+        while True:
+            try:
+                wait_select(self.conn)
+                self.current_query = None
+                return
+            except psycopg2.extensions.TransactionRollbackError as error:
+                self._retry_on_deadlock(error)
+
+    def perform(self, sql, args=None):
+        """ Send SQL query to the server. Returns immediately without
+            blocking.
+        """
+        self.current_query = sql
+        self.current_params = args
+        self.cursor.execute(sql, args)
+
+    def fileno(self):
+        """ File descriptor to wait for. (Makes this class select()able.)
+        """
+        return self.conn.fileno()
+
+    def is_done(self):
+        """ Check if the connection is available for a new query.
+
+            Also checks if the previous query has run into a deadlock.
+            If so, then the previous query is repeated.
+        """
+        if self.current_query is None:
+            return True
+
+        try:
+            if self.conn.poll() == psycopg2.extensions.POLL_OK:
+                self.current_query = None
+                return True
+        except psycopg2.extensions.TransactionRollbackError as error:
+            self._retry_on_deadlock(error)
+
+        return False
+
diff --git a/nominatim/nominatim.py b/nominatim/nominatim.py
index e8600ca8..67cd42ee 100755
--- a/nominatim/nominatim.py
+++ b/nominatim/nominatim.py
@@ -28,25 +28,13 @@ import sys
 import re
 import getpass
 from datetime import datetime
-import psycopg2
-from psycopg2.extras import wait_select
 import select
 
 from indexer.progress import ProgressLogger
+from indexer.db import DBConnection, make_connection
 
 log = logging.getLogger()
 
-def make_connection(options, asynchronous=False):
-    params = {'dbname' : options.dbname,
-              'user' : options.user,
-              'password' : options.password,
-              'host' : options.host,
-              'port' : options.port,
-              'async' : asynchronous}
-
-    return psycopg2.connect(**params)
-
-
 class RankRunner(object):
     """ Returns SQL commands for indexing one rank within the placex table.
     """
@@ -95,92 +83,6 @@ class InterpolationRunner(object):
                .format(','.join((str(i) for i in ids)))
 
 
-class DBConnection(object):
-    """ A single non-blocking database connection.
-    """
-
-    def __init__(self, options):
-        self.current_query = None
-        self.current_params = None
-
-        self.conn = None
-        self.connect()
-
-    def connect(self):
-        if self.conn is not None:
-            self.cursor.close()
-            self.conn.close()
-
-        self.conn = make_connection(options, asynchronous=True)
-        self.wait()
-
-        self.cursor = self.conn.cursor()
-        # Disable JIT and parallel workers as they are known to cause problems.
-        # Update pg_settings instead of using SET because it does not yield
-        # errors on older versions of Postgres where the settings are not
-        # implemented.
-        self.perform(
-            """ UPDATE pg_settings SET setting = -1 WHERE name = 'jit_above_cost';
-                UPDATE pg_settings SET setting = 0
-                   WHERE name = 'max_parallel_workers_per_gather';""")
-        self.wait()
-
-    def wait(self):
-        """ Block until any pending operation is done.
-        """
-        while True:
-            try:
-                wait_select(self.conn)
-                self.current_query = None
-                return
-            except psycopg2.extensions.TransactionRollbackError as e:
-                if e.pgcode == '40P01':
-                    log.info("Deadlock detected (params = {}), retry."
-                             .format(self.current_params))
-                    self.cursor.execute(self.current_query, self.current_params)
-                else:
-                    raise
-            except psycopg2.errors.DeadlockDetected:
-                self.cursor.execute(self.current_query, self.current_params)
-
-    def perform(self, sql, args=None):
-        """ Send SQL query to the server. Returns immediately without
-            blocking.
-        """
-        self.current_query = sql
-        self.current_params = args
-        self.cursor.execute(sql, args)
-
-    def fileno(self):
-        """ File descriptor to wait for. (Makes this class select()able.)
-        """
-        return self.conn.fileno()
-
-    def is_done(self):
-        """ Check if the connection is available for a new query.
-
-            Also checks if the previous query has run into a deadlock.
-            If so, then the previous query is repeated.
-        """
-        if self.current_query is None:
-            return True
-
-        try:
-            if self.conn.poll() == psycopg2.extensions.POLL_OK:
-                self.current_query = None
-                return True
-        except psycopg2.extensions.TransactionRollbackError as e:
-            if e.pgcode == '40P01':
-                log.info("Deadlock detected (params = {}), retry.".format(self.current_params))
-                self.cursor.execute(self.current_query, self.current_params)
-            else:
-                raise
-        except psycopg2.errors.DeadlockDetected:
-            self.cursor.execute(self.current_query, self.current_params)
-
-        return False
-
-
 class Indexer(object):
     """ Main indexing routine.
     """
-- 
2.39.5