Functions for importing, updating and otherwise maintaining the table
of artificial postcode centroids.
"""
+import csv
+import gzip
+import logging
+from math import isfinite
+
+from psycopg2 import sql as pysql
-from nominatim.db.utils import execute_file
from nominatim.db.connection import connect
-def import_postcodes(dsn, project_dir):
- """ Set up the initial list of postcodes.
+LOG = logging.getLogger()
+
+def _to_float(num, max_value):
+ """ Convert the number in string into a float. The number is expected
+ to be in the range of [-max_value, max_value]. Otherwise rises a
+ ValueError.
"""
+ num = float(num)
+ if not isfinite(num) or num <= -max_value or num >= max_value:
+ raise ValueError()
- with connect(dsn) as conn:
- conn.drop_table('gb_postcode')
- conn.drop_table('us_postcode')
+ return num
+
+class _CountryPostcodesCollector:
+ """ Collector for postcodes of a single country.
+ """
+
+ def __init__(self, country):
+ self.country = country
+ self.collected = dict()
+
+
+ def add(self, postcode, x, y):
+ """ Add the given postcode to the collection cache. If the postcode
+ already existed, it is overwritten with the new centroid.
+ """
+ self.collected[postcode] = (x, y)
+
+
+ def commit(self, conn, analyzer, project_dir):
+ """ Update postcodes for the country from the postcodes selected so far
+ as well as any externally supplied postcodes.
+ """
+ self._update_from_external(analyzer, project_dir)
+ to_add, to_delete, to_update = self._compute_changes(conn)
+
+ LOG.info("Processing country '%s' (%s added, %s deleted, %s updated).",
+ self.country, len(to_add), len(to_delete), len(to_update))
with conn.cursor() as cur:
- cur.execute("""CREATE TABLE gb_postcode (
- id integer,
- postcode character varying(9),
- geometry GEOMETRY(Point, 4326))""")
+ if to_add:
+ cur.execute_values(
+ """INSERT INTO location_postcode
+ (place_id, indexed_status, country_code,
+ postcode, geometry) VALUES %s""",
+ to_add,
+ template=pysql.SQL("""(nextval('seq_place'), 1, {},
+ %s, 'SRID=4326;POINT(%s %s)')
+ """).format(pysql.Literal(self.country)))
+ if to_delete:
+ cur.execute("""DELETE FROM location_postcode
+ WHERE country_code = %s and postcode = any(%s)
+ """, (self.country, to_delete))
+ if to_update:
+ cur.execute_values(
+ pysql.SQL("""UPDATE location_postcode
+ SET indexed_status = 2,
+ geometry = ST_SetSRID(ST_Point(v.x, v.y), 4326)
+ FROM (VALUES %s) AS v (pc, x, y)
+ WHERE country_code = {} and postcode = pc
+ """).format(pysql.Literal(self.country)), to_update)
+
+ def _compute_changes(self, conn):
+ """ Compute which postcodes from the collected postcodes have to be
+ added or modified and which from the location_postcode table
+ have to be deleted.
+ """
+ to_update = []
+ to_delete = []
with conn.cursor() as cur:
- cur.execute("""CREATE TABLE us_postcode (
- postcode text,
- x double precision,
- y double precision)""")
- conn.commit()
+ cur.execute("""SELECT postcode, ST_X(geometry), ST_Y(geometry)
+ FROM location_postcode
+ WHERE country_code = %s""",
+ (self.country, ))
+ for postcode, x, y in cur:
+ newx, newy = self.collected.pop(postcode, (None, None))
+ if newx is not None:
+ dist = (x - newx)**2 + (y - newy)**2
+ if dist > 0.0000001:
+ to_update.append((postcode, newx, newy))
+ else:
+ to_delete.append(postcode)
- gb_postcodes = project_dir / 'gb_postcode_data.sql.gz'
- if gb_postcodes.is_file():
- execute_file(dsn, gb_postcodes)
+ to_add = [(k, v[0], v[1]) for k, v in self.collected.items()]
+ self.collected = []
- us_postcodes = project_dir / 'us_postcode_data.sql.gz'
- if us_postcodes.is_file():
- execute_file(dsn, us_postcodes)
+ return to_add, to_delete, to_update
- with conn.cursor() as cur:
- cur.execute("TRUNCATE location_postcode")
- cur.execute("""
- INSERT INTO location_postcode
- (place_id, indexed_status, country_code, postcode, geometry)
- SELECT nextval('seq_place'), 1, country_code,
- upper(trim (both ' ' from address->'postcode')) as pc,
- ST_Centroid(ST_Collect(ST_Centroid(geometry)))
- FROM placex
- WHERE address ? 'postcode' AND address->'postcode' NOT SIMILAR TO '%(,|;)%'
- AND geometry IS NOT null
- GROUP BY country_code, pc
- """)
-
- cur.execute("""
- INSERT INTO location_postcode
- (place_id, indexed_status, country_code, postcode, geometry)
- SELECT nextval('seq_place'), 1, 'us', postcode,
- ST_SetSRID(ST_Point(x,y),4326)
- FROM us_postcode WHERE postcode NOT IN
- (SELECT postcode FROM location_postcode
- WHERE country_code = 'us')
- """)
-
- cur.execute("""
- INSERT INTO location_postcode
- (place_id, indexed_status, country_code, postcode, geometry)
- SELECT nextval('seq_place'), 1, 'gb', postcode, geometry
- FROM gb_postcode WHERE postcode NOT IN
- (SELECT postcode FROM location_postcode
- WHERE country_code = 'gb')
- """)
-
- cur.execute("""
- DELETE FROM word WHERE class='place' and type='postcode'
- and word NOT IN (SELECT postcode FROM location_postcode)
- """)
-
- cur.execute("""
- SELECT count(getorcreate_postcode_id(v)) FROM
- (SELECT distinct(postcode) as v FROM location_postcode) p
- """)
- conn.commit()
+
+ def _update_from_external(self, analyzer, project_dir):
+ """ Look for an external postcode file for the active country in
+ the project directory and add missing postcodes when found.
+ """
+ csvfile = self._open_external(project_dir)
+ if csvfile is None:
+ return
+
+ try:
+ reader = csv.DictReader(csvfile)
+ for row in reader:
+ if 'postcode' not in row or 'lat' not in row or 'lon' not in row:
+ LOG.warning("Bad format for external postcode file for country '%s'."
+ " Ignored.", self.country)
+ return
+ postcode = analyzer.normalize_postcode(row['postcode'])
+ if postcode not in self.collected:
+ try:
+ self.collected[postcode] = (_to_float(row['lon'], 180),
+ _to_float(row['lat'], 90))
+ except ValueError:
+ LOG.warning("Bad coordinates %s, %s in %s country postcode file.",
+ row['lat'], row['lon'], self.country)
+
+ finally:
+ csvfile.close()
+
+
+ def _open_external(self, project_dir):
+ fname = project_dir / '{}_postcodes.csv'.format(self.country)
+
+ if fname.is_file():
+ LOG.info("Using external postcode file '%s'.", fname)
+ return open(fname, 'r')
+
+ fname = project_dir / '{}_postcodes.csv.gz'.format(self.country)
+
+ if fname.is_file():
+ LOG.info("Using external postcode file '%s'.", fname)
+ return gzip.open(fname, 'rt')
+
+ return None
+
+
+def update_postcodes(dsn, project_dir, tokenizer):
+ """ Update the table of artificial postcodes.
+
+ Computes artificial postcode centroids from the placex table,
+ potentially enhances it with external data and then updates the
+ postcodes in the table 'location_postcode'.
+ """
+ with tokenizer.name_analyzer() as analyzer:
+ with connect(dsn) as conn:
+ # First get the list of countries that currently have postcodes.
+ # (Doing this before starting to insert, so it is fast on import.)
+ with conn.cursor() as cur:
+ cur.execute("SELECT DISTINCT country_code FROM location_postcode")
+ todo_countries = set((row[0] for row in cur))
+
+ # Recompute the list of valid postcodes from placex.
+ with conn.cursor(name="placex_postcodes") as cur:
+ cur.execute("""
+ SELECT cc as country_code, pc, ST_X(centroid), ST_Y(centroid)
+ FROM (SELECT
+ COALESCE(plx.country_code,
+ get_country_code(ST_Centroid(pl.geometry))) as cc,
+ token_normalized_postcode(pl.address->'postcode') as pc,
+ ST_Centroid(ST_Collect(COALESCE(plx.centroid,
+ ST_Centroid(pl.geometry)))) as centroid
+ FROM place AS pl LEFT OUTER JOIN placex AS plx
+ ON pl.osm_id = plx.osm_id AND pl.osm_type = plx.osm_type
+ WHERE pl.address ? 'postcode' AND pl.geometry IS NOT null
+ GROUP BY cc, pc) xx
+ WHERE pc IS NOT null AND cc IS NOT null
+ ORDER BY country_code, pc""")
+
+ collector = None
+
+ for country, postcode, x, y in cur:
+ if collector is None or country != collector.country:
+ if collector is not None:
+ collector.commit(conn, analyzer, project_dir)
+ collector = _CountryPostcodesCollector(country)
+ todo_countries.discard(country)
+ collector.add(postcode, x, y)
+
+ if collector is not None:
+ collector.commit(conn, analyzer, project_dir)
+
+ # Now handle any countries that are only in the postcode table.
+ for country in todo_countries:
+ _CountryPostcodesCollector(country).commit(conn, analyzer, project_dir)
+
+ conn.commit()
+
+ analyzer.update_postcodes_from_db()
+
+def can_compute(dsn):
+ """
+ Check that the place table exists so that
+ postcodes can be computed.
+ """
+ with connect(dsn) as conn:
+ return conn.table_exists('place')