1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
"""
    Tests for the special phrases import methods
    of the class SPImporter.
"""
11 from shutil import copyfile
13 from nominatim_db.tools.special_phrases.sp_importer import SPImporter
14 from nominatim_db.tools.special_phrases.sp_wiki_loader import SPWikiLoader
15 from nominatim_db.tools.special_phrases.special_phrase import SpecialPhrase
16 from nominatim_db.errors import UsageError
18 from cursor import CursorForTesting
def sp_importer(temp_db_conn, def_config, monkeypatch):
    """
        Build the SPImporter instance used by the tests of this module.

        NOMINATIM_LANGUAGES is set to 'en' in the environment before the
        wiki loader is created. The importer is then constructed from
        def_config, temp_db_conn and that loader.
    """
    # NOTE(review): the tests below request this function by argument name,
    # i.e. as a pytest fixture, but no @pytest.fixture decorator is visible
    # on this definition - confirm that it is registered as one.
    monkeypatch.setenv('NOMINATIM_LANGUAGES', 'en')
    return SPImporter(def_config, temp_db_conn, SPWikiLoader(def_config))
def xml_wiki_content(src_dir):
    """
        Return the content of the static xml test file.

        src_dir is the root directory of the source tree (a path object);
        the file is looked up below it in
        test/testdata/special_phrases_test_content.txt.
    """
    # NOTE(review): consumed by test_import_phrases() as a pytest fixture;
    # no @pytest.fixture decorator is visible here - confirm registration.
    xml_test_content = src_dir / 'test' / 'testdata' / 'special_phrases_test_content.txt'
    # Decode explicitly as UTF-8 so that the result does not depend on the
    # locale of the machine running the tests.
    return xml_test_content.read_text(encoding='utf-8')
def default_phrases(table_factory):
    """
        Create the two place_classtype tables that the table removal
        test works on: one meant to be deleted and one meant to be kept.
    """
    # NOTE(review): requested by name from a test below, i.e. used as a
    # pytest fixture; no decorator is visible here - confirm registration.
    for suffix in ('to_delete', 'to_keep'):
        table_factory('place_classtype_testclasstypetable_' + suffix)
def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
    """
        Check the _fetch_existing_place_classtype_tables() method.
        After the call, the table that was just created must be found
        in the importer's table_phrases_to_delete collection.
    """
    expected_table = 'place_classtype_testclasstypetable'
    table_factory(expected_table)

    sp_importer._fetch_existing_place_classtype_tables()

    assert sp_importer.table_phrases_to_delete.pop() == expected_table
def test_check_sanity_class(sp_importer):
    """
        Check the _check_sanity() method.
        A phrase with an empty class or an empty type must yield a
        falsy result, a phrase with both set must yield a truthy one.
    """
    # Same order as before: first the empty class, then the empty type.
    for bad_class, bad_type in (('', 'type'), ('class', '')):
        assert not sp_importer._check_sanity(SpecialPhrase('en', bad_class, bad_type, ''))

    assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
def test_load_white_and_black_lists(sp_importer):
    """
        Test that _load_white_and_black_lists() returns the black list
        and the white list (in that order) and that both are dicts.
    """
    loaded = sp_importer._load_white_and_black_lists()
    black_list, white_list = loaded

    assert isinstance(black_list, dict) and isinstance(white_list, dict)
def test_create_place_classtype_indexes(temp_db_with_extensions,
                                        temp_db_conn, temp_db_cursor,
                                        table_factory, sp_importer):
    """
        Test that _create_place_classtype_indexes() creates the
        place_id index and centroid index on the right place_classtype table.
    """
    phrase_class = 'class'
    # phrase_type was referenced without ever being assigned.
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')

    sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
    # Commit before inspecting the result through the cursor, as the
    # sibling tests of this module do after calling the importer.
    temp_db_conn.commit()

    assert check_placeid_and_centroid_indexes(temp_db_cursor, phrase_class, phrase_type)
def test_create_place_classtype_table(temp_db_conn, temp_db_cursor, placex_table, sp_importer):
    """
        Test that _create_place_classtype_table() creates
        the right place_classtype table.
    """
    phrase_class = 'class'
    # phrase_type was referenced without ever being assigned.
    phrase_type = 'type'

    sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
    temp_db_conn.commit()

    assert check_table_exist(temp_db_cursor, phrase_class, phrase_type)
def test_grant_access_to_web_user(temp_db_conn, temp_db_cursor, table_factory,
                                  def_config, sp_importer):
    """
        Test that _grant_access_to_webuser() gives
        the right access to the web user.
    """
    phrase_class = 'class'
    # phrase_type was referenced without ever being assigned.
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    table_factory(table_name)

    sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
    temp_db_conn.commit()

    assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER,
                              phrase_class, phrase_type)
def test_create_place_classtype_table_and_indexes(
        temp_db_cursor, def_config, placex_table,
        sp_importer, temp_db_conn):
    """
        Test that _create_classtype_table_and_indexes()
        creates the right place_classtype tables, place_id indexes
        and centroid indexes and grants access to the web user
        for the given set of pairs.
    """
    pairs = {('class1', 'type1'), ('class2', 'type2')}

    sp_importer._create_classtype_table_and_indexes(pairs)
    temp_db_conn.commit()

    # Every requested (class, type) pair must have been fully set up; the
    # assertions previously used a loop variable that was never bound.
    for phrase_class, phrase_type in pairs:
        assert check_table_exist(temp_db_cursor, phrase_class, phrase_type)
        assert check_placeid_and_centroid_indexes(temp_db_cursor, phrase_class, phrase_type)
        assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER,
                                  phrase_class, phrase_type)
def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
                                            temp_db_conn, temp_db_cursor):
    """
        Check the _remove_non_existent_tables_from_db() method.

        place_classtype tables contained in table_phrases_to_delete should
        be removed from the database, all other place_classtype tables
        should be left in place.
    """
    sp_importer.table_phrases_to_delete = {
        'place_classtype_testclasstypetable_to_delete'
    }

    # Lists the names of all place_classtype tables of the public schema.
    query_tables = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_name like 'place_classtype_%';
    """

    sp_importer._remove_non_existent_tables_from_db()
    temp_db_conn.commit()

    # Only the table that was not scheduled for deletion may remain.
    assert temp_db_cursor.row_set(query_tables) \
        == {('place_classtype_testclasstypetable_to_keep', )}
@pytest.mark.parametrize("should_replace", [True, False])
def test_import_phrases(monkeypatch, temp_db_cursor, def_config, sp_importer,
                        placex_table, table_factory, tokenizer_mock,
                        xml_wiki_content, should_replace):
    """
        Check that the main import_phrases() method is well executed.
        It should create the place_classtype table, the place_id and centroid
        indexes and grant access to the web user.
        It should also update the database well by deleting or preserving
        existing entries of the database.
    """
    # Add some data to the database before execution in order to test
    # what is deleted and what is preserved.
    table_factory('place_classtype_amenity_animal_shelter')
    table_factory('place_classtype_wrongclass_wrongtype')

    # Serve the static test content instead of loading it from the wiki.
    monkeypatch.setattr('nominatim_db.tools.special_phrases.sp_wiki_loader._get_wiki_content',
                        lambda lang: xml_wiki_content)

    tokenizer = tokenizer_mock()
    sp_importer.import_phrases(tokenizer, should_replace)

    assert len(tokenizer.analyser_cache['special_phrases']) == 18

    class_test = 'aerialway'
    type_test = 'zip_line'

    assert check_table_exist(temp_db_cursor, class_test, type_test)
    assert check_placeid_and_centroid_indexes(temp_db_cursor, class_test, type_test)
    assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER, class_test, type_test)

    # The pre-existing table must be there after the import in either mode.
    assert check_table_exist(temp_db_cursor, 'amenity', 'animal_shelter')
    assert temp_db_cursor.table_exists('place_classtype_amenity_animal_shelter')

    # NOTE(review): presumably only a replacing import drops tables of
    # phrases that are not part of the imported content - confirm against
    # SPImporter.import_phrases(). The check also used to look for
    # 'wrong_class'/'wrong_type', a table name that is never created above.
    if should_replace:
        assert not check_table_exist(temp_db_cursor, 'wrongclass', 'wrongtype')
        assert not temp_db_cursor.table_exists('place_classtype_wrongclass_wrongtype')
def check_table_exist(temp_db_cursor, phrase_class, phrase_type):
    """
        Tell whether the place_classtype table for the given
        phrase_class and phrase_type exists, by handing its name to
        temp_db_cursor.table_exists() and returning the result.
    """
    table_name = f'place_classtype_{phrase_class}_{phrase_type}'
    return temp_db_cursor.table_exists(table_name)
def check_grant_access(temp_db_cursor, user, phrase_class, phrase_type):
    """
        Check that the given user has been granted SELECT access to the
        place_classtype table of the given phrase_class and phrase_type.

        Returns the result of fetchone() on the grant lookup, i.e. the
        first matching row of information_schema.role_table_grants.
    """
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    # The grantee condition is required: without it the user argument was
    # silently dropped by format() and a SELECT grant to any role matched.
    temp_db_cursor.execute("""
        SELECT * FROM information_schema.role_table_grants
        WHERE table_name='{}'
        AND grantee='{}'
        AND privilege_type='SELECT'""".format(table_name, user))
    return temp_db_cursor.fetchone()
def check_placeid_and_centroid_indexes(temp_db_cursor, phrase_class, phrase_type):
    """
        Check that the place_id index and centroid index exist for the
        place_classtype table of the given phrase_class and phrase_type.

        Returns True only when temp_db_cursor.index_exists() reports both
        indexes, False otherwise.
    """
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
    index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)

    # The lookups used to be evaluated and thrown away, so the function
    # returned None and every assertion on it failed.
    return all(temp_db_cursor.index_exists(table_name, index_prefix + suffix)
               for suffix in ('centroid', 'place_id'))