"""
    Tests for the special phrases import methods
    of the class SpecialPhrasesImporter.
"""
from pathlib import Path
from shutil import copyfile
import tempfile

import pytest

from nominatim.errors import UsageError
from nominatim.tools import SpecialPhrasesImporter
# Base directory from which the 'testfiles' and 'testdata' directories are
# addressed. The path is relative to this file and still contains '..'
# segments, so paths derived from it are resolve()d before use.
TEST_BASE_DIR = Path(__file__) / '..' / '..'
def test_fetch_existing_words_phrases_basic(special_phrases_importer, word_table,
                                            temp_db_cursor):
    """
        Check the _fetch_existing_words_phrases() method.
        A special phrase term already present in the word table must
        end up in the importer's words_phrases_to_delete collection.
    """
    query = """
        INSERT INTO word VALUES(99999, 'lookup_token', 'normalized_word',
        'class', 'type', null, 0, 'near');
    """
    temp_db_cursor.execute(query)

    # The collection has to start out empty, otherwise the pop() below
    # would not prove that the fetch filled it.
    assert not special_phrases_importer.words_phrases_to_delete
    special_phrases_importer._fetch_existing_words_phrases()
    contained_phrase = special_phrases_importer.words_phrases_to_delete.pop()
    assert contained_phrase == ('normalized_word', 'class', 'type', 'near')
@pytest.mark.parametrize("house_type", ['house', 'postcode'])
def test_fetch_existing_words_phrases_special_cases(special_phrases_importer, word_table,
                                                    house_type, temp_db_cursor):
    """
        Check the _fetch_existing_words_phrases() method.
        Terms of class 'place' with type 'house' or 'postcode' are
        housenumber and postcode terms; they must not be collected
        in words_phrases_to_delete.
    """
    query = """
        INSERT INTO word VALUES(99999, 'lookup_token', 'normalized_word',
        'place', %s, null, 0, 'near');
    """
    # The type is passed as a query parameter, one run per parametrized value.
    temp_db_cursor.execute(query, (house_type,))

    special_phrases_importer._fetch_existing_words_phrases()
    assert not special_phrases_importer.words_phrases_to_delete
def test_fetch_existing_place_classtype_tables(special_phrases_importer, temp_db_cursor):
    """
        Check the _fetch_existing_place_classtype_tables() method.
        A freshly created place_classtype table must be reported in
        the importer's table_phrases_to_delete collection.
    """
    expected_table = 'place_classtype_testclasstypetable'
    temp_db_cursor.execute('CREATE TABLE place_classtype_testclasstypetable()')

    special_phrases_importer._fetch_existing_place_classtype_tables()

    assert special_phrases_importer.table_phrases_to_delete.pop() == expected_table
def test_check_sanity_class(special_phrases_importer):
    """
        Check the _check_sanity() method.
        An empty class or an empty type must give a falsy result,
        a non-empty class together with a non-empty type a truthy one.
    """
    assert not special_phrases_importer._check_sanity('en', '', 'type')
    assert not special_phrases_importer._check_sanity('en', 'class', '')

    assert special_phrases_importer._check_sanity('en', 'class', 'type')
def test_load_white_and_black_lists(special_phrases_importer):
    """
        Check that _load_white_and_black_lists() hands back a black list
        and a white list, and that both are of dict type.
    """
    loaded_lists = special_phrases_importer._load_white_and_black_lists()
    black_list, white_list = loaded_lists

    assert isinstance(black_list, dict)
    assert isinstance(white_list, dict)
def test_convert_php_settings(special_phrases_importer):
    """
        Check that _convert_php_settings_if_needed() converts the given
        php file into a json file located in the same directory.
    """
    source_php = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.php').resolve()

    with tempfile.TemporaryDirectory() as temp_dir:
        work_dir = Path(temp_dir)
        php_copy = (work_dir / 'phrase_settings.php').resolve()
        copyfile(source_php, php_copy)
        special_phrases_importer._convert_php_settings_if_needed(php_copy)

        # Checked inside the with block: the directory is removed on exit.
        assert (work_dir / 'phrase_settings.json').is_file()
def test_convert_settings_wrong_file(special_phrases_importer):
    """
        Check that _convert_php_settings_if_needed() raises a UsageError
        when the given path is not a valid file.
    """
    expected_message = 'random_file is not a valid file.'

    with pytest.raises(UsageError, match=expected_message):
        special_phrases_importer._convert_php_settings_if_needed('random_file')
def test_convert_settings_json_already_exist(special_phrases_importer):
    """
        Check that _convert_php_settings_if_needed(), when given a php file
        whose json counterpart already exists, returns the path of that
        json file.
    """
    settings_dir = TEST_BASE_DIR / 'testfiles'
    php_file = (settings_dir / 'phrase_settings.php').resolve()
    json_file = (settings_dir / 'phrase_settings.json').resolve()

    assert special_phrases_importer._convert_php_settings_if_needed(php_file) == json_file
def test_convert_settings_giving_json(special_phrases_importer):
    """
        Check that _convert_php_settings_if_needed(), when given the path
        of a json file, returns that very same path.
    """
    settings_dir = TEST_BASE_DIR / 'testfiles'
    json_file = (settings_dir / 'phrase_settings.json').resolve()

    assert special_phrases_importer._convert_php_settings_if_needed(json_file) == json_file
def test_process_amenity_with_operator(special_phrases_importer, getorcreate_amenityoperator_funcs,
                                       temp_db_conn, word_table):
    """
        Check that _process_amenity() runs the
        getorcreate_amenityoperator() SQL function and that both
        operators, 'near' and 'in', are handled.
    """
    for operator in ('near', 'in'):
        special_phrases_importer._process_amenity('', '', '', '', operator)

    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("SELECT * FROM word WHERE operator='near' OR operator='in'")
        rows = temp_db_cursor.fetchall()

    # One row is expected for each of the two calls above.
    assert len(rows) == 2
def test_process_amenity_without_operator(special_phrases_importer, getorcreate_amenity_funcs,
                                          temp_db_conn, word_table):
    """
        Check that _process_amenity(), called with an empty operator,
        runs the getorcreate_amenity() SQL function.
    """
    special_phrases_importer._process_amenity('', '', '', '', '')

    with temp_db_conn.cursor() as temp_db_cursor:
        # 'no_operator' is the marker the test version of
        # getorcreate_amenity() writes into the operator column.
        temp_db_cursor.execute("SELECT * FROM word WHERE operator='no_operator'")
        result = temp_db_cursor.fetchone()

    # Without this assertion the test could never fail on the query result.
    assert result
def test_create_place_classtype_indexes(temp_db_conn, special_phrases_importer):
    """
        Check that _create_place_classtype_indexes() creates the
        place_id index and the centroid index on the right
        place_classtype table.
    """
    phrase_class = 'class'
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        # The centroid column below is of type GEOMETRY, which comes from postgis.
        temp_db_cursor.execute("CREATE EXTENSION postgis;")
        temp_db_cursor.execute('CREATE TABLE {}(place_id BIGINT, centroid GEOMETRY)'.format(table_name))

    special_phrases_importer._create_place_classtype_indexes('', phrase_class, phrase_type)

    assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
def test_create_place_classtype_table(temp_db_conn, placex_table, special_phrases_importer):
    """
        Check that _create_place_classtype_table() creates
        the right place_classtype table.
    """
    phrase_class = 'class'
    phrase_type = 'type'

    special_phrases_importer._create_place_classtype_table('', phrase_class, phrase_type)

    assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
def test_grant_access_to_web_user(temp_db_conn, def_config, special_phrases_importer):
    """
        Check that _grant_access_to_webuser() gives the web user
        the right access to the place_classtype table.
    """
    phrase_class = 'class'
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute('CREATE TABLE {}()'.format(table_name))

    special_phrases_importer._grant_access_to_webuser(phrase_class, phrase_type)

    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
def test_create_place_classtype_table_and_indexes(
        temp_db_conn, def_config, placex_table,
        special_phrases_importer):
    """
        Check that _create_place_classtype_table_and_indexes()
        creates the right place_classtype tables, place_id indexes
        and centroid indexes and grants access to the web user
        for the given set of pairs.
    """
    pairs = {('class1', 'type1'), ('class2', 'type2')}

    special_phrases_importer._create_place_classtype_table_and_indexes(pairs)

    # Every (class, type) pair must have received its table, indexes and grant.
    for pair in pairs:
        assert check_table_exist(temp_db_conn, pair[0], pair[1])
        assert check_placeid_and_centroid_indexes(temp_db_conn, pair[0], pair[1])
        assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, pair[0], pair[1])
def test_process_xml_content(temp_db_conn, def_config, special_phrases_importer, word_table,
                             getorcreate_amenity_funcs, getorcreate_amenityoperator_funcs):
    """
        Check that _process_xml_content() handles the given xml content
        by running the SQL functions for amenities and by returning
        the expected set of class/type pairs.
    """
    class_test = 'aerialway'
    type_test = 'zip_line'

    # The returned pairs are turned into a dict to make the lookups below easy.
    xml_content = get_test_xml_wiki_content()
    results = dict(special_phrases_importer._process_xml_content(xml_content, 'en'))

    assert check_amenities_with_op(temp_db_conn)
    assert check_amenities_without_op(temp_db_conn)
    assert results[class_test] and type_test in results.values()
def test_remove_non_existent_phrases_from_db(special_phrases_importer, default_phrases,
                                             temp_db_conn):
    """
        Check the _remove_non_existent_phrases_from_db() method.

        It should remove the entries of the word table which are contained
        in the words_phrases_to_delete set, except for those that are also
        contained in the words_phrases_still_exist set.

        place_classtype tables contained in table_phrases_to_delete should
        be removed as well.
    """
    with temp_db_conn.cursor() as temp_db_cursor:
        to_delete_phrase_tuple = ('normalized_word', 'class', 'type', 'near')
        to_keep_phrase_tuple = (
            'normalized_word_exists', 'class_exists', 'type_exists', 'near'
        )
        # The phrase to keep is a deletion candidate too; it must survive
        # only because it is also listed as still existing.
        special_phrases_importer.words_phrases_to_delete = {
            to_delete_phrase_tuple,
            to_keep_phrase_tuple
        }
        special_phrases_importer.words_phrases_still_exist = {
            to_keep_phrase_tuple
        }
        special_phrases_importer.table_phrases_to_delete = {
            'place_classtype_testclasstypetable_to_delete'
        }

        query_words = 'SELECT word, class, type, operator FROM word;'
        query_tables = """
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema='public'
            AND table_name like 'place_classtype_%';
        """

        special_phrases_importer._remove_non_existent_phrases_from_db()

        temp_db_cursor.execute(query_words)
        words_result = temp_db_cursor.fetchall()
        temp_db_cursor.execute(query_tables)
        tables_result = temp_db_cursor.fetchall()
        assert len(words_result) == 1 and words_result[0] == [
            'normalized_word_exists', 'class_exists', 'type_exists', 'near'
        ]
        assert (len(tables_result) == 1 and
                tables_result[0][0] == 'place_classtype_testclasstypetable_to_keep')
def test_import_from_wiki(monkeypatch, temp_db_conn, def_config, special_phrases_importer, placex_table,
                          getorcreate_amenity_funcs, getorcreate_amenityoperator_funcs, word_table):
    """
        Check that the main import_from_wiki() method is well executed.
        It should create the place_classtype table, the place_id and centroid indexes,
        grant access to the web user and execute the SQL functions for amenities.
        It should also update the database by deleting or preserving the entries
        that existed before the import.
    """
    # Add some data to the database before execution in order to test
    # what is deleted and what is preserved.
    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("""
            INSERT INTO word VALUES(99999, ' animal shelter', 'animal shelter',
            'amenity', 'animal_shelter', null, 0, null);

            INSERT INTO word VALUES(99999, ' wrong_lookup_token', 'wrong_normalized_word',
            'wrong_class', 'wrong_type', null, 0, 'near');

            CREATE TABLE place_classtype_amenity_animal_shelter();
            CREATE TABLE place_classtype_wrongclass_wrongtype();""")

    # Serve the static xml test file instead of fetching the wiki.
    monkeypatch.setattr('nominatim.tools.SpecialPhrasesImporter._get_wiki_content', mock_get_wiki_content)
    special_phrases_importer.import_from_wiki(['en'])

    class_test = 'aerialway'
    type_test = 'zip_line'

    assert check_table_exist(temp_db_conn, class_test, type_test)
    assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
    assert check_amenities_with_op(temp_db_conn)
    assert check_amenities_without_op(temp_db_conn)
    assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
    # NOTE(review): this looks for place_classtype_wrong_class_wrong_type, while
    # the table created above is place_classtype_wrongclass_wrongtype; the removal
    # of the latter is verified by the last query of queries_tests below.
    assert not check_table_exist(temp_db_conn, 'wrong_class', 'wrong_type')

    # Format (query, should_return_something_bool), used to easily execute all asserts.
    queries_tests = [
        # The correct phrase already in the word table before is still there.
        ("SELECT * FROM word WHERE word = 'animal shelter'", True),
        # The wrong phrase was deleted from the word table.
        ("SELECT word FROM word WHERE word = 'wrong_normalized_word'", False),
        # The correct place_classtype table already in the database before is still there.
        ("""
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema='public'
            AND table_name = 'place_classtype_amenity_animal_shelter';
         """, True),
        # The wrong place_classtype table was deleted from the database.
        ("""
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema='public'
            AND table_name = 'place_classtype_wrongclass_wrongtype';
         """, False),
    ]

    with temp_db_conn.cursor() as temp_db_cursor:
        for query, should_return_something in queries_tests:
            temp_db_cursor.execute(query)
            if should_return_something:
                assert temp_db_cursor.fetchone()
            else:
                assert not temp_db_cursor.fetchone()
def mock_get_wiki_content(lang):
    """
        Stand-in for the _get_wiki_content() method: whatever language
        is requested, hand back the content of the static xml test file.
    """
    static_content = get_test_xml_wiki_content()
    return static_content
def get_test_xml_wiki_content():
    """
        Read the static xml test file and return its whole content
        as a string.
    """
    content_file = TEST_BASE_DIR / 'testdata' / 'special_phrases_test_content.txt'
    return content_file.resolve().read_text()
def check_table_exist(temp_db_conn, phrase_class, phrase_type):
    """
        Verify that the place_classtype table exists for the given
        phrase_class and phrase_type.

        Returns the first matching row of information_schema.tables,
        or whatever fetchone() gives when there is no such table.
    """
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("""
            SELECT *
            FROM information_schema.tables
            WHERE table_type='BASE TABLE'
            AND table_name='{}'""".format(table_name))
        return temp_db_cursor.fetchone()
def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
    """
        Check that the given user has been granted SELECT access to the
        place_classtype table of the given phrase_class and phrase_type.

        Returns the first matching row of
        information_schema.role_table_grants, or whatever fetchone()
        gives when there is no such grant.
    """
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        # Both format() arguments are consumed: the table and the grantee.
        temp_db_cursor.execute("""
                SELECT * FROM information_schema.role_table_grants
                WHERE table_name='{}'
                AND grantee='{}'
                AND privilege_type='SELECT'""".format(table_name, user))
        return temp_db_cursor.fetchone()
def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
    """
        Check that the place_id index and the centroid index exist for the
        place_classtype table of the given phrase_class and phrase_type.

        Returns a truthy value only when index_exists() reports both indexes.
    """
    index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)

    return (
        temp_db_conn.index_exists(index_prefix + 'centroid')
        and
        temp_db_conn.index_exists(index_prefix + 'place_id')
    )
def check_amenities_with_op(temp_db_conn):
    """
        Check that the word table holds more than one row whose operator
        differs from 'no_operator', i.e. that the test version of the SQL
        function getorcreate_amenityoperator() was called more than once.
    """
    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("SELECT * FROM word WHERE operator != 'no_operator'")
        matching_rows = temp_db_cursor.fetchall()
        return len(matching_rows) > 1
def check_amenities_without_op(temp_db_conn):
    """
        Check that the word table holds more than one row whose operator
        is 'no_operator', i.e. that the test version of the SQL function
        getorcreate_amenity() was called more than once.
    """
    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("SELECT * FROM word WHERE operator = 'no_operator'")
        matching_rows = temp_db_cursor.fetchall()
        return len(matching_rows) > 1
@pytest.fixture
def special_phrases_importer(temp_db_conn, def_config, temp_phplib_dir_with_migration):
    """
        Return an instance of SpecialPhrasesImporter built from the
        def_config, temp_phplib_dir_with_migration and temp_db_conn fixtures.
    """
    return SpecialPhrasesImporter(def_config, temp_phplib_dir_with_migration, temp_db_conn)
@pytest.fixture
def temp_phplib_dir_with_migration():
    """
        Yield a temporary php directory holding a 'migration' subdirectory
        with a copy of the PhraseSettingsToJson.php script inside.
        The directory is removed once the requesting test has finished.
    """
    migration_file = (TEST_BASE_DIR / '..' / 'lib-php' / 'migration'
                      / 'PhraseSettingsToJson.php').resolve()
    with tempfile.TemporaryDirectory() as phpdir:
        (Path(phpdir) / 'migration').mkdir()
        migration_dest_path = (Path(phpdir) / 'migration' / 'PhraseSettingsToJson.php').resolve()
        copyfile(migration_file, migration_dest_path)

        # Yield from inside the with block so the directory outlives the test.
        yield Path(phpdir)
@pytest.fixture
def default_phrases(word_table, temp_db_cursor):
    """
        Fill the database with two entries of the word table
        ('normalized_word' and 'normalized_word_exists') and two empty
        place_classtype tables (one '_to_delete', one '_to_keep').
    """
    temp_db_cursor.execute("""
        INSERT INTO word VALUES(99999, 'lookup_token', 'normalized_word',
        'class', 'type', null, 0, 'near');

        INSERT INTO word VALUES(99999, 'lookup_token', 'normalized_word_exists',
        'class_exists', 'type_exists', null, 0, 'near');

        CREATE TABLE place_classtype_testclasstypetable_to_delete();
        CREATE TABLE place_classtype_testclasstypetable_to_keep();""")
@pytest.fixture
def make_strandard_name_func(temp_db_cursor):
    """
        Create a simplified make_standard_name() SQL function
        which only trims the name it is given.
    """
    temp_db_cursor.execute("""
        CREATE OR REPLACE FUNCTION make_standard_name(name TEXT) RETURNS TEXT AS $$
        BEGIN
        RETURN trim(name); --Basically return only the trimed name for the tests
        END;
        $$ LANGUAGE plpgsql IMMUTABLE;""")
@pytest.fixture
def getorcreate_amenity_funcs(temp_db_cursor, make_strandard_name_func):
    """
        Create a test version of the getorcreate_amenity() SQL function.
        It inserts the given values into the word table with the
        operator column set to 'no_operator'.
    """
    # The body has no RETURN statement, hence the function is declared void.
    temp_db_cursor.execute("""
        CREATE OR REPLACE FUNCTION getorcreate_amenity(lookup_word TEXT, normalized_word TEXT,
                                                       lookup_class text, lookup_type text)
        RETURNS void as $$
        BEGIN
            INSERT INTO word VALUES(null, lookup_word, normalized_word,
            lookup_class, lookup_type, null, 0, 'no_operator');
        END;
        $$ LANGUAGE plpgsql""")
@pytest.fixture
def getorcreate_amenityoperator_funcs(temp_db_cursor, make_strandard_name_func):
    """
        Create the temp_with_operator table and a test version of the
        getorcreate_amenityoperator() SQL function. The function inserts
        the given values into the word table, with the given operator
        stored in the operator column.
    """
    # The body has no RETURN statement, hence the function is declared void.
    temp_db_cursor.execute("""
        CREATE TABLE temp_with_operator(op TEXT);

        CREATE OR REPLACE FUNCTION getorcreate_amenityoperator(lookup_word TEXT, normalized_word TEXT,
                                                               lookup_class text, lookup_type text, op text)
        RETURNS void as $$
        BEGIN
            INSERT INTO word VALUES(null, lookup_word, normalized_word,
            lookup_class, lookup_type, null, 0, op);
        END;
        $$ LANGUAGE plpgsql""")