]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tools/postcodes.py
release 5.1.0.post5
[nominatim.git] / src / nominatim_db / tools / postcodes.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2025 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Functions for importing, updating and otherwise maintaining the table
9 of artificial postcode centroids.
10 """
11 from typing import Optional, Tuple, Dict, List, TextIO
12 from collections import defaultdict
13 from pathlib import Path
14 import csv
15 import gzip
16 import logging
17 from math import isfinite
18
19 from psycopg import sql as pysql
20
21 from ..db.connection import connect, Connection, table_exists
22 from ..utils.centroid import PointsCentroid
23 from ..data.postcode_format import PostcodeFormatter, CountryPostcodeMatcher
24 from ..tokenizer.base import AbstractAnalyzer, AbstractTokenizer
25
# Logger for the progress and warning messages of this module. Calling
# getLogger() without a name returns the root logger.
LOG = logging.getLogger()
27
28
29 def _to_float(numstr: str, max_value: float) -> float:
30     """ Convert the number in string into a float. The number is expected
31         to be in the range of [-max_value, max_value]. Otherwise rises a
32         ValueError.
33     """
34     num = float(numstr)
35     if not isfinite(num) or num <= -max_value or num >= max_value:
36         raise ValueError()
37
38     return num
39
40
class _PostcodeCollector:
    """ Collector for postcodes of a single country.

        Points are gathered per normalized postcode with add() and are
        synchronised with the table 'location_postcode' by commit().
    """

    def __init__(self, country: str, matcher: Optional[CountryPostcodeMatcher]):
        self.country = country
        self.matcher = matcher
        # Normalized postcode -> points collected for it so far.
        self.collected: Dict[str, PointsCentroid] = defaultdict(PointsCentroid)
        # Last raw postcode handed to add() together with its normalized
        # form (None when it did not match the country's format).
        self.normalization_cache: Optional[Tuple[str, Optional[str]]] = None

    def add(self, postcode: str, x: float, y: float) -> None:
        """ Add the point (x, y) to the points collected for the postcode.

            The postcode is normalized with the country's matcher first.
            Points for the same normalized postcode are accumulated.
            Postcodes that do not match are dropped and nothing at all
            is collected when the country has no matcher.
        """
        if self.matcher is not None:
            normalized: Optional[str]
            # Reuse the previous result when the same raw postcode is
            # added again right away.
            if self.normalization_cache and self.normalization_cache[0] == postcode:
                normalized = self.normalization_cache[1]
            else:
                match = self.matcher.match(postcode)
                normalized = self.matcher.normalize(match) if match else None
                self.normalization_cache = (postcode, normalized)

            if normalized:
                self.collected[normalized] += (x, y)

    def commit(self, conn: Connection, analyzer: AbstractAnalyzer,
               project_dir: Optional[Path]) -> None:
        """ Update postcodes for the country from the postcodes selected so far.

            When 'project_dir' is set, then any postcode files found in this
            directory are taken into account as well.

            The statements are executed on 'conn' but the transaction is
            not committed here. The collection is empty afterwards.
        """
        if project_dir is not None:
            self._update_from_external(analyzer, project_dir)
        to_add, to_delete, to_update = self._compute_changes(conn)

        LOG.info("Processing country '%s' (%s added, %s deleted, %s updated).",
                 self.country, len(to_add), len(to_delete), len(to_update))

        with conn.cursor() as cur:
            if to_add:
                # New rows get indexed_status 1.
                cur.executemany(pysql.SQL(
                    """INSERT INTO location_postcode
                         (place_id, indexed_status, country_code,
                          postcode, geometry)
                       VALUES (nextval('seq_place'), 1, {}, %s,
                               ST_SetSRID(ST_MakePoint(%s, %s), 4326))
                    """).format(pysql.Literal(self.country)),
                    to_add)
            if to_delete:
                cur.execute("""DELETE FROM location_postcode
                               WHERE country_code = %s and postcode = any(%s)
                            """, (self.country, to_delete))
            if to_update:
                # Moved rows get indexed_status 2.
                cur.executemany(
                    pysql.SQL("""UPDATE location_postcode
                                 SET indexed_status = 2,
                                     geometry = ST_SetSRID(ST_Point(%s, %s), 4326)
                                 WHERE country_code = {} and postcode = %s
                              """).format(pysql.Literal(self.country)),
                    to_update)

    def _compute_changes(
            self, conn: Connection
            ) -> Tuple[List[Tuple[str, float, float]], List[str], List[Tuple[float, float, str]]]:
        """ Compute which postcodes from the collected postcodes have to be
            added or modified and which from the location_postcode table
            have to be deleted.

            Returns a tuple (to_add, to_delete, to_update) where 'to_add'
            holds (postcode, x, y) tuples, 'to_delete' holds postcodes and
            'to_update' holds (x, y, postcode) tuples. The collection is
            reset as a side effect.
        """
        to_update = []
        to_delete = []
        with conn.cursor() as cur:
            cur.execute("""SELECT postcode, ST_X(geometry), ST_Y(geometry)
                           FROM location_postcode
                           WHERE country_code = %s""",
                        (self.country, ))
            for postcode, x, y in cur:
                # Removing the entry leaves only the new postcodes in
                # self.collected once the loop is done.
                pcobj = self.collected.pop(postcode, None)
                if pcobj:
                    newx, newy = pcobj.centroid()
                    # Compare absolute differences, so that a move in
                    # either direction triggers an update.
                    if abs(x - newx) > 0.0000001 or abs(y - newy) > 0.0000001:
                        to_update.append((newx, newy, postcode))
                else:
                    to_delete.append(postcode)

        to_add = [(k, *v.centroid()) for k, v in self.collected.items()]
        self.collected = defaultdict(PointsCentroid)

        return to_add, to_delete, to_update

    def _update_from_external(self, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
        """ Look for an external postcode file for the active country in
            the project directory and add missing postcodes when found.

            Postcodes that have already been collected are left untouched.
            Rows with unusable coordinates are skipped with a warning. A
            file without the columns 'postcode', 'lat' and 'lon' is
            ignored from the first row that is read.
        """
        csvfile = self._open_external(project_dir)
        if csvfile is None:
            return

        # The context manager closes the file on every exit path.
        with csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                if 'postcode' not in row or 'lat' not in row or 'lon' not in row:
                    LOG.warning("Bad format for external postcode file for country '%s'."
                                " Ignored.", self.country)
                    return
                postcode = analyzer.normalize_postcode(row['postcode'])
                if postcode not in self.collected:
                    try:
                        # Do float conversion separately, it might throw.
                        # That way no empty entry is created for the
                        # postcode when the coordinates are bad.
                        centroid = (_to_float(row['lon'], 180),
                                    _to_float(row['lat'], 90))
                        self.collected[postcode] += centroid
                    except (ValueError, TypeError):
                        # TypeError covers short rows, where DictReader
                        # fills the missing fields with None.
                        LOG.warning("Bad coordinates %s, %s in %s country postcode file.",
                                    row['lat'], row['lon'], self.country)

    def _open_external(self, project_dir: Path) -> Optional[TextIO]:
        """ Open the external postcode file of the country for reading.

            Looks for '<country>_postcodes.csv' first and then for
            '<country>_postcodes.csv.gz' in 'project_dir'. Both variants
            are read as UTF-8 text. Returns None when neither file exists.
        """
        fname = project_dir / f'{self.country}_postcodes.csv'

        if fname.is_file():
            LOG.info("Using external postcode file '%s'.", fname)
            return open(fname, 'r', encoding='utf-8')

        fname = project_dir / f'{self.country}_postcodes.csv.gz'

        if fname.is_file():
            LOG.info("Using external postcode file '%s'.", fname)
            # Set the encoding explicitly, otherwise the locale default
            # is used and the result differs from the plain CSV file.
            return gzip.open(fname, 'rt', encoding='utf-8')

        return None
175
176
def update_postcodes(dsn: str, project_dir: Optional[Path], tokenizer: AbstractTokenizer) -> None:
    """ Bring the table 'location_postcode' up to date.

        Postcodes and their positions are read from the place table
        (joined with placex where available) and grouped by country.
        Each country is then handed to a collector which may add
        postcodes from external files in 'project_dir' and writes the
        changes to 'location_postcode'. At the end the analyzer of the
        tokenizer is told to update its postcodes from the database.
    """
    formatter = PostcodeFormatter()
    with tokenizer.name_analyzer() as analyzer:
        with connect(dsn) as conn:
            # Remember the countries that have postcodes right now.
            # (This happens before anything is inserted, so it is fast
            # on import.)
            with conn.cursor() as cur:
                cur.execute("SELECT DISTINCT country_code FROM location_postcode")
                remaining = {row[0] for row in cur}

            # Collect the currently valid postcodes through a named cursor.
            with conn.cursor(name="placex_postcodes") as cur:
                cur.execute("""
                SELECT cc, pc, ST_X(centroid), ST_Y(centroid)
                FROM (SELECT
                        COALESCE(plx.country_code,
                                 get_country_code(ST_Centroid(pl.geometry))) as cc,
                        pl.address->'postcode' as pc,
                        COALESCE(plx.centroid, ST_Centroid(pl.geometry)) as centroid
                      FROM place AS pl LEFT OUTER JOIN placex AS plx
                             ON pl.osm_id = plx.osm_id AND pl.osm_type = plx.osm_type
                    WHERE pl.address ? 'postcode' AND pl.geometry IS NOT null) xx
                WHERE pc IS NOT null AND cc IS NOT null
                ORDER BY cc, pc""")

                active = None

                for cc, pcode, px, py in cur:
                    # The rows are ordered by country. A different country
                    # code means the previous country is complete.
                    if active is not None and cc != active.country:
                        active.commit(conn, analyzer, project_dir)
                        active = None
                    if active is None:
                        active = _PostcodeCollector(cc, formatter.get_matcher(cc))
                        remaining.discard(cc)
                    active.add(pcode, px, py)

                # Write out the last country, if there was any row at all.
                if active is not None:
                    active.commit(conn, analyzer, project_dir)

            # Countries left over here exist only in the postcode table.
            for cc in remaining:
                country_matcher = formatter.get_matcher(cc)
                leftover = _PostcodeCollector(cc, country_matcher)
                leftover.commit(conn, analyzer, project_dir)

            conn.commit()

        analyzer.update_postcodes_from_db()
229
230
def can_compute(dsn: str) -> bool:
    """ Return True when the database behind 'dsn' has a table 'place'.

        The table is the source for the computation of postcodes.
    """
    with connect(dsn) as conn:
        has_place_table = table_exists(conn, 'place')
        return has_place_table