git.openstreetmap.org Git - nominatim.git/blobdiff - src/nominatim_db/tokenizer/icu_tokenizer.py
indexing: precompute row counts
[nominatim.git] / src / nominatim_db / tokenizer / icu_tokenizer.py
index 22e2d048291bcaa6d566e0f5fd3cf92fe9fcf870..7cd96d591fcec97483a721542829f4da90e52430 100644 (file)
@@ -11,14 +11,16 @@ libICU instead of the PostgreSQL module.
 from typing import Optional, Sequence, List, Tuple, Mapping, Any, cast, \
                    Dict, Set, Iterable
 import itertools
-import json
 import logging
 from pathlib import Path
 from textwrap import dedent
 
-from ..db.connection import connect, Connection, Cursor
+from psycopg.types.json import Jsonb
+from psycopg import sql as pysql
+
+from ..db.connection import connect, Connection, Cursor, server_version_tuple,\
+                            drop_tables, table_exists, execute_scalar
 from ..config import Configuration
-from ..db.utils import CopyBuffer
 from ..db.sql_preprocessor import SQLPreprocessor
 from ..data.place_info import PlaceInfo
 from ..data.place_name import PlaceName
@@ -108,19 +110,18 @@ class ICUTokenizer(AbstractTokenizer):
         """ Recompute frequencies for all name words.
         """
         with connect(self.dsn) as conn:
-            if not conn.table_exists('search_name'):
+            if not table_exists(conn, 'search_name'):
                 return
 
             with conn.cursor() as cur:
                 cur.execute('ANALYSE search_name')
                 if threads > 1:
-                    cur.execute('SET max_parallel_workers_per_gather TO %s',
-                                (min(threads, 6),))
+                    cur.execute(pysql.SQL('SET max_parallel_workers_per_gather TO {}')
+                                     .format(pysql.Literal(min(threads, 6),)))
 
-                if conn.server_version_tuple() < (12, 0):
+                if server_version_tuple(conn) < (12, 0):
                     LOG.info('Computing word frequencies')
-                    cur.drop_table('word_frequencies')
-                    cur.drop_table('addressword_frequencies')
+                    drop_tables(conn, 'word_frequencies', 'addressword_frequencies')
                     cur.execute("""CREATE TEMP TABLE word_frequencies AS
                                      SELECT unnest(name_vector) as id, count(*)
                                      FROM search_name GROUP BY id""")
@@ -152,17 +153,16 @@ class ICUTokenizer(AbstractTokenizer):
                                    $$ LANGUAGE plpgsql IMMUTABLE;
                                 """)
                     LOG.info('Update word table with recomputed frequencies')
-                    cur.drop_table('tmp_word')
+                    drop_tables(conn, 'tmp_word')
                     cur.execute("""CREATE TABLE tmp_word AS
                                     SELECT word_id, word_token, type, word,
                                            word_freq_update(word_id, info) as info
                                     FROM word
                                 """)
-                    cur.drop_table('word_frequencies')
-                    cur.drop_table('addressword_frequencies')
+                    drop_tables(conn, 'word_frequencies', 'addressword_frequencies')
                 else:
                     LOG.info('Computing word frequencies')
-                    cur.drop_table('word_frequencies')
+                    drop_tables(conn, 'word_frequencies')
                     cur.execute("""
                       CREATE TEMP TABLE word_frequencies AS
                       WITH word_freq AS MATERIALIZED (
@@ -182,7 +182,7 @@ class ICUTokenizer(AbstractTokenizer):
                     cur.execute('CREATE UNIQUE INDEX ON word_frequencies(id) INCLUDE(info)')
                     cur.execute('ANALYSE word_frequencies')
                     LOG.info('Update word table with recomputed frequencies')
-                    cur.drop_table('tmp_word')
+                    drop_tables(conn, 'tmp_word')
                     cur.execute("""CREATE TABLE tmp_word AS
                                     SELECT word_id, word_token, type, word,
                                            (CASE WHEN wf.info is null THEN word.info
@@ -191,7 +191,7 @@ class ICUTokenizer(AbstractTokenizer):
                                     FROM word LEFT JOIN word_frequencies wf
                                          ON word.word_id = wf.id
                                 """)
-                    cur.drop_table('word_frequencies')
+                    drop_tables(conn, 'word_frequencies')
 
             with conn.cursor() as cur:
                 cur.execute('SET max_parallel_workers_per_gather TO 0')
@@ -210,7 +210,7 @@ class ICUTokenizer(AbstractTokenizer):
         """ Remove unused house numbers.
         """
         with connect(self.dsn) as conn:
-            if not conn.table_exists('search_name'):
+            if not table_exists(conn, 'search_name'):
                 return
             with conn.cursor(name="hnr_counter") as cur:
                 cur.execute("""SELECT DISTINCT word_id, coalesce(info->>'lookup', word_token)
@@ -311,8 +311,7 @@ class ICUTokenizer(AbstractTokenizer):
             frequencies.
         """
         with connect(self.dsn) as conn:
-            with conn.cursor() as cur:
-                cur.drop_table('word')
+            drop_tables(conn, 'word')
             sqlp = SQLPreprocessor(conn, config)
             sqlp.run_string(conn, """
                 CREATE TABLE word (
@@ -370,8 +369,8 @@ class ICUTokenizer(AbstractTokenizer):
         """ Rename all tables and indexes used by the tokenizer.
         """
         with connect(self.dsn) as conn:
+            drop_tables(conn, 'word')
             with conn.cursor() as cur:
-                cur.drop_table('word')
                 cur.execute(f"ALTER TABLE {old} RENAME TO word")
                 for idx in ('word_token', 'word_id'):
                     cur.execute(f"""ALTER INDEX idx_{old}_{idx}
@@ -393,7 +392,7 @@ class ICUNameAnalyzer(AbstractAnalyzer):
 
     def __init__(self, dsn: str, sanitizer: PlaceSanitizer,
                  token_analysis: ICUTokenAnalysis) -> None:
-        self.conn: Optional[Connection] = connect(dsn).connection
+        self.conn: Optional[Connection] = connect(dsn)
         self.conn.autocommit = True
         self.sanitizer = sanitizer
         self.token_analysis = token_analysis
@@ -535,9 +534,7 @@ class ICUNameAnalyzer(AbstractAnalyzer):
 
         if terms:
             with self.conn.cursor() as cur:
-                cur.execute_values("""SELECT create_postcode_word(pc, var)
-                                      FROM (VALUES %s) AS v(pc, var)""",
-                                   terms)
+                cur.executemany("""SELECT create_postcode_word(%s, %s)""", terms)
 
 
 
@@ -580,18 +577,15 @@ class ICUNameAnalyzer(AbstractAnalyzer):
         to_add = new_phrases - existing_phrases
 
         added = 0
-        with CopyBuffer() as copystr:
+        with cursor.copy('COPY word(word_token, type, word, info) FROM STDIN') as copy:
             for word, cls, typ, oper in to_add:
                 term = self._search_normalized(word)
                 if term:
-                    copystr.add(term, 'S', word,
-                                json.dumps({'class': cls, 'type': typ,
-                                            'op': oper if oper in ('in', 'near') else None}))
+                    copy.write_row((term, 'S', word,
+                                    Jsonb({'class': cls, 'type': typ,
+                                           'op': oper if oper in ('in', 'near') else None})))
                     added += 1
 
-            copystr.copy_out(cursor, 'word',
-                             columns=['word_token', 'type', 'word', 'info'])
-
         return added
 
 
@@ -604,11 +598,11 @@ class ICUNameAnalyzer(AbstractAnalyzer):
         to_delete = existing_phrases - new_phrases
 
         if to_delete:
-            cursor.execute_values(
-                """ DELETE FROM word USING (VALUES %s) as v(name, in_class, in_type, op)
-                    WHERE type = 'S' and word = name
-                          and info->>'class' = in_class and info->>'type' = in_type
-                          and ((op = '-' and info->>'op' is null) or op = info->>'op')
+            cursor.executemany(
+                """ DELETE FROM word
+                      WHERE type = 'S' and word = %s
+                            and info->>'class' = %s and info->>'type' = %s
+                            and %s = coalesce(info->>'op', '-')
                 """, to_delete)
 
         return len(to_delete)
@@ -655,7 +649,7 @@ class ICUNameAnalyzer(AbstractAnalyzer):
                 gone_tokens.update(existing_tokens[False] & word_tokens)
             if gone_tokens:
                 cur.execute("""DELETE FROM word
-                               USING unnest(%s) as token
+                               USING unnest(%s::text[]) as token
                                WHERE type = 'C' and word = %s
                                      and word_token = token""",
                             (list(gone_tokens), country_code))
@@ -668,12 +662,12 @@ class ICUNameAnalyzer(AbstractAnalyzer):
                 if internal:
                     sql = """INSERT INTO word (word_token, type, word, info)
                                (SELECT token, 'C', %s, '{"internal": "yes"}'
-                                  FROM unnest(%s) as token)
+                                  FROM unnest(%s::text[]) as token)
                            """
                 else:
                     sql = """INSERT INTO word (word_token, type, word)
                                    (SELECT token, 'C', %s
-                                    FROM unnest(%s) as token)
+                                    FROM unnest(%s::text[]) as token)
                           """
                 cur.execute(sql, (country_code, list(new_tokens)))
 
@@ -733,11 +727,10 @@ class ICUNameAnalyzer(AbstractAnalyzer):
             if norm_name:
                 result = self._cache.housenumbers.get(norm_name, result)
                 if result[0] is None:
-                    with self.conn.cursor() as cur:
-                        hid = cur.scalar("SELECT getorcreate_hnr_id(%s)", (norm_name, ))
+                    hid = execute_scalar(self.conn, "SELECT getorcreate_hnr_id(%s)", (norm_name, ))
 
-                        result = hid, norm_name
-                        self._cache.housenumbers[norm_name] = result
+                    result = hid, norm_name
+                    self._cache.housenumbers[norm_name] = result
         else:
             # Otherwise use the analyzer to determine the canonical name.
             # Per convention we use the first variant as the 'lookup name', the
@@ -748,11 +741,10 @@ class ICUNameAnalyzer(AbstractAnalyzer):
                 if result[0] is None:
                     variants = analyzer.compute_variants(word_id)
                     if variants:
-                        with self.conn.cursor() as cur:
-                            hid = cur.scalar("SELECT create_analyzed_hnr_id(%s, %s)",
+                        hid = execute_scalar(self.conn, "SELECT create_analyzed_hnr_id(%s, %s)",
                                              (word_id, list(variants)))
-                            result = hid, variants[0]
-                            self._cache.housenumbers[word_id] = result
+                        result = hid, variants[0]
+                        self._cache.housenumbers[word_id] = result
 
         return result