# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Tokenizer implementing normalisation as used before Nominatim 4 but using
libICU instead of the PostgreSQL module.
"""
from typing import Optional, Sequence, List, Tuple, Mapping, Any, cast, \
                   Dict, Set, Iterable
import itertools
import logging
from pathlib import Path
from psycopg.types.json import Jsonb
from psycopg import sql as pysql

from ..db.connection import connect, Connection, Cursor, \
                            drop_tables, table_exists, execute_scalar
from ..config import Configuration
from ..db.sql_preprocessor import SQLPreprocessor
from ..data.place_info import PlaceInfo
from ..data.place_name import PlaceName
from .icu_rule_loader import ICURuleLoader
from .place_sanitizer import PlaceSanitizer
from .icu_token_analysis import ICUTokenAnalysis
from .base import AbstractAnalyzer, AbstractTokenizer
DBCFG_TERM_NORMALIZATION = "tokenizer_term_normalization"

LOG = logging.getLogger()
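# Word types in the word table that get their own partial index. The type
# codes are also used directly in the SQL in this module: 'C' country names,
# 'P' postcodes, 'W' full words, 'w' partial words, 'H' housenumbers,
# 'S' special phrases.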
WORD_TYPES = (('country_names', 'C'),
              ('postcodes', 'P'),
              ('full_word', 'W'),
              ('housenumbers', 'H'))

def create(dsn: str, data_dir: Path) -> 'ICUTokenizer':
    """ Create a new instance of the tokenizer provided by this module.
    """
    return ICUTokenizer(dsn, data_dir)

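# A minimal usage sketch (illustrative only; `config.get_libpq_dsn()` and the
# `project_dir / 'tokenizer'` layout are assumptions about the surrounding
# Nominatim project, everything else is defined in this module):
#
#     tokenizer = create(config.get_libpq_dsn(), project_dir / 'tokenizer')
#     tokenizer.init_from_project(config)
#     with tokenizer.name_analyzer() as analyzer:
#         token_info = analyzer.process_place(place_info)
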
class ICUTokenizer(AbstractTokenizer):
    """ This tokenizer uses libICU to convert names and queries to ASCII.
        Otherwise it uses the same algorithms and data structures as the
        normalization routines in Nominatim 3.
    """

    def __init__(self, dsn: str, data_dir: Path) -> None:
        self.dsn = dsn
        self.data_dir = data_dir
        self.loader: Optional[ICURuleLoader] = None
    def init_new_db(self, config: Configuration, init_db: bool = True) -> None:
        """ Set up a new tokenizer for the database.

            This copies all necessary data in the project directory to make
            sure the tokenizer remains stable even over updates.
        """
        self.loader = ICURuleLoader(config)

        self._save_config()

        if init_db:
            self.update_sql_functions(config)
            self._setup_db_tables(config)
            self._create_base_indices(config, 'word')
    def init_from_project(self, config: Configuration) -> None:
        """ Initialise the tokenizer from the project directory.
        """
        self.loader = ICURuleLoader(config)

        with connect(self.dsn) as conn:
            self.loader.load_config_from_db(conn)
    def finalize_import(self, config: Configuration) -> None:
        """ Do any required postprocessing to make the tokenizer data ready
            for use.
        """
        self._create_lookup_indices(config, 'word')
    def update_sql_functions(self, config: Configuration) -> None:
        """ Reimport the SQL functions for this tokenizer.
        """
        with connect(self.dsn) as conn:
            sqlp = SQLPreprocessor(conn, config)
            sqlp.run_sql_file(conn, 'tokenizer/icu_tokenizer.sql')
    def check_database(self, config: Configuration) -> None:
        """ Check that the tokenizer is set up correctly.
        """
        # Will throw an error if there is an issue.
        self.init_from_project(config)
    def update_statistics(self, config: Configuration, threads: int = 2) -> None:
        """ Recompute frequencies for all name words.
        """
        with connect(self.dsn) as conn:
            if not table_exists(conn, 'search_name'):
                return

            with conn.cursor() as cur:
                cur.execute('ANALYSE search_name')
                if threads > 1:
                    cur.execute(pysql.SQL('SET max_parallel_workers_per_gather TO {}')
                                .format(pysql.Literal(min(threads, 6),)))
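                # Frequencies are first aggregated from search_name into a
                # temporary table and then merged into a fresh copy of the
                # word table (tmp_word), which replaces the live table at the
                # end of this function.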
                LOG.info('Computing word frequencies')
                drop_tables(conn, 'word_frequencies')
                cur.execute("""
                  CREATE TEMP TABLE word_frequencies AS
                  WITH word_freq AS MATERIALIZED (
                           SELECT unnest(name_vector) as id, count(*)
                                 FROM search_name GROUP BY id),
                       addr_freq AS MATERIALIZED (
                           SELECT unnest(nameaddress_vector) as id, count(*)
                                 FROM search_name GROUP BY id)
                  SELECT coalesce(a.id, w.id) as id,
                         (CASE WHEN w.count is null or w.count <= 1 THEN '{}'::JSONB
                              ELSE jsonb_build_object('count', w.count) END
                          ||
                          CASE WHEN a.count is null or a.count <= 1 THEN '{}'::JSONB
                              ELSE jsonb_build_object('addr_count', a.count) END) as info
                  FROM word_freq w FULL JOIN addr_freq a ON a.id = w.id;
                  """)
                cur.execute('CREATE UNIQUE INDEX ON word_frequencies(id) INCLUDE(info)')
                cur.execute('ANALYSE word_frequencies')
                LOG.info('Update word table with recomputed frequencies')
                drop_tables(conn, 'tmp_word')
                cur.execute("""CREATE TABLE tmp_word AS
                                SELECT word_id, word_token, type, word,
                                       coalesce(word.info, '{}'::jsonb)
                                       - 'count' - 'addr_count' ||
                                       coalesce(wf.info, '{}'::jsonb)
                                       as info
                                FROM word LEFT JOIN word_frequencies wf
                                     ON word.word_id = wf.id
                                """)
                drop_tables(conn, 'word_frequencies')

            with conn.cursor() as cur:
                cur.execute('SET max_parallel_workers_per_gather TO 0')

            sqlp = SQLPreprocessor(conn, config)
            sqlp.run_string(conn,
                            'GRANT SELECT ON tmp_word TO "{{config.DATABASE_WEBUSER}}"')
            conn.commit()
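        # Build all indices on the replacement table before swapping it in
        # for the live word table (see _move_temporary_word_table).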
        self._create_base_indices(config, 'tmp_word')
        self._create_lookup_indices(config, 'tmp_word')
        self._move_temporary_word_table('tmp_word')
    def _cleanup_housenumbers(self) -> None:
        """ Remove unused house numbers.
        """
        with connect(self.dsn) as conn:
            if not table_exists(conn, 'search_name'):
                return
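            # Two passes: first collect housenumber tokens that no longer
            # appear in any search_name entry, then drop from that candidate
            # set every housenumber that is still present in placex.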
            with conn.cursor(name="hnr_counter") as cur:
                cur.execute("""SELECT DISTINCT word_id, coalesce(info->>'lookup', word_token)
                               FROM word
                               WHERE type = 'H'
                                 AND NOT EXISTS(SELECT * FROM search_name
                                                WHERE ARRAY[word.word_id] && name_vector)
                                 AND (char_length(coalesce(word, word_token)) > 6
                                      OR coalesce(word, word_token) not similar to '\\d+')
                            """)
                candidates = {token: wid for wid, token in cur}

            with conn.cursor(name="hnr_counter") as cur:
                cur.execute("""SELECT housenumber FROM placex
                               WHERE housenumber is not null
                                     AND (char_length(housenumber) > 6
                                          OR housenumber not similar to '\\d+')
                            """)
                for row in cur:
                    for hnr in row[0].split(';'):
                        candidates.pop(hnr, None)

            LOG.info("There are %s outdated housenumbers.", len(candidates))
            LOG.debug("Outdated housenumbers: %s", candidates.keys())

            if candidates:
                with conn.cursor() as cur:
                    cur.execute("""DELETE FROM word WHERE word_id = any(%s)""",
                                (list(candidates.values()), ))
                conn.commit()
    def update_word_tokens(self) -> None:
        """ Remove unused tokens.
        """
        LOG.warning("Cleaning up housenumber tokens.")
        self._cleanup_housenumbers()
        LOG.warning("Tokenizer house-keeping done.")
    def name_analyzer(self) -> 'ICUNameAnalyzer':
        """ Create a new analyzer for tokenizing names and queries
            using this tokenizer. Analyzers are context managers and should
            be used accordingly:

                with tokenizer.name_analyzer() as analyzer:
                    analyzer.process_place(place)

            When used outside the with construct, the caller must ensure to
            call the close() function before destructing the analyzer.

            Analyzers are not thread-safe. You need to instantiate one per thread.
        """
        assert self.loader is not None
        return ICUNameAnalyzer(self.dsn, self.loader.make_sanitizer(),
                               self.loader.make_token_analysis())
    def most_frequent_words(self, conn: Connection, num: int) -> List[str]:
        """ Return a list of the `num` most frequent full words
            in the database.
        """
        with conn.cursor() as cur:
            cur.execute("""SELECT word, sum((info->>'count')::int) as count
                             FROM word WHERE type = 'W'
                             GROUP BY word
                             ORDER BY count DESC LIMIT %s""", (num,))
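            # The word column of full words may carry an '@<analyzer>' suffix;
            # strip it so only the plain word is returned.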
            return list(s[0].split('@')[0] for s in cur)
    def _save_config(self) -> None:
        """ Save the configuration that needs to remain stable for the given
            database as database properties.
        """
        assert self.loader is not None
        with connect(self.dsn) as conn:
            self.loader.save_config_to_db(conn)
    def _setup_db_tables(self, config: Configuration) -> None:
        """ Set up the word table and the sequence used for word ids.
        """
        with connect(self.dsn) as conn:
            drop_tables(conn, 'word')
            sqlp = SQLPreprocessor(conn, config)
            sqlp.run_string(conn, """
                CREATE TABLE word (
                      word_id INTEGER,
                      word_token text NOT NULL,
                      type text NOT NULL,
                      word text,
                      info jsonb
                    ) {{db.tablespace.search_data}};
                GRANT SELECT ON word TO "{{config.DATABASE_WEBUSER}}";

                DROP SEQUENCE IF EXISTS seq_word;
                CREATE SEQUENCE seq_word start 1;
                GRANT SELECT ON seq_word to "{{config.DATABASE_WEBUSER}}";
            """)
            conn.commit()
    def _create_base_indices(self, config: Configuration, table_name: str) -> None:
        """ Create the basic indices on the given word table that are
            needed during import.
        """
        with connect(self.dsn) as conn:
            sqlp = SQLPreprocessor(conn, config)
            sqlp.run_string(conn,
                            """CREATE INDEX idx_{{table_name}}_word_token ON {{table_name}}
                               USING BTREE (word_token) {{db.tablespace.search_index}}""",
                            table_name=table_name)
            for name, ctype in WORD_TYPES:
                sqlp.run_string(conn,
                                """CREATE INDEX idx_{{table_name}}_{{idx_name}} ON {{table_name}}
                                   USING BTREE (word) {{db.tablespace.address_index}}
                                   WHERE type = '{{column_type}}'
                                """,
                                table_name=table_name, idx_name=name,
                                column_type=ctype)
            conn.commit()
    def _create_lookup_indices(self, config: Configuration, table_name: str) -> None:
        """ Create additional indexes used when running the API.
        """
        with connect(self.dsn) as conn:
            sqlp = SQLPreprocessor(conn, config)
            # Index required for details lookup.
            sqlp.run_string(conn, """
                CREATE INDEX IF NOT EXISTS idx_{{table_name}}_word_id
                  ON {{table_name}} USING BTREE (word_id) {{db.tablespace.search_index}}
                """,
                            table_name=table_name)
            conn.commit()
    def _move_temporary_word_table(self, old: str) -> None:
        """ Rename all tables and indexes used by the tokenizer.
        """
        with connect(self.dsn) as conn:
            drop_tables(conn, 'word')
            with conn.cursor() as cur:
                cur.execute(f"ALTER TABLE {old} RENAME TO word")
                for idx in ('word_token', 'word_id'):
                    cur.execute(f"""ALTER INDEX idx_{old}_{idx}
                                      RENAME TO idx_word_{idx}""")
                for name, _ in WORD_TYPES:
                    cur.execute(f"""ALTER INDEX idx_{old}_{name}
                                    RENAME TO idx_word_{name}""")
            conn.commit()

class ICUNameAnalyzer(AbstractAnalyzer):
    """ The ICU analyzer uses the ICU library for splitting names.

        Each instance opens a connection to the database to request the
        normalization.
    """

    def __init__(self, dsn: str, sanitizer: PlaceSanitizer,
                 token_analysis: ICUTokenAnalysis) -> None:
        self.conn: Optional[Connection] = connect(dsn)
        self.conn.autocommit = True
        self.sanitizer = sanitizer
        self.token_analysis = token_analysis

        self._cache = _TokenCache()
    def close(self) -> None:
        """ Free all resources used by the analyzer.
        """
        if self.conn:
            self.conn.close()
            self.conn = None
    def _search_normalized(self, name: str) -> str:
        """ Return the search token transliteration of the given name.
        """
        return cast(str, self.token_analysis.search.transliterate(name)).strip()
    def _normalized(self, name: str) -> str:
        """ Return the normalized version of the given name with all
            non-relevant information removed.
        """
        return cast(str, self.token_analysis.normalizer.transliterate(name)).strip()
    def get_word_token_info(self, words: Sequence[str]) -> List[Tuple[str, str, int]]:
        """ Return token information for the given list of words.
            If a word starts with # it is assumed to be a full name,
            otherwise it is a partial name.

            The function returns a list of tuples with
            (original word, word token, word id).

            The function is used for testing and debugging only
            and is not necessarily efficient.
        """
        assert self.conn is not None
        full_tokens = {}
        partial_tokens = {}
        for word in words:
            if word.startswith('#'):
                full_tokens[word] = self._search_normalized(word[1:])
            else:
                partial_tokens[word] = self._search_normalized(word)
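        # Full words are stored with type 'W', partial words with type 'w';
        # look the two groups up separately.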
        with self.conn.cursor() as cur:
            cur.execute("""SELECT word_token, word_id
                            FROM word WHERE word_token = ANY(%s) and type = 'W'
                        """, (list(full_tokens.values()),))
            full_ids = {r[0]: r[1] for r in cur}
            cur.execute("""SELECT word_token, word_id
                            FROM word WHERE word_token = ANY(%s) and type = 'w'""",
                        (list(partial_tokens.values()),))
            part_ids = {r[0]: r[1] for r in cur}

        return [(k, v, full_ids.get(v, None)) for k, v in full_tokens.items()] \
               + [(k, v, part_ids.get(v, None)) for k, v in partial_tokens.items()]
    def normalize_postcode(self, postcode: str) -> str:
        """ Convert the postcode to a standardized form.

            This function must yield exactly the same result as the SQL function
            'token_normalized_postcode()'.
        """
        return postcode.strip().upper()
    def update_postcodes_from_db(self) -> None:
        """ Postcode update.

            Removes all postcodes from the word table because they are not
            needed. Postcodes are recognised by pattern.
        """
        assert self.conn is not None

        with self.conn.cursor() as cur:
            cur.execute("DELETE FROM word WHERE type = 'P'")
    def update_special_phrases(self, phrases: Iterable[Tuple[str, str, str, str]],
                               should_replace: bool) -> None:
        """ Replace the search index for special phrases with the new phrases.
            If `should_replace` is True, then the previous set of phrases will be
            completely replaced. Otherwise the phrases are added to the
            already existing ones.
        """
        assert self.conn is not None
        norm_phrases = set(((self._normalized(p[0]), p[1], p[2], p[3])
                            for p in phrases))

        with self.conn.cursor() as cur:
            # Get the old phrases.
            existing_phrases = set()
            cur.execute("SELECT word, info FROM word WHERE type = 'S'")
            for word, info in cur:
                existing_phrases.add((word, info['class'], info['type'],
                                      info.get('op') or '-'))

            added = self._add_special_phrases(cur, norm_phrases, existing_phrases)
            if should_replace:
                deleted = self._remove_special_phrases(cur, norm_phrases,
                                                       existing_phrases)
            else:
                deleted = 0

        LOG.info("Total phrases: %s. Added: %s. Deleted: %s",
                 len(norm_phrases), added, deleted)
    def _add_special_phrases(self, cursor: Cursor,
                             new_phrases: Set[Tuple[str, str, str, str]],
                             existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
        """ Add all phrases to the database that are not yet there.
        """
        to_add = new_phrases - existing_phrases

        added = 0
        with cursor.copy('COPY word(word_token, type, word, info) FROM STDIN') as copy:
            for word, cls, typ, oper in to_add:
                term = self._search_normalized(word)
                if term:
                    copy.write_row((term, 'S', word,
                                    Jsonb({'class': cls, 'type': typ,
                                           'op': oper if oper in ('in', 'near') else None})))
                    added += 1

        return added
    def _remove_special_phrases(self, cursor: Cursor,
                                new_phrases: Set[Tuple[str, str, str, str]],
                                existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
        """ Remove all phrases from the database that are no longer in the
            new phrase list.
        """
        to_delete = existing_phrases - new_phrases

        if to_delete:
            cursor.executemany(
                """ DELETE FROM word
                      WHERE type = 'S' and word = %s
                            and info->>'class' = %s and info->>'type' = %s
                            and %s = coalesce(info->>'op', '-')
                """, to_delete)

        return len(to_delete)
    def add_country_names(self, country_code: str, names: Mapping[str, str]) -> None:
        """ Add default names for the given country to the search index.
        """
        # Make sure any name preprocessing for country names applies.
        info = PlaceInfo({'name': names, 'country_code': country_code,
                          'rank_address': 4, 'class': 'boundary',
                          'type': 'administrative'})
        self._add_country_full_names(country_code,
                                     self.sanitizer.process_names(info)[0],
                                     internal=True)
    def _add_country_full_names(self, country_code: str, names: Sequence[PlaceName],
                                internal: bool = False) -> None:
        """ Add names for the given country from an already sanitized
            name list.
        """
        assert self.conn is not None
        word_tokens = set()
        for name in names:
            norm_name = self._search_normalized(name.name)
            if norm_name:
                word_tokens.add(norm_name)

        with self.conn.cursor() as cur:
            # Get the existing names for the country.
            cur.execute("""SELECT word_token, coalesce(info ? 'internal', false) as is_internal
                             FROM word
                             WHERE type = 'C' and word = %s""",
                        (country_code, ))
            # internal/external names
            existing_tokens: Dict[bool, Set[str]] = {True: set(), False: set()}
            for word in cur:
                existing_tokens[word[1]].add(word[0])

            # Delete names that no longer exist.
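            # When adding internal names, also remove any external variants of
            # the same tokens so that they are re-inserted below with the
            # internal flag set.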
            gone_tokens = existing_tokens[internal] - word_tokens
            if internal:
                gone_tokens.update(existing_tokens[False] & word_tokens)
            if gone_tokens:
                cur.execute("""DELETE FROM word
                               USING unnest(%s::text[]) as token
                               WHERE type = 'C' and word = %s
                                     and word_token = token""",
                            (list(gone_tokens), country_code))

            # Only add those names that are not yet in the list.
            new_tokens = word_tokens - existing_tokens[True]
            if not internal:
                new_tokens -= existing_tokens[False]
            if new_tokens:
                if internal:
                    sql = """INSERT INTO word (word_token, type, word, info)
                               (SELECT token, 'C', %s, '{"internal": "yes"}'
                                  FROM unnest(%s::text[]) as token)
                           """
                else:
                    sql = """INSERT INTO word (word_token, type, word)
                               (SELECT token, 'C', %s
                                  FROM unnest(%s::text[]) as token)
                           """
                cur.execute(sql, (country_code, list(new_tokens)))
    def process_place(self, place: PlaceInfo) -> Mapping[str, Any]:
        """ Determine tokenizer information about the given place.

            Returns a JSON-serializable structure that will be handed into
            the database via the token_info field.
        """
        token_info = _TokenInfo()

        names, address = self.sanitizer.process_names(place)

        if names:
            token_info.set_names(*self._compute_name_tokens(names))

            if place.is_country():
                assert place.country_code is not None
                self._add_country_full_names(place.country_code, names)

        if address:
            self._process_place_address(token_info, address)

        return token_info.to_dict()
    def _process_place_address(self, token_info: '_TokenInfo',
                               address: Sequence[PlaceName]) -> None:
        for item in address:
            if item.kind == 'postcode':
                token_info.set_postcode(self._add_postcode(item))
            elif item.kind == 'housenumber':
                token_info.add_housenumber(*self._compute_housenumber_token(item))
            elif item.kind == 'street':
                token_info.add_street(self._retrieve_full_tokens(item.name))
            elif item.kind == 'place':
                if not item.suffix:
                    token_info.add_place(itertools.chain(*self._compute_name_tokens([item])))
            elif (not item.kind.startswith('_') and not item.suffix and
                  item.kind not in ('country', 'full', 'inclusion')):
                token_info.add_address_term(item.kind,
                                            itertools.chain(*self._compute_name_tokens([item])))
    def _compute_housenumber_token(self, hnr: PlaceName) -> Tuple[Optional[int], Optional[str]]:
        """ Normalize the housenumber and return the word token and the
            canonical form.
        """
        assert self.conn is not None
        analyzer = self.token_analysis.analysis.get('@housenumber')
        result: Tuple[Optional[int], Optional[str]] = (None, None)

        if analyzer is None:
            # When no custom analyzer is set, simply normalize and transliterate.
            norm_name = self._search_normalized(hnr.name)
            if norm_name:
                result = self._cache.housenumbers.get(norm_name, result)
                if result[0] is None:
                    hid = execute_scalar(self.conn, "SELECT getorcreate_hnr_id(%s)", (norm_name, ))
                    result = hid, norm_name
                    self._cache.housenumbers[norm_name] = result
        else:
            # Otherwise use the analyzer to determine the canonical name.
            # Per convention we use the first variant as the 'lookup name', the
            # name that gets saved in the housenumber field of the place.
            word_id = analyzer.get_canonical_id(hnr)
            if word_id:
                result = self._cache.housenumbers.get(word_id, result)
                if result[0] is None:
                    varout = analyzer.compute_variants(word_id)
                    if isinstance(varout, tuple):
                        variants = varout[0]
                    else:
                        variants = varout
                    if variants:
                        hid = execute_scalar(self.conn, "SELECT create_analyzed_hnr_id(%s, %s)",
                                             (word_id, list(variants)))
                        result = hid, variants[0]
                        self._cache.housenumbers[word_id] = result

        return result
    def _retrieve_full_tokens(self, name: str) -> List[int]:
        """ Get the full name token for the given name, if it exists.
            The name is only retrieved for the standard analyser.
        """
        assert self.conn is not None
        norm_name = self._search_normalized(name)

        # return cached if possible
        if norm_name in self._cache.fulls:
            return self._cache.fulls[norm_name]

        with self.conn.cursor() as cur:
            cur.execute("SELECT word_id FROM word WHERE word_token = %s and type = 'W'",
                        (norm_name, ))
            full = [row[0] for row in cur]

        self._cache.fulls[norm_name] = full

        return full
    def _compute_name_tokens(self, names: Sequence[PlaceName]) -> Tuple[Set[int], Set[int]]:
        """ Computes the full name and partial name tokens for the given
            list of names.
        """
        assert self.conn is not None
        full_tokens: Set[int] = set()
        partial_tokens: Set[int] = set()

        for name in names:
            analyzer_id = name.get_attr('analyzer')
            analyzer = self.token_analysis.get_analyzer(analyzer_id)
            word_id = analyzer.get_canonical_id(name)
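            # Tokens are cached under the canonical id; names processed by a
            # non-default analyzer get an '@<analyzer>' suffix so that entries
            # of different analyzers cannot collide.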
            if analyzer_id is None:
                token_id = word_id
            else:
                token_id = f'{word_id}@{analyzer_id}'

            full, part = self._cache.names.get(token_id, (None, None))
            if full is None:
                varset = analyzer.compute_variants(word_id)
                if isinstance(varset, tuple):
                    variants, lookups = varset
                else:
                    variants, lookups = varset, None
                if not variants:
                    continue

                with self.conn.cursor() as cur:
                    cur.execute("SELECT * FROM getorcreate_full_word(%s, %s, %s)",
                                (token_id, variants, lookups))
                    full, part = cast(Tuple[int, List[int]], cur.fetchone())

                self._cache.names[token_id] = (full, part)

            assert part is not None

            full_tokens.add(full)
            partial_tokens.update(part)

        return full_tokens, partial_tokens
    def _add_postcode(self, item: PlaceName) -> Optional[str]:
        """ Make sure the normalized postcode is present in the word table.
        """
        assert self.conn is not None
        analyzer = self.token_analysis.analysis.get('@postcode')

        if analyzer is None:
            return item.name.strip().upper()

        return analyzer.get_canonical_id(item)

676 """ Collect token information to be sent back to the database.
678 def __init__(self) -> None:
679 self.names: Optional[str] = None
680 self.housenumbers: Set[str] = set()
681 self.housenumber_tokens: Set[int] = set()
682 self.street_tokens: Optional[Set[int]] = None
683 self.place_tokens: Set[int] = set()
684 self.address_tokens: Dict[str, str] = {}
685 self.postcode: Optional[str] = None
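    # _mk_array serializes a collection of tokens as a PostgreSQL array
    # literal, e.g. '{1,2,3}'.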
    def _mk_array(self, tokens: Iterable[Any]) -> str:
        return f"{{{','.join((str(s) for s in tokens))}}}"
    def to_dict(self) -> Dict[str, Any]:
        """ Return the token information in database importable format.
        """
        out: Dict[str, Any] = {}
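        # Illustrative example of the result (keys are only present when set):
        # {'names': '{1,2,3}', 'hnr': '12;12a', 'hnr_tokens': '{4,5}',
        #  'street': '{6}', 'place': '{7}', 'addr': {'city': '{8}'},
        #  'postcode': '2000 AB'}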
        if self.names:
            out['names'] = self.names

        if self.housenumbers:
            out['hnr'] = ';'.join(self.housenumbers)
            out['hnr_tokens'] = self._mk_array(self.housenumber_tokens)

        if self.street_tokens is not None:
            out['street'] = self._mk_array(self.street_tokens)

        if self.place_tokens:
            out['place'] = self._mk_array(self.place_tokens)

        if self.address_tokens:
            out['addr'] = self.address_tokens

        if self.postcode:
            out['postcode'] = self.postcode

        return out
    def set_names(self, fulls: Iterable[int], partials: Iterable[int]) -> None:
        """ Adds token information for the normalised names.
        """
        self.names = self._mk_array(itertools.chain(fulls, partials))
    def add_housenumber(self, token: Optional[int], hnr: Optional[str]) -> None:
        """ Extract housenumber information from a list of normalised
            housenumbers.
        """
        if token:
            assert hnr is not None
            self.housenumbers.add(hnr)
            self.housenumber_tokens.add(token)
    def add_street(self, tokens: Iterable[int]) -> None:
        """ Add addr:street match terms.
        """
        if self.street_tokens is None:
            self.street_tokens = set()
        self.street_tokens.update(tokens)
    def add_place(self, tokens: Iterable[int]) -> None:
        """ Add addr:place search and match terms.
        """
        self.place_tokens.update(tokens)
    def add_address_term(self, key: str, partials: Iterable[int]) -> None:
        """ Add additional address terms.
        """
        array = self._mk_array(partials)
        if len(array) > 2:
            self.address_tokens[key] = array
    def set_postcode(self, postcode: Optional[str]) -> None:
        """ Set the postcode to the given one.
        """
        self.postcode = postcode

756 """ Cache for token information to avoid repeated database queries.
758 This cache is not thread-safe and needs to be instantiated per
761 def __init__(self) -> None:
762 self.names: Dict[str, Tuple[int, List[int]]] = {}
763 self.partials: Dict[str, int] = {}
764 self.fulls: Dict[str, List[int]] = {}
765 self.housenumbers: Dict[str, Tuple[Optional[int], Optional[str]]] = {}