]> git.openstreetmap.org Git - nominatim.git/blobdiff - test/python/tools/test_replication.py
Merge pull request #3492 from lonvia/drop-waste-disposal
[nominatim.git] / test / python / tools / test_replication.py
index 2bf2901b33a8b8794d05c3e3e637fd0b85147113..392ea0750b6f50c11c5006e4ef6eb09ceb7763f3 100644 (file)
@@ -1,3 +1,9 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2024 by the Nominatim developer community.
+# For a full list of authors see the git log.
 """
 Tests for replication functionality.
 """
@@ -7,9 +13,9 @@ import time
 import pytest
 from osmium.replication.server import OsmosisState
 
-import nominatim.tools.replication
-import nominatim.db.status as status
-from nominatim.errors import UsageError
+import nominatim_db.tools.replication
+import nominatim_db.db.status as status
+from nominatim_db.errors import UsageError
 
 OSM_NODE_DATA = """\
 <osm version="0.6" generator="OpenStreetMap server" copyright="OpenStreetMap and contributors" attribution="http://www.openstreetmap.org/copyright" license="http://opendatacommons.org/licenses/odbl/1-0/">
@@ -27,21 +33,21 @@ def setup_status_table(status_table):
 def test_init_replication_bad_base_url(monkeypatch, place_row, temp_db_conn):
     place_row(osm_type='N', osm_id=100)
 
-    monkeypatch.setattr(nominatim.db.status, "get_url", lambda u: OSM_NODE_DATA)
+    monkeypatch.setattr(status, "get_url", lambda u: OSM_NODE_DATA)
 
     with pytest.raises(UsageError, match="Failed to reach replication service"):
-        nominatim.tools.replication.init_replication(temp_db_conn, 'https://test.io')
+        nominatim_db.tools.replication.init_replication(temp_db_conn, 'https://test.io')
 
 
 def test_init_replication_success(monkeypatch, place_row, temp_db_conn, temp_db_cursor):
     place_row(osm_type='N', osm_id=100)
 
-    monkeypatch.setattr(nominatim.db.status, "get_url", lambda u: OSM_NODE_DATA)
-    monkeypatch.setattr(nominatim.tools.replication.ReplicationServer,
+    monkeypatch.setattr(status, "get_url", lambda u: OSM_NODE_DATA)
+    monkeypatch.setattr(nominatim_db.tools.replication.ReplicationServer,
                         "timestamp_to_sequence",
                         lambda self, date: 234)
 
-    nominatim.tools.replication.init_replication(temp_db_conn, 'https://test.io')
+    nominatim_db.tools.replication.init_replication(temp_db_conn, 'https://test.io')
 
     expected_date = dt.datetime.strptime('2006-01-27T19:09:10', status.ISODATE_FORMAT)\
                         .replace(tzinfo=dt.timezone.utc)
@@ -53,22 +59,22 @@ def test_init_replication_success(monkeypatch, place_row, temp_db_conn, temp_db_
 ### checking for updates
 
 def test_check_for_updates_empty_status_table(temp_db_conn):
-    assert nominatim.tools.replication.check_for_updates(temp_db_conn, 'https://test.io') == 254
+    assert nominatim_db.tools.replication.check_for_updates(temp_db_conn, 'https://test.io') == 254
 
 
 def test_check_for_updates_seq_not_set(temp_db_conn):
     status.set_status(temp_db_conn, dt.datetime.now(dt.timezone.utc))
 
-    assert nominatim.tools.replication.check_for_updates(temp_db_conn, 'https://test.io') == 254
+    assert nominatim_db.tools.replication.check_for_updates(temp_db_conn, 'https://test.io') == 254
 
 
 def test_check_for_updates_no_state(monkeypatch, temp_db_conn):
     status.set_status(temp_db_conn, dt.datetime.now(dt.timezone.utc), seq=345)
 
-    monkeypatch.setattr(nominatim.tools.replication.ReplicationServer,
+    monkeypatch.setattr(nominatim_db.tools.replication.ReplicationServer,
                         "get_state_info", lambda self: None)
 
-    assert nominatim.tools.replication.check_for_updates(temp_db_conn, 'https://test.io') == 253
+    assert nominatim_db.tools.replication.check_for_updates(temp_db_conn, 'https://test.io') == 253
 
 
 @pytest.mark.parametrize("server_sequence,result", [(344, 2), (345, 2), (346, 0)])
@@ -77,11 +83,11 @@ def test_check_for_updates_no_new_data(monkeypatch, temp_db_conn,
     date = dt.datetime.now(dt.timezone.utc)
     status.set_status(temp_db_conn, date, seq=345)
 
-    monkeypatch.setattr(nominatim.tools.replication.ReplicationServer,
+    monkeypatch.setattr(nominatim_db.tools.replication.ReplicationServer,
                         "get_state_info",
                         lambda self: OsmosisState(server_sequence, date))
 
-    assert nominatim.tools.replication.check_for_updates(temp_db_conn, 'https://test.io') == result
+    assert nominatim_db.tools.replication.check_for_updates(temp_db_conn, 'https://test.io') == result
 
 
 ### updating
@@ -94,48 +100,48 @@ def update_options(tmpdir):
                 import_file=tmpdir / 'foo.osm',
                 max_diff_size=1)
 
-def test_update_empty_status_table(temp_db_conn):
+def test_update_empty_status_table(dsn):
     with pytest.raises(UsageError):
-        nominatim.tools.replication.update(temp_db_conn, {})
+        nominatim_db.tools.replication.update(dsn, {})
 
 
-def test_update_already_indexed(temp_db_conn):
+def test_update_already_indexed(temp_db_conn, dsn):
     status.set_status(temp_db_conn, dt.datetime.now(dt.timezone.utc), seq=34, indexed=False)
 
-    assert nominatim.tools.replication.update(temp_db_conn, dict(indexed_only=True)) \
-             == nominatim.tools.replication.UpdateState.MORE_PENDING
+    assert nominatim_db.tools.replication.update(dsn, dict(indexed_only=True)) \
+             == nominatim_db.tools.replication.UpdateState.MORE_PENDING
 
 
-def test_update_no_data_no_sleep(monkeypatch, temp_db_conn, update_options):
+def test_update_no_data_no_sleep(monkeypatch, temp_db_conn, dsn, update_options):
     date = dt.datetime.now(dt.timezone.utc) - dt.timedelta(days=1)
     status.set_status(temp_db_conn, date, seq=34)
 
-    monkeypatch.setattr(nominatim.tools.replication.ReplicationServer,
+    monkeypatch.setattr(nominatim_db.tools.replication.ReplicationServer,
                         "apply_diffs",
                         lambda *args, **kwargs: None)
 
     sleeptime = []
     monkeypatch.setattr(time, 'sleep', sleeptime.append)
 
-    assert nominatim.tools.replication.update(temp_db_conn, update_options) \
-             == nominatim.tools.replication.UpdateState.NO_CHANGES
+    assert nominatim_db.tools.replication.update(dsn, update_options) \
+             == nominatim_db.tools.replication.UpdateState.NO_CHANGES
 
     assert not sleeptime
 
 
-def test_update_no_data_sleep(monkeypatch, temp_db_conn, update_options):
+def test_update_no_data_sleep(monkeypatch, temp_db_conn, dsn, update_options):
     date = dt.datetime.now(dt.timezone.utc) - dt.timedelta(minutes=30)
     status.set_status(temp_db_conn, date, seq=34)
 
-    monkeypatch.setattr(nominatim.tools.replication.ReplicationServer,
+    monkeypatch.setattr(nominatim_db.tools.replication.ReplicationServer,
                         "apply_diffs",
                         lambda *args, **kwargs: None)
 
     sleeptime = []
     monkeypatch.setattr(time, 'sleep', sleeptime.append)
 
-    assert nominatim.tools.replication.update(temp_db_conn, update_options) \
-             == nominatim.tools.replication.UpdateState.NO_CHANGES
+    assert nominatim_db.tools.replication.update(dsn, update_options) \
+             == nominatim_db.tools.replication.UpdateState.NO_CHANGES
 
     assert len(sleeptime) == 1
     assert sleeptime[0] < 3600