X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/5071710db7832d13341230c9942bc7030a0ec6dc..bc981d0261896e240f8cd4cf212f7d191a71e152:/test/python/test_db_status.py diff --git a/test/python/test_db_status.py b/test/python/test_db_status.py index 9f032763..b6f5a7b1 100644 --- a/test/python/test_db_status.py +++ b/test/python/test_db_status.py @@ -8,10 +8,6 @@ import pytest import nominatim.db.status from nominatim.errors import UsageError -def test_compute_database_date_place_empty(status_table, place_table, temp_db_conn): - with pytest.raises(UsageError): - nominatim.db.status.compute_database_date(temp_db_conn) - OSM_NODE_DATA = """\ @@ -24,7 +20,17 @@ def iso_date(date): .replace(tzinfo=dt.timezone.utc) -def test_compute_database_date_valid(monkeypatch, status_table, place_row, temp_db_conn): +@pytest.fixture(autouse=True) +def setup_status_table(status_table): + pass + + +def test_compute_database_date_place_empty(place_table, temp_db_conn): + with pytest.raises(UsageError): + nominatim.db.status.compute_database_date(temp_db_conn) + + +def test_compute_database_date_valid(monkeypatch, place_row, temp_db_conn): place_row(osm_type='N', osm_id=45673) requested_url = [] @@ -40,7 +46,7 @@ def test_compute_database_date_valid(monkeypatch, status_table, place_row, temp_ assert date == iso_date('2006-01-27T22:09:10') -def test_compute_database_broken_api(monkeypatch, status_table, place_row, temp_db_conn): +def test_compute_database_broken_api(monkeypatch, place_row, temp_db_conn): place_row(osm_type='N', osm_id=45673) requested_url = [] @@ -51,53 +57,47 @@ def test_compute_database_broken_api(monkeypatch, status_table, place_row, temp_ monkeypatch.setattr(nominatim.db.status, "get_url", mock_url) with pytest.raises(UsageError): - date = nominatim.db.status.compute_database_date(temp_db_conn) + nominatim.db.status.compute_database_date(temp_db_conn) -def test_set_status_empty_table(status_table, temp_db_conn, temp_db_cursor): +def test_set_status_empty_table(temp_db_conn, temp_db_cursor): date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc) nominatim.db.status.set_status(temp_db_conn, date=date) - temp_db_cursor.execute("SELECT * FROM import_status") + assert temp_db_cursor.row_set("SELECT * FROM import_status") == \ + {(date, None, True)} - assert temp_db_cursor.rowcount == 1 - assert temp_db_cursor.fetchone() == [date, None, True] - -def test_set_status_filled_table(status_table, temp_db_conn, temp_db_cursor): +def test_set_status_filled_table(temp_db_conn, temp_db_cursor): date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc) nominatim.db.status.set_status(temp_db_conn, date=date) - assert 1 == temp_db_cursor.scalar("SELECT count(*) FROM import_status") + assert temp_db_cursor.table_rows('import_status') == 1 date = dt.datetime.fromordinal(1000100).replace(tzinfo=dt.timezone.utc) nominatim.db.status.set_status(temp_db_conn, date=date, seq=456, indexed=False) - temp_db_cursor.execute("SELECT * FROM import_status") - - assert temp_db_cursor.rowcount == 1 - assert temp_db_cursor.fetchone() == [date, 456, False] + assert temp_db_cursor.row_set("SELECT * FROM import_status") == \ + {(date, 456, False)} -def test_set_status_missing_date(status_table, temp_db_conn, temp_db_cursor): +def test_set_status_missing_date(temp_db_conn, temp_db_cursor): date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc) nominatim.db.status.set_status(temp_db_conn, date=date) - assert 1 == temp_db_cursor.scalar("SELECT count(*) FROM import_status") + assert temp_db_cursor.table_rows('import_status') == 1 nominatim.db.status.set_status(temp_db_conn, date=None, seq=456, indexed=False) - temp_db_cursor.execute("SELECT * FROM import_status") + assert temp_db_cursor.row_set("SELECT * FROM import_status") == \ + {(date, 456, False)} - assert temp_db_cursor.rowcount == 1 - assert temp_db_cursor.fetchone() == [date, 456, False] - -def test_get_status_empty_table(status_table, temp_db_conn): +def test_get_status_empty_table(temp_db_conn): assert nominatim.db.status.get_status(temp_db_conn) == (None, None, None) -def test_get_status_success(status_table, temp_db_conn): +def test_get_status_success(temp_db_conn): date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc) nominatim.db.status.set_status(temp_db_conn, date=date, seq=667, indexed=False) @@ -107,7 +107,7 @@ def test_get_status_success(status_table, temp_db_conn): @pytest.mark.parametrize("old_state", [True, False]) @pytest.mark.parametrize("new_state", [True, False]) -def test_set_indexed(status_table, temp_db_conn, temp_db_cursor, old_state, new_state): +def test_set_indexed(temp_db_conn, temp_db_cursor, old_state, new_state): date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc) nominatim.db.status.set_status(temp_db_conn, date=date, indexed=old_state) nominatim.db.status.set_indexed(temp_db_conn, new_state) @@ -115,18 +115,21 @@ def test_set_indexed(status_table, temp_db_conn, temp_db_cursor, old_state, new_ assert temp_db_cursor.scalar("SELECT indexed FROM import_status") == new_state -def test_set_indexed_empty_status(status_table, temp_db_conn, temp_db_cursor): +def test_set_indexed_empty_status(temp_db_conn, temp_db_cursor): nominatim.db.status.set_indexed(temp_db_conn, True) - assert temp_db_cursor.scalar("SELECT count(*) FROM import_status") == 0 + assert temp_db_cursor.table_rows("import_status") == 0 -def text_log_status(status_table, temp_db_conn): +def test_log_status(temp_db_conn, temp_db_cursor): date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc) start = dt.datetime.now() - dt.timedelta(hours=1) + nominatim.db.status.set_status(temp_db_conn, date=date, seq=56) nominatim.db.status.log_status(temp_db_conn, start, 'index') - assert temp_db_cursor.scalar("SELECT count(*) FROM import_osmosis_log") == 1 - assert temp_db_cursor.scalar("SELECT seq FROM import_osmosis_log") == 56 - assert temp_db_cursor.scalar("SELECT date FROM import_osmosis_log") == date + temp_db_conn.commit() + + assert temp_db_cursor.table_rows("import_osmosis_log") == 1 + assert temp_db_cursor.scalar("SELECT batchseq FROM import_osmosis_log") == 56 + assert temp_db_cursor.scalar("SELECT event FROM import_osmosis_log") == 'index'