]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/db/sqlalchemy_types.py
9d1e48fae31e3c1763a28abe43ce13a37e1dd03c
[nominatim.git] / nominatim / db / sqlalchemy_types.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Custom types for SQLAlchemy.
9 """
10 from typing import Callable, Any, cast
11 import sys
12
13 import sqlalchemy as sa
14 from sqlalchemy.ext.compiler import compiles
15 from sqlalchemy import types
16
17 from nominatim.typing import SaColumn, SaBind
18
19 #pylint: disable=all
20
21 SQLITE_FUNCTION_ALIAS = (
22     ('ST_AsEWKB', sa.Text, 'AsEWKB'),
23     ('ST_AsGeoJSON', sa.Text, 'AsGeoJSON'),
24     ('ST_AsKML', sa.Text, 'AsKML'),
25     ('ST_AsSVG', sa.Text, 'AsSVG'),
26 )
27
28 def _add_function_alias(func: str, ftype: type, alias: str) -> None:
29     _FuncDef = type(func, (sa.sql.functions.GenericFunction, ), {
30         "type": ftype,
31         "name": func,
32         "identifier": func,
33         "inherit_cache": True})
34
35     func_templ = f"{alias}(%s)"
36
37     def _sqlite_impl(element: Any, compiler: Any, **kw: Any) -> Any:
38         return func_templ % compiler.process(element.clauses, **kw)
39
40     compiles(_FuncDef, 'sqlite')(_sqlite_impl) # type: ignore[no-untyped-call]
41
42 for alias in SQLITE_FUNCTION_ALIAS:
43     _add_function_alias(*alias)
44
45
46 class Geometry(types.UserDefinedType): # type: ignore[type-arg]
47     """ Simplified type decorator for PostGIS geometry. This type
48         only supports geometries in 4326 projection.
49     """
50     cache_ok = True
51
52     def __init__(self, subtype: str = 'Geometry'):
53         self.subtype = subtype
54
55
56     def get_col_spec(self) -> str:
57         return f'GEOMETRY({self.subtype}, 4326)'
58
59
60     def bind_processor(self, dialect: 'sa.Dialect') -> Callable[[Any], str]:
61         def process(value: Any) -> str:
62             if isinstance(value, str):
63                 return value
64
65             return cast(str, value.to_wkt())
66         return process
67
68
69     def result_processor(self, dialect: 'sa.Dialect', coltype: object) -> Callable[[Any], str]:
70         def process(value: Any) -> str:
71             assert isinstance(value, str)
72             return value
73         return process
74
75
76     def column_expression(self, col: SaColumn) -> SaColumn:
77         return sa.func.ST_AsEWKB(col)
78
79
80     def bind_expression(self, bindvalue: SaBind) -> SaColumn:
81         return sa.func.ST_GeomFromText(bindvalue, sa.text('4326'), type_=self)
82
83
84     class comparator_factory(types.UserDefinedType.Comparator): # type: ignore[type-arg]
85
86         def intersects(self, other: SaColumn) -> 'sa.Operators':
87             return self.op('&&')(other)
88
89         def is_line_like(self) -> SaColumn:
90             return sa.func.ST_GeometryType(self, type_=sa.String).in_(('ST_LineString',
91                                                                        'ST_MultiLineString'))
92
93         def is_area(self) -> SaColumn:
94             return sa.func.ST_GeometryType(self, type_=sa.String).in_(('ST_Polygon',
95                                                                        'ST_MultiPolygon'))
96
97
98         def ST_DWithin(self, other: SaColumn, distance: SaColumn) -> SaColumn:
99             return sa.func.ST_DWithin(self, other, distance, type_=sa.Boolean)
100
101
102         def ST_DWithin_no_index(self, other: SaColumn, distance: SaColumn) -> SaColumn:
103             return sa.func.ST_DWithin(sa.func.coalesce(sa.null(), self),
104                                       other, distance, type_=sa.Boolean)
105
106
107         def ST_Intersects_no_index(self, other: SaColumn) -> 'sa.Operators':
108             return sa.func.coalesce(sa.null(), self).op('&&')(other)
109
110
111         def ST_Distance(self, other: SaColumn) -> SaColumn:
112             return sa.func.ST_Distance(self, other, type_=sa.Float)
113
114
115         def ST_Contains(self, other: SaColumn) -> SaColumn:
116             return sa.func.ST_Contains(self, other, type_=sa.Boolean)
117
118
119         def ST_CoveredBy(self, other: SaColumn) -> SaColumn:
120             return sa.func.ST_CoveredBy(self, other, type_=sa.Boolean)
121
122
123         def ST_ClosestPoint(self, other: SaColumn) -> SaColumn:
124             return sa.func.ST_ClosestPoint(self, other, type_=Geometry)
125
126
127         def ST_Buffer(self, other: SaColumn) -> SaColumn:
128             return sa.func.ST_Buffer(self, other, type_=Geometry)
129
130
131         def ST_Expand(self, other: SaColumn) -> SaColumn:
132             return sa.func.ST_Expand(self, other, type_=Geometry)
133
134
135         def ST_Collect(self) -> SaColumn:
136             return sa.func.ST_Collect(self, type_=Geometry)
137
138
139         def ST_Centroid(self) -> SaColumn:
140             return sa.func.ST_Centroid(self, type_=Geometry)
141
142
143         def ST_LineInterpolatePoint(self, other: SaColumn) -> SaColumn:
144             return sa.func.ST_LineInterpolatePoint(self, other, type_=Geometry)
145
146
147         def ST_LineLocatePoint(self, other: SaColumn) -> SaColumn:
148             return sa.func.ST_LineLocatePoint(self, other, type_=sa.Float)
149
150
151 @compiles(Geometry, 'sqlite') # type: ignore[no-untyped-call]
152 def get_col_spec(self, *args, **kwargs): # type: ignore[no-untyped-def]
153     return 'GEOMETRY'