2 Tests for import special phrases methods
3 of the class SPImporter.
5 from nominatim.errors import UsageError
6 from pathlib import Path
8 from shutil import copyfile
10 from nominatim.tools.special_phrases.sp_importer import SPImporter
11 from nominatim.tools.special_phrases.sp_wiki_loader import SPWikiLoader
12 from nominatim.tools.special_phrases.sp_csv_loader import SPCsvLoader
13 from nominatim.tools.special_phrases.special_phrase import SpecialPhrase
15 from cursor import CursorForTesting
17 TEST_BASE_DIR = Path(__file__) / '..' / '..'
19 def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
21 Check for the fetch_existing_place_classtype_tables() method.
22 It should return the table just created.
24 table_factory('place_classtype_testclasstypetable')
26 sp_importer._fetch_existing_place_classtype_tables()
27 contained_table = sp_importer.table_phrases_to_delete.pop()
28 assert contained_table == 'place_classtype_testclasstypetable'
30 def test_check_sanity_class(sp_importer):
32 Check for _check_sanity() method.
33 If a wrong class or type is given, an UsageError should raise.
34 If a good class and type are given, nothing special happens.
37 assert not sp_importer._check_sanity(SpecialPhrase('en', '', 'type', ''))
38 assert not sp_importer._check_sanity(SpecialPhrase('en', 'class', '', ''))
40 assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
42 def test_load_white_and_black_lists(sp_importer):
44 Test that _load_white_and_black_lists() well return
45 black list and white list and that they are of dict type.
47 black_list, white_list = sp_importer._load_white_and_black_lists()
49 assert isinstance(black_list, dict) and isinstance(white_list, dict)
51 def test_convert_php_settings(sp_importer):
53 Test that _convert_php_settings_if_needed() convert the given
54 php file to a json file.
56 php_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.php').resolve()
58 with tempfile.TemporaryDirectory() as temp_dir:
59 temp_settings = (Path(temp_dir) / 'phrase_settings.php').resolve()
60 copyfile(php_file, temp_settings)
61 sp_importer._convert_php_settings_if_needed(temp_settings)
63 assert (Path(temp_dir) / 'phrase_settings.json').is_file()
65 def test_convert_settings_wrong_file(sp_importer):
67 Test that _convert_php_settings_if_needed() raise an exception
68 if the given file is not a valid file.
70 with pytest.raises(UsageError, match='random_file is not a valid file.'):
71 sp_importer._convert_php_settings_if_needed('random_file')
73 def test_convert_settings_json_already_exist(sp_importer):
75 Test that if we give to '_convert_php_settings_if_needed' a php file path
76 and that a the corresponding json file already exists, it is returned.
78 php_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.php').resolve()
79 json_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.json').resolve()
81 returned = sp_importer._convert_php_settings_if_needed(php_file)
83 assert returned == json_file
85 def test_convert_settings_giving_json(sp_importer):
87 Test that if we give to '_convert_php_settings_if_needed' a json file path
88 the same path is directly returned
90 json_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.json').resolve()
92 returned = sp_importer._convert_php_settings_if_needed(json_file)
94 assert returned == json_file
96 def test_create_place_classtype_indexes(temp_db_with_extensions, temp_db_conn,
97 table_factory, sp_importer):
99 Test that _create_place_classtype_indexes() create the
100 place_id index and centroid index on the right place_class_type table.
102 phrase_class = 'class'
104 table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
106 table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')
108 sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
110 assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
112 def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
114 Test that _create_place_classtype_table() create
115 the right place_classtype table.
117 phrase_class = 'class'
119 sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
121 assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
123 def test_grant_access_to_web_user(temp_db_conn, table_factory, def_config, sp_importer):
125 Test that _grant_access_to_webuser() give
126 right access to the web user.
128 phrase_class = 'class'
130 table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
132 table_factory(table_name)
134 sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
136 assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
138 def test_create_place_classtype_table_and_indexes(
139 temp_db_conn, def_config, placex_table,
142 Test that _create_place_classtype_table_and_indexes()
143 create the right place_classtype tables and place_id indexes
144 and centroid indexes and grant access to the web user
145 for the given set of pairs.
147 pairs = set([('class1', 'type1'), ('class2', 'type2')])
149 sp_importer._create_place_classtype_table_and_indexes(pairs)
152 assert check_table_exist(temp_db_conn, pair[0], pair[1])
153 assert check_placeid_and_centroid_indexes(temp_db_conn, pair[0], pair[1])
154 assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, pair[0], pair[1])
156 def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
159 Check for the remove_non_existent_phrases_from_db() method.
161 It should removed entries from the word table which are contained
162 in the words_phrases_to_delete set and not those also contained
163 in the words_phrases_still_exist set.
165 place_classtype tables contained in table_phrases_to_delete should
168 sp_importer.table_phrases_to_delete = {
169 'place_classtype_testclasstypetable_to_delete'
174 FROM information_schema.tables
175 WHERE table_schema='public'
176 AND table_name like 'place_classtype_%';
179 sp_importer._remove_non_existent_tables_from_db()
181 # Changes are not committed yet. Use temp_db_conn for checking results.
182 with temp_db_conn.cursor(cursor_factory=CursorForTesting) as cur:
183 assert cur.row_set(query_tables) \
184 == {('place_classtype_testclasstypetable_to_keep', )}
187 @pytest.mark.parametrize("should_replace", [(True), (False)])
188 def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
189 placex_table, table_factory, tokenizer_mock, should_replace):
191 Check that the main import_phrases() method is well executed.
192 It should create the place_classtype table, the place_id and centroid indexes,
193 grand access to the web user and executing the SQL functions for amenities.
194 It should also update the database well by deleting or preserving existing entries
197 #Add some data to the database before execution in order to test
198 #what is deleted and what is preserved.
199 table_factory('place_classtype_amenity_animal_shelter')
200 table_factory('place_classtype_wrongclass_wrongtype')
202 monkeypatch.setattr('nominatim.tools.special_phrases.sp_wiki_loader.SPWikiLoader._get_wiki_content',
203 mock_get_wiki_content)
205 tokenizer = tokenizer_mock()
206 sp_importer.import_phrases(tokenizer, should_replace)
208 assert len(tokenizer.analyser_cache['special_phrases']) == 18
210 class_test = 'aerialway'
211 type_test = 'zip_line'
213 assert check_table_exist(temp_db_conn, class_test, type_test)
214 assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
215 assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
216 assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
218 assert not check_table_exist(temp_db_conn, 'wrong_class', 'wrong_type')
220 assert temp_db_conn.table_exists('place_classtype_amenity_animal_shelter')
222 assert not temp_db_conn.table_exists('place_classtype_wrongclass_wrongtype')
225 def mock_get_wiki_content(self, lang):
227 Mock the _get_wiki_content() method to return
228 static xml test file content.
230 return get_test_xml_wiki_content()
232 def get_test_xml_wiki_content():
234 return the content of the static xml test file.
236 xml_test_content_path = (TEST_BASE_DIR / 'testdata' / 'special_phrases_test_content.txt').resolve()
237 with open(xml_test_content_path) as xml_content_reader:
238 return xml_content_reader.read()
240 def check_table_exist(temp_db_conn, phrase_class, phrase_type):
242 Verify that the place_classtype table exists for the given
243 phrase_class and phrase_type.
245 return temp_db_conn.table_exists('place_classtype_{}_{}'.format(phrase_class, phrase_type))
248 def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
250 Check that the web user has been granted right access to the
251 place_classtype table of the given phrase_class and phrase_type.
253 table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
255 with temp_db_conn.cursor() as temp_db_cursor:
256 temp_db_cursor.execute("""
257 SELECT * FROM information_schema.role_table_grants
258 WHERE table_name='{}'
260 AND privilege_type='SELECT'""".format(table_name, user))
261 return temp_db_cursor.fetchone()
263 def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
265 Check that the place_id index and centroid index exist for the
266 place_classtype table of the given phrase_class and phrase_type.
268 index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
271 temp_db_conn.index_exists(index_prefix + 'centroid')
273 temp_db_conn.index_exists(index_prefix + 'place_id')
277 def sp_importer(temp_db_conn, def_config, temp_phplib_dir_with_migration):
279 Return an instance of SPImporter.
281 loader = SPWikiLoader(def_config, ['en'])
282 return SPImporter(def_config, temp_phplib_dir_with_migration, temp_db_conn, loader)
285 def temp_phplib_dir_with_migration():
287 Return temporary phpdir with migration subdirectory and
288 PhraseSettingsToJson.php script inside.
290 migration_file = (TEST_BASE_DIR / '..' / 'lib-php' / 'migration'
291 / 'PhraseSettingsToJson.php').resolve()
292 with tempfile.TemporaryDirectory() as phpdir:
293 (Path(phpdir) / 'migration').mkdir()
294 migration_dest_path = (Path(phpdir) / 'migration' / 'PhraseSettingsToJson.php').resolve()
295 copyfile(migration_file, migration_dest_path)
300 def default_phrases(table_factory):
301 table_factory('place_classtype_testclasstypetable_to_delete')
302 table_factory('place_classtype_testclasstypetable_to_keep')