]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_db/tools/postcodes.py
fix style issue found by flake8
[nominatim.git] / src / nominatim_db / tools / postcodes.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Functions for importing, updating and otherwise maintaining the table
9 of artificial postcode centroids.
10 """
11 from typing import Optional, Tuple, Dict, List, TextIO
12 from collections import defaultdict
13 from pathlib import Path
14 import csv
15 import gzip
16 import logging
17 from math import isfinite
18
19 from psycopg import sql as pysql
20
21 from ..db.connection import connect, Connection, table_exists
22 from ..utils.centroid import PointsCentroid
23 from ..data.postcode_format import PostcodeFormatter, CountryPostcodeMatcher
24 from ..tokenizer.base import AbstractAnalyzer, AbstractTokenizer
25
26 LOG = logging.getLogger()  # getLogger() without a name returns the root logger
27
28
29 def _to_float(numstr: str, max_value: float) -> float:
30     """ Convert the number in string into a float. The number is expected
31         to be in the range of [-max_value, max_value]. Otherwise rises a
32         ValueError.
33     """
34     num = float(numstr)
35     if not isfinite(num) or num <= -max_value or num >= max_value:
36         raise ValueError()
37
38     return num
39
40
class _PostcodeCollector:
    """ Collector for postcodes of a single country.

        Points are gathered per normalized postcode with add(). commit()
        then merges in an optional external postcode file and brings the
        rows of the country in the location_postcode table in line with
        the collected postcodes.
    """

    def __init__(self, country: str, matcher: Optional[CountryPostcodeMatcher]):
        self.country = country
        self.matcher = matcher
        # Normalized postcode -> PointsCentroid that receives the points.
        self.collected: Dict[str, PointsCentroid] = defaultdict(PointsCentroid)
        # (raw postcode, normalized postcode or None) of the last postcode
        # that went through the matcher.
        self.normalization_cache: Optional[Tuple[str, Optional[str]]] = None

    def add(self, postcode: str, x: float, y: float) -> None:
        """ Add the point (x, y) to the points collected for the given
            postcode. The postcode is normalized with the country matcher
            first. Points for the same normalized postcode are accumulated.

            Nothing is added when the collector has no matcher or when
            the matcher does not yield a normalized postcode.
        """
        if self.matcher is None:
            return

        normalized: Optional[str]
        if self.normalization_cache and self.normalization_cache[0] == postcode:
            # Same raw postcode as in the previous call, reuse the result.
            normalized = self.normalization_cache[1]
        else:
            match = self.matcher.match(postcode)
            normalized = self.matcher.normalize(match) if match else None
            self.normalization_cache = (postcode, normalized)

        if normalized:
            self.collected[normalized] += (x, y)

    def commit(self, conn: Connection, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
        """ Update postcodes for the country from the postcodes selected so far
            as well as any externally supplied postcodes.

            New postcodes are inserted with indexed_status 1, postcodes
            with a changed centroid get indexed_status 2 and postcodes
            that are no longer collected are deleted. The transaction is
            not committed here.
        """
        self._update_from_external(analyzer, project_dir)
        to_add, to_delete, to_update = self._compute_changes(conn)

        LOG.info("Processing country '%s' (%s added, %s deleted, %s updated).",
                 self.country, len(to_add), len(to_delete), len(to_update))

        with conn.cursor() as cur:
            if to_add:
                cur.executemany(pysql.SQL(
                    """INSERT INTO location_postcode
                         (place_id, indexed_status, country_code,
                          postcode, geometry)
                       VALUES (nextval('seq_place'), 1, {}, %s,
                               ST_SetSRID(ST_MakePoint(%s, %s), 4326))
                    """).format(pysql.Literal(self.country)),
                    to_add)
            if to_delete:
                cur.execute("""DELETE FROM location_postcode
                               WHERE country_code = %s and postcode = any(%s)
                            """, (self.country, to_delete))
            if to_update:
                cur.executemany(
                    pysql.SQL("""UPDATE location_postcode
                                 SET indexed_status = 2,
                                     geometry = ST_SetSRID(ST_Point(%s, %s), 4326)
                                 WHERE country_code = {} and postcode = %s
                              """).format(pysql.Literal(self.country)),
                    to_update)

    def _compute_changes(
            self, conn: Connection
            ) -> Tuple[List[Tuple[str, float, float]], List[str], List[Tuple[float, float, str]]]:
        """ Compute which postcodes from the collected postcodes have to be
            added or modified and which from the location_postcode table
            have to be deleted.

            Returns the tuple (to_add, to_delete, to_update). The entries
            of to_add are (postcode, x, y), the entries of to_update are
            (x, y, postcode); both follow the placeholder order of the
            statements in commit(). The collection is empty afterwards.
        """
        to_update: List[Tuple[float, float, str]] = []
        to_delete: List[str] = []
        with conn.cursor() as cur:
            cur.execute("""SELECT postcode, ST_X(geometry), ST_Y(geometry)
                           FROM location_postcode
                           WHERE country_code = %s""",
                        (self.country, ))
            for postcode, x, y in cur:
                # Removing the entry here leaves only new postcodes
                # in the collection once the loop is done.
                pcobj = self.collected.pop(postcode, None)
                if pcobj:
                    newx, newy = pcobj.centroid()
                    # Compare absolute distances so that a move in either
                    # direction along an axis triggers an update.
                    if abs(x - newx) > 0.0000001 or abs(y - newy) > 0.0000001:
                        to_update.append((newx, newy, postcode))
                else:
                    to_delete.append(postcode)

        to_add = [(k, *v.centroid()) for k, v in self.collected.items()]
        self.collected = defaultdict(PointsCentroid)

        return to_add, to_delete, to_update

    def _update_from_external(self, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
        """ Look for an external postcode file for the active country in
            the project directory and add missing postcodes when found.

            Postcodes that are already in the collection are left
            untouched. Rows with missing or unusable coordinates are
            skipped with a warning.
        """
        csvfile = self._open_external(project_dir)
        if csvfile is None:
            return

        with csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                if 'postcode' not in row or 'lat' not in row or 'lon' not in row:
                    LOG.warning("Bad format for external postcode file for country '%s'."
                                " Ignored.", self.country)
                    return
                postcode = analyzer.normalize_postcode(row['postcode'])
                if postcode not in self.collected:
                    try:
                        # Do float conversion separately, it might throw.
                        centroid = (_to_float(row['lon'], 180),
                                    _to_float(row['lat'], 90))
                        self.collected[postcode] += centroid
                    except (ValueError, TypeError):
                        # TypeError: DictReader fills the fields of a too
                        # short row with None, which float() rejects.
                        LOG.warning("Bad coordinates %s, %s in %s country postcode file.",
                                    row['lat'], row['lon'], self.country)

    def _open_external(self, project_dir: Path) -> Optional[TextIO]:
        """ Open the external postcode file of the country for reading
            as UTF-8 text. The file '<country>_postcodes.csv' is preferred
            over '<country>_postcodes.csv.gz'. Returns None when the
            project directory contains neither of them.
        """
        fname = project_dir / f'{self.country}_postcodes.csv'

        if fname.is_file():
            LOG.info("Using external postcode file '%s'.", fname)
            return open(fname, 'r', encoding='utf-8')

        fname = project_dir / f'{self.country}_postcodes.csv.gz'

        if fname.is_file():
            LOG.info("Using external postcode file '%s'.", fname)
            # Explicit encoding, so that the compressed file is read the
            # same way as the plain one independent of the locale.
            return gzip.open(fname, 'rt', encoding='utf-8')

        return None
171
172
def update_postcodes(dsn: str, project_dir: Path, tokenizer: AbstractTokenizer) -> None:
    """ Bring the table 'location_postcode' up to date.

        Postcodes and their points are read from the place table (using
        country and centroid from placex where a matching row exists),
        grouped by country and written via one _PostcodeCollector per
        country, which also merges in external postcode files from the
        project directory. Countries that only appear in
        'location_postcode' are processed as well. Finally the analyzer
        of the tokenizer is asked to update its postcodes from the
        database.
    """
    formatter = PostcodeFormatter()
    with tokenizer.name_analyzer() as analyzer:
        with connect(dsn) as conn:
            # Remember which countries have postcodes right now. This is
            # done before anything is inserted, so it is fast on import.
            with conn.cursor() as cur:
                cur.execute("SELECT DISTINCT country_code FROM location_postcode")
                pending_countries = {row[0] for row in cur}

            # Recompute the list of valid postcodes from placex.
            with conn.cursor(name="placex_postcodes") as cur:
                cur.execute("""
                SELECT cc, pc, ST_X(centroid), ST_Y(centroid)
                FROM (SELECT
                        COALESCE(plx.country_code,
                                 get_country_code(ST_Centroid(pl.geometry))) as cc,
                        pl.address->'postcode' as pc,
                        COALESCE(plx.centroid, ST_Centroid(pl.geometry)) as centroid
                      FROM place AS pl LEFT OUTER JOIN placex AS plx
                             ON pl.osm_id = plx.osm_id AND pl.osm_type = plx.osm_type
                    WHERE pl.address ? 'postcode' AND pl.geometry IS NOT null) xx
                WHERE pc IS NOT null AND cc IS NOT null
                ORDER BY cc, pc""")

                # The rows are ordered by country, so a single collector
                # is active at any time.
                active = None

                for country, postcode, x, y in cur:
                    if active is not None and country != active.country:
                        # Country changed: flush the finished one first.
                        active.commit(conn, analyzer, project_dir)
                        active = None
                    if active is None:
                        active = _PostcodeCollector(country,
                                                    formatter.get_matcher(country))
                        pending_countries.discard(country)
                    active.add(postcode, x, y)

                if active is not None:
                    active.commit(conn, analyzer, project_dir)

            # What is left are countries that are only in the postcode table.
            for country in pending_countries:
                leftover = _PostcodeCollector(country, formatter.get_matcher(country))
                leftover.commit(conn, analyzer, project_dir)

            conn.commit()

        analyzer.update_postcodes_from_db()
225
226
def can_compute(dsn: str) -> bool:
    """ Return True when the database contains the table 'place',
        which is the table the postcodes are computed from.
    """
    with connect(dsn) as db:
        place_present = table_exists(db, 'place')
        return place_present