internal use only. That's why they are implemented as free-standing functions
instead of member functions.
"""
-from typing import Optional, Tuple, Dict, Sequence, TypeVar, Type, List, cast
+from typing import Optional, Tuple, Dict, Sequence, TypeVar, Type, List, cast, Callable
import enum
import dataclasses
import datetime as dt
import sqlalchemy as sa
from nominatim.typing import SaSelect, SaRow
-from nominatim.db.sqlalchemy_functions import CrosscheckNames
+from nominatim.db.sqlalchemy_types import Geometry
from nominatim.api.types import Point, Bbox, LookupDetails
from nominatim.api.connection import SearchConnection
from nominatim.api.logging import log
of the value or an artificial value computed from the place's
search rank.
"""
- return self.importance or (0.7500001 - (self.rank_search/40.0))
+ return self.importance or (0.40001 - (self.rank_search/75.0))
def localize(self, locales: Locales) -> None:
centroid=Point.from_wkb(row.centroid),
names=row.name,
rank_address=4, rank_search=4,
- country_code=row.country_code)
+ country_code=row.country_code,
+ geometry=_filter_geometries(row))
async def add_result_details(conn: SearchConnection, results: List[BaseResultT],
def _result_row_to_address_row(row: SaRow, isaddress: Optional[bool] = None) -> AddressLine:
- """ Create a new AddressLine from the results of a datbase query.
+ """ Create a new AddressLine from the results of a database query.
"""
extratags: Dict[str, str] = getattr(row, 'extratags', {}) or {}
if 'linked_place' in extratags:
distance=row.distance)
-def _get_housenumber_details(results: List[BaseResultT]) -> Tuple[List[int], List[int]]:
- places = []
- hnrs = []
- for result in results:
- if result.place_id:
- housenumber = -1
- if result.source_table in (SourceTable.TIGER, SourceTable.OSMLINE):
- if result.housenumber is not None:
- housenumber = int(result.housenumber)
- elif result.extratags is not None and 'startnumber' in result.extratags:
- # details requests do not come with a specific house number
- housenumber = int(result.extratags['startnumber'])
- places.append(result.place_id)
- hnrs.append(housenumber)
-
- return places, hnrs
-
-
def _get_address_lookup_id(result: BaseResultT) -> int:
assert result.place_id
if result.source_table != SourceTable.PLACEX or result.rank_search > 27:
async def _finalize_entry(conn: SearchConnection, result: BaseResultT) -> None:
- assert result.address_rows
- postcode = result.postcode
- if not postcode and result.address:
- postcode = result.address.get('postcode')
- if postcode and ',' not in postcode and ';' not in postcode:
- result.address_rows.append(AddressLine(
- category=('place', 'postcode'),
- names={'ref': postcode},
- fromarea=False, isaddress=True, rank_address=5,
- distance=0.0))
+ assert result.address_rows is not None
+ if result.category[0] not in ('boundary', 'place')\
+ or result.category[1] not in ('postal_code', 'postcode'):
+ postcode = result.postcode
+ if not postcode and result.address:
+ postcode = result.address.get('postcode')
+ if postcode and ',' not in postcode and ';' not in postcode:
+ result.address_rows.append(AddressLine(
+ category=('place', 'postcode'),
+ names={'ref': postcode},
+ fromarea=False, isaddress=True, rank_address=5,
+ distance=0.0))
if result.country_code:
async def _get_country_names() -> Optional[Dict[str, str]]:
t = conn.t.country_name
if not lookup_ids:
return
- ltab = sa.func.json_array_elements(sa.type_coerce(lookup_ids, sa.JSON))\
- .table_valued(sa.column('value', type_=sa.JSON)) # type: ignore[no-untyped-call]
+ ltab = sa.func.JsonArrayEach(sa.type_coerce(lookup_ids, sa.JSON))\
+ .table_valued(sa.column('value', type_=sa.JSON))
t = conn.t.placex
taddr = conn.t.addressline
.order_by('src_place_id')\
.order_by(sa.column('rank_address').desc())\
.order_by((taddr.c.place_id == ltab.c.value['pid'].as_integer()).desc())\
- .order_by(sa.case((CrosscheckNames(t.c.name, ltab.c.value['names']), 2),
+ .order_by(sa.case((sa.func.CrosscheckNames(t.c.name, ltab.c.value['names']), 2),
(taddr.c.isaddress, 0),
(sa.and_(taddr.c.fromarea,
t.c.geometry.ST_Contains(
parent_lookup_ids = list(filter(lambda e: e['pid'] != e['lid'], lookup_ids))
if parent_lookup_ids:
- ltab = sa.func.json_array_elements(sa.type_coerce(parent_lookup_ids, sa.JSON))\
- .table_valued(sa.column('value', type_=sa.JSON)) # type: ignore[no-untyped-call]
+ ltab = sa.func.JsonArrayEach(sa.type_coerce(parent_lookup_ids, sa.JSON))\
+ .table_valued(sa.column('value', type_=sa.JSON))
sql = sa.select(ltab.c.value['pid'].as_integer().label('src_place_id'),
t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
t.c.class_, t.c.type, t.c.extratags,
rank_address=row.rank_address, distance=0.0))
### Now sort everything
+ def mk_sort_key(place_id: Optional[int]) -> Callable[[AddressLine], Tuple[bool, int, bool]]:
+ # Build the sort key for the address rows of one result. Tuples compare
+ # element by element and False sorts before True, so the row whose
+ # place_id equals the given place_id comes first, the remaining rows
+ # follow by descending rank_address, and within the same rank rows
+ # with a false isaddress precede those with a true one.
+ return lambda a: (a.place_id != place_id, -a.rank_address, a.isaddress)
+
for result in results:
assert result.address_rows is not None
- result.address_rows.sort(key=lambda a: (-a.rank_address, a.isaddress))
+ result.address_rows.sort(key=mk_sort_key(result.place_id))
def _placex_select_address_row(conn: SearchConnection,
return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
t.c.class_.label('class'), t.c.type,
t.c.admin_level, t.c.housenumber,
- sa.literal_column("""ST_GeometryType(geometry) in
- ('ST_Polygon','ST_MultiPolygon')""").label('fromarea'),
+ t.c.geometry.is_area().label('fromarea'),
t.c.rank_address,
- sa.literal_column(
- f"""ST_DistanceSpheroid(geometry,
- 'SRID=4326;{centroid.to_wkt()}'::geometry,
- 'SPHEROID["WGS 84",6378137,298.257223563, AUTHORITY["EPSG","7030"]]')
- """).label('distance'))
+ t.c.geometry.distance_spheroid(
+ sa.bindparam('centroid', value=centroid, type_=Geometry)).label('distance'))
async def complete_linked_places(conn: SearchConnection, result: BaseResult) -> None:
sel = sa.select(t.c.word_id, t.c.word_token, t.c.word)
for name_tokens, address_tokens in await conn.execute(sql):
- for row in await conn.execute(sel.where(t.c.word_id == sa.any_(name_tokens))):
+ for row in await conn.execute(sel.where(t.c.word_id.in_(name_tokens))):
result.name_keywords.append(WordInfo(*row))
- for row in await conn.execute(sel.where(t.c.word_id == sa.any_(address_tokens))):
+ for row in await conn.execute(sel.where(t.c.word_id.in_(address_tokens))):
result.address_keywords.append(WordInfo(*row))