]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tokenizer/icu_tokenizer.py
osm2pgsq: do not use deprecated tablespace options
[nominatim.git] / src / nominatim_db / tokenizer / icu_tokenizer.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Tokenizer implementing normalisation as used before Nominatim 4 but using
9 libICU instead of the PostgreSQL module.
10 """
11 from typing import Optional, Sequence, List, Tuple, Mapping, Any, cast, \
12                    Dict, Set, Iterable
13 import itertools
14 import logging
15 from pathlib import Path
16 from textwrap import dedent
17
18 from psycopg.types.json import Jsonb
19 from psycopg import sql as pysql
20
21 from ..db.connection import connect, Connection, Cursor, server_version_tuple,\
22                             drop_tables, table_exists, execute_scalar
23 from ..config import Configuration
24 from ..db.sql_preprocessor import SQLPreprocessor
25 from ..data.place_info import PlaceInfo
26 from ..data.place_name import PlaceName
27 from .icu_rule_loader import ICURuleLoader
28 from .place_sanitizer import PlaceSanitizer
29 from .icu_token_analysis import ICUTokenAnalysis
30 from .base import AbstractAnalyzer, AbstractTokenizer
31
# Key of the term normalization setting (not referenced in the code shown here).
DBCFG_TERM_NORMALIZATION = "tokenizer_term_normalization"

# Note: called without a name, so this is the root logger.
LOG = logging.getLogger()

# Pairs of (index name suffix, value of the `type` column). One partial
# index on `word` is created per entry.
WORD_TYPES = (
    ('country_names', 'C'),
    ('postcodes', 'P'),
    ('full_word', 'W'),
    ('housenumbers', 'H'),
)
40
def create(dsn: str, data_dir: Path) -> 'ICUTokenizer':
    """ Factory function of this module: return a new ICU tokenizer
        for the database at `dsn` that uses `data_dir` as data directory.
    """
    tokenizer = ICUTokenizer(dsn, data_dir)
    return tokenizer
45
46
47 class ICUTokenizer(AbstractTokenizer):
48     """ This tokenizer uses libICU to convert names and queries to ASCII.
49         Otherwise it uses the same algorithms and data structures as the
50         normalization routines in Nominatim 3.
51     """
52
53     def __init__(self, dsn: str, data_dir: Path) -> None:
54         self.dsn = dsn
55         self.data_dir = data_dir
56         self.loader: Optional[ICURuleLoader] = None
57
58
59     def init_new_db(self, config: Configuration, init_db: bool = True) -> None:
60         """ Set up a new tokenizer for the database.
61
62             This copies all necessary data in the project directory to make
63             sure the tokenizer remains stable even over updates.
64         """
65         self.loader = ICURuleLoader(config)
66
67         self._install_php(config.lib_dir.php, overwrite=True)
68         self._save_config()
69
70         if init_db:
71             self.update_sql_functions(config)
72             self._setup_db_tables(config)
73             self._create_base_indices(config, 'word')
74
75
76     def init_from_project(self, config: Configuration) -> None:
77         """ Initialise the tokenizer from the project directory.
78         """
79         self.loader = ICURuleLoader(config)
80
81         with connect(self.dsn) as conn:
82             self.loader.load_config_from_db(conn)
83
84         self._install_php(config.lib_dir.php, overwrite=False)
85
86
87     def finalize_import(self, config: Configuration) -> None:
88         """ Do any required postprocessing to make the tokenizer data ready
89             for use.
90         """
91         self._create_lookup_indices(config, 'word')
92
93
94     def update_sql_functions(self, config: Configuration) -> None:
95         """ Reimport the SQL functions for this tokenizer.
96         """
97         with connect(self.dsn) as conn:
98             sqlp = SQLPreprocessor(conn, config)
99             sqlp.run_sql_file(conn, 'tokenizer/icu_tokenizer.sql')
100
101
102     def check_database(self, config: Configuration) -> None:
103         """ Check that the tokenizer is set up correctly.
104         """
105         # Will throw an error if there is an issue.
106         self.init_from_project(config)
107
108
109     def update_statistics(self, config: Configuration, threads: int = 2) -> None:
110         """ Recompute frequencies for all name words.
111         """
112         with connect(self.dsn) as conn:
113             if not table_exists(conn, 'search_name'):
114                 return
115
116             with conn.cursor() as cur:
117                 cur.execute('ANALYSE search_name')
118                 if threads > 1:
119                     cur.execute(pysql.SQL('SET max_parallel_workers_per_gather TO {}')
120                                      .format(pysql.Literal(min(threads, 6),)))
121
122                 if server_version_tuple(conn) < (12, 0):
123                     LOG.info('Computing word frequencies')
124                     drop_tables(conn, 'word_frequencies', 'addressword_frequencies')
125                     cur.execute("""CREATE TEMP TABLE word_frequencies AS
126                                      SELECT unnest(name_vector) as id, count(*)
127                                      FROM search_name GROUP BY id""")
128                     cur.execute('CREATE INDEX ON word_frequencies(id)')
129                     cur.execute("""CREATE TEMP TABLE addressword_frequencies AS
130                                      SELECT unnest(nameaddress_vector) as id, count(*)
131                                      FROM search_name GROUP BY id""")
132                     cur.execute('CREATE INDEX ON addressword_frequencies(id)')
133                     cur.execute("""CREATE OR REPLACE FUNCTION word_freq_update(wid INTEGER,
134                                                                                INOUT info JSONB)
135                                    AS $$
136                                    DECLARE rec RECORD;
137                                    BEGIN
138                                    IF info is null THEN
139                                      info = '{}'::jsonb;
140                                    END IF;
141                                    FOR rec IN SELECT count FROM word_frequencies WHERE id = wid
142                                    LOOP
143                                      info = info || jsonb_build_object('count', rec.count);
144                                    END LOOP;
145                                    FOR rec IN SELECT count FROM addressword_frequencies WHERE id = wid
146                                    LOOP
147                                      info = info || jsonb_build_object('addr_count', rec.count);
148                                    END LOOP;
149                                    IF info = '{}'::jsonb THEN
150                                      info = null;
151                                    END IF;
152                                    END;
153                                    $$ LANGUAGE plpgsql IMMUTABLE;
154                                 """)
155                     LOG.info('Update word table with recomputed frequencies')
156                     drop_tables(conn, 'tmp_word')
157                     cur.execute("""CREATE TABLE tmp_word AS
158                                     SELECT word_id, word_token, type, word,
159                                            word_freq_update(word_id, info) as info
160                                     FROM word
161                                 """)
162                     drop_tables(conn, 'word_frequencies', 'addressword_frequencies')
163                 else:
164                     LOG.info('Computing word frequencies')
165                     drop_tables(conn, 'word_frequencies')
166                     cur.execute("""
167                       CREATE TEMP TABLE word_frequencies AS
168                       WITH word_freq AS MATERIALIZED (
169                                SELECT unnest(name_vector) as id, count(*)
170                                      FROM search_name GROUP BY id),
171                            addr_freq AS MATERIALIZED (
172                                SELECT unnest(nameaddress_vector) as id, count(*)
173                                      FROM search_name GROUP BY id)
174                       SELECT coalesce(a.id, w.id) as id,
175                              (CASE WHEN w.count is null THEN '{}'::JSONB
176                                   ELSE jsonb_build_object('count', w.count) END
177                               ||
178                               CASE WHEN a.count is null THEN '{}'::JSONB
179                                   ELSE jsonb_build_object('addr_count', a.count) END) as info
180                       FROM word_freq w FULL JOIN addr_freq a ON a.id = w.id;
181                       """)
182                     cur.execute('CREATE UNIQUE INDEX ON word_frequencies(id) INCLUDE(info)')
183                     cur.execute('ANALYSE word_frequencies')
184                     LOG.info('Update word table with recomputed frequencies')
185                     drop_tables(conn, 'tmp_word')
186                     cur.execute("""CREATE TABLE tmp_word AS
187                                     SELECT word_id, word_token, type, word,
188                                            (CASE WHEN wf.info is null THEN word.info
189                                             ELSE coalesce(word.info, '{}'::jsonb) || wf.info
190                                             END) as info
191                                     FROM word LEFT JOIN word_frequencies wf
192                                          ON word.word_id = wf.id
193                                 """)
194                     drop_tables(conn, 'word_frequencies')
195
196             with conn.cursor() as cur:
197                 cur.execute('SET max_parallel_workers_per_gather TO 0')
198
199             sqlp = SQLPreprocessor(conn, config)
200             sqlp.run_string(conn,
201                             'GRANT SELECT ON tmp_word TO "{{config.DATABASE_WEBUSER}}"')
202             conn.commit()
203         self._create_base_indices(config, 'tmp_word')
204         self._create_lookup_indices(config, 'tmp_word')
205         self._move_temporary_word_table('tmp_word')
206
207
208
209     def _cleanup_housenumbers(self) -> None:
210         """ Remove unused house numbers.
211         """
212         with connect(self.dsn) as conn:
213             if not table_exists(conn, 'search_name'):
214                 return
215             with conn.cursor(name="hnr_counter") as cur:
216                 cur.execute("""SELECT DISTINCT word_id, coalesce(info->>'lookup', word_token)
217                                FROM word
218                                WHERE type = 'H'
219                                  AND NOT EXISTS(SELECT * FROM search_name
220                                                 WHERE ARRAY[word.word_id] && name_vector)
221                                  AND (char_length(coalesce(word, word_token)) > 6
222                                       OR coalesce(word, word_token) not similar to '\\d+')
223                             """)
224                 candidates = {token: wid for wid, token in cur}
225             with conn.cursor(name="hnr_counter") as cur:
226                 cur.execute("""SELECT housenumber FROM placex
227                                WHERE housenumber is not null
228                                      AND (char_length(housenumber) > 6
229                                           OR housenumber not similar to '\\d+')
230                             """)
231                 for row in cur:
232                     for hnr in row[0].split(';'):
233                         candidates.pop(hnr, None)
234             LOG.info("There are %s outdated housenumbers.", len(candidates))
235             LOG.debug("Outdated housenumbers: %s", candidates.keys())
236             if candidates:
237                 with conn.cursor() as cur:
238                     cur.execute("""DELETE FROM word WHERE word_id = any(%s)""",
239                                 (list(candidates.values()), ))
240                 conn.commit()
241
242
243
244     def update_word_tokens(self) -> None:
245         """ Remove unused tokens.
246         """
247         LOG.warning("Cleaning up housenumber tokens.")
248         self._cleanup_housenumbers()
249         LOG.warning("Tokenizer house-keeping done.")
250
251
252     def name_analyzer(self) -> 'ICUNameAnalyzer':
253         """ Create a new analyzer for tokenizing names and queries
254             using this tokinzer. Analyzers are context managers and should
255             be used accordingly:
256
257             ```
258             with tokenizer.name_analyzer() as analyzer:
259                 analyser.tokenize()
260             ```
261
262             When used outside the with construct, the caller must ensure to
263             call the close() function before destructing the analyzer.
264
265             Analyzers are not thread-safe. You need to instantiate one per thread.
266         """
267         assert self.loader is not None
268         return ICUNameAnalyzer(self.dsn, self.loader.make_sanitizer(),
269                                self.loader.make_token_analysis())
270
271
272     def most_frequent_words(self, conn: Connection, num: int) -> List[str]:
273         """ Return a list of the `num` most frequent full words
274             in the database.
275         """
276         with conn.cursor() as cur:
277             cur.execute("""SELECT word, sum((info->>'count')::int) as count
278                              FROM word WHERE type = 'W'
279                              GROUP BY word
280                              ORDER BY count DESC LIMIT %s""", (num,))
281             return list(s[0].split('@')[0] for s in cur)
282
283
284     def _install_php(self, phpdir: Optional[Path], overwrite: bool = True) -> None:
285         """ Install the php script for the tokenizer.
286         """
287         if phpdir is not None:
288             assert self.loader is not None
289             php_file = self.data_dir / "tokenizer.php"
290
291             if not php_file.exists() or overwrite:
292                 php_file.write_text(dedent(f"""\
293                     <?php
294                     @define('CONST_Max_Word_Frequency', 10000000);
295                     @define('CONST_Term_Normalization_Rules', "{self.loader.normalization_rules}");
296                     @define('CONST_Transliteration', "{self.loader.get_search_rules()}");
297                     require_once('{phpdir}/tokenizer/icu_tokenizer.php');"""), encoding='utf-8')
298
299
300     def _save_config(self) -> None:
301         """ Save the configuration that needs to remain stable for the given
302             database as database properties.
303         """
304         assert self.loader is not None
305         with connect(self.dsn) as conn:
306             self.loader.save_config_to_db(conn)
307
308
309     def _setup_db_tables(self, config: Configuration) -> None:
310         """ Set up the word table and fill it with pre-computed word
311             frequencies.
312         """
313         with connect(self.dsn) as conn:
314             drop_tables(conn, 'word')
315             sqlp = SQLPreprocessor(conn, config)
316             sqlp.run_string(conn, """
317                 CREATE TABLE word (
318                       word_id INTEGER,
319                       word_token text NOT NULL,
320                       type text NOT NULL,
321                       word text,
322                       info jsonb
323                     ) {{db.tablespace.search_data}};
324                 GRANT SELECT ON word TO "{{config.DATABASE_WEBUSER}}";
325
326                 DROP SEQUENCE IF EXISTS seq_word;
327                 CREATE SEQUENCE seq_word start 1;
328                 GRANT SELECT ON seq_word to "{{config.DATABASE_WEBUSER}}";
329             """)
330             conn.commit()
331
332
333     def _create_base_indices(self, config: Configuration, table_name: str) -> None:
334         """ Set up the word table and fill it with pre-computed word
335             frequencies.
336         """
337         with connect(self.dsn) as conn:
338             sqlp = SQLPreprocessor(conn, config)
339             sqlp.run_string(conn,
340                             """CREATE INDEX idx_{{table_name}}_word_token ON {{table_name}}
341                                USING BTREE (word_token) {{db.tablespace.search_index}}""",
342                             table_name=table_name)
343             for name, ctype in WORD_TYPES:
344                 sqlp.run_string(conn,
345                                 """CREATE INDEX idx_{{table_name}}_{{idx_name}} ON {{table_name}}
346                                    USING BTREE (word) {{db.tablespace.address_index}}
347                                    WHERE type = '{{column_type}}'
348                                 """,
349                                 table_name=table_name, idx_name=name,
350                                 column_type=ctype)
351             conn.commit()
352
353
354     def _create_lookup_indices(self, config: Configuration, table_name: str) -> None:
355         """ Create additional indexes used when running the API.
356         """
357         with connect(self.dsn) as conn:
358             sqlp = SQLPreprocessor(conn, config)
359             # Index required for details lookup.
360             sqlp.run_string(conn, """
361                 CREATE INDEX IF NOT EXISTS idx_{{table_name}}_word_id
362                   ON {{table_name}} USING BTREE (word_id) {{db.tablespace.search_index}}
363             """,
364             table_name=table_name)
365             conn.commit()
366
367
368     def _move_temporary_word_table(self, old: str) -> None:
369         """ Rename all tables and indexes used by the tokenizer.
370         """
371         with connect(self.dsn) as conn:
372             drop_tables(conn, 'word')
373             with conn.cursor() as cur:
374                 cur.execute(f"ALTER TABLE {old} RENAME TO word")
375                 for idx in ('word_token', 'word_id'):
376                     cur.execute(f"""ALTER INDEX idx_{old}_{idx}
377                                       RENAME TO idx_word_{idx}""")
378                 for name, _ in WORD_TYPES:
379                     cur.execute(f"""ALTER INDEX idx_{old}_{name}
380                                     RENAME TO idx_word_{name}""")
381             conn.commit()
382
383
384
385
386 class ICUNameAnalyzer(AbstractAnalyzer):
387     """ The ICU analyzer uses the ICU library for splitting names.
388
389         Each instance opens a connection to the database to request the
390         normalization.
391     """
392
393     def __init__(self, dsn: str, sanitizer: PlaceSanitizer,
394                  token_analysis: ICUTokenAnalysis) -> None:
395         self.conn: Optional[Connection] = connect(dsn)
396         self.conn.autocommit = True
397         self.sanitizer = sanitizer
398         self.token_analysis = token_analysis
399
400         self._cache = _TokenCache()
401
402
403     def close(self) -> None:
404         """ Free all resources used by the analyzer.
405         """
406         if self.conn:
407             self.conn.close()
408             self.conn = None
409
410
411     def _search_normalized(self, name: str) -> str:
412         """ Return the search token transliteration of the given name.
413         """
414         return cast(str, self.token_analysis.search.transliterate(name)).strip()
415
416
417     def _normalized(self, name: str) -> str:
418         """ Return the normalized version of the given name with all
419             non-relevant information removed.
420         """
421         return cast(str, self.token_analysis.normalizer.transliterate(name)).strip()
422
423
424     def get_word_token_info(self, words: Sequence[str]) -> List[Tuple[str, str, int]]:
425         """ Return token information for the given list of words.
426             If a word starts with # it is assumed to be a full name
427             otherwise is a partial name.
428
429             The function returns a list of tuples with
430             (original word, word token, word id).
431
432             The function is used for testing and debugging only
433             and not necessarily efficient.
434         """
435         assert self.conn is not None
436         full_tokens = {}
437         partial_tokens = {}
438         for word in words:
439             if word.startswith('#'):
440                 full_tokens[word] = self._search_normalized(word[1:])
441             else:
442                 partial_tokens[word] = self._search_normalized(word)
443
444         with self.conn.cursor() as cur:
445             cur.execute("""SELECT word_token, word_id
446                             FROM word WHERE word_token = ANY(%s) and type = 'W'
447                         """, (list(full_tokens.values()),))
448             full_ids = {r[0]: r[1] for r in cur}
449             cur.execute("""SELECT word_token, word_id
450                             FROM word WHERE word_token = ANY(%s) and type = 'w'""",
451                         (list(partial_tokens.values()),))
452             part_ids = {r[0]: r[1] for r in cur}
453
454         return [(k, v, full_ids.get(v, None)) for k, v in full_tokens.items()] \
455                + [(k, v, part_ids.get(v, None)) for k, v in partial_tokens.items()]
456
457
458     def normalize_postcode(self, postcode: str) -> str:
459         """ Convert the postcode to a standardized form.
460
461             This function must yield exactly the same result as the SQL function
462             'token_normalized_postcode()'.
463         """
464         return postcode.strip().upper()
465
466
467     def update_postcodes_from_db(self) -> None:
468         """ Update postcode tokens in the word table from the location_postcode
469             table.
470         """
471         assert self.conn is not None
472         analyzer = self.token_analysis.analysis.get('@postcode')
473
474         with self.conn.cursor() as cur:
475             # First get all postcode names currently in the word table.
476             cur.execute("SELECT DISTINCT word FROM word WHERE type = 'P'")
477             word_entries = set((entry[0] for entry in cur))
478
479             # Then compute the required postcode names from the postcode table.
480             needed_entries = set()
481             cur.execute("SELECT country_code, postcode FROM location_postcode")
482             for cc, postcode in cur:
483                 info = PlaceInfo({'country_code': cc,
484                                   'class': 'place', 'type': 'postcode',
485                                   'address': {'postcode': postcode}})
486                 address = self.sanitizer.process_names(info)[1]
487                 for place in address:
488                     if place.kind == 'postcode':
489                         if analyzer is None:
490                             postcode_name = place.name.strip().upper()
491                             variant_base = None
492                         else:
493                             postcode_name = analyzer.get_canonical_id(place)
494                             variant_base = place.get_attr("variant")
495
496                         if variant_base:
497                             needed_entries.add(f'{postcode_name}@{variant_base}')
498                         else:
499                             needed_entries.add(postcode_name)
500                         break
501
502         # Now update the word table.
503         self._delete_unused_postcode_words(word_entries - needed_entries)
504         self._add_missing_postcode_words(needed_entries - word_entries)
505
506     def _delete_unused_postcode_words(self, tokens: Iterable[str]) -> None:
507         assert self.conn is not None
508         if tokens:
509             with self.conn.cursor() as cur:
510                 cur.execute("DELETE FROM word WHERE type = 'P' and word = any(%s)",
511                             (list(tokens), ))
512
513     def _add_missing_postcode_words(self, tokens: Iterable[str]) -> None:
514         assert self.conn is not None
515         if not tokens:
516             return
517
518         analyzer = self.token_analysis.analysis.get('@postcode')
519         terms = []
520
521         for postcode_name in tokens:
522             if '@' in postcode_name:
523                 term, variant = postcode_name.split('@', 2)
524                 term = self._search_normalized(term)
525                 if analyzer is None:
526                     variants = [term]
527                 else:
528                     variants = analyzer.compute_variants(variant)
529                     if term not in variants:
530                         variants.append(term)
531             else:
532                 variants = [self._search_normalized(postcode_name)]
533             terms.append((postcode_name, variants))
534
535         if terms:
536             with self.conn.cursor() as cur:
537                 cur.executemany("""SELECT create_postcode_word(%s, %s)""", terms)
538
539
540
541
542     def update_special_phrases(self, phrases: Iterable[Tuple[str, str, str, str]],
543                                should_replace: bool) -> None:
544         """ Replace the search index for special phrases with the new phrases.
545             If `should_replace` is True, then the previous set of will be
546             completely replaced. Otherwise the phrases are added to the
547             already existing ones.
548         """
549         assert self.conn is not None
550         norm_phrases = set(((self._normalized(p[0]), p[1], p[2], p[3])
551                             for p in phrases))
552
553         with self.conn.cursor() as cur:
554             # Get the old phrases.
555             existing_phrases = set()
556             cur.execute("SELECT word, info FROM word WHERE type = 'S'")
557             for word, info in cur:
558                 existing_phrases.add((word, info['class'], info['type'],
559                                       info.get('op') or '-'))
560
561             added = self._add_special_phrases(cur, norm_phrases, existing_phrases)
562             if should_replace:
563                 deleted = self._remove_special_phrases(cur, norm_phrases,
564                                                        existing_phrases)
565             else:
566                 deleted = 0
567
568         LOG.info("Total phrases: %s. Added: %s. Deleted: %s",
569                  len(norm_phrases), added, deleted)
570
571
572     def _add_special_phrases(self, cursor: Cursor,
573                              new_phrases: Set[Tuple[str, str, str, str]],
574                              existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
575         """ Add all phrases to the database that are not yet there.
576         """
577         to_add = new_phrases - existing_phrases
578
579         added = 0
580         with cursor.copy('COPY word(word_token, type, word, info) FROM STDIN') as copy:
581             for word, cls, typ, oper in to_add:
582                 term = self._search_normalized(word)
583                 if term:
584                     copy.write_row((term, 'S', word,
585                                     Jsonb({'class': cls, 'type': typ,
586                                            'op': oper if oper in ('in', 'near') else None})))
587                     added += 1
588
589         return added
590
591
592     def _remove_special_phrases(self, cursor: Cursor,
593                              new_phrases: Set[Tuple[str, str, str, str]],
594                              existing_phrases: Set[Tuple[str, str, str, str]]) -> int:
595         """ Remove all phrases from the database that are no longer in the
596             new phrase list.
597         """
598         to_delete = existing_phrases - new_phrases
599
600         if to_delete:
601             cursor.executemany(
602                 """ DELETE FROM word
603                       WHERE type = 'S' and word = %s
604                             and info->>'class' = %s and info->>'type' = %s
605                             and %s = coalesce(info->>'op', '-')
606                 """, to_delete)
607
608         return len(to_delete)
609
610
611     def add_country_names(self, country_code: str, names: Mapping[str, str]) -> None:
612         """ Add default names for the given country to the search index.
613         """
614         # Make sure any name preprocessing for country names applies.
615         info = PlaceInfo({'name': names, 'country_code': country_code,
616                           'rank_address': 4, 'class': 'boundary',
617                           'type': 'administrative'})
618         self._add_country_full_names(country_code,
619                                      self.sanitizer.process_names(info)[0],
620                                      internal=True)
621
622
623     def _add_country_full_names(self, country_code: str, names: Sequence[PlaceName],
624                                 internal: bool = False) -> None:
625         """ Add names for the given country from an already sanitized
626             name list.
627         """
628         assert self.conn is not None
629         word_tokens = set()
630         for name in names:
631             norm_name = self._search_normalized(name.name)
632             if norm_name:
633                 word_tokens.add(norm_name)
634
635         with self.conn.cursor() as cur:
636             # Get existing names
637             cur.execute("""SELECT word_token, coalesce(info ? 'internal', false) as is_internal
638                              FROM word
639                              WHERE type = 'C' and word = %s""",
640                         (country_code, ))
641             # internal/external names
642             existing_tokens: Dict[bool, Set[str]] = {True: set(), False: set()}
643             for word in cur:
644                 existing_tokens[word[1]].add(word[0])
645
646             # Delete names that no longer exist.
647             gone_tokens = existing_tokens[internal] - word_tokens
648             if internal:
649                 gone_tokens.update(existing_tokens[False] & word_tokens)
650             if gone_tokens:
651                 cur.execute("""DELETE FROM word
652                                USING unnest(%s::text[]) as token
653                                WHERE type = 'C' and word = %s
654                                      and word_token = token""",
655                             (list(gone_tokens), country_code))
656
657             # Only add those names that are not yet in the list.
658             new_tokens = word_tokens - existing_tokens[True]
659             if not internal:
660                 new_tokens -= existing_tokens[False]
661             if new_tokens:
662                 if internal:
663                     sql = """INSERT INTO word (word_token, type, word, info)
664                                (SELECT token, 'C', %s, '{"internal": "yes"}'
665                                   FROM unnest(%s::text[]) as token)
666                            """
667                 else:
668                     sql = """INSERT INTO word (word_token, type, word)
669                                    (SELECT token, 'C', %s
670                                     FROM unnest(%s::text[]) as token)
671                           """
672                 cur.execute(sql, (country_code, list(new_tokens)))
673
674
675     def process_place(self, place: PlaceInfo) -> Mapping[str, Any]:
676         """ Determine tokenizer information about the given place.
677
678             Returns a JSON-serializable structure that will be handed into
679             the database via the token_info field.
680         """
681         token_info = _TokenInfo()
682
683         names, address = self.sanitizer.process_names(place)
684
685         if names:
686             token_info.set_names(*self._compute_name_tokens(names))
687
688             if place.is_country():
689                 assert place.country_code is not None
690                 self._add_country_full_names(place.country_code, names)
691
692         if address:
693             self._process_place_address(token_info, address)
694
695         return token_info.to_dict()
696
697
698     def _process_place_address(self, token_info: '_TokenInfo',
699                                address: Sequence[PlaceName]) -> None:
700         for item in address:
701             if item.kind == 'postcode':
702                 token_info.set_postcode(self._add_postcode(item))
703             elif item.kind == 'housenumber':
704                 token_info.add_housenumber(*self._compute_housenumber_token(item))
705             elif item.kind == 'street':
706                 token_info.add_street(self._retrieve_full_tokens(item.name))
707             elif item.kind == 'place':
708                 if not item.suffix:
709                     token_info.add_place(itertools.chain(*self._compute_name_tokens([item])))
710             elif not item.kind.startswith('_') and not item.suffix and \
711                  item.kind not in ('country', 'full', 'inclusion'):
712                 token_info.add_address_term(item.kind,
713                                             itertools.chain(*self._compute_name_tokens([item])))
714
715
716     def _compute_housenumber_token(self, hnr: PlaceName) -> Tuple[Optional[int], Optional[str]]:
717         """ Normalize the housenumber and return the word token and the
718             canonical form.
719         """
720         assert self.conn is not None
721         analyzer = self.token_analysis.analysis.get('@housenumber')
722         result: Tuple[Optional[int], Optional[str]] = (None, None)
723
724         if analyzer is None:
725             # When no custom analyzer is set, simply normalize and transliterate
726             norm_name = self._search_normalized(hnr.name)
727             if norm_name:
728                 result = self._cache.housenumbers.get(norm_name, result)
729                 if result[0] is None:
730                     hid = execute_scalar(self.conn, "SELECT getorcreate_hnr_id(%s)", (norm_name, ))
731
732                     result = hid, norm_name
733                     self._cache.housenumbers[norm_name] = result
734         else:
735             # Otherwise use the analyzer to determine the canonical name.
736             # Per convention we use the first variant as the 'lookup name', the
737             # name that gets saved in the housenumber field of the place.
738             word_id = analyzer.get_canonical_id(hnr)
739             if word_id:
740                 result = self._cache.housenumbers.get(word_id, result)
741                 if result[0] is None:
742                     variants = analyzer.compute_variants(word_id)
743                     if variants:
744                         hid = execute_scalar(self.conn, "SELECT create_analyzed_hnr_id(%s, %s)",
745                                              (word_id, list(variants)))
746                         result = hid, variants[0]
747                         self._cache.housenumbers[word_id] = result
748
749         return result
750
751
752     def _retrieve_full_tokens(self, name: str) -> List[int]:
753         """ Get the full name token for the given name, if it exists.
754             The name is only retrieved for the standard analyser.
755         """
756         assert self.conn is not None
757         norm_name = self._search_normalized(name)
758
759         # return cached if possible
760         if norm_name in self._cache.fulls:
761             return self._cache.fulls[norm_name]
762
763         with self.conn.cursor() as cur:
764             cur.execute("SELECT word_id FROM word WHERE word_token = %s and type = 'W'",
765                         (norm_name, ))
766             full = [row[0] for row in cur]
767
768         self._cache.fulls[norm_name] = full
769
770         return full
771
772
773     def _compute_name_tokens(self, names: Sequence[PlaceName]) -> Tuple[Set[int], Set[int]]:
774         """ Computes the full name and partial name tokens for the given
775             dictionary of names.
776         """
777         assert self.conn is not None
778         full_tokens: Set[int] = set()
779         partial_tokens: Set[int] = set()
780
781         for name in names:
782             analyzer_id = name.get_attr('analyzer')
783             analyzer = self.token_analysis.get_analyzer(analyzer_id)
784             word_id = analyzer.get_canonical_id(name)
785             if analyzer_id is None:
786                 token_id = word_id
787             else:
788                 token_id = f'{word_id}@{analyzer_id}'
789
790             full, part = self._cache.names.get(token_id, (None, None))
791             if full is None:
792                 variants = analyzer.compute_variants(word_id)
793                 if not variants:
794                     continue
795
796                 with self.conn.cursor() as cur:
797                     cur.execute("SELECT * FROM getorcreate_full_word(%s, %s)",
798                                 (token_id, variants))
799                     full, part = cast(Tuple[int, List[int]], cur.fetchone())
800
801                 self._cache.names[token_id] = (full, part)
802
803             assert part is not None
804
805             full_tokens.add(full)
806             partial_tokens.update(part)
807
808         return full_tokens, partial_tokens
809
810
811     def _add_postcode(self, item: PlaceName) -> Optional[str]:
812         """ Make sure the normalized postcode is present in the word table.
813         """
814         assert self.conn is not None
815         analyzer = self.token_analysis.analysis.get('@postcode')
816
817         if analyzer is None:
818             postcode_name = item.name.strip().upper()
819             variant_base = None
820         else:
821             postcode_name = analyzer.get_canonical_id(item)
822             variant_base = item.get_attr("variant")
823
824         if variant_base:
825             postcode = f'{postcode_name}@{variant_base}'
826         else:
827             postcode = postcode_name
828
829         if postcode not in self._cache.postcodes:
830             term = self._search_normalized(postcode_name)
831             if not term:
832                 return None
833
834             variants = {term}
835             if analyzer is not None and variant_base:
836                 variants.update(analyzer.compute_variants(variant_base))
837
838             with self.conn.cursor() as cur:
839                 cur.execute("SELECT create_postcode_word(%s, %s)",
840                             (postcode, list(variants)))
841             self._cache.postcodes.add(postcode)
842
843         return postcode_name
844
845
class _TokenInfo:
    """ Collect token information to be sent back to the database.

        Token collections are exported as strings of the form '{1,2,3}'.
    """
    def __init__(self) -> None:
        # Array string with the name tokens, see set_names().
        self.names: Optional[str] = None
        # Housenumbers and their tokens, see add_housenumber().
        self.housenumbers: Set[str] = set()
        self.housenumber_tokens: Set[int] = set()
        # Stays None until add_street() is called for the first time.
        self.street_tokens: Optional[Set[int]] = None
        self.place_tokens: Set[int] = set()
        # Maps the address kind to the array string of its tokens.
        self.address_tokens: Dict[str, str] = {}
        self.postcode: Optional[str] = None


    def _mk_array(self, tokens: Iterable[Any]) -> str:
        """ Format the tokens as comma-separated list in curly braces.
        """
        return '{' + ','.join(map(str, tokens)) + '}'


    def to_dict(self) -> Dict[str, Any]:
        """ Return the token information in database importable format.

            Only information that has been set appears in the result.
            The 'street' entry is the exception: it is present as soon
            as add_street() has been called, even without any tokens.
        """
        result: Dict[str, Any] = {}

        if self.names:
            result['names'] = self.names

        if self.housenumbers:
            result.update(hnr=';'.join(self.housenumbers),
                          hnr_tokens=self._mk_array(self.housenumber_tokens))

        streets = self.street_tokens
        if streets is not None:
            result['street'] = self._mk_array(streets)

        if self.place_tokens:
            result['place'] = self._mk_array(self.place_tokens)

        if self.address_tokens:
            result['addr'] = self.address_tokens

        if self.postcode:
            result['postcode'] = self.postcode

        return result


    def set_names(self, fulls: Iterable[int], partials: Iterable[int]) -> None:
        """ Set the name tokens from the given full and partial tokens.
            Full tokens are listed before partial tokens.
        """
        combined = itertools.chain(fulls, partials)
        self.names = self._mk_array(combined)


    def add_housenumber(self, token: Optional[int], hnr: Optional[str]) -> None:
        """ Add a housenumber together with its token. The call is
            ignored when no token is given.
        """
        if not token:
            return

        assert hnr is not None
        self.housenumber_tokens.add(token)
        self.housenumbers.add(hnr)


    def add_street(self, tokens: Iterable[int]) -> None:
        """ Add addr:street match terms.
        """
        streets = self.street_tokens
        if streets is None:
            streets = self.street_tokens = set()
        streets.update(tokens)


    def add_place(self, tokens: Iterable[int]) -> None:
        """ Add addr:place search and match terms.
        """
        self.place_tokens.update(tokens)


    def add_address_term(self, key: str, partials: Iterable[int]) -> None:
        """ Add additional address terms. Terms without any tokens are
            not saved.
        """
        formatted = self._mk_array(partials)
        # '{}' is what an empty token list looks like.
        if formatted != '{}':
            self.address_tokens[key] = formatted

    def set_postcode(self, postcode: Optional[str]) -> None:
        """ Set the postcode to the given one.
        """
        self.postcode = postcode
931
932
class _TokenCache:
    """ Cache for token information to avoid repeated database queries.

        This cache is not thread-safe and needs to be instantiated per
        analyzer.
    """
    def __init__(self) -> None:
        # Name token id -> (full word token, list of partial word tokens).
        self.names: Dict[str, Tuple[int, List[int]]] = {}
        # Normalized name -> ids of the matching full word tokens.
        self.fulls: Dict[str, List[int]] = {}
        # NOTE(review): presumably partial term -> token id; no user of
        # this mapping is visible here, confirm against the callers.
        self.partials: Dict[str, int] = {}
        # Housenumber lookup key -> (token, canonical housenumber).
        self.housenumbers: Dict[str, Tuple[Optional[int], Optional[str]]] = {}
        # Ids of postcodes that have already been sent to the database.
        self.postcodes: Set[str] = set()