-# SPDX-License-Identifier: GPL-2.0-only
+# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
-# Copyright (C) 2022 by the Nominatim developer community.
+# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Tests for status table manipulation.
import pytest
-import nominatim.db.status
-from nominatim.errors import UsageError
+import nominatim_core.db.status
+from nominatim_core.errors import UsageError
# Sample OSM API XML payload. It is returned by the stand-in for get_url in
# test_compute_database_date_valid. Only the opening <osm> element is visible
# in this excerpt of the patch.
OSM_NODE_DATA = """\
<osm version="0.6" generator="OpenStreetMap server" copyright="OpenStreetMap and contributors" attribution="http://www.openstreetmap.org/copyright" license="http://opendatacommons.org/licenses/odbl/1-0/">
"""
# Helper: parse `date` with the status module's ISODATE_FORMAT and return it
# as a timezone-aware datetime tagged as UTC.
# The patch only switches the module path from nominatim to nominatim_core.
def iso_date(date):
- return dt.datetime.strptime(date, nominatim.db.status.ISODATE_FORMAT)\
+ return dt.datetime.strptime(date, nominatim_core.db.status.ISODATE_FORMAT)\
.replace(tzinfo=dt.timezone.utc)
pass
# Added by this patch: with a 'current_timestamp' row in osm2pgsql_properties,
# compute_database_date is expected to return exactly that timestamp, whether
# offline is True or False (both values are parametrized).
+@pytest.mark.parametrize('offline', [True, False])
+def test_compute_database_date_from_osm2pgsql(table_factory, temp_db_conn, offline):
+ table_factory('osm2pgsql_properties', 'property TEXT, value TEXT',
+ content=(('current_timestamp', '2024-01-03T23:45:54Z'), ))
+
# The stored value carries a trailing 'Z'; iso_date() builds the expected
# value from the same instant and tags it as UTC.
+ date = nominatim_core.db.status.compute_database_date(temp_db_conn, offline=offline)
+ assert date == iso_date('2024-01-03T23:45:54')
+
+
# Added by this patch: osm2pgsql_properties is created without any content;
# with offline=True compute_database_date is expected to raise UsageError.
+def test_compute_database_date_from_osm2pgsql_nodata(table_factory, temp_db_conn):
+ table_factory('osm2pgsql_properties', 'property TEXT, value TEXT')
+
# `match` is applied by pytest as a regular-expression search against the
# text of the raised exception.
+ with pytest.raises(UsageError, match='Cannot determine database date from data in offline mode'):
+ nominatim_core.db.status.compute_database_date(temp_db_conn, offline=True)
+
+
# compute_database_date is expected to raise UsageError here. Judging by the
# test name, the place_table fixture supplies a place table with no rows
# (the fixture definition is not visible in this excerpt).
def test_compute_database_date_place_empty(place_table, temp_db_conn):
with pytest.raises(UsageError):
- nominatim.db.status.compute_database_date(temp_db_conn)
+ nominatim_core.db.status.compute_database_date(temp_db_conn)
# Happy path: get_url in the status module is replaced with mock_url, then the
# test checks which URL was requested and which date was derived from the
# returned payload.
# NOTE(review): the place_row setup and the `def mock_url(url)` header appear
# to be elided from this excerpt; the two lines right after the def below
# look like the body of that stand-in.
def test_compute_database_date_valid(monkeypatch, place_row, temp_db_conn):
requested_url.append(url)
return OSM_NODE_DATA
- monkeypatch.setattr(nominatim.db.status, "get_url", mock_url)
+ monkeypatch.setattr(nominatim_core.db.status, "get_url", mock_url)
- date = nominatim.db.status.compute_database_date(temp_db_conn)
+ date = nominatim_core.db.status.compute_database_date(temp_db_conn)
# Exactly one request is expected: version 1 of node 45673 on the OSM API.
assert requested_url == ['https://www.openstreetmap.org/api/0.6/node/45673/1']
assert date == iso_date('2006-01-27T22:09:10')
# NOTE(review): the enclosing test's def line is not visible in this excerpt.
# Here the stand-in for get_url returns an XML document that is cut off
# mid-attribute, and compute_database_date is expected to raise UsageError.
requested_url.append(url)
return '<osm version="0.6" generator="OpenStre'
- monkeypatch.setattr(nominatim.db.status, "get_url", mock_url)
+ monkeypatch.setattr(nominatim_core.db.status, "get_url", mock_url)
with pytest.raises(UsageError):
- nominatim.db.status.compute_database_date(temp_db_conn)
+ nominatim_core.db.status.compute_database_date(temp_db_conn)
# set_status called with only a date: import_status is expected to contain
# exactly one row, (date, None, True).
# NOTE(review): the column order looks like (date, sequence id, indexed),
# judging by the keyword arguments used in the neighbouring tests; confirm
# against the table schema.
def test_set_status_empty_table(temp_db_conn, temp_db_cursor):
date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
- nominatim.db.status.set_status(temp_db_conn, date=date)
+ nominatim_core.db.status.set_status(temp_db_conn, date=date)
assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
{(date, None, True)}
# A second set_status call is expected to replace the existing row rather than
# add another: afterwards the table holds only (new date, 456, False).
def test_set_status_filled_table(temp_db_conn, temp_db_cursor):
date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
- nominatim.db.status.set_status(temp_db_conn, date=date)
+ nominatim_core.db.status.set_status(temp_db_conn, date=date)
assert temp_db_cursor.table_rows('import_status') == 1
# Second write: a different date plus explicit seq and indexed values.
date = dt.datetime.fromordinal(1000100).replace(tzinfo=dt.timezone.utc)
- nominatim.db.status.set_status(temp_db_conn, date=date, seq=456, indexed=False)
+ nominatim_core.db.status.set_status(temp_db_conn, date=date, seq=456, indexed=False)
assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
{(date, 456, False)}
# set_status with date=None: the previously stored date is expected to be kept
# while seq and indexed take the newly supplied values.
def test_set_status_missing_date(temp_db_conn, temp_db_cursor):
date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
- nominatim.db.status.set_status(temp_db_conn, date=date)
+ nominatim_core.db.status.set_status(temp_db_conn, date=date)
assert temp_db_cursor.table_rows('import_status') == 1
# Same `date` object as above appears in the expected row below.
- nominatim.db.status.set_status(temp_db_conn, date=None, seq=456, indexed=False)
+ nominatim_core.db.status.set_status(temp_db_conn, date=None, seq=456, indexed=False)
assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
{(date, 456, False)}
# get_status is expected to return a triple of Nones here. Per the test name
# this is the case where import_status has no rows (the table setup is not
# visible in this excerpt).
def test_get_status_empty_table(temp_db_conn):
- assert nominatim.db.status.get_status(temp_db_conn) == (None, None, None)
+ assert nominatim_core.db.status.get_status(temp_db_conn) == (None, None, None)
# Round trip: the values written with set_status are expected to come back
# unchanged from get_status as the tuple (date, seq, indexed).
def test_get_status_success(temp_db_conn):
date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
- nominatim.db.status.set_status(temp_db_conn, date=date, seq=667, indexed=False)
+ nominatim_core.db.status.set_status(temp_db_conn, date=date, seq=667, indexed=False)
- assert nominatim.db.status.get_status(temp_db_conn) == \
+ assert nominatim_core.db.status.get_status(temp_db_conn) == \
(date, 667, False)
# set_indexed is expected to overwrite the stored indexed flag with new_state,
# whatever value set_status stored before.
# NOTE(review): old_state is not parametrized in the visible lines; presumably
# a second parametrize decorator is elided from this excerpt.
@pytest.mark.parametrize("new_state", [True, False])
def test_set_indexed(temp_db_conn, temp_db_cursor, old_state, new_state):
date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
- nominatim.db.status.set_status(temp_db_conn, date=date, indexed=old_state)
- nominatim.db.status.set_indexed(temp_db_conn, new_state)
+ nominatim_core.db.status.set_status(temp_db_conn, date=date, indexed=old_state)
+ nominatim_core.db.status.set_indexed(temp_db_conn, new_state)
assert temp_db_cursor.scalar("SELECT indexed FROM import_status") == new_state
# set_indexed is called without a prior set_status in this test. It is
# expected not to create a row: import_status must still have zero rows.
def test_set_indexed_empty_status(temp_db_conn, temp_db_cursor):
- nominatim.db.status.set_indexed(temp_db_conn, True)
+ nominatim_core.db.status.set_indexed(temp_db_conn, True)
assert temp_db_cursor.table_rows("import_status") == 0
date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
start = dt.datetime.now() - dt.timedelta(hours=1)
- nominatim.db.status.set_status(temp_db_conn, date=date, seq=56)
- nominatim.db.status.log_status(temp_db_conn, start, 'index')
+ nominatim_core.db.status.set_status(temp_db_conn, date=date, seq=56)
+ nominatim_core.db.status.log_status(temp_db_conn, start, 'index')
temp_db_conn.commit()