git.openstreetmap.org Git - nominatim.git/blob - test/python/test_indexing.py
cli indexer tests need a fake database
[nominatim.git] / test / python / test_indexing.py
1 """
2 Tests for running the indexing.
3 """
4 import itertools
5 import psycopg2
6 import pytest
7
8 from nominatim.indexer.indexer import Indexer
9
class IndexerTestDB:
    """ Cut-down database layout for exercising the indexer.

        Sets up reduced `placex` and `location_property_osmline` tables on
        the given connection and offers helpers to insert unindexed rows
        and to count the rows that still wait for indexing.
    """

    def __init__(self, conn):
        """ Create the tables and the update trigger on `conn`.

            `conn` needs to provide `set_isolation_level()` and `cursor()`
            the way a psycopg2 connection does.
        """
        self.conn = conn
        # Level 0 is psycopg2's autocommit mode, so every statement issued
        # through this connection takes effect immediately.
        self.conn.set_isolation_level(0)

        # Separate id ranges keep placex and interpolation ids apart.
        self.placex_id = itertools.count(100000)
        self.osmline_id = itertools.count(500000)

        self._create_schema()

    def _create_schema(self):
        """ Create both tables, the trigger function and its two triggers.
        """
        with self.conn.cursor() as cursor:
            cursor.execute("""CREATE TABLE placex (place_id BIGINT,
                                                class TEXT,
                                                type TEXT,
                                                rank_address SMALLINT,
                                                rank_search SMALLINT,
                                                indexed_status SMALLINT,
                                                indexed_date TIMESTAMP,
                                                partition SMALLINT,
                                                admin_level SMALLINT,
                                                geometry_sector INTEGER)""")
            cursor.execute("""CREATE TABLE location_property_osmline (
                               place_id BIGINT,
                               indexed_status SMALLINT,
                               indexed_date TIMESTAMP,
                               geometry_sector INTEGER)""")
            # The trigger stamps indexed_date at the moment a row switches
            # from a non-zero indexed_status to 0. The tests rely on these
            # timestamps to check the order in which rows were processed.
            cursor.execute("""CREATE OR REPLACE FUNCTION date_update() RETURNS TRIGGER
                           AS $$
                           BEGIN
                             IF NEW.indexed_status = 0 and OLD.indexed_status != 0 THEN
                               NEW.indexed_date = now();
                             END IF;
                             RETURN NEW;
                           END; $$ LANGUAGE plpgsql;""")
            cursor.execute("""CREATE TRIGGER placex_update BEFORE UPDATE ON placex
                           FOR EACH ROW EXECUTE PROCEDURE date_update()""")
            cursor.execute("""CREATE TRIGGER osmline_update BEFORE UPDATE ON location_property_osmline
                           FOR EACH ROW EXECUTE PROCEDURE date_update()""")

    def scalar(self, query):
        """ Run `query` and return the first column of its first result row.
        """
        with self.conn.cursor() as cursor:
            cursor.execute(query)
            first_row = cursor.fetchone()
            return first_row[0]

    def add_place(self, cls='place', typ='locality',
                  rank_search=30, rank_address=30, sector=20):
        """ Insert a placex row with indexed_status 1 and return its
            newly assigned place_id.
        """
        place_id = next(self.placex_id)
        row = (place_id, cls, typ, rank_search, rank_address, sector)
        with self.conn.cursor() as cursor:
            cursor.execute("""INSERT INTO placex
                              (place_id, class, type, rank_search, rank_address,
                               indexed_status, geometry_sector)
                              VALUES (%s, %s, %s, %s, %s, 1, %s)""",
                           row)
        return place_id

    def add_admin(self, **kwargs):
        """ Insert a placex row of class 'boundary' and type
            'administrative'. The remaining keyword arguments are passed
            on to add_place(); any `cls` or `typ` given is overridden.
        """
        kwargs.update(cls='boundary', typ='administrative')
        return self.add_place(**kwargs)

    def add_osmline(self, sector=20):
        """ Insert a location_property_osmline row with indexed_status 1
            and return its newly assigned place_id.
        """
        line_id = next(self.osmline_id)
        row = (line_id, sector)
        with self.conn.cursor() as cursor:
            cursor.execute("""INSERT INTO location_property_osmline
                              (place_id, indexed_status, geometry_sector)
                              VALUES (%s, 1, %s)""",
                           row)
        return line_id

    def placex_unindexed(self):
        """ Return the number of placex rows with indexed_status > 0.
        """
        pending = self.scalar('SELECT count(*) from placex where indexed_status > 0')
        return pending

    def osmline_unindexed(self):
        """ Return the number of location_property_osmline rows with
            indexed_status > 0.
        """
        pending = self.scalar('SELECT count(*) from location_property_osmline where indexed_status > 0')
        return pending
82
83
@pytest.fixture
def test_db(temp_db):
    """ Provide an IndexerTestDB on a fresh connection to the database
        named by the `temp_db` fixture. The connection is closed again
        once the test is done.
    """
    conn = psycopg2.connect(database=temp_db)
    try:
        yield IndexerTestDB(conn)
    finally:
        # Also reached when creating the test tables fails, so that no
        # connection to the temporary database is left open.
        conn.close()
89
90
@pytest.mark.parametrize("threads", [1, 15])
def test_index_full(test_db, threads):
    """ Index ranks 0 to 30 and check that nothing is left unindexed and
        that the indexing timestamps follow the expected order.
    """
    # One place for each rank from 0 to 30 and a single interpolation line.
    for rank in range(31):
        test_db.add_place(rank_address=rank, rank_search=rank)
    test_db.add_osmline()

    assert test_db.placex_unindexed() == 31
    assert test_db.osmline_unindexed() == 1

    # NOTE(review): the database name is hard-coded here while the test_db
    # fixture connects to whatever temp_db provides - presumably the same
    # database; confirm.
    indexer = Indexer('dbname=test_nominatim_python_unittest', threads)
    indexer.index_by_rank(0, 30)

    assert test_db.placex_unindexed() == 0
    assert test_db.osmline_unindexed() == 0

    # every indexed row must have received a timestamp from the trigger
    nodate = test_db.scalar("""SELECT count(*) from placex
                               WHERE indexed_status = 0 and indexed_date is null""")
    assert nodate == 0

    # ranks come in order of rank address
    rank_order_violations = test_db.scalar("""
        SELECT count(*) FROM placex p WHERE rank_address > 0
          AND indexed_date >= (SELECT min(indexed_date) FROM placex o
                               WHERE p.rank_address < o.rank_address)""")
    assert rank_order_violations == 0

    # placex rank < 30 objects come before interpolations
    low_ranks_after_osmline = test_db.scalar(
        """SELECT count(*) FROM placex WHERE rank_address < 30
             AND indexed_date > (SELECT min(indexed_date) FROM location_property_osmline)""")
    assert low_ranks_after_osmline == 0

    # placex rank = 30 objects come after interpolations
    rank30_before_osmline = test_db.scalar(
        """SELECT count(*) FROM placex WHERE rank_address = 30
             AND indexed_date < (SELECT max(indexed_date) FROM location_property_osmline)""")
    assert rank30_before_osmline == 0

    # rank 0 comes after rank 29 and before rank 30
    low_ranks_after_rank0 = test_db.scalar(
        """SELECT count(*) FROM placex WHERE rank_address < 30
             AND indexed_date > (SELECT min(indexed_date) FROM placex WHERE rank_address = 0)""")
    assert low_ranks_after_rank0 == 0

    rank30_before_rank0 = test_db.scalar(
        """SELECT count(*) FROM placex WHERE rank_address = 30
             AND indexed_date < (SELECT max(indexed_date) FROM placex WHERE rank_address = 0)""")
    assert rank30_before_rank0 == 0
128
129
@pytest.mark.parametrize("threads", [1, 15])
def test_index_partial_without_30(test_db, threads):
    """ Index ranks 4 to 15 only and check that all other places and the
        interpolation line stay unindexed.
    """
    # One place for each rank from 0 to 30 and a single interpolation line.
    for rank in range(31):
        test_db.add_place(rank_address=rank, rank_search=rank)
    test_db.add_osmline()

    assert test_db.placex_unindexed() == 31
    assert test_db.osmline_unindexed() == 1

    # NOTE(review): the database name is hard-coded here while the test_db
    # fixture connects to whatever temp_db provides - presumably the same
    # database; confirm.
    indexer = Indexer('dbname=test_nominatim_python_unittest', threads)
    indexer.index_by_rank(4, 15)

    # 12 of the 31 places fall into ranks 4..15, so 19 remain.
    assert test_db.placex_unindexed() == 19
    assert test_db.osmline_unindexed() == 1

    indexed_outside_range = test_db.scalar("""
                    SELECT count(*) FROM placex
                      WHERE indexed_status = 0 AND not rank_address between 4 and 15""")
    assert indexed_outside_range == 0
148
149
@pytest.mark.parametrize("threads", [1, 15])
def test_index_partial_with_30(test_db, threads):
    """ Index ranks 28 to 30 and check that the interpolation line gets
        indexed while the places of rank 1 to 27 stay untouched.
    """
    # One place for each rank from 0 to 30 and a single interpolation line.
    for rank in range(31):
        test_db.add_place(rank_address=rank, rank_search=rank)
    test_db.add_osmline()

    assert test_db.placex_unindexed() == 31
    assert test_db.osmline_unindexed() == 1

    # NOTE(review): the database name is hard-coded here while the test_db
    # fixture connects to whatever temp_db provides - presumably the same
    # database; confirm.
    indexer = Indexer('dbname=test_nominatim_python_unittest', threads)
    indexer.index_by_rank(28, 30)

    # Four places leave the queue although only three ranks were requested;
    # presumably rank 0 is handled together with rank 30 - confirm against
    # the Indexer implementation.
    assert test_db.placex_unindexed() == 27
    assert test_db.osmline_unindexed() == 0

    indexed_below_range = test_db.scalar("""
                    SELECT count(*) FROM placex
                      WHERE indexed_status = 0 AND rank_address between 1 and 27""")
    assert indexed_below_range == 0
168
@pytest.mark.parametrize("threads", [1, 15])
def test_index_boundaries(test_db, threads):
    """ Run index_boundaries() over ranks 0 to 30 and check that only rows
        of class 'boundary' end up indexed.
    """
    # Six administrative boundaries (ranks 4..9), one ordinary place for
    # each rank from 0 to 30 and a single interpolation line.
    for rank in range(4, 10):
        test_db.add_admin(rank_address=rank, rank_search=rank)
    for rank in range(31):
        test_db.add_place(rank_address=rank, rank_search=rank)
    test_db.add_osmline()

    assert test_db.placex_unindexed() == 37
    assert test_db.osmline_unindexed() == 1

    # NOTE(review): the database name is hard-coded here while the test_db
    # fixture connects to whatever temp_db provides - presumably the same
    # database; confirm.
    indexer = Indexer('dbname=test_nominatim_python_unittest', threads)
    indexer.index_boundaries(0, 30)

    # Only the six boundaries may have left the queue.
    assert test_db.placex_unindexed() == 31
    assert test_db.osmline_unindexed() == 1

    indexed_non_boundaries = test_db.scalar("""
                    SELECT count(*) FROM placex
                      WHERE indexed_status = 0 AND class != 'boundary'""")
    assert indexed_non_boundaries == 0