]> git.openstreetmap.org Git - nominatim.git/blobdiff - nominatim/tokenizer/icu_rule_loader.py
Merge pull request #3050 from mtmail/tiger-check-if-database-frozen
[nominatim.git] / nominatim / tokenizer / icu_rule_loader.py
index ddb17ae76698025dd9c9fd82619137d0a5b1e22e..4c36282ca54bfbd3526d24ead471a3e9fe9dbc33 100644 (file)
+# SPDX-License-Identifier: GPL-2.0-only
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2022 by the Nominatim developer community.
+# For a full list of authors see the git log.
 """
 Helper class to create ICU rules from a configuration file.
 """
 """
 Helper class to create ICU rules from a configuration file.
 """
+from typing import Mapping, Any, Dict, Optional
 import io
 import io
+import json
 import logging
 import logging
-from collections import defaultdict
-import itertools
-from pathlib import Path
 
 
-import yaml
 from icu import Transliterator
 
 from icu import Transliterator
 
+from nominatim.config import flatten_config_list, Configuration
+from nominatim.db.properties import set_property, get_property
+from nominatim.db.connection import Connection
 from nominatim.errors import UsageError
 from nominatim.errors import UsageError
+from nominatim.tokenizer.place_sanitizer import PlaceSanitizer
+from nominatim.tokenizer.icu_token_analysis import ICUTokenAnalysis
+from nominatim.tokenizer.token_analysis.base import AnalysisModule, Analyzer
+import nominatim.data.country_info
 
 LOG = logging.getLogger()
 
 
 LOG = logging.getLogger()
 
-def _flatten_yaml_list(content):
-    if not content:
-        return []
+DBCFG_IMPORT_NORM_RULES = "tokenizer_import_normalisation"
+DBCFG_IMPORT_TRANS_RULES = "tokenizer_import_transliteration"
+DBCFG_IMPORT_ANALYSIS_RULES = "tokenizer_import_analysis_rules"
 
 
-    if not isinstance(content, list):
-        raise UsageError("List expected in ICU yaml configuration.")
 
 
-    output = []
-    for ele in content:
-        if isinstance(ele, list):
-            output.extend(_flatten_yaml_list(ele))
-        else:
-            output.append(ele)
+def _get_section(rules: Mapping[str, Any], section: str) -> Any:
+    """ Get the section named 'section' from the rules. If the section does
+        not exist, raise a usage error with a meaningful message.
+    """
+    if section not in rules:
+        LOG.fatal("Section '%s' not found in tokenizer config.", section)
+        raise UsageError("Syntax error in tokenizer configuration file.")
 
 
-    return output
+    return rules[section]
 
 
 class ICURuleLoader:
     """ Compiler for ICU rules from a tokenizer configuration file.
     """
 
 
 
 class ICURuleLoader:
     """ Compiler for ICU rules from a tokenizer configuration file.
     """
 
-    def __init__(self, configfile):
-        self.configfile = configfile
-        self.compound_suffixes = set()
-        self.abbreviations = defaultdict()
+    def __init__(self, config: Configuration) -> None:
+        self.config = config
+        rules = config.load_sub_configuration('icu_tokenizer.yaml',
+                                              config='TOKENIZER_CONFIG')
+
+        # Make sure country information is available to analyzers and sanitizers.
+        nominatim.data.country_info.setup_country_config(config)
+
+        self.normalization_rules = self._cfg_to_icu_rules(rules, 'normalization')
+        self.transliteration_rules = self._cfg_to_icu_rules(rules, 'transliteration')
+        self.analysis_rules = _get_section(rules, 'token-analysis')
+        self._setup_analysis()
+
+        # Load optional sanitizer rule set.
+        self.sanitizer_rules = rules.get('sanitizers', [])
 
 
-        if configfile.suffix == '.yaml':
-            self._load_from_yaml()
+
+    def load_config_from_db(self, conn: Connection) -> None:
+        """ Get previously saved parts of the configuration from the
+            database.
+        """
+        rules = get_property(conn, DBCFG_IMPORT_NORM_RULES)
+        if rules is not None:
+            self.normalization_rules = rules
+
+        rules = get_property(conn, DBCFG_IMPORT_TRANS_RULES)
+        if rules is not None:
+            self.transliteration_rules = rules
+
+        rules = get_property(conn, DBCFG_IMPORT_ANALYSIS_RULES)
+        if rules:
+            self.analysis_rules = json.loads(rules)
         else:
         else:
-            raise UsageError("Unknown format of tokenizer configuration.")
+            self.analysis_rules = []
+        self._setup_analysis()
+
+
+    def save_config_to_db(self, conn: Connection) -> None:
+        """ Save the part of the configuration that cannot be changed into
+            the database.
+        """
+        set_property(conn, DBCFG_IMPORT_NORM_RULES, self.normalization_rules)
+        set_property(conn, DBCFG_IMPORT_TRANS_RULES, self.transliteration_rules)
+        set_property(conn, DBCFG_IMPORT_ANALYSIS_RULES, json.dumps(self.analysis_rules))
 
 
 
 
-    def get_search_rules(self):
+    def make_sanitizer(self) -> PlaceSanitizer:
+        """ Create a place sanitizer from the configured rules.
+        """
+        return PlaceSanitizer(self.sanitizer_rules, self.config)
+
+
+    def make_token_analysis(self) -> ICUTokenAnalysis:
+        """ Create a token analyser from the reviouly loaded rules.
+        """
+        return ICUTokenAnalysis(self.normalization_rules,
+                                self.transliteration_rules, self.analysis)
+
+
+    def get_search_rules(self) -> str:
         """ Return the ICU rules to be used during search.
             The rules combine normalization and transliteration.
         """
         """ Return the ICU rules to be used during search.
             The rules combine normalization and transliteration.
         """
@@ -58,133 +116,81 @@ class ICURuleLoader:
         rules.write(self.transliteration_rules)
         return rules.getvalue()
 
         rules.write(self.transliteration_rules)
         return rules.getvalue()
 
-    def get_normalization_rules(self):
+
+    def get_normalization_rules(self) -> str:
         """ Return rules for normalisation of a term.
         """
         return self.normalization_rules
 
         """ Return rules for normalisation of a term.
         """
         return self.normalization_rules
 
-    def get_transliteration_rules(self):
+
+    def get_transliteration_rules(self) -> str:
         """ Return the rules for converting a string into its asciii representation.
         """
         return self.transliteration_rules
 
         """ Return the rules for converting a string into its asciii representation.
         """
         return self.transliteration_rules
 
-    def get_replacement_pairs(self):
-        """ Return the list of possible compound decompositions with
-            application of abbreviations included.
-            The result is a list of pairs: the first item is the sequence to
-            replace, the second is a list of replacements.
-        """
-        synonyms = defaultdict(set)
-
-        # First add entries for compound decomposition.
-        for suffix in self.compound_suffixes:
-            variants = (suffix + ' ', ' ' + suffix + ' ')
-            for key in variants:
-                synonyms[key].update(variants)
-
-        for full, abbr in self.abbreviations.items():
-            key = ' ' + full + ' '
-            # Entries in the abbreviation list always apply to full words:
-            synonyms[key].update((' ' + a + ' ' for a in abbr))
-            # Replacements are optional, so add a noop
-            synonyms[key].add(key)
-
-            if full in self.compound_suffixes:
-                # Full word abbreviating to compunded version.
-                synonyms[key].update((a + ' ' for a in abbr))
-
-                key = full + ' '
-                # Uncompunded suffix abbrevitating to decompounded version.
-                synonyms[key].update((' ' + a + ' ' for a in abbr))
-                # Uncompunded suffix abbrevitating to compunded version.
-                synonyms[key].update((a + ' ' for a in abbr))
 
 
-        # sort the resulting list by descending length (longer matches are prefered).
-        sorted_keys = sorted(synonyms.keys(), key=len, reverse=True)
-
-        return [(k, list(synonyms[k])) for k in sorted_keys]
-
-    def _yaml_include_representer(self, loader, node):
-        value = loader.construct_scalar(node)
-
-        if Path(value).is_absolute():
-            content = Path(value).read_text()
-        else:
-            content = (self.configfile.parent / value).read_text()
-
-        return yaml.safe_load(content)
-
-
-    def _load_from_yaml(self):
-        yaml.add_constructor('!include', self._yaml_include_representer,
-                             Loader=yaml.SafeLoader)
-        rules = yaml.safe_load(self.configfile.read_text())
-
-        self.normalization_rules = self._cfg_to_icu_rules(rules, 'normalization')
-        self.transliteration_rules = self._cfg_to_icu_rules(rules, 'transliteration')
-        self._parse_compound_suffix_list(self._get_section(rules, 'compound_suffixes'))
-        self._parse_abbreviation_list(self._get_section(rules, 'abbreviations'))
-
-
-    def _get_section(self, rules, section):
-        """ Get the section named 'section' from the rules. If the section does
-            not exist, raise a usage error with a meaningful message.
+    def _setup_analysis(self) -> None:
+        """ Process the rules used for creating the various token analyzers.
         """
         """
-        if section not in rules:
-            LOG.fatal("Section '%s' not found in tokenizer config '%s'.",
-                      section, str(self.configfile))
-            raise UsageError("Syntax error in tokenizer configuration file.")
-
-        return rules[section]
+        self.analysis: Dict[Optional[str], TokenAnalyzerRule]  = {}
 
 
+        if not isinstance(self.analysis_rules, list):
+            raise UsageError("Configuration section 'token-analysis' must be a list.")
 
 
-    def _cfg_to_icu_rules(self, rules, section):
+        norm = Transliterator.createFromRules("rule_loader_normalization",
+                                              self.normalization_rules)
+        trans = Transliterator.createFromRules("rule_loader_transliteration",
+                                              self.transliteration_rules)
+
+        for section in self.analysis_rules:
+            name = section.get('id', None)
+            if name in self.analysis:
+                if name is None:
+                    LOG.fatal("ICU tokenizer configuration has two default token analyzers.")
+                else:
+                    LOG.fatal("ICU tokenizer configuration has two token "
+                              "analyzers with id '%s'.", name)
+                raise UsageError("Syntax error in ICU tokenizer config.")
+            self.analysis[name] = TokenAnalyzerRule(section, norm, trans,
+                                                    self.config)
+
+
+    @staticmethod
+    def _cfg_to_icu_rules(rules: Mapping[str, Any], section: str) -> str:
         """ Load an ICU ruleset from the given section. If the section is a
             simple string, it is interpreted as a file name and the rules are
             loaded verbatim from the given file. The filename is expected to be
             relative to the tokenizer rule file. If the section is a list then
             each line is assumed to be a rule. All rules are concatenated and returned.
         """
         """ Load an ICU ruleset from the given section. If the section is a
             simple string, it is interpreted as a file name and the rules are
             loaded verbatim from the given file. The filename is expected to be
             relative to the tokenizer rule file. If the section is a list then
             each line is assumed to be a rule. All rules are concatenated and returned.
         """
-        content = self._get_section(rules, section)
+        content = _get_section(rules, section)
 
         if content is None:
             return ''
 
 
         if content is None:
             return ''
 
-        return ';'.join(_flatten_yaml_list(content)) + ';'
-
-
+        return ';'.join(flatten_config_list(content, section)) + ';'
 
 
-    def _parse_compound_suffix_list(self, rules):
-        if not rules:
-            self.compound_suffixes = set()
-            return
-
-        norm = Transliterator.createFromRules("rule_loader_normalization",
-                                              self.normalization_rules)
 
 
-        # Make sure all suffixes are in their normalised form.
-        self.compound_suffixes = set((norm.transliterate(s) for s in rules))
-
-
-    def _parse_abbreviation_list(self, rules):
-        self.abbreviations = defaultdict(list)
+class TokenAnalyzerRule:
+    """ Factory for a single analysis module. The class saves the configuration
+        and creates a new token analyzer on request.
+    """
 
 
-        if not rules:
-            return
+    def __init__(self, rules: Mapping[str, Any],
+                 normalizer: Any, transliterator: Any,
+                 config: Configuration) -> None:
+        analyzer_name = _get_section(rules, 'analyzer')
+        if not analyzer_name or not isinstance(analyzer_name, str):
+            raise UsageError("'analyzer' parameter needs to be simple string")
 
 
-        norm = Transliterator.createFromRules("rule_loader_normalization",
-                                              self.normalization_rules)
+        self._analysis_mod: AnalysisModule = \
+            config.load_plugin_module(analyzer_name, 'nominatim.tokenizer.token_analysis')
 
 
-        for rule in rules:
-            parts = rule.split('=>')
-            if len(parts) != 2:
-                LOG.fatal("Syntax error in abbreviation section, line: %s", rule)
-                raise UsageError("Syntax error in tokenizer configuration file.")
+        self.config = self._analysis_mod.configure(rules, normalizer,
+                                                   transliterator)
 
 
-            # Make sure all terms match the normalised version.
-            fullterms = (norm.transliterate(t.strip()) for t in parts[0].split(','))
-            abbrterms = (norm.transliterate(t.strip()) for t in parts[1].split(','))
 
 
-            for full, abbr in itertools.product(fullterms, abbrterms):
-                if full and abbr:
-                    self.abbreviations[full].append(abbr)
+    def create(self, normalizer: Any, transliterator: Any) -> Analyzer:
+        """ Create a new analyser instance for the given rule.
+        """
+        return self._analysis_mod.create(normalizer, transliterator, self.config)