]> git.openstreetmap.org Git - nominatim.git/blob - test/python/tools/test_import_special_phrases.py
Merge pull request #2666 from lonvia/admin-command-for-forced-indexing
[nominatim.git] / test / python / tools / test_import_special_phrases.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8     Tests for import special phrases methods
9     of the class SPImporter.
10 """
11 from shutil import copyfile
12 import pytest
13 from nominatim.tools.special_phrases.sp_importer import SPImporter
14 from nominatim.tools.special_phrases.sp_wiki_loader import SPWikiLoader
15 from nominatim.tools.special_phrases.special_phrase import SpecialPhrase
16 from nominatim.errors import UsageError
17
18 from cursor import CursorForTesting
19
20 @pytest.fixture
21 def testfile_dir(src_dir):
22     return src_dir / 'test' / 'testfiles'
23
24
25 @pytest.fixture
26 def sp_importer(temp_db_conn, def_config):
27     """
28         Return an instance of SPImporter.
29     """
30     loader = SPWikiLoader(def_config, ['en'])
31     return SPImporter(def_config, temp_db_conn, loader)
32
33
34 @pytest.fixture
35 def xml_wiki_content(src_dir):
36     """
37         return the content of the static xml test file.
38     """
39     xml_test_content = src_dir / 'test' / 'testdata' / 'special_phrases_test_content.txt'
40     return xml_test_content.read_text()
41
42
43 @pytest.fixture
44 def default_phrases(table_factory):
45     table_factory('place_classtype_testclasstypetable_to_delete')
46     table_factory('place_classtype_testclasstypetable_to_keep')
47
48
49 def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
50     """
51         Check for the fetch_existing_place_classtype_tables() method.
52         It should return the table just created.
53     """
54     table_factory('place_classtype_testclasstypetable')
55
56     sp_importer._fetch_existing_place_classtype_tables()
57     contained_table = sp_importer.table_phrases_to_delete.pop()
58     assert contained_table == 'place_classtype_testclasstypetable'
59
60 def test_check_sanity_class(sp_importer):
61     """
62         Check for _check_sanity() method.
63         If a wrong class or type is given, an UsageError should raise.
64         If a good class and type are given, nothing special happens.
65     """
66
67     assert not sp_importer._check_sanity(SpecialPhrase('en', '', 'type', ''))
68     assert not sp_importer._check_sanity(SpecialPhrase('en', 'class', '', ''))
69
70     assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
71
72 def test_load_white_and_black_lists(sp_importer):
73     """
74         Test that _load_white_and_black_lists() well return
75         black list and white list and that they are of dict type.
76     """
77     black_list, white_list = sp_importer._load_white_and_black_lists()
78
79     assert isinstance(black_list, dict) and isinstance(white_list, dict)
80
81
82 def test_create_place_classtype_indexes(temp_db_with_extensions, temp_db_conn,
83                                         table_factory, sp_importer):
84     """
85         Test that _create_place_classtype_indexes() create the
86         place_id index and centroid index on the right place_class_type table.
87     """
88     phrase_class = 'class'
89     phrase_type = 'type'
90     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
91
92     table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')
93
94     sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
95
96     assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
97
98 def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
99     """
100         Test that _create_place_classtype_table() create
101         the right place_classtype table.
102     """
103     phrase_class = 'class'
104     phrase_type = 'type'
105     sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
106
107     assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
108
109 def test_grant_access_to_web_user(temp_db_conn, table_factory, def_config, sp_importer):
110     """
111         Test that _grant_access_to_webuser() give
112         right access to the web user.
113     """
114     phrase_class = 'class'
115     phrase_type = 'type'
116     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
117
118     table_factory(table_name)
119
120     sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
121
122     assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
123
124 def test_create_place_classtype_table_and_indexes(
125         temp_db_conn, def_config, placex_table,
126         sp_importer):
127     """
128         Test that _create_place_classtype_table_and_indexes()
129         create the right place_classtype tables and place_id indexes
130         and centroid indexes and grant access to the web user
131         for the given set of pairs.
132     """
133     pairs = set([('class1', 'type1'), ('class2', 'type2')])
134
135     sp_importer._create_place_classtype_table_and_indexes(pairs)
136
137     for pair in pairs:
138         assert check_table_exist(temp_db_conn, pair[0], pair[1])
139         assert check_placeid_and_centroid_indexes(temp_db_conn, pair[0], pair[1])
140         assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, pair[0], pair[1])
141
142 def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
143                                             temp_db_conn):
144     """
145         Check for the remove_non_existent_phrases_from_db() method.
146
147         It should removed entries from the word table which are contained
148         in the words_phrases_to_delete set and not those also contained
149         in the words_phrases_still_exist set.
150
151         place_classtype tables contained in table_phrases_to_delete should
152         be deleted.
153     """
154     sp_importer.table_phrases_to_delete = {
155         'place_classtype_testclasstypetable_to_delete'
156     }
157
158     query_tables = """
159         SELECT table_name
160         FROM information_schema.tables
161         WHERE table_schema='public'
162         AND table_name like 'place_classtype_%';
163     """
164
165     sp_importer._remove_non_existent_tables_from_db()
166
167     # Changes are not committed yet. Use temp_db_conn for checking results.
168     with temp_db_conn.cursor(cursor_factory=CursorForTesting) as cur:
169         assert cur.row_set(query_tables) \
170                  == {('place_classtype_testclasstypetable_to_keep', )}
171
172
173 @pytest.mark.parametrize("should_replace", [(True), (False)])
174 def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
175                         placex_table, table_factory, tokenizer_mock,
176                         xml_wiki_content, should_replace):
177     """
178         Check that the main import_phrases() method is well executed.
179         It should create the place_classtype table, the place_id and centroid indexes,
180         grand access to the web user and executing the SQL functions for amenities.
181         It should also update the database well by deleting or preserving existing entries
182         of the database.
183     """
184     #Add some data to the database before execution in order to test
185     #what is deleted and what is preserved.
186     table_factory('place_classtype_amenity_animal_shelter')
187     table_factory('place_classtype_wrongclass_wrongtype')
188
189     monkeypatch.setattr('nominatim.tools.special_phrases.sp_wiki_loader.SPWikiLoader._get_wiki_content',
190                         lambda self, lang: xml_wiki_content)
191
192     tokenizer = tokenizer_mock()
193     sp_importer.import_phrases(tokenizer, should_replace)
194
195     assert len(tokenizer.analyser_cache['special_phrases']) == 18
196
197     class_test = 'aerialway'
198     type_test = 'zip_line'
199
200     assert check_table_exist(temp_db_conn, class_test, type_test)
201     assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
202     assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
203     assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
204     if should_replace:
205         assert not check_table_exist(temp_db_conn, 'wrong_class', 'wrong_type')
206
207     assert temp_db_conn.table_exists('place_classtype_amenity_animal_shelter')
208     if should_replace:
209         assert not temp_db_conn.table_exists('place_classtype_wrongclass_wrongtype')
210
211 def check_table_exist(temp_db_conn, phrase_class, phrase_type):
212     """
213         Verify that the place_classtype table exists for the given
214         phrase_class and phrase_type.
215     """
216     return temp_db_conn.table_exists('place_classtype_{}_{}'.format(phrase_class, phrase_type))
217
218
219 def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
220     """
221         Check that the web user has been granted right access to the
222         place_classtype table of the given phrase_class and phrase_type.
223     """
224     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
225
226     with temp_db_conn.cursor() as temp_db_cursor:
227         temp_db_cursor.execute("""
228                 SELECT * FROM information_schema.role_table_grants
229                 WHERE table_name='{}'
230                 AND grantee='{}'
231                 AND privilege_type='SELECT'""".format(table_name, user))
232         return temp_db_cursor.fetchone()
233
234 def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
235     """
236         Check that the place_id index and centroid index exist for the
237         place_classtype table of the given phrase_class and phrase_type.
238     """
239     index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
240
241     return (
242         temp_db_conn.index_exists(index_prefix + 'centroid')
243         and
244         temp_db_conn.index_exists(index_prefix + 'place_id')
245     )