recheck_interval = args.config.get_int('REPLICATION_RECHECK_INTERVAL')
tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
+ indexer = Indexer(args.config.get_libpq_dsn(), tokenizer, args.threads or 1)
while True:
with connect(args.config.get_libpq_dsn()) as conn:
if state is not replication.UpdateState.NO_CHANGES and args.do_index:
index_start = dt.datetime.now(dt.timezone.utc)
- indexer = Indexer(args.config.get_libpq_dsn(), tokenizer,
- args.threads or 1)
indexer.index_full(analyse=False)
with connect(args.config.get_libpq_dsn()) as conn:
assert str(update_mock.last_args[1]['osm2pgsql']) == '/secret/osm2pgsql'
+ @pytest.mark.parametrize("update_interval", [60, 3600])  # run the same scenario once per interval value
+ def test_replication_catchup(self, monkeypatch, index_mock, update_interval, placex_table):  # NOTE(review): index_mock/placex_table are not referenced in the body - presumably setup-only fixtures; confirm
+ monkeypatch.setenv('NOMINATIM_REPLICATION_UPDATE_INTERVAL', str(update_interval))  # expose the parametrized interval via the environment
+ states = [nominatim.tools.replication.UpdateState.NO_CHANGES]  # single canned result handed out by the stub below
+ monkeypatch.setattr(nominatim.tools.replication, 'update',
+ lambda *args, **kwargs: states.pop())  # stub ignores its arguments; a second call would pop from an empty list (IndexError)
+
+ assert self.call_nominatim('--catch-up') == 0  # the '--catch-up' invocation must report exit status 0
+
+
def test_replication_update_custom_threads(self, update_mock):
assert self.call_nominatim('--once', '--no-index', '--threads', '4') == 0