]> git.openstreetmap.org Git - nominatim.git/blob - test/python/db/test_status.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / test / python / db / test_status.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tests for status table manipulation.
9 """
10 import datetime as dt
11
12 import pytest
13
14 import nominatim_db.db.status
15 from nominatim_db.errors import UsageError
16
17 OSM_NODE_DATA = """\
18 <osm version="0.6" generator="OpenStreetMap server" copyright="OpenStreetMap and contributors" attribution="http://www.openstreetmap.org/copyright" license="http://opendatacommons.org/licenses/odbl/1-0/">
19 <node id="45673" visible="true" version="1" changeset="2047" timestamp="2006-01-27T22:09:10Z" user="Foo" uid="111" lat="48.7586670" lon="8.1343060">
20 </node>
21 </osm>
22 """  # noqa
23
24
25 def iso_date(date):
26     return dt.datetime.strptime(date, nominatim_db.db.status.ISODATE_FORMAT)\
27                .replace(tzinfo=dt.timezone.utc)
28
29
30 @pytest.fixture(autouse=True)
31 def setup_status_table(status_table):
32     pass
33
34
35 @pytest.mark.parametrize('offline', [True, False])
36 def test_compute_database_date_from_osm2pgsql(table_factory, temp_db_conn, offline):
37     table_factory('osm2pgsql_properties', 'property TEXT, value TEXT',
38                   content=(('current_timestamp', '2024-01-03T23:45:54Z'), ))
39
40     date = nominatim_db.db.status.compute_database_date(temp_db_conn, offline=offline)
41     assert date == iso_date('2024-01-03T23:45:54')
42
43
44 def test_compute_database_date_from_osm2pgsql_nodata(table_factory, temp_db_conn):
45     table_factory('osm2pgsql_properties', 'property TEXT, value TEXT')
46
47     with pytest.raises(UsageError,
48                        match='Cannot determine database date from data in offline mode'):
49         nominatim_db.db.status.compute_database_date(temp_db_conn, offline=True)
50
51
52 def test_compute_database_date_place_empty(place_table, temp_db_conn):
53     with pytest.raises(UsageError):
54         nominatim_db.db.status.compute_database_date(temp_db_conn)
55
56
57 def test_compute_database_date_valid(monkeypatch, place_row, temp_db_conn):
58     place_row(osm_type='N', osm_id=45673)
59
60     requested_url = []
61
62     def mock_url(url):
63         requested_url.append(url)
64         return OSM_NODE_DATA
65
66     monkeypatch.setattr(nominatim_db.db.status, "get_url", mock_url)
67
68     date = nominatim_db.db.status.compute_database_date(temp_db_conn)
69
70     assert requested_url == ['https://www.openstreetmap.org/api/0.6/node/45673/1']
71     assert date == iso_date('2006-01-27T22:09:10')
72
73
74 def test_compute_database_broken_api(monkeypatch, place_row, temp_db_conn):
75     place_row(osm_type='N', osm_id=45673)
76
77     requested_url = []
78
79     def mock_url(url):
80         requested_url.append(url)
81         return '<osm version="0.6" generator="OpenStre'
82
83     monkeypatch.setattr(nominatim_db.db.status, "get_url", mock_url)
84
85     with pytest.raises(UsageError):
86         nominatim_db.db.status.compute_database_date(temp_db_conn)
87
88
89 def test_set_status_empty_table(temp_db_conn, temp_db_cursor):
90     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
91     nominatim_db.db.status.set_status(temp_db_conn, date=date)
92
93     assert temp_db_cursor.row_set("SELECT * FROM import_status") == {(date, None, True)}
94
95
96 def test_set_status_filled_table(temp_db_conn, temp_db_cursor):
97     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
98     nominatim_db.db.status.set_status(temp_db_conn, date=date)
99
100     assert temp_db_cursor.table_rows('import_status') == 1
101
102     date = dt.datetime.fromordinal(1000100).replace(tzinfo=dt.timezone.utc)
103     nominatim_db.db.status.set_status(temp_db_conn, date=date, seq=456, indexed=False)
104
105     assert temp_db_cursor.row_set("SELECT * FROM import_status") == {(date, 456, False)}
106
107
108 def test_set_status_missing_date(temp_db_conn, temp_db_cursor):
109     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
110     nominatim_db.db.status.set_status(temp_db_conn, date=date)
111
112     assert temp_db_cursor.table_rows('import_status') == 1
113
114     nominatim_db.db.status.set_status(temp_db_conn, date=None, seq=456, indexed=False)
115
116     assert temp_db_cursor.row_set("SELECT * FROM import_status") == {(date, 456, False)}
117
118
119 def test_get_status_empty_table(temp_db_conn):
120     assert nominatim_db.db.status.get_status(temp_db_conn) == (None, None, None)
121
122
123 def test_get_status_success(temp_db_conn):
124     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
125     nominatim_db.db.status.set_status(temp_db_conn, date=date, seq=667, indexed=False)
126
127     assert nominatim_db.db.status.get_status(temp_db_conn) == (date, 667, False)
128
129
130 @pytest.mark.parametrize("old_state", [True, False])
131 @pytest.mark.parametrize("new_state", [True, False])
132 def test_set_indexed(temp_db_conn, temp_db_cursor, old_state, new_state):
133     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
134     nominatim_db.db.status.set_status(temp_db_conn, date=date, indexed=old_state)
135     nominatim_db.db.status.set_indexed(temp_db_conn, new_state)
136
137     assert temp_db_cursor.scalar("SELECT indexed FROM import_status") == new_state
138
139
140 def test_set_indexed_empty_status(temp_db_conn, temp_db_cursor):
141     nominatim_db.db.status.set_indexed(temp_db_conn, True)
142
143     assert temp_db_cursor.table_rows("import_status") == 0
144
145
146 def test_log_status(temp_db_conn, temp_db_cursor):
147     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
148     start = dt.datetime.now() - dt.timedelta(hours=1)
149
150     nominatim_db.db.status.set_status(temp_db_conn, date=date, seq=56)
151     nominatim_db.db.status.log_status(temp_db_conn, start, 'index')
152
153     temp_db_conn.commit()
154
155     assert temp_db_cursor.table_rows("import_osmosis_log") == 1
156     assert temp_db_cursor.scalar("SELECT batchseq FROM import_osmosis_log") == 56
157     assert temp_db_cursor.scalar("SELECT event FROM import_osmosis_log") == 'index'