+
+
+@pytest.mark.parametrize("threads", [1, 15])
+def test_index_postcodes(test_db, threads):
+ for postcode in range(1000):
+ test_db.add_postcode('de', postcode)
+ for postcode in range(32000, 33000):
+ test_db.add_postcode('us', postcode)
+
+ idx = indexer.Indexer('dbname=test_nominatim_python_unittest', threads)
+ idx.index_postcodes()
+
+ assert 0 == test_db.scalar("""SELECT count(*) FROM location_postcode
+ WHERE indexed_status != 0""")
+
+
+@pytest.mark.parametrize("analyse", [True, False])
+def test_index_full(test_db, analyse):
+ for rank in range(4, 10):
+ test_db.add_admin(rank_address=rank, rank_search=rank)
+ for rank in range(31):
+ test_db.add_place(rank_address=rank, rank_search=rank)
+ test_db.add_osmline()
+ for postcode in range(1000):
+ test_db.add_postcode('de', postcode)
+
+ idx = indexer.Indexer('dbname=test_nominatim_python_unittest', 4)
+ idx.index_full(analyse=analyse)
+
+ assert 0 == test_db.placex_unindexed()
+ assert 0 == test_db.osmline_unindexed()
+ assert 0 == test_db.scalar("""SELECT count(*) FROM location_postcode
+ WHERE indexed_status != 0""")
+
+
+@pytest.mark.parametrize("threads", [1, 15])
+def test_index_reopen_connection(test_db, threads, monkeypatch):
+ monkeypatch.setattr(indexer.WorkerPool, "REOPEN_CONNECTIONS_AFTER", 15)
+
+ for _ in range(1000):
+ test_db.add_place(rank_address=30, rank_search=30)
+
+ idx = indexer.Indexer('dbname=test_nominatim_python_unittest', threads)
+ idx.index_by_rank(28, 30)
+
+ assert 0 == test_db.placex_unindexed()