- # First get all postcode names currently in the word table.
- cur.execute("SELECT DISTINCT word FROM word WHERE type = 'P'")
- word_entries = set((entry[0] for entry in cur))
-
- # Then compute the required postcode names from the postcode table.
- needed_entries = set()
- cur.execute("SELECT country_code, postcode FROM location_postcode")
- for cc, postcode in cur:
- info = PlaceInfo({'country_code': cc,
- 'class': 'place', 'type': 'postcode',
- 'address': {'postcode': postcode}})
- address = self.sanitizer.process_names(info)[1]
- for place in address:
- if place.kind == 'postcode':
- if analyzer is None:
- postcode_name = place.name.strip().upper()
- variant_base = None
- else:
- postcode_name = analyzer.get_canonical_id(place)
- variant_base = place.get_attr("variant")
-
- if variant_base:
- needed_entries.add(f'{postcode_name}@{variant_base}')
- else:
- needed_entries.add(postcode_name)
- break
-
- # Now update the word table.
- self._delete_unused_postcode_words(word_entries - needed_entries)
- self._add_missing_postcode_words(needed_entries - word_entries)
-
def _delete_unused_postcode_words(self, tokens: Iterable[str]) -> None:
    """ Drop the given postcode tokens (type 'P') from the word table.

        Does nothing when *tokens* is empty.
    """
    assert self.conn is not None
    if not tokens:
        return

    with self.conn.cursor() as cur:
        cur.execute("DELETE FROM word WHERE type = 'P' and word = any(%s)",
                    (list(tokens), ))
-
def _add_missing_postcode_words(self, tokens: Iterable[str]) -> None:
    """ Create word-table entries for the given postcode tokens.

        Each token is either a plain postcode name or a combined
        '<name>@<variant base>' string. For combined tokens the
        '@postcode' analyzer (when configured) expands the variant base
        into the searchable spellings; the search-normalized name is
        always included in the variant list. Entries are written via
        the create_postcode_word() database function.

        Does nothing when *tokens* is empty.
    """
    assert self.conn is not None
    if not tokens:
        return

    analyzer = self.token_analysis.analysis.get('@postcode')
    terms = []

    for postcode_name in tokens:
        if '@' in postcode_name:
            # Split only on the first '@': the variant base may itself
            # contain further '@' characters, which a maxsplit of 2
            # would turn into an unpacking error.
            term, variant = postcode_name.split('@', 1)
            term = self._search_normalized(term)
            if analyzer is None:
                variants = [term]
            else:
                variants = analyzer.compute_variants(variant)
                # Make sure the canonical normalized form is searchable
                # even when the analyzer does not produce it.
                if term not in variants:
                    variants.append(term)
        else:
            variants = [self._search_normalized(postcode_name)]
        terms.append((postcode_name, variants))

    if terms:
        with self.conn.cursor() as cur:
            cur.executemany("""SELECT create_postcode_word(%s, %s)""", terms)