X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/c171d881945e8c744e4a4a44c5f33edd0d8468fb..7c869701f02369873c5573236398ec885c4a951f:/nominatim/tokenizer/token_analysis/generic.py diff --git a/nominatim/tokenizer/token_analysis/generic.py b/nominatim/tokenizer/token_analysis/generic.py index f0de0cca..1ed9bf4d 100644 --- a/nominatim/tokenizer/token_analysis/generic.py +++ b/nominatim/tokenizer/token_analysis/generic.py @@ -1,234 +1,150 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# This file is part of Nominatim. (https://nominatim.org) +# +# Copyright (C) 2022 by the Nominatim developer community. +# For a full list of authors see the git log. """ Generic processor for names that creates abbreviation variants. """ -from collections import defaultdict +from typing import Mapping, Dict, Any, Iterable, Iterator, Optional, List, cast import itertools -import re -from icu import Transliterator import datrie -from nominatim.config import flatten_config_list from nominatim.errors import UsageError -import nominatim.tokenizer.icu_variants as variants +from nominatim.data.place_name import PlaceName +from nominatim.tokenizer.token_analysis.config_variants import get_variant_config +from nominatim.tokenizer.token_analysis.generic_mutation import MutationVariantGenerator ### Configuration section -def configure(rules, normalization_rules): +def configure(rules: Mapping[str, Any], normalizer: Any, _: Any) -> Dict[str, Any]: """ Extract and preprocess the configuration for this module. """ - return {'variants': _parse_variant_list(rules.get('variants'), - normalization_rules)} + config: Dict[str, Any] = {} + config['replacements'], config['chars'] = get_variant_config(rules.get('variants'), + normalizer) + config['variant_only'] = rules.get('mode', '') == 'variant-only' -def _parse_variant_list(rules, normalization_rules): - vset = set() + # parse mutation rules + config['mutations'] = [] + for rule in rules.get('mutations', []): + if 'pattern' not in rule: + raise UsageError("Missing field 'pattern' in mutation configuration.") + if not isinstance(rule['pattern'], str): + raise UsageError("Field 'pattern' in mutation configuration " + "must be a simple text field.") + if 'replacements' not in rule: + raise UsageError("Missing field 'replacements' in mutation configuration.") + if not isinstance(rule['replacements'], list): + raise UsageError("Field 'replacements' in mutation configuration " + "must be a list of texts.") - if rules: - rules = flatten_config_list(rules, 'variants') + config['mutations'].append((rule['pattern'], rule['replacements'])) - vmaker = _VariantMaker(normalization_rules) + return config - properties = [] - for section in rules: - # Create the property field and deduplicate against existing - # instances. - props = variants.ICUVariantProperties.from_rules(section) - for existing in properties: - if existing == props: - props = existing - break - else: - properties.append(props) - for rule in (section.get('words') or []): - vset.update(vmaker.compute(rule, props)) - - return vset - - -class _VariantMaker: - """ Generater for all necessary ICUVariants from a single variant rule. +### Analysis section - All text in rules is normalized to make sure the variants match later. +def create(normalizer: Any, transliterator: Any, + config: Mapping[str, Any]) -> 'GenericTokenAnalysis': + """ Create a new token analysis instance for this module. """ + return GenericTokenAnalysis(normalizer, transliterator, config) - def __init__(self, norm_rules): - self.norm = Transliterator.createFromRules("rule_loader_normalization", - norm_rules) - - - def compute(self, rule, props): - """ Generator for all ICUVariant tuples from a single variant rule. - """ - parts = re.split(r'(\|)?([=-])>', rule) - if len(parts) != 4: - raise UsageError("Syntax error in variant rule: " + rule) - - decompose = parts[1] is None - src_terms = [self._parse_variant_word(t) for t in parts[0].split(',')] - repl_terms = (self.norm.transliterate(t.strip()) for t in parts[3].split(',')) - - # If the source should be kept, add a 1:1 replacement - if parts[2] == '-': - for src in src_terms: - if src: - for froms, tos in _create_variants(*src, src[0], decompose): - yield variants.ICUVariant(froms, tos, props) - for src, repl in itertools.product(src_terms, repl_terms): - if src and repl: - for froms, tos in _create_variants(*src, repl, decompose): - yield variants.ICUVariant(froms, tos, props) - - - def _parse_variant_word(self, name): - name = name.strip() - match = re.fullmatch(r'([~^]?)([^~$^]*)([~$]?)', name) - if match is None or (match.group(1) == '~' and match.group(3) == '~'): - raise UsageError("Invalid variant word descriptor '{}'".format(name)) - norm_name = self.norm.transliterate(match.group(2)) - if not norm_name: - return None - - return norm_name, match.group(1), match.group(3) - - -_FLAG_MATCH = {'^': '^ ', - '$': ' ^', - '': ' '} +class GenericTokenAnalysis: + """ Collects the different transformation rules for normalisation of names + and provides the functions to apply the transformations. + """ + def __init__(self, norm: Any, to_ascii: Any, config: Mapping[str, Any]) -> None: + self.norm = norm + self.to_ascii = to_ascii + self.variant_only = config['variant_only'] -def _create_variants(src, preflag, postflag, repl, decompose): - if preflag == '~': - postfix = _FLAG_MATCH[postflag] - # suffix decomposition - src = src + postfix - repl = repl + postfix + # Set up datrie + if config['replacements']: + self.replacements = datrie.Trie(config['chars']) + for src, repllist in config['replacements']: + self.replacements[src] = repllist + else: + self.replacements = None - yield src, repl - yield ' ' + src, ' ' + repl + # set up mutation rules + self.mutations = [MutationVariantGenerator(*cfg) for cfg in config['mutations']] - if decompose: - yield src, ' ' + repl - yield ' ' + src, repl - elif postflag == '~': - # prefix decomposition - prefix = _FLAG_MATCH[preflag] - src = prefix + src - repl = prefix + repl - yield src, repl - yield src + ' ', repl + ' ' + def get_canonical_id(self, name: PlaceName) -> str: + """ Return the normalized form of the name. This is the standard form + from which possible variants for the name can be derived. + """ + return cast(str, self.norm.transliterate(name.name)).strip() - if decompose: - yield src, repl + ' ' - yield src + ' ', repl - else: - prefix = _FLAG_MATCH[preflag] - postfix = _FLAG_MATCH[postflag] - yield prefix + src + postfix, prefix + repl + postfix + def compute_variants(self, norm_name: str) -> List[str]: + """ Compute the spelling variants for the given normalized name + and transliterate the result. + """ + variants = self._generate_word_variants(norm_name) + for mutation in self.mutations: + variants = mutation.generate(variants) -### Analysis section + return [name for name in self._transliterate_unique_list(norm_name, variants) if name] -def create(norm_rules, trans_rules, config): - """ Create a new token analysis instance for this module. - """ - return GenericTokenAnalysis(norm_rules, trans_rules, config['variants']) + def _transliterate_unique_list(self, norm_name: str, + iterable: Iterable[str]) -> Iterator[Optional[str]]: + seen = set() + if self.variant_only: + seen.add(norm_name) -class GenericTokenAnalysis: - """ Collects the different transformation rules for normalisation of names - and provides the functions to apply the transformations. - """ + for variant in map(str.strip, iterable): + if variant not in seen: + seen.add(variant) + yield self.to_ascii.transliterate(variant).strip() - def __init__(self, norm_rules, trans_rules, replacements): - self.normalizer = Transliterator.createFromRules("icu_normalization", - norm_rules) - self.to_ascii = Transliterator.createFromRules("icu_to_ascii", - trans_rules + - ";[:Space:]+ > ' '") - self.search = Transliterator.createFromRules("icu_search", - norm_rules + trans_rules) - - # Intermediate reorder by source. Also compute required character set. - immediate = defaultdict(list) - chars = set() - for variant in replacements: - if variant.source[-1] == ' ' and variant.replacement[-1] == ' ': - replstr = variant.replacement[:-1] - else: - replstr = variant.replacement - immediate[variant.source].append(replstr) - chars.update(variant.source) - # Then copy to datrie - self.replacements = datrie.Trie(''.join(chars)) - for src, repllist in immediate.items(): - self.replacements[src] = repllist - - - def get_normalized(self, name): - """ Normalize the given name, i.e. remove all elements not relevant - for search. - """ - return self.normalizer.transliterate(name).strip() - def get_variants_ascii(self, norm_name): - """ Compute the spelling variants for the given normalized name - and transliterate the result. - """ + def _generate_word_variants(self, norm_name: str) -> Iterable[str]: baseform = '^ ' + norm_name + ' ^' + baselen = len(baseform) partials = [''] startpos = 0 - pos = 0 - force_space = False - while pos < len(baseform): - full, repl = self.replacements.longest_prefix_item(baseform[pos:], - (None, None)) - if full is not None: - done = baseform[startpos:pos] - partials = [v + done + r - for v, r in itertools.product(partials, repl) - if not force_space or r.startswith(' ')] - if len(partials) > 128: - # If too many variants are produced, they are unlikely - # to be helpful. Only use the original term. - startpos = 0 - break - startpos = pos + len(full) - if full[-1] == ' ': - startpos -= 1 - force_space = True - pos = startpos - else: - pos += 1 - force_space = False + if self.replacements is not None: + pos = 0 + force_space = False + while pos < baselen: + full, repl = self.replacements.longest_prefix_item(baseform[pos:], + (None, None)) + if full is not None: + done = baseform[startpos:pos] + partials = [v + done + r + for v, r in itertools.product(partials, repl) + if not force_space or r.startswith(' ')] + if len(partials) > 128: + # If too many variants are produced, they are unlikely + # to be helpful. Only use the original term. + startpos = 0 + break + startpos = pos + len(full) + if full[-1] == ' ': + startpos -= 1 + force_space = True + pos = startpos + else: + pos += 1 + force_space = False # No variants detected? Fast return. if startpos == 0: - trans_name = self.to_ascii.transliterate(norm_name).strip() - return [trans_name] if trans_name else [] - - return self._compute_result_set(partials, baseform[startpos:]) + return (norm_name, ) + if startpos < baselen: + return (part[1:] + baseform[startpos:-1] for part in partials) - def _compute_result_set(self, partials, prefix): - results = set() - - for variant in partials: - vname = variant + prefix - trans_name = self.to_ascii.transliterate(vname[1:-1]).strip() - if trans_name: - results.add(trans_name) - - return list(results) - - - def get_search_normalized(self, name): - """ Return the normalized version of the name (including transliteration) - to be applied at search time. - """ - return self.search.transliterate(' ' + name + ' ').strip() + return (part[1:-1] for part in partials)