]> git.openstreetmap.org Git - nominatim.git/blob - test/python/db/test_status.py
adapt tests for windowing SQL
[nominatim.git] / test / python / db / test_status.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tests for status table manipulation.
9 """
10 import datetime as dt
11
12 import pytest
13
14 import nominatim.db.status
15 from nominatim.errors import UsageError
16
17 OSM_NODE_DATA = """\
18 <osm version="0.6" generator="OpenStreetMap server" copyright="OpenStreetMap and contributors" attribution="http://www.openstreetmap.org/copyright" license="http://opendatacommons.org/licenses/odbl/1-0/">
19 <node id="45673" visible="true" version="1" changeset="2047" timestamp="2006-01-27T22:09:10Z" user="Foo" uid="111" lat="48.7586670" lon="8.1343060">
20 </node>
21 </osm>
22 """
23
24 def iso_date(date):
25     return dt.datetime.strptime(date, nominatim.db.status.ISODATE_FORMAT)\
26                .replace(tzinfo=dt.timezone.utc)
27
28
29 @pytest.fixture(autouse=True)
30 def setup_status_table(status_table):
31     pass
32
33
34 @pytest.mark.parametrize('offline', [True, False])
35 def test_compute_database_date_from_osm2pgsql(table_factory, temp_db_conn, offline):
36     table_factory('osm2pgsql_properties', 'property TEXT, value TEXT',
37                   content=(('current_timestamp', '2024-01-03T23:45:54Z'), ))
38
39     date = nominatim.db.status.compute_database_date(temp_db_conn, offline=offline)
40     assert date == iso_date('2024-01-03T23:45:54')
41
42
43 def test_compute_database_date_from_osm2pgsql_nodata(table_factory, temp_db_conn):
44     table_factory('osm2pgsql_properties', 'property TEXT, value TEXT')
45
46     with pytest.raises(UsageError, match='Cannot determine database date from data in offline mode'):
47         nominatim.db.status.compute_database_date(temp_db_conn, offline=True)
48
49
50 def test_compute_database_date_place_empty(place_table, temp_db_conn):
51     with pytest.raises(UsageError):
52         nominatim.db.status.compute_database_date(temp_db_conn)
53
54
55 def test_compute_database_date_valid(monkeypatch, place_row, temp_db_conn):
56     place_row(osm_type='N', osm_id=45673)
57
58     requested_url = []
59     def mock_url(url):
60         requested_url.append(url)
61         return OSM_NODE_DATA
62
63     monkeypatch.setattr(nominatim.db.status, "get_url", mock_url)
64
65     date = nominatim.db.status.compute_database_date(temp_db_conn)
66
67     assert requested_url == ['https://www.openstreetmap.org/api/0.6/node/45673/1']
68     assert date == iso_date('2006-01-27T22:09:10')
69
70
71 def test_compute_database_broken_api(monkeypatch, place_row, temp_db_conn):
72     place_row(osm_type='N', osm_id=45673)
73
74     requested_url = []
75     def mock_url(url):
76         requested_url.append(url)
77         return '<osm version="0.6" generator="OpenStre'
78
79     monkeypatch.setattr(nominatim.db.status, "get_url", mock_url)
80
81     with pytest.raises(UsageError):
82         nominatim.db.status.compute_database_date(temp_db_conn)
83
84
85 def test_set_status_empty_table(temp_db_conn, temp_db_cursor):
86     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
87     nominatim.db.status.set_status(temp_db_conn, date=date)
88
89     assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
90              {(date, None, True)}
91
92
93 def test_set_status_filled_table(temp_db_conn, temp_db_cursor):
94     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
95     nominatim.db.status.set_status(temp_db_conn, date=date)
96
97     assert temp_db_cursor.table_rows('import_status') == 1
98
99     date = dt.datetime.fromordinal(1000100).replace(tzinfo=dt.timezone.utc)
100     nominatim.db.status.set_status(temp_db_conn, date=date, seq=456, indexed=False)
101
102     assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
103              {(date, 456, False)}
104
105
106 def test_set_status_missing_date(temp_db_conn, temp_db_cursor):
107     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
108     nominatim.db.status.set_status(temp_db_conn, date=date)
109
110     assert temp_db_cursor.table_rows('import_status') == 1
111
112     nominatim.db.status.set_status(temp_db_conn, date=None, seq=456, indexed=False)
113
114     assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
115              {(date, 456, False)}
116
117
118 def test_get_status_empty_table(temp_db_conn):
119     assert nominatim.db.status.get_status(temp_db_conn) == (None, None, None)
120
121
122 def test_get_status_success(temp_db_conn):
123     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
124     nominatim.db.status.set_status(temp_db_conn, date=date, seq=667, indexed=False)
125
126     assert nominatim.db.status.get_status(temp_db_conn) == \
127              (date, 667, False)
128
129
130 @pytest.mark.parametrize("old_state", [True, False])
131 @pytest.mark.parametrize("new_state", [True, False])
132 def test_set_indexed(temp_db_conn, temp_db_cursor, old_state, new_state):
133     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
134     nominatim.db.status.set_status(temp_db_conn, date=date, indexed=old_state)
135     nominatim.db.status.set_indexed(temp_db_conn, new_state)
136
137     assert temp_db_cursor.scalar("SELECT indexed FROM import_status") == new_state
138
139
140 def test_set_indexed_empty_status(temp_db_conn, temp_db_cursor):
141     nominatim.db.status.set_indexed(temp_db_conn, True)
142
143     assert temp_db_cursor.table_rows("import_status") == 0
144
145
146 def test_log_status(temp_db_conn, temp_db_cursor):
147     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
148     start = dt.datetime.now() - dt.timedelta(hours=1)
149
150     nominatim.db.status.set_status(temp_db_conn, date=date, seq=56)
151     nominatim.db.status.log_status(temp_db_conn, start, 'index')
152
153     temp_db_conn.commit()
154
155     assert temp_db_cursor.table_rows("import_osmosis_log") == 1
156     assert temp_db_cursor.scalar("SELECT batchseq FROM import_osmosis_log") == 56
157     assert temp_db_cursor.scalar("SELECT event FROM import_osmosis_log") == 'index'