+# SPDX-License-Identifier: GPL-2.0-only
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2022 by the Nominatim developer community.
+# For a full list of authors see the git log.
"""
Functions for importing, updating and otherwise maintaining the table
of artificial postcode centroids.
"""
+from typing import Optional, Tuple, Dict, List, TextIO
+from collections import defaultdict
+from pathlib import Path
import csv
import gzip
import logging
from math import isfinite
-from psycopg2.extras import execute_values
+from psycopg2 import sql as pysql
-from nominatim.db.connection import _Connection, connect
+from nominatim.db.connection import connect, Connection
+from nominatim.utils.centroid import PointsCentroid
+from nominatim.data.postcode_format import PostcodeFormatter, CountryPostcodeMatcher
+from nominatim.tokenizer.base import AbstractAnalyzer, AbstractTokenizer
LOG = logging.getLogger()
-def _to_float(num, max_value):
+def _to_float(numstr: str, max_value: float) -> float:
""" Convert the number in string into a float. The number is expected
to be in the range of [-max_value, max_value]. Otherwise rises a
ValueError.
"""
- num = float(num)
+ num = float(numstr)
if not isfinite(num) or num <= -max_value or num >= max_value:
raise ValueError()
return num
-class _CountryPostcodesCollector:
+class _PostcodeCollector:
""" Collector for postcodes of a single country.
"""
- def __init__(self, country):
+ def __init__(self, country: str, matcher: Optional[CountryPostcodeMatcher]):
self.country = country
- self.collected = dict()
+ self.matcher = matcher
+ self.collected: Dict[str, PointsCentroid] = defaultdict(PointsCentroid)
+ self.normalization_cache: Optional[Tuple[str, Optional[str]]] = None
- def add(self, postcode, x, y):
+ def add(self, postcode: str, x: float, y: float) -> None:
""" Add the given postcode to the collection cache. If the postcode
already existed, it is overwritten with the new centroid.
"""
- self.collected[postcode] = (x, y)
+ if self.matcher is not None:
+ normalized: Optional[str]
+ if self.normalization_cache and self.normalization_cache[0] == postcode:
+ normalized = self.normalization_cache[1]
+ else:
+ match = self.matcher.match(postcode)
+ normalized = self.matcher.normalize(match) if match else None
+ self.normalization_cache = (postcode, normalized)
+ if normalized:
+ self.collected[normalized] += (x, y)
- def commit(self, conn, analyzer, project_dir):
+
+ def commit(self, conn: Connection, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
""" Update postcodes for the country from the postcodes selected so far
as well as any externally supplied postcodes.
"""
with conn.cursor() as cur:
if to_add:
- execute_values(cur,
- """INSERT INTO location_postcode
- (place_id, indexed_status, country_code,
- postcode, geometry) VALUES %s""",
- to_add,
- template="""(nextval('seq_place'), 1, '{}',
- %s, 'SRID=4326;POINT(%s %s)')
- """.format(self.country))
+ cur.execute_values(
+ """INSERT INTO location_postcode
+ (place_id, indexed_status, country_code,
+ postcode, geometry) VALUES %s""",
+ to_add,
+ template=pysql.SQL("""(nextval('seq_place'), 1, {},
+ %s, 'SRID=4326;POINT(%s %s)')
+ """).format(pysql.Literal(self.country)))
if to_delete:
cur.execute("""DELETE FROM location_postcode
WHERE country_code = %s and postcode = any(%s)
""", (self.country, to_delete))
if to_update:
- execute_values(cur,
- """UPDATE location_postcode
- SET indexed_status = 2,
- geometry = ST_SetSRID(ST_Point(v.x, v.y), 4326)
- FROM (VALUES %s) AS v (pc, x, y)
- WHERE country_code = '{}' and postcode = pc
- """.format(self.country),
- to_update)
+ cur.execute_values(
+ pysql.SQL("""UPDATE location_postcode
+ SET indexed_status = 2,
+ geometry = ST_SetSRID(ST_Point(v.x, v.y), 4326)
+ FROM (VALUES %s) AS v (pc, x, y)
+ WHERE country_code = {} and postcode = pc
+ """).format(pysql.Literal(self.country)), to_update)
- def _compute_changes(self, conn):
+ def _compute_changes(self, conn: Connection) \
+ -> Tuple[List[Tuple[str, float, float]], List[str], List[Tuple[str, float, float]]]:
""" Compute which postcodes from the collected postcodes have to be
added or modified and which from the location_postcode table
have to be deleted.
WHERE country_code = %s""",
(self.country, ))
for postcode, x, y in cur:
- newx, newy = self.collected.pop(postcode, (None, None))
- if newx is not None:
- dist = (x - newx)**2 + (y - newy)**2
- if dist > 0.0000001:
+ pcobj = self.collected.pop(postcode, None)
+ if pcobj:
+ newx, newy = pcobj.centroid()
+ if (x - newx) > 0.0000001 or (y - newy) > 0.0000001:
to_update.append((postcode, newx, newy))
else:
to_delete.append(postcode)
- to_add = [(k, v[0], v[1]) for k, v in self.collected.items()]
- self.collected = []
+ to_add = [(k, *v.centroid()) for k, v in self.collected.items()]
+ self.collected = defaultdict(PointsCentroid)
return to_add, to_delete, to_update
- def _update_from_external(self, analyzer, project_dir):
+ def _update_from_external(self, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
""" Look for an external postcode file for the active country in
the project directory and add missing postcodes when found.
"""
postcode = analyzer.normalize_postcode(row['postcode'])
if postcode not in self.collected:
try:
- self.collected[postcode] = (_to_float(row['lon'], 180),
- _to_float(row['lat'], 90))
+ # Do float conversation separately, it might throw
+ centroid = (_to_float(row['lon'], 180),
+ _to_float(row['lat'], 90))
+ self.collected[postcode] += centroid
except ValueError:
LOG.warning("Bad coordinates %s, %s in %s country postcode file.",
row['lat'], row['lon'], self.country)
csvfile.close()
- def _open_external(self, project_dir):
- fname = project_dir / '{}_postcodes.csv'.format(self.country)
+ def _open_external(self, project_dir: Path) -> Optional[TextIO]:
+ fname = project_dir / f'{self.country}_postcodes.csv'
if fname.is_file():
LOG.info("Using external postcode file '%s'.", fname)
- return open(fname, 'r')
+ return open(fname, 'r', encoding='utf-8')
- fname = project_dir / '{}_postcodes.csv.gz'.format(self.country)
+ fname = project_dir / f'{self.country}_postcodes.csv.gz'
if fname.is_file():
LOG.info("Using external postcode file '%s'.", fname)
return None
-def update_postcodes(dsn, project_dir, tokenizer):
+def update_postcodes(dsn: str, project_dir: Path, tokenizer: AbstractTokenizer) -> None:
""" Update the table of artificial postcodes.
Computes artificial postcode centroids from the placex table,
potentially enhances it with external data and then updates the
postcodes in the table 'location_postcode'.
"""
+ matcher = PostcodeFormatter()
with tokenizer.name_analyzer() as analyzer:
with connect(dsn) as conn:
# First get the list of countries that currently have postcodes.
# Recompute the list of valid postcodes from placex.
with conn.cursor(name="placex_postcodes") as cur:
cur.execute("""
- SELECT cc as country_code, pc, ST_X(centroid), ST_Y(centroid)
- FROM (
- SELECT
- COALESCE(plx.country_code, get_country_code(ST_Centroid(pl.geometry))) as cc,
- token_normalized_postcode(pl.address->'postcode') as pc,
- ST_Centroid(ST_Collect(ST_Centroid(pl.geometry))) as centroid
- FROM place AS pl LEFT OUTER JOIN placex AS plx ON pl.osm_id = plx.osm_id AND pl.osm_type = plx.osm_type
- WHERE pl.address ? 'postcode' AND pl.geometry IS NOT null
- GROUP BY cc, pc
- ) xx
+ SELECT cc, pc, ST_X(centroid), ST_Y(centroid)
+ FROM (SELECT
+ COALESCE(plx.country_code,
+ get_country_code(ST_Centroid(pl.geometry))) as cc,
+ pl.address->'postcode' as pc,
+ COALESCE(plx.centroid, ST_Centroid(pl.geometry)) as centroid
+ FROM place AS pl LEFT OUTER JOIN placex AS plx
+ ON pl.osm_id = plx.osm_id AND pl.osm_type = plx.osm_type
+ WHERE pl.address ? 'postcode' AND pl.geometry IS NOT null) xx
WHERE pc IS NOT null AND cc IS NOT null
- ORDER BY country_code, pc""")
+ ORDER BY cc, pc""")
collector = None
if collector is None or country != collector.country:
if collector is not None:
collector.commit(conn, analyzer, project_dir)
- collector = _CountryPostcodesCollector(country)
+ collector = _PostcodeCollector(country, matcher.get_matcher(country))
todo_countries.discard(country)
collector.add(postcode, x, y)
# Now handle any countries that are only in the postcode table.
for country in todo_countries:
- _CountryPostcodesCollector(country).commit(conn, analyzer, project_dir)
+ fmt = matcher.get_matcher(country)
+ _PostcodeCollector(country, fmt).commit(conn, analyzer, project_dir)
conn.commit()
analyzer.update_postcodes_from_db()
-def can_compute(dsn):
+def can_compute(dsn: str) -> bool:
"""
Check that the place table exists so that
postcodes can be computed.
"""
- with _Connection(dsn) as conn:
+ with connect(dsn) as conn:
return conn.table_exists('place')