]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tokenizer/icu_tokenizer.py
release 5.1.0.post5
[nominatim.git] / src / nominatim_db / tokenizer / icu_tokenizer.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tokenizer implementing normalisation as used before Nominatim 4 but using
9 libICU instead of the PostgreSQL module.
10 """
11 from typing import Optional, Sequence, List, Tuple, Mapping, Any, cast, \
12                    Dict, Set, Iterable
13 import itertools
14 import logging
15
16 from psycopg.types.json import Jsonb
17 from psycopg import sql as pysql
18
19 from ..db.connection import connect, Connection, Cursor, \
20                             drop_tables, table_exists, execute_scalar
21 from ..config import Configuration
22 from ..db.sql_preprocessor import SQLPreprocessor
23 from ..data.place_info import PlaceInfo
24 from ..data.place_name import PlaceName
25 from .icu_rule_loader import ICURuleLoader
26 from .place_sanitizer import PlaceSanitizer
27 from .icu_token_analysis import ICUTokenAnalysis
28 from .base import AbstractAnalyzer, AbstractTokenizer
29
30 DBCFG_TERM_NORMALIZATION = "tokenizer_term_normalization"
31
32 LOG = logging.getLogger()
33
34 WORD_TYPES = (('country_names', 'C'),
35               ('postcodes', 'P'),
36               ('full_word', 'W'),
37               ('housenumbers', 'H'))
38
39
40 def create(dsn: str) -> 'ICUTokenizer':
41     """ Create a new instance of the tokenizer provided by this module.
42     """
43     return ICUTokenizer(dsn)
44
45
46 class ICUTokenizer(AbstractTokenizer):
47     """ This tokenizer uses libICU to convert names and queries to ASCII.
48         Otherwise it uses the same algorithms and data structures as the
49         normalization routines in Nominatim 3.
50     """
51
52     def __init__(self, dsn: str) -> None:
53         self.dsn = dsn
54         self.loader: Optional[ICURuleLoader] = None
55
56     def init_new_db(self, config: Configuration, init_db: bool = True) -> None:
57         """ Set up a new tokenizer for the database.
58
59             This copies all necessary data in the project directory to make
60             sure the tokenizer remains stable even over updates.
61         """
62         self.loader = ICURuleLoader(config)
63
64         self._save_config()
65
66         if init_db:
67             self.update_sql_functions(config)
68             self._setup_db_tables(config)
69             self._create_base_indices(config, 'word')
70
71     def init_from_project(self, config: Configuration) -> None:
72         """ Initialise the tokenizer from the project directory.
73         """
74         self.loader = ICURuleLoader(config)
75
76         with connect(self.dsn) as conn:
77             self.loader.load_config_from_db(conn)
78
79     def finalize_import(self, config: Configuration) -> None:
80         """ Do any required postprocessing to make the tokenizer data ready
81             for use.
82         """
83         self._create_lookup_indices(config, 'word')
84
85     def update_sql_functions(self, config: Configuration) -> None:
86         """ Reimport the SQL functions for this tokenizer.
87         """
88         with connect(self.dsn) as conn:
89             sqlp = SQLPreprocessor(conn, config)
90             sqlp.run_sql_file(conn, 'tokenizer/icu_tokenizer.sql')
91
92     def check_database(self, config: Configuration) -> None:
93         """ Check that the tokenizer is set up correctly.
94         """
95         # Will throw an error if there is an issue.
96         self.init_from_project(config)
97
98     def update_statistics(self, config: Configuration, threads: int = 2) -> None:
99         """ Recompute frequencies for all name words.
100         """
101         with connect(self.dsn) as conn:
102             if not table_exists(conn, 'search_name'):
103                 return
104
105             with conn.cursor() as cur:
106                 cur.execute('ANALYSE search_name')
107                 if threads > 1:
108                     cur.execute(pysql.SQL('SET max_parallel_workers_per_gather TO {}')
109                                      .format(pysql.Literal(min(threads, 6),)))
110
111                 LOG.info('Computing word frequencies')
112                 drop_tables(conn, 'word_frequencies')
113                 cur.execute("""
114                   CREATE TEMP TABLE word_frequencies AS
115                   WITH word_freq AS MATERIALIZED (
116                            SELECT unnest(name_vector) as id, count(*)
117                                  FROM search_name GROUP BY id),
118                        addr_freq AS MATERIALIZED (
119                            SELECT unnest(nameaddress_vector) as id, count(*)
120                                  FROM search_name GROUP BY id)
121                   SELECT coalesce(a.id, w.id) as id,
122                          (CASE WHEN w.count is null or w.count <= 1 THEN '{}'::JSONB
123                               ELSE jsonb_build_object('count', w.count) END
124                           ||
125                           CASE WHEN a.count is null or a.count <= 1 THEN '{}'::JSONB
126                               ELSE jsonb_build_object('addr_count', a.count) END) as info
127                   FROM word_freq w FULL JOIN addr_freq a ON a.id = w.id;
128                   """)
129                 cur.execute('CREATE UNIQUE INDEX ON word_frequencies(id) INCLUDE(info)')
130                 cur.execute('ANALYSE word_frequencies')
131                 LOG.info('Update word table with recomputed frequencies')
132                 drop_tables(conn, 'tmp_word')
133                 cur.execute("""CREATE TABLE tmp_word AS
134                                 SELECT word_id, word_token, type, word,
135                                        coalesce(word.info, '{}'::jsonb)
136                                        - 'count' - 'addr_count' ||
137                                        coalesce(wf.info, '{}'::jsonb)
138                                        as info
139                                 FROM word LEFT JOIN word_frequencies wf
140                                      ON word.word_id = wf.id
141                                 ORDER BY word_id
142                             """)
143                 drop_tables(conn, 'word_frequencies')
144
145             with conn.cursor() as cur:
146                 cur.execute('SET max_parallel_workers_per_gather TO 0')
147
148             sqlp = SQLPreprocessor(conn, config)
149             sqlp.run_string(conn,
150                             'GRANT SELECT ON tmp_word TO "{{config.DATABASE_WEBUSER}}"')
151             conn.commit()
152         self._create_base_indices(config, 'tmp_word')
153         self._create_lookup_indices(config, 'tmp_word')
154         self._move_temporary_word_table('tmp_word')
155
156     def _cleanup_housenumbers(self) -> None:
157         """ Remove unused house numbers.
158         """
159         with connect(self.dsn) as conn:
160             if not table_exists(conn, 'search_name'):
161                 return
162             with conn.cursor(name="hnr_counter") as cur:
163                 cur.execute("""SELECT DISTINCT word_id, coalesce(info->>'lookup', word_token)
164                                FROM word
165                                WHERE type = 'H'
166                                  AND NOT EXISTS(SELECT * FROM search_name
167                                                 WHERE ARRAY[word.word_id] && name_vector)
168                                  AND (char_length(coalesce(word, word_token)) > 6
169                                       OR coalesce(word, word_token) not similar to '\\d+')
170                             """)
171                 candidates = {token: wid for wid, token in cur}
172             with conn.cursor(name="hnr_counter") as cur:
173                 cur.execute("""SELECT housenumber FROM placex
174                                WHERE housenumber is not null
175                                      AND (char_length(housenumber) > 6
176                                           OR housenumber not similar to '\\d+')
177                             """)
178                 for row in cur:
179                     for hnr in row[0].split(';'):
180                         candidates.pop(hnr, None)
181             LOG.info("There are %s outdated housenumbers.", len(candidates))
182             LOG.debug("Outdated housenumbers: %s", candidates.keys())
183             if candidates:
184                 with conn.cursor() as cur:
185                     cur.execute("""DELETE FROM word WHERE word_id = any(%s)""",
186                                 (list(candidates.values()), ))
187                 conn.commit()
188
189     def update_word_tokens(self) -> None:
190         """ Remove unused tokens.
191         """
192         LOG.warning("Cleaning up housenumber tokens.")
193         self._cleanup_housenumbers()
194         LOG.warning("Tokenizer house-keeping done.")
195
196     def name_analyzer(self) -> 'ICUNameAnalyzer':
197         """ Create a new analyzer for tokenizing names and queries
198             using this tokinzer. Analyzers are context managers and should
199             be used accordingly:
200
201             ```
202             with tokenizer.name_analyzer() as analyzer:
203                 analyser.tokenize()
204             ```
205
206             When used outside the with construct, the caller must ensure to
207             call the close() function before destructing the analyzer.
208
209             Analyzers are not thread-safe. You need to instantiate one per thread.
210         """
211         assert self.loader is not None
212         return ICUNameAnalyzer(self.dsn, self.loader.make_sanitizer(),
213                                self.loader.make_token_analysis())
214
215     def most_frequent_words(self, conn: Connection, num: int) -> List[str]:
216         """ Return a list of the `num` most frequent full words
217             in the database.
218         """
219         with conn.cursor() as cur:
220             cur.execute("""SELECT word, sum((info->>'count')::int) as count
221                              FROM word WHERE type = 'W'
222                              GROUP BY word
223                              ORDER BY count DESC LIMIT %s""", (num,))
224             return list(s[0].split('@')[0] for s in cur)
225
226     def _save_config(self) -> None:
227         """ Save the configuration that needs to remain stable for the given
228             database as database properties.
229         """
230         assert self.loader is not None
231         with connect(self.dsn) as conn:
232             self.loader.save_config_to_db(conn)
233
234     def _setup_db_tables(self, config: Configuration) -> None:
235         """ Set up the word table and fill it with pre-computed word
236             frequencies.
237         """
238         with connect(self.dsn) as conn:
239             drop_tables(conn, 'word')
240             sqlp = SQLPreprocessor(conn, config)
241             sqlp.run_string(conn, """
242                 CREATE TABLE word (
243                       word_id INTEGER,
244                       word_token text NOT NULL,
245                       type text NOT NULL,
246                       word text,
247                       info jsonb
248                     ) {{db.tablespace.search_data}};
249                 GRANT SELECT ON word TO "{{config.DATABASE_WEBUSER}}";
250
251                 DROP SEQUENCE IF EXISTS seq_word;
252                 CREATE SEQUENCE seq_word start 1;
253                 GRANT SELECT ON seq_word to "{{config.DATABASE_WEBUSER}}";
254             """)
255             conn.commit()
256
257     def _create_base_indices(self, config: Configuration, table_name: str) -> None:
258         """ Set up the word table and fill it with pre-computed word
259             frequencies.
260         """
261         with connect(self.dsn) as conn:
262             sqlp = SQLPreprocessor(conn, config)
263             sqlp.run_string(conn,
264                             """CREATE INDEX idx_{{table_name}}_word_token ON {{table_name}}
265                                USING BTREE (word_token) {{db.tablespace.search_index}}""",
266                             table_name=table_name)
267             for name, ctype in WORD_TYPES:
268                 sqlp.run_string(conn,
269                                 """CREATE INDEX idx_{{table_name}}_{{idx_name}} ON {{table_name}}
270                                    USING BTREE (word) {{db.tablespace.address_index}}
271                                    WHERE type = '{{column_type}}'
272                                 """,
273                                 table_name=table_name, idx_name=name,
274                                 column_type=ctype)
275             conn.commit()
276
277     def _create_lookup_indices(self, config: Configuration, table_name: str) -> None:
278         """ Create additional indexes used when running the API.
279         """
280         with connect(self.dsn) as conn:
281             sqlp = SQLPreprocessor(conn, config)
282             # Index required for details lookup.
283             sqlp.run_string(
284                 conn,
285                 """
286                 CREATE INDEX IF NOT EXISTS idx_{{table_name}}_word_id
287                   ON {{table_name}} USING BTREE (word_id) {{db.tablespace.search_index}}
288                 """,
289                 table_name=table_name)
290             conn.commit()
291
292     def _move_temporary_word_table(self, old: str) -> None:
293         """ Rename all tables and indexes used by the tokenizer.
294         """
295         with connect(self.dsn) as conn:
296             drop_tables(conn, 'word')
297             with conn.cursor() as cur:
298                 cur.execute(f"ALTER TABLE {old} RENAME TO word")
299                 for idx in ('word_token', 'word_id'):
300                     cur.execute(f"""ALTER INDEX idx_{old}_{idx}
301                                       RENAME TO idx_word_{idx}""")
302                 for name, _ in WORD_TYPES:
303                     cur.execute(f"""ALTER INDEX idx_{old}_{name}
304                                     RENAME TO idx_word_{name}""")
305             conn.commit()
306
307
308 class ICUNameAnalyzer(AbstractAnalyzer):
309     """ The ICU analyzer uses the ICU library for splitting names.
310
311         Each instance opens a connection to the database to request the
312         normalization.
313     """
314
315     def __init__(self, dsn: str, sanitizer: PlaceSanitizer,
316                  token_analysis: ICUTokenAnalysis) -> None:
317         self.conn: Optional[Connection] = connect(dsn)
318         self.conn.autocommit = True
319         self.sanitizer = sanitizer
320         self.token_analysis = token_analysis
321
322         self._cache = _TokenCache()
323
324     def close(self) -> None:
325         """ Free all resources used by the analyzer.
326         """
327         if self.conn:
328             self.conn.close()
329             self.conn = None
330
331     def _search_normalized(self, name: str) -> str:
332         """ Return the search token transliteration of the given name.
333         """
334         return cast(str, self.token_analysis.search.transliterate(name)).strip()
335
336     def _normalized(self, name: str) -> str:
337         """ Return the normalized version of the given name with all
338             non-relevant information removed.
339         """
340         return cast(str, self.token_analysis.normalizer.transliterate(name)).strip()
341
342     def get_word_token_info(self, words: Sequence[str]) -> List[Tuple[str, str, int]]:
343         """ Return token information for the given list of words.
344             If a word starts with # it is assumed to be a full name
345             otherwise is a partial name.
346
347             The function returns a list of tuples with
348             (original word, word token, word id).
349
350             The function is used for testing and debugging only
351             and not necessarily efficient.
352         """
353         assert self.conn is not None
354         full_tokens = {}
355         partial_tokens = {}
356         for word in words:
357             if word.startswith('#'):
358                 full_tokens[word] = self._search_normalized(word[1:])
359             else:
360                 partial_tokens[word] = self._search_normalized(word)
361
362         with self.conn.cursor() as cur:
363             cur.execute("""SELECT word_token, word_id
364                             FROM word WHERE word_token = ANY(%s) and type = 'W'
365                         """, (list(full_tokens.values()),))
366             full_ids = {r[0]: r[1] for r in cur}
367             cur.execute("""SELECT word_token, word_id
368                             FROM word WHERE word_token = ANY(%s) and type = 'w'""",
369                         (list(partial_tokens.values()),))
370             part_ids = {r[0]: r[1] for r in cur}
371
372         return [(k, v, full_ids.get(v, None)) for k, v in full_tokens.items()] \
373             + [(k, v, part_ids.get(v, None)) for k, v in partial_tokens.items()]
374
375     def normalize_postcode(self, postcode: str) -> str:
376         """ Convert the postcode to a standardized form.
377
378             This function must yield exactly the same result as the SQL function
379             'token_normalized_postcode()'.
380         """
381         return postcode.strip().upper()
382
383     def update_postcodes_from_db(self) -> None:
384         """ Postcode update.
385
386             Removes all postcodes from the word table because they are not
387             needed. Postcodes are recognised by pattern.
388         """
389         assert self.conn is not None
390
391         with self.conn.cursor() as cur:
392             cur.execute("DELETE FROM word WHERE type = 'P'")
393
394     def update_special_phrases(self, phrases: Iterable[Tuple[str, str, str, str]],
395                                should_replace: bool) -> None:
396         """ Replace the search index for special phrases with the new phrases.
397             If `should_replace` is True, then the previous set of will be
398             completely replaced. Otherwise the phrases are added to the
399             already existing ones.
400         """
401         assert self.conn is not None
402         norm_phrases = set(((self._normalized(p[0]), p[1], p[2], p[3])
403                             for p in phrases))
404
405         with self.conn.cursor() as cur:
406             # Get the old phrases.
407             existing_phrases = set()
408             cur.execute("SELECT word, info FROM word WHERE type = 'S'")
409             for word, info in cur:
410                 existing_phrases.add((word, info['class'], info['type'],
411                                       info.get('op') or '-'))
412
413             added = self._add_special_phrases(cur, norm_phrases, existing_phrases)
414             if should_replace:
415                 deleted = self._remove_special_phrases(cur, norm_phrases,
416                                                        existing_phrases)
417             else:
418                 deleted = 0
419
420         LOG.info("Total phrases: %s. Added: %s. Deleted: %s",
421                  len(norm_phrases), added, deleted)
422
423     def _add_special_phrases(self, cursor: Cursor,
424                              new_phrases: Set[Tuple[str, str, str, str]],
425                              existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
426         """ Add all phrases to the database that are not yet there.
427         """
428         to_add = new_phrases - existing_phrases
429
430         added = 0
431         with cursor.copy('COPY word(word_token, type, word, info) FROM STDIN') as copy:
432             for word, cls, typ, oper in to_add:
433                 term = self._search_normalized(word)
434                 if term:
435                     copy.write_row((term, 'S', word,
436                                     Jsonb({'class': cls, 'type': typ,
437                                            'op': oper if oper in ('in', 'near') else None})))
438                     added += 1
439
440         return added
441
442     def _remove_special_phrases(self, cursor: Cursor,
443                                 new_phrases: Set[Tuple[str, str, str, str]],
444                                 existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
445         """ Remove all phrases from the database that are no longer in the
446             new phrase list.
447         """
448         to_delete = existing_phrases - new_phrases
449
450         if to_delete:
451             cursor.executemany(
452                 """ DELETE FROM word
453                       WHERE type = 'S' and word = %s
454                             and info->>'class' = %s and info->>'type' = %s
455                             and %s = coalesce(info->>'op', '-')
456                 """, to_delete)
457
458         return len(to_delete)
459
460     def add_country_names(self, country_code: str, names: Mapping[str, str]) -> None:
461         """ Add default names for the given country to the search index.
462         """
463         # Make sure any name preprocessing for country names applies.
464         info = PlaceInfo({'name': names, 'country_code': country_code,
465                           'rank_address': 4, 'class': 'boundary',
466                           'type': 'administrative'})
467         self._add_country_full_names(country_code,
468                                      self.sanitizer.process_names(info)[0],
469                                      internal=True)
470
471     def _add_country_full_names(self, country_code: str, names: Sequence[PlaceName],
472                                 internal: bool = False) -> None:
473         """ Add names for the given country from an already sanitized
474             name list.
475         """
476         assert self.conn is not None
477         word_tokens = set()
478         for name in names:
479             norm_name = self._search_normalized(name.name)
480             if norm_name:
481                 word_tokens.add(norm_name)
482
483         with self.conn.cursor() as cur:
484             # Get existing names
485             cur.execute("""SELECT word_token, coalesce(info ? 'internal', false) as is_internal
486                              FROM word
487                              WHERE type = 'C' and word = %s""",
488                         (country_code, ))
489             # internal/external names
490             existing_tokens: Dict[bool, Set[str]] = {True: set(), False: set()}
491             for word in cur:
492                 existing_tokens[word[1]].add(word[0])
493
494             # Delete names that no longer exist.
495             gone_tokens = existing_tokens[internal] - word_tokens
496             if internal:
497                 gone_tokens.update(existing_tokens[False] & word_tokens)
498             if gone_tokens:
499                 cur.execute("""DELETE FROM word
500                                USING unnest(%s::text[]) as token
501                                WHERE type = 'C' and word = %s
502                                      and word_token = token""",
503                             (list(gone_tokens), country_code))
504
505             # Only add those names that are not yet in the list.
506             new_tokens = word_tokens - existing_tokens[True]
507             if not internal:
508                 new_tokens -= existing_tokens[False]
509             if new_tokens:
510                 if internal:
511                     sql = """INSERT INTO word (word_token, type, word, info)
512                                (SELECT token, 'C', %s, '{"internal": "yes"}'
513                                   FROM unnest(%s::text[]) as token)
514                            """
515                 else:
516                     sql = """INSERT INTO word (word_token, type, word)
517                                    (SELECT token, 'C', %s
518                                     FROM unnest(%s::text[]) as token)
519                           """
520                 cur.execute(sql, (country_code, list(new_tokens)))
521
522     def process_place(self, place: PlaceInfo) -> Mapping[str, Any]:
523         """ Determine tokenizer information about the given place.
524
525             Returns a JSON-serializable structure that will be handed into
526             the database via the token_info field.
527         """
528         token_info = _TokenInfo()
529
530         names, address = self.sanitizer.process_names(place)
531
532         if names:
533             token_info.set_names(*self._compute_name_tokens(names))
534
535             if place.is_country():
536                 assert place.country_code is not None
537                 self._add_country_full_names(place.country_code, names)
538
539         if address:
540             self._process_place_address(token_info, address)
541
542         return token_info.to_dict()
543
544     def _process_place_address(self, token_info: '_TokenInfo',
545                                address: Sequence[PlaceName]) -> None:
546         for item in address:
547             if item.kind == 'postcode':
548                 token_info.set_postcode(self._add_postcode(item))
549             elif item.kind == 'housenumber':
550                 token_info.add_housenumber(*self._compute_housenumber_token(item))
551             elif item.kind == 'street':
552                 token_info.add_street(self._retrieve_full_tokens(item.name))
553             elif item.kind == 'place':
554                 if not item.suffix:
555                     token_info.add_place(itertools.chain(*self._compute_name_tokens([item])))
556             elif (not item.kind.startswith('_') and not item.suffix and
557                   item.kind not in ('country', 'full', 'inclusion')):
558                 token_info.add_address_term(item.kind,
559                                             itertools.chain(*self._compute_name_tokens([item])))
560
561     def _compute_housenumber_token(self, hnr: PlaceName) -> Tuple[Optional[int], Optional[str]]:
562         """ Normalize the housenumber and return the word token and the
563             canonical form.
564         """
565         assert self.conn is not None
566         analyzer = self.token_analysis.analysis.get('@housenumber')
567         result: Tuple[Optional[int], Optional[str]] = (None, None)
568
569         if analyzer is None:
570             # When no custom analyzer is set, simply normalize and transliterate
571             norm_name = self._search_normalized(hnr.name)
572             if norm_name:
573                 result = self._cache.housenumbers.get(norm_name, result)
574                 if result[0] is None:
575                     hid = execute_scalar(self.conn, "SELECT getorcreate_hnr_id(%s)", (norm_name, ))
576
577                     result = hid, norm_name
578                     self._cache.housenumbers[norm_name] = result
579         else:
580             # Otherwise use the analyzer to determine the canonical name.
581             # Per convention we use the first variant as the 'lookup name', the
582             # name that gets saved in the housenumber field of the place.
583             word_id = analyzer.get_canonical_id(hnr)
584             if word_id:
585                 result = self._cache.housenumbers.get(word_id, result)
586                 if result[0] is None:
587                     varout = analyzer.compute_variants(word_id)
588                     if isinstance(varout, tuple):
589                         variants = varout[0]
590                     else:
591                         variants = varout
592                     if variants:
593                         hid = execute_scalar(self.conn, "SELECT create_analyzed_hnr_id(%s, %s)",
594                                              (word_id, variants))
595                         result = hid, variants[0]
596                         self._cache.housenumbers[word_id] = result
597
598         return result
599
600     def _retrieve_full_tokens(self, name: str) -> List[int]:
601         """ Get the full name token for the given name, if it exists.
602             The name is only retrieved for the standard analyser.
603         """
604         assert self.conn is not None
605         norm_name = self._search_normalized(name)
606
607         # return cached if possible
608         if norm_name in self._cache.fulls:
609             return self._cache.fulls[norm_name]
610
611         with self.conn.cursor() as cur:
612             cur.execute("SELECT word_id FROM word WHERE word_token = %s and type = 'W'",
613                         (norm_name, ))
614             full = [row[0] for row in cur]
615
616         self._cache.fulls[norm_name] = full
617
618         return full
619
620     def _compute_name_tokens(self, names: Sequence[PlaceName]) -> Tuple[Set[int], Set[int]]:
621         """ Computes the full name and partial name tokens for the given
622             dictionary of names.
623         """
624         assert self.conn is not None
625         full_tokens: Set[int] = set()
626         partial_tokens: Set[int] = set()
627
628         for name in names:
629             analyzer_id = name.get_attr('analyzer')
630             analyzer = self.token_analysis.get_analyzer(analyzer_id)
631             word_id = analyzer.get_canonical_id(name)
632             if analyzer_id is None:
633                 token_id = word_id
634             else:
635                 token_id = f'{word_id}@{analyzer_id}'
636
637             full, part = self._cache.names.get(token_id, (None, None))
638             if full is None:
639                 varset = analyzer.compute_variants(word_id)
640                 if isinstance(varset, tuple):
641                     variants, lookups = varset
642                 else:
643                     variants, lookups = varset, None
644                 if not variants:
645                     continue
646
647                 with self.conn.cursor() as cur:
648                     cur.execute("SELECT * FROM getorcreate_full_word(%s, %s, %s)",
649                                 (token_id, variants, lookups))
650                     full, part = cast(Tuple[int, List[int]], cur.fetchone())
651
652                 self._cache.names[token_id] = (full, part)
653
654             assert part is not None
655
656             full_tokens.add(full)
657             partial_tokens.update(part)
658
659         return full_tokens, partial_tokens
660
661     def _add_postcode(self, item: PlaceName) -> Optional[str]:
662         """ Make sure the normalized postcode is present in the word table.
663         """
664         assert self.conn is not None
665         analyzer = self.token_analysis.analysis.get('@postcode')
666
667         if analyzer is None:
668             return item.name.strip().upper()
669         else:
670             return analyzer.get_canonical_id(item)
671
672
673 class _TokenInfo:
674     """ Collect token information to be sent back to the database.
675     """
676     def __init__(self) -> None:
677         self.names: Optional[str] = None
678         self.housenumbers: Set[str] = set()
679         self.housenumber_tokens: Set[int] = set()
680         self.street_tokens: Optional[Set[int]] = None
681         self.place_tokens: Set[int] = set()
682         self.address_tokens: Dict[str, str] = {}
683         self.postcode: Optional[str] = None
684
685     def _mk_array(self, tokens: Iterable[Any]) -> str:
686         return f"{{{','.join((str(s) for s in tokens))}}}"
687
688     def to_dict(self) -> Dict[str, Any]:
689         """ Return the token information in database importable format.
690         """
691         out: Dict[str, Any] = {}
692
693         if self.names:
694             out['names'] = self.names
695
696         if self.housenumbers:
697             out['hnr'] = ';'.join(self.housenumbers)
698             out['hnr_tokens'] = self._mk_array(self.housenumber_tokens)
699
700         if self.street_tokens is not None:
701             out['street'] = self._mk_array(self.street_tokens)
702
703         if self.place_tokens:
704             out['place'] = self._mk_array(self.place_tokens)
705
706         if self.address_tokens:
707             out['addr'] = self.address_tokens
708
709         if self.postcode:
710             out['postcode'] = self.postcode
711
712         return out
713
714     def set_names(self, fulls: Iterable[int], partials: Iterable[int]) -> None:
715         """ Adds token information for the normalised names.
716         """
717         self.names = self._mk_array(itertools.chain(fulls, partials))
718
719     def add_housenumber(self, token: Optional[int], hnr: Optional[str]) -> None:
720         """ Extract housenumber information from a list of normalised
721             housenumbers.
722         """
723         if token:
724             assert hnr is not None
725             self.housenumbers.add(hnr)
726             self.housenumber_tokens.add(token)
727
728     def add_street(self, tokens: Iterable[int]) -> None:
729         """ Add addr:street match terms.
730         """
731         if self.street_tokens is None:
732             self.street_tokens = set()
733         self.street_tokens.update(tokens)
734
735     def add_place(self, tokens: Iterable[int]) -> None:
736         """ Add addr:place search and match terms.
737         """
738         self.place_tokens.update(tokens)
739
740     def add_address_term(self, key: str, partials: Iterable[int]) -> None:
741         """ Add additional address terms.
742         """
743         array = self._mk_array(partials)
744         if len(array) > 2:
745             self.address_tokens[key] = array
746
747     def set_postcode(self, postcode: Optional[str]) -> None:
748         """ Set the postcode to the given one.
749         """
750         self.postcode = postcode
751
752
753 class _TokenCache:
754     """ Cache for token information to avoid repeated database queries.
755
756         This cache is not thread-safe and needs to be instantiated per
757         analyzer.
758     """
759     def __init__(self) -> None:
760         self.names: Dict[str, Tuple[int, List[int]]] = {}
761         self.partials: Dict[str, int] = {}
762         self.fulls: Dict[str, List[int]] = {}
763         self.housenumbers: Dict[str, Tuple[Optional[int], Optional[str]]] = {}