X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/efafa5271957fb54b356ec1c90e8613f14de40d4..1c33cb3186a38ceb5cc4de0975ae1956c861f9b5:/nominatim/tokenizer/token_analysis/generic.py diff --git a/nominatim/tokenizer/token_analysis/generic.py b/nominatim/tokenizer/token_analysis/generic.py index f790dad2..1ed9bf4d 100644 --- a/nominatim/tokenizer/token_analysis/generic.py +++ b/nominatim/tokenizer/token_analysis/generic.py @@ -7,151 +7,53 @@ """ Generic processor for names that creates abbreviation variants. """ -from collections import defaultdict, namedtuple +from typing import Mapping, Dict, Any, Iterable, Iterator, Optional, List, cast import itertools -import re -from icu import Transliterator import datrie -from nominatim.config import flatten_config_list from nominatim.errors import UsageError +from nominatim.data.place_name import PlaceName +from nominatim.tokenizer.token_analysis.config_variants import get_variant_config +from nominatim.tokenizer.token_analysis.generic_mutation import MutationVariantGenerator ### Configuration section -ICUVariant = namedtuple('ICUVariant', ['source', 'replacement']) - -def configure(rules, normalization_rules): +def configure(rules: Mapping[str, Any], normalizer: Any, _: Any) -> Dict[str, Any]: """ Extract and preprocess the configuration for this module. """ - config = {} + config: Dict[str, Any] = {} - config['replacements'], config['chars'] = _get_variant_config(rules.get('variants'), - normalization_rules) + config['replacements'], config['chars'] = get_variant_config(rules.get('variants'), + normalizer) config['variant_only'] = rules.get('mode', '') == 'variant-only' - return config - - -def _get_variant_config(rules, normalization_rules): - """ Convert the variant definition from the configuration into - replacement sets. - """ - immediate = defaultdict(list) - chars = set() - - if rules: - vset = set() - rules = flatten_config_list(rules, 'variants') - - vmaker = _VariantMaker(normalization_rules) - - for section in rules: - for rule in (section.get('words') or []): - vset.update(vmaker.compute(rule)) - - # Intermediate reorder by source. Also compute required character set. - for variant in vset: - if variant.source[-1] == ' ' and variant.replacement[-1] == ' ': - replstr = variant.replacement[:-1] - else: - replstr = variant.replacement - immediate[variant.source].append(replstr) - chars.update(variant.source) - - return list(immediate.items()), ''.join(chars) - - -class _VariantMaker: - """ Generater for all necessary ICUVariants from a single variant rule. - - All text in rules is normalized to make sure the variants match later. - """ - - def __init__(self, norm_rules): - self.norm = Transliterator.createFromRules("rule_loader_normalization", - norm_rules) - - - def compute(self, rule): - """ Generator for all ICUVariant tuples from a single variant rule. - """ - parts = re.split(r'(\|)?([=-])>', rule) - if len(parts) != 4: - raise UsageError("Syntax error in variant rule: " + rule) - - decompose = parts[1] is None - src_terms = [self._parse_variant_word(t) for t in parts[0].split(',')] - repl_terms = (self.norm.transliterate(t).strip() for t in parts[3].split(',')) - - # If the source should be kept, add a 1:1 replacement - if parts[2] == '-': - for src in src_terms: - if src: - for froms, tos in _create_variants(*src, src[0], decompose): - yield ICUVariant(froms, tos) - - for src, repl in itertools.product(src_terms, repl_terms): - if src and repl: - for froms, tos in _create_variants(*src, repl, decompose): - yield ICUVariant(froms, tos) - - - def _parse_variant_word(self, name): - name = name.strip() - match = re.fullmatch(r'([~^]?)([^~$^]*)([~$]?)', name) - if match is None or (match.group(1) == '~' and match.group(3) == '~'): - raise UsageError("Invalid variant word descriptor '{}'".format(name)) - norm_name = self.norm.transliterate(match.group(2)).strip() - if not norm_name: - return None - - return norm_name, match.group(1), match.group(3) + # parse mutation rules + config['mutations'] = [] + for rule in rules.get('mutations', []): + if 'pattern' not in rule: + raise UsageError("Missing field 'pattern' in mutation configuration.") + if not isinstance(rule['pattern'], str): + raise UsageError("Field 'pattern' in mutation configuration " + "must be a simple text field.") + if 'replacements' not in rule: + raise UsageError("Missing field 'replacements' in mutation configuration.") + if not isinstance(rule['replacements'], list): + raise UsageError("Field 'replacements' in mutation configuration " + "must be a list of texts.") + + config['mutations'].append((rule['pattern'], rule['replacements'])) - -_FLAG_MATCH = {'^': '^ ', - '$': ' ^', - '': ' '} - - -def _create_variants(src, preflag, postflag, repl, decompose): - if preflag == '~': - postfix = _FLAG_MATCH[postflag] - # suffix decomposition - src = src + postfix - repl = repl + postfix - - yield src, repl - yield ' ' + src, ' ' + repl - - if decompose: - yield src, ' ' + repl - yield ' ' + src, repl - elif postflag == '~': - # prefix decomposition - prefix = _FLAG_MATCH[preflag] - src = prefix + src - repl = prefix + repl - - yield src, repl - yield src + ' ', repl + ' ' - - if decompose: - yield src, repl + ' ' - yield src + ' ', repl - else: - prefix = _FLAG_MATCH[preflag] - postfix = _FLAG_MATCH[postflag] - - yield prefix + src + postfix, prefix + repl + postfix + return config ### Analysis section -def create(transliterator, config): +def create(normalizer: Any, transliterator: Any, + config: Mapping[str, Any]) -> 'GenericTokenAnalysis': """ Create a new token analysis instance for this module. """ - return GenericTokenAnalysis(transliterator, config) + return GenericTokenAnalysis(normalizer, transliterator, config) class GenericTokenAnalysis: @@ -159,7 +61,8 @@ class GenericTokenAnalysis: and provides the functions to apply the transformations. """ - def __init__(self, to_ascii, config): + def __init__(self, norm: Any, to_ascii: Any, config: Mapping[str, Any]) -> None: + self.norm = norm self.to_ascii = to_ascii self.variant_only = config['variant_only'] @@ -171,19 +74,51 @@ class GenericTokenAnalysis: else: self.replacements = None + # set up mutation rules + self.mutations = [MutationVariantGenerator(*cfg) for cfg in config['mutations']] + + + def get_canonical_id(self, name: PlaceName) -> str: + """ Return the normalized form of the name. This is the standard form + from which possible variants for the name can be derived. + """ + return cast(str, self.norm.transliterate(name.name)).strip() + - def get_variants_ascii(self, norm_name): + def compute_variants(self, norm_name: str) -> List[str]: """ Compute the spelling variants for the given normalized name and transliterate the result. """ + variants = self._generate_word_variants(norm_name) + + for mutation in self.mutations: + variants = mutation.generate(variants) + + return [name for name in self._transliterate_unique_list(norm_name, variants) if name] + + + def _transliterate_unique_list(self, norm_name: str, + iterable: Iterable[str]) -> Iterator[Optional[str]]: + seen = set() + if self.variant_only: + seen.add(norm_name) + + for variant in map(str.strip, iterable): + if variant not in seen: + seen.add(variant) + yield self.to_ascii.transliterate(variant).strip() + + + def _generate_word_variants(self, norm_name: str) -> Iterable[str]: baseform = '^ ' + norm_name + ' ^' + baselen = len(baseform) partials = [''] startpos = 0 if self.replacements is not None: pos = 0 force_space = False - while pos < len(baseform): + while pos < baselen: full, repl = self.replacements.longest_prefix_item(baseform[pos:], (None, None)) if full is not None: @@ -207,24 +142,9 @@ class GenericTokenAnalysis: # No variants detected? Fast return. if startpos == 0: - if self.variant_only: - return [] - - trans_name = self.to_ascii.transliterate(norm_name).strip() - return [trans_name] if trans_name else [] - - return self._compute_result_set(partials, baseform[startpos:], - norm_name if self.variant_only else '') - - - def _compute_result_set(self, partials, prefix, exclude): - results = set() + return (norm_name, ) - for variant in partials: - vname = (variant + prefix)[1:-1].strip() - if vname != exclude: - trans_name = self.to_ascii.transliterate(vname).strip() - if trans_name: - results.add(trans_name) + if startpos < baselen: + return (part[1:] + baseform[startpos:-1] for part in partials) - return list(results) + return (part[1:-1] for part in partials)