""" Query execution that logs the SQL query when debugging is enabled.
"""
if LOG.isEnabledFor(logging.DEBUG):
- LOG.debug(self.mogrify(query, args).decode('utf-8')) # type: ignore
+ LOG.debug(self.mogrify(query, args).decode('utf-8')) # type: ignore[no-untyped-call]
super().execute(query, args)
if self.rowcount != 1:
raise RuntimeError("Query did not return a single row.")
- result = self.fetchone() # type: ignore
+ result = self.fetchone() # type: ignore[no-untyped-call]
assert result is not None
return result[0]
if cascade:
sql += ' CASCADE'
- self.execute(pysql.SQL(sql).format(pysql.Identifier(name))) # type: ignore
+ self.execute(pysql.SQL(sql).format(pysql.Identifier(name))) # type: ignore[no-untyped-call]
class Connection(psycopg2.extensions.connection):
return False
if table is not None:
- row = cur.fetchone() # type: ignore
+ row = cur.fetchone() # type: ignore[no-untyped-call]
if row is None or not isinstance(row[0], str):
return False
return row[0] == table
"""
Access and helper functions for the status and status log table.
"""
+from typing import Optional, Tuple, cast
import datetime as dt
import logging
import re
+from typing_extensions import TypedDict
+
+from nominatim.db.connection import Connection
from nominatim.tools.exec_utils import get_url
from nominatim.errors import UsageError
LOG = logging.getLogger()
ISODATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
-def compute_database_date(conn):
+
+class StatusRow(TypedDict):
+ """ Dictionary of columns of the import_status table.
+ """
+ lastimportdate: dt.datetime
+ sequence_id: Optional[int]
+ indexed: Optional[bool]
+
+
+def compute_database_date(conn: Connection) -> dt.datetime:
""" Determine the date of the database from the newest object in the
database.
"""
return dt.datetime.strptime(match.group(1), ISODATE_FORMAT).replace(tzinfo=dt.timezone.utc)
-def set_status(conn, date, seq=None, indexed=True):
+def set_status(conn: Connection, date: Optional[dt.datetime],
+ seq: Optional[int] = None, indexed: bool = True) -> None:
""" Replace the current status with the given status. If date is `None`
then only sequence and indexed will be updated as given. Otherwise
the whole status is replaced.
+ The change will be committed to the database.
"""
assert date is None or date.tzinfo == dt.timezone.utc
with conn.cursor() as cur:
conn.commit()
-def get_status(conn):
+def get_status(conn: Connection) -> Tuple[Optional[dt.datetime], Optional[int], Optional[bool]]:
""" Return the current status as a triple of (date, sequence, indexed).
If status has not been set up yet, a triple of None is returned.
"""
if cur.rowcount < 1:
return None, None, None
- row = cur.fetchone()
+ row = cast(StatusRow, cur.fetchone()) # type: ignore[no-untyped-call]
return row['lastimportdate'], row['sequence_id'], row['indexed']
-def set_indexed(conn, state):
+def set_indexed(conn: Connection, state: bool) -> None:
""" Set the indexed flag in the status table to the given state.
"""
with conn.cursor() as cur:
conn.commit()
-def log_status(conn, start, event, batchsize=None):
+def log_status(conn: Connection, start: dt.datetime,
+ event: str, batchsize: Optional[int] = None) -> None:
""" Write a new status line to the `import_osmosis_log` table.
"""
with conn.cursor() as cur:
(batchend, batchseq, batchsize, starttime, endtime, event)
SELECT lastimportdate, sequence_id, %s, %s, now(), %s FROM import_status""",
(batchsize, start, event))
+ conn.commit()