1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Custom types for SQLAlchemy.
10 from typing import Callable, Any, cast
13 import sqlalchemy as sa
14 from sqlalchemy.ext.compiler import compiles
15 from sqlalchemy import types
17 from nominatim.typing import SaColumn, SaBind
21 SQLITE_FUNCTION_ALIAS = (
22 ('ST_AsEWKB', sa.Text, 'AsEWKB'),
23 ('ST_AsGeoJSON', sa.Text, 'AsGeoJSON'),
24 ('ST_AsKML', sa.Text, 'AsKML'),
25 ('ST_AsSVG', sa.Text, 'AsSVG'),
28 def _add_function_alias(func: str, ftype: type, alias: str) -> None:
29 _FuncDef = type(func, (sa.sql.functions.GenericFunction, ), {
33 "inherit_cache": True})
35 func_templ = f"{alias}(%s)"
37 def _sqlite_impl(element: Any, compiler: Any, **kw: Any) -> Any:
38 return func_templ % compiler.process(element.clauses, **kw)
40 compiles(_FuncDef, 'sqlite')(_sqlite_impl) # type: ignore[no-untyped-call]
# Register every (function name, return type, SQLite name) entry of
# SQLITE_FUNCTION_ALIAS by handing its three fields to _add_function_alias().
for alias in SQLITE_FUNCTION_ALIAS:
    _add_function_alias(*alias)
46 class Geometry(types.UserDefinedType): # type: ignore[type-arg]
47 """ Simplified type decorator for PostGIS geometry. This type
48 only supports geometries in 4326 projection.
def __init__(self, subtype: str = 'Geometry'):
    """ Create the type for the given geometry `subtype`.

        The value is only stored on the instance here; it defaults
        to the generic 'Geometry'.
    """
    self.subtype = subtype
def get_col_spec(self) -> str:
    """ Return the column type specification: a GEOMETRY of the
        configured subtype with SRID 4326.
    """
    return 'GEOMETRY({}, 4326)'.format(self.subtype)
60 def bind_processor(self, dialect: 'sa.Dialect') -> Callable[[Any], str]:
61 def process(value: Any) -> str:
62 if isinstance(value, str):
65 return cast(str, value.to_wkt())
69 def result_processor(self, dialect: 'sa.Dialect', coltype: object) -> Callable[[Any], str]:
70 def process(value: Any) -> str:
71 assert isinstance(value, str)
def column_expression(self, col: SaColumn) -> SaColumn:
    """ Return the given column wrapped in an ST_AsEWKB() function call.
    """
    as_ewkb = sa.func.ST_AsEWKB
    return as_ewkb(col)
def bind_expression(self, bindvalue: SaBind) -> SaColumn:
    """ Return an ST_GeomFromText() call around the bound value.

        The SRID is passed as the literal SQL text 4326 and the
        resulting expression is typed with this type instance.
    """
    srid = sa.text('4326')
    return sa.func.ST_GeomFromText(bindvalue, srid, type_=self)
84 class comparator_factory(types.UserDefinedType.Comparator): # type: ignore[type-arg]
def intersects(self, other: SaColumn) -> 'sa.Operators':
    """ Return an expression that applies the `&&` operator to this
        expression and `other`.
    """
    overlap_operator = self.op('&&')
    return overlap_operator(other)
def is_line_like(self) -> SaColumn:
    """ Return an expression that checks whether ST_GeometryType() of
        this expression is 'ST_LineString' or 'ST_MultiLineString'.
    """
    line_types = ('ST_LineString', 'ST_MultiLineString')
    geometry_type = sa.func.ST_GeometryType(self, type_=sa.String)
    return geometry_type.in_(line_types)
93 def is_area(self) -> SaColumn:
94 return sa.func.ST_GeometryType(self, type_=sa.String).in_(('ST_Polygon',
def ST_DWithin(self, other: SaColumn, distance: SaColumn) -> SaColumn:
    """ Return a boolean-typed ST_DWithin() call taking this
        expression, `other` and `distance` as arguments.
    """
    dwithin = sa.func.ST_DWithin
    return dwithin(self, other, distance, type_=sa.Boolean)
def ST_DWithin_no_index(self, other: SaColumn, distance: SaColumn) -> SaColumn:
    """ Return a boolean-typed ST_DWithin() call where this expression
        is first wrapped in coalesce(NULL, ...).
    """
    # NOTE(review): going by the method name, the coalesce() wrapper is
    # presumably there to keep the database from using a geometry
    # index on this expression - confirm.
    wrapped_self = sa.func.coalesce(sa.null(), self)
    return sa.func.ST_DWithin(wrapped_self, other, distance, type_=sa.Boolean)
def ST_Intersects_no_index(self, other: SaColumn) -> 'sa.Operators':
    """ Return an expression applying the `&&` operator to
        coalesce(NULL, <this expression>) and `other`.
    """
    # NOTE(review): going by the method name, the coalesce() wrapper is
    # presumably there to keep the database from using a geometry
    # index on this expression - confirm.
    wrapped_self = sa.func.coalesce(sa.null(), self)
    overlap_operator = wrapped_self.op('&&')
    return overlap_operator(other)
def ST_Distance(self, other: SaColumn) -> SaColumn:
    """ Return a float-typed ST_Distance() call taking this expression
        and `other` as arguments.
    """
    distance_fn = sa.func.ST_Distance
    return distance_fn(self, other, type_=sa.Float)
def ST_Contains(self, other: SaColumn) -> SaColumn:
    """ Return a boolean-typed ST_Contains() call taking this
        expression and `other` as arguments.
    """
    contains_fn = sa.func.ST_Contains
    return contains_fn(self, other, type_=sa.Boolean)
def ST_CoveredBy(self, other: SaColumn) -> SaColumn:
    """ Return a boolean-typed ST_CoveredBy() call taking this
        expression and `other` as arguments.
    """
    covered_by_fn = sa.func.ST_CoveredBy
    return covered_by_fn(self, other, type_=sa.Boolean)
def ST_ClosestPoint(self, other: SaColumn) -> SaColumn:
    """ Return an ST_ClosestPoint() call taking this expression and
        `other` as arguments. The result is typed as Geometry.
    """
    closest_point_fn = sa.func.ST_ClosestPoint
    return closest_point_fn(self, other, type_=Geometry)
def ST_Buffer(self, other: SaColumn) -> SaColumn:
    """ Return an ST_Buffer() call taking this expression and `other`
        as arguments. The result is typed as Geometry.
    """
    buffer_fn = sa.func.ST_Buffer
    return buffer_fn(self, other, type_=Geometry)
def ST_Expand(self, other: SaColumn) -> SaColumn:
    """ Return an ST_Expand() call taking this expression and `other`
        as arguments. The result is typed as Geometry.
    """
    expand_fn = sa.func.ST_Expand
    return expand_fn(self, other, type_=Geometry)
def ST_Collect(self) -> SaColumn:
    """ Return an ST_Collect() call on this expression. The result
        is typed as Geometry.
    """
    collect_fn = sa.func.ST_Collect
    return collect_fn(self, type_=Geometry)
def ST_Centroid(self) -> SaColumn:
    """ Return an ST_Centroid() call on this expression. The result
        is typed as Geometry.
    """
    centroid_fn = sa.func.ST_Centroid
    return centroid_fn(self, type_=Geometry)
def ST_LineInterpolatePoint(self, other: SaColumn) -> SaColumn:
    """ Return an ST_LineInterpolatePoint() call taking this expression
        and `other` as arguments. The result is typed as Geometry.
    """
    interpolate_fn = sa.func.ST_LineInterpolatePoint
    return interpolate_fn(self, other, type_=Geometry)
def ST_LineLocatePoint(self, other: SaColumn) -> SaColumn:
    """ Return a float-typed ST_LineLocatePoint() call taking this
        expression and `other` as arguments.
    """
    locate_fn = sa.func.ST_LineLocatePoint
    return locate_fn(self, other, type_=sa.Float)
151 @compiles(Geometry, 'sqlite') # type: ignore[no-untyped-call]
152 def get_col_spec(self, *args, **kwargs): # type: ignore[no-untyped-def]