]> git.openstreetmap.org Git - nominatim.git/blob - test/python/test_tools_import_special_phrases.py
f452c353b1ac8ed7258ee3df55dc08adfd2c8b4d
[nominatim.git] / test / python / test_tools_import_special_phrases.py
1 """
2     Tests for import special phrases methods
3     of the class SPImporter.
4 """
5 from nominatim.tools import SpecialPhrase
6 from nominatim.tools import SPWikiLoader
7 from nominatim.errors import UsageError
8 from pathlib import Path
9 import tempfile
10 from shutil import copyfile
11 import pytest
12 from nominatim.tools import SPImporter
13
14 TEST_BASE_DIR = Path(__file__) / '..' / '..'
15
16 def test_fetch_existing_place_classtype_tables(sp_importer, temp_db_cursor):
17     """
18         Check for the fetch_existing_place_classtype_tables() method.
19         It should return the table just created.
20     """
21     temp_db_cursor.execute('CREATE TABLE place_classtype_testclasstypetable()')
22
23     sp_importer._fetch_existing_place_classtype_tables()
24     contained_table = sp_importer.table_phrases_to_delete.pop()
25     assert contained_table == 'place_classtype_testclasstypetable'
26
27 def test_check_sanity_class(sp_importer):
28     """
29         Check for _check_sanity() method.
30         If a wrong class or type is given, an UsageError should raise.
31         If a good class and type are given, nothing special happens.
32     """
33
34     assert not sp_importer._check_sanity(SpecialPhrase('en', '', 'type', ''))
35     assert not sp_importer._check_sanity(SpecialPhrase('en', 'class', '', ''))
36
37     assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
38
39 def test_load_white_and_black_lists(sp_importer):
40     """
41         Test that _load_white_and_black_lists() well return
42         black list and white list and that they are of dict type.
43     """
44     black_list, white_list = sp_importer._load_white_and_black_lists()
45
46     assert isinstance(black_list, dict) and isinstance(white_list, dict)
47
48 def test_convert_php_settings(sp_importer):
49     """
50         Test that _convert_php_settings_if_needed() convert the given
51         php file to a json file.
52     """
53     php_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.php').resolve()
54
55     with tempfile.TemporaryDirectory() as temp_dir:
56         temp_settings = (Path(temp_dir) / 'phrase_settings.php').resolve()
57         copyfile(php_file, temp_settings)
58         sp_importer._convert_php_settings_if_needed(temp_settings)
59
60         assert (Path(temp_dir) / 'phrase_settings.json').is_file()
61
62 def test_convert_settings_wrong_file(sp_importer):
63     """
64         Test that _convert_php_settings_if_needed() raise an exception
65         if the given file is not a valid file.
66     """
67     with pytest.raises(UsageError, match='random_file is not a valid file.'):
68         sp_importer._convert_php_settings_if_needed('random_file')
69
70 def test_convert_settings_json_already_exist(sp_importer):
71     """
72         Test that if we give to '_convert_php_settings_if_needed' a php file path
73         and that a the corresponding json file already exists, it is returned.
74     """
75     php_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.php').resolve()
76     json_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.json').resolve()
77
78     returned = sp_importer._convert_php_settings_if_needed(php_file)
79
80     assert returned == json_file
81
82 def test_convert_settings_giving_json(sp_importer):
83     """
84         Test that if we give to '_convert_php_settings_if_needed' a json file path
85         the same path is directly returned
86     """
87     json_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.json').resolve()
88
89     returned = sp_importer._convert_php_settings_if_needed(json_file)
90
91     assert returned == json_file
92
93 def test_create_place_classtype_indexes(temp_db_conn, sp_importer):
94     """
95         Test that _create_place_classtype_indexes() create the
96         place_id index and centroid index on the right place_class_type table.
97     """
98     phrase_class = 'class'
99     phrase_type = 'type'
100     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
101
102     with temp_db_conn.cursor() as temp_db_cursor:
103         temp_db_cursor.execute("CREATE EXTENSION postgis;")
104         temp_db_cursor.execute('CREATE TABLE {}(place_id BIGINT, centroid GEOMETRY)'.format(table_name))
105
106     sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
107
108     assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
109
110 def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
111     """
112         Test that _create_place_classtype_table() create
113         the right place_classtype table.
114     """
115     phrase_class = 'class'
116     phrase_type = 'type'
117     sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
118
119     assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
120
121 def test_grant_access_to_web_user(temp_db_conn, def_config, sp_importer):
122     """
123         Test that _grant_access_to_webuser() give
124         right access to the web user.
125     """
126     phrase_class = 'class'
127     phrase_type = 'type'
128     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
129
130     with temp_db_conn.cursor() as temp_db_cursor:
131         temp_db_cursor.execute('CREATE TABLE {}()'.format(table_name))
132
133     sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
134
135     assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
136
137 def test_create_place_classtype_table_and_indexes(
138         temp_db_conn, def_config, placex_table,
139         sp_importer):
140     """
141         Test that _create_place_classtype_table_and_indexes()
142         create the right place_classtype tables and place_id indexes
143         and centroid indexes and grant access to the web user
144         for the given set of pairs.
145     """
146     pairs = set([('class1', 'type1'), ('class2', 'type2')])
147
148     sp_importer._create_place_classtype_table_and_indexes(pairs)
149
150     for pair in pairs:
151         assert check_table_exist(temp_db_conn, pair[0], pair[1])
152         assert check_placeid_and_centroid_indexes(temp_db_conn, pair[0], pair[1])
153         assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, pair[0], pair[1])
154
155 def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
156                                              temp_db_conn):
157     """
158         Check for the remove_non_existent_phrases_from_db() method.
159
160         It should removed entries from the word table which are contained
161         in the words_phrases_to_delete set and not those also contained
162         in the words_phrases_still_exist set.
163
164         place_classtype tables contained in table_phrases_to_delete should
165         be deleted.
166     """
167     with temp_db_conn.cursor() as temp_db_cursor:
168         sp_importer.table_phrases_to_delete = {
169             'place_classtype_testclasstypetable_to_delete'
170         }
171
172         query_tables = """
173             SELECT table_name
174             FROM information_schema.tables
175             WHERE table_schema='public'
176             AND table_name like 'place_classtype_%';
177         """
178
179         sp_importer._remove_non_existent_tables_from_db()
180
181         temp_db_cursor.execute(query_tables)
182         tables_result = temp_db_cursor.fetchall()
183         assert (len(tables_result) == 1 and
184             tables_result[0][0] == 'place_classtype_testclasstypetable_to_keep'
185         )
186
187 def test_import_from_wiki(monkeypatch, temp_db_conn, def_config, sp_importer,
188                           placex_table, tokenizer_mock):
189     """
190         Check that the main import_from_wiki() method is well executed.
191         It should create the place_classtype table, the place_id and centroid indexes,
192         grand access to the web user and executing the SQL functions for amenities.
193         It should also update the database well by deleting or preserving existing entries
194         of the database.
195     """
196     #Add some data to the database before execution in order to test
197     #what is deleted and what is preserved.
198     with temp_db_conn.cursor() as temp_db_cursor:
199         temp_db_cursor.execute("""
200             CREATE TABLE place_classtype_amenity_animal_shelter();
201             CREATE TABLE place_classtype_wrongclass_wrongtype();""")
202     
203     monkeypatch.setattr('nominatim.tools.SPWikiLoader._get_wiki_content', mock_get_wiki_content)
204     tokenizer = tokenizer_mock()
205     sp_importer.import_phrases(tokenizer)
206
207     assert len(tokenizer.analyser_cache['special_phrases']) == 18
208
209     class_test = 'aerialway'
210     type_test = 'zip_line'
211
212     assert check_table_exist(temp_db_conn, class_test, type_test)
213     assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
214     assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
215     assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
216     assert not check_table_exist(temp_db_conn, 'wrong_class', 'wrong_type')
217
218     #Format (query, should_return_something_bool) use to easily execute all asserts
219     queries_tests = set()
220
221     #Used to check that correct place_classtype table already in the datase before is still there.
222     query_existing_table = """
223         SELECT table_name
224         FROM information_schema.tables
225         WHERE table_schema='public'
226         AND table_name = 'place_classtype_amenity_animal_shelter';
227     """
228     queries_tests.add((query_existing_table, True))
229
230     #Used to check that wrong place_classtype table was deleted from the database.
231     query_wrong_table = """
232         SELECT table_name
233         FROM information_schema.tables
234         WHERE table_schema='public'
235         AND table_name = 'place_classtype_wrongclass_wrongtype';
236     """
237     queries_tests.add((query_wrong_table, False))
238
239     with temp_db_conn.cursor() as temp_db_cursor:
240         for query in queries_tests:
241             temp_db_cursor.execute(query[0])
242             if (query[1] == True):
243                 assert temp_db_cursor.fetchone()
244             else:
245                 assert not temp_db_cursor.fetchone()
246
247 def mock_get_wiki_content(self, lang):
248     """
249         Mock the _get_wiki_content() method to return
250         static xml test file content.
251     """
252     return get_test_xml_wiki_content()
253
254 def get_test_xml_wiki_content():
255     """
256         return the content of the static xml test file.
257     """
258     xml_test_content_path = (TEST_BASE_DIR / 'testdata' / 'special_phrases_test_content.txt').resolve()
259     with open(xml_test_content_path) as xml_content_reader:
260         return xml_content_reader.read()
261
262 def check_table_exist(temp_db_conn, phrase_class, phrase_type):
263     """
264         Verify that the place_classtype table exists for the given
265         phrase_class and phrase_type.
266     """
267     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
268
269     with temp_db_conn.cursor() as temp_db_cursor:
270         temp_db_cursor.execute("""
271             SELECT *
272             FROM information_schema.tables
273             WHERE table_type='BASE TABLE'
274             AND table_name='{}'""".format(table_name))
275         return temp_db_cursor.fetchone()
276
277 def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
278     """
279         Check that the web user has been granted right access to the
280         place_classtype table of the given phrase_class and phrase_type.
281     """
282     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
283
284     with temp_db_conn.cursor() as temp_db_cursor:
285         temp_db_cursor.execute("""
286                 SELECT * FROM information_schema.role_table_grants
287                 WHERE table_name='{}'
288                 AND grantee='{}'
289                 AND privilege_type='SELECT'""".format(table_name, user))
290         return temp_db_cursor.fetchone()
291
292 def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
293     """
294         Check that the place_id index and centroid index exist for the
295         place_classtype table of the given phrase_class and phrase_type.
296     """
297     index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
298
299     return (
300         temp_db_conn.index_exists(index_prefix + 'centroid')
301         and
302         temp_db_conn.index_exists(index_prefix + 'place_id')
303     )
304
305 @pytest.fixture
306 def sp_importer(temp_db_conn, def_config, temp_phplib_dir_with_migration):
307     """
308         Return an instance of SPImporter.
309     """
310     loader = SPWikiLoader(def_config, ['en'])
311     return SPImporter(def_config, temp_phplib_dir_with_migration, temp_db_conn, loader)
312
313 @pytest.fixture
314 def temp_phplib_dir_with_migration():
315     """
316         Return temporary phpdir with migration subdirectory and
317         PhraseSettingsToJson.php script inside.
318     """
319     migration_file = (TEST_BASE_DIR / '..' / 'lib-php' / 'migration'
320                       / 'PhraseSettingsToJson.php').resolve()
321     with tempfile.TemporaryDirectory() as phpdir:
322         (Path(phpdir) / 'migration').mkdir()
323         migration_dest_path = (Path(phpdir) / 'migration' / 'PhraseSettingsToJson.php').resolve()
324         copyfile(migration_file, migration_dest_path)
325
326         yield Path(phpdir)
327
328 @pytest.fixture
329 def default_phrases(temp_db_cursor):
330     temp_db_cursor.execute("""
331         CREATE TABLE place_classtype_testclasstypetable_to_delete();
332         CREATE TABLE place_classtype_testclasstypetable_to_keep();""")