"""
    Tests for the special phrases import methods
    of the class SPImporter.
"""
5 from shutil import copyfile
7 from nominatim.tools.special_phrases.sp_importer import SPImporter
8 from nominatim.tools.special_phrases.sp_wiki_loader import SPWikiLoader
9 from nominatim.tools.special_phrases.special_phrase import SpecialPhrase
10 from nominatim.errors import UsageError
12 from cursor import CursorForTesting
15 def testfile_dir(src_dir):
16 return src_dir / 'test' / 'testfiles'
def sp_importer(temp_db_conn, def_config):
    """
        Return an instance of SPImporter.

        The importer is built from the default configuration, the
        temporary database connection and a SPWikiLoader that was
        created with the language list ``['en']``.

        NOTE(review): the tests in this module request ``sp_importer`` by
        name, so it is expected to be registered as a pytest fixture --
        confirm that the fixture decorator is in place.
    """
    loader = SPWikiLoader(def_config, ['en'])
    return SPImporter(def_config, temp_db_conn, loader)
def xml_wiki_content(src_dir):
    """
        Return the content of the static xml test file.

        The file is read as text from
        ``src_dir / 'test' / 'testdata' / 'special_phrases_test_content.txt'``.

        NOTE(review): test_import_phrases requests ``xml_wiki_content`` by
        name, so it is expected to be registered as a pytest fixture --
        confirm that the fixture decorator is in place.
    """
    xml_test_content = src_dir / 'test' / 'testdata' / 'special_phrases_test_content.txt'
    return xml_test_content.read_text()
def default_phrases(table_factory):
    """
        Create the two place_classtype tables used by the removal test.

        ``table_factory`` is called once per table name: first for the
        table that is expected to be dropped, then for the one that is
        expected to survive.

        NOTE(review): test_remove_non_existent_tables_from_db requests
        ``default_phrases`` by name, so it is expected to be registered
        as a pytest fixture -- confirm that the decorator is in place.
    """
    table_factory('place_classtype_testclasstypetable_to_delete')
    table_factory('place_classtype_testclasstypetable_to_keep')
def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
    """
        Check for the _fetch_existing_place_classtype_tables() method.
        After the call, the table that was just created must be found
        in the importer's table_phrases_to_delete collection.
    """
    table_factory('place_classtype_testclasstypetable')

    sp_importer._fetch_existing_place_classtype_tables()

    # Only one place_classtype table was created here, so the element
    # taken out of the collection is expected to be exactly that table.
    contained_table = sp_importer.table_phrases_to_delete.pop()
    assert contained_table == 'place_classtype_testclasstypetable'
def test_check_sanity_class(sp_importer):
    """
        Check for _check_sanity() method.
        If the class or the type of the phrase is empty, the result
        must be falsy.
        If both class and type are given, the result must be truthy.
    """
    # Empty class, then empty type: both must be rejected.
    assert not sp_importer._check_sanity(SpecialPhrase('en', '', 'type', ''))
    assert not sp_importer._check_sanity(SpecialPhrase('en', 'class', '', ''))

    assert sp_importer._check_sanity(SpecialPhrase('en', 'class', 'type', ''))
def test_load_white_and_black_lists(sp_importer):
    """
        Test that _load_white_and_black_lists() returns the
        black list and the white list, in that order, and that
        both of them are of dict type.
    """
    black_list, white_list = sp_importer._load_white_and_black_lists()

    assert isinstance(black_list, dict)
    assert isinstance(white_list, dict)
def test_create_place_classtype_indexes(temp_db_with_extensions, temp_db_conn,
                                        table_factory, sp_importer):
    """
        Test that _create_place_classtype_indexes() creates the
        place_id index and the centroid index on the right
        place_classtype table.
    """
    phrase_class = 'class'
    # phrase_type must be defined before it is used for the table name
    # and the importer call below.
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    # The table needs both columns the indexes are created on.
    table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')

    sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)

    assert check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type)
def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
    """
        Test that _create_place_classtype_table() creates
        the right place_classtype table.
    """
    phrase_class = 'class'
    # phrase_type must be defined before it is handed to the importer
    # and to the existence check.
    phrase_type = 'type'

    sp_importer._create_place_classtype_table('', phrase_class, phrase_type)

    assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
def test_grant_access_to_web_user(temp_db_conn, table_factory, def_config, sp_importer):
    """
        Test that _grant_access_to_webuser() gives the web user
        configured in DATABASE_WEBUSER access to the place_classtype
        table of the given class and type.
    """
    phrase_class = 'class'
    # phrase_type must be defined before it is used for the table name,
    # the importer call and the grant check.
    phrase_type = 'type'
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    table_factory(table_name)

    sp_importer._grant_access_to_webuser(phrase_class, phrase_type)

    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, phrase_class, phrase_type)
def test_create_place_classtype_table_and_indexes(
        temp_db_conn, def_config, placex_table,
        sp_importer):
    """
        Test that _create_place_classtype_table_and_indexes()
        creates the right place_classtype tables, the place_id and
        centroid indexes and grants access to the web user
        for every pair of the given set.
    """
    pairs = {('class1', 'type1'), ('class2', 'type2')}

    sp_importer._create_place_classtype_table_and_indexes(pairs)

    # Every (class, type) pair must have its own table, both indexes
    # and the grant for the web user.
    for pair in pairs:
        assert check_table_exist(temp_db_conn, pair[0], pair[1])
        assert check_placeid_and_centroid_indexes(temp_db_conn, pair[0], pair[1])
        assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, pair[0], pair[1])
def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
                                            temp_db_conn):
    """
        Check for the _remove_non_existent_tables_from_db() method.

        place_classtype tables contained in table_phrases_to_delete should
        be deleted. Tables which are not listed there must be kept.
    """
    sp_importer.table_phrases_to_delete = {
        'place_classtype_testclasstypetable_to_delete'
    }

    # Names of all place_classtype tables in the public schema.
    query_tables = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_name like 'place_classtype_%';
    """

    sp_importer._remove_non_existent_tables_from_db()

    # Changes are not committed yet. Use temp_db_conn for checking results.
    with temp_db_conn.cursor(cursor_factory=CursorForTesting) as cur:
        assert cur.row_set(query_tables) \
                 == {('place_classtype_testclasstypetable_to_keep', )}
@pytest.mark.parametrize("should_replace", [True, False])
def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
                        placex_table, table_factory, tokenizer_mock,
                        xml_wiki_content, should_replace):
    """
        Check that the main import_phrases() method is well executed.
        It should hand the special phrases to the tokenizer, create the
        place_classtype table, the place_id and centroid indexes and
        grant access to the web user.
        It should also update the database well by deleting or preserving
        existing entries of the database: the removal of the stale table
        is only asserted when should_replace is set.
    """
    # Add some data to the database before execution in order to test
    # what is deleted and what is preserved.
    table_factory('place_classtype_amenity_animal_shelter')
    table_factory('place_classtype_wrongclass_wrongtype')

    # Serve the static test content instead of calling the wiki.
    monkeypatch.setattr('nominatim.tools.special_phrases.sp_wiki_loader.SPWikiLoader._get_wiki_content',
                        lambda self, lang: xml_wiki_content)

    tokenizer = tokenizer_mock()
    sp_importer.import_phrases(tokenizer, should_replace)

    assert len(tokenizer.analyser_cache['special_phrases']) == 18

    class_test = 'aerialway'
    type_test = 'zip_line'

    assert check_table_exist(temp_db_conn, class_test, type_test)
    assert check_placeid_and_centroid_indexes(temp_db_conn, class_test, type_test)
    assert check_grant_access(temp_db_conn, def_config.DATABASE_WEBUSER, class_test, type_test)
    assert check_table_exist(temp_db_conn, 'amenity', 'animal_shelter')
    # The stale table can only be expected to be gone when replacing.
    if should_replace:
        assert not check_table_exist(temp_db_conn, 'wrong_class', 'wrong_type')

    assert temp_db_conn.table_exists('place_classtype_amenity_animal_shelter')
    if should_replace:
        assert not temp_db_conn.table_exists('place_classtype_wrongclass_wrongtype')
def check_table_exist(temp_db_conn, phrase_class, phrase_type):
    """
        Verify that the place_classtype table exists for the given
        phrase_class and phrase_type.

        Returns whatever ``temp_db_conn.table_exists()`` reports for the
        table name ``place_classtype_<phrase_class>_<phrase_type>``.
    """
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
    return temp_db_conn.table_exists(table_name)
def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
    """
        Check that the web user has been granted right access to the
        place_classtype table of the given phrase_class and phrase_type.

        Returns the first row of information_schema.role_table_grants
        that records a SELECT privilege of ``user`` on the table, as
        delivered by the cursor's fetchone().
    """
    table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)

    with temp_db_conn.cursor() as temp_db_cursor:
        # Filter on the grantee as well: a SELECT grant to any other
        # role must not satisfy the check.
        temp_db_cursor.execute("""
                SELECT * FROM information_schema.role_table_grants
                WHERE table_name='{}'
                AND grantee='{}'
                AND privilege_type='SELECT'""".format(table_name, user))
        return temp_db_cursor.fetchone()
def check_placeid_and_centroid_indexes(temp_db_conn, phrase_class, phrase_type):
    """
        Check that the place_id index and centroid index exist for the
        place_classtype table of the given phrase_class and phrase_type.

        The result is truthy only if ``temp_db_conn.index_exists()``
        confirms both
        ``idx_place_classtype_<phrase_class>_<phrase_type>_centroid`` and
        ``idx_place_classtype_<phrase_class>_<phrase_type>_place_id``.
    """
    index_prefix = 'idx_place_classtype_{}_{}_'.format(phrase_class, phrase_type)

    # The result must be returned, the callers assert on it.
    return (
        temp_db_conn.index_exists(index_prefix + 'centroid')
        and
        temp_db_conn.index_exists(index_prefix + 'place_id')
    )