]> git.openstreetmap.org Git - nominatim.git/blob - test/python/tools/test_import_special_phrases.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / test / python / tools / test_import_special_phrases.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8     Tests for import special phrases methods
9     of the class SPImporter.
10 """
11 import pytest
12 from nominatim_db.tools.special_phrases.sp_importer import SPImporter
13 from nominatim_db.tools.special_phrases.sp_wiki_loader import SPWikiLoader
14 from nominatim_db.tools.special_phrases.special_phrase import SpecialPhrase
15
16
17 @pytest.fixture
18 def sp_importer(temp_db_conn, def_config, monkeypatch):
19     """
20         Return an instance of SPImporter.
21     """
22     monkeypatch.setenv('NOMINATIM_LANGUAGES', 'en')
23     loader = SPWikiLoader(def_config)
24     return SPImporter(def_config, temp_db_conn, loader)
25
26
27 @pytest.fixture
28 def xml_wiki_content(src_dir):
29     """
30         return the content of the static xml test file.
31     """
32     xml_test_content = src_dir / 'test' / 'testdata' / 'special_phrases_test_content.txt'
33     return xml_test_content.read_text()
34
35
36 @pytest.fixture
37 def default_phrases(table_factory):
38     table_factory('place_classtype_testclasstypetable_to_delete')
39     table_factory('place_classtype_testclasstypetable_to_keep')
40
41
42 def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
43     """
44         Check for the fetch_existing_place_classtype_tables() method.
45         It should return the table just created.
46     """
47     table_factory('place_classtype_testclasstypetable')
48
49     sp_importer._fetch_existing_place_classtype_tables()
50     contained_table = sp_importer.table_phrases_to_delete.pop()
51     assert contained_table == 'place_classtype_testclasstypetable'
52
53
54 def test_check_sanity_class(sp_importer):
55     """
56         Check for _check_sanity() method.
57         If a wrong class or type is given, an UsageError should raise.
58         If a good class and type are given, nothing special happens.
59     """
60
61     assert not sp_importer._check_sanity(SpecialPhrase('en', '', 'type', ''))
62     assert not sp_importer._check_sanity(SpecialPhrase('en', 'class', '', ''))
63
64     assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
65
66
67 def test_load_white_and_black_lists(sp_importer):
68     """
69         Test that _load_white_and_black_lists() well return
70         black list and white list and that they are of dict type.
71     """
72     black_list, white_list = sp_importer._load_white_and_black_lists()
73
74     assert isinstance(black_list, dict) and isinstance(white_list, dict)
75
76
77 def test_create_place_classtype_indexes(temp_db_with_extensions,
78                                         temp_db_conn, temp_db_cursor,
79                                         table_factory, sp_importer):
80     """
81         Test that _create_place_classtype_indexes() create the
82         place_id index and centroid index on the right place_class_type table.
83     """
84     phrase_class = 'class'
85     phrase_type = 'type'
86     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
87
88     table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')
89
90     sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
91     temp_db_conn.commit()
92
93     assert check_placeid_and_centroid_indexes(temp_db_cursor, phrase_class, phrase_type)
94
95
96 def test_create_place_classtype_table(temp_db_conn, temp_db_cursor, placex_table, sp_importer):
97     """
98         Test that _create_place_classtype_table() create
99         the right place_classtype table.
100     """
101     phrase_class = 'class'
102     phrase_type = 'type'
103     sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
104     temp_db_conn.commit()
105
106     assert check_table_exist(temp_db_cursor, phrase_class, phrase_type)
107
108
109 def test_grant_access_to_web_user(temp_db_conn, temp_db_cursor, table_factory,
110                                   def_config, sp_importer):
111     """
112         Test that _grant_access_to_webuser() give
113         right access to the web user.
114     """
115     phrase_class = 'class'
116     phrase_type = 'type'
117     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
118
119     table_factory(table_name)
120
121     sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
122     temp_db_conn.commit()
123
124     assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER,
125                               phrase_class, phrase_type)
126
127
128 def test_create_place_classtype_table_and_indexes(
129         temp_db_cursor, def_config, placex_table,
130         sp_importer, temp_db_conn):
131     """
132         Test that _create_place_classtype_table_and_indexes()
133         create the right place_classtype tables and place_id indexes
134         and centroid indexes and grant access to the web user
135         for the given set of pairs.
136     """
137     pairs = set([('class1', 'type1'), ('class2', 'type2')])
138
139     sp_importer._create_classtype_table_and_indexes(pairs)
140     temp_db_conn.commit()
141
142     for pair in pairs:
143         assert check_table_exist(temp_db_cursor, pair[0], pair[1])
144         assert check_placeid_and_centroid_indexes(temp_db_cursor, pair[0], pair[1])
145         assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER, pair[0], pair[1])
146
147
148 def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
149                                             temp_db_conn, temp_db_cursor):
150     """
151         Check for the remove_non_existent_phrases_from_db() method.
152
153         It should removed entries from the word table which are contained
154         in the words_phrases_to_delete set and not those also contained
155         in the words_phrases_still_exist set.
156
157         place_classtype tables contained in table_phrases_to_delete should
158         be deleted.
159     """
160     sp_importer.table_phrases_to_delete = {
161         'place_classtype_testclasstypetable_to_delete'
162     }
163
164     query_tables = """
165         SELECT table_name
166         FROM information_schema.tables
167         WHERE table_schema='public'
168         AND table_name like 'place_classtype_%';
169     """
170
171     sp_importer._remove_non_existent_tables_from_db()
172     temp_db_conn.commit()
173
174     assert temp_db_cursor.row_set(query_tables) \
175         == {('place_classtype_testclasstypetable_to_keep', )}
176
177
178 @pytest.mark.parametrize("should_replace", [(True), (False)])
179 def test_import_phrases(monkeypatch, temp_db_cursor, def_config, sp_importer,
180                         placex_table, table_factory, tokenizer_mock,
181                         xml_wiki_content, should_replace):
182     """
183         Check that the main import_phrases() method is well executed.
184         It should create the place_classtype table, the place_id and centroid indexes,
185         grand access to the web user and executing the SQL functions for amenities.
186         It should also update the database well by deleting or preserving existing entries
187         of the database.
188     """
189     # Add some data to the database before execution in order to test
190     # what is deleted and what is preserved.
191     table_factory('place_classtype_amenity_animal_shelter')
192     table_factory('place_classtype_wrongclass_wrongtype')
193
194     monkeypatch.setattr('nominatim_db.tools.special_phrases.sp_wiki_loader._get_wiki_content',
195                         lambda lang: xml_wiki_content)
196
197     tokenizer = tokenizer_mock()
198     sp_importer.import_phrases(tokenizer, should_replace)
199
200     assert len(tokenizer.analyser_cache['special_phrases']) == 18
201
202     class_test = 'aerialway'
203     type_test = 'zip_line'
204
205     assert check_table_exist(temp_db_cursor, class_test, type_test)
206     assert check_placeid_and_centroid_indexes(temp_db_cursor, class_test, type_test)
207     assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER, class_test, type_test)
208     assert check_table_exist(temp_db_cursor, 'amenity', 'animal_shelter')
209     if should_replace:
210         assert not check_table_exist(temp_db_cursor, 'wrong_class', 'wrong_type')
211
212     assert temp_db_cursor.table_exists('place_classtype_amenity_animal_shelter')
213     if should_replace:
214         assert not temp_db_cursor.table_exists('place_classtype_wrongclass_wrongtype')
215
216
217 def check_table_exist(temp_db_cursor, phrase_class, phrase_type):
218     """
219         Verify that the place_classtype table exists for the given
220         phrase_class and phrase_type.
221     """
222     return temp_db_cursor.table_exists('place_classtype_{}_{}'.format(phrase_class, phrase_type))
223
224
225 def check_grant_access(temp_db_cursor, user, phrase_class, phrase_type):
226     """
227         Check that the web user has been granted right access to the
228         place_classtype table of the given phrase_class and phrase_type.
229     """
230     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
231
232     temp_db_cursor.execute("""
233             SELECT * FROM information_schema.role_table_grants
234             WHERE table_name='{}'
235             AND grantee='{}'
236             AND privilege_type='SELECT'""".format(table_name, user))
237     return temp_db_cursor.fetchone()
238
239
240 def check_placeid_and_centroid_indexes(temp_db_cursor, phrase_class, phrase_type):
241     """
242         Check that the place_id index and centroid index exist for the
243         place_classtype table of the given phrase_class and phrase_type.
244     """
245     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
246     index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
247
248     return (
249         temp_db_cursor.index_exists(table_name, index_prefix + 'centroid')
250         and
251         temp_db_cursor.index_exists(table_name, index_prefix + 'place_id')
252     )