]> git.openstreetmap.org Git - nominatim.git/blob - test/python/test_tools_import_special_phrases.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / test / python / test_tools_import_special_phrases.py
1 """
2     Tests for import special phrases methods
3     of the class SPImporter.
4 """
5 from shutil import copyfile
6 import pytest
7 from nominatim.tools.special_phrases.sp_importer import SPImporter
8 from nominatim.tools.special_phrases.sp_wiki_loader import SPWikiLoader
9 from nominatim.tools.special_phrases.special_phrase import SpecialPhrase
10 from nominatim.errors import UsageError
11
12 from cursor import CursorForTesting
13
14 @pytest.fixture
15 def testfile_dir(src_dir):
16     return src_dir / 'test' / 'testfiles'
17
18
19 @pytest.fixture
20 def sp_importer(temp_db_conn, def_config):
21     """
22         Return an instance of SPImporter.
23     """
24     loader = SPWikiLoader(def_config, ['en'])
25     return SPImporter(def_config, temp_db_conn, loader)
26
27
28 @pytest.fixture
29 def xml_wiki_content(src_dir):
30     """
31         return the content of the static xml test file.
32     """
33     xml_test_content = src_dir / 'test' / 'testdata' / 'special_phrases_test_content.txt'
34     return xml_test_content.read_text()
35
36
37 @pytest.fixture
38 def default_phrases(table_factory):
39     table_factory('place_classtype_testclasstypetable_to_delete')
40     table_factory('place_classtype_testclasstypetable_to_keep')
41
42
43 def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
44     """
45         Check for the fetch_existing_place_classtype_tables() method.
46         It should return the table just created.
47     """
48     table_factory('place_classtype_testclasstypetable')
49
50     sp_importer._fetch_existing_place_classtype_tables()
51     contained_table = sp_importer.table_phrases_to_delete.pop()
52     assert contained_table == 'place_classtype_testclasstypetable'
53
54 def test_check_sanity_class(sp_importer):
55     """
56         Check for _check_sanity() method.
57         If a wrong class or type is given, an UsageError should raise.
58         If a good class and type are given, nothing special happens.
59     """
60
61     assert not sp_importer._check_sanity(SpecialPhrase('en', '', 'type', ''))
62     assert not sp_importer._check_sanity(SpecialPhrase('en', 'class', '', ''))
63
64     assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
65
66 def test_load_white_and_black_lists(sp_importer):
67     """
68         Test that _load_white_and_black_lists() well return
69         black list and white list and that they are of dict type.
70     """
71     black_list, white_list = sp_importer._load_white_and_black_lists()
72
73     assert isinstance(black_list, dict) and isinstance(white_list, dict)
74
75
76 def test_create_place_classtype_indexes(temp_db_with_extensions, temp_db_conn,
77                                         table_factory, sp_importer):
78     """
79         Test that _create_place_classtype_indexes() create the
80         place_id index and centroid index on the right place_class_type table.
81     """
82     phrase_class = 'class'
83     phrase_type = 'type'
84     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
85
86     table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')
87
88     sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
89
90     assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
91
92 def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
93     """
94         Test that _create_place_classtype_table() create
95         the right place_classtype table.
96     """
97     phrase_class = 'class'
98     phrase_type = 'type'
99     sp_importer._create_place_classtype_table('', phrase_class, phrase_type)
100
101     assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
102
103 def test_grant_access_to_web_user(temp_db_conn, table_factory, def_config, sp_importer):
104     """
105         Test that _grant_access_to_webuser() give
106         right access to the web user.
107     """
108     phrase_class = 'class'
109     phrase_type = 'type'
110     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
111
112     table_factory(table_name)
113
114     sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
115
116     assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
117
118 def test_create_place_classtype_table_and_indexes(
119         temp_db_conn, def_config, placex_table,
120         sp_importer):
121     """
122         Test that _create_place_classtype_table_and_indexes()
123         create the right place_classtype tables and place_id indexes
124         and centroid indexes and grant access to the web user
125         for the given set of pairs.
126     """
127     pairs = set([('class1', 'type1'), ('class2', 'type2')])
128
129     sp_importer._create_place_classtype_table_and_indexes(pairs)
130
131     for pair in pairs:
132         assert check_table_exist(temp_db_conn, pair[0], pair[1])
133         assert check_placeid_and_centroid_indexes(temp_db_conn, pair[0], pair[1])
134         assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, pair[0], pair[1])
135
136 def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
137                                             temp_db_conn):
138     """
139         Check for the remove_non_existent_phrases_from_db() method.
140
141         It should removed entries from the word table which are contained
142         in the words_phrases_to_delete set and not those also contained
143         in the words_phrases_still_exist set.
144
145         place_classtype tables contained in table_phrases_to_delete should
146         be deleted.
147     """
148     sp_importer.table_phrases_to_delete = {
149         'place_classtype_testclasstypetable_to_delete'
150     }
151
152     query_tables = """
153         SELECT table_name
154         FROM information_schema.tables
155         WHERE table_schema='public'
156         AND table_name like 'place_classtype_%';
157     """
158
159     sp_importer._remove_non_existent_tables_from_db()
160
161     # Changes are not committed yet. Use temp_db_conn for checking results.
162     with temp_db_conn.cursor(cursor_factory=CursorForTesting) as cur:
163         assert cur.row_set(query_tables) \
164                  == {('place_classtype_testclasstypetable_to_keep', )}
165
166
167 @pytest.mark.parametrize("should_replace", [(True), (False)])
168 def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
169                         placex_table, table_factory, tokenizer_mock,
170                         xml_wiki_content, should_replace):
171     """
172         Check that the main import_phrases() method is well executed.
173         It should create the place_classtype table, the place_id and centroid indexes,
174         grand access to the web user and executing the SQL functions for amenities.
175         It should also update the database well by deleting or preserving existing entries
176         of the database.
177     """
178     #Add some data to the database before execution in order to test
179     #what is deleted and what is preserved.
180     table_factory('place_classtype_amenity_animal_shelter')
181     table_factory('place_classtype_wrongclass_wrongtype')
182
183     monkeypatch.setattr('nominatim.tools.special_phrases.sp_wiki_loader.SPWikiLoader._get_wiki_content',
184                         lambda self, lang: xml_wiki_content)
185
186     tokenizer = tokenizer_mock()
187     sp_importer.import_phrases(tokenizer, should_replace)
188
189     assert len(tokenizer.analyser_cache['special_phrases']) == 18
190
191     class_test = 'aerialway'
192     type_test = 'zip_line'
193
194     assert check_table_exist(temp_db_conn, class_test, type_test)
195     assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
196     assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
197     assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
198     if should_replace:
199         assert not check_table_exist(temp_db_conn, 'wrong_class', 'wrong_type')
200
201     assert temp_db_conn.table_exists('place_classtype_amenity_animal_shelter')
202     if should_replace:
203         assert not temp_db_conn.table_exists('place_classtype_wrongclass_wrongtype')
204
205 def check_table_exist(temp_db_conn, phrase_class, phrase_type):
206     """
207         Verify that the place_classtype table exists for the given
208         phrase_class and phrase_type.
209     """
210     return temp_db_conn.table_exists('place_classtype_{}_{}'.format(phrase_class, phrase_type))
211
212
213 def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
214     """
215         Check that the web user has been granted right access to the
216         place_classtype table of the given phrase_class and phrase_type.
217     """
218     table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
219
220     with temp_db_conn.cursor() as temp_db_cursor:
221         temp_db_cursor.execute("""
222                 SELECT * FROM information_schema.role_table_grants
223                 WHERE table_name='{}'
224                 AND grantee='{}'
225                 AND privilege_type='SELECT'""".format(table_name, user))
226         return temp_db_cursor.fetchone()
227
228 def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
229     """
230         Check that the place_id index and centroid index exist for the
231         place_classtype table of the given phrase_class and phrase_type.
232     """
233     index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)
234
235     return (
236         temp_db_conn.index_exists(index_prefix + 'centroid')
237         and
238         temp_db_conn.index_exists(index_prefix + 'place_id')
239     )