]> git.openstreetmap.org Git - nominatim.git/blobdiff - lib-sql/functions/placex_triggers.sql
improve wording
[nominatim.git] / lib-sql / functions / placex_triggers.sql
index 5ffd4e4bc23615322866ec9d30b180d1f6e2ad59..01f99715e6cbf63770a8cab2f813383196166b7d 100644 (file)
+-- SPDX-License-Identifier: GPL-2.0-only
+--
+-- This file is part of Nominatim. (https://nominatim.org)
+--
+-- Copyright (C) 2024 by the Nominatim developer community.
+-- For a full list of authors see the git log.
+
 -- Trigger functions for the placex table.
 
 -- Trigger functions for the placex table.
 
+-- Information returned by update preparation.
+DROP TYPE IF EXISTS prepare_update_info CASCADE;
+CREATE TYPE prepare_update_info AS (
+  name HSTORE,
+  address HSTORE,
+  rank_address SMALLINT,
+  country_code TEXT,
+  class TEXT,
+  type TEXT,
+  linked_place_id BIGINT,
+  centroid_x float,
+  centroid_y float
+);
+
 -- Retrieve the data needed by the indexer for updating the place.
 -- Retrieve the data needed by the indexer for updating the place.
---
--- Return parameters:
---  name            list of names
---  address         list of address tags, either from the object or a surrounding
---                  building
---  country_feature If the place is a country feature, this contains the
---                  country code, otherwise it is null.
-CREATE OR REPLACE FUNCTION placex_prepare_update(p placex,
-                                                 OUT name HSTORE,
-                                                 OUT address HSTORE,
-                                                 OUT country_feature VARCHAR)
+CREATE OR REPLACE FUNCTION placex_indexing_prepare(p placex)
+  RETURNS prepare_update_info
   AS $$
   AS $$
+DECLARE
+  location RECORD;
+  result prepare_update_info;
+  extra_names HSTORE;
 BEGIN
 BEGIN
+  IF not p.address ? '_inherited' THEN
+    result.address := p.address;
+  END IF;
+
   -- For POI nodes, check if the address should be derived from a surrounding
   -- building.
   -- For POI nodes, check if the address should be derived from a surrounding
   -- building.
-  IF p.rank_search < 30 OR p.osm_type != 'N' OR p.address is not null THEN
-    RAISE WARNING 'self address for % %', p.osm_type, p.osm_id;
-    address := p.address;
-  ELSE
-    -- The additional && condition works around the misguided query
-    -- planner of postgis 3.0.
-    SELECT placex.address || hstore('_inherited', '') INTO address
-      FROM placex
-     WHERE ST_Covers(geometry, p.centroid)
-           and geometry && p.centroid
-           and (placex.address ? 'housenumber' or placex.address ? 'street' or placex.address ? 'place')
-           and rank_search > 28 AND ST_GeometryType(geometry) in ('ST_Polygon','ST_MultiPolygon')
-     LIMIT 1;
-    RAISE WARNING 'other address for % %: % (%)', p.osm_type, p.osm_id, address, p.centroid;
+  IF p.rank_search = 30 AND p.osm_type = 'N' THEN
+    IF p.address is null THEN
+        -- The additional && condition works around the misguided query
+        -- planner of postgis 3.0.
+        SELECT placex.address || hstore('_inherited', '') INTO result.address
+          FROM placex
+         WHERE ST_Covers(geometry, p.centroid)
+               and geometry && p.centroid
+               and placex.address is not null
+               and (placex.address ? 'housenumber' or placex.address ? 'street' or placex.address ? 'place')
+               and rank_search = 30 AND ST_GeometryType(geometry) in ('ST_Polygon','ST_MultiPolygon')
+         LIMIT 1;
+    ELSE
+      -- See if we can inherit additional address tags from an interpolation.
+      -- These will become permanent.
+      FOR location IN
+        SELECT (address - 'interpolation'::text - 'housenumber'::text) as address
+          FROM place, planet_osm_ways w
+          WHERE place.osm_type = 'W' and place.address ? 'interpolation'
+                and place.geometry && p.geometry
+                and place.osm_id = w.id
+                and p.osm_id = any(w.nodes)
+      LOOP
+        result.address := location.address || result.address;
+      END LOOP;
+    END IF;
   END IF;
 
   END IF;
 
-  address := address - '_unlisted_place'::TEXT;
-  name := p.name;
+  -- remove internal and derived names
+  result.address := result.address - '_unlisted_place'::TEXT;
+  SELECT hstore(array_agg(key), array_agg(value)) INTO result.name
+    FROM each(p.name) WHERE key not like '\_%';
+
+  result.class := p.class;
+  result.type := p.type;
+  result.country_code := p.country_code;
+  result.rank_address := p.rank_address;
+  result.centroid_x := ST_X(p.centroid);
+  result.centroid_y := ST_Y(p.centroid);
+
+  -- Names of linked places need to be merged in, so search for a linkable
+  -- place already here.
+  SELECT * INTO location FROM find_linked_place(p);
+
+  IF location.place_id is not NULL THEN
+    result.linked_place_id := location.place_id;
+
+    IF location.name is not NULL THEN
+      {% if debug %}RAISE WARNING 'Names original: %, location: %', result.name, location.name;{% endif %}
+      -- Add all names from the place nodes that deviate from the name
+      -- in the relation with the prefix '_place_'. Deviation means that
+      -- either the value is different or a given key is missing completely
+      IF result.name is null THEN
+        SELECT hstore(array_agg('_place_' || key), array_agg(value))
+          INTO result.name
+          FROM each(location.name);
+      ELSE
+        SELECT hstore(array_agg('_place_' || key), array_agg(value)) INTO extra_names
+          FROM each(location.name - result.name);
+        {% if debug %}RAISE WARNING 'Extra names: %', extra_names;{% endif %}
+
+        IF extra_names is not null THEN
+            result.name := result.name || extra_names;
+        END IF;
+      END IF;
+
+      {% if debug %}RAISE WARNING 'Final names: %', result.name;{% endif %}
+    END IF;
+  END IF;
 
 
-  country_feature := CASE WHEN p.admin_level = 2
-                               and p.class = 'boundary' and p.type = 'administrative'
-                               and p.osm_type = 'R'
-                          THEN p.country_code
-                          ELSE null
-                     END;
+  RETURN result;
+END;
+$$
+LANGUAGE plpgsql STABLE;
+
+
+CREATE OR REPLACE FUNCTION find_associated_street(poi_osm_type CHAR(1),
+                                                  poi_osm_id BIGINT,
+                                                  bbox GEOMETRY)
+  RETURNS BIGINT
+  AS $$
+DECLARE
+  location RECORD;
+  member JSONB;
+  parent RECORD;
+  result BIGINT;
+  distance FLOAT;
+  new_distance FLOAT;
+  waygeom GEOMETRY;
+BEGIN
+{% if db.middle_db_format == '1' %}
+  FOR location IN
+    SELECT members FROM planet_osm_rels
+    WHERE parts @> ARRAY[poi_osm_id]
+          and members @> ARRAY[lower(poi_osm_type) || poi_osm_id]
+          and tags @> ARRAY['associatedStreet']
+  LOOP
+    FOR i IN 1..array_upper(location.members, 1) BY 2 LOOP
+      IF location.members[i+1] = 'street' THEN
+        FOR parent IN
+          SELECT place_id, geometry
+           FROM placex
+           WHERE osm_type = upper(substring(location.members[i], 1, 1))::char(1)
+                 and osm_id = substring(location.members[i], 2)::bigint
+                 and name is not null
+                 and rank_search between 26 and 27
+        LOOP
+          -- Find the closest 'street' member.
+          -- Avoid distance computation for the frequent case where there is
+          -- only one street member.
+          IF waygeom is null THEN
+            result := parent.place_id;
+            waygeom := parent.geometry;
+          ELSE
+            distance := coalesce(distance, ST_Distance(waygeom, bbox));
+            new_distance := ST_Distance(parent.geometry, bbox);
+            IF new_distance < distance THEN
+              distance := new_distance;
+              result := parent.place_id;
+              waygeom := parent.geometry;
+            END IF;
+          END IF;
+        END LOOP;
+      END IF;
+    END LOOP;
+  END LOOP;
+
+{% else %}
+  FOR member IN
+    SELECT value FROM planet_osm_rels r, LATERAL jsonb_array_elements(members)
+    WHERE planet_osm_member_ids(members, poi_osm_type::char(1)) && ARRAY[poi_osm_id]
+          and tags->>'type' = 'associatedStreet'
+          and value->>'role' = 'street'
+  LOOP
+    FOR parent IN
+      SELECT place_id, geometry
+       FROM placex
+       WHERE osm_type = (member->>'type')::char(1)
+             and osm_id = (member->>'ref')::bigint
+             and name is not null
+             and rank_search between 26 and 27
+    LOOP
+      -- Find the closest 'street' member.
+      -- Avoid distance computation for the frequent case where there is
+      -- only one street member.
+      IF waygeom is null THEN
+        result := parent.place_id;
+        waygeom := parent.geometry;
+      ELSE
+        distance := coalesce(distance, ST_Distance(waygeom, bbox));
+        new_distance := ST_Distance(parent.geometry, bbox);
+        IF new_distance < distance THEN
+          distance := new_distance;
+          result := parent.place_id;
+          waygeom := parent.geometry;
+        END IF;
+      END IF;
+    END LOOP;
+  END LOOP;
+{% endif %}
+
+  RETURN result;
 END;
 $$
 LANGUAGE plpgsql STABLE;
 END;
 $$
 LANGUAGE plpgsql STABLE;
@@ -56,118 +213,77 @@ CREATE OR REPLACE FUNCTION find_parent_for_poi(poi_osm_type CHAR(1),
                                                poi_osm_id BIGINT,
                                                poi_partition SMALLINT,
                                                bbox GEOMETRY,
                                                poi_osm_id BIGINT,
                                                poi_partition SMALLINT,
                                                bbox GEOMETRY,
-                                               addr_street TEXT,
-                                               addr_place TEXT,
-                                               fallback BOOL = true)
+                                               token_info JSONB,
+                                               is_place_addr BOOLEAN)
   RETURNS BIGINT
   AS $$
 DECLARE
   parent_place_id BIGINT DEFAULT NULL;
   location RECORD;
   RETURNS BIGINT
   AS $$
 DECLARE
   parent_place_id BIGINT DEFAULT NULL;
   location RECORD;
-  parent RECORD;
 BEGIN
 BEGIN
-    {% if debug %}RAISE WARNING 'finding street for % %', poi_osm_type, poi_osm_id;{% endif %}
+  {% if debug %}RAISE WARNING 'finding street for % %', poi_osm_type, poi_osm_id;{% endif %}
+
+  -- Is this object part of an associatedStreet relation?
+  parent_place_id := find_associated_street(poi_osm_type, poi_osm_id, bbox);
 
 
-    -- Is this object part of an associatedStreet relation?
+  IF parent_place_id is null THEN
+    parent_place_id := find_parent_for_address(token_info, poi_partition, bbox);
+  END IF;
+
+  IF parent_place_id is null and poi_osm_type = 'N' THEN
     FOR location IN
     FOR location IN
-      SELECT members FROM planet_osm_rels
-      WHERE parts @> ARRAY[poi_osm_id]
-        and members @> ARRAY[lower(poi_osm_type) || poi_osm_id]
-        and tags @> ARRAY['associatedStreet']
+      SELECT p.place_id, p.osm_id, p.rank_search, p.address,
+             coalesce(p.centroid, ST_Centroid(p.geometry)) as centroid
+        FROM placex p, planet_osm_ways w
+       WHERE p.osm_type = 'W' and p.rank_search >= 26
+             and p.geometry && bbox
+             and w.id = p.osm_id and poi_osm_id = any(w.nodes)
     LOOP
     LOOP
-      FOR i IN 1..array_upper(location.members, 1) BY 2 LOOP
-        IF location.members[i+1] = 'street' THEN
-          FOR parent IN
-            SELECT place_id from placex
-             WHERE osm_type = 'W' and osm_id = substring(location.members[i],2)::bigint
-               and name is not null
-               and rank_search between 26 and 27
-          LOOP
-            RETURN parent.place_id;
-          END LOOP;
-        END IF;
-      END LOOP;
-    END LOOP;
+      {% if debug %}RAISE WARNING 'Node is part of way % ', location.osm_id;{% endif %}
 
 
-    parent_place_id := find_parent_for_address(addr_street, addr_place,
-                                               poi_partition, bbox);
-    IF parent_place_id is not null THEN
-      RETURN parent_place_id;
-    END IF;
+      -- Way IS a road then we are on it - that must be our road
+      IF location.rank_search < 28 THEN
+        {% if debug %}RAISE WARNING 'node in way that is a street %',location;{% endif %}
+        RETURN location.place_id;
+      END IF;
+
+      parent_place_id := find_associated_street('W', location.osm_id, bbox);
+    END LOOP;
+  END IF;
 
 
-    IF poi_osm_type = 'N' THEN
-      -- Is this node part of an interpolation?
-      FOR parent IN
-        SELECT q.parent_place_id
-          FROM location_property_osmline q, planet_osm_ways x
-         WHERE q.linegeo && bbox and x.id = q.osm_id
-               and poi_osm_id = any(x.nodes)
-         LIMIT 1
+  IF parent_place_id is NULL THEN
+    IF is_place_addr THEN
+      -- The address is attached to a place we don't know.
+      -- Instead simply use the containing area with the largest rank.
+      FOR location IN
+        SELECT place_id FROM placex
+         WHERE bbox && geometry AND _ST_Covers(geometry, ST_Centroid(bbox))
+               AND rank_address between 5 and 25
+               AND ST_GeometryType(geometry) in ('ST_Polygon','ST_MultiPolygon')
+         ORDER BY rank_address desc
       LOOP
       LOOP
-        {% if debug %}RAISE WARNING 'Get parent from interpolation: %', parent.parent_place_id;{% endif %}
-        RETURN parent.parent_place_id;
+        RETURN location.place_id;
       END LOOP;
       END LOOP;
-
-      -- Is this node part of any other way?
+    ELSEIF ST_Area(bbox) < 0.005 THEN
+      -- for smaller features get the nearest road
+      SELECT getNearestRoadPlaceId(poi_partition, bbox) INTO parent_place_id;
+      {% if debug %}RAISE WARNING 'Checked for nearest way (%)', parent_place_id;{% endif %}
+    ELSE
+      -- for larger features simply find the area with the largest rank that
+      -- contains the bbox, only use addressable features
       FOR location IN
       FOR location IN
-        SELECT p.place_id, p.osm_id, p.rank_search, p.address,
-               coalesce(p.centroid, ST_Centroid(p.geometry)) as centroid
-          FROM placex p, planet_osm_ways w
-         WHERE p.osm_type = 'W' and p.rank_search >= 26
-               and p.geometry && bbox
-               and w.id = p.osm_id and poi_osm_id = any(w.nodes)
+        SELECT place_id FROM placex
+         WHERE bbox && geometry AND _ST_Covers(geometry, ST_Centroid(bbox))
+               AND rank_address between 5 and 25
+               AND ST_GeometryType(geometry) in ('ST_Polygon','ST_MultiPolygon')
+        ORDER BY rank_address desc
       LOOP
       LOOP
-        {% if debug %}RAISE WARNING 'Node is part of way % ', location.osm_id;{% endif %}
-
-        -- Way IS a road then we are on it - that must be our road
-        IF location.rank_search < 28 THEN
-          {% if debug %}RAISE WARNING 'node in way that is a street %',location;{% endif %}
-          return location.place_id;
-        END IF;
-
-        SELECT find_parent_for_poi('W', location.osm_id, poi_partition,
-                                   location.centroid,
-                                   location.address->'street',
-                                   location.address->'place',
-                                   false)
-          INTO parent_place_id;
-        IF parent_place_id is not null THEN
-          RETURN parent_place_id;
-        END IF;
+        RETURN location.place_id;
       END LOOP;
     END IF;
       END LOOP;
     END IF;
+  END IF;
 
 
-    IF fallback THEN
-      IF addr_street is null and addr_place is not null THEN
-        -- The address is attached to a place we don't know.
-        -- Instead simply use the containing area with the largest rank.
-        FOR location IN
-          SELECT place_id FROM placex
-            WHERE bbox && geometry AND _ST_Covers(geometry, ST_Centroid(bbox))
-                  AND rank_address between 5 and 25
-            ORDER BY rank_address desc
-        LOOP
-            RETURN location.place_id;
-        END LOOP;
-      ELSEIF ST_Area(bbox) < 0.005 THEN
-        -- for smaller features get the nearest road
-        SELECT getNearestRoadPlaceId(poi_partition, bbox) INTO parent_place_id;
-        {% if debug %}RAISE WARNING 'Checked for nearest way (%)', parent_place_id;{% endif %}
-      ELSE
-        -- for larger features simply find the area with the largest rank that
-        -- contains the bbox, only use addressable features
-        FOR location IN
-          SELECT place_id FROM placex
-            WHERE bbox && geometry AND _ST_Covers(geometry, ST_Centroid(bbox))
-                  AND rank_address between 5 and 25
-            ORDER BY rank_address desc
-        LOOP
-            RETURN location.place_id;
-        END LOOP;
-      END IF;
-    END IF;
-
-    RETURN parent_place_id;
+  RETURN parent_place_id;
 END;
 $$
 LANGUAGE plpgsql STABLE;
 END;
 $$
 LANGUAGE plpgsql STABLE;
@@ -177,7 +293,11 @@ CREATE OR REPLACE FUNCTION find_linked_place(bnd placex)
   RETURNS placex
   AS $$
 DECLARE
   RETURNS placex
   AS $$
 DECLARE
+{% if db.middle_db_format == '1' %}
   relation_members TEXT[];
   relation_members TEXT[];
+{% else %}
+  relation_members JSONB;
+{% endif %}
   rel_member RECORD;
   linked_placex placex%ROWTYPE;
   bnd_name TEXT;
   rel_member RECORD;
   linked_placex placex%ROWTYPE;
   bnd_name TEXT;
@@ -223,16 +343,17 @@ BEGIN
 
   -- If extratags has a place tag, look for linked nodes by their place type.
   -- Area and node still have to have the same name.
 
   -- If extratags has a place tag, look for linked nodes by their place type.
   -- Area and node still have to have the same name.
-  IF bnd.extratags ? 'place' and bnd_name is not null THEN
+  IF bnd.extratags ? 'place' and bnd.extratags->'place' != 'postcode'
+     and bnd_name is not null
+  THEN
     FOR linked_placex IN
       SELECT * FROM placex
       WHERE (position(lower(name->'name') in bnd_name) > 0
              OR position(bnd_name in lower(name->'name')) > 0)
         AND placex.class = 'place' AND placex.type = bnd.extratags->'place'
         AND placex.osm_type = 'N'
     FOR linked_placex IN
       SELECT * FROM placex
       WHERE (position(lower(name->'name') in bnd_name) > 0
              OR position(bnd_name in lower(name->'name')) > 0)
         AND placex.class = 'place' AND placex.type = bnd.extratags->'place'
         AND placex.osm_type = 'N'
-        AND placex.linked_place_id is null
+        AND (placex.linked_place_id is null or placex.linked_place_id = bnd.place_id)
         AND placex.rank_search < 26 -- needed to select the right index
         AND placex.rank_search < 26 -- needed to select the right index
-        AND placex.type != 'postcode'
         AND ST_Covers(bnd.geometry, placex.geometry)
     LOOP
       {% if debug %}RAISE WARNING 'Found type-matching place node %', linked_placex.osm_id;{% endif %}
         AND ST_Covers(bnd.geometry, placex.geometry)
     LOOP
       {% if debug %}RAISE WARNING 'Found type-matching place node %', linked_placex.osm_id;{% endif %}
@@ -246,7 +367,7 @@ BEGIN
       WHERE placex.class = 'place' AND placex.osm_type = 'N'
         AND placex.extratags ? 'wikidata' -- needed to select right index
         AND placex.extratags->'wikidata' = bnd.extratags->'wikidata'
       WHERE placex.class = 'place' AND placex.osm_type = 'N'
         AND placex.extratags ? 'wikidata' -- needed to select right index
         AND placex.extratags->'wikidata' = bnd.extratags->'wikidata'
-        AND placex.linked_place_id is null
+        AND (placex.linked_place_id is null or placex.linked_place_id = bnd.place_id)
         AND placex.rank_search < 26
         AND _st_covers(bnd.geometry, placex.geometry)
       ORDER BY lower(name->'name') = bnd_name desc
         AND placex.rank_search < 26
         AND _st_covers(bnd.geometry, placex.geometry)
       ORDER BY lower(name->'name') = bnd_name desc
@@ -270,7 +391,7 @@ BEGIN
              OR (bnd.rank_address = 0 and placex.rank_search = bnd.rank_search))
         AND placex.osm_type = 'N'
         AND placex.class = 'place'
              OR (bnd.rank_address = 0 and placex.rank_search = bnd.rank_search))
         AND placex.osm_type = 'N'
         AND placex.class = 'place'
-        AND placex.linked_place_id is null
+        AND (placex.linked_place_id is null or placex.linked_place_id = bnd.place_id)
         AND placex.rank_search < 26 -- needed to select the right index
         AND placex.type != 'postcode'
         AND ST_Covers(bnd.geometry, placex.geometry)
         AND placex.rank_search < 26 -- needed to select the right index
         AND placex.type != 'postcode'
         AND ST_Covers(bnd.geometry, placex.geometry)
@@ -286,6 +407,99 @@ $$
 LANGUAGE plpgsql STABLE;
 
 
 LANGUAGE plpgsql STABLE;
 
 
+CREATE OR REPLACE FUNCTION create_poi_search_terms(obj_place_id BIGINT,
+                                                   in_partition SMALLINT,
+                                                   parent_place_id BIGINT,
+                                                   is_place_addr BOOLEAN,
+                                                   country TEXT,
+                                                   token_info JSONB,
+                                                   geometry GEOMETRY,
+                                                   OUT name_vector INTEGER[],
+                                                   OUT nameaddress_vector INTEGER[])
+  AS $$
+DECLARE
+  parent_name_vector INTEGER[];
+  parent_address_vector INTEGER[];
+  addr_place_ids INTEGER[];
+  hnr_vector INTEGER[];
+
+  addr_item RECORD;
+  addr_place RECORD;
+  parent_address_place_ids BIGINT[];
+BEGIN
+  nameaddress_vector := '{}'::INTEGER[];
+
+  SELECT s.name_vector, s.nameaddress_vector
+    INTO parent_name_vector, parent_address_vector
+    FROM search_name s
+    WHERE s.place_id = parent_place_id;
+
+  FOR addr_item IN
+    SELECT ranks.*, key,
+           token_get_address_search_tokens(token_info, key) as search_tokens
+      FROM token_get_address_keys(token_info) as key,
+           LATERAL get_addr_tag_rank(key, country) as ranks
+      WHERE not token_get_address_search_tokens(token_info, key) <@ parent_address_vector
+  LOOP
+    addr_place := get_address_place(in_partition, geometry,
+                                    addr_item.from_rank, addr_item.to_rank,
+                                    addr_item.extent, token_info, addr_item.key);
+
+    IF addr_place is null THEN
+      -- No place found in OSM that matches. Make it at least searchable.
+      nameaddress_vector := array_merge(nameaddress_vector, addr_item.search_tokens);
+    ELSE
+      IF parent_address_place_ids is null THEN
+        SELECT array_agg(parent_place_id) INTO parent_address_place_ids
+          FROM place_addressline
+          WHERE place_id = parent_place_id;
+      END IF;
+
+      -- If the parent already lists the place in place_address line, then we
+      -- are done. Otherwise, add its own place_address line.
+      IF not parent_address_place_ids @> ARRAY[addr_place.place_id] THEN
+        nameaddress_vector := array_merge(nameaddress_vector, addr_place.keywords);
+
+        INSERT INTO place_addressline (place_id, address_place_id, fromarea,
+                                       isaddress, distance, cached_rank_address)
+          VALUES (obj_place_id, addr_place.place_id, not addr_place.isguess,
+                    true, addr_place.distance, addr_place.rank_address);
+      END IF;
+    END IF;
+  END LOOP;
+
+  name_vector := token_get_name_search_tokens(token_info);
+
+  -- Check if the parent covers all address terms.
+  -- If not, create a search name entry with the house number as the name.
+  -- This is unusual for the search_name table but prevents that the place
+  -- is returned when we only search for the street/place.
+
+  hnr_vector := token_get_housenumber_search_tokens(token_info);
+
+  IF hnr_vector is not null and not nameaddress_vector <@ parent_address_vector THEN
+    name_vector := array_merge(name_vector, hnr_vector);
+  END IF;
+
+  -- Cheating here by not recomputing all terms but simply using the ones
+  -- from the parent object.
+  nameaddress_vector := array_merge(nameaddress_vector, parent_name_vector);
+  nameaddress_vector := array_merge(nameaddress_vector, parent_address_vector);
+
+  -- make sure addr:place terms are always searchable
+  IF is_place_addr THEN
+    addr_place_ids := token_addr_place_search_tokens(token_info);
+    IF hnr_vector is not null AND not addr_place_ids <@ parent_name_vector
+    THEN
+      name_vector := array_merge(name_vector, hnr_vector);
+    END IF;
+    nameaddress_vector := array_merge(nameaddress_vector, addr_place_ids);
+  END IF;
+END;
+$$
+LANGUAGE plpgsql;
+
+
 -- Insert address of a place into the place_addressline table.
 --
 -- \param obj_place_id  Place_id of the place to compute the address for.
 -- Insert address of a place into the place_addressline table.
 --
 -- \param obj_place_id  Place_id of the place to compute the address for.
@@ -306,8 +520,9 @@ LANGUAGE plpgsql STABLE;
 CREATE OR REPLACE FUNCTION insert_addresslines(obj_place_id BIGINT,
                                                partition SMALLINT,
                                                maxrank SMALLINT,
 CREATE OR REPLACE FUNCTION insert_addresslines(obj_place_id BIGINT,
                                                partition SMALLINT,
                                                maxrank SMALLINT,
-                                               address HSTORE,
+                                               token_info JSONB,
                                                geometry GEOMETRY,
                                                geometry GEOMETRY,
+                                               centroid GEOMETRY,
                                                country TEXT,
                                                OUT parent_place_id BIGINT,
                                                OUT postcode TEXT,
                                                country TEXT,
                                                OUT parent_place_id BIGINT,
                                                OUT postcode TEXT,
@@ -321,7 +536,8 @@ DECLARE
   current_node_area GEOMETRY := NULL;
 
   parent_place_rank INT := 0;
   current_node_area GEOMETRY := NULL;
 
   parent_place_rank INT := 0;
-  addr_place_ids BIGINT[];
+  addr_place_ids BIGINT[] := '{}'::int[];
+  new_address_vector INT[];
 
   location RECORD;
 BEGIN
 
   location RECORD;
 BEGIN
@@ -331,16 +547,25 @@ BEGIN
   address_havelevel := array_fill(false, ARRAY[maxrank]);
 
   FOR location IN
   address_havelevel := array_fill(false, ARRAY[maxrank]);
 
   FOR location IN
-    SELECT * FROM get_places_for_addr_tags(partition, geometry,
-                                                   address, country)
-    ORDER BY rank_address, distance, isguess desc
+    SELECT apl.*, key
+      FROM (SELECT extra.*, key
+              FROM token_get_address_keys(token_info) as key,
+                   LATERAL get_addr_tag_rank(key, country) as extra) x,
+           LATERAL get_address_place(partition, geometry, from_rank, to_rank,
+                              extent, token_info, key) as apl
+      ORDER BY rank_address, distance, isguess desc
   LOOP
   LOOP
-    {% if not db.reverse_only %}
+    IF location.place_id is null THEN
+      {% if not db.reverse_only %}
       nameaddress_vector := array_merge(nameaddress_vector,
       nameaddress_vector := array_merge(nameaddress_vector,
-                                        location.keywords::int[]);
-    {% endif %}
+                                        token_get_address_search_tokens(token_info,
+                                                                        location.key));
+      {% endif %}
+    ELSE
+      {% if not db.reverse_only %}
+      nameaddress_vector := array_merge(nameaddress_vector, location.keywords::INTEGER[]);
+      {% endif %}
 
 
-    IF location.place_id is not null THEN
       location_isaddress := not address_havelevel[location.rank_address];
       IF not address_havelevel[location.rank_address] THEN
         address_havelevel[location.rank_address] := true;
       location_isaddress := not address_havelevel[location.rank_address];
       IF not address_havelevel[location.rank_address] THEN
         address_havelevel[location.rank_address] := true;
@@ -355,13 +580,13 @@ BEGIN
         VALUES (obj_place_id, location.place_id, not location.isguess,
                 true, location.distance, location.rank_address);
 
         VALUES (obj_place_id, location.place_id, not location.isguess,
                 true, location.distance, location.rank_address);
 
-      addr_place_ids := array_append(addr_place_ids, location.place_id);
+      addr_place_ids := addr_place_ids || location.place_id;
     END IF;
   END LOOP;
 
   FOR location IN
     END IF;
   END LOOP;
 
   FOR location IN
-    SELECT * FROM getNearFeatures(partition, geometry, maxrank)
-    WHERE addr_place_ids is null or not addr_place_ids @> ARRAY[place_id]
+    SELECT * FROM getNearFeatures(partition, geometry, centroid, maxrank)
+    WHERE not addr_place_ids @> ARRAY[place_id]
     ORDER BY rank_address, isguess asc,
              distance *
                CASE WHEN rank_address = 16 AND rank_search = 15 THEN 0.2
     ORDER BY rank_address, isguess asc,
              distance *
                CASE WHEN rank_address = 16 AND rank_search = 15 THEN 0.2
@@ -489,6 +714,12 @@ BEGIN
       NEW.country_code := NULL;
     END IF;
 
       NEW.country_code := NULL;
     END IF;
 
+    -- Simplify polygons with a very large memory footprint when they
+    -- do not take part in address computation.
+    IF NEW.rank_address = 0 THEN
+      NEW.geometry := simplify_large_polygons(NEW.geometry);
+    END IF;
+
   END IF;
 
   {% if debug %}RAISE WARNING 'placex_insert:END: % % % %',NEW.osm_type,NEW.osm_id,NEW.class,NEW.type;{% endif %}
   END IF;
 
   {% if debug %}RAISE WARNING 'placex_insert:END: % % % %',NEW.osm_type,NEW.osm_id,NEW.class,NEW.type;{% endif %}
@@ -496,13 +727,12 @@ BEGIN
 {% if not disable_diff_updates %}
   -- The following is not needed until doing diff updates, and slows the main index process down
 
 {% if not disable_diff_updates %}
   -- The following is not needed until doing diff updates, and slows the main index process down
 
-  IF NEW.osm_type = 'N' and NEW.rank_search > 28 THEN
-      -- might be part of an interpolation
-      result := osmline_reinsert(NEW.osm_id, NEW.geometry);
-  ELSEIF NEW.rank_address > 0 THEN
+  IF NEW.rank_address between 2 and 27 THEN
     IF (ST_GeometryType(NEW.geometry) in ('ST_Polygon','ST_MultiPolygon') AND ST_IsValid(NEW.geometry)) THEN
       -- Performance: We just can't handle re-indexing for country level changes
     IF (ST_GeometryType(NEW.geometry) in ('ST_Polygon','ST_MultiPolygon') AND ST_IsValid(NEW.geometry)) THEN
       -- Performance: We just can't handle re-indexing for country level changes
-      IF st_area(NEW.geometry) < 1 THEN
+      IF (NEW.rank_address < 26 and st_area(NEW.geometry) < 1)
+         OR (NEW.rank_address >= 26 and st_area(NEW.geometry) < 0.01)
+      THEN
         -- mark items within the geometry for re-indexing
   --    RAISE WARNING 'placex poly insert: % % % %',NEW.osm_type,NEW.osm_id,NEW.class,NEW.type;
 
         -- mark items within the geometry for re-indexing
   --    RAISE WARNING 'placex poly insert: % % % %',NEW.osm_type,NEW.osm_id,NEW.class,NEW.type;
 
@@ -517,16 +747,18 @@ BEGIN
                     or name is not null
                     or (NEW.rank_address >= 16 and address ? 'place'));
       END IF;
                     or name is not null
                     or (NEW.rank_address >= 16 and address ? 'place'));
       END IF;
-    ELSE
+    ELSEIF ST_GeometryType(NEW.geometry) not in ('ST_LineString', 'ST_MultiLineString')
+           OR ST_Length(NEW.geometry) < 0.5
+    THEN
       -- mark nearby items for re-indexing, where 'nearby' depends on the features rank_search and is a complete guess :(
       -- mark nearby items for re-indexing, where 'nearby' depends on the features rank_search and is a complete guess :(
-      diameter := update_place_diameter(NEW.rank_search);
+      diameter := update_place_diameter(NEW.rank_address);
       IF diameter > 0 THEN
   --      RAISE WARNING 'placex point insert: % % % % %',NEW.osm_type,NEW.osm_id,NEW.class,NEW.type,diameter;
         IF NEW.rank_search >= 26 THEN
           -- roads may cause reparenting for >27 rank places
           update placex set indexed_status = 2 where indexed_status = 0 and rank_search > NEW.rank_search and ST_DWithin(placex.geometry, NEW.geometry, diameter);
           -- reparenting also for OSM Interpolation Lines (and for Tiger?)
       IF diameter > 0 THEN
   --      RAISE WARNING 'placex point insert: % % % % %',NEW.osm_type,NEW.osm_id,NEW.class,NEW.type,diameter;
         IF NEW.rank_search >= 26 THEN
           -- roads may cause reparenting for >27 rank places
           update placex set indexed_status = 2 where indexed_status = 0 and rank_search > NEW.rank_search and ST_DWithin(placex.geometry, NEW.geometry, diameter);
           -- reparenting also for OSM Interpolation Lines (and for Tiger?)
-          update location_property_osmline set indexed_status = 2 where indexed_status = 0 and ST_DWithin(location_property_osmline.linegeo, NEW.geometry, diameter);
+          update location_property_osmline set indexed_status = 2 where indexed_status = 0 and startnumber is not null and ST_DWithin(location_property_osmline.linegeo, NEW.geometry, diameter);
         ELSEIF NEW.rank_search >= 16 THEN
           -- up to rank 16, street-less addresses may need reparenting
           update placex set indexed_status = 2 where indexed_status = 0 and rank_search > NEW.rank_search and ST_DWithin(placex.geometry, NEW.geometry, diameter) and (rank_search < 28 or name is not null or address ? 'place');
         ELSEIF NEW.rank_search >= 16 THEN
           -- up to rank 16, street-less addresses may need reparenting
           update placex set indexed_status = 2 where indexed_status = 0 and rank_search > NEW.rank_search and ST_DWithin(placex.geometry, NEW.geometry, diameter) and (rank_search < 28 or name is not null or address ? 'place');
@@ -563,25 +795,29 @@ CREATE OR REPLACE FUNCTION placex_update()
 DECLARE
   i INTEGER;
   location RECORD;
 DECLARE
   i INTEGER;
   location RECORD;
+{% if db.middle_db_format == '1' %}
   relation_members TEXT[];
   relation_members TEXT[];
+{% else %}
+  relation_member JSONB;
+{% endif %}
 
   geom GEOMETRY;
   parent_address_level SMALLINT;
   place_address_level SMALLINT;
 
 
   geom GEOMETRY;
   parent_address_level SMALLINT;
   place_address_level SMALLINT;
 
-  addr_street TEXT;
-  addr_place TEXT;
-
   max_rank SMALLINT;
 
   name_vector INTEGER[];
   nameaddress_vector INTEGER[];
   addr_nameaddress_vector INTEGER[];
 
   max_rank SMALLINT;
 
   name_vector INTEGER[];
   nameaddress_vector INTEGER[];
   addr_nameaddress_vector INTEGER[];
 
+  linked_place BIGINT;
+
   linked_node_id BIGINT;
   linked_importance FLOAT;
   linked_wikipedia TEXT;
 
   linked_node_id BIGINT;
   linked_importance FLOAT;
   linked_wikipedia TEXT;
 
+  is_place_address BOOLEAN;
   result BOOLEAN;
 BEGIN
   -- deferred delete
   result BOOLEAN;
 BEGIN
   -- deferred delete
@@ -606,30 +842,57 @@ BEGIN
   DELETE FROM place_addressline WHERE place_id = NEW.place_id;
   result := deleteRoad(NEW.partition, NEW.place_id);
   result := deleteLocationArea(NEW.partition, NEW.place_id, NEW.rank_search);
   DELETE FROM place_addressline WHERE place_id = NEW.place_id;
   result := deleteRoad(NEW.partition, NEW.place_id);
   result := deleteLocationArea(NEW.partition, NEW.place_id, NEW.rank_search);
-  UPDATE placex set linked_place_id = null, indexed_status = 2
-         where linked_place_id = NEW.place_id;
-  -- update not necessary for osmline, cause linked_place_id does not exist
 
   NEW.extratags := NEW.extratags - 'linked_place'::TEXT;
 
   NEW.extratags := NEW.extratags - 'linked_place'::TEXT;
-
-  IF NEW.linked_place_id is not null THEN
-    {% if debug %}RAISE WARNING 'place already linked to %', NEW.linked_place_id;{% endif %}
-    RETURN NEW;
+  IF NEW.extratags = ''::hstore THEN
+    NEW.extratags := NULL;
   END IF;
 
   END IF;
 
+  -- NEW.linked_place_id contains the precomputed linkee. Save this and restore
+  -- the previous link status.
+  linked_place := NEW.linked_place_id;
+  NEW.linked_place_id := OLD.linked_place_id;
+
+  -- Remove linkage, if we have computed a different new linkee.
+  UPDATE placex SET linked_place_id = null, indexed_status = 2
+    WHERE linked_place_id = NEW.place_id
+          and (linked_place is null or linked_place_id != linked_place);
+  -- update not necessary for osmline, cause linked_place_id does not exist
+
   -- Postcodes are just here to compute the centroids. They are not searchable
   -- unless they are a boundary=postal_code.
   -- There was an error in the style so that boundary=postal_code used to be
   -- imported as place=postcode. That's why relations are allowed to pass here.
   -- This can go away in a couple of versions.
   IF NEW.class = 'place'  and NEW.type = 'postcode' and NEW.osm_type != 'R' THEN
   -- Postcodes are just here to compute the centroids. They are not searchable
   -- unless they are a boundary=postal_code.
   -- There was an error in the style so that boundary=postal_code used to be
   -- imported as place=postcode. That's why relations are allowed to pass here.
   -- This can go away in a couple of versions.
   IF NEW.class = 'place'  and NEW.type = 'postcode' and NEW.osm_type != 'R' THEN
+    NEW.token_info := null;
     RETURN NEW;
   END IF;
 
     RETURN NEW;
   END IF;
 
-  -- Speed up searches - just use the centroid of the feature
-  -- cheaper but less acurate
+  -- Compute a preliminary centroid.
   NEW.centroid := ST_PointOnSurface(NEW.geometry);
   NEW.centroid := ST_PointOnSurface(NEW.geometry);
-  {% if debug %}RAISE WARNING 'Computing preliminary centroid at %',ST_AsText(NEW.centroid);{% endif %}
+
+    -- recalculate country and partition
+  IF NEW.rank_search = 4 AND NEW.address is not NULL AND NEW.address ? 'country' THEN
+    -- for countries, believe the mapped country code,
+    -- so that we remain in the right partition if the boundaries
+    -- suddenly expand.
+    NEW.country_code := lower(NEW.address->'country');
+    NEW.partition := get_partition(lower(NEW.country_code));
+    IF NEW.partition = 0 THEN
+      NEW.country_code := lower(get_country_code(NEW.centroid));
+      NEW.partition := get_partition(NEW.country_code);
+    END IF;
+  ELSE
+    IF NEW.rank_search >= 4 THEN
+      NEW.country_code := lower(get_country_code(NEW.centroid));
+    ELSE
+      NEW.country_code := NULL;
+    END IF;
+    NEW.partition := get_partition(NEW.country_code);
+  END IF;
+  {% if debug %}RAISE WARNING 'Country updated: "%"', NEW.country_code;{% endif %}
+
 
   -- recompute the ranks, they might change when linking changes
   SELECT * INTO NEW.rank_search, NEW.rank_address
 
   -- recompute the ranks, they might change when linking changes
   SELECT * INTO NEW.rank_search, NEW.rank_address
@@ -640,6 +903,16 @@ BEGIN
                             NEW.class, NEW.type, NEW.admin_level,
                             (NEW.extratags->'capital') = 'yes',
                             NEW.address->'postcode');
                             NEW.class, NEW.type, NEW.admin_level,
                             (NEW.extratags->'capital') = 'yes',
                             NEW.address->'postcode');
+
+  -- Short-cut out for linked places. Note that this must happen after the
+  -- address rank has been recomputed. The linking might nullify a shift in
+  -- address rank.
+  IF NEW.linked_place_id is not null THEN
+    NEW.token_info := null;
+    {% if debug %}RAISE WARNING 'place already linked to %', OLD.linked_place_id;{% endif %}
+    RETURN NEW;
+  END IF;
+
   -- We must always increase the address level relative to the admin boundary.
   IF NEW.class = 'boundary' and NEW.type = 'administrative'
      and NEW.osm_type = 'R' and NEW.rank_address > 0
   -- We must always increase the address level relative to the admin boundary.
   IF NEW.class = 'boundary' and NEW.type = 'administrative'
      and NEW.osm_type = 'R' and NEW.rank_address > 0
@@ -655,7 +928,8 @@ BEGIN
       FROM placex
       WHERE osm_type = 'R' and class = 'boundary' and type = 'administrative'
             and admin_level < NEW.admin_level and admin_level > 3
       FROM placex
       WHERE osm_type = 'R' and class = 'boundary' and type = 'administrative'
             and admin_level < NEW.admin_level and admin_level > 3
-            and rank_address > 0
+            and rank_address between 1 and 25 -- for index selection
+            and ST_GeometryType(geometry) in ('ST_Polygon','ST_MultiPolygon') -- for index selection
             and geometry && NEW.centroid and _ST_Covers(geometry, NEW.centroid)
       ORDER BY admin_level desc LIMIT 1
     LOOP
             and geometry && NEW.centroid and _ST_Covers(geometry, NEW.centroid)
       ORDER BY admin_level desc LIMIT 1
     LOOP
@@ -677,29 +951,61 @@ BEGIN
 
     IF NEW.rank_address > 9 THEN
         -- Second check that the boundary is not completely contained in a
 
     IF NEW.rank_address > 9 THEN
         -- Second check that the boundary is not completely contained in a
-        -- place area with a higher address rank
+        -- place area with a equal or higher address rank.
         FOR location IN
         FOR location IN
-          SELECT rank_address FROM placex
-          WHERE class = 'place' and rank_address < 24
-                and rank_address > NEW.rank_address
+          SELECT rank_address
+          FROM placex,
+               LATERAL compute_place_rank(country_code, 'A', class, type,
+                                          admin_level, False, null) prank
+          WHERE class = 'place' and rank_address between 1 and 23
+                and prank.address_rank >= NEW.rank_address
+                and ST_GeometryType(geometry) in ('ST_Polygon','ST_MultiPolygon') -- select right index
                 and geometry && NEW.geometry
                 and geometry ~ NEW.geometry -- needed because ST_Relate does not do bbox cover test
                 and ST_Relate(geometry, NEW.geometry, 'T*T***FF*') -- contains but not equal
                 and geometry && NEW.geometry
                 and geometry ~ NEW.geometry -- needed because ST_Relate does not do bbox cover test
                 and ST_Relate(geometry, NEW.geometry, 'T*T***FF*') -- contains but not equal
-          ORDER BY rank_address desc LIMIT 1
+          ORDER BY prank.address_rank desc LIMIT 1
         LOOP
           NEW.rank_address := location.rank_address + 2;
         END LOOP;
     END IF;
         LOOP
           NEW.rank_address := location.rank_address + 2;
         END LOOP;
     END IF;
+  ELSEIF NEW.class = 'place'
+         and ST_GeometryType(NEW.geometry) in ('ST_Polygon', 'ST_MultiPolygon')
+         and NEW.rank_address between 16 and 23
+  THEN
+    -- For place areas make sure they are not completely contained in an area
+    -- with a equal or higher address rank.
+    FOR location IN
+          SELECT rank_address
+          FROM placex,
+               LATERAL compute_place_rank(country_code, 'A', class, type,
+                                          admin_level, False, null) prank
+          WHERE prank.address_rank < 24
+                and rank_address between 1 and 25 -- select right index
+                and ST_GeometryType(geometry) in ('ST_Polygon','ST_MultiPolygon') -- select right index
+                and prank.address_rank >= NEW.rank_address
+                and geometry && NEW.geometry
+                and geometry ~ NEW.geometry -- needed because ST_Relate does not do bbox cover test
+                and ST_Relate(geometry, NEW.geometry, 'T*T***FF*') -- contains but not equal
+          ORDER BY prank.address_rank desc LIMIT 1
+        LOOP
+          NEW.rank_address := location.rank_address + 2;
+        END LOOP;
   ELSEIF NEW.class = 'place' and NEW.osm_type = 'N'
   ELSEIF NEW.class = 'place' and NEW.osm_type = 'N'
-     and NEW.rank_address between 16 and 23
+         and NEW.rank_address between 16 and 23
   THEN
   THEN
-    -- If a place node is contained in a admin boundary with the same address level
-    -- and has not been linked, then make the node a subpart by increasing the
-    -- address rank (city level and above).
+    -- If a place node is contained in an admin or place boundary with the same
+    -- address level and has not been linked, then make the node a subpart
+    -- by increasing the address rank (city level and above).
     FOR location IN
     FOR location IN
-        SELECT rank_address FROM placex
-        WHERE osm_type = 'R' and class = 'boundary' and type = 'administrative'
-              and rank_address = NEW.rank_address
+        SELECT rank_address
+        FROM placex,
+             LATERAL compute_place_rank(country_code, 'A', class, type,
+                                        admin_level, False, null) prank
+        WHERE osm_type = 'R'
+              and rank_address between 1 and 25 -- select right index
+              and ST_GeometryType(geometry) in ('ST_Polygon','ST_MultiPolygon') -- select right index
+              and ((class = 'place' and prank.address_rank = NEW.rank_address)
+                   or (class = 'boundary' and rank_address = NEW.rank_address))
               and geometry && NEW.centroid and _ST_Covers(geometry, NEW.centroid)
         LIMIT 1
     LOOP
               and geometry && NEW.centroid and _ST_Covers(geometry, NEW.centroid)
         LIMIT 1
     LOOP
@@ -709,56 +1015,13 @@ BEGIN
     parent_address_level := 3;
   END IF;
 
     parent_address_level := 3;
   END IF;
 
-  {% if debug %}RAISE WARNING 'Copy over address tags';{% endif %}
-  -- housenumber is a computed field, so start with an empty value
-  NEW.housenumber := NULL;
-  IF NEW.address is not NULL THEN
-      IF NEW.address ? 'conscriptionnumber' THEN
-        IF NEW.address ? 'streetnumber' THEN
-            NEW.housenumber := (NEW.address->'conscriptionnumber') || '/' || (NEW.address->'streetnumber');
-        ELSE
-            NEW.housenumber := NEW.address->'conscriptionnumber';
-        END IF;
-      ELSEIF NEW.address ? 'streetnumber' THEN
-        NEW.housenumber := NEW.address->'streetnumber';
-      ELSEIF NEW.address ? 'housenumber' THEN
-        NEW.housenumber := NEW.address->'housenumber';
-      END IF;
-      NEW.housenumber := create_housenumber_id(NEW.housenumber);
-
-      addr_street := NEW.address->'street';
-      addr_place := NEW.address->'place';
-
-      IF NEW.address ? 'postcode' and NEW.address->'postcode' not similar to '%(:|,|;)%' THEN
-        i := getorcreate_postcode_id(NEW.address->'postcode');
-      END IF;
-  END IF;
+  NEW.housenumber := token_normalized_housenumber(NEW.token_info);
 
   NEW.postcode := null;
 
 
   NEW.postcode := null;
 
-  -- recalculate country and partition
-  IF NEW.rank_search = 4 AND NEW.address is not NULL AND NEW.address ? 'country' THEN
-    -- for countries, believe the mapped country code,
-    -- so that we remain in the right partition if the boundaries
-    -- suddenly expand.
-    NEW.country_code := lower(NEW.address->'country');
-    NEW.partition := get_partition(lower(NEW.country_code));
-    IF NEW.partition = 0 THEN
-      NEW.country_code := lower(get_country_code(NEW.centroid));
-      NEW.partition := get_partition(NEW.country_code);
-    END IF;
-  ELSE
-    IF NEW.rank_search >= 4 THEN
-      NEW.country_code := lower(get_country_code(NEW.centroid));
-    ELSE
-      NEW.country_code := NULL;
-    END IF;
-    NEW.partition := get_partition(NEW.country_code);
-  END IF;
-  {% if debug %}RAISE WARNING 'Country updated: "%"', NEW.country_code;{% endif %}
-
   -- waterway ways are linked when they are part of a relation and have the same class/type
   IF NEW.osm_type = 'R' and NEW.class = 'waterway' THEN
   -- waterway ways are linked when they are part of a relation and have the same class/type
   IF NEW.osm_type = 'R' and NEW.class = 'waterway' THEN
+{% if db.middle_db_format == '1' %}
       FOR relation_members IN select members from planet_osm_rels r where r.id = NEW.osm_id and r.parts != array[]::bigint[]
       LOOP
           FOR i IN 1..array_upper(relation_members, 1) BY 2 LOOP
       FOR relation_members IN select members from planet_osm_rels r where r.id = NEW.osm_id and r.parts != array[]::bigint[]
       LOOP
           FOR i IN 1..array_upper(relation_members, 1) BY 2 LOOP
@@ -777,12 +1040,35 @@ BEGIN
               END IF;
           END LOOP;
       END LOOP;
               END IF;
           END LOOP;
       END LOOP;
+{% else %}
+    FOR relation_member IN
+      SELECT value FROM planet_osm_rels r, LATERAL jsonb_array_elements(r.members)
+      WHERE r.id = NEW.osm_id
+    LOOP
+      IF relation_member->>'role' IN ('', 'main_stream', 'side_stream')
+         and relation_member->>'type' = 'W'
+      THEN
+        {% if debug %}RAISE WARNING 'waterway parent %, child %', NEW.osm_id, relation_member;{% endif %}
+        FOR linked_node_id IN
+          SELECT place_id FROM placex
+          WHERE osm_type = 'W' and osm_id = (relation_member->>'ref')::bigint
+                and class = NEW.class and type in ('river', 'stream', 'canal', 'drain', 'ditch')
+                and (relation_member->>'role' != 'side_stream' or NEW.name->'name' = name->'name')
+        LOOP
+          UPDATE placex SET linked_place_id = NEW.place_id WHERE place_id = linked_node_id;
+          {% if 'search_name' in db.tables %}
+            DELETE FROM search_name WHERE place_id = linked_node_id;
+          {% endif %}
+        END LOOP;
+      END IF;
+    END LOOP;
+{% endif %}
       {% if debug %}RAISE WARNING 'Waterway processed';{% endif %}
   END IF;
 
   NEW.importance := null;
   SELECT wikipedia, importance
       {% if debug %}RAISE WARNING 'Waterway processed';{% endif %}
   END IF;
 
   NEW.importance := null;
   SELECT wikipedia, importance
-    FROM compute_importance(NEW.extratags, NEW.country_code, NEW.osm_type, NEW.osm_id)
+    FROM compute_importance(NEW.extratags, NEW.country_code, NEW.rank_search, NEW.centroid)
     INTO NEW.wikipedia,NEW.importance;
 
 {% if debug %}RAISE WARNING 'Importance computed from wikipedia: %', NEW.importance;{% endif %}
     INTO NEW.wikipedia,NEW.importance;
 
 {% if debug %}RAISE WARNING 'Importance computed from wikipedia: %', NEW.importance;{% endif %}
@@ -793,12 +1079,14 @@ BEGIN
 
     {% if debug %}RAISE WARNING 'finding street for % %', NEW.osm_type, NEW.osm_id;{% endif %}
     NEW.parent_place_id := null;
 
     {% if debug %}RAISE WARNING 'finding street for % %', NEW.osm_type, NEW.osm_id;{% endif %}
     NEW.parent_place_id := null;
+    is_place_address := not token_is_street_address(NEW.token_info);
 
     -- We have to find our parent road.
     NEW.parent_place_id := find_parent_for_poi(NEW.osm_type, NEW.osm_id,
                                                NEW.partition,
                                                ST_Envelope(NEW.geometry),
 
     -- We have to find our parent road.
     NEW.parent_place_id := find_parent_for_poi(NEW.osm_type, NEW.osm_id,
                                                NEW.partition,
                                                ST_Envelope(NEW.geometry),
-                                               addr_street, addr_place);
+                                               NEW.token_info,
+                                               is_place_address);
 
     -- If we found the road take a shortcut here.
     -- Otherwise fall back to the full address getting method below.
 
     -- If we found the road take a shortcut here.
     -- Otherwise fall back to the full address getting method below.
@@ -808,12 +1096,12 @@ BEGIN
       SELECT p.country_code, p.postcode, p.name FROM placex p
        WHERE p.place_id = NEW.parent_place_id INTO location;
 
       SELECT p.country_code, p.postcode, p.name FROM placex p
        WHERE p.place_id = NEW.parent_place_id INTO location;
 
-      IF addr_street is null and addr_place is not null THEN
+      IF is_place_address and NEW.address ? 'place' THEN
         -- Check if the addr:place tag is part of the parent name
         SELECT count(*) INTO i
         -- Check if the addr:place tag is part of the parent name
         SELECT count(*) INTO i
-          FROM svals(location.name) AS pname WHERE pname = addr_place;
+          FROM svals(location.name) AS pname WHERE pname = NEW.address->'place';
         IF i = 0 THEN
         IF i = 0 THEN
-          NEW.address = NEW.address || hstore('_unlisted_place', addr_place);
+          NEW.address = NEW.address || hstore('_unlisted_place', NEW.address->'place');
         END IF;
       END IF;
 
         END IF;
       END IF;
 
@@ -821,39 +1109,21 @@ BEGIN
       {% if debug %}RAISE WARNING 'Got parent details from search name';{% endif %}
 
       -- determine postcode
       {% if debug %}RAISE WARNING 'Got parent details from search name';{% endif %}
 
       -- determine postcode
-      IF NEW.address is not null AND NEW.address ? 'postcode' THEN
-          NEW.postcode = upper(trim(NEW.address->'postcode'));
-      ELSE
-         NEW.postcode := location.postcode;
-      END IF;
-      IF NEW.postcode is null THEN
-        NEW.postcode := get_nearest_postcode(NEW.country_code, NEW.geometry);
-      END IF;
+      NEW.postcode := coalesce(token_get_postcode(NEW.token_info),
+                               location.postcode,
+                               get_nearest_postcode(NEW.country_code, NEW.centroid));
 
       IF NEW.name is not NULL THEN
           NEW.name := add_default_place_name(NEW.country_code, NEW.name);
 
       IF NEW.name is not NULL THEN
           NEW.name := add_default_place_name(NEW.country_code, NEW.name);
-          name_vector := make_keywords(NEW.name);
-
-          IF NEW.rank_search <= 25 and NEW.rank_address > 0 THEN
-            result := add_location(NEW.place_id, NEW.country_code, NEW.partition,
-                                   name_vector, NEW.rank_search, NEW.rank_address,
-                                   upper(trim(NEW.address->'postcode')), NEW.geometry,
-                                   NEW.centroid);
-            {% if debug %}RAISE WARNING 'Place added to location table';{% endif %}
-          END IF;
-
       END IF;
 
       {% if not db.reverse_only %}
       END IF;
 
       {% if not db.reverse_only %}
-      IF array_length(name_vector, 1) is not NULL
-         OR NEW.address is not NULL
-      THEN
+      IF NEW.name is not NULL OR NEW.address is not NULL THEN
         SELECT * INTO name_vector, nameaddress_vector
           FROM create_poi_search_terms(NEW.place_id,
                                        NEW.partition, NEW.parent_place_id,
         SELECT * INTO name_vector, nameaddress_vector
           FROM create_poi_search_terms(NEW.place_id,
                                        NEW.partition, NEW.parent_place_id,
-                                       NEW.address,
-                                       NEW.country_code, NEW.housenumber,
-                                       name_vector, NEW.centroid);
+                                       is_place_address, NEW.country_code,
+                                       NEW.token_info, NEW.centroid);
 
         IF array_length(name_vector, 1) is not NULL THEN
           INSERT INTO search_name (place_id, search_rank, address_rank,
 
         IF array_length(name_vector, 1) is not NULL THEN
           INSERT INTO search_name (place_id, search_rank, address_rank,
@@ -867,15 +1137,7 @@ BEGIN
       END IF;
       {% endif %}
 
       END IF;
       {% endif %}
 
-      -- If the address was inherited from a surrounding building,
-      -- do not add it permanently to the table.
-      IF NEW.address ? '_inherited' THEN
-        IF NEW.address ? '_unlisted_place' THEN
-          NEW.address := hstore('_unlisted_place', NEW.address->'_unlisted_place');
-        ELSE
-          NEW.address := null;
-        END IF;
-      END IF;
+      NEW.token_info := token_strip_info(NEW.token_info);
 
       RETURN NEW;
     END IF;
 
       RETURN NEW;
     END IF;
@@ -885,8 +1147,16 @@ BEGIN
   -- ---------------------------------------------------------------------------
   -- Full indexing
   {% if debug %}RAISE WARNING 'Using full index mode for % %', NEW.osm_type, NEW.osm_id;{% endif %}
   -- ---------------------------------------------------------------------------
   -- Full indexing
   {% if debug %}RAISE WARNING 'Using full index mode for % %', NEW.osm_type, NEW.osm_id;{% endif %}
-  SELECT * INTO location FROM find_linked_place(NEW);
-  IF location.place_id is not null THEN
+  IF linked_place is not null THEN
+    -- Recompute the ranks here as the ones from the linked place might
+    -- have been shifted to accommodate surrounding boundaries.
+    SELECT place_id, osm_id, class, type, extratags, rank_search,
+           centroid, geometry,
+           (compute_place_rank(country_code, osm_type, class, type, admin_level,
+                              (extratags->'capital') = 'yes', null)).*
+      INTO location
+      FROM placex WHERE place_id = linked_place;
+
     {% if debug %}RAISE WARNING 'Linked %', location;{% endif %}
 
     -- Use the linked point as the centre point of the geometry,
     {% if debug %}RAISE WARNING 'Linked %', location;{% endif %}
 
     -- Use the linked point as the centre point of the geometry,
@@ -896,16 +1166,11 @@ BEGIN
         NEW.centroid := geom;
     END IF;
 
         NEW.centroid := geom;
     END IF;
 
-    {% if debug %}RAISE WARNING 'parent address: % rank address: %', parent_address_level, location.rank_address;{% endif %}
-    IF location.rank_address > parent_address_level
-       and location.rank_address < 26
+    {% if debug %}RAISE WARNING 'parent address: % rank address: %', parent_address_level, location.address_rank;{% endif %}
+    IF location.address_rank > parent_address_level
+       and location.address_rank < 26
     THEN
     THEN
-      NEW.rank_address := location.rank_address;
-    END IF;
-
-    -- merge in the label name
-    IF NOT location.name IS NULL THEN
-      NEW.name := location.name || NEW.name;
+      NEW.rank_address := location.address_rank;
     END IF;
 
     -- merge in extra tags
     END IF;
 
     -- merge in extra tags
@@ -914,7 +1179,9 @@ BEGIN
                      || coalesce(NEW.extratags, ''::hstore);
 
     -- mark the linked place (excludes from search results)
                      || coalesce(NEW.extratags, ''::hstore);
 
     -- mark the linked place (excludes from search results)
-    UPDATE placex set linked_place_id = NEW.place_id
+    -- Force reindexing to remove any traces from the search indexes and
+    -- reset the address rank if necessary.
+    UPDATE placex set linked_place_id = NEW.place_id, indexed_status = 2
       WHERE place_id = location.place_id;
     -- ensure that those places are not found anymore
     {% if 'search_name' in db.tables %}
       WHERE place_id = location.place_id;
     -- ensure that those places are not found anymore
     {% if 'search_name' in db.tables %}
@@ -924,7 +1191,7 @@ BEGIN
 
     SELECT wikipedia, importance
       FROM compute_importance(location.extratags, NEW.country_code,
 
     SELECT wikipedia, importance
       FROM compute_importance(location.extratags, NEW.country_code,
-                              'N', location.osm_id)
+                              location.rank_search, NEW.centroid)
       INTO linked_wikipedia,linked_importance;
 
     -- Use the maximum importance if one could be computed from the linked object.
       INTO linked_wikipedia,linked_importance;
 
     -- Use the maximum importance if one could be computed from the linked object.
@@ -936,7 +1203,7 @@ BEGIN
   ELSE
     -- No linked place? As a last resort check if the boundary is tagged with
     -- a place type and adapt the rank address.
   ELSE
     -- No linked place? As a last resort check if the boundary is tagged with
     -- a place type and adapt the rank address.
-    IF NEW.rank_address > 0 and NEW.extratags ? 'place' THEN
+    IF NEW.rank_address between 4 and 25 and NEW.extratags ? 'place' THEN
       SELECT address_rank INTO place_address_level
         FROM compute_place_rank(NEW.country_code, 'A', 'place',
                                 NEW.extratags->'place', 0::SMALLINT, False, null);
       SELECT address_rank INTO place_address_level
         FROM compute_place_rank(NEW.country_code, 'A', 'place',
                                 NEW.extratags->'place', 0::SMALLINT, False, null);
@@ -947,28 +1214,35 @@ BEGIN
     END IF;
   END IF;
 
     END IF;
   END IF;
 
-  -- Initialise the name vector using our name
-  NEW.name := add_default_place_name(NEW.country_code, NEW.name);
-  name_vector := make_keywords(NEW.name);
+  {% if not disable_diff_updates %}
+  IF OLD.rank_address != NEW.rank_address THEN
+    -- After a rank shift all addresses containing us must be updated.
+    UPDATE placex p SET indexed_status = 2 FROM place_addressline pa
+      WHERE pa.address_place_id = NEW.place_id and p.place_id = pa.place_id
+            and p.indexed_status = 0 and p.rank_address between 4 and 25;
+  END IF;
+  {% endif %}
 
 
-  -- make sure all names are in the word table
   IF NEW.admin_level = 2
      AND NEW.class = 'boundary' AND NEW.type = 'administrative'
      AND NEW.country_code IS NOT NULL AND NEW.osm_type = 'R'
   THEN
   IF NEW.admin_level = 2
      AND NEW.class = 'boundary' AND NEW.type = 'administrative'
      AND NEW.country_code IS NOT NULL AND NEW.osm_type = 'R'
   THEN
-    PERFORM create_country(NEW.name, lower(NEW.country_code));
-    {% if debug %}RAISE WARNING 'Country names updated';{% endif %}
-
-    -- Also update the list of country names. Adding an additional sanity
-    -- check here: make sure the country does overlap with the area where
-    -- we expect it to be as per static country grid.
+    -- Update the list of country names.
+    -- Only take the name from the largest area for the given country code
+    -- in the hope that this is the authoritative one.
+    -- Also replace any old names so that all mapping mistakes can
+    -- be fixed through regular OSM updates.
     FOR location IN
     FOR location IN
-      SELECT country_code FROM country_osm_grid
-       WHERE ST_Covers(geometry, NEW.centroid) and country_code = NEW.country_code
+      SELECT osm_id FROM placex
+       WHERE rank_search = 4 and osm_type = 'R'
+             and country_code = NEW.country_code
+       ORDER BY ST_Area(geometry) desc
        LIMIT 1
     LOOP
        LIMIT 1
     LOOP
-      {% if debug %}RAISE WARNING 'Updating names for country '%' with: %', NEW.country_code, NEW.name;{% endif %}
-      UPDATE country_name SET name = name || NEW.name WHERE country_code = NEW.country_code;
+      IF location.osm_id = NEW.osm_id THEN
+        {% if debug %}RAISE WARNING 'Updating names for country '%' with: %', NEW.country_code, NEW.name;{% endif %}
+        UPDATE country_name SET derived_name = NEW.name WHERE country_code = NEW.country_code;
+      END IF;
     END LOOP;
   END IF;
 
     END LOOP;
   END IF;
 
@@ -991,30 +1265,31 @@ BEGIN
     END IF;
   ELSEIF NEW.rank_address > 25 THEN
     max_rank := 25;
     END IF;
   ELSEIF NEW.rank_address > 25 THEN
     max_rank := 25;
+  ELSEIF NEW.class in ('place','boundary') and NEW.type in ('postcode','postal_code') THEN
+    max_rank := NEW.rank_search;
   ELSE
   ELSE
-    max_rank = NEW.rank_address;
+    max_rank := NEW.rank_address;
   END IF;
 
   SELECT * FROM insert_addresslines(NEW.place_id, NEW.partition, max_rank,
   END IF;
 
   SELECT * FROM insert_addresslines(NEW.place_id, NEW.partition, max_rank,
-                                    NEW.address, geom, NEW.country_code)
+                                    NEW.token_info, geom, NEW.centroid,
+                                    NEW.country_code)
     INTO NEW.parent_place_id, NEW.postcode, nameaddress_vector;
 
   {% if debug %}RAISE WARNING 'RETURN insert_addresslines: %, %, %', NEW.parent_place_id, NEW.postcode, nameaddress_vector;{% endif %}
 
     INTO NEW.parent_place_id, NEW.postcode, nameaddress_vector;
 
   {% if debug %}RAISE WARNING 'RETURN insert_addresslines: %, %, %', NEW.parent_place_id, NEW.postcode, nameaddress_vector;{% endif %}
 
-  IF NEW.address is not null AND NEW.address ? 'postcode' 
-     AND NEW.address->'postcode' not similar to '%(,|;)%' THEN
-    NEW.postcode := upper(trim(NEW.address->'postcode'));
-  END IF;
-
-  IF NEW.postcode is null AND NEW.rank_search > 8 THEN
-    NEW.postcode := get_nearest_postcode(NEW.country_code, NEW.geometry);
-  END IF;
+  NEW.postcode := coalesce(token_get_postcode(NEW.token_info), NEW.postcode);
 
   -- if we have a name add this to the name search table
   IF NEW.name IS NOT NULL THEN
 
   -- if we have a name add this to the name search table
   IF NEW.name IS NOT NULL THEN
+    -- Initialise the name vector using our name
+    NEW.name := add_default_place_name(NEW.country_code, NEW.name);
+    name_vector := token_get_name_search_tokens(NEW.token_info);
 
     IF NEW.rank_search <= 25 and NEW.rank_address > 0 THEN
 
     IF NEW.rank_search <= 25 and NEW.rank_address > 0 THEN
-      result := add_location(NEW.place_id, NEW.country_code, NEW.partition, name_vector, NEW.rank_search, NEW.rank_address, upper(trim(NEW.address->'postcode')), NEW.geometry, NEW.centroid);
+      result := add_location(NEW.place_id, NEW.country_code, NEW.partition,
+                             name_vector, NEW.rank_search, NEW.rank_address,
+                             NEW.postcode, NEW.geometry, NEW.centroid);
       {% if debug %}RAISE WARNING 'added to location (full)';{% endif %}
     END IF;
 
       {% if debug %}RAISE WARNING 'added to location (full)';{% endif %}
     END IF;
 
@@ -1023,8 +1298,11 @@ BEGIN
       {% if debug %}RAISE WARNING 'insert into road location table (full)';{% endif %}
     END IF;
 
       {% if debug %}RAISE WARNING 'insert into road location table (full)';{% endif %}
     END IF;
 
-    result := insertSearchName(NEW.partition, NEW.place_id, name_vector,
-                               NEW.rank_search, NEW.rank_address, NEW.geometry);
+    IF NEW.rank_address between 16 and 27 THEN
+      result := insertSearchName(NEW.partition, NEW.place_id,
+                                 token_get_name_match_tokens(NEW.token_info),
+                                 NEW.rank_search, NEW.rank_address, NEW.geometry);
+    END IF;
     {% if debug %}RAISE WARNING 'added to search name (full)';{% endif %}
 
     {% if not db.reverse_only %}
     {% if debug %}RAISE WARNING 'added to search name (full)';{% endif %}
 
     {% if not db.reverse_only %}
@@ -1035,11 +1313,21 @@ BEGIN
                        NEW.importance, NEW.country_code, name_vector,
                        nameaddress_vector, NEW.centroid);
     {% endif %}
                        NEW.importance, NEW.country_code, name_vector,
                        nameaddress_vector, NEW.centroid);
     {% endif %}
+  END IF;
 
 
+  IF NEW.postcode is null AND NEW.rank_search > 8
+     AND (NEW.rank_address > 0
+          OR ST_GeometryType(NEW.geometry) not in ('ST_LineString','ST_MultiLineString')
+          OR ST_Length(NEW.geometry) < 0.02)
+  THEN
+    NEW.postcode := get_nearest_postcode(NEW.country_code,
+                                         CASE WHEN NEW.rank_address > 25
+                                              THEN NEW.centroid ELSE NEW.geometry END);
   END IF;
 
   END IF;
 
-  {% if debug %}RAISE WARNING 'place update % % finsihed.', NEW.osm_type, NEW.osm_id;{% endif %}
+  {% if debug %}RAISE WARNING 'place update % % finished.', NEW.osm_type, NEW.osm_id;{% endif %}
 
 
+  NEW.token_info := token_strip_info(NEW.token_info);
   RETURN NEW;
 END;
 $$
   RETURN NEW;
 END;
 $$
@@ -1115,6 +1403,8 @@ BEGIN
 
   {% if debug %}RAISE WARNING 'placex_delete:12 % %',OLD.osm_type,OLD.osm_id;{% endif %}
 
 
   {% if debug %}RAISE WARNING 'placex_delete:12 % %',OLD.osm_type,OLD.osm_id;{% endif %}
 
+  UPDATE location_postcode SET indexed_status = 2 WHERE parent_place_id = OLD.place_id;
+
   RETURN OLD;
 
 END;
   RETURN OLD;
 
 END;