from typing import Optional, Sequence, List, Tuple, Mapping, Any, cast, \
Dict, Set, Iterable
import itertools
-import json
import logging
from pathlib import Path
-from textwrap import dedent
-from ..db.connection import connect, Connection, Cursor
+from psycopg.types.json import Jsonb
+from psycopg import sql as pysql
+
+from ..db.connection import connect, Connection, Cursor, server_version_tuple,\
+ drop_tables, table_exists, execute_scalar
from ..config import Configuration
-from ..db.utils import CopyBuffer
from ..db.sql_preprocessor import SQLPreprocessor
from ..data.place_info import PlaceInfo
from ..data.place_name import PlaceName
"""
self.loader = ICURuleLoader(config)
- self._install_php(config.lib_dir.php, overwrite=True)
self._save_config()
if init_db:
with connect(self.dsn) as conn:
self.loader.load_config_from_db(conn)
- self._install_php(config.lib_dir.php, overwrite=False)
-
def finalize_import(self, config: Configuration) -> None:
""" Do any required postprocessing to make the tokenizer data ready
""" Recompute frequencies for all name words.
"""
with connect(self.dsn) as conn:
- if not conn.table_exists('search_name'):
+ if not table_exists(conn, 'search_name'):
return
with conn.cursor() as cur:
cur.execute('ANALYSE search_name')
if threads > 1:
- cur.execute('SET max_parallel_workers_per_gather TO %s',
- (min(threads, 6),))
+ cur.execute(pysql.SQL('SET max_parallel_workers_per_gather TO {}')
+ .format(pysql.Literal(min(threads, 6),)))
- if conn.server_version_tuple() < (12, 0):
+ if server_version_tuple(conn) < (12, 0):
LOG.info('Computing word frequencies')
- cur.drop_table('word_frequencies')
- cur.drop_table('addressword_frequencies')
+ drop_tables(conn, 'word_frequencies', 'addressword_frequencies')
cur.execute("""CREATE TEMP TABLE word_frequencies AS
SELECT unnest(name_vector) as id, count(*)
FROM search_name GROUP BY id""")
$$ LANGUAGE plpgsql IMMUTABLE;
""")
LOG.info('Update word table with recomputed frequencies')
- cur.drop_table('tmp_word')
+ drop_tables(conn, 'tmp_word')
cur.execute("""CREATE TABLE tmp_word AS
SELECT word_id, word_token, type, word,
word_freq_update(word_id, info) as info
FROM word
""")
- cur.drop_table('word_frequencies')
- cur.drop_table('addressword_frequencies')
+ drop_tables(conn, 'word_frequencies', 'addressword_frequencies')
else:
LOG.info('Computing word frequencies')
- cur.drop_table('word_frequencies')
+ drop_tables(conn, 'word_frequencies')
cur.execute("""
CREATE TEMP TABLE word_frequencies AS
WITH word_freq AS MATERIALIZED (
cur.execute('CREATE UNIQUE INDEX ON word_frequencies(id) INCLUDE(info)')
cur.execute('ANALYSE word_frequencies')
LOG.info('Update word table with recomputed frequencies')
- cur.drop_table('tmp_word')
+ drop_tables(conn, 'tmp_word')
cur.execute("""CREATE TABLE tmp_word AS
SELECT word_id, word_token, type, word,
(CASE WHEN wf.info is null THEN word.info
END) as info
FROM word LEFT JOIN word_frequencies wf
ON word.word_id = wf.id
+ ORDER BY word_id
""")
- cur.drop_table('word_frequencies')
+ drop_tables(conn, 'word_frequencies')
with conn.cursor() as cur:
cur.execute('SET max_parallel_workers_per_gather TO 0')
""" Remove unused house numbers.
"""
with connect(self.dsn) as conn:
- if not conn.table_exists('search_name'):
+ if not table_exists(conn, 'search_name'):
return
with conn.cursor(name="hnr_counter") as cur:
cur.execute("""SELECT DISTINCT word_id, coalesce(info->>'lookup', word_token)
return list(s[0].split('@')[0] for s in cur)
- def _install_php(self, phpdir: Optional[Path], overwrite: bool = True) -> None:
- """ Install the php script for the tokenizer.
- """
- if phpdir is not None:
- assert self.loader is not None
- php_file = self.data_dir / "tokenizer.php"
-
- if not php_file.exists() or overwrite:
- php_file.write_text(dedent(f"""\
- <?php
- @define('CONST_Max_Word_Frequency', 10000000);
- @define('CONST_Term_Normalization_Rules', "{self.loader.normalization_rules}");
- @define('CONST_Transliteration', "{self.loader.get_search_rules()}");
- require_once('{phpdir}/tokenizer/icu_tokenizer.php');"""), encoding='utf-8')
-
-
def _save_config(self) -> None:
""" Save the configuration that needs to remain stable for the given
database as database properties.
frequencies.
"""
with connect(self.dsn) as conn:
- with conn.cursor() as cur:
- cur.drop_table('word')
+ drop_tables(conn, 'word')
sqlp = SQLPreprocessor(conn, config)
sqlp.run_string(conn, """
CREATE TABLE word (
""" Rename all tables and indexes used by the tokenizer.
"""
with connect(self.dsn) as conn:
+ drop_tables(conn, 'word')
with conn.cursor() as cur:
- cur.drop_table('word')
cur.execute(f"ALTER TABLE {old} RENAME TO word")
for idx in ('word_token', 'word_id'):
cur.execute(f"""ALTER INDEX idx_{old}_{idx}
def __init__(self, dsn: str, sanitizer: PlaceSanitizer,
token_analysis: ICUTokenAnalysis) -> None:
- self.conn: Optional[Connection] = connect(dsn).connection
+ self.conn: Optional[Connection] = connect(dsn)
self.conn.autocommit = True
self.sanitizer = sanitizer
self.token_analysis = token_analysis
if terms:
with self.conn.cursor() as cur:
- cur.execute_values("""SELECT create_postcode_word(pc, var)
- FROM (VALUES %s) AS v(pc, var)""",
- terms)
+ cur.executemany("""SELECT create_postcode_word(%s, %s)""", terms)
to_add = new_phrases - existing_phrases
added = 0
- with CopyBuffer() as copystr:
+ with cursor.copy('COPY word(word_token, type, word, info) FROM STDIN') as copy:
for word, cls, typ, oper in to_add:
term = self._search_normalized(word)
if term:
- copystr.add(term, 'S', word,
- json.dumps({'class': cls, 'type': typ,
- 'op': oper if oper in ('in', 'near') else None}))
+ copy.write_row((term, 'S', word,
+ Jsonb({'class': cls, 'type': typ,
+ 'op': oper if oper in ('in', 'near') else None})))
added += 1
- copystr.copy_out(cursor, 'word',
- columns=['word_token', 'type', 'word', 'info'])
-
return added
to_delete = existing_phrases - new_phrases
if to_delete:
- cursor.execute_values(
- """ DELETE FROM word USING (VALUES %s) as v(name, in_class, in_type, op)
- WHERE type = 'S' and word = name
- and info->>'class' = in_class and info->>'type' = in_type
- and ((op = '-' and info->>'op' is null) or op = info->>'op')
+ cursor.executemany(
+ """ DELETE FROM word
+ WHERE type = 'S' and word = %s
+ and info->>'class' = %s and info->>'type' = %s
+ and %s = coalesce(info->>'op', '-')
""", to_delete)
return len(to_delete)
gone_tokens.update(existing_tokens[False] & word_tokens)
if gone_tokens:
cur.execute("""DELETE FROM word
- USING unnest(%s) as token
+ USING unnest(%s::text[]) as token
WHERE type = 'C' and word = %s
and word_token = token""",
(list(gone_tokens), country_code))
if internal:
sql = """INSERT INTO word (word_token, type, word, info)
(SELECT token, 'C', %s, '{"internal": "yes"}'
- FROM unnest(%s) as token)
+ FROM unnest(%s::text[]) as token)
"""
else:
sql = """INSERT INTO word (word_token, type, word)
(SELECT token, 'C', %s
- FROM unnest(%s) as token)
+ FROM unnest(%s::text[]) as token)
"""
cur.execute(sql, (country_code, list(new_tokens)))
if norm_name:
result = self._cache.housenumbers.get(norm_name, result)
if result[0] is None:
- with self.conn.cursor() as cur:
- hid = cur.scalar("SELECT getorcreate_hnr_id(%s)", (norm_name, ))
+ hid = execute_scalar(self.conn, "SELECT getorcreate_hnr_id(%s)", (norm_name, ))
- result = hid, norm_name
- self._cache.housenumbers[norm_name] = result
+ result = hid, norm_name
+ self._cache.housenumbers[norm_name] = result
else:
# Otherwise use the analyzer to determine the canonical name.
# Per convention we use the first variant as the 'lookup name', the
if result[0] is None:
variants = analyzer.compute_variants(word_id)
if variants:
- with self.conn.cursor() as cur:
- hid = cur.scalar("SELECT create_analyzed_hnr_id(%s, %s)",
+ hid = execute_scalar(self.conn, "SELECT create_analyzed_hnr_id(%s, %s)",
(word_id, list(variants)))
- result = hid, variants[0]
- self._cache.housenumbers[word_id] = result
+ result = hid, variants[0]
+ self._cache.housenumbers[word_id] = result
return result