X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/c314a3092c5b51c7782015f6fa9ac093b46fa174..122ecd46269d48a8b2aacba2474311c0400d2a9d:/src/nominatim_db/db/status.py diff --git a/src/nominatim_db/db/status.py b/src/nominatim_db/db/status.py index 1278359c..4fe9f444 100644 --- a/src/nominatim_db/db/status.py +++ b/src/nominatim_db/db/status.py @@ -7,34 +7,25 @@ """ Access and helper functions for the status and status log table. """ -from typing import Optional, Tuple, cast +from typing import Optional, Tuple import datetime as dt import logging import re -from .connection import Connection +from .connection import Connection, table_exists, execute_scalar from ..utils.url_utils import get_url from ..errors import UsageError -from ..typing import TypedDict LOG = logging.getLogger() ISODATE_FORMAT = '%Y-%m-%dT%H:%M:%S' -class StatusRow(TypedDict): - """ Dictionary of columns of the import_status table. - """ - lastimportdate: dt.datetime - sequence_id: Optional[int] - indexed: Optional[bool] - - def compute_database_date(conn: Connection, offline: bool = False) -> dt.datetime: """ Determine the date of the database from the newest object in the data base. """ # If there is a date from osm2pgsql available, use that. - if conn.table_exists('osm2pgsql_properties'): + if table_exists(conn, 'osm2pgsql_properties'): with conn.cursor() as cur: cur.execute(""" SELECT value FROM osm2pgsql_properties WHERE property = 'current_timestamp' """) @@ -47,15 +38,14 @@ def compute_database_date(conn: Connection, offline: bool = False) -> dt.datetim raise UsageError("Cannot determine database date from data in offline mode.") # Else, find the node with the highest ID in the database - with conn.cursor() as cur: - if conn.table_exists('place'): - osmid = cur.scalar("SELECT max(osm_id) FROM place WHERE osm_type='N'") - else: - osmid = cur.scalar("SELECT max(osm_id) FROM placex WHERE osm_type='N'") + if table_exists(conn, 'place'): + osmid = execute_scalar(conn, "SELECT max(osm_id) FROM place WHERE osm_type='N'") + else: + osmid = execute_scalar(conn, "SELECT max(osm_id) FROM placex WHERE osm_type='N'") - if osmid is None: - LOG.fatal("No data found in the database.") - raise UsageError("No data found in the database.") + if osmid is None: + LOG.fatal("No data found in the database.") + raise UsageError("No data found in the database.") LOG.info("Using node id %d for timestamp lookup", osmid) # Get the node from the API to find the timestamp when it was created. @@ -103,8 +93,9 @@ def get_status(conn: Connection) -> Tuple[Optional[dt.datetime], Optional[int], if cur.rowcount < 1: return None, None, None - row = cast(StatusRow, cur.fetchone()) - return row['lastimportdate'], row['sequence_id'], row['indexed'] + row = cur.fetchone() + assert row + return row.lastimportdate, row.sequence_id, row.indexed def set_indexed(conn: Connection, state: bool) -> None: