2 Tests for import special phrases methods
3 of the class SpecialPhrasesImporter.
from pathlib import Path
from shutil import copyfile
import tempfile

import pytest

from mocks import MockParamCapture
from nominatim.errors import UsageError
from nominatim.tools.special_phrases import SpecialPhrasesImporter
# Two levels above this test module; left unresolved on purpose, the users
# of this constant call .resolve() on the final path themselves.
TEST_BASE_DIR = Path(__file__).joinpath('..', '..')
def test_fetch_existing_words_phrases_basic(special_phrases_importer, word_table,
                                            temp_db_cursor):
    """
        Check the _fetch_existing_words_phrases() method.
        A special phrase row inserted into the word table must show up
        in the importer's words_phrases_to_delete collection.
    """
    temp_db_cursor.execute("""
        INSERT INTO word VALUES(99999, 'lookup_token', 'normalized_word',
        'class', 'type', null, 0, 'near');
    """)

    importer = special_phrases_importer
    # Nothing has been fetched yet, so the collection must start out empty.
    assert not importer.words_phrases_to_delete

    importer._fetch_existing_words_phrases()

    expected_phrase = ('normalized_word', 'class', 'type', 'near')
    assert importer.words_phrases_to_delete.pop() == expected_phrase
@pytest.mark.parametrize("house_type", ['house', 'postcode'])
def test_fetch_existing_words_phrases_special_cases(special_phrases_importer, word_table,
                                                    house_type, temp_db_cursor):
    """
        Check the _fetch_existing_words_phrases() method.
        Rows of class 'place' whose type is 'house' or 'postcode' are
        housenumber/postcode terms and must not be collected.
    """
    insert_sql = """
        INSERT INTO word VALUES(99999, 'lookup_token', 'normalized_word',
        'place', %s, null, 0, 'near');
    """
    temp_db_cursor.execute(insert_sql, (house_type,))

    importer = special_phrases_importer
    importer._fetch_existing_words_phrases()

    assert not importer.words_phrases_to_delete
def test_fetch_existing_place_classtype_tables(special_phrases_importer, temp_db_cursor):
    """
        Check the _fetch_existing_place_classtype_tables() method.
        The place_classtype table created here must be reported in
        table_phrases_to_delete.
    """
    expected_table = 'place_classtype_testclasstypetable'
    temp_db_cursor.execute('CREATE TABLE place_classtype_testclasstypetable()')

    special_phrases_importer._fetch_existing_place_classtype_tables()

    found_table = special_phrases_importer.table_phrases_to_delete.pop()
    assert found_table == expected_table
def test_check_sanity_class(special_phrases_importer):
    """
        Check the _check_sanity() method.
        An empty class or an empty type must give a falsy result,
        a non-empty class/type pair must give a truthy one.
    """
    check = special_phrases_importer._check_sanity

    for bad_class, bad_type in (('', 'type'), ('class', '')):
        assert not check('en', bad_class, bad_type)

    assert check('en', 'class', 'type')
def test_load_white_and_black_lists(special_phrases_importer):
    """
        Test that _load_white_and_black_lists() returns a black list
        and a white list (in that order) and that both are dicts.
    """
    loaded = special_phrases_importer._load_white_and_black_lists()
    black_list, white_list = loaded

    for loaded_list in (black_list, white_list):
        assert isinstance(loaded_list, dict)
def test_convert_php_settings(special_phrases_importer):
    """
        Test that _convert_php_settings_if_needed() converts the given
        php settings file into a json file next to it.
    """
    source_php = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.php').resolve()

    with tempfile.TemporaryDirectory() as temp_dir:
        work_dir = Path(temp_dir)
        # Work on a copy so the conversion output lands in the temp directory.
        php_copy = (work_dir / 'phrase_settings.php').resolve()
        copyfile(source_php, php_copy)

        special_phrases_importer._convert_php_settings_if_needed(php_copy)

        assert (work_dir / 'phrase_settings.json').is_file()
def test_convert_settings_wrong_file(special_phrases_importer):
    """
        Test that _convert_php_settings_if_needed() raises a UsageError
        when the given path is not a valid file.
    """
    expected_message = 'random_file is not a valid file.'

    with pytest.raises(UsageError, match=expected_message):
        special_phrases_importer._convert_php_settings_if_needed('random_file')
def test_convert_settings_json_already_exist(special_phrases_importer):
    """
        Test that '_convert_php_settings_if_needed' returns the path of the
        json file when it is given a php file whose json counterpart
        already exists.
    """
    testfiles_dir = TEST_BASE_DIR / 'testfiles'
    php_file = (testfiles_dir / 'phrase_settings.php').resolve()
    json_file = (testfiles_dir / 'phrase_settings.json').resolve()

    assert special_phrases_importer._convert_php_settings_if_needed(php_file) == json_file
def test_convert_settings_giving_json(special_phrases_importer):
    """
        Test that '_convert_php_settings_if_needed' hands back the very
        same path when it is given a json file.
    """
    json_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.json').resolve()

    assert special_phrases_importer._convert_php_settings_if_needed(json_file) == json_file
def test_process_amenity_with_operator(special_phrases_importer, getorcreate_amenityoperator_funcs,
                                       temp_db_conn, word_table):
    """
        Test that _process_amenity() runs the getorcreate_amenityoperator()
        SQL function and that both operators ('near' and 'in') are handled.
    """
    for operator in ('near', 'in'):
        special_phrases_importer._process_amenity('', '', '', '', operator)

    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("SELECT * FROM word WHERE operator='near' OR operator='in'")
        inserted_rows = temp_db_cursor.fetchall()

    # One row per call above.
    assert len(inserted_rows) == 2
def test_process_amenity_without_operator(special_phrases_importer, getorcreate_amenity_funcs,
                                          temp_db_conn, word_table):
    """
        Test that _process_amenity() runs the getorcreate_amenity()
        SQL function when no operator is given.

        The test double of getorcreate_amenity() tags the rows it inserts
        with the operator 'no_operator', which is what is looked up here.
    """
    special_phrases_importer._process_amenity('', '', '', '', '')

    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("SELECT * FROM word WHERE operator='no_operator'")
        result = temp_db_cursor.fetchone()

    # Without this assertion the test would pass even if nothing was inserted.
    assert result, 'getorcreate_amenity() did not insert a row into word'
def test_create_place_classtype_indexes(temp_db_conn, special_phrases_importer):
    """
        Test that _create_place_classtype_indexes() creates the
        place_id index and centroid index on the right place_classtype table.
    """
    phrase_class = 'class'
    # Arbitrary value; it only has to match the table created below.
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        # The GEOMETRY column type below needs the postgis extension.
        temp_db_cursor.execute("CREATE EXTENSION postgis;")
        temp_db_cursor.execute('CREATE TABLE {}(place_id BIGINT, centroid GEOMETRY)'.format(table_name))

    special_phrases_importer._create_place_classtype_indexes('', phrase_class, phrase_type)

    assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
def test_create_place_classtype_table(temp_db_conn, placex_table, special_phrases_importer):
    """
        Test that _create_place_classtype_table() creates
        the right place_classtype table.
    """
    phrase_class = 'class'
    # Arbitrary value; it only has to be the same for creation and lookup.
    phrase_type = 'type'

    special_phrases_importer._create_place_classtype_table('', phrase_class, phrase_type)

    assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
def test_grant_access_to_web_user(temp_db_conn, def_config, special_phrases_importer):
    """
        Test that _grant_access_to_webuser() gives the web user
        SELECT access on the place_classtype table.
    """
    phrase_class = 'class'
    # Arbitrary value; it only has to match the table created below.
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute('CREATE TABLE {}()'.format(table_name))

    special_phrases_importer._grant_access_to_webuser(phrase_class, phrase_type)

    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
def test_create_place_classtype_table_and_indexes(
        temp_db_conn, def_config, placex_table,
        special_phrases_importer):
    """
        Test that _create_place_classtype_table_and_indexes()
        creates the right place_classtype tables, place_id indexes
        and centroid indexes and grants access to the web user
        for every pair of the given set.
    """
    pairs = {('class1', 'type1'), ('class2', 'type2')}

    special_phrases_importer._create_place_classtype_table_and_indexes(pairs)

    # Every (class, type) pair must have got its own table, indexes and grant.
    for phrase_class, phrase_type in pairs:
        assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
        assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
        assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
def test_process_xml_content(temp_db_conn, def_config, special_phrases_importer, word_table,
                             getorcreate_amenity_funcs, getorcreate_amenityoperator_funcs):
    """
        Test that _process_xml_content() processes the given xml content
        by executing the SQL functions for amenities and
        by returning the right set of (class, type) pairs.
    """
    class_test = 'aerialway'
    type_test = 'zip_line'

    pairs = special_phrases_importer._process_xml_content(get_test_xml_wiki_content(), 'en')

    assert check_amenities_with_op(temp_db_conn)
    assert check_amenities_without_op(temp_db_conn)
    # Check the pair itself. Turning the pairs into a dict would keep only one
    # type per class and could not tell which type belongs to which class.
    assert (class_test, type_test) in pairs
def test_remove_non_existent_phrases_from_db(special_phrases_importer, default_phrases,
                                             temp_db_conn):
    """
        Check the _remove_non_existent_phrases_from_db() method.

        Entries of the word table listed in words_phrases_to_delete must be
        removed, except those that are also listed in
        words_phrases_still_exist.

        place_classtype tables listed in table_phrases_to_delete must be
        dropped.
    """
    phrase_to_delete = ('normalized_word', 'class', 'type', 'near')
    phrase_to_keep = ('normalized_word_exists', 'class_exists', 'type_exists', 'near')

    importer = special_phrases_importer
    importer.words_phrases_to_delete = {phrase_to_delete, phrase_to_keep}
    importer.words_phrases_still_exist = {phrase_to_keep}
    importer.table_phrases_to_delete = {'place_classtype_testclasstypetable_to_delete'}

    query_words = 'SELECT word, class, type, operator FROM word;'
    query_tables = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_name like 'place_classtype_%';
    """

    with temp_db_conn.cursor() as temp_db_cursor:
        importer._remove_non_existent_phrases_from_db()

        temp_db_cursor.execute(query_words)
        remaining_words = temp_db_cursor.fetchall()
        temp_db_cursor.execute(query_tables)
        remaining_tables = temp_db_cursor.fetchall()

    # Only the phrase flagged as still existing survives.
    assert len(remaining_words) == 1 and remaining_words[0] == [
        'normalized_word_exists', 'class_exists', 'type_exists', 'near'
    ]
    # Only the table that was not scheduled for deletion survives.
    assert (len(remaining_tables) == 1 and
            remaining_tables[0][0] == 'place_classtype_testclasstypetable_to_keep')
def test_import_from_wiki(monkeypatch, temp_db_conn, def_config, special_phrases_importer, placex_table,
                          getorcreate_amenity_funcs, getorcreate_amenityoperator_funcs, word_table):
    """
        Check that the main import_from_wiki() method is well executed.
        It should create the place_classtype table, the place_id and centroid indexes,
        grant access to the web user and execute the SQL functions for amenities.
        It should also update the database by deleting or preserving existing entries.
    """
    # Add some data to the database before execution in order to test
    # what is deleted and what is preserved.
    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("""
            INSERT INTO word VALUES(99999, ' animal shelter', 'animal shelter',
            'amenity', 'animal_shelter', null, 0, null);

            INSERT INTO word VALUES(99999, ' wrong_lookup_token', 'wrong_normalized_word',
            'wrong_class', 'wrong_type', null, 0, 'near');

            CREATE TABLE place_classtype_amenity_animal_shelter();
            CREATE TABLE place_classtype_wrongclass_wrongtype();""")

    # Serve the static test file instead of fetching the wiki page.
    monkeypatch.setattr('nominatim.tools.special_phrases.SpecialPhrasesImporter._get_wiki_content', mock_get_wiki_content)
    special_phrases_importer.import_from_wiki(['en'])

    class_test = 'aerialway'
    type_test = 'zip_line'

    assert check_table_exist(temp_db_conn, class_test, type_test)
    assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
    assert check_amenities_with_op(temp_db_conn)
    assert check_amenities_without_op(temp_db_conn)
    assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
    # The seeded table is place_classtype_wrongclass_wrongtype, so look up that
    # exact name; 'wrong_class'/'wrong_type' would name a table that never
    # existed and make this check pass unconditionally.
    assert not check_table_exist(temp_db_conn, 'wrongclass', 'wrongtype')

    table_lookup = """
        SELECT *
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_name = %s;
    """

    # (query, parameters, whether a row is expected); a list keeps the
    # checks in a deterministic order.
    checks = [
        # The correct phrase that was already in the word table is still there.
        ("SELECT * FROM word WHERE word = 'animal shelter'", None, True),
        # The wrong phrase was deleted from the word table.
        ("SELECT word FROM word WHERE word = 'wrong_normalized_word'", None, False),
        # The correct place_classtype table that already existed is still there.
        (table_lookup, ('place_classtype_amenity_animal_shelter',), True),
        # The wrong place_classtype table was dropped.
        (table_lookup, ('place_classtype_wrongclass_wrongtype',), False),
    ]

    with temp_db_conn.cursor() as temp_db_cursor:
        for query, params, should_exist in checks:
            temp_db_cursor.execute(query, params)
            row = temp_db_cursor.fetchone()
            if should_exist:
                assert row, 'expected a row for: {} {}'.format(query, params)
            else:
                assert not row, 'expected no row for: {} {}'.format(query, params)
def mock_get_wiki_content(lang):
    """
        Stand-in for the _get_wiki_content() method.
        The requested language is ignored; the content of the static
        xml test file is returned for every call.
    """
    static_content = get_test_xml_wiki_content()
    return static_content
def get_test_xml_wiki_content():
    """
        Return the content of the static xml test file as a string.
    """
    xml_test_content_path = (TEST_BASE_DIR / 'testdata' / 'special_phrases_test_content.txt').resolve()
    # Read with an explicit encoding so the result does not depend on the
    # locale of the machine running the tests.
    # NOTE(review): assumes the test file is stored as UTF-8 - confirm.
    return xml_test_content_path.read_text(encoding='utf-8')
def check_table_exist(temp_db_conn, phrase_class, phrase_type):
    """
        Verify that the place_classtype table exists for the given
        phrase_class and phrase_type.

        Returns the matching information_schema.tables row, or None when
        there is no such table, so the result can be used in an assert.
    """
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        # Pass the name as a query parameter instead of formatting it into
        # the SQL text, so quoting is handled by the database driver.
        temp_db_cursor.execute("""
            SELECT *
            FROM information_schema.tables
            WHERE table_type='BASE TABLE'
            AND table_name=%s""", (table_name,))
        return temp_db_cursor.fetchone()
def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
    """
        Check that the given user has been granted SELECT access to the
        place_classtype table of the given phrase_class and phrase_type.

        Returns the matching information_schema.role_table_grants row, or
        None when there is no such grant.
    """
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        # Table name and grantee are passed as query parameters instead of
        # being formatted into the SQL text.
        temp_db_cursor.execute("""
            SELECT * FROM information_schema.role_table_grants
            WHERE table_name=%s
            AND grantee=%s
            AND privilege_type='SELECT'""", (table_name, user))
        return temp_db_cursor.fetchone()
def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
    """
        Check that the place_id index and centroid index exist for the
        place_classtype table of the given phrase_class and phrase_type.

        Returns a truthy value only when both indexes exist.
    """
    index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)

    # The combined result has to be returned: callers use it in an assert.
    return (
        temp_db_conn.index_exists(index_prefix + 'centroid')
        and
        temp_db_conn.index_exists(index_prefix + 'place_id')
    )
def check_amenities_with_op(temp_db_conn):
    """
        Check that the word table holds more than one row written by the
        test version of getorcreate_amenityoperator(), i.e. more than one
        row whose operator differs from 'no_operator'.
    """
    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("SELECT * FROM word WHERE operator != 'no_operator'")
        rows_with_operator = temp_db_cursor.fetchall()

    return len(rows_with_operator) > 1
def check_amenities_without_op(temp_db_conn):
    """
        Check that the word table holds more than one row written by the
        test version of getorcreate_amenity(), i.e. more than one row
        whose operator is 'no_operator'.
    """
    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("SELECT * FROM word WHERE operator = 'no_operator'")
        rows_without_operator = temp_db_cursor.fetchall()

    return len(rows_without_operator) > 1
@pytest.fixture
def special_phrases_importer(temp_db_conn, def_config, temp_phplib_dir_with_migration):
    """
        Return an instance of SpecialPhrasesImporter built from the test
        configuration, the temporary php directory and the test database
        connection.
    """
    return SpecialPhrasesImporter(def_config, temp_phplib_dir_with_migration, temp_db_conn)
@pytest.fixture
def temp_phplib_dir_with_migration():
    """
        Provide a temporary phpdir with a migration subdirectory and the
        PhraseSettingsToJson.php script inside.
    """
    migration_file = (TEST_BASE_DIR / '..' / 'lib-php' / 'migration'
                      / 'PhraseSettingsToJson.php').resolve()
    with tempfile.TemporaryDirectory() as phpdir:
        (Path(phpdir) / 'migration').mkdir()
        migration_dest_path = (Path(phpdir) / 'migration' / 'PhraseSettingsToJson.php').resolve()
        copyfile(migration_file, migration_dest_path)

        # Yield from inside the with block so the directory stays on disk
        # for the whole test and is removed afterwards.
        yield Path(phpdir)
@pytest.fixture
def default_phrases(word_table, temp_db_cursor):
    """
        Seed the database with two special phrases in the word table and
        two place_classtype tables (one '_to_delete', one '_to_keep').
    """
    temp_db_cursor.execute("""
        INSERT INTO word VALUES(99999, 'lookup_token', 'normalized_word',
        'class', 'type', null, 0, 'near');

        INSERT INTO word VALUES(99999, 'lookup_token', 'normalized_word_exists',
        'class_exists', 'type_exists', null, 0, 'near');

        CREATE TABLE place_classtype_testclasstypetable_to_delete();
        CREATE TABLE place_classtype_testclasstypetable_to_keep();""")
@pytest.fixture
def make_strandard_name_func(temp_db_cursor):
    """
        Install a simplified make_standard_name() SQL function that only
        trims its argument.
    """
    temp_db_cursor.execute("""
        CREATE OR REPLACE FUNCTION make_standard_name(name TEXT) RETURNS TEXT AS $$
        BEGIN
        RETURN trim(name); --Basically return only the trimed name for the tests
        END;
        $$ LANGUAGE plpgsql IMMUTABLE;""")
@pytest.fixture
def getorcreate_amenity_funcs(temp_db_cursor, make_strandard_name_func):
    """
        Install a test version of the getorcreate_amenity() SQL function.
        It inserts its arguments into the word table and marks the row
        with the operator 'no_operator' so tests can find it.
    """
    temp_db_cursor.execute("""
        CREATE OR REPLACE FUNCTION getorcreate_amenity(lookup_word TEXT, normalized_word TEXT,
                                                    lookup_class text, lookup_type text)
        RETURNS void as $$
        BEGIN
            INSERT INTO word VALUES(null, lookup_word, normalized_word,
            lookup_class, lookup_type, null, 0, 'no_operator');
        END;
        $$ LANGUAGE plpgsql""")
@pytest.fixture
def getorcreate_amenityoperator_funcs(temp_db_cursor, make_strandard_name_func):
    """
        Install a test version of the getorcreate_amenityoperator() SQL
        function. It inserts its arguments, including the operator, into
        the word table. A temp_with_operator table is created as well.
    """
    temp_db_cursor.execute("""
        CREATE TABLE temp_with_operator(op TEXT);

        CREATE OR REPLACE FUNCTION getorcreate_amenityoperator(lookup_word TEXT, normalized_word TEXT,
                                                    lookup_class text, lookup_type text, op text)
        RETURNS void as $$
        BEGIN
            INSERT INTO word VALUES(null, lookup_word, normalized_word,
            lookup_class, lookup_type, null, 0, op);
        END;
        $$ LANGUAGE plpgsql""")