# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Tokenizer implementing normalisation as used before Nominatim 4 but using
libICU instead of the PostgreSQL module.
"""
from typing import Optional, Sequence, List, Tuple, Mapping, Any, cast, \
                   Dict, Set, Iterable
import itertools
import logging

from psycopg.types.json import Jsonb
from psycopg import sql as pysql

from ..db.connection import connect, Connection, Cursor, \
                            drop_tables, table_exists, execute_scalar
from ..config import Configuration
from ..db.sql_preprocessor import SQLPreprocessor
from ..data.place_info import PlaceInfo
from ..data.place_name import PlaceName
from .icu_rule_loader import ICURuleLoader
from .place_sanitizer import PlaceSanitizer
from .icu_token_analysis import ICUTokenAnalysis
from .base import AbstractAnalyzer, AbstractTokenizer

DBCFG_TERM_NORMALIZATION = "tokenizer_term_normalization"

LOG = logging.getLogger()

WORD_TYPES = (('country_names', 'C'),
              ('postcodes', 'P'),
              ('full_word', 'W'),
              ('housenumbers', 'H'))
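
# Single-letter codes used in the 'type' column of the word table throughout
# this module: 'C' country names, 'P' postcodes, 'W' full words, 'w' partial
# words, 'H' house numbers and 'S' special phrases.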


def create(dsn: str) -> 'ICUTokenizer':
    """ Create a new instance of the tokenizer provided by this module.
    """
    return ICUTokenizer(dsn)
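
# A minimal usage sketch (illustrative only; the DSN and the `config` object
# are placeholders for an existing project setup):
#
#   tokenizer = create('dbname=nominatim')
#   tokenizer.init_from_project(config)
#   with tokenizer.name_analyzer() as analyzer:
#       print(analyzer.normalize_postcode(' ab1 2cd '))   # -> 'AB1 2CD'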


class ICUTokenizer(AbstractTokenizer):
    """ This tokenizer uses libICU to convert names and queries to ASCII.
        Otherwise it uses the same algorithms and data structures as the
        normalization routines in Nominatim 3.
    """

    def __init__(self, dsn: str) -> None:
        self.dsn = dsn
        self.loader: Optional[ICURuleLoader] = None

    def init_new_db(self, config: Configuration, init_db: bool = True) -> None:
        """ Set up a new tokenizer for the database.

            This copies all necessary data in the project directory to make
            sure the tokenizer remains stable even over updates.
        """
        self.loader = ICURuleLoader(config)

        self._save_config()

        if init_db:
            self.update_sql_functions(config)
            self._setup_db_tables(config)
            self._create_base_indices(config, 'word')

    def init_from_project(self, config: Configuration) -> None:
        """ Initialise the tokenizer from the project directory.
        """
        self.loader = ICURuleLoader(config)

        with connect(self.dsn) as conn:
            self.loader.load_config_from_db(conn)

    def finalize_import(self, config: Configuration) -> None:
        """ Do any required postprocessing to make the tokenizer data ready
            for use.
        """
        self._create_lookup_indices(config, 'word')

    def update_sql_functions(self, config: Configuration) -> None:
        """ Reimport the SQL functions for this tokenizer.
        """
        with connect(self.dsn) as conn:
            sqlp = SQLPreprocessor(conn, config)
            sqlp.run_sql_file(conn, 'tokenizer/icu_tokenizer.sql')

    def check_database(self, config: Configuration) -> None:
        """ Check that the tokenizer is set up correctly.
        """
        # Will throw an error if there is an issue.
        self.init_from_project(config)
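
    # The statistics computed by update_statistics() are merged into word.info
    # as JSON, e.g. (values assumed for illustration): {"count": 421, "addr_count": 17}.
    # 'count' counts occurrences in search_name.name_vector, 'addr_count' those
    # in search_name.nameaddress_vector; counts of one or less are dropped.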

    def update_statistics(self, config: Configuration, threads: int = 2) -> None:
        """ Recompute frequencies for all name words.
        """
        with connect(self.dsn) as conn:
            if not table_exists(conn, 'search_name'):
                return

            with conn.cursor() as cur:
                cur.execute('ANALYSE search_name')
                if threads > 1:
                    cur.execute(pysql.SQL('SET max_parallel_workers_per_gather TO {}')
                                .format(pysql.Literal(min(threads, 6),)))

                LOG.info('Computing word frequencies')
                drop_tables(conn, 'word_frequencies')
                cur.execute("""
                  CREATE TEMP TABLE word_frequencies AS
                  WITH word_freq AS MATERIALIZED (
                           SELECT unnest(name_vector) as id, count(*)
                                 FROM search_name GROUP BY id),
                       addr_freq AS MATERIALIZED (
                           SELECT unnest(nameaddress_vector) as id, count(*)
                                 FROM search_name GROUP BY id)
                  SELECT coalesce(a.id, w.id) as id,
                         (CASE WHEN w.count is null or w.count <= 1 THEN '{}'::JSONB
                              ELSE jsonb_build_object('count', w.count) END
                          ||
                          CASE WHEN a.count is null or a.count <= 1 THEN '{}'::JSONB
                              ELSE jsonb_build_object('addr_count', a.count) END) as info
                  FROM word_freq w FULL JOIN addr_freq a ON a.id = w.id;
                  """)
                cur.execute('CREATE UNIQUE INDEX ON word_frequencies(id) INCLUDE(info)')
                cur.execute('ANALYSE word_frequencies')
                LOG.info('Update word table with recomputed frequencies')
                drop_tables(conn, 'tmp_word')
                cur.execute("""CREATE TABLE tmp_word AS
                                SELECT word_id, word_token, type, word,
                                       coalesce(word.info, '{}'::jsonb)
                                       - 'count' - 'addr_count' ||
                                       coalesce(wf.info, '{}'::jsonb)
                                       as info
                                FROM word LEFT JOIN word_frequencies wf
                                     ON word.word_id = wf.id
                            """)
                drop_tables(conn, 'word_frequencies')

            with conn.cursor() as cur:
                cur.execute('SET max_parallel_workers_per_gather TO 0')

            sqlp = SQLPreprocessor(conn, config)
            sqlp.run_string(conn,
                            'GRANT SELECT ON tmp_word TO "{{config.DATABASE_WEBUSER}}"')
            conn.commit()

        self._create_base_indices(config, 'tmp_word')
        self._create_lookup_indices(config, 'tmp_word')
        self._move_temporary_word_table('tmp_word')
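
    # Only house-number tokens that are longer than six characters or not
    # purely numeric are considered for removal below; e.g. (illustrative)
    # an unused '12a-14b' may be dropped, while a short numeric '25' is kept.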

    def _cleanup_housenumbers(self) -> None:
        """ Remove unused house numbers.
        """
        with connect(self.dsn) as conn:
            if not table_exists(conn, 'search_name'):
                return
            with conn.cursor(name="hnr_counter") as cur:
                cur.execute("""SELECT DISTINCT word_id, coalesce(info->>'lookup', word_token)
                               FROM word
                               WHERE type = 'H'
                                 AND NOT EXISTS(SELECT * FROM search_name
                                                WHERE ARRAY[word.word_id] && name_vector)
                                 AND (char_length(coalesce(word, word_token)) > 6
                                      OR coalesce(word, word_token) not similar to '\\d+')
                            """)
                candidates = {token: wid for wid, token in cur}
            with conn.cursor(name="hnr_counter") as cur:
                cur.execute("""SELECT housenumber FROM placex
                               WHERE housenumber is not null
                                     AND (char_length(housenumber) > 6
                                          OR housenumber not similar to '\\d+')
                            """)
                for row in cur:
                    for hnr in row[0].split(';'):
                        candidates.pop(hnr, None)
            LOG.info("There are %s outdated housenumbers.", len(candidates))
            LOG.debug("Outdated housenumbers: %s", candidates.keys())
            if candidates:
                with conn.cursor() as cur:
                    cur.execute("""DELETE FROM word WHERE word_id = any(%s)""",
                                (list(candidates.values()), ))
                conn.commit()

    def update_word_tokens(self) -> None:
        """ Remove unused tokens.
        """
        LOG.warning("Cleaning up housenumber tokens.")
        self._cleanup_housenumbers()
        LOG.warning("Tokenizer house-keeping done.")

    def name_analyzer(self) -> 'ICUNameAnalyzer':
        """ Create a new analyzer for tokenizing names and queries
            using this tokenizer. Analyzers are context managers and should
            be used accordingly:

            ```
            with tokenizer.name_analyzer() as analyzer:
                analyzer.add_country_names(...)
            ```

            When used outside the with construct, the caller must ensure to
            call the close() function before destructing the analyzer.

            Analyzers are not thread-safe. You need to instantiate one per thread.
        """
        assert self.loader is not None
        return ICUNameAnalyzer(self.dsn, self.loader.make_sanitizer(),
                               self.loader.make_token_analysis())

    def most_frequent_words(self, conn: Connection, num: int) -> List[str]:
        """ Return a list of the `num` most frequent full words
            in the database.
        """
        with conn.cursor() as cur:
            cur.execute("""SELECT word, sum((info->>'count')::int) as count
                             FROM word WHERE type = 'W'
                             GROUP BY word
                             ORDER BY count DESC LIMIT %s""", (num,))
            return list(s[0].split('@')[0] for s in cur)
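
    # Illustrative use (the resulting word list is an assumed example):
    #
    #   with connect(dsn) as conn:
    #       top = tokenizer.most_frequent_words(conn, 3)  # e.g. ['station', 'park', 'lane']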

    def _save_config(self) -> None:
        """ Save the configuration that needs to remain stable for the given
            database as database properties.
        """
        assert self.loader is not None
        with connect(self.dsn) as conn:
            self.loader.save_config_to_db(conn)

    def _setup_db_tables(self, config: Configuration) -> None:
        """ Create an empty word table and the seq_word sequence.
        """
        with connect(self.dsn) as conn:
            drop_tables(conn, 'word')
            sqlp = SQLPreprocessor(conn, config)
            sqlp.run_string(conn, """
                CREATE TABLE word (
                      word_id INTEGER,
                      word_token text NOT NULL,
                      type text NOT NULL,
                      word text,
                      info jsonb
                    ) {{db.tablespace.search_data}};
                GRANT SELECT ON word TO "{{config.DATABASE_WEBUSER}}";

                DROP SEQUENCE IF EXISTS seq_word;
                CREATE SEQUENCE seq_word start 1;
                GRANT SELECT ON seq_word to "{{config.DATABASE_WEBUSER}}";
            """)
            conn.commit()

    def _create_base_indices(self, config: Configuration, table_name: str) -> None:
        """ Create the indexes on the word table that are needed during import.
        """
        with connect(self.dsn) as conn:
            sqlp = SQLPreprocessor(conn, config)
            sqlp.run_string(conn,
                            """CREATE INDEX idx_{{table_name}}_word_token ON {{table_name}}
                               USING BTREE (word_token) {{db.tablespace.search_index}}""",
                            table_name=table_name)
            for name, ctype in WORD_TYPES:
                sqlp.run_string(conn,
                                """CREATE INDEX idx_{{table_name}}_{{idx_name}} ON {{table_name}}
                                   USING BTREE (word) {{db.tablespace.address_index}}
                                   WHERE type = '{{column_type}}'
                                """,
                                table_name=table_name, idx_name=name,
                                column_type=ctype)
            conn.commit()

    def _create_lookup_indices(self, config: Configuration, table_name: str) -> None:
        """ Create additional indexes used when running the API.
        """
        with connect(self.dsn) as conn:
            sqlp = SQLPreprocessor(conn, config)
            # Index required for details lookup.
            sqlp.run_string(conn,
                            """
                            CREATE INDEX IF NOT EXISTS idx_{{table_name}}_word_id
                              ON {{table_name}} USING BTREE (word_id) {{db.tablespace.search_index}}
                            """,
                            table_name=table_name)
            conn.commit()

    def _move_temporary_word_table(self, old: str) -> None:
        """ Rename all tables and indexes used by the tokenizer.
        """
        with connect(self.dsn) as conn:
            drop_tables(conn, 'word')
            with conn.cursor() as cur:
                cur.execute(f"ALTER TABLE {old} RENAME TO word")
                for idx in ('word_token', 'word_id'):
                    cur.execute(f"""ALTER INDEX idx_{old}_{idx}
                                    RENAME TO idx_word_{idx}""")
                for name, _ in WORD_TYPES:
                    cur.execute(f"""ALTER INDEX idx_{old}_{name}
                                    RENAME TO idx_word_{name}""")
            conn.commit()


class ICUNameAnalyzer(AbstractAnalyzer):
    """ The ICU analyzer uses the ICU library for splitting names.

        Each instance opens a connection to the database to request the
        normalization.
    """

    def __init__(self, dsn: str, sanitizer: PlaceSanitizer,
                 token_analysis: ICUTokenAnalysis) -> None:
        self.conn: Optional[Connection] = connect(dsn)
        self.conn.autocommit = True
        self.sanitizer = sanitizer
        self.token_analysis = token_analysis

        self._cache = _TokenCache()

    def close(self) -> None:
        """ Free all resources used by the analyzer.
        """
        if self.conn:
            self.conn.close()
            self.conn = None

    def _search_normalized(self, name: str) -> str:
        """ Return the search token transliteration of the given name.
        """
        return cast(str, self.token_analysis.search.transliterate(name)).strip()

    def _normalized(self, name: str) -> str:
        """ Return the normalized version of the given name with all
            non-relevant information removed.
        """
        return cast(str, self.token_analysis.normalizer.transliterate(name)).strip()
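
    # Illustrative call and result shape for the debugging helper below
    # (token strings and ids are assumed values):
    #
    #   analyzer.get_word_token_info(['#Main Street', 'main'])
    #   # -> [('#Main Street', 'main street', 1234), ('main', 'main', 5678)]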

    def get_word_token_info(self, words: Sequence[str]) -> List[Tuple[str, str, int]]:
        """ Return token information for the given list of words.
            If a word starts with # it is assumed to be a full name
            otherwise it is a partial name.

            The function returns a list of tuples with
            (original word, word token, word id).

            The function is used for testing and debugging only
            and not necessarily efficient.
        """
        assert self.conn is not None
        full_tokens: Dict[str, str] = {}
        partial_tokens: Dict[str, str] = {}
        for word in words:
            if word.startswith('#'):
                full_tokens[word] = self._search_normalized(word[1:])
            else:
                partial_tokens[word] = self._search_normalized(word)

        with self.conn.cursor() as cur:
            cur.execute("""SELECT word_token, word_id
                           FROM word WHERE word_token = ANY(%s) and type = 'W'
                        """, (list(full_tokens.values()),))
            full_ids = {r[0]: r[1] for r in cur}
            cur.execute("""SELECT word_token, word_id
                           FROM word WHERE word_token = ANY(%s) and type = 'w'""",
                        (list(partial_tokens.values()),))
            part_ids = {r[0]: r[1] for r in cur}

        return [(k, v, full_ids.get(v, None)) for k, v in full_tokens.items()] \
            + [(k, v, part_ids.get(v, None)) for k, v in partial_tokens.items()]

    def normalize_postcode(self, postcode: str) -> str:
        """ Convert the postcode to a standardized form.

            This function must yield exactly the same result as the SQL function
            'token_normalized_postcode()'.
        """
        return postcode.strip().upper()

    def update_postcodes_from_db(self) -> None:
        """ Postcode update.

            Removes all postcodes from the word table because they are not
            needed. Postcodes are recognised by pattern.
        """
        assert self.conn is not None

        with self.conn.cursor() as cur:
            cur.execute("DELETE FROM word WHERE type = 'P'")
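
    # Special phrases arrive as 4-tuples of (phrase, class, type, operator),
    # e.g. (illustrative) ('bars in', 'amenity', 'bar', 'in'); only the
    # operators 'in' and 'near' are kept, any other operator is stored as None.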

    def update_special_phrases(self, phrases: Iterable[Tuple[str, str, str, str]],
                               should_replace: bool) -> None:
        """ Replace the search index for special phrases with the new phrases.
            If `should_replace` is True, then the previous set of phrases is
            completely replaced. Otherwise the phrases are added to the
            already existing ones.
        """
        assert self.conn is not None
        norm_phrases = set(((self._normalized(p[0]), p[1], p[2], p[3])
                            for p in phrases))

        with self.conn.cursor() as cur:
            # Get the old phrases.
            existing_phrases = set()
            cur.execute("SELECT word, info FROM word WHERE type = 'S'")
            for word, info in cur:
                existing_phrases.add((word, info['class'], info['type'],
                                      info.get('op') or '-'))

            added = self._add_special_phrases(cur, norm_phrases, existing_phrases)
            if should_replace:
                deleted = self._remove_special_phrases(cur, norm_phrases,
                                                       existing_phrases)
            else:
                deleted = 0

        LOG.info("Total phrases: %s. Added: %s. Deleted: %s",
                 len(norm_phrases), added, deleted)

    def _add_special_phrases(self, cursor: Cursor,
                             new_phrases: Set[Tuple[str, str, str, str]],
                             existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
        """ Add all phrases to the database that are not yet there.
        """
        to_add = new_phrases - existing_phrases
        added = 0
        with cursor.copy('COPY word(word_token, type, word, info) FROM STDIN') as copy:
            for word, cls, typ, oper in to_add:
                term = self._search_normalized(word)
                if term:
                    copy.write_row((term, 'S', word,
                                    Jsonb({'class': cls, 'type': typ,
                                           'op': oper if oper in ('in', 'near') else None})))
                    added += 1

        return added

    def _remove_special_phrases(self, cursor: Cursor,
                                new_phrases: Set[Tuple[str, str, str, str]],
                                existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
        """ Remove all phrases from the database that are no longer in the
            new phrase list.
        """
        to_delete = existing_phrases - new_phrases

        if to_delete:
            cursor.executemany(
                """ DELETE FROM word
                    WHERE type = 'S' and word = %s
                          and info->>'class' = %s and info->>'type' = %s
                          and %s = coalesce(info->>'op', '-')
                """, to_delete)

        return len(to_delete)

    def add_country_names(self, country_code: str, names: Mapping[str, str]) -> None:
        """ Add default names for the given country to the search index.
        """
        # Make sure any name preprocessing for country names applies.
        info = PlaceInfo({'name': names, 'country_code': country_code,
                          'rank_address': 4, 'class': 'boundary',
                          'type': 'administrative'})
        self._add_country_full_names(country_code,
                                     self.sanitizer.process_names(info)[0],
                                     internal=True)

    def _add_country_full_names(self, country_code: str, names: Sequence[PlaceName],
                                internal: bool = False) -> None:
        """ Add names for the given country from an already sanitized
            name list.
        """
        assert self.conn is not None
        word_tokens: Set[str] = set()
        for name in names:
            norm_name = self._search_normalized(name.name)
            if norm_name:
                word_tokens.add(norm_name)

        with self.conn.cursor() as cur:
            # Get existing names for the country.
            cur.execute("""SELECT word_token, coalesce(info ? 'internal', false) as is_internal
                           FROM word
                           WHERE type = 'C' and word = %s""",
                        (country_code, ))
            # internal/external names
            existing_tokens: Dict[bool, Set[str]] = {True: set(), False: set()}
            for word in cur:
                existing_tokens[word[1]].add(word[0])

            # Delete names that no longer exist.
            gone_tokens = existing_tokens[internal] - word_tokens
            if internal:
                gone_tokens.update(existing_tokens[False] & word_tokens)
            if gone_tokens:
                cur.execute("""DELETE FROM word
                               USING unnest(%s::text[]) as token
                               WHERE type = 'C' and word = %s
                                     and word_token = token""",
                            (list(gone_tokens), country_code))

            # Only add those names that are not yet in the list.
            new_tokens = word_tokens - existing_tokens[True]
            if not internal:
                new_tokens -= existing_tokens[False]
            if new_tokens:
                if internal:
                    sql = """INSERT INTO word (word_token, type, word, info)
                               (SELECT token, 'C', %s, '{"internal": "yes"}'
                                  FROM unnest(%s::text[]) as token)
                          """
                else:
                    sql = """INSERT INTO word (word_token, type, word)
                               (SELECT token, 'C', %s
                                  FROM unnest(%s::text[]) as token)
                          """
                cur.execute(sql, (country_code, list(new_tokens)))
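
    # Illustrative shape of the structure returned by process_place() below
    # (all values assumed):
    #   {'names': '{615,1286}', 'hnr': '12;12a', 'hnr_tokens': '{873}',
    #    'street': '{456}', 'addr': {'city': '{77,78}'}, 'postcode': 'AB1 2CD'}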

    def process_place(self, place: PlaceInfo) -> Mapping[str, Any]:
        """ Determine tokenizer information about the given place.

            Returns a JSON-serializable structure that will be handed into
            the database via the token_info field.
        """
        token_info = _TokenInfo()

        names, address = self.sanitizer.process_names(place)

        if names:
            token_info.set_names(*self._compute_name_tokens(names))

            if place.is_country():
                assert place.country_code is not None
                self._add_country_full_names(place.country_code, names)

        if address:
            self._process_place_address(token_info, address)

        return token_info.to_dict()

    def _process_place_address(self, token_info: '_TokenInfo',
                               address: Sequence[PlaceName]) -> None:
        for item in address:
            if item.kind == 'postcode':
                token_info.set_postcode(self._add_postcode(item))
            elif item.kind == 'housenumber':
                token_info.add_housenumber(*self._compute_housenumber_token(item))
            elif item.kind == 'street':
                token_info.add_street(self._retrieve_full_tokens(item.name))
            elif item.kind == 'place':
                if not item.suffix:
                    token_info.add_place(itertools.chain(*self._compute_name_tokens([item])))
            elif (not item.kind.startswith('_') and not item.suffix and
                  item.kind not in ('country', 'full', 'inclusion')):
                token_info.add_address_term(item.kind,
                                            itertools.chain(*self._compute_name_tokens([item])))

    def _compute_housenumber_token(self, hnr: PlaceName) -> Tuple[Optional[int], Optional[str]]:
        """ Normalize the housenumber and return the word token and the
            canonical form.
        """
        assert self.conn is not None
        analyzer = self.token_analysis.analysis.get('@housenumber')
        result: Tuple[Optional[int], Optional[str]] = (None, None)

        if analyzer is None:
            # When no custom analyzer is set, simply normalize and transliterate
            norm_name = self._search_normalized(hnr.name)
            if norm_name:
                result = self._cache.housenumbers.get(norm_name, result)
                if result[0] is None:
                    hid = execute_scalar(self.conn, "SELECT getorcreate_hnr_id(%s)", (norm_name, ))
                    result = hid, norm_name
                    self._cache.housenumbers[norm_name] = result
        else:
            # Otherwise use the analyzer to determine the canonical name.
            # Per convention we use the first variant as the 'lookup name', the
            # name that gets saved in the housenumber field of the place.
            word_id = analyzer.get_canonical_id(hnr)
            if word_id:
                result = self._cache.housenumbers.get(word_id, result)
                if result[0] is None:
                    varout = analyzer.compute_variants(word_id)
                    if isinstance(varout, tuple):
                        variants = varout[0]
                    else:
                        variants = varout
                    if variants:
                        hid = execute_scalar(self.conn, "SELECT create_analyzed_hnr_id(%s, %s)",
                                             (word_id, variants))
                        result = hid, variants[0]
                        self._cache.housenumbers[word_id] = result

        return result

    def _retrieve_full_tokens(self, name: str) -> List[int]:
        """ Get the full name token for the given name, if it exists.
            The name is only retrieved for the standard analyser.
        """
        assert self.conn is not None
        norm_name = self._search_normalized(name)

        # return cached if possible
        if norm_name in self._cache.fulls:
            return self._cache.fulls[norm_name]

        with self.conn.cursor() as cur:
            cur.execute("SELECT word_id FROM word WHERE word_token = %s and type = 'W'",
                        (norm_name, ))
            full = [row[0] for row in cur]

        self._cache.fulls[norm_name] = full

        return full
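
    # Name tokens below are cached under the canonical id, suffixed with
    # '@<analyzer>' when a non-default analyzer is configured,
    # e.g. (illustrative) 'rue de la gare@fr'.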

    def _compute_name_tokens(self, names: Sequence[PlaceName]) -> Tuple[Set[int], Set[int]]:
        """ Computes the full name and partial name tokens for the given
            list of names.
        """
        assert self.conn is not None
        full_tokens: Set[int] = set()
        partial_tokens: Set[int] = set()

        for name in names:
            analyzer_id = name.get_attr('analyzer')
            analyzer = self.token_analysis.get_analyzer(analyzer_id)
            word_id = analyzer.get_canonical_id(name)
            if analyzer_id is None:
                token_id = word_id
            else:
                token_id = f'{word_id}@{analyzer_id}'

            full, part = self._cache.names.get(token_id, (None, None))
            if full is None:
                varset = analyzer.compute_variants(word_id)
                if isinstance(varset, tuple):
                    variants, lookups = varset
                else:
                    variants, lookups = varset, None
                if not variants:
                    continue

                with self.conn.cursor() as cur:
                    cur.execute("SELECT * FROM getorcreate_full_word(%s, %s, %s)",
                                (token_id, variants, lookups))
                    full, part = cast(Tuple[int, List[int]], cur.fetchone())

                self._cache.names[token_id] = (full, part)

            assert part is not None

            full_tokens.add(full)
            partial_tokens.update(part)

        return full_tokens, partial_tokens

    def _add_postcode(self, item: PlaceName) -> Optional[str]:
        """ Make sure the normalized postcode is present in the word table.
        """
        assert self.conn is not None
        analyzer = self.token_analysis.analysis.get('@postcode')

        if analyzer is None:
            return item.name.strip().upper()

        return analyzer.get_canonical_id(item)
673 """ Collect token information to be sent back to the database.
675 def __init__(self) -> None:
676 self.names: Optional[str] = None
677 self.housenumbers: Set[str] = set()
678 self.housenumber_tokens: Set[int] = set()
679 self.street_tokens: Optional[Set[int]] = None
680 self.place_tokens: Set[int] = set()
681 self.address_tokens: Dict[str, str] = {}
682 self.postcode: Optional[str] = None

    def _mk_array(self, tokens: Iterable[Any]) -> str:
        # Render the tokens as a PostgreSQL array literal, e.g. [1, 2, 3] -> '{1,2,3}'.
        return f"{{{','.join((str(s) for s in tokens))}}}"

    def to_dict(self) -> Dict[str, Any]:
        """ Return the token information in database importable format.
        """
        out: Dict[str, Any] = {}

        if self.names:
            out['names'] = self.names

        if self.housenumbers:
            out['hnr'] = ';'.join(self.housenumbers)
            out['hnr_tokens'] = self._mk_array(self.housenumber_tokens)

        if self.street_tokens is not None:
            out['street'] = self._mk_array(self.street_tokens)

        if self.place_tokens:
            out['place'] = self._mk_array(self.place_tokens)

        if self.address_tokens:
            out['addr'] = self.address_tokens

        if self.postcode:
            out['postcode'] = self.postcode

        return out

    def set_names(self, fulls: Iterable[int], partials: Iterable[int]) -> None:
        """ Add token information for the normalised names.
        """
        self.names = self._mk_array(itertools.chain(fulls, partials))

    def add_housenumber(self, token: Optional[int], hnr: Optional[str]) -> None:
        """ Extract housenumber information from a list of normalised
            housenumbers.
        """
        if token:
            assert hnr is not None
            self.housenumbers.add(hnr)
            self.housenumber_tokens.add(token)

    def add_street(self, tokens: Iterable[int]) -> None:
        """ Add addr:street match terms.
        """
        if self.street_tokens is None:
            self.street_tokens = set()
        self.street_tokens.update(tokens)

    def add_place(self, tokens: Iterable[int]) -> None:
        """ Add addr:place search and match terms.
        """
        self.place_tokens.update(tokens)

    def add_address_term(self, key: str, partials: Iterable[int]) -> None:
        """ Add additional address terms.
        """
        array = self._mk_array(partials)
        if len(array) > 2:
            # Only store non-empty arrays (an empty array renders as '{}').
            self.address_tokens[key] = array

    def set_postcode(self, postcode: Optional[str]) -> None:
        """ Set the postcode to the given one.
        """
        self.postcode = postcode
753 """ Cache for token information to avoid repeated database queries.
755 This cache is not thread-safe and needs to be instantiated per
758 def __init__(self) -> None:
759 self.names: Dict[str, Tuple[int, List[int]]] = {}
760 self.partials: Dict[str, int] = {}
761 self.fulls: Dict[str, List[int]] = {}
762 self.housenumbers: Dict[str, Tuple[Optional[int], Optional[str]]] = {}