git.openstreetmap.org Git - nominatim.git/commitdiff
Merge pull request #2326 from lonvia/wokerpool-for-tiger-data
author Sarah Hoffmann <lonvia@denofr.de>
Thu, 13 May 2021 20:09:56 +0000 (22:09 +0200)
committer GitHub <noreply@github.com>
Thu, 13 May 2021 20:09:56 +0000 (22:09 +0200)
Use WorkerPool when importing Tiger data

nominatim/db/async_connection.py
nominatim/indexer/indexer.py
nominatim/tools/tiger_data.py
test/python/test_db_async_connection.py

diff --git a/nominatim/db/async_connection.py b/nominatim/db/async_connection.py
index a4f554965ab9144a89bb609296ee80ca82049ac3..db4b89ce2fa74b6a03763dccac1f00df5f8ef22e 100644
--- a/nominatim/db/async_connection.py
+++ b/nominatim/db/async_connection.py
@@ -6,6 +6,9 @@
 """ Database helper functions for the indexer.
 """
 import logging
+import select
+import time
+
 import psycopg2
 from psycopg2.extras import wait_select
 
@@ -25,8 +28,9 @@ class DeadlockHandler:
         normally.
     """
 
-    def __init__(self, handler):
+    def __init__(self, handler, ignore_sql_errors=False):
         self.handler = handler
+        self.ignore_sql_errors = ignore_sql_errors
 
     def __enter__(self):
         pass
@@ -41,6 +45,11 @@ class DeadlockHandler:
                 if exc_value.pgcode == '40P01':
                     self.handler()
                     return True
+
+        if self.ignore_sql_errors and isinstance(exc_value, psycopg2.Error):
+            LOG.info("SQL error ignored: %s", exc_value)
+            return True
+
         return False
 
 
@@ -48,10 +57,11 @@ class DBConnection:
     """ A single non-blocking database connection.
     """
 
-    def __init__(self, dsn, cursor_factory=None):
+    def __init__(self, dsn, cursor_factory=None, ignore_sql_errors=False):
         self.current_query = None
         self.current_params = None
         self.dsn = dsn
+        self.ignore_sql_errors = ignore_sql_errors
 
         self.conn = None
         self.cursor = None
@@ -98,7 +108,7 @@ class DBConnection:
         """ Block until any pending operation is done.
         """
         while True:
-            with DeadlockHandler(self._deadlock_handler):
+            with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
                 wait_select(self.conn)
                 self.current_query = None
                 return
@@ -125,9 +135,78 @@ class DBConnection:
         if self.current_query is None:
             return True
 
-        with DeadlockHandler(self._deadlock_handler):
+        with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors):
             if self.conn.poll() == psycopg2.extensions.POLL_OK:
                 self.current_query = None
                 return True
 
         return False
+
+
+class WorkerPool:
+    """ A pool of asynchronous database connections.
+
+        The pool may be used as a context manager.
+    """
+    REOPEN_CONNECTIONS_AFTER = 100000
+
+    def __init__(self, dsn, pool_size, ignore_sql_errors=False):
+        self.threads = [DBConnection(dsn, ignore_sql_errors=ignore_sql_errors)
+                        for _ in range(pool_size)]
+        self.free_workers = self._yield_free_worker()
+        self.wait_time = 0
+
+
+    def finish_all(self):
+        """ Wait for all connection to finish.
+        """
+        for thread in self.threads:
+            while not thread.is_done():
+                thread.wait()
+
+        self.free_workers = self._yield_free_worker()
+
+    def close(self):
+        """ Close all connections and clear the pool.
+        """
+        for thread in self.threads:
+            thread.close()
+        self.threads = []
+        self.free_workers = None
+
+
+    def next_free_worker(self):
+        """ Get the next free connection.
+        """
+        return next(self.free_workers)
+
+
+    def _yield_free_worker(self):
+        ready = self.threads
+        command_stat = 0
+        while True:
+            for thread in ready:
+                if thread.is_done():
+                    command_stat += 1
+                    yield thread
+
+            if command_stat > self.REOPEN_CONNECTIONS_AFTER:
+                for thread in self.threads:
+                    while not thread.is_done():
+                        thread.wait()
+                    thread.connect()
+                ready = self.threads
+                command_stat = 0
+            else:
+                tstart = time.time()
+                _, ready, _ = select.select([], self.threads, [])
+                self.wait_time += time.time() - tstart
+
+
+    def __enter__(self):
+        return self
+
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.finish_all()
+        self.close()
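
The new class can be exercised roughly as follows. This is a minimal
sketch, not part of the commit; the DSN is hypothetical and a reachable
PostgreSQL server is assumed:

    from nominatim.db.async_connection import WorkerPool

    dsn = 'dbname=nominatim'  # hypothetical connection string

    # Used as a context manager, __exit__() runs finish_all() and then
    # close(), so every statement has completed when the block ends.
    with WorkerPool(dsn, pool_size=4) as pool:
        for _ in range(100):
            # Blocks until one of the pooled connections has finished
            # its current statement, then hands that connection out.
            worker = pool.next_free_worker()
            worker.perform('SELECT pg_sleep(0.01)')

    print('seconds spent waiting for a free worker:', pool.wait_time)

The select.select() call in _yield_free_worker() can wait on the
connections directly because DBConnection exposes the underlying
socket's file descriptor, so the pool blocks cheaply instead of
busy-polling.
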
diff --git a/nominatim/indexer/indexer.py b/nominatim/indexer/indexer.py
index b7673abaddc8090a896351c7ad230f372742a739..5ab0eac3dca5701562085ffecdce652e110f9cfd 100644
--- a/nominatim/indexer/indexer.py
+++ b/nominatim/indexer/indexer.py
@@ -2,14 +2,13 @@
 Main workhorse for indexing (computing addresses) the database.
 """
 import logging
-import select
 import time
 
 import psycopg2.extras
 
 from nominatim.indexer.progress import ProgressLogger
 from nominatim.indexer import runners
-from nominatim.db.async_connection import DBConnection
+from nominatim.db.async_connection import DBConnection, WorkerPool
 from nominatim.db.connection import connect
 
 LOG = logging.getLogger()
@@ -81,73 +80,6 @@ class PlaceFetcher:
         self.conn.wait()
         self.close()
 
-class WorkerPool:
-    """ A pool of asynchronous database connections.
-
-        The pool may be used as a context manager.
-    """
-    REOPEN_CONNECTIONS_AFTER = 100000
-
-    def __init__(self, dsn, pool_size):
-        self.threads = [DBConnection(dsn) for _ in range(pool_size)]
-        self.free_workers = self._yield_free_worker()
-        self.wait_time = 0
-
-
-    def finish_all(self):
-        """ Wait for all connection to finish.
-        """
-        for thread in self.threads:
-            while not thread.is_done():
-                thread.wait()
-
-        self.free_workers = self._yield_free_worker()
-
-    def close(self):
-        """ Close all connections and clear the pool.
-        """
-        for thread in self.threads:
-            thread.close()
-        self.threads = []
-        self.free_workers = None
-
-
-    def next_free_worker(self):
-        """ Get the next free connection.
-        """
-        return next(self.free_workers)
-
-
-    def _yield_free_worker(self):
-        ready = self.threads
-        command_stat = 0
-        while True:
-            for thread in ready:
-                if thread.is_done():
-                    command_stat += 1
-                    yield thread
-
-            if command_stat > self.REOPEN_CONNECTIONS_AFTER:
-                for thread in self.threads:
-                    while not thread.is_done():
-                        thread.wait()
-                    thread.connect()
-                ready = self.threads
-                command_stat = 0
-            else:
-                tstart = time.time()
-                _, ready, _ = select.select([], self.threads, [])
-                self.wait_time += time.time() - tstart
-
-
-    def __enter__(self):
-        return self
-
-
-    def __exit__(self, exc_type, exc_value, traceback):
-        self.finish_all()
-        self.close()
-
 
 class Indexer:
     """ Main indexing routine.
diff --git a/nominatim/tools/tiger_data.py b/nominatim/tools/tiger_data.py
index 07772c702a53f3a554cb8a42f54f9de388da56cd..fbcdb077bc12674782d6b0bc871538cad8b44e2f 100644
--- a/nominatim/tools/tiger_data.py
+++ b/nominatim/tools/tiger_data.py
@@ -4,10 +4,9 @@ Functions for importing tiger data and handling tarball and directory files
 import logging
 import os
 import tarfile
-import selectors
 
 from nominatim.db.connection import connect
-from nominatim.db.async_connection import DBConnection
+from nominatim.db.async_connection import WorkerPool
 from nominatim.db.sql_preprocessor import SQLPreprocessor
 
 
@@ -37,44 +36,20 @@ def handle_tarfile_or_directory(data_dir):
     return sql_files, tar
 
 
-def handle_threaded_sql_statements(sel, file):
+def handle_threaded_sql_statements(pool, file):
     """ Handles sql statement with multiplexing
     """
 
     lines = 0
-    end_of_file = False
     # Using pool of database connections to execute sql statements
-    while not end_of_file:
-        for key, _ in sel.select(1):
-            conn = key.data
-            try:
-                if conn.is_done():
-                    sql_query = file.readline()
-                    lines += 1
-                    if not sql_query:
-                        end_of_file = True
-                        break
-                    conn.perform(sql_query)
-                    if lines == 1000:
-                        print('. ', end='', flush=True)
-                        lines = 0
-            except Exception as exc: # pylint: disable=broad-except
-                LOG.info('Wrong SQL statement: %s', exc)
-
-def handle_unregister_connection_pool(sel, place_threads):
-    """ Handles unregistering pool of connections
-    """
+    for sql_query in file:
+        pool.next_free_worker().perform(sql_query)
+
+        lines += 1
+        if lines == 1000:
+            print('.', end='', flush=True)
+            lines = 0
 
-    while place_threads > 0:
-        for key, _ in sel.select(1):
-            conn = key.data
-            sel.unregister(conn)
-            try:
-                conn.wait()
-            except Exception as exc: # pylint: disable=broad-except
-                LOG.info('Wrong SQL statement: %s', exc)
-            conn.close()
-            place_threads -= 1
 
 def add_tiger_data(data_dir, config, threads):
     """ Import tiger data from directory or tar file `data dir`.
@@ -91,25 +66,16 @@ def add_tiger_data(data_dir, config, threads):
 
     # Reading sql_files and then for each file line handling
     # sql_query in <threads - 1> chunks.
-    sel = selectors.DefaultSelector()
     place_threads = max(1, threads - 1)
 
-    # Creates a pool of database connections
-    for _ in range(place_threads):
-        conn = DBConnection(dsn)
-        conn.connect()
-        sel.register(conn, selectors.EVENT_WRITE, conn)
-
-    for sql_file in sql_files:
-        if not tar:
-            file = open(sql_file)
-        else:
-            file = tar.extractfile(sql_file)
-
-        handle_threaded_sql_statements(sel, file)
+    with WorkerPool(dsn, place_threads, ignore_sql_errors=True) as pool:
+        for sql_file in sql_files:
+            if not tar:
+                file = open(sql_file)
+            else:
+                file = tar.extractfile(sql_file)
 
-    # Unregistering pool of database connections
-    handle_unregister_connection_pool(sel, place_threads)
+            handle_threaded_sql_statements(pool, file)
 
     if tar:
         tar.close()
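
Passing ignore_sql_errors=True in add_tiger_data() preserves the old
behaviour of the import, where a malformed statement was logged and
skipped rather than aborting the whole run. A minimal sketch of that
path, with a made-up broken statement and a hypothetical DSN:

    from nominatim.db.async_connection import WorkerPool

    dsn = 'dbname=nominatim'  # hypothetical connection string

    with WorkerPool(dsn, 2, ignore_sql_errors=True) as pool:
        # The server error only surfaces when the connection is polled;
        # DeadlockHandler swallows it and logs "SQL error ignored: ...".
        pool.next_free_worker().perform('SELECT not_a_function(42)')
        # The pool keeps handing out workers and the import carries on.
        pool.next_free_worker().perform('SELECT 1')
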
diff --git a/test/python/test_db_async_connection.py b/test/python/test_db_async_connection.py
index b52f7053f4a8d601facfbf48a924ae564b7688e3..330b86f7cb46e4e324de82467d461523e0dd94b8 100644
--- a/test/python/test_db_async_connection.py
+++ b/test/python/test_db_async_connection.py
@@ -56,13 +56,21 @@ def test_bad_query(conn):
         conn.wait()
 
 
+def test_bad_query_ignore(temp_db):
+    with closing(DBConnection('dbname=' + temp_db, ignore_sql_errors=True)) as conn:
+        conn.connect()
+
+        conn.perform('SELECT efasfjsea')
+
+        conn.wait()
+
+
 def exec_with_deadlock(cur, sql, detector):
     with DeadlockHandler(lambda *args: detector.append(1)):
         cur.execute(sql)
 
 
 def test_deadlock(simple_conns):
-    print(psycopg2.__version__)
     cur1, cur2 = simple_conns
 
     cur1.execute("""CREATE TABLE t1 (id INT PRIMARY KEY, t TEXT);