]> git.openstreetmap.org Git - nominatim.git/blobdiff - nominatim/db/status.py
Merge pull request #3447 from lonvia/avoid-invalidating-large-features
[nominatim.git] / nominatim / db / status.py
index f0039546474244d97e419281d6b9e365cdb5c3c8..5f92d9599ce2577ffcc5373256d5b85c17b06529 100644 (file)
@@ -12,11 +12,10 @@ import datetime as dt
 import logging
 import re
 
 import logging
 import re
 
-from typing_extensions import TypedDict
-
 from nominatim.db.connection import Connection
 from nominatim.tools.exec_utils import get_url
 from nominatim.errors import UsageError
 from nominatim.db.connection import Connection
 from nominatim.tools.exec_utils import get_url
 from nominatim.errors import UsageError
+from nominatim.typing import TypedDict
 
 LOG = logging.getLogger()
 ISODATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
 
 LOG = logging.getLogger()
 ISODATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
@@ -30,11 +29,24 @@ class StatusRow(TypedDict):
     indexed: Optional[bool]
 
 
     indexed: Optional[bool]
 
 
-def compute_database_date(conn: Connection) -> dt.datetime:
+def compute_database_date(conn: Connection, offline: bool = False) -> dt.datetime:
     """ Determine the date of the database from the newest object in the
         data base.
     """
     """ Determine the date of the database from the newest object in the
         data base.
     """
-    # First, find the node with the highest ID in the database
+    # If there is a date from osm2pgsql available, use that.
+    if conn.table_exists('osm2pgsql_properties'):
+        with conn.cursor() as cur:
+            cur.execute(""" SELECT value FROM osm2pgsql_properties
+                            WHERE property = 'current_timestamp' """)
+            row = cur.fetchone()
+            if row is not None:
+                return dt.datetime.strptime(row[0], "%Y-%m-%dT%H:%M:%SZ")\
+                                  .replace(tzinfo=dt.timezone.utc)
+
+    if offline:
+        raise UsageError("Cannot determine database date from data in offline mode.")
+
+    # Else, find the node with the highest ID in the database
     with conn.cursor() as cur:
         if conn.table_exists('place'):
             osmid = cur.scalar("SELECT max(osm_id) FROM place WHERE osm_type='N'")
     with conn.cursor() as cur:
         if conn.table_exists('place'):
             osmid = cur.scalar("SELECT max(osm_id) FROM place WHERE osm_type='N'")
@@ -91,7 +103,7 @@ def get_status(conn: Connection) -> Tuple[Optional[dt.datetime], Optional[int],
         if cur.rowcount < 1:
             return None, None, None
 
         if cur.rowcount < 1:
             return None, None, None
 
-        row = cast(StatusRow, cur.fetchone()) # type: ignore[no-untyped-call]
+        row = cast(StatusRow, cur.fetchone())
         return row['lastimportdate'], row['sequence_id'], row['indexed']
 
 
         return row['lastimportdate'], row['sequence_id'], row['indexed']