1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
"""
Custom types for SQLAlchemy.
"""
10 from __future__ import annotations
11 from typing import Callable, Any, cast
13 import sqlalchemy as sa
14 from sqlalchemy.ext.compiler import compiles
15 from sqlalchemy import types
17 from ...typing import SaColumn, SaBind
class Geometry_DistanceSpheroid(sa.sql.expression.FunctionElement[float]):
    """ Function to compute the spherical distance in meters.

        The construct carries no SQL text of its own. Rendering is done
        by the compiler hooks registered for this class with @compiles.
    """
    # Result type in line with the FunctionElement[float] annotation.
    type = sa.Float()
    name = 'Geometry_DistanceSpheroid'
    # The rendered SQL depends on the argument clauses only, so the
    # cache key generated by the base class stays valid.
    inherit_cache = True
@compiles(Geometry_DistanceSpheroid)
def _default_distance_spheroid(element: Geometry_DistanceSpheroid,
                               compiler: 'sa.Compiled', **kw: Any) -> str:
    """ Rendering for dialects without a dedicated implementation:
        call ST_DistanceSpheroid() with the WGS 84 spheroid definition.
    """
    spheroid = 'SPHEROID["WGS 84",6378137,298.257223563, AUTHORITY["EPSG","7030"]]'
    arguments = compiler.process(element.clauses, **kw)

    return f"ST_DistanceSpheroid({arguments}, '{spheroid}')"
@compiles(Geometry_DistanceSpheroid, 'sqlite')
def _spatialite_distance_spheroid(element: Geometry_DistanceSpheroid,
                                  compiler: 'sa.Compiled', **kw: Any) -> str:
    """ SQLite rendering: Distance(..., true) where a NULL result is
        replaced by 0.0.
    """
    arguments = compiler.process(element.clauses, **kw)

    return f"COALESCE(Distance({arguments}, true), 0.0)"
class Geometry_IsLineLike(sa.sql.expression.FunctionElement[Any]):
    """ Check if the geometry is a line or multiline.

        The construct carries no SQL text of its own. Rendering is done
        by the compiler hooks registered for this class with @compiles.
    """
    name = 'Geometry_IsLineLike'
    # The rendered SQL depends on the argument clauses only, so the
    # cache key generated by the base class stays valid.
    inherit_cache = True
@compiles(Geometry_IsLineLike)
def _default_is_line_like(element: Geometry_IsLineLike,
                          compiler: 'sa.Compiled', **kw: Any) -> str:
    """ Rendering for dialects without a dedicated implementation:
        compare ST_GeometryType() against the 'ST_'-prefixed type names.
    """
    geometry = compiler.process(element.clauses, **kw)

    return f"ST_GeometryType({geometry}) IN ('ST_LineString', 'ST_MultiLineString')"
@compiles(Geometry_IsLineLike, 'sqlite')
def _sqlite_is_line_like(element: Geometry_IsLineLike,
                         compiler: 'sa.Compiled', **kw: Any) -> str:
    """ SQLite rendering: compare ST_GeometryType() against the
        upper-case type names.
    """
    geometry = compiler.process(element.clauses, **kw)

    return f"ST_GeometryType({geometry}) IN ('LINESTRING', 'MULTILINESTRING')"
class Geometry_IsAreaLike(sa.sql.expression.FunctionElement[Any]):
    """ Check if the geometry is a polygon or multipolygon.

        The construct carries no SQL text of its own. Rendering is done
        by the compiler hooks registered for this class with @compiles.
    """
    # Each construct carries its own name. This one used to reuse the
    # name of the line check.
    name = 'Geometry_IsAreaLike'
    # The rendered SQL depends on the argument clauses only, so the
    # cache key generated by the base class stays valid.
    inherit_cache = True
@compiles(Geometry_IsAreaLike)
def _default_is_area_like(element: Geometry_IsAreaLike,
                          compiler: 'sa.Compiled', **kw: Any) -> str:
    """ Rendering for dialects without a dedicated implementation:
        compare ST_GeometryType() against the 'ST_'-prefixed type names.
    """
    geometry = compiler.process(element.clauses, **kw)

    return f"ST_GeometryType({geometry}) IN ('ST_Polygon', 'ST_MultiPolygon')"
@compiles(Geometry_IsAreaLike, 'sqlite')
def _sqlite_is_area_like(element: Geometry_IsAreaLike,
                         compiler: 'sa.Compiled', **kw: Any) -> str:
    """ SQLite rendering: compare ST_GeometryType() against the
        upper-case type names.
    """
    geometry = compiler.process(element.clauses, **kw)

    return f"ST_GeometryType({geometry}) IN ('POLYGON', 'MULTIPOLYGON')"
class Geometry_IntersectsBbox(sa.sql.expression.FunctionElement[Any]):
    """ Check if the bounding boxes of the given geometries intersect.

        The construct expects two arguments. Rendering is done by the
        compiler hooks registered for this class with @compiles.
    """
    name = 'Geometry_IntersectsBbox'
    # The rendered SQL depends on the argument clauses only, so the
    # cache key generated by the base class stays valid.
    inherit_cache = True
@compiles(Geometry_IntersectsBbox)
def _default_intersects(element: Geometry_IntersectsBbox,
                        compiler: 'sa.Compiled', **kw: Any) -> str:
    """ Rendering for dialects without a dedicated implementation:
        join the two arguments with the '&&' operator.
    """
    # Unpacking fails loudly when the construct was not built with
    # exactly two arguments.
    left, right = element.clauses
    left_sql = compiler.process(left, **kw)
    right_sql = compiler.process(right, **kw)

    return f"{left_sql} && {right_sql}"
@compiles(Geometry_IntersectsBbox, 'sqlite')
def _sqlite_intersects(element: Geometry_IntersectsBbox,
                       compiler: 'sa.Compiled', **kw: Any) -> str:
    """ SQLite rendering: test the result of MbrIntersects() against 1.
    """
    arguments = compiler.process(element.clauses, **kw)

    return f"MbrIntersects({arguments}) = 1"
class Geometry_ColumnIntersectsBbox(sa.sql.expression.FunctionElement[Any]):
    """ Check if the bounding box of the geometry intersects with the
        given table column, using the spatial index for the column.

        The index must exist or the query may return nothing.

        The first argument is the table column, the second the geometry
        to test against.
    """
    name = 'Geometry_ColumnIntersectsBbox'
    # The rendered SQL depends on the argument clauses only, so the
    # cache key generated by the base class stays valid.
    inherit_cache = True
@compiles(Geometry_ColumnIntersectsBbox)
def default_intersects_column(element: Geometry_ColumnIntersectsBbox,
                              compiler: 'sa.Compiled', **kw: Any) -> str:
    """ Rendering for dialects without a dedicated implementation:
        join column and geometry with the '&&' operator.
    """
    # Unpacking fails loudly when the construct was not built with
    # exactly two arguments.
    column, bbox = element.clauses
    column_sql = compiler.process(column, **kw)
    bbox_sql = compiler.process(bbox, **kw)

    return f"{column_sql} && {bbox_sql}"
@compiles(Geometry_ColumnIntersectsBbox, 'sqlite')
def spatialite_intersects_column(element: Geometry_ColumnIntersectsBbox,
                                 compiler: 'sa.Compiled', **kw: Any) -> str:
    """ SQLite rendering: MbrIntersects() test combined with a ROWID lookup
        in SpatialIndex for the table and column of the first argument.

        The first argument must provide 'table' and 'name' attributes.
    """
    column, bbox = element.clauses

    # The box is compiled once for every place it appears in the SQL, in
    # order of appearance, instead of reusing the first result.
    column_sql = compiler.process(column, **kw)
    bbox_sql = compiler.process(bbox, **kw)
    table_name = column.table.name
    column_name = column.name
    frame_sql = compiler.process(bbox, **kw)

    return (f"MbrIntersects({column_sql}, {bbox_sql}) = 1 and "
            f"{table_name}.ROWID IN (SELECT ROWID FROM SpatialIndex "
            f" WHERE f_table_name = '{table_name}' AND f_geometry_column = '{column_name}' "
            f" AND search_frame = {frame_sql})")
class Geometry_ColumnDWithin(sa.sql.expression.FunctionElement[Any]):
    """ Check if the geometry is within the distance of the
        given table column, using the spatial index for the column.

        The index must exist or the query may return nothing.

        Arguments are the table column, the geometry to compare with
        and the distance, in that order.
    """
    name = 'Geometry_ColumnDWithin'
    # The rendered SQL depends on the argument clauses only, so the
    # cache key generated by the base class stays valid.
    inherit_cache = True
@compiles(Geometry_ColumnDWithin)
def default_dwithin_column(element: Geometry_ColumnDWithin,
                           compiler: 'sa.Compiled', **kw: Any) -> str:
    """ Rendering for dialects without a dedicated implementation:
        hand all arguments to ST_DWithin().
    """
    arguments = compiler.process(element.clauses, **kw)

    return f"ST_DWithin({arguments})"
@compiles(Geometry_ColumnDWithin, 'sqlite')
def spatialite_dwithin_column(element: Geometry_ColumnDWithin,
                              compiler: 'sa.Compiled', **kw: Any) -> str:
    """ SQLite rendering: ST_Distance() comparison combined with a ROWID
        lookup in SpatialIndex, searching the frame of the geometry
        expanded by the distance.

        The first argument must provide 'table' and 'name' attributes.
    """
    column, geometry, distance = element.clauses

    # Geometry and distance are compiled once for every place they appear
    # in the SQL, in order of appearance, instead of reusing the first result.
    column_sql = compiler.process(column, **kw)
    geometry_sql = compiler.process(geometry, **kw)
    distance_sql = compiler.process(distance, **kw)
    table_name = column.table.name
    column_name = column.name
    frame_geometry_sql = compiler.process(geometry, **kw)
    frame_distance_sql = compiler.process(distance, **kw)

    return (f"ST_Distance({column_sql}, {geometry_sql}) < {distance_sql} and "
            f"{table_name}.ROWID IN (SELECT ROWID FROM SpatialIndex "
            f" WHERE f_table_name = '{table_name}' AND f_geometry_column = '{column_name}' "
            f" AND search_frame = ST_Expand({frame_geometry_sql}, {frame_distance_sql}))")
class Geometry(types.UserDefinedType):  # type: ignore[type-arg]
    """ Simplified type decorator for PostGIS geometry. This type
        only supports geometries in 4326 projection.

        Values are bound as text wrapped in ST_GeomFromText() with SRID
        4326 and are selected through ST_AsEWKB().
    """
    # The subtype string is the only state of the type, so instances can
    # safely take part in the statement cache key.
    cache_ok = True

    def __init__(self, subtype: str = 'Geometry'):
        """ Create the type for the given geometry subtype.
        """
        self.subtype = subtype

    def get_col_spec(self) -> str:
        """ Return the column type definition for this type.
        """
        return f'GEOMETRY({self.subtype}, 4326)'

    def bind_processor(self, dialect: 'sa.Dialect') -> Callable[[Any], str]:
        """ Return the converter for bound values. Strings are handed
            on untouched, any other value must provide a to_wkt() method.
        """
        def process(value: Any) -> str:
            if isinstance(value, str):
                return value

            return cast(str, value.to_wkt())

        return process

    def result_processor(self, dialect: 'sa.Dialect', coltype: object) -> Callable[[Any], str]:
        """ Return the converter for result values. Results are expected
            to arrive as strings and are handed on unchanged.
        """
        def process(value: Any) -> str:
            assert isinstance(value, str)
            return value

        return process

    def column_expression(self, col: SaColumn) -> SaColumn:
        """ Wrap selected columns of this type in ST_AsEWKB().
        """
        return sa.func.ST_AsEWKB(col)

    def bind_expression(self, bindvalue: SaBind) -> SaColumn:
        """ Wrap bound values in ST_GeomFromText() with SRID 4326.
        """
        return sa.func.ST_GeomFromText(bindvalue, sa.text('4326'), type_=self)

    class comparator_factory(types.UserDefinedType.Comparator):  # type: ignore[type-arg]
        """ Geometry-specific operations available on expressions
            of this type.
        """

        def intersects(self, other: SaColumn, use_index: bool = True) -> 'sa.Operators':
            """ Check if the bounding boxes of this expression and
                `other` intersect. Plain table columns use the
                index-aware construct unless `use_index` is false.
            """
            # NOTE(review): the guard below was not visible in the reviewed
            # extract and was restored from the otherwise unused `use_index`
            # parameter -- confirm against the repository version.
            if not use_index:
                # presumably the coalesce() wrapper keeps the database from
                # matching the bare column against its index -- confirm
                return Geometry_IntersectsBbox(sa.func.coalesce(sa.null(), self.expr), other)

            if isinstance(self.expr, sa.Column):
                return Geometry_ColumnIntersectsBbox(self.expr, other)

            return Geometry_IntersectsBbox(self.expr, other)

        def is_line_like(self) -> SaColumn:
            """ Check if the geometry is a line or multiline.
            """
            return Geometry_IsLineLike(self)

        def is_area(self) -> SaColumn:
            """ Check if the geometry is a polygon or multipolygon.
            """
            return Geometry_IsAreaLike(self)

        def within_distance(self, other: SaColumn, distance: SaColumn) -> SaColumn:
            """ Check if `other` lies within `distance` of this expression.
                Plain table columns use the index-aware construct, anything
                else falls back to a comparison on ST_Distance().
            """
            if isinstance(self.expr, sa.Column):
                return Geometry_ColumnDWithin(self.expr, other, distance)

            return self.ST_Distance(other) < distance

        def ST_Distance(self, other: SaColumn) -> SaColumn:
            """ Apply ST_Distance() to this expression and `other`.
            """
            return sa.func.ST_Distance(self, other, type_=sa.Float)

        def ST_Contains(self, other: SaColumn) -> SaColumn:
            """ Apply ST_Contains() to this expression and `other`.
            """
            return sa.func.ST_Contains(self, other, type_=sa.Boolean)

        def ST_CoveredBy(self, other: SaColumn) -> SaColumn:
            """ Apply ST_CoveredBy() to this expression and `other`.
            """
            return sa.func.ST_CoveredBy(self, other, type_=sa.Boolean)

        def ST_ClosestPoint(self, other: SaColumn) -> SaColumn:
            """ Apply ST_ClosestPoint() to this expression and `other`,
                with a fallback when the function yields NULL.
            """
            # NOTE(review): the fallback argument of coalesce() was cut off
            # in the reviewed extract; `other` is assumed -- confirm against
            # the repository version.
            return sa.func.coalesce(sa.func.ST_ClosestPoint(self, other, type_=Geometry),
                                    other)

        def ST_Buffer(self, other: SaColumn) -> SaColumn:
            """ Apply ST_Buffer() to this expression and `other`.
            """
            return sa.func.ST_Buffer(self, other, type_=Geometry)

        def ST_Expand(self, other: SaColumn) -> SaColumn:
            """ Apply ST_Expand() to this expression and `other`.
            """
            return sa.func.ST_Expand(self, other, type_=Geometry)

        def ST_Collect(self) -> SaColumn:
            """ Apply ST_Collect() to this expression.
            """
            return sa.func.ST_Collect(self, type_=Geometry)

        def ST_Centroid(self) -> SaColumn:
            """ Apply ST_Centroid() to this expression.
            """
            return sa.func.ST_Centroid(self, type_=Geometry)

        def ST_LineInterpolatePoint(self, other: SaColumn) -> SaColumn:
            """ Apply ST_LineInterpolatePoint() to this expression
                and `other`.
            """
            return sa.func.ST_LineInterpolatePoint(self, other, type_=Geometry)

        def ST_LineLocatePoint(self, other: SaColumn) -> SaColumn:
            """ Apply ST_LineLocatePoint() to this expression and `other`.
            """
            return sa.func.ST_LineLocatePoint(self, other, type_=sa.Float)

        def distance_spheroid(self, other: SaColumn) -> SaColumn:
            """ Compute the distance to `other` with the
                Geometry_DistanceSpheroid construct.
            """
            return Geometry_DistanceSpheroid(self, other)
@compiles(Geometry, 'sqlite')
def get_col_spec(self, *args, **kwargs):  # type: ignore[no-untyped-def]
    """ Render the Geometry type for SQLite.
    """
    # NOTE(review): the body of this function was not visible in the
    # reviewed extract. A plain 'GEOMETRY' column type without subtype
    # and SRID is assumed -- confirm against the repository version.
    return 'GEOMETRY'
# Functions that go by a different name when compiled for SQLite.
# Each entry holds the function name used in queries, the type of the
# function result and the name to render for SQLite.
#
# NOTE(review): ST_LineInterpolatePoint is registered with a float result
# while Geometry.comparator_factory requests type_=Geometry for it --
# confirm that sa.Float is intended here.
SQLITE_FUNCTION_ALIAS = (
    ('ST_AsEWKB', sa.Text, 'AsEWKB'),
    ('ST_GeomFromEWKT', Geometry, 'GeomFromEWKT'),
    ('ST_AsGeoJSON', sa.Text, 'AsGeoJSON'),
    ('ST_AsKML', sa.Text, 'AsKML'),
    ('ST_AsSVG', sa.Text, 'AsSVG'),
    ('ST_LineLocatePoint', sa.Float, 'ST_Line_Locate_Point'),
    ('ST_LineInterpolatePoint', sa.Float, 'ST_Line_Interpolate_Point'),
)
def _add_function_alias(func: str, ftype: type, alias: str) -> None:
    """ Create a generic SQL function class named `func` and register
        a SQLite compiler for it that renders the function as `alias`.

        `ftype` is the type class of the function result.
    """
    # NOTE(review): only the "inherit_cache" entry was visible in the
    # reviewed extract. "type", "name" and "identifier" were filled in from
    # the otherwise unused parameters -- confirm against the repository
    # version.
    _FuncDef = type(func, (sa.sql.functions.GenericFunction, ), {
        "type": ftype(),
        "name": func,
        "identifier": func,
        "inherit_cache": True})

    # Created once per alias; the '%s' placeholder receives the compiled
    # argument list on every compilation.
    func_templ = f"{alias}(%s)"

    def _sqlite_impl(element: Any, compiler: Any, **kw: Any) -> Any:
        return func_templ % compiler.process(element.clauses, **kw)

    compiles(_FuncDef, 'sqlite')(_sqlite_impl)
# Register all SQLite aliases when the module is imported. Each entry
# supplies the three positional arguments of _add_function_alias().
for alias in SQLITE_FUNCTION_ALIAS:
    _add_function_alias(*alias)