1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Tests for import special phrases methods
9 of the class SPImporter.
12 from nominatim_db.tools.special_phrases.sp_importer import SPImporter
13 from nominatim_db.tools.special_phrases.sp_wiki_loader import SPWikiLoader
14 from nominatim_db.tools.special_phrases.special_phrase import SpecialPhrase
18 def sp_importer(temp_db_conn, def_config, monkeypatch):
20 Return an instance of SPImporter.
22 monkeypatch.setenv('NOMINATIM_LANGUAGES', 'en')
23 loader = SPWikiLoader(def_config)
24 return SPImporter(def_config, temp_db_conn, loader)
28 def xml_wiki_content(src_dir):
30 return the content of the static xml test file.
32 xml_test_content = src_dir / 'test' / 'testdata' / 'special_phrases_test_content.txt'
33 return xml_test_content.read_text()
37 def default_phrases(table_factory):
38 table_factory('place_classtype_testclasstypetable_to_delete')
39 table_factory('place_classtype_testclasstypetable_to_keep')
42 def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
44 Check for the fetch_existing_place_classtype_tables() method.
45 It should return the table just created.
47 table_factory('place_classtype_testclasstypetable')
49 sp_importer._fetch_existing_place_classtype_tables()
50 contained_table = sp_importer.table_phrases_to_delete.pop()
51 assert contained_table == 'place_classtype_testclasstypetable'
54 def test_check_sanity_class(sp_importer):
56 Check for _check_sanity() method.
57 If a wrong class or type is given, an UsageError should raise.
58 If a good class and type are given, nothing special happens.
61 assert not sp_importer._check_sanity(SpecialPhrase('en', '', 'type', ''))
62 assert not sp_importer._check_sanity(SpecialPhrase('en', 'class', '', ''))
64 assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
67 def test_load_white_and_black_lists(sp_importer):
69 Test that _load_white_and_black_lists() well return
70 black list and white list and that they are of dict type.
72 black_list, white_list = sp_importer._load_white_and_black_lists()
74 assert isinstance(black_list, dict) and isinstance(white_list, dict)
77 def test_create_place_classtype_indexes(temp_db_with_extensions,
78 temp_db_conn, temp_db_cursor,
79 table_factory, sp_importer):
81 Test that _create_place_classtype_indexes() create the
82 place_id index and centroid index on the right place_class_type table.
84 phrase_class = 'class'
86 table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
88 table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')
90 sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
93 assert check_placeid_and_centroid_indexes(temp_db_cursor, phrase_class, phrase_type)
96 def test_create_place_classtype_table(temp_db_conn, temp_db_cursor, placex_table, sp_importer):
98 Test that _create_place_classtype_table() create
99 the right place_classtype table.
101 phrase_class = 'class'
103 sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
104 temp_db_conn.commit()
106 assert check_table_exist(temp_db_cursor, phrase_class, phrase_type)
109 def test_grant_access_to_web_user(temp_db_conn, temp_db_cursor, table_factory,
110 def_config, sp_importer):
112 Test that _grant_access_to_webuser() give
113 right access to the web user.
115 phrase_class = 'class'
117 table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
119 table_factory(table_name)
121 sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
122 temp_db_conn.commit()
124 assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER,
125 phrase_class, phrase_type)
128 def test_create_place_classtype_table_and_indexes(
129 temp_db_cursor, def_config, placex_table,
130 sp_importer, temp_db_conn):
132 Test that _create_place_classtype_table_and_indexes()
133 create the right place_classtype tables and place_id indexes
134 and centroid indexes and grant access to the web user
135 for the given set of pairs.
137 pairs = set([('class1', 'type1'), ('class2', 'type2')])
139 sp_importer._create_classtype_table_and_indexes(pairs)
140 temp_db_conn.commit()
143 assert check_table_exist(temp_db_cursor, pair[0], pair[1])
144 assert check_placeid_and_centroid_indexes(temp_db_cursor, pair[0], pair[1])
145 assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER, pair[0], pair[1])
148 def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
149 temp_db_conn, temp_db_cursor):
151 Check for the remove_non_existent_phrases_from_db() method.
153 It should removed entries from the word table which are contained
154 in the words_phrases_to_delete set and not those also contained
155 in the words_phrases_still_exist set.
157 place_classtype tables contained in table_phrases_to_delete should
160 sp_importer.table_phrases_to_delete = {
161 'place_classtype_testclasstypetable_to_delete'
166 FROM information_schema.tables
167 WHERE table_schema='public'
168 AND table_name like 'place_classtype_%';
171 sp_importer._remove_non_existent_tables_from_db()
172 temp_db_conn.commit()
174 assert temp_db_cursor.row_set(query_tables) \
175 == {('place_classtype_testclasstypetable_to_keep', )}
178 @pytest.mark.parametrize("should_replace", [(True), (False)])
179 def test_import_phrases(monkeypatch, temp_db_cursor, def_config, sp_importer,
180 placex_table, table_factory, tokenizer_mock,
181 xml_wiki_content, should_replace):
183 Check that the main import_phrases() method is well executed.
184 It should create the place_classtype table, the place_id and centroid indexes,
185 grand access to the web user and executing the SQL functions for amenities.
186 It should also update the database well by deleting or preserving existing entries
189 # Add some data to the database before execution in order to test
190 # what is deleted and what is preserved.
191 table_factory('place_classtype_amenity_animal_shelter')
192 table_factory('place_classtype_wrongclass_wrongtype')
194 monkeypatch.setattr('nominatim_db.tools.special_phrases.sp_wiki_loader._get_wiki_content',
195 lambda lang: xml_wiki_content)
197 tokenizer = tokenizer_mock()
198 sp_importer.import_phrases(tokenizer, should_replace)
200 assert len(tokenizer.analyser_cache['special_phrases']) == 18
202 class_test = 'aerialway'
203 type_test = 'zip_line'
205 assert check_table_exist(temp_db_cursor, class_test, type_test)
206 assert check_placeid_and_centroid_indexes(temp_db_cursor, class_test, type_test)
207 assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER, class_test, type_test)
208 assert check_table_exist(temp_db_cursor, 'amenity', 'animal_shelter')
210 assert not check_table_exist(temp_db_cursor, 'wrong_class', 'wrong_type')
212 assert temp_db_cursor.table_exists('place_classtype_amenity_animal_shelter')
214 assert not temp_db_cursor.table_exists('place_classtype_wrongclass_wrongtype')
217 def check_table_exist(temp_db_cursor, phrase_class, phrase_type):
219 Verify that the place_classtype table exists for the given
220 phrase_class and phrase_type.
222 return temp_db_cursor.table_exists('place_classtype_{}_{}'.format(phrase_class, phrase_type))
225 def check_grant_access(temp_db_cursor, user, phrase_class, phrase_type):
227 Check that the web user has been granted right access to the
228 place_classtype table of the given phrase_class and phrase_type.
230 table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
232 temp_db_cursor.execute("""
233 SELECT * FROM information_schema.role_table_grants
234 WHERE table_name='{}'
236 AND privilege_type='SELECT'""".format(table_name, user))
237 return temp_db_cursor.fetchone()
240 def check_placeid_and_centroid_indexes(temp_db_cursor, phrase_class, phrase_type):
242 Check that the place_id index and centroid index exist for the
243 place_classtype table of the given phrase_class and phrase_type.
245 table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
246 index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
249 temp_db_cursor.index_exists(table_name, index_prefix + 'centroid')
251 temp_db_cursor.index_exists(table_name, index_prefix + 'place_id')