# For a full list of authors see the git log.
""" Non-blocking database connections.
"""
-from typing import Callable, Any, Optional, List, Iterator
+from typing import Callable, Any, Optional, Iterator, Sequence
import logging
import select
import time
except ImportError:
__has_psycopg2_errors__ = False
-from nominatim.typing import T_cursor
+from nominatim.typing import T_cursor, Query
LOG = logging.getLogger()
ignore_sql_errors: bool = False) -> None:
self.dsn = dsn
- self.current_query: Optional[str] = None
- self.current_params: Optional[List[Any]] = None
+ self.current_query: Optional[Query] = None
+ self.current_params: Optional[Sequence[Any]] = None
self.ignore_sql_errors = ignore_sql_errors
- self.conn: Optional['psycopg2.connection'] = None
- self.cursor: Optional['psycopg2.cursor'] = None
+ self.conn: Optional['psycopg2._psycopg.connection'] = None
+ self.cursor: Optional['psycopg2._psycopg.cursor'] = None
self.connect(cursor_factory=cursor_factory)
def close(self) -> None:
"""
if self.conn is not None:
if self.cursor is not None:
- self.cursor.close() # type: ignore[no-untyped-call]
+ self.cursor.close()
self.cursor = None
self.conn.close()
# Use a dict to hand in the parameters because async is a reserved
# word in Python3.
- self.conn = psycopg2.connect(**{'dsn': self.dsn, 'async': True})
+ self.conn = psycopg2.connect(**{'dsn': self.dsn, 'async': True}) # type: ignore
+ assert self.conn
self.wait()
if cursor_factory is not None:
self.current_query = None
return
- def perform(self, sql: str, args: Optional[List[Any]] = None) -> None:
+ def perform(self, sql: Query, args: Optional[Sequence[Any]] = None) -> None:
""" Send SQL query to the server. Returns immediately without
blocking.
"""