# 'with' statements.
ignored-classes=NominatimArgs,closing
disable=too-few-public-methods,duplicate-code
+
+good-names=i,x,y
--- /dev/null
+# Customization of the Database
+
+This section explains in detail how to configure a Nominatim import and
+the various means to use external data.
+
+## External postcode data
+
+Nominatim creates a table of known postcode centroids during import. This table
+is used for searches of postcodes and for adding postcodes to places where the
+OSM data does not provide one. These postcode centroids are mainly computed
+from the OSM data itself. In addition, Nominatim supports reading postcode
+information from an external CSV file, to supplement the postcodes that are
+missing in OSM.
+
+To enable external postcode support, simply put one CSV file per country into
+your project directory and name it `<CC>_postcodes.csv`. `<CC>` must be the
+two-letter country code for which to apply the file. The file may also be
+gzipped, in which case it must be called `<CC>_postcodes.csv.gz`.
+
+The CSV file must use commas as a delimiter and have a header line. Nominatim
+expects three columns to be present: `postcode`, `lat` and `lon`. All other
+columns are ignored. `lon` and `lat` must describe the x and y coordinates of the
+postcode centroids in WGS84.
+
+The postcode files are loaded only when there is data for the given country
+in your database. For example, if there is a `us_postcodes.csv` file in your
+project directory but you import only an excerpt of Italy, then the US postcodes
+will simply be ignored.
+
+As a rule, the external postcode data should be put into the project directory
+**before** starting the initial import. Still, you can add, remove and update the
+external postcode data at any time. Simply
+run:
+
+```
+nominatim refresh --postcodes
+```
+
+to make the changes visible in your database. Be aware, however, that the changes
+only have an immediate effect on searches for postcodes. Postcodes that were
+added to places are only updated when they are reindexed. That usually happens
+only during replication updates.
+
+## Installing Tiger housenumber data for the US
+
+Nominatim is able to use the official [TIGER](https://www.census.gov/geographies/mapping-files/time-series/geo/tiger-line-file.html)
+address set to complement the OSM house number data in the US. You can add
+TIGER data to your own Nominatim instance by following these steps. The
+entire US adds about 10GB to your database.
+
+ 1. Get preprocessed TIGER 2020 data:
+
+ cd $PROJECT_DIR
+ wget https://nominatim.org/data/tiger2020-nominatim-preprocessed.tar.gz
+
+ 2. Import the data into your Nominatim database:
+
+ nominatim add-data --tiger-data tiger2020-nominatim-preprocessed.tar.gz
+
+ 3. Enable use of the Tiger data in your `.env` by adding:
+
+ echo NOMINATIM_USE_US_TIGER_DATA=yes >> .env
+
+ 4. Apply the new settings:
+
+ nominatim refresh --functions
+
+
+See the [developer's guide](../develop/data-sources.md#us-census-tiger) for more
+information on how the data got preprocessed.
+
`nominatim refresh --wiki-data --importance`. Updating importances for
a planet can take a couple of hours.
-### Great Britain, USA postcodes
+### External postcodes
-Nominatim can use postcodes from an external source to improve searches that
-involve a GB or US postcode. This data can be optionally downloaded into the
-project directory:
+Nominatim can use postcodes from an external source to improve searching with
+postcodes. We provide precomputed postcodes sets for the US (using TIGER data)
+and the UK (using the [CodePoint OpenData set](https://osdatahub.os.uk/downloads/open/CodePointOpen)).
+This data can be optionally downloaded into the project directory:
cd $PROJECT_DIR
- wget https://www.nominatim.org/data/gb_postcode_data.sql.gz
- wget https://www.nominatim.org/data/us_postcode_data.sql.gz
+ wget https://www.nominatim.org/data/gb_postcodes.csv.gz
+ wget https://www.nominatim.org/data/us_postcodes.csv.gz
+
+You can also add your own custom postcode sources, see
+[Customization of postcodes](Customization.md#external-postcode-data).
## Choosing the data to import
Note that this command downloads the phrases from the wiki link above. You
need internet access for the step.
-
-
-## Installing Tiger housenumber data for the US
-
-Nominatim is able to use the official [TIGER](https://www.census.gov/geographies/mapping-files/time-series/geo/tiger-line-file.html)
-address set to complement the OSM house number data in the US. You can add
-TIGER data to your own Nominatim instance by following these steps. The
-entire US adds about 10GB to your database.
-
- 1. Get preprocessed TIGER 2020 data:
-
- cd $PROJECT_DIR
- wget https://nominatim.org/data/tiger2020-nominatim-preprocessed.tar.gz
-
- 2. Import the data into your Nominatim database:
-
- nominatim add-data --tiger-data tiger2020-nominatim-preprocessed.tar.gz
-
- 3. Enable use of the Tiger data in your `.env` by adding:
-
- echo NOMINATIM_USE_US_TIGER_DATA=yes >> .env
-
- 4. Apply the new settings:
-
- nominatim refresh --functions
-
-
-See the [developer's guide](../develop/data-sources.md#us-census-tiger) for more
-information on how the data got preprocessed.
-
- 'Import' : 'admin/Import.md'
- 'Update' : 'admin/Update.md'
- 'Deploy' : 'admin/Deployment.md'
+ - 'Customize Imports' : 'admin/Customization.md'
- 'Nominatim UI' : 'admin/Setup-Nominatim-UI.md'
- 'Advanced Installations' : 'admin/Advanced-Installations.md'
- 'Migration from older Versions' : 'admin/Migration.md'
+++ /dev/null
--- Create a temporary table with postcodes from placex.
-
-CREATE TEMP TABLE tmp_new_postcode_locations AS
-SELECT country_code,
- upper(trim (both ' ' from address->'postcode')) as pc,
- ST_Centroid(ST_Collect(ST_Centroid(geometry))) as centroid
- FROM placex
- WHERE address ? 'postcode'
- AND address->'postcode' NOT SIMILAR TO '%(,|;|:)%'
- AND geometry IS NOT null
-GROUP BY country_code, pc;
-
-CREATE INDEX idx_tmp_new_postcode_locations
- ON tmp_new_postcode_locations (pc, country_code);
-
--- add extra US postcodes
-INSERT INTO tmp_new_postcode_locations (country_code, pc, centroid)
- SELECT 'us', postcode, ST_SetSRID(ST_Point(x,y),4326)
- FROM us_postcode u
- WHERE NOT EXISTS (SELECT 0 FROM tmp_new_postcode_locations new
- WHERE new.country_code = 'us' AND new.pc = u.postcode);
--- add extra UK postcodes
-INSERT INTO tmp_new_postcode_locations (country_code, pc, centroid)
- SELECT 'gb', postcode, geometry FROM gb_postcode g
- WHERE NOT EXISTS (SELECT 0 FROM tmp_new_postcode_locations new
- WHERE new.country_code = 'gb' and new.pc = g.postcode);
-
--- Remove all postcodes that are no longer valid
-DELETE FROM location_postcode old
- WHERE NOT EXISTS(SELECT 0 FROM tmp_new_postcode_locations new
- WHERE old.postcode = new.pc
- AND old.country_code = new.country_code);
-
--- Update geometries where necessary
-UPDATE location_postcode old SET geometry = new.centroid, indexed_status = 1
- FROM tmp_new_postcode_locations new
- WHERE old.postcode = new.pc AND old.country_code = new.country_code
- AND ST_AsText(old.geometry) != ST_AsText(new.centroid);
-
--- Remove all postcodes that already exist from the temporary table
-DELETE FROM tmp_new_postcode_locations new
- WHERE EXISTS(SELECT 0 FROM location_postcode old
- WHERE old.postcode = new.pc AND old.country_code = new.country_code);
-
--- Add newly added postcode
-INSERT INTO location_postcode
- (place_id, indexed_status, country_code, postcode, geometry)
- SELECT nextval('seq_place'), 1, country_code, pc, centroid
- FROM tmp_new_postcode_locations new;
-
--- Remove unused word entries
-DELETE FROM word
- WHERE class = 'place' AND type = 'postcode'
- AND NOT EXISTS (SELECT 0 FROM location_postcode p
- WHERE p.postcode = word.word);
-
--- Finally index the newly inserted postcodes
-UPDATE location_postcode SET indexed_status = 0 WHERE indexed_status > 0;
@staticmethod
def run(args):
- from ..tools import refresh
+ from ..tools import refresh, postcodes
from ..tokenizer import factory as tokenizer_factory
+ from ..indexer.indexer import Indexer
+
+ tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
if args.postcodes:
LOG.warning("Update postcodes centroid")
- refresh.update_postcodes(args.config.get_libpq_dsn(), args.sqllib_dir)
+ postcodes.update_postcodes(args.config.get_libpq_dsn(),
+ args.project_dir, tokenizer)
+ indexer = Indexer(args.config.get_libpq_dsn(), tokenizer,
+ args.threads or 1)
+ indexer.index_postcodes()
if args.word_counts:
LOG.warning('Recompute frequency of full-word search terms')
with connect(args.config.get_libpq_dsn()) as conn:
refresh.create_functions(conn, args.config,
args.diffs, args.enable_debug_statements)
- tokenizer = tokenizer_factory.get_tokenizer_for_db(args.config)
tokenizer.update_sql_functions(args.config)
if args.wiki_data:
if args.continue_at is None or args.continue_at == 'load-data':
LOG.warning('Calculate postcodes')
- postcodes.import_postcodes(args.config.get_libpq_dsn(), args.project_dir,
- tokenizer)
+ postcodes.update_postcodes(args.config.get_libpq_dsn(),
+ args.project_dir, tokenizer)
if args.continue_at is None or args.continue_at in ('load-data', 'indexing'):
if args.continue_at is not None and args.continue_at != 'load-data':
"""
return self.normalizer.transliterate(phrase)
+ @staticmethod
+ def normalize_postcode(postcode):
+ """ Convert the postcode to a standardized form.
+
+ This function must yield exactly the same result as the SQL function
+ 'token_normalized_postcode()'.
+ """
+ return postcode.strip().upper()
+
+
@functools.lru_cache(maxsize=1024)
def make_standard_word(self, name):
""" Create the normalised version of the input.
return self.transliterator.transliterate(hnr)
- def add_postcodes_from_db(self):
- """ Add postcodes from the location_postcode table to the word table.
+ def update_postcodes_from_db(self):
+ """ Update postcode tokens in the word table from the location_postcode
+ table.
"""
+ to_delete = []
copystr = io.StringIO()
with self.conn.cursor() as cur:
- cur.execute("SELECT distinct(postcode) FROM location_postcode")
- for (postcode, ) in cur:
- copystr.write(postcode)
- copystr.write('\t ')
- copystr.write(self.transliterator.transliterate(postcode))
- copystr.write('\tplace\tpostcode\t0\n')
-
- copystr.seek(0)
- cur.copy_from(copystr, 'word',
- columns=['word', 'word_token', 'class', 'type',
- 'search_name_count'])
- # Don't really need an ID for postcodes....
- # cur.execute("""UPDATE word SET word_id = nextval('seq_word')
- # WHERE word_id is null and type = 'postcode'""")
+ # This finds us the rows in location_postcode and word that are
+ # missing in the other table.
+ cur.execute("""SELECT * FROM
+ (SELECT pc, word FROM
+ (SELECT distinct(postcode) as pc FROM location_postcode) p
+ FULL JOIN
+ (SELECT word FROM word
+ WHERE class ='place' and type = 'postcode') w
+ ON pc = word) x
+ WHERE pc is null or word is null""")
+
+ for postcode, word in cur:
+ if postcode is None:
+ to_delete.append(word)
+ else:
+ copystr.write(postcode)
+ copystr.write('\t ')
+ copystr.write(self.transliterator.transliterate(postcode))
+ copystr.write('\tplace\tpostcode\t0\n')
+
+ if to_delete:
+ cur.execute("""DELETE FROM WORD
+ WHERE class ='place' and type = 'postcode'
+ and word = any(%s)
+ """, (to_delete, ))
+
+ if copystr.getvalue():
+ copystr.seek(0)
+ cur.copy_from(copystr, 'word',
+ columns=['word', 'word_token', 'class', 'type',
+ 'search_name_count'])
def update_special_phrases(self, phrases):
def _add_postcode(self, postcode):
""" Make sure the normalized postcode is present in the word table.
"""
- if re.search(r'[:,;]', postcode) is None and not postcode in self._cache.postcodes:
- term = self.make_standard_word(postcode)
- if not term:
- return
-
- with self.conn.cursor() as cur:
- # no word_id needed for postcodes
- cur.execute("""INSERT INTO word (word, word_token, class, type,
- search_name_count)
- (SELECT pc, %s, 'place', 'postcode', 0
- FROM (VALUES (%s)) as v(pc)
- WHERE NOT EXISTS
- (SELECT * FROM word
- WHERE word = pc and class='place' and type='postcode'))
- """, (' ' + term, postcode))
- self._cache.postcodes.add(postcode)
+ if re.search(r'[:,;]', postcode) is None:
+ postcode = self.normalize_postcode(postcode)
+
+ if postcode not in self._cache.postcodes:
+ term = self.make_standard_word(postcode)
+ if not term:
+ return
+
+ with self.conn.cursor() as cur:
+ # no word_id needed for postcodes
+ cur.execute("""INSERT INTO word (word, word_token, class, type,
+ search_name_count)
+ (SELECT pc, %s, 'place', 'postcode', 0
+ FROM (VALUES (%s)) as v(pc)
+ WHERE NOT EXISTS
+ (SELECT * FROM word
+ WHERE word = pc and class='place' and type='postcode'))
+ """, (' ' + term, postcode))
+ self._cache.postcodes.add(postcode)
@staticmethod
def _split_housenumbers(hnrs):
return self.normalizer.transliterate(phrase)
- def add_postcodes_from_db(self):
- """ Add postcodes from the location_postcode table to the word table.
+ @staticmethod
+ def normalize_postcode(postcode):
+ """ Convert the postcode to a standardized form.
+
+ This function must yield exactly the same result as the SQL function
+ 'token_normalized_postcode()'.
+ """
+ return postcode.strip().upper()
+
+
+ def update_postcodes_from_db(self):
+ """ Update postcode tokens in the word table from the location_postcode
+ table.
"""
with self.conn.cursor() as cur:
- cur.execute("""SELECT count(create_postcode_id(pc))
- FROM (SELECT distinct(postcode) as pc
- FROM location_postcode) x""")
+ # This finds us the rows in location_postcode and word that are
+ # missing in the other table.
+ cur.execute("""SELECT * FROM
+ (SELECT pc, word FROM
+ (SELECT distinct(postcode) as pc FROM location_postcode) p
+ FULL JOIN
+ (SELECT word FROM word
+ WHERE class ='place' and type = 'postcode') w
+ ON pc = word) x
+ WHERE pc is null or word is null""")
+
+ to_delete = []
+ to_add = []
+
+ for postcode, word in cur:
+ if postcode is None:
+ to_delete.append(word)
+ else:
+ to_add.append(postcode)
+
+ if to_delete:
+ cur.execute("""DELETE FROM WORD
+ WHERE class ='place' and type = 'postcode'
+ and word = any(%s)
+ """, (to_delete, ))
+ if to_add:
+ cur.execute("""SELECT count(create_postcode_id(pc))
+ FROM unnest(%s) as pc
+ """, (to_add, ))
+
def update_special_phrases(self, phrases):
cur.execute('SELECT create_postcode_id(%s)', (pcode, ))
if re.search(r'[:,;]', postcode) is None:
- self._cache.postcodes.get(postcode.strip().upper(), _create_postcode_from_db)
+ self._cache.postcodes.get(self.normalize_postcode(postcode),
+ _create_postcode_from_db)
class _TokenInfo:
Functions for importing, updating and otherwise maintaining the table
of artificial postcode centroids.
"""
+import csv
+import gzip
+import logging
+from math import isfinite
+
+from psycopg2.extras import execute_values
-from nominatim.db.utils import execute_file
from nominatim.db.connection import connect
-def import_postcodes(dsn, project_dir, tokenizer):
- """ Set up the initial list of postcodes.
+LOG = logging.getLogger()
+
+def _to_float(num, max_value):
+    """ Convert a number given as a string into a float. The number is
+    expected to be strictly within the range (-max_value, max_value).
+    Otherwise raises a ValueError.
+ """
+ num = float(num)
+ if not isfinite(num) or num <= -max_value or num >= max_value:
+ raise ValueError()
+
+ return num
+
+class _CountryPostcodesCollector:
+ """ Collector for postcodes of a single country.
"""
- with connect(dsn) as conn:
- conn.drop_table('gb_postcode')
- conn.drop_table('us_postcode')
+ def __init__(self, country):
+ self.country = country
+ self.collected = dict()
+
+
+ def add(self, postcode, x, y):
+ """ Add the given postcode to the collection cache. If the postcode
+ already existed, it is overwritten with the new centroid.
+ """
+ self.collected[postcode] = (x, y)
+
+
+ def commit(self, conn, analyzer, project_dir):
+ """ Update postcodes for the country from the postcodes selected so far
+ as well as any externally supplied postcodes.
+ """
+ self._update_from_external(analyzer, project_dir)
+ to_add, to_delete, to_update = self._compute_changes(conn)
+
+ LOG.info("Processing country '%s' (%s added, %s deleted, %s updated).",
+ self.country, len(to_add), len(to_delete), len(to_update))
with conn.cursor() as cur:
- cur.execute("""CREATE TABLE gb_postcode (
- id integer,
- postcode character varying(9),
- geometry GEOMETRY(Point, 4326))""")
+ if to_add:
+ execute_values(cur,
+ """INSERT INTO location_postcode
+ (place_id, indexed_status, country_code,
+ postcode, geometry) VALUES %s""",
+ to_add,
+ template="""(nextval('seq_place'), 1, '{}',
+ %s, 'SRID=4326;POINT(%s %s)')
+ """.format(self.country))
+ if to_delete:
+ cur.execute("""DELETE FROM location_postcode
+ WHERE country_code = %s and postcode = any(%s)
+ """, (self.country, to_delete))
+ if to_update:
+ execute_values(cur,
+ """UPDATE location_postcode
+ SET indexed_status = 2,
+ geometry = ST_SetSRID(ST_Point(v.x, v.y), 4326)
+ FROM (VALUES %s) AS v (pc, x, y)
+ WHERE country_code = '{}' and postcode = pc
+ """.format(self.country),
+ to_update)
+
+ def _compute_changes(self, conn):
+ """ Compute which postcodes from the collected postcodes have to be
+ added or modified and which from the location_postcode table
+ have to be deleted.
+ """
+ to_update = []
+ to_delete = []
with conn.cursor() as cur:
- cur.execute("""CREATE TABLE us_postcode (
- postcode text,
- x double precision,
- y double precision)""")
- conn.commit()
+ cur.execute("""SELECT postcode, ST_X(geometry), ST_Y(geometry)
+ FROM location_postcode
+ WHERE country_code = %s""",
+ (self.country, ))
+ for postcode, x, y in cur:
+ newx, newy = self.collected.pop(postcode, (None, None))
+ if newx is not None:
+ dist = (x - newx)**2 + (y - newy)**2
+ if dist > 0.0000001:
+ to_update.append((postcode, newx, newy))
+ else:
+ to_delete.append(postcode)
- gb_postcodes = project_dir / 'gb_postcode_data.sql.gz'
- if gb_postcodes.is_file():
- execute_file(dsn, gb_postcodes)
+ to_add = [(k, v[0], v[1]) for k, v in self.collected.items()]
+ self.collected = []
- us_postcodes = project_dir / 'us_postcode_data.sql.gz'
- if us_postcodes.is_file():
- execute_file(dsn, us_postcodes)
+ return to_add, to_delete, to_update
- with conn.cursor() as cur:
- cur.execute("TRUNCATE location_postcode")
- cur.execute("""
- INSERT INTO location_postcode
- (place_id, indexed_status, country_code, postcode, geometry)
- SELECT nextval('seq_place'), 1, country_code,
- token_normalized_postcode(address->'postcode') as pc,
- ST_Centroid(ST_Collect(ST_Centroid(geometry)))
- FROM placex
- WHERE address ? 'postcode'
- and token_normalized_postcode(address->'postcode') is not null
- AND geometry IS NOT null
- GROUP BY country_code, pc
- """)
-
- cur.execute("""
- INSERT INTO location_postcode
- (place_id, indexed_status, country_code, postcode, geometry)
- SELECT nextval('seq_place'), 1, 'us',
- token_normalized_postcode(postcode),
- ST_SetSRID(ST_Point(x,y),4326)
- FROM us_postcode WHERE token_normalized_postcode(postcode) NOT IN
- (SELECT postcode FROM location_postcode
- WHERE country_code = 'us')
- """)
-
- cur.execute("""
- INSERT INTO location_postcode
- (place_id, indexed_status, country_code, postcode, geometry)
- SELECT nextval('seq_place'), 1, 'gb',
- token_normalized_postcode(postcode), geometry
- FROM gb_postcode WHERE token_normalized_postcode(postcode) NOT IN
- (SELECT postcode FROM location_postcode
- WHERE country_code = 'gb')
- """)
-
- cur.execute("""
- DELETE FROM word WHERE class='place' and type='postcode'
- and word NOT IN (SELECT postcode FROM location_postcode)
- """)
- conn.commit()
-
- with tokenizer.name_analyzer() as analyzer:
- analyzer.add_postcodes_from_db()
+
+ def _update_from_external(self, analyzer, project_dir):
+ """ Look for an external postcode file for the active country in
+ the project directory and add missing postcodes when found.
+ """
+ csvfile = self._open_external(project_dir)
+ if csvfile is None:
+ return
+
+ try:
+ reader = csv.DictReader(csvfile)
+ for row in reader:
+ if 'postcode' not in row or 'lat' not in row or 'lon' not in row:
+ LOG.warning("Bad format for external postcode file for country '%s'."
+ " Ignored.", self.country)
+ return
+ postcode = analyzer.normalize_postcode(row['postcode'])
+ if postcode not in self.collected:
+ try:
+ self.collected[postcode] = (_to_float(row['lon'], 180),
+ _to_float(row['lat'], 90))
+ except ValueError:
+ LOG.warning("Bad coordinates %s, %s in %s country postcode file.",
+ row['lat'], row['lon'], self.country)
+
+ finally:
+ csvfile.close()
+
+
+ def _open_external(self, project_dir):
+ fname = project_dir / '{}_postcodes.csv'.format(self.country)
+
+ if fname.is_file():
+ LOG.info("Using external postcode file '%s'.", fname)
+ return open(fname, 'r')
+
+ fname = project_dir / '{}_postcodes.csv.gz'.format(self.country)
+
+ if fname.is_file():
+ LOG.info("Using external postcode file '%s'.", fname)
+ return gzip.open(fname, 'rt')
+
+ return None
+
+
+def update_postcodes(dsn, project_dir, tokenizer):
+ """ Update the table of artificial postcodes.
+
+ Computes artificial postcode centroids from the placex table,
+ potentially enhances it with external data and then updates the
+ postcodes in the table 'location_postcode'.
+ """
+ with tokenizer.name_analyzer() as analyzer:
+ with connect(dsn) as conn:
+ # First get the list of countries that currently have postcodes.
+ # (Doing this before starting to insert, so it is fast on import.)
+ with conn.cursor() as cur:
+ cur.execute("SELECT DISTINCT country_code FROM location_postcode")
+ todo_countries = set((row[0] for row in cur))
+
+ # Recompute the list of valid postcodes from placex.
+ with conn.cursor(name="placex_postcodes") as cur:
+ cur.execute("""SELECT country_code, pc, ST_X(centroid), ST_Y(centroid)
+ FROM (
+ SELECT country_code,
+ token_normalized_postcode(address->'postcode') as pc,
+ ST_Centroid(ST_Collect(ST_Centroid(geometry))) as centroid
+ FROM placex
+ WHERE address ? 'postcode' and geometry IS NOT null
+ and country_code is not null
+ GROUP BY country_code, pc) xx
+ WHERE pc is not null
+ ORDER BY country_code, pc""")
+
+ collector = None
+
+ for country, postcode, x, y in cur:
+ if collector is None or country != collector.country:
+ if collector is not None:
+ collector.commit(conn, analyzer, project_dir)
+ collector = _CountryPostcodesCollector(country)
+ todo_countries.discard(country)
+ collector.add(postcode, x, y)
+
+ if collector is not None:
+ collector.commit(conn, analyzer, project_dir)
+
+ # Now handle any countries that are only in the postcode table.
+ for country in todo_countries:
+ _CountryPostcodesCollector(country).commit(conn, analyzer, project_dir)
+
+ conn.commit()
+
+ analyzer.update_postcodes_from_db()
LOG = logging.getLogger()
-def update_postcodes(dsn, sql_dir):
- """ Recalculate postcode centroids and add, remove and update entries in the
- location_postcode table. `conn` is an opne connection to the database.
- """
- execute_file(dsn, sql_dir / 'update-postcodes.sql')
-
def recompute_word_counts(dsn, sql_dir):
""" Compute the frequency of full-word search terms.
with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
cur.execute("SELECT *, ST_AsText(geometry) as geomtxt FROM location_postcode")
assert cur.rowcount == len(list(context.table)), \
- "Postcode table has {} rows, expected {}.".foramt(cur.rowcount, len(list(context.table)))
+ "Postcode table has {} rows, expected {}.".format(cur.rowcount, len(list(context.table)))
results = {}
for row in cur:
from nominatim.db import properties
import dummy_tokenizer
+import mocks
class _TestingCursor(psycopg2.extras.DictCursor):
""" Extension to the DictCursor class that provides execution
def placex_table(temp_db_with_extensions, temp_db_conn):
""" Create an empty version of the place table.
"""
- with temp_db_conn.cursor() as cur:
- cur.execute("""CREATE TABLE placex (
- place_id BIGINT,
- parent_place_id BIGINT,
- linked_place_id BIGINT,
- importance FLOAT,
- indexed_date TIMESTAMP,
- geometry_sector INTEGER,
- rank_address SMALLINT,
- rank_search SMALLINT,
- partition SMALLINT,
- indexed_status SMALLINT,
- osm_id int8,
- osm_type char(1),
- class text,
- type text,
- name hstore,
- admin_level smallint,
- address hstore,
- extratags hstore,
- geometry Geometry(Geometry,4326),
- wikipedia TEXT,
- country_code varchar(2),
- housenumber TEXT,
- postcode TEXT,
- centroid GEOMETRY(Geometry, 4326))""")
- temp_db_conn.commit()
+ return mocks.MockPlacexTable(temp_db_conn)
@pytest.fixture
@pytest.fixture
-def word_table(temp_db, temp_db_conn):
- with temp_db_conn.cursor() as cur:
- cur.execute("""CREATE TABLE word (
- word_id INTEGER,
- word_token text,
- word text,
- class text,
- type text,
- country_code varchar(2),
- search_name_count INTEGER,
- operator TEXT)""")
- temp_db_conn.commit()
+def word_table(temp_db_conn):
+ return mocks.MockWordTable(temp_db_conn)
@pytest.fixture
def close(self):
pass
- def add_postcodes_from_db(self):
+ def normalize_postcode(self, postcode):
+ return postcode
+
+ def update_postcodes_from_db(self):
pass
def update_special_phrases(self, phrases):
"""
Custom mocks for testing.
"""
+import itertools
+import psycopg2.extras
class MockParamCapture:
""" Mock that records the parameters with which a function was called
self.last_args = args
self.last_kwargs = kwargs
return self.return_value
+
+
+class MockWordTable:
+ """ A word table for testing.
+ """
+ def __init__(self, conn):
+ self.conn = conn
+ with conn.cursor() as cur:
+ cur.execute("""CREATE TABLE word (word_id INTEGER,
+ word_token text,
+ word text,
+ class text,
+ type text,
+ country_code varchar(2),
+ search_name_count INTEGER,
+ operator TEXT)""")
+
+ conn.commit()
+
+ def add_special(self, word_token, word, cls, typ, op):
+ with self.conn.cursor() as cur:
+ cur.execute("""INSERT INTO word (word_token, word, class, type, operator)
+ VALUES (%s, %s, %s, %s, %s)
+ """, (word_token, word, cls, typ, op))
+ self.conn.commit()
+
+
+ def add_postcode(self, word_token, postcode):
+ with self.conn.cursor() as cur:
+ cur.execute("""INSERT INTO word (word_token, word, class, type)
+ VALUES (%s, %s, 'place', 'postcode')
+ """, (word_token, postcode))
+ self.conn.commit()
+
+
+ def count(self):
+ with self.conn.cursor() as cur:
+ return cur.scalar("SELECT count(*) FROM word")
+
+
+ def count_special(self):
+ with self.conn.cursor() as cur:
+ return cur.scalar("SELECT count(*) FROM word WHERE class != 'place'")
+
+
+ def get_special(self):
+ with self.conn.cursor() as cur:
+ cur.execute("""SELECT word_token, word, class, type, operator
+ FROM word WHERE class != 'place'""")
+ return set((tuple(row) for row in cur))
+
+
+ def get_postcodes(self):
+ with self.conn.cursor() as cur:
+ cur.execute("""SELECT word FROM word
+ WHERE class = 'place' and type = 'postcode'""")
+ return set((row[0] for row in cur))
+
+
+class MockPlacexTable:
+ """ A placex table for testing.
+ """
+ def __init__(self, conn):
+ self.idseq = itertools.count(10000)
+ self.conn = conn
+ with conn.cursor() as cur:
+ cur.execute("""CREATE TABLE placex (
+ place_id BIGINT,
+ parent_place_id BIGINT,
+ linked_place_id BIGINT,
+ importance FLOAT,
+ indexed_date TIMESTAMP,
+ geometry_sector INTEGER,
+ rank_address SMALLINT,
+ rank_search SMALLINT,
+ partition SMALLINT,
+ indexed_status SMALLINT,
+ osm_id int8,
+ osm_type char(1),
+ class text,
+ type text,
+ name hstore,
+ admin_level smallint,
+ address hstore,
+ extratags hstore,
+ geometry Geometry(Geometry,4326),
+ wikipedia TEXT,
+ country_code varchar(2),
+ housenumber TEXT,
+ postcode TEXT,
+ centroid GEOMETRY(Geometry, 4326))""")
+ cur.execute("CREATE SEQUENCE IF NOT EXISTS seq_place")
+ conn.commit()
+
+ def add(self, osm_type='N', osm_id=None, cls='amenity', typ='cafe', names=None,
+ admin_level=None, address=None, extratags=None, geom='POINT(10 4)',
+ country=None):
+ with self.conn.cursor() as cur:
+ psycopg2.extras.register_hstore(cur)
+ cur.execute("""INSERT INTO placex (place_id, osm_type, osm_id, class,
+ type, name, admin_level, address,
+ extratags, geometry, country_code)
+ VALUES(nextval('seq_place'), %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
+ (osm_type, osm_id or next(self.idseq), cls, typ, names,
+ admin_level, address, extratags, 'SRID=4326;' + geom,
+ country))
+ self.conn.commit()
mock_func_factory(nominatim.tools.database_import, 'create_search_indices'),
mock_func_factory(nominatim.tools.database_import, 'create_country_names'),
mock_func_factory(nominatim.tools.refresh, 'load_address_levels_from_file'),
- mock_func_factory(nominatim.tools.postcodes, 'import_postcodes'),
+ mock_func_factory(nominatim.tools.postcodes, 'update_postcodes'),
mock_func_factory(nominatim.indexer.indexer.Indexer, 'index_full'),
mock_func_factory(nominatim.tools.refresh, 'setup_website'),
mock_func_factory(nominatim.db.properties, 'set_property')
mock_func_factory(nominatim.tools.database_import, 'load_data'),
mock_func_factory(nominatim.tools.database_import, 'create_search_indices'),
mock_func_factory(nominatim.tools.database_import, 'create_country_names'),
- mock_func_factory(nominatim.tools.postcodes, 'import_postcodes'),
+ mock_func_factory(nominatim.tools.postcodes, 'update_postcodes'),
mock_func_factory(nominatim.indexer.indexer.Indexer, 'index_full'),
mock_func_factory(nominatim.tools.refresh, 'setup_website'),
mock_func_factory(nominatim.db.properties, 'set_property')
assert func.called == 1
@pytest.mark.parametrize("command,func", [
- ('postcodes', 'update_postcodes'),
('word-counts', 'recompute_word_counts'),
('address-levels', 'load_address_levels_from_file'),
('wiki-data', 'import_wikipedia_articles'),
('importance', 'recompute_importance'),
('website', 'setup_website'),
])
-def test_refresh_command(mock_func_factory, temp_db, command, func):
+def test_refresh_command(mock_func_factory, temp_db, command, func, tokenizer_mock):
func_mock = mock_func_factory(nominatim.tools.refresh, func)
assert 0 == call_nominatim('refresh', '--' + command)
assert func_mock.called == 1
+def test_refresh_postcodes(mock_func_factory, temp_db, tokenizer_mock):
+ func_mock = mock_func_factory(nominatim.tools.postcodes, 'update_postcodes')
+ idx_mock = mock_func_factory(nominatim.indexer.indexer.Indexer, 'index_postcodes')
+
+ assert 0 == call_nominatim('refresh', '--postcodes')
+ assert func_mock.called == 1
+
def test_refresh_create_functions(mock_func_factory, temp_db, tokenizer_mock):
func_mock = mock_func_factory(nominatim.tools.refresh, 'create_functions')
assert tokenizer_mock.update_sql_functions_called
-def test_refresh_importance_computed_after_wiki_import(monkeypatch, temp_db):
+def test_refresh_importance_computed_after_wiki_import(monkeypatch, temp_db, tokenizer_mock):
calls = []
monkeypatch.setattr(nominatim.tools.refresh, 'import_wikipedia_articles',
lambda *args, **kwargs: calls.append('import') or 0)
@pytest.fixture
-def create_postcode_id(table_factory, temp_db_cursor):
- table_factory('out_postcode_table', 'postcode TEXT')
-
+def create_postcode_id(temp_db_cursor):
temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION create_postcode_id(postcode TEXT)
RETURNS BOOLEAN AS $$
- INSERT INTO out_postcode_table VALUES (postcode) RETURNING True;
+ INSERT INTO word (word_token, word, class, type)
+ VALUES (' ' || postcode, postcode, 'place', 'postcode')
+ RETURNING True;
$$ LANGUAGE SQL""")
assert analyzer.normalize('TEsT') == 'test'
-def test_add_postcodes_from_db(analyzer, table_factory, temp_db_cursor,
- create_postcode_id):
+def test_update_postcodes_from_db_empty(analyzer, table_factory, word_table,
+ create_postcode_id):
table_factory('location_postcode', 'postcode TEXT',
content=(('1234',), ('12 34',), ('AB23',), ('1234',)))
- analyzer.add_postcodes_from_db()
+ analyzer.update_postcodes_from_db()
+
+ assert word_table.count() == 3
+ assert word_table.get_postcodes() == {'1234', '12 34', 'AB23'}
+
+
+def test_update_postcodes_from_db_add_and_remove(analyzer, table_factory, word_table,
+ create_postcode_id):
+ table_factory('location_postcode', 'postcode TEXT',
+ content=(('1234',), ('45BC', ), ('XX45', )))
+ word_table.add_postcode(' 1234', '1234')
+ word_table.add_postcode(' 5678', '5678')
+
+ analyzer.update_postcodes_from_db()
- assert temp_db_cursor.row_set("SELECT * from out_postcode_table") \
- == set((('1234', ), ('12 34', ), ('AB23',)))
+ assert word_table.count() == 3
+ assert word_table.get_postcodes() == {'1234', '45BC', 'XX45'}
-def test_update_special_phrase_empty_table(analyzer, word_table, temp_db_cursor,
- make_standard_name):
+def test_update_special_phrase_empty_table(analyzer, word_table, make_standard_name):
analyzer.update_special_phrases([
("König bei", "amenity", "royal", "near"),
("Könige", "amenity", "royal", "-"),
("strasse", "highway", "primary", "in")
])
- assert temp_db_cursor.row_set("""SELECT word_token, word, class, type, operator
- FROM word WHERE class != 'place'""") \
+ assert word_table.get_special() \
== set(((' könig bei', 'könig bei', 'amenity', 'royal', 'near'),
(' könige', 'könige', 'amenity', 'royal', None),
(' strasse', 'strasse', 'highway', 'primary', 'in')))
def test_update_special_phrase_delete_all(analyzer, word_table, temp_db_cursor,
make_standard_name):
- temp_db_cursor.execute("""INSERT INTO word (word_token, word, class, type, operator)
- VALUES (' foo', 'foo', 'amenity', 'prison', 'in'),
- (' bar', 'bar', 'highway', 'road', null)""")
+ word_table.add_special(' foo', 'foo', 'amenity', 'prison', 'in')
+ word_table.add_special(' bar', 'bar', 'highway', 'road', None)
- assert 2 == temp_db_cursor.scalar("""SELECT count(*) FROM word WHERE class != 'place'""")
+ assert word_table.count_special() == 2
analyzer.update_special_phrases([])
- assert 0 == temp_db_cursor.scalar("""SELECT count(*) FROM word WHERE class != 'place'""")
+ assert word_table.count_special() == 0
-def test_update_special_phrase_modify(analyzer, word_table, temp_db_cursor,
- make_standard_name):
- temp_db_cursor.execute("""INSERT INTO word (word_token, word, class, type, operator)
- VALUES (' foo', 'foo', 'amenity', 'prison', 'in'),
- (' bar', 'bar', 'highway', 'road', null)""")
+def test_update_special_phrase_modify(analyzer, word_table, make_standard_name):
+ word_table.add_special(' foo', 'foo', 'amenity', 'prison', 'in')
+ word_table.add_special(' bar', 'bar', 'highway', 'road', None)
- assert 2 == temp_db_cursor.scalar("""SELECT count(*) FROM word WHERE class != 'place'""")
+ assert word_table.count_special() == 2
analyzer.update_special_phrases([
('prison', 'amenity', 'prison', 'in'),
('garden', 'leisure', 'garden', 'near')
])
- assert temp_db_cursor.row_set("""SELECT word_token, word, class, type, operator
- FROM word WHERE class != 'place'""") \
+ assert word_table.get_special() \
== set(((' prison', 'prison', 'amenity', 'prison', 'in'),
(' bar', 'bar', 'highway', 'road', None),
(' garden', 'garden', 'leisure', 'garden', 'near')))
@pytest.mark.parametrize('pc', ['12345', 'AB 123', '34-345'])
-def test_process_place_postcode(analyzer, temp_db_cursor, create_postcode_id, pc):
-
+def test_process_place_postcode(analyzer, create_postcode_id, word_table, pc):
info = analyzer.process_place({'address': {'postcode' : pc}})
- assert temp_db_cursor.row_set("SELECT * from out_postcode_table") \
- == set(((pc, ),))
+ assert word_table.get_postcodes() == {pc, }
@pytest.mark.parametrize('pc', ['12:23', 'ab;cd;f', '123;836'])
-def test_process_place_bad_postcode(analyzer, temp_db_cursor, create_postcode_id,
- pc):
-
+def test_process_place_bad_postcode(analyzer, create_postcode_id, word_table, pc):
info = analyzer.process_place({'address': {'postcode' : pc}})
- assert 0 == temp_db_cursor.scalar("SELECT count(*) from out_postcode_table")
+ assert not word_table.get_postcodes()
@pytest.mark.parametrize('hnr', ['123a', '1', '101'])
assert a._make_standard_hnr('iv') == 'IV'
-def test_add_postcodes_from_db(analyzer, word_table, table_factory, temp_db_cursor):
+def test_update_postcodes_from_db_empty(analyzer, table_factory, word_table):
table_factory('location_postcode', 'postcode TEXT',
content=(('1234',), ('12 34',), ('AB23',), ('1234',)))
with analyzer() as a:
- a.add_postcodes_from_db()
+ a.update_postcodes_from_db()
- assert temp_db_cursor.row_set("""SELECT word, word_token from word
- """) \
- == set((('1234', ' 1234'), ('12 34', ' 12 34'), ('AB23', ' AB23')))
+ assert word_table.count() == 3
+ assert word_table.get_postcodes() == {'1234', '12 34', 'AB23'}
+
+
+def test_update_postcodes_from_db_add_and_remove(analyzer, table_factory, word_table):
+ table_factory('location_postcode', 'postcode TEXT',
+ content=(('1234',), ('45BC', ), ('XX45', )))
+ word_table.add_postcode(' 1234', '1234')
+ word_table.add_postcode(' 5678', '5678')
+
+ with analyzer() as a:
+ a.update_postcodes_from_db()
+
+ assert word_table.count() == 3
+ assert word_table.get_postcodes() == {'1234', '45BC', 'XX45'}
def test_update_special_phrase_empty_table(analyzer, word_table, temp_db_cursor):
@pytest.mark.parametrize('pc', ['12345', 'AB 123', '34-345'])
-def test_process_place_postcode(analyzer, temp_db_cursor, pc):
+def test_process_place_postcode(analyzer, word_table, pc):
with analyzer() as a:
info = a.process_place({'address': {'postcode' : pc}})
- assert temp_db_cursor.row_set("""SELECT word FROM word
- WHERE class = 'place' and type = 'postcode'""") \
- == set(((pc, ),))
+ assert word_table.get_postcodes() == {pc, }
@pytest.mark.parametrize('pc', ['12:23', 'ab;cd;f', '123;836'])
-def test_process_place_bad_postcode(analyzer, temp_db_cursor, pc):
+def test_process_place_bad_postcode(analyzer, word_table, pc):
with analyzer() as a:
info = a.process_place({'address': {'postcode' : pc}})
- assert 0 == temp_db_cursor.scalar("""SELECT count(*) FROM word
- WHERE class = 'place' and type = 'postcode'""")
+ assert not word_table.get_postcodes()
@pytest.mark.parametrize('hnr', ['123a', '1', '101'])
@pytest.mark.parametrize("threads", (1, 5))
-def test_load_data(dsn, src_dir, place_row, placex_table, osmline_table, word_table,
- temp_db_cursor, threads):
+def test_load_data(dsn, src_dir, place_row, placex_table, osmline_table,
+ word_table, temp_db_cursor, threads):
for func in ('precompute_words', 'getorcreate_housenumber_id', 'make_standard_name'):
temp_db_cursor.execute("""CREATE FUNCTION {} (src TEXT)
RETURNS TEXT AS $$ SELECT 'a'::TEXT $$ LANGUAGE SQL
"""
Tests for functions to maintain the artificial postcode table.
"""
+import subprocess
import pytest
from nominatim.tools import postcodes
import dummy_tokenizer
+class MockPostcodeTable:
+ """ A location_postcode table for testing.
+ """
+ def __init__(self, conn):
+ self.conn = conn
+ with conn.cursor() as cur:
+ cur.execute("""CREATE TABLE location_postcode (
+ place_id BIGINT,
+ parent_place_id BIGINT,
+ rank_search SMALLINT,
+ rank_address SMALLINT,
+ indexed_status SMALLINT,
+ indexed_date TIMESTAMP,
+ country_code varchar(2),
+ postcode TEXT,
+ geometry GEOMETRY(Geometry, 4326))""")
+ cur.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
+ RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql;
+ """)
+ conn.commit()
+
+ def add(self, country, postcode, x, y):
+ with self.conn.cursor() as cur:
+ cur.execute("""INSERT INTO location_postcode (place_id, indexed_status,
+ country_code, postcode,
+ geometry)
+ VALUES (nextval('seq_place'), 1, %s, %s,
+ 'SRID=4326;POINT(%s %s)')""",
+ (country, postcode, x, y))
+ self.conn.commit()
+
+
+ @property
+ def row_set(self):
+ with self.conn.cursor() as cur:
+ cur.execute("""SELECT country_code, postcode,
+ ST_X(geometry), ST_Y(geometry)
+ FROM location_postcode""")
+ return set((tuple(row) for row in cur))
+
+
@pytest.fixture
def tokenizer():
return dummy_tokenizer.DummyTokenizer(None, None)
@pytest.fixture
-def postcode_table(temp_db_with_extensions, temp_db_cursor, table_factory,
- placex_table, word_table):
- table_factory('location_postcode',
- """ place_id BIGINT,
- parent_place_id BIGINT,
- rank_search SMALLINT,
- rank_address SMALLINT,
- indexed_status SMALLINT,
- indexed_date TIMESTAMP,
- country_code varchar(2),
- postcode TEXT,
- geometry GEOMETRY(Geometry, 4326)""")
- temp_db_cursor.execute('CREATE SEQUENCE seq_place')
- temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
- RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql;
- """)
-
-
-def test_import_postcodes_empty(dsn, temp_db_cursor, postcode_table, tmp_path, tokenizer):
- postcodes.import_postcodes(dsn, tmp_path, tokenizer)
-
- assert temp_db_cursor.table_exists('gb_postcode')
- assert temp_db_cursor.table_exists('us_postcode')
- assert temp_db_cursor.table_rows('location_postcode') == 0
-
-
-def test_import_postcodes_from_placex(dsn, temp_db_cursor, postcode_table, tmp_path, tokenizer):
- temp_db_cursor.execute("""
- INSERT INTO placex (place_id, country_code, address, geometry)
- VALUES (1, 'xx', '"postcode"=>"9486"', 'SRID=4326;POINT(10 12)')
- """)
-
- postcodes.import_postcodes(dsn, tmp_path, tokenizer)
-
- rows = temp_db_cursor.row_set(""" SELECT postcode, country_code,
- ST_X(geometry), ST_Y(geometry)
- FROM location_postcode""")
- print(rows)
- assert len(rows) == 1
- assert rows == set((('9486', 'xx', 10, 12), ))
+def postcode_table(temp_db_conn, placex_table, word_table):
+ return MockPostcodeTable(temp_db_conn)
+
+
+def test_import_postcodes_empty(dsn, postcode_table, tmp_path, tokenizer):
+ postcodes.update_postcodes(dsn, tmp_path, tokenizer)
+
+ assert not postcode_table.row_set
+
+
+def test_import_postcodes_add_new(dsn, placex_table, postcode_table, tmp_path, tokenizer):
+ placex_table.add(country='xx', geom='POINT(10 12)',
+ address=dict(postcode='9486'))
+ postcode_table.add('yy', '9486', 99, 34)
+
+ postcodes.update_postcodes(dsn, tmp_path, tokenizer)
+
+ assert postcode_table.row_set == {('xx', '9486', 10, 12), }
+
+
+def test_import_postcodes_replace_coordinates(dsn, placex_table, postcode_table, tmp_path, tokenizer):
+ placex_table.add(country='xx', geom='POINT(10 12)',
+ address=dict(postcode='AB 4511'))
+ postcode_table.add('xx', 'AB 4511', 99, 34)
+
+ postcodes.update_postcodes(dsn, tmp_path, tokenizer)
+
+ assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}
+
+
+def test_import_postcodes_replace_coordinates_close(dsn, placex_table, postcode_table, tmp_path, tokenizer):
+ placex_table.add(country='xx', geom='POINT(10 12)',
+ address=dict(postcode='AB 4511'))
+ postcode_table.add('xx', 'AB 4511', 10, 11.99999)
+
+ postcodes.update_postcodes(dsn, tmp_path, tokenizer)
+
+ assert postcode_table.row_set == {('xx', 'AB 4511', 10, 11.99999)}
+
+
+def test_import_postcodes_remove(dsn, placex_table, postcode_table, tmp_path, tokenizer):
+ placex_table.add(country='xx', geom='POINT(10 12)',
+ address=dict(postcode='AB 4511'))
+ postcode_table.add('xx', 'badname', 10, 12)
+
+ postcodes.update_postcodes(dsn, tmp_path, tokenizer)
+
+ assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}
+
+
+def test_import_postcodes_ignore_empty_country(dsn, placex_table, postcode_table, tmp_path, tokenizer):
+ placex_table.add(country=None, geom='POINT(10 12)',
+ address=dict(postcode='AB 4511'))
+
+ postcodes.update_postcodes(dsn, tmp_path, tokenizer)
+
+ assert not postcode_table.row_set
+
+
+def test_import_postcodes_remove_all(dsn, placex_table, postcode_table, tmp_path, tokenizer):
+ postcode_table.add('ch', '5613', 10, 12)
+
+ postcodes.update_postcodes(dsn, tmp_path, tokenizer)
+
+ assert not postcode_table.row_set
+
+
+def test_import_postcodes_multi_country(dsn, placex_table, postcode_table, tmp_path, tokenizer):
+ placex_table.add(country='de', geom='POINT(10 12)',
+ address=dict(postcode='54451'))
+ placex_table.add(country='cc', geom='POINT(100 56)',
+ address=dict(postcode='DD23 T'))
+ placex_table.add(country='de', geom='POINT(10.3 11.0)',
+ address=dict(postcode='54452'))
+ placex_table.add(country='cc', geom='POINT(10.3 11.0)',
+ address=dict(postcode='54452'))
+
+ postcodes.update_postcodes(dsn, tmp_path, tokenizer)
+
+ assert postcode_table.row_set == {('de', '54451', 10, 12),
+ ('de', '54452', 10.3, 11.0),
+ ('cc', '54452', 10.3, 11.0),
+ ('cc', 'DD23 T', 100, 56)}
+
+
+@pytest.mark.parametrize("gzipped", [True, False])
+def test_import_postcodes_extern(dsn, placex_table, postcode_table, tmp_path,
+ tokenizer, gzipped):
+ placex_table.add(country='xx', geom='POINT(10 12)',
+ address=dict(postcode='AB 4511'))
+
+ extfile = tmp_path / 'xx_postcodes.csv'
+ extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")
+
+ if gzipped:
+ subprocess.run(['gzip', str(extfile)])
+ assert not extfile.is_file()
+
+ postcodes.update_postcodes(dsn, tmp_path, tokenizer)
+
+ assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12),
+ ('xx', 'CD 4511', -10, -5)}
+
+
+def test_import_postcodes_extern_bad_column(dsn, placex_table, postcode_table,
+ tmp_path, tokenizer):
+ placex_table.add(country='xx', geom='POINT(10 12)',
+ address=dict(postcode='AB 4511'))
+
+ extfile = tmp_path / 'xx_postcodes.csv'
+ extfile.write_text("postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")
+
+ postcodes.update_postcodes(dsn, tmp_path, tokenizer)
+
+ assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}
+
+
+def test_import_postcodes_extern_bad_number(dsn, placex_table, postcode_table,
+ tmp_path, tokenizer):
+ placex_table.add(country='xx', geom='POINT(10 12)',
+ address=dict(postcode='AB 4511'))
+
+ extfile = tmp_path / 'xx_postcodes.csv'
+ extfile.write_text("postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0")
+
+ postcodes.update_postcodes(dsn, tmp_path, tokenizer)
+ assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12),
+ ('xx', 'CD 4511', -10, -5)}