+ def __init__(self, config, phplib_dir, db_connection) -> None:
+ self.db_connection = db_connection
+ self.config = config
+ self.phplib_dir = phplib_dir
+ self.black_list, self.white_list = self._load_white_and_black_lists()
+ #Compile the regex here once to improve performance.
+ self.occurence_pattern = re.compile(
+ r'\| *([^\|]+) *\|\| *([^\|]+) *\|\| *([^\|]+) *\|\| *([^\|]+) *\|\| *([\-YN])'
+ )
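+ #For illustration only (assumed wiki table layout): the pattern is meant to match
+ #one table row per special phrase, e.g. "| Zoo || tourism || zoo || - || N",
+ #capturing the phrase label, class, type, operator and the trailing -/Y/N flag.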
+ self.sanity_check_pattern = re.compile(r'^\w+$')
+ self.transliterator = Transliterator.createFromRules("special-phrases normalizer",
+ self.config.TERM_NORMALIZATION)
+ #This set will contain all existing phrases from the word table which
+ #no longer exist on the wiki.
+ #It contains tuples of the following format: (normalized_word, class, type, operator)
+ self.words_phrases_to_delete = set()
+ #This set will contain the phrases which still exist on the wiki.
+ #It is used to cope with duplicates on the wiki: entries found there are only
+ #removed from words_phrases_to_delete at the very end of the import.
+ self.words_phrases_still_exist = set()
+ #This set will contain all existing place_classtype tables which don't match any
+ #special phrase class/type on the wiki.
+ self.table_phrases_to_delete = set()
+
+ def import_from_wiki(self, languages=None):
+ """
+ Iterate through all specified languages and
+ extract corresponding special phrases from the wiki.
+ """
+ if languages is not None and not isinstance(languages, list):
+ raise TypeError('The \'languages\' argument should be of type list.')
+
+ self._fetch_existing_words_phrases()
+ self._fetch_existing_place_classtype_tables()
+
+ #Get all languages to process.
+ languages = self._load_languages() if not languages else languages
+
+ #Store pairs of class/type for further processing
+ class_type_pairs = set()
+
+ for lang in languages:
+ LOG.warning('Import phrases for lang: %s', lang)
+ wiki_page_xml_content = SpecialPhrasesImporter._get_wiki_content(lang)
+ class_type_pairs.update(self._process_xml_content(wiki_page_xml_content, lang))
+
+ self._create_place_classtype_table_and_indexes(class_type_pairs)
+ self._remove_non_existent_phrases_from_db()
+ self.db_connection.commit()
+ LOG.warning('Import done.')
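+ #Minimal usage sketch (assumed wiring, for illustration only):
+ #   importer = SpecialPhrasesImporter(config, phplib_dir, db_connection)
+ #   importer.import_from_wiki(languages=['en', 'de'])
+ #Omitting 'languages' falls back to the default list from _load_languages().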
+
+ def _fetch_existing_words_phrases(self):
+ """
+ Fetch existing special phrases from the word table.
+ Fill the words_phrases_to_delete set of the class.
+ """
+ #Only extract special phrase terms:
+ #rows with class=place and type=house are housenumber terms and
+ #rows with class=place and type=postcode are postcode terms, so both are skipped.
+ word_query = """
+ SELECT word, class, type, operator FROM word
+ WHERE class != 'place' OR (type != 'house' AND type != 'postcode')
+ """
+ with self.db_connection.cursor() as db_cursor:
+ db_cursor.execute(SQL(word_query))
+ for row in db_cursor:
+ #Default the operator to '-' when it is NULL in the database.
+ operator = '-' if row[3] is None else row[3]
+ self.words_phrases_to_delete.add(
+ (row[0], row[1], row[2], operator)
+ )
+
+ def _fetch_existing_place_classtype_tables(self):
+ """
+ Fetch existing place_classtype tables.
+ Fill the table_phrases_to_delete set of the class.
+ """
+ query = """
+ SELECT table_name
+ FROM information_schema.tables
+ WHERE table_schema='public'
+ AND table_name LIKE 'place_classtype_%';
+ """
+ with self.db_connection.cursor() as db_cursor:
+ db_cursor.execute(SQL(query))
+ for row in db_cursor:
+ self.table_phrases_to_delete.add(row[0])
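+ #The LIKE filter above matches the tables created by
+ #_create_place_classtype_table_and_indexes(); their names are assumed to follow
+ #a place_classtype_<class>_<type> pattern, e.g. place_classtype_amenity_pub (illustrative).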
+
+ def _load_white_and_black_lists(self):
+ """
+ Load the white and black lists from phrase-settings.json.
+ """
+ settings_path = (self.config.config_dir / 'phrase-settings.json').resolve()
+
+ if self.config.PHRASE_CONFIG:
+ settings_path = self._convert_php_settings_if_needed(self.config.PHRASE_CONFIG)
+
+ with settings_path.open("r") as json_settings:
+ settings = json.load(json_settings)
+ return settings['blackList'], settings['whiteList']
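+ #Only the two top-level keys are required here; an assumed, illustrative shape of
+ #phrase-settings.json would be: {"blackList": {...}, "whiteList": {...}}.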
+
+ def _load_languages(self):
+ """
+ Get the list of languages from the environment config,
+ or the default list if no languages are configured.
+ Special phrases are only extracted for the specified languages.
+ """
+ default_languages = [
+ 'af', 'ar', 'br', 'ca', 'cs', 'de', 'en', 'es',
+ 'et', 'eu', 'fa', 'fi', 'fr', 'gl', 'hr', 'hu',
+ 'ia', 'is', 'it', 'ja', 'mk', 'nl', 'no', 'pl',
+ 'ps', 'pt', 'ru', 'sk', 'sl', 'sv', 'uk', 'vi']
+ return self.config.LANGUAGES.split(',') if self.config.LANGUAGES else default_languages
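+ #LANGUAGES is expected to be a comma-separated string of language codes,
+ #e.g. 'en,de' (illustrative values).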
+
+ @staticmethod
+ def _get_wiki_content(lang):
+ """
+ Request and return the wiki page's content
+ corresponding to special phrases for a given lang.
+ Requested URL example:
+ https://wiki.openstreetmap.org/wiki/Special:Export/Nominatim/Special_Phrases/EN
+ """
+ url = 'https://wiki.openstreetmap.org/wiki/Special:Export/Nominatim/Special_Phrases/' + lang.upper() # pylint: disable=line-too-long
+ return get_url(url)
+
+ def _check_sanity(self, lang, phrase_class, phrase_type):
+ """
+ Check sanity of given inputs in case somebody added garbage in the wiki.
+ If a bad class/type is detected, a warning is logged and False is returned
+ so that the phrase is skipped.
+ """
+ type_matches = self.sanity_check_pattern.findall(phrase_type)
+ class_matches = self.sanity_check_pattern.findall(phrase_class)
+
+ if not class_matches or not type_matches:
+ LOG.warning("Bad class/type for language %s: %s=%s. It will not be imported",
+ lang, phrase_class, phrase_type)
+ return False
+ return True
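+ #The ^\w+$ check only accepts plain word characters: e.g. a type of 'pub'
+ #passes, while values containing spaces, semicolons or other punctuation
+ #are rejected (illustrative examples).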
+
+ def _process_xml_content(self, xml_content, lang):
+ """
+ Process the given XML content by extracting the matching patterns.
+ Each match is processed and the class/type pairs it defines are
+ returned as a set.
+ """