- assert mock_run_legacy.called == 2
- assert mock_run_legacy.last_args == ('update.php', '--recompute-importance')
-
-
-@pytest.mark.parametrize("params,func", [
- (('--init', '--no-update-functions'), 'init_replication'),
- (('--check-for-updates',), 'check_for_updates')
- ])
-def test_replication_command(monkeypatch, temp_db, params, func):
- func_mock = MockParamCapture()
- monkeypatch.setattr(nominatim.tools.replication, func, func_mock)
-
- assert 0 == call_nominatim('replication', *params)
- assert func_mock.called == 1
-
-
-def test_replication_update_bad_interval(monkeypatch, temp_db):
- monkeypatch.setenv('NOMINATIM_REPLICATION_UPDATE_INTERVAL', 'xx')
-
- assert call_nominatim('replication') == 1
-
-
-def test_replication_update_bad_interval_for_geofabrik(monkeypatch, temp_db):
- monkeypatch.setenv('NOMINATIM_REPLICATION_URL',
- 'https://download.geofabrik.de/europe/ireland-and-northern-ireland-updates')
-
- assert call_nominatim('replication') == 1
-
-
-@pytest.mark.parametrize("state, retval", [
- (nominatim.tools.replication.UpdateState.UP_TO_DATE, 0),
- (nominatim.tools.replication.UpdateState.NO_CHANGES, 3)
- ])
-def test_replication_update_once_no_index(monkeypatch, temp_db, temp_db_conn,
- status_table, state, retval):
- status.set_status(temp_db_conn, date=dt.datetime.now(dt.timezone.utc), seq=1)
- func_mock = MockParamCapture(retval=state)
- monkeypatch.setattr(nominatim.tools.replication, 'update', func_mock)
-
- assert retval == call_nominatim('replication', '--once', '--no-index')
-
-
-def test_replication_update_continuous(monkeypatch, temp_db_conn, status_table):
- status.set_status(temp_db_conn, date=dt.datetime.now(dt.timezone.utc), seq=1)
- states = [nominatim.tools.replication.UpdateState.UP_TO_DATE,
- nominatim.tools.replication.UpdateState.UP_TO_DATE]
- monkeypatch.setattr(nominatim.tools.replication, 'update',
- lambda *args, **kwargs: states.pop())
-
- index_mock = MockParamCapture()
- monkeypatch.setattr(nominatim.indexer.indexer.Indexer, 'index_boundaries', index_mock)
- monkeypatch.setattr(nominatim.indexer.indexer.Indexer, 'index_by_rank', index_mock)
-
- with pytest.raises(IndexError):
- call_nominatim('replication')
-
- assert index_mock.called == 4
-
-
-def test_replication_update_continuous_no_change(monkeypatch, temp_db_conn, status_table):
- status.set_status(temp_db_conn, date=dt.datetime.now(dt.timezone.utc), seq=1)
- states = [nominatim.tools.replication.UpdateState.NO_CHANGES,
- nominatim.tools.replication.UpdateState.UP_TO_DATE]
- monkeypatch.setattr(nominatim.tools.replication, 'update',
- lambda *args, **kwargs: states.pop())
-
- index_mock = MockParamCapture()
- monkeypatch.setattr(nominatim.indexer.indexer.Indexer, 'index_boundaries', index_mock)
- monkeypatch.setattr(nominatim.indexer.indexer.Indexer, 'index_by_rank', index_mock)
-
- sleep_mock = MockParamCapture()
- monkeypatch.setattr(time, 'sleep', sleep_mock)
-
- with pytest.raises(IndexError):
- call_nominatim('replication')
-
- assert index_mock.called == 2
- assert sleep_mock.called == 1
- assert sleep_mock.last_args[0] == 60