]> git.openstreetmap.org Git - nominatim.git/blob - test/python/test_db_status.py
US TIGER data 2021 released
[nominatim.git] / test / python / test_db_status.py
1 """
2 Tests for status table manipulation.
3 """
4 import datetime as dt
5
6 import pytest
7
8 import nominatim.db.status
9 from nominatim.errors import UsageError
10
11 OSM_NODE_DATA = """\
12 <osm version="0.6" generator="OpenStreetMap server" copyright="OpenStreetMap and contributors" attribution="http://www.openstreetmap.org/copyright" license="http://opendatacommons.org/licenses/odbl/1-0/">
13 <node id="45673" visible="true" version="1" changeset="2047" timestamp="2006-01-27T22:09:10Z" user="Foo" uid="111" lat="48.7586670" lon="8.1343060">
14 </node>
15 </osm>
16 """
17
18 def iso_date(date):
19     return dt.datetime.strptime(date, nominatim.db.status.ISODATE_FORMAT)\
20                .replace(tzinfo=dt.timezone.utc)
21
22
23 @pytest.fixture(autouse=True)
24 def setup_status_table(status_table):
25     pass
26
27
28 def test_compute_database_date_place_empty(place_table, temp_db_conn):
29     with pytest.raises(UsageError):
30         nominatim.db.status.compute_database_date(temp_db_conn)
31
32
33 def test_compute_database_date_valid(monkeypatch, place_row, temp_db_conn):
34     place_row(osm_type='N', osm_id=45673)
35
36     requested_url = []
37     def mock_url(url):
38         requested_url.append(url)
39         return OSM_NODE_DATA
40
41     monkeypatch.setattr(nominatim.db.status, "get_url", mock_url)
42
43     date = nominatim.db.status.compute_database_date(temp_db_conn)
44
45     assert requested_url == ['https://www.openstreetmap.org/api/0.6/node/45673/1']
46     assert date == iso_date('2006-01-27T22:09:10')
47
48
49 def test_compute_database_broken_api(monkeypatch, place_row, temp_db_conn):
50     place_row(osm_type='N', osm_id=45673)
51
52     requested_url = []
53     def mock_url(url):
54         requested_url.append(url)
55         return '<osm version="0.6" generator="OpenStre'
56
57     monkeypatch.setattr(nominatim.db.status, "get_url", mock_url)
58
59     with pytest.raises(UsageError):
60         nominatim.db.status.compute_database_date(temp_db_conn)
61
62
63 def test_set_status_empty_table(temp_db_conn, temp_db_cursor):
64     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
65     nominatim.db.status.set_status(temp_db_conn, date=date)
66
67     assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
68              {(date, None, True)}
69
70
71 def test_set_status_filled_table(temp_db_conn, temp_db_cursor):
72     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
73     nominatim.db.status.set_status(temp_db_conn, date=date)
74
75     assert temp_db_cursor.table_rows('import_status') == 1
76
77     date = dt.datetime.fromordinal(1000100).replace(tzinfo=dt.timezone.utc)
78     nominatim.db.status.set_status(temp_db_conn, date=date, seq=456, indexed=False)
79
80     assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
81              {(date, 456, False)}
82
83
84 def test_set_status_missing_date(temp_db_conn, temp_db_cursor):
85     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
86     nominatim.db.status.set_status(temp_db_conn, date=date)
87
88     assert temp_db_cursor.table_rows('import_status') == 1
89
90     nominatim.db.status.set_status(temp_db_conn, date=None, seq=456, indexed=False)
91
92     assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
93              {(date, 456, False)}
94
95
96 def test_get_status_empty_table(temp_db_conn):
97     assert nominatim.db.status.get_status(temp_db_conn) == (None, None, None)
98
99
100 def test_get_status_success(temp_db_conn):
101     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
102     nominatim.db.status.set_status(temp_db_conn, date=date, seq=667, indexed=False)
103
104     assert nominatim.db.status.get_status(temp_db_conn) == \
105              (date, 667, False)
106
107
108 @pytest.mark.parametrize("old_state", [True, False])
109 @pytest.mark.parametrize("new_state", [True, False])
110 def test_set_indexed(temp_db_conn, temp_db_cursor, old_state, new_state):
111     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
112     nominatim.db.status.set_status(temp_db_conn, date=date, indexed=old_state)
113     nominatim.db.status.set_indexed(temp_db_conn, new_state)
114
115     assert temp_db_cursor.scalar("SELECT indexed FROM import_status") == new_state
116
117
118 def test_set_indexed_empty_status(temp_db_conn, temp_db_cursor):
119     nominatim.db.status.set_indexed(temp_db_conn, True)
120
121     assert temp_db_cursor.table_rows("import_status") == 0
122
123
124 def test_log_status(temp_db_conn, temp_db_cursor):
125     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
126     start = dt.datetime.now() - dt.timedelta(hours=1)
127
128     nominatim.db.status.set_status(temp_db_conn, date=date, seq=56)
129     nominatim.db.status.log_status(temp_db_conn, start, 'index')
130
131     temp_db_conn.commit()
132
133     assert temp_db_cursor.table_rows("import_osmosis_log") == 1
134     assert temp_db_cursor.scalar("SELECT batchseq FROM import_osmosis_log") == 56
135     assert temp_db_cursor.scalar("SELECT event FROM import_osmosis_log") == 'index'