1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
"""
    Tests for the special phrase import methods
    of the class SPImporter.
"""
from shutil import copyfile

import pytest

from nominatim_db.tools.special_phrases.sp_importer import SPImporter
from nominatim_db.tools.special_phrases.sp_wiki_loader import SPWikiLoader
from nominatim_db.tools.special_phrases.special_phrase import SpecialPhrase
from nominatim_core.errors import UsageError

from cursor import CursorForTesting


@pytest.fixture
def sp_importer(temp_db_conn, def_config, monkeypatch):
    """
        Return an instance of SPImporter that is fed by a SPWikiLoader.

        NOMINATIM_LANGUAGES is set to 'en' in the environment before the
        loader is created.
    """
    monkeypatch.setenv('NOMINATIM_LANGUAGES', 'en')
    loader = SPWikiLoader(def_config)
    return SPImporter(def_config, temp_db_conn, loader)


@pytest.fixture
def xml_wiki_content(src_dir):
    """
        Return the content of the static XML test file
        'test/testdata/special_phrases_test_content.txt' below src_dir.
    """
    xml_test_content = src_dir / 'test' / 'testdata' / 'special_phrases_test_content.txt'
    return xml_test_content.read_text()


@pytest.fixture
def default_phrases(table_factory):
    """
        Create two place_classtype tables: one that tests mark for
        deletion and one that is expected to be kept.
    """
    table_factory('place_classtype_testclasstypetable_to_delete')
    table_factory('place_classtype_testclasstypetable_to_keep')


def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
    """
        Check the _fetch_existing_place_classtype_tables() method.
        After the call, table_phrases_to_delete must contain the
        place_classtype table that was created beforehand.
    """
    expected_table = 'place_classtype_testclasstypetable'
    table_factory(expected_table)

    sp_importer._fetch_existing_place_classtype_tables()

    assert sp_importer.table_phrases_to_delete.pop() == expected_table


def test_check_sanity_class(sp_importer):
    """
        Check the result of the _check_sanity() method.
        A phrase with an empty class or an empty type must yield a falsy
        result, a phrase where both are set must yield a truthy one.
    """
    rejected = (('', 'type'), ('class', ''))
    for phrase_class, phrase_type in rejected:
        phrase = SpecialPhrase('en', phrase_class, phrase_type, '')
        assert not sp_importer._check_sanity(phrase)

    assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))


def test_load_white_and_black_lists(sp_importer):
    """
        Test that _load_white_and_black_lists() returns a black list
        and a white list and that both are of type dict.
    """
    loaded = sp_importer._load_white_and_black_lists()
    black_list, white_list = loaded

    assert isinstance(black_list, dict) and isinstance(white_list, dict)


def test_create_place_classtype_indexes(temp_db_with_extensions, temp_db_conn,
                                        table_factory, sp_importer):
    """
        Test that _create_place_classtype_indexes() creates the
        place_id index and the centroid index on the right
        place_classtype table.
    """
    phrase_class = 'class'
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    # The table needs exactly the two columns that get indexed.
    # NOTE(review): temp_db_with_extensions is presumably requested so that
    # the GEOMETRY type is available - confirm against the fixture.
    table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')

    sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)

    assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)


def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
    """
        Test that _create_place_classtype_table() creates
        the right place_classtype table.
    """
    phrase_class = 'class'
    phrase_type = 'type'

    sp_importer._create_place_classtype_table('', phrase_class, phrase_type)

    assert check_table_exist(temp_db_conn, phrase_class, phrase_type)


def test_grant_access_to_web_user(temp_db_conn, table_factory, def_config, sp_importer):
    """
        Test that _grant_access_to_webuser() gives the web user
        (def_config.DATABASE_WEBUSER) access to the place_classtype table.
    """
    phrase_class = 'class'
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    table_factory(table_name)

    sp_importer._grant_access_to_webuser(phrase_class, phrase_type)

    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER,
                              phrase_class, phrase_type)


def test_create_place_classtype_table_and_indexes(
        temp_db_conn, def_config, placex_table,
        sp_importer):
    """
        Test that _create_classtype_table_and_indexes() creates the
        right place_classtype tables, the place_id and centroid indexes
        and grants access to the web user for the given set of pairs.
    """
    pairs = {('class1', 'type1'), ('class2', 'type2')}

    sp_importer._create_classtype_table_and_indexes(pairs)

    # Every (class, type) pair must have got its own table, indexes and grant.
    for phrase_class, phrase_type in pairs:
        assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
        assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
        assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER,
                                  phrase_class, phrase_type)


def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
                                            temp_db_conn):
    """
        Check the _remove_non_existent_tables_from_db() method.

        place_classtype tables contained in table_phrases_to_delete should
        be deleted, all other place_classtype tables should be kept.
    """
    sp_importer.table_phrases_to_delete = {
        'place_classtype_testclasstypetable_to_delete'
    }

    # Lists the names of all place_classtype tables in the public schema.
    query_tables = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_name like 'place_classtype_%';
    """

    sp_importer._remove_non_existent_tables_from_db()

    # Changes are not committed yet. Use temp_db_conn for checking results.
    with temp_db_conn.cursor(cursor_factory=CursorForTesting) as cur:
        assert cur.row_set(query_tables) \
            == {('place_classtype_testclasstypetable_to_keep', )}


@pytest.mark.parametrize("should_replace", [True, False])
def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
                        placex_table, table_factory, tokenizer_mock,
                        xml_wiki_content, should_replace):
    """
        Check that the main import_phrases() method is executed correctly.
        It should create the place_classtype table, the place_id and
        centroid indexes and grant access to the web user.
        A pre-existing table that matches an imported phrase must survive
        the import; a pre-existing table without a matching phrase must be
        gone when should_replace is set.
    """
    # Add some data to the database before execution in order to test
    # what is deleted and what is preserved.
    table_factory('place_classtype_amenity_animal_shelter')
    table_factory('place_classtype_wrongclass_wrongtype')

    # Serve the static test content instead of fetching the wiki page.
    monkeypatch.setattr('nominatim_db.tools.special_phrases.sp_wiki_loader._get_wiki_content',
                        lambda lang: xml_wiki_content)

    tokenizer = tokenizer_mock()
    sp_importer.import_phrases(tokenizer, should_replace)

    assert len(tokenizer.analyser_cache['special_phrases']) == 18

    class_test = 'aerialway'
    type_test = 'zip_line'

    assert check_table_exist(temp_db_conn, class_test, type_test)
    assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)

    assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
    # Removal of stale tables is only requested when replacing. The names
    # must match the table created above ('wrongclass'/'wrongtype'),
    # otherwise the assertion can never fail.
    if should_replace:
        assert not check_table_exist(temp_db_conn, 'wrongclass', 'wrongtype')


def check_table_exist(temp_db_conn, phrase_class, phrase_type):
    """
        Report whether the place_classtype table for the given
        phrase_class and phrase_type exists, as told by
        temp_db_conn.table_exists().
    """
    table_name = f'place_classtype_{phrase_class}_{phrase_type}'
    return temp_db_conn.table_exists(table_name)


def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
    """
        Check that the given user has been granted SELECT access to the
        place_classtype table of the given phrase_class and phrase_type.

        Returns the first matching row of
        information_schema.role_table_grants, or whatever the cursor's
        fetchone() yields when there is no match.
    """
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        # Values are handed over as bound parameters; the grantee filter
        # makes sure the grant really belongs to the requested user.
        temp_db_cursor.execute("""
                SELECT * FROM information_schema.role_table_grants
                WHERE table_name = %s
                AND grantee = %s
                AND privilege_type = 'SELECT'""", (table_name, user))
        return temp_db_cursor.fetchone()


def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
    """
        Check that the place_id index and the centroid index exist for the
        place_classtype table of the given phrase_class and phrase_type.

        Returns a truthy value only when both indexes are reported by
        temp_db_conn.index_exists().
    """
    index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)

    return (
        temp_db_conn.index_exists(index_prefix + 'centroid')
        and
        temp_db_conn.index_exists(index_prefix + 'place_id')
    )