X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/7cfcbacfc75ab2e39ee7eab6a5cf40e8cbd152f5..f3c557bf684a0079e4bc54b622cc5d766f3a6b56:/nominatim/tokenizer/token_analysis/generic.py diff --git a/nominatim/tokenizer/token_analysis/generic.py b/nominatim/tokenizer/token_analysis/generic.py index 2c720f1d..3de915ba 100644 --- a/nominatim/tokenizer/token_analysis/generic.py +++ b/nominatim/tokenizer/token_analysis/generic.py @@ -1,18 +1,56 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# This file is part of Nominatim. (https://nominatim.org) +# +# Copyright (C) 2022 by the Nominatim developer community. +# For a full list of authors see the git log. """ Generic processor for names that creates abbreviation variants. """ -from collections import defaultdict import itertools -from icu import Transliterator import datrie +from nominatim.errors import UsageError +from nominatim.tokenizer.token_analysis.config_variants import get_variant_config +from nominatim.tokenizer.token_analysis.generic_mutation import MutationVariantGenerator + +### Configuration section + +def configure(rules, normalization_rules): + """ Extract and preprocess the configuration for this module. + """ + config = {} + + config['replacements'], config['chars'] = get_variant_config(rules.get('variants'), + normalization_rules) + config['variant_only'] = rules.get('mode', '') == 'variant-only' + + # parse mutation rules + config['mutations'] = [] + for rule in rules.get('mutations', []): + if 'pattern' not in rule: + raise UsageError("Missing field 'pattern' in mutation configuration.") + if not isinstance(rule['pattern'], str): + raise UsageError("Field 'pattern' in mutation configuration " + "must be a simple text field.") + if 'replacements' not in rule: + raise UsageError("Missing field 'replacements' in mutation configuration.") + if not isinstance(rule['replacements'], list): + raise UsageError("Field 'replacements' in mutation configuration " + "must be a list of texts.") + + config['mutations'].append((rule['pattern'], rule['replacements'])) + + return config + + ### Analysis section -def create(norm_rules, trans_rules, config): +def create(normalizer, transliterator, config): """ Create a new token analysis instance for this module. """ - return GenericTokenAnalysis(norm_rules, trans_rules, config['variants']) + return GenericTokenAnalysis(normalizer, transliterator, config) class GenericTokenAnalysis: @@ -20,91 +58,89 @@ class GenericTokenAnalysis: and provides the functions to apply the transformations. """ - def __init__(self, norm_rules, trans_rules, replacements): - self.normalizer = Transliterator.createFromRules("icu_normalization", - norm_rules) - self.to_ascii = Transliterator.createFromRules("icu_to_ascii", - trans_rules + - ";[:Space:]+ > ' '") - self.search = Transliterator.createFromRules("icu_search", - norm_rules + trans_rules) - - # Intermediate reorder by source. Also compute required character set. - immediate = defaultdict(list) - chars = set() - for variant in replacements: - if variant.source[-1] == ' ' and variant.replacement[-1] == ' ': - replstr = variant.replacement[:-1] - else: - replstr = variant.replacement - immediate[variant.source].append(replstr) - chars.update(variant.source) - # Then copy to datrie - self.replacements = datrie.Trie(''.join(chars)) - for src, repllist in immediate.items(): - self.replacements[src] = repllist - - - def get_normalized(self, name): - """ Normalize the given name, i.e. remove all elements not relevant - for search. + def __init__(self, norm, to_ascii, config): + self.norm = norm + self.to_ascii = to_ascii + self.variant_only = config['variant_only'] + + # Set up datrie + if config['replacements']: + self.replacements = datrie.Trie(config['chars']) + for src, repllist in config['replacements']: + self.replacements[src] = repllist + else: + self.replacements = None + + # set up mutation rules + self.mutations = [MutationVariantGenerator(*cfg) for cfg in config['mutations']] + + + def normalize(self, name): + """ Return the normalized form of the name. This is the standard form + from which possible variants for the name can be derived. """ - return self.normalizer.transliterate(name).strip() + return self.norm.transliterate(name).strip() + def get_variants_ascii(self, norm_name): """ Compute the spelling variants for the given normalized name and transliterate the result. """ - baseform = '^ ' + norm_name + ' ^' - partials = [''] + variants = self._generate_word_variants(norm_name) - startpos = 0 - pos = 0 - force_space = False - while pos < len(baseform): - full, repl = self.replacements.longest_prefix_item(baseform[pos:], - (None, None)) - if full is not None: - done = baseform[startpos:pos] - partials = [v + done + r - for v, r in itertools.product(partials, repl) - if not force_space or r.startswith(' ')] - if len(partials) > 128: - # If too many variants are produced, they are unlikely - # to be helpful. Only use the original term. - startpos = 0 - break - startpos = pos + len(full) - if full[-1] == ' ': - startpos -= 1 - force_space = True - pos = startpos - else: - pos += 1 - force_space = False + for mutation in self.mutations: + variants = mutation.generate(variants) - # No variants detected? Fast return. - if startpos == 0: - trans_name = self.to_ascii.transliterate(norm_name).strip() - return [trans_name] if trans_name else [] + return [name for name in self._transliterate_unique_list(norm_name, variants) if name] - return self._compute_result_set(partials, baseform[startpos:]) + def _transliterate_unique_list(self, norm_name, iterable): + seen = set() + if self.variant_only: + seen.add(norm_name) - def _compute_result_set(self, partials, prefix): - results = set() + for variant in map(str.strip, iterable): + if variant not in seen: + seen.add(variant) + yield self.to_ascii.transliterate(variant).strip() - for variant in partials: - vname = variant + prefix - trans_name = self.to_ascii.transliterate(vname[1:-1]).strip() - if trans_name: - results.add(trans_name) - return list(results) + def _generate_word_variants(self, norm_name): + baseform = '^ ' + norm_name + ' ^' + baselen = len(baseform) + partials = [''] + startpos = 0 + if self.replacements is not None: + pos = 0 + force_space = False + while pos < baselen: + full, repl = self.replacements.longest_prefix_item(baseform[pos:], + (None, None)) + if full is not None: + done = baseform[startpos:pos] + partials = [v + done + r + for v, r in itertools.product(partials, repl) + if not force_space or r.startswith(' ')] + if len(partials) > 128: + # If too many variants are produced, they are unlikely + # to be helpful. Only use the original term. + startpos = 0 + break + startpos = pos + len(full) + if full[-1] == ' ': + startpos -= 1 + force_space = True + pos = startpos + else: + pos += 1 + force_space = False - def get_search_normalized(self, name): - """ Return the normalized version of the name (including transliteration) - to be applied at search time. - """ - return self.search.transliterate(' ' + name + ' ').strip() + # No variants detected? Fast return. + if startpos == 0: + return (norm_name, ) + + if startpos < baselen: + return (part[1:] + baseform[startpos:-1] for part in partials) + + return (part[1:-1] for part in partials)