]> git.openstreetmap.org Git - nominatim.git/blob - test/python/tools/test_import_special_phrases.py
make DB helper functions free functions
[nominatim.git] / test / python / tools / test_import_special_phrases.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8     Tests for import special phrases methods
9     of the class SPImporter.
10 """
11 from shutil import copyfile
12 import pytest
13 from nominatim_db.tools.special_phrases.sp_importer import SPImporter
14 from nominatim_db.tools.special_phrases.sp_wiki_loader import SPWikiLoader
15 from nominatim_db.tools.special_phrases.special_phrase import SpecialPhrase
16 from nominatim_db.errors import UsageError
17
18 from cursor import CursorForTesting
19
20 @pytest.fixture
21 def sp_importer(temp_db_conn, def_config, monkeypatch):
22     """
23         Return an instance of SPImporter.
24     """
25     monkeypatch.setenv('NOMINATIM_LANGUAGES', 'en')
26     loader = SPWikiLoader(def_config)
27     return SPImporter(def_config, temp_db_conn, loader)
28
29
30 @pytest.fixture
31 def xml_wiki_content(src_dir):
32     """
33         return the content of the static xml test file.
34     """
35     xml_test_content = src_dir / 'test' / 'testdata' / 'special_phrases_test_content.txt'
36     return xml_test_content.read_text()
37
38
39 @pytest.fixture
40 def default_phrases(table_factory):
41     table_factory('place_classtype_testclasstypetable_to_delete')
42     table_factory('place_classtype_testclasstypetable_to_keep')
43
44
45 def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
46     """
47         Check for the fetch_existing_place_classtype_tables() method.
48         It should return the table just created.
49     """
50     table_factory('place_classtype_testclasstypetable')
51
52     sp_importer._fetch_existing_place_classtype_tables()
53     contained_table = sp_importer.table_phrases_to_delete.pop()
54     assert contained_table == 'place_classtype_testclasstypetable'
55
56 def test_check_sanity_class(sp_importer):
57     """
58         Check for _check_sanity() method.
59         If a wrong class or type is given, an UsageError should raise.
60         If a good class and type are given, nothing special happens.
61     """
62
63     assert not sp_importer._check_sanity(SpecialPhrase('en', '', 'type', ''))
64     assert not sp_importer._check_sanity(SpecialPhrase('en', 'class', '', ''))
65
66     assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
67
68 def test_load_white_and_black_lists(sp_importer):
69     """
70         Test that _load_white_and_black_lists() well return
71         black list and white list and that they are of dict type.
72     """
73     black_list, white_list = sp_importer._load_white_and_black_lists()
74
75     assert isinstance(black_list, dict) and isinstance(white_list, dict)
76
77
78 def test_create_place_classtype_indexes(temp_db_with_extensions,
79                                         temp_db_conn, temp_db_cursor,
80                                         table_factory, sp_importer):
81     """
82         Test that _create_place_classtype_indexes() create the
83         place_id index and centroid index on the right place_class_type table.
84     """
85     phrase_class = 'class'
86     phrase_type = 'type'
87     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
88
89     table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')
90
91     sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
92     temp_db_conn.commit()
93
94     assert check_placeid_and_centroid_indexes(temp_db_cursor, phrase_class, phrase_type)
95
96 def test_create_place_classtype_table(temp_db_conn, temp_db_cursor, placex_table, sp_importer):
97     """
98         Test that _create_place_classtype_table() create
99         the right place_classtype table.
100     """
101     phrase_class = 'class'
102     phrase_type = 'type'
103     sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
104     temp_db_conn.commit()
105
106     assert check_table_exist(temp_db_cursor, phrase_class, phrase_type)
107
108 def test_grant_access_to_web_user(temp_db_conn, temp_db_cursor, table_factory,
109                                   def_config, sp_importer):
110     """
111         Test that _grant_access_to_webuser() give
112         right access to the web user.
113     """
114     phrase_class = 'class'
115     phrase_type = 'type'
116     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
117
118     table_factory(table_name)
119
120     sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
121     temp_db_conn.commit()
122
123     assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
124
125 def test_create_place_classtype_table_and_indexes(
126         temp_db_cursor, def_config, placex_table,
127         sp_importer, temp_db_conn):
128     """
129         Test that _create_place_classtype_table_and_indexes()
130         create the right place_classtype tables and place_id indexes
131         and centroid indexes and grant access to the web user
132         for the given set of pairs.
133     """
134     pairs = set([('class1', 'type1'), ('class2', 'type2')])
135
136     sp_importer._create_classtype_table_and_indexes(pairs)
137     temp_db_conn.commit()
138
139     for pair in pairs:
140         assert check_table_exist(temp_db_cursor, pair[0], pair[1])
141         assert check_placeid_and_centroid_indexes(temp_db_cursor, pair[0], pair[1])
142         assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER, pair[0], pair[1])
143
144 def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
145                                             temp_db_conn, temp_db_cursor):
146     """
147         Check for the remove_non_existent_phrases_from_db() method.
148
149         It should removed entries from the word table which are contained
150         in the words_phrases_to_delete set and not those also contained
151         in the words_phrases_still_exist set.
152
153         place_classtype tables contained in table_phrases_to_delete should
154         be deleted.
155     """
156     sp_importer.table_phrases_to_delete = {
157         'place_classtype_testclasstypetable_to_delete'
158     }
159
160     query_tables = """
161         SELECT table_name
162         FROM information_schema.tables
163         WHERE table_schema='public'
164         AND table_name like 'place_classtype_%';
165     """
166
167     sp_importer._remove_non_existent_tables_from_db()
168     temp_db_conn.commit()
169
170     assert temp_db_cursor.row_set(query_tables) \
171                  == {('place_classtype_testclasstypetable_to_keep', )}
172
173
174 @pytest.mark.parametrize("should_replace", [(True), (False)])
175 def test_import_phrases(monkeypatch, temp_db_cursor, def_config, sp_importer,
176                         placex_table, table_factory, tokenizer_mock,
177                         xml_wiki_content, should_replace):
178     """
179         Check that the main import_phrases() method is well executed.
180         It should create the place_classtype table, the place_id and centroid indexes,
181         grand access to the web user and executing the SQL functions for amenities.
182         It should also update the database well by deleting or preserving existing entries
183         of the database.
184     """
185     #Add some data to the database before execution in order to test
186     #what is deleted and what is preserved.
187     table_factory('place_classtype_amenity_animal_shelter')
188     table_factory('place_classtype_wrongclass_wrongtype')
189
190     monkeypatch.setattr('nominatim_db.tools.special_phrases.sp_wiki_loader._get_wiki_content',
191                         lambda lang: xml_wiki_content)
192
193     tokenizer = tokenizer_mock()
194     sp_importer.import_phrases(tokenizer, should_replace)
195
196     assert len(tokenizer.analyser_cache['special_phrases']) == 18
197
198     class_test = 'aerialway'
199     type_test = 'zip_line'
200
201     assert check_table_exist(temp_db_cursor, class_test, type_test)
202     assert check_placeid_and_centroid_indexes(temp_db_cursor, class_test, type_test)
203     assert check_grant_access(temp_db_cursor, def_config.DATABASE_WEBUSER, class_test, type_test)
204     assert check_table_exist(temp_db_cursor, 'amenity', 'animal_shelter')
205     if should_replace:
206         assert not check_table_exist(temp_db_cursor, 'wrong_class', 'wrong_type')
207
208     assert temp_db_cursor.table_exists('place_classtype_amenity_animal_shelter')
209     if should_replace:
210         assert not temp_db_cursor.table_exists('place_classtype_wrongclass_wrongtype')
211
212 def check_table_exist(temp_db_cursor, phrase_class, phrase_type):
213     """
214         Verify that the place_classtype table exists for the given
215         phrase_class and phrase_type.
216     """
217     return temp_db_cursor.table_exists('place_classtype_{}_{}'.format(phrase_class, phrase_type))
218
219
220 def check_grant_access(temp_db_cursor, user, phrase_class, phrase_type):
221     """
222         Check that the web user has been granted right access to the
223         place_classtype table of the given phrase_class and phrase_type.
224     """
225     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
226
227     temp_db_cursor.execute("""
228             SELECT * FROM information_schema.role_table_grants
229             WHERE table_name='{}'
230             AND grantee='{}'
231             AND privilege_type='SELECT'""".format(table_name, user))
232     return temp_db_cursor.fetchone()
233
234 def check_placeid_and_centroid_indexes(temp_db_cursor, phrase_class, phrase_type):
235     """
236         Check that the place_id index and centroid index exist for the
237         place_classtype table of the given phrase_class and phrase_type.
238     """
239     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
240     index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
241
242     return (
243         temp_db_cursor.index_exists(table_name, index_prefix + 'centroid')
244         and
245         temp_db_cursor.index_exists(table_name, index_prefix + 'place_id')
246     )