]> git.openstreetmap.org Git - nominatim.git/blob - test/python/test_tools_import_special_phrases.py
07c72a7ce15f51ce32e5f4931f81473cea370b9b
[nominatim.git] / test / python / test_tools_import_special_phrases.py
1 """
2     Tests for import special phrases methods
3     of the class SPImporter.
4 """
5 from shutil import copyfile
6 import pytest
7 from nominatim.tools.special_phrases.sp_importer import SPImporter
8 from nominatim.tools.special_phrases.sp_wiki_loader import SPWikiLoader
9 from nominatim.tools.special_phrases.sp_csv_loader import SPCsvLoader
10 from nominatim.tools.special_phrases.special_phrase import SpecialPhrase
11 from nominatim.errors import UsageError
12
13 from cursor import CursorForTesting
14
15 @pytest.fixture
16 def testfile_dir(src_dir):
17     return src_dir / 'test' / 'testfiles'
18
19
20 @pytest.fixture
21 def sp_importer(temp_db_conn, def_config, temp_phplib_dir_with_migration):
22     """
23         Return an instance of SPImporter.
24     """
25     loader = SPWikiLoader(def_config, ['en'])
26     return SPImporter(def_config, temp_phplib_dir_with_migration, temp_db_conn, loader)
27
28
29 @pytest.fixture
30 def temp_phplib_dir_with_migration(src_dir, tmp_path):
31     """
32         Return temporary phpdir with migration subdirectory and
33         PhraseSettingsToJson.php script inside.
34     """
35     migration_file = (src_dir / 'lib-php' / 'migration' / 'PhraseSettingsToJson.php').resolve()
36
37     phpdir = tmp_path / 'tempphp'
38     phpdir.mkdir()
39
40     (phpdir / 'migration').mkdir()
41     migration_dest_path = (phpdir / 'migration' / 'PhraseSettingsToJson.php').resolve()
42     copyfile(str(migration_file), str(migration_dest_path))
43
44     return phpdir
45
46
47 @pytest.fixture
48 def xml_wiki_content(src_dir):
49     """
50         return the content of the static xml test file.
51     """
52     xml_test_content_path = (src_dir / 'test' / 'testdata' / 'special_phrases_test_content.txt').resolve()
53     return xml_test_content_path.read_text()
54
55
56 @pytest.fixture
57 def default_phrases(table_factory):
58     table_factory('place_classtype_testclasstypetable_to_delete')
59     table_factory('place_classtype_testclasstypetable_to_keep')
60
61
62 def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
63     """
64         Check for the fetch_existing_place_classtype_tables() method.
65         It should return the table just created.
66     """
67     table_factory('place_classtype_testclasstypetable')
68
69     sp_importer._fetch_existing_place_classtype_tables()
70     contained_table = sp_importer.table_phrases_to_delete.pop()
71     assert contained_table == 'place_classtype_testclasstypetable'
72
73 def test_check_sanity_class(sp_importer):
74     """
75         Check for _check_sanity() method.
76         If a wrong class or type is given, an UsageError should raise.
77         If a good class and type are given, nothing special happens.
78     """
79
80     assert not sp_importer._check_sanity(SpecialPhrase('en', '', 'type', ''))
81     assert not sp_importer._check_sanity(SpecialPhrase('en', 'class', '', ''))
82
83     assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
84
85 def test_load_white_and_black_lists(sp_importer):
86     """
87         Test that _load_white_and_black_lists() well return
88         black list and white list and that they are of dict type.
89     """
90     black_list, white_list = sp_importer._load_white_and_black_lists()
91
92     assert isinstance(black_list, dict) and isinstance(white_list, dict)
93
94 def test_convert_php_settings(sp_importer, testfile_dir, tmp_path):
95     """
96         Test that _convert_php_settings_if_needed() convert the given
97         php file to a json file.
98     """
99     php_file = (testfile_dir / 'phrase_settings.php').resolve()
100
101     temp_settings = (tmp_path / 'phrase_settings.php').resolve()
102     copyfile(php_file, temp_settings)
103     sp_importer._convert_php_settings_if_needed(temp_settings)
104
105     assert (tmp_path / 'phrase_settings.json').is_file()
106
107 def test_convert_settings_wrong_file(sp_importer):
108     """
109         Test that _convert_php_settings_if_needed() raise an exception
110         if the given file is not a valid file.
111     """
112     with pytest.raises(UsageError, match='random_file is not a valid file.'):
113         sp_importer._convert_php_settings_if_needed('random_file')
114
115 def test_convert_settings_json_already_exist(sp_importer, testfile_dir):
116     """
117         Test that if we give to '_convert_php_settings_if_needed' a php file path
118         and that a the corresponding json file already exists, it is returned.
119     """
120     php_file = (testfile_dir / 'phrase_settings.php').resolve()
121     json_file = (testfile_dir / 'phrase_settings.json').resolve()
122
123     returned = sp_importer._convert_php_settings_if_needed(php_file)
124
125     assert returned == json_file
126
127 def test_convert_settings_giving_json(sp_importer, testfile_dir):
128     """
129         Test that if we give to '_convert_php_settings_if_needed' a json file path
130         the same path is directly returned
131     """
132     json_file = (testfile_dir / 'phrase_settings.json').resolve()
133
134     returned = sp_importer._convert_php_settings_if_needed(json_file)
135
136     assert returned == json_file
137
138 def test_create_place_classtype_indexes(temp_db_with_extensions, temp_db_conn,
139                                         table_factory, sp_importer):
140     """
141         Test that _create_place_classtype_indexes() create the
142         place_id index and centroid index on the right place_class_type table.
143     """
144     phrase_class = 'class'
145     phrase_type = 'type'
146     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
147
148     table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')
149
150     sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
151
152     assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
153
154 def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
155     """
156         Test that _create_place_classtype_table() create
157         the right place_classtype table.
158     """
159     phrase_class = 'class'
160     phrase_type = 'type'
161     sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
162
163     assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
164
165 def test_grant_access_to_web_user(temp_db_conn, table_factory, def_config, sp_importer):
166     """
167         Test that _grant_access_to_webuser() give
168         right access to the web user.
169     """
170     phrase_class = 'class'
171     phrase_type = 'type'
172     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
173
174     table_factory(table_name)
175
176     sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
177
178     assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
179
180 def test_create_place_classtype_table_and_indexes(
181         temp_db_conn, def_config, placex_table,
182         sp_importer):
183     """
184         Test that _create_place_classtype_table_and_indexes()
185         create the right place_classtype tables and place_id indexes
186         and centroid indexes and grant access to the web user
187         for the given set of pairs.
188     """
189     pairs = set([('class1', 'type1'), ('class2', 'type2')])
190
191     sp_importer._create_place_classtype_table_and_indexes(pairs)
192
193     for pair in pairs:
194         assert check_table_exist(temp_db_conn, pair[0], pair[1])
195         assert check_placeid_and_centroid_indexes(temp_db_conn, pair[0], pair[1])
196         assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, pair[0], pair[1])
197
198 def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
199                                             temp_db_conn):
200     """
201         Check for the remove_non_existent_phrases_from_db() method.
202
203         It should removed entries from the word table which are contained
204         in the words_phrases_to_delete set and not those also contained
205         in the words_phrases_still_exist set.
206
207         place_classtype tables contained in table_phrases_to_delete should
208         be deleted.
209     """
210     sp_importer.table_phrases_to_delete = {
211         'place_classtype_testclasstypetable_to_delete'
212     }
213
214     query_tables = """
215         SELECT table_name
216         FROM information_schema.tables
217         WHERE table_schema='public'
218         AND table_name like 'place_classtype_%';
219     """
220
221     sp_importer._remove_non_existent_tables_from_db()
222
223     # Changes are not committed yet. Use temp_db_conn for checking results.
224     with temp_db_conn.cursor(cursor_factory=CursorForTesting) as cur:
225         assert cur.row_set(query_tables) \
226                  == {('place_classtype_testclasstypetable_to_keep', )}
227
228
229 @pytest.mark.parametrize("should_replace", [(True), (False)])
230 def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
231                         placex_table, table_factory, tokenizer_mock,
232                         xml_wiki_content, should_replace):
233     """
234         Check that the main import_phrases() method is well executed.
235         It should create the place_classtype table, the place_id and centroid indexes,
236         grand access to the web user and executing the SQL functions for amenities.
237         It should also update the database well by deleting or preserving existing entries
238         of the database.
239     """
240     #Add some data to the database before execution in order to test
241     #what is deleted and what is preserved.
242     table_factory('place_classtype_amenity_animal_shelter')
243     table_factory('place_classtype_wrongclass_wrongtype')
244
245     monkeypatch.setattr('nominatim.tools.special_phrases.sp_wiki_loader.SPWikiLoader._get_wiki_content',
246                         lambda self, lang: xml_wiki_content)
247
248     tokenizer = tokenizer_mock()
249     sp_importer.import_phrases(tokenizer, should_replace)
250
251     assert len(tokenizer.analyser_cache['special_phrases']) == 18
252
253     class_test = 'aerialway'
254     type_test = 'zip_line'
255
256     assert check_table_exist(temp_db_conn, class_test, type_test)
257     assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
258     assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
259     assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
260     if should_replace:
261         assert not check_table_exist(temp_db_conn, 'wrong_class', 'wrong_type')
262
263     assert temp_db_conn.table_exists('place_classtype_amenity_animal_shelter')
264     if should_replace:
265         assert not temp_db_conn.table_exists('place_classtype_wrongclass_wrongtype')
266
267 def check_table_exist(temp_db_conn, phrase_class, phrase_type):
268     """
269         Verify that the place_classtype table exists for the given
270         phrase_class and phrase_type.
271     """
272     return temp_db_conn.table_exists('place_classtype_{}_{}'.format(phrase_class, phrase_type))
273
274
275 def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
276     """
277         Check that the web user has been granted right access to the
278         place_classtype table of the given phrase_class and phrase_type.
279     """
280     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
281
282     with temp_db_conn.cursor() as temp_db_cursor:
283         temp_db_cursor.execute("""
284                 SELECT * FROM information_schema.role_table_grants
285                 WHERE table_name='{}'
286                 AND grantee='{}'
287                 AND privilege_type='SELECT'""".format(table_name, user))
288         return temp_db_cursor.fetchone()
289
290 def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
291     """
292         Check that the place_id index and centroid index exist for the
293         place_classtype table of the given phrase_class and phrase_type.
294     """
295     index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
296
297     return (
298         temp_db_conn.index_exists(index_prefix + 'centroid')
299         and
300         temp_db_conn.index_exists(index_prefix + 'place_id')
301     )