normally.
"""
- def __init__(self, handler):
+ def __init__(self, handler, ignore_sql_errors=False):
self.handler = handler
+ self.ignore_sql_errors = ignore_sql_errors
def __enter__(self):
pass
if exc_value.pgcode == '40P01':
self.handler()
return True
+
+ if self.ignore_sql_errors and isinstance(exc_value, psycopg2.Error):
+ LOG.info("SQL error ignored: %s", exc_value)
+ return True
+
return False
""" A single non-blocking database connection.
"""
- def __init__(self, dsn, cursor_factory=None):
+ def __init__(self, dsn, cursor_factory=None, ignore_sql_errors=False):
self.current_query = None
self.current_params = None
self.dsn = dsn
+ self.ignore_sql_errors = ignore_sql_errors
self.conn = None
self.cursor = None
""" Block until any pending operation is done.
"""
while True:
- with DeadlockHandler(self._deadlock_handler):
+ with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
wait_select(self.conn)
self.current_query = None
return
if self.current_query is None:
return True
- with DeadlockHandler(self._deadlock_handler):
+ with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
if self.conn.poll() == psycopg2.extensions.POLL_OK:
self.current_query = None
return True
"""
REOPEN_CONNECTIONS_AFTER = 100000
- def __init__(self, dsn, pool_size):
- self.threads = [DBConnection(dsn) for _ in range(pool_size)]
+ def __init__(self, dsn, pool_size, ignore_sql_errors=False):
+ self.threads = [DBConnection(dsn, ignore_sql_errors=ignore_sql_errors)
+ for _ in range(pool_size)]
self.free_workers = self._yield_free_worker()
self.wait_time = 0
import logging
import os
import tarfile
-import selectors
from nominatim.db.connection import connect
-from nominatim.db.async_connection import DBConnection
+from nominatim.db.async_connection import WorkerPool
from nominatim.db.sql_preprocessor import SQLPreprocessor
return sql_files, tar
-def handle_threaded_sql_statements(sel, file):
+def handle_threaded_sql_statements(pool, file):
""" Handles sql statement with multiplexing
"""
lines = 0
- end_of_file = False
# Using pool of database connections to execute sql statements
- while not end_of_file:
- for key, _ in sel.select(1):
- conn = key.data
- try:
- if conn.is_done():
- sql_query = file.readline()
- lines += 1
- if not sql_query:
- end_of_file = True
- break
- conn.perform(sql_query)
- if lines == 1000:
- print('. ', end='', flush=True)
- lines = 0
- except Exception as exc: # pylint: disable=broad-except
- LOG.info('Wrong SQL statement: %s', exc)
-
-def handle_unregister_connection_pool(sel, place_threads):
- """ Handles unregistering pool of connections
- """
+ for sql_query in file:
+ pool.next_free_worker().perform(sql_query)
+
+ lines += 1
+ if lines == 1000:
+ print('.', end='', flush=True)
+ lines = 0
- while place_threads > 0:
- for key, _ in sel.select(1):
- conn = key.data
- sel.unregister(conn)
- try:
- conn.wait()
- except Exception as exc: # pylint: disable=broad-except
- LOG.info('Wrong SQL statement: %s', exc)
- conn.close()
- place_threads -= 1
def add_tiger_data(data_dir, config, threads):
""" Import tiger data from directory or tar file `data dir`.
# Reading sql_files and then for each file line handling
# sql_query in <threads - 1> chunks.
- sel = selectors.DefaultSelector()
place_threads = max(1, threads - 1)
- # Creates a pool of database connections
- for _ in range(place_threads):
- conn = DBConnection(dsn)
- conn.connect()
- sel.register(conn, selectors.EVENT_WRITE, conn)
-
- for sql_file in sql_files:
- if not tar:
- file = open(sql_file)
- else:
- file = tar.extractfile(sql_file)
-
- handle_threaded_sql_statements(sel, file)
+ with WorkerPool(dsn, place_threads, ignore_sql_errors=True) as pool:
+ for sql_file in sql_files:
+ if not tar:
+ file = open(sql_file)
+ else:
+ file = tar.extractfile(sql_file)
- # Unregistering pool of database connections
- handle_unregister_connection_pool(sel, place_threads)
+ handle_threaded_sql_statements(pool, file)
if tar:
tar.close()
conn.wait()
+def test_bad_query_ignore(temp_db):
+ with closing(DBConnection('dbname=' + temp_db, ignore_sql_errors=True)) as conn:
+ conn.connect()
+
+ conn.perform('SELECT efasfjsea')
+
+ conn.wait()
+
+
def exec_with_deadlock(cur, sql, detector):
with DeadlockHandler(lambda *args: detector.append(1)):
cur.execute(sql)
def test_deadlock(simple_conns):
- print(psycopg2.__version__)
cur1, cur2 = simple_conns
cur1.execute("""CREATE TABLE t1 (id INT PRIMARY KEY, t TEXT);