import csv
import gzip
import logging
+from math import isfinite
from psycopg2.extras import execute_values
LOG = logging.getLogger()
+def _to_float(num, max_value):
+ """ Convert the number in string into a float. The number is expected
+ to be in the range of [-max_value, max_value]. Otherwise rises a
+ ValueError.
+ """
+ num = float(num)
+ if not isfinite(num) or num <= -max_value or num >= max_value:
+ raise ValueError()
+
+ return num
+
class _CountryPostcodesCollector:
""" Collector for postcodes of a single country.
"""
self._update_from_external(analyzer, project_dir)
to_add, to_delete, to_update = self._compute_changes(conn)
+ LOG.info("Processing country '%s' (%s added, %s deleted, %s updated).",
+ self.country, len(to_add), len(to_delete), len(to_update))
+
with conn.cursor() as cur:
if to_add:
execute_values(cur,
- """INSERT INTO location_postcodes
- (place_id, indexed_status, countrycode,
+ """INSERT INTO location_postcode
+ (place_id, indexed_status, country_code,
postcode, geometry) VALUES %s""",
to_add,
template="""(nextval('seq_place'), 1, '{}',
%s, 'SRID=4326;POINT(%s %s)')
""".format(self.country))
if to_delete:
- cur.execute("""DELETE FROM location_postcodes
+ cur.execute("""DELETE FROM location_postcode
WHERE country_code = %s and postcode = any(%s)
""", (self.country, to_delete))
if to_update:
execute_values(cur,
- """UPDATE location_postcodes
+ """UPDATE location_postcode
SET indexed_status = 2,
geometry = ST_SetSRID(ST_Point(v.x, v.y), 4326)
FROM (VALUES %s) AS v (pc, x, y)
def _compute_changes(self, conn):
""" Compute which postcodes from the collected postcodes have to be
- added or modified and which from the location_postcodes table
+ added or modified and which from the location_postcode table
have to be deleted.
"""
to_update = []
to_delete = []
with conn.cursor() as cur:
cur.execute("""SELECT postcode, ST_X(geometry), ST_Y(geometry)
- FROM location_postcodes
+ FROM location_postcode
WHERE country_code = %s""",
(self.country, ))
for postcode, x, y in cur:
- oldx, oldy = self.collected.pop(postcode, (None, None))
- if oldx is not None:
- dist = (x - oldx)**2 + (y - oldy)**2
- if dist > 0.000001:
- to_update.append(postcode, x, y)
+ newx, newy = self.collected.pop(postcode, (None, None))
+ if newx is not None:
+ dist = (x - newx)**2 + (y - newy)**2
+ if dist > 0.0000001:
+ to_update.append((postcode, newx, newy))
else:
to_delete.append(postcode)
postcode = analyzer.normalize_postcode(row['postcode'])
if postcode not in self.collected:
try:
- self.collected[postcode] = (float(row['lon'], float(row['lat'])))
+ self.collected[postcode] = (_to_float(row['lon'], 180),
+ _to_float(row['lat'], 90))
except ValueError:
LOG.warning("Bad coordinates %s, %s in %s country postcode file.",
row['lat'], row['lon'], self.country)
"""
with tokenizer.name_analyzer() as analyzer:
with connect(dsn) as conn:
- with conn.cursor("placex_postcodes") as cur:
+ # First get the list of countries that currently have postcodes.
+ # (Doing this before starting to insert, so it is fast on import.)
+ with conn.cursor() as cur:
+ cur.execute("SELECT DISTINCT country_code FROM location_postcode")
+ todo_countries = set((row[0] for row in cur))
+
+ # Recompute the list of valid postcodes from placex.
+ with conn.cursor(name="placex_postcodes") as cur:
cur.execute("""SELECT country_code, pc, ST_X(centroid), ST_Y(centroid)
FROM (
SELECT country_code,
ST_Centroid(ST_Collect(ST_Centroid(geometry))) as centroid
FROM placex
WHERE address ? 'postcode' and geometry IS NOT null
+ and country_code is not null
GROUP BY country_code, pc) xx
WHERE pc is not null
ORDER BY country_code, pc""")
if collector is not None:
collector.commit(conn, analyzer, project_dir)
collector = _CountryPostcodesCollector(country)
+ todo_countries.discard(country)
collector.add(postcode, x, y)
if collector is not None:
collector.commit(conn, analyzer, project_dir)
+ # Now handle any countries that are only in the postcode table.
+ for country in todo_countries:
+ _CountryPostcodesCollector(country).commit(conn, analyzer, project_dir)
+
conn.commit()
- analyzer.add_postcodes_from_db()
+ analyzer.update_postcodes_from_db()