import psutil
-from nominatim.tools.exec_utils import run_legacy_script
from nominatim.db.connection import connect
from nominatim.db import status, properties
from nominatim.version import NOMINATIM_VERSION
from ..tools import database_import
from ..tools import refresh
from ..indexer.indexer import Indexer
+ from ..tools import postcodes
if args.osm_file and not Path(args.osm_file).is_file():
LOG.fatal("OSM file '%s' does not exist.", args.osm_file)
args.threads or psutil.cpu_count() or 1)
LOG.warning('Calculate postcodes')
- run_legacy_script('setup.php', '--calculate-postcodes',
- nominatim_env=args, throw_on_fail=not args.ignore_errors)
+ postcodes.import_postcodes(args.config.get_libpq_dsn(), args.project_dir)
if args.continue_at is None or args.continue_at in ('load-data', 'indexing'):
LOG.warning('Indexing places')
--- /dev/null
+"""
+Functions for importing, updating and otherwise maintaining the table
+of artificial postcode centroids.
+"""
+
+from nominatim.db.utils import execute_file
+from nominatim.db.connection import connect
+
+def import_postcodes(dsn, project_dir):
+ """ Set up the initial list of postcodes.
+ """
+
+ with connect(dsn) as conn:
+ conn.drop_table('gb_postcode')
+ conn.drop_table('us_postcode')
+
+ with conn.cursor() as cur:
+ cur.execute("""CREATE TABLE gb_postcode (
+ id integer,
+ postcode character varying(9),
+ geometry GEOMETRY(Point, 4326))""")
+
+ with conn.cursor() as cur:
+ cur.execute("""CREATE TABLE us_postcode (
+ postcode text,
+ x double precision,
+ y double precision)""")
+ conn.commit()
+
+ gb_postcodes = project_dir / 'gb_postcode_data.sql.gz'
+ if gb_postcodes.is_file():
+ execute_file(dsn, gb_postcodes)
+
+ us_postcodes = project_dir / 'us_postcode_data.sql.gz'
+ if us_postcodes.is_file():
+ execute_file(dsn, us_postcodes)
+
+ with conn.cursor() as cur:
+ cur.execute("TRUNCATE location_postcode")
+ cur.execute("""
+ INSERT INTO location_postcode
+ (place_id, indexed_status, country_code, postcode, geometry)
+ SELECT nextval('seq_place'), 1, country_code,
+ upper(trim (both ' ' from address->'postcode')) as pc,
+ ST_Centroid(ST_Collect(ST_Centroid(geometry)))
+ FROM placex
+ WHERE address ? 'postcode' AND address->'postcode' NOT SIMILAR TO '%(,|;)%'
+ AND geometry IS NOT null
+ GROUP BY country_code, pc
+ """)
+
+ cur.execute("""
+ INSERT INTO location_postcode
+ (place_id, indexed_status, country_code, postcode, geometry)
+ SELECT nextval('seq_place'), 1, 'us', postcode,
+ ST_SetSRID(ST_Point(x,y),4326)
+ FROM us_postcode WHERE postcode NOT IN
+ (SELECT postcode FROM location_postcode
+ WHERE country_code = 'us')
+ """)
+
+ cur.execute("""
+ INSERT INTO location_postcode
+ (place_id, indexed_status, country_code, postcode, geometry)
+ SELECT nextval('seq_place'), 1, 'gb', postcode, geometry
+ FROM gb_postcode WHERE postcode NOT IN
+ (SELECT postcode FROM location_postcode
+ WHERE country_code = 'gb')
+ """)
+
+ cur.execute("""
+ DELETE FROM word WHERE class='place' and type='postcode'
+ and word NOT IN (SELECT postcode FROM location_postcode)
+ """)
+
+ cur.execute("""
+ SELECT count(getorcreate_postcode_id(v)) FROM
+ (SELECT distinct(postcode) as v FROM location_postcode) p
+ """)
+ conn.commit()
""" Execute a query and return the result as a set of tuples.
"""
self.execute(sql, params)
- if self.rowcount == 1:
- return set(tuple(self.fetchone()))
return set((tuple(row) for row in self))
import nominatim.tools.database_import
import nominatim.tools.freeze
import nominatim.tools.refresh
+import nominatim.tools.postcodes
from mocks import MockParamCapture
mock_func_factory(nominatim.tools.database_import, 'create_search_indices'),
mock_func_factory(nominatim.tools.database_import, 'create_country_names'),
mock_func_factory(nominatim.tools.refresh, 'load_address_levels_from_file'),
+ mock_func_factory(nominatim.tools.postcodes, 'import_postcodes'),
mock_func_factory(nominatim.indexer.indexer.Indexer, 'index_full'),
mock_func_factory(nominatim.tools.refresh, 'setup_website'),
mock_func_factory(nominatim.db.properties, 'set_property')
]
cf_mock = mock_func_factory(nominatim.tools.refresh, 'create_functions')
- mock_func_factory(nominatim.clicmd.setup, 'run_legacy_script')
assert 0 == call_nominatim('import', '--osm-file', __file__)
--- /dev/null
+"""
+Tests for functions to maintain the artificial postcode table.
+"""
+
+import pytest
+
+from nominatim.tools import postcodes
+
+@pytest.fixture
+def postcode_table(temp_db_with_extensions, temp_db_cursor, table_factory,
+ placex_table, word_table):
+ table_factory('location_postcode',
+ """ place_id BIGINT,
+ parent_place_id BIGINT,
+ rank_search SMALLINT,
+ rank_address SMALLINT,
+ indexed_status SMALLINT,
+ indexed_date TIMESTAMP,
+ country_code varchar(2),
+ postcode TEXT,
+ geometry GEOMETRY(Geometry, 4326)""")
+ temp_db_cursor.execute('CREATE SEQUENCE seq_place')
+ temp_db_cursor.execute("""CREATE OR REPLACE FUNCTION getorcreate_postcode_id(postcode TEXT)
+ RETURNS INTEGER AS $$ BEGIN RETURN 1; END; $$ LANGUAGE plpgsql;
+ """)
+
+
+def test_import_postcodes_empty(dsn, temp_db_cursor, postcode_table, tmp_path):
+ postcodes.import_postcodes(dsn, tmp_path)
+
+ assert temp_db_cursor.table_exists('gb_postcode')
+ assert temp_db_cursor.table_exists('us_postcode')
+ assert temp_db_cursor.table_rows('location_postcode') == 0
+
+
+def test_import_postcodes_from_placex(dsn, temp_db_cursor, postcode_table, tmp_path):
+ temp_db_cursor.execute("""
+ INSERT INTO placex (place_id, country_code, address, geometry)
+ VALUES (1, 'xx', '"postcode"=>"9486"', 'SRID=4326;POINT(10 12)')
+ """)
+
+ postcodes.import_postcodes(dsn, tmp_path)
+
+ rows = temp_db_cursor.row_set(""" SELECT postcode, country_code,
+ ST_X(geometry), ST_Y(geometry)
+ FROM location_postcode""")
+ print(rows)
+ assert len(rows) == 1
+ assert rows == set((('9486', 'xx', 10, 12), ))
+