X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/c314a3092c5b51c7782015f6fa9ac093b46fa174..aee051740e2159ec2eea1af4f5844748c38a1905:/src/nominatim_db/tokenizer/icu_tokenizer.py diff --git a/src/nominatim_db/tokenizer/icu_tokenizer.py b/src/nominatim_db/tokenizer/icu_tokenizer.py index 22e2d048..452bf26c 100644 --- a/src/nominatim_db/tokenizer/icu_tokenizer.py +++ b/src/nominatim_db/tokenizer/icu_tokenizer.py @@ -11,14 +11,15 @@ libICU instead of the PostgreSQL module. from typing import Optional, Sequence, List, Tuple, Mapping, Any, cast, \ Dict, Set, Iterable import itertools -import json import logging from pathlib import Path -from textwrap import dedent -from ..db.connection import connect, Connection, Cursor +from psycopg.types.json import Jsonb +from psycopg import sql as pysql + +from ..db.connection import connect, Connection, Cursor, server_version_tuple,\ + drop_tables, table_exists, execute_scalar from ..config import Configuration -from ..db.utils import CopyBuffer from ..db.sql_preprocessor import SQLPreprocessor from ..data.place_info import PlaceInfo from ..data.place_name import PlaceName @@ -62,7 +63,6 @@ class ICUTokenizer(AbstractTokenizer): """ self.loader = ICURuleLoader(config) - self._install_php(config.lib_dir.php, overwrite=True) self._save_config() if init_db: @@ -79,8 +79,6 @@ class ICUTokenizer(AbstractTokenizer): with connect(self.dsn) as conn: self.loader.load_config_from_db(conn) - self._install_php(config.lib_dir.php, overwrite=False) - def finalize_import(self, config: Configuration) -> None: """ Do any required postprocessing to make the tokenizer data ready @@ -108,19 +106,18 @@ class ICUTokenizer(AbstractTokenizer): """ Recompute frequencies for all name words. """ with connect(self.dsn) as conn: - if not conn.table_exists('search_name'): + if not table_exists(conn, 'search_name'): return with conn.cursor() as cur: cur.execute('ANALYSE search_name') if threads > 1: - cur.execute('SET max_parallel_workers_per_gather TO %s', - (min(threads, 6),)) + cur.execute(pysql.SQL('SET max_parallel_workers_per_gather TO {}') + .format(pysql.Literal(min(threads, 6),))) - if conn.server_version_tuple() < (12, 0): + if server_version_tuple(conn) < (12, 0): LOG.info('Computing word frequencies') - cur.drop_table('word_frequencies') - cur.drop_table('addressword_frequencies') + drop_tables(conn, 'word_frequencies', 'addressword_frequencies') cur.execute("""CREATE TEMP TABLE word_frequencies AS SELECT unnest(name_vector) as id, count(*) FROM search_name GROUP BY id""") @@ -152,17 +149,16 @@ class ICUTokenizer(AbstractTokenizer): $$ LANGUAGE plpgsql IMMUTABLE; """) LOG.info('Update word table with recomputed frequencies') - cur.drop_table('tmp_word') + drop_tables(conn, 'tmp_word') cur.execute("""CREATE TABLE tmp_word AS SELECT word_id, word_token, type, word, word_freq_update(word_id, info) as info FROM word """) - cur.drop_table('word_frequencies') - cur.drop_table('addressword_frequencies') + drop_tables(conn, 'word_frequencies', 'addressword_frequencies') else: LOG.info('Computing word frequencies') - cur.drop_table('word_frequencies') + drop_tables(conn, 'word_frequencies') cur.execute(""" CREATE TEMP TABLE word_frequencies AS WITH word_freq AS MATERIALIZED ( @@ -182,7 +178,7 @@ class ICUTokenizer(AbstractTokenizer): cur.execute('CREATE UNIQUE INDEX ON word_frequencies(id) INCLUDE(info)') cur.execute('ANALYSE word_frequencies') LOG.info('Update word table with recomputed frequencies') - cur.drop_table('tmp_word') + drop_tables(conn, 'tmp_word') cur.execute("""CREATE TABLE tmp_word AS SELECT word_id, word_token, type, word, (CASE WHEN wf.info is null THEN word.info @@ -190,8 +186,9 @@ class ICUTokenizer(AbstractTokenizer): END) as info FROM word LEFT JOIN word_frequencies wf ON word.word_id = wf.id + ORDER BY word_id """) - cur.drop_table('word_frequencies') + drop_tables(conn, 'word_frequencies') with conn.cursor() as cur: cur.execute('SET max_parallel_workers_per_gather TO 0') @@ -210,7 +207,7 @@ class ICUTokenizer(AbstractTokenizer): """ Remove unused house numbers. """ with connect(self.dsn) as conn: - if not conn.table_exists('search_name'): + if not table_exists(conn, 'search_name'): return with conn.cursor(name="hnr_counter") as cur: cur.execute("""SELECT DISTINCT word_id, coalesce(info->>'lookup', word_token) @@ -281,22 +278,6 @@ class ICUTokenizer(AbstractTokenizer): return list(s[0].split('@')[0] for s in cur) - def _install_php(self, phpdir: Optional[Path], overwrite: bool = True) -> None: - """ Install the php script for the tokenizer. - """ - if phpdir is not None: - assert self.loader is not None - php_file = self.data_dir / "tokenizer.php" - - if not php_file.exists() or overwrite: - php_file.write_text(dedent(f"""\ - None: """ Save the configuration that needs to remain stable for the given database as database properties. @@ -311,8 +292,7 @@ class ICUTokenizer(AbstractTokenizer): frequencies. """ with connect(self.dsn) as conn: - with conn.cursor() as cur: - cur.drop_table('word') + drop_tables(conn, 'word') sqlp = SQLPreprocessor(conn, config) sqlp.run_string(conn, """ CREATE TABLE word ( @@ -370,8 +350,8 @@ class ICUTokenizer(AbstractTokenizer): """ Rename all tables and indexes used by the tokenizer. """ with connect(self.dsn) as conn: + drop_tables(conn, 'word') with conn.cursor() as cur: - cur.drop_table('word') cur.execute(f"ALTER TABLE {old} RENAME TO word") for idx in ('word_token', 'word_id'): cur.execute(f"""ALTER INDEX idx_{old}_{idx} @@ -393,7 +373,7 @@ class ICUNameAnalyzer(AbstractAnalyzer): def __init__(self, dsn: str, sanitizer: PlaceSanitizer, token_analysis: ICUTokenAnalysis) -> None: - self.conn: Optional[Connection] = connect(dsn).connection + self.conn: Optional[Connection] = connect(dsn) self.conn.autocommit = True self.sanitizer = sanitizer self.token_analysis = token_analysis @@ -535,9 +515,7 @@ class ICUNameAnalyzer(AbstractAnalyzer): if terms: with self.conn.cursor() as cur: - cur.execute_values("""SELECT create_postcode_word(pc, var) - FROM (VALUES %s) AS v(pc, var)""", - terms) + cur.executemany("""SELECT create_postcode_word(%s, %s)""", terms) @@ -580,18 +558,15 @@ class ICUNameAnalyzer(AbstractAnalyzer): to_add = new_phrases - existing_phrases added = 0 - with CopyBuffer() as copystr: + with cursor.copy('COPY word(word_token, type, word, info) FROM STDIN') as copy: for word, cls, typ, oper in to_add: term = self._search_normalized(word) if term: - copystr.add(term, 'S', word, - json.dumps({'class': cls, 'type': typ, - 'op': oper if oper in ('in', 'near') else None})) + copy.write_row((term, 'S', word, + Jsonb({'class': cls, 'type': typ, + 'op': oper if oper in ('in', 'near') else None}))) added += 1 - copystr.copy_out(cursor, 'word', - columns=['word_token', 'type', 'word', 'info']) - return added @@ -604,11 +579,11 @@ class ICUNameAnalyzer(AbstractAnalyzer): to_delete = existing_phrases - new_phrases if to_delete: - cursor.execute_values( - """ DELETE FROM word USING (VALUES %s) as v(name, in_class, in_type, op) - WHERE type = 'S' and word = name - and info->>'class' = in_class and info->>'type' = in_type - and ((op = '-' and info->>'op' is null) or op = info->>'op') + cursor.executemany( + """ DELETE FROM word + WHERE type = 'S' and word = %s + and info->>'class' = %s and info->>'type' = %s + and %s = coalesce(info->>'op', '-') """, to_delete) return len(to_delete) @@ -655,7 +630,7 @@ class ICUNameAnalyzer(AbstractAnalyzer): gone_tokens.update(existing_tokens[False] & word_tokens) if gone_tokens: cur.execute("""DELETE FROM word - USING unnest(%s) as token + USING unnest(%s::text[]) as token WHERE type = 'C' and word = %s and word_token = token""", (list(gone_tokens), country_code)) @@ -668,12 +643,12 @@ class ICUNameAnalyzer(AbstractAnalyzer): if internal: sql = """INSERT INTO word (word_token, type, word, info) (SELECT token, 'C', %s, '{"internal": "yes"}' - FROM unnest(%s) as token) + FROM unnest(%s::text[]) as token) """ else: sql = """INSERT INTO word (word_token, type, word) (SELECT token, 'C', %s - FROM unnest(%s) as token) + FROM unnest(%s::text[]) as token) """ cur.execute(sql, (country_code, list(new_tokens))) @@ -733,11 +708,10 @@ class ICUNameAnalyzer(AbstractAnalyzer): if norm_name: result = self._cache.housenumbers.get(norm_name, result) if result[0] is None: - with self.conn.cursor() as cur: - hid = cur.scalar("SELECT getorcreate_hnr_id(%s)", (norm_name, )) + hid = execute_scalar(self.conn, "SELECT getorcreate_hnr_id(%s)", (norm_name, )) - result = hid, norm_name - self._cache.housenumbers[norm_name] = result + result = hid, norm_name + self._cache.housenumbers[norm_name] = result else: # Otherwise use the analyzer to determine the canonical name. # Per convention we use the first variant as the 'lookup name', the @@ -748,11 +722,10 @@ class ICUNameAnalyzer(AbstractAnalyzer): if result[0] is None: variants = analyzer.compute_variants(word_id) if variants: - with self.conn.cursor() as cur: - hid = cur.scalar("SELECT create_analyzed_hnr_id(%s, %s)", + hid = execute_scalar(self.conn, "SELECT create_analyzed_hnr_id(%s, %s)", (word_id, list(variants))) - result = hid, variants[0] - self._cache.housenumbers[word_id] = result + result = hid, variants[0] + self._cache.housenumbers[word_id] = result return result