]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tools/special_phrases/sp_importer.py
Merge pull request #3582 from lonvia/switch-to-flake
[nominatim.git] / src / nominatim_db / tools / special_phrases / sp_importer.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8     Module containing the class handling the import
9     of the special phrases.
10
11     Phrases are analyzed and imported into the database.
12
13     The phrases already present in the database which are not
14     valids anymore are removed.
15 """
16 from typing import Iterable, Tuple, Mapping, Sequence, Optional, Set
17 import logging
18 import re
19
20 from psycopg.sql import Identifier, SQL
21
22 from ...typing import Protocol
23 from ...config import Configuration
24 from ...db.connection import Connection, drop_tables, index_exists
25 from .importer_statistics import SpecialPhrasesImporterStatistics
26 from .special_phrase import SpecialPhrase
27 from ...tokenizer.base import AbstractTokenizer
28
29 LOG = logging.getLogger()
30
31
32 def _classtype_table(phrase_class: str, phrase_type: str) -> str:
33     """ Return the name of the table for the given class and type.
34     """
35     return f'place_classtype_{phrase_class}_{phrase_type}'
36
37
38 class SpecialPhraseLoader(Protocol):
39     """ Protocol for classes implementing a loader for special phrases.
40     """
41
42     def generate_phrases(self) -> Iterable[SpecialPhrase]:
43         """ Generates all special phrase terms this loader can produce.
44         """
45
46
47 class SPImporter():
48     """
49         Class handling the process of special phrases importation into the database.
50
51         Take a sp loader which load the phrases from an external source.
52     """
53     def __init__(self, config: Configuration, conn: Connection,
54                  sp_loader: SpecialPhraseLoader) -> None:
55         self.config = config
56         self.db_connection = conn
57         self.sp_loader = sp_loader
58         self.statistics_handler = SpecialPhrasesImporterStatistics()
59         self.black_list, self.white_list = self._load_white_and_black_lists()
60         self.sanity_check_pattern = re.compile(r'^\w+$')
61         # This set will contain all existing phrases to be added.
62         # It contains tuples with the following format: (label, class, type, operator)
63         self.word_phrases: Set[Tuple[str, str, str, str]] = set()
64         # This set will contain all existing place_classtype tables which doesn't match any
65         # special phrases class/type on the wiki.
66         self.table_phrases_to_delete: Set[str] = set()
67
68     def import_phrases(self, tokenizer: AbstractTokenizer, should_replace: bool) -> None:
69         """
70             Iterate through all SpecialPhrases extracted from the
71             loader and import them into the database.
72
73             If should_replace is set to True only the loaded phrases
74             will be kept into the database. All other phrases already
75             in the database will be removed.
76         """
77         LOG.warning('Special phrases importation starting')
78         self._fetch_existing_place_classtype_tables()
79
80         # Store pairs of class/type for further processing
81         class_type_pairs = set()
82
83         for phrase in self.sp_loader.generate_phrases():
84             result = self._process_phrase(phrase)
85             if result:
86                 class_type_pairs.add(result)
87
88         self._create_classtype_table_and_indexes(class_type_pairs)
89         if should_replace:
90             self._remove_non_existent_tables_from_db()
91         self.db_connection.commit()
92
93         with tokenizer.name_analyzer() as analyzer:
94             analyzer.update_special_phrases(self.word_phrases, should_replace)
95
96         LOG.warning('Import done.')
97         self.statistics_handler.notify_import_done()
98
99     def _fetch_existing_place_classtype_tables(self) -> None:
100         """
101             Fetch existing place_classtype tables.
102             Fill the table_phrases_to_delete set of the class.
103         """
104         query = """
105             SELECT table_name
106             FROM information_schema.tables
107             WHERE table_schema='public'
108             AND table_name like 'place_classtype_%';
109         """
110         with self.db_connection.cursor() as db_cursor:
111             db_cursor.execute(SQL(query))
112             for row in db_cursor:
113                 self.table_phrases_to_delete.add(row[0])
114
115     def _load_white_and_black_lists(self) \
116             -> Tuple[Mapping[str, Sequence[str]], Mapping[str, Sequence[str]]]:
117         """
118             Load white and black lists from phrases-settings.json.
119         """
120         settings = self.config.load_sub_configuration('phrase-settings.json')
121
122         return settings['blackList'], settings['whiteList']
123
124     def _check_sanity(self, phrase: SpecialPhrase) -> bool:
125         """
126             Check sanity of given inputs in case somebody added garbage in the wiki.
127             If a bad class/type is detected the system will exit with an error.
128         """
129         class_matchs = self.sanity_check_pattern.findall(phrase.p_class)
130         type_matchs = self.sanity_check_pattern.findall(phrase.p_type)
131
132         if not class_matchs or not type_matchs:
133             LOG.warning("Bad class/type: %s=%s. It will not be imported",
134                         phrase.p_class, phrase.p_type)
135             return False
136         return True
137
138     def _process_phrase(self, phrase: SpecialPhrase) -> Optional[Tuple[str, str]]:
139         """
140             Processes the given phrase by checking black and white list
141             and sanity.
142             Return the class/type pair corresponding to the phrase.
143         """
144
145         # blacklisting: disallow certain class/type combinations
146         if phrase.p_class in self.black_list.keys() \
147            and phrase.p_type in self.black_list[phrase.p_class]:
148             return None
149
150         # whitelisting: if class is in whitelist, allow only tags in the list
151         if phrase.p_class in self.white_list.keys() \
152            and phrase.p_type not in self.white_list[phrase.p_class]:
153             return None
154
155         # sanity check, in case somebody added garbage in the wiki
156         if not self._check_sanity(phrase):
157             self.statistics_handler.notify_one_phrase_invalid()
158             return None
159
160         self.word_phrases.add((phrase.p_label, phrase.p_class,
161                                phrase.p_type, phrase.p_operator))
162
163         return (phrase.p_class, phrase.p_type)
164
165     def _create_classtype_table_and_indexes(self,
166                                             class_type_pairs: Iterable[Tuple[str, str]]) -> None:
167         """
168             Create table place_classtype for each given pair.
169             Also create indexes on place_id and centroid.
170         """
171         LOG.warning('Create tables and indexes...')
172
173         sql_tablespace = self.config.TABLESPACE_AUX_DATA
174         if sql_tablespace:
175             sql_tablespace = ' TABLESPACE ' + sql_tablespace
176
177         with self.db_connection.cursor() as db_cursor:
178             db_cursor.execute("CREATE INDEX idx_placex_classtype ON placex (class, type)")
179
180         for pair in class_type_pairs:
181             phrase_class = pair[0]
182             phrase_type = pair[1]
183
184             table_name = _classtype_table(phrase_class, phrase_type)
185
186             if table_name in self.table_phrases_to_delete:
187                 self.statistics_handler.notify_one_table_ignored()
188                 # Remove this table from the ones to delete as it match a
189                 # class/type still existing on the special phrases of the wiki.
190                 self.table_phrases_to_delete.remove(table_name)
191                 # So don't need to create the table and indexes.
192                 continue
193
194             # Table creation
195             self._create_place_classtype_table(sql_tablespace, phrase_class, phrase_type)
196
197             # Indexes creation
198             self._create_place_classtype_indexes(sql_tablespace, phrase_class, phrase_type)
199
200             # Grant access on read to the web user.
201             self._grant_access_to_webuser(phrase_class, phrase_type)
202
203             self.statistics_handler.notify_one_table_created()
204
205         with self.db_connection.cursor() as db_cursor:
206             db_cursor.execute("DROP INDEX idx_placex_classtype")
207
208     def _create_place_classtype_table(self, sql_tablespace: str,
209                                       phrase_class: str, phrase_type: str) -> None:
210         """
211             Create table place_classtype of the given phrase_class/phrase_type
212             if doesn't exit.
213         """
214         table_name = _classtype_table(phrase_class, phrase_type)
215         with self.db_connection.cursor() as cur:
216             cur.execute(SQL("""CREATE TABLE IF NOT EXISTS {} {} AS
217                                  SELECT place_id AS place_id,
218                                         st_centroid(geometry) AS centroid
219                                  FROM placex
220                                  WHERE class = %s AND type = %s
221                              """).format(Identifier(table_name), SQL(sql_tablespace)),
222                         (phrase_class, phrase_type))
223
224     def _create_place_classtype_indexes(self, sql_tablespace: str,
225                                         phrase_class: str, phrase_type: str) -> None:
226         """
227             Create indexes on centroid and place_id for the place_classtype table.
228         """
229         index_prefix = f'idx_place_classtype_{phrase_class}_{phrase_type}_'
230         base_table = _classtype_table(phrase_class, phrase_type)
231         # Index on centroid
232         if not index_exists(self.db_connection, index_prefix + 'centroid'):
233             with self.db_connection.cursor() as db_cursor:
234                 db_cursor.execute(SQL("CREATE INDEX {} ON {} USING GIST (centroid) {}")
235                                   .format(Identifier(index_prefix + 'centroid'),
236                                           Identifier(base_table),
237                                           SQL(sql_tablespace)))
238
239         # Index on place_id
240         if not index_exists(self.db_connection, index_prefix + 'place_id'):
241             with self.db_connection.cursor() as db_cursor:
242                 db_cursor.execute(SQL("CREATE INDEX {} ON {} USING btree(place_id) {}")
243                                   .format(Identifier(index_prefix + 'place_id'),
244                                           Identifier(base_table),
245                                           SQL(sql_tablespace)))
246
247     def _grant_access_to_webuser(self, phrase_class: str, phrase_type: str) -> None:
248         """
249             Grant access on read to the table place_classtype for the webuser.
250         """
251         table_name = _classtype_table(phrase_class, phrase_type)
252         with self.db_connection.cursor() as db_cursor:
253             db_cursor.execute(SQL("""GRANT SELECT ON {} TO {}""")
254                               .format(Identifier(table_name),
255                                       Identifier(self.config.DATABASE_WEBUSER)))
256
257     def _remove_non_existent_tables_from_db(self) -> None:
258         """
259             Remove special phrases which doesn't exist on the wiki anymore.
260             Delete the place_classtype tables.
261         """
262         LOG.warning('Cleaning database...')
263
264         # Delete place_classtype tables corresponding to class/type which
265         # are not on the wiki anymore.
266         drop_tables(self.db_connection, *self.table_phrases_to_delete)
267         for _ in self.table_phrases_to_delete:
268             self.statistics_handler.notify_one_table_deleted()