]> git.openstreetmap.org Git - nominatim.git/blob - test/python/tools/test_import_special_phrases.py
restrict deduplication to results from placex
[nominatim.git] / test / python / tools / test_import_special_phrases.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8     Tests for import special phrases methods
9     of the class SPImporter.
10 """
11 from shutil import copyfile
12 import pytest
13 from nominatim.tools.special_phrases.sp_importer import SPImporter
14 from nominatim.tools.special_phrases.sp_wiki_loader import SPWikiLoader
15 from nominatim.tools.special_phrases.special_phrase import SpecialPhrase
16 from nominatim.errors import UsageError
17
18 from cursor import CursorForTesting
19
20 @pytest.fixture
21 def sp_importer(temp_db_conn, def_config, monkeypatch):
22     """
23         Return an instance of SPImporter.
24     """
25     monkeypatch.setenv('NOMINATIM_LANGUAGES', 'en')
26     loader = SPWikiLoader(def_config)
27     return SPImporter(def_config, temp_db_conn, loader)
28
29
30 @pytest.fixture
31 def xml_wiki_content(src_dir):
32     """
33         return the content of the static xml test file.
34     """
35     xml_test_content = src_dir / 'test' / 'testdata' / 'special_phrases_test_content.txt'
36     return xml_test_content.read_text()
37
38
39 @pytest.fixture
40 def default_phrases(table_factory):
41     table_factory('place_classtype_testclasstypetable_to_delete')
42     table_factory('place_classtype_testclasstypetable_to_keep')
43
44
45 def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
46     """
47         Check for the fetch_existing_place_classtype_tables() method.
48         It should return the table just created.
49     """
50     table_factory('place_classtype_testclasstypetable')
51
52     sp_importer._fetch_existing_place_classtype_tables()
53     contained_table = sp_importer.table_phrases_to_delete.pop()
54     assert contained_table == 'place_classtype_testclasstypetable'
55
56 def test_check_sanity_class(sp_importer):
57     """
58         Check for _check_sanity() method.
59         If a wrong class or type is given, an UsageError should raise.
60         If a good class and type are given, nothing special happens.
61     """
62
63     assert not sp_importer._check_sanity(SpecialPhrase('en', '', 'type', ''))
64     assert not sp_importer._check_sanity(SpecialPhrase('en', 'class', '', ''))
65
66     assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
67
68 def test_load_white_and_black_lists(sp_importer):
69     """
70         Test that _load_white_and_black_lists() well return
71         black list and white list and that they are of dict type.
72     """
73     black_list, white_list = sp_importer._load_white_and_black_lists()
74
75     assert isinstance(black_list, dict) and isinstance(white_list, dict)
76
77
78 def test_create_place_classtype_indexes(temp_db_with_extensions, temp_db_conn,
79                                         table_factory, sp_importer):
80     """
81         Test that _create_place_classtype_indexes() create the
82         place_id index and centroid index on the right place_class_type table.
83     """
84     phrase_class = 'class'
85     phrase_type = 'type'
86     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
87
88     table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')
89
90     sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
91
92     assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
93
94 def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
95     """
96         Test that _create_place_classtype_table() create
97         the right place_classtype table.
98     """
99     phrase_class = 'class'
100     phrase_type = 'type'
101     sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
102
103     assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
104
105 def test_grant_access_to_web_user(temp_db_conn, table_factory, def_config, sp_importer):
106     """
107         Test that _grant_access_to_webuser() give
108         right access to the web user.
109     """
110     phrase_class = 'class'
111     phrase_type = 'type'
112     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
113
114     table_factory(table_name)
115
116     sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
117
118     assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
119
120 def test_create_place_classtype_table_and_indexes(
121         temp_db_conn, def_config, placex_table,
122         sp_importer):
123     """
124         Test that _create_place_classtype_table_and_indexes()
125         create the right place_classtype tables and place_id indexes
126         and centroid indexes and grant access to the web user
127         for the given set of pairs.
128     """
129     pairs = set([('class1', 'type1'), ('class2', 'type2')])
130
131     sp_importer._create_classtype_table_and_indexes(pairs)
132
133     for pair in pairs:
134         assert check_table_exist(temp_db_conn, pair[0], pair[1])
135         assert check_placeid_and_centroid_indexes(temp_db_conn, pair[0], pair[1])
136         assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, pair[0], pair[1])
137
138 def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
139                                             temp_db_conn):
140     """
141         Check for the remove_non_existent_phrases_from_db() method.
142
143         It should removed entries from the word table which are contained
144         in the words_phrases_to_delete set and not those also contained
145         in the words_phrases_still_exist set.
146
147         place_classtype tables contained in table_phrases_to_delete should
148         be deleted.
149     """
150     sp_importer.table_phrases_to_delete = {
151         'place_classtype_testclasstypetable_to_delete'
152     }
153
154     query_tables = """
155         SELECT table_name
156         FROM information_schema.tables
157         WHERE table_schema='public'
158         AND table_name like 'place_classtype_%';
159     """
160
161     sp_importer._remove_non_existent_tables_from_db()
162
163     # Changes are not committed yet. Use temp_db_conn for checking results.
164     with temp_db_conn.cursor(cursor_factory=CursorForTesting) as cur:
165         assert cur.row_set(query_tables) \
166                  == {('place_classtype_testclasstypetable_to_keep', )}
167
168
169 @pytest.mark.parametrize("should_replace", [(True), (False)])
170 def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
171                         placex_table, table_factory, tokenizer_mock,
172                         xml_wiki_content, should_replace):
173     """
174         Check that the main import_phrases() method is well executed.
175         It should create the place_classtype table, the place_id and centroid indexes,
176         grand access to the web user and executing the SQL functions for amenities.
177         It should also update the database well by deleting or preserving existing entries
178         of the database.
179     """
180     #Add some data to the database before execution in order to test
181     #what is deleted and what is preserved.
182     table_factory('place_classtype_amenity_animal_shelter')
183     table_factory('place_classtype_wrongclass_wrongtype')
184
185     monkeypatch.setattr('nominatim.tools.special_phrases.sp_wiki_loader._get_wiki_content',
186                         lambda lang: xml_wiki_content)
187
188     tokenizer = tokenizer_mock()
189     sp_importer.import_phrases(tokenizer, should_replace)
190
191     assert len(tokenizer.analyser_cache['special_phrases']) == 18
192
193     class_test = 'aerialway'
194     type_test = 'zip_line'
195
196     assert check_table_exist(temp_db_conn, class_test, type_test)
197     assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
198     assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
199     assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
200     if should_replace:
201         assert not check_table_exist(temp_db_conn, 'wrong_class', 'wrong_type')
202
203     assert temp_db_conn.table_exists('place_classtype_amenity_animal_shelter')
204     if should_replace:
205         assert not temp_db_conn.table_exists('place_classtype_wrongclass_wrongtype')
206
207 def check_table_exist(temp_db_conn, phrase_class, phrase_type):
208     """
209         Verify that the place_classtype table exists for the given
210         phrase_class and phrase_type.
211     """
212     return temp_db_conn.table_exists('place_classtype_{}_{}'.format(phrase_class, phrase_type))
213
214
215 def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
216     """
217         Check that the web user has been granted right access to the
218         place_classtype table of the given phrase_class and phrase_type.
219     """
220     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
221
222     with temp_db_conn.cursor() as temp_db_cursor:
223         temp_db_cursor.execute("""
224                 SELECT * FROM information_schema.role_table_grants
225                 WHERE table_name='{}'
226                 AND grantee='{}'
227                 AND privilege_type='SELECT'""".format(table_name, user))
228         return temp_db_cursor.fetchone()
229
230 def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
231     """
232         Check that the place_id index and centroid index exist for the
233         place_classtype table of the given phrase_class and phrase_type.
234     """
235     index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
236
237     return (
238         temp_db_conn.index_exists(index_prefix + 'centroid')
239         and
240         temp_db_conn.index_exists(index_prefix + 'place_id')
241     )