]> git.openstreetmap.org Git - nominatim.git/blob - test/python/test_tools_import_special_phrases.py
US TIGER data 2021 released
[nominatim.git] / test / python / test_tools_import_special_phrases.py
1 """
2     Tests for import special phrases methods
3     of the class SPImporter.
4 """
5 from shutil import copyfile
6 import pytest
7 from nominatim.tools.special_phrases.sp_importer import SPImporter
8 from nominatim.tools.special_phrases.sp_wiki_loader import SPWikiLoader
9 from nominatim.tools.special_phrases.special_phrase import SpecialPhrase
10 from nominatim.errors import UsageError
11
12 from cursor import CursorForTesting
13
14 @pytest.fixture
15 def testfile_dir(src_dir):
16     return src_dir / 'test' / 'testfiles'
17
18
19 @pytest.fixture
20 def sp_importer(temp_db_conn, def_config, temp_phplib_dir_with_migration):
21     """
22         Return an instance of SPImporter.
23     """
24     loader = SPWikiLoader(def_config, ['en'])
25     return SPImporter(def_config, temp_phplib_dir_with_migration, temp_db_conn, loader)
26
27
28 @pytest.fixture
29 def temp_phplib_dir_with_migration(src_dir, tmp_path):
30     """
31         Return temporary phpdir with migration subdirectory and
32         PhraseSettingsToJson.php script inside.
33     """
34     migration_file = (src_dir / 'lib-php' / 'migration' / 'PhraseSettingsToJson.php').resolve()
35
36     phpdir = tmp_path / 'tempphp'
37     phpdir.mkdir()
38
39     (phpdir / 'migration').mkdir()
40     migration_dest_path = (phpdir / 'migration' / 'PhraseSettingsToJson.php').resolve()
41     copyfile(str(migration_file), str(migration_dest_path))
42
43     return phpdir
44
45
46 @pytest.fixture
47 def xml_wiki_content(src_dir):
48     """
49         return the content of the static xml test file.
50     """
51     xml_test_content = src_dir / 'test' / 'testdata' / 'special_phrases_test_content.txt'
52     return xml_test_content.read_text()
53
54
55 @pytest.fixture
56 def default_phrases(table_factory):
57     table_factory('place_classtype_testclasstypetable_to_delete')
58     table_factory('place_classtype_testclasstypetable_to_keep')
59
60
61 def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
62     """
63         Check for the fetch_existing_place_classtype_tables() method.
64         It should return the table just created.
65     """
66     table_factory('place_classtype_testclasstypetable')
67
68     sp_importer._fetch_existing_place_classtype_tables()
69     contained_table = sp_importer.table_phrases_to_delete.pop()
70     assert contained_table == 'place_classtype_testclasstypetable'
71
72 def test_check_sanity_class(sp_importer):
73     """
74         Check for _check_sanity() method.
75         If a wrong class or type is given, an UsageError should raise.
76         If a good class and type are given, nothing special happens.
77     """
78
79     assert not sp_importer._check_sanity(SpecialPhrase('en', '', 'type', ''))
80     assert not sp_importer._check_sanity(SpecialPhrase('en', 'class', '', ''))
81
82     assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
83
84 def test_load_white_and_black_lists(sp_importer):
85     """
86         Test that _load_white_and_black_lists() well return
87         black list and white list and that they are of dict type.
88     """
89     black_list, white_list = sp_importer._load_white_and_black_lists()
90
91     assert isinstance(black_list, dict) and isinstance(white_list, dict)
92
93 def test_convert_php_settings(sp_importer, testfile_dir, tmp_path):
94     """
95         Test that _convert_php_settings_if_needed() convert the given
96         php file to a json file.
97     """
98     php_file = (testfile_dir / 'phrase_settings.php').resolve()
99
100     temp_settings = (tmp_path / 'phrase_settings.php').resolve()
101     copyfile(php_file, temp_settings)
102     sp_importer._convert_php_settings_if_needed(temp_settings)
103
104     assert (tmp_path / 'phrase_settings.json').is_file()
105
106 def test_convert_settings_wrong_file(sp_importer):
107     """
108         Test that _convert_php_settings_if_needed() raise an exception
109         if the given file is not a valid file.
110     """
111     with pytest.raises(UsageError, match='random_file is not a valid file.'):
112         sp_importer._convert_php_settings_if_needed('random_file')
113
114 def test_convert_settings_json_already_exist(sp_importer, testfile_dir):
115     """
116         Test that if we give to '_convert_php_settings_if_needed' a php file path
117         and that a the corresponding json file already exists, it is returned.
118     """
119     php_file = (testfile_dir / 'phrase_settings.php').resolve()
120     json_file = (testfile_dir / 'phrase_settings.json').resolve()
121
122     returned = sp_importer._convert_php_settings_if_needed(php_file)
123
124     assert returned == json_file
125
126 def test_convert_settings_giving_json(sp_importer, testfile_dir):
127     """
128         Test that if we give to '_convert_php_settings_if_needed' a json file path
129         the same path is directly returned
130     """
131     json_file = (testfile_dir / 'phrase_settings.json').resolve()
132
133     returned = sp_importer._convert_php_settings_if_needed(json_file)
134
135     assert returned == json_file
136
137 def test_create_place_classtype_indexes(temp_db_with_extensions, temp_db_conn,
138                                         table_factory, sp_importer):
139     """
140         Test that _create_place_classtype_indexes() create the
141         place_id index and centroid index on the right place_class_type table.
142     """
143     phrase_class = 'class'
144     phrase_type = 'type'
145     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
146
147     table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')
148
149     sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
150
151     assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
152
153 def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
154     """
155         Test that _create_place_classtype_table() create
156         the right place_classtype table.
157     """
158     phrase_class = 'class'
159     phrase_type = 'type'
160     sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
161
162     assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
163
164 def test_grant_access_to_web_user(temp_db_conn, table_factory, def_config, sp_importer):
165     """
166         Test that _grant_access_to_webuser() give
167         right access to the web user.
168     """
169     phrase_class = 'class'
170     phrase_type = 'type'
171     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
172
173     table_factory(table_name)
174
175     sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
176
177     assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
178
179 def test_create_place_classtype_table_and_indexes(
180         temp_db_conn, def_config, placex_table,
181         sp_importer):
182     """
183         Test that _create_place_classtype_table_and_indexes()
184         create the right place_classtype tables and place_id indexes
185         and centroid indexes and grant access to the web user
186         for the given set of pairs.
187     """
188     pairs = set([('class1', 'type1'), ('class2', 'type2')])
189
190     sp_importer._create_place_classtype_table_and_indexes(pairs)
191
192     for pair in pairs:
193         assert check_table_exist(temp_db_conn, pair[0], pair[1])
194         assert check_placeid_and_centroid_indexes(temp_db_conn, pair[0], pair[1])
195         assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, pair[0], pair[1])
196
197 def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
198                                             temp_db_conn):
199     """
200         Check for the remove_non_existent_phrases_from_db() method.
201
202         It should removed entries from the word table which are contained
203         in the words_phrases_to_delete set and not those also contained
204         in the words_phrases_still_exist set.
205
206         place_classtype tables contained in table_phrases_to_delete should
207         be deleted.
208     """
209     sp_importer.table_phrases_to_delete = {
210         'place_classtype_testclasstypetable_to_delete'
211     }
212
213     query_tables = """
214         SELECT table_name
215         FROM information_schema.tables
216         WHERE table_schema='public'
217         AND table_name like 'place_classtype_%';
218     """
219
220     sp_importer._remove_non_existent_tables_from_db()
221
222     # Changes are not committed yet. Use temp_db_conn for checking results.
223     with temp_db_conn.cursor(cursor_factory=CursorForTesting) as cur:
224         assert cur.row_set(query_tables) \
225                  == {('place_classtype_testclasstypetable_to_keep', )}
226
227
228 @pytest.mark.parametrize("should_replace", [(True), (False)])
229 def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
230                         placex_table, table_factory, tokenizer_mock,
231                         xml_wiki_content, should_replace):
232     """
233         Check that the main import_phrases() method is well executed.
234         It should create the place_classtype table, the place_id and centroid indexes,
235         grand access to the web user and executing the SQL functions for amenities.
236         It should also update the database well by deleting or preserving existing entries
237         of the database.
238     """
239     #Add some data to the database before execution in order to test
240     #what is deleted and what is preserved.
241     table_factory('place_classtype_amenity_animal_shelter')
242     table_factory('place_classtype_wrongclass_wrongtype')
243
244     monkeypatch.setattr('nominatim.tools.special_phrases.sp_wiki_loader.SPWikiLoader._get_wiki_content',
245                         lambda self, lang: xml_wiki_content)
246
247     tokenizer = tokenizer_mock()
248     sp_importer.import_phrases(tokenizer, should_replace)
249
250     assert len(tokenizer.analyser_cache['special_phrases']) == 18
251
252     class_test = 'aerialway'
253     type_test = 'zip_line'
254
255     assert check_table_exist(temp_db_conn, class_test, type_test)
256     assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
257     assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
258     assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
259     if should_replace:
260         assert not check_table_exist(temp_db_conn, 'wrong_class', 'wrong_type')
261
262     assert temp_db_conn.table_exists('place_classtype_amenity_animal_shelter')
263     if should_replace:
264         assert not temp_db_conn.table_exists('place_classtype_wrongclass_wrongtype')
265
266 def check_table_exist(temp_db_conn, phrase_class, phrase_type):
267     """
268         Verify that the place_classtype table exists for the given
269         phrase_class and phrase_type.
270     """
271     return temp_db_conn.table_exists('place_classtype_{}_{}'.format(phrase_class, phrase_type))
272
273
274 def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
275     """
276         Check that the web user has been granted right access to the
277         place_classtype table of the given phrase_class and phrase_type.
278     """
279     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
280
281     with temp_db_conn.cursor() as temp_db_cursor:
282         temp_db_cursor.execute("""
283                 SELECT * FROM information_schema.role_table_grants
284                 WHERE table_name='{}'
285                 AND grantee='{}'
286                 AND privilege_type='SELECT'""".format(table_name, user))
287         return temp_db_cursor.fetchone()
288
289 def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
290     """
291         Check that the place_id index and centroid index exist for the
292         place_classtype table of the given phrase_class and phrase_type.
293     """
294     index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
295
296     return (
297         temp_db_conn.index_exists(index_prefix + 'centroid')
298         and
299         temp_db_conn.index_exists(index_prefix + 'place_id')
300     )