# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Tokenizer implementing normalisation as used before Nominatim 4 but using
libICU instead of the PostgreSQL module.
"""
from typing import Optional, Sequence, List, Tuple, Mapping, Any, cast, \
                   Dict, Set, Iterable
import itertools
import logging
from pathlib import Path

from psycopg.types.json import Jsonb
from psycopg import sql as pysql

from ..db.connection import connect, Connection, Cursor, server_version_tuple, \
                            drop_tables, table_exists, execute_scalar
from ..config import Configuration
from ..db.sql_preprocessor import SQLPreprocessor
from ..data.place_info import PlaceInfo
from ..data.place_name import PlaceName
from .icu_rule_loader import ICURuleLoader
from .place_sanitizer import PlaceSanitizer
from .icu_token_analysis import ICUTokenAnalysis
from .base import AbstractAnalyzer, AbstractTokenizer

DBCFG_TERM_NORMALIZATION = "tokenizer_term_normalization"

LOG = logging.getLogger()

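# Entry types in the word table and the index name suffix used for the
# per-type partial index created in _create_base_indices().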
WORD_TYPES = (('country_names', 'C'),
              ('postcodes', 'P'),
              ('full_word', 'W'),
              ('housenumbers', 'H'))

def create(dsn: str, data_dir: Path) -> 'ICUTokenizer':
    """ Create a new instance of the tokenizer provided by this module.
    """
    return ICUTokenizer(dsn, data_dir)

class ICUTokenizer(AbstractTokenizer):
    """ This tokenizer uses libICU to convert names and queries to ASCII.
        Otherwise it uses the same algorithms and data structures as the
        normalization routines in Nominatim 3.
    """

    def __init__(self, dsn: str, data_dir: Path) -> None:
        self.dsn = dsn
        self.data_dir = data_dir
        self.loader: Optional[ICURuleLoader] = None


    def init_new_db(self, config: Configuration, init_db: bool = True) -> None:
        """ Set up a new tokenizer for the database.

            This copies all necessary data into the project directory to make
            sure the tokenizer remains stable even across updates.
        """
        self.loader = ICURuleLoader(config)

        self._save_config()

        if init_db:
            self.update_sql_functions(config)
            self._setup_db_tables(config)
            self._create_base_indices(config, 'word')

    def init_from_project(self, config: Configuration) -> None:
        """ Initialise the tokenizer from the project directory.
        """
        self.loader = ICURuleLoader(config)

        with connect(self.dsn) as conn:
            self.loader.load_config_from_db(conn)


    def finalize_import(self, config: Configuration) -> None:
        """ Do any required postprocessing to make the tokenizer data ready
            for use.
        """
        self._create_lookup_indices(config, 'word')


    def update_sql_functions(self, config: Configuration) -> None:
        """ Reimport the SQL functions for this tokenizer.
        """
        with connect(self.dsn) as conn:
            sqlp = SQLPreprocessor(conn, config)
            sqlp.run_sql_file(conn, 'tokenizer/icu_tokenizer.sql')


    def check_database(self, config: Configuration) -> None:
        """ Check that the tokenizer is set up correctly.
        """
        # Will throw an error if there is an issue.
        self.init_from_project(config)


    def update_statistics(self, config: Configuration, threads: int = 2) -> None:
        """ Recompute frequencies for all name words.
        """
        with connect(self.dsn) as conn:
            if not table_exists(conn, 'search_name'):
                return

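            # The recomputed frequencies are written into a temporary copy of
            # the word table (tmp_word) which replaces the current word table
            # once it has been fully built and indexed.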
            with conn.cursor() as cur:
                cur.execute('ANALYSE search_name')
                if threads > 1:
                    cur.execute(pysql.SQL('SET max_parallel_workers_per_gather TO {}')
                                .format(pysql.Literal(min(threads, 6))))

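                # MATERIALIZED CTEs are only available from PostgreSQL 12
                # onwards; older versions build the frequency tables step by
                # step in temporary tables instead.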
                if server_version_tuple(conn) < (12, 0):
                    LOG.info('Computing word frequencies')
                    drop_tables(conn, 'word_frequencies', 'addressword_frequencies')
                    cur.execute("""CREATE TEMP TABLE word_frequencies AS
                                     SELECT unnest(name_vector) as id, count(*)
                                     FROM search_name GROUP BY id""")
                    cur.execute('CREATE INDEX ON word_frequencies(id)')
                    cur.execute("""CREATE TEMP TABLE addressword_frequencies AS
                                     SELECT unnest(nameaddress_vector) as id, count(*)
                                     FROM search_name GROUP BY id""")
                    cur.execute('CREATE INDEX ON addressword_frequencies(id)')
                    cur.execute("""CREATE OR REPLACE FUNCTION word_freq_update(wid INTEGER,
                                                                               INOUT info JSONB)
                                   AS $$
                                   DECLARE rec RECORD;
                                   BEGIN
                                   IF info is null THEN
                                     info = '{}'::jsonb;
                                   END IF;
                                   FOR rec IN SELECT count FROM word_frequencies WHERE id = wid
                                   LOOP
                                     info = info || jsonb_build_object('count', rec.count);
                                   END LOOP;
                                   FOR rec IN SELECT count FROM addressword_frequencies WHERE id = wid
                                   LOOP
                                     info = info || jsonb_build_object('addr_count', rec.count);
                                   END LOOP;
                                   IF info = '{}'::jsonb THEN
                                     info = null;
                                   END IF;
                                   END;
                                   $$ LANGUAGE plpgsql IMMUTABLE;
                                """)
                    LOG.info('Update word table with recomputed frequencies')
                    drop_tables(conn, 'tmp_word')
                    cur.execute("""CREATE TABLE tmp_word AS
                                    SELECT word_id, word_token, type, word,
                                           word_freq_update(word_id, info) as info
                                    FROM word
                                """)
                    drop_tables(conn, 'word_frequencies', 'addressword_frequencies')
                else:
                    LOG.info('Computing word frequencies')
                    drop_tables(conn, 'word_frequencies')
                    cur.execute("""
                      CREATE TEMP TABLE word_frequencies AS
                      WITH word_freq AS MATERIALIZED (
                               SELECT unnest(name_vector) as id, count(*)
                                     FROM search_name GROUP BY id),
                           addr_freq AS MATERIALIZED (
                               SELECT unnest(nameaddress_vector) as id, count(*)
                                     FROM search_name GROUP BY id)
                      SELECT coalesce(a.id, w.id) as id,
                             (CASE WHEN w.count is null THEN '{}'::JSONB
                                  ELSE jsonb_build_object('count', w.count) END
                              ||
                              CASE WHEN a.count is null THEN '{}'::JSONB
                                  ELSE jsonb_build_object('addr_count', a.count) END) as info
                      FROM word_freq w FULL JOIN addr_freq a ON a.id = w.id;
                      """)
                    cur.execute('CREATE UNIQUE INDEX ON word_frequencies(id) INCLUDE(info)')
                    cur.execute('ANALYSE word_frequencies')
                    LOG.info('Update word table with recomputed frequencies')
                    drop_tables(conn, 'tmp_word')
                    cur.execute("""CREATE TABLE tmp_word AS
                                    SELECT word_id, word_token, type, word,
                                           (CASE WHEN wf.info is null THEN word.info
                                            ELSE coalesce(word.info, '{}'::jsonb) || wf.info
                                            END) as info
                                    FROM word LEFT JOIN word_frequencies wf
                                         ON word.word_id = wf.id
                                """)
                    drop_tables(conn, 'word_frequencies')

            with conn.cursor() as cur:
                cur.execute('SET max_parallel_workers_per_gather TO 0')

            sqlp = SQLPreprocessor(conn, config)
            sqlp.run_string(conn,
                            'GRANT SELECT ON tmp_word TO "{{config.DATABASE_WEBUSER}}"')
            conn.commit()
        self._create_base_indices(config, 'tmp_word')
        self._create_lookup_indices(config, 'tmp_word')
        self._move_temporary_word_table('tmp_word')



    def _cleanup_housenumbers(self) -> None:
        """ Remove unused house numbers.
        """
        with connect(self.dsn) as conn:
            if not table_exists(conn, 'search_name'):
                return
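            # Collect housenumber tokens that are no longer referenced from
            # search_name. Simple numeric housenumbers of up to six digits
            # are always kept because they are likely to be reused.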
            with conn.cursor(name="hnr_counter") as cur:
                cur.execute("""SELECT DISTINCT word_id, coalesce(info->>'lookup', word_token)
                               FROM word
                               WHERE type = 'H'
                                 AND NOT EXISTS(SELECT * FROM search_name
                                                WHERE ARRAY[word.word_id] && name_vector)
                                 AND (char_length(coalesce(word, word_token)) > 6
                                      OR coalesce(word, word_token) not similar to '\\d+')
                            """)
                candidates = {token: wid for wid, token in cur}
            with conn.cursor(name="hnr_counter") as cur:
                cur.execute("""SELECT housenumber FROM placex
                               WHERE housenumber is not null
                                     AND (char_length(housenumber) > 6
                                          OR housenumber not similar to '\\d+')
                            """)
                for row in cur:
                    for hnr in row[0].split(';'):
                        candidates.pop(hnr, None)
            LOG.info("There are %s outdated housenumbers.", len(candidates))
            LOG.debug("Outdated housenumbers: %s", candidates.keys())
            if candidates:
                with conn.cursor() as cur:
                    cur.execute("""DELETE FROM word WHERE word_id = any(%s)""",
                                (list(candidates.values()), ))
                conn.commit()



    def update_word_tokens(self) -> None:
        """ Remove unused tokens.
        """
        LOG.warning("Cleaning up housenumber tokens.")
        self._cleanup_housenumbers()
        LOG.warning("Tokenizer house-keeping done.")

    def name_analyzer(self) -> 'ICUNameAnalyzer':
        """ Create a new analyzer for tokenizing names and queries
            using this tokenizer. Analyzers are context managers and should
            be used accordingly:

            ```
            with tokenizer.name_analyzer() as analyzer:
                analyzer.tokenize()
            ```

            When used outside the with construct, the caller must make sure
            to call close() before the analyzer is discarded.

            Analyzers are not thread-safe. You need to instantiate one per thread.
        """
        assert self.loader is not None
        return ICUNameAnalyzer(self.dsn, self.loader.make_sanitizer(),
                               self.loader.make_token_analysis())

    def most_frequent_words(self, conn: Connection, num: int) -> List[str]:
        """ Return a list of the `num` most frequent full words
            in the database.
        """
        with conn.cursor() as cur:
            cur.execute("""SELECT word, sum((info->>'count')::int) as count
                             FROM word WHERE type = 'W'
                             GROUP BY word
                             ORDER BY count DESC LIMIT %s""", (num,))
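            # Words handled by a non-default analyzer are stored as
            # 'name@analyzer'; return only the name part.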
            return list(s[0].split('@')[0] for s in cur)


    def _save_config(self) -> None:
        """ Save the configuration that needs to remain stable for the given
            database as database properties.
        """
        assert self.loader is not None
        with connect(self.dsn) as conn:
            self.loader.save_config_to_db(conn)

    def _setup_db_tables(self, config: Configuration) -> None:
        """ Create the empty word table and the sequence used for assigning
            word ids.
        """
        with connect(self.dsn) as conn:
            drop_tables(conn, 'word')
            sqlp = SQLPreprocessor(conn, config)
            sqlp.run_string(conn, """
                CREATE TABLE word (
                      word_id INTEGER,
                      word_token text NOT NULL,
                      type text NOT NULL,
                      word text,
                      info jsonb
                    ) {{db.tablespace.search_data}};
                GRANT SELECT ON word TO "{{config.DATABASE_WEBUSER}}";

                DROP SEQUENCE IF EXISTS seq_word;
                CREATE SEQUENCE seq_word start 1;
                GRANT SELECT ON seq_word to "{{config.DATABASE_WEBUSER}}";
            """)
            conn.commit()

    def _create_base_indices(self, config: Configuration, table_name: str) -> None:
        """ Create the word token index and the per-type partial indices
            for the given word table.
        """
        with connect(self.dsn) as conn:
            sqlp = SQLPreprocessor(conn, config)
            sqlp.run_string(conn,
                            """CREATE INDEX idx_{{table_name}}_word_token ON {{table_name}}
                               USING BTREE (word_token) {{db.tablespace.search_index}}""",
                            table_name=table_name)
            for name, ctype in WORD_TYPES:
                sqlp.run_string(conn,
                                """CREATE INDEX idx_{{table_name}}_{{idx_name}} ON {{table_name}}
                                   USING BTREE (word) {{db.tablespace.address_index}}
                                   WHERE type = '{{column_type}}'
                                """,
                                table_name=table_name, idx_name=name,
                                column_type=ctype)
            conn.commit()

    def _create_lookup_indices(self, config: Configuration, table_name: str) -> None:
        """ Create additional indexes used when running the API.
        """
        with connect(self.dsn) as conn:
            sqlp = SQLPreprocessor(conn, config)
            # Index required for details lookup.
            sqlp.run_string(conn, """
                CREATE INDEX IF NOT EXISTS idx_{{table_name}}_word_id
                  ON {{table_name}} USING BTREE (word_id) {{db.tablespace.search_index}}
            """,
            table_name=table_name)
            conn.commit()


    def _move_temporary_word_table(self, old: str) -> None:
        """ Rename all tables and indexes used by the tokenizer.
        """
        with connect(self.dsn) as conn:
            drop_tables(conn, 'word')
            with conn.cursor() as cur:
                cur.execute(f"ALTER TABLE {old} RENAME TO word")
                for idx in ('word_token', 'word_id'):
                    cur.execute(f"""ALTER INDEX idx_{old}_{idx}
                                      RENAME TO idx_word_{idx}""")
                for name, _ in WORD_TYPES:
                    cur.execute(f"""ALTER INDEX idx_{old}_{name}
                                    RENAME TO idx_word_{name}""")
            conn.commit()



class ICUNameAnalyzer(AbstractAnalyzer):
    """ The ICU analyzer uses the ICU library for splitting names.

        Each instance opens a connection to the database to request the
        normalization.
    """

    def __init__(self, dsn: str, sanitizer: PlaceSanitizer,
                 token_analysis: ICUTokenAnalysis) -> None:
        self.conn: Optional[Connection] = connect(dsn)
        self.conn.autocommit = True
        self.sanitizer = sanitizer
        self.token_analysis = token_analysis

        self._cache = _TokenCache()


    def close(self) -> None:
        """ Free all resources used by the analyzer.
        """
        if self.conn:
            self.conn.close()
            self.conn = None


    def _search_normalized(self, name: str) -> str:
        """ Return the search token transliteration of the given name.
        """
        return cast(str, self.token_analysis.search.transliterate(name)).strip()

    def _normalized(self, name: str) -> str:
        """ Return the normalized version of the given name with all
            non-relevant information removed.
        """
        return cast(str, self.token_analysis.normalizer.transliterate(name)).strip()


    def get_word_token_info(self, words: Sequence[str]) -> List[Tuple[str, str, int]]:
        """ Return token information for the given list of words.

            If a word starts with '#', it is treated as a full name,
            otherwise as a partial name.

            The function returns a list of tuples with
            (original word, word token, word id).

            The function is meant for testing and debugging only
            and is not necessarily efficient.
        """
        assert self.conn is not None
        full_tokens = {}
        partial_tokens = {}
        for word in words:
            if word.startswith('#'):
                full_tokens[word] = self._search_normalized(word[1:])
            else:
                partial_tokens[word] = self._search_normalized(word)

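        # Full words are stored with type 'W', partial words with type 'w'.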
        with self.conn.cursor() as cur:
            cur.execute("""SELECT word_token, word_id
                            FROM word WHERE word_token = ANY(%s) and type = 'W'
                        """, (list(full_tokens.values()),))
            full_ids = {r[0]: r[1] for r in cur}
            cur.execute("""SELECT word_token, word_id
                            FROM word WHERE word_token = ANY(%s) and type = 'w'""",
                        (list(partial_tokens.values()),))
            part_ids = {r[0]: r[1] for r in cur}

        return [(k, v, full_ids.get(v, None)) for k, v in full_tokens.items()] \
               + [(k, v, part_ids.get(v, None)) for k, v in partial_tokens.items()]


    def normalize_postcode(self, postcode: str) -> str:
        """ Convert the postcode to a standardized form.

            This function must yield exactly the same result as the SQL function
            'token_normalized_postcode()'.
        """
        return postcode.strip().upper()


    def update_postcodes_from_db(self) -> None:
        """ Update postcode tokens in the word table from the location_postcode
            table.
        """
        assert self.conn is not None
        analyzer = self.token_analysis.analysis.get('@postcode')

        with self.conn.cursor() as cur:
            # First get all postcode names currently in the word table.
            cur.execute("SELECT DISTINCT word FROM word WHERE type = 'P'")
            word_entries = set((entry[0] for entry in cur))

            # Then compute the required postcode names from the postcode table.
            needed_entries = set()
            cur.execute("SELECT country_code, postcode FROM location_postcode")
            for cc, postcode in cur:
                info = PlaceInfo({'country_code': cc,
                                  'class': 'place', 'type': 'postcode',
                                  'address': {'postcode': postcode}})
                address = self.sanitizer.process_names(info)[1]
                for place in address:
                    if place.kind == 'postcode':
                        if analyzer is None:
                            postcode_name = place.name.strip().upper()
                            variant_base = None
                        else:
                            postcode_name = analyzer.get_canonical_id(place)
                            variant_base = place.get_attr("variant")

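                        # Postcodes with a variant are stored as
                        # 'name@variant' in the word table.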
                        if variant_base:
                            needed_entries.add(f'{postcode_name}@{variant_base}')
                        else:
                            needed_entries.add(postcode_name)
                        break

        # Now update the word table.
        self._delete_unused_postcode_words(word_entries - needed_entries)
        self._add_missing_postcode_words(needed_entries - word_entries)

    def _delete_unused_postcode_words(self, tokens: Iterable[str]) -> None:
        assert self.conn is not None
        if tokens:
            with self.conn.cursor() as cur:
                cur.execute("DELETE FROM word WHERE type = 'P' and word = any(%s)",
                            (list(tokens), ))

    def _add_missing_postcode_words(self, tokens: Iterable[str]) -> None:
        assert self.conn is not None
        if not tokens:
            return

        analyzer = self.token_analysis.analysis.get('@postcode')
        terms = []

        for postcode_name in tokens:
            if '@' in postcode_name:
                term, variant = postcode_name.split('@', 2)
                term = self._search_normalized(term)
                if analyzer is None:
                    variants = [term]
                else:
                    variants = analyzer.compute_variants(variant)
                    if term not in variants:
                        variants.append(term)
            else:
                variants = [self._search_normalized(postcode_name)]
            terms.append((postcode_name, variants))

        if terms:
            with self.conn.cursor() as cur:
                cur.executemany("""SELECT create_postcode_word(%s, %s)""", terms)



    def update_special_phrases(self, phrases: Iterable[Tuple[str, str, str, str]],
                               should_replace: bool) -> None:
        """ Replace the search index for special phrases with the new phrases.
            If `should_replace` is True, then the previous set of phrases is
            completely replaced. Otherwise the phrases are added to the
            already existing ones.
        """
        assert self.conn is not None
        norm_phrases = set(((self._normalized(p[0]), p[1], p[2], p[3])
                            for p in phrases))

        with self.conn.cursor() as cur:
            # Get the old phrases.
            existing_phrases = set()
            cur.execute("SELECT word, info FROM word WHERE type = 'S'")
            for word, info in cur:
                existing_phrases.add((word, info['class'], info['type'],
                                      info.get('op') or '-'))

            added = self._add_special_phrases(cur, norm_phrases, existing_phrases)
            if should_replace:
                deleted = self._remove_special_phrases(cur, norm_phrases,
                                                       existing_phrases)
            else:
                deleted = 0

        LOG.info("Total phrases: %s. Added: %s. Deleted: %s",
                 len(norm_phrases), added, deleted)

    def _add_special_phrases(self, cursor: Cursor,
                             new_phrases: Set[Tuple[str, str, str, str]],
                             existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
        """ Add all phrases to the database that are not yet there.
        """
        to_add = new_phrases - existing_phrases

        added = 0
        with cursor.copy('COPY word(word_token, type, word, info) FROM STDIN') as copy:
            for word, cls, typ, oper in to_add:
                term = self._search_normalized(word)
                if term:
                    copy.write_row((term, 'S', word,
                                    Jsonb({'class': cls, 'type': typ,
                                           'op': oper if oper in ('in', 'near') else None})))
                    added += 1

        return added


    def _remove_special_phrases(self, cursor: Cursor,
                                new_phrases: Set[Tuple[str, str, str, str]],
                                existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
        """ Remove all phrases from the database that are no longer in the
            new phrase list.
        """
        to_delete = existing_phrases - new_phrases

        if to_delete:
            cursor.executemany(
                """ DELETE FROM word
                      WHERE type = 'S' and word = %s
                            and info->>'class' = %s and info->>'type' = %s
                            and %s = coalesce(info->>'op', '-')
                """, to_delete)

        return len(to_delete)

    def add_country_names(self, country_code: str, names: Mapping[str, str]) -> None:
        """ Add default names for the given country to the search index.
        """
        # Make sure any name preprocessing for country names applies.
        info = PlaceInfo({'name': names, 'country_code': country_code,
                          'rank_address': 4, 'class': 'boundary',
                          'type': 'administrative'})
        self._add_country_full_names(country_code,
                                     self.sanitizer.process_names(info)[0],
                                     internal=True)


    def _add_country_full_names(self, country_code: str, names: Sequence[PlaceName],
                                internal: bool = False) -> None:
        """ Add names for the given country from an already sanitized
            name list.
        """
        assert self.conn is not None
        word_tokens = set()
        for name in names:
            norm_name = self._search_normalized(name.name)
            if norm_name:
                word_tokens.add(norm_name)

        with self.conn.cursor() as cur:
            # Get existing names
            cur.execute("""SELECT word_token, coalesce(info ? 'internal', false) as is_internal
                             FROM word
                             WHERE type = 'C' and word = %s""",
                        (country_code, ))
            # internal/external names
            existing_tokens: Dict[bool, Set[str]] = {True: set(), False: set()}
            for word in cur:
                existing_tokens[word[1]].add(word[0])

            # Delete names that no longer exist.
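            # When adding internal names, also drop tokens that were
            # previously imported from OSM data and are now covered by the
            # internal list; they are re-inserted as internal below.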
            gone_tokens = existing_tokens[internal] - word_tokens
            if internal:
                gone_tokens.update(existing_tokens[False] & word_tokens)
            if gone_tokens:
                cur.execute("""DELETE FROM word
                               USING unnest(%s::text[]) as token
                               WHERE type = 'C' and word = %s
                                     and word_token = token""",
                            (list(gone_tokens), country_code))

            # Only add those names that are not yet in the list.
            new_tokens = word_tokens - existing_tokens[True]
            if not internal:
                new_tokens -= existing_tokens[False]
            if new_tokens:
                if internal:
                    sql = """INSERT INTO word (word_token, type, word, info)
                               (SELECT token, 'C', %s, '{"internal": "yes"}'
                                  FROM unnest(%s::text[]) as token)
                           """
                else:
                    sql = """INSERT INTO word (word_token, type, word)
                                   (SELECT token, 'C', %s
                                    FROM unnest(%s::text[]) as token)
                          """
                cur.execute(sql, (country_code, list(new_tokens)))


    def process_place(self, place: PlaceInfo) -> Mapping[str, Any]:
        """ Determine tokenizer information about the given place.

            Returns a JSON-serializable structure that will be handed into
            the database via the token_info field.
        """
        token_info = _TokenInfo()

        names, address = self.sanitizer.process_names(place)

        if names:
            token_info.set_names(*self._compute_name_tokens(names))

            if place.is_country():
                assert place.country_code is not None
                self._add_country_full_names(place.country_code, names)

        if address:
            self._process_place_address(token_info, address)

        return token_info.to_dict()


    def _process_place_address(self, token_info: '_TokenInfo',
                               address: Sequence[PlaceName]) -> None:
        """ Add tokens for the address parts (postcode, housenumber, street,
            place and other address terms) to the token info.
        """
        for item in address:
            if item.kind == 'postcode':
                token_info.set_postcode(self._add_postcode(item))
            elif item.kind == 'housenumber':
                token_info.add_housenumber(*self._compute_housenumber_token(item))
            elif item.kind == 'street':
                token_info.add_street(self._retrieve_full_tokens(item.name))
            elif item.kind == 'place':
                if not item.suffix:
                    token_info.add_place(itertools.chain(*self._compute_name_tokens([item])))
            elif not item.kind.startswith('_') and not item.suffix and \
                 item.kind not in ('country', 'full', 'inclusion'):
                token_info.add_address_term(item.kind,
                                            itertools.chain(*self._compute_name_tokens([item])))

    def _compute_housenumber_token(self, hnr: PlaceName) -> Tuple[Optional[int], Optional[str]]:
        """ Normalize the housenumber and return the word token and the
            canonical form.
        """
        assert self.conn is not None
        analyzer = self.token_analysis.analysis.get('@housenumber')
        result: Tuple[Optional[int], Optional[str]] = (None, None)

        if analyzer is None:
            # When no custom analyzer is set, simply normalize and transliterate
            norm_name = self._search_normalized(hnr.name)
            if norm_name:
                result = self._cache.housenumbers.get(norm_name, result)
                if result[0] is None:
                    hid = execute_scalar(self.conn, "SELECT getorcreate_hnr_id(%s)", (norm_name, ))

                    result = hid, norm_name
                    self._cache.housenumbers[norm_name] = result
        else:
            # Otherwise use the analyzer to determine the canonical name.
            # Per convention we use the first variant as the 'lookup name', the
            # name that gets saved in the housenumber field of the place.
            word_id = analyzer.get_canonical_id(hnr)
            if word_id:
                result = self._cache.housenumbers.get(word_id, result)
                if result[0] is None:
                    variants = analyzer.compute_variants(word_id)
                    if variants:
                        hid = execute_scalar(self.conn, "SELECT create_analyzed_hnr_id(%s, %s)",
                                             (word_id, list(variants)))
                        result = hid, variants[0]
                        self._cache.housenumbers[word_id] = result

        return result


    def _retrieve_full_tokens(self, name: str) -> List[int]:
        """ Get the full name token for the given name, if it exists.
            The name is only retrieved for the standard analyzer.
        """
        assert self.conn is not None
        norm_name = self._search_normalized(name)

        # return cached if possible
        if norm_name in self._cache.fulls:
            return self._cache.fulls[norm_name]

        with self.conn.cursor() as cur:
            cur.execute("SELECT word_id FROM word WHERE word_token = %s and type = 'W'",
                        (norm_name, ))
            full = [row[0] for row in cur]

        self._cache.fulls[norm_name] = full

        return full


    def _compute_name_tokens(self, names: Sequence[PlaceName]) -> Tuple[Set[int], Set[int]]:
        """ Compute the full name and partial name tokens for the given
            list of names.
        """
        assert self.conn is not None
        full_tokens: Set[int] = set()
        partial_tokens: Set[int] = set()

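        # Each name is tokenized by the analyzer configured for it. Results
        # are cached under the canonical id, suffixed with the analyzer name
        # for non-default analyzers.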
        for name in names:
            analyzer_id = name.get_attr('analyzer')
            analyzer = self.token_analysis.get_analyzer(analyzer_id)
            word_id = analyzer.get_canonical_id(name)
            if analyzer_id is None:
                token_id = word_id
            else:
                token_id = f'{word_id}@{analyzer_id}'

            full, part = self._cache.names.get(token_id, (None, None))
            if full is None:
                variants = analyzer.compute_variants(word_id)
                if not variants:
                    continue

                with self.conn.cursor() as cur:
                    cur.execute("SELECT * FROM getorcreate_full_word(%s, %s)",
                                (token_id, variants))
                    full, part = cast(Tuple[int, List[int]], cur.fetchone())

                self._cache.names[token_id] = (full, part)

            assert part is not None

            full_tokens.add(full)
            partial_tokens.update(part)

        return full_tokens, partial_tokens


    def _add_postcode(self, item: PlaceName) -> Optional[str]:
        """ Make sure the normalized postcode is present in the word table.
        """
        assert self.conn is not None
        analyzer = self.token_analysis.analysis.get('@postcode')

        if analyzer is None:
            postcode_name = item.name.strip().upper()
            variant_base = None
        else:
            postcode_name = analyzer.get_canonical_id(item)
            variant_base = item.get_attr("variant")

        if variant_base:
            postcode = f'{postcode_name}@{variant_base}'
        else:
            postcode = postcode_name

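        # Write the word entry only once per analyzer instance; the cache
        # records postcodes that have already been added.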
        if postcode not in self._cache.postcodes:
            term = self._search_normalized(postcode_name)
            if not term:
                return None

            variants = {term}
            if analyzer is not None and variant_base:
                variants.update(analyzer.compute_variants(variant_base))

            with self.conn.cursor() as cur:
                cur.execute("SELECT create_postcode_word(%s, %s)",
                            (postcode, list(variants)))
            self._cache.postcodes.add(postcode)

        return postcode_name


class _TokenInfo:
    """ Collect token information to be sent back to the database.
    """
    def __init__(self) -> None:
        self.names: Optional[str] = None
        self.housenumbers: Set[str] = set()
        self.housenumber_tokens: Set[int] = set()
        self.street_tokens: Optional[Set[int]] = None
        self.place_tokens: Set[int] = set()
        self.address_tokens: Dict[str, str] = {}
        self.postcode: Optional[str] = None

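    # Token lists are serialized as PostgreSQL array literals, e.g. '{1,2,3}'.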
    def _mk_array(self, tokens: Iterable[Any]) -> str:
        return f"{{{','.join((str(s) for s in tokens))}}}"


    def to_dict(self) -> Dict[str, Any]:
        """ Return the token information in database importable format.
        """
        out: Dict[str, Any] = {}

        if self.names:
            out['names'] = self.names

        if self.housenumbers:
            out['hnr'] = ';'.join(self.housenumbers)
            out['hnr_tokens'] = self._mk_array(self.housenumber_tokens)

        if self.street_tokens is not None:
            out['street'] = self._mk_array(self.street_tokens)

        if self.place_tokens:
            out['place'] = self._mk_array(self.place_tokens)

        if self.address_tokens:
            out['addr'] = self.address_tokens

        if self.postcode:
            out['postcode'] = self.postcode

        return out


    def set_names(self, fulls: Iterable[int], partials: Iterable[int]) -> None:
        """ Add token information for the normalised names.
        """
        self.names = self._mk_array(itertools.chain(fulls, partials))


    def add_housenumber(self, token: Optional[int], hnr: Optional[str]) -> None:
        """ Add a housenumber token and its normalised form.
        """
        if token:
            assert hnr is not None
            self.housenumbers.add(hnr)
            self.housenumber_tokens.add(token)


    def add_street(self, tokens: Iterable[int]) -> None:
        """ Add addr:street match terms.
        """
        if self.street_tokens is None:
            self.street_tokens = set()
        self.street_tokens.update(tokens)


    def add_place(self, tokens: Iterable[int]) -> None:
        """ Add addr:place search and match terms.
        """
        self.place_tokens.update(tokens)


    def add_address_term(self, key: str, partials: Iterable[int]) -> None:
        """ Add additional address terms.
        """
        array = self._mk_array(partials)
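        # An empty token list serializes to '{}' (length 2); skip those.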
        if len(array) > 2:
            self.address_tokens[key] = array

    def set_postcode(self, postcode: Optional[str]) -> None:
        """ Set the postcode to the given one.
        """
        self.postcode = postcode


class _TokenCache:
    """ Cache for token information to avoid repeated database queries.

        This cache is not thread-safe and needs to be instantiated per
        analyzer.
    """
    def __init__(self) -> None:
        self.names: Dict[str, Tuple[int, List[int]]] = {}
        self.partials: Dict[str, int] = {}
        self.fulls: Dict[str, List[int]] = {}
        self.postcodes: Set[str] = set()
        self.housenumbers: Dict[str, Tuple[Optional[int], Optional[str]]] = {}