internal use only. That's why they are implemented as free-standing functions
instead of member functions.
"""
-from typing import Optional, Tuple, Dict, Sequence, TypeVar, Type, List, cast
+from typing import Optional, Tuple, Dict, Sequence, TypeVar, Type, List, cast, Callable
import enum
import dataclasses
import datetime as dt
import sqlalchemy as sa
from nominatim.typing import SaSelect, SaRow
-from nominatim.db.sqlalchemy_functions import CrosscheckNames
+from nominatim.db.sqlalchemy_types import Geometry
from nominatim.api.types import Point, Bbox, LookupDetails
from nominatim.api.connection import SearchConnection
from nominatim.api.logging import log
of the value or an artificial value computed from the place's
search rank.
"""
- return self.importance or (0.7500001 - (self.rank_search/40.0))
+ return self.importance or (0.40001 - (self.rank_search/75.0))
def localize(self, locales: Locales) -> None:
def _result_row_to_address_row(row: SaRow, isaddress: Optional[bool] = None) -> AddressLine:
- """ Create a new AddressLine from the results of a datbase query.
+ """ Create a new AddressLine from the results of a database query.
"""
extratags: Dict[str, str] = getattr(row, 'extratags', {}) or {}
if 'linked_place' in extratags:
async def _finalize_entry(conn: SearchConnection, result: BaseResultT) -> None:
- assert result.address_rows
- postcode = result.postcode
- if not postcode and result.address:
- postcode = result.address.get('postcode')
- if postcode and ',' not in postcode and ';' not in postcode:
- result.address_rows.append(AddressLine(
- category=('place', 'postcode'),
- names={'ref': postcode},
- fromarea=False, isaddress=True, rank_address=5,
- distance=0.0))
+ assert result.address_rows is not None
+ if result.category[0] not in ('boundary', 'place')\
+ or result.category[1] not in ('postal_code', 'postcode'):
+ postcode = result.postcode
+ if not postcode and result.address:
+ postcode = result.address.get('postcode')
+ if postcode and ',' not in postcode and ';' not in postcode:
+ result.address_rows.append(AddressLine(
+ category=('place', 'postcode'),
+ names={'ref': postcode},
+ fromarea=False, isaddress=True, rank_address=5,
+ distance=0.0))
if result.country_code:
async def _get_country_names() -> Optional[Dict[str, str]]:
t = conn.t.country_name
extratags=result.extratags or {},
admin_level=result.admin_level,
fromarea=True, isaddress=True,
- rank_address=result.rank_address or 100, distance=0.0))
+ rank_address=result.rank_address, distance=0.0))
if result.source_table == SourceTable.PLACEX and result.address:
housenumber = result.address.get('housenumber')\
or result.address.get('streetnumber')\
if not lookup_ids:
return
- ltab = sa.func.json_array_elements(sa.type_coerce(lookup_ids, sa.JSON))\
- .table_valued(sa.column('value', type_=sa.JSON)) # type: ignore[no-untyped-call]
+ ltab = sa.func.JsonArrayEach(sa.type_coerce(lookup_ids, sa.JSON))\
+ .table_valued(sa.column('value', type_=sa.JSON))
t = conn.t.placex
taddr = conn.t.addressline
.order_by('src_place_id')\
.order_by(sa.column('rank_address').desc())\
.order_by((taddr.c.place_id == ltab.c.value['pid'].as_integer()).desc())\
- .order_by(sa.case((CrosscheckNames(t.c.name, ltab.c.value['names']), 2),
+ .order_by(sa.case((sa.func.CrosscheckNames(t.c.name, ltab.c.value['names']), 2),
(taddr.c.isaddress, 0),
(sa.and_(taddr.c.fromarea,
t.c.geometry.ST_Contains(
parent_lookup_ids = list(filter(lambda e: e['pid'] != e['lid'], lookup_ids))
if parent_lookup_ids:
- ltab = sa.func.json_array_elements(sa.type_coerce(parent_lookup_ids, sa.JSON))\
- .table_valued(sa.column('value', type_=sa.JSON)) # type: ignore[no-untyped-call]
+ ltab = sa.func.JsonArrayEach(sa.type_coerce(parent_lookup_ids, sa.JSON))\
+ .table_valued(sa.column('value', type_=sa.JSON))
sql = sa.select(ltab.c.value['pid'].as_integer().label('src_place_id'),
t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
t.c.class_, t.c.type, t.c.extratags,
rank_address=row.rank_address, distance=0.0))
### Now sort everything
+ def mk_sort_key(place_id: Optional[int]) -> Callable[[AddressLine], Tuple[bool, int, bool]]:
+ return lambda a: (a.place_id != place_id, -a.rank_address, a.isaddress)
+
for result in results:
assert result.address_rows is not None
- result.address_rows.sort(key=lambda a: (-a.rank_address, a.isaddress))
+ result.address_rows.sort(key=mk_sort_key(result.place_id))
def _placex_select_address_row(conn: SearchConnection,
return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
t.c.class_.label('class'), t.c.type,
t.c.admin_level, t.c.housenumber,
- sa.literal_column("""ST_GeometryType(geometry) in
- ('ST_Polygon','ST_MultiPolygon')""").label('fromarea'),
+ t.c.geometry.is_area().label('fromarea'),
t.c.rank_address,
- sa.literal_column(
- f"""ST_DistanceSpheroid(geometry,
- 'SRID=4326;{centroid.to_wkt()}'::geometry,
- 'SPHEROID["WGS 84",6378137,298.257223563, AUTHORITY["EPSG","7030"]]')
- """).label('distance'))
+ t.c.geometry.distance_spheroid(
+ sa.bindparam('centroid', value=centroid, type_=Geometry)).label('distance'))
async def complete_linked_places(conn: SearchConnection, result: BaseResult) -> None:
sel = sa.select(t.c.word_id, t.c.word_token, t.c.word)
for name_tokens, address_tokens in await conn.execute(sql):
- for row in await conn.execute(sel.where(t.c.word_id == sa.any_(name_tokens))):
+ for row in await conn.execute(sel.where(t.c.word_id.in_(name_tokens))):
result.name_keywords.append(WordInfo(*row))
- for row in await conn.execute(sel.where(t.c.word_id == sa.any_(address_tokens))):
+ for row in await conn.execute(sel.where(t.c.word_id.in_(address_tokens))):
result.address_keywords.append(WordInfo(*row))