]> git.openstreetmap.org Git - nominatim.git/blobdiff - src/nominatim_db/db/sql_preprocessor.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / src / nominatim_db / db / sql_preprocessor.py
index 691ab6c52cff562aafa8307ecea9d368ffa10c44..85dbaabc97bc582fa92a1126d812d7dc5ebe01fb 100644 (file)
@@ -8,11 +8,13 @@
 Preprocessing of SQL files.
 """
 from typing import Set, Dict, Any, cast
 Preprocessing of SQL files.
 """
 from typing import Set, Dict, Any, cast
+
 import jinja2
 
 from .connection import Connection, server_version_tuple, postgis_version_tuple
 import jinja2
 
 from .connection import Connection, server_version_tuple, postgis_version_tuple
-from .async_connection import WorkerPool
 from ..config import Configuration
 from ..config import Configuration
+from ..db.query_pool import QueryPool
+
 
 def _get_partitions(conn: Connection) -> Set[int]:
     """ Get the set of partitions currently in use.
 
 def _get_partitions(conn: Connection) -> Set[int]:
     """ Get the set of partitions currently in use.
@@ -34,6 +36,7 @@ def _get_tables(conn: Connection) -> Set[str]:
 
         return set((row[0] for row in list(cur)))
 
 
         return set((row[0] for row in list(cur)))
 
+
 def _get_middle_db_format(conn: Connection, tables: Set[str]) -> str:
     """ Returns the version of the slim middle tables.
     """
 def _get_middle_db_format(conn: Connection, tables: Set[str]) -> str:
     """ Returns the version of the slim middle tables.
     """
@@ -72,9 +75,10 @@ def _setup_postgresql_features(conn: Connection) -> Dict[str, Any]:
     ps3 = postgis_version >= (3, 0)
     return {
         'has_index_non_key_column': pg11plus,
     ps3 = postgis_version >= (3, 0)
     return {
         'has_index_non_key_column': pg11plus,
-        'spgist_geom' : 'SPGIST' if pg11plus and ps3 else 'GIST'
+        'spgist_geom': 'SPGIST' if pg11plus and ps3 else 'GIST'
     }
 
     }
 
+
 class SQLPreprocessor:
     """ A environment for preprocessing SQL files from the
         lib-sql directory.
 class SQLPreprocessor:
     """ A environment for preprocessing SQL files from the
         lib-sql directory.
@@ -101,7 +105,6 @@ class SQLPreprocessor:
         self.env.globals['db'] = db_info
         self.env.globals['postgres'] = _setup_postgresql_features(conn)
 
         self.env.globals['db'] = db_info
         self.env.globals['postgres'] = _setup_postgresql_features(conn)
 
-
     def run_string(self, conn: Connection, template: str, **kwargs: Any) -> None:
         """ Execute the given SQL template string on the connection.
             The keyword arguments may supply additional parameters
     def run_string(self, conn: Connection, template: str, **kwargs: Any) -> None:
         """ Execute the given SQL template string on the connection.
             The keyword arguments may supply additional parameters
@@ -113,7 +116,6 @@ class SQLPreprocessor:
             cur.execute(sql)
         conn.commit()
 
             cur.execute(sql)
         conn.commit()
 
-
     def run_sql_file(self, conn: Connection, name: str, **kwargs: Any) -> None:
         """ Execute the given SQL file on the connection. The keyword arguments
             may supply additional parameters for preprocessing.
     def run_sql_file(self, conn: Connection, name: str, **kwargs: Any) -> None:
         """ Execute the given SQL file on the connection. The keyword arguments
             may supply additional parameters for preprocessing.
@@ -124,9 +126,8 @@ class SQLPreprocessor:
             cur.execute(sql)
         conn.commit()
 
             cur.execute(sql)
         conn.commit()
 
-
-    def run_parallel_sql_file(self, dsn: str, name: str, num_threads: int = 1,
-                              **kwargs: Any) -> None:
+    async def run_parallel_sql_file(self, dsn: str, name: str, num_threads: int = 1,
+                                    **kwargs: Any) -> None:
         """ Execute the given SQL files using parallel asynchronous connections.
             The keyword arguments may supply additional parameters for
             preprocessing.
         """ Execute the given SQL files using parallel asynchronous connections.
             The keyword arguments may supply additional parameters for
             preprocessing.
@@ -138,6 +139,6 @@ class SQLPreprocessor:
 
         parts = sql.split('\n---\n')
 
 
         parts = sql.split('\n---\n')
 
-        with WorkerPool(dsn, num_threads) as pool:
+        async with QueryPool(dsn, num_threads) as pool:
             for part in parts:
             for part in parts:
-                pool.next_free_worker().perform(part)
+                await pool.put_query(part, None)