1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
"""
Functions for importing, updating and otherwise maintaining the table
of artificial postcode centroids.
"""
from typing import Optional, Tuple, Dict, List, TextIO
from collections import defaultdict
from pathlib import Path
import csv
import gzip
import logging
from math import isfinite

from psycopg import sql as pysql

from ..db.connection import connect, Connection, table_exists
from ..utils.centroid import PointsCentroid
from ..data.postcode_format import PostcodeFormatter, CountryPostcodeMatcher
from ..tokenizer.base import AbstractAnalyzer, AbstractTokenizer
# Logger used by all functions in this module. getLogger() without a name
# returns the root logger.
LOG = logging.getLogger()
29 def _to_float(numstr: str, max_value: float) -> float:
30 """ Convert the number in string into a float. The number is expected
31 to be in the range of [-max_value, max_value]. Otherwise rises a
35 if not isfinite(num) or num <= -max_value or num >= max_value:
class _PostcodeCollector:
    """ Collector for postcodes of a single country.

        Points are fed in one by one with add(). A call to commit() then
        brings the rows of the country in the table 'location_postcode'
        in line with the collected postcodes.
    """

    def __init__(self, country: str, matcher: Optional[CountryPostcodeMatcher]):
        self.country = country
        self.matcher = matcher
        # Maps the normalized postcode to the accumulator of all points
        # that were added for it.
        self.collected: Dict[str, PointsCentroid] = defaultdict(PointsCentroid)
        # Result of the last normalization as (raw postcode, normalized form).
        self.normalization_cache: Optional[Tuple[str, Optional[str]]] = None

    def add(self, postcode: str, x: float, y: float) -> None:
        """ Add a point for the given postcode to the collection cache.

            The postcode is matched and normalized with the matcher of the
            country. Postcodes that do not match are dropped. When the
            collector has no matcher, nothing is collected at all.
            If the postcode already existed, the point is added to its
            centroid accumulator. Existing points are not overwritten.
        """
        if self.matcher is not None:
            normalized: Optional[str]
            if self.normalization_cache and self.normalization_cache[0] == postcode:
                normalized = self.normalization_cache[1]
            else:
                match = self.matcher.match(postcode)
                normalized = self.matcher.normalize(match) if match else None
                self.normalization_cache = (postcode, normalized)

            if normalized:
                self.collected[normalized] += (x, y)

    def commit(self, conn: Connection, analyzer: AbstractAnalyzer,
               project_dir: Optional[Path]) -> None:
        """ Update postcodes for the country from the postcodes selected so far.

            When 'project_dir' is set, then any postcode files found in this
            directory are taken into account as well.

            The function only executes the statements. It does not commit
            the transaction on the connection.
        """
        if project_dir is not None:
            self._update_from_external(analyzer, project_dir)
        to_add, to_delete, to_update = self._compute_changes(conn)

        LOG.info("Processing country '%s' (%s added, %s deleted, %s updated).",
                 self.country, len(to_add), len(to_delete), len(to_update))

        with conn.cursor() as cur:
            if to_add:
                # Parameters per row: postcode, x, y.
                cur.executemany(pysql.SQL(
                    """INSERT INTO location_postcode
                         (place_id, indexed_status, country_code,
                          postcode, geometry)
                       VALUES (nextval('seq_place'), 1, {}, %s,
                               ST_SetSRID(ST_MakePoint(%s, %s), 4326))
                    """).format(pysql.Literal(self.country)),
                    to_add)
            if to_delete:
                cur.execute("""DELETE FROM location_postcode
                               WHERE country_code = %s and postcode = any(%s)
                            """, (self.country, to_delete))
            if to_update:
                # Parameters per row: x, y, postcode.
                cur.executemany(
                    pysql.SQL("""UPDATE location_postcode
                                 SET indexed_status = 2,
                                     geometry = ST_SetSRID(ST_Point(%s, %s), 4326)
                                 WHERE country_code = {} and postcode = %s
                              """).format(pysql.Literal(self.country)),
                    to_update)

    def _compute_changes(
            self, conn: Connection
            ) -> Tuple[List[Tuple[str, float, float]], List[str], List[Tuple[float, float, str]]]:
        """ Compute which postcodes from the collected postcodes have to be
            added or modified and which from the location_postcode table
            have to be deleted.

            Returns the tuple (to_add, to_delete, to_update). The entries
            of 'to_add' are (postcode, x, y), those of 'to_update' are
            (x, y, postcode) and 'to_delete' is a plain list of postcodes.
            The collection cache is empty after the call.
        """
        to_update: List[Tuple[float, float, str]] = []
        to_delete: List[str] = []
        with conn.cursor() as cur:
            cur.execute("""SELECT postcode, ST_X(geometry), ST_Y(geometry)
                           FROM location_postcode
                           WHERE country_code = %s""",
                        (self.country, ))
            for postcode, x, y in cur:
                # Taking the postcode out of the cache leaves only the
                # postcodes that are new once the loop is done.
                pcobj = self.collected.pop(postcode, None)
                if pcobj is not None:
                    newx, newy = pcobj.centroid()
                    # Compare absolute distances, the centroid may have
                    # moved in either direction.
                    if abs(x - newx) > 0.0000001 or abs(y - newy) > 0.0000001:
                        to_update.append((newx, newy, postcode))
                else:
                    to_delete.append(postcode)

        to_add = [(k, *v.centroid()) for k, v in self.collected.items()]
        self.collected = defaultdict(PointsCentroid)

        return to_add, to_delete, to_update

    def _update_from_external(self, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
        """ Look for an external postcode file for the active country in
            the project directory and add missing postcodes when found.

            Postcodes that were already collected are left untouched.
            Rows with unusable coordinates are skipped with a warning.
            Reading stops at the first row that lacks one of the columns
            'postcode', 'lat' or 'lon'.
        """
        csvfile = self._open_external(project_dir)
        if csvfile is None:
            return

        # The context manager closes the file on every way out.
        with csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                if 'postcode' not in row or 'lat' not in row or 'lon' not in row:
                    LOG.warning("Bad format for external postcode file for country '%s'."
                                " Ignored.", self.country)
                    return
                postcode = analyzer.normalize_postcode(row['postcode'])
                if postcode not in self.collected:
                    try:
                        # Do float conversion separately, it might throw.
                        # A short row yields None for the missing fields,
                        # which float() rejects with a TypeError.
                        centroid = (_to_float(row['lon'], 180),
                                    _to_float(row['lat'], 90))
                    except (TypeError, ValueError):
                        LOG.warning("Bad coordinates %s, %s in %s country postcode file.",
                                    row['lat'], row['lon'], self.country)
                        continue
                    self.collected[postcode] += centroid

    def _open_external(self, project_dir: Path) -> Optional[TextIO]:
        """ Open the external postcode file of the country for reading
            in text mode.

            The plain file '<country>_postcodes.csv' takes precedence over
            the compressed '<country>_postcodes.csv.gz'. Returns None when
            neither of them exists. The caller has to close the file.
        """
        fname = project_dir / f'{self.country}_postcodes.csv'

        if fname.is_file():
            LOG.info("Using external postcode file '%s'.", fname)
            return open(fname, 'r', encoding='utf-8')

        fname = project_dir / f'{self.country}_postcodes.csv.gz'

        if fname.is_file():
            LOG.info("Using external postcode file '%s'.", fname)
            # Read as UTF-8 like the plain file instead of relying on
            # the default encoding of the platform.
            return gzip.open(fname, 'rt', encoding='utf-8')

        return None
def update_postcodes(dsn: str, project_dir: Optional[Path], tokenizer: AbstractTokenizer) -> None:
    """ Update the table of artificial postcodes.

        Computes artificial postcode centroids from the placex table,
        potentially enhances it with external data and then updates the
        postcodes in the table 'location_postcode'.

        'dsn' describes the database to connect to. When 'project_dir' is
        set, external postcode files from this directory are used as well.
        The analyzer of 'tokenizer' normalizes external postcodes and is
        asked to update its postcodes from the database at the end.
    """
    matcher = PostcodeFormatter()
    with tokenizer.name_analyzer() as analyzer:
        with connect(dsn) as conn:
            # First get the list of countries that currently have postcodes.
            # (Doing this before starting to insert, so it is fast on import.)
            with conn.cursor() as cur:
                cur.execute("SELECT DISTINCT country_code FROM location_postcode")
                todo_countries = {row[0] for row in cur}

            # Recompute the list of valid postcodes from placex.
            # The rows must arrive grouped by country because a collector
            # is committed as soon as the country changes.
            with conn.cursor(name="placex_postcodes") as cur:
                cur.execute("""
                SELECT cc, pc, ST_X(centroid), ST_Y(centroid)
                FROM (SELECT
                        COALESCE(plx.country_code,
                                 get_country_code(ST_Centroid(pl.geometry))) as cc,
                        pl.address->'postcode' as pc,
                        COALESCE(plx.centroid, ST_Centroid(pl.geometry)) as centroid
                      FROM place AS pl LEFT OUTER JOIN placex AS plx
                             ON pl.osm_id = plx.osm_id AND pl.osm_type = plx.osm_type
                    WHERE pl.address ? 'postcode' AND pl.geometry IS NOT null) xx
                WHERE pc IS NOT null AND cc IS NOT null
                ORDER BY cc, pc""")

                collector = None

                for country, postcode, x, y in cur:
                    if collector is None or country != collector.country:
                        if collector is not None:
                            collector.commit(conn, analyzer, project_dir)
                        collector = _PostcodeCollector(country, matcher.get_matcher(country))
                        todo_countries.discard(country)
                    collector.add(postcode, x, y)

                # Write out the last country, nothing follows to trigger it.
                if collector is not None:
                    collector.commit(conn, analyzer, project_dir)

            # Now handle any countries that are only in the postcode table.
            for country in todo_countries:
                fmt = matcher.get_matcher(country)
                _PostcodeCollector(country, fmt).commit(conn, analyzer, project_dir)

            # The collectors only execute statements, so finish the
            # transaction here.
            conn.commit()

        analyzer.update_postcodes_from_db()
def can_compute(dsn: str) -> bool:
    """ Report whether postcodes can be computed for the database
        described by 'dsn'. This is the case when the table 'place' exists.
    """
    with connect(dsn) as conn:
        place_available = table_exists(conn, 'place')

    return place_available