X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/7a1d22ff15849a349e3cffdf604877a11060c749..5011fde1769338135329004b91fbace6d7a2660b:/nominatim/db/async_connection.py diff --git a/nominatim/db/async_connection.py b/nominatim/db/async_connection.py index 1c550198..07767e48 100644 --- a/nominatim/db/async_connection.py +++ b/nominatim/db/async_connection.py @@ -6,7 +6,7 @@ # For a full list of authors see the git log. """ Non-blocking database connections. """ -from typing import Callable, Any, Optional, List, Iterator +from typing import Callable, Any, Optional, Iterator, Sequence import logging import select import time @@ -22,7 +22,7 @@ try: except ImportError: __has_psycopg2_errors__ = False -from nominatim.typing import T_cursor +from nominatim.typing import T_cursor, Query LOG = logging.getLogger() @@ -65,12 +65,12 @@ class DBConnection: ignore_sql_errors: bool = False) -> None: self.dsn = dsn - self.current_query: Optional[str] = None - self.current_params: Optional[List[Any]] = None + self.current_query: Optional[Query] = None + self.current_params: Optional[Sequence[Any]] = None self.ignore_sql_errors = ignore_sql_errors - self.conn: Optional['psycopg2.connection'] = None - self.cursor: Optional['psycopg2.cursor'] = None + self.conn: Optional['psycopg2._psycopg.connection'] = None + self.cursor: Optional['psycopg2._psycopg.cursor'] = None self.connect(cursor_factory=cursor_factory) def close(self) -> None: @@ -78,7 +78,7 @@ class DBConnection: """ if self.conn is not None: if self.cursor is not None: - self.cursor.close() # type: ignore[no-untyped-call] + self.cursor.close() self.cursor = None self.conn.close() @@ -94,7 +94,8 @@ class DBConnection: # Use a dict to hand in the parameters because async is a reserved # word in Python3. - self.conn = psycopg2.connect(**{'dsn': self.dsn, 'async': True}) + self.conn = psycopg2.connect(**{'dsn': self.dsn, 'async': True}) # type: ignore + assert self.conn self.wait() if cursor_factory is not None: @@ -128,7 +129,7 @@ class DBConnection: self.current_query = None return - def perform(self, sql: str, args: Optional[List[Any]] = None) -> None: + def perform(self, sql: Query, args: Optional[Sequence[Any]] = None) -> None: """ Send SQL query to the server. Returns immediately without blocking. """