]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tokenizer/icu_tokenizer.py
properly close connections when shutting down starlette
[nominatim.git] / nominatim / tokenizer / icu_tokenizer.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tokenizer implementing normalisation as used before Nominatim 4 but using
9 libICU instead of the PostgreSQL module.
10 """
11 from typing import Optional, Sequence, List, Tuple, Mapping, Any, cast, \
12                    Dict, Set, Iterable
13 import itertools
14 import json
15 import logging
16 from pathlib import Path
17 from textwrap import dedent
18
19 from nominatim.db.connection import connect, Connection, Cursor
20 from nominatim.config import Configuration
21 from nominatim.db.utils import CopyBuffer
22 from nominatim.db.sql_preprocessor import SQLPreprocessor
23 from nominatim.data.place_info import PlaceInfo
24 from nominatim.tokenizer.icu_rule_loader import ICURuleLoader
25 from nominatim.tokenizer.place_sanitizer import PlaceSanitizer
26 from nominatim.data.place_name import PlaceName
27 from nominatim.tokenizer.icu_token_analysis import ICUTokenAnalysis
28 from nominatim.tokenizer.base import AbstractAnalyzer, AbstractTokenizer
29
30 DBCFG_TERM_NORMALIZATION = "tokenizer_term_normalization"
31
32 LOG = logging.getLogger()
33
34 def create(dsn: str, data_dir: Path) -> 'ICUTokenizer':
35     """ Create a new instance of the tokenizer provided by this module.
36     """
37     return ICUTokenizer(dsn, data_dir)
38
39
40 class ICUTokenizer(AbstractTokenizer):
41     """ This tokenizer uses libICU to convert names and queries to ASCII.
42         Otherwise it uses the same algorithms and data structures as the
43         normalization routines in Nominatim 3.
44     """
45
46     def __init__(self, dsn: str, data_dir: Path) -> None:
47         self.dsn = dsn
48         self.data_dir = data_dir
49         self.loader: Optional[ICURuleLoader] = None
50
51
52     def init_new_db(self, config: Configuration, init_db: bool = True) -> None:
53         """ Set up a new tokenizer for the database.
54
55             This copies all necessary data in the project directory to make
56             sure the tokenizer remains stable even over updates.
57         """
58         self.loader = ICURuleLoader(config)
59
60         self._install_php(config.lib_dir.php, overwrite=True)
61         self._save_config()
62
63         if init_db:
64             self.update_sql_functions(config)
65             self._init_db_tables(config)
66
67
68     def init_from_project(self, config: Configuration) -> None:
69         """ Initialise the tokenizer from the project directory.
70         """
71         self.loader = ICURuleLoader(config)
72
73         with connect(self.dsn) as conn:
74             self.loader.load_config_from_db(conn)
75
76         self._install_php(config.lib_dir.php, overwrite=False)
77
78
79     def finalize_import(self, config: Configuration) -> None:
80         """ Do any required postprocessing to make the tokenizer data ready
81             for use.
82         """
83         with connect(self.dsn) as conn:
84             sqlp = SQLPreprocessor(conn, config)
85             sqlp.run_sql_file(conn, 'tokenizer/legacy_tokenizer_indices.sql')
86
87
88     def update_sql_functions(self, config: Configuration) -> None:
89         """ Reimport the SQL functions for this tokenizer.
90         """
91         with connect(self.dsn) as conn:
92             sqlp = SQLPreprocessor(conn, config)
93             sqlp.run_sql_file(conn, 'tokenizer/icu_tokenizer.sql')
94
95
96     def check_database(self, config: Configuration) -> None:
97         """ Check that the tokenizer is set up correctly.
98         """
99         # Will throw an error if there is an issue.
100         self.init_from_project(config)
101
102
103     def update_statistics(self) -> None:
104         """ Recompute frequencies for all name words.
105         """
106         with connect(self.dsn) as conn:
107             if conn.table_exists('search_name'):
108                 with conn.cursor() as cur:
109                     cur.drop_table("word_frequencies")
110                     LOG.info("Computing word frequencies")
111                     cur.execute("""CREATE TEMP TABLE word_frequencies AS
112                                      SELECT unnest(name_vector) as id, count(*)
113                                      FROM search_name GROUP BY id""")
114                     cur.execute("CREATE INDEX ON word_frequencies(id)")
115                     LOG.info("Update word table with recomputed frequencies")
116                     cur.execute("""UPDATE word
117                                    SET info = info || jsonb_build_object('count', count)
118                                    FROM word_frequencies WHERE word_id = id""")
119                     cur.drop_table("word_frequencies")
120             conn.commit()
121
122
123     def _cleanup_housenumbers(self) -> None:
124         """ Remove unused house numbers.
125         """
126         with connect(self.dsn) as conn:
127             if not conn.table_exists('search_name'):
128                 return
129             with conn.cursor(name="hnr_counter") as cur:
130                 cur.execute("""SELECT DISTINCT word_id, coalesce(info->>'lookup', word_token)
131                                FROM word
132                                WHERE type = 'H'
133                                  AND NOT EXISTS(SELECT * FROM search_name
134                                                 WHERE ARRAY[word.word_id] && name_vector)
135                                  AND (char_length(coalesce(word, word_token)) > 6
136                                       OR coalesce(word, word_token) not similar to '\\d+')
137                             """)
138                 candidates = {token: wid for wid, token in cur}
139             with conn.cursor(name="hnr_counter") as cur:
140                 cur.execute("""SELECT housenumber FROM placex
141                                WHERE housenumber is not null
142                                      AND (char_length(housenumber) > 6
143                                           OR housenumber not similar to '\\d+')
144                             """)
145                 for row in cur:
146                     for hnr in row[0].split(';'):
147                         candidates.pop(hnr, None)
148             LOG.info("There are %s outdated housenumbers.", len(candidates))
149             LOG.debug("Outdated housenumbers: %s", candidates.keys())
150             if candidates:
151                 with conn.cursor() as cur:
152                     cur.execute("""DELETE FROM word WHERE word_id = any(%s)""",
153                                 (list(candidates.values()), ))
154                 conn.commit()
155
156
157
158     def update_word_tokens(self) -> None:
159         """ Remove unused tokens.
160         """
161         LOG.warning("Cleaning up housenumber tokens.")
162         self._cleanup_housenumbers()
163         LOG.warning("Tokenizer house-keeping done.")
164
165
166     def name_analyzer(self) -> 'ICUNameAnalyzer':
167         """ Create a new analyzer for tokenizing names and queries
168             using this tokinzer. Analyzers are context managers and should
169             be used accordingly:
170
171             ```
172             with tokenizer.name_analyzer() as analyzer:
173                 analyser.tokenize()
174             ```
175
176             When used outside the with construct, the caller must ensure to
177             call the close() function before destructing the analyzer.
178
179             Analyzers are not thread-safe. You need to instantiate one per thread.
180         """
181         assert self.loader is not None
182         return ICUNameAnalyzer(self.dsn, self.loader.make_sanitizer(),
183                                self.loader.make_token_analysis())
184
185
186     def _install_php(self, phpdir: Path, overwrite: bool = True) -> None:
187         """ Install the php script for the tokenizer.
188         """
189         assert self.loader is not None
190         php_file = self.data_dir / "tokenizer.php"
191
192         if not php_file.exists() or overwrite:
193             php_file.write_text(dedent(f"""\
194                 <?php
195                 @define('CONST_Max_Word_Frequency', 10000000);
196                 @define('CONST_Term_Normalization_Rules', "{self.loader.normalization_rules}");
197                 @define('CONST_Transliteration', "{self.loader.get_search_rules()}");
198                 require_once('{phpdir}/tokenizer/icu_tokenizer.php');"""), encoding='utf-8')
199
200
201     def _save_config(self) -> None:
202         """ Save the configuration that needs to remain stable for the given
203             database as database properties.
204         """
205         assert self.loader is not None
206         with connect(self.dsn) as conn:
207             self.loader.save_config_to_db(conn)
208
209
210     def _init_db_tables(self, config: Configuration) -> None:
211         """ Set up the word table and fill it with pre-computed word
212             frequencies.
213         """
214         with connect(self.dsn) as conn:
215             sqlp = SQLPreprocessor(conn, config)
216             sqlp.run_sql_file(conn, 'tokenizer/icu_tokenizer_tables.sql')
217             conn.commit()
218
219
220 class ICUNameAnalyzer(AbstractAnalyzer):
221     """ The ICU analyzer uses the ICU library for splitting names.
222
223         Each instance opens a connection to the database to request the
224         normalization.
225     """
226
227     def __init__(self, dsn: str, sanitizer: PlaceSanitizer,
228                  token_analysis: ICUTokenAnalysis) -> None:
229         self.conn: Optional[Connection] = connect(dsn).connection
230         self.conn.autocommit = True
231         self.sanitizer = sanitizer
232         self.token_analysis = token_analysis
233
234         self._cache = _TokenCache()
235
236
237     def close(self) -> None:
238         """ Free all resources used by the analyzer.
239         """
240         if self.conn:
241             self.conn.close()
242             self.conn = None
243
244
245     def _search_normalized(self, name: str) -> str:
246         """ Return the search token transliteration of the given name.
247         """
248         return cast(str, self.token_analysis.search.transliterate(name)).strip()
249
250
251     def _normalized(self, name: str) -> str:
252         """ Return the normalized version of the given name with all
253             non-relevant information removed.
254         """
255         return cast(str, self.token_analysis.normalizer.transliterate(name)).strip()
256
257
258     def get_word_token_info(self, words: Sequence[str]) -> List[Tuple[str, str, int]]:
259         """ Return token information for the given list of words.
260             If a word starts with # it is assumed to be a full name
261             otherwise is a partial name.
262
263             The function returns a list of tuples with
264             (original word, word token, word id).
265
266             The function is used for testing and debugging only
267             and not necessarily efficient.
268         """
269         assert self.conn is not None
270         full_tokens = {}
271         partial_tokens = {}
272         for word in words:
273             if word.startswith('#'):
274                 full_tokens[word] = self._search_normalized(word[1:])
275             else:
276                 partial_tokens[word] = self._search_normalized(word)
277
278         with self.conn.cursor() as cur:
279             cur.execute("""SELECT word_token, word_id
280                             FROM word WHERE word_token = ANY(%s) and type = 'W'
281                         """, (list(full_tokens.values()),))
282             full_ids = {r[0]: r[1] for r in cur}
283             cur.execute("""SELECT word_token, word_id
284                             FROM word WHERE word_token = ANY(%s) and type = 'w'""",
285                         (list(partial_tokens.values()),))
286             part_ids = {r[0]: r[1] for r in cur}
287
288         return [(k, v, full_ids.get(v, None)) for k, v in full_tokens.items()] \
289                + [(k, v, part_ids.get(v, None)) for k, v in partial_tokens.items()]
290
291
292     def normalize_postcode(self, postcode: str) -> str:
293         """ Convert the postcode to a standardized form.
294
295             This function must yield exactly the same result as the SQL function
296             'token_normalized_postcode()'.
297         """
298         return postcode.strip().upper()
299
300
301     def update_postcodes_from_db(self) -> None:
302         """ Update postcode tokens in the word table from the location_postcode
303             table.
304         """
305         assert self.conn is not None
306         analyzer = self.token_analysis.analysis.get('@postcode')
307
308         with self.conn.cursor() as cur:
309             # First get all postcode names currently in the word table.
310             cur.execute("SELECT DISTINCT word FROM word WHERE type = 'P'")
311             word_entries = set((entry[0] for entry in cur))
312
313             # Then compute the required postcode names from the postcode table.
314             needed_entries = set()
315             cur.execute("SELECT country_code, postcode FROM location_postcode")
316             for cc, postcode in cur:
317                 info = PlaceInfo({'country_code': cc,
318                                   'class': 'place', 'type': 'postcode',
319                                   'address': {'postcode': postcode}})
320                 address = self.sanitizer.process_names(info)[1]
321                 for place in address:
322                     if place.kind == 'postcode':
323                         if analyzer is None:
324                             postcode_name = place.name.strip().upper()
325                             variant_base = None
326                         else:
327                             postcode_name = analyzer.get_canonical_id(place)
328                             variant_base = place.get_attr("variant")
329
330                         if variant_base:
331                             needed_entries.add(f'{postcode_name}@{variant_base}')
332                         else:
333                             needed_entries.add(postcode_name)
334                         break
335
336         # Now update the word table.
337         self._delete_unused_postcode_words(word_entries - needed_entries)
338         self._add_missing_postcode_words(needed_entries - word_entries)
339
340     def _delete_unused_postcode_words(self, tokens: Iterable[str]) -> None:
341         assert self.conn is not None
342         if tokens:
343             with self.conn.cursor() as cur:
344                 cur.execute("DELETE FROM word WHERE type = 'P' and word = any(%s)",
345                             (list(tokens), ))
346
347     def _add_missing_postcode_words(self, tokens: Iterable[str]) -> None:
348         assert self.conn is not None
349         if not tokens:
350             return
351
352         analyzer = self.token_analysis.analysis.get('@postcode')
353         terms = []
354
355         for postcode_name in tokens:
356             if '@' in postcode_name:
357                 term, variant = postcode_name.split('@', 2)
358                 term = self._search_normalized(term)
359                 if analyzer is None:
360                     variants = [term]
361                 else:
362                     variants = analyzer.compute_variants(variant)
363                     if term not in variants:
364                         variants.append(term)
365             else:
366                 variants = [self._search_normalized(postcode_name)]
367             terms.append((postcode_name, variants))
368
369         if terms:
370             with self.conn.cursor() as cur:
371                 cur.execute_values("""SELECT create_postcode_word(pc, var)
372                                       FROM (VALUES %s) AS v(pc, var)""",
373                                    terms)
374
375
376
377
378     def update_special_phrases(self, phrases: Iterable[Tuple[str, str, str, str]],
379                                should_replace: bool) -> None:
380         """ Replace the search index for special phrases with the new phrases.
381             If `should_replace` is True, then the previous set of will be
382             completely replaced. Otherwise the phrases are added to the
383             already existing ones.
384         """
385         assert self.conn is not None
386         norm_phrases = set(((self._normalized(p[0]), p[1], p[2], p[3])
387                             for p in phrases))
388
389         with self.conn.cursor() as cur:
390             # Get the old phrases.
391             existing_phrases = set()
392             cur.execute("SELECT word, info FROM word WHERE type = 'S'")
393             for word, info in cur:
394                 existing_phrases.add((word, info['class'], info['type'],
395                                       info.get('op') or '-'))
396
397             added = self._add_special_phrases(cur, norm_phrases, existing_phrases)
398             if should_replace:
399                 deleted = self._remove_special_phrases(cur, norm_phrases,
400                                                        existing_phrases)
401             else:
402                 deleted = 0
403
404         LOG.info("Total phrases: %s. Added: %s. Deleted: %s",
405                  len(norm_phrases), added, deleted)
406
407
408     def _add_special_phrases(self, cursor: Cursor,
409                              new_phrases: Set[Tuple[str, str, str, str]],
410                              existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
411         """ Add all phrases to the database that are not yet there.
412         """
413         to_add = new_phrases - existing_phrases
414
415         added = 0
416         with CopyBuffer() as copystr:
417             for word, cls, typ, oper in to_add:
418                 term = self._search_normalized(word)
419                 if term:
420                     copystr.add(term, 'S', word,
421                                 json.dumps({'class': cls, 'type': typ,
422                                             'op': oper if oper in ('in', 'near') else None}))
423                     added += 1
424
425             copystr.copy_out(cursor, 'word',
426                              columns=['word_token', 'type', 'word', 'info'])
427
428         return added
429
430
431     def _remove_special_phrases(self, cursor: Cursor,
432                              new_phrases: Set[Tuple[str, str, str, str]],
433                              existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
434         """ Remove all phrases from the database that are no longer in the
435             new phrase list.
436         """
437         to_delete = existing_phrases - new_phrases
438
439         if to_delete:
440             cursor.execute_values(
441                 """ DELETE FROM word USING (VALUES %s) as v(name, in_class, in_type, op)
442                     WHERE type = 'S' and word = name
443                           and info->>'class' = in_class and info->>'type' = in_type
444                           and ((op = '-' and info->>'op' is null) or op = info->>'op')
445                 """, to_delete)
446
447         return len(to_delete)
448
449
450     def add_country_names(self, country_code: str, names: Mapping[str, str]) -> None:
451         """ Add default names for the given country to the search index.
452         """
453         # Make sure any name preprocessing for country names applies.
454         info = PlaceInfo({'name': names, 'country_code': country_code,
455                           'rank_address': 4, 'class': 'boundary',
456                           'type': 'administrative'})
457         self._add_country_full_names(country_code,
458                                      self.sanitizer.process_names(info)[0],
459                                      internal=True)
460
461
462     def _add_country_full_names(self, country_code: str, names: Sequence[PlaceName],
463                                 internal: bool = False) -> None:
464         """ Add names for the given country from an already sanitized
465             name list.
466         """
467         assert self.conn is not None
468         word_tokens = set()
469         for name in names:
470             norm_name = self._search_normalized(name.name)
471             if norm_name:
472                 word_tokens.add(norm_name)
473
474         with self.conn.cursor() as cur:
475             # Get existing names
476             cur.execute("""SELECT word_token, coalesce(info ? 'internal', false) as is_internal
477                              FROM word
478                              WHERE type = 'C' and word = %s""",
479                         (country_code, ))
480             # internal/external names
481             existing_tokens: Dict[bool, Set[str]] = {True: set(), False: set()}
482             for word in cur:
483                 existing_tokens[word[1]].add(word[0])
484
485             # Delete names that no longer exist.
486             gone_tokens = existing_tokens[internal] - word_tokens
487             if internal:
488                 gone_tokens.update(existing_tokens[False] & word_tokens)
489             if gone_tokens:
490                 cur.execute("""DELETE FROM word
491                                USING unnest(%s) as token
492                                WHERE type = 'C' and word = %s
493                                      and word_token = token""",
494                             (list(gone_tokens), country_code))
495
496             # Only add those names that are not yet in the list.
497             new_tokens = word_tokens - existing_tokens[True]
498             if not internal:
499                 new_tokens -= existing_tokens[False]
500             if new_tokens:
501                 if internal:
502                     sql = """INSERT INTO word (word_token, type, word, info)
503                                (SELECT token, 'C', %s, '{"internal": "yes"}'
504                                   FROM unnest(%s) as token)
505                            """
506                 else:
507                     sql = """INSERT INTO word (word_token, type, word)
508                                    (SELECT token, 'C', %s
509                                     FROM unnest(%s) as token)
510                           """
511                 cur.execute(sql, (country_code, list(new_tokens)))
512
513
514     def process_place(self, place: PlaceInfo) -> Mapping[str, Any]:
515         """ Determine tokenizer information about the given place.
516
517             Returns a JSON-serializable structure that will be handed into
518             the database via the token_info field.
519         """
520         token_info = _TokenInfo()
521
522         names, address = self.sanitizer.process_names(place)
523
524         if names:
525             token_info.set_names(*self._compute_name_tokens(names))
526
527             if place.is_country():
528                 assert place.country_code is not None
529                 self._add_country_full_names(place.country_code, names)
530
531         if address:
532             self._process_place_address(token_info, address)
533
534         return token_info.to_dict()
535
536
537     def _process_place_address(self, token_info: '_TokenInfo',
538                                address: Sequence[PlaceName]) -> None:
539         for item in address:
540             if item.kind == 'postcode':
541                 token_info.set_postcode(self._add_postcode(item))
542             elif item.kind == 'housenumber':
543                 token_info.add_housenumber(*self._compute_housenumber_token(item))
544             elif item.kind == 'street':
545                 token_info.add_street(self._retrieve_full_tokens(item.name))
546             elif item.kind == 'place':
547                 if not item.suffix:
548                     token_info.add_place(self._compute_partial_tokens(item.name))
549             elif not item.kind.startswith('_') and not item.suffix and \
550                  item.kind not in ('country', 'full', 'inclusion'):
551                 token_info.add_address_term(item.kind, self._compute_partial_tokens(item.name))
552
553
554     def _compute_housenumber_token(self, hnr: PlaceName) -> Tuple[Optional[int], Optional[str]]:
555         """ Normalize the housenumber and return the word token and the
556             canonical form.
557         """
558         assert self.conn is not None
559         analyzer = self.token_analysis.analysis.get('@housenumber')
560         result: Tuple[Optional[int], Optional[str]] = (None, None)
561
562         if analyzer is None:
563             # When no custom analyzer is set, simply normalize and transliterate
564             norm_name = self._search_normalized(hnr.name)
565             if norm_name:
566                 result = self._cache.housenumbers.get(norm_name, result)
567                 if result[0] is None:
568                     with self.conn.cursor() as cur:
569                         hid = cur.scalar("SELECT getorcreate_hnr_id(%s)", (norm_name, ))
570
571                         result = hid, norm_name
572                         self._cache.housenumbers[norm_name] = result
573         else:
574             # Otherwise use the analyzer to determine the canonical name.
575             # Per convention we use the first variant as the 'lookup name', the
576             # name that gets saved in the housenumber field of the place.
577             word_id = analyzer.get_canonical_id(hnr)
578             if word_id:
579                 result = self._cache.housenumbers.get(word_id, result)
580                 if result[0] is None:
581                     variants = analyzer.compute_variants(word_id)
582                     if variants:
583                         with self.conn.cursor() as cur:
584                             hid = cur.scalar("SELECT create_analyzed_hnr_id(%s, %s)",
585                                              (word_id, list(variants)))
586                             result = hid, variants[0]
587                             self._cache.housenumbers[word_id] = result
588
589         return result
590
591
592     def _compute_partial_tokens(self, name: str) -> List[int]:
593         """ Normalize the given term, split it into partial words and return
594             then token list for them.
595         """
596         assert self.conn is not None
597         norm_name = self._search_normalized(name)
598
599         tokens = []
600         need_lookup = []
601         for partial in norm_name.split():
602             token = self._cache.partials.get(partial)
603             if token:
604                 tokens.append(token)
605             else:
606                 need_lookup.append(partial)
607
608         if need_lookup:
609             with self.conn.cursor() as cur:
610                 cur.execute("""SELECT word, getorcreate_partial_word(word)
611                                FROM unnest(%s) word""",
612                             (need_lookup, ))
613
614                 for partial, token in cur:
615                     assert token is not None
616                     tokens.append(token)
617                     self._cache.partials[partial] = token
618
619         return tokens
620
621
622     def _retrieve_full_tokens(self, name: str) -> List[int]:
623         """ Get the full name token for the given name, if it exists.
624             The name is only retrieved for the standard analyser.
625         """
626         assert self.conn is not None
627         norm_name = self._search_normalized(name)
628
629         # return cached if possible
630         if norm_name in self._cache.fulls:
631             return self._cache.fulls[norm_name]
632
633         with self.conn.cursor() as cur:
634             cur.execute("SELECT word_id FROM word WHERE word_token = %s and type = 'W'",
635                         (norm_name, ))
636             full = [row[0] for row in cur]
637
638         self._cache.fulls[norm_name] = full
639
640         return full
641
642
643     def _compute_name_tokens(self, names: Sequence[PlaceName]) -> Tuple[Set[int], Set[int]]:
644         """ Computes the full name and partial name tokens for the given
645             dictionary of names.
646         """
647         assert self.conn is not None
648         full_tokens: Set[int] = set()
649         partial_tokens: Set[int] = set()
650
651         for name in names:
652             analyzer_id = name.get_attr('analyzer')
653             analyzer = self.token_analysis.get_analyzer(analyzer_id)
654             word_id = analyzer.get_canonical_id(name)
655             if analyzer_id is None:
656                 token_id = word_id
657             else:
658                 token_id = f'{word_id}@{analyzer_id}'
659
660             full, part = self._cache.names.get(token_id, (None, None))
661             if full is None:
662                 variants = analyzer.compute_variants(word_id)
663                 if not variants:
664                     continue
665
666                 with self.conn.cursor() as cur:
667                     cur.execute("SELECT * FROM getorcreate_full_word(%s, %s)",
668                                 (token_id, variants))
669                     full, part = cast(Tuple[int, List[int]], cur.fetchone())
670
671                 self._cache.names[token_id] = (full, part)
672
673             assert part is not None
674
675             full_tokens.add(full)
676             partial_tokens.update(part)
677
678         return full_tokens, partial_tokens
679
680
681     def _add_postcode(self, item: PlaceName) -> Optional[str]:
682         """ Make sure the normalized postcode is present in the word table.
683         """
684         assert self.conn is not None
685         analyzer = self.token_analysis.analysis.get('@postcode')
686
687         if analyzer is None:
688             postcode_name = item.name.strip().upper()
689             variant_base = None
690         else:
691             postcode_name = analyzer.get_canonical_id(item)
692             variant_base = item.get_attr("variant")
693
694         if variant_base:
695             postcode = f'{postcode_name}@{variant_base}'
696         else:
697             postcode = postcode_name
698
699         if postcode not in self._cache.postcodes:
700             term = self._search_normalized(postcode_name)
701             if not term:
702                 return None
703
704             variants = {term}
705             if analyzer is not None and variant_base:
706                 variants.update(analyzer.compute_variants(variant_base))
707
708             with self.conn.cursor() as cur:
709                 cur.execute("SELECT create_postcode_word(%s, %s)",
710                             (postcode, list(variants)))
711             self._cache.postcodes.add(postcode)
712
713         return postcode_name
714
715
716 class _TokenInfo:
717     """ Collect token information to be sent back to the database.
718     """
719     def __init__(self) -> None:
720         self.names: Optional[str] = None
721         self.housenumbers: Set[str] = set()
722         self.housenumber_tokens: Set[int] = set()
723         self.street_tokens: Set[int] = set()
724         self.place_tokens: Set[int] = set()
725         self.address_tokens: Dict[str, str] = {}
726         self.postcode: Optional[str] = None
727
728
729     def _mk_array(self, tokens: Iterable[Any]) -> str:
730         return f"{{{','.join((str(s) for s in tokens))}}}"
731
732
733     def to_dict(self) -> Dict[str, Any]:
734         """ Return the token information in database importable format.
735         """
736         out: Dict[str, Any] = {}
737
738         if self.names:
739             out['names'] = self.names
740
741         if self.housenumbers:
742             out['hnr'] = ';'.join(self.housenumbers)
743             out['hnr_tokens'] = self._mk_array(self.housenumber_tokens)
744
745         if self.street_tokens:
746             out['street'] = self._mk_array(self.street_tokens)
747
748         if self.place_tokens:
749             out['place'] = self._mk_array(self.place_tokens)
750
751         if self.address_tokens:
752             out['addr'] = self.address_tokens
753
754         if self.postcode:
755             out['postcode'] = self.postcode
756
757         return out
758
759
760     def set_names(self, fulls: Iterable[int], partials: Iterable[int]) -> None:
761         """ Adds token information for the normalised names.
762         """
763         self.names = self._mk_array(itertools.chain(fulls, partials))
764
765
766     def add_housenumber(self, token: Optional[int], hnr: Optional[str]) -> None:
767         """ Extract housenumber information from a list of normalised
768             housenumbers.
769         """
770         if token:
771             assert hnr is not None
772             self.housenumbers.add(hnr)
773             self.housenumber_tokens.add(token)
774
775
776     def add_street(self, tokens: Iterable[int]) -> None:
777         """ Add addr:street match terms.
778         """
779         self.street_tokens.update(tokens)
780
781
782     def add_place(self, tokens: Iterable[int]) -> None:
783         """ Add addr:place search and match terms.
784         """
785         self.place_tokens.update(tokens)
786
787
788     def add_address_term(self, key: str, partials: Iterable[int]) -> None:
789         """ Add additional address terms.
790         """
791         if partials:
792             self.address_tokens[key] = self._mk_array(partials)
793
794     def set_postcode(self, postcode: Optional[str]) -> None:
795         """ Set the postcode to the given one.
796         """
797         self.postcode = postcode
798
799
800 class _TokenCache:
801     """ Cache for token information to avoid repeated database queries.
802
803         This cache is not thread-safe and needs to be instantiated per
804         analyzer.
805     """
806     def __init__(self) -> None:
807         self.names: Dict[str, Tuple[int, List[int]]] = {}
808         self.partials: Dict[str, int] = {}
809         self.fulls: Dict[str, List[int]] = {}
810         self.postcodes: Set[str] = set()
811         self.housenumbers: Dict[str, Tuple[Optional[int], Optional[str]]] = {}