]> git.openstreetmap.org Git - nominatim.git/blobdiff - nominatim/db/status.py
add type annotations to special phrase importer
[nominatim.git] / nominatim / db / status.py
index 71e587875cd4db6c633d98b44118d6bed3202112..f0039546474244d97e419281d6b9e365cdb5c3c8 100644 (file)
@@ -1,17 +1,36 @@
+# SPDX-License-Identifier: GPL-2.0-only
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2022 by the Nominatim developer community.
+# For a full list of authors see the git log.
 """
 Access and helper functions for the status and status log table.
 """
 """
 Access and helper functions for the status and status log table.
 """
+from typing import Optional, Tuple, cast
 import datetime as dt
 import logging
 import re
 
 import datetime as dt
 import logging
 import re
 
+from typing_extensions import TypedDict
+
+from nominatim.db.connection import Connection
 from nominatim.tools.exec_utils import get_url
 from nominatim.errors import UsageError
 
 LOG = logging.getLogger()
 ISODATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
 
 from nominatim.tools.exec_utils import get_url
 from nominatim.errors import UsageError
 
 LOG = logging.getLogger()
 ISODATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
 
-def compute_database_date(conn):
+
+class StatusRow(TypedDict):
+    """ Dictionary of columns of the import_status table.
+    """
+    lastimportdate: dt.datetime
+    sequence_id: Optional[int]
+    indexed: Optional[bool]
+
+
+def compute_database_date(conn: Connection) -> dt.datetime:
     """ Determine the date of the database from the newest object in the
         data base.
     """
     """ Determine the date of the database from the newest object in the
         data base.
     """
@@ -28,7 +47,7 @@ def compute_database_date(conn):
 
     LOG.info("Using node id %d for timestamp lookup", osmid)
     # Get the node from the API to find the timestamp when it was created.
 
     LOG.info("Using node id %d for timestamp lookup", osmid)
     # Get the node from the API to find the timestamp when it was created.
-    node_url = 'https://www.openstreetmap.org/api/0.6/node/{}/1'.format(osmid)
+    node_url = f'https://www.openstreetmap.org/api/0.6/node/{osmid}/1'
     data = get_url(node_url)
 
     match = re.search(r'timestamp="((\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2}))Z"', data)
     data = get_url(node_url)
 
     match = re.search(r'timestamp="((\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2}))Z"', data)
@@ -43,10 +62,12 @@ def compute_database_date(conn):
     return dt.datetime.strptime(match.group(1), ISODATE_FORMAT).replace(tzinfo=dt.timezone.utc)
 
 
     return dt.datetime.strptime(match.group(1), ISODATE_FORMAT).replace(tzinfo=dt.timezone.utc)
 
 
-def set_status(conn, date, seq=None, indexed=True):
+def set_status(conn: Connection, date: Optional[dt.datetime],
+               seq: Optional[int] = None, indexed: bool = True) -> None:
     """ Replace the current status with the given status. If date is `None`
         then only sequence and indexed will be updated as given. Otherwise
         the whole status is replaced.
     """ Replace the current status with the given status. If date is `None`
         then only sequence and indexed will be updated as given. Otherwise
         the whole status is replaced.
+        The change will be committed to the database.
     """
     assert date is None or date.tzinfo == dt.timezone.utc
     with conn.cursor() as cur:
     """
     assert date is None or date.tzinfo == dt.timezone.utc
     with conn.cursor() as cur:
@@ -61,7 +82,7 @@ def set_status(conn, date, seq=None, indexed=True):
     conn.commit()
 
 
     conn.commit()
 
 
-def get_status(conn):
+def get_status(conn: Connection) -> Tuple[Optional[dt.datetime], Optional[int], Optional[bool]]:
     """ Return the current status as a triple of (date, sequence, indexed).
         If status has not been set up yet, a triple of None is returned.
     """
     """ Return the current status as a triple of (date, sequence, indexed).
         If status has not been set up yet, a triple of None is returned.
     """
@@ -70,11 +91,11 @@ def get_status(conn):
         if cur.rowcount < 1:
             return None, None, None
 
         if cur.rowcount < 1:
             return None, None, None
 
-        row = cur.fetchone()
+        row = cast(StatusRow, cur.fetchone()) # type: ignore[no-untyped-call]
         return row['lastimportdate'], row['sequence_id'], row['indexed']
 
 
         return row['lastimportdate'], row['sequence_id'], row['indexed']
 
 
-def set_indexed(conn, state):
+def set_indexed(conn: Connection, state: bool) -> None:
     """ Set the indexed flag in the status table to the given state.
     """
     with conn.cursor() as cur:
     """ Set the indexed flag in the status table to the given state.
     """
     with conn.cursor() as cur:
@@ -82,7 +103,8 @@ def set_indexed(conn, state):
     conn.commit()
 
 
     conn.commit()
 
 
-def log_status(conn, start, event, batchsize=None):
+def log_status(conn: Connection, start: dt.datetime,
+               event: str, batchsize: Optional[int] = None) -> None:
     """ Write a new status line to the `import_osmosis_log` table.
     """
     with conn.cursor() as cur:
     """ Write a new status line to the `import_osmosis_log` table.
     """
     with conn.cursor() as cur:
@@ -90,3 +112,4 @@ def log_status(conn, start, event, batchsize=None):
                        (batchend, batchseq, batchsize, starttime, endtime, event)
                        SELECT lastimportdate, sequence_id, %s, %s, now(), %s FROM import_status""",
                     (batchsize, start, event))
                        (batchend, batchseq, batchsize, starttime, endtime, event)
                        SELECT lastimportdate, sequence_id, %s, %s, now(), %s FROM import_status""",
                     (batchsize, start, event))
+    conn.commit()