"""
    Tests for the special phrases import methods
    of the class SpecialPhrasesImporter.
"""
from pathlib import Path
from shutil import copyfile
import tempfile

import pytest

from nominatim.errors import UsageError
from nominatim.tools import SpecialPhrasesImporter
# Unresolved path pointing two levels above this file; users append a
# sub-path and call resolve() on the result.
TEST_BASE_DIR = Path(__file__).joinpath('..', '..')
def test_fetch_existing_place_classtype_tables(special_phrases_importer, temp_db_cursor):
    """
        Check the _fetch_existing_place_classtype_tables() method.
        After a place_classtype table has been created, its name must be
        found in the importer's table_phrases_to_delete collection.
    """
    expected_table = 'place_classtype_testclasstypetable'
    temp_db_cursor.execute('CREATE TABLE place_classtype_testclasstypetable()')

    special_phrases_importer._fetch_existing_place_classtype_tables()

    assert special_phrases_importer.table_phrases_to_delete.pop() == expected_table
def test_check_sanity_class(special_phrases_importer):
    """
        Check the _check_sanity() method.
        An empty class or an empty type must give a falsy result,
        a non-empty class together with a non-empty type a truthy one.
    """
    invalid_pairs = (('', 'type'), ('class', ''))
    for bad_class, bad_type in invalid_pairs:
        assert not special_phrases_importer._check_sanity('en', bad_class, bad_type)

    assert special_phrases_importer._check_sanity('en', 'class', 'type')
def test_load_white_and_black_lists(special_phrases_importer):
    """
        Check that _load_white_and_black_lists() hands back exactly two
        objects, the black list and the white list, and that both are dicts.
    """
    loaded = special_phrases_importer._load_white_and_black_lists()
    black_list, white_list = loaded

    assert all(isinstance(candidate, dict) for candidate in (black_list, white_list))
def test_convert_php_settings(special_phrases_importer):
    """
        Check that _convert_php_settings_if_needed() produces a
        phrase_settings.json file next to the php file it is given.
    """
    source_php = TEST_BASE_DIR.joinpath('testfiles', 'phrase_settings.php').resolve()

    with tempfile.TemporaryDirectory() as temp_dir:
        work_dir = Path(temp_dir)
        # Work on a copy inside a throw-away directory.
        temp_settings = work_dir.joinpath('phrase_settings.php').resolve()
        copyfile(source_php, temp_settings)
        special_phrases_importer._convert_php_settings_if_needed(temp_settings)

        expected_json = work_dir / 'phrase_settings.json'
        assert expected_json.is_file()
def test_convert_settings_wrong_file(special_phrases_importer):
    """
        Check that _convert_php_settings_if_needed() raises a UsageError
        with the expected message when the given path is not a valid file.
    """
    bogus_path = 'random_file'
    expected_message = 'random_file is not a valid file.'

    with pytest.raises(UsageError, match=expected_message):
        special_phrases_importer._convert_php_settings_if_needed(bogus_path)
def test_convert_settings_json_already_exist(special_phrases_importer):
    """
        Check that handing the phrase_settings.php file from the testfiles
        directory to _convert_php_settings_if_needed() gives back the path
        of the phrase_settings.json file located in that same directory.
    """
    testfiles_dir = TEST_BASE_DIR / 'testfiles'
    expected_json = (testfiles_dir / 'phrase_settings.json').resolve()
    php_settings = (testfiles_dir / 'phrase_settings.php').resolve()

    assert special_phrases_importer._convert_php_settings_if_needed(php_settings) == expected_json
def test_convert_settings_giving_json(special_phrases_importer):
    """
        Check that a json file path handed to
        _convert_php_settings_if_needed() comes back unchanged.
    """
    json_settings = TEST_BASE_DIR.joinpath('testfiles', 'phrase_settings.json').resolve()

    assert special_phrases_importer._convert_php_settings_if_needed(json_settings) == json_settings
def test_create_place_classtype_indexes(temp_db_conn, special_phrases_importer):
    """
        Test that _create_place_classtype_indexes() creates the
        place_id index and centroid index on the right place_classtype table.
    """
    phrase_class = 'class'
    # Must be defined before it is used to build the table name below.
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        # The GEOMETRY column type used below is provided by postgis.
        temp_db_cursor.execute("CREATE EXTENSION postgis;")
        temp_db_cursor.execute('CREATE TABLE {}(place_id BIGINT, centroid GEOMETRY)'.format(table_name))

    special_phrases_importer._create_place_classtype_indexes('', phrase_class, phrase_type)

    assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
def test_create_place_classtype_table(temp_db_conn, placex_table, special_phrases_importer):
    """
        Test that _create_place_classtype_table() creates
        the right place_classtype table.
    """
    phrase_class = 'class'
    # Must be defined before it is handed to the importer and the check.
    phrase_type = 'type'

    special_phrases_importer._create_place_classtype_table('', phrase_class, phrase_type)

    assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
def test_grant_access_to_web_user(temp_db_conn, def_config, special_phrases_importer):
    """
        Test that _grant_access_to_webuser() gives the web user
        (def_config.DATABASE_WEBUSER) access to the place_classtype table.
    """
    phrase_class = 'class'
    # Must be defined before it is used to build the table name below.
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute('CREATE TABLE {}()'.format(table_name))

    special_phrases_importer._grant_access_to_webuser(phrase_class, phrase_type)

    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
def test_create_place_classtype_table_and_indexes(
        temp_db_conn, def_config, placex_table,
        special_phrases_importer):
    """
        Test that _create_place_classtype_table_and_indexes()
        creates the right place_classtype tables, place_id indexes
        and centroid indexes and grants access to the web user
        for every pair of the given set.
    """
    pairs = {('class1', 'type1'), ('class2', 'type2')}

    special_phrases_importer._create_place_classtype_table_and_indexes(pairs)

    # Every (class, type) pair must have its own table, indexes and grant.
    for phrase_class, phrase_type in pairs:
        assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
        assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
        assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER,
                                  phrase_class, phrase_type)
def test_process_xml_content(temp_db_conn, def_config, special_phrases_importer):
    """
        Test that _process_xml_content() processes the static xml test
        content and that the (class, type) pair ('aerialway', 'zip_line')
        is part of what it returns.
    """
    class_test = 'aerialway'
    type_test = 'zip_line'

    pairs = special_phrases_importer._process_xml_content(get_test_xml_wiki_content(), 'en')

    # Compare whole pairs. Turning the result into a dict would keep only
    # one arbitrary type per class and would not tie class and type together.
    assert (class_test, type_test) in {tuple(pair) for pair in pairs}
def test_remove_non_existent_tables_from_db(special_phrases_importer, default_phrases,
                                            temp_db_conn):
    """
        Check the _remove_non_existent_tables_from_db() method.

        place_classtype tables listed in table_phrases_to_delete should
        be removed from the database, the other ones should be kept.
    """
    query_tables = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_name like 'place_classtype_%';
    """

    with temp_db_conn.cursor() as temp_db_cursor:
        special_phrases_importer.table_phrases_to_delete = {
            'place_classtype_testclasstypetable_to_delete'
        }

        special_phrases_importer._remove_non_existent_tables_from_db()

        temp_db_cursor.execute(query_tables)
        tables_result = temp_db_cursor.fetchall()

        # Only the table that was not marked for deletion may be left.
        assert len(tables_result) == 1
        assert tables_result[0][0] == 'place_classtype_testclasstypetable_to_keep'
def test_import_from_wiki(monkeypatch, temp_db_conn, def_config, special_phrases_importer,
                          placex_table, tokenizer_mock):
    """
        Check that the main import_from_wiki() method is well executed.
        The wiki content is replaced by the static xml test content.
        It should create the place_classtype table, the place_id and centroid
        indexes, grant access to the web user and hand the special phrases
        over to the tokenizer.
        It should also update the database well by deleting or preserving
        place_classtype tables that existed before the import.
    """
    # Add some data to the database before execution in order to test
    # what is deleted and what is preserved.
    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("""
            CREATE TABLE place_classtype_amenity_animal_shelter();
            CREATE TABLE place_classtype_wrongclass_wrongtype();""")

    monkeypatch.setattr('nominatim.tools.SpecialPhrasesImporter._get_wiki_content', mock_get_wiki_content)
    tokenizer = tokenizer_mock()
    special_phrases_importer.import_from_wiki(tokenizer, ['en'])

    assert len(tokenizer.analyser_cache['special_phrases']) == 18

    class_test = 'aerialway'
    type_test = 'zip_line'

    assert check_table_exist(temp_db_conn, class_test, type_test)
    assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
    assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
    # Must use the names of the table created above, otherwise the check
    # looks for a table that never existed and always succeeds.
    assert not check_table_exist(temp_db_conn, 'wrongclass', 'wrongtype')

    # Used to check that the correct place_classtype table which was
    # already in the database before is still there.
    query_existing_table = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_name = 'place_classtype_amenity_animal_shelter';
    """

    # Used to check that the wrong place_classtype table was deleted
    # from the database.
    query_wrong_table = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_name = 'place_classtype_wrongclass_wrongtype';
    """

    # Format (query, should_return_something_bool) used to easily
    # execute all asserts.
    queries_tests = (
        (query_existing_table, True),
        (query_wrong_table, False),
    )

    with temp_db_conn.cursor() as temp_db_cursor:
        for query, should_return_something in queries_tests:
            temp_db_cursor.execute(query)
            if should_return_something:
                assert temp_db_cursor.fetchone()
            else:
                assert not temp_db_cursor.fetchone()
def mock_get_wiki_content(lang):
    """
        Stand-in for the _get_wiki_content() method: whatever language
        is asked for, hand back the content of the static xml test file.
    """
    static_content = get_test_xml_wiki_content()
    return static_content
def get_test_xml_wiki_content():
    """
        Read the static xml test file from the testdata directory
        and give back its whole text.
    """
    content_file = TEST_BASE_DIR.joinpath('testdata', 'special_phrases_test_content.txt').resolve()
    return content_file.read_text()
def check_table_exist(temp_db_conn, phrase_class, phrase_type):
    """
        Verify that the place_classtype table exists for the given
        phrase_class and phrase_type.

        Returns the row fetched from information_schema.tables, or
        whatever fetchone() gives when no row matches.
    """
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        # The statement needs its SELECT clause to be valid SQL.
        temp_db_cursor.execute("""
            SELECT table_name
            FROM information_schema.tables
            WHERE table_type='BASE TABLE'
            AND table_name='{}'""".format(table_name))
        return temp_db_cursor.fetchone()
def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
    """
        Check that the given user has been granted SELECT access to the
        place_classtype table of the given phrase_class and phrase_type.

        Returns the row fetched from information_schema.role_table_grants,
        or whatever fetchone() gives when no row matches.
    """
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        # The grantee condition is required: without it the user argument
        # is ignored and a grant to any role would satisfy the check.
        temp_db_cursor.execute("""
            SELECT * FROM information_schema.role_table_grants
            WHERE table_name='{}'
            AND grantee='{}'
            AND privilege_type='SELECT'""".format(table_name, user))
        return temp_db_cursor.fetchone()
def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
    """
        Check that the place_id index and centroid index exist for the
        place_classtype table of the given phrase_class and phrase_type.

        The result is truthy only when both index lookups succeed.
    """
    index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)

    # The combined result must be returned, otherwise callers always
    # receive None and their assertion can never succeed.
    return (
        temp_db_conn.index_exists(index_prefix + 'centroid')
        and
        temp_db_conn.index_exists(index_prefix + 'place_id')
    )
@pytest.fixture
def special_phrases_importer(temp_db_conn, def_config, temp_phplib_dir_with_migration):
    """
        Return an instance of SpecialPhrasesImporter built from the
        default config, the temporary php directory and the test
        database connection.
    """
    return SpecialPhrasesImporter(def_config, temp_phplib_dir_with_migration, temp_db_conn)
@pytest.fixture
def temp_phplib_dir_with_migration():
    """
        Provide a temporary phpdir with a migration subdirectory and
        the PhraseSettingsToJson.php script inside.
    """
    migration_file = (TEST_BASE_DIR / '..' / 'lib-php' / 'migration'
                      / 'PhraseSettingsToJson.php').resolve()
    with tempfile.TemporaryDirectory() as phpdir:
        (Path(phpdir) / 'migration').mkdir()
        migration_dest_path = (Path(phpdir) / 'migration' / 'PhraseSettingsToJson.php').resolve()
        copyfile(migration_file, migration_dest_path)

        # Yield from inside the with block so that the directory stays
        # alive while the test runs and is removed afterwards.
        yield Path(phpdir)
@pytest.fixture
def default_phrases(temp_db_cursor):
    """
        Create two empty place_classtype tables, one named '..._to_delete'
        and one named '..._to_keep'.
    """
    temp_db_cursor.execute("""
        CREATE TABLE place_classtype_testclasstypetable_to_delete();
        CREATE TABLE place_classtype_testclasstypetable_to_keep();""")