1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Module containing the class handling the import
9 of the special phrases.
11 Phrases are analyzed and imported into the database.
The phrases already present in the database which are no
longer valid are removed.
from typing import Iterable, Tuple, Mapping, Sequence, Optional, Set
import logging
import re

from psycopg2.sql import Identifier, SQL

from nominatim.config import Configuration
from nominatim.db.connection import Connection
from nominatim.tools.special_phrases.importer_statistics import SpecialPhrasesImporterStatistics
from nominatim.tools.special_phrases.special_phrase import SpecialPhrase
from nominatim.tokenizer.base import AbstractTokenizer
from nominatim.typing import Protocol
# Module-wide logger. getLogger() is called without a name, which yields
# the root logger.
LOG = logging.getLogger()
31 def _classtype_table(phrase_class: str, phrase_type: str) -> str:
32 """ Return the name of the table for the given class and type.
34 return f'place_classtype_{phrase_class}_{phrase_type}'
class SpecialPhraseLoader(Protocol):
    """ Protocol for classes implementing a loader for special phrases.

        Any object providing a matching generate_phrases() method
        satisfies this protocol.
    """

    def generate_phrases(self) -> Iterable[SpecialPhrase]:
        """ Generates all special phrase terms this loader can produce.
        """
47 # pylint: disable-msg=too-many-instance-attributes
49 Class handling the process of special phrases importation into the database.
Takes a special-phrase loader which loads the phrases from an external source.
def __init__(self, config: Configuration, conn: Connection,
             sp_loader: SpecialPhraseLoader) -> None:
    """ Set up the importer state.

        Arguments:
          config: Configuration object. It is kept on the instance and
                  used, among others, to load 'phrase-settings.json'.
          conn: Database connection used for all SQL issued by the importer.
          sp_loader: Loader that produces the special phrases to import.
    """
    # Must come first: _load_white_and_black_lists() reads self.config.
    self.config = config
    self.db_connection = conn
    self.sp_loader = sp_loader
    self.statistics_handler = SpecialPhrasesImporterStatistics()
    self.black_list, self.white_list = self._load_white_and_black_lists()
    # Class and type names must consist of word characters only.
    self.sanity_check_pattern = re.compile(r'^\w+$')
    # This set will contain all phrases to be handed to the tokenizer.
    # It contains tuples with the following format: (label, class, type, operator)
    self.word_phrases: Set[Tuple[str, str, str, str]] = set()
    # This set will contain all existing place_classtype tables which don't
    # match any class/type of the loaded special phrases.
    self.table_phrases_to_delete: Set[str] = set()
def import_phrases(self, tokenizer: AbstractTokenizer, should_replace: bool) -> None:
    """
        Iterate through all SpecialPhrases extracted from the
        loader and import them into the database.

        If should_replace is set to True only the loaded phrases
        will be kept in the database. All other phrases already
        in the database will be removed.

        Arguments:
          tokenizer: Tokenizer whose name analyzer receives the
                     collected phrases.
          should_replace: When True, place_classtype tables without a
                          matching loaded phrase are dropped. The flag
                          is also forwarded to the analyzer.
    """
    LOG.warning('Special phrases importation starting')
    self._fetch_existing_place_classtype_tables()

    # Store pairs of class/type for further processing.
    class_type_pairs = set()

    for phrase in self.sp_loader.generate_phrases():
        result = self._process_phrase(phrase)
        # _process_phrase() yields None for rejected phrases.
        if result is not None:
            class_type_pairs.add(result)

    self._create_classtype_table_and_indexes(class_type_pairs)
    # Stale tables are only dropped when a full replacement was requested.
    if should_replace:
        self._remove_non_existent_tables_from_db()
    self.db_connection.commit()

    with tokenizer.name_analyzer() as analyzer:
        analyzer.update_special_phrases(self.word_phrases, should_replace)

    LOG.warning('Import done.')
    self.statistics_handler.notify_import_done()
def _fetch_existing_place_classtype_tables(self) -> None:
    """
        Fetch existing place_classtype tables.
        Fill the table_phrases_to_delete set of the class.

        Every table name found is recorded as a deletion candidate.
    """
    query = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema='public'
        AND table_name like 'place_classtype_%';
    """
    with self.db_connection.cursor() as db_cursor:
        db_cursor.execute(SQL(query))
        # The table name is the only selected column.
        self.table_phrases_to_delete.update(row[0] for row in db_cursor)
116 def _load_white_and_black_lists(self) \
117 -> Tuple[Mapping[str, Sequence[str]], Mapping[str, Sequence[str]]]:
119 Load white and black lists from phrases-settings.json.
121 settings = self.config.load_sub_configuration('phrase-settings.json')
123 return settings['blackList'], settings['whiteList']
125 def _check_sanity(self, phrase: SpecialPhrase) -> bool:
127 Check sanity of given inputs in case somebody added garbage in the wiki.
128 If a bad class/type is detected the system will exit with an error.
130 class_matchs = self.sanity_check_pattern.findall(phrase.p_class)
131 type_matchs = self.sanity_check_pattern.findall(phrase.p_type)
133 if not class_matchs or not type_matchs:
134 LOG.warning("Bad class/type: %s=%s. It will not be imported",
135 phrase.p_class, phrase.p_type)
139 def _process_phrase(self, phrase: SpecialPhrase) -> Optional[Tuple[str, str]]:
141 Processes the given phrase by checking black and white list
143 Return the class/type pair corresponding to the phrase.
146 # blacklisting: disallow certain class/type combinations
147 if phrase.p_class in self.black_list.keys() \
148 and phrase.p_type in self.black_list[phrase.p_class]:
151 # whitelisting: if class is in whitelist, allow only tags in the list
152 if phrase.p_class in self.white_list.keys() \
153 and phrase.p_type not in self.white_list[phrase.p_class]:
156 # sanity check, in case somebody added garbage in the wiki
157 if not self._check_sanity(phrase):
158 self.statistics_handler.notify_one_phrase_invalid()
161 self.word_phrases.add((phrase.p_label, phrase.p_class,
162 phrase.p_type, phrase.p_operator))
164 return (phrase.p_class, phrase.p_type)
def _create_classtype_table_and_indexes(self,
                                        class_type_pairs: Iterable[Tuple[str, str]]) -> None:
    """
        Create table place_classtype for each given pair.
        Also create indexes on place_id and centroid.

        Pairs whose table is already known are skipped. Their table is
        taken off the deletion candidates so that it is kept.
    """
    LOG.warning('Create tables and indexes...')

    sql_tablespace = self.config.TABLESPACE_AUX_DATA
    # Only build a TABLESPACE clause when a tablespace is configured.
    if sql_tablespace:
        sql_tablespace = ' TABLESPACE ' + sql_tablespace

    # Index on placex that exists only for the duration of this method,
    # presumably to speed up the per-class/type selects below.
    with self.db_connection.cursor() as db_cursor:
        db_cursor.execute("CREATE INDEX idx_placex_classtype ON placex (class, type)")

    for phrase_class, phrase_type in class_type_pairs:
        table_name = _classtype_table(phrase_class, phrase_type)

        if table_name in self.table_phrases_to_delete:
            self.statistics_handler.notify_one_table_ignored()
            # Remove this table from the ones to delete as it matches a
            # class/type still present in the loaded special phrases.
            self.table_phrases_to_delete.remove(table_name)
            # So there is no need to create the table and indexes.
            continue

        self._create_place_classtype_table(sql_tablespace, phrase_class, phrase_type)

        self._create_place_classtype_indexes(sql_tablespace, phrase_class, phrase_type)

        # Grant access on read to the web user.
        self._grant_access_to_webuser(phrase_class, phrase_type)

        self.statistics_handler.notify_one_table_created()

    with self.db_connection.cursor() as db_cursor:
        db_cursor.execute("DROP INDEX idx_placex_classtype")
def _create_place_classtype_table(self, sql_tablespace: str,
                                  phrase_class: str, phrase_type: str) -> None:
    """
        Create table place_classtype of the given phrase_class/phrase_type
        if it doesn't exist.

        The table holds place_id and centroid of all placex rows with
        the given class and type. sql_tablespace is either empty or a
        ready-made ' TABLESPACE <name>' clause.
    """
    table_name = _classtype_table(phrase_class, phrase_type)
    # Class and type are passed as query parameters; only the table name
    # and the tablespace clause are composed into the statement.
    with self.db_connection.cursor() as cur:
        cur.execute(SQL("""CREATE TABLE IF NOT EXISTS {} {} AS
                             SELECT place_id AS place_id,
                                    st_centroid(geometry) AS centroid
                             FROM placex
                             WHERE class = %s AND type = %s
                         """).format(Identifier(table_name), SQL(sql_tablespace)),
                    (phrase_class, phrase_type))
def _create_place_classtype_indexes(self, sql_tablespace: str,
                                    phrase_class: str, phrase_type: str) -> None:
    """
        Create indexes on centroid and place_id for the place_classtype table.

        An index is only created when no index of that name exists yet.
        sql_tablespace is either empty or a ready-made
        ' TABLESPACE <name>' clause appended to each statement.
    """
    index_prefix = f'idx_place_classtype_{phrase_class}_{phrase_type}_'
    base_table = _classtype_table(phrase_class, phrase_type)

    # (index name suffix, statement template) for each index to create.
    index_statements = (
        ('centroid', "CREATE INDEX {} ON {} USING GIST (centroid) {}"),
        ('place_id', "CREATE INDEX {} ON {} USING btree(place_id) {}"),
    )

    for suffix, statement in index_statements:
        index_name = index_prefix + suffix
        if self.db_connection.index_exists(index_name):
            continue
        with self.db_connection.cursor() as db_cursor:
            db_cursor.execute(SQL(statement)
                              .format(Identifier(index_name),
                                      Identifier(base_table),
                                      SQL(sql_tablespace)))
def _grant_access_to_webuser(self, phrase_class: str, phrase_type: str) -> None:
    """
        Grant access on read to the table place_classtype for the webuser.

        The web user name is taken from the DATABASE_WEBUSER setting.
    """
    grant_stmt = SQL("""GRANT SELECT ON {} TO {}""").format(
        Identifier(_classtype_table(phrase_class, phrase_type)),
        Identifier(self.config.DATABASE_WEBUSER))

    with self.db_connection.cursor() as db_cursor:
        db_cursor.execute(grant_stmt)
def _remove_non_existent_tables_from_db(self) -> None:
    """
        Remove special phrases which don't exist on the wiki anymore.
        Delete the place_classtype tables.

        Every table still listed in table_phrases_to_delete is dropped
        and reported to the statistics handler.
    """
    LOG.warning('Cleaning database...')

    # Delete place_classtype tables corresponding to class/type which
    # are not on the wiki anymore.
    with self.db_connection.cursor() as db_cursor:
        for stale_table in self.table_phrases_to_delete:
            self.statistics_handler.notify_one_table_deleted()
            db_cursor.drop_table(stale_table)