"""
Tests for the special phrases import methods
of the class SpecialPhrasesImporter.
"""
import tempfile
from pathlib import Path
from shutil import copyfile

import pytest

from mocks import MockParamCapture
from nominatim.errors import UsageError
from nominatim.tools.special_phrases import SpecialPhrasesImporter

# Base directory of the test tree, expressed relative to this file.
# The '..' segments stay unresolved here; users call .resolve() on the
# paths they derive from it.
TEST_BASE_DIR = Path(__file__).joinpath('..', '..')

def test_fetch_existing_words_phrases_basic(special_phrases_importer, word_table,
                                            temp_db_cursor):
    """
    Check the _fetch_existing_words_phrases() method.

    A special phrase row inserted into the word table must end up in
    words_phrases_to_delete as a (word, class, type, operator) tuple.
    """
    query = """
        INSERT INTO word VALUES(99999, 'lookup_token', 'normalized_word',
        'class', 'type', null, 0, 'near');
    """
    temp_db_cursor.execute(query)

    # Nothing may be collected before the fetch has run.
    assert not special_phrases_importer.words_phrases_to_delete
    special_phrases_importer._fetch_existing_words_phrases()
    contained_phrase = special_phrases_importer.words_phrases_to_delete.pop()
    assert contained_phrase == ('normalized_word', 'class', 'type', 'near')

@pytest.mark.parametrize("house_type", ['house', 'postcode'])
def test_fetch_existing_words_phrases_special_cases(special_phrases_importer, word_table,
                                                    house_type, temp_db_cursor):
    """
    Check the _fetch_existing_words_phrases() method for excluded terms.

    Rows of class 'place' with type 'house' or 'postcode' correspond to
    housenumber and postcode terms, so nothing must be collected.
    """
    query = """
        INSERT INTO word VALUES(99999, 'lookup_token', 'normalized_word',
        'place', %s, null, 0, 'near');
    """
    # The type is bound as a query parameter, once per parametrized run.
    temp_db_cursor.execute(query, (house_type,))

    special_phrases_importer._fetch_existing_words_phrases()
    assert not special_phrases_importer.words_phrases_to_delete

def test_fetch_existing_place_classtype_tables(special_phrases_importer, temp_db_cursor):
    """
    Check the _fetch_existing_place_classtype_tables() method.

    The place_classtype table created here must be collected into
    table_phrases_to_delete.
    """
    expected_table = 'place_classtype_testclasstypetable'
    temp_db_cursor.execute('CREATE TABLE place_classtype_testclasstypetable()')

    special_phrases_importer._fetch_existing_place_classtype_tables()

    found_table = special_phrases_importer.table_phrases_to_delete.pop()
    assert found_table == expected_table

def test_check_sanity_class(special_phrases_importer):
    """
    Check the _check_sanity() method.

    An empty class or an empty type must raise a UsageError, while a
    well-formed class/type pair must pass without raising.
    """
    check = special_phrases_importer._check_sanity

    with pytest.raises(UsageError):
        check('en', '', 'type')

    with pytest.raises(UsageError):
        check('en', 'class', '')

    # A valid pair: the call simply has to return.
    check('en', 'class', 'type')

def test_load_white_and_black_lists(special_phrases_importer):
    """
    Check that _load_white_and_black_lists() returns a black list and
    a white list, both of type dict.
    """
    black_list, white_list = special_phrases_importer._load_white_and_black_lists()

    assert isinstance(black_list, dict) and isinstance(white_list, dict)

def test_convert_php_settings(special_phrases_importer):
    """
    Check that _convert_php_settings_if_needed() converts the given
    php settings file into a json file placed next to it.
    """
    source_php = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.php').resolve()

    with tempfile.TemporaryDirectory() as temp_dir:
        work_dir = Path(temp_dir)
        # Work on a copy so the conversion output lands in the temp dir.
        copied_php = (work_dir / 'phrase_settings.php').resolve()
        copyfile(source_php, copied_php)
        special_phrases_importer._convert_php_settings_if_needed(copied_php)

        assert (work_dir / 'phrase_settings.json').is_file()

def test_convert_settings_wrong_file(special_phrases_importer):
    """
    Check that _convert_php_settings_if_needed() raises a UsageError
    when the given path is not a valid file.
    """
    expected_message = 'random_file is not a valid file.'

    with pytest.raises(UsageError, match=expected_message):
        special_phrases_importer._convert_php_settings_if_needed('random_file')

def test_convert_settings_json_already_exist(special_phrases_importer):
    """
    Check that _convert_php_settings_if_needed(), given a php file whose
    corresponding json file already exists, returns that json file.
    """
    testfiles_dir = TEST_BASE_DIR / 'testfiles'
    php_path = (testfiles_dir / 'phrase_settings.php').resolve()
    expected_json = (testfiles_dir / 'phrase_settings.json').resolve()

    assert special_phrases_importer._convert_php_settings_if_needed(php_path) == expected_json

def test_convert_settings_giving_json(special_phrases_importer):
    """
    Check that _convert_php_settings_if_needed(), given a json file
    path, returns that same path.
    """
    json_path = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.json').resolve()

    result_path = special_phrases_importer._convert_php_settings_if_needed(json_path)

    assert result_path == json_path

def test_process_amenity_with_operator(special_phrases_importer, getorcreate_amenityoperator_funcs,
                                       temp_db_conn, word_table):
    """
    Check that _process_amenity() runs the getorcreate_amenityoperator()
    SQL function and handles both operators, 'near' and 'in'.
    """
    for operator in ('near', 'in'):
        special_phrases_importer._process_amenity('', '', '', '', operator)

    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("SELECT * FROM word WHERE operator='near' OR operator='in'")
        rows = temp_db_cursor.fetchall()

    # One word row per processed operator.
    assert len(rows) == 2

def test_process_amenity_without_operator(special_phrases_importer, getorcreate_amenity_funcs,
                                          temp_db_conn, word_table):
    """
    Check that _process_amenity() runs the getorcreate_amenity() SQL
    function when no operator is given.

    The getorcreate_amenity_funcs fixture installs a test version of the
    function that tags inserted rows with the operator 'no_operator'.
    """
    special_phrases_importer._process_amenity('', '', '', '', '')

    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("SELECT * FROM word WHERE operator='no_operator'")
        result = temp_db_cursor.fetchone()

    # Without this assertion the test could never fail.
    assert result

def test_create_place_classtype_indexes(temp_db_conn, special_phrases_importer):
    """
    Check that _create_place_classtype_indexes() creates the place_id
    index and the centroid index on the right place_classtype table.
    """
    phrase_class = 'class'
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        # The centroid column uses the GEOMETRY type, which requires postgis.
        temp_db_cursor.execute("CREATE EXTENSION postgis;")
        temp_db_cursor.execute('CREATE TABLE {}(place_id BIGINT, centroid GEOMETRY)'.format(table_name))

    special_phrases_importer._create_place_classtype_indexes('', phrase_class, phrase_type)

    assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)

def test_create_place_classtype_table(temp_db_conn, placex_table, special_phrases_importer):
    """
    Check that _create_place_classtype_table() creates the right
    place_classtype table for the given class and type.
    """
    phrase_class = 'class'
    phrase_type = 'type'

    special_phrases_importer._create_place_classtype_table('', phrase_class, phrase_type)

    assert check_table_exist(temp_db_conn, phrase_class, phrase_type)

def test_grant_access_to_web_user(temp_db_conn, def_config, special_phrases_importer):
    """
    Check that _grant_access_to_webuser() grants the web user access to
    the place_classtype table of the given class and type.
    """
    phrase_class = 'class'
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute('CREATE TABLE {}()'.format(table_name))

    special_phrases_importer._grant_access_to_webuser(phrase_class, phrase_type)

    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)

def test_create_place_classtype_table_and_indexes(
        temp_db_conn, def_config, placex_table,
        special_phrases_importer):
    """
    Check that _create_place_classtype_table_and_indexes() creates, for
    every given (class, type) pair, the place_classtype table, its
    place_id and centroid indexes, and grants access to the web user.
    """
    pairs = {('class1', 'type1'), ('class2', 'type2')}

    special_phrases_importer._create_place_classtype_table_and_indexes(pairs)

    # Every pair must have its own table, indexes and grant.
    for phrase_class, phrase_type in pairs:
        assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
        assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
        assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER,
                                  phrase_class, phrase_type)

def test_process_xml_content(temp_db_conn, def_config, special_phrases_importer, word_table,
                             getorcreate_amenity_funcs, getorcreate_amenityoperator_funcs):
    """
    Check that _process_xml_content() processes the given xml content
    by running the SQL functions for amenities and by returning the
    expected set of (class, type) pairs.
    """
    class_test = 'aerialway'
    type_test = 'zip_line'

    pairs = special_phrases_importer._process_xml_content(get_test_xml_wiki_content(), 'en')

    assert check_amenities_with_op(temp_db_conn)
    assert check_amenities_without_op(temp_db_conn)
    # Test the pair itself. Converting the pairs to a dict keeps only one
    # type per class and would accept a class and a type that come from
    # two different pairs.
    # NOTE(review): assumes the pairs are returned as tuples - confirm.
    assert (class_test, type_test) in pairs

def test_remove_non_existent_phrases_from_db(special_phrases_importer, default_phrases,
                                             temp_db_conn):
    """
    Check the _remove_non_existent_phrases_from_db() method.

    Entries of the word table contained in the words_phrases_to_delete
    set must be removed, except those that are also contained in the
    words_phrases_still_exist set.

    place_classtype tables contained in table_phrases_to_delete must be
    dropped.
    """
    with temp_db_conn.cursor() as temp_db_cursor:
        to_delete_phrase_tuple = ('normalized_word', 'class', 'type', 'near')
        to_keep_phrase_tuple = (
            'normalized_word_exists', 'class_exists', 'type_exists', 'near'
        )
        # The phrase to keep is a deletion candidate too, so that the
        # still-exist set is what protects it.
        special_phrases_importer.words_phrases_to_delete = {
            to_delete_phrase_tuple,
            to_keep_phrase_tuple,
        }
        special_phrases_importer.words_phrases_still_exist = {
            to_keep_phrase_tuple,
        }
        special_phrases_importer.table_phrases_to_delete = {
            'place_classtype_testclasstypetable_to_delete'
        }

        query_words = 'SELECT word, class, type, operator FROM word;'
        query_tables = """
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema='public'
            AND table_name like 'place_classtype_%';
        """

        special_phrases_importer._remove_non_existent_phrases_from_db()

        temp_db_cursor.execute(query_words)
        words_result = temp_db_cursor.fetchall()
        temp_db_cursor.execute(query_tables)
        tables_result = temp_db_cursor.fetchall()
        assert len(words_result) == 1 and words_result[0] == [
            'normalized_word_exists', 'class_exists', 'type_exists', 'near'
        ]
        assert (len(tables_result) == 1 and
                tables_result[0][0] == 'place_classtype_testclasstypetable_to_keep')

def test_import_from_wiki(monkeypatch, temp_db_conn, def_config, special_phrases_importer, placex_table,
                          getorcreate_amenity_funcs, getorcreate_amenityoperator_funcs, word_table):
    """
    Check that the main import_from_wiki() method runs through.

    It should create the place_classtype table, the place_id and
    centroid indexes, grant access to the web user and execute the SQL
    functions for amenities. It should also update the database by
    deleting or preserving the entries that existed beforehand.
    """
    # Add some data to the database before execution in order to test
    # what is deleted and what is preserved.
    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("""
            INSERT INTO word VALUES(99999, ' animal shelter', 'animal shelter',
            'amenity', 'animal_shelter', null, 0, null);

            INSERT INTO word VALUES(99999, ' wrong_lookup_token', 'wrong_normalized_word',
            'wrong_class', 'wrong_type', null, 0, 'near');

            CREATE TABLE place_classtype_amenity_animal_shelter();
            CREATE TABLE place_classtype_wrongclass_wrongtype();""")

    # Serve the static xml test file instead of downloading the wiki page.
    monkeypatch.setattr('nominatim.tools.special_phrases.SpecialPhrasesImporter._get_wiki_content', mock_get_wiki_content)
    special_phrases_importer.import_from_wiki(['en'])

    class_test = 'aerialway'
    type_test = 'zip_line'

    assert check_table_exist(temp_db_conn, class_test, type_test)
    assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
    assert check_amenities_with_op(temp_db_conn)
    assert check_amenities_without_op(temp_db_conn)
    assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
    assert not check_table_exist(temp_db_conn, 'wrong_class', 'wrong_type')

    # The correct phrase that was already in the word table must still be there.
    query_correct_word = "SELECT * FROM word WHERE word = 'animal shelter'"

    # The wrong phrase must have been deleted from the word table.
    query_wrong_word = "SELECT word FROM word WHERE word = 'wrong_normalized_word'"

    # The correct place_classtype table that was already in the database
    # must still be there.
    query_existing_table = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_name = 'place_classtype_amenity_animal_shelter';
    """

    # The wrong place_classtype table must have been dropped.
    query_wrong_table = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_name = 'place_classtype_wrongclass_wrongtype';
    """

    # Format: (query, should_return_something). A list keeps the checks
    # in a fixed order, so a failure always points at the same query.
    queries_tests = [
        (query_correct_word, True),
        (query_wrong_word, False),
        (query_existing_table, True),
        (query_wrong_table, False),
    ]

    with temp_db_conn.cursor() as temp_db_cursor:
        for query, should_return_something in queries_tests:
            temp_db_cursor.execute(query)
            if should_return_something:
                assert temp_db_cursor.fetchone()
            else:
                assert not temp_db_cursor.fetchone()

def mock_get_wiki_content(lang):
    """
    Stand-in for the _get_wiki_content() method that returns the
    content of the static xml test file.

    The lang argument is accepted but not used.
    """
    static_content = get_test_xml_wiki_content()
    return static_content

def get_test_xml_wiki_content():
    """
    Return the content of the static xml test file.
    """
    testdata_dir = TEST_BASE_DIR / 'testdata'
    content_file = (testdata_dir / 'special_phrases_test_content.txt').resolve()

    with open(content_file) as reader:
        content = reader.read()
    return content

def check_table_exist(temp_db_conn, phrase_class, phrase_type):
    """
    Verify that the place_classtype table exists for the given
    phrase_class and phrase_type.

    Returns the matching information_schema row, or the cursor's
    "no row" value when the table does not exist.
    """
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("""
            SELECT *
            FROM information_schema.tables
            WHERE table_type='BASE TABLE'
            AND table_name='{}'""".format(table_name))
        return temp_db_cursor.fetchone()

def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
    """
    Check that the given user has been granted SELECT access to the
    place_classtype table of the given phrase_class and phrase_type.

    Returns the matching role_table_grants row, or the cursor's
    "no row" value when there is no such grant.
    """
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        # Filter on the grantee as well, otherwise a SELECT grant to any
        # role would satisfy the check.
        temp_db_cursor.execute("""
            SELECT * FROM information_schema.role_table_grants
            WHERE table_name='{}'
            AND grantee='{}'
            AND privilege_type='SELECT'""".format(table_name, user))
        return temp_db_cursor.fetchone()

def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
    """
    Check that the place_id index and the centroid index exist for the
    place_classtype table of the given phrase_class and phrase_type.

    Returns a truthy value only when both indexes exist.
    """
    index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)

    # The result must be returned, otherwise every caller's assert
    # receives None and fails.
    return (
        temp_db_conn.index_exists(index_prefix + 'centroid')
        and
        temp_db_conn.index_exists(index_prefix + 'place_id')
    )

def check_amenities_with_op(temp_db_conn):
    """
    Check that the word table holds more than one row whose operator is
    not 'no_operator', i.e. that the test version of
    getorcreate_amenityoperator() was called more than once.
    """
    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("SELECT * FROM word WHERE operator != 'no_operator'")
        rows = temp_db_cursor.fetchall()
        return len(rows) > 1

def check_amenities_without_op(temp_db_conn):
    """
    Check that the word table holds more than one row whose operator is
    'no_operator', i.e. that the test version of getorcreate_amenity()
    was called more than once.
    """
    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("SELECT * FROM word WHERE operator = 'no_operator'")
        rows = temp_db_cursor.fetchall()
        return len(rows) > 1

@pytest.fixture
def special_phrases_importer(temp_db_conn, def_config, temp_phplib_dir_with_migration):
    """
    Return an instance of SpecialPhrasesImporter built from the test
    configuration, the temporary php library directory and the test
    database connection.
    """
    return SpecialPhrasesImporter(def_config, temp_phplib_dir_with_migration, temp_db_conn)

@pytest.fixture
def temp_phplib_dir_with_migration():
    """
    Provide a temporary phpdir with a migration subdirectory and the
    PhraseSettingsToJson.php script inside.
    """
    migration_file = (TEST_BASE_DIR / '..' / 'lib-php' / 'migration'
                      / 'PhraseSettingsToJson.php').resolve()
    with tempfile.TemporaryDirectory() as phpdir:
        (Path(phpdir) / 'migration').mkdir()
        migration_dest_path = (Path(phpdir) / 'migration' / 'PhraseSettingsToJson.php').resolve()
        copyfile(migration_file, migration_dest_path)

        # Yield from inside the 'with' block so the directory is removed
        # only after the consuming test has finished.
        yield Path(phpdir)

@pytest.fixture
def default_phrases(word_table, temp_db_cursor):
    """
    Fill the database with two special phrases in the word table and
    two empty place_classtype tables, one of each meant to be deleted
    and one meant to be kept.
    """
    temp_db_cursor.execute("""
        INSERT INTO word VALUES(99999, 'lookup_token', 'normalized_word',
        'class', 'type', null, 0, 'near');

        INSERT INTO word VALUES(99999, 'lookup_token', 'normalized_word_exists',
        'class_exists', 'type_exists', null, 0, 'near');

        CREATE TABLE place_classtype_testclasstypetable_to_delete();
        CREATE TABLE place_classtype_testclasstypetable_to_keep();""")

@pytest.fixture
def make_strandard_name_func(temp_db_cursor):
    """
    Install a simplified make_standard_name() SQL function that only
    trims the given name.

    The misspelt fixture name is kept because other fixtures request it
    under exactly this name.
    """
    temp_db_cursor.execute("""
        CREATE OR REPLACE FUNCTION make_standard_name(name TEXT) RETURNS TEXT AS $$
        BEGIN
        RETURN trim(name); --Basically return only the trimed name for the tests
        END;
        $$ LANGUAGE plpgsql IMMUTABLE;""")

@pytest.fixture
def getorcreate_amenity_funcs(temp_db_cursor, make_strandard_name_func):
    """
    Install a test version of the getorcreate_amenity() SQL function.

    It inserts the given word into the word table with the operator
    'no_operator', which lets the tests detect that it has been called.
    """
    temp_db_cursor.execute("""
        CREATE OR REPLACE FUNCTION getorcreate_amenity(lookup_word TEXT, normalized_word TEXT,
                                                       lookup_class text, lookup_type text)
        RETURNS void as $$
        BEGIN
            INSERT INTO word VALUES(null, lookup_word, normalized_word,
            lookup_class, lookup_type, null, 0, 'no_operator');
        END;
        $$ LANGUAGE plpgsql""")

@pytest.fixture
def getorcreate_amenityoperator_funcs(temp_db_cursor, make_strandard_name_func):
    """
    Install a test version of the getorcreate_amenityoperator() SQL
    function.

    It inserts the given word into the word table together with the
    operator it was called with. A temp_with_operator table is created
    as well.
    """
    temp_db_cursor.execute("""
        CREATE TABLE temp_with_operator(op TEXT);

        CREATE OR REPLACE FUNCTION getorcreate_amenityoperator(lookup_word TEXT, normalized_word TEXT,
                                                       lookup_class text, lookup_type text, op text)
        RETURNS void as $$
        BEGIN
            INSERT INTO word VALUES(null, lookup_word, normalized_word,
            lookup_class, lookup_type, null, 0, op);
        END;
        $$ LANGUAGE plpgsql""")
