X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/4cb6dc01f382e9fb748efbe4517442af2274f210..bc981d0261896e240f8cd4cf212f7d191a71e152:/test/python/test_tools_replication.py?ds=sidebyside diff --git a/test/python/test_tools_replication.py b/test/python/test_tools_replication.py index b94563ff..2bf2901b 100644 --- a/test/python/test_tools_replication.py +++ b/test/python/test_tools_replication.py @@ -9,6 +9,7 @@ from osmium.replication.server import OsmosisState import nominatim.tools.replication import nominatim.db.status as status +from nominatim.errors import UsageError OSM_NODE_DATA = """\ @@ -17,47 +18,51 @@ OSM_NODE_DATA = """\ """ +@pytest.fixture(autouse=True) +def setup_status_table(status_table): + pass + ### init replication -def test_init_replication_bad_base_url(monkeypatch, status_table, place_row, temp_db_conn, temp_db_cursor): +def test_init_replication_bad_base_url(monkeypatch, place_row, temp_db_conn): place_row(osm_type='N', osm_id=100) - monkeypatch.setattr(nominatim.db.status, "get_url", lambda u : OSM_NODE_DATA) + monkeypatch.setattr(nominatim.db.status, "get_url", lambda u: OSM_NODE_DATA) - with pytest.raises(RuntimeError, match="Failed to reach replication service"): + with pytest.raises(UsageError, match="Failed to reach replication service"): nominatim.tools.replication.init_replication(temp_db_conn, 'https://test.io') -def test_init_replication_success(monkeypatch, status_table, place_row, temp_db_conn, temp_db_cursor): +def test_init_replication_success(monkeypatch, place_row, temp_db_conn, temp_db_cursor): place_row(osm_type='N', osm_id=100) - monkeypatch.setattr(nominatim.db.status, "get_url", lambda u : OSM_NODE_DATA) + monkeypatch.setattr(nominatim.db.status, "get_url", lambda u: OSM_NODE_DATA) monkeypatch.setattr(nominatim.tools.replication.ReplicationServer, "timestamp_to_sequence", lambda self, date: 234) nominatim.tools.replication.init_replication(temp_db_conn, 'https://test.io') - temp_db_cursor.execute("SELECT * FROM import_status") + expected_date = dt.datetime.strptime('2006-01-27T19:09:10', status.ISODATE_FORMAT)\ + .replace(tzinfo=dt.timezone.utc) - expected_date = dt.datetime.fromisoformat('2006-01-27T19:09:10').replace(tzinfo=dt.timezone.utc) - assert temp_db_cursor.rowcount == 1 - assert temp_db_cursor.fetchone() == [expected_date, 234, True] + assert temp_db_cursor.row_set("SELECT * FROM import_status") \ + == {(expected_date, 234, True)} ### checking for updates -def test_check_for_updates_empty_status_table(status_table, temp_db_conn): +def test_check_for_updates_empty_status_table(temp_db_conn): assert nominatim.tools.replication.check_for_updates(temp_db_conn, 'https://test.io') == 254 -def test_check_for_updates_seq_not_set(status_table, temp_db_conn): +def test_check_for_updates_seq_not_set(temp_db_conn): status.set_status(temp_db_conn, dt.datetime.now(dt.timezone.utc)) assert nominatim.tools.replication.check_for_updates(temp_db_conn, 'https://test.io') == 254 -def test_check_for_updates_no_state(monkeypatch, status_table, temp_db_conn): +def test_check_for_updates_no_state(monkeypatch, temp_db_conn): status.set_status(temp_db_conn, dt.datetime.now(dt.timezone.utc), seq=345) monkeypatch.setattr(nominatim.tools.replication.ReplicationServer, @@ -67,7 +72,7 @@ def test_check_for_updates_no_state(monkeypatch, status_table, temp_db_conn): @pytest.mark.parametrize("server_sequence,result", [(344, 2), (345, 2), (346, 0)]) -def test_check_for_updates_no_new_data(monkeypatch, status_table, temp_db_conn, +def test_check_for_updates_no_new_data(monkeypatch, temp_db_conn, server_sequence, result): date = dt.datetime.now(dt.timezone.utc) status.set_status(temp_db_conn, date, seq=345) @@ -84,24 +89,24 @@ def test_check_for_updates_no_new_data(monkeypatch, status_table, temp_db_conn, @pytest.fixture def update_options(tmpdir): return dict(base_url='https://test.io', - indexed_only=False, - update_interval=3600, - import_file=tmpdir / 'foo.osm', - max_diff_size=1) + indexed_only=False, + update_interval=3600, + import_file=tmpdir / 'foo.osm', + max_diff_size=1) -def test_update_empty_status_table(status_table, temp_db_conn): - with pytest.raises(RuntimeError): +def test_update_empty_status_table(temp_db_conn): + with pytest.raises(UsageError): nominatim.tools.replication.update(temp_db_conn, {}) -def test_update_already_indexed(status_table, temp_db_conn): +def test_update_already_indexed(temp_db_conn): status.set_status(temp_db_conn, dt.datetime.now(dt.timezone.utc), seq=34, indexed=False) assert nominatim.tools.replication.update(temp_db_conn, dict(indexed_only=True)) \ == nominatim.tools.replication.UpdateState.MORE_PENDING -def test_update_no_data_no_sleep(monkeypatch, status_table, temp_db_conn, update_options): +def test_update_no_data_no_sleep(monkeypatch, temp_db_conn, update_options): date = dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=1) status.set_status(temp_db_conn, date, seq=34) @@ -110,7 +115,7 @@ def test_update_no_data_no_sleep(monkeypatch, status_table, temp_db_conn, update lambda *args, **kwargs: None) sleeptime = [] - monkeypatch.setattr(time, 'sleep', lambda s: sleeptime.append(s)) + monkeypatch.setattr(time, 'sleep', sleeptime.append) assert nominatim.tools.replication.update(temp_db_conn, update_options) \ == nominatim.tools.replication.UpdateState.NO_CHANGES @@ -118,7 +123,7 @@ def test_update_no_data_no_sleep(monkeypatch, status_table, temp_db_conn, update assert not sleeptime -def test_update_no_data_sleep(monkeypatch, status_table, temp_db_conn, update_options): +def test_update_no_data_sleep(monkeypatch, temp_db_conn, update_options): date = dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes=30) status.set_status(temp_db_conn, date, seq=34) @@ -127,7 +132,7 @@ def test_update_no_data_sleep(monkeypatch, status_table, temp_db_conn, update_op lambda *args, **kwargs: None) sleeptime = [] - monkeypatch.setattr(time, 'sleep', lambda s: sleeptime.append(s)) + monkeypatch.setattr(time, 'sleep', sleeptime.append) assert nominatim.tools.replication.update(temp_db_conn, update_options) \ == nominatim.tools.replication.UpdateState.NO_CHANGES