internal use only. That's why they are implemented as free-standing functions
instead of member functions.
"""
-from typing import Optional, Tuple, Dict, Sequence
+from typing import Optional, Tuple, Dict, Sequence, TypeVar, Type, List, Any, Union
import enum
import dataclasses
import datetime as dt
import sqlalchemy as sa
-from nominatim.typing import SaSelect, SaRow
-from nominatim.api.types import Point, LookupDetails
+from nominatim.typing import SaSelect, SaRow, SaColumn
+from nominatim.api.types import Point, Bbox, LookupDetails
from nominatim.api.connection import SearchConnection
+from nominatim.api.logging import log
+from nominatim.api.localization import Locales
# This file defines complex result data classes.
# pylint: disable=too-many-instance-attributes
+def _mingle_name_tags(names: Optional[Dict[str, str]]) -> Optional[Dict[str, str]]:
+ """ Mix-in names from linked places, so that they show up
+ as standard names where necessary.
+ """
+ if not names:
+ return None
+
+ out = {}
+ for k, v in names.items():
+ if k.startswith('_place_'):
+ outkey = k[7:]
+ out[k if outkey in names else outkey] = v
+ else:
+ out[k] = v
+
+ return out
+
+
class SourceTable(enum.Enum):
""" Enumeration of kinds of results.
"""
rank_address: int
distance: float
+ local_name: Optional[str] = None
+
+
+class AddressLines(List[AddressLine]):
+ """ Sequence of address lines ordered in descending order by their rank.
+ """
+
+ def localize(self, locales: Locales) -> List[str]:
+ """ Set the local name of address parts according to the chosen
+ locale. Return the list of local names, skipping a name that is
+ identical to the one directly before it.
+
+ Only address parts that are marked as isaddress are localized
+ and returned.
+ """
+ label_parts: List[str] = []
+
+ for line in self:
+ if line.isaddress and line.names:
+ line.local_name = locales.display_name(line.names)
+ if not label_parts or label_parts[-1] != line.local_name:
+ label_parts.append(line.local_name)
+
+ return label_parts
-AddressLines = Sequence[AddressLine]
@dataclasses.dataclass
@dataclasses.dataclass
-class SearchResult:
- """ Data class collecting all available information about a search result.
+class BaseResult:
+ """ Data class collecting information common to all
+ types of search results.
"""
source_table: SourceTable
category: Tuple[str, str]
centroid: Point
place_id : Optional[int] = None
- parent_place_id: Optional[int] = None
- linked_place_id: Optional[int] = None
osm_object: Optional[Tuple[str, int]] = None
- admin_level: int = 15
+
+ locale_name: Optional[str] = None
+ display_name: Optional[str] = None
names: Optional[Dict[str, str]] = None
address: Optional[Dict[str, str]] = None
country_code: Optional[str] = None
- indexed_date: Optional[dt.datetime] = None
-
address_rows: Optional[AddressLines] = None
linked_rows: Optional[AddressLines] = None
parented_rows: Optional[AddressLines] = None
geometry: Dict[str, str] = dataclasses.field(default_factory=dict)
-
@property
def lat(self) -> float:
""" Get the latitude (or y) of the center point of the place.
return self.importance or (0.7500001 - (self.rank_search/40.0))
- # pylint: disable=consider-using-f-string
- def centroid_as_geojson(self) -> str:
- """ Get the centroid in GeoJSON format.
+ def localize(self, locales: Locales) -> None:
+ """ Fill the locale_name and display_name fields for the
+ place and, if available, its address information.
"""
- return '{"type": "Point","coordinates": [%f, %f]}' % self.centroid
+ self.locale_name = locales.display_name(self.names)
+ if self.address_rows:
+ self.display_name = ', '.join(self.address_rows.localize(locales))
+ else:
+ self.display_name = self.locale_name
+
+
+
+BaseResultT = TypeVar('BaseResultT', bound=BaseResult)
+
+@dataclasses.dataclass
+class DetailedResult(BaseResult):
+ """ A search result with more internal information from the database
+ added.
+ """
+ parent_place_id: Optional[int] = None
+ linked_place_id: Optional[int] = None
+ admin_level: int = 15
+ indexed_date: Optional[dt.datetime] = None
+
+
+@dataclasses.dataclass
+class ReverseResult(BaseResult):
+ """ A search result for reverse geocoding.
+ """
+ distance: Optional[float] = None
+ bbox: Optional[Bbox] = None
+
+
+class ReverseResults(List[ReverseResult]):
+ """ Sequence of reverse lookup results ordered by distance.
+ May be empty when no result was found.
+ """
+
+
+@dataclasses.dataclass
+class SearchResult(BaseResult):
+ """ A search result for forward geocoding.
+ """
+ bbox: Optional[Bbox] = None
+ accuracy: float = 0.0
+
+
+ @property
+ def ranking(self) -> float:
+ """ Return the ranking, a combined measure of accuracy and importance.
+ """
+ return (self.accuracy if self.accuracy is not None else 1) \
+ - self.calculated_importance()
+
+
+class SearchResults(List[SearchResult]):
+ """ Sequence of forward lookup results ordered by relevance.
+ May be empty when no result was found.
+ """
+
+ def localize(self, locales: Locales) -> None:
+ """ Apply the given locales to all results.
+ """
+ for result in self:
+ result.localize(locales)
def _filter_geometries(row: SaRow) -> Dict[str, str]:
if k.startswith('geometry_')}
-def create_from_placex_row(row: SaRow) -> SearchResult:
- """ Construct a new SearchResult and add the data from the result row
- from the placex table.
+def create_from_placex_row(row: Optional[SaRow],
+ class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
+ """ Construct a new result and add the data from the result row
+ from the placex table. 'class_type' defines the type of result
+ to return. Returns None if the row is None.
+ """
+ if row is None:
+ return None
+
+ return class_type(source_table=SourceTable.PLACEX,
+ place_id=row.place_id,
+ osm_object=(row.osm_type, row.osm_id),
+ category=(row.class_, row.type),
+ names=_mingle_name_tags(row.name),
+ address=row.address,
+ extratags=row.extratags,
+ housenumber=row.housenumber,
+ postcode=row.postcode,
+ wikipedia=row.wikipedia,
+ rank_address=row.rank_address,
+ rank_search=row.rank_search,
+ importance=row.importance,
+ country_code=row.country_code,
+ centroid=Point.from_wkb(row.centroid),
+ geometry=_filter_geometries(row))
+
+
+def create_from_osmline_row(row: Optional[SaRow],
+ class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
+ """ Construct a new result and add the data from the result row
+ from the address interpolation table osmline. 'class_type' defines
+ the type of result to return. Returns None if the row is None.
+
+ If the row contains a housenumber, then the housenumber is filled out.
+ Otherwise the result contains the interpolation information in extratags.
"""
- result = SearchResult(source_table=SourceTable.PLACEX,
- place_id=row.place_id,
- parent_place_id=row.parent_place_id,
- linked_place_id=row.linked_place_id,
- osm_object=(row.osm_type, row.osm_id),
- category=(row.class_, row.type),
- admin_level=row.admin_level,
- names=row.name,
- address=row.address,
- extratags=row.extratags,
- housenumber=row.housenumber,
- postcode=row.postcode,
- wikipedia=row.wikipedia,
- rank_address=row.rank_address,
- rank_search=row.rank_search,
- importance=row.importance,
- country_code=row.country_code,
- indexed_date=getattr(row, 'indexed_date'),
- centroid=Point(row.x, row.y),
- geometry = _filter_geometries(row))
-
- return result
-
-
-def create_from_osmline_row(row: SaRow) -> SearchResult:
- """ Construct a new SearchResult and add the data from the result row
- from the osmline table.
+ if row is None:
+ return None
+
+ hnr = getattr(row, 'housenumber', None)
+
+ res = class_type(source_table=SourceTable.OSMLINE,
+ place_id=row.place_id,
+ osm_object=('W', row.osm_id),
+ category=('place', 'houses' if hnr is None else 'house'),
+ address=row.address,
+ postcode=row.postcode,
+ country_code=row.country_code,
+ centroid=Point.from_wkb(row.centroid),
+ geometry=_filter_geometries(row))
+
+ if hnr is None:
+ res.extratags = {'startnumber': str(row.startnumber),
+ 'endnumber': str(row.endnumber),
+ 'step': str(row.step)}
+ else:
+ res.housenumber = str(hnr)
+
+ return res
+
+
+def create_from_tiger_row(row: Optional[SaRow],
+ class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
+ """ Construct a new result and add the data from the result row
+ from the Tiger data interpolation table. 'class_type' defines
+ the type of result to return. Returns None if the row is None.
+
+ If the row contains a housenumber, then the housenumber is filled out.
+ Otherwise the result contains the interpolation information in extratags.
"""
- result = SearchResult(source_table=SourceTable.OSMLINE,
- place_id=row.place_id,
- parent_place_id=row.parent_place_id,
- osm_object=('W', row.osm_id),
- category=('place', 'houses'),
- address=row.address,
- postcode=row.postcode,
- extratags={'startnumber': str(row.startnumber),
- 'endnumber': str(row.endnumber),
- 'step': str(row.step)},
- country_code=row.country_code,
- indexed_date=getattr(row, 'indexed_date'),
- centroid=Point(row.x, row.y),
- geometry = _filter_geometries(row))
-
- return result
-
-
-async def add_result_details(conn: SearchConnection, result: SearchResult,
+ if row is None:
+ return None
+
+ hnr = getattr(row, 'housenumber', None)
+
+ res = class_type(source_table=SourceTable.TIGER,
+ place_id=row.place_id,
+ osm_object=(row.osm_type, row.osm_id),
+ category=('place', 'houses' if hnr is None else 'house'),
+ postcode=row.postcode,
+ country_code='us',
+ centroid=Point.from_wkb(row.centroid),
+ geometry=_filter_geometries(row))
+
+ if hnr is None:
+ res.extratags = {'startnumber': str(row.startnumber),
+ 'endnumber': str(row.endnumber),
+ 'step': str(row.step)}
+ else:
+ res.housenumber = str(hnr)
+
+ return res
+
+
+def create_from_postcode_row(row: Optional[SaRow],
+ class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
+ """ Construct a new result and add the data from the result row
+ from the postcode table. 'class_type' defines
+ the type of result to return. Returns None if the row is None.
+ """
+ if row is None:
+ return None
+
+ return class_type(source_table=SourceTable.POSTCODE,
+ place_id=row.place_id,
+ category=('place', 'postcode'),
+ names={'ref': row.postcode},
+ rank_search=row.rank_search,
+ rank_address=row.rank_address,
+ country_code=row.country_code,
+ centroid=Point.from_wkb(row.centroid),
+ geometry=_filter_geometries(row))
+
+
+def create_from_country_row(row: Optional[SaRow],
+ class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
+ """ Construct a new result and add the data from the result row
+ from the fallback country tables. 'class_type' defines
+ the type of result to return. Returns None if the row is None.
+ """
+ if row is None:
+ return None
+
+ return class_type(source_table=SourceTable.COUNTRY,
+ category=('place', 'country'),
+ centroid=Point.from_wkb(row.centroid),
+ names=row.name,
+ rank_address=4, rank_search=4,
+ country_code=row.country_code)
+
+
+async def add_result_details(conn: SearchConnection, results: List[BaseResultT],
details: LookupDetails) -> None:
""" Retrieve more details from the database according to the
parameters specified in 'details'.
"""
- if details.address_details:
- await complete_address_details(conn, result)
- if details.linked_places:
- await complete_linked_places(conn, result)
- if details.parented_places:
- await complete_parented_places(conn, result)
- if details.keywords:
- await complete_keywords(conn, result)
+ if results:
+ log().section('Query details for result')
+ if details.address_details:
+ log().comment('Query address details')
+ await complete_address_details(conn, results)
+ if details.linked_places:
+ log().comment('Query linked places')
+ for result in results:
+ await complete_linked_places(conn, result)
+ if details.parented_places:
+ log().comment('Query parent places')
+ for result in results:
+ await complete_parented_places(conn, result)
+ if details.keywords:
+ log().comment('Query keywords')
+ for result in results:
+ await complete_keywords(conn, result)
def _result_row_to_address_row(row: SaRow) -> AddressLine:
""" Create a new AddressLine from the results of a database query.
"""
extratags: Dict[str, str] = getattr(row, 'extratags', {})
- if 'place_type' in row:
- extratags['place_type'] = row.place_type
+ if hasattr(row, 'place_type') and row.place_type:
+ extratags['place'] = row.place_type
- names = row.name
+ names = _mingle_name_tags(row.name) or {}
if getattr(row, 'housenumber', None) is not None:
- if names is None:
- names = {}
names['housenumber'] = row.housenumber
return AddressLine(place_id=row.place_id,
distance=row.distance)
-async def complete_address_details(conn: SearchConnection, result: SearchResult) -> None:
+def _get_housenumber_details(results: List[BaseResultT]) -> Tuple[List[int], List[int]]:
+ places = []
+ hnrs = []
+ for result in results:
+ if result.place_id:
+ housenumber = -1
+ if result.source_table in (SourceTable.TIGER, SourceTable.OSMLINE):
+ if result.housenumber is not None:
+ housenumber = int(result.housenumber)
+ elif result.extratags is not None and 'startnumber' in result.extratags:
+ # details requests do not come with a specific house number
+ housenumber = int(result.extratags['startnumber'])
+ places.append(result.place_id)
+ hnrs.append(housenumber)
+
+ return places, hnrs
+
+
+async def complete_address_details(conn: SearchConnection, results: List[BaseResultT]) -> None:
""" Retrieve information about places that make up the address of the result.
"""
- housenumber = -1
- if result.source_table in (SourceTable.TIGER, SourceTable.OSMLINE):
- if result.housenumber is not None:
- housenumber = int(result.housenumber)
- elif result.extratags is not None and 'startnumber' in result.extratags:
- # details requests do not come with a specific house number
- housenumber = int(result.extratags['startnumber'])
-
- sfn = sa.func.get_addressdata(result.place_id, housenumber)\
- .table_valued( # type: ignore[no-untyped-call]
- sa.column('place_id', type_=sa.Integer),
- 'osm_type',
- sa.column('osm_id', type_=sa.BigInteger),
- sa.column('name', type_=conn.t.types.Composite),
- 'class', 'type', 'place_type',
- sa.column('admin_level', type_=sa.Integer),
- sa.column('fromarea', type_=sa.Boolean),
- sa.column('isaddress', type_=sa.Boolean),
- sa.column('rank_address', type_=sa.SmallInteger),
- sa.column('distance', type_=sa.Float))
- sql = sa.select(sfn).order_by(sa.column('rank_address').desc(),
- sa.column('isaddress').desc())
-
- result.address_rows = []
+ places, hnrs = _get_housenumber_details(results)
+
+ if not places:
+ return
+
+ def _get_addressdata(place_id: Union[int, SaColumn], hnr: Union[int, SaColumn]) -> Any:
+ return sa.func.get_addressdata(place_id, hnr)\
+ .table_valued( # type: ignore[no-untyped-call]
+ sa.column('place_id', type_=sa.Integer),
+ 'osm_type',
+ sa.column('osm_id', type_=sa.BigInteger),
+ sa.column('name', type_=conn.t.types.Composite),
+ 'class', 'type', 'place_type',
+ sa.column('admin_level', type_=sa.Integer),
+ sa.column('fromarea', type_=sa.Boolean),
+ sa.column('isaddress', type_=sa.Boolean),
+ sa.column('rank_address', type_=sa.SmallInteger),
+ sa.column('distance', type_=sa.Float),
+ joins_implicitly=True)
+
+
+ if len(places) == 1:
+ # Optimized case for exactly one result (reverse)
+ sql = sa.select(_get_addressdata(places[0], hnrs[0]))\
+ .order_by(sa.column('rank_address').desc(),
+ sa.column('isaddress').desc())
+
+ alines = AddressLines()
+ for row in await conn.execute(sql):
+ alines.append(_result_row_to_address_row(row))
+
+ for result in results:
+ if result.place_id == places[0]:
+ result.address_rows = alines
+ return
+
+
+ darray = sa.func.unnest(conn.t.types.to_array(places), conn.t.types.to_array(hnrs))\
+ .table_valued( # type: ignore[no-untyped-call]
+ sa.column('place_id', type_= sa.Integer),
+ sa.column('housenumber', type_= sa.Integer)
+ ).render_derived()
+
+ sfn = _get_addressdata(darray.c.place_id, darray.c.housenumber)
+
+ sql = sa.select(darray.c.place_id.label('result_place_id'), sfn)\
+ .order_by(darray.c.place_id,
+ sa.column('rank_address').desc(),
+ sa.column('isaddress').desc())
+
+ current_result = None
for row in await conn.execute(sql):
- result.address_rows.append(_result_row_to_address_row(row))
+ if current_result is None or row.result_place_id != current_result.place_id:
+ for result in results:
+ if result.place_id == row.result_place_id:
+ current_result = result
+ break
+ else:
+ assert False
+ current_result.address_rows = AddressLines()
+ current_result.address_rows.append(_result_row_to_address_row(row))
+
# pylint: disable=consider-using-f-string
def _placex_select_address_row(conn: SearchConnection,
""" % centroid).label('distance'))
-async def complete_linked_places(conn: SearchConnection, result: SearchResult) -> None:
+async def complete_linked_places(conn: SearchConnection, result: BaseResult) -> None:
""" Retrieve information about places that link to the result.
"""
- result.linked_rows = []
+ result.linked_rows = AddressLines()
if result.source_table != SourceTable.PLACEX:
return
result.linked_rows.append(_result_row_to_address_row(row))
-async def complete_keywords(conn: SearchConnection, result: SearchResult) -> None:
+async def complete_keywords(conn: SearchConnection, result: BaseResult) -> None:
""" Retrieve information about the search terms used for this place.
+
+ Requires that the query analyzer was initialised to get access to
+ the word table.
"""
t = conn.t.search_name
sql = sa.select(t.c.name_vector, t.c.nameaddress_vector)\
result.name_keywords = []
result.address_keywords = []
- for name_tokens, address_tokens in await conn.execute(sql):
- t = conn.t.word
- sel = sa.select(t.c.word_id, t.c.word_token, t.c.word)
+ t = conn.t.meta.tables['word']
+ sel = sa.select(t.c.word_id, t.c.word_token, t.c.word)
+
+ for name_tokens, address_tokens in await conn.execute(sql):
for row in await conn.execute(sel.where(t.c.word_id == sa.any_(name_tokens))):
result.name_keywords.append(WordInfo(*row))
result.address_keywords.append(WordInfo(*row))
-async def complete_parented_places(conn: SearchConnection, result: SearchResult) -> None:
+async def complete_parented_places(conn: SearchConnection, result: BaseResult) -> None:
""" Retrieve information about places that the result provides the
address for.
"""
- result.parented_rows = []
+ result.parented_rows = AddressLines()
if result.source_table != SourceTable.PLACEX:
return