git.openstreetmap.org Git - nominatim.git/blob - test/python/tools/test_postcodes.py
Merge pull request #2718 from nslxndr/fix-log-endtime
[nominatim.git] / test / python / tools / test_postcodes.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tests for functions to maintain the artificial postcode table.
9 """
10 import subprocess
11
12 import pytest
13
14 from nominatim.tools import postcodes
15 import dummy_tokenizer
16
class MockPostcodeTable:
    """ A location_postcode table for testing.

        Creating an instance creates the table on the given connection
        together with stub versions of the SQL functions
        `token_normalized_postcode()` (returns its input unchanged) and
        `get_country_code()` (always returns NULL).
    """
    def __init__(self, conn):
        """ Create the table and the SQL function stubs through `conn`
            and commit them.
        """
        self.conn = conn
        with conn.cursor() as cur:
            cur.execute("""CREATE TABLE location_postcode (
                               place_id BIGINT,
                               parent_place_id BIGINT,
                               rank_search SMALLINT,
                               rank_address SMALLINT,
                               indexed_status SMALLINT,
                               indexed_date TIMESTAMP,
                               country_code varchar(2),
                               postcode TEXT,
                               geometry GEOMETRY(Geometry, 4326))""")
            cur.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
                           RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql;

                           CREATE OR REPLACE FUNCTION get_country_code(place geometry)
                           RETURNS TEXT AS $$ BEGIN 
                           RETURN null;
                           END; $$ LANGUAGE plpgsql;
                        """)
        conn.commit()

    def add(self, country, postcode, x, y):
        """ Insert a row for `postcode` in `country` located at the
            point (x, y) in SRID 4326 and commit.

            The row gets its place_id from the sequence `seq_place`,
            which must already exist, and an indexed_status of 1.
        """
        with self.conn.cursor() as cur:
            # Build the point from real bound parameters. Placeholders
            # inside a quoted SQL literal only work as long as the driver
            # pastes the values textually into the query string.
            cur.execute("""INSERT INTO location_postcode (place_id, indexed_status,
                                                          country_code, postcode,
                                                          geometry)
                           VALUES (nextval('seq_place'), 1, %s, %s,
                                   ST_SetSRID(ST_MakePoint(%s, %s), 4326))""",
                        (country, postcode, x, y))
        self.conn.commit()


    @property
    def row_set(self):
        """ Return the current table content as a set of
            (country_code, postcode, x, y) tuples.
        """
        with self.conn.cursor() as cur:
            cur.execute("""SELECT country_code, postcode,
                                  ST_X(geometry), ST_Y(geometry)
                           FROM location_postcode""")
            return {tuple(row) for row in cur}
61
62
@pytest.fixture
def tokenizer():
    """ Provide a DummyTokenizer that was created with both of its
        constructor arguments set to None.
    """
    dummy = dummy_tokenizer.DummyTokenizer(None, None)
    return dummy
66
@pytest.fixture
def postcode_table(temp_db_conn, placex_table):
    """ Provide a MockPostcodeTable created on the test database connection.

        `placex_table` is requested but not used directly; presumably it
        is needed for its setup side effects - to be confirmed against
        the fixture definition.
    """
    table = MockPostcodeTable(temp_db_conn)
    return table
70
71
def test_postcodes_empty(dsn, postcode_table, place_table,
                         tmp_path, tokenizer):
    """ When the test adds no data at all, the postcode table
        is still empty after an update.
    """
    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    remaining = postcode_table.row_set
    assert not remaining
77
78
def test_postcodes_add_new(dsn, postcode_table, tmp_path,
                           insert_implicit_postcode, tokenizer):
    """ A postcode found in the place data is added for its country,
        while a previously existing row with the same postcode in
        another country is gone after the update.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': '9486'})
    postcode_table.add('yy', '9486', 99, 34)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {('xx', '9486', 10, 12)}
    assert postcode_table.row_set == expected
87
88
def test_postcodes_replace_coordinates(dsn, postcode_table, tmp_path,
                                       insert_implicit_postcode, tokenizer):
    """ An existing row whose location is far away from the one in
        the place data gets the new coordinates.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})
    postcode_table.add('xx', 'AB 4511', 99, 34)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {('xx', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
97
98
def test_postcodes_replace_coordinates_close(dsn, postcode_table, tmp_path,
                                             insert_implicit_postcode, tokenizer):
    """ An existing row whose location differs only minimally from the
        one in the place data keeps its coordinates.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})
    postcode_table.add('xx', 'AB 4511', 10, 11.99999)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {('xx', 'AB 4511', 10, 11.99999)}
    assert postcode_table.row_set == expected
107
108
def test_postcodes_remove(dsn, postcode_table, tmp_path,
                          insert_implicit_postcode, tokenizer):
    """ A row with a postcode that does not appear in the place data
        is removed, the postcode from the place data is added.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})
    postcode_table.add('xx', 'badname', 10, 12)

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {('xx', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
117
118
def test_postcodes_ignore_empty_country(dsn, postcode_table, tmp_path,
                                        insert_implicit_postcode, tokenizer):
    """ A postcode on a place that was inserted without a country
        does not produce a row in the postcode table.
    """
    insert_implicit_postcode(1, None, 'POINT(10 12)', {'postcode': 'AB 4511'})
    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    remaining = postcode_table.row_set
    assert not remaining
124
125
def test_postcodes_remove_all(dsn, postcode_table, place_table,
                              tmp_path, tokenizer):
    """ When the test adds no place data, a previously existing
        postcode row is removed by the update.
    """
    postcode_table.add('ch', '5613', 10, 12)
    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    remaining = postcode_table.row_set
    assert not remaining
132
133
def test_postcodes_multi_country(dsn, postcode_table, tmp_path,
                                 insert_implicit_postcode, tokenizer):
    """ Postcodes are kept apart per country: the same postcode at the
        same location in two countries yields one row for each country.
    """
    places = ((1, 'de', 'POINT(10 12)', '54451'),
              (2, 'cc', 'POINT(100 56)', 'DD23 T'),
              (3, 'de', 'POINT(10.3 11.0)', '54452'),
              (4, 'cc', 'POINT(10.3 11.0)', '54452'))
    for osm_id, country, geometry, postcode in places:
        insert_implicit_postcode(osm_id, country, geometry,
                                 {'postcode': postcode})

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {
        ('de', '54451', 10, 12),
        ('de', '54452', 10.3, 11.0),
        ('cc', '54452', 10.3, 11.0),
        ('cc', 'DD23 T', 100, 56),
    }
    assert postcode_table.row_set == expected
147
148
@pytest.mark.parametrize("gzipped", [True, False])
def test_postcodes_extern(dsn, postcode_table, tmp_path,
                          insert_implicit_postcode, tokenizer, gzipped):
    """ Postcodes listed in the file `xx_postcodes.csv` in the directory
        handed to update_postcodes() are added to the table. For a
        postcode that also exists in the place data, the location from
        the place data is used.

        The test runs once with the plain CSV file and once with the
        file compressed by the external `gzip` program.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))

    extfile = tmp_path / 'xx_postcodes.csv'
    extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")

    if gzipped:
        # check=True makes a failing gzip call raise right here instead of
        # only showing up indirectly in the assertions below.
        subprocess.run(['gzip', str(extfile)], check=True)
        # gzip replaces the original file with the compressed one.
        assert not extfile.is_file()

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12),
                                      ('xx', 'CD 4511', -10, -5)}
165
166
def test_postcodes_extern_bad_column(dsn, postcode_table, tmp_path,
                                     insert_implicit_postcode, tokenizer):
    """ Nothing is added from an external CSV file whose header has a
        misspelt postcode column; only the postcode from the place
        data ends up in the table.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})

    csv_file = tmp_path / 'xx_postcodes.csv'
    csv_file.write_text("postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {('xx', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
177
178
def test_postcodes_extern_bad_number(dsn, insert_implicit_postcode,
                                     postcode_table, tmp_path, tokenizer):
    """ Lines of the external CSV file with a NaN longitude or a
        latitude of 200 are not imported, the remaining line is.
    """
    insert_implicit_postcode(1, 'xx', 'POINT(10 12)', {'postcode': 'AB 4511'})

    csv_file = tmp_path / 'xx_postcodes.csv'
    csv_file.write_text("postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0")

    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {
        ('xx', 'AB 4511', 10, 12),
        ('xx', 'CD 4511', -10, -5),
    }
    assert postcode_table.row_set == expected
190
def test_can_compute(dsn, table_factory):
    """ can_compute() reports false until a table named `place` exists.
    """
    possible_before = postcodes.can_compute(dsn)
    assert not possible_before

    table_factory('place')

    possible_after = postcodes.can_compute(dsn)
    assert possible_after
195
def test_no_placex_entry(dsn, tmp_path, temp_db_cursor, place_row, postcode_table, tokenizer):
    """ A postcode on a row that is inserted through `place_row` only
        receives the country code returned by get_country_code().
    """
    # Replace get_country_code() with a version returning a fixed country,
    # so that the result shows that the function has been called.
    country_stub = """
        CREATE OR REPLACE FUNCTION get_country_code(place geometry)
        RETURNS TEXT AS $$ BEGIN 
        RETURN 'fr';
        END; $$ LANGUAGE plpgsql;
    """
    temp_db_cursor.execute(country_stub)

    place_row(geom='SRID=4326;POINT(10 12)', address={'postcode': 'AB 4511'})
    postcodes.update_postcodes(dsn, tmp_path, tokenizer)

    expected = {('fr', 'AB 4511', 10, 12)}
    assert postcode_table.row_set == expected
208
@pytest.fixture
def insert_implicit_postcode(placex_table, place_row):
    """ Provide a function that adds one object to the placex table
        and to the place table, so that a postcode can be computed
        from it.
    """
    def _add_to_both_tables(osm_id, country, geometry, address):
        # placex gets the country and the bare WKT geometry ...
        placex_table.add(osm_id=osm_id, country=country, geom=geometry)
        # ... place gets the address and the geometry with an SRID prefix.
        ewkt = 'SRID=4326;' + geometry
        place_row(osm_id=osm_id, geom=ewkt, address=address)

    return _add_to_both_tables