2 Tests for import special phrases methods
3 of the class SPImporter.
5 from shutil import copyfile
7 from nominatim.tools.special_phrases.sp_importer import SPImporter
8 from nominatim.tools.special_phrases.sp_wiki_loader import SPWikiLoader
9 from nominatim.tools.special_phrases.sp_csv_loader import SPCsvLoader
10 from nominatim.tools.special_phrases.special_phrase import SpecialPhrase
11 from nominatim.errors import UsageError
13 from cursor import CursorForTesting
16 def testfile_dir(src_dir):
17 return src_dir / 'test' / 'testfiles'
21 def sp_importer(temp_db_conn, def_config, temp_phplib_dir_with_migration):
23 Return an instance of SPImporter.
25 loader = SPWikiLoader(def_config, ['en'])
26 return SPImporter(def_config, temp_phplib_dir_with_migration, temp_db_conn, loader)
30 def temp_phplib_dir_with_migration(src_dir, tmp_path):
32 Return temporary phpdir with migration subdirectory and
33 PhraseSettingsToJson.php script inside.
35 migration_file = (src_dir / 'lib-php' / 'migration' / 'PhraseSettingsToJson.php').resolve()
37 phpdir = tmp_path / 'tempphp'
40 (phpdir / 'migration').mkdir()
41 migration_dest_path = (phpdir / 'migration' / 'PhraseSettingsToJson.php').resolve()
42 copyfile(str(migration_file), str(migration_dest_path))
48 def xml_wiki_content(src_dir):
50 return the content of the static xml test file.
52 xml_test_content_path = (src_dir / 'test' / 'testdata' / 'special_phrases_test_content.txt').resolve()
53 return xml_test_content_path.read_text()
57 def default_phrases(table_factory):
58 table_factory('place_classtype_testclasstypetable_to_delete')
59 table_factory('place_classtype_testclasstypetable_to_keep')
62 def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
64 Check for the fetch_existing_place_classtype_tables() method.
65 It should return the table just created.
67 table_factory('place_classtype_testclasstypetable')
69 sp_importer._fetch_existing_place_classtype_tables()
70 contained_table = sp_importer.table_phrases_to_delete.pop()
71 assert contained_table == 'place_classtype_testclasstypetable'
73 def test_check_sanity_class(sp_importer):
75 Check for _check_sanity() method.
76 If a wrong class or type is given, an UsageError should raise.
77 If a good class and type are given, nothing special happens.
80 assert not sp_importer._check_sanity(SpecialPhrase('en', '', 'type', ''))
81 assert not sp_importer._check_sanity(SpecialPhrase('en', 'class', '', ''))
83 assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
85 def test_load_white_and_black_lists(sp_importer):
87 Test that _load_white_and_black_lists() well return
88 black list and white list and that they are of dict type.
90 black_list, white_list = sp_importer._load_white_and_black_lists()
92 assert isinstance(black_list, dict) and isinstance(white_list, dict)
94 def test_convert_php_settings(sp_importer, testfile_dir, tmp_path):
96 Test that _convert_php_settings_if_needed() convert the given
97 php file to a json file.
99 php_file = (testfile_dir / 'phrase_settings.php').resolve()
101 temp_settings = (tmp_path / 'phrase_settings.php').resolve()
102 copyfile(php_file, temp_settings)
103 sp_importer._convert_php_settings_if_needed(temp_settings)
105 assert (tmp_path / 'phrase_settings.json').is_file()
107 def test_convert_settings_wrong_file(sp_importer):
109 Test that _convert_php_settings_if_needed() raise an exception
110 if the given file is not a valid file.
112 with pytest.raises(UsageError, match='random_file is not a valid file.'):
113 sp_importer._convert_php_settings_if_needed('random_file')
115 def test_convert_settings_json_already_exist(sp_importer, testfile_dir):
117 Test that if we give to '_convert_php_settings_if_needed' a php file path
118 and that a the corresponding json file already exists, it is returned.
120 php_file = (testfile_dir / 'phrase_settings.php').resolve()
121 json_file = (testfile_dir / 'phrase_settings.json').resolve()
123 returned = sp_importer._convert_php_settings_if_needed(php_file)
125 assert returned == json_file
127 def test_convert_settings_giving_json(sp_importer, testfile_dir):
129 Test that if we give to '_convert_php_settings_if_needed' a json file path
130 the same path is directly returned
132 json_file = (testfile_dir / 'phrase_settings.json').resolve()
134 returned = sp_importer._convert_php_settings_if_needed(json_file)
136 assert returned == json_file
138 def test_create_place_classtype_indexes(temp_db_with_extensions, temp_db_conn,
139 table_factory, sp_importer):
141 Test that _create_place_classtype_indexes() create the
142 place_id index and centroid index on the right place_class_type table.
144 phrase_class = 'class'
146 table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
148 table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')
150 sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
152 assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
154 def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
156 Test that _create_place_classtype_table() create
157 the right place_classtype table.
159 phrase_class = 'class'
161 sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
163 assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
165 def test_grant_access_to_web_user(temp_db_conn, table_factory, def_config, sp_importer):
167 Test that _grant_access_to_webuser() give
168 right access to the web user.
170 phrase_class = 'class'
172 table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
174 table_factory(table_name)
176 sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
178 assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
180 def test_create_place_classtype_table_and_indexes(
181 temp_db_conn, def_config, placex_table,
184 Test that _create_place_classtype_table_and_indexes()
185 create the right place_classtype tables and place_id indexes
186 and centroid indexes and grant access to the web user
187 for the given set of pairs.
189 pairs = set([('class1', 'type1'), ('class2', 'type2')])
191 sp_importer._create_place_classtype_table_and_indexes(pairs)
194 assert check_table_exist(temp_db_conn, pair[0], pair[1])
195 assert check_placeid_and_centroid_indexes(temp_db_conn, pair[0], pair[1])
196 assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, pair[0], pair[1])
198 def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
201 Check for the remove_non_existent_phrases_from_db() method.
203 It should removed entries from the word table which are contained
204 in the words_phrases_to_delete set and not those also contained
205 in the words_phrases_still_exist set.
207 place_classtype tables contained in table_phrases_to_delete should
210 sp_importer.table_phrases_to_delete = {
211 'place_classtype_testclasstypetable_to_delete'
216 FROM information_schema.tables
217 WHERE table_schema='public'
218 AND table_name like 'place_classtype_%';
221 sp_importer._remove_non_existent_tables_from_db()
223 # Changes are not committed yet. Use temp_db_conn for checking results.
224 with temp_db_conn.cursor(cursor_factory=CursorForTesting) as cur:
225 assert cur.row_set(query_tables) \
226 == {('place_classtype_testclasstypetable_to_keep', )}
229 @pytest.mark.parametrize("should_replace", [(True), (False)])
230 def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
231 placex_table, table_factory, tokenizer_mock,
232 xml_wiki_content, should_replace):
234 Check that the main import_phrases() method is well executed.
235 It should create the place_classtype table, the place_id and centroid indexes,
236 grand access to the web user and executing the SQL functions for amenities.
237 It should also update the database well by deleting or preserving existing entries
240 #Add some data to the database before execution in order to test
241 #what is deleted and what is preserved.
242 table_factory('place_classtype_amenity_animal_shelter')
243 table_factory('place_classtype_wrongclass_wrongtype')
245 monkeypatch.setattr('nominatim.tools.special_phrases.sp_wiki_loader.SPWikiLoader._get_wiki_content',
246 lambda self, lang: xml_wiki_content)
248 tokenizer = tokenizer_mock()
249 sp_importer.import_phrases(tokenizer, should_replace)
251 assert len(tokenizer.analyser_cache['special_phrases']) == 18
253 class_test = 'aerialway'
254 type_test = 'zip_line'
256 assert check_table_exist(temp_db_conn, class_test, type_test)
257 assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
258 assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
259 assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
261 assert not check_table_exist(temp_db_conn, 'wrong_class', 'wrong_type')
263 assert temp_db_conn.table_exists('place_classtype_amenity_animal_shelter')
265 assert not temp_db_conn.table_exists('place_classtype_wrongclass_wrongtype')
267 def check_table_exist(temp_db_conn, phrase_class, phrase_type):
269 Verify that the place_classtype table exists for the given
270 phrase_class and phrase_type.
272 return temp_db_conn.table_exists('place_classtype_{}_{}'.format(phrase_class, phrase_type))
275 def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
277 Check that the web user has been granted right access to the
278 place_classtype table of the given phrase_class and phrase_type.
280 table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
282 with temp_db_conn.cursor() as temp_db_cursor:
283 temp_db_cursor.execute("""
284 SELECT * FROM information_schema.role_table_grants
285 WHERE table_name='{}'
287 AND privilege_type='SELECT'""".format(table_name, user))
288 return temp_db_cursor.fetchone()
290 def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
292 Check that the place_id index and centroid index exist for the
293 place_classtype table of the given phrase_class and phrase_type.
295 index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
298 temp_db_conn.index_exists(index_prefix + 'centroid')
300 temp_db_conn.index_exists(index_prefix + 'place_id')