Tokenizer implementing normalisation as used before Nominatim 4 but using
libICU instead of the PostgreSQL module.
"""
+from typing import Optional, Sequence, List, Tuple, Mapping, Any, cast, \
+ Dict, Set, Iterable
import itertools
import json
import logging
+from pathlib import Path
from textwrap import dedent
-from nominatim.db.connection import connect
+from nominatim.db.connection import connect, Connection, Cursor
+from nominatim.config import Configuration
from nominatim.db.utils import CopyBuffer
from nominatim.db.sql_preprocessor import SQLPreprocessor
-from nominatim.indexer.place_info import PlaceInfo
+from nominatim.data.place_info import PlaceInfo
from nominatim.tokenizer.icu_rule_loader import ICURuleLoader
+from nominatim.tokenizer.place_sanitizer import PlaceSanitizer
+from nominatim.tokenizer.sanitizers.base import PlaceName
+from nominatim.tokenizer.icu_token_analysis import ICUTokenAnalysis
from nominatim.tokenizer.base import AbstractAnalyzer, AbstractTokenizer
DBCFG_TERM_NORMALIZATION = "tokenizer_term_normalization"
LOG = logging.getLogger()
def create(dsn: str, data_dir: Path) -> 'ICUTokenizer':
    """ Module-level factory: build the tokenizer implemented in this file.

        Both arguments are handed unchanged to the ICUTokenizer constructor.
    """
    tokenizer = ICUTokenizer(dsn, data_dir)
    return tokenizer
-class LegacyICUTokenizer(AbstractTokenizer):
+class ICUTokenizer(AbstractTokenizer):
""" This tokenizer uses libICU to covert names and queries to ASCII.
Otherwise it uses the same algorithms and data structures as the
normalization routines in Nominatim 3.
"""
- def __init__(self, dsn, data_dir):
+ def __init__(self, dsn: str, data_dir: Path) -> None:
self.dsn = dsn
self.data_dir = data_dir
- self.loader = None
+ self.loader: Optional[ICURuleLoader] = None
- def init_new_db(self, config, init_db=True):
+ def init_new_db(self, config: Configuration, init_db: bool = True) -> None:
""" Set up a new tokenizer for the database.
This copies all necessary data in the project directory to make
self._init_db_tables(config)
- def init_from_project(self, config):
+ def init_from_project(self, config: Configuration) -> None:
""" Initialise the tokenizer from the project directory.
"""
self.loader = ICURuleLoader(config)
self._install_php(config.lib_dir.php, overwrite=False)
- def finalize_import(self, config):
+ def finalize_import(self, config: Configuration) -> None:
""" Do any required postprocessing to make the tokenizer data ready
for use.
"""
sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer_indices.sql')
- def update_sql_functions(self, config):
+ def update_sql_functions(self, config: Configuration) -> None:
""" Reimport the SQL functions for this tokenizer.
"""
with connect(self.dsn) as conn:
sqlp.run_sql_file(conn, 'tokenizer/icu_tokenizer.sql')
- def check_database(self, config):
+ def check_database(self, config: Configuration) -> None:
""" Check that the tokenizer is set up correctly.
"""
# Will throw an error if there is an issue.
self.init_from_project(config)
- def update_statistics(self):
+ def update_statistics(self) -> None:
""" Recompute frequencies for all name words.
"""
with connect(self.dsn) as conn:
conn.commit()
- def _cleanup_housenumbers(self):
+ def _cleanup_housenumbers(self) -> None:
""" Remove unused house numbers.
"""
with connect(self.dsn) as conn:
- def update_word_tokens(self):
+ def update_word_tokens(self) -> None:
""" Remove unused tokens.
"""
LOG.warning("Cleaning up housenumber tokens.")
LOG.warning("Tokenizer house-keeping done.")
- def name_analyzer(self):
+ def name_analyzer(self) -> 'ICUNameAnalyzer':
""" Create a new analyzer for tokenizing names and queries
using this tokenizer. Analyzers are context managers and should
be used accordingly:
Analyzers are not thread-safe. You need to instantiate one per thread.
"""
- return LegacyICUNameAnalyzer(self.dsn, self.loader.make_sanitizer(),
- self.loader.make_token_analysis())
+ assert self.loader is not None
+ return ICUNameAnalyzer(self.dsn, self.loader.make_sanitizer(),
+ self.loader.make_token_analysis())
- def _install_php(self, phpdir, overwrite=True):
+ def _install_php(self, phpdir: Path, overwrite: bool = True) -> None:
""" Install the php script for the tokenizer.
"""
+ assert self.loader is not None
php_file = self.data_dir / "tokenizer.php"
if not php_file.exists() or overwrite:
require_once('{phpdir}/tokenizer/icu_tokenizer.php');"""), encoding='utf-8')
- def _save_config(self):
+ def _save_config(self) -> None:
""" Save the configuration that needs to remain stable for the given
database as database properties.
"""
+ assert self.loader is not None
with connect(self.dsn) as conn:
self.loader.save_config_to_db(conn)
- def _init_db_tables(self, config):
+ def _init_db_tables(self, config: Configuration) -> None:
""" Set up the word table and fill it with pre-computed word
frequencies.
"""
conn.commit()
-class LegacyICUNameAnalyzer(AbstractAnalyzer):
- """ The legacy analyzer uses the ICU library for splitting names.
+class ICUNameAnalyzer(AbstractAnalyzer):
+ """ The ICU analyzer uses the ICU library for splitting names.
Each instance opens a connection to the database to request the
normalization.
"""
- def __init__(self, dsn, sanitizer, token_analysis):
- self.conn = connect(dsn).connection
+ def __init__(self, dsn: str, sanitizer: PlaceSanitizer,
+ token_analysis: ICUTokenAnalysis) -> None:
+ self.conn: Optional[Connection] = connect(dsn).connection
self.conn.autocommit = True
self.sanitizer = sanitizer
self.token_analysis = token_analysis
self._cache = _TokenCache()
- def close(self):
+ def close(self) -> None:
""" Free all resources used by the analyzer.
"""
if self.conn:
self.conn = None
- def _search_normalized(self, name):
+ def _search_normalized(self, name: str) -> str:
""" Return the search token transliteration of the given name.
"""
- return self.token_analysis.search.transliterate(name).strip()
+ return cast(str, self.token_analysis.search.transliterate(name)).strip()
- def _normalized(self, name):
+ def _normalized(self, name: str) -> str:
""" Return the normalized version of the given name with all
non-relevant information removed.
"""
- return self.token_analysis.normalizer.transliterate(name).strip()
+ return cast(str, self.token_analysis.normalizer.transliterate(name)).strip()
- def get_word_token_info(self, words):
+ def get_word_token_info(self, words: Sequence[str]) -> List[Tuple[str, str, int]]:
""" Return token information for the given list of words.
If a word starts with # it is assumed to be a full name
otherwise is a partial name.
The function is used for testing and debugging only
and not necessarily efficient.
"""
+ assert self.conn is not None
full_tokens = {}
partial_tokens = {}
for word in words:
+ [(k, v, part_ids.get(v, None)) for k, v in partial_tokens.items()]
- def normalize_postcode(self, postcode):
+ def normalize_postcode(self, postcode: str) -> str:
""" Convert the postcode to a standardized form.
This function must yield exactly the same result as the SQL function
return postcode.strip().upper()
def update_postcodes_from_db(self) -> None:
    """ Synchronise the postcode entries of the word table with the
        postcodes found in the location_postcode table: entries that are
        no longer needed are removed, missing ones are created.
    """
    assert self.conn is not None
    postcode_analyzer = self.token_analysis.analysis.get('@postcode')

    with self.conn.cursor() as cur:
        # Postcode names the word table already contains.
        cur.execute("SELECT DISTINCT word FROM word WHERE type = 'P'")
        present = {row[0] for row in cur}

        # Postcode names that are required according to location_postcode.
        wanted = set()
        cur.execute("SELECT country_code, postcode FROM location_postcode")
        for country, raw_postcode in cur:
            place_info = PlaceInfo({'country_code': country,
                                    'class': 'place', 'type': 'postcode',
                                    'address': {'postcode': raw_postcode}})
            sanitized_address = self.sanitizer.process_names(place_info)[1]

            # Only the first postcode item of the sanitized address is used.
            item = next((part for part in sanitized_address
                         if part.kind == 'postcode'), None)
            if item is None:
                continue

            if postcode_analyzer is None:
                name = item.name.strip().upper()
                variant = None
            else:
                name = postcode_analyzer.normalize(item.name)
                variant = item.get_attr("variant")

            wanted.add(f'{name}@{variant}' if variant else name)

    # Bring the word table in line with the computed set.
    self._delete_unused_postcode_words(present - wanted)
    self._add_missing_postcode_words(wanted - present)
+
+ def _delete_unused_postcode_words(self, tokens: Iterable[str]) -> None:
+ assert self.conn is not None
+ if tokens:
+ with self.conn.cursor() as cur:
+ cur.execute("DELETE FROM word WHERE type = 'P' and word = any(%s)",
+ (list(tokens), ))
+
+ def _add_missing_postcode_words(self, tokens: Iterable[str]) -> None:
+ assert self.conn is not None
+ if not tokens:
+ return
+
+ analyzer = self.token_analysis.analysis.get('@postcode')
+ terms = []
+
+ for postcode_name in tokens:
+ if '@' in postcode_name:
+ term, variant = postcode_name.split('@', 2)
+ term = self._search_normalized(term)
+ if analyzer is None:
+ variants = [term]
+ else:
+ variants = analyzer.get_variants_ascii(variant)
+ if term not in variants:
+ variants.append(term)
+ else:
+ variants = [self._search_normalized(postcode_name)]
+ terms.append((postcode_name, variants))
+
+ if terms:
+ with self.conn.cursor() as cur:
+ cur.execute_values("""SELECT create_postcode_word(pc, var)
+ FROM (VALUES %s) AS v(pc, var)""",
+ terms)
+
+
+
+
+ def update_special_phrases(self, phrases: Iterable[Tuple[str, str, str, str]],
+ should_replace: bool) -> None:
""" Replace the search index for special phrases with the new phrases.
If `should_replace` is True, then the previous set of phrases will be
completely replaced. Otherwise the phrases are added to the
already existing ones.
"""
+ assert self.conn is not None
norm_phrases = set(((self._normalized(p[0]), p[1], p[2], p[3])
for p in phrases))
len(norm_phrases), added, deleted)
- def _add_special_phrases(self, cursor, new_phrases, existing_phrases):
+ def _add_special_phrases(self, cursor: Cursor,
+ new_phrases: Set[Tuple[str, str, str, str]],
+ existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
""" Add all phrases to the database that are not yet there.
"""
to_add = new_phrases - existing_phrases
return added
- @staticmethod
- def _remove_special_phrases(cursor, new_phrases, existing_phrases):
+ def _remove_special_phrases(self, cursor: Cursor,
+ new_phrases: Set[Tuple[str, str, str, str]],
+ existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
""" Remove all phrases from the databse that are no longer in the
new phrase list.
"""
return len(to_delete)
- def add_country_names(self, country_code, names):
+ def add_country_names(self, country_code: str, names: Mapping[str, str]) -> None:
""" Add default names for the given country to the search index.
"""
# Make sure any name preprocessing for country names applies.
internal=True)
- def _add_country_full_names(self, country_code, names, internal=False):
+ def _add_country_full_names(self, country_code: str, names: Sequence[PlaceName],
+ internal: bool = False) -> None:
""" Add names for the given country from an already sanitized
name list.
"""
+ assert self.conn is not None
word_tokens = set()
for name in names:
norm_name = self._search_normalized(name.name)
FROM word
WHERE type = 'C' and word = %s""",
(country_code, ))
- existing_tokens = {True: set(), False: set()} # internal/external names
+ # internal/external names
+ existing_tokens: Dict[bool, Set[str]] = {True: set(), False: set()}
for word in cur:
existing_tokens[word[1]].add(word[0])
cur.execute(sql, (country_code, list(new_tokens)))
- def process_place(self, place):
+ def process_place(self, place: PlaceInfo) -> Mapping[str, Any]:
""" Determine tokenizer information about the given place.
Returns a JSON-serializable structure that will be handed into
token_info.set_names(*self._compute_name_tokens(names))
if place.is_country():
+ assert place.country_code is not None
self._add_country_full_names(place.country_code, names)
if address:
return token_info.to_dict()
- def _process_place_address(self, token_info, address):
+ def _process_place_address(self, token_info: '_TokenInfo',
+ address: Sequence[PlaceName]) -> None:
for item in address:
if item.kind == 'postcode':
token_info.set_postcode(self._add_postcode(item))
token_info.add_address_term(item.kind, self._compute_partial_tokens(item.name))
- def _compute_housenumber_token(self, hnr):
+ def _compute_housenumber_token(self, hnr: PlaceName) -> Tuple[Optional[int], Optional[str]]:
""" Normalize the housenumber and return the word token and the
canonical form.
"""
+ assert self.conn is not None
analyzer = self.token_analysis.analysis.get('@housenumber')
- result = None, None
+ result: Tuple[Optional[int], Optional[str]] = (None, None)
if analyzer is None:
# When no custom analyzer is set, simply normalize and transliterate
if result[0] is None:
with self.conn.cursor() as cur:
cur.execute("SELECT getorcreate_hnr_id(%s)", (norm_name, ))
- result = cur.fetchone()[0], norm_name
+ result = cur.fetchone()[0], norm_name # type: ignore[no-untyped-call]
self._cache.housenumbers[norm_name] = result
else:
# Otherwise use the analyzer to determine the canonical name.
with self.conn.cursor() as cur:
cur.execute("SELECT create_analyzed_hnr_id(%s, %s)",
(norm_name, list(variants)))
- result = cur.fetchone()[0], variants[0]
+ result = cur.fetchone()[0], variants[0] # type: ignore[no-untyped-call]
self._cache.housenumbers[norm_name] = result
return result
- def _compute_partial_tokens(self, name):
+ def _compute_partial_tokens(self, name: str) -> List[int]:
""" Normalize the given term, split it into partial words and return
the token list for them.
"""
+ assert self.conn is not None
norm_name = self._search_normalized(name)
tokens = []
(need_lookup, ))
for partial, token in cur:
+ assert token is not None
tokens.append(token)
self._cache.partials[partial] = token
return tokens
- def _retrieve_full_tokens(self, name):
+ def _retrieve_full_tokens(self, name: str) -> List[int]:
""" Get the full name token for the given name, if it exists.
The name is only retrieved for the standard analyser.
"""
+ assert self.conn is not None
norm_name = self._search_normalized(name)
# return cached if possible
return full
- def _compute_name_tokens(self, names):
+ def _compute_name_tokens(self, names: Sequence[PlaceName]) -> Tuple[Set[int], Set[int]]:
""" Computes the full name and partial name tokens for the given
dictionary of names.
"""
- full_tokens = set()
- partial_tokens = set()
+ assert self.conn is not None
+ full_tokens: Set[int] = set()
+ partial_tokens: Set[int] = set()
for name in names:
analyzer_id = name.get_attr('analyzer')
with self.conn.cursor() as cur:
cur.execute("SELECT * FROM getorcreate_full_word(%s, %s)",
(token_id, variants))
- full, part = cur.fetchone()
+ full, part = cast(Tuple[int, List[int]],
+ cur.fetchone()) # type: ignore[no-untyped-call]
self._cache.names[token_id] = (full, part)
+ assert part is not None
+
full_tokens.add(full)
partial_tokens.update(part)
return full_tokens, partial_tokens
- def _add_postcode(self, item):
+ def _add_postcode(self, item: PlaceName) -> Optional[str]:
""" Make sure the normalized postcode is present in the word table.
"""
- analyzer = self.token_analysis.get_analyzer('@postcode')
+ assert self.conn is not None
+ analyzer = self.token_analysis.analysis.get('@postcode')
if analyzer is None:
postcode_name = item.name.strip().upper()
postcode_name = analyzer.normalize(item.name)
variant_base = item.get_attr("variant")
- if variant_base is not None:
+ if variant_base:
postcode = f'{postcode_name}@{variant_base}'
else:
postcode = postcode_name
if postcode not in self._cache.postcodes:
term = self._search_normalized(postcode_name)
if not term:
- return
+ return None
variants = {term}
- if analyzer is not None and variant_base is not None:
+ if analyzer is not None and variant_base:
variants.update(analyzer.get_variants_ascii(variant_base))
with self.conn.cursor() as cur:
(postcode, list(variants)))
self._cache.postcodes.add(postcode)
+ return postcode_name
+
class _TokenInfo:
""" Collect token information to be sent back to the database.
"""
- def __init__(self):
- self.names = None
- self.housenumbers = set()
- self.housenumber_tokens = set()
- self.street_tokens = set()
- self.place_tokens = set()
- self.address_tokens = {}
- self.postcode = None
-
-
- @staticmethod
- def _mk_array(tokens):
+ def __init__(self) -> None:
+ self.names: Optional[str] = None
+ self.housenumbers: Set[str] = set()
+ self.housenumber_tokens: Set[int] = set()
+ self.street_tokens: Set[int] = set()
+ self.place_tokens: Set[int] = set()
+ self.address_tokens: Dict[str, str] = {}
+ self.postcode: Optional[str] = None
+
+
+ def _mk_array(self, tokens: Iterable[Any]) -> str:
return f"{{{','.join((str(s) for s in tokens))}}}"
- def to_dict(self):
+ def to_dict(self) -> Dict[str, Any]:
""" Return the token information in database importable format.
"""
- out = {}
+ out: Dict[str, Any] = {}
if self.names:
out['names'] = self.names
return out
def set_names(self, fulls: Iterable[int], partials: Iterable[int]) -> None:
    """ Store the tokens of the normalised names as one array string:
        full-name tokens first, followed by the partial tokens.
    """
    all_tokens = itertools.chain(fulls, partials)
    self.names = self._mk_array(all_tokens)
def add_housenumber(self, token: Optional[int], hnr: Optional[str]) -> None:
    """ Record a housenumber token together with its housenumber string.
        Calls without a token are ignored.
    """
    if not token:
        return

    assert hnr is not None
    self.housenumber_tokens.add(token)
    self.housenumbers.add(hnr)
def add_street(self, tokens: Iterable[int]) -> None:
    """ Register the given tokens as match terms for addr:street.
    """
    for token in tokens:
        self.street_tokens.add(token)
def add_place(self, tokens: Iterable[int]) -> None:
    """ Register the given tokens as search and match terms for addr:place.
    """
    for token in tokens:
        self.place_tokens.add(token)
def add_address_term(self, key: str, partials: Iterable[int]) -> None:
    """ Store the partial tokens of an additional address term under its
        key. Nothing is stored when `partials` is empty.
    """
    if not partials:
        return

    self.address_tokens[key] = self._mk_array(partials)
def set_postcode(self, postcode: Optional[str]) -> None:
    """ Remember the postcode for this place, replacing any value set
        earlier. None is stored as is.
    """
    self.postcode = postcode
This cache is not thread-safe and needs to be instantiated per
analyzer.
"""
- def __init__(self):
- self.names = {}
- self.partials = {}
- self.fulls = {}
- self.postcodes = set()
- self.housenumbers = {}
+ def __init__(self) -> None:
+ self.names: Dict[str, Tuple[int, List[int]]] = {}
+ self.partials: Dict[str, int] = {}
+ self.fulls: Dict[str, List[int]] = {}
+ self.postcodes: Set[str] = set()
+ self.housenumbers: Dict[str, Tuple[Optional[int], Optional[str]]] = {}