1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Functions for importing, updating and otherwise maintaining the table
9 of artificial postcode centroids.
11 from typing import Optional, Tuple, Dict, List, TextIO
12 from collections import defaultdict
13 from pathlib import Path
17 from math import isfinite
19 from psycopg import sql as pysql
21 from ..db.connection import connect, Connection, table_exists
22 from ..utils.centroid import PointsCentroid
23 from ..data.postcode_format import PostcodeFormatter, CountryPostcodeMatcher
24 from ..tokenizer.base import AbstractAnalyzer, AbstractTokenizer
26 LOG = logging.getLogger()
29 def _to_float(numstr: str, max_value: float) -> float:
30 """ Convert the number in string into a float. The number is expected
31 to be in the range of [-max_value, max_value]. Otherwise rises a
35 if not isfinite(num) or num <= -max_value or num >= max_value:
41 class _PostcodeCollector:
42 """ Collector for postcodes of a single country.
45 def __init__(self, country: str, matcher: Optional[CountryPostcodeMatcher]):
46 self.country = country
47 self.matcher = matcher
48 self.collected: Dict[str, PointsCentroid] = defaultdict(PointsCentroid)
49 self.normalization_cache: Optional[Tuple[str, Optional[str]]] = None
51 def add(self, postcode: str, x: float, y: float) -> None:
52 """ Add the given postcode to the collection cache. If the postcode
53 already existed, it is overwritten with the new centroid.
55 if self.matcher is not None:
56 normalized: Optional[str]
57 if self.normalization_cache and self.normalization_cache[0] == postcode:
58 normalized = self.normalization_cache[1]
60 match = self.matcher.match(postcode)
61 normalized = self.matcher.normalize(match) if match else None
62 self.normalization_cache = (postcode, normalized)
65 self.collected[normalized] += (x, y)
67 def commit(self, conn: Connection, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
68 """ Update postcodes for the country from the postcodes selected so far
69 as well as any externally supplied postcodes.
71 self._update_from_external(analyzer, project_dir)
72 to_add, to_delete, to_update = self._compute_changes(conn)
74 LOG.info("Processing country '%s' (%s added, %s deleted, %s updated).",
75 self.country, len(to_add), len(to_delete), len(to_update))
77 with conn.cursor() as cur:
79 cur.executemany(pysql.SQL(
80 """INSERT INTO location_postcode
81 (place_id, indexed_status, country_code,
83 VALUES (nextval('seq_place'), 1, {}, %s,
84 ST_SetSRID(ST_MakePoint(%s, %s), 4326))
85 """).format(pysql.Literal(self.country)),
88 cur.execute("""DELETE FROM location_postcode
89 WHERE country_code = %s and postcode = any(%s)
90 """, (self.country, to_delete))
93 pysql.SQL("""UPDATE location_postcode
94 SET indexed_status = 2,
95 geometry = ST_SetSRID(ST_Point(%s, %s), 4326)
96 WHERE country_code = {} and postcode = %s
97 """).format(pysql.Literal(self.country)),
100 def _compute_changes(
101 self, conn: Connection
102 ) -> Tuple[List[Tuple[str, float, float]], List[str], List[Tuple[float, float, str]]]:
103 """ Compute which postcodes from the collected postcodes have to be
104 added or modified and which from the location_postcode table
109 with conn.cursor() as cur:
110 cur.execute("""SELECT postcode, ST_X(geometry), ST_Y(geometry)
111 FROM location_postcode
112 WHERE country_code = %s""",
114 for postcode, x, y in cur:
115 pcobj = self.collected.pop(postcode, None)
117 newx, newy = pcobj.centroid()
118 if (x - newx) > 0.0000001 or (y - newy) > 0.0000001:
119 to_update.append((newx, newy, postcode))
121 to_delete.append(postcode)
123 to_add = [(k, *v.centroid()) for k, v in self.collected.items()]
124 self.collected = defaultdict(PointsCentroid)
126 return to_add, to_delete, to_update
128 def _update_from_external(self, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
129 """ Look for an external postcode file for the active country in
130 the project directory and add missing postcodes when found.
132 csvfile = self._open_external(project_dir)
137 reader = csv.DictReader(csvfile)
139 if 'postcode' not in row or 'lat' not in row or 'lon' not in row:
140 LOG.warning("Bad format for external postcode file for country '%s'."
141 " Ignored.", self.country)
143 postcode = analyzer.normalize_postcode(row['postcode'])
144 if postcode not in self.collected:
146 # Do float conversation separately, it might throw
147 centroid = (_to_float(row['lon'], 180),
148 _to_float(row['lat'], 90))
149 self.collected[postcode] += centroid
151 LOG.warning("Bad coordinates %s, %s in %s country postcode file.",
152 row['lat'], row['lon'], self.country)
157 def _open_external(self, project_dir: Path) -> Optional[TextIO]:
158 fname = project_dir / f'{self.country}_postcodes.csv'
161 LOG.info("Using external postcode file '%s'.", fname)
162 return open(fname, 'r', encoding='utf-8')
164 fname = project_dir / f'{self.country}_postcodes.csv.gz'
167 LOG.info("Using external postcode file '%s'.", fname)
168 return gzip.open(fname, 'rt')
173 def update_postcodes(dsn: str, project_dir: Path, tokenizer: AbstractTokenizer) -> None:
174 """ Update the table of artificial postcodes.
176 Computes artificial postcode centroids from the placex table,
177 potentially enhances it with external data and then updates the
178 postcodes in the table 'location_postcode'.
180 matcher = PostcodeFormatter()
181 with tokenizer.name_analyzer() as analyzer:
182 with connect(dsn) as conn:
183 # First get the list of countries that currently have postcodes.
184 # (Doing this before starting to insert, so it is fast on import.)
185 with conn.cursor() as cur:
186 cur.execute("SELECT DISTINCT country_code FROM location_postcode")
187 todo_countries = set((row[0] for row in cur))
189 # Recompute the list of valid postcodes from placex.
190 with conn.cursor(name="placex_postcodes") as cur:
192 SELECT cc, pc, ST_X(centroid), ST_Y(centroid)
194 COALESCE(plx.country_code,
195 get_country_code(ST_Centroid(pl.geometry))) as cc,
196 pl.address->'postcode' as pc,
197 COALESCE(plx.centroid, ST_Centroid(pl.geometry)) as centroid
198 FROM place AS pl LEFT OUTER JOIN placex AS plx
199 ON pl.osm_id = plx.osm_id AND pl.osm_type = plx.osm_type
200 WHERE pl.address ? 'postcode' AND pl.geometry IS NOT null) xx
201 WHERE pc IS NOT null AND cc IS NOT null
206 for country, postcode, x, y in cur:
207 if collector is None or country != collector.country:
208 if collector is not None:
209 collector.commit(conn, analyzer, project_dir)
210 collector = _PostcodeCollector(country, matcher.get_matcher(country))
211 todo_countries.discard(country)
212 collector.add(postcode, x, y)
214 if collector is not None:
215 collector.commit(conn, analyzer, project_dir)
217 # Now handle any countries that are only in the postcode table.
218 for country in todo_countries:
219 fmt = matcher.get_matcher(country)
220 _PostcodeCollector(country, fmt).commit(conn, analyzer, project_dir)
224 analyzer.update_postcodes_from_db()
227 def can_compute(dsn: str) -> bool:
229 Check that the place table exists so that
230 postcodes can be computed.
232 with connect(dsn) as conn:
233 return table_exists(conn, 'place')