2 Helper class to create ICU rules from a configuration file.
10 from icu import Transliterator
12 from nominatim.config import flatten_config_list
13 from nominatim.db.properties import set_property, get_property
14 from nominatim.errors import UsageError
15 from nominatim.tokenizer.icu_name_processor import ICUNameProcessor
16 from nominatim.tokenizer.place_sanitizer import PlaceSanitizer
17 import nominatim.tokenizer.icu_variants as variants
# Logger used to report configuration problems. getLogger() without a
# name returns the root logger.
LOG = logging.getLogger()

# Property names under which the rule sets used during import are stored
# (written by save_config_to_db(), read back by load_config_from_db()).
DBCFG_IMPORT_NORM_RULES = "tokenizer_import_normalisation"
DBCFG_IMPORT_TRANS_RULES = "tokenizer_import_transliteration"
DBCFG_IMPORT_ANALYSIS_RULES = "tokenizer_import_analysis_rules"
""" Saves a single variant expansion.

    An expansion consists of the normalized replacement term and
    a dictionary of properties that describe when the expansion applies.
"""
def __init__(self, replacement, properties):
    """ Remember the replacement term and the properties of the expansion.

        A falsy 'properties' argument (None, empty mapping) is replaced
        by a new empty dictionary.
    """
    self.properties = properties if properties else {}
    self.replacement = replacement
39 """ Compiler for ICU rules from a tokenizer configuration file.
def __init__(self, config):
    """ Read the tokenizer rules from the 'icu_tokenizer.yaml'
        sub-configuration of 'config' and set up the token analyzers.

        The sections 'normalization', 'transliteration' and
        'token-analysis' are fetched through _get_section() and are
        therefore mandatory. The 'sanitizers' section is optional and
        defaults to an empty list.
    """
    tokenizer_cfg = config.load_sub_configuration('icu_tokenizer.yaml',
                                                  config='TOKENIZER_CONFIG')

    to_icu = self._cfg_to_icu_rules
    self.normalization_rules = to_icu(tokenizer_cfg, 'normalization')
    self.transliteration_rules = to_icu(tokenizer_cfg, 'transliteration')
    self.analysis_rules = self._get_section(tokenizer_cfg, 'token-analysis')
    # Needs normalization_rules and analysis_rules to be in place.
    self._setup_analysis()

    # Load optional sanitizer rule set.
    self.sanitizer_rules = tokenizer_cfg.get('sanitizers', [])
def load_config_from_db(self, conn):
    """ Replace the rule sets of this loader with the ones that were
        previously saved in the database and rebuild the token analyzers.

        The analysis rules are stored as a JSON string and are decoded
        before use.
    """
    self.normalization_rules = get_property(conn, DBCFG_IMPORT_NORM_RULES)
    self.transliteration_rules = get_property(conn, DBCFG_IMPORT_TRANS_RULES)

    serialized_analysis = get_property(conn, DBCFG_IMPORT_ANALYSIS_RULES)
    self.analysis_rules = json.loads(serialized_analysis)

    self._setup_analysis()
def save_config_to_db(self, conn):
    """ Store the normalization, transliteration and analysis rules as
        properties in the database. The analysis rules are serialized
        to JSON.
    """
    set_property(conn, DBCFG_IMPORT_NORM_RULES, self.normalization_rules)
    set_property(conn, DBCFG_IMPORT_TRANS_RULES, self.transliteration_rules)

    serialized_analysis = json.dumps(self.analysis_rules)
    set_property(conn, DBCFG_IMPORT_ANALYSIS_RULES, serialized_analysis)
def make_sanitizer(self):
    """ Create a place sanitizer from the configured rules.

        The sanitizer is constructed from 'sanitizer_rules', which holds
        the content of the optional 'sanitizers' configuration section.
    """
    return PlaceSanitizer(self.sanitizer_rules)
def make_token_analysis(self):
    """ Create a token analyser from the previously loaded rules.

        The analyser is produced by the default analysis rule, i.e. the
        one registered under the id None.
    """
    default_rule = self.analysis[None]
    return default_rule.create(self.normalization_rules,
                               self.transliteration_rules)
def get_search_rules(self):
    """ Return the ICU rules to be used during search.

        The rules combine normalization and transliteration: the result
        is the normalization rules directly followed by the
        transliteration rules.
    """
    # Normalization has to be applied first, transliteration second.
    # Plain concatenation yields the same string as writing both parts
    # into a buffer and needs no intermediate buffer object.
    return self.normalization_rules + self.transliteration_rules
def get_normalization_rules(self):
    """ Return the rules for normalisation of a term, as held in
        'normalization_rules'.
    """
    return self.normalization_rules
def get_transliteration_rules(self):
    """ Return the rules for converting a string into its ASCII
        representation, as held in 'transliteration_rules'.
    """
    return self.transliteration_rules
def _setup_analysis(self):
    """ Process the rules used for creating the various token analyzers.

        Fills 'self.analysis' with one TokenAnalyzerRule per entry of
        'self.analysis_rules', keyed by the 'id' of the entry. An entry
        without an id is registered under None and acts as the default
        analyzer.

        Raises a UsageError when 'analysis_rules' is not a list or when
        two entries use the same id.
    """
    self.analysis = {}

    if not isinstance(self.analysis_rules, list):
        raise UsageError("Configuration section 'token-analysis' must be a list.")

    for section in self.analysis_rules:
        name = section.get('id', None)
        if name in self.analysis:
            if name is None:
                LOG.fatal("ICU tokenizer configuration has two default token analyzers.")
            else:
                LOG.fatal("ICU tokenizer configuration has two token "
                          "analyzers with id '%s'.", name)
            # The exception must be raised, not just instantiated.
            # Otherwise the duplicate silently replaces the first analyzer.
            raise UsageError("Syntax error in ICU tokenizer config.")
        self.analysis[name] = TokenAnalyzerRule(section, self.normalization_rules)
@staticmethod
def _get_section(rules, section):
    """ Return the content of the section named 'section' from the rules.

        When the section does not exist, the problem is logged and a
        UsageError with a meaningful message is raised.
    """
    if section in rules:
        return rules[section]

    LOG.fatal("Section '%s' not found in tokenizer config.", section)
    raise UsageError("Syntax error in tokenizer configuration file.")
def _cfg_to_icu_rules(self, rules, section):
    """ Load an ICU ruleset from the given section of the configuration.

        The content of the section is flattened with
        flatten_config_list() and each resulting entry is taken as one
        rule. All rules are concatenated into a single string in which
        every rule is terminated by ';'.

        A section that is present but empty (None) yields an empty
        string. A missing section causes a UsageError, raised by
        _get_section().
    """
    content = self._get_section(rules, section)

    # A section without content is loaded as None. Return "no rules"
    # explicitly rather than a lone ';'.
    if content is None:
        return ''

    return ';'.join(flatten_config_list(content, section)) + ';'
class TokenAnalyzerRule:
    """ Factory for a single analysis module. The class saves the configuration
        and creates a new token analyzer on request.
    """

    def __init__(self, rules, normalization_rules):
        """ Read the 'variants' entry of the analyzer configuration
            'rules' and precompute the set of variants from it.
        """
        self._parse_variant_list(rules.get('variants'), normalization_rules)


    def create(self, normalization_rules, transliteration_rules):
        """ Create an analyzer from the given rules and the variants
            collected by this factory.
        """
        return ICUNameProcessor(normalization_rules,
                                transliteration_rules,
                                self.variants)


    def _parse_variant_list(self, rules, normalization_rules):
        """ Fill 'self.variants' with all variants described by 'rules'.

            Equal property objects are shared between sections, so that
            every distinct set of properties exists only once.
        """
        self.variants = set()

        # Nothing configured: leave the variant set empty.
        if not rules:
            return

        sections = flatten_config_list(rules, 'variants')
        maker = _VariantMaker(normalization_rules)

        known_properties = []
        for section in sections:
            # Reuse an already known, equal property object when there
            # is one, otherwise remember the new one.
            candidate = variants.ICUVariantProperties.from_rules(section)
            props = next((known for known in known_properties
                          if known == candidate), None)
            if props is None:
                props = candidate
                known_properties.append(props)

            for rule in (section.get('words') or []):
                self.variants.update(maker.compute(rule, props))
""" Generator for all necessary ICUVariants from a single variant rule.

    All text in rules is normalized to make sure the variants match later.
"""
def __init__(self, norm_rules):
    """ Create the ICU transliterator that is used to normalize the
        terms of a variant rule.

        'norm_rules' is handed to Transliterator.createFromRules() as
        the rule set of the transliterator.
    """
    self.norm = Transliterator.createFromRules("rule_loader_normalization",
                                               norm_rules)
def compute(self, rule, props):
    """ Generator for all ICUVariant tuples from a single variant rule.

        A rule consists of a comma-separated list of source words, an
        operator ('=>' or '->', optionally preceded by '|') and a
        comma-separated list of replacements. Each yielded variant
        carries the given 'props'.

        Raises a UsageError when the rule does not contain exactly one
        operator.
    """
    parts = re.split(r'(\|)?([=-])>', rule)
    if len(parts) != 4:
        raise UsageError("Syntax error in variant rule: " + rule)

    sources, bar, operator, replacements = parts

    # A leading '|' on the operator switches decomposition off.
    decompose = bar is None
    src_terms = [self._parse_variant_word(word) for word in sources.split(',')]
    repl_terms = (self.norm.transliterate(word.strip())
                  for word in replacements.split(','))

    # If the source should be kept, add a 1:1 replacement
    if operator == '-':
        for src in src_terms:
            if not src:
                continue
            for froms, tos in _create_variants(*src, src[0], decompose):
                yield variants.ICUVariant(froms, tos, props)

    for src, repl in itertools.product(src_terms, repl_terms):
        # Terms that normalize to nothing produce no variant.
        if not (src and repl):
            continue
        for froms, tos in _create_variants(*src, repl, decompose):
            yield variants.ICUVariant(froms, tos, props)
def _parse_variant_word(self, name):
    """ Split a source word of a variant rule into its normalized term
        and its flags.

        The word may start with '~' or '^' and may end with '~' or '$'.
        Returns a tuple (normalized term, leading flag, trailing flag),
        where a missing flag is the empty string. Returns None when the
        term is empty after normalization.

        Raises a UsageError when the word does not follow the expected
        pattern or has '~' on both sides.
    """
    word = name.strip()
    parsed = re.fullmatch(r'([~^]?)([^~$^]*)([~$]?)', word)
    tilde_on_both_sides = (parsed is not None
                           and parsed.group(1) == '~'
                           and parsed.group(3) == '~')
    if parsed is None or tilde_on_both_sides:
        raise UsageError("Invalid variant word descriptor '{}'".format(word))

    preflag, term, postflag = parsed.groups()
    normalized = self.norm.transliterate(term)

    return (normalized, preflag, postflag) if normalized else None
# Text that replaces a flag at the edge of a term. '^' is used as the
# marker on both ends; an absent flag becomes a plain space.
# NOTE(review): only the '^' entry is visible in this chunk. The '$' and ''
# entries are assumed from the keys looked up below - confirm the values.
_FLAG_MATCH = {'^': '^ ',
               '$': ' ^',
               '': ' '}


def _create_variants(src, preflag, postflag, repl, decompose):
    """ Generate the (source, replacement) string pairs for one
        combination of source term and replacement term.

        'preflag' and 'postflag' are the flags of the source term. A '~'
        flag produces a pair for the attached form and one for the form
        separated by a space, plus the two mixed pairs when 'decompose'
        is set. Without '~' a single pair is produced, with both terms
        wrapped in the flag texts from _FLAG_MATCH.
    """
    if preflag == '~':
        # suffix decomposition
        tail = _FLAG_MATCH[postflag]
        src, repl = src + tail, repl + tail

        yield from ((src, repl), (' ' + src, ' ' + repl))
        if decompose:
            yield from ((src, ' ' + repl), (' ' + src, repl))
        return

    head = _FLAG_MATCH[preflag]

    if postflag == '~':
        # prefix decomposition
        src, repl = head + src, head + repl

        yield from ((src, repl), (src + ' ', repl + ' '))
        if decompose:
            yield from ((src, repl + ' '), (src + ' ', repl))
        return

    tail = _FLAG_MATCH[postflag]
    yield head + src + tail, head + repl + tail