]> git.openstreetmap.org Git - nominatim.git/blobdiff - test/python/db/test_status.py
reduce from 3 to 2 packages
[nominatim.git] / test / python / db / test_status.py
index 67cd22e91d536d7f8281f18ff242e824676d6fee..77135a8c7f943c13df96632aa9901e50b9aabed7 100644 (file)
@@ -11,8 +11,8 @@ import datetime as dt
 
 import pytest
 
 
 import pytest
 
-import nominatim_core.db.status
-from nominatim_core.errors import UsageError
+import nominatim_db.db.status
+from nominatim_db.errors import UsageError
 
 OSM_NODE_DATA = """\
 <osm version="0.6" generator="OpenStreetMap server" copyright="OpenStreetMap and contributors" attribution="http://www.openstreetmap.org/copyright" license="http://opendatacommons.org/licenses/odbl/1-0/">
 
 OSM_NODE_DATA = """\
 <osm version="0.6" generator="OpenStreetMap server" copyright="OpenStreetMap and contributors" attribution="http://www.openstreetmap.org/copyright" license="http://opendatacommons.org/licenses/odbl/1-0/">
@@ -22,7 +22,7 @@ OSM_NODE_DATA = """\
 """
 
 def iso_date(date):
 """
 
 def iso_date(date):
-    return dt.datetime.strptime(date, nominatim_core.db.status.ISODATE_FORMAT)\
+    return dt.datetime.strptime(date, nominatim_db.db.status.ISODATE_FORMAT)\
                .replace(tzinfo=dt.timezone.utc)
 
 
                .replace(tzinfo=dt.timezone.utc)
 
 
@@ -36,7 +36,7 @@ def test_compute_database_date_from_osm2pgsql(table_factory, temp_db_conn, offli
     table_factory('osm2pgsql_properties', 'property TEXT, value TEXT',
                   content=(('current_timestamp', '2024-01-03T23:45:54Z'), ))
 
     table_factory('osm2pgsql_properties', 'property TEXT, value TEXT',
                   content=(('current_timestamp', '2024-01-03T23:45:54Z'), ))
 
-    date = nominatim_core.db.status.compute_database_date(temp_db_conn, offline=offline)
+    date = nominatim_db.db.status.compute_database_date(temp_db_conn, offline=offline)
     assert date == iso_date('2024-01-03T23:45:54')
 
 
     assert date == iso_date('2024-01-03T23:45:54')
 
 
@@ -44,12 +44,12 @@ def test_compute_database_date_from_osm2pgsql_nodata(table_factory, temp_db_conn
     table_factory('osm2pgsql_properties', 'property TEXT, value TEXT')
 
     with pytest.raises(UsageError, match='Cannot determine database date from data in offline mode'):
     table_factory('osm2pgsql_properties', 'property TEXT, value TEXT')
 
     with pytest.raises(UsageError, match='Cannot determine database date from data in offline mode'):
-        nominatim_core.db.status.compute_database_date(temp_db_conn, offline=True)
+        nominatim_db.db.status.compute_database_date(temp_db_conn, offline=True)
 
 
 def test_compute_database_date_place_empty(place_table, temp_db_conn):
     with pytest.raises(UsageError):
 
 
 def test_compute_database_date_place_empty(place_table, temp_db_conn):
     with pytest.raises(UsageError):
-        nominatim_core.db.status.compute_database_date(temp_db_conn)
+        nominatim_db.db.status.compute_database_date(temp_db_conn)
 
 
 def test_compute_database_date_valid(monkeypatch, place_row, temp_db_conn):
 
 
 def test_compute_database_date_valid(monkeypatch, place_row, temp_db_conn):
@@ -60,9 +60,9 @@ def test_compute_database_date_valid(monkeypatch, place_row, temp_db_conn):
         requested_url.append(url)
         return OSM_NODE_DATA
 
         requested_url.append(url)
         return OSM_NODE_DATA
 
-    monkeypatch.setattr(nominatim_core.db.status, "get_url", mock_url)
+    monkeypatch.setattr(nominatim_db.db.status, "get_url", mock_url)
 
 
-    date = nominatim_core.db.status.compute_database_date(temp_db_conn)
+    date = nominatim_db.db.status.compute_database_date(temp_db_conn)
 
     assert requested_url == ['https://www.openstreetmap.org/api/0.6/node/45673/1']
     assert date == iso_date('2006-01-27T22:09:10')
 
     assert requested_url == ['https://www.openstreetmap.org/api/0.6/node/45673/1']
     assert date == iso_date('2006-01-27T22:09:10')
@@ -76,15 +76,15 @@ def test_compute_database_broken_api(monkeypatch, place_row, temp_db_conn):
         requested_url.append(url)
         return '<osm version="0.6" generator="OpenStre'
 
         requested_url.append(url)
         return '<osm version="0.6" generator="OpenStre'
 
-    monkeypatch.setattr(nominatim_core.db.status, "get_url", mock_url)
+    monkeypatch.setattr(nominatim_db.db.status, "get_url", mock_url)
 
     with pytest.raises(UsageError):
 
     with pytest.raises(UsageError):
-        nominatim_core.db.status.compute_database_date(temp_db_conn)
+        nominatim_db.db.status.compute_database_date(temp_db_conn)
 
 
 def test_set_status_empty_table(temp_db_conn, temp_db_cursor):
     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
 
 
 def test_set_status_empty_table(temp_db_conn, temp_db_cursor):
     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
-    nominatim_core.db.status.set_status(temp_db_conn, date=date)
+    nominatim_db.db.status.set_status(temp_db_conn, date=date)
 
     assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
              {(date, None, True)}
 
     assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
              {(date, None, True)}
@@ -92,12 +92,12 @@ def test_set_status_empty_table(temp_db_conn, temp_db_cursor):
 
 def test_set_status_filled_table(temp_db_conn, temp_db_cursor):
     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
 
 def test_set_status_filled_table(temp_db_conn, temp_db_cursor):
     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
-    nominatim_core.db.status.set_status(temp_db_conn, date=date)
+    nominatim_db.db.status.set_status(temp_db_conn, date=date)
 
     assert temp_db_cursor.table_rows('import_status') == 1
 
     date = dt.datetime.fromordinal(1000100).replace(tzinfo=dt.timezone.utc)
 
     assert temp_db_cursor.table_rows('import_status') == 1
 
     date = dt.datetime.fromordinal(1000100).replace(tzinfo=dt.timezone.utc)
-    nominatim_core.db.status.set_status(temp_db_conn, date=date, seq=456, indexed=False)
+    nominatim_db.db.status.set_status(temp_db_conn, date=date, seq=456, indexed=False)
 
     assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
              {(date, 456, False)}
 
     assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
              {(date, 456, False)}
@@ -105,25 +105,25 @@ def test_set_status_filled_table(temp_db_conn, temp_db_cursor):
 
 def test_set_status_missing_date(temp_db_conn, temp_db_cursor):
     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
 
 def test_set_status_missing_date(temp_db_conn, temp_db_cursor):
     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
-    nominatim_core.db.status.set_status(temp_db_conn, date=date)
+    nominatim_db.db.status.set_status(temp_db_conn, date=date)
 
     assert temp_db_cursor.table_rows('import_status') == 1
 
 
     assert temp_db_cursor.table_rows('import_status') == 1
 
-    nominatim_core.db.status.set_status(temp_db_conn, date=None, seq=456, indexed=False)
+    nominatim_db.db.status.set_status(temp_db_conn, date=None, seq=456, indexed=False)
 
     assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
              {(date, 456, False)}
 
 
 def test_get_status_empty_table(temp_db_conn):
 
     assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
              {(date, 456, False)}
 
 
 def test_get_status_empty_table(temp_db_conn):
-    assert nominatim_core.db.status.get_status(temp_db_conn) == (None, None, None)
+    assert nominatim_db.db.status.get_status(temp_db_conn) == (None, None, None)
 
 
 def test_get_status_success(temp_db_conn):
     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
 
 
 def test_get_status_success(temp_db_conn):
     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
-    nominatim_core.db.status.set_status(temp_db_conn, date=date, seq=667, indexed=False)
+    nominatim_db.db.status.set_status(temp_db_conn, date=date, seq=667, indexed=False)
 
 
-    assert nominatim_core.db.status.get_status(temp_db_conn) == \
+    assert nominatim_db.db.status.get_status(temp_db_conn) == \
              (date, 667, False)
 
 
              (date, 667, False)
 
 
@@ -131,14 +131,14 @@ def test_get_status_success(temp_db_conn):
 @pytest.mark.parametrize("new_state", [True, False])
 def test_set_indexed(temp_db_conn, temp_db_cursor, old_state, new_state):
     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
 @pytest.mark.parametrize("new_state", [True, False])
 def test_set_indexed(temp_db_conn, temp_db_cursor, old_state, new_state):
     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
-    nominatim_core.db.status.set_status(temp_db_conn, date=date, indexed=old_state)
-    nominatim_core.db.status.set_indexed(temp_db_conn, new_state)
+    nominatim_db.db.status.set_status(temp_db_conn, date=date, indexed=old_state)
+    nominatim_db.db.status.set_indexed(temp_db_conn, new_state)
 
     assert temp_db_cursor.scalar("SELECT indexed FROM import_status") == new_state
 
 
 def test_set_indexed_empty_status(temp_db_conn, temp_db_cursor):
 
     assert temp_db_cursor.scalar("SELECT indexed FROM import_status") == new_state
 
 
 def test_set_indexed_empty_status(temp_db_conn, temp_db_cursor):
-    nominatim_core.db.status.set_indexed(temp_db_conn, True)
+    nominatim_db.db.status.set_indexed(temp_db_conn, True)
 
     assert temp_db_cursor.table_rows("import_status") == 0
 
 
     assert temp_db_cursor.table_rows("import_status") == 0
 
@@ -147,8 +147,8 @@ def test_log_status(temp_db_conn, temp_db_cursor):
     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
     start = dt.datetime.now() - dt.timedelta(hours=1)
 
     date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
     start = dt.datetime.now() - dt.timedelta(hours=1)
 
-    nominatim_core.db.status.set_status(temp_db_conn, date=date, seq=56)
-    nominatim_core.db.status.log_status(temp_db_conn, start, 'index')
+    nominatim_db.db.status.set_status(temp_db_conn, date=date, seq=56)
+    nominatim_db.db.status.log_status(temp_db_conn, start, 'index')
 
     temp_db_conn.commit()
 
 
     temp_db_conn.commit()