2 Tests for import special phrases methods
3 of the class SPImporter.
5 from nominatim.tools import SpecialPhrase
6 from nominatim.tools import SPWikiLoader
7 from nominatim.errors import UsageError
8 from pathlib import Path
10 from shutil import copyfile
12 from nominatim.tools import SPImporter
14 TEST_BASE_DIR = Path(__file__) / '..' / '..'
16 def test_fetch_existing_place_classtype_tables(sp_importer, temp_db_cursor):
18 Check for the fetch_existing_place_classtype_tables() method.
19 It should return the table just created.
21 temp_db_cursor.execute('CREATE TABLE place_classtype_testclasstypetable()')
23 sp_importer._fetch_existing_place_classtype_tables()
24 contained_table = sp_importer.table_phrases_to_delete.pop()
25 assert contained_table == 'place_classtype_testclasstypetable'
27 def test_check_sanity_class(sp_importer):
29 Check for _check_sanity() method.
30 If a wrong class or type is given, an UsageError should raise.
31 If a good class and type are given, nothing special happens.
34 assert not sp_importer._check_sanity(SpecialPhrase('en', '', 'type', ''))
35 assert not sp_importer._check_sanity(SpecialPhrase('en', 'class', '', ''))
37 assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
39 def test_load_white_and_black_lists(sp_importer):
41 Test that _load_white_and_black_lists() well return
42 black list and white list and that they are of dict type.
44 black_list, white_list = sp_importer._load_white_and_black_lists()
46 assert isinstance(black_list, dict) and isinstance(white_list, dict)
48 def test_convert_php_settings(sp_importer):
50 Test that _convert_php_settings_if_needed() convert the given
51 php file to a json file.
53 php_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.php').resolve()
55 with tempfile.TemporaryDirectory() as temp_dir:
56 temp_settings = (Path(temp_dir) / 'phrase_settings.php').resolve()
57 copyfile(php_file, temp_settings)
58 sp_importer._convert_php_settings_if_needed(temp_settings)
60 assert (Path(temp_dir) / 'phrase_settings.json').is_file()
62 def test_convert_settings_wrong_file(sp_importer):
64 Test that _convert_php_settings_if_needed() raise an exception
65 if the given file is not a valid file.
67 with pytest.raises(UsageError, match='random_file is not a valid file.'):
68 sp_importer._convert_php_settings_if_needed('random_file')
70 def test_convert_settings_json_already_exist(sp_importer):
72 Test that if we give to '_convert_php_settings_if_needed' a php file path
73 and that a the corresponding json file already exists, it is returned.
75 php_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.php').resolve()
76 json_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.json').resolve()
78 returned = sp_importer._convert_php_settings_if_needed(php_file)
80 assert returned == json_file
82 def test_convert_settings_giving_json(sp_importer):
84 Test that if we give to '_convert_php_settings_if_needed' a json file path
85 the same path is directly returned
87 json_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.json').resolve()
89 returned = sp_importer._convert_php_settings_if_needed(json_file)
91 assert returned == json_file
93 def test_create_place_classtype_indexes(temp_db_conn, sp_importer):
95 Test that _create_place_classtype_indexes() create the
96 place_id index and centroid index on the right place_class_type table.
98 phrase_class = 'class'
100 table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
102 with temp_db_conn.cursor() as temp_db_cursor:
103 temp_db_cursor.execute("CREATE EXTENSION postgis;")
104 temp_db_cursor.execute('CREATE TABLE {}(place_id BIGINT, centroid GEOMETRY)'.format(table_name))
106 sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
108 assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
110 def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
112 Test that _create_place_classtype_table() create
113 the right place_classtype table.
115 phrase_class = 'class'
117 sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
119 assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
121 def test_grant_access_to_web_user(temp_db_conn, def_config, sp_importer):
123 Test that _grant_access_to_webuser() give
124 right access to the web user.
126 phrase_class = 'class'
128 table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
130 with temp_db_conn.cursor() as temp_db_cursor:
131 temp_db_cursor.execute('CREATE TABLE {}()'.format(table_name))
133 sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
135 assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
137 def test_create_place_classtype_table_and_indexes(
138 temp_db_conn, def_config, placex_table,
141 Test that _create_place_classtype_table_and_indexes()
142 create the right place_classtype tables and place_id indexes
143 and centroid indexes and grant access to the web user
144 for the given set of pairs.
146 pairs = set([('class1', 'type1'), ('class2', 'type2')])
148 sp_importer._create_place_classtype_table_and_indexes(pairs)
151 assert check_table_exist(temp_db_conn, pair[0], pair[1])
152 assert check_placeid_and_centroid_indexes(temp_db_conn, pair[0], pair[1])
153 assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, pair[0], pair[1])
155 def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
158 Check for the remove_non_existent_phrases_from_db() method.
160 It should removed entries from the word table which are contained
161 in the words_phrases_to_delete set and not those also contained
162 in the words_phrases_still_exist set.
164 place_classtype tables contained in table_phrases_to_delete should
167 with temp_db_conn.cursor() as temp_db_cursor:
168 sp_importer.table_phrases_to_delete = {
169 'place_classtype_testclasstypetable_to_delete'
174 FROM information_schema.tables
175 WHERE table_schema='public'
176 AND table_name like 'place_classtype_%';
179 sp_importer._remove_non_existent_tables_from_db()
181 temp_db_cursor.execute(query_tables)
182 tables_result = temp_db_cursor.fetchall()
183 assert (len(tables_result) == 1 and
184 tables_result[0][0] == 'place_classtype_testclasstypetable_to_keep'
187 def test_import_from_wiki(monkeypatch, temp_db_conn, def_config, sp_importer,
188 placex_table, tokenizer_mock):
190 Check that the main import_from_wiki() method is well executed.
191 It should create the place_classtype table, the place_id and centroid indexes,
192 grand access to the web user and executing the SQL functions for amenities.
193 It should also update the database well by deleting or preserving existing entries
196 #Add some data to the database before execution in order to test
197 #what is deleted and what is preserved.
198 with temp_db_conn.cursor() as temp_db_cursor:
199 temp_db_cursor.execute("""
200 CREATE TABLE place_classtype_amenity_animal_shelter();
201 CREATE TABLE place_classtype_wrongclass_wrongtype();""")
203 monkeypatch.setattr('nominatim.tools.SPWikiLoader._get_wiki_content', mock_get_wiki_content)
204 tokenizer = tokenizer_mock()
205 sp_importer.import_phrases(tokenizer)
207 assert len(tokenizer.analyser_cache['special_phrases']) == 18
209 class_test = 'aerialway'
210 type_test = 'zip_line'
212 assert check_table_exist(temp_db_conn, class_test, type_test)
213 assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
214 assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
215 assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
216 assert not check_table_exist(temp_db_conn, 'wrong_class', 'wrong_type')
218 #Format (query, should_return_something_bool) use to easily execute all asserts
219 queries_tests = set()
221 #Used to check that correct place_classtype table already in the datase before is still there.
222 query_existing_table = """
224 FROM information_schema.tables
225 WHERE table_schema='public'
226 AND table_name = 'place_classtype_amenity_animal_shelter';
228 queries_tests.add((query_existing_table, True))
230 #Used to check that wrong place_classtype table was deleted from the database.
231 query_wrong_table = """
233 FROM information_schema.tables
234 WHERE table_schema='public'
235 AND table_name = 'place_classtype_wrongclass_wrongtype';
237 queries_tests.add((query_wrong_table, False))
239 with temp_db_conn.cursor() as temp_db_cursor:
240 for query in queries_tests:
241 temp_db_cursor.execute(query[0])
242 if (query[1] == True):
243 assert temp_db_cursor.fetchone()
245 assert not temp_db_cursor.fetchone()
247 def mock_get_wiki_content(self, lang):
249 Mock the _get_wiki_content() method to return
250 static xml test file content.
252 return get_test_xml_wiki_content()
254 def get_test_xml_wiki_content():
256 return the content of the static xml test file.
258 xml_test_content_path = (TEST_BASE_DIR / 'testdata' / 'special_phrases_test_content.txt').resolve()
259 with open(xml_test_content_path) as xml_content_reader:
260 return xml_content_reader.read()
262 def check_table_exist(temp_db_conn, phrase_class, phrase_type):
264 Verify that the place_classtype table exists for the given
265 phrase_class and phrase_type.
267 table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
269 with temp_db_conn.cursor() as temp_db_cursor:
270 temp_db_cursor.execute("""
272 FROM information_schema.tables
273 WHERE table_type='BASE TABLE'
274 AND table_name='{}'""".format(table_name))
275 return temp_db_cursor.fetchone()
277 def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
279 Check that the web user has been granted right access to the
280 place_classtype table of the given phrase_class and phrase_type.
282 table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
284 with temp_db_conn.cursor() as temp_db_cursor:
285 temp_db_cursor.execute("""
286 SELECT * FROM information_schema.role_table_grants
287 WHERE table_name='{}'
289 AND privilege_type='SELECT'""".format(table_name, user))
290 return temp_db_cursor.fetchone()
292 def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
294 Check that the place_id index and centroid index exist for the
295 place_classtype table of the given phrase_class and phrase_type.
297 index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
300 temp_db_conn.index_exists(index_prefix + 'centroid')
302 temp_db_conn.index_exists(index_prefix + 'place_id')
306 def sp_importer(temp_db_conn, def_config, temp_phplib_dir_with_migration):
308 Return an instance of SPImporter.
310 loader = SPWikiLoader(def_config, ['en'])
311 return SPImporter(def_config, temp_phplib_dir_with_migration, temp_db_conn, loader)
314 def temp_phplib_dir_with_migration():
316 Return temporary phpdir with migration subdirectory and
317 PhraseSettingsToJson.php script inside.
319 migration_file = (TEST_BASE_DIR / '..' / 'lib-php' / 'migration'
320 / 'PhraseSettingsToJson.php').resolve()
321 with tempfile.TemporaryDirectory() as phpdir:
322 (Path(phpdir) / 'migration').mkdir()
323 migration_dest_path = (Path(phpdir) / 'migration' / 'PhraseSettingsToJson.php').resolve()
324 copyfile(migration_file, migration_dest_path)
329 def default_phrases(temp_db_cursor):
330 temp_db_cursor.execute("""
331 CREATE TABLE place_classtype_testclasstypetable_to_delete();
332 CREATE TABLE place_classtype_testclasstypetable_to_keep();""")