from typing import Optional, Sequence, List, Tuple, Mapping, Any, cast, \
Dict, Set, Iterable
import itertools
-import json
import logging
from pathlib import Path
from textwrap import dedent
+from psycopg.types.json import Jsonb
+from psycopg import sql as pysql
+
from ..db.connection import connect, Connection, Cursor, server_version_tuple,\
drop_tables, table_exists, execute_scalar
from ..config import Configuration
-from ..db.utils import CopyBuffer
from ..db.sql_preprocessor import SQLPreprocessor
from ..data.place_info import PlaceInfo
from ..data.place_name import PlaceName
with conn.cursor() as cur:
cur.execute('ANALYSE search_name')
if threads > 1:
- cur.execute('SET max_parallel_workers_per_gather TO %s',
- (min(threads, 6),))
+ cur.execute(pysql.SQL('SET max_parallel_workers_per_gather TO {}')
+ .format(pysql.Literal(min(threads, 6),)))
if server_version_tuple(conn) < (12, 0):
LOG.info('Computing word frequencies')
def __init__(self, dsn: str, sanitizer: PlaceSanitizer,
token_analysis: ICUTokenAnalysis) -> None:
- self.conn: Optional[Connection] = connect(dsn).connection
+ self.conn: Optional[Connection] = connect(dsn)
self.conn.autocommit = True
self.sanitizer = sanitizer
self.token_analysis = token_analysis
if terms:
with self.conn.cursor() as cur:
- cur.execute_values("""SELECT create_postcode_word(pc, var)
- FROM (VALUES %s) AS v(pc, var)""",
- terms)
+ cur.executemany("""SELECT create_postcode_word(%s, %s)""", terms)
to_add = new_phrases - existing_phrases
added = 0
- with CopyBuffer() as copystr:
+ with cursor.copy('COPY word(word_token, type, word, info) FROM STDIN') as copy:
for word, cls, typ, oper in to_add:
term = self._search_normalized(word)
if term:
- copystr.add(term, 'S', word,
- json.dumps({'class': cls, 'type': typ,
- 'op': oper if oper in ('in', 'near') else None}))
+ copy.write_row((term, 'S', word,
+ Jsonb({'class': cls, 'type': typ,
+ 'op': oper if oper in ('in', 'near') else None})))
added += 1
- copystr.copy_out(cursor, 'word',
- columns=['word_token', 'type', 'word', 'info'])
-
return added
to_delete = existing_phrases - new_phrases
if to_delete:
- cursor.execute_values(
- """ DELETE FROM word USING (VALUES %s) as v(name, in_class, in_type, op)
- WHERE type = 'S' and word = name
- and info->>'class' = in_class and info->>'type' = in_type
- and ((op = '-' and info->>'op' is null) or op = info->>'op')
+ cursor.executemany(
+ """ DELETE FROM word
+ WHERE type = 'S' and word = %s
+ and info->>'class' = %s and info->>'type' = %s
+ and %s = coalesce(info->>'op', '-')
""", to_delete)
return len(to_delete)
gone_tokens.update(existing_tokens[False] & word_tokens)
if gone_tokens:
cur.execute("""DELETE FROM word
- USING unnest(%s) as token
+ USING unnest(%s::text[]) as token
WHERE type = 'C' and word = %s
and word_token = token""",
(list(gone_tokens), country_code))
if internal:
sql = """INSERT INTO word (word_token, type, word, info)
(SELECT token, 'C', %s, '{"internal": "yes"}'
- FROM unnest(%s) as token)
+ FROM unnest(%s::text[]) as token)
"""
else:
sql = """INSERT INTO word (word_token, type, word)
(SELECT token, 'C', %s
- FROM unnest(%s) as token)
+ FROM unnest(%s::text[]) as token)
"""
cur.execute(sql, (country_code, list(new_tokens)))