]> git.openstreetmap.org Git - nominatim.git/blobdiff - nominatim/db/sql_preprocessor.py
add sanitizer for TIGER tags
[nominatim.git] / nominatim / db / sql_preprocessor.py
index 4de53886142f665d4a640539d62b67e98ee374cb..31b4a8c0f9a0042877cb652f6578fe2e896a899a 100644 (file)
@@ -7,10 +7,14 @@
 """
 Preprocessing of SQL files.
 """
 """
 Preprocessing of SQL files.
 """
+from typing import Set, Dict, Any
 import jinja2
 
 import jinja2
 
+from nominatim.db.connection import Connection
+from nominatim.db.async_connection import WorkerPool
+from nominatim.config import Configuration
 
 
-def _get_partitions(conn):
+def _get_partitions(conn: Connection) -> Set[int]:
     """ Get the set of partitions currently in use.
     """
     with conn.cursor() as cur:
     """ Get the set of partitions currently in use.
     """
     with conn.cursor() as cur:
@@ -22,7 +26,7 @@ def _get_partitions(conn):
     return partitions
 
 
     return partitions
 
 
-def _get_tables(conn):
+def _get_tables(conn: Connection) -> Set[str]:
     """ Return the set of tables currently in use.
         Only includes non-partitioned
     """
     """ Return the set of tables currently in use.
         Only includes non-partitioned
     """
@@ -32,7 +36,7 @@ def _get_tables(conn):
         return set((row[0] for row in list(cur)))
 
 
         return set((row[0] for row in list(cur)))
 
 
-def _setup_tablespace_sql(config):
+def _setup_tablespace_sql(config: Configuration) -> Dict[str, str]:
     """ Returns a dict with tablespace expressions for the different tablespace
         kinds depending on whether a tablespace is configured or not.
     """
     """ Returns a dict with tablespace expressions for the different tablespace
         kinds depending on whether a tablespace is configured or not.
     """
@@ -47,7 +51,7 @@ def _setup_tablespace_sql(config):
     return out
 
 
     return out
 
 
-def _setup_postgresql_features(conn):
+def _setup_postgresql_features(conn: Connection) -> Dict[str, Any]:
     """ Set up a dictionary with various optional Postgresql/Postgis features that
         depend on the database version.
     """
     """ Set up a dictionary with various optional Postgresql/Postgis features that
         depend on the database version.
     """
@@ -69,11 +73,11 @@ class SQLPreprocessor:
         and follows its syntax.
     """
 
         and follows its syntax.
     """
 
-    def __init__(self, conn, config):
+    def __init__(self, conn: Connection, config: Configuration) -> None:
         self.env = jinja2.Environment(autoescape=False,
                                       loader=jinja2.FileSystemLoader(str(config.lib_dir.sql)))
 
         self.env = jinja2.Environment(autoescape=False,
                                       loader=jinja2.FileSystemLoader(str(config.lib_dir.sql)))
 
-        db_info = {}
+        db_info: Dict[str, Any] = {}
         db_info['partitions'] = _get_partitions(conn)
         db_info['tables'] = _get_tables(conn)
         db_info['reverse_only'] = 'search_name' not in db_info['tables']
         db_info['partitions'] = _get_partitions(conn)
         db_info['tables'] = _get_tables(conn)
         db_info['reverse_only'] = 'search_name' not in db_info['tables']
@@ -84,7 +88,7 @@ class SQLPreprocessor:
         self.env.globals['postgres'] = _setup_postgresql_features(conn)
 
 
         self.env.globals['postgres'] = _setup_postgresql_features(conn)
 
 
-    def run_sql_file(self, conn, name, **kwargs):
+    def run_sql_file(self, conn: Connection, name: str, **kwargs: Any) -> None:
         """ Execute the given SQL file on the connection. The keyword arguments
             may supply additional parameters for preprocessing.
         """
         """ Execute the given SQL file on the connection. The keyword arguments
             may supply additional parameters for preprocessing.
         """
@@ -93,3 +97,21 @@ class SQLPreprocessor:
         with conn.cursor() as cur:
             cur.execute(sql)
         conn.commit()
         with conn.cursor() as cur:
             cur.execute(sql)
         conn.commit()
+
+
+    def run_parallel_sql_file(self, dsn: str, name: str, num_threads: int = 1,
+                              **kwargs: Any) -> None:
+        """ Execure the given SQL files using parallel asynchronous connections.
+            The keyword arguments may supply additional parameters for
+            preprocessing.
+
+            After preprocessing the SQL code is cut at lines containing only
+            '---'. Each chunk is sent to one of the `num_threads` workers.
+        """
+        sql = self.env.get_template(name).render(**kwargs)
+
+        parts = sql.split('\n---\n')
+
+        with WorkerPool(dsn, num_threads) as pool:
+            for part in parts:
+                pool.next_free_worker().perform(part)