import sqlalchemy as sa
from nominatim.typing import SaSelect, SaRow
-from nominatim.db.sqlalchemy_functions import CrosscheckNames
+from nominatim.db.sqlalchemy_types import Geometry
from nominatim.api.types import Point, Bbox, LookupDetails
from nominatim.api.connection import SearchConnection
from nominatim.api.logging import log
if not lookup_ids:
return
- ltab = sa.func.json_array_elements(sa.type_coerce(lookup_ids, sa.JSON))\
+ ltab = sa.func.JsonArrayEach(sa.type_coerce(lookup_ids, sa.JSON))\
.table_valued(sa.column('value', type_=sa.JSON)) # type: ignore[no-untyped-call]
t = conn.t.placex
.order_by('src_place_id')\
.order_by(sa.column('rank_address').desc())\
.order_by((taddr.c.place_id == ltab.c.value['pid'].as_integer()).desc())\
- .order_by(sa.case((CrosscheckNames(t.c.name, ltab.c.value['names']), 2),
+ .order_by(sa.case((sa.func.CrosscheckNames(t.c.name, ltab.c.value['names']), 2),
(taddr.c.isaddress, 0),
(sa.and_(taddr.c.fromarea,
t.c.geometry.ST_Contains(
parent_lookup_ids = list(filter(lambda e: e['pid'] != e['lid'], lookup_ids))
if parent_lookup_ids:
- ltab = sa.func.json_array_elements(sa.type_coerce(parent_lookup_ids, sa.JSON))\
+ ltab = sa.func.JsonArrayEach(sa.type_coerce(parent_lookup_ids, sa.JSON))\
.table_valued(sa.column('value', type_=sa.JSON)) # type: ignore[no-untyped-call]
sql = sa.select(ltab.c.value['pid'].as_integer().label('src_place_id'),
t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
t.c.class_.label('class'), t.c.type,
t.c.admin_level, t.c.housenumber,
- sa.literal_column("""ST_GeometryType(geometry) in
- ('ST_Polygon','ST_MultiPolygon')""").label('fromarea'),
+ t.c.geometry.is_area().label('fromarea'),
t.c.rank_address,
- sa.literal_column(
- f"""ST_DistanceSpheroid(geometry,
- 'SRID=4326;{centroid.to_wkt()}'::geometry,
- 'SPHEROID["WGS 84",6378137,298.257223563, AUTHORITY["EPSG","7030"]]')
- """).label('distance'))
+ t.c.geometry.distance_spheroid(
+ sa.bindparam('centroid', value=centroid, type_=Geometry)).label('distance'))
async def complete_linked_places(conn: SearchConnection, result: BaseResult) -> None:
sel = sa.select(t.c.word_id, t.c.word_token, t.c.word)
for name_tokens, address_tokens in await conn.execute(sql):
- for row in await conn.execute(sel.where(t.c.word_id == sa.any_(name_tokens))):
+ for row in await conn.execute(sel.where(t.c.word_id.in_(name_tokens))):
result.name_keywords.append(WordInfo(*row))
- for row in await conn.execute(sel.where(t.c.word_id == sa.any_(address_tokens))):
+ for row in await conn.execute(sel.where(t.c.word_id.in_(address_tokens))):
result.address_keywords.append(WordInfo(*row))
#pylint: disable=all
-SQLITE_FUNCTION_ALIAS = (
- ('ST_AsEWKB', sa.Text, 'AsEWKB'),
- ('ST_AsGeoJSON', sa.Text, 'AsGeoJSON'),
- ('ST_AsKML', sa.Text, 'AsKML'),
- ('ST_AsSVG', sa.Text, 'AsSVG'),
-)
-
-def _add_function_alias(func: str, ftype: type, alias: str) -> None:
- _FuncDef = type(func, (sa.sql.functions.GenericFunction, ), {
- "type": ftype,
- "name": func,
- "identifier": func,
- "inherit_cache": True})
+class Geometry_DistanceSpheroid(sa.sql.expression.FunctionElement[float]):
+    """ SQL function element yielding the distance between two
+        geometries in meters.
+
+        The SQL text is produced by the compiler hooks registered
+        for this class, one default and one for the 'sqlite' dialect.
+    """
+    inherit_cache = True
+    name = 'Geometry_DistanceSpheroid'
+    type = sa.Float()
- func_templ = f"{alias}(%s)"
- def _sqlite_impl(element: Any, compiler: Any, **kw: Any) -> Any:
- return func_templ % compiler.process(element.clauses, **kw)
+@compiles(Geometry_DistanceSpheroid) # type: ignore[no-untyped-call, misc]
+def _default_distance_spheroid(element: SaColumn,
+                               compiler: 'sa.Compiled', **kw: Any) -> str:
+    """ Default rendering: call ST_DistanceSpheroid() with the compiled
+        function arguments followed by the WGS 84 spheroid definition.
+    """
+    spheroid = 'SPHEROID["WGS 84",6378137,298.257223563, AUTHORITY["EPSG","7030"]]'
+    arguments = compiler.process(element.clauses, **kw)
+    return f"ST_DistanceSpheroid({arguments}, '{spheroid}')"
- compiles(_FuncDef, 'sqlite')(_sqlite_impl) # type: ignore[no-untyped-call]
-for alias in SQLITE_FUNCTION_ALIAS:
- _add_function_alias(*alias)
+@compiles(Geometry_DistanceSpheroid, 'sqlite') # type: ignore[no-untyped-call, misc]
+def _spatialite_distance_spheroid(element: SaColumn,
+                                  compiler: 'sa.Compiled', **kw: Any) -> str:
+    """ Rendering for the 'sqlite' dialect: call Distance() with the
+        compiled function arguments and a trailing `true` flag.
+    """
+    arguments = compiler.process(element.clauses, **kw)
+    return f"Distance({arguments}, true)"
class Geometry(types.UserDefinedType): # type: ignore[type-arg]
return sa.func.ST_LineLocatePoint(self, other, type_=sa.Float)
+ def distance_spheroid(self, other: SaColumn) -> SaColumn:
+     """ Return a Geometry_DistanceSpheroid expression built from
+         this object and `other`.
+     """
+     distance_expr = Geometry_DistanceSpheroid(self, other)
+     return distance_expr
+
+
@compiles(Geometry, 'sqlite') # type: ignore[no-untyped-call]
def get_col_spec(self, *args, **kwargs): # type: ignore[no-untyped-def]
+ # Compilation hook for the 'sqlite' dialect: the Geometry type is always
+ # rendered as the string 'GEOMETRY'; all arguments are ignored.
return 'GEOMETRY'
+
+
+# SQL functions that are emitted under a different name for the 'sqlite'
+# dialect. Each entry is (function name, SQLAlchemy return type, sqlite name)
+# and is unpacked positionally into _add_function_alias().
+SQLITE_FUNCTION_ALIAS = (
+ ('ST_AsEWKB', sa.Text, 'AsEWKB'),
+ ('ST_GeomFromEWKT', Geometry, 'GeomFromEWKT'),
+ ('ST_AsGeoJSON', sa.Text, 'AsGeoJSON'),
+ ('ST_AsKML', sa.Text, 'AsKML'),
+ ('ST_AsSVG', sa.Text, 'AsSVG'),
+)
+
+def _add_function_alias(func: str, ftype: type, alias: str) -> None:
+    """ Create a GenericFunction subclass named `func` with return
+        type `ftype` and attach a compiler hook for the 'sqlite'
+        dialect that renders it as `alias(<arguments>)`.
+    """
+    attributes = dict(type=ftype, name=func, identifier=func, inherit_cache=True)
+    function_class = type(func, (sa.sql.functions.GenericFunction, ), attributes)
+
+    # Built once per alias; only the argument list is filled in per call.
+    template = alias + "(%s)"
+
+    def _render_alias(element: Any, compiler: Any, **kw: Any) -> Any:
+        arguments = compiler.process(element.clauses, **kw)
+        return template % arguments
+
+    compiles(function_class, 'sqlite')(_render_alias) # type: ignore[no-untyped-call]
+
+# Runs when the module is loaded: set up the sqlite rendering for every
+# entry of the alias table above.
+for alias in SQLITE_FUNCTION_ALIAS:
+ _add_function_alias(*alias)
+
+
+
@pytest.mark.parametrize('idobj', (napi.PlaceID(332), napi.OsmID('W', 4),
napi.OsmID('W', 4, 'highway')))
-def test_lookup_in_placex(apiobj, idobj):
+def test_lookup_in_placex(apiobj, frontend, idobj):
+ # Every parametrized `idobj` must yield a result for the placex row added
+ # below. The API object is obtained from the `frontend` fixture, called
+ # with the populated `apiobj` and options={'details'}.
import_date = dt.datetime(2022, 12, 7, 14, 14, 46, 0)
apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
class_='highway', type='residential',
indexed_date=import_date,
geometry='LINESTRING(23 34, 23.1 34, 23.1 34.1, 23 34)')
- result = apiobj.api.details(idobj)
+ api = frontend(apiobj, options={'details'})
+ result = api.details(idobj)
assert result is not None
assert result.geometry == {'type': 'ST_LineString'}
-def test_lookup_in_placex_minimal_info(apiobj):
+def test_lookup_in_placex_minimal_info(apiobj, frontend):
+ # Look up place 332 by its place ID through the API object returned by the
+ # `frontend` fixture; the visible assertions require a result whose
+ # geometry is reported as type 'ST_LineString'.
import_date = dt.datetime(2022, 12, 7, 14, 14, 46, 0)
apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
class_='highway', type='residential',
indexed_date=import_date,
geometry='LINESTRING(23 34, 23.1 34, 23.1 34.1, 23 34)')
- result = apiobj.api.details(napi.PlaceID(332))
+ api = frontend(apiobj, options={'details'})
+ result = api.details(napi.PlaceID(332))
assert result is not None
assert result.geometry == {'type': 'ST_LineString'}
-def test_lookup_in_placex_with_geometry(apiobj):
+def test_lookup_in_placex_with_geometry(apiobj, frontend):
+ # Request GeoJSON geometry output and compare the returned geometry dict
+ # with the expected GeoJSON string for the two-point linestring added below.
apiobj.add_placex(place_id=332,
geometry='LINESTRING(23 34, 23.1 34)')
- result = apiobj.api.details(napi.PlaceID(332), geometry_output=napi.GeometryFormat.GEOJSON)
+ api = frontend(apiobj, options={'details'})
+ result = api.details(napi.PlaceID(332), geometry_output=napi.GeometryFormat.GEOJSON)
assert result.geometry == {'geojson': '{"type":"LineString","coordinates":[[23,34],[23.1,34]]}'}
-def test_lookup_placex_with_address_details(apiobj):
+def test_lookup_placex_with_address_details(apiobj, frontend):
apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
class_='highway', type='residential', name='Street',
country_code='pl',
country_code='pl',
rank_search=17, rank_address=16)
- result = apiobj.api.details(napi.PlaceID(332), address_details=True)
+ api = frontend(apiobj, options={'details'})
+ result = api.details(napi.PlaceID(332), address_details=True)
assert result.address_rows == [
napi.AddressLine(place_id=332, osm_object=('W', 4),
]
-def test_lookup_place_with_linked_places_none_existing(apiobj):
+def test_lookup_place_with_linked_places_none_existing(apiobj, frontend):
+ # Only place 332 is added in the visible code, so a lookup with
+ # linked_places=True is expected to return an empty `linked_rows` list.
apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
class_='highway', type='residential', name='Street',
country_code='pl', linked_place_id=45,
rank_search=27, rank_address=26)
- result = apiobj.api.details(napi.PlaceID(332), linked_places=True)
+ api = frontend(apiobj, options={'details'})
+ result = api.details(napi.PlaceID(332), linked_places=True)
assert result.linked_rows == []
-def test_lookup_place_with_linked_places_existing(apiobj):
+def test_lookup_place_with_linked_places_existing(apiobj, frontend):
apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
class_='highway', type='residential', name='Street',
country_code='pl', linked_place_id=45,
country_code='pl', linked_place_id=332,
rank_search=27, rank_address=26)
- result = apiobj.api.details(napi.PlaceID(332), linked_places=True)
+ api = frontend(apiobj, options={'details'})
+ result = api.details(napi.PlaceID(332), linked_places=True)
assert result.linked_rows == [
napi.AddressLine(place_id=1001, osm_object=('W', 5),
]
-def test_lookup_place_with_parented_places_not_existing(apiobj):
+def test_lookup_place_with_parented_places_not_existing(apiobj, frontend):
+ # Only place 332 is added in the visible code, so a lookup with
+ # parented_places=True is expected to return an empty `parented_rows` list.
apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
class_='highway', type='residential', name='Street',
country_code='pl', parent_place_id=45,
rank_search=27, rank_address=26)
- result = apiobj.api.details(napi.PlaceID(332), parented_places=True)
+ api = frontend(apiobj, options={'details'})
+ result = api.details(napi.PlaceID(332), parented_places=True)
assert result.parented_rows == []
-def test_lookup_place_with_parented_places_existing(apiobj):
+def test_lookup_place_with_parented_places_existing(apiobj, frontend):
apiobj.add_placex(place_id=332, osm_type='W', osm_id=4,
class_='highway', type='residential', name='Street',
country_code='pl', parent_place_id=45,
country_code='pl', parent_place_id=332,
rank_search=27, rank_address=26)
- result = apiobj.api.details(napi.PlaceID(332), parented_places=True)
+ api = frontend(apiobj, options={'details'})
+ result = api.details(napi.PlaceID(332), parented_places=True)
assert result.parented_rows == [
napi.AddressLine(place_id=1001, osm_object=('N', 5),
@pytest.mark.parametrize('idobj', (napi.PlaceID(4924), napi.OsmID('W', 9928)))
-def test_lookup_in_osmline(apiobj, idobj):
+def test_lookup_in_osmline(apiobj, frontend, idobj):
+ # Both parametrized ids (place ID and OSM way ID) must yield a result for
+ # the osmline row added below; the API object comes from the `frontend`
+ # fixture.
import_date = dt.datetime(2022, 12, 7, 14, 14, 46, 0)
apiobj.add_osmline(place_id=4924, osm_id=9928,
parent_place_id=12,
indexed_date=import_date,
geometry='LINESTRING(23 34, 23 35)')
- result = apiobj.api.details(idobj)
+ api = frontend(apiobj, options={'details'})
+ result = api.details(idobj)
assert result is not None
assert result.geometry == {'type': 'ST_LineString'}
-def test_lookup_in_osmline_split_interpolation(apiobj):
+def test_lookup_in_osmline_split_interpolation(apiobj, frontend):
apiobj.add_osmline(place_id=1000, osm_id=9,
startnumber=2, endnumber=4, step=1)
apiobj.add_osmline(place_id=1001, osm_id=9,
apiobj.add_osmline(place_id=1002, osm_id=9,
startnumber=11, endnumber=20, step=1)
+ api = frontend(apiobj, options={'details'})
for i in range(1, 6):
- result = apiobj.api.details(napi.OsmID('W', 9, str(i)))
+ result = api.details(napi.OsmID('W', 9, str(i)))
assert result.place_id == 1000
for i in range(7, 11):
- result = apiobj.api.details(napi.OsmID('W', 9, str(i)))
+ result = api.details(napi.OsmID('W', 9, str(i)))
assert result.place_id == 1001
for i in range(12, 22):
- result = apiobj.api.details(napi.OsmID('W', 9, str(i)))
+ result = api.details(napi.OsmID('W', 9, str(i)))
assert result.place_id == 1002
-def test_lookup_osmline_with_address_details(apiobj):
+def test_lookup_osmline_with_address_details(apiobj, frontend):
apiobj.add_osmline(place_id=9000, osm_id=9,
startnumber=2, endnumber=4, step=1,
parent_place_id=332)
country_code='pl',
rank_search=17, rank_address=16)
- result = apiobj.api.details(napi.PlaceID(9000), address_details=True)
+ api = frontend(apiobj, options={'details'})
+ result = api.details(napi.PlaceID(9000), address_details=True)
assert result.address_rows == [
napi.AddressLine(place_id=332, osm_object=('W', 4),
]
-def test_lookup_in_tiger(apiobj):
+def test_lookup_in_tiger(apiobj, frontend):
+ # Look up the tiger row added below by its place ID; the visible assertions
+ # require a result whose geometry is reported as type 'ST_LineString'.
apiobj.add_tiger(place_id=4924,
parent_place_id=12,
startnumber=1, endnumber=4, step=1,
osm_type='W', osm_id=6601223,
geometry='LINESTRING(23 34, 23 35)')
- result = apiobj.api.details(napi.PlaceID(4924))
+ api = frontend(apiobj, options={'details'})
+ result = api.details(napi.PlaceID(4924))
assert result is not None
assert result.geometry == {'type': 'ST_LineString'}
-def test_lookup_tiger_with_address_details(apiobj):
+def test_lookup_tiger_with_address_details(apiobj, frontend):
apiobj.add_tiger(place_id=9000,
startnumber=2, endnumber=4, step=1,
parent_place_id=332)
country_code='us',
rank_search=17, rank_address=16)
- result = apiobj.api.details(napi.PlaceID(9000), address_details=True)
+ api = frontend(apiobj, options={'details'})
+ result = api.details(napi.PlaceID(9000), address_details=True)
assert result.address_rows == [
napi.AddressLine(place_id=332, osm_object=('W', 4),
]
-def test_lookup_in_postcode(apiobj):
+def test_lookup_in_postcode(apiobj, frontend):
+ # Look up the postcode row added below by its place ID; the visible
+ # assertions require a result whose geometry is reported as type 'ST_Point'.
import_date = dt.datetime(2022, 12, 7, 14, 14, 46, 0)
apiobj.add_postcode(place_id=554,
parent_place_id=152,
indexed_date=import_date,
geometry='POINT(-9.45 5.6)')
- result = apiobj.api.details(napi.PlaceID(554))
+ api = frontend(apiobj, options={'details'})
+ result = api.details(napi.PlaceID(554))
assert result is not None
assert result.geometry == {'type': 'ST_Point'}
-def test_lookup_postcode_with_address_details(apiobj):
+def test_lookup_postcode_with_address_details(apiobj, frontend):
apiobj.add_postcode(place_id=9000,
parent_place_id=332,
postcode='34 425',
country_code='gb',
rank_search=17, rank_address=16)
- result = apiobj.api.details(napi.PlaceID(9000), address_details=True)
+ api = frontend(apiobj, options={'details'})
+ result = api.details(napi.PlaceID(9000), address_details=True)
assert result.address_rows == [
napi.AddressLine(place_id=9000, osm_object=None,
@pytest.mark.parametrize('objid', [napi.PlaceID(1736),
napi.OsmID('W', 55),
napi.OsmID('N', 55, 'amenity')])
-def test_lookup_missing_object(apiobj, objid):
+def test_lookup_missing_object(apiobj, frontend, objid):
+ # Each parametrized id differs from the single row added below (place ID,
+ # OSM type, or the extra 'amenity' qualifier), so details() is expected to
+ # return None.
apiobj.add_placex(place_id=1, osm_type='N', osm_id=55,
class_='place', type='suburb')
- assert apiobj.api.details(objid) is None
+ api = frontend(apiobj, options={'details'})
+ assert api.details(objid) is None
@pytest.mark.parametrize('gtype', (napi.GeometryFormat.KML,
napi.GeometryFormat.SVG,
napi.GeometryFormat.TEXT))
-def test_lookup_unsupported_geometry(apiobj, gtype):
+def test_lookup_unsupported_geometry(apiobj, frontend, gtype):
+ # details() must raise ValueError for each parametrized geometry output
+ # format (KML, SVG, TEXT). The API object is created before entering the
+ # pytest.raises block, so only the details() call is expected to raise.
apiobj.add_placex(place_id=332)
+ api = frontend(apiobj, options={'details'})
with pytest.raises(ValueError):
- apiobj.api.details(napi.PlaceID(332), geometry_output=gtype)
+ api.details(napi.PlaceID(332), geometry_output=gtype)