+@pytest.mark.parametrize("params,func", [
+ (('--init', '--no-update-functions'), 'init_replication'),
+ (('--check-for-updates',), 'check_for_updates')
+ ])
+def test_replication_command(monkeypatch, temp_db, params, func):
+ func_mock = MockParamCapture()
+ monkeypatch.setattr(nominatim.tools.replication, func, func_mock)
+
+ assert 0 == call_nominatim('replication', *params)
+ assert func_mock.called == 1
+
+
+def test_replication_update_bad_interval(monkeypatch, temp_db):
+ monkeypatch.setenv('NOMINATIM_REPLICATION_UPDATE_INTERVAL', 'xx')
+
+ assert call_nominatim('replication') == 1
+
+
+def test_replication_update_bad_interval_for_geofabrik(monkeypatch, temp_db):
+ monkeypatch.setenv('NOMINATIM_REPLICATION_URL',
+ 'https://download.geofabrik.de/europe/ireland-and-northern-ireland-updates')
+
+ assert call_nominatim('replication') == 1
+
+
+@pytest.mark.parametrize("state, retval", [
+ (nominatim.tools.replication.UpdateState.UP_TO_DATE, 0),
+ (nominatim.tools.replication.UpdateState.NO_CHANGES, 3)
+ ])
+def test_replication_update_once_no_index(monkeypatch, temp_db, temp_db_conn,
+ status_table, state, retval):
+ status.set_status(temp_db_conn, date=dt.datetime.now(dt.timezone.utc), seq=1)
+ func_mock = MockParamCapture(retval=state)
+ monkeypatch.setattr(nominatim.tools.replication, 'update', func_mock)
+
+ assert retval == call_nominatim('replication', '--once', '--no-index')
+
+
+def test_replication_update_continuous(monkeypatch, temp_db_conn, status_table):
+ status.set_status(temp_db_conn, date=dt.datetime.now(dt.timezone.utc), seq=1)
+ states = [nominatim.tools.replication.UpdateState.UP_TO_DATE,
+ nominatim.tools.replication.UpdateState.UP_TO_DATE]
+ monkeypatch.setattr(nominatim.tools.replication, 'update',
+ lambda *args, **kwargs: states.pop())
+
+ index_mock = MockParamCapture()
+ monkeypatch.setattr(nominatim.indexer.indexer.Indexer, 'index_boundaries', index_mock)
+ monkeypatch.setattr(nominatim.indexer.indexer.Indexer, 'index_by_rank', index_mock)
+
+ with pytest.raises(IndexError):
+ call_nominatim('replication')
+
+ assert index_mock.called == 4
+
+
+def test_replication_update_continuous_no_change(monkeypatch, temp_db_conn, status_table):
+ status.set_status(temp_db_conn, date=dt.datetime.now(dt.timezone.utc), seq=1)
+ states = [nominatim.tools.replication.UpdateState.NO_CHANGES,
+ nominatim.tools.replication.UpdateState.UP_TO_DATE]
+ monkeypatch.setattr(nominatim.tools.replication, 'update',
+ lambda *args, **kwargs: states.pop())
+
+ index_mock = MockParamCapture()
+ monkeypatch.setattr(nominatim.indexer.indexer.Indexer, 'index_boundaries', index_mock)
+ monkeypatch.setattr(nominatim.indexer.indexer.Indexer, 'index_by_rank', index_mock)
+
+ sleep_mock = MockParamCapture()
+ monkeypatch.setattr(time, 'sleep', sleep_mock)
+
+ with pytest.raises(IndexError):
+ call_nominatim('replication')
+
+ assert index_mock.called == 2
+ assert sleep_mock.called == 1
+ assert sleep_mock.last_args[0] == 60
+
+
+def test_serve_command(monkeypatch):
+ func = MockParamCapture()
+ monkeypatch.setattr(nominatim.cli, 'run_php_server', func)
+
+ call_nominatim('serve')
+
+ assert func.called == 1
+