]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tools/postcodes.py
respect socket timeout also in other replication functions
[nominatim.git] / nominatim / tools / postcodes.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Functions for importing, updating and otherwise maintaining the table
9 of artificial postcode centroids.
10 """
11 from typing import Optional, Tuple, Dict, List, TextIO
12 from collections import defaultdict
13 from pathlib import Path
14 import csv
15 import gzip
16 import logging
17 from math import isfinite
18
19 from psycopg2 import sql as pysql
20
21 from nominatim.db.connection import connect, Connection
22 from nominatim.utils.centroid import PointsCentroid
23 from nominatim.data.postcode_format import PostcodeFormatter, CountryPostcodeMatcher
24 from nominatim.tokenizer.base import AbstractAnalyzer, AbstractTokenizer
25
26 LOG = logging.getLogger()
27
28 def _to_float(numstr: str, max_value: float) -> float:
29     """ Convert the number in string into a float. The number is expected
30         to be in the range of [-max_value, max_value]. Otherwise rises a
31         ValueError.
32     """
33     num = float(numstr)
34     if not isfinite(num) or num <= -max_value or num >= max_value:
35         raise ValueError()
36
37     return num
38
39 class _PostcodeCollector:
40     """ Collector for postcodes of a single country.
41     """
42
43     def __init__(self, country: str, matcher: Optional[CountryPostcodeMatcher]):
44         self.country = country
45         self.matcher = matcher
46         self.collected: Dict[str, PointsCentroid] = defaultdict(PointsCentroid)
47         self.normalization_cache: Optional[Tuple[str, Optional[str]]] = None
48
49
50     def add(self, postcode: str, x: float, y: float) -> None:
51         """ Add the given postcode to the collection cache. If the postcode
52             already existed, it is overwritten with the new centroid.
53         """
54         if self.matcher is not None:
55             normalized: Optional[str]
56             if self.normalization_cache and self.normalization_cache[0] == postcode:
57                 normalized = self.normalization_cache[1]
58             else:
59                 match = self.matcher.match(postcode)
60                 normalized = self.matcher.normalize(match) if match else None
61                 self.normalization_cache = (postcode, normalized)
62
63             if normalized:
64                 self.collected[normalized] += (x, y)
65
66
67     def commit(self, conn: Connection, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
68         """ Update postcodes for the country from the postcodes selected so far
69             as well as any externally supplied postcodes.
70         """
71         self._update_from_external(analyzer, project_dir)
72         to_add, to_delete, to_update = self._compute_changes(conn)
73
74         LOG.info("Processing country '%s' (%s added, %s deleted, %s updated).",
75                  self.country, len(to_add), len(to_delete), len(to_update))
76
77         with conn.cursor() as cur:
78             if to_add:
79                 cur.execute_values(
80                     """INSERT INTO location_postcode
81                          (place_id, indexed_status, country_code,
82                           postcode, geometry) VALUES %s""",
83                     to_add,
84                     template=pysql.SQL("""(nextval('seq_place'), 1, {},
85                                           %s, 'SRID=4326;POINT(%s %s)')
86                                        """).format(pysql.Literal(self.country)))
87             if to_delete:
88                 cur.execute("""DELETE FROM location_postcode
89                                WHERE country_code = %s and postcode = any(%s)
90                             """, (self.country, to_delete))
91             if to_update:
92                 cur.execute_values(
93                     pysql.SQL("""UPDATE location_postcode
94                                  SET indexed_status = 2,
95                                      geometry = ST_SetSRID(ST_Point(v.x, v.y), 4326)
96                                  FROM (VALUES %s) AS v (pc, x, y)
97                                  WHERE country_code = {} and postcode = pc
98                               """).format(pysql.Literal(self.country)), to_update)
99
100
101     def _compute_changes(self, conn: Connection) \
102           -> Tuple[List[Tuple[str, float, float]], List[str], List[Tuple[str, float, float]]]:
103         """ Compute which postcodes from the collected postcodes have to be
104             added or modified and which from the location_postcode table
105             have to be deleted.
106         """
107         to_update = []
108         to_delete = []
109         with conn.cursor() as cur:
110             cur.execute("""SELECT postcode, ST_X(geometry), ST_Y(geometry)
111                            FROM location_postcode
112                            WHERE country_code = %s""",
113                         (self.country, ))
114             for postcode, x, y in cur:
115                 pcobj = self.collected.pop(postcode, None)
116                 if pcobj:
117                     newx, newy = pcobj.centroid()
118                     if (x - newx) > 0.0000001 or (y - newy) > 0.0000001:
119                         to_update.append((postcode, newx, newy))
120                 else:
121                     to_delete.append(postcode)
122
123         to_add = [(k, *v.centroid()) for k, v in self.collected.items()]
124         self.collected = defaultdict(PointsCentroid)
125
126         return to_add, to_delete, to_update
127
128
129     def _update_from_external(self, analyzer: AbstractAnalyzer, project_dir: Path) -> None:
130         """ Look for an external postcode file for the active country in
131             the project directory and add missing postcodes when found.
132         """
133         csvfile = self._open_external(project_dir)
134         if csvfile is None:
135             return
136
137         try:
138             reader = csv.DictReader(csvfile)
139             for row in reader:
140                 if 'postcode' not in row or 'lat' not in row or 'lon' not in row:
141                     LOG.warning("Bad format for external postcode file for country '%s'."
142                                 " Ignored.", self.country)
143                     return
144                 postcode = analyzer.normalize_postcode(row['postcode'])
145                 if postcode not in self.collected:
146                     try:
147                         # Do float conversation separately, it might throw
148                         centroid = (_to_float(row['lon'], 180),
149                                     _to_float(row['lat'], 90))
150                         self.collected[postcode] += centroid
151                     except ValueError:
152                         LOG.warning("Bad coordinates %s, %s in %s country postcode file.",
153                                     row['lat'], row['lon'], self.country)
154
155         finally:
156             csvfile.close()
157
158
159     def _open_external(self, project_dir: Path) -> Optional[TextIO]:
160         fname = project_dir / f'{self.country}_postcodes.csv'
161
162         if fname.is_file():
163             LOG.info("Using external postcode file '%s'.", fname)
164             return open(fname, 'r', encoding='utf-8')
165
166         fname = project_dir / f'{self.country}_postcodes.csv.gz'
167
168         if fname.is_file():
169             LOG.info("Using external postcode file '%s'.", fname)
170             return gzip.open(fname, 'rt')
171
172         return None
173
174
175 def update_postcodes(dsn: str, project_dir: Path, tokenizer: AbstractTokenizer) -> None:
176     """ Update the table of artificial postcodes.
177
178         Computes artificial postcode centroids from the placex table,
179         potentially enhances it with external data and then updates the
180         postcodes in the table 'location_postcode'.
181     """
182     matcher = PostcodeFormatter()
183     with tokenizer.name_analyzer() as analyzer:
184         with connect(dsn) as conn:
185             # First get the list of countries that currently have postcodes.
186             # (Doing this before starting to insert, so it is fast on import.)
187             with conn.cursor() as cur:
188                 cur.execute("SELECT DISTINCT country_code FROM location_postcode")
189                 todo_countries = set((row[0] for row in cur))
190
191             # Recompute the list of valid postcodes from placex.
192             with conn.cursor(name="placex_postcodes") as cur:
193                 cur.execute("""
194                 SELECT cc, pc, ST_X(centroid), ST_Y(centroid)
195                 FROM (SELECT
196                         COALESCE(plx.country_code,
197                                  get_country_code(ST_Centroid(pl.geometry))) as cc,
198                         pl.address->'postcode' as pc,
199                         COALESCE(plx.centroid, ST_Centroid(pl.geometry)) as centroid
200                       FROM place AS pl LEFT OUTER JOIN placex AS plx
201                              ON pl.osm_id = plx.osm_id AND pl.osm_type = plx.osm_type
202                     WHERE pl.address ? 'postcode' AND pl.geometry IS NOT null) xx
203                 WHERE pc IS NOT null AND cc IS NOT null
204                 ORDER BY cc, pc""")
205
206                 collector = None
207
208                 for country, postcode, x, y in cur:
209                     if collector is None or country != collector.country:
210                         if collector is not None:
211                             collector.commit(conn, analyzer, project_dir)
212                         collector = _PostcodeCollector(country, matcher.get_matcher(country))
213                         todo_countries.discard(country)
214                     collector.add(postcode, x, y)
215
216                 if collector is not None:
217                     collector.commit(conn, analyzer, project_dir)
218
219             # Now handle any countries that are only in the postcode table.
220             for country in todo_countries:
221                 fmt = matcher.get_matcher(country)
222                 _PostcodeCollector(country, fmt).commit(conn, analyzer, project_dir)
223
224             conn.commit()
225
226         analyzer.update_postcodes_from_db()
227
228 def can_compute(dsn: str) -> bool:
229     """
230         Check that the place table exists so that
231         postcodes can be computed.
232     """
233     with connect(dsn) as conn:
234         return conn.table_exists('place')