1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Custom types for SQLAlchemy.
10 from __future__ import annotations
11 from typing import Callable, Any, cast
14 import sqlalchemy as sa
15 from sqlalchemy.ext.compiler import compiles
16 from sqlalchemy import types
18 from nominatim.typing import SaColumn, SaBind
22 class Geometry_DistanceSpheroid(sa.sql.expression.FunctionElement[float]):
23 """ Function to compute the spherical distance in meters.
26 name = 'Geometry_DistanceSpheroid'
30 @compiles(Geometry_DistanceSpheroid) # type: ignore[no-untyped-call, misc]
31 def _default_distance_spheroid(element: SaColumn,
32 compiler: 'sa.Compiled', **kw: Any) -> str:
33 return "ST_DistanceSpheroid(%s,"\
34 " 'SPHEROID[\"WGS 84\",6378137,298.257223563, AUTHORITY[\"EPSG\",\"7030\"]]')"\
35 % compiler.process(element.clauses, **kw)
38 @compiles(Geometry_DistanceSpheroid, 'sqlite') # type: ignore[no-untyped-call, misc]
39 def _spatialite_distance_spheroid(element: SaColumn,
40 compiler: 'sa.Compiled', **kw: Any) -> str:
41 return "Distance(%s, true)" % compiler.process(element.clauses, **kw)
44 class Geometry_IsLineLike(sa.sql.expression.FunctionElement[bool]):
45 """ Check if the geometry is a line or multiline.
48 name = 'Geometry_IsLineLike'
52 @compiles(Geometry_IsLineLike) # type: ignore[no-untyped-call, misc]
53 def _default_is_line_like(element: SaColumn,
54 compiler: 'sa.Compiled', **kw: Any) -> str:
55 return "ST_GeometryType(%s) IN ('ST_LineString', 'ST_MultiLineString')" % \
56 compiler.process(element.clauses, **kw)
59 @compiles(Geometry_IsLineLike, 'sqlite') # type: ignore[no-untyped-call, misc]
60 def _sqlite_is_line_like(element: SaColumn,
61 compiler: 'sa.Compiled', **kw: Any) -> str:
62 return "ST_GeometryType(%s) IN ('LINESTRING', 'MULTILINESTRING')" % \
63 compiler.process(element.clauses, **kw)
66 class Geometry_IsAreaLike(sa.sql.expression.FunctionElement[bool]):
67 """ Check if the geometry is a polygon or multipolygon.
70 name = 'Geometry_IsLineLike'
74 @compiles(Geometry_IsAreaLike) # type: ignore[no-untyped-call, misc]
75 def _default_is_area_like(element: SaColumn,
76 compiler: 'sa.Compiled', **kw: Any) -> str:
77 return "ST_GeometryType(%s) IN ('ST_Polygon', 'ST_MultiPolygon')" % \
78 compiler.process(element.clauses, **kw)
81 @compiles(Geometry_IsAreaLike, 'sqlite') # type: ignore[no-untyped-call, misc]
82 def _sqlite_is_area_like(element: SaColumn,
83 compiler: 'sa.Compiled', **kw: Any) -> str:
84 return "ST_GeometryType(%s) IN ('POLYGON', 'MULTIPOLYGON')" % \
85 compiler.process(element.clauses, **kw)
88 class Geometry_IntersectsBbox(sa.sql.expression.FunctionElement[bool]):
89 """ Check if the bounding boxes of the given geometries intersect.
92 name = 'Geometry_IntersectsBbox'
96 @compiles(Geometry_IntersectsBbox) # type: ignore[no-untyped-call, misc]
97 def _default_intersects(element: SaColumn,
98 compiler: 'sa.Compiled', **kw: Any) -> str:
99 arg1, arg2 = list(element.clauses)
100 return "%s && %s" % (compiler.process(arg1, **kw), compiler.process(arg2, **kw))
103 @compiles(Geometry_IntersectsBbox, 'sqlite') # type: ignore[no-untyped-call, misc]
104 def _sqlite_intersects(element: SaColumn,
105 compiler: 'sa.Compiled', **kw: Any) -> str:
106 return "MbrIntersects(%s)" % compiler.process(element.clauses, **kw)
109 class Geometry(types.UserDefinedType): # type: ignore[type-arg]
110 """ Simplified type decorator for PostGIS geometry. This type
111 only supports geometries in 4326 projection.
115 def __init__(self, subtype: str = 'Geometry'):
116 self.subtype = subtype
119 def get_col_spec(self) -> str:
120 return f'GEOMETRY({self.subtype}, 4326)'
123 def bind_processor(self, dialect: 'sa.Dialect') -> Callable[[Any], str]:
124 def process(value: Any) -> str:
125 if isinstance(value, str):
128 return cast(str, value.to_wkt())
132 def result_processor(self, dialect: 'sa.Dialect', coltype: object) -> Callable[[Any], str]:
133 def process(value: Any) -> str:
134 assert isinstance(value, str)
139 def column_expression(self, col: SaColumn) -> SaColumn:
140 return sa.func.ST_AsEWKB(col)
143 def bind_expression(self, bindvalue: SaBind) -> SaColumn:
144 return sa.func.ST_GeomFromText(bindvalue, sa.text('4326'), type_=self)
147 class comparator_factory(types.UserDefinedType.Comparator): # type: ignore[type-arg]
149 def intersects(self, other: SaColumn) -> 'sa.Operators':
150 return Geometry_IntersectsBbox(self, other)
153 def is_line_like(self) -> SaColumn:
154 return Geometry_IsLineLike(self)
157 def is_area(self) -> SaColumn:
158 return Geometry_IsAreaLike(self)
161 def ST_DWithin(self, other: SaColumn, distance: SaColumn) -> SaColumn:
162 return sa.func.ST_DWithin(self, other, distance, type_=sa.Boolean)
165 def ST_DWithin_no_index(self, other: SaColumn, distance: SaColumn) -> SaColumn:
166 return sa.func.ST_DWithin(sa.func.coalesce(sa.null(), self),
167 other, distance, type_=sa.Boolean)
170 def ST_Intersects_no_index(self, other: SaColumn) -> 'sa.Operators':
171 return sa.func.coalesce(sa.null(), self).op('&&')(other)
174 def ST_Distance(self, other: SaColumn) -> SaColumn:
175 return sa.func.ST_Distance(self, other, type_=sa.Float)
178 def ST_Contains(self, other: SaColumn) -> SaColumn:
179 return sa.func.ST_Contains(self, other, type_=sa.Boolean)
182 def ST_CoveredBy(self, other: SaColumn) -> SaColumn:
183 return sa.func.ST_CoveredBy(self, other, type_=sa.Boolean)
186 def ST_ClosestPoint(self, other: SaColumn) -> SaColumn:
187 return sa.func.coalesce(sa.func.ST_ClosestPoint(self, other, type_=Geometry),
191 def ST_Buffer(self, other: SaColumn) -> SaColumn:
192 return sa.func.ST_Buffer(self, other, type_=Geometry)
195 def ST_Expand(self, other: SaColumn) -> SaColumn:
196 return sa.func.ST_Expand(self, other, type_=Geometry)
199 def ST_Collect(self) -> SaColumn:
200 return sa.func.ST_Collect(self, type_=Geometry)
203 def ST_Centroid(self) -> SaColumn:
204 return sa.func.ST_Centroid(self, type_=Geometry)
207 def ST_LineInterpolatePoint(self, other: SaColumn) -> SaColumn:
208 return sa.func.ST_LineInterpolatePoint(self, other, type_=Geometry)
211 def ST_LineLocatePoint(self, other: SaColumn) -> SaColumn:
212 return sa.func.ST_LineLocatePoint(self, other, type_=sa.Float)
215 def distance_spheroid(self, other: SaColumn) -> SaColumn:
216 return Geometry_DistanceSpheroid(self, other)
219 @compiles(Geometry, 'sqlite') # type: ignore[no-untyped-call]
220 def get_col_spec(self, *args, **kwargs): # type: ignore[no-untyped-def]
224 SQLITE_FUNCTION_ALIAS = (
225 ('ST_AsEWKB', sa.Text, 'AsEWKB'),
226 ('ST_GeomFromEWKT', Geometry, 'GeomFromEWKT'),
227 ('ST_AsGeoJSON', sa.Text, 'AsGeoJSON'),
228 ('ST_AsKML', sa.Text, 'AsKML'),
229 ('ST_AsSVG', sa.Text, 'AsSVG'),
230 ('ST_LineLocatePoint', sa.Float, 'ST_Line_Locate_Point'),
231 ('ST_LineInterpolatePoint', sa.Float, 'ST_Line_Interpolate_Point'),
234 def _add_function_alias(func: str, ftype: type, alias: str) -> None:
235 _FuncDef = type(func, (sa.sql.functions.GenericFunction, ), {
239 "inherit_cache": True})
241 func_templ = f"{alias}(%s)"
243 def _sqlite_impl(element: Any, compiler: Any, **kw: Any) -> Any:
244 return func_templ % compiler.process(element.clauses, **kw)
246 compiles(_FuncDef, 'sqlite')(_sqlite_impl) # type: ignore[no-untyped-call]
248 for alias in SQLITE_FUNCTION_ALIAS:
249 _add_function_alias(*alias)
252 class ST_DWithin(sa.sql.functions.GenericFunction[bool]):
258 @compiles(ST_DWithin, 'sqlite') # type: ignore[no-untyped-call, misc]
259 def default_json_array_each(element: SaColumn, compiler: 'sa.Compiled', **kw: Any) -> str:
260 geom1, geom2, dist = list(element.clauses)
261 return "(MbrIntersects(%s, ST_Expand(%s, %s)) = 1 AND ST_Distance(%s, %s) <= %s)" % (
262 compiler.process(geom1, **kw), compiler.process(geom2, **kw),
263 compiler.process(dist, **kw),
264 compiler.process(geom1, **kw), compiler.process(geom2, **kw),
265 compiler.process(dist, **kw))