import logging
import re
-from psycopg2.sql import Identifier, SQL
+from psycopg.sql import Identifier, SQL
from ...typing import Protocol
from ...config import Configuration
-from ...db.connection import Connection
+from ...db.connection import Connection, drop_tables, index_exists
from .importer_statistics import SpecialPhrasesImporterStatistics
from .special_phrase import SpecialPhrase
from ...tokenizer.base import AbstractTokenizer
LOG = logging.getLogger()
+
def _classtype_table(phrase_class: str, phrase_type: str) -> str:
""" Return the name of the table for the given class and type.
"""
class SPImporter():
- # pylint: disable-msg=too-many-instance-attributes
"""
Class handling the process of special phrases importation into the database.
LOG.warning('Import done.')
self.statistics_handler.notify_import_done()
-
def _fetch_existing_place_classtype_tables(self) -> None:
"""
Fetch existing place_classtype tables.
self.table_phrases_to_delete.add(row[0])
def _load_white_and_black_lists(self) \
- -> Tuple[Mapping[str, Sequence[str]], Mapping[str, Sequence[str]]]:
+ -> Tuple[Mapping[str, Sequence[str]], Mapping[str, Sequence[str]]]:
"""
Load white and black lists from phrases-settings.json.
"""
return (phrase.p_class, phrase.p_type)
-
def _create_classtype_table_and_indexes(self,
class_type_pairs: Iterable[Tuple[str, str]]) -> None:
"""
with self.db_connection.cursor() as db_cursor:
db_cursor.execute("DROP INDEX idx_placex_classtype")
-
def _create_place_classtype_table(self, sql_tablespace: str,
phrase_class: str, phrase_type: str) -> None:
"""
""").format(Identifier(table_name), SQL(sql_tablespace)),
(phrase_class, phrase_type))
-
def _create_place_classtype_indexes(self, sql_tablespace: str,
phrase_class: str, phrase_type: str) -> None:
"""
index_prefix = f'idx_place_classtype_{phrase_class}_{phrase_type}_'
base_table = _classtype_table(phrase_class, phrase_type)
# Index on centroid
- if not self.db_connection.index_exists(index_prefix + 'centroid'):
+ if not index_exists(self.db_connection, index_prefix + 'centroid'):
with self.db_connection.cursor() as db_cursor:
db_cursor.execute(SQL("CREATE INDEX {} ON {} USING GIST (centroid) {}")
.format(Identifier(index_prefix + 'centroid'),
SQL(sql_tablespace)))
# Index on place_id
- if not self.db_connection.index_exists(index_prefix + 'place_id'):
+ if not index_exists(self.db_connection, index_prefix + 'place_id'):
with self.db_connection.cursor() as db_cursor:
db_cursor.execute(SQL("CREATE INDEX {} ON {} USING btree(place_id) {}")
.format(Identifier(index_prefix + 'place_id'),
Identifier(base_table),
SQL(sql_tablespace)))
-
def _grant_access_to_webuser(self, phrase_class: str, phrase_type: str) -> None:
"""
Grant access on read to the table place_classtype for the webuser.
# Delete place_classtype tables corresponding to class/type which
# are not on the wiki anymore.
- with self.db_connection.cursor() as db_cursor:
- for table in self.table_phrases_to_delete:
- self.statistics_handler.notify_one_table_deleted()
- db_cursor.drop_table(table)
+ drop_tables(self.db_connection, *self.table_phrases_to_delete)
+ for _ in self.table_phrases_to_delete:
+ self.statistics_handler.notify_one_table_deleted()