]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/tools/postcodes.py
type annotations for DB utils
[nominatim.git] / nominatim / tools / postcodes.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Functions for importing, updating and otherwise maintaining the table
9 of artificial postcode centroids.
10 """
11 from collections import defaultdict
12 import csv
13 import gzip
14 import logging
15 from math import isfinite
16
17 from psycopg2 import sql as pysql
18
19 from nominatim.db.connection import connect
20 from nominatim.utils.centroid import PointsCentroid
21 from nominatim.data.postcode_format import PostcodeFormatter
22
# Logger used by all functions in this module. getLogger() is called
# without a name and therefore returns the root logger.
LOG = logging.getLogger()
24
def _to_float(num, max_value):
    """ Parse `num` as a float and check that it is a usable coordinate.

        The parsed value must be finite and lie strictly between
        -max_value and max_value; the boundary values themselves are
        rejected. Returns the parsed float.

        Raises a ValueError when the input cannot be parsed as a float
        or when the value is not finite or out of range.
    """
    value = float(num)
    within_range = isfinite(value) \
                   and not (value <= -max_value or value >= max_value)
    if not within_range:
        raise ValueError()

    return value
35
class _PostcodeCollector:
    """ Collector for postcodes of a single country.

        Points are accumulated per normalized postcode with add(). A call
        to commit() then merges in externally supplied postcodes and
        synchronises the result with the location_postcode table.
    """

    def __init__(self, country, matcher):
        """ Create an empty collector for the given country. `matcher` is
            the postcode matcher for the country or None. When it is None,
            add() drops all postcodes.
        """
        self.country = country
        self.matcher = matcher
        # Maps normalized postcode -> accumulator of the points seen for it.
        self.collected = defaultdict(PointsCentroid)
        # (raw postcode, normalized postcode) of the last add() call.
        # Saves re-matching when the same postcode comes in repeatedly.
        self.normalization_cache = None


    def add(self, postcode, x, y):
        """ Add the point (x, y) to the collection for the given postcode.

            The postcode is normalized with the matcher first. Postcodes
            that the matcher does not accept are dropped. When the
            normalized postcode is already known, the point is added to
            the points already collected for it.
        """
        if self.matcher is None:
            return

        cache = self.normalization_cache
        if cache and cache[0] == postcode:
            normalized = cache[1]
        else:
            match = self.matcher.match(postcode)
            normalized = self.matcher.normalize(match) if match else None
            self.normalization_cache = (postcode, normalized)

        if normalized:
            self.collected[normalized] += (x, y)


    def commit(self, conn, analyzer, project_dir):
        """ Update postcodes for the country from the postcodes selected so far
            as well as any externally supplied postcodes.

            Executes the necessary INSERT, DELETE and UPDATE statements on
            `conn`. The transaction itself is not committed here.
        """
        self._update_from_external(analyzer, project_dir)
        to_add, to_delete, to_update = self._compute_changes(conn)

        LOG.info("Processing country '%s' (%s added, %s deleted, %s updated).",
                 self.country, len(to_add), len(to_delete), len(to_update))

        with conn.cursor() as cur:
            if to_add:
                cur.execute_values(
                    """INSERT INTO location_postcode
                         (place_id, indexed_status, country_code,
                          postcode, geometry) VALUES %s""",
                    to_add,
                    template=pysql.SQL("""(nextval('seq_place'), 1, {},
                                          %s, 'SRID=4326;POINT(%s %s)')
                                       """).format(pysql.Literal(self.country)))
            if to_delete:
                cur.execute("""DELETE FROM location_postcode
                               WHERE country_code = %s and postcode = any(%s)
                            """, (self.country, to_delete))
            if to_update:
                cur.execute_values(
                    pysql.SQL("""UPDATE location_postcode
                                 SET indexed_status = 2,
                                     geometry = ST_SetSRID(ST_Point(v.x, v.y), 4326)
                                 FROM (VALUES %s) AS v (pc, x, y)
                                 WHERE country_code = {} and postcode = pc
                              """).format(pysql.Literal(self.country)), to_update)


    def _compute_changes(self, conn):
        """ Compute which postcodes from the collected postcodes have to be
            added or modified and which from the location_postcode table
            have to be deleted.

            Returns a tuple (to_add, to_delete, to_update). `to_add` and
            `to_update` are lists of (postcode, x, y) tuples, `to_delete`
            is a list of postcodes. The collection is consumed by the call.
        """
        to_update = []
        to_delete = []
        with conn.cursor() as cur:
            cur.execute("""SELECT postcode, ST_X(geometry), ST_Y(geometry)
                           FROM location_postcode
                           WHERE country_code = %s""",
                        (self.country, ))
            for postcode, x, y in cur:
                pcobj = self.collected.pop(postcode, None)
                if pcobj:
                    newx, newy = pcobj.centroid()
                    # Compare absolute distances, the centroid may have
                    # moved in either direction.
                    if abs(x - newx) > 0.0000001 or abs(y - newy) > 0.0000001:
                        to_update.append((postcode, newx, newy))
                else:
                    to_delete.append(postcode)

        # Whatever has not been matched against the table is new.
        to_add = [(k, *v.centroid()) for k, v in self.collected.items()]
        self.collected = None

        return to_add, to_delete, to_update


    def _update_from_external(self, analyzer, project_dir):
        """ Look for an external postcode file for the active country in
            the project directory and add missing postcodes when found.

            Postcodes that have already been collected are left untouched.
            Rows with unusable coordinates are skipped with a warning. A file
            without 'postcode', 'lat' and 'lon' columns is ignored.
        """
        csvfile = self._open_external(project_dir)
        if csvfile is None:
            return

        with csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                if 'postcode' not in row or 'lat' not in row or 'lon' not in row:
                    LOG.warning("Bad format for external postcode file for country '%s'."
                                " Ignored.", self.country)
                    return
                postcode = analyzer.normalize_postcode(row['postcode'])
                if postcode not in self.collected:
                    try:
                        # Do float conversion separately, it might throw
                        centroid = (_to_float(row['lon'], 180),
                                    _to_float(row['lat'], 90))
                        self.collected[postcode] += centroid
                    except ValueError:
                        LOG.warning("Bad coordinates %s, %s in %s country postcode file.",
                                    row['lat'], row['lon'], self.country)


    def _open_external(self, project_dir):
        """ Open the external postcode file for the country, if there is one.

            Looks for '<country>_postcodes.csv' first and then for
            '<country>_postcodes.csv.gz' in the project directory. Returns
            a file object opened in text mode with UTF-8 encoding or None
            when neither file exists.
        """
        fname = project_dir / f'{self.country}_postcodes.csv'

        if fname.is_file():
            LOG.info("Using external postcode file '%s'.", fname)
            return open(fname, 'r', encoding='utf-8')

        fname = project_dir / f'{self.country}_postcodes.csv.gz'

        if fname.is_file():
            LOG.info("Using external postcode file '%s'.", fname)
            # Read with the same encoding as the uncompressed variant
            # instead of depending on the locale.
            return gzip.open(fname, 'rt', encoding='utf-8')

        return None
168
169
def update_postcodes(dsn, project_dir, tokenizer):
    """ Bring the table 'location_postcode' up to date.

        Postcode centroids are computed per country from the postcodes
        found in the place table (using placex data where available),
        then complemented with external postcode files from the project
        directory and finally written to the table. Afterwards the
        tokenizer's analyzer is asked to update its postcodes from the
        database.
    """
    formatter = PostcodeFormatter()
    with tokenizer.name_analyzer() as analyzer:
        with connect(dsn) as conn:
            # Remember which countries have postcodes right now. This is
            # done before anything is inserted, so it is fast on import.
            with conn.cursor() as cur:
                cur.execute("SELECT DISTINCT country_code FROM location_postcode")
                todo_countries = {row[0] for row in cur}

            # Collect the valid postcodes from placex, country by country.
            with conn.cursor(name="placex_postcodes") as cur:
                cur.execute("""
                SELECT cc, pc, ST_X(centroid), ST_Y(centroid)
                FROM (SELECT
                        COALESCE(plx.country_code,
                                 get_country_code(ST_Centroid(pl.geometry))) as cc,
                        pl.address->'postcode' as pc,
                        COALESCE(plx.centroid, ST_Centroid(pl.geometry)) as centroid
                      FROM place AS pl LEFT OUTER JOIN placex AS plx
                             ON pl.osm_id = plx.osm_id AND pl.osm_type = plx.osm_type
                    WHERE pl.address ? 'postcode' AND pl.geometry IS NOT null) xx
                WHERE pc IS NOT null AND cc IS NOT null
                ORDER BY cc, pc""")

                active = None

                # Rows arrive sorted by country, so a change of country
                # means that the previous country is complete.
                for country, postcode, x, y in cur:
                    if active is not None and country != active.country:
                        active.commit(conn, analyzer, project_dir)
                        active = None
                    if active is None:
                        active = _PostcodeCollector(country,
                                                    formatter.get_matcher(country))
                        todo_countries.discard(country)
                    active.add(postcode, x, y)

                # Flush the last country.
                if active is not None:
                    active.commit(conn, analyzer, project_dir)

            # Countries left over only exist in the postcode table. Run them
            # through an empty collector.
            for country in todo_countries:
                leftover = _PostcodeCollector(country, formatter.get_matcher(country))
                leftover.commit(conn, analyzer, project_dir)

            conn.commit()

        analyzer.update_postcodes_from_db()
222
def can_compute(dsn):
    """ Return whether postcodes can be computed for the database at `dsn`.

        This is the result of checking that the table 'place' exists.
    """
    with connect(dsn) as conn:
        has_place_table = conn.table_exists('place')

    return has_place_table