1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
"""
    Module containing the class handling the import
    of the special phrases.

    Phrases are analyzed and imported into the database.

    The phrases already present in the database which are not
    valid anymore are removed.
"""
import logging
import re
from typing import Iterable, Tuple, Mapping, Sequence, Optional, Set

from typing_extensions import Protocol

from psycopg2.sql import Identifier, SQL

from nominatim.config import Configuration
from nominatim.db.connection import Connection
from nominatim.tools.special_phrases.importer_statistics import SpecialPhrasesImporterStatistics
from nominatim.tools.special_phrases.special_phrase import SpecialPhrase
from nominatim.tokenizer.base import AbstractTokenizer
# Module-wide logger. The root logger is requested on purpose here (no name
# is passed), so messages follow whatever the application configured globally.
LOG = logging.getLogger()
def _classtype_table(phrase_class: str, phrase_type: str) -> str:
    """ Return the name of the table for the given class and type.

        The name is the fixed prefix 'place_classtype_' followed by the
        class and the type, joined with an underscore. No escaping or
        validation is done here.
    """
    return f'place_classtype_{phrase_class}_{phrase_type}'
class SpecialPhraseLoader(Protocol):
    """ Protocol for classes implementing a loader for special phrases.

        Any object that provides a compatible `generate_phrases()` method
        satisfies this protocol; no inheritance is required.
    """

    def generate_phrases(self) -> Iterable[SpecialPhrase]:
        """ Generates all special phrase terms this loader can produce.
        """
48 # pylint: disable-msg=too-many-instance-attributes
"""
    Class handling the process of special phrases importation into the database.

    Takes a special-phrase loader which loads the phrases from an external source.
"""
def __init__(self, config: Configuration, conn: Connection,
             sp_loader: SpecialPhraseLoader) -> None:
    """ Set up the importer state.

        Arguments:
          config: Project configuration. It is read for the phrase
                  settings file, the auxiliary tablespace and the
                  web user name.
          conn: Open database connection used for all table and
                index operations.
          sp_loader: Source of the special phrases to import.
    """
    # Must be set before the white/black lists are loaded below, because
    # _load_white_and_black_lists() reads self.config.
    self.config = config
    self.db_connection = conn
    self.sp_loader = sp_loader
    self.statistics_handler = SpecialPhrasesImporterStatistics()
    self.black_list, self.white_list = self._load_white_and_black_lists()
    self.sanity_check_pattern = re.compile(r'^\w+$')
    # This set will contain all existing phrases to be added.
    # It contains tuples with the following format: (label, class, type, operator)
    self.word_phrases: Set[Tuple[str, str, str, str]] = set()
    # This set will contain all existing place_classtype tables which don't match any
    # special phrases class/type on the wiki.
    self.table_phrases_to_delete: Set[str] = set()
def import_phrases(self, tokenizer: AbstractTokenizer, should_replace: bool) -> None:
    """ Iterate through all SpecialPhrases extracted from the
        loader and import them into the database.

        The place_classtype tables are brought in line with the loaded
        phrases first (missing ones are created, stale ones dropped) and
        that change is committed. Afterwards the collected phrases are
        handed to the tokenizer's name analyzer.

        If should_replace is set to True only the loaded phrases
        will be kept into the database. All other phrases already
        in the database will be removed. The flag is forwarded unchanged
        to the analyzer's `update_special_phrases()`.
    """
    LOG.warning('Special phrases importation starting')
    self._fetch_existing_place_classtype_tables()

    # Store pairs of class/type for further processing
    class_type_pairs = set()

    for phrase in self.sp_loader.generate_phrases():
        result = self._process_phrase(phrase)
        # Rejected phrases yield None and must not end up in the set,
        # otherwise table creation would try to unpack None.
        if result is not None:
            class_type_pairs.add(result)

    self._create_classtype_table_and_indexes(class_type_pairs)
    self._remove_non_existent_tables_from_db()
    self.db_connection.commit()

    with tokenizer.name_analyzer() as analyzer:
        analyzer.update_special_phrases(self.word_phrases, should_replace)

    LOG.warning('Import done.')
    self.statistics_handler.notify_import_done()
def _fetch_existing_place_classtype_tables(self) -> None:
    """ Fetch existing place_classtype tables.
        Fill the table_phrases_to_delete set of the class.

        Every table found starts out as a deletion candidate; names are
        taken out of the set again while the loaded phrases are processed.
    """
    query = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_name like 'place_classtype_%';
    """
    with self.db_connection.cursor() as db_cursor:
        db_cursor.execute(SQL(query))
        for row in db_cursor:
            # First (and only) selected column is the table name.
            self.table_phrases_to_delete.add(row[0])
def _load_white_and_black_lists(self) \
        -> Tuple[Mapping[str, Sequence[str]], Mapping[str, Sequence[str]]]:
    """ Load white and black lists from phrase-settings.json.

        Returns a tuple (black list, white list) - note the order, which
        is the reverse of what the method name suggests. Each list maps
        a class to a sequence of types.
    """
    settings = self.config.load_sub_configuration('phrase-settings.json')

    return settings['blackList'], settings['whiteList']
def _check_sanity(self, phrase: SpecialPhrase) -> bool:
    """ Check sanity of given inputs in case somebody added garbage in the wiki.

        Both the class and the type of the phrase must consist entirely
        of word characters. Return True when they do. Otherwise log a
        warning and return False; nothing is raised and the import goes on.
    """
    pattern = self.sanity_check_pattern
    # fullmatch() instead of findall(): with '$' alone a value ending in
    # a newline would still be accepted.
    if pattern.fullmatch(phrase.p_class) and pattern.fullmatch(phrase.p_type):
        return True

    LOG.warning("Bad class/type: %s=%s. It will not be imported",
                phrase.p_class, phrase.p_type)
    return False
def _process_phrase(self, phrase: SpecialPhrase) -> Optional[Tuple[str, str]]:
    """ Processes the given phrase by checking black and white list
        and sanity.

        Return the class/type pair corresponding to the phrase, or None
        when the phrase is rejected. Accepted phrases are also recorded
        in self.word_phrases as (label, class, type, operator).
    """
    # blacklisting: disallow certain class/type combinations
    if phrase.p_type in self.black_list.get(phrase.p_class, ()):
        return None

    # whitelisting: if class is in whitelist, allow only tags in the list
    allowed_types = self.white_list.get(phrase.p_class)
    if allowed_types is not None and phrase.p_type not in allowed_types:
        return None

    # sanity check, in case somebody added garbage in the wiki
    if not self._check_sanity(phrase):
        self.statistics_handler.notify_one_phrase_invalid()
        return None

    self.word_phrases.add((phrase.p_label, phrase.p_class,
                           phrase.p_type, phrase.p_operator))

    return (phrase.p_class, phrase.p_type)
def _create_classtype_table_and_indexes(self,
                                        class_type_pairs: Iterable[Tuple[str, str]]) -> None:
    """ Create table place_classtype for each given pair.
        Also create indexes on place_id and centroid.

        Pairs whose table already exists are skipped and their table is
        taken off the deletion list. A temporary index on
        placex (class, type) is created before the loop and dropped
        after it.
    """
    LOG.warning('Create tables and indexes...')

    # Turn the configured tablespace name into an SQL clause; stay with an
    # empty string when no tablespace is configured.
    tablespace = self.config.TABLESPACE_AUX_DATA
    sql_tablespace = ' TABLESPACE ' + tablespace if tablespace else ''

    with self.db_connection.cursor() as db_cursor:
        db_cursor.execute("CREATE INDEX idx_placex_classtype ON placex (class, type)")

    for phrase_class, phrase_type in class_type_pairs:
        table_name = _classtype_table(phrase_class, phrase_type)

        if table_name in self.table_phrases_to_delete:
            self.statistics_handler.notify_one_table_ignored()
            # Remove this table from the ones to delete as it matches a
            # class/type still existing on the special phrases of the wiki.
            self.table_phrases_to_delete.remove(table_name)
            # So don't need to create the table and indexes.
            continue

        # Table creation
        self._create_place_classtype_table(sql_tablespace, phrase_class, phrase_type)

        # Indexes creation
        self._create_place_classtype_indexes(sql_tablespace, phrase_class, phrase_type)

        # Grant access on read to the web user.
        self._grant_access_to_webuser(phrase_class, phrase_type)

        self.statistics_handler.notify_one_table_created()

    with self.db_connection.cursor() as db_cursor:
        db_cursor.execute("DROP INDEX idx_placex_classtype")
def _create_place_classtype_table(self, sql_tablespace: str,
                                  phrase_class: str, phrase_type: str) -> None:
    """ Create table place_classtype of the given phrase_class/phrase_type
        if it doesn't exist.

        The table holds the place_id and the centroid of the geometry of
        every placex row with the given class and type. `sql_tablespace`
        is inserted verbatim after the table name, so it must be either
        an empty string or a complete ' TABLESPACE <name>' clause.
    """
    table_name = _classtype_table(phrase_class, phrase_type)
    with self.db_connection.cursor() as cur:
        # Class and type go in as bound parameters, the table name as a
        # quoted identifier.
        cur.execute(SQL("""CREATE TABLE IF NOT EXISTS {} {} AS
                             SELECT place_id AS place_id,
                                    st_centroid(geometry) AS centroid
                             FROM placex
                             WHERE class = %s AND type = %s
                         """).format(Identifier(table_name), SQL(sql_tablespace)),
                    (phrase_class, phrase_type))
def _create_place_classtype_indexes(self, sql_tablespace: str,
                                    phrase_class: str, phrase_type: str) -> None:
    """ Create indexes on centroid and place_id for the place_classtype table.

        Each index is only created when the connection reports that no
        index of that name exists yet. `sql_tablespace` is appended
        verbatim to the CREATE INDEX statement.
    """
    index_prefix = f'idx_place_classtype_{phrase_class}_{phrase_type}_'
    base_table = _classtype_table(phrase_class, phrase_type)

    # Index on centroid
    if not self.db_connection.index_exists(index_prefix + 'centroid'):
        with self.db_connection.cursor() as db_cursor:
            db_cursor.execute(SQL("CREATE INDEX {} ON {} USING GIST (centroid) {}")
                              .format(Identifier(index_prefix + 'centroid'),
                                      Identifier(base_table),
                                      SQL(sql_tablespace)))

    # Index on place_id
    if not self.db_connection.index_exists(index_prefix + 'place_id'):
        with self.db_connection.cursor() as db_cursor:
            db_cursor.execute(SQL("CREATE INDEX {} ON {} USING btree(place_id) {}")
                              .format(Identifier(index_prefix + 'place_id'),
                                      Identifier(base_table),
                                      SQL(sql_tablespace)))
def _grant_access_to_webuser(self, phrase_class: str, phrase_type: str) -> None:
    """ Grant access on read to the table place_classtype for the webuser.

        The role name is taken from the DATABASE_WEBUSER configuration
        value. Table and role are both passed as quoted identifiers.
    """
    table_name = _classtype_table(phrase_class, phrase_type)
    with self.db_connection.cursor() as db_cursor:
        db_cursor.execute(SQL("""GRANT SELECT ON {} TO {}""")
                          .format(Identifier(table_name),
                                  Identifier(self.config.DATABASE_WEBUSER)))
def _remove_non_existent_tables_from_db(self) -> None:
    """ Remove special phrases which don't exist on the wiki anymore.
        Delete the place_classtype tables.

        Every table name still left in self.table_phrases_to_delete is
        dropped and counted in the statistics. The set itself is not
        emptied here.
    """
    LOG.warning('Cleaning database...')

    # Delete place_classtype tables corresponding to class/type which
    # are not on the wiki anymore.
    with self.db_connection.cursor() as db_cursor:
        for table in self.table_phrases_to_delete:
            self.statistics_handler.notify_one_table_deleted()
            db_cursor.drop_table(table)