</osm>
"""
+def iso_date(date):
+ return dt.datetime.strptime(date, nominatim.db.status.ISODATE_FORMAT)\
+ .replace(tzinfo=dt.timezone.utc)
+
+
def test_compute_database_date_valid(monkeypatch, status_table, place_row, temp_db_conn):
place_row(osm_type='N', osm_id=45673)
date = nominatim.db.status.compute_database_date(temp_db_conn)
assert requested_url == ['https://www.openstreetmap.org/api/0.6/node/45673/1']
- assert date == dt.datetime.fromisoformat('2006-01-27T22:09:10').replace(tzinfo=dt.timezone.utc)
+ assert date == iso_date('2006-01-27T22:09:10')
def test_compute_database_broken_api(monkeypatch, status_table, place_row, temp_db_conn):
date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
nominatim.db.status.set_status(temp_db_conn, date=date)
- temp_db_cursor.execute("SELECT * FROM import_status")
-
- assert temp_db_cursor.rowcount == 1
- assert temp_db_cursor.fetchone() == [date, None, True]
+ assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
+ {(date, None, True)}
def test_set_status_filled_table(status_table, temp_db_conn, temp_db_cursor):
date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
nominatim.db.status.set_status(temp_db_conn, date=date)
- assert 1 == temp_db_cursor.scalar("SELECT count(*) FROM import_status")
+ assert temp_db_cursor.table_rows('import_status') == 1
date = dt.datetime.fromordinal(1000100).replace(tzinfo=dt.timezone.utc)
nominatim.db.status.set_status(temp_db_conn, date=date, seq=456, indexed=False)
- temp_db_cursor.execute("SELECT * FROM import_status")
-
- assert temp_db_cursor.rowcount == 1
- assert temp_db_cursor.fetchone() == [date, 456, False]
+ assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
+ {(date, 456, False)}
def test_set_status_missing_date(status_table, temp_db_conn, temp_db_cursor):
date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
nominatim.db.status.set_status(temp_db_conn, date=date)
- assert 1 == temp_db_cursor.scalar("SELECT count(*) FROM import_status")
+ assert temp_db_cursor.table_rows('import_status') == 1
nominatim.db.status.set_status(temp_db_conn, date=None, seq=456, indexed=False)
- temp_db_cursor.execute("SELECT * FROM import_status")
-
- assert temp_db_cursor.rowcount == 1
- assert temp_db_cursor.fetchone() == [date, 456, False]
+ assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
+ {(date, 456, False)}
def test_get_status_empty_table(status_table, temp_db_conn):
def test_set_indexed_empty_status(status_table, temp_db_conn, temp_db_cursor):
nominatim.db.status.set_indexed(temp_db_conn, True)
- assert temp_db_cursor.scalar("SELECT count(*) FROM import_status") == 0
+ assert temp_db_cursor.table_rows("import_status") == 0
def text_log_status(status_table, temp_db_conn):
nominatim.db.status.set_status(temp_db_conn, date=date, seq=56)
nominatim.db.status.log_status(temp_db_conn, start, 'index')
- assert temp_db_cursor.scalar("SELECT count(*) FROM import_osmosis_log") == 1
+ assert temp_db_cursor.table_rows("import_osmosis_log") == 1
assert temp_db_cursor.scalar("SELECT seq FROM import_osmosis_log") == 56
assert temp_db_cursor.scalar("SELECT date FROM import_osmosis_log") == date