]> git.openstreetmap.org Git - nominatim.git/blobdiff - test/python/test_db_status.py
ignore failure to get replication date
[nominatim.git] / test / python / test_db_status.py
index d73b099eb7dcd91589d85b9c0c01b14b9d8ed54c..c659147148d4b2d970a5008e984e8fce02cc7242 100644 (file)
@@ -6,9 +6,10 @@ import datetime as dt
 import pytest
 
 import nominatim.db.status
+from nominatim.errors import UsageError
 
 def test_compute_database_date_place_empty(status_table, place_table, temp_db_conn):
-    with pytest.raises(RuntimeError):
+    with pytest.raises(UsageError):
         nominatim.db.status.compute_database_date(temp_db_conn)
 
 OSM_NODE_DATA = """\
@@ -44,7 +45,7 @@ def test_compute_database_broken_api(monkeypatch, status_table, place_row, temp_
 
     monkeypatch.setattr(nominatim.db.status, "get_url", mock_url)
 
-    with pytest.raises(RuntimeError):
+    with pytest.raises(UsageError):
         date = nominatim.db.status.compute_database_date(temp_db_conn)
 
 
@@ -64,7 +65,6 @@ def test_set_status_filled_table(status_table, temp_db_conn, temp_db_cursor):
 
     assert 1 == temp_db_cursor.scalar("SELECT count(*) FROM import_status")
 
-
     date = dt.datetime.fromordinal(1000100).replace(tzinfo=dt.timezone.utc)
     nominatim.db.status.set_status(temp_db_conn, date=date, seq=456, indexed=False)
 
@@ -72,3 +72,56 @@ def test_set_status_filled_table(status_table, temp_db_conn, temp_db_cursor):
 
     assert temp_db_cursor.rowcount == 1
     assert temp_db_cursor.fetchone() == [date, 456, False]
+
+
+def test_set_status_missing_date(status_table, temp_db_conn, temp_db_cursor):
+    date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
+    nominatim.db.status.set_status(temp_db_conn, date=date)
+
+    assert 1 == temp_db_cursor.scalar("SELECT count(*) FROM import_status")
+
+    nominatim.db.status.set_status(temp_db_conn, date=None, seq=456, indexed=False)
+
+    temp_db_cursor.execute("SELECT * FROM import_status")
+
+    assert temp_db_cursor.rowcount == 1
+    assert temp_db_cursor.fetchone() == [date, 456, False]
+
+
+def test_get_status_empty_table(status_table, temp_db_conn):
+    assert nominatim.db.status.get_status(temp_db_conn) == (None, None, None)
+
+
+def test_get_status_success(status_table, temp_db_conn):
+    date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
+    nominatim.db.status.set_status(temp_db_conn, date=date, seq=667, indexed=False)
+
+    assert nominatim.db.status.get_status(temp_db_conn) == \
+             (date, 667, False)
+
+
+@pytest.mark.parametrize("old_state", [True, False])
+@pytest.mark.parametrize("new_state", [True, False])
+def test_set_indexed(status_table, temp_db_conn, temp_db_cursor, old_state, new_state):
+    date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
+    nominatim.db.status.set_status(temp_db_conn, date=date, indexed=old_state)
+    nominatim.db.status.set_indexed(temp_db_conn, new_state)
+
+    assert temp_db_cursor.scalar("SELECT indexed FROM import_status") == new_state
+
+
+def test_set_indexed_empty_status(status_table, temp_db_conn, temp_db_cursor):
+    nominatim.db.status.set_indexed(temp_db_conn, True)
+
+    assert temp_db_cursor.scalar("SELECT count(*) FROM import_status") == 0
+
+
+def text_log_status(status_table, temp_db_conn):
+    date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
+    start = dt.datetime.now() - dt.timedelta(hours=1)
+    nominatim.db.status.set_status(temp_db_conn, date=date, seq=56)
+    nominatim.db.status.log_status(temp_db_conn, start, 'index')
+
+    assert temp_db_cursor.scalar("SELECT count(*) FROM import_osmosis_log") == 1
+    assert temp_db_cursor.scalar("SELECT seq FROM import_osmosis_log") == 56
+    assert temp_db_cursor.scalar("SELECT date FROM import_osmosis_log") == date