from nominatim.db.properties import set_property, get_property
from nominatim.errors import UsageError
from nominatim.tokenizer.place_sanitizer import PlaceSanitizer
+from nominatim.tokenizer.icu_token_analysis import ICUTokenAnalysis
LOG = logging.getLogger()
def make_token_analysis(self):
""" Create a token analyser from the reviouly loaded rules.
"""
- return self.analysis[None].create(self.normalization_rules,
- self.transliteration_rules)
+ return ICUTokenAnalysis(self.normalization_rules,
+ self.transliteration_rules, self.analysis)
def get_search_rules(self):
module_name = 'nominatim.tokenizer.token_analysis.' \
+ _get_section(rules, 'analyzer').replace('-', '_')
analysis_mod = importlib.import_module(module_name)
- self._mod_create = analysis_mod.create
+ self.create = analysis_mod.create
# Load the configuration.
self.config = analysis_mod.configure(rules, normalization_rules)
-
-
- def create(self, normalization_rules, transliteration_rules):
- """ Create an analyzer from the given rules.
- """
- return self._mod_create(normalization_rules,
- transliteration_rules,
- self.config)
--- /dev/null
+++ b/nominatim/tokenizer/icu_token_analysis.py
+"""
+Container class collecting all components required to transform an OSM name
+into a Nominatim token.
+"""
+
+from icu import Transliterator
+
+class ICUTokenAnalysis:
+ """ Container class collecting the transliterators and token analysis
+ modules for a single NameAnalyser instance.
+ """
+
+ def __init__(self, norm_rules, trans_rules, analysis_rules):
+ self.normalizer = Transliterator.createFromRules("icu_normalization",
+ norm_rules)
+ trans_rules += ";[:Space:]+ > ' '"
+ self.to_ascii = Transliterator.createFromRules("icu_to_ascii",
+ trans_rules)
+ self.search = Transliterator.createFromRules("icu_search",
+ norm_rules + trans_rules)
+
+ self.analysis = {name: arules.create(self.to_ascii, arules.config)
+ for name, arules in analysis_rules.items()}
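For illustration only, not part of the patch: a rough sketch of how the new container is used directly, with made-up ICU rule strings and the analysis-module dict left empty (in production the ICU rule loader supplies all three arguments).

from nominatim.tokenizer.icu_token_analysis import ICUTokenAnalysis

norm_rules = ":: Lower; :: NFC;"            # assumed example rules
trans_rules = ":: Latin; :: Latin-ASCII"    # assumed example rules

analysis = ICUTokenAnalysis(norm_rules, trans_rules, {})   # no analysis modules

print(analysis.normalizer.transliterate("Bäckerstraße"))   # normalization only
print(analysis.to_ascii.transliterate("Bäckerstraße"))     # transliteration only
print(analysis.search.transliterate("Bäckerstraße"))       # both, as used for search terms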
""" Count the partial terms from the names in the place table.
"""
words = Counter()
- name_proc = self.loader.make_token_analysis()
+ analysis = self.loader.make_token_analysis()
with conn.cursor(name="words") as cur:
cur.execute(""" SELECT v, count(*) FROM
WHERE length(v) < 75 GROUP BY v""")
for name, cnt in cur:
- terms = set()
- for word in name_proc.get_variants_ascii(name_proc.get_normalized(name)):
- if ' ' in word:
- terms.update(word.split())
- for term in terms:
- words[term] += cnt
+ word = analysis.search.transliterate(name)
+ if word and ' ' in word:
+ for term in set(word.split()):
+ words[term] += cnt
return words
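For illustration only, not part of the patch: the loop above now derives partial words from the combined search transliterator alone instead of expanding spelling variants. A standalone sketch with an assumed rule string:

from collections import Counter
from icu import Transliterator

# assumed stand-in for the loader's combined search rules
search = Transliterator.createFromRules("icu_search",
                                        ":: Lower; :: Latin-ASCII; [:Space:]+ > ' ';")

words = Counter()
for name, cnt in [("Bäcker Straße", 2), ("Holzstraße", 1)]:   # stand-ins for DB rows
    word = search.transliterate(name)
    if word and ' ' in word:          # only names that are already multi-word count
        for term in set(word.split()):
            words[term] += cnt

# Compound names like 'Holzstraße' no longer contribute partial terms because no
# variant expansion happens here any more (hence the reduced test expectation below).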
def _search_normalized(self, name):
""" Return the search token transliteration of the given name.
"""
- return self.token_analysis.get_search_normalized(name)
+ return self.token_analysis.search.transliterate(name).strip()
def _normalized(self, name):
""" Return the normalized version of the given name with all
non-relevant information removed.
"""
- return self.token_analysis.get_normalized(name)
+ return self.token_analysis.normalizer.transliterate(name).strip()
def get_word_token_info(self, words):
if addr_terms:
token_info.add_address_terms(addr_terms)
+
def _compute_partial_tokens(self, name):
""" Normalize the given term, split it into partial words and return
the token list for them.
partial_tokens = set()
for name in names:
+ analyzer_id = name.get_attr('analyzer')
norm_name = self._normalized(name.name)
- full, part = self._cache.names.get(norm_name, (None, None))
+ if analyzer_id is None:
+ token_id = norm_name
+ else:
+ token_id = f'{norm_name}@{analyzer_id}'
+
+ full, part = self._cache.names.get(token_id, (None, None))
if full is None:
- variants = self.token_analysis.get_variants_ascii(norm_name)
+ variants = self.token_analysis.analysis[analyzer_id].get_variants_ascii(norm_name)
if not variants:
continue
with self.conn.cursor() as cur:
cur.execute("SELECT (getorcreate_full_word(%s, %s)).*",
- (norm_name, variants))
+ (token_id, variants))
full, part = cur.fetchone()
- self._cache.names[norm_name] = (full, part)
+ self._cache.names[token_id] = (full, part)
full_tokens.add(full)
partial_tokens.update(part)
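For illustration only, not part of the patch: how the optional 'analyzer' attribute set by a sanitizer turns into a distinct lookup key. The analyzer name 'de' is made up.

norm_name = "brücke"

for analyzer_id in (None, "de"):
    token_id = norm_name if analyzer_id is None else f'{norm_name}@{analyzer_id}'
    print(token_id)

# -> 'brücke' and 'brücke@de': the same normalized name can now get separate
#    full-word tokens and cache entries per token-analysis module.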
new_names = []
for name in obj.names:
split_names = regexp.split(name.name)
- print(split_names)
if len(split_names) == 1:
new_names.append(name)
else:
### Analysis section
-def create(norm_rules, trans_rules, config):
+def create(trans_rules, config):
""" Create a new token analysis instance for this module.
"""
- return GenericTokenAnalysis(norm_rules, trans_rules, config)
+ return GenericTokenAnalysis(trans_rules, config)
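For illustration only, not part of the patch: the module interface implied by this change is configure(rules, normalization_rules) at load time and create() receiving the shared to_ascii transliterator (which the diff still calls trans_rules) together with the prepared config. A hypothetical minimal module; everything beyond those entry points is an assumption.

def configure(rules, normalization_rules):
    """ Prepare the static configuration for this module (a no-op here).
    """
    return {}

def create(transliterator, config):
    """ Build an analyser instance around the shared to_ascii transliterator.
    """
    return PassThroughAnalysis(transliterator, config)

class PassThroughAnalysis:
    """ Hypothetical analyser that yields exactly one variant: the
        transliterated form of the normalized name.
    """

    def __init__(self, to_ascii, config):
        self.to_ascii = to_ascii

    def get_variants_ascii(self, norm_name):
        return [self.to_ascii.transliterate(norm_name).strip()]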
class GenericTokenAnalysis:
""" Collects the different transformation rules for normalisation of names
    and provides the functions to apply the transformations.
"""
- def __init__(self, norm_rules, trans_rules, config):
- self.normalizer = Transliterator.createFromRules("icu_normalization",
- norm_rules)
- self.to_ascii = Transliterator.createFromRules("icu_to_ascii",
- trans_rules +
- ";[:Space:]+ > ' '")
- self.search = Transliterator.createFromRules("icu_search",
- norm_rules + trans_rules)
+ def __init__(self, to_ascii, config):
+ self.to_ascii = to_ascii
# Set up datrie
self.replacements = datrie.Trie(config['chars'])
for src, repllist in config['replacements']:
    self.replacements[src] = repllist
- def get_normalized(self, name):
- """ Normalize the given name, i.e. remove all elements not relevant
- for search.
- """
- return self.normalizer.transliterate(name).strip()
-
def get_variants_ascii(self, norm_name):
""" Compute the spelling variants for the given normalized name
and transliterate the result.
results.add(trans_name)
return list(results)
-
-
- def get_search_normalized(self, name):
- """ Return the normalized version of the name (including transliteration)
- to be applied at search time.
- """
- return self.search.transliterate(' ' + name + ' ').strip()
tok.init_new_db(test_config)
assert word_table.get_partial_words() == {('test', 1),
- ('no', 1), ('area', 2),
- ('holz', 1), ('strasse', 1),
- ('str', 1)}
+ ('no', 1), ('area', 2)}
def test_init_from_project(monkeypatch, test_config, tokenizer_factory):