def test_init_replication_bad_base_url(monkeypatch, place_row, temp_db_conn):
place_row(osm_type='N', osm_id=100)
def test_init_replication_bad_base_url(monkeypatch, place_row, temp_db_conn):
place_row(osm_type='N', osm_id=100)
nominatim_db.tools.replication.init_replication(temp_db_conn, 'https://test.io')
expected_date = dt.datetime.strptime('2006-01-27T19:09:10', status.ISODATE_FORMAT)\
nominatim_db.tools.replication.init_replication(temp_db_conn, 'https://test.io')
expected_date = dt.datetime.strptime('2006-01-27T19:09:10', status.ISODATE_FORMAT)\
def test_check_for_updates_empty_status_table(temp_db_conn):
assert nominatim_db.tools.replication.check_for_updates(temp_db_conn, 'https://test.io') == 254
def test_check_for_updates_empty_status_table(temp_db_conn):
assert nominatim_db.tools.replication.check_for_updates(temp_db_conn, 'https://test.io') == 254
def test_update_empty_status_table(dsn):
with pytest.raises(UsageError):
nominatim_db.tools.replication.update(dsn, {})
def test_update_empty_status_table(dsn):
with pytest.raises(UsageError):
nominatim_db.tools.replication.update(dsn, {})
status.set_status(temp_db_conn, dt.datetime.now(dt.timezone.utc), seq=34, indexed=False)
assert nominatim_db.tools.replication.update(dsn, dict(indexed_only=True)) \
status.set_status(temp_db_conn, dt.datetime.now(dt.timezone.utc), seq=34, indexed=False)
assert nominatim_db.tools.replication.update(dsn, dict(indexed_only=True)) \
def test_update_no_data_no_sleep(monkeypatch, temp_db_conn, dsn, update_options):
def test_update_no_data_no_sleep(monkeypatch, temp_db_conn, dsn, update_options):
monkeypatch.setattr(time, 'sleep', sleeptime.append)
assert nominatim_db.tools.replication.update(dsn, update_options) \
monkeypatch.setattr(time, 'sleep', sleeptime.append)
assert nominatim_db.tools.replication.update(dsn, update_options) \
monkeypatch.setattr(time, 'sleep', sleeptime.append)
assert nominatim_db.tools.replication.update(dsn, update_options) \
monkeypatch.setattr(time, 'sleep', sleeptime.append)
assert nominatim_db.tools.replication.update(dsn, update_options) \