]> git.openstreetmap.org Git - nominatim.git/blob - test/python/test_tools_import_special_phrases.py
test: replace raw execute() with fixture code where possible
[nominatim.git] / test / python / test_tools_import_special_phrases.py
1 """
2     Tests for import special phrases methods
3     of the class SPImporter.
4 """
5 from nominatim.errors import UsageError
6 from pathlib import Path
7 import tempfile
8 from shutil import copyfile
9 import pytest
10 from nominatim.tools.special_phrases.sp_importer import SPImporter
11 from nominatim.tools.special_phrases.sp_wiki_loader import SPWikiLoader
12 from nominatim.tools.special_phrases.sp_csv_loader import SPCsvLoader
13 from nominatim.tools.special_phrases.special_phrase import SpecialPhrase
14
15 from cursor import CursorForTesting
16
17 TEST_BASE_DIR = Path(__file__) / '..' / '..'
18
19 def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
20     """
21         Check for the fetch_existing_place_classtype_tables() method.
22         It should return the table just created.
23     """
24     table_factory('place_classtype_testclasstypetable')
25
26     sp_importer._fetch_existing_place_classtype_tables()
27     contained_table = sp_importer.table_phrases_to_delete.pop()
28     assert contained_table == 'place_classtype_testclasstypetable'
29
30 def test_check_sanity_class(sp_importer):
31     """
32         Check for _check_sanity() method.
33         If a wrong class or type is given, an UsageError should raise.
34         If a good class and type are given, nothing special happens.
35     """
36
37     assert not sp_importer._check_sanity(SpecialPhrase('en', '', 'type', ''))
38     assert not sp_importer._check_sanity(SpecialPhrase('en', 'class', '', ''))
39
40     assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
41
42 def test_load_white_and_black_lists(sp_importer):
43     """
44         Test that _load_white_and_black_lists() well return
45         black list and white list and that they are of dict type.
46     """
47     black_list, white_list = sp_importer._load_white_and_black_lists()
48
49     assert isinstance(black_list, dict) and isinstance(white_list, dict)
50
51 def test_convert_php_settings(sp_importer):
52     """
53         Test that _convert_php_settings_if_needed() convert the given
54         php file to a json file.
55     """
56     php_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.php').resolve()
57
58     with tempfile.TemporaryDirectory() as temp_dir:
59         temp_settings = (Path(temp_dir) / 'phrase_settings.php').resolve()
60         copyfile(php_file, temp_settings)
61         sp_importer._convert_php_settings_if_needed(temp_settings)
62
63         assert (Path(temp_dir) / 'phrase_settings.json').is_file()
64
65 def test_convert_settings_wrong_file(sp_importer):
66     """
67         Test that _convert_php_settings_if_needed() raise an exception
68         if the given file is not a valid file.
69     """
70     with pytest.raises(UsageError, match='random_file is not a valid file.'):
71         sp_importer._convert_php_settings_if_needed('random_file')
72
73 def test_convert_settings_json_already_exist(sp_importer):
74     """
75         Test that if we give to '_convert_php_settings_if_needed' a php file path
76         and that a the corresponding json file already exists, it is returned.
77     """
78     php_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.php').resolve()
79     json_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.json').resolve()
80
81     returned = sp_importer._convert_php_settings_if_needed(php_file)
82
83     assert returned == json_file
84
85 def test_convert_settings_giving_json(sp_importer):
86     """
87         Test that if we give to '_convert_php_settings_if_needed' a json file path
88         the same path is directly returned
89     """
90     json_file = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.json').resolve()
91
92     returned = sp_importer._convert_php_settings_if_needed(json_file)
93
94     assert returned == json_file
95
96 def test_create_place_classtype_indexes(temp_db_with_extensions, temp_db_conn,
97                                         table_factory, sp_importer):
98     """
99         Test that _create_place_classtype_indexes() create the
100         place_id index and centroid index on the right place_class_type table.
101     """
102     phrase_class = 'class'
103     phrase_type = 'type'
104     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
105
106     table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')
107
108     sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
109
110     assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
111
112 def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
113     """
114         Test that _create_place_classtype_table() create
115         the right place_classtype table.
116     """
117     phrase_class = 'class'
118     phrase_type = 'type'
119     sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
120
121     assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
122
123 def test_grant_access_to_web_user(temp_db_conn, table_factory, def_config, sp_importer):
124     """
125         Test that _grant_access_to_webuser() give
126         right access to the web user.
127     """
128     phrase_class = 'class'
129     phrase_type = 'type'
130     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
131
132     table_factory(table_name)
133
134     sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
135
136     assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
137
138 def test_create_place_classtype_table_and_indexes(
139         temp_db_conn, def_config, placex_table,
140         sp_importer):
141     """
142         Test that _create_place_classtype_table_and_indexes()
143         create the right place_classtype tables and place_id indexes
144         and centroid indexes and grant access to the web user
145         for the given set of pairs.
146     """
147     pairs = set([('class1', 'type1'), ('class2', 'type2')])
148
149     sp_importer._create_place_classtype_table_and_indexes(pairs)
150
151     for pair in pairs:
152         assert check_table_exist(temp_db_conn, pair[0], pair[1])
153         assert check_placeid_and_centroid_indexes(temp_db_conn, pair[0], pair[1])
154         assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, pair[0], pair[1])
155
156 def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
157                                             temp_db_conn):
158     """
159         Check for the remove_non_existent_phrases_from_db() method.
160
161         It should removed entries from the word table which are contained
162         in the words_phrases_to_delete set and not those also contained
163         in the words_phrases_still_exist set.
164
165         place_classtype tables contained in table_phrases_to_delete should
166         be deleted.
167     """
168     sp_importer.table_phrases_to_delete = {
169         'place_classtype_testclasstypetable_to_delete'
170     }
171
172     query_tables = """
173         SELECT table_name
174         FROM information_schema.tables
175         WHERE table_schema='public'
176         AND table_name like 'place_classtype_%';
177     """
178
179     sp_importer._remove_non_existent_tables_from_db()
180
181     # Changes are not committed yet. Use temp_db_conn for checking results.
182     with temp_db_conn.cursor(cursor_factory=CursorForTesting) as cur:
183         assert cur.row_set(query_tables) \
184                  == {('place_classtype_testclasstypetable_to_keep', )}
185
186
187 @pytest.mark.parametrize("should_replace", [(True), (False)])
188 def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
189                         placex_table, table_factory, tokenizer_mock, should_replace):
190     """
191         Check that the main import_phrases() method is well executed.
192         It should create the place_classtype table, the place_id and centroid indexes,
193         grand access to the web user and executing the SQL functions for amenities.
194         It should also update the database well by deleting or preserving existing entries
195         of the database.
196     """
197     #Add some data to the database before execution in order to test
198     #what is deleted and what is preserved.
199     table_factory('place_classtype_amenity_animal_shelter')
200     table_factory('place_classtype_wrongclass_wrongtype')
201
202     monkeypatch.setattr('nominatim.tools.special_phrases.sp_wiki_loader.SPWikiLoader._get_wiki_content',
203                         mock_get_wiki_content)
204
205     tokenizer = tokenizer_mock()
206     sp_importer.import_phrases(tokenizer, should_replace)
207
208     assert len(tokenizer.analyser_cache['special_phrases']) == 18
209
210     class_test = 'aerialway'
211     type_test = 'zip_line'
212
213     assert check_table_exist(temp_db_conn, class_test, type_test)
214     assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
215     assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
216     assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
217     if should_replace:
218         assert not check_table_exist(temp_db_conn, 'wrong_class', 'wrong_type')
219
220     assert temp_db_conn.table_exists('place_classtype_amenity_animal_shelter')
221     if should_replace:
222         assert not temp_db_conn.table_exists('place_classtype_wrongclass_wrongtype')
223
224
225 def mock_get_wiki_content(self, lang):
226     """
227         Mock the _get_wiki_content() method to return
228         static xml test file content.
229     """
230     return get_test_xml_wiki_content()
231
232 def get_test_xml_wiki_content():
233     """
234         return the content of the static xml test file.
235     """
236     xml_test_content_path = (TEST_BASE_DIR / 'testdata' / 'special_phrases_test_content.txt').resolve()
237     with open(xml_test_content_path) as xml_content_reader:
238         return xml_content_reader.read()
239
240 def check_table_exist(temp_db_conn, phrase_class, phrase_type):
241     """
242         Verify that the place_classtype table exists for the given
243         phrase_class and phrase_type.
244     """
245     return temp_db_conn.table_exists('place_classtype_{}_{}'.format(phrase_class, phrase_type))
246
247
248 def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
249     """
250         Check that the web user has been granted right access to the
251         place_classtype table of the given phrase_class and phrase_type.
252     """
253     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
254
255     with temp_db_conn.cursor() as temp_db_cursor:
256         temp_db_cursor.execute("""
257                 SELECT * FROM information_schema.role_table_grants
258                 WHERE table_name='{}'
259                 AND grantee='{}'
260                 AND privilege_type='SELECT'""".format(table_name, user))
261         return temp_db_cursor.fetchone()
262
263 def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
264     """
265         Check that the place_id index and centroid index exist for the
266         place_classtype table of the given phrase_class and phrase_type.
267     """
268     index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
269
270     return (
271         temp_db_conn.index_exists(index_prefix + 'centroid')
272         and
273         temp_db_conn.index_exists(index_prefix + 'place_id')
274     )
275
276 @pytest.fixture
277 def sp_importer(temp_db_conn, def_config, temp_phplib_dir_with_migration):
278     """
279         Return an instance of SPImporter.
280     """
281     loader = SPWikiLoader(def_config, ['en'])
282     return SPImporter(def_config, temp_phplib_dir_with_migration, temp_db_conn, loader)
283
284 @pytest.fixture
285 def temp_phplib_dir_with_migration():
286     """
287         Return temporary phpdir with migration subdirectory and
288         PhraseSettingsToJson.php script inside.
289     """
290     migration_file = (TEST_BASE_DIR / '..' / 'lib-php' / 'migration'
291                       / 'PhraseSettingsToJson.php').resolve()
292     with tempfile.TemporaryDirectory() as phpdir:
293         (Path(phpdir) / 'migration').mkdir()
294         migration_dest_path = (Path(phpdir) / 'migration' / 'PhraseSettingsToJson.php').resolve()
295         copyfile(migration_file, migration_dest_path)
296
297         yield Path(phpdir)
298
299 @pytest.fixture
300 def default_phrases(table_factory):
301     table_factory('place_classtype_testclasstypetable_to_delete')
302     table_factory('place_classtype_testclasstypetable_to_keep')