]> git.openstreetmap.org Git - nominatim.git/blob - test/python/tools/test_postcodes.py
Merge pull request #2936 from lonvia/fix-query-for-index-use
[nominatim.git] / test / python / tools / test_postcodes.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tests for functions to maintain the artificial postcode table.
9 """
10 import subprocess
11
12 import pytest
13
14 from nominatim.tools import postcodes
15 from nominatim.data import country_info
16 import dummy_tokenizer
17
18 class MockPostcodeTable:
19     """ A location_postcode table for testing.
20     """
21     def __init__(self, conn):
22         self.conn = conn
23         with conn.cursor() as cur:
24             cur.execute("""CREATE TABLE location_postcode (
25                                place_id BIGINT,
26                                parent_place_id BIGINT,
27                                rank_search SMALLINT,
28                                rank_address SMALLINT,
29                                indexed_status SMALLINT,
30                                indexed_date TIMESTAMP,
31                                country_code varchar(2),
32                                postcode TEXT,
33                                geometry GEOMETRY(Geometry, 4326))""")
34             cur.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
35                            RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql;
36
37                            CREATE OR REPLACE FUNCTION get_country_code(place geometry)
38                            RETURNS TEXT AS $$ BEGIN 
39                            RETURN null;
40                            END; $$ LANGUAGE plpgsql;
41                         """)
42         conn.commit()
43
44     def add(self, country, postcode, x, y):
45         with self.conn.cursor() as cur:
46             cur.execute("""INSERT INTO location_postcode (place_id, indexed_status,
47                                                           country_code, postcode,
48                                                           geometry)
49                            VALUES (nextval('seq_place'), 1, %s, %s,
50                                    'SRID=4326;POINT(%s %s)')""",
51                         (country, postcode, x, y))
52         self.conn.commit()
53
54
55     @property
56     def row_set(self):
57         with self.conn.cursor() as cur:
58             cur.execute("""SELECT country_code, postcode,
59                                   ST_X(geometry), ST_Y(geometry)
60                            FROM location_postcode""")
61             return set((tuple(row) for row in cur))
62
63
64 @pytest.fixture
65 def tokenizer():
66     return dummy_tokenizer.DummyTokenizer(None, None)
67
68
69 @pytest.fixture
70 def postcode_table(def_config, temp_db_conn, placex_table):
71     country_info.setup_country_config(def_config)
72     return MockPostcodeTable(temp_db_conn)
73
74
75 @pytest.fixture
76 def insert_implicit_postcode(placex_table, place_row):
77     """
78         Inserts data into the placex and place table
79         which can then be used to compute one postcode.
80     """
81     def _insert_implicit_postcode(osm_id, country, geometry, address):
82         placex_table.add(osm_id=osm_id, country=country, geom=geometry)
83         place_row(osm_id=osm_id, geom='SRID=4326;'+geometry, address=address)
84
85     return _insert_implicit_postcode
86
87
88 def test_postcodes_empty(dsn, postcode_table, place_table,
89                          tmp_path, tokenizer):
90     postcodes.update_postcodes(dsn, tmp_path, tokenizer)
91
92     assert not postcode_table.row_set
93
94
95 def test_postcodes_add_new(dsn, postcode_table, tmp_path,
96                            insert_implicit_postcode, tokenizer):
97     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='9486'))
98     postcode_table.add('yy', '9486', 99, 34)
99
100     postcodes.update_postcodes(dsn, tmp_path, tokenizer)
101
102     assert postcode_table.row_set == {('xx', '9486', 10, 12), }
103
104
105 def test_postcodes_replace_coordinates(dsn, postcode_table, tmp_path,
106                                        insert_implicit_postcode, tokenizer):
107     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
108     postcode_table.add('xx', 'AB 4511', 99, 34)
109
110     postcodes.update_postcodes(dsn, tmp_path, tokenizer)
111
112     assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}
113
114
115 def test_postcodes_replace_coordinates_close(dsn, postcode_table, tmp_path,
116                                              insert_implicit_postcode, tokenizer):
117     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
118     postcode_table.add('xx', 'AB 4511', 10, 11.99999)
119
120     postcodes.update_postcodes(dsn, tmp_path, tokenizer)
121
122     assert postcode_table.row_set == {('xx', 'AB 4511', 10, 11.99999)}
123
124
125 def test_postcodes_remove(dsn, postcode_table, tmp_path,
126                           insert_implicit_postcode, tokenizer):
127     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
128     postcode_table.add('xx', 'badname', 10, 12)
129
130     postcodes.update_postcodes(dsn, tmp_path, tokenizer)
131
132     assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}
133
134
135 def test_postcodes_ignore_empty_country(dsn, postcode_table, tmp_path,
136                                         insert_implicit_postcode, tokenizer):
137     insert_implicit_postcode(1, None, 'POINT(10 12)', dict(postcode='AB 4511'))
138     postcodes.update_postcodes(dsn, tmp_path, tokenizer)
139     assert not postcode_table.row_set
140
141
142 def test_postcodes_remove_all(dsn, postcode_table, place_table,
143                               tmp_path, tokenizer):
144     postcode_table.add('ch', '5613', 10, 12)
145     postcodes.update_postcodes(dsn, tmp_path, tokenizer)
146
147     assert not postcode_table.row_set
148
149
150 def test_postcodes_multi_country(dsn, postcode_table, tmp_path,
151                                  insert_implicit_postcode, tokenizer):
152     insert_implicit_postcode(1, 'de', 'POINT(10 12)', dict(postcode='54451'))
153     insert_implicit_postcode(2, 'cc', 'POINT(100 56)', dict(postcode='DD23 T'))
154     insert_implicit_postcode(3, 'de', 'POINT(10.3 11.0)', dict(postcode='54452'))
155     insert_implicit_postcode(4, 'cc', 'POINT(10.3 11.0)', dict(postcode='54452'))
156
157     postcodes.update_postcodes(dsn, tmp_path, tokenizer)
158
159     assert postcode_table.row_set == {('de', '54451', 10, 12),
160                                       ('de', '54452', 10.3, 11.0),
161                                       ('cc', '54452', 10.3, 11.0),
162                                       ('cc', 'DD23 T', 100, 56)}
163
164
165 @pytest.mark.parametrize("gzipped", [True, False])
166 def test_postcodes_extern(dsn, postcode_table, tmp_path,
167                           insert_implicit_postcode, tokenizer, gzipped):
168     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
169
170     extfile = tmp_path / 'xx_postcodes.csv'
171     extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")
172
173     if gzipped:
174         subprocess.run(['gzip', str(extfile)])
175         assert not extfile.is_file()
176
177     postcodes.update_postcodes(dsn, tmp_path, tokenizer)
178
179     assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12),
180                                       ('xx', 'CD 4511', -10, -5)}
181
182
183 def test_postcodes_extern_bad_column(dsn, postcode_table, tmp_path, 
184                                      insert_implicit_postcode, tokenizer):
185     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
186
187     extfile = tmp_path / 'xx_postcodes.csv'
188     extfile.write_text("postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")
189
190     postcodes.update_postcodes(dsn, tmp_path, tokenizer)
191
192     assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}
193
194
195 def test_postcodes_extern_bad_number(dsn, insert_implicit_postcode,
196                                      postcode_table, tmp_path, tokenizer):
197     insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
198
199     extfile = tmp_path / 'xx_postcodes.csv'
200     extfile.write_text("postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0")
201
202     postcodes.update_postcodes(dsn, tmp_path, tokenizer)
203
204     assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12),
205                                       ('xx', 'CD 4511', -10, -5)}
206
207 def test_can_compute(dsn, table_factory):
208     assert not postcodes.can_compute(dsn)
209     table_factory('place')
210     assert postcodes.can_compute(dsn)
211
212
213 def test_no_placex_entry(dsn, tmp_path, temp_db_cursor, place_row, postcode_table, tokenizer):
214     #Rewrite the get_country_code function to verify its execution.
215     temp_db_cursor.execute("""
216         CREATE OR REPLACE FUNCTION get_country_code(place geometry)
217         RETURNS TEXT AS $$ BEGIN 
218         RETURN 'yy';
219         END; $$ LANGUAGE plpgsql;
220     """)
221     place_row(geom='SRID=4326;POINT(10 12)', address=dict(postcode='AB 4511'))
222     postcodes.update_postcodes(dsn, tmp_path, tokenizer)
223
224     assert postcode_table.row_set == {('yy', 'AB 4511', 10, 12)}
225
226
227 def test_discard_badly_formatted_postcodes(dsn, tmp_path, temp_db_cursor, place_row, postcode_table, tokenizer):
228     #Rewrite the get_country_code function to verify its execution.
229     temp_db_cursor.execute("""
230         CREATE OR REPLACE FUNCTION get_country_code(place geometry)
231         RETURNS TEXT AS $$ BEGIN 
232         RETURN 'fr';
233         END; $$ LANGUAGE plpgsql;
234     """)
235     place_row(geom='SRID=4326;POINT(10 12)', address=dict(postcode='AB 4511'))
236     postcodes.update_postcodes(dsn, tmp_path, tokenizer)
237
238     assert not postcode_table.row_set