]> git.openstreetmap.org Git - nominatim.git/blobdiff - src/nominatim_db/tokenizer/icu_tokenizer.py
Merge pull request #3506 from mtmail/development-environment-more-dependencies
[nominatim.git] / src / nominatim_db / tokenizer / icu_tokenizer.py
index 70c5c27a096dd1d6c8764370fa7ec6d504a4e99d..7cd96d591fcec97483a721542829f4da90e52430 100644 (file)
@@ -11,15 +11,16 @@ libICU instead of the PostgreSQL module.
 from typing import Optional, Sequence, List, Tuple, Mapping, Any, cast, \
                    Dict, Set, Iterable
 import itertools
 from typing import Optional, Sequence, List, Tuple, Mapping, Any, cast, \
                    Dict, Set, Iterable
 import itertools
-import json
 import logging
 from pathlib import Path
 from textwrap import dedent
 
 import logging
 from pathlib import Path
 from textwrap import dedent
 
+from psycopg.types.json import Jsonb
+from psycopg import sql as pysql
+
 from ..db.connection import connect, Connection, Cursor, server_version_tuple,\
                             drop_tables, table_exists, execute_scalar
 from ..config import Configuration
 from ..db.connection import connect, Connection, Cursor, server_version_tuple,\
                             drop_tables, table_exists, execute_scalar
 from ..config import Configuration
-from ..db.utils import CopyBuffer
 from ..db.sql_preprocessor import SQLPreprocessor
 from ..data.place_info import PlaceInfo
 from ..data.place_name import PlaceName
 from ..db.sql_preprocessor import SQLPreprocessor
 from ..data.place_info import PlaceInfo
 from ..data.place_name import PlaceName
@@ -115,8 +116,8 @@ class ICUTokenizer(AbstractTokenizer):
             with conn.cursor() as cur:
                 cur.execute('ANALYSE search_name')
                 if threads > 1:
             with conn.cursor() as cur:
                 cur.execute('ANALYSE search_name')
                 if threads > 1:
-                    cur.execute('SET max_parallel_workers_per_gather TO %s',
-                                (min(threads, 6),))
+                    cur.execute(pysql.SQL('SET max_parallel_workers_per_gather TO {}')
+                                     .format(pysql.Literal(min(threads, 6),)))
 
                 if server_version_tuple(conn) < (12, 0):
                     LOG.info('Computing word frequencies')
 
                 if server_version_tuple(conn) < (12, 0):
                     LOG.info('Computing word frequencies')
@@ -391,7 +392,7 @@ class ICUNameAnalyzer(AbstractAnalyzer):
 
     def __init__(self, dsn: str, sanitizer: PlaceSanitizer,
                  token_analysis: ICUTokenAnalysis) -> None:
 
     def __init__(self, dsn: str, sanitizer: PlaceSanitizer,
                  token_analysis: ICUTokenAnalysis) -> None:
-        self.conn: Optional[Connection] = connect(dsn).connection
+        self.conn: Optional[Connection] = connect(dsn)
         self.conn.autocommit = True
         self.sanitizer = sanitizer
         self.token_analysis = token_analysis
         self.conn.autocommit = True
         self.sanitizer = sanitizer
         self.token_analysis = token_analysis
@@ -533,9 +534,7 @@ class ICUNameAnalyzer(AbstractAnalyzer):
 
         if terms:
             with self.conn.cursor() as cur:
 
         if terms:
             with self.conn.cursor() as cur:
-                cur.execute_values("""SELECT create_postcode_word(pc, var)
-                                      FROM (VALUES %s) AS v(pc, var)""",
-                                   terms)
+                cur.executemany("""SELECT create_postcode_word(%s, %s)""", terms)
 
 
 
 
 
 
@@ -578,18 +577,15 @@ class ICUNameAnalyzer(AbstractAnalyzer):
         to_add = new_phrases - existing_phrases
 
         added = 0
         to_add = new_phrases - existing_phrases
 
         added = 0
-        with CopyBuffer() as copystr:
+        with cursor.copy('COPY word(word_token, type, word, info) FROM STDIN') as copy:
             for word, cls, typ, oper in to_add:
                 term = self._search_normalized(word)
                 if term:
             for word, cls, typ, oper in to_add:
                 term = self._search_normalized(word)
                 if term:
-                    copystr.add(term, 'S', word,
-                                json.dumps({'class': cls, 'type': typ,
-                                            'op': oper if oper in ('in', 'near') else None}))
+                    copy.write_row((term, 'S', word,
+                                    Jsonb({'class': cls, 'type': typ,
+                                           'op': oper if oper in ('in', 'near') else None})))
                     added += 1
 
                     added += 1
 
-            copystr.copy_out(cursor, 'word',
-                             columns=['word_token', 'type', 'word', 'info'])
-
         return added
 
 
         return added
 
 
@@ -602,11 +598,11 @@ class ICUNameAnalyzer(AbstractAnalyzer):
         to_delete = existing_phrases - new_phrases
 
         if to_delete:
         to_delete = existing_phrases - new_phrases
 
         if to_delete:
-            cursor.execute_values(
-                """ DELETE FROM word USING (VALUES %s) as v(name, in_class, in_type, op)
-                    WHERE type = 'S' and word = name
-                          and info->>'class' = in_class and info->>'type' = in_type
-                          and ((op = '-' and info->>'op' is null) or op = info->>'op')
+            cursor.executemany(
+                """ DELETE FROM word
+                      WHERE type = 'S' and word = %s
+                            and info->>'class' = %s and info->>'type' = %s
+                            and %s = coalesce(info->>'op', '-')
                 """, to_delete)
 
         return len(to_delete)
                 """, to_delete)
 
         return len(to_delete)
@@ -653,7 +649,7 @@ class ICUNameAnalyzer(AbstractAnalyzer):
                 gone_tokens.update(existing_tokens[False] & word_tokens)
             if gone_tokens:
                 cur.execute("""DELETE FROM word
                 gone_tokens.update(existing_tokens[False] & word_tokens)
             if gone_tokens:
                 cur.execute("""DELETE FROM word
-                               USING unnest(%s) as token
+                               USING unnest(%s::text[]) as token
                                WHERE type = 'C' and word = %s
                                      and word_token = token""",
                             (list(gone_tokens), country_code))
                                WHERE type = 'C' and word = %s
                                      and word_token = token""",
                             (list(gone_tokens), country_code))
@@ -666,12 +662,12 @@ class ICUNameAnalyzer(AbstractAnalyzer):
                 if internal:
                     sql = """INSERT INTO word (word_token, type, word, info)
                                (SELECT token, 'C', %s, '{"internal": "yes"}'
                 if internal:
                     sql = """INSERT INTO word (word_token, type, word, info)
                                (SELECT token, 'C', %s, '{"internal": "yes"}'
-                                  FROM unnest(%s) as token)
+                                  FROM unnest(%s::text[]) as token)
                            """
                 else:
                     sql = """INSERT INTO word (word_token, type, word)
                                    (SELECT token, 'C', %s
                            """
                 else:
                     sql = """INSERT INTO word (word_token, type, word)
                                    (SELECT token, 'C', %s
-                                    FROM unnest(%s) as token)
+                                    FROM unnest(%s::text[]) as token)
                           """
                 cur.execute(sql, (country_code, list(new_tokens)))
 
                           """
                 cur.execute(sql, (country_code, list(new_tokens)))