2 Tests for status table manipulation.
8 import nominatim.db.status
9 from nominatim.errors import UsageError
def test_compute_database_date_place_empty(status_table, place_table, temp_db_conn):
    """Expect a UsageError from compute_database_date() when the test
    has not added any rows through the place fixtures.
    """
    compute_date = nominatim.db.status.compute_database_date

    with pytest.raises(UsageError):
        compute_date(temp_db_conn)
16 <osm version="0.6" generator="OpenStreetMap server" copyright="OpenStreetMap and contributors" attribution="http://www.openstreetmap.org/copyright" license="http://opendatacommons.org/licenses/odbl/1-0/">
17 <node id="45673" visible="true" version="1" changeset="2047" timestamp="2006-01-27T22:09:10Z" user="Foo" uid="111" lat="48.7586670" lon="8.1343060">
# Checks compute_database_date() for a place table holding node 45673:
# get_url is monkeypatched with mock_url, exactly one API URL is expected
# and the returned date must be 2006-01-27T22:09:10 in UTC.
# NOTE(review): this excerpt lacks the lines that define requested_url and
# mock_url (and whatever mock_url returns) - confirm against the full file.
22 def test_compute_database_date_valid(monkeypatch, status_table, place_row, temp_db_conn):
23 place_row(osm_type='N', osm_id=45673)
# Every URL handed to the mock is recorded for the assertion below.
27 requested_url.append(url)
30 monkeypatch.setattr(nominatim.db.status, "get_url", mock_url)
32 date = nominatim.db.status.compute_database_date(temp_db_conn)
# Only version 1 of the node may have been requested.
34 assert requested_url == ['https://www.openstreetmap.org/api/0.6/node/45673/1']
35 assert date == dt.datetime.fromisoformat('2006-01-27T22:09:10').replace(tzinfo=dt.timezone.utc)
# Checks that compute_database_date() raises UsageError when the patched
# get_url hands back a truncated XML document.
# NOTE(review): the lines defining requested_url and mock_url are not part
# of this excerpt - confirm against the full file.
38 def test_compute_database_broken_api(monkeypatch, status_table, place_row, temp_db_conn):
39 place_row(osm_type='N', osm_id=45673)
43 requested_url.append(url)
# Deliberately cut off in the middle of the root element.
44 return '<osm version="0.6" generator="OpenStre'
46 monkeypatch.setattr(nominatim.db.status, "get_url", mock_url)
48 with pytest.raises(UsageError):
# NOTE(review): the assigned 'date' is never read afterwards in this test.
49 date = nominatim.db.status.compute_database_date(temp_db_conn)
def test_set_status_empty_table(status_table, temp_db_conn, temp_db_cursor):
    """After one set_status() call that passes only a date, import_status
    must hold exactly one row equal to [date, None, True].
    """
    midnight = dt.datetime.fromordinal(1000000)
    timestamp = midnight.replace(tzinfo=dt.timezone.utc)
    expected_row = [timestamp, None, True]

    nominatim.db.status.set_status(temp_db_conn, date=timestamp)

    temp_db_cursor.execute("SELECT * FROM import_status")
    assert temp_db_cursor.rowcount == 1
    assert temp_db_cursor.fetchone() == expected_row
def test_set_status_filled_table(status_table, temp_db_conn, temp_db_cursor):
    """A second set_status() call must replace the existing row instead
    of adding another one.
    """
    set_status = nominatim.db.status.set_status

    # First call: one row is expected afterwards.
    first = dt.datetime.fromordinal(1000000)
    first = first.replace(tzinfo=dt.timezone.utc)
    set_status(temp_db_conn, date=first)

    row_count = temp_db_cursor.scalar("SELECT count(*) FROM import_status")
    assert row_count == 1

    # Second call with a later date plus explicit sequence and index state.
    second = dt.datetime.fromordinal(1000100)
    second = second.replace(tzinfo=dt.timezone.utc)
    set_status(temp_db_conn, date=second, seq=456, indexed=False)

    temp_db_cursor.execute("SELECT * FROM import_status")
    assert temp_db_cursor.rowcount == 1
    assert temp_db_cursor.fetchone() == [second, 456, False]
def test_set_status_missing_date(status_table, temp_db_conn, temp_db_cursor):
    """Calling set_status() with date=None must keep the stored date
    while the sequence and index state are updated.
    """
    set_status = nominatim.db.status.set_status

    stored = dt.datetime.fromordinal(1000000)
    stored = stored.replace(tzinfo=dt.timezone.utc)
    set_status(temp_db_conn, date=stored)

    row_count = temp_db_cursor.scalar("SELECT count(*) FROM import_status")
    assert row_count == 1

    # No date this time: the row must still carry the original one.
    set_status(temp_db_conn, date=None, seq=456, indexed=False)

    temp_db_cursor.execute("SELECT * FROM import_status")
    assert temp_db_cursor.rowcount == 1
    assert temp_db_cursor.fetchone() == [stored, 456, False]
def test_get_status_empty_table(status_table, temp_db_conn):
    """get_status() must return a triple of None values when no status
    has been set by the test.
    """
    current = nominatim.db.status.get_status(temp_db_conn)

    assert current == (None, None, None)
# Stores a status (fixed UTC date, seq=667, indexed=False) via set_status()
# and compares what get_status() hands back.
# NOTE(review): the expected value after the line continuation is missing
# from this excerpt - confirm against the full file.
95 def test_get_status_success(status_table, temp_db_conn):
96 date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
97 nominatim.db.status.set_status(temp_db_conn, date=date, seq=667, indexed=False)
99 assert nominatim.db.status.get_status(temp_db_conn) == \
@pytest.mark.parametrize("old_state", [True, False])
@pytest.mark.parametrize("new_state", [True, False])
def test_set_indexed(status_table, temp_db_conn, temp_db_cursor, old_state, new_state):
    """For every combination of previous and requested state,
    set_indexed() must leave the requested state in import_status.
    """
    status = nominatim.db.status
    timestamp = dt.datetime.fromordinal(1000000)
    timestamp = timestamp.replace(tzinfo=dt.timezone.utc)

    status.set_status(temp_db_conn, date=timestamp, indexed=old_state)
    status.set_indexed(temp_db_conn, new_state)

    stored_state = temp_db_cursor.scalar("SELECT indexed FROM import_status")
    assert stored_state == new_state
def test_set_indexed_empty_status(status_table, temp_db_conn, temp_db_cursor):
    """set_indexed() without a previously set status must not create
    a row in import_status.
    """
    nominatim.db.status.set_indexed(temp_db_conn, True)

    row_count = temp_db_cursor.scalar("SELECT count(*) FROM import_status")
    assert row_count == 0
def test_log_status(status_table, temp_db_conn, temp_db_cursor):
    """Check that log_status() writes one entry to import_osmosis_log.

    A status with a fixed UTC date and sequence number 56 is stored first;
    afterwards the log table is expected to contain a single row carrying
    that sequence number and date.

    The function used to be called ``text_log_status`` and therefore was
    never collected by pytest. It also used ``temp_db_cursor`` without
    requesting the fixture.
    """
    date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
    # Pretend the logged operation started one hour ago.
    start = dt.datetime.now() - dt.timedelta(hours=1)

    nominatim.db.status.set_status(temp_db_conn, date=date, seq=56)
    nominatim.db.status.log_status(temp_db_conn, start, 'index')

    # NOTE(review): these assertions never ran before the rename, so the
    # column names (and the availability of import_osmosis_log through the
    # status_table fixture) are unverified - confirm against the schema.
    assert temp_db_cursor.scalar("SELECT count(*) FROM import_osmosis_log") == 1
    assert temp_db_cursor.scalar("SELECT seq FROM import_osmosis_log") == 56
    assert temp_db_cursor.scalar("SELECT date FROM import_osmosis_log") == date