]> git.openstreetmap.org Git - nominatim.git/blobdiff - nominatim/db/connection.py
add support for new middle table format of osm2pgsql
[nominatim.git] / nominatim / db / connection.py
index cbdbbfef323ccf94abc9917ddbc6322739dceb81..82801ae7995c9d1e5527baec0d9dd89c85e70e4d 100644 (file)
@@ -7,7 +7,7 @@
 """
 Specialised connection and cursor functions.
 """
-from typing import Union, List, Optional, Any, Callable, ContextManager, Mapping, cast, TypeVar, overload, Tuple, Sequence
+from typing import Optional, Any, Callable, ContextManager, Dict, cast, overload, Tuple, Iterable
 import contextlib
 import logging
 import os
@@ -17,14 +17,12 @@ import psycopg2.extensions
 import psycopg2.extras
 from psycopg2 import sql as pysql
 
+from nominatim.typing import SysEnv, Query, T_cursor
 from nominatim.errors import UsageError
 
-Query = Union[str, bytes, pysql.Composable]
-T = TypeVar('T', bound=psycopg2.extensions.cursor)
-
 LOG = logging.getLogger()
 
-class _Cursor(psycopg2.extras.DictCursor):
+class Cursor(psycopg2.extras.DictCursor):
     """ A cursor returning dict-like objects and providing specialised
         execution functions.
     """
@@ -33,12 +31,13 @@ class _Cursor(psycopg2.extras.DictCursor):
         """ Query execution that logs the SQL query when debugging is enabled.
         """
         if LOG.isEnabledFor(logging.DEBUG):
-            LOG.debug(self.mogrify(query, args).decode('utf-8')) # type: ignore
+            LOG.debug(self.mogrify(query, args).decode('utf-8'))
 
         super().execute(query, args)
 
 
-    def execute_values(self, sql: Query, argslist: List[Any], template: Optional[str] = None) -> None:
+    def execute_values(self, sql: Query, argslist: Iterable[Tuple[Any, ...]],
+                       template: Optional[Query] = None) -> None:
         """ Wrapper for the psycopg2 convenience function to execute
             SQL for a list of values.
         """
@@ -56,7 +55,7 @@ class _Cursor(psycopg2.extras.DictCursor):
         if self.rowcount != 1:
             raise RuntimeError("Query did not return a single row.")
 
-        result = self.fetchone() # type: ignore
+        result = self.fetchone()
         assert result is not None
 
         return result[0]
@@ -64,7 +63,7 @@ class _Cursor(psycopg2.extras.DictCursor):
 
     def drop_table(self, name: str, if_exists: bool = True, cascade: bool = False) -> None:
         """ Drop the table with the given name.
-            Set `if_exists` to False if a non-existant table should raise
+            Set `if_exists` to False if a non-existent table should raise
             an exception instead of just being ignored. If 'cascade' is set
             to True then all dependent tables are deleted as well.
         """
@@ -75,26 +74,26 @@ class _Cursor(psycopg2.extras.DictCursor):
         if cascade:
             sql += ' CASCADE'
 
-        self.execute(pysql.SQL(sql).format(pysql.Identifier(name))) # type: ignore
+        self.execute(pysql.SQL(sql).format(pysql.Identifier(name)))
 
 
-class _Connection(psycopg2.extensions.connection):
+class Connection(psycopg2.extensions.connection):
     """ A connection that provides the specialised cursor by default and
         adds convenience functions for administrating the database.
     """
     @overload # type: ignore[override]
-    def cursor(self) -> _Cursor:
+    def cursor(self) -> Cursor:
         ...
 
     @overload
-    def cursor(self, name: str) -> _Cursor:
+    def cursor(self, name: str) -> Cursor:
         ...
 
     @overload
-    def cursor(self, cursor_factory: Callable[..., T]) -> T:
+    def cursor(self, cursor_factory: Callable[..., T_cursor]) -> T_cursor:
         ...
 
-    def cursor(self, cursor_factory  = _Cursor, **kwargs): # type: ignore
+    def cursor(self, cursor_factory  = Cursor, **kwargs): # type: ignore
         """ Return a new cursor. By default the specialised cursor is returned.
         """
         return super().cursor(cursor_factory=cursor_factory, **kwargs)
@@ -132,7 +131,7 @@ class _Connection(psycopg2.extensions.connection):
                 return False
 
             if table is not None:
-                row = cur.fetchone() # type: ignore
+                row = cur.fetchone()
                 if row is None or not isinstance(row[0], str):
                     return False
                 return row[0] == table
@@ -142,7 +141,7 @@ class _Connection(psycopg2.extensions.connection):
 
     def drop_table(self, name: str, if_exists: bool = True, cascade: bool = False) -> None:
         """ Drop the table with the given name.
-            Set `if_exists` to False if a non-existant table should raise
+            Set `if_exists` to False if a non-existent table should raise
             an exception instead of just being ignored.
         """
         with self.cursor() as cur:
@@ -175,19 +174,31 @@ class _Connection(psycopg2.extensions.connection):
 
         return (int(version_parts[0]), int(version_parts[1]))
 
-class _ConnectionContext(ContextManager[_Connection]):
-    connection: _Connection
 
-def connect(dsn: str) -> _ConnectionContext:
+    def extension_loaded(self, extension_name: str) -> bool:
+        """ Return True if the hstore extension is loaded in the database.
+        """
+        with self.cursor() as cur:
+            cur.execute('SELECT extname FROM pg_extension WHERE extname = %s', (extension_name, ))
+            return cur.rowcount > 0
+
+
+class ConnectionContext(ContextManager[Connection]):
+    """ Context manager of the connection that also provides direct access
+        to the underlying connection.
+    """
+    connection: Connection
+
+def connect(dsn: str) -> ConnectionContext:
     """ Open a connection to the database using the specialised connection
         factory. The returned object may be used in conjunction with 'with'.
         When used outside a context manager, use the `connection` attribute
         to get the connection.
     """
     try:
-        conn = psycopg2.connect(dsn, connection_factory=_Connection)
-        ctxmgr = cast(_ConnectionContext, contextlib.closing(conn))
-        ctxmgr.connection = cast(_Connection, conn)
+        conn = psycopg2.connect(dsn, connection_factory=Connection)
+        ctxmgr = cast(ConnectionContext, contextlib.closing(conn))
+        ctxmgr.connection = conn
         return ctxmgr
     except psycopg2.OperationalError as err:
         raise UsageError(f"Cannot connect to database: {err}") from err
@@ -226,7 +237,7 @@ _PG_CONNECTION_STRINGS = {
 
 
 def get_pg_env(dsn: str,
-               base_env: Optional[Mapping[str, Optional[str]]] = None) -> Mapping[str, Optional[str]]:
+               base_env: Optional[SysEnv] = None) -> Dict[str, str]:
     """ Return a copy of `base_env` with the environment variables for
         PostgresSQL set up from the given database connection string.
         If `base_env` is None, then the OS environment is used as a base
@@ -234,7 +245,7 @@ def get_pg_env(dsn: str,
     """
     env = dict(base_env if base_env is not None else os.environ)
 
-    for param, value in psycopg2.extensions.parse_dsn(dsn).items(): # type: ignore
+    for param, value in psycopg2.extensions.parse_dsn(dsn).items():
         if param in _PG_CONNECTION_STRINGS:
             env[_PG_CONNECTION_STRINGS[param]] = value
         else: