1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
"""
Functions for importing, updating and otherwise maintaining the table
of artificial postcode centroids.
"""
from typing import Optional, Tuple, Dict, List, TextIO
from collections import defaultdict
from pathlib import Path
import csv
import gzip
import logging
from math import isfinite

from psycopg import sql as pysql

from ..db.connection import connect, Connection, table_exists
from ..utils.centroid import PointsCentroid
from ..data.postcode_format import PostcodeFormatter, CountryPostcodeMatcher
from ..tokenizer.base import AbstractAnalyzer, AbstractTokenizer
# Logger used by all functions in this module. Calling getLogger() without
# a name returns the root logger.
LOG = logging.getLogger()
28 def _to_float(numstr: str, max_value: float) -> float:
29 """ Convert the number in string into a float. The number is expected
30 to be in the range of [-max_value, max_value]. Otherwise rises a
34 if not isfinite(num) or num <= -max_value or num >= max_value:
39 class _PostcodeCollector:
40 """ Collector for postcodes of a single country.
43 def __init__(self, country: str, matcher: Optional[CountryPostcodeMatcher]):
44 self.country = country
45 self.matcher = matcher
46 self.collected: Dict[str, PointsCentroid] = defaultdict(PointsCentroid)
47 self.normalization_cache: Optional[Tuple[str, Optional[str]]] = None
50 def add(self, postcode: str, x: float, y: float) -> None:
51 """ Add the given postcode to the collection cache. If the postcode
52 already existed, it is overwritten with the new centroid.
54 if self.matcher is not None:
55 normalized: Optional[str]
56 if self.normalization_cache and self.normalization_cache[0] == postcode:
57 normalized = self.normalization_cache[1]
59 match = self.matcher.match(postcode)
60 normalized = self.matcher.normalize(match) if match else None
61 self.normalization_cache = (postcode, normalized)
64 self.collected[normalized] += (x, y)
67 def commit(self, conn: Connection, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
68 """ Update postcodes for the country from the postcodes selected so far
69 as well as any externally supplied postcodes.
71 self._update_from_external(analyzer, project_dir)
72 to_add, to_delete, to_update = self._compute_changes(conn)
74 LOG.info("Processing country '%s' (%s added, %s deleted, %s updated).",
75 self.country, len(to_add), len(to_delete), len(to_update))
77 with conn.cursor() as cur:
79 cur.executemany(pysql.SQL(
80 """INSERT INTO location_postcode
81 (place_id, indexed_status, country_code,
83 VALUES (nextval('seq_place'), 1, {}, %s,
84 ST_SetSRID(ST_MakePoint(%s, %s), 4326))
85 """).format(pysql.Literal(self.country)),
88 cur.execute("""DELETE FROM location_postcode
89 WHERE country_code = %s and postcode = any(%s)
90 """, (self.country, to_delete))
93 pysql.SQL("""UPDATE location_postcode
94 SET indexed_status = 2,
95 geometry = ST_SetSRID(ST_Point(%s, %s), 4326)
96 WHERE country_code = {} and postcode = %s
97 """).format(pysql.Literal(self.country)),
101 def _compute_changes(self, conn: Connection) \
102 -> Tuple[List[Tuple[str, float, float]], List[str], List[Tuple[float, float, str]]]:
103 """ Compute which postcodes from the collected postcodes have to be
104 added or modified and which from the location_postcode table
109 with conn.cursor() as cur:
110 cur.execute("""SELECT postcode, ST_X(geometry), ST_Y(geometry)
111 FROM location_postcode
112 WHERE country_code = %s""",
114 for postcode, x, y in cur:
115 pcobj = self.collected.pop(postcode, None)
117 newx, newy = pcobj.centroid()
118 if (x - newx) > 0.0000001 or (y - newy) > 0.0000001:
119 to_update.append((newx, newy, postcode))
121 to_delete.append(postcode)
123 to_add = [(k, *v.centroid()) for k, v in self.collected.items()]
124 self.collected = defaultdict(PointsCentroid)
126 return to_add, to_delete, to_update
129 def _update_from_external(self, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
130 """ Look for an external postcode file for the active country in
131 the project directory and add missing postcodes when found.
133 csvfile = self._open_external(project_dir)
138 reader = csv.DictReader(csvfile)
140 if 'postcode' not in row or 'lat' not in row or 'lon' not in row:
141 LOG.warning("Bad format for external postcode file for country '%s'."
142 " Ignored.", self.country)
144 postcode = analyzer.normalize_postcode(row['postcode'])
145 if postcode not in self.collected:
147 # Do float conversation separately, it might throw
148 centroid = (_to_float(row['lon'], 180),
149 _to_float(row['lat'], 90))
150 self.collected[postcode] += centroid
152 LOG.warning("Bad coordinates %s, %s in %s country postcode file.",
153 row['lat'], row['lon'], self.country)
159 def _open_external(self, project_dir: Path) -> Optional[TextIO]:
160 fname = project_dir / f'{self.country}_postcodes.csv'
163 LOG.info("Using external postcode file '%s'.", fname)
164 return open(fname, 'r', encoding='utf-8')
166 fname = project_dir / f'{self.country}_postcodes.csv.gz'
169 LOG.info("Using external postcode file '%s'.", fname)
170 return gzip.open(fname, 'rt')
def update_postcodes(dsn: str, project_dir: Path, tokenizer: AbstractTokenizer) -> None:
    """ Update the table of artificial postcodes.

        Computes artificial postcode centroids from the placex table,
        potentially enhances it with external data and then updates the
        postcodes in the table 'location_postcode'.

        `dsn` is used to open the database connection, `project_dir` is
        searched for external postcode files and `tokenizer` supplies the
        analyzer that normalizes external postcodes. At the end the
        analyzer is asked to update its postcodes from the database.
    """
    matcher = PostcodeFormatter()
    with tokenizer.name_analyzer() as analyzer:
        with connect(dsn) as conn:
            # First get the list of countries that currently have postcodes.
            # (Doing this before starting to insert, so it is fast on import.)
            with conn.cursor() as cur:
                cur.execute("SELECT DISTINCT country_code FROM location_postcode")
                todo_countries = {row[0] for row in cur}

            # Recompute the list of valid postcodes from placex.
            with conn.cursor(name="placex_postcodes") as cur:
                cur.execute("""
                SELECT cc, pc, ST_X(centroid), ST_Y(centroid)
                FROM (SELECT
                        COALESCE(plx.country_code,
                                 get_country_code(ST_Centroid(pl.geometry))) as cc,
                        pl.address->'postcode' as pc,
                        COALESCE(plx.centroid, ST_Centroid(pl.geometry)) as centroid
                      FROM place AS pl LEFT OUTER JOIN placex AS plx
                             ON pl.osm_id = plx.osm_id AND pl.osm_type = plx.osm_type
                    WHERE pl.address ? 'postcode' AND pl.geometry IS NOT null) xx
                WHERE pc IS NOT null AND cc IS NOT null
                ORDER BY cc, pc""")

                # The rows arrive grouped by country. A collector is
                # written out as soon as the next country starts.
                collector: Optional[_PostcodeCollector] = None

                for country, postcode, x, y in cur:
                    if collector is None or country != collector.country:
                        if collector is not None:
                            collector.commit(conn, analyzer, project_dir)
                        collector = _PostcodeCollector(country, matcher.get_matcher(country))
                        todo_countries.discard(country)
                    collector.add(postcode, x, y)

                # Write out the postcodes of the last country.
                if collector is not None:
                    collector.commit(conn, analyzer, project_dir)

            # Now handle any countries that are only in the postcode table.
            for country in todo_countries:
                fmt = matcher.get_matcher(country)
                _PostcodeCollector(country, fmt).commit(conn, analyzer, project_dir)

            conn.commit()

        analyzer.update_postcodes_from_db()
def can_compute(dsn: str) -> bool:
    """ Tell whether postcodes can be computed for the database behind
        `dsn`. That is the case when the table 'place' exists.
    """
    with connect(dsn) as conn:
        has_place_table = table_exists(conn, 'place')

    return has_place_table