]> git.openstreetmap.org Git - nominatim.git/blob - test/python/test_db_status.py
port replication update function to python
[nominatim.git] / test / python / test_db_status.py
1 """
2 Tests for status table manipulation.
3 """
4 import datetime as dt
5
6 import pytest
7
8 import nominatim.db.status
9
10 def test_compute_database_date_place_empty(status_table, place_table, temp_db_conn):
11     with pytest.raises(RuntimeError):
12         nominatim.db.status.compute_database_date(temp_db_conn)
13
14 OSM_NODE_DATA = """\
15 <osm version="0.6" generator="OpenStreetMap server" copyright="OpenStreetMap and contributors" attribution="http://www.openstreetmap.org/copyright" license="http://opendatacommons.org/licenses/odbl/1-0/">
16 <node id="45673" visible="true" version="1" changeset="2047" timestamp="2006-01-27T22:09:10Z" user="Foo" uid="111" lat="48.7586670" lon="8.1343060">
17 </node>
18 </osm>
19 """
20
21 def test_compute_database_date_valid(monkeypatch, status_table, place_row, temp_db_conn):
22     place_row(osm_type='N', osm_id=45673)
23
24     requested_url = []
25     def mock_url(url):
26         requested_url.append(url)
27         return OSM_NODE_DATA
28
29     monkeypatch.setattr(nominatim.db.status, "get_url", mock_url)
30
31     date = nominatim.db.status.compute_database_date(temp_db_conn)
32
33     assert requested_url == ['https://www.openstreetmap.org/api/0.6/node/45673/1']
34     assert date == dt.datetime.fromisoformat('2006-01-27T22:09:10').replace(tzinfo=dt.timezone.utc)
35
36
37 def test_compute_database_broken_api(monkeypatch, status_table, place_row, temp_db_conn):
38     place_row(osm_type='N', osm_id=45673)
39
40     requested_url = []
41     def mock_url(url):
42         requested_url.append(url)
43         return '<osm version="0.6" generator="OpenStre'
44
45     monkeypatch.setattr(nominatim.db.status, "get_url", mock_url)
46
47     with pytest.raises(RuntimeError):
48         date = nominatim.db.status.compute_database_date(temp_db_conn)
49
50
51 def test_set_status_empty_table(status_table, temp_db_conn, temp_db_cursor):
52     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
53     nominatim.db.status.set_status(temp_db_conn, date=date)
54
55     temp_db_cursor.execute("SELECT * FROM import_status")
56
57     assert temp_db_cursor.rowcount == 1
58     assert temp_db_cursor.fetchone() == [date, None, True]
59
60
61 def test_set_status_filled_table(status_table, temp_db_conn, temp_db_cursor):
62     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
63     nominatim.db.status.set_status(temp_db_conn, date=date)
64
65     assert 1 == temp_db_cursor.scalar("SELECT count(*) FROM import_status")
66
67
68     date = dt.datetime.fromordinal(1000100).replace(tzinfo=dt.timezone.utc)
69     nominatim.db.status.set_status(temp_db_conn, date=date, seq=456, indexed=False)
70
71     temp_db_cursor.execute("SELECT * FROM import_status")
72
73     assert temp_db_cursor.rowcount == 1
74     assert temp_db_cursor.fetchone() == [date, 456, False]
75
76
77 def test_get_status_empty_table(status_table, temp_db_conn):
78     assert nominatim.db.status.get_status(temp_db_conn) == (None, None, None)
79
80
81 def test_get_status_success(status_table, temp_db_conn):
82     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
83     nominatim.db.status.set_status(temp_db_conn, date=date, seq=667, indexed=False)
84
85     assert nominatim.db.status.get_status(temp_db_conn) == \
86              (date, 667, False)
87
88
89 @pytest.mark.parametrize("old_state", [True, False])
90 @pytest.mark.parametrize("new_state", [True, False])
91 def test_set_indexed(status_table, temp_db_conn, temp_db_cursor, old_state, new_state):
92     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
93     nominatim.db.status.set_status(temp_db_conn, date=date, indexed=old_state)
94     nominatim.db.status.set_indexed(temp_db_conn, new_state)
95
96     assert temp_db_cursor.scalar("SELECT indexed FROM import_status") == new_state
97
98
99 def test_set_indexed_empty_status(status_table, temp_db_conn, temp_db_cursor):
100     nominatim.db.status.set_indexed(temp_db_conn, True)
101
102     assert temp_db_cursor.scalar("SELECT count(*) FROM import_status") == 0
103
104
105 def text_log_status(status_table, temp_db_conn):
106     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
107     start = dt.datetime.now() - dt.timedelta(hours=1)
108     nominatim.db.status.set_status(temp_db_conn, date=date, seq=56)
109     nominatim.db.status.log_status(temp_db_conn, start, 'index')
110
111     assert temp_db_cursor.scalar("SELECT count(*) FROM import_osmosis_log") == 1
112     assert temp_db_cursor.scalar("SELECT seq FROM import_osmosis_log") == 56
113     assert temp_db_cursor.scalar("SELECT date FROM import_osmosis_log") == date