from psycopg.types.json import Jsonb
from psycopg import sql as pysql
-from ..db.connection import connect, Connection, Cursor, server_version_tuple, \
+from ..db.connection import connect, Connection, Cursor, \
drop_tables, table_exists, execute_scalar
from ..config import Configuration
from ..db.sql_preprocessor import SQLPreprocessor
cur.execute(pysql.SQL('SET max_parallel_workers_per_gather TO {}')
            .format(pysql.Literal(min(threads, 6))))
- if server_version_tuple(conn) < (12, 0):
- LOG.info('Computing word frequencies')
- drop_tables(conn, 'word_frequencies', 'addressword_frequencies')
- cur.execute("""CREATE TEMP TABLE word_frequencies AS
- SELECT unnest(name_vector) as id, count(*)
- FROM search_name GROUP BY id""")
- cur.execute('CREATE INDEX ON word_frequencies(id)')
- cur.execute("""CREATE TEMP TABLE addressword_frequencies AS
- SELECT unnest(nameaddress_vector) as id, count(*)
- FROM search_name GROUP BY id""")
- cur.execute('CREATE INDEX ON addressword_frequencies(id)')
- cur.execute("""
- CREATE OR REPLACE FUNCTION word_freq_update(wid INTEGER,
- INOUT info JSONB)
- AS $$
- DECLARE rec RECORD;
- BEGIN
- IF info is null THEN
- info = '{}'::jsonb;
- END IF;
- FOR rec IN SELECT count FROM word_frequencies WHERE id = wid
- LOOP
- info = info || jsonb_build_object('count', rec.count);
- END LOOP;
- FOR rec IN SELECT count FROM addressword_frequencies WHERE id = wid
- LOOP
- info = info || jsonb_build_object('addr_count', rec.count);
- END LOOP;
- IF info = '{}'::jsonb THEN
- info = null;
- END IF;
- END;
- $$ LANGUAGE plpgsql IMMUTABLE;
- """)
- LOG.info('Update word table with recomputed frequencies')
- drop_tables(conn, 'tmp_word')
- cur.execute("""CREATE TABLE tmp_word AS
- SELECT word_id, word_token, type, word,
- word_freq_update(word_id, info) as info
- FROM word
- """)
- drop_tables(conn, 'word_frequencies', 'addressword_frequencies')
- else:
- LOG.info('Computing word frequencies')
- drop_tables(conn, 'word_frequencies')
- cur.execute("""
- CREATE TEMP TABLE word_frequencies AS
- WITH word_freq AS MATERIALIZED (
- SELECT unnest(name_vector) as id, count(*)
- FROM search_name GROUP BY id),
- addr_freq AS MATERIALIZED (
- SELECT unnest(nameaddress_vector) as id, count(*)
- FROM search_name GROUP BY id)
- SELECT coalesce(a.id, w.id) as id,
- (CASE WHEN w.count is null THEN '{}'::JSONB
- ELSE jsonb_build_object('count', w.count) END
- ||
- CASE WHEN a.count is null THEN '{}'::JSONB
- ELSE jsonb_build_object('addr_count', a.count) END) as info
- FROM word_freq w FULL JOIN addr_freq a ON a.id = w.id;
- """)
- cur.execute('CREATE UNIQUE INDEX ON word_frequencies(id) INCLUDE(info)')
- cur.execute('ANALYSE word_frequencies')
- LOG.info('Update word table with recomputed frequencies')
- drop_tables(conn, 'tmp_word')
- cur.execute("""CREATE TABLE tmp_word AS
- SELECT word_id, word_token, type, word,
- (CASE WHEN wf.info is null THEN word.info
- ELSE coalesce(word.info, '{}'::jsonb) || wf.info
- END) as info
- FROM word LEFT JOIN word_frequencies wf
- ON word.word_id = wf.id
- """)
- drop_tables(conn, 'word_frequencies')
+ LOG.info('Computing word frequencies')
+ drop_tables(conn, 'word_frequencies')
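+ # The two MATERIALIZED CTEs force each frequency count to be computed
+ # exactly once; the FULL JOIN then folds name and address counts into
+ # a single JSONB info value per token id, e.g.
+ # {"count": 42, "addr_count": 7} when a token occurs in both vectors.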
+ cur.execute("""
+ CREATE TEMP TABLE word_frequencies AS
+ WITH word_freq AS MATERIALIZED (
+ SELECT unnest(name_vector) as id, count(*)
+ FROM search_name GROUP BY id),
+ addr_freq AS MATERIALIZED (
+ SELECT unnest(nameaddress_vector) as id, count(*)
+ FROM search_name GROUP BY id)
+ SELECT coalesce(a.id, w.id) as id,
+ (CASE WHEN w.count is null THEN '{}'::JSONB
+ ELSE jsonb_build_object('count', w.count) END
+ ||
+ CASE WHEN a.count is null THEN '{}'::JSONB
+ ELSE jsonb_build_object('addr_count', a.count) END) as info
+ FROM word_freq w FULL JOIN addr_freq a ON a.id = w.id;
+ """)
+ cur.execute('CREATE UNIQUE INDEX ON word_frequencies(id) INCLUDE(info)')
+ cur.execute('ANALYSE word_frequencies')
+ LOG.info('Update word table with recomputed frequencies')
+ drop_tables(conn, 'tmp_word')
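+ # Merge the recomputed counts into the existing info. Tokens that no
+ # longer appear in search_name (wf.info is null) keep their old info.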
+ cur.execute("""CREATE TABLE tmp_word AS
+ SELECT word_id, word_token, type, word,
+ (CASE WHEN wf.info is null THEN word.info
+ ELSE coalesce(word.info, '{}'::jsonb) || wf.info
+ END) as info
+ FROM word LEFT JOIN word_frequencies wf
+ ON word.word_id = wf.id
+ """)
+ drop_tables(conn, 'word_frequencies')
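For reference, a minimal standalone Python sketch of the merge rule used in
the tmp_word query above (illustrative only: plain dicts stand in for JSONB,
and merge_info is a hypothetical name, not part of the patch):

    def merge_info(old_info, new_counts):
        # Mirrors: CASE WHEN wf.info IS NULL THEN word.info
        #          ELSE coalesce(word.info, '{}'::jsonb) || wf.info END
        if new_counts is None:
            return old_info
        return {**(old_info or {}), **new_counts}

    assert merge_info({'lookup': 'x'}, {'count': 3}) == {'lookup': 'x', 'count': 3}
    assert merge_info(None, {'addr_count': 7}) == {'addr_count': 7}
    assert merge_info({'count': 5}, None) == {'count': 5}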
with conn.cursor() as cur:
cur.execute('SET max_parallel_workers_per_gather TO 0')
return postcode.strip().upper()
def update_postcodes_from_db(self) -> None:
- """ Update postcode tokens in the word table from the location_postcode
- table.
+ """ Postcode update.
+
+ Removes all postcodes from the word table because they are not
+ needed. Postcodes are recognised by pattern.
"""
assert self.conn is not None
- analyzer = self.token_analysis.analysis.get('@postcode')
with self.conn.cursor() as cur:
- # First get all postcode names currently in the word table.
- cur.execute("SELECT DISTINCT word FROM word WHERE type = 'P'")
- word_entries = set((entry[0] for entry in cur))
-
- # Then compute the required postcode names from the postcode table.
- needed_entries = set()
- cur.execute("SELECT country_code, postcode FROM location_postcode")
- for cc, postcode in cur:
- info = PlaceInfo({'country_code': cc,
- 'class': 'place', 'type': 'postcode',
- 'address': {'postcode': postcode}})
- address = self.sanitizer.process_names(info)[1]
- for place in address:
- if place.kind == 'postcode':
- if analyzer is None:
- postcode_name = place.name.strip().upper()
- variant_base = None
- else:
- postcode_name = analyzer.get_canonical_id(place)
- variant_base = place.get_attr("variant")
-
- if variant_base:
- needed_entries.add(f'{postcode_name}@{variant_base}')
- else:
- needed_entries.add(postcode_name)
- break
-
- # Now update the word table.
- self._delete_unused_postcode_words(word_entries - needed_entries)
- self._add_missing_postcode_words(needed_entries - word_entries)
-
- def _delete_unused_postcode_words(self, tokens: Iterable[str]) -> None:
- assert self.conn is not None
- if tokens:
- with self.conn.cursor() as cur:
- cur.execute("DELETE FROM word WHERE type = 'P' and word = any(%s)",
- (list(tokens), ))
-
- def _add_missing_postcode_words(self, tokens: Iterable[str]) -> None:
- assert self.conn is not None
- if not tokens:
- return
-
- analyzer = self.token_analysis.analysis.get('@postcode')
- terms = []
-
- for postcode_name in tokens:
- if '@' in postcode_name:
- term, variant = postcode_name.split('@', 2)
- term = self._search_normalized(term)
- if analyzer is None:
- variants = [term]
- else:
- variants = analyzer.compute_variants(variant)
- if term not in variants:
- variants.append(term)
- else:
- variants = [self._search_normalized(postcode_name)]
- terms.append((postcode_name, variants))
-
- if terms:
- with self.conn.cursor() as cur:
- cur.executemany("""SELECT create_postcode_word(%s, %s)""", terms)
+ cur.execute("DELETE FROM word WHERE type = 'P'")
def update_special_phrases(self, phrases: Iterable[Tuple[str, str, str, str]],
should_replace: bool) -> None:
analyzer = self.token_analysis.analysis.get('@postcode')
if analyzer is None:
- postcode_name = item.name.strip().upper()
- variant_base = None
- else:
- postcode_name = analyzer.get_canonical_id(item)
- variant_base = item.get_attr("variant")
-
- if variant_base:
- postcode = f'{postcode_name}@{variant_base}'
+ return item.name.strip().upper()
else:
- postcode = postcode_name
-
- if postcode not in self._cache.postcodes:
- term = self._search_normalized(postcode_name)
- if not term:
- return None
-
- variants = {term}
- if analyzer is not None and variant_base:
- variants.update(analyzer.compute_variants(variant_base))
-
- with self.conn.cursor() as cur:
- cur.execute("SELECT create_postcode_word(%s, %s)",
- (postcode, list(variants)))
- self._cache.postcodes.add(postcode)
-
- return postcode_name
+ return analyzer.get_canonical_id(item)
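Recomposed for readability, this hunk reduces the postcode canonicalisation
to the following (names taken verbatim from the diff context; the enclosing
method signature is not part of this excerpt):

    analyzer = self.token_analysis.analysis.get('@postcode')
    if analyzer is None:
        return item.name.strip().upper()
    else:
        return analyzer.get_canonical_id(item)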
class _TokenInfo:
self.names: Dict[str, Tuple[int, List[int]]] = {}
self.partials: Dict[str, int] = {}
self.fulls: Dict[str, List[int]] = {}
- self.postcodes: Set[str] = set()
self.housenumbers: Dict[str, Tuple[Optional[int], Optional[str]]] = {}