internal use only. That's why they are implemented as free-standing functions
instead of member functions.
"""
-from typing import Optional, Tuple, Dict, Sequence, TypeVar, Type, List, Any
+from typing import Optional, Tuple, Dict, Sequence, TypeVar, Type, List, Any, Union
import enum
import dataclasses
import datetime as dt
import sqlalchemy as sa
-from nominatim.typing import SaSelect, SaRow
+from nominatim.typing import SaSelect, SaRow, SaColumn
from nominatim.api.types import Point, Bbox, LookupDetails
from nominatim.api.connection import SearchConnection
from nominatim.api.logging import log
rank_search=row.rank_search,
importance=row.importance,
country_code=row.country_code,
- centroid=Point.from_wkb(row.centroid.data),
+ centroid=Point.from_wkb(row.centroid),
geometry=_filter_geometries(row))
address=row.address,
postcode=row.postcode,
country_code=row.country_code,
- centroid=Point.from_wkb(row.centroid.data),
+ centroid=Point.from_wkb(row.centroid),
geometry=_filter_geometries(row))
if hnr is None:
def create_from_tiger_row(row: Optional[SaRow],
- class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
+ class_type: Type[BaseResultT],
+ osm_type: Optional[str] = None,
+ osm_id: Optional[int] = None) -> Optional[BaseResultT]:
""" Construct a new result and add the data from the result row
from the Tiger data interpolation table. 'class_type' defines
the type of result to return. Returns None if the row is None.
res = class_type(source_table=SourceTable.TIGER,
place_id=row.place_id,
- osm_object=(row.osm_type, row.osm_id),
+ osm_object=(osm_type or row.osm_type, osm_id or row.osm_id),
category=('place', 'houses' if hnr is None else 'house'),
postcode=row.postcode,
country_code='us',
- centroid=Point.from_wkb(row.centroid.data),
+ centroid=Point.from_wkb(row.centroid),
geometry=_filter_geometries(row))
if hnr is None:
rank_search=row.rank_search,
rank_address=row.rank_address,
country_code=row.country_code,
- centroid=Point.from_wkb(row.centroid.data),
+ centroid=Point.from_wkb(row.centroid),
geometry=_filter_geometries(row))
return class_type(source_table=SourceTable.COUNTRY,
category=('place', 'country'),
- centroid=Point.from_wkb(row.centroid.data),
+ centroid=Point.from_wkb(row.centroid),
names=row.name,
rank_address=4, rank_search=4,
country_code=row.country_code)
distance=row.distance)
def _get_housenumber_details(results: List[BaseResultT]) -> Tuple[List[int], List[int]]:
    """ Collect the place IDs of the given results together with the
        house number that goes with each of them.

        Results without a place ID are left out. The house number is -1
        unless the result's source table is TIGER or OSMLINE. For those,
        the result's own house number is used or, when that is unset,
        the 'startnumber' entry of its extratags.

        Returns two lists of equal length: the place IDs and the
        house numbers, in the order of the input results.
    """
    place_ids: List[int] = []
    housenumbers: List[int] = []

    for res in results:
        if not res.place_id:
            continue

        hnr = -1
        if res.source_table in (SourceTable.TIGER, SourceTable.OSMLINE):
            if res.housenumber is not None:
                hnr = int(res.housenumber)
            elif res.extratags is not None and 'startnumber' in res.extratags:
                # Details requests do not come with a specific house
                # number, so the 'startnumber' tag is used instead.
                hnr = int(res.extratags['startnumber'])

        place_ids.append(res.place_id)
        housenumbers.append(hnr)

    return place_ids, housenumbers
+
+
async def complete_address_details(conn: SearchConnection, results: List[BaseResultT]) -> None:
""" Retrieve information about places that make up the address of the result.
"""
- def get_hnr(result: BaseResult) -> Tuple[int, int]:
- housenumber = -1
- if result.source_table in (SourceTable.TIGER, SourceTable.OSMLINE):
- if result.housenumber is not None:
- housenumber = int(result.housenumber)
- elif result.extratags is not None and 'startnumber' in result.extratags:
- # details requests do not come with a specific house number
- housenumber = int(result.extratags['startnumber'])
- assert result.place_id
- return result.place_id, housenumber
-
- data: List[Tuple[Any, ...]] = [get_hnr(r) for r in results if r.place_id]
-
- if not data:
+ places, hnrs = _get_housenumber_details(results)
+
+ if not places:
return
- values = sa.values(sa.column('place_id', type_=sa.Integer),
- sa.column('housenumber', type_=sa.Integer),
- name='places',
- literal_binds=True).data(data)
-
- sfn = sa.func.get_addressdata(values.c.place_id, values.c.housenumber)\
- .table_valued( # type: ignore[no-untyped-call]
- sa.column('place_id', type_=sa.Integer),
- 'osm_type',
- sa.column('osm_id', type_=sa.BigInteger),
- sa.column('name', type_=conn.t.types.Composite),
- 'class', 'type', 'place_type',
- sa.column('admin_level', type_=sa.Integer),
- sa.column('fromarea', type_=sa.Boolean),
- sa.column('isaddress', type_=sa.Boolean),
- sa.column('rank_address', type_=sa.SmallInteger),
- sa.column('distance', type_=sa.Float),
- joins_implicitly=True)
-
- sql = sa.select(values.c.place_id.label('result_place_id'), sfn)\
- .order_by(values.c.place_id,
+ def _get_addressdata(place_id: Union[int, SaColumn], hnr: Union[int, SaColumn]) -> Any:
+ return sa.func.get_addressdata(place_id, hnr)\
+ .table_valued( # type: ignore[no-untyped-call]
+ sa.column('place_id', type_=sa.Integer),
+ 'osm_type',
+ sa.column('osm_id', type_=sa.BigInteger),
+ sa.column('name', type_=conn.t.types.Composite),
+ 'class', 'type', 'place_type',
+ sa.column('admin_level', type_=sa.Integer),
+ sa.column('fromarea', type_=sa.Boolean),
+ sa.column('isaddress', type_=sa.Boolean),
+ sa.column('rank_address', type_=sa.SmallInteger),
+ sa.column('distance', type_=sa.Float),
+ joins_implicitly=True)
+
+
+ if len(places) == 1:
+ # Optimized case for exactly one result (reverse)
+ sql = sa.select(_get_addressdata(places[0], hnrs[0]))\
+ .order_by(sa.column('rank_address').desc(),
+ sa.column('isaddress').desc())
+
+ alines = AddressLines()
+ for row in await conn.execute(sql):
+ alines.append(_result_row_to_address_row(row))
+
+ for result in results:
+ if result.place_id == places[0]:
+ result.address_rows = alines
+ return
+
+
+ darray = sa.func.unnest(conn.t.types.to_array(places), conn.t.types.to_array(hnrs))\
+ .table_valued( # type: ignore[no-untyped-call]
+ sa.column('place_id', type_= sa.Integer),
+ sa.column('housenumber', type_= sa.Integer)
+ ).render_derived()
+
+ sfn = _get_addressdata(darray.c.place_id, darray.c.housenumber)
+
+ sql = sa.select(darray.c.place_id.label('result_place_id'), sfn)\
+ .order_by(darray.c.place_id,
sa.column('rank_address').desc(),
sa.column('isaddress').desc())