]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tokenizer/icu_tokenizer.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / src / nominatim_db / tokenizer / icu_tokenizer.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tokenizer implementing normalisation as used before Nominatim 4 but using
9 libICU instead of the PostgreSQL module.
10 """
11 from typing import Optional, Sequence, List, Tuple, Mapping, Any, cast, \
12                    Dict, Set, Iterable
13 import itertools
14 import logging
15 from pathlib import Path
16
17 from psycopg.types.json import Jsonb
18 from psycopg import sql as pysql
19
20 from ..db.connection import connect, Connection, Cursor, \
21                             drop_tables, table_exists, execute_scalar
22 from ..config import Configuration
23 from ..db.sql_preprocessor import SQLPreprocessor
24 from ..data.place_info import PlaceInfo
25 from ..data.place_name import PlaceName
26 from .icu_rule_loader import ICURuleLoader
27 from .place_sanitizer import PlaceSanitizer
28 from .icu_token_analysis import ICUTokenAnalysis
29 from .base import AbstractAnalyzer, AbstractTokenizer
30
31 DBCFG_TERM_NORMALIZATION = "tokenizer_term_normalization"
32
33 LOG = logging.getLogger()
34
35 WORD_TYPES = (('country_names', 'C'),
36               ('postcodes', 'P'),
37               ('full_word', 'W'),
38               ('housenumbers', 'H'))
39
40
41 def create(dsn: str, data_dir: Path) -> 'ICUTokenizer':
42     """ Create a new instance of the tokenizer provided by this module.
43     """
44     return ICUTokenizer(dsn, data_dir)
45
46
47 class ICUTokenizer(AbstractTokenizer):
48     """ This tokenizer uses libICU to convert names and queries to ASCII.
49         Otherwise it uses the same algorithms and data structures as the
50         normalization routines in Nominatim 3.
51     """
52
53     def __init__(self, dsn: str, data_dir: Path) -> None:
54         self.dsn = dsn
55         self.data_dir = data_dir
56         self.loader: Optional[ICURuleLoader] = None
57
58     def init_new_db(self, config: Configuration, init_db: bool = True) -> None:
59         """ Set up a new tokenizer for the database.
60
61             This copies all necessary data in the project directory to make
62             sure the tokenizer remains stable even over updates.
63         """
64         self.loader = ICURuleLoader(config)
65
66         self._save_config()
67
68         if init_db:
69             self.update_sql_functions(config)
70             self._setup_db_tables(config)
71             self._create_base_indices(config, 'word')
72
73     def init_from_project(self, config: Configuration) -> None:
74         """ Initialise the tokenizer from the project directory.
75         """
76         self.loader = ICURuleLoader(config)
77
78         with connect(self.dsn) as conn:
79             self.loader.load_config_from_db(conn)
80
81     def finalize_import(self, config: Configuration) -> None:
82         """ Do any required postprocessing to make the tokenizer data ready
83             for use.
84         """
85         self._create_lookup_indices(config, 'word')
86
87     def update_sql_functions(self, config: Configuration) -> None:
88         """ Reimport the SQL functions for this tokenizer.
89         """
90         with connect(self.dsn) as conn:
91             sqlp = SQLPreprocessor(conn, config)
92             sqlp.run_sql_file(conn, 'tokenizer/icu_tokenizer.sql')
93
94     def check_database(self, config: Configuration) -> None:
95         """ Check that the tokenizer is set up correctly.
96         """
97         # Will throw an error if there is an issue.
98         self.init_from_project(config)
99
100     def update_statistics(self, config: Configuration, threads: int = 2) -> None:
101         """ Recompute frequencies for all name words.
102         """
103         with connect(self.dsn) as conn:
104             if not table_exists(conn, 'search_name'):
105                 return
106
107             with conn.cursor() as cur:
108                 cur.execute('ANALYSE search_name')
109                 if threads > 1:
110                     cur.execute(pysql.SQL('SET max_parallel_workers_per_gather TO {}')
111                                      .format(pysql.Literal(min(threads, 6),)))
112
113                 LOG.info('Computing word frequencies')
114                 drop_tables(conn, 'word_frequencies')
115                 cur.execute("""
116                   CREATE TEMP TABLE word_frequencies AS
117                   WITH word_freq AS MATERIALIZED (
118                            SELECT unnest(name_vector) as id, count(*)
119                                  FROM search_name GROUP BY id),
120                        addr_freq AS MATERIALIZED (
121                            SELECT unnest(nameaddress_vector) as id, count(*)
122                                  FROM search_name GROUP BY id)
123                   SELECT coalesce(a.id, w.id) as id,
124                          (CASE WHEN w.count is null THEN '{}'::JSONB
125                               ELSE jsonb_build_object('count', w.count) END
126                           ||
127                           CASE WHEN a.count is null THEN '{}'::JSONB
128                               ELSE jsonb_build_object('addr_count', a.count) END) as info
129                   FROM word_freq w FULL JOIN addr_freq a ON a.id = w.id;
130                   """)
131                 cur.execute('CREATE UNIQUE INDEX ON word_frequencies(id) INCLUDE(info)')
132                 cur.execute('ANALYSE word_frequencies')
133                 LOG.info('Update word table with recomputed frequencies')
134                 drop_tables(conn, 'tmp_word')
135                 cur.execute("""CREATE TABLE tmp_word AS
136                                 SELECT word_id, word_token, type, word,
137                                        (CASE WHEN wf.info is null THEN word.info
138                                         ELSE coalesce(word.info, '{}'::jsonb) || wf.info
139                                         END) as info
140                                 FROM word LEFT JOIN word_frequencies wf
141                                      ON word.word_id = wf.id
142                                 ORDER BY word_id
143                             """)
144                 drop_tables(conn, 'word_frequencies')
145
146             with conn.cursor() as cur:
147                 cur.execute('SET max_parallel_workers_per_gather TO 0')
148
149             sqlp = SQLPreprocessor(conn, config)
150             sqlp.run_string(conn,
151                             'GRANT SELECT ON tmp_word TO "{{config.DATABASE_WEBUSER}}"')
152             conn.commit()
153         self._create_base_indices(config, 'tmp_word')
154         self._create_lookup_indices(config, 'tmp_word')
155         self._move_temporary_word_table('tmp_word')
156
157     def _cleanup_housenumbers(self) -> None:
158         """ Remove unused house numbers.
159         """
160         with connect(self.dsn) as conn:
161             if not table_exists(conn, 'search_name'):
162                 return
163             with conn.cursor(name="hnr_counter") as cur:
164                 cur.execute("""SELECT DISTINCT word_id, coalesce(info->>'lookup', word_token)
165                                FROM word
166                                WHERE type = 'H'
167                                  AND NOT EXISTS(SELECT * FROM search_name
168                                                 WHERE ARRAY[word.word_id] && name_vector)
169                                  AND (char_length(coalesce(word, word_token)) > 6
170                                       OR coalesce(word, word_token) not similar to '\\d+')
171                             """)
172                 candidates = {token: wid for wid, token in cur}
173             with conn.cursor(name="hnr_counter") as cur:
174                 cur.execute("""SELECT housenumber FROM placex
175                                WHERE housenumber is not null
176                                      AND (char_length(housenumber) > 6
177                                           OR housenumber not similar to '\\d+')
178                             """)
179                 for row in cur:
180                     for hnr in row[0].split(';'):
181                         candidates.pop(hnr, None)
182             LOG.info("There are %s outdated housenumbers.", len(candidates))
183             LOG.debug("Outdated housenumbers: %s", candidates.keys())
184             if candidates:
185                 with conn.cursor() as cur:
186                     cur.execute("""DELETE FROM word WHERE word_id = any(%s)""",
187                                 (list(candidates.values()), ))
188                 conn.commit()
189
190     def update_word_tokens(self) -> None:
191         """ Remove unused tokens.
192         """
193         LOG.warning("Cleaning up housenumber tokens.")
194         self._cleanup_housenumbers()
195         LOG.warning("Tokenizer house-keeping done.")
196
197     def name_analyzer(self) -> 'ICUNameAnalyzer':
198         """ Create a new analyzer for tokenizing names and queries
199             using this tokinzer. Analyzers are context managers and should
200             be used accordingly:
201
202             ```
203             with tokenizer.name_analyzer() as analyzer:
204                 analyser.tokenize()
205             ```
206
207             When used outside the with construct, the caller must ensure to
208             call the close() function before destructing the analyzer.
209
210             Analyzers are not thread-safe. You need to instantiate one per thread.
211         """
212         assert self.loader is not None
213         return ICUNameAnalyzer(self.dsn, self.loader.make_sanitizer(),
214                                self.loader.make_token_analysis())
215
216     def most_frequent_words(self, conn: Connection, num: int) -> List[str]:
217         """ Return a list of the `num` most frequent full words
218             in the database.
219         """
220         with conn.cursor() as cur:
221             cur.execute("""SELECT word, sum((info->>'count')::int) as count
222                              FROM word WHERE type = 'W'
223                              GROUP BY word
224                              ORDER BY count DESC LIMIT %s""", (num,))
225             return list(s[0].split('@')[0] for s in cur)
226
227     def _save_config(self) -> None:
228         """ Save the configuration that needs to remain stable for the given
229             database as database properties.
230         """
231         assert self.loader is not None
232         with connect(self.dsn) as conn:
233             self.loader.save_config_to_db(conn)
234
235     def _setup_db_tables(self, config: Configuration) -> None:
236         """ Set up the word table and fill it with pre-computed word
237             frequencies.
238         """
239         with connect(self.dsn) as conn:
240             drop_tables(conn, 'word')
241             sqlp = SQLPreprocessor(conn, config)
242             sqlp.run_string(conn, """
243                 CREATE TABLE word (
244                       word_id INTEGER,
245                       word_token text NOT NULL,
246                       type text NOT NULL,
247                       word text,
248                       info jsonb
249                     ) {{db.tablespace.search_data}};
250                 GRANT SELECT ON word TO "{{config.DATABASE_WEBUSER}}";
251
252                 DROP SEQUENCE IF EXISTS seq_word;
253                 CREATE SEQUENCE seq_word start 1;
254                 GRANT SELECT ON seq_word to "{{config.DATABASE_WEBUSER}}";
255             """)
256             conn.commit()
257
258     def _create_base_indices(self, config: Configuration, table_name: str) -> None:
259         """ Set up the word table and fill it with pre-computed word
260             frequencies.
261         """
262         with connect(self.dsn) as conn:
263             sqlp = SQLPreprocessor(conn, config)
264             sqlp.run_string(conn,
265                             """CREATE INDEX idx_{{table_name}}_word_token ON {{table_name}}
266                                USING BTREE (word_token) {{db.tablespace.search_index}}""",
267                             table_name=table_name)
268             for name, ctype in WORD_TYPES:
269                 sqlp.run_string(conn,
270                                 """CREATE INDEX idx_{{table_name}}_{{idx_name}} ON {{table_name}}
271                                    USING BTREE (word) {{db.tablespace.address_index}}
272                                    WHERE type = '{{column_type}}'
273                                 """,
274                                 table_name=table_name, idx_name=name,
275                                 column_type=ctype)
276             conn.commit()
277
278     def _create_lookup_indices(self, config: Configuration, table_name: str) -> None:
279         """ Create additional indexes used when running the API.
280         """
281         with connect(self.dsn) as conn:
282             sqlp = SQLPreprocessor(conn, config)
283             # Index required for details lookup.
284             sqlp.run_string(
285                 conn,
286                 """
287                 CREATE INDEX IF NOT EXISTS idx_{{table_name}}_word_id
288                   ON {{table_name}} USING BTREE (word_id) {{db.tablespace.search_index}}
289                 """,
290                 table_name=table_name)
291             conn.commit()
292
293     def _move_temporary_word_table(self, old: str) -> None:
294         """ Rename all tables and indexes used by the tokenizer.
295         """
296         with connect(self.dsn) as conn:
297             drop_tables(conn, 'word')
298             with conn.cursor() as cur:
299                 cur.execute(f"ALTER TABLE {old} RENAME TO word")
300                 for idx in ('word_token', 'word_id'):
301                     cur.execute(f"""ALTER INDEX idx_{old}_{idx}
302                                       RENAME TO idx_word_{idx}""")
303                 for name, _ in WORD_TYPES:
304                     cur.execute(f"""ALTER INDEX idx_{old}_{name}
305                                     RENAME TO idx_word_{name}""")
306             conn.commit()
307
308
309 class ICUNameAnalyzer(AbstractAnalyzer):
310     """ The ICU analyzer uses the ICU library for splitting names.
311
312         Each instance opens a connection to the database to request the
313         normalization.
314     """
315
316     def __init__(self, dsn: str, sanitizer: PlaceSanitizer,
317                  token_analysis: ICUTokenAnalysis) -> None:
318         self.conn: Optional[Connection] = connect(dsn)
319         self.conn.autocommit = True
320         self.sanitizer = sanitizer
321         self.token_analysis = token_analysis
322
323         self._cache = _TokenCache()
324
325     def close(self) -> None:
326         """ Free all resources used by the analyzer.
327         """
328         if self.conn:
329             self.conn.close()
330             self.conn = None
331
332     def _search_normalized(self, name: str) -> str:
333         """ Return the search token transliteration of the given name.
334         """
335         return cast(str, self.token_analysis.search.transliterate(name)).strip()
336
337     def _normalized(self, name: str) -> str:
338         """ Return the normalized version of the given name with all
339             non-relevant information removed.
340         """
341         return cast(str, self.token_analysis.normalizer.transliterate(name)).strip()
342
343     def get_word_token_info(self, words: Sequence[str]) -> List[Tuple[str, str, int]]:
344         """ Return token information for the given list of words.
345             If a word starts with # it is assumed to be a full name
346             otherwise is a partial name.
347
348             The function returns a list of tuples with
349             (original word, word token, word id).
350
351             The function is used for testing and debugging only
352             and not necessarily efficient.
353         """
354         assert self.conn is not None
355         full_tokens = {}
356         partial_tokens = {}
357         for word in words:
358             if word.startswith('#'):
359                 full_tokens[word] = self._search_normalized(word[1:])
360             else:
361                 partial_tokens[word] = self._search_normalized(word)
362
363         with self.conn.cursor() as cur:
364             cur.execute("""SELECT word_token, word_id
365                             FROM word WHERE word_token = ANY(%s) and type = 'W'
366                         """, (list(full_tokens.values()),))
367             full_ids = {r[0]: r[1] for r in cur}
368             cur.execute("""SELECT word_token, word_id
369                             FROM word WHERE word_token = ANY(%s) and type = 'w'""",
370                         (list(partial_tokens.values()),))
371             part_ids = {r[0]: r[1] for r in cur}
372
373         return [(k, v, full_ids.get(v, None)) for k, v in full_tokens.items()] \
374             + [(k, v, part_ids.get(v, None)) for k, v in partial_tokens.items()]
375
376     def normalize_postcode(self, postcode: str) -> str:
377         """ Convert the postcode to a standardized form.
378
379             This function must yield exactly the same result as the SQL function
380             'token_normalized_postcode()'.
381         """
382         return postcode.strip().upper()
383
384     def update_postcodes_from_db(self) -> None:
385         """ Postcode update.
386
387             Removes all postcodes from the word table because they are not
388             needed. Postcodes are recognised by pattern.
389         """
390         assert self.conn is not None
391
392         with self.conn.cursor() as cur:
393             cur.execute("DELETE FROM word WHERE type = 'P'")
394
395     def update_special_phrases(self, phrases: Iterable[Tuple[str, str, str, str]],
396                                should_replace: bool) -> None:
397         """ Replace the search index for special phrases with the new phrases.
398             If `should_replace` is True, then the previous set of will be
399             completely replaced. Otherwise the phrases are added to the
400             already existing ones.
401         """
402         assert self.conn is not None
403         norm_phrases = set(((self._normalized(p[0]), p[1], p[2], p[3])
404                             for p in phrases))
405
406         with self.conn.cursor() as cur:
407             # Get the old phrases.
408             existing_phrases = set()
409             cur.execute("SELECT word, info FROM word WHERE type = 'S'")
410             for word, info in cur:
411                 existing_phrases.add((word, info['class'], info['type'],
412                                       info.get('op') or '-'))
413
414             added = self._add_special_phrases(cur, norm_phrases, existing_phrases)
415             if should_replace:
416                 deleted = self._remove_special_phrases(cur, norm_phrases,
417                                                        existing_phrases)
418             else:
419                 deleted = 0
420
421         LOG.info("Total phrases: %s. Added: %s. Deleted: %s",
422                  len(norm_phrases), added, deleted)
423
424     def _add_special_phrases(self, cursor: Cursor,
425                              new_phrases: Set[Tuple[str, str, str, str]],
426                              existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
427         """ Add all phrases to the database that are not yet there.
428         """
429         to_add = new_phrases - existing_phrases
430
431         added = 0
432         with cursor.copy('COPY word(word_token, type, word, info) FROM STDIN') as copy:
433             for word, cls, typ, oper in to_add:
434                 term = self._search_normalized(word)
435                 if term:
436                     copy.write_row((term, 'S', word,
437                                     Jsonb({'class': cls, 'type': typ,
438                                            'op': oper if oper in ('in', 'near') else None})))
439                     added += 1
440
441         return added
442
443     def _remove_special_phrases(self, cursor: Cursor,
444                                 new_phrases: Set[Tuple[str, str, str, str]],
445                                 existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
446         """ Remove all phrases from the database that are no longer in the
447             new phrase list.
448         """
449         to_delete = existing_phrases - new_phrases
450
451         if to_delete:
452             cursor.executemany(
453                 """ DELETE FROM word
454                       WHERE type = 'S' and word = %s
455                             and info->>'class' = %s and info->>'type' = %s
456                             and %s = coalesce(info->>'op', '-')
457                 """, to_delete)
458
459         return len(to_delete)
460
461     def add_country_names(self, country_code: str, names: Mapping[str, str]) -> None:
462         """ Add default names for the given country to the search index.
463         """
464         # Make sure any name preprocessing for country names applies.
465         info = PlaceInfo({'name': names, 'country_code': country_code,
466                           'rank_address': 4, 'class': 'boundary',
467                           'type': 'administrative'})
468         self._add_country_full_names(country_code,
469                                      self.sanitizer.process_names(info)[0],
470                                      internal=True)
471
472     def _add_country_full_names(self, country_code: str, names: Sequence[PlaceName],
473                                 internal: bool = False) -> None:
474         """ Add names for the given country from an already sanitized
475             name list.
476         """
477         assert self.conn is not None
478         word_tokens = set()
479         for name in names:
480             norm_name = self._search_normalized(name.name)
481             if norm_name:
482                 word_tokens.add(norm_name)
483
484         with self.conn.cursor() as cur:
485             # Get existing names
486             cur.execute("""SELECT word_token, coalesce(info ? 'internal', false) as is_internal
487                              FROM word
488                              WHERE type = 'C' and word = %s""",
489                         (country_code, ))
490             # internal/external names
491             existing_tokens: Dict[bool, Set[str]] = {True: set(), False: set()}
492             for word in cur:
493                 existing_tokens[word[1]].add(word[0])
494
495             # Delete names that no longer exist.
496             gone_tokens = existing_tokens[internal] - word_tokens
497             if internal:
498                 gone_tokens.update(existing_tokens[False] & word_tokens)
499             if gone_tokens:
500                 cur.execute("""DELETE FROM word
501                                USING unnest(%s::text[]) as token
502                                WHERE type = 'C' and word = %s
503                                      and word_token = token""",
504                             (list(gone_tokens), country_code))
505
506             # Only add those names that are not yet in the list.
507             new_tokens = word_tokens - existing_tokens[True]
508             if not internal:
509                 new_tokens -= existing_tokens[False]
510             if new_tokens:
511                 if internal:
512                     sql = """INSERT INTO word (word_token, type, word, info)
513                                (SELECT token, 'C', %s, '{"internal": "yes"}'
514                                   FROM unnest(%s::text[]) as token)
515                            """
516                 else:
517                     sql = """INSERT INTO word (word_token, type, word)
518                                    (SELECT token, 'C', %s
519                                     FROM unnest(%s::text[]) as token)
520                           """
521                 cur.execute(sql, (country_code, list(new_tokens)))
522
523     def process_place(self, place: PlaceInfo) -> Mapping[str, Any]:
524         """ Determine tokenizer information about the given place.
525
526             Returns a JSON-serializable structure that will be handed into
527             the database via the token_info field.
528         """
529         token_info = _TokenInfo()
530
531         names, address = self.sanitizer.process_names(place)
532
533         if names:
534             token_info.set_names(*self._compute_name_tokens(names))
535
536             if place.is_country():
537                 assert place.country_code is not None
538                 self._add_country_full_names(place.country_code, names)
539
540         if address:
541             self._process_place_address(token_info, address)
542
543         return token_info.to_dict()
544
545     def _process_place_address(self, token_info: '_TokenInfo',
546                                address: Sequence[PlaceName]) -> None:
547         for item in address:
548             if item.kind == 'postcode':
549                 token_info.set_postcode(self._add_postcode(item))
550             elif item.kind == 'housenumber':
551                 token_info.add_housenumber(*self._compute_housenumber_token(item))
552             elif item.kind == 'street':
553                 token_info.add_street(self._retrieve_full_tokens(item.name))
554             elif item.kind == 'place':
555                 if not item.suffix:
556                     token_info.add_place(itertools.chain(*self._compute_name_tokens([item])))
557             elif (not item.kind.startswith('_') and not item.suffix and
558                   item.kind not in ('country', 'full', 'inclusion')):
559                 token_info.add_address_term(item.kind,
560                                             itertools.chain(*self._compute_name_tokens([item])))
561
562     def _compute_housenumber_token(self, hnr: PlaceName) -> Tuple[Optional[int], Optional[str]]:
563         """ Normalize the housenumber and return the word token and the
564             canonical form.
565         """
566         assert self.conn is not None
567         analyzer = self.token_analysis.analysis.get('@housenumber')
568         result: Tuple[Optional[int], Optional[str]] = (None, None)
569
570         if analyzer is None:
571             # When no custom analyzer is set, simply normalize and transliterate
572             norm_name = self._search_normalized(hnr.name)
573             if norm_name:
574                 result = self._cache.housenumbers.get(norm_name, result)
575                 if result[0] is None:
576                     hid = execute_scalar(self.conn, "SELECT getorcreate_hnr_id(%s)", (norm_name, ))
577
578                     result = hid, norm_name
579                     self._cache.housenumbers[norm_name] = result
580         else:
581             # Otherwise use the analyzer to determine the canonical name.
582             # Per convention we use the first variant as the 'lookup name', the
583             # name that gets saved in the housenumber field of the place.
584             word_id = analyzer.get_canonical_id(hnr)
585             if word_id:
586                 result = self._cache.housenumbers.get(word_id, result)
587                 if result[0] is None:
588                     variants = analyzer.compute_variants(word_id)
589                     if variants:
590                         hid = execute_scalar(self.conn, "SELECT create_analyzed_hnr_id(%s, %s)",
591                                              (word_id, list(variants)))
592                         result = hid, variants[0]
593                         self._cache.housenumbers[word_id] = result
594
595         return result
596
597     def _retrieve_full_tokens(self, name: str) -> List[int]:
598         """ Get the full name token for the given name, if it exists.
599             The name is only retrieved for the standard analyser.
600         """
601         assert self.conn is not None
602         norm_name = self._search_normalized(name)
603
604         # return cached if possible
605         if norm_name in self._cache.fulls:
606             return self._cache.fulls[norm_name]
607
608         with self.conn.cursor() as cur:
609             cur.execute("SELECT word_id FROM word WHERE word_token = %s and type = 'W'",
610                         (norm_name, ))
611             full = [row[0] for row in cur]
612
613         self._cache.fulls[norm_name] = full
614
615         return full
616
617     def _compute_name_tokens(self, names: Sequence[PlaceName]) -> Tuple[Set[int], Set[int]]:
618         """ Computes the full name and partial name tokens for the given
619             dictionary of names.
620         """
621         assert self.conn is not None
622         full_tokens: Set[int] = set()
623         partial_tokens: Set[int] = set()
624
625         for name in names:
626             analyzer_id = name.get_attr('analyzer')
627             analyzer = self.token_analysis.get_analyzer(analyzer_id)
628             word_id = analyzer.get_canonical_id(name)
629             if analyzer_id is None:
630                 token_id = word_id
631             else:
632                 token_id = f'{word_id}@{analyzer_id}'
633
634             full, part = self._cache.names.get(token_id, (None, None))
635             if full is None:
636                 variants = analyzer.compute_variants(word_id)
637                 if not variants:
638                     continue
639
640                 with self.conn.cursor() as cur:
641                     cur.execute("SELECT * FROM getorcreate_full_word(%s, %s)",
642                                 (token_id, variants))
643                     full, part = cast(Tuple[int, List[int]], cur.fetchone())
644
645                 self._cache.names[token_id] = (full, part)
646
647             assert part is not None
648
649             full_tokens.add(full)
650             partial_tokens.update(part)
651
652         return full_tokens, partial_tokens
653
654     def _add_postcode(self, item: PlaceName) -> Optional[str]:
655         """ Make sure the normalized postcode is present in the word table.
656         """
657         assert self.conn is not None
658         analyzer = self.token_analysis.analysis.get('@postcode')
659
660         if analyzer is None:
661             return item.name.strip().upper()
662         else:
663             return analyzer.get_canonical_id(item)
664
665
666 class _TokenInfo:
667     """ Collect token information to be sent back to the database.
668     """
669     def __init__(self) -> None:
670         self.names: Optional[str] = None
671         self.housenumbers: Set[str] = set()
672         self.housenumber_tokens: Set[int] = set()
673         self.street_tokens: Optional[Set[int]] = None
674         self.place_tokens: Set[int] = set()
675         self.address_tokens: Dict[str, str] = {}
676         self.postcode: Optional[str] = None
677
678     def _mk_array(self, tokens: Iterable[Any]) -> str:
679         return f"{{{','.join((str(s) for s in tokens))}}}"
680
681     def to_dict(self) -> Dict[str, Any]:
682         """ Return the token information in database importable format.
683         """
684         out: Dict[str, Any] = {}
685
686         if self.names:
687             out['names'] = self.names
688
689         if self.housenumbers:
690             out['hnr'] = ';'.join(self.housenumbers)
691             out['hnr_tokens'] = self._mk_array(self.housenumber_tokens)
692
693         if self.street_tokens is not None:
694             out['street'] = self._mk_array(self.street_tokens)
695
696         if self.place_tokens:
697             out['place'] = self._mk_array(self.place_tokens)
698
699         if self.address_tokens:
700             out['addr'] = self.address_tokens
701
702         if self.postcode:
703             out['postcode'] = self.postcode
704
705         return out
706
707     def set_names(self, fulls: Iterable[int], partials: Iterable[int]) -> None:
708         """ Adds token information for the normalised names.
709         """
710         self.names = self._mk_array(itertools.chain(fulls, partials))
711
712     def add_housenumber(self, token: Optional[int], hnr: Optional[str]) -> None:
713         """ Extract housenumber information from a list of normalised
714             housenumbers.
715         """
716         if token:
717             assert hnr is not None
718             self.housenumbers.add(hnr)
719             self.housenumber_tokens.add(token)
720
721     def add_street(self, tokens: Iterable[int]) -> None:
722         """ Add addr:street match terms.
723         """
724         if self.street_tokens is None:
725             self.street_tokens = set()
726         self.street_tokens.update(tokens)
727
728     def add_place(self, tokens: Iterable[int]) -> None:
729         """ Add addr:place search and match terms.
730         """
731         self.place_tokens.update(tokens)
732
733     def add_address_term(self, key: str, partials: Iterable[int]) -> None:
734         """ Add additional address terms.
735         """
736         array = self._mk_array(partials)
737         if len(array) > 2:
738             self.address_tokens[key] = array
739
740     def set_postcode(self, postcode: Optional[str]) -> None:
741         """ Set the postcode to the given one.
742         """
743         self.postcode = postcode
744
745
746 class _TokenCache:
747     """ Cache for token information to avoid repeated database queries.
748
749         This cache is not thread-safe and needs to be instantiated per
750         analyzer.
751     """
752     def __init__(self) -> None:
753         self.names: Dict[str, Tuple[int, List[int]]] = {}
754         self.partials: Dict[str, int] = {}
755         self.fulls: Dict[str, List[int]] = {}
756         self.housenumbers: Dict[str, Tuple[Optional[int], Optional[str]]] = {}