Tokenizer implementing normalisation as used before Nominatim 4 but using
libICU instead of the PostgreSQL module.
"""
+from typing import Optional, Sequence, List, Tuple, Mapping, Any, cast, \
+ Dict, Set, Iterable
import itertools
import json
import logging
-import re
+from pathlib import Path
from textwrap import dedent
-from nominatim.db.connection import connect
+from nominatim.db.connection import connect, Connection, Cursor
+from nominatim.config import Configuration
from nominatim.db.utils import CopyBuffer
from nominatim.db.sql_preprocessor import SQLPreprocessor
-from nominatim.indexer.place_info import PlaceInfo
+from nominatim.data.place_info import PlaceInfo
from nominatim.tokenizer.icu_rule_loader import ICURuleLoader
+from nominatim.tokenizer.place_sanitizer import PlaceSanitizer
+from nominatim.data.place_name import PlaceName
+from nominatim.tokenizer.icu_token_analysis import ICUTokenAnalysis
from nominatim.tokenizer.base import AbstractAnalyzer, AbstractTokenizer
DBCFG_TERM_NORMALIZATION = "tokenizer_term_normalization"
LOG = logging.getLogger()
-def create(dsn, data_dir):
+def create(dsn: str, data_dir: Path) -> 'ICUTokenizer':
""" Create a new instance of the tokenizer provided by this module.
"""
- return LegacyICUTokenizer(dsn, data_dir)
+ return ICUTokenizer(dsn, data_dir)
-class LegacyICUTokenizer(AbstractTokenizer):
- """ This tokenizer uses libICU to covert names and queries to ASCII.
+class ICUTokenizer(AbstractTokenizer):
+ """ This tokenizer uses libICU to convert names and queries to ASCII.
Otherwise it uses the same algorithms and data structures as the
normalization routines in Nominatim 3.
"""
- def __init__(self, dsn, data_dir):
+ def __init__(self, dsn: str, data_dir: Path) -> None:
self.dsn = dsn
self.data_dir = data_dir
- self.loader = None
+ self.loader: Optional[ICURuleLoader] = None
- def init_new_db(self, config, init_db=True):
+ def init_new_db(self, config: Configuration, init_db: bool = True) -> None:
""" Set up a new tokenizer for the database.
This copies all necessary data in the project directory to make
"""
self.loader = ICURuleLoader(config)
- self._install_php(config.lib_dir.php)
+ self._install_php(config.lib_dir.php, overwrite=True)
self._save_config()
if init_db:
self._init_db_tables(config)
- def init_from_project(self, config):
+ def init_from_project(self, config: Configuration) -> None:
""" Initialise the tokenizer from the project directory.
"""
self.loader = ICURuleLoader(config)
with connect(self.dsn) as conn:
self.loader.load_config_from_db(conn)
+ self._install_php(config.lib_dir.php, overwrite=False)
- def finalize_import(self, config):
+
+ def finalize_import(self, config: Configuration) -> None:
""" Do any required postprocessing to make the tokenizer data ready
for use.
"""
sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer_indices.sql')
- def update_sql_functions(self, config):
+ def update_sql_functions(self, config: Configuration) -> None:
""" Reimport the SQL functions for this tokenizer.
"""
with connect(self.dsn) as conn:
sqlp.run_sql_file(conn, 'tokenizer/icu_tokenizer.sql')
- def check_database(self, config):
+ def check_database(self, config: Configuration) -> None:
""" Check that the tokenizer is set up correctly.
"""
# Will throw an error if there is an issue.
self.init_from_project(config)
- def update_statistics(self):
+ def update_statistics(self) -> None:
""" Recompute frequencies for all name words.
"""
with connect(self.dsn) as conn:
conn.commit()
- def _cleanup_housenumbers(self):
+ def _cleanup_housenumbers(self) -> None:
""" Remove unused house numbers.
"""
with connect(self.dsn) as conn:
- def update_word_tokens(self):
+ def update_word_tokens(self) -> None:
""" Remove unused tokens.
"""
LOG.warning("Cleaning up housenumber tokens.")
LOG.warning("Tokenizer house-keeping done.")
- def name_analyzer(self):
+ def name_analyzer(self) -> 'ICUNameAnalyzer':
""" Create a new analyzer for tokenizing names and queries
using this tokenizer. Analyzers are context managers and should
be used accordingly:
Analyzers are not thread-safe. You need to instantiate one per thread.
"""
- return LegacyICUNameAnalyzer(self.dsn, self.loader.make_sanitizer(),
- self.loader.make_token_analysis())
+ assert self.loader is not None
+ return ICUNameAnalyzer(self.dsn, self.loader.make_sanitizer(),
+ self.loader.make_token_analysis())
+
+
+    def most_frequent_words(self, conn: Connection, num: int) -> List[str]:
+        """ Look up the `num` full words with the highest total count.
+
+            The counts of all word table rows of type 'W' sharing the same
+            word are summed up. Of each word only the part before the
+            first '@' is returned.
+        """
+        sql = """SELECT word, sum((info->>'count')::int) as count
+                 FROM word WHERE type = 'W'
+                 GROUP BY word
+                 ORDER BY count DESC LIMIT %s"""
+        with conn.cursor() as cur:
+            cur.execute(sql, (num,))
+            return [row[0].split('@')[0] for row in cur]
- def _install_php(self, phpdir):
+ def _install_php(self, phpdir: Path, overwrite: bool = True) -> None:
""" Install the php script for the tokenizer.
"""
+ assert self.loader is not None
php_file = self.data_dir / "tokenizer.php"
- php_file.write_text(dedent(f"""\
- <?php
- @define('CONST_Max_Word_Frequency', 10000000);
- @define('CONST_Term_Normalization_Rules', "{self.loader.normalization_rules}");
- @define('CONST_Transliteration', "{self.loader.get_search_rules()}");
- require_once('{phpdir}/tokenizer/icu_tokenizer.php');"""))
+ if not php_file.exists() or overwrite:
+ php_file.write_text(dedent(f"""\
+ <?php
+ @define('CONST_Max_Word_Frequency', 10000000);
+ @define('CONST_Term_Normalization_Rules', "{self.loader.normalization_rules}");
+ @define('CONST_Transliteration', "{self.loader.get_search_rules()}");
+ require_once('{phpdir}/tokenizer/icu_tokenizer.php');"""), encoding='utf-8')
- def _save_config(self):
+
+ def _save_config(self) -> None:
""" Save the configuration that needs to remain stable for the given
database as database properties.
"""
+ assert self.loader is not None
with connect(self.dsn) as conn:
self.loader.save_config_to_db(conn)
- def _init_db_tables(self, config):
+ def _init_db_tables(self, config: Configuration) -> None:
""" Set up the word table and fill it with pre-computed word
frequencies.
"""
conn.commit()
-class LegacyICUNameAnalyzer(AbstractAnalyzer):
- """ The legacy analyzer uses the ICU library for splitting names.
+class ICUNameAnalyzer(AbstractAnalyzer):
+ """ The ICU analyzer uses the ICU library for splitting names.
Each instance opens a connection to the database to request the
normalization.
"""
- def __init__(self, dsn, sanitizer, token_analysis):
- self.conn = connect(dsn).connection
+ def __init__(self, dsn: str, sanitizer: PlaceSanitizer,
+ token_analysis: ICUTokenAnalysis) -> None:
+ self.conn: Optional[Connection] = connect(dsn).connection
self.conn.autocommit = True
self.sanitizer = sanitizer
self.token_analysis = token_analysis
self._cache = _TokenCache()
- def close(self):
+ def close(self) -> None:
""" Free all resources used by the analyzer.
"""
if self.conn:
self.conn = None
- def _search_normalized(self, name):
+ def _search_normalized(self, name: str) -> str:
""" Return the search token transliteration of the given name.
"""
- return self.token_analysis.search.transliterate(name).strip()
+ return cast(str, self.token_analysis.search.transliterate(name)).strip()
- def _normalized(self, name):
+ def _normalized(self, name: str) -> str:
""" Return the normalized version of the given name with all
non-relevant information removed.
"""
- return self.token_analysis.normalizer.transliterate(name).strip()
+ return cast(str, self.token_analysis.normalizer.transliterate(name)).strip()
- def get_word_token_info(self, words):
+ def get_word_token_info(self, words: Sequence[str]) -> List[Tuple[str, str, int]]:
""" Return token information for the given list of words.
If a word starts with # it is assumed to be a full name
otherwise it is a partial name.
The function is used for testing and debugging only
and not necessarily efficient.
"""
+ assert self.conn is not None
full_tokens = {}
partial_tokens = {}
for word in words:
+ [(k, v, part_ids.get(v, None)) for k, v in partial_tokens.items()]
- @staticmethod
- def normalize_postcode(postcode):
+ def normalize_postcode(self, postcode: str) -> str:
""" Convert the postcode to a standardized form.
This function must yield exactly the same result as the SQL function
return postcode.strip().upper()
- def update_postcodes_from_db(self):
+ def update_postcodes_from_db(self) -> None:
""" Update postcode tokens in the word table from the location_postcode
table.
"""
- to_delete = []
+ assert self.conn is not None
+ analyzer = self.token_analysis.analysis.get('@postcode')
+
with self.conn.cursor() as cur:
- # This finds us the rows in location_postcode and word that are
- # missing in the other table.
- cur.execute("""SELECT * FROM
- (SELECT pc, word FROM
- (SELECT distinct(postcode) as pc FROM location_postcode) p
- FULL JOIN
- (SELECT word FROM word WHERE type = 'P') w
- ON pc = word) x
- WHERE pc is null or word is null""")
-
- with CopyBuffer() as copystr:
- for postcode, word in cur:
- if postcode is None:
- to_delete.append(word)
- else:
- copystr.add(self._search_normalized(postcode),
- 'P', postcode)
-
- if to_delete:
- cur.execute("""DELETE FROM WORD
- WHERE type ='P' and word = any(%s)
- """, (to_delete, ))
-
- copystr.copy_out(cur, 'word',
- columns=['word_token', 'type', 'word'])
-
-
- def update_special_phrases(self, phrases, should_replace):
+ # First get all postcode names currently in the word table.
+ cur.execute("SELECT DISTINCT word FROM word WHERE type = 'P'")
+ word_entries = set((entry[0] for entry in cur))
+
+ # Then compute the required postcode names from the postcode table.
+ needed_entries = set()
+ cur.execute("SELECT country_code, postcode FROM location_postcode")
+ for cc, postcode in cur:
+ info = PlaceInfo({'country_code': cc,
+ 'class': 'place', 'type': 'postcode',
+ 'address': {'postcode': postcode}})
+ address = self.sanitizer.process_names(info)[1]
+ for place in address:
+ if place.kind == 'postcode':
+ if analyzer is None:
+ postcode_name = place.name.strip().upper()
+ variant_base = None
+ else:
+ postcode_name = analyzer.get_canonical_id(place)
+ variant_base = place.get_attr("variant")
+
+ if variant_base:
+ needed_entries.add(f'{postcode_name}@{variant_base}')
+ else:
+ needed_entries.add(postcode_name)
+ break
+
+ # Now update the word table.
+ self._delete_unused_postcode_words(word_entries - needed_entries)
+ self._add_missing_postcode_words(needed_entries - word_entries)
+
+    def _delete_unused_postcode_words(self, tokens: Iterable[str]) -> None:
+        """ Remove the postcode entries (type 'P') with the given words
+            from the word table. Nothing is sent to the database when
+            `tokens` is empty.
+        """
+        assert self.conn is not None
+        if not tokens:
+            return
+
+        with self.conn.cursor() as cur:
+            obsolete = list(tokens)
+            cur.execute("DELETE FROM word WHERE type = 'P' and word = any(%s)",
+                        (obsolete, ))
+
+    def _add_missing_postcode_words(self, tokens: Iterable[str]) -> None:
+        """ Create word table entries for the given postcode names.
+
+            Each token is either a plain postcode name or has the form
+            `<name>@<variant base>`. The search-normalized name is always
+            part of the variants handed to the SQL function
+            `create_postcode_word()`. When a '@postcode' analyzer is
+            configured, the variants it computes from the variant base
+            are used as well.
+        """
+        assert self.conn is not None
+        if not tokens:
+            return
+
+        analyzer = self.token_analysis.analysis.get('@postcode')
+        terms = []
+
+        for postcode_name in tokens:
+            if '@' in postcode_name:
+                # Split at the first '@' only. A larger maxsplit could
+                # yield three parts and break the two-name unpacking when
+                # the variant base itself contains an '@'.
+                term, variant = postcode_name.split('@', 1)
+                term = self._search_normalized(term)
+                if analyzer is None:
+                    variants = [term]
+                else:
+                    variants = analyzer.compute_variants(variant)
+                    if term not in variants:
+                        variants.append(term)
+            else:
+                variants = [self._search_normalized(postcode_name)]
+            # The full token (including any '@' suffix) is what gets
+            # stored as the word.
+            terms.append((postcode_name, variants))
+
+        if terms:
+            with self.conn.cursor() as cur:
+                cur.execute_values("""SELECT create_postcode_word(pc, var)
+                                      FROM (VALUES %s) AS v(pc, var)""",
+                                   terms)
+
+
+
+
+ def update_special_phrases(self, phrases: Iterable[Tuple[str, str, str, str]],
+ should_replace: bool) -> None:
""" Replace the search index for special phrases with the new phrases.
If `should_replace` is True, then the previous set of phrases will be
completely replaced. Otherwise the phrases are added to the
already existing ones.
"""
+ assert self.conn is not None
norm_phrases = set(((self._normalized(p[0]), p[1], p[2], p[3])
for p in phrases))
len(norm_phrases), added, deleted)
- def _add_special_phrases(self, cursor, new_phrases, existing_phrases):
+ def _add_special_phrases(self, cursor: Cursor,
+ new_phrases: Set[Tuple[str, str, str, str]],
+ existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
""" Add all phrases to the database that are not yet there.
"""
to_add = new_phrases - existing_phrases
return added
- @staticmethod
- def _remove_special_phrases(cursor, new_phrases, existing_phrases):
- """ Remove all phrases from the databse that are no longer in the
+ def _remove_special_phrases(self, cursor: Cursor,
+ new_phrases: Set[Tuple[str, str, str, str]],
+ existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
+ """ Remove all phrases from the database that are no longer in the
new phrase list.
"""
to_delete = existing_phrases - new_phrases
return len(to_delete)
- def add_country_names(self, country_code, names):
+ def add_country_names(self, country_code: str, names: Mapping[str, str]) -> None:
""" Add default names for the given country to the search index.
"""
# Make sure any name preprocessing for country names applies.
internal=True)
- def _add_country_full_names(self, country_code, names, internal=False):
+ def _add_country_full_names(self, country_code: str, names: Sequence[PlaceName],
+ internal: bool = False) -> None:
""" Add names for the given country from an already sanitized
name list.
"""
+ assert self.conn is not None
word_tokens = set()
for name in names:
norm_name = self._search_normalized(name.name)
FROM word
WHERE type = 'C' and word = %s""",
(country_code, ))
- existing_tokens = {True: set(), False: set()} # internal/external names
+ # internal/external names
+ existing_tokens: Dict[bool, Set[str]] = {True: set(), False: set()}
for word in cur:
existing_tokens[word[1]].add(word[0])
cur.execute(sql, (country_code, list(new_tokens)))
- def process_place(self, place):
+ def process_place(self, place: PlaceInfo) -> Mapping[str, Any]:
""" Determine tokenizer information about the given place.
Returns a JSON-serializable structure that will be handed into
token_info.set_names(*self._compute_name_tokens(names))
if place.is_country():
+ assert place.country_code is not None
self._add_country_full_names(place.country_code, names)
if address:
return token_info.to_dict()
- def _process_place_address(self, token_info, address):
+ def _process_place_address(self, token_info: '_TokenInfo',
+ address: Sequence[PlaceName]) -> None:
for item in address:
if item.kind == 'postcode':
- self._add_postcode(item.name)
+ token_info.set_postcode(self._add_postcode(item))
elif item.kind == 'housenumber':
token_info.add_housenumber(*self._compute_housenumber_token(item))
elif item.kind == 'street':
if not item.suffix:
token_info.add_place(self._compute_partial_tokens(item.name))
elif not item.kind.startswith('_') and not item.suffix and \
- item.kind not in ('country', 'full'):
+ item.kind not in ('country', 'full', 'inclusion'):
token_info.add_address_term(item.kind, self._compute_partial_tokens(item.name))
- def _compute_housenumber_token(self, hnr):
+ def _compute_housenumber_token(self, hnr: PlaceName) -> Tuple[Optional[int], Optional[str]]:
""" Normalize the housenumber and return the word token and the
canonical form.
"""
+ assert self.conn is not None
analyzer = self.token_analysis.analysis.get('@housenumber')
- result = None, None
+ result: Tuple[Optional[int], Optional[str]] = (None, None)
if analyzer is None:
# When no custom analyzer is set, simply normalize and transliterate
result = self._cache.housenumbers.get(norm_name, result)
if result[0] is None:
with self.conn.cursor() as cur:
- cur.execute("SELECT getorcreate_hnr_id(%s)", (norm_name, ))
- result = cur.fetchone()[0], norm_name
+ hid = cur.scalar("SELECT getorcreate_hnr_id(%s)", (norm_name, ))
+
+ result = hid, norm_name
self._cache.housenumbers[norm_name] = result
else:
# Otherwise use the analyzer to determine the canonical name.
# Per convention we use the first variant as the 'lookup name', the
# name that gets saved in the housenumber field of the place.
- norm_name = analyzer.normalize(hnr.name)
- if norm_name:
- result = self._cache.housenumbers.get(norm_name, result)
+ word_id = analyzer.get_canonical_id(hnr)
+ if word_id:
+ result = self._cache.housenumbers.get(word_id, result)
if result[0] is None:
- variants = analyzer.get_variants_ascii(norm_name)
+ variants = analyzer.compute_variants(word_id)
if variants:
with self.conn.cursor() as cur:
- cur.execute("SELECT create_analyzed_hnr_id(%s, %s)",
- (norm_name, list(variants)))
- result = cur.fetchone()[0], variants[0]
- self._cache.housenumbers[norm_name] = result
+ hid = cur.scalar("SELECT create_analyzed_hnr_id(%s, %s)",
+ (word_id, list(variants)))
+ result = hid, variants[0]
+ self._cache.housenumbers[word_id] = result
return result
- def _compute_partial_tokens(self, name):
+ def _compute_partial_tokens(self, name: str) -> List[int]:
""" Normalize the given term, split it into partial words and return
the token list for them.
"""
+ assert self.conn is not None
norm_name = self._search_normalized(name)
tokens = []
(need_lookup, ))
for partial, token in cur:
+ assert token is not None
tokens.append(token)
self._cache.partials[partial] = token
return tokens
- def _retrieve_full_tokens(self, name):
+ def _retrieve_full_tokens(self, name: str) -> List[int]:
""" Get the full name token for the given name, if it exists.
- The name is only retrived for the standard analyser.
+ The name is only retrieved for the standard analyser.
"""
+ assert self.conn is not None
norm_name = self._search_normalized(name)
# return cached if possible
return full
- def _compute_name_tokens(self, names):
+ def _compute_name_tokens(self, names: Sequence[PlaceName]) -> Tuple[Set[int], Set[int]]:
""" Computes the full name and partial name tokens for the given
dictionary of names.
"""
- full_tokens = set()
- partial_tokens = set()
+ assert self.conn is not None
+ full_tokens: Set[int] = set()
+ partial_tokens: Set[int] = set()
for name in names:
analyzer_id = name.get_attr('analyzer')
analyzer = self.token_analysis.get_analyzer(analyzer_id)
- norm_name = analyzer.normalize(name.name)
+ word_id = analyzer.get_canonical_id(name)
if analyzer_id is None:
- token_id = norm_name
+ token_id = word_id
else:
- token_id = f'{norm_name}@{analyzer_id}'
+ token_id = f'{word_id}@{analyzer_id}'
full, part = self._cache.names.get(token_id, (None, None))
if full is None:
- variants = analyzer.get_variants_ascii(norm_name)
+ variants = analyzer.compute_variants(word_id)
if not variants:
continue
with self.conn.cursor() as cur:
cur.execute("SELECT * FROM getorcreate_full_word(%s, %s)",
(token_id, variants))
- full, part = cur.fetchone()
+ full, part = cast(Tuple[int, List[int]], cur.fetchone())
self._cache.names[token_id] = (full, part)
+ assert part is not None
+
full_tokens.add(full)
partial_tokens.update(part)
return full_tokens, partial_tokens
- def _add_postcode(self, postcode):
+ def _add_postcode(self, item: PlaceName) -> Optional[str]:
""" Make sure the normalized postcode is present in the word table.
"""
- if re.search(r'[:,;]', postcode) is None:
- postcode = self.normalize_postcode(postcode)
+ assert self.conn is not None
+ analyzer = self.token_analysis.analysis.get('@postcode')
+
+ if analyzer is None:
+ postcode_name = item.name.strip().upper()
+ variant_base = None
+ else:
+ postcode_name = analyzer.get_canonical_id(item)
+ variant_base = item.get_attr("variant")
- if postcode not in self._cache.postcodes:
- term = self._search_normalized(postcode)
- if not term:
- return
+ if variant_base:
+ postcode = f'{postcode_name}@{variant_base}'
+ else:
+ postcode = postcode_name
- with self.conn.cursor() as cur:
- # no word_id needed for postcodes
- cur.execute("""INSERT INTO word (word_token, type, word)
- (SELECT %s, 'P', pc FROM (VALUES (%s)) as v(pc)
- WHERE NOT EXISTS
- (SELECT * FROM word
- WHERE type = 'P' and word = pc))
- """, (term, postcode))
- self._cache.postcodes.add(postcode)
+ if postcode not in self._cache.postcodes:
+ term = self._search_normalized(postcode_name)
+ if not term:
+ return None
+
+ variants = {term}
+ if analyzer is not None and variant_base:
+ variants.update(analyzer.compute_variants(variant_base))
+
+ with self.conn.cursor() as cur:
+ cur.execute("SELECT create_postcode_word(%s, %s)",
+ (postcode, list(variants)))
+ self._cache.postcodes.add(postcode)
+
+ return postcode_name
class _TokenInfo:
""" Collect token information to be sent back to the database.
"""
- def __init__(self):
- self.names = None
- self.housenumbers = set()
- self.housenumber_tokens = set()
- self.street_tokens = set()
- self.place_tokens = set()
- self.address_tokens = {}
+ def __init__(self) -> None:
+ self.names: Optional[str] = None
+ self.housenumbers: Set[str] = set()
+ self.housenumber_tokens: Set[int] = set()
+ self.street_tokens: Optional[Set[int]] = None
+ self.place_tokens: Set[int] = set()
+ self.address_tokens: Dict[str, str] = {}
+ self.postcode: Optional[str] = None
- @staticmethod
- def _mk_array(tokens):
+ def _mk_array(self, tokens: Iterable[Any]) -> str:
return f"{{{','.join((str(s) for s in tokens))}}}"
- def to_dict(self):
+ def to_dict(self) -> Dict[str, Any]:
""" Return the token information in database importable format.
"""
- out = {}
+ out: Dict[str, Any] = {}
if self.names:
out['names'] = self.names
out['hnr'] = ';'.join(self.housenumbers)
out['hnr_tokens'] = self._mk_array(self.housenumber_tokens)
- if self.street_tokens:
+ if self.street_tokens is not None:
out['street'] = self._mk_array(self.street_tokens)
if self.place_tokens:
if self.address_tokens:
out['addr'] = self.address_tokens
+ if self.postcode:
+ out['postcode'] = self.postcode
+
return out
- def set_names(self, fulls, partials):
+ def set_names(self, fulls: Iterable[int], partials: Iterable[int]) -> None:
""" Adds token information for the normalised names.
"""
self.names = self._mk_array(itertools.chain(fulls, partials))
- def add_housenumber(self, token, hnr):
+ def add_housenumber(self, token: Optional[int], hnr: Optional[str]) -> None:
""" Extract housenumber information from a list of normalised
housenumbers.
"""
if token:
+ assert hnr is not None
self.housenumbers.add(hnr)
self.housenumber_tokens.add(token)
- def add_street(self, tokens):
+ def add_street(self, tokens: Iterable[int]) -> None:
""" Add addr:street match terms.
"""
+ if self.street_tokens is None:
+ self.street_tokens = set()
self.street_tokens.update(tokens)
- def add_place(self, tokens):
+ def add_place(self, tokens: Iterable[int]) -> None:
""" Add addr:place search and match terms.
"""
self.place_tokens.update(tokens)
- def add_address_term(self, key, partials):
+ def add_address_term(self, key: str, partials: Iterable[int]) -> None:
""" Add additional address terms.
"""
if partials:
self.address_tokens[key] = self._mk_array(partials)
+ def set_postcode(self, postcode: Optional[str]) -> None:
+ """ Set the postcode to the given one.
+ """
+ self.postcode = postcode
+
class _TokenCache:
""" Cache for token information to avoid repeated database queries.
This cache is not thread-safe and needs to be instantiated per
analyzer.
"""
- def __init__(self):
- self.names = {}
- self.partials = {}
- self.fulls = {}
- self.postcodes = set()
- self.housenumbers = {}
+ def __init__(self) -> None:
+ self.names: Dict[str, Tuple[int, List[int]]] = {}
+ self.partials: Dict[str, int] = {}
+ self.fulls: Dict[str, List[int]] = {}
+ self.postcodes: Set[str] = set()
+ self.housenumbers: Dict[str, Tuple[Optional[int], Optional[str]]] = {}