]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/db/status.py
make DB helper functions free functions
[nominatim.git] / src / nominatim_db / db / status.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Access and helper functions for the status and status log table.
9 """
10 from typing import Optional, Tuple, cast
11 import datetime as dt
12 import logging
13 import re
14
15 from .connection import Connection, table_exists, execute_scalar
16 from ..utils.url_utils import get_url
17 from ..errors import UsageError
18 from ..typing import TypedDict
19
20 LOG = logging.getLogger()
21 ISODATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
22
23
24 class StatusRow(TypedDict):
25     """ Dictionary of columns of the import_status table.
26     """
27     lastimportdate: dt.datetime
28     sequence_id: Optional[int]
29     indexed: Optional[bool]
30
31
32 def compute_database_date(conn: Connection, offline: bool = False) -> dt.datetime:
33     """ Determine the date of the database from the newest object in the
34         data base.
35     """
36     # If there is a date from osm2pgsql available, use that.
37     if table_exists(conn, 'osm2pgsql_properties'):
38         with conn.cursor() as cur:
39             cur.execute(""" SELECT value FROM osm2pgsql_properties
40                             WHERE property = 'current_timestamp' """)
41             row = cur.fetchone()
42             if row is not None:
43                 return dt.datetime.strptime(row[0], "%Y-%m-%dT%H:%M:%SZ")\
44                                   .replace(tzinfo=dt.timezone.utc)
45
46     if offline:
47         raise UsageError("Cannot determine database date from data in offline mode.")
48
49     # Else, find the node with the highest ID in the database
50     if table_exists(conn, 'place'):
51         osmid = execute_scalar(conn, "SELECT max(osm_id) FROM place WHERE osm_type='N'")
52     else:
53         osmid = execute_scalar(conn, "SELECT max(osm_id) FROM placex WHERE osm_type='N'")
54
55     if osmid is None:
56         LOG.fatal("No data found in the database.")
57         raise UsageError("No data found in the database.")
58
59     LOG.info("Using node id %d for timestamp lookup", osmid)
60     # Get the node from the API to find the timestamp when it was created.
61     node_url = f'https://www.openstreetmap.org/api/0.6/node/{osmid}/1'
62     data = get_url(node_url)
63
64     match = re.search(r'timestamp="((\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2}))Z"', data)
65
66     if match is None:
67         LOG.fatal("The node data downloaded from the API does not contain valid data.\n"
68                   "URL used: %s", node_url)
69         raise UsageError("Bad API data.")
70
71     LOG.debug("Found timestamp %s", match.group(1))
72
73     return dt.datetime.strptime(match.group(1), ISODATE_FORMAT).replace(tzinfo=dt.timezone.utc)
74
75
76 def set_status(conn: Connection, date: Optional[dt.datetime],
77                seq: Optional[int] = None, indexed: bool = True) -> None:
78     """ Replace the current status with the given status. If date is `None`
79         then only sequence and indexed will be updated as given. Otherwise
80         the whole status is replaced.
81         The change will be committed to the database.
82     """
83     assert date is None or date.tzinfo == dt.timezone.utc
84     with conn.cursor() as cur:
85         if date is None:
86             cur.execute("UPDATE import_status set sequence_id = %s, indexed = %s",
87                         (seq, indexed))
88         else:
89             cur.execute("TRUNCATE TABLE import_status")
90             cur.execute("""INSERT INTO import_status (lastimportdate, sequence_id, indexed)
91                            VALUES (%s, %s, %s)""", (date, seq, indexed))
92
93     conn.commit()
94
95
96 def get_status(conn: Connection) -> Tuple[Optional[dt.datetime], Optional[int], Optional[bool]]:
97     """ Return the current status as a triple of (date, sequence, indexed).
98         If status has not been set up yet, a triple of None is returned.
99     """
100     with conn.cursor() as cur:
101         cur.execute("SELECT * FROM import_status LIMIT 1")
102         if cur.rowcount < 1:
103             return None, None, None
104
105         row = cast(StatusRow, cur.fetchone())
106         return row['lastimportdate'], row['sequence_id'], row['indexed']
107
108
109 def set_indexed(conn: Connection, state: bool) -> None:
110     """ Set the indexed flag in the status table to the given state.
111     """
112     with conn.cursor() as cur:
113         cur.execute("UPDATE import_status SET indexed = %s", (state, ))
114     conn.commit()
115
116
117 def log_status(conn: Connection, start: dt.datetime,
118                event: str, batchsize: Optional[int] = None) -> None:
119     """ Write a new status line to the `import_osmosis_log` table.
120     """
121     with conn.cursor() as cur:
122         cur.execute("""INSERT INTO import_osmosis_log
123                        (batchend, batchseq, batchsize, starttime, endtime, event)
124                        SELECT lastimportdate, sequence_id, %s, %s, now(), %s FROM import_status""",
125                     (batchsize, start, event))
126     conn.commit()