]> git.openstreetmap.org Git - nominatim.git/blob - test/python/test_tools_import_special_phrases.py
move index creation for word table to tokenizer
[nominatim.git] / test / python / test_tools_import_special_phrases.py
1 """
2     Tests for import special phrases methods
3     of the class SpecialPhrasesImporter.
4 """
5 from nominatim.errors import UsageError
6 from pathlib import Path
7 import tempfile
8 from shutil import copyfile
9 import pytest
10 from nominatim.tools import SpecialPhrasesImporter
11
# Base path for test resources, built relative to this file. It is left
# unresolved here; the users below call .resolve() on the paths they derive.
TEST_BASE_DIR = Path(__file__) / '..' / '..'
13
def test_fetch_existing_place_classtype_tables(special_phrases_importer, temp_db_cursor):
    """
        Create one place_classtype table, run
        _fetch_existing_place_classtype_tables() and check that the name
        of that table can be popped from table_phrases_to_delete.
    """
    expected_table = 'place_classtype_testclasstypetable'
    temp_db_cursor.execute('CREATE TABLE place_classtype_testclasstypetable()')

    special_phrases_importer._fetch_existing_place_classtype_tables()

    # pop() takes an entry out of the collection filled by the call above.
    fetched_table = special_phrases_importer.table_phrases_to_delete.pop()
    assert fetched_table == expected_table
24
def test_check_sanity_class(special_phrases_importer):
    """
        Check the result of the _check_sanity() method.
        It must be falsy when the class or the type is an empty string
        and truthy when both a class and a type are given.
    """
    empty_class_result = special_phrases_importer._check_sanity('en', '', 'type')
    assert not empty_class_result

    empty_type_result = special_phrases_importer._check_sanity('en', 'class', '')
    assert not empty_type_result

    valid_result = special_phrases_importer._check_sanity('en', 'class', 'type')
    assert valid_result
36
def test_load_white_and_black_lists(special_phrases_importer):
    """
        Test that _load_white_and_black_lists() returns two values,
        the black list and the white list, and that both are dicts.
    """
    loaded_lists = special_phrases_importer._load_white_and_black_lists()
    black_list, white_list = loaded_lists

    assert isinstance(black_list, dict)
    assert isinstance(white_list, dict)
45
def test_convert_php_settings(special_phrases_importer):
    """
        Test that _convert_php_settings_if_needed() leaves a
        phrase_settings.json file next to the php settings file
        it was given.
    """
    source_settings = (TEST_BASE_DIR / 'testfiles' / 'phrase_settings.php').resolve()

    with tempfile.TemporaryDirectory() as temp_dir:
        work_dir = Path(temp_dir)

        # Work on a copy inside the temporary directory.
        settings_copy = (work_dir / 'phrase_settings.php').resolve()
        copyfile(source_settings, settings_copy)
        special_phrases_importer._convert_php_settings_if_needed(settings_copy)

        expected_json = work_dir / 'phrase_settings.json'
        assert expected_json.is_file()
59
def test_convert_settings_wrong_file(special_phrases_importer):
    """
        Test that _convert_php_settings_if_needed() raises a UsageError
        with the expected message when given 'random_file'.
    """
    invalid_path = 'random_file'
    expected_message = 'random_file is not a valid file.'

    with pytest.raises(UsageError, match=expected_message):
        special_phrases_importer._convert_php_settings_if_needed(invalid_path)
67
def test_convert_settings_json_already_exist(special_phrases_importer):
    """
        Test that '_convert_php_settings_if_needed' returns the path of
        the json file in the testfiles directory when it is given the
        path of the php file of the same name.
    """
    testfiles_dir = TEST_BASE_DIR / 'testfiles'
    php_file = (testfiles_dir / 'phrase_settings.php').resolve()
    json_file = (testfiles_dir / 'phrase_settings.json').resolve()

    assert special_phrases_importer._convert_php_settings_if_needed(php_file) == json_file
79
def test_convert_settings_giving_json(special_phrases_importer):
    """
        Test that '_convert_php_settings_if_needed' returns the very
        path it was given when that path is the json settings file.
    """
    testfiles_dir = TEST_BASE_DIR / 'testfiles'
    json_file = (testfiles_dir / 'phrase_settings.json').resolve()

    assert special_phrases_importer._convert_php_settings_if_needed(json_file) == json_file
90
def test_create_place_classtype_indexes(temp_db_conn, special_phrases_importer):
    """
        Test that _create_place_classtype_indexes() creates the
        place_id index and the centroid index for the place_classtype
        table of the given class and type.
    """
    phrase_class, phrase_type = 'class', 'type'
    create_table_sql = 'CREATE TABLE {}(place_id BIGINT, centroid GEOMETRY)'.format(
        'place_classtype_{}_{}'.format(phrase_class, phrase_type))

    with temp_db_conn.cursor() as cursor:
        # The GEOMETRY column type used by the table comes from postgis.
        cursor.execute("CREATE EXTENSION postgis;")
        cursor.execute(create_table_sql)

    special_phrases_importer._create_place_classtype_indexes('', phrase_class, phrase_type)

    assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
107
def test_create_place_classtype_table(temp_db_conn, placex_table, special_phrases_importer):
    """
        Test that _create_place_classtype_table() creates the
        place_classtype table for the given class and type.
    """
    class_and_type = ('class', 'type')

    special_phrases_importer._create_place_classtype_table('', *class_and_type)

    assert check_table_exist(temp_db_conn, *class_and_type)
118
def test_grant_access_to_web_user(temp_db_conn, def_config, special_phrases_importer):
    """
        Test that _grant_access_to_webuser() grants the configured
        web user SELECT access on the place_classtype table.
    """
    phrase_class, phrase_type = 'class', 'type'
    create_table_sql = 'CREATE TABLE {}()'.format(
        'place_classtype_{}_{}'.format(phrase_class, phrase_type))

    with temp_db_conn.cursor() as cursor:
        cursor.execute(create_table_sql)

    special_phrases_importer._grant_access_to_webuser(phrase_class, phrase_type)

    web_user = def_config.DATABASE_WEBUSER
    assert check_grant_access(temp_db_conn, web_user, phrase_class, phrase_type)
134
def test_create_place_classtype_table_and_indexes(
        temp_db_conn, def_config, placex_table,
        special_phrases_importer):
    """
        Test that _create_place_classtype_table_and_indexes() creates,
        for every class/type pair of the given set, the place_classtype
        table with its place_id and centroid indexes and grants access
        to the web user.
    """
    pairs = {('class1', 'type1'), ('class2', 'type2')}

    special_phrases_importer._create_place_classtype_table_and_indexes(pairs)

    for phrase_class, phrase_type in pairs:
        assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
        assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
        assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER,
                                  phrase_class, phrase_type)
152
def test_process_xml_content(temp_db_conn, def_config, special_phrases_importer):
    """
        Test that _process_xml_content() processes the static xml test
        content and that the class/type pairs it returns contain the
        ('aerialway', 'zip_line') pair.
    """
    class_test = 'aerialway'
    type_test = 'zip_line'

    pairs = special_phrases_importer._process_xml_content(get_test_xml_wiki_content(), 'en')

    # Check the pair as a whole. Going through a dict would keep a single
    # type per class and would only prove that the class and the type both
    # appear somewhere, not that they were returned together.
    assert (class_test, type_test) in set(map(tuple, pairs))
166
def test_remove_non_existent_tables_from_db(special_phrases_importer, default_phrases,
                                             temp_db_conn):
    """
        Check for the _remove_non_existent_tables_from_db() method.

        The default_phrases fixture creates one place_classtype table
        to delete and one to keep. After the call, the table listed in
        table_phrases_to_delete must be gone and the other one must be
        the only place_classtype table left in the public schema.
    """
    with temp_db_conn.cursor() as cursor:
        special_phrases_importer.table_phrases_to_delete = {'place_classtype_testclasstypetable_to_delete'}

        special_phrases_importer._remove_non_existent_tables_from_db()

        query_tables = """
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema='public'
            AND table_name like 'place_classtype_%';
        """
        cursor.execute(query_tables)
        remaining_tables = cursor.fetchall()

        assert len(remaining_tables) == 1
        assert remaining_tables[0][0] == 'place_classtype_testclasstypetable_to_keep'
198
def test_import_from_wiki(monkeypatch, temp_db_conn, def_config, special_phrases_importer,
                          placex_table, tokenizer_mock):
    """
        Check that the main import_from_wiki() method is well executed.
        It should create the place_classtype table, the place_id and centroid indexes,
        grant access to the web user and hand the special phrases to the tokenizer.
        It should also update the database well by deleting or preserving
        place_classtype tables that existed before the import.
    """
    # Add some tables to the database before execution in order to test
    # what is deleted and what is preserved.
    with temp_db_conn.cursor() as temp_db_cursor:
        temp_db_cursor.execute("""
            CREATE TABLE place_classtype_amenity_animal_shelter();
            CREATE TABLE place_classtype_wrongclass_wrongtype();""")

    # Serve the static xml test content instead of the real wiki content.
    monkeypatch.setattr('nominatim.tools.SpecialPhrasesImporter._get_wiki_content', mock_get_wiki_content)
    tokenizer = tokenizer_mock()
    special_phrases_importer.import_from_wiki(tokenizer, ['en'])

    assert len(tokenizer.analyser_cache['special_phrases']) == 18

    class_test = 'aerialway'
    type_test = 'zip_line'

    assert check_table_exist(temp_db_conn, class_test, type_test)
    assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
    assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
    # The class and type must spell the table created above
    # (place_classtype_wrongclass_wrongtype); any other spelling checks a
    # table that never existed and passes whatever the import did.
    assert not check_table_exist(temp_db_conn, 'wrongclass', 'wrongtype')

    # Same expectations checked against the public schema only.
    query_public_table = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_name = '{}';
    """
    # Format: (table name, should the table still exist after the import)
    expectations = (
        ('place_classtype_amenity_animal_shelter', True),
        ('place_classtype_wrongclass_wrongtype', False),
    )

    with temp_db_conn.cursor() as temp_db_cursor:
        for table_name, should_exist in expectations:
            temp_db_cursor.execute(query_public_table.format(table_name))
            assert bool(temp_db_cursor.fetchone()) == should_exist
258
def mock_get_wiki_content(lang):
    """
        Replacement for the _get_wiki_content() method: the lang
        argument is ignored and the content of the static xml
        test file is returned.
    """
    static_content = get_test_xml_wiki_content()
    return static_content
265
def get_test_xml_wiki_content():
    """
        Read the static xml test file from the testdata directory
        and return its whole content as a string.
    """
    testdata_dir = TEST_BASE_DIR / 'testdata'
    content_file = (testdata_dir / 'special_phrases_test_content.txt').resolve()
    return content_file.read_text()
273
def check_table_exist(temp_db_conn, phrase_class, phrase_type):
    """
        Look up the place_classtype table of the given phrase_class and
        phrase_type among the base tables of information_schema.tables.
        Return the first row fetched for it, as given by fetchone().
    """
    lookup_sql = """
            SELECT *
            FROM information_schema.tables
            WHERE table_type='BASE TABLE'
            AND table_name='{}'""".format(
                'place_classtype_{}_{}'.format(phrase_class, phrase_type))

    with temp_db_conn.cursor() as cursor:
        cursor.execute(lookup_sql)
        found_row = cursor.fetchone()

    return found_row
288
def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
    """
        Look up, in information_schema.role_table_grants, a SELECT
        privilege of the given user on the place_classtype table of the
        given phrase_class and phrase_type.
        Return the first row fetched for it, as given by fetchone().
    """
    granted_table = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
    grant_sql = """
                SELECT * FROM information_schema.role_table_grants
                WHERE table_name='{}'
                AND grantee='{}'
                AND privilege_type='SELECT'""".format(granted_table, user)

    with temp_db_conn.cursor() as cursor:
        cursor.execute(grant_sql)
        grant_row = cursor.fetchone()

    return grant_row
303
def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
    """
        Ask the connection whether the centroid index and then the
        place_id index exist for the place_classtype table of the given
        phrase_class and phrase_type.
        The place_id index is only looked up when the centroid one exists.
    """
    index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)

    centroid_found = temp_db_conn.index_exists(index_prefix + 'centroid')
    if not centroid_found:
        return centroid_found

    return temp_db_conn.index_exists(index_prefix + 'place_id')
316
@pytest.fixture
def special_phrases_importer(temp_db_conn, def_config, temp_phplib_dir_with_migration):
    """
        Build a SpecialPhrasesImporter from the test configuration,
        the temporary php directory and the test database connection.
    """
    importer = SpecialPhrasesImporter(def_config, temp_phplib_dir_with_migration, temp_db_conn)
    return importer
323
@pytest.fixture
def temp_phplib_dir_with_migration():
    """
        Yield the path of a temporary directory that contains a
        'migration' subdirectory holding a copy of the
        PhraseSettingsToJson.php script.
    """
    script_name = 'PhraseSettingsToJson.php'
    migration_file = (TEST_BASE_DIR / '..' / 'lib-php' / 'migration' / script_name).resolve()

    with tempfile.TemporaryDirectory() as phpdir:
        phplib_dir = Path(phpdir)
        migration_dir = phplib_dir / 'migration'
        migration_dir.mkdir()
        copyfile(migration_file, (migration_dir / script_name).resolve())

        # Yield inside the with block so the directory outlives the test.
        yield phplib_dir
338
@pytest.fixture
def default_phrases(temp_db_cursor):
    """
        Create two empty place_classtype tables, one named '..._to_delete'
        and one named '..._to_keep'.
    """
    create_tables_sql = """
        CREATE TABLE place_classtype_testclasstypetable_to_delete();
        CREATE TABLE place_classtype_testclasstypetable_to_keep();"""
    temp_db_cursor.execute(create_tables_sql)
343         CREATE TABLE place_classtype_testclasstypetable_to_keep();""")