]> git.openstreetmap.org Git - nominatim.git/blobdiff - nominatim/db/status.py
Merge pull request #3353 from mtmail/add-codespell
[nominatim.git] / nominatim / db / status.py
index 12b24a837d965e07006256cf655248567541cdde..2c01de71466dca8fb237461509062c5c8c3e248a 100644 (file)
@@ -7,17 +7,29 @@
 """
 Access and helper functions for the status and status log table.
 """
 """
 Access and helper functions for the status and status log table.
 """
+from typing import Optional, Tuple, cast
 import datetime as dt
 import logging
 import re
 
 import datetime as dt
 import logging
 import re
 
+from nominatim.db.connection import Connection
 from nominatim.tools.exec_utils import get_url
 from nominatim.errors import UsageError
 from nominatim.tools.exec_utils import get_url
 from nominatim.errors import UsageError
+from nominatim.typing import TypedDict
 
 LOG = logging.getLogger()
 ISODATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
 
 
 LOG = logging.getLogger()
 ISODATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
 
-def compute_database_date(conn):
+
+class StatusRow(TypedDict):
+    """ Dictionary of columns of the import_status table.
+    """
+    lastimportdate: dt.datetime
+    sequence_id: Optional[int]
+    indexed: Optional[bool]
+
+
+def compute_database_date(conn: Connection) -> dt.datetime:
     """ Determine the date of the database from the newest object in the
         data base.
     """
     """ Determine the date of the database from the newest object in the
         data base.
     """
@@ -34,7 +46,7 @@ def compute_database_date(conn):
 
     LOG.info("Using node id %d for timestamp lookup", osmid)
     # Get the node from the API to find the timestamp when it was created.
 
     LOG.info("Using node id %d for timestamp lookup", osmid)
     # Get the node from the API to find the timestamp when it was created.
-    node_url = 'https://www.openstreetmap.org/api/0.6/node/{}/1'.format(osmid)
+    node_url = f'https://www.openstreetmap.org/api/0.6/node/{osmid}/1'
     data = get_url(node_url)
 
     match = re.search(r'timestamp="((\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2}))Z"', data)
     data = get_url(node_url)
 
     match = re.search(r'timestamp="((\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2}))Z"', data)
@@ -49,10 +61,12 @@ def compute_database_date(conn):
     return dt.datetime.strptime(match.group(1), ISODATE_FORMAT).replace(tzinfo=dt.timezone.utc)
 
 
     return dt.datetime.strptime(match.group(1), ISODATE_FORMAT).replace(tzinfo=dt.timezone.utc)
 
 
-def set_status(conn, date, seq=None, indexed=True):
+def set_status(conn: Connection, date: Optional[dt.datetime],
+               seq: Optional[int] = None, indexed: bool = True) -> None:
     """ Replace the current status with the given status. If date is `None`
         then only sequence and indexed will be updated as given. Otherwise
         the whole status is replaced.
     """ Replace the current status with the given status. If date is `None`
         then only sequence and indexed will be updated as given. Otherwise
         the whole status is replaced.
+        The change will be committed to the database.
     """
     assert date is None or date.tzinfo == dt.timezone.utc
     with conn.cursor() as cur:
     """
     assert date is None or date.tzinfo == dt.timezone.utc
     with conn.cursor() as cur:
@@ -67,7 +81,7 @@ def set_status(conn, date, seq=None, indexed=True):
     conn.commit()
 
 
     conn.commit()
 
 
-def get_status(conn):
+def get_status(conn: Connection) -> Tuple[Optional[dt.datetime], Optional[int], Optional[bool]]:
     """ Return the current status as a triple of (date, sequence, indexed).
         If status has not been set up yet, a triple of None is returned.
     """
     """ Return the current status as a triple of (date, sequence, indexed).
         If status has not been set up yet, a triple of None is returned.
     """
@@ -76,11 +90,11 @@ def get_status(conn):
         if cur.rowcount < 1:
             return None, None, None
 
         if cur.rowcount < 1:
             return None, None, None
 
-        row = cur.fetchone()
+        row = cast(StatusRow, cur.fetchone())
         return row['lastimportdate'], row['sequence_id'], row['indexed']
 
 
         return row['lastimportdate'], row['sequence_id'], row['indexed']
 
 
-def set_indexed(conn, state):
+def set_indexed(conn: Connection, state: bool) -> None:
     """ Set the indexed flag in the status table to the given state.
     """
     with conn.cursor() as cur:
     """ Set the indexed flag in the status table to the given state.
     """
     with conn.cursor() as cur:
@@ -88,7 +102,8 @@ def set_indexed(conn, state):
     conn.commit()
 
 
     conn.commit()
 
 
-def log_status(conn, start, event, batchsize=None):
+def log_status(conn: Connection, start: dt.datetime,
+               event: str, batchsize: Optional[int] = None) -> None:
     """ Write a new status line to the `import_osmosis_log` table.
     """
     with conn.cursor() as cur:
     """ Write a new status line to the `import_osmosis_log` table.
     """
     with conn.cursor() as cur:
@@ -96,3 +111,4 @@ def log_status(conn, start, event, batchsize=None):
                        (batchend, batchseq, batchsize, starttime, endtime, event)
                        SELECT lastimportdate, sequence_id, %s, %s, now(), %s FROM import_status""",
                     (batchsize, start, event))
                        (batchend, batchseq, batchsize, starttime, endtime, event)
                        SELECT lastimportdate, sequence_id, %s, %s, now(), %s FROM import_status""",
                     (batchsize, start, event))
+    conn.commit()