]> git.openstreetmap.org Git - nominatim.git/blobdiff - src/nominatim_db/db/status.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / src / nominatim_db / db / status.py
index 1278359cc02ebae2ac1513162a0b49a0ef37d211..4fe9f4449c1ce24e6af45feb34e8fad1bde9d9f8 100644 (file)
@@ -7,34 +7,25 @@
 """
 Access and helper functions for the status and status log table.
 """
-from typing import Optional, Tuple, cast
+from typing import Optional, Tuple
 import datetime as dt
 import logging
 import re
 
-from .connection import Connection
+from .connection import Connection, table_exists, execute_scalar
 from ..utils.url_utils import get_url
 from ..errors import UsageError
-from ..typing import TypedDict
 
 LOG = logging.getLogger()
 ISODATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
 
 
-class StatusRow(TypedDict):
-    """ Dictionary of columns of the import_status table.
-    """
-    lastimportdate: dt.datetime
-    sequence_id: Optional[int]
-    indexed: Optional[bool]
-
-
 def compute_database_date(conn: Connection, offline: bool = False) -> dt.datetime:
     """ Determine the date of the database from the newest object in the
         data base.
     """
     # If there is a date from osm2pgsql available, use that.
-    if conn.table_exists('osm2pgsql_properties'):
+    if table_exists(conn, 'osm2pgsql_properties'):
         with conn.cursor() as cur:
             cur.execute(""" SELECT value FROM osm2pgsql_properties
                             WHERE property = 'current_timestamp' """)
@@ -47,15 +38,14 @@ def compute_database_date(conn: Connection, offline: bool = False) -> dt.datetim
         raise UsageError("Cannot determine database date from data in offline mode.")
 
     # Else, find the node with the highest ID in the database
-    with conn.cursor() as cur:
-        if conn.table_exists('place'):
-            osmid = cur.scalar("SELECT max(osm_id) FROM place WHERE osm_type='N'")
-        else:
-            osmid = cur.scalar("SELECT max(osm_id) FROM placex WHERE osm_type='N'")
+    if table_exists(conn, 'place'):
+        osmid = execute_scalar(conn, "SELECT max(osm_id) FROM place WHERE osm_type='N'")
+    else:
+        osmid = execute_scalar(conn, "SELECT max(osm_id) FROM placex WHERE osm_type='N'")
 
-        if osmid is None:
-            LOG.fatal("No data found in the database.")
-            raise UsageError("No data found in the database.")
+    if osmid is None:
+        LOG.fatal("No data found in the database.")
+        raise UsageError("No data found in the database.")
 
     LOG.info("Using node id %d for timestamp lookup", osmid)
     # Get the node from the API to find the timestamp when it was created.
@@ -103,8 +93,9 @@ def get_status(conn: Connection) -> Tuple[Optional[dt.datetime], Optional[int],
         if cur.rowcount < 1:
             return None, None, None
 
-        row = cast(StatusRow, cur.fetchone())
-        return row['lastimportdate'], row['sequence_id'], row['indexed']
+        row = cur.fetchone()
+        assert row
+        return row.lastimportdate, row.sequence_id, row.indexed
 
 
 def set_indexed(conn: Connection, state: bool) -> None: