]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tools/special_phrases/sp_importer.py
Merge pull request #3487 from lonvia/port-to-psycopg3
[nominatim.git] / src / nominatim_db / tools / special_phrases / sp_importer.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8     Module containing the class handling the import
9     of the special phrases.
10
11     Phrases are analyzed and imported into the database.
12
13     The phrases already present in the database which are not
14     valids anymore are removed.
15 """
16 from typing import Iterable, Tuple, Mapping, Sequence, Optional, Set
17 import logging
18 import re
19
20 from psycopg.sql import Identifier, SQL
21
22 from ...typing import Protocol
23 from ...config import Configuration
24 from ...db.connection import Connection, drop_tables, index_exists
25 from .importer_statistics import SpecialPhrasesImporterStatistics
26 from .special_phrase import SpecialPhrase
27 from ...tokenizer.base import AbstractTokenizer
28
29 LOG = logging.getLogger()
30
31 def _classtype_table(phrase_class: str, phrase_type: str) -> str:
32     """ Return the name of the table for the given class and type.
33     """
34     return f'place_classtype_{phrase_class}_{phrase_type}'
35
36
37 class SpecialPhraseLoader(Protocol):
38     """ Protocol for classes implementing a loader for special phrases.
39     """
40
41     def generate_phrases(self) -> Iterable[SpecialPhrase]:
42         """ Generates all special phrase terms this loader can produce.
43         """
44
45
46 class SPImporter():
47     # pylint: disable-msg=too-many-instance-attributes
48     """
49         Class handling the process of special phrases importation into the database.
50
51         Take a sp loader which load the phrases from an external source.
52     """
53     def __init__(self, config: Configuration, conn: Connection,
54                  sp_loader: SpecialPhraseLoader) -> None:
55         self.config = config
56         self.db_connection = conn
57         self.sp_loader = sp_loader
58         self.statistics_handler = SpecialPhrasesImporterStatistics()
59         self.black_list, self.white_list = self._load_white_and_black_lists()
60         self.sanity_check_pattern = re.compile(r'^\w+$')
61         # This set will contain all existing phrases to be added.
62         # It contains tuples with the following format: (label, class, type, operator)
63         self.word_phrases: Set[Tuple[str, str, str, str]] = set()
64         # This set will contain all existing place_classtype tables which doesn't match any
65         # special phrases class/type on the wiki.
66         self.table_phrases_to_delete: Set[str] = set()
67
68     def import_phrases(self, tokenizer: AbstractTokenizer, should_replace: bool) -> None:
69         """
70             Iterate through all SpecialPhrases extracted from the
71             loader and import them into the database.
72
73             If should_replace is set to True only the loaded phrases
74             will be kept into the database. All other phrases already
75             in the database will be removed.
76         """
77         LOG.warning('Special phrases importation starting')
78         self._fetch_existing_place_classtype_tables()
79
80         # Store pairs of class/type for further processing
81         class_type_pairs = set()
82
83         for phrase in self.sp_loader.generate_phrases():
84             result = self._process_phrase(phrase)
85             if result:
86                 class_type_pairs.add(result)
87
88         self._create_classtype_table_and_indexes(class_type_pairs)
89         if should_replace:
90             self._remove_non_existent_tables_from_db()
91         self.db_connection.commit()
92
93         with tokenizer.name_analyzer() as analyzer:
94             analyzer.update_special_phrases(self.word_phrases, should_replace)
95
96         LOG.warning('Import done.')
97         self.statistics_handler.notify_import_done()
98
99
100     def _fetch_existing_place_classtype_tables(self) -> None:
101         """
102             Fetch existing place_classtype tables.
103             Fill the table_phrases_to_delete set of the class.
104         """
105         query = """
106             SELECT table_name
107             FROM information_schema.tables
108             WHERE table_schema='public'
109             AND table_name like 'place_classtype_%';
110         """
111         with self.db_connection.cursor() as db_cursor:
112             db_cursor.execute(SQL(query))
113             for row in db_cursor:
114                 self.table_phrases_to_delete.add(row[0])
115
116     def _load_white_and_black_lists(self) \
117           -> Tuple[Mapping[str, Sequence[str]], Mapping[str, Sequence[str]]]:
118         """
119             Load white and black lists from phrases-settings.json.
120         """
121         settings = self.config.load_sub_configuration('phrase-settings.json')
122
123         return settings['blackList'], settings['whiteList']
124
125     def _check_sanity(self, phrase: SpecialPhrase) -> bool:
126         """
127             Check sanity of given inputs in case somebody added garbage in the wiki.
128             If a bad class/type is detected the system will exit with an error.
129         """
130         class_matchs = self.sanity_check_pattern.findall(phrase.p_class)
131         type_matchs = self.sanity_check_pattern.findall(phrase.p_type)
132
133         if not class_matchs or not type_matchs:
134             LOG.warning("Bad class/type: %s=%s. It will not be imported",
135                         phrase.p_class, phrase.p_type)
136             return False
137         return True
138
139     def _process_phrase(self, phrase: SpecialPhrase) -> Optional[Tuple[str, str]]:
140         """
141             Processes the given phrase by checking black and white list
142             and sanity.
143             Return the class/type pair corresponding to the phrase.
144         """
145
146         # blacklisting: disallow certain class/type combinations
147         if phrase.p_class in self.black_list.keys() \
148            and phrase.p_type in self.black_list[phrase.p_class]:
149             return None
150
151         # whitelisting: if class is in whitelist, allow only tags in the list
152         if phrase.p_class in self.white_list.keys() \
153            and phrase.p_type not in self.white_list[phrase.p_class]:
154             return None
155
156         # sanity check, in case somebody added garbage in the wiki
157         if not self._check_sanity(phrase):
158             self.statistics_handler.notify_one_phrase_invalid()
159             return None
160
161         self.word_phrases.add((phrase.p_label, phrase.p_class,
162                                phrase.p_type, phrase.p_operator))
163
164         return (phrase.p_class, phrase.p_type)
165
166
167     def _create_classtype_table_and_indexes(self,
168                                             class_type_pairs: Iterable[Tuple[str, str]]) -> None:
169         """
170             Create table place_classtype for each given pair.
171             Also create indexes on place_id and centroid.
172         """
173         LOG.warning('Create tables and indexes...')
174
175         sql_tablespace = self.config.TABLESPACE_AUX_DATA
176         if sql_tablespace:
177             sql_tablespace = ' TABLESPACE ' + sql_tablespace
178
179         with self.db_connection.cursor() as db_cursor:
180             db_cursor.execute("CREATE INDEX idx_placex_classtype ON placex (class, type)")
181
182         for pair in class_type_pairs:
183             phrase_class = pair[0]
184             phrase_type = pair[1]
185
186             table_name = _classtype_table(phrase_class, phrase_type)
187
188             if table_name in self.table_phrases_to_delete:
189                 self.statistics_handler.notify_one_table_ignored()
190                 # Remove this table from the ones to delete as it match a
191                 # class/type still existing on the special phrases of the wiki.
192                 self.table_phrases_to_delete.remove(table_name)
193                 # So don't need to create the table and indexes.
194                 continue
195
196             # Table creation
197             self._create_place_classtype_table(sql_tablespace, phrase_class, phrase_type)
198
199             # Indexes creation
200             self._create_place_classtype_indexes(sql_tablespace, phrase_class, phrase_type)
201
202             # Grant access on read to the web user.
203             self._grant_access_to_webuser(phrase_class, phrase_type)
204
205             self.statistics_handler.notify_one_table_created()
206
207         with self.db_connection.cursor() as db_cursor:
208             db_cursor.execute("DROP INDEX idx_placex_classtype")
209
210
211     def _create_place_classtype_table(self, sql_tablespace: str,
212                                       phrase_class: str, phrase_type: str) -> None:
213         """
214             Create table place_classtype of the given phrase_class/phrase_type
215             if doesn't exit.
216         """
217         table_name = _classtype_table(phrase_class, phrase_type)
218         with self.db_connection.cursor() as cur:
219             cur.execute(SQL("""CREATE TABLE IF NOT EXISTS {} {} AS
220                                  SELECT place_id AS place_id,
221                                         st_centroid(geometry) AS centroid
222                                  FROM placex
223                                  WHERE class = %s AND type = %s
224                              """).format(Identifier(table_name), SQL(sql_tablespace)),
225                         (phrase_class, phrase_type))
226
227
228     def _create_place_classtype_indexes(self, sql_tablespace: str,
229                                         phrase_class: str, phrase_type: str) -> None:
230         """
231             Create indexes on centroid and place_id for the place_classtype table.
232         """
233         index_prefix = f'idx_place_classtype_{phrase_class}_{phrase_type}_'
234         base_table = _classtype_table(phrase_class, phrase_type)
235         # Index on centroid
236         if not index_exists(self.db_connection, index_prefix + 'centroid'):
237             with self.db_connection.cursor() as db_cursor:
238                 db_cursor.execute(SQL("CREATE INDEX {} ON {} USING GIST (centroid) {}")
239                                   .format(Identifier(index_prefix + 'centroid'),
240                                           Identifier(base_table),
241                                           SQL(sql_tablespace)))
242
243         # Index on place_id
244         if not index_exists(self.db_connection, index_prefix + 'place_id'):
245             with self.db_connection.cursor() as db_cursor:
246                 db_cursor.execute(SQL("CREATE INDEX {} ON {} USING btree(place_id) {}")
247                                   .format(Identifier(index_prefix + 'place_id'),
248                                           Identifier(base_table),
249                                           SQL(sql_tablespace)))
250
251
252     def _grant_access_to_webuser(self, phrase_class: str, phrase_type: str) -> None:
253         """
254             Grant access on read to the table place_classtype for the webuser.
255         """
256         table_name = _classtype_table(phrase_class, phrase_type)
257         with self.db_connection.cursor() as db_cursor:
258             db_cursor.execute(SQL("""GRANT SELECT ON {} TO {}""")
259                               .format(Identifier(table_name),
260                                       Identifier(self.config.DATABASE_WEBUSER)))
261
262
263     def _remove_non_existent_tables_from_db(self) -> None:
264         """
265             Remove special phrases which doesn't exist on the wiki anymore.
266             Delete the place_classtype tables.
267         """
268         LOG.warning('Cleaning database...')
269
270         # Delete place_classtype tables corresponding to class/type which
271         # are not on the wiki anymore.
272         drop_tables(self.db_connection, *self.table_phrases_to_delete)
273         for _ in self.table_phrases_to_delete:
274             self.statistics_handler.notify_one_table_deleted()