"""
Tests for the special phrases import methods
of the SPImporter class.
"""
from pathlib import Path
from shutil import copyfile
import tempfile

import pytest

from nominatim.errors import UsageError
from nominatim.tools.special_phrases.sp_importer import SPImporter
from nominatim.tools.special_phrases.sp_wiki_loader import SPWikiLoader
from nominatim.tools.special_phrases.sp_csv_loader import SPCsvLoader
from nominatim.tools.special_phrases.special_phrase import SpecialPhrase
# Unresolved path pointing two '..' steps up from this file; the paths
# derived from it below are resolved at their point of use.
TEST_BASE_DIR = Path(__file__).joinpath('..', '..')
def test_fetch_existing_place_classtype_tables(sp_importer, temp_db_cursor):
    """
    Verify _fetch_existing_place_classtype_tables().

    After a place_classtype table has been created, the importer's
    table_phrases_to_delete collection must hold that table's name.
    """
    expected_table = 'place_classtype_testclasstypetable'
    temp_db_cursor.execute('CREATE TABLE place_classtype_testclasstypetable()')

    sp_importer._fetch_existing_place_classtype_tables()

    assert sp_importer.table_phrases_to_delete.pop() == expected_table
def test_check_sanity_class(sp_importer):
    """
    Verify _check_sanity().

    A phrase whose class or type argument is an empty string must yield a
    falsy result; a phrase with both set must yield a truthy result.
    """
    rejected_phrases = (
        SpecialPhrase('en', '', 'type', ''),
        SpecialPhrase('en', 'class', '', ''),
    )
    for phrase in rejected_phrases:
        assert not sp_importer._check_sanity(phrase)

    accepted_phrase = SpecialPhrase('en', 'class', 'type', '')
    assert sp_importer._check_sanity(accepted_phrase)
def test_load_white_and_black_lists(sp_importer):
    """
    Verify that _load_white_and_black_lists() returns a pair of dicts.

    The first element is treated as the black list and the second as the
    white list.
    """
    loaded_lists = sp_importer._load_white_and_black_lists()
    black_list, white_list = loaded_lists

    assert isinstance(black_list, dict)
    assert isinstance(white_list, dict)
def test_convert_php_settings(sp_importer):
    """
    Verify that _convert_php_settings_if_needed() turns the given php
    settings file into a json file in the same directory.
    """
    source_php = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.php').resolve()

    with tempfile.TemporaryDirectory() as temp_dir:
        work_dir = Path(temp_dir)
        php_copy = (work_dir / 'phrase_settings.php').resolve()
        copyfile(source_php, php_copy)

        sp_importer._convert_php_settings_if_needed(php_copy)

        # Checked before leaving the context manager, which removes the
        # temporary directory on exit.
        expected_json = work_dir / 'phrase_settings.json'
        assert expected_json.is_file()
def test_convert_settings_wrong_file(sp_importer):
    """
    Verify that _convert_php_settings_if_needed() raises a UsageError
    when the given path is not a valid file.
    """
    bogus_path = 'random_file'
    expected_message = 'random_file is not a valid file.'

    with pytest.raises(UsageError, match=expected_message):
        sp_importer._convert_php_settings_if_needed(bogus_path)
def test_convert_settings_json_already_exist(sp_importer):
    """
    Verify that passing a php settings path whose json counterpart already
    exists makes _convert_php_settings_if_needed() return the json path.
    """
    testfiles_dir = TEST_BASE_DIR / 'testfiles'
    php_settings = (testfiles_dir / 'phrase_settings.php').resolve()
    expected_json = (testfiles_dir / 'phrase_settings.json').resolve()

    assert sp_importer._convert_php_settings_if_needed(php_settings) == expected_json
def test_convert_settings_giving_json(sp_importer):
    """
    Verify that passing a json settings path to
    _convert_php_settings_if_needed() returns that same path.
    """
    json_settings = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.json').resolve()

    result = sp_importer._convert_php_settings_if_needed(json_settings)

    assert result == json_settings
def test_create_place_classtype_indexes(temp_db_conn, sp_importer):
    """
    Verify that _create_place_classtype_indexes() creates the place_id
    index and the centroid index on the matching place_classtype table.
    """
    phrase_class = 'class'
    # Must be defined here: it is used for the table name, the importer
    # call and the final index check.
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        # The GEOMETRY column type below comes from the postgis extension.
        temp_db_cursor.execute("CREATE EXTENSION postgis;")
        temp_db_cursor.execute('CREATE TABLE {}(place_id BIGINT, centroid GEOMETRY)'.format(table_name))

    sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)

    assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
    """
    Verify that _create_place_classtype_table() creates the
    place_classtype table for the given class and type.
    """
    phrase_class = 'class'
    # Must be defined here: it is passed to the importer and to the check.
    phrase_type = 'type'

    sp_importer._create_place_classtype_table('', phrase_class, phrase_type)

    assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
def test_grant_access_to_web_user(temp_db_conn, def_config, sp_importer):
    """
    Verify that _grant_access_to_webuser() grants the web user access
    to the place_classtype table of the given class and type.
    """
    phrase_class = 'class'
    # Must be defined here: it is used for the table name, the importer
    # call and the final grant check.
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute('CREATE TABLE {}()'.format(table_name))

    sp_importer._grant_access_to_webuser(phrase_class, phrase_type)

    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
def test_create_place_classtype_table_and_indexes(
        temp_db_conn, def_config, placex_table,
        sp_importer):
    """
    Verify that _create_place_classtype_table_and_indexes() creates, for
    every (class, type) pair it is given, the place_classtype table, its
    place_id and centroid indexes, and the grant for the web user.
    """
    pairs = {('class1', 'type1'), ('class2', 'type2')}

    sp_importer._create_place_classtype_table_and_indexes(pairs)

    # Every pair must have been processed, not just one of them.
    for phrase_class, phrase_type in pairs:
        assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
        assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
        assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER,
                                  phrase_class, phrase_type)
def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
                                            temp_db_conn):
    """
    Verify _remove_non_existent_tables_from_db().

    The place_classtype tables listed in table_phrases_to_delete must be
    dropped; the other place_classtype table created by the
    default_phrases fixture must be kept.
    """
    with temp_db_conn.cursor() as temp_db_cursor:
        sp_importer.table_phrases_to_delete = {
            'place_classtype_testclasstypetable_to_delete'
        }

        query_tables = """
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema='public'
            AND table_name like 'place_classtype_%';
        """

        sp_importer._remove_non_existent_tables_from_db()

        temp_db_cursor.execute(query_tables)
        remaining_tables = [row[0] for row in temp_db_cursor.fetchall()]

        # Exactly one place_classtype table may survive: the one to keep.
        assert remaining_tables == ['place_classtype_testclasstypetable_to_keep']
@pytest.mark.parametrize("should_replace", [True, False])
def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
                        placex_table, tokenizer_mock, should_replace):
    """
    Verify the main import_phrases() method.

    It should create the place_classtype table, the place_id and centroid
    indexes and grant access to the web user. It should also update the
    database by deleting or preserving the entries that existed before.
    """
    # Add some data to the database before execution in order to test
    # what is deleted and what is preserved.
    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("""
            CREATE TABLE place_classtype_amenity_animal_shelter();
            CREATE TABLE place_classtype_wrongclass_wrongtype();""")

    # Serve the static test content instead of the loader's real wiki content.
    monkeypatch.setattr('nominatim.tools.special_phrases.sp_wiki_loader.SPWikiLoader._get_wiki_content',
                        mock_get_wiki_content)

    tokenizer = tokenizer_mock()
    sp_importer.import_phrases(tokenizer, should_replace)

    assert len(tokenizer.analyser_cache['special_phrases']) == 18

    class_test = 'aerialway'
    type_test = 'zip_line'

    assert check_table_exist(temp_db_conn, class_test, type_test)
    assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
    assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
    # NOTE(review): the deletion is only asserted when should_replace is set,
    # on the assumption that import_phrases() removes stale tables only in
    # that mode - confirm against SPImporter.
    # The names must match the table seeded above
    # (place_classtype_wrongclass_wrongtype), otherwise this check is vacuous.
    if should_replace:
        assert not check_table_exist(temp_db_conn, 'wrongclass', 'wrongtype')

    # Checks that the correct place_classtype table, which was in the
    # database before the import, is still there.
    query_existing_table = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_name = 'place_classtype_amenity_animal_shelter';
    """

    # Checks that the wrong place_classtype table was deleted from the database.
    query_wrong_table = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_name = 'place_classtype_wrongclass_wrongtype';
    """

    # Format: (query, should_return_something_bool).
    queries_tests = [(query_existing_table, True)]
    if should_replace:
        queries_tests.append((query_wrong_table, False))

    with temp_db_conn.cursor() as temp_db_cursor:
        for query, should_return_row in queries_tests:
            temp_db_cursor.execute(query)
            row = temp_db_cursor.fetchone()
            if should_return_row:
                assert row
            else:
                assert not row
def mock_get_wiki_content(self, lang):
    """
    Replacement for the _get_wiki_content() method.

    Both arguments are ignored; the content of the static xml test file
    is returned.
    """
    static_content = get_test_xml_wiki_content()
    return static_content
def get_test_xml_wiki_content():
    """
    Return the content of the static xml test file as a string.
    """
    xml_test_content_path = (TEST_BASE_DIR / 'testdata' / 'special_phrases_test_content.txt').resolve()
    # Explicit encoding so the result does not depend on the locale of the
    # machine running the tests.
    # NOTE(review): assumes the test data file is UTF-8 encoded - confirm.
    with open(xml_test_content_path, encoding='utf-8') as xml_content_reader:
        return xml_content_reader.read()
def check_table_exist(temp_db_conn, phrase_class, phrase_type):
    """
    Verify that the place_classtype table exists for the given
    phrase_class and phrase_type.

    Returns the result of fetchone() on the lookup query, so the value is
    meant to be used for its truthiness.
    """
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("""
            SELECT *
            FROM information_schema.tables
            WHERE table_type='BASE TABLE'
            AND table_name='{}'""".format(table_name))
        return temp_db_cursor.fetchone()
def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
    """
    Check that `user` has been granted the SELECT privilege on the
    place_classtype table of the given phrase_class and phrase_type.

    Returns the result of fetchone() on the lookup query, so the value is
    meant to be used for its truthiness.
    """
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        # The grantee filter matters: without it a grant to any role would
        # satisfy the check and `user` would be ignored.
        temp_db_cursor.execute("""
                SELECT * FROM information_schema.role_table_grants
                WHERE table_name='{}'
                AND grantee='{}'
                AND privilege_type='SELECT'""".format(table_name, user))
        return temp_db_cursor.fetchone()
def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
    """
    Check that both the centroid index and the place_id index exist for
    the place_classtype table of the given phrase_class and phrase_type.

    Returns a truthy value only when temp_db_conn.index_exists() reports
    both indexes.
    """
    index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)

    return (
        temp_db_conn.index_exists(index_prefix + 'centroid')
        and
        temp_db_conn.index_exists(index_prefix + 'place_id')
    )
@pytest.fixture
def sp_importer(temp_db_conn, def_config, temp_phplib_dir_with_migration):
    """
    Return an instance of SPImporter that uses an SPWikiLoader
    created with the language list ['en'].
    """
    loader = SPWikiLoader(def_config, ['en'])
    return SPImporter(def_config, temp_phplib_dir_with_migration, temp_db_conn, loader)
@pytest.fixture
def temp_phplib_dir_with_migration():
    """
    Yield a temporary php directory containing a migration subdirectory
    with a copy of the PhraseSettingsToJson.php script inside.
    """
    migration_file = (TEST_BASE_DIR / '..' / 'lib-php' / 'migration'
                      / 'PhraseSettingsToJson.php').resolve()
    with tempfile.TemporaryDirectory() as phpdir:
        migration_dir = Path(phpdir) / 'migration'
        migration_dir.mkdir()
        migration_dest_path = (migration_dir / 'PhraseSettingsToJson.php').resolve()
        copyfile(migration_file, migration_dest_path)

        # Yield from inside the context manager so the directory still
        # exists while the requesting test runs; it is removed afterwards.
        yield Path(phpdir)
@pytest.fixture
def default_phrases(temp_db_cursor):
    """
    Create two empty place_classtype tables, one named '..._to_delete'
    and one named '..._to_keep'.
    """
    temp_db_cursor.execute("""
        CREATE TABLE place_classtype_testclasstypetable_to_delete();
        CREATE TABLE place_classtype_testclasstypetable_to_keep();""")