internal use only. That's why they are implemented as free-standing functions
instead of member functions.
"""
-from typing import Optional, Tuple, Dict, Sequence, TypeVar, Type, List, Any, Union
+from typing import Optional, Tuple, Dict, Sequence, TypeVar, Type, List, cast
import enum
import dataclasses
import datetime as dt
import sqlalchemy as sa
-from nominatim.typing import SaSelect, SaRow, SaColumn
+from nominatim.typing import SaSelect, SaRow
+from nominatim.db.sqlalchemy_types import Geometry
from nominatim.api.types import Point, Bbox, LookupDetails
from nominatim.api.connection import SearchConnection
from nominatim.api.logging import log
and its function as an address object. Most fields are optional.
Their presence depends on the kind and function of the address part.
"""
- place_id: Optional[int]
- """ Internal ID of the place.
- """
- osm_object: Optional[Tuple[str, int]]
- """ OSM type and ID of the place, if such an object exists.
- """
category: Tuple[str, str]
""" Main category of the place, described by a key-value pair.
"""
""" All available names for the place including references, alternative
names and translations.
"""
- extratags: Optional[Dict[str, str]]
- """ Any extra information available about the place. This is a dictionary
- that usually contains OSM tag key-value pairs.
- """
-
- admin_level: Optional[int]
- """ The administrative level of a boundary as tagged in the input data.
- This field is only meaningful for places of the category
- (boundary, administrative).
- """
fromarea: bool
""" If true, then the exact area of the place is known. Without area
information, Nominatim has to make an educated guess if an address
distance: float
""" Distance in degrees between the result place and this address part.
"""
+ place_id: Optional[int] = None
+ """ Internal ID of the place.
+ """
+ osm_object: Optional[Tuple[str, int]] = None
+ """ OSM type and ID of the place, if such an object exists.
+ """
+ extratags: Optional[Dict[str, str]] = None
+ """ Any extra information available about the place. This is a dictionary
+ that usually contains OSM tag key-value pairs.
+ """
+
+ admin_level: Optional[int] = None
+ """ The administrative level of a boundary as tagged in the input data.
+ This field is only meaningful for places of the category
+ (boundary, administrative).
+ """
local_name: Optional[str] = None
""" Place holder for localization of this address part. See
place_id : Optional[int] = None
osm_object: Optional[Tuple[str, int]] = None
+ parent_place_id: Optional[int] = None
+ linked_place_id: Optional[int] = None
+ admin_level: int = 15
locale_name: Optional[str] = None
display_name: Optional[str] = None
""" A search result with more internal information from the database
added.
"""
- parent_place_id: Optional[int] = None
- linked_place_id: Optional[int] = None
- admin_level: int = 15
indexed_date: Optional[dt.datetime] = None
May be empty when no result was found.
"""
- def localize(self, locales: Locales) -> None:
- """ Apply the given locales to all results.
- """
- for result in self:
- result.localize(locales)
-
def _filter_geometries(row: SaRow) -> Dict[str, str]:
return {k[9:]: v for k, v in row._mapping.items() # pylint: disable=W0212
place_id=row.place_id,
osm_object=(row.osm_type, row.osm_id),
category=(row.class_, row.type),
+ parent_place_id = row.parent_place_id,
+ linked_place_id = getattr(row, 'linked_place_id', None),
+ admin_level = getattr(row, 'admin_level', 15),
names=_mingle_name_tags(row.name),
address=row.address,
extratags=row.extratags,
res = class_type(source_table=SourceTable.OSMLINE,
place_id=row.place_id,
+ parent_place_id = row.parent_place_id,
osm_object=('W', row.osm_id),
category=('place', 'houses' if hnr is None else 'house'),
address=row.address,
res = class_type(source_table=SourceTable.TIGER,
place_id=row.place_id,
+ parent_place_id = row.parent_place_id,
osm_object=(osm_type or row.osm_type, osm_id or row.osm_id),
category=('place', 'houses' if hnr is None else 'house'),
postcode=row.postcode,
return class_type(source_table=SourceTable.POSTCODE,
place_id=row.place_id,
+ parent_place_id = row.parent_place_id,
category=('place', 'postcode'),
names={'ref': row.postcode},
rank_search=row.rank_search,
centroid=Point.from_wkb(row.centroid),
names=row.name,
rank_address=4, rank_search=4,
- country_code=row.country_code)
+ country_code=row.country_code,
+ geometry=_filter_geometries(row))
async def add_result_details(conn: SearchConnection, results: List[BaseResultT],
log().comment('Query keywords')
for result in results:
await complete_keywords(conn, result)
+ for result in results:
+ result.localize(details.locales)
-def _result_row_to_address_row(row: SaRow) -> AddressLine:
+def _result_row_to_address_row(row: SaRow, isaddress: Optional[bool] = None) -> AddressLine:
""" Create a new AddressLine from the results of a datbase query.
"""
- extratags: Dict[str, str] = getattr(row, 'extratags', {})
- if hasattr(row, 'place_type') and row.place_type:
- extratags['place'] = row.place_type
+ extratags: Dict[str, str] = getattr(row, 'extratags', {}) or {}
+ if 'linked_place' in extratags:
+ extratags['place'] = extratags['linked_place']
names = _mingle_name_tags(row.name) or {}
if getattr(row, 'housenumber', None) is not None:
names['housenumber'] = row.housenumber
+ if isaddress is None:
+ isaddress = getattr(row, 'isaddress', True)
+
return AddressLine(place_id=row.place_id,
osm_object=None if row.osm_type is None else (row.osm_type, row.osm_id),
category=(getattr(row, 'class'), row.type),
extratags=extratags,
admin_level=row.admin_level,
fromarea=row.fromarea,
- isaddress=getattr(row, 'isaddress', True),
+ isaddress=isaddress,
rank_address=row.rank_address,
distance=row.distance)
-def _get_housenumber_details(results: List[BaseResultT]) -> Tuple[List[int], List[int]]:
- places = []
- hnrs = []
- for result in results:
- if result.place_id:
- housenumber = -1
- if result.source_table in (SourceTable.TIGER, SourceTable.OSMLINE):
- if result.housenumber is not None:
- housenumber = int(result.housenumber)
- elif result.extratags is not None and 'startnumber' in result.extratags:
- # details requests do not come with a specific house number
- housenumber = int(result.extratags['startnumber'])
- places.append(result.place_id)
- hnrs.append(housenumber)
-
- return places, hnrs
+def _get_address_lookup_id(result: BaseResultT) -> int:
+ assert result.place_id
+ if result.source_table != SourceTable.PLACEX or result.rank_search > 27:
+ return result.parent_place_id or result.place_id
+
+ return result.linked_place_id or result.place_id
+
+
+async def _finalize_entry(conn: SearchConnection, result: BaseResultT) -> None:
+ assert result.address_rows is not None
+ postcode = result.postcode
+ if not postcode and result.address:
+ postcode = result.address.get('postcode')
+ if postcode and ',' not in postcode and ';' not in postcode:
+ result.address_rows.append(AddressLine(
+ category=('place', 'postcode'),
+ names={'ref': postcode},
+ fromarea=False, isaddress=True, rank_address=5,
+ distance=0.0))
+ if result.country_code:
+ async def _get_country_names() -> Optional[Dict[str, str]]:
+ t = conn.t.country_name
+ sql = sa.select(t.c.name, t.c.derived_name)\
+ .where(t.c.country_code == result.country_code)
+ for cres in await conn.execute(sql):
+ names = cast(Dict[str, str], cres[0])
+ if cres[1]:
+ names.update(cast(Dict[str, str], cres[1]))
+ return names
+ return None
+
+ country_names = await conn.get_cached_value('COUNTRY_NAME',
+ result.country_code,
+ _get_country_names)
+ if country_names:
+ result.address_rows.append(AddressLine(
+ category=('place', 'country'),
+ names=country_names,
+ fromarea=False, isaddress=True, rank_address=4,
+ distance=0.0))
+ result.address_rows.append(AddressLine(
+ category=('place', 'country_code'),
+ names={'ref': result.country_code}, extratags = {},
+ fromarea=True, isaddress=False, rank_address=4,
+ distance=0.0))
+
+
+def _setup_address_details(result: BaseResultT) -> None:
+ """ Retrieve information about places that make up the address of the result.
+ """
+ result.address_rows = AddressLines()
+ if result.names:
+ result.address_rows.append(AddressLine(
+ place_id=result.place_id,
+ osm_object=result.osm_object,
+ category=result.category,
+ names=result.names,
+ extratags=result.extratags or {},
+ admin_level=result.admin_level,
+ fromarea=True, isaddress=True,
+ rank_address=result.rank_address or 100, distance=0.0))
+ if result.source_table == SourceTable.PLACEX and result.address:
+ housenumber = result.address.get('housenumber')\
+ or result.address.get('streetnumber')\
+ or result.address.get('conscriptionnumber')
+ elif result.housenumber:
+ housenumber = result.housenumber
+ else:
+ housenumber = None
+ if housenumber:
+ result.address_rows.append(AddressLine(
+ category=('place', 'house_number'),
+ names={'ref': housenumber},
+ fromarea=True, isaddress=True, rank_address=28, distance=0))
+ if result.address and '_unlisted_place' in result.address:
+ result.address_rows.append(AddressLine(
+ category=('place', 'locality'),
+ names={'name': result.address['_unlisted_place']},
+ fromarea=False, isaddress=True, rank_address=25, distance=0))
async def complete_address_details(conn: SearchConnection, results: List[BaseResultT]) -> None:
""" Retrieve information about places that make up the address of the result.
"""
- places, hnrs = _get_housenumber_details(results)
-
- if not places:
- return
+ for result in results:
+ _setup_address_details(result)
- def _get_addressdata(place_id: Union[int, SaColumn], hnr: Union[int, SaColumn]) -> Any:
- return sa.func.get_addressdata(place_id, hnr)\
- .table_valued( # type: ignore[no-untyped-call]
- sa.column('place_id', type_=sa.Integer),
- 'osm_type',
- sa.column('osm_id', type_=sa.BigInteger),
- sa.column('name', type_=conn.t.types.Composite),
- 'class', 'type', 'place_type',
- sa.column('admin_level', type_=sa.Integer),
- sa.column('fromarea', type_=sa.Boolean),
- sa.column('isaddress', type_=sa.Boolean),
- sa.column('rank_address', type_=sa.SmallInteger),
- sa.column('distance', type_=sa.Float),
- joins_implicitly=True)
-
-
- if len(places) == 1:
- # Optimized case for exactly one result (reverse)
- sql = sa.select(_get_addressdata(places[0], hnrs[0]))\
- .order_by(sa.column('rank_address').desc(),
- sa.column('isaddress').desc())
-
- alines = AddressLines()
- for row in await conn.execute(sql):
- alines.append(_result_row_to_address_row(row))
+ ### Lookup entries from place_address line
- for result in results:
- if result.place_id == places[0]:
- result.address_rows = alines
- return
+ lookup_ids = [{'pid': r.place_id,
+ 'lid': _get_address_lookup_id(r),
+ 'names': list(r.address.values()) if r.address else [],
+ 'c': ('SRID=4326;' + r.centroid.to_wkt()) if r.centroid else '' }
+ for r in results if r.place_id]
+ if not lookup_ids:
+ return
- darray = sa.func.unnest(conn.t.types.to_array(places), conn.t.types.to_array(hnrs))\
- .table_valued( # type: ignore[no-untyped-call]
- sa.column('place_id', type_= sa.Integer),
- sa.column('housenumber', type_= sa.Integer)
- ).render_derived()
+ ltab = sa.func.JsonArrayEach(sa.type_coerce(lookup_ids, sa.JSON))\
+ .table_valued(sa.column('value', type_=sa.JSON))
- sfn = _get_addressdata(darray.c.place_id, darray.c.housenumber)
+ t = conn.t.placex
+ taddr = conn.t.addressline
+
+ sql = sa.select(ltab.c.value['pid'].as_integer().label('src_place_id'),
+ t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
+ t.c.class_, t.c.type, t.c.extratags,
+ t.c.admin_level, taddr.c.fromarea,
+ sa.case((t.c.rank_address == 11, 5),
+ else_=t.c.rank_address).label('rank_address'),
+ taddr.c.distance, t.c.country_code, t.c.postcode)\
+ .join(taddr, sa.or_(taddr.c.place_id == ltab.c.value['pid'].as_integer(),
+ taddr.c.place_id == ltab.c.value['lid'].as_integer()))\
+ .join(t, taddr.c.address_place_id == t.c.place_id)\
+ .order_by('src_place_id')\
+ .order_by(sa.column('rank_address').desc())\
+ .order_by((taddr.c.place_id == ltab.c.value['pid'].as_integer()).desc())\
+ .order_by(sa.case((sa.func.CrosscheckNames(t.c.name, ltab.c.value['names']), 2),
+ (taddr.c.isaddress, 0),
+ (sa.and_(taddr.c.fromarea,
+ t.c.geometry.ST_Contains(
+ sa.func.ST_GeomFromEWKT(
+ ltab.c.value['c'].as_string()))), 1),
+ else_=-1).desc())\
+ .order_by(taddr.c.fromarea.desc())\
+ .order_by(taddr.c.distance.desc())\
+ .order_by(t.c.rank_search.desc())
- sql = sa.select(darray.c.place_id.label('result_place_id'), sfn)\
- .order_by(darray.c.place_id,
- sa.column('rank_address').desc(),
- sa.column('isaddress').desc())
current_result = None
+ current_rank_address = -1
for row in await conn.execute(sql):
- if current_result is None or row.result_place_id != current_result.place_id:
- for result in results:
- if result.place_id == row.result_place_id:
- current_result = result
- break
+ if current_result is None or row.src_place_id != current_result.place_id:
+ current_result = next((r for r in results if r.place_id == row.src_place_id), None)
+ assert current_result is not None
+ current_rank_address = -1
+
+ location_isaddress = row.rank_address != current_rank_address
+
+ if current_result.country_code is None and row.country_code:
+ current_result.country_code = row.country_code
+
+ if row.type in ('postcode', 'postal_code') and location_isaddress:
+ if not row.fromarea or \
+ (current_result.address and 'postcode' in current_result.address):
+ location_isaddress = False
else:
- assert False
- current_result.address_rows = AddressLines()
- current_result.address_rows.append(_result_row_to_address_row(row))
+ current_result.postcode = None
+
+ assert current_result.address_rows is not None
+ current_result.address_rows.append(_result_row_to_address_row(row, location_isaddress))
+ current_rank_address = row.rank_address
+
+ for result in results:
+ await _finalize_entry(conn, result)
+
+
+ ### Finally add the record for the parent entry where necessary.
+
+ parent_lookup_ids = list(filter(lambda e: e['pid'] != e['lid'], lookup_ids))
+ if parent_lookup_ids:
+ ltab = sa.func.JsonArrayEach(sa.type_coerce(parent_lookup_ids, sa.JSON))\
+ .table_valued(sa.column('value', type_=sa.JSON))
+ sql = sa.select(ltab.c.value['pid'].as_integer().label('src_place_id'),
+ t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
+ t.c.class_, t.c.type, t.c.extratags,
+ t.c.admin_level,
+ t.c.rank_address)\
+ .where(t.c.place_id == ltab.c.value['lid'].as_integer())
+
+ for row in await conn.execute(sql):
+ current_result = next((r for r in results if r.place_id == row.src_place_id), None)
+ assert current_result is not None
+ assert current_result.address_rows is not None
+
+ current_result.address_rows.append(AddressLine(
+ place_id=row.place_id,
+ osm_object=(row.osm_type, row.osm_id),
+ category=(row.class_, row.type),
+ names=row.name, extratags=row.extratags or {},
+ admin_level=row.admin_level,
+ fromarea=True, isaddress=True,
+ rank_address=row.rank_address, distance=0.0))
+
+ ### Now sort everything
+ for result in results:
+ assert result.address_rows is not None
+ result.address_rows.sort(key=lambda a: (-a.rank_address, a.isaddress))
-# pylint: disable=consider-using-f-string
def _placex_select_address_row(conn: SearchConnection,
centroid: Point) -> SaSelect:
t = conn.t.placex
return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
t.c.class_.label('class'), t.c.type,
t.c.admin_level, t.c.housenumber,
- sa.literal_column("""ST_GeometryType(geometry) in
- ('ST_Polygon','ST_MultiPolygon')""").label('fromarea'),
+ t.c.geometry.is_area().label('fromarea'),
t.c.rank_address,
- sa.literal_column(
- """ST_DistanceSpheroid(geometry, 'SRID=4326;POINT(%f %f)'::geometry,
- 'SPHEROID["WGS 84",6378137,298.257223563, AUTHORITY["EPSG","7030"]]')
- """ % centroid).label('distance'))
+ t.c.geometry.distance_spheroid(
+ sa.bindparam('centroid', value=centroid, type_=Geometry)).label('distance'))
async def complete_linked_places(conn: SearchConnection, result: BaseResult) -> None:
sel = sa.select(t.c.word_id, t.c.word_token, t.c.word)
for name_tokens, address_tokens in await conn.execute(sql):
- for row in await conn.execute(sel.where(t.c.word_id == sa.any_(name_tokens))):
+ for row in await conn.execute(sel.where(t.c.word_id.in_(name_tokens))):
result.name_keywords.append(WordInfo(*row))
- for row in await conn.execute(sel.where(t.c.word_id == sa.any_(address_tokens))):
+ for row in await conn.execute(sel.where(t.c.word_id.in_(address_tokens))):
result.address_keywords.append(WordInfo(*row))