X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/d78f0ba80470a33a7a76edfe3ace5108684873cd..fbe7be760be870189a15cc7ea7d7fc89e02a56b6:/test/python/test_db_status.py diff --git a/test/python/test_db_status.py b/test/python/test_db_status.py index d73b099e..c6591471 100644 --- a/test/python/test_db_status.py +++ b/test/python/test_db_status.py @@ -6,9 +6,10 @@ import datetime as dt import pytest import nominatim.db.status +from nominatim.errors import UsageError def test_compute_database_date_place_empty(status_table, place_table, temp_db_conn): - with pytest.raises(RuntimeError): + with pytest.raises(UsageError): nominatim.db.status.compute_database_date(temp_db_conn) OSM_NODE_DATA = """\ @@ -44,7 +45,7 @@ def test_compute_database_broken_api(monkeypatch, status_table, place_row, temp_ monkeypatch.setattr(nominatim.db.status, "get_url", mock_url) - with pytest.raises(RuntimeError): + with pytest.raises(UsageError): date = nominatim.db.status.compute_database_date(temp_db_conn) @@ -64,7 +65,6 @@ def test_set_status_filled_table(status_table, temp_db_conn, temp_db_cursor): assert 1 == temp_db_cursor.scalar("SELECT count(*) FROM import_status") - date = dt.datetime.fromordinal(1000100).replace(tzinfo=dt.timezone.utc) nominatim.db.status.set_status(temp_db_conn, date=date, seq=456, indexed=False) @@ -72,3 +72,56 @@ def test_set_status_filled_table(status_table, temp_db_conn, temp_db_cursor): assert temp_db_cursor.rowcount == 1 assert temp_db_cursor.fetchone() == [date, 456, False] + + +def test_set_status_missing_date(status_table, temp_db_conn, temp_db_cursor): + date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc) + nominatim.db.status.set_status(temp_db_conn, date=date) + + assert 1 == temp_db_cursor.scalar("SELECT count(*) FROM import_status") + + nominatim.db.status.set_status(temp_db_conn, date=None, seq=456, indexed=False) + + temp_db_cursor.execute("SELECT * FROM import_status") + + assert temp_db_cursor.rowcount == 1 + assert temp_db_cursor.fetchone() == [date, 456, False] + + +def test_get_status_empty_table(status_table, temp_db_conn): + assert nominatim.db.status.get_status(temp_db_conn) == (None, None, None) + + +def test_get_status_success(status_table, temp_db_conn): + date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc) + nominatim.db.status.set_status(temp_db_conn, date=date, seq=667, indexed=False) + + assert nominatim.db.status.get_status(temp_db_conn) == \ + (date, 667, False) + + +@pytest.mark.parametrize("old_state", [True, False]) +@pytest.mark.parametrize("new_state", [True, False]) +def test_set_indexed(status_table, temp_db_conn, temp_db_cursor, old_state, new_state): + date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc) + nominatim.db.status.set_status(temp_db_conn, date=date, indexed=old_state) + nominatim.db.status.set_indexed(temp_db_conn, new_state) + + assert temp_db_cursor.scalar("SELECT indexed FROM import_status") == new_state + + +def test_set_indexed_empty_status(status_table, temp_db_conn, temp_db_cursor): + nominatim.db.status.set_indexed(temp_db_conn, True) + + assert temp_db_cursor.scalar("SELECT count(*) FROM import_status") == 0 + + +def text_log_status(status_table, temp_db_conn): + date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc) + start = dt.datetime.now() - dt.timedelta(hours=1) + nominatim.db.status.set_status(temp_db_conn, date=date, seq=56) + nominatim.db.status.log_status(temp_db_conn, start, 'index') + + assert temp_db_cursor.scalar("SELECT count(*) FROM import_osmosis_log") == 1 + assert temp_db_cursor.scalar("SELECT seq FROM import_osmosis_log") == 56 + assert temp_db_cursor.scalar("SELECT date FROM import_osmosis_log") == date