"""
Access and helper functions for the status and status log table.
"""
-from typing import Optional, Tuple, cast
+from typing import Optional, Tuple
import datetime as dt
import logging
import re
-from .connection import Connection
+from .connection import Connection, table_exists, execute_scalar
from ..utils.url_utils import get_url
from ..errors import UsageError
-from ..typing import TypedDict
LOG = logging.getLogger()
ISODATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
-class StatusRow(TypedDict):
- """ Dictionary of columns of the import_status table.
- """
- lastimportdate: dt.datetime
- sequence_id: Optional[int]
- indexed: Optional[bool]
-
-
def compute_database_date(conn: Connection, offline: bool = False) -> dt.datetime:
""" Determine the date of the database from the newest object in the
database.
"""
# If there is a date from osm2pgsql available, use that.
- if conn.table_exists('osm2pgsql_properties'):
+ if table_exists(conn, 'osm2pgsql_properties'):
with conn.cursor() as cur:
cur.execute(""" SELECT value FROM osm2pgsql_properties
WHERE property = 'current_timestamp' """)
raise UsageError("Cannot determine database date from data in offline mode.")
# Else, find the node with the highest ID in the database
- with conn.cursor() as cur:
- if conn.table_exists('place'):
- osmid = cur.scalar("SELECT max(osm_id) FROM place WHERE osm_type='N'")
- else:
- osmid = cur.scalar("SELECT max(osm_id) FROM placex WHERE osm_type='N'")
+ if table_exists(conn, 'place'):
+ osmid = execute_scalar(conn, "SELECT max(osm_id) FROM place WHERE osm_type='N'")
+ else:
+ osmid = execute_scalar(conn, "SELECT max(osm_id) FROM placex WHERE osm_type='N'")
- if osmid is None:
- LOG.fatal("No data found in the database.")
- raise UsageError("No data found in the database.")
+ if osmid is None:
+ LOG.fatal("No data found in the database.")
+ raise UsageError("No data found in the database.")
LOG.info("Using node id %d for timestamp lookup", osmid)
# Get the node from the API to find the timestamp when it was created.
if cur.rowcount < 1:
return None, None, None
- row = cast(StatusRow, cur.fetchone())
- return row['lastimportdate'], row['sequence_id'], row['indexed']
+ row = cur.fetchone()
+ assert row
+ return row.lastimportdate, row.sequence_id, row.indexed
def set_indexed(conn: Connection, state: bool) -> None: