""" Database helper functions for the indexer.
"""
import logging
+import select
+import time
+
import psycopg2
from psycopg2.extras import wait_select
normally.
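+
+        When `ignore_sql_errors` is set, any other SQL error is logged
+        at INFO level and swallowed as well.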
"""
- def __init__(self, handler):
+ def __init__(self, handler, ignore_sql_errors=False):
self.handler = handler
+ self.ignore_sql_errors = ignore_sql_errors
def __enter__(self):
pass
if exc_value.pgcode == '40P01':
self.handler()
return True
+
+ if self.ignore_sql_errors and isinstance(exc_value, psycopg2.Error):
+ LOG.info("SQL error ignored: %s", exc_value)
+ return True
+
return False
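+
+# Illustrative sketch only (not part of this patch; `cur` and `retry`
+# are hypothetical placeholders): a deadlock invokes the retry callback,
+# while any other SQL error is merely logged.
+#
+#     with DeadlockHandler(retry, ignore_sql_errors=True):
+#         cur.execute("UPDATE placex SET indexed_status = 0")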
""" A single non-blocking database connection.
"""
- def __init__(self, dsn, cursor_factory=None):
+ def __init__(self, dsn, cursor_factory=None, ignore_sql_errors=False):
self.current_query = None
self.current_params = None
self.dsn = dsn
+ self.ignore_sql_errors = ignore_sql_errors
self.conn = None
self.cursor = None
""" Block until any pending operation is done.
"""
while True:
- with DeadlockHandler(self._deadlock_handler):
+ with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
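+                # wait_select() raises when the server reports a deadlock;
+                # the context manager invokes self._deadlock_handler
+                # (expected to re-issue the interrupted query) and the
+                # loop waits for the retried statement.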
wait_select(self.conn)
self.current_query = None
return
if self.current_query is None:
return True
- with DeadlockHandler(self._deadlock_handler):
+ with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
if self.conn.poll() == psycopg2.extensions.POLL_OK:
self.current_query = None
return True
return False
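+
+# Illustrative sketch only (not part of this patch; `dsn` stands for a
+# libpq connection string): issue a statement asynchronously, then block
+# until it has finished.
+#
+#     conn = DBConnection(dsn, ignore_sql_errors=True)
+#     conn.connect()
+#     conn.perform('UPDATE placex SET indexed_status = 0')
+#     conn.wait()    # SQL errors are logged and ignored, not raised
+#     conn.close()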
+
+
+class WorkerPool:
+ """ A pool of asynchronous database connections.
+
+ The pool may be used as a context manager.
+ """
+ REOPEN_CONNECTIONS_AFTER = 100000
+
+ def __init__(self, dsn, pool_size, ignore_sql_errors=False):
+ self.threads = [DBConnection(dsn, ignore_sql_errors=ignore_sql_errors)
+ for _ in range(pool_size)]
+ self.free_workers = self._yield_free_worker()
+ self.wait_time = 0
+
+
+ def finish_all(self):
+        """ Wait for all connections to finish.
+ """
+ for thread in self.threads:
+ while not thread.is_done():
+ thread.wait()
+
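+        # Hand out a fresh generator so that the next caller starts a
+        # clean iteration over the now-idle connections.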
+ self.free_workers = self._yield_free_worker()
+
+ def close(self):
+ """ Close all connections and clear the pool.
+ """
+ for thread in self.threads:
+ thread.close()
+ self.threads = []
+ self.free_workers = None
+
+
+ def next_free_worker(self):
+ """ Get the next free connection.
+ """
+ return next(self.free_workers)
+
+
+ def _yield_free_worker(self):
+ ready = self.threads
+ command_stat = 0
+ while True:
+ for thread in ready:
+ if thread.is_done():
+ command_stat += 1
+ yield thread
+
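+            # Occasionally drain and reconnect all workers, presumably to
+            # keep very long-lived PostgreSQL sessions from accumulating
+            # resources on the server side.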
+ if command_stat > self.REOPEN_CONNECTIONS_AFTER:
+ for thread in self.threads:
+ while not thread.is_done():
+ thread.wait()
+ thread.connect()
+ ready = self.threads
+ command_stat = 0
+ else:
+ tstart = time.time()
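+                # select() accepts the DBConnection objects because they
+                # are expected to expose their socket via fileno(); block
+                # until a connection is ready for the next command.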
+ _, ready, _ = select.select([], self.threads, [])
+ self.wait_time += time.time() - tstart
+
+
+ def __enter__(self):
+ return self
+
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.finish_all()
+ self.close()
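+
+# Illustrative sketch only (not part of this patch; `dsn` and
+# `statements` are placeholders), mirroring the use in the tiger data
+# import below:
+#
+#     with WorkerPool(dsn, pool_size=4, ignore_sql_errors=True) as pool:
+#         for sql in statements:
+#             pool.next_free_worker().perform(sql)
+#     # __exit__() drains all workers and closes the connections.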
Main work horse for indexing (computing addresses) the database.
"""
import logging
-import select
import time
import psycopg2.extras
from nominatim.indexer.progress import ProgressLogger
from nominatim.indexer import runners
-from nominatim.db.async_connection import DBConnection
+from nominatim.db.async_connection import DBConnection, WorkerPool
from nominatim.db.connection import connect
LOG = logging.getLogger()
self.conn.wait()
self.close()
-class WorkerPool:
- """ A pool of asynchronous database connections.
-
- The pool may be used as a context manager.
- """
- REOPEN_CONNECTIONS_AFTER = 100000
-
- def __init__(self, dsn, pool_size):
- self.threads = [DBConnection(dsn) for _ in range(pool_size)]
- self.free_workers = self._yield_free_worker()
- self.wait_time = 0
-
-
- def finish_all(self):
- """ Wait for all connection to finish.
- """
- for thread in self.threads:
- while not thread.is_done():
- thread.wait()
-
- self.free_workers = self._yield_free_worker()
-
- def close(self):
- """ Close all connections and clear the pool.
- """
- for thread in self.threads:
- thread.close()
- self.threads = []
- self.free_workers = None
-
-
- def next_free_worker(self):
- """ Get the next free connection.
- """
- return next(self.free_workers)
-
-
- def _yield_free_worker(self):
- ready = self.threads
- command_stat = 0
- while True:
- for thread in ready:
- if thread.is_done():
- command_stat += 1
- yield thread
-
- if command_stat > self.REOPEN_CONNECTIONS_AFTER:
- for thread in self.threads:
- while not thread.is_done():
- thread.wait()
- thread.connect()
- ready = self.threads
- command_stat = 0
- else:
- tstart = time.time()
- _, ready, _ = select.select([], self.threads, [])
- self.wait_time += time.time() - tstart
-
-
- def __enter__(self):
- return self
-
-
- def __exit__(self, exc_type, exc_value, traceback):
- self.finish_all()
- self.close()
-
class Indexer:
""" Main indexing routine.
import logging
import os
import tarfile
-import selectors
from nominatim.db.connection import connect
-from nominatim.db.async_connection import DBConnection
+from nominatim.db.async_connection import WorkerPool
from nominatim.db.sql_preprocessor import SQLPreprocessor
return sql_files, tar
-def handle_threaded_sql_statements(sel, file):
+def handle_threaded_sql_statements(pool, file):
""" Handles sql statement with multiplexing
"""
lines = 0
- end_of_file = False
-    # Using pool of database connections to execute sql statements
+    # Use a pool of database connections to execute the sql statements.
- while not end_of_file:
- for key, _ in sel.select(1):
- conn = key.data
- try:
- if conn.is_done():
- sql_query = file.readline()
- lines += 1
- if not sql_query:
- end_of_file = True
- break
- conn.perform(sql_query)
- if lines == 1000:
- print('. ', end='', flush=True)
- lines = 0
- except Exception as exc: # pylint: disable=broad-except
- LOG.info('Wrong SQL statement: %s', exc)
-
-def handle_unregister_connection_pool(sel, place_threads):
- """ Handles unregistering pool of connections
- """
+ for sql_query in file:
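+        # next_free_worker() blocks until one of the pooled connections
+        # has finished its previous statement, so the loop self-throttles.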
+ pool.next_free_worker().perform(sql_query)
+
+ lines += 1
+ if lines == 1000:
+ print('.', end='', flush=True)
+ lines = 0
- while place_threads > 0:
- for key, _ in sel.select(1):
- conn = key.data
- sel.unregister(conn)
- try:
- conn.wait()
- except Exception as exc: # pylint: disable=broad-except
- LOG.info('Wrong SQL statement: %s', exc)
- conn.close()
- place_threads -= 1
def add_tiger_data(data_dir, config, threads):
""" Import tiger data from directory or tar file `data dir`.
-    # Reading sql_files and then for each file line handling
-    # sql_query in <threads - 1> chunks.
+    # Read each sql file and feed its statements, line by line, to a
+    # pool of <threads - 1> database connections.
- sel = selectors.DefaultSelector()
place_threads = max(1, threads - 1)
- # Creates a pool of database connections
- for _ in range(place_threads):
- conn = DBConnection(dsn)
- conn.connect()
- sel.register(conn, selectors.EVENT_WRITE, conn)
-
- for sql_file in sql_files:
- if not tar:
- file = open(sql_file)
- else:
- file = tar.extractfile(sql_file)
-
- handle_threaded_sql_statements(sel, file)
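+    # ignore_sql_errors=True keeps the old behaviour: a failing statement
+    # is logged and the import carries on with the rest of the file.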
+ with WorkerPool(dsn, place_threads, ignore_sql_errors=True) as pool:
+ for sql_file in sql_files:
+ if not tar:
+ file = open(sql_file)
+ else:
+ file = tar.extractfile(sql_file)
- # Unregistering pool of database connections
- handle_unregister_connection_pool(sel, place_threads)
+ handle_threaded_sql_statements(pool, file)
if tar:
tar.close()
conn.wait()
+def test_bad_query_ignore(temp_db):
+ with closing(DBConnection('dbname=' + temp_db, ignore_sql_errors=True)) as conn:
+ conn.connect()
+
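+        # The bogus statement fails on the server, but the resulting SQL
+        # error is logged and swallowed because ignore_sql_errors is set.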
+ conn.perform('SELECT efasfjsea')
+
+ conn.wait()
+
+
def exec_with_deadlock(cur, sql, detector):
with DeadlockHandler(lambda *args: detector.append(1)):
cur.execute(sql)
def test_deadlock(simple_conns):
- print(psycopg2.__version__)
cur1, cur2 = simple_conns
cur1.execute("""CREATE TABLE t1 (id INT PRIMARY KEY, t TEXT);