X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/ffc2d82b0ed150d52a718dc563f9399062e579a7..5d32ac9ac918929c11602a740c3033bcfdb0f929:/test/python/test_tools_postcodes.py diff --git a/test/python/test_tools_postcodes.py b/test/python/test_tools_postcodes.py index 37b47dfa..a3415769 100644 --- a/test/python/test_tools_postcodes.py +++ b/test/python/test_tools_postcodes.py @@ -1,55 +1,213 @@ """ Tests for functions to maintain the artificial postcode table. """ +import subprocess import pytest from nominatim.tools import postcodes import dummy_tokenizer +class MockPostcodeTable: + """ A location_postcode table for testing. + """ + def __init__(self, conn): + self.conn = conn + with conn.cursor() as cur: + cur.execute("""CREATE TABLE location_postcode ( + place_id BIGINT, + parent_place_id BIGINT, + rank_search SMALLINT, + rank_address SMALLINT, + indexed_status SMALLINT, + indexed_date TIMESTAMP, + country_code varchar(2), + postcode TEXT, + geometry GEOMETRY(Geometry, 4326))""") + cur.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT) + RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql; + + CREATE OR REPLACE FUNCTION get_country_code(place geometry) + RETURNS TEXT AS $$ BEGIN + RETURN null; + END; $$ LANGUAGE plpgsql; + """) + conn.commit() + + def add(self, country, postcode, x, y): + with self.conn.cursor() as cur: + cur.execute("""INSERT INTO location_postcode (place_id, indexed_status, + country_code, postcode, + geometry) + VALUES (nextval('seq_place'), 1, %s, %s, + 'SRID=4326;POINT(%s %s)')""", + (country, postcode, x, y)) + self.conn.commit() + + + @property + def row_set(self): + with self.conn.cursor() as cur: + cur.execute("""SELECT country_code, postcode, + ST_X(geometry), ST_Y(geometry) + FROM location_postcode""") + return set((tuple(row) for row in cur)) + + @pytest.fixture def tokenizer(): return dummy_tokenizer.DummyTokenizer(None, None) @pytest.fixture -def postcode_table(temp_db_with_extensions, temp_db_cursor, table_factory, - placex_table, word_table): - table_factory('location_postcode', - """ place_id BIGINT, - parent_place_id BIGINT, - rank_search SMALLINT, - rank_address SMALLINT, - indexed_status SMALLINT, - indexed_date TIMESTAMP, - country_code varchar(2), - postcode TEXT, - geometry GEOMETRY(Geometry, 4326)""") - temp_db_cursor.execute('CREATE SEQUENCE seq_place') - temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT) - RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql; - """) - - -def test_import_postcodes_empty(dsn, temp_db_cursor, postcode_table, tmp_path, tokenizer): - postcodes.import_postcodes(dsn, tmp_path, tokenizer) - - assert temp_db_cursor.table_exists('gb_postcode') - assert temp_db_cursor.table_exists('us_postcode') - assert temp_db_cursor.table_rows('location_postcode') == 0 - - -def test_import_postcodes_from_placex(dsn, temp_db_cursor, postcode_table, tmp_path, tokenizer): +def postcode_table(temp_db_conn, placex_table, word_table): + return MockPostcodeTable(temp_db_conn) + + +def test_postcodes_empty(dsn, postcode_table, place_table, + tmp_path, tokenizer): + postcodes.update_postcodes(dsn, tmp_path, tokenizer) + + assert not postcode_table.row_set + + +def test_postcodes_add_new(dsn, postcode_table, tmp_path, + insert_implicit_postcode, tokenizer): + insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='9486')) + postcode_table.add('yy', '9486', 99, 34) + + postcodes.update_postcodes(dsn, tmp_path, tokenizer) + + assert postcode_table.row_set == {('xx', '9486', 10, 12), } + + +def test_postcodes_replace_coordinates(dsn, postcode_table, tmp_path, + insert_implicit_postcode, tokenizer): + insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511')) + postcode_table.add('xx', 'AB 4511', 99, 34) + + postcodes.update_postcodes(dsn, tmp_path, tokenizer) + + assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)} + + +def test_postcodes_replace_coordinates_close(dsn, postcode_table, tmp_path, + insert_implicit_postcode, tokenizer): + insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511')) + postcode_table.add('xx', 'AB 4511', 10, 11.99999) + + postcodes.update_postcodes(dsn, tmp_path, tokenizer) + + assert postcode_table.row_set == {('xx', 'AB 4511', 10, 11.99999)} + + +def test_postcodes_remove(dsn, postcode_table, tmp_path, + insert_implicit_postcode, tokenizer): + insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511')) + postcode_table.add('xx', 'badname', 10, 12) + + postcodes.update_postcodes(dsn, tmp_path, tokenizer) + + assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)} + + +def test_postcodes_ignore_empty_country(dsn, postcode_table, tmp_path, + insert_implicit_postcode, tokenizer): + insert_implicit_postcode(1, None, 'POINT(10 12)', dict(postcode='AB 4511')) + postcodes.update_postcodes(dsn, tmp_path, tokenizer) + assert not postcode_table.row_set + + +def test_postcodes_remove_all(dsn, postcode_table, place_table, + tmp_path, tokenizer): + postcode_table.add('ch', '5613', 10, 12) + postcodes.update_postcodes(dsn, tmp_path, tokenizer) + + assert not postcode_table.row_set + + +def test_postcodes_multi_country(dsn, postcode_table, tmp_path, + insert_implicit_postcode, tokenizer): + insert_implicit_postcode(1, 'de', 'POINT(10 12)', dict(postcode='54451')) + insert_implicit_postcode(2, 'cc', 'POINT(100 56)', dict(postcode='DD23 T')) + insert_implicit_postcode(3, 'de', 'POINT(10.3 11.0)', dict(postcode='54452')) + insert_implicit_postcode(4, 'cc', 'POINT(10.3 11.0)', dict(postcode='54452')) + + postcodes.update_postcodes(dsn, tmp_path, tokenizer) + + assert postcode_table.row_set == {('de', '54451', 10, 12), + ('de', '54452', 10.3, 11.0), + ('cc', '54452', 10.3, 11.0), + ('cc', 'DD23 T', 100, 56)} + + +@pytest.mark.parametrize("gzipped", [True, False]) +def test_postcodes_extern(dsn, postcode_table, tmp_path, + insert_implicit_postcode, tokenizer, gzipped): + insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511')) + + extfile = tmp_path / 'xx_postcodes.csv' + extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10") + + if gzipped: + subprocess.run(['gzip', str(extfile)]) + assert not extfile.is_file() + + postcodes.update_postcodes(dsn, tmp_path, tokenizer) + + assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12), + ('xx', 'CD 4511', -10, -5)} + + +def test_postcodes_extern_bad_column(dsn, postcode_table, tmp_path, + insert_implicit_postcode, tokenizer): + insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511')) + + extfile = tmp_path / 'xx_postcodes.csv' + extfile.write_text("postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10") + + postcodes.update_postcodes(dsn, tmp_path, tokenizer) + + assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)} + + +def test_postcodes_extern_bad_number(dsn, insert_implicit_postcode, + postcode_table, tmp_path, tokenizer): + insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511')) + + extfile = tmp_path / 'xx_postcodes.csv' + extfile.write_text("postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0") + + postcodes.update_postcodes(dsn, tmp_path, tokenizer) + + assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12), + ('xx', 'CD 4511', -10, -5)} + +def test_can_compute(dsn, table_factory): + assert not postcodes.can_compute(dsn) + table_factory('place') + assert postcodes.can_compute(dsn) + +def test_no_placex_entry(dsn, tmp_path, temp_db_cursor, place_row, postcode_table, tokenizer): + #Rewrite the get_country_code function to verify its execution. temp_db_cursor.execute(""" - INSERT INTO placex (place_id, country_code, address, geometry) - VALUES (1, 'xx', '"postcode"=>"9486"', 'SRID=4326;POINT(10 12)') + CREATE OR REPLACE FUNCTION get_country_code(place geometry) + RETURNS TEXT AS $$ BEGIN + RETURN 'fr'; + END; $$ LANGUAGE plpgsql; """) + place_row(geom='SRID=4326;POINT(10 12)', address=dict(postcode='AB 4511')) + postcodes.update_postcodes(dsn, tmp_path, tokenizer) - postcodes.import_postcodes(dsn, tmp_path, tokenizer) + assert postcode_table.row_set == {('fr', 'AB 4511', 10, 12)} - rows = temp_db_cursor.row_set(""" SELECT postcode, country_code, - ST_X(geometry), ST_Y(geometry) - FROM location_postcode""") - print(rows) - assert len(rows) == 1 - assert rows == set((('9486', 'xx', 10, 12), )) +@pytest.fixture +def insert_implicit_postcode(placex_table, place_row): + """ + Inserts data into the placex and place table + which can then be used to compute one postcode. + """ + def _insert_implicit_postcode(osm_id, country, geometry, address): + placex_table.add(osm_id=osm_id, country=country, geom=geometry) + place_row(osm_id=osm_id, geom='SRID=4326;'+geometry, address=address) + return _insert_implicit_postcode