]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/db/status.py
add tests for new data invalidation functions
[nominatim.git] / nominatim / db / status.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Access and helper functions for the status and status log table.
9 """
10 import datetime as dt
11 import logging
12 import re
13
14 from nominatim.tools.exec_utils import get_url
15 from nominatim.errors import UsageError
16
17 LOG = logging.getLogger()
18 ISODATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
19
20 def compute_database_date(conn):
21     """ Determine the date of the database from the newest object in the
22         data base.
23     """
24     # First, find the node with the highest ID in the database
25     with conn.cursor() as cur:
26         if conn.table_exists('place'):
27             osmid = cur.scalar("SELECT max(osm_id) FROM place WHERE osm_type='N'")
28         else:
29             osmid = cur.scalar("SELECT max(osm_id) FROM placex WHERE osm_type='N'")
30
31         if osmid is None:
32             LOG.fatal("No data found in the database.")
33             raise UsageError("No data found in the database.")
34
35     LOG.info("Using node id %d for timestamp lookup", osmid)
36     # Get the node from the API to find the timestamp when it was created.
37     node_url = 'https://www.openstreetmap.org/api/0.6/node/{}/1'.format(osmid)
38     data = get_url(node_url)
39
40     match = re.search(r'timestamp="((\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2}))Z"', data)
41
42     if match is None:
43         LOG.fatal("The node data downloaded from the API does not contain valid data.\n"
44                   "URL used: %s", node_url)
45         raise UsageError("Bad API data.")
46
47     LOG.debug("Found timestamp %s", match.group(1))
48
49     return dt.datetime.strptime(match.group(1), ISODATE_FORMAT).replace(tzinfo=dt.timezone.utc)
50
51
52 def set_status(conn, date, seq=None, indexed=True):
53     """ Replace the current status with the given status. If date is `None`
54         then only sequence and indexed will be updated as given. Otherwise
55         the whole status is replaced.
56     """
57     assert date is None or date.tzinfo == dt.timezone.utc
58     with conn.cursor() as cur:
59         if date is None:
60             cur.execute("UPDATE import_status set sequence_id = %s, indexed = %s",
61                         (seq, indexed))
62         else:
63             cur.execute("TRUNCATE TABLE import_status")
64             cur.execute("""INSERT INTO import_status (lastimportdate, sequence_id, indexed)
65                            VALUES (%s, %s, %s)""", (date, seq, indexed))
66
67     conn.commit()
68
69
70 def get_status(conn):
71     """ Return the current status as a triple of (date, sequence, indexed).
72         If status has not been set up yet, a triple of None is returned.
73     """
74     with conn.cursor() as cur:
75         cur.execute("SELECT * FROM import_status LIMIT 1")
76         if cur.rowcount < 1:
77             return None, None, None
78
79         row = cur.fetchone()
80         return row['lastimportdate'], row['sequence_id'], row['indexed']
81
82
83 def set_indexed(conn, state):
84     """ Set the indexed flag in the status table to the given state.
85     """
86     with conn.cursor() as cur:
87         cur.execute("UPDATE import_status SET indexed = %s", (state, ))
88     conn.commit()
89
90
91 def log_status(conn, start, event, batchsize=None):
92     """ Write a new status line to the `import_osmosis_log` table.
93     """
94     with conn.cursor() as cur:
95         cur.execute("""INSERT INTO import_osmosis_log
96                        (batchend, batchseq, batchsize, starttime, endtime, event)
97                        SELECT lastimportdate, sequence_id, %s, %s, now(), %s FROM import_status""",
98                     (batchsize, start, event))