-# SPDX-License-Identifier: GPL-2.0-only
+# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
-# Copyright (C) 2022 by the Nominatim developer community.
+# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Tests for replication command of command-line interface wrapper.
import pytest
-import nominatim.cli
-import nominatim.indexer.indexer
-import nominatim.tools.replication
-from nominatim.db import status
+import nominatim_db.cli
+import nominatim_db.indexer.indexer
+import nominatim_db.tools.replication
+import nominatim_db.tools.refresh
+from nominatim_core.db import status
@pytest.fixture
def tokenizer_mock(monkeypatch):
self.finalize_import_called = True
tok = DummyTokenizer()
- monkeypatch.setattr(nominatim.tokenizer.factory, 'get_tokenizer_for_db',
+ monkeypatch.setattr(nominatim_db.tokenizer.factory, 'get_tokenizer_for_db',
lambda *args: tok)
- monkeypatch.setattr(nominatim.tokenizer.factory, 'create_tokenizer',
+ monkeypatch.setattr(nominatim_db.tokenizer.factory, 'create_tokenizer',
lambda *args: tok)
return tok
@pytest.fixture
def index_mock(mock_func_factory, tokenizer_mock, init_status):
- return mock_func_factory(nominatim.indexer.indexer.Indexer, 'index_full')
+ return mock_func_factory(nominatim_db.indexer.indexer.Indexer, 'index_full')
@pytest.fixture
def update_mock(mock_func_factory, init_status, tokenizer_mock):
- return mock_func_factory(nominatim.tools.replication, 'update')
+ return mock_func_factory(nominatim_db.tools.replication, 'update')
class TestCliReplication:
@pytest.fixture(autouse=True)
def setup_update_function(self, monkeypatch):
def _mock_updates(states):
- monkeypatch.setattr(nominatim.tools.replication, 'update',
+ monkeypatch.setattr(nominatim_db.tools.replication, 'update',
lambda *args, **kwargs: states.pop())
self.update_states = _mock_updates
(('--check-for-updates',), 'check_for_updates')
])
def test_replication_command(self, mock_func_factory, params, func):
- func_mock = mock_func_factory(nominatim.tools.replication, func)
+ func_mock = mock_func_factory(nominatim_db.tools.replication, func)
if params == ('--init',):
- umock = mock_func_factory(nominatim.tools.refresh, 'create_functions')
+ umock = mock_func_factory(nominatim_db.tools.refresh, 'create_functions')
assert self.call_nominatim(*params) == 0
assert func_mock.called == 1
def test_replication_update_once_no_index(self, update_mock):
assert self.call_nominatim('--once', '--no-index') == 0
- assert str(update_mock.last_args[1]['osm2pgsql']) == 'OSM2PGSQL NOT AVAILABLE'
+ assert str(update_mock.last_args[1]['osm2pgsql']).endswith('OSM2PGSQL NOT AVAILABLE')
def test_replication_update_custom_osm2pgsql(self, monkeypatch, update_mock):
@pytest.mark.parametrize("update_interval", [60, 3600])
def test_replication_catchup(self, placex_table, monkeypatch, index_mock, update_interval):
monkeypatch.setenv('NOMINATIM_REPLICATION_UPDATE_INTERVAL', str(update_interval))
- self.update_states([nominatim.tools.replication.UpdateState.NO_CHANGES])
+ self.update_states([nominatim_db.tools.replication.UpdateState.NO_CHANGES])
assert self.call_nominatim('--catch-up') == 0
def test_replication_update_continuous(self, index_mock):
- self.update_states([nominatim.tools.replication.UpdateState.UP_TO_DATE,
- nominatim.tools.replication.UpdateState.UP_TO_DATE])
+ self.update_states([nominatim_db.tools.replication.UpdateState.UP_TO_DATE,
+ nominatim_db.tools.replication.UpdateState.UP_TO_DATE])
with pytest.raises(IndexError):
self.call_nominatim()
def test_replication_update_continuous_no_change(self, mock_func_factory,
index_mock):
- self.update_states([nominatim.tools.replication.UpdateState.NO_CHANGES,
- nominatim.tools.replication.UpdateState.UP_TO_DATE])
+ self.update_states([nominatim_db.tools.replication.UpdateState.NO_CHANGES,
+ nominatim_db.tools.replication.UpdateState.UP_TO_DATE])
sleep_mock = mock_func_factory(time, 'sleep')