]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tokenizer/icu_tokenizer.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / src / nominatim_db / tokenizer / icu_tokenizer.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tokenizer implementing normalisation as used before Nominatim 4 but using
9 libICU instead of the PostgreSQL module.
10 """
11 from typing import Optional, Sequence, List, Tuple, Mapping, Any, cast, \
12                    Dict, Set, Iterable
13 import itertools
14 import logging
15 from pathlib import Path
16 from textwrap import dedent
17
18 from psycopg.types.json import Jsonb
19 from psycopg import sql as pysql
20
21 from ..db.connection import connect, Connection, Cursor, server_version_tuple,\
22                             drop_tables, table_exists, execute_scalar
23 from ..config import Configuration
24 from ..db.sql_preprocessor import SQLPreprocessor
25 from ..data.place_info import PlaceInfo
26 from ..data.place_name import PlaceName
27 from .icu_rule_loader import ICURuleLoader
28 from .place_sanitizer import PlaceSanitizer
29 from .icu_token_analysis import ICUTokenAnalysis
30 from .base import AbstractAnalyzer, AbstractTokenizer
31
# Name of a database property key.
# NOTE(review): not referenced in the visible part of this file; presumably
# the key under which the term normalization rules are stored - confirm.
DBCFG_TERM_NORMALIZATION = "tokenizer_term_normalization"

LOG = logging.getLogger()

# Pairs of (index name suffix, word type code). For every pair a partial
# index `idx_<table>_<suffix>` restricted to rows with `type = <code>`
# is created on the word table.
WORD_TYPES = (
    ('country_names', 'C'),
    ('postcodes', 'P'),
    ('full_word', 'W'),
    ('housenumbers', 'H'),
)
40
def create(dsn: str, data_dir: Path) -> 'ICUTokenizer':
    """ Factory function of this module: return a fresh ICUTokenizer
        for the database reachable via `dsn` that keeps its files
        in `data_dir`.
    """
    tokenizer = ICUTokenizer(dsn, data_dir)
    return tokenizer
45
46
47 class ICUTokenizer(AbstractTokenizer):
48     """ This tokenizer uses libICU to convert names and queries to ASCII.
49         Otherwise it uses the same algorithms and data structures as the
50         normalization routines in Nominatim 3.
51     """
52
53     def __init__(self, dsn: str, data_dir: Path) -> None:
54         self.dsn = dsn
55         self.data_dir = data_dir
56         self.loader: Optional[ICURuleLoader] = None
57
58
59     def init_new_db(self, config: Configuration, init_db: bool = True) -> None:
60         """ Set up a new tokenizer for the database.
61
62             This copies all necessary data in the project directory to make
63             sure the tokenizer remains stable even over updates.
64         """
65         self.loader = ICURuleLoader(config)
66
67         self._install_php(config.lib_dir.php, overwrite=True)
68         self._save_config()
69
70         if init_db:
71             self.update_sql_functions(config)
72             self._setup_db_tables(config)
73             self._create_base_indices(config, 'word')
74
75
76     def init_from_project(self, config: Configuration) -> None:
77         """ Initialise the tokenizer from the project directory.
78         """
79         self.loader = ICURuleLoader(config)
80
81         with connect(self.dsn) as conn:
82             self.loader.load_config_from_db(conn)
83
84         self._install_php(config.lib_dir.php, overwrite=False)
85
86
87     def finalize_import(self, config: Configuration) -> None:
88         """ Do any required postprocessing to make the tokenizer data ready
89             for use.
90         """
91         self._create_lookup_indices(config, 'word')
92
93
94     def update_sql_functions(self, config: Configuration) -> None:
95         """ Reimport the SQL functions for this tokenizer.
96         """
97         with connect(self.dsn) as conn:
98             sqlp = SQLPreprocessor(conn, config)
99             sqlp.run_sql_file(conn, 'tokenizer/icu_tokenizer.sql')
100
101
102     def check_database(self, config: Configuration) -> None:
103         """ Check that the tokenizer is set up correctly.
104         """
105         # Will throw an error if there is an issue.
106         self.init_from_project(config)
107
108
    def update_statistics(self, config: Configuration, threads: int = 2) -> None:
        """ Recompute frequencies for all name words.

            The function counts how often each word id appears in the
            `name_vector` and `nameaddress_vector` columns of `search_name`
            and writes the results as 'count' and 'addr_count' into the
            `info` column of a freshly built table `tmp_word`. That table
            then gets the usual indexes and replaces the `word` table.

            When `threads` is larger than 1, parallel query execution is
            enabled for the computation with at most 6 workers.

            Nothing is done when the `search_name` table does not exist.
        """
        with connect(self.dsn) as conn:
            if not table_exists(conn, 'search_name'):
                return

            with conn.cursor() as cur:
                cur.execute('ANALYSE search_name')
                if threads > 1:
                    # Allow parallel workers, capped at 6.
                    cur.execute(pysql.SQL('SET max_parallel_workers_per_gather TO {}')
                                     .format(pysql.Literal(min(threads, 6),)))

                # Two implementations follow, selected by server version.
                # NOTE(review): presumably because the MATERIALIZED keyword
                # used in the second variant requires PostgreSQL 12 - confirm.
                if server_version_tuple(conn) < (12, 0):
                    LOG.info('Computing word frequencies')
                    drop_tables(conn, 'word_frequencies', 'addressword_frequencies')
                    # Separate temporary count tables for names and addresses.
                    cur.execute("""CREATE TEMP TABLE word_frequencies AS
                                     SELECT unnest(name_vector) as id, count(*)
                                     FROM search_name GROUP BY id""")
                    cur.execute('CREATE INDEX ON word_frequencies(id)')
                    cur.execute("""CREATE TEMP TABLE addressword_frequencies AS
                                     SELECT unnest(nameaddress_vector) as id, count(*)
                                     FROM search_name GROUP BY id""")
                    cur.execute('CREATE INDEX ON addressword_frequencies(id)')
                    # Helper function that merges the counts for one word id
                    # into its info JSON. An empty result is turned into NULL.
                    cur.execute("""CREATE OR REPLACE FUNCTION word_freq_update(wid INTEGER,
                                                                               INOUT info JSONB)
                                   AS $$
                                   DECLARE rec RECORD;
                                   BEGIN
                                   IF info is null THEN
                                     info = '{}'::jsonb;
                                   END IF;
                                   FOR rec IN SELECT count FROM word_frequencies WHERE id = wid
                                   LOOP
                                     info = info || jsonb_build_object('count', rec.count);
                                   END LOOP;
                                   FOR rec IN SELECT count FROM addressword_frequencies WHERE id = wid
                                   LOOP
                                     info = info || jsonb_build_object('addr_count', rec.count);
                                   END LOOP;
                                   IF info = '{}'::jsonb THEN
                                     info = null;
                                   END IF;
                                   END;
                                   $$ LANGUAGE plpgsql IMMUTABLE;
                                """)
                    LOG.info('Update word table with recomputed frequencies')
                    drop_tables(conn, 'tmp_word')
                    cur.execute("""CREATE TABLE tmp_word AS
                                    SELECT word_id, word_token, type, word,
                                           word_freq_update(word_id, info) as info
                                    FROM word
                                """)
                    drop_tables(conn, 'word_frequencies', 'addressword_frequencies')
                else:
                    LOG.info('Computing word frequencies')
                    drop_tables(conn, 'word_frequencies')
                    # A single temporary table holding the ready-made JSON
                    # with 'count' and/or 'addr_count' per word id.
                    cur.execute("""
                      CREATE TEMP TABLE word_frequencies AS
                      WITH word_freq AS MATERIALIZED (
                               SELECT unnest(name_vector) as id, count(*)
                                     FROM search_name GROUP BY id),
                           addr_freq AS MATERIALIZED (
                               SELECT unnest(nameaddress_vector) as id, count(*)
                                     FROM search_name GROUP BY id)
                      SELECT coalesce(a.id, w.id) as id,
                             (CASE WHEN w.count is null THEN '{}'::JSONB
                                  ELSE jsonb_build_object('count', w.count) END
                              ||
                              CASE WHEN a.count is null THEN '{}'::JSONB
                                  ELSE jsonb_build_object('addr_count', a.count) END) as info
                      FROM word_freq w FULL JOIN addr_freq a ON a.id = w.id;
                      """)
                    cur.execute('CREATE UNIQUE INDEX ON word_frequencies(id) INCLUDE(info)')
                    cur.execute('ANALYSE word_frequencies')
                    LOG.info('Update word table with recomputed frequencies')
                    drop_tables(conn, 'tmp_word')
                    # Words without frequency data keep their info unchanged.
                    cur.execute("""CREATE TABLE tmp_word AS
                                    SELECT word_id, word_token, type, word,
                                           (CASE WHEN wf.info is null THEN word.info
                                            ELSE coalesce(word.info, '{}'::jsonb) || wf.info
                                            END) as info
                                    FROM word LEFT JOIN word_frequencies wf
                                         ON word.word_id = wf.id
                                    ORDER BY word_id
                                """)
                    drop_tables(conn, 'word_frequencies')

            # Switch parallel workers off again. This runs regardless of
            # whether the setting was changed above.
            with conn.cursor() as cur:
                cur.execute('SET max_parallel_workers_per_gather TO 0')

            sqlp = SQLPreprocessor(conn, config)
            sqlp.run_string(conn,
                            'GRANT SELECT ON tmp_word TO "{{config.DATABASE_WEBUSER}}"')
            conn.commit()
        # Index the new table first, then swap it in for the word table.
        self._create_base_indices(config, 'tmp_word')
        self._create_lookup_indices(config, 'tmp_word')
        self._move_temporary_word_table('tmp_word')
207
208
209
210     def _cleanup_housenumbers(self) -> None:
211         """ Remove unused house numbers.
212         """
213         with connect(self.dsn) as conn:
214             if not table_exists(conn, 'search_name'):
215                 return
216             with conn.cursor(name="hnr_counter") as cur:
217                 cur.execute("""SELECT DISTINCT word_id, coalesce(info->>'lookup', word_token)
218                                FROM word
219                                WHERE type = 'H'
220                                  AND NOT EXISTS(SELECT * FROM search_name
221                                                 WHERE ARRAY[word.word_id] && name_vector)
222                                  AND (char_length(coalesce(word, word_token)) > 6
223                                       OR coalesce(word, word_token) not similar to '\\d+')
224                             """)
225                 candidates = {token: wid for wid, token in cur}
226             with conn.cursor(name="hnr_counter") as cur:
227                 cur.execute("""SELECT housenumber FROM placex
228                                WHERE housenumber is not null
229                                      AND (char_length(housenumber) > 6
230                                           OR housenumber not similar to '\\d+')
231                             """)
232                 for row in cur:
233                     for hnr in row[0].split(';'):
234                         candidates.pop(hnr, None)
235             LOG.info("There are %s outdated housenumbers.", len(candidates))
236             LOG.debug("Outdated housenumbers: %s", candidates.keys())
237             if candidates:
238                 with conn.cursor() as cur:
239                     cur.execute("""DELETE FROM word WHERE word_id = any(%s)""",
240                                 (list(candidates.values()), ))
241                 conn.commit()
242
243
244
245     def update_word_tokens(self) -> None:
246         """ Remove unused tokens.
247         """
248         LOG.warning("Cleaning up housenumber tokens.")
249         self._cleanup_housenumbers()
250         LOG.warning("Tokenizer house-keeping done.")
251
252
253     def name_analyzer(self) -> 'ICUNameAnalyzer':
254         """ Create a new analyzer for tokenizing names and queries
255             using this tokinzer. Analyzers are context managers and should
256             be used accordingly:
257
258             ```
259             with tokenizer.name_analyzer() as analyzer:
260                 analyser.tokenize()
261             ```
262
263             When used outside the with construct, the caller must ensure to
264             call the close() function before destructing the analyzer.
265
266             Analyzers are not thread-safe. You need to instantiate one per thread.
267         """
268         assert self.loader is not None
269         return ICUNameAnalyzer(self.dsn, self.loader.make_sanitizer(),
270                                self.loader.make_token_analysis())
271
272
273     def most_frequent_words(self, conn: Connection, num: int) -> List[str]:
274         """ Return a list of the `num` most frequent full words
275             in the database.
276         """
277         with conn.cursor() as cur:
278             cur.execute("""SELECT word, sum((info->>'count')::int) as count
279                              FROM word WHERE type = 'W'
280                              GROUP BY word
281                              ORDER BY count DESC LIMIT %s""", (num,))
282             return list(s[0].split('@')[0] for s in cur)
283
284
285     def _install_php(self, phpdir: Optional[Path], overwrite: bool = True) -> None:
286         """ Install the php script for the tokenizer.
287         """
288         if phpdir is not None:
289             assert self.loader is not None
290             php_file = self.data_dir / "tokenizer.php"
291
292             if not php_file.exists() or overwrite:
293                 php_file.write_text(dedent(f"""\
294                     <?php
295                     @define('CONST_Max_Word_Frequency', 10000000);
296                     @define('CONST_Term_Normalization_Rules', "{self.loader.normalization_rules}");
297                     @define('CONST_Transliteration', "{self.loader.get_search_rules()}");
298                     require_once('{phpdir}/tokenizer/icu_tokenizer.php');"""), encoding='utf-8')
299
300
301     def _save_config(self) -> None:
302         """ Save the configuration that needs to remain stable for the given
303             database as database properties.
304         """
305         assert self.loader is not None
306         with connect(self.dsn) as conn:
307             self.loader.save_config_to_db(conn)
308
309
310     def _setup_db_tables(self, config: Configuration) -> None:
311         """ Set up the word table and fill it with pre-computed word
312             frequencies.
313         """
314         with connect(self.dsn) as conn:
315             drop_tables(conn, 'word')
316             sqlp = SQLPreprocessor(conn, config)
317             sqlp.run_string(conn, """
318                 CREATE TABLE word (
319                       word_id INTEGER,
320                       word_token text NOT NULL,
321                       type text NOT NULL,
322                       word text,
323                       info jsonb
324                     ) {{db.tablespace.search_data}};
325                 GRANT SELECT ON word TO "{{config.DATABASE_WEBUSER}}";
326
327                 DROP SEQUENCE IF EXISTS seq_word;
328                 CREATE SEQUENCE seq_word start 1;
329                 GRANT SELECT ON seq_word to "{{config.DATABASE_WEBUSER}}";
330             """)
331             conn.commit()
332
333
334     def _create_base_indices(self, config: Configuration, table_name: str) -> None:
335         """ Set up the word table and fill it with pre-computed word
336             frequencies.
337         """
338         with connect(self.dsn) as conn:
339             sqlp = SQLPreprocessor(conn, config)
340             sqlp.run_string(conn,
341                             """CREATE INDEX idx_{{table_name}}_word_token ON {{table_name}}
342                                USING BTREE (word_token) {{db.tablespace.search_index}}""",
343                             table_name=table_name)
344             for name, ctype in WORD_TYPES:
345                 sqlp.run_string(conn,
346                                 """CREATE INDEX idx_{{table_name}}_{{idx_name}} ON {{table_name}}
347                                    USING BTREE (word) {{db.tablespace.address_index}}
348                                    WHERE type = '{{column_type}}'
349                                 """,
350                                 table_name=table_name, idx_name=name,
351                                 column_type=ctype)
352             conn.commit()
353
354
355     def _create_lookup_indices(self, config: Configuration, table_name: str) -> None:
356         """ Create additional indexes used when running the API.
357         """
358         with connect(self.dsn) as conn:
359             sqlp = SQLPreprocessor(conn, config)
360             # Index required for details lookup.
361             sqlp.run_string(conn, """
362                 CREATE INDEX IF NOT EXISTS idx_{{table_name}}_word_id
363                   ON {{table_name}} USING BTREE (word_id) {{db.tablespace.search_index}}
364             """,
365             table_name=table_name)
366             conn.commit()
367
368
369     def _move_temporary_word_table(self, old: str) -> None:
370         """ Rename all tables and indexes used by the tokenizer.
371         """
372         with connect(self.dsn) as conn:
373             drop_tables(conn, 'word')
374             with conn.cursor() as cur:
375                 cur.execute(f"ALTER TABLE {old} RENAME TO word")
376                 for idx in ('word_token', 'word_id'):
377                     cur.execute(f"""ALTER INDEX idx_{old}_{idx}
378                                       RENAME TO idx_word_{idx}""")
379                 for name, _ in WORD_TYPES:
380                     cur.execute(f"""ALTER INDEX idx_{old}_{name}
381                                     RENAME TO idx_word_{name}""")
382             conn.commit()
383
384
385
386
387 class ICUNameAnalyzer(AbstractAnalyzer):
388     """ The ICU analyzer uses the ICU library for splitting names.
389
390         Each instance opens a connection to the database to request the
391         normalization.
392     """
393
394     def __init__(self, dsn: str, sanitizer: PlaceSanitizer,
395                  token_analysis: ICUTokenAnalysis) -> None:
396         self.conn: Optional[Connection] = connect(dsn)
397         self.conn.autocommit = True
398         self.sanitizer = sanitizer
399         self.token_analysis = token_analysis
400
401         self._cache = _TokenCache()
402
403
404     def close(self) -> None:
405         """ Free all resources used by the analyzer.
406         """
407         if self.conn:
408             self.conn.close()
409             self.conn = None
410
411
412     def _search_normalized(self, name: str) -> str:
413         """ Return the search token transliteration of the given name.
414         """
415         return cast(str, self.token_analysis.search.transliterate(name)).strip()
416
417
418     def _normalized(self, name: str) -> str:
419         """ Return the normalized version of the given name with all
420             non-relevant information removed.
421         """
422         return cast(str, self.token_analysis.normalizer.transliterate(name)).strip()
423
424
425     def get_word_token_info(self, words: Sequence[str]) -> List[Tuple[str, str, int]]:
426         """ Return token information for the given list of words.
427             If a word starts with # it is assumed to be a full name
428             otherwise is a partial name.
429
430             The function returns a list of tuples with
431             (original word, word token, word id).
432
433             The function is used for testing and debugging only
434             and not necessarily efficient.
435         """
436         assert self.conn is not None
437         full_tokens = {}
438         partial_tokens = {}
439         for word in words:
440             if word.startswith('#'):
441                 full_tokens[word] = self._search_normalized(word[1:])
442             else:
443                 partial_tokens[word] = self._search_normalized(word)
444
445         with self.conn.cursor() as cur:
446             cur.execute("""SELECT word_token, word_id
447                             FROM word WHERE word_token = ANY(%s) and type = 'W'
448                         """, (list(full_tokens.values()),))
449             full_ids = {r[0]: r[1] for r in cur}
450             cur.execute("""SELECT word_token, word_id
451                             FROM word WHERE word_token = ANY(%s) and type = 'w'""",
452                         (list(partial_tokens.values()),))
453             part_ids = {r[0]: r[1] for r in cur}
454
455         return [(k, v, full_ids.get(v, None)) for k, v in full_tokens.items()] \
456                + [(k, v, part_ids.get(v, None)) for k, v in partial_tokens.items()]
457
458
459     def normalize_postcode(self, postcode: str) -> str:
460         """ Convert the postcode to a standardized form.
461
462             This function must yield exactly the same result as the SQL function
463             'token_normalized_postcode()'.
464         """
465         return postcode.strip().upper()
466
467
468     def update_postcodes_from_db(self) -> None:
469         """ Update postcode tokens in the word table from the location_postcode
470             table.
471         """
472         assert self.conn is not None
473         analyzer = self.token_analysis.analysis.get('@postcode')
474
475         with self.conn.cursor() as cur:
476             # First get all postcode names currently in the word table.
477             cur.execute("SELECT DISTINCT word FROM word WHERE type = 'P'")
478             word_entries = set((entry[0] for entry in cur))
479
480             # Then compute the required postcode names from the postcode table.
481             needed_entries = set()
482             cur.execute("SELECT country_code, postcode FROM location_postcode")
483             for cc, postcode in cur:
484                 info = PlaceInfo({'country_code': cc,
485                                   'class': 'place', 'type': 'postcode',
486                                   'address': {'postcode': postcode}})
487                 address = self.sanitizer.process_names(info)[1]
488                 for place in address:
489                     if place.kind == 'postcode':
490                         if analyzer is None:
491                             postcode_name = place.name.strip().upper()
492                             variant_base = None
493                         else:
494                             postcode_name = analyzer.get_canonical_id(place)
495                             variant_base = place.get_attr("variant")
496
497                         if variant_base:
498                             needed_entries.add(f'{postcode_name}@{variant_base}')
499                         else:
500                             needed_entries.add(postcode_name)
501                         break
502
503         # Now update the word table.
504         self._delete_unused_postcode_words(word_entries - needed_entries)
505         self._add_missing_postcode_words(needed_entries - word_entries)
506
507     def _delete_unused_postcode_words(self, tokens: Iterable[str]) -> None:
508         assert self.conn is not None
509         if tokens:
510             with self.conn.cursor() as cur:
511                 cur.execute("DELETE FROM word WHERE type = 'P' and word = any(%s)",
512                             (list(tokens), ))
513
514     def _add_missing_postcode_words(self, tokens: Iterable[str]) -> None:
515         assert self.conn is not None
516         if not tokens:
517             return
518
519         analyzer = self.token_analysis.analysis.get('@postcode')
520         terms = []
521
522         for postcode_name in tokens:
523             if '@' in postcode_name:
524                 term, variant = postcode_name.split('@', 2)
525                 term = self._search_normalized(term)
526                 if analyzer is None:
527                     variants = [term]
528                 else:
529                     variants = analyzer.compute_variants(variant)
530                     if term not in variants:
531                         variants.append(term)
532             else:
533                 variants = [self._search_normalized(postcode_name)]
534             terms.append((postcode_name, variants))
535
536         if terms:
537             with self.conn.cursor() as cur:
538                 cur.executemany("""SELECT create_postcode_word(%s, %s)""", terms)
539
540
541
542
543     def update_special_phrases(self, phrases: Iterable[Tuple[str, str, str, str]],
544                                should_replace: bool) -> None:
545         """ Replace the search index for special phrases with the new phrases.
546             If `should_replace` is True, then the previous set of will be
547             completely replaced. Otherwise the phrases are added to the
548             already existing ones.
549         """
550         assert self.conn is not None
551         norm_phrases = set(((self._normalized(p[0]), p[1], p[2], p[3])
552                             for p in phrases))
553
554         with self.conn.cursor() as cur:
555             # Get the old phrases.
556             existing_phrases = set()
557             cur.execute("SELECT word, info FROM word WHERE type = 'S'")
558             for word, info in cur:
559                 existing_phrases.add((word, info['class'], info['type'],
560                                       info.get('op') or '-'))
561
562             added = self._add_special_phrases(cur, norm_phrases, existing_phrases)
563             if should_replace:
564                 deleted = self._remove_special_phrases(cur, norm_phrases,
565                                                        existing_phrases)
566             else:
567                 deleted = 0
568
569         LOG.info("Total phrases: %s. Added: %s. Deleted: %s",
570                  len(norm_phrases), added, deleted)
571
572
573     def _add_special_phrases(self, cursor: Cursor,
574                              new_phrases: Set[Tuple[str, str, str, str]],
575                              existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
576         """ Add all phrases to the database that are not yet there.
577         """
578         to_add = new_phrases - existing_phrases
579
580         added = 0
581         with cursor.copy('COPY word(word_token, type, word, info) FROM STDIN') as copy:
582             for word, cls, typ, oper in to_add:
583                 term = self._search_normalized(word)
584                 if term:
585                     copy.write_row((term, 'S', word,
586                                     Jsonb({'class': cls, 'type': typ,
587                                            'op': oper if oper in ('in', 'near') else None})))
588                     added += 1
589
590         return added
591
592
593     def _remove_special_phrases(self, cursor: Cursor,
594                              new_phrases: Set[Tuple[str, str, str, str]],
595                              existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
596         """ Remove all phrases from the database that are no longer in the
597             new phrase list.
598         """
599         to_delete = existing_phrases - new_phrases
600
601         if to_delete:
602             cursor.executemany(
603                 """ DELETE FROM word
604                       WHERE type = 'S' and word = %s
605                             and info->>'class' = %s and info->>'type' = %s
606                             and %s = coalesce(info->>'op', '-')
607                 """, to_delete)
608
609         return len(to_delete)
610
611
612     def add_country_names(self, country_code: str, names: Mapping[str, str]) -> None:
613         """ Add default names for the given country to the search index.
614         """
615         # Make sure any name preprocessing for country names applies.
616         info = PlaceInfo({'name': names, 'country_code': country_code,
617                           'rank_address': 4, 'class': 'boundary',
618                           'type': 'administrative'})
619         self._add_country_full_names(country_code,
620                                      self.sanitizer.process_names(info)[0],
621                                      internal=True)
622
623
624     def _add_country_full_names(self, country_code: str, names: Sequence[PlaceName],
625                                 internal: bool = False) -> None:
626         """ Add names for the given country from an already sanitized
627             name list.
628         """
629         assert self.conn is not None
630         word_tokens = set()
631         for name in names:
632             norm_name = self._search_normalized(name.name)
633             if norm_name:
634                 word_tokens.add(norm_name)
635
636         with self.conn.cursor() as cur:
637             # Get existing names
638             cur.execute("""SELECT word_token, coalesce(info ? 'internal', false) as is_internal
639                              FROM word
640                              WHERE type = 'C' and word = %s""",
641                         (country_code, ))
642             # internal/external names
643             existing_tokens: Dict[bool, Set[str]] = {True: set(), False: set()}
644             for word in cur:
645                 existing_tokens[word[1]].add(word[0])
646
647             # Delete names that no longer exist.
648             gone_tokens = existing_tokens[internal] - word_tokens
649             if internal:
650                 gone_tokens.update(existing_tokens[False] & word_tokens)
651             if gone_tokens:
652                 cur.execute("""DELETE FROM word
653                                USING unnest(%s::text[]) as token
654                                WHERE type = 'C' and word = %s
655                                      and word_token = token""",
656                             (list(gone_tokens), country_code))
657
658             # Only add those names that are not yet in the list.
659             new_tokens = word_tokens - existing_tokens[True]
660             if not internal:
661                 new_tokens -= existing_tokens[False]
662             if new_tokens:
663                 if internal:
664                     sql = """INSERT INTO word (word_token, type, word, info)
665                                (SELECT token, 'C', %s, '{"internal": "yes"}'
666                                   FROM unnest(%s::text[]) as token)
667                            """
668                 else:
669                     sql = """INSERT INTO word (word_token, type, word)
670                                    (SELECT token, 'C', %s
671                                     FROM unnest(%s::text[]) as token)
672                           """
673                 cur.execute(sql, (country_code, list(new_tokens)))
674
675
676     def process_place(self, place: PlaceInfo) -> Mapping[str, Any]:
677         """ Determine tokenizer information about the given place.
678
679             Returns a JSON-serializable structure that will be handed into
680             the database via the token_info field.
681         """
682         token_info = _TokenInfo()
683
684         names, address = self.sanitizer.process_names(place)
685
686         if names:
687             token_info.set_names(*self._compute_name_tokens(names))
688
689             if place.is_country():
690                 assert place.country_code is not None
691                 self._add_country_full_names(place.country_code, names)
692
693         if address:
694             self._process_place_address(token_info, address)
695
696         return token_info.to_dict()
697
698
699     def _process_place_address(self, token_info: '_TokenInfo',
700                                address: Sequence[PlaceName]) -> None:
701         for item in address:
702             if item.kind == 'postcode':
703                 token_info.set_postcode(self._add_postcode(item))
704             elif item.kind == 'housenumber':
705                 token_info.add_housenumber(*self._compute_housenumber_token(item))
706             elif item.kind == 'street':
707                 token_info.add_street(self._retrieve_full_tokens(item.name))
708             elif item.kind == 'place':
709                 if not item.suffix:
710                     token_info.add_place(itertools.chain(*self._compute_name_tokens([item])))
711             elif not item.kind.startswith('_') and not item.suffix and \
712                  item.kind not in ('country', 'full', 'inclusion'):
713                 token_info.add_address_term(item.kind,
714                                             itertools.chain(*self._compute_name_tokens([item])))
715
716
717     def _compute_housenumber_token(self, hnr: PlaceName) -> Tuple[Optional[int], Optional[str]]:
718         """ Normalize the housenumber and return the word token and the
719             canonical form.
720         """
721         assert self.conn is not None
722         analyzer = self.token_analysis.analysis.get('@housenumber')
723         result: Tuple[Optional[int], Optional[str]] = (None, None)
724
725         if analyzer is None:
726             # When no custom analyzer is set, simply normalize and transliterate
727             norm_name = self._search_normalized(hnr.name)
728             if norm_name:
729                 result = self._cache.housenumbers.get(norm_name, result)
730                 if result[0] is None:
731                     hid = execute_scalar(self.conn, "SELECT getorcreate_hnr_id(%s)", (norm_name, ))
732
733                     result = hid, norm_name
734                     self._cache.housenumbers[norm_name] = result
735         else:
736             # Otherwise use the analyzer to determine the canonical name.
737             # Per convention we use the first variant as the 'lookup name', the
738             # name that gets saved in the housenumber field of the place.
739             word_id = analyzer.get_canonical_id(hnr)
740             if word_id:
741                 result = self._cache.housenumbers.get(word_id, result)
742                 if result[0] is None:
743                     variants = analyzer.compute_variants(word_id)
744                     if variants:
745                         hid = execute_scalar(self.conn, "SELECT create_analyzed_hnr_id(%s, %s)",
746                                              (word_id, list(variants)))
747                         result = hid, variants[0]
748                         self._cache.housenumbers[word_id] = result
749
750         return result
751
752
753     def _retrieve_full_tokens(self, name: str) -> List[int]:
754         """ Get the full name token for the given name, if it exists.
755             The name is only retrieved for the standard analyser.
756         """
757         assert self.conn is not None
758         norm_name = self._search_normalized(name)
759
760         # return cached if possible
761         if norm_name in self._cache.fulls:
762             return self._cache.fulls[norm_name]
763
764         with self.conn.cursor() as cur:
765             cur.execute("SELECT word_id FROM word WHERE word_token = %s and type = 'W'",
766                         (norm_name, ))
767             full = [row[0] for row in cur]
768
769         self._cache.fulls[norm_name] = full
770
771         return full
772
773
774     def _compute_name_tokens(self, names: Sequence[PlaceName]) -> Tuple[Set[int], Set[int]]:
775         """ Computes the full name and partial name tokens for the given
776             dictionary of names.
777         """
778         assert self.conn is not None
779         full_tokens: Set[int] = set()
780         partial_tokens: Set[int] = set()
781
782         for name in names:
783             analyzer_id = name.get_attr('analyzer')
784             analyzer = self.token_analysis.get_analyzer(analyzer_id)
785             word_id = analyzer.get_canonical_id(name)
786             if analyzer_id is None:
787                 token_id = word_id
788             else:
789                 token_id = f'{word_id}@{analyzer_id}'
790
791             full, part = self._cache.names.get(token_id, (None, None))
792             if full is None:
793                 variants = analyzer.compute_variants(word_id)
794                 if not variants:
795                     continue
796
797                 with self.conn.cursor() as cur:
798                     cur.execute("SELECT * FROM getorcreate_full_word(%s, %s)",
799                                 (token_id, variants))
800                     full, part = cast(Tuple[int, List[int]], cur.fetchone())
801
802                 self._cache.names[token_id] = (full, part)
803
804             assert part is not None
805
806             full_tokens.add(full)
807             partial_tokens.update(part)
808
809         return full_tokens, partial_tokens
810
811
812     def _add_postcode(self, item: PlaceName) -> Optional[str]:
813         """ Make sure the normalized postcode is present in the word table.
814         """
815         assert self.conn is not None
816         analyzer = self.token_analysis.analysis.get('@postcode')
817
818         if analyzer is None:
819             postcode_name = item.name.strip().upper()
820             variant_base = None
821         else:
822             postcode_name = analyzer.get_canonical_id(item)
823             variant_base = item.get_attr("variant")
824
825         if variant_base:
826             postcode = f'{postcode_name}@{variant_base}'
827         else:
828             postcode = postcode_name
829
830         if postcode not in self._cache.postcodes:
831             term = self._search_normalized(postcode_name)
832             if not term:
833                 return None
834
835             variants = {term}
836             if analyzer is not None and variant_base:
837                 variants.update(analyzer.compute_variants(variant_base))
838
839             with self.conn.cursor() as cur:
840                 cur.execute("SELECT create_postcode_word(%s, %s)",
841                             (postcode, list(variants)))
842             self._cache.postcodes.add(postcode)
843
844         return postcode_name
845
846
847 class _TokenInfo:
848     """ Collect token information to be sent back to the database.
849     """
850     def __init__(self) -> None:
851         self.names: Optional[str] = None
852         self.housenumbers: Set[str] = set()
853         self.housenumber_tokens: Set[int] = set()
854         self.street_tokens: Optional[Set[int]] = None
855         self.place_tokens: Set[int] = set()
856         self.address_tokens: Dict[str, str] = {}
857         self.postcode: Optional[str] = None
858
859
860     def _mk_array(self, tokens: Iterable[Any]) -> str:
861         return f"{{{','.join((str(s) for s in tokens))}}}"
862
863
864     def to_dict(self) -> Dict[str, Any]:
865         """ Return the token information in database importable format.
866         """
867         out: Dict[str, Any] = {}
868
869         if self.names:
870             out['names'] = self.names
871
872         if self.housenumbers:
873             out['hnr'] = ';'.join(self.housenumbers)
874             out['hnr_tokens'] = self._mk_array(self.housenumber_tokens)
875
876         if self.street_tokens is not None:
877             out['street'] = self._mk_array(self.street_tokens)
878
879         if self.place_tokens:
880             out['place'] = self._mk_array(self.place_tokens)
881
882         if self.address_tokens:
883             out['addr'] = self.address_tokens
884
885         if self.postcode:
886             out['postcode'] = self.postcode
887
888         return out
889
890
891     def set_names(self, fulls: Iterable[int], partials: Iterable[int]) -> None:
892         """ Adds token information for the normalised names.
893         """
894         self.names = self._mk_array(itertools.chain(fulls, partials))
895
896
897     def add_housenumber(self, token: Optional[int], hnr: Optional[str]) -> None:
898         """ Extract housenumber information from a list of normalised
899             housenumbers.
900         """
901         if token:
902             assert hnr is not None
903             self.housenumbers.add(hnr)
904             self.housenumber_tokens.add(token)
905
906
907     def add_street(self, tokens: Iterable[int]) -> None:
908         """ Add addr:street match terms.
909         """
910         if self.street_tokens is None:
911             self.street_tokens = set()
912         self.street_tokens.update(tokens)
913
914
915     def add_place(self, tokens: Iterable[int]) -> None:
916         """ Add addr:place search and match terms.
917         """
918         self.place_tokens.update(tokens)
919
920
921     def add_address_term(self, key: str, partials: Iterable[int]) -> None:
922         """ Add additional address terms.
923         """
924         array = self._mk_array(partials)
925         if len(array) > 2:
926             self.address_tokens[key] = array
927
928     def set_postcode(self, postcode: Optional[str]) -> None:
929         """ Set the postcode to the given one.
930         """
931         self.postcode = postcode
932
933
934 class _TokenCache:
935     """ Cache for token information to avoid repeated database queries.
936
937         This cache is not thread-safe and needs to be instantiated per
938         analyzer.
939     """
940     def __init__(self) -> None:
941         self.names: Dict[str, Tuple[int, List[int]]] = {}
942         self.partials: Dict[str, int] = {}
943         self.fulls: Dict[str, List[int]] = {}
944         self.postcodes: Set[str] = set()
945         self.housenumbers: Dict[str, Tuple[Optional[int], Optional[str]]] = {}