#
# This file is part of Nominatim. (https://nominatim.org)
#
-# Copyright (C) 2024 by the Nominatim developer community.
+# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Tests for replication functionality.
<node id="100" visible="true" version="1" changeset="2047" timestamp="2006-01-27T22:09:10Z" user="Foo" uid="111" lat="48.7586670" lon="8.1343060">
</node>
</osm>
-"""
+""" # noqa
+
# Autouse fixture: `autouse=True` makes pytest apply it to tests without them
# requesting it by name. Its only effect is to depend on the `status_table`
# fixture; the body itself does nothing.
# NOTE(review): `status_table` is defined outside this view - presumably it
# creates the import_status table that the tests below query; confirm.
@pytest.fixture(autouse=True)
def setup_status_table(status_table):
pass
-### init replication
+
+# init replication
def test_init_replication_bad_base_url(monkeypatch, place_row, temp_db_conn):
place_row(osm_type='N', osm_id=100)
nominatim_db.tools.replication.init_replication(temp_db_conn, 'https://test.io')
expected_date = dt.datetime.strptime('2006-01-27T19:09:10', status.ISODATE_FORMAT)\
- .replace(tzinfo=dt.timezone.utc)
+ .replace(tzinfo=dt.timezone.utc)
assert temp_db_cursor.row_set("SELECT * FROM import_status") \
- == {(expected_date, 234, True)}
+ == {(expected_date, 234, True)}
-### checking for updates
+# checking for updates
# Calls check_for_updates() without this test writing any status row first and
# expects the return value 254.
# NOTE(review): 254 presumably is the code for "no usable replication state" -
# confirm against check_for_updates() in nominatim_db.tools.replication.
def test_check_for_updates_empty_status_table(temp_db_conn):
assert nominatim_db.tools.replication.check_for_updates(temp_db_conn, 'https://test.io') == 254
"get_state_info",
lambda self: OsmosisState(server_sequence, date))
- assert nominatim_db.tools.replication.check_for_updates(temp_db_conn, 'https://test.io') == result
+ assert result == \
+ nominatim_db.tools.replication.check_for_updates(temp_db_conn, 'https://test.io')
-### updating
+# updating
@pytest.fixture
def update_options(tmpdir):
import_file=tmpdir / 'foo.osm',
max_diff_size=1)
+
# update() is called with an empty options dict and no status row set by this
# test; the call must raise UsageError for the test to pass.
# NOTE(review): per the test name the trigger is the empty status table rather
# than the empty options dict - confirm against update().
def test_update_empty_status_table(dsn):
with pytest.raises(UsageError):
nominatim_db.tools.replication.update(dsn, {})
status.set_status(temp_db_conn, dt.datetime.now(dt.timezone.utc), seq=34, indexed=False)
assert nominatim_db.tools.replication.update(dsn, dict(indexed_only=True)) \
- == nominatim_db.tools.replication.UpdateState.MORE_PENDING
+ == nominatim_db.tools.replication.UpdateState.MORE_PENDING
def test_update_no_data_no_sleep(monkeypatch, temp_db_conn, dsn, update_options):
monkeypatch.setattr(time, 'sleep', sleeptime.append)
assert nominatim_db.tools.replication.update(dsn, update_options) \
- == nominatim_db.tools.replication.UpdateState.NO_CHANGES
+ == nominatim_db.tools.replication.UpdateState.NO_CHANGES
assert not sleeptime
monkeypatch.setattr(time, 'sleep', sleeptime.append)
assert nominatim_db.tools.replication.update(dsn, update_options) \
- == nominatim_db.tools.replication.UpdateState.NO_CHANGES
+ == nominatim_db.tools.replication.UpdateState.NO_CHANGES
assert len(sleeptime) == 1
assert sleeptime[0] < 3600