]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/types.py
band-aid for SQLAlchemy 1.4
[nominatim.git] / nominatim / api / types.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Complex datatypes used by the Nominatim API.
9 """
10 from typing import Optional, Union, Tuple, NamedTuple, TypeVar, Type, Dict, \
11                    Any, List, Sequence
12 from collections import abc
13 import dataclasses
14 import enum
15 import math
16 from struct import unpack
17 from binascii import unhexlify
18
19 import sqlalchemy as sa
20
21 from nominatim.errors import UsageError
22
23 # pylint: disable=no-member,too-many-boolean-expressions,too-many-instance-attributes
24
25 @dataclasses.dataclass
26 class PlaceID:
27     """ Reference an object by Nominatim's internal ID.
28     """
29     place_id: int
30
31
32 @dataclasses.dataclass
33 class OsmID:
34     """ Reference by the OSM ID and potentially the basic category.
35     """
36     osm_type: str
37     osm_id: int
38     osm_class: Optional[str] = None
39
40     def __post_init__(self) -> None:
41         if self.osm_type not in ('N', 'W', 'R'):
42             raise ValueError(f"Illegal OSM type '{self.osm_type}'. Must be one of N, W, R.")
43
44
45 PlaceRef = Union[PlaceID, OsmID]
46
47
48 class Point(NamedTuple):
49     """ A geographic point in WGS84 projection.
50     """
51     x: float
52     y: float
53
54
55     @property
56     def lat(self) -> float:
57         """ Return the latitude of the point.
58         """
59         return self.y
60
61
62     @property
63     def lon(self) -> float:
64         """ Return the longitude of the point.
65         """
66         return self.x
67
68
69     def to_geojson(self) -> str:
70         """ Return the point in GeoJSON format.
71         """
72         return f'{{"type": "Point","coordinates": [{self.x}, {self.y}]}}'
73
74
75     @staticmethod
76     def from_wkb(wkb: Union[str, bytes]) -> 'Point':
77         """ Create a point from EWKB as returned from the database.
78         """
79         if isinstance(wkb, str):
80             wkb = unhexlify(wkb)
81         if len(wkb) != 25:
82             raise ValueError(f"Point wkb has unexpected length {len(wkb)}")
83         if wkb[0] == 0:
84             gtype, srid, x, y = unpack('>iidd', wkb[1:])
85         elif wkb[0] == 1:
86             gtype, srid, x, y = unpack('<iidd', wkb[1:])
87         else:
88             raise ValueError("WKB has unknown endian value.")
89
90         if gtype != 0x20000001:
91             raise ValueError("WKB must be a point geometry.")
92         if srid != 4326:
93             raise ValueError("Only WGS84 WKB supported.")
94
95         return Point(x, y)
96
97
98     @staticmethod
99     def from_param(inp: Any) -> 'Point':
100         """ Create a point from an input parameter. The parameter
101             may be given as a point, a string or a sequence of
102             strings or floats. Raises a UsageError if the format is
103             not correct.
104         """
105         if isinstance(inp, Point):
106             return inp
107
108         seq: Sequence[str]
109         if isinstance(inp, str):
110             seq = inp.split(',')
111         elif isinstance(inp, abc.Sequence):
112             seq = inp
113
114         if len(seq) != 2:
115             raise UsageError('Point parameter needs 2 coordinates.')
116         try:
117             x, y = filter(math.isfinite, map(float, seq))
118         except ValueError as exc:
119             raise UsageError('Point parameter needs to be numbers.') from exc
120
121         if x < -180.0 or x > 180.0 or y < -90.0 or y > 90.0:
122             raise UsageError('Point coordinates invalid.')
123
124         return Point(x, y)
125
126
127     def to_wkt(self) -> str:
128         """ Return the WKT representation of the point.
129         """
130         return f'POINT({self.x} {self.y})'
131
132
133
134 AnyPoint = Union[Point, Tuple[float, float]]
135
136 WKB_BBOX_HEADER_LE = b'\x01\x03\x00\x00\x20\xE6\x10\x00\x00\x01\x00\x00\x00\x05\x00\x00\x00'
137 WKB_BBOX_HEADER_BE = b'\x00\x20\x00\x00\x03\x00\x00\x10\xe6\x00\x00\x00\x01\x00\x00\x00\x05'
138
139 class Bbox:
140     """ A bounding box in WSG84 projection.
141
142         The coordinates are available as an array in the 'coord'
143         property in the order (minx, miny, maxx, maxy).
144     """
145     def __init__(self, minx: float, miny: float, maxx: float, maxy: float) -> None:
146         self.coords = (minx, miny, maxx, maxy)
147
148
149     @property
150     def minlat(self) -> float:
151         """ Southern-most latitude, corresponding to the minimum y coordinate.
152         """
153         return self.coords[1]
154
155
156     @property
157     def maxlat(self) -> float:
158         """ Northern-most latitude, corresponding to the maximum y coordinate.
159         """
160         return self.coords[3]
161
162
163     @property
164     def minlon(self) -> float:
165         """ Western-most longitude, corresponding to the minimum x coordinate.
166         """
167         return self.coords[0]
168
169
170     @property
171     def maxlon(self) -> float:
172         """ Eastern-most longitude, corresponding to the maximum x coordinate.
173         """
174         return self.coords[2]
175
176
177     @property
178     def area(self) -> float:
179         """ Return the area of the box in WGS84.
180         """
181         return (self.coords[2] - self.coords[0]) * (self.coords[3] - self.coords[1])
182
183
184     def contains(self, pt: Point) -> bool:
185         """ Check if the point is inside or on the boundary of the box.
186         """
187         return self.coords[0] <= pt[0] and self.coords[1] <= pt[1]\
188                and self.coords[2] >= pt[0] and self.coords[3] >= pt[1]
189
190
191     def to_wkt(self) -> str:
192         """ Return the WKT representation of the Bbox. This
193             is a simple polygon with four points.
194         """
195         return 'POLYGON(({0} {1},{0} {3},{2} {3},{2} {1},{0} {1}))'.format(*self.coords)
196
197
198     @staticmethod
199     def from_wkb(wkb: Union[None, str, bytes]) -> 'Optional[Bbox]':
200         """ Create a Bbox from a bounding box polygon as returned by
201             the database. Return s None if the input value is None.
202         """
203         if wkb is None:
204             return None
205
206         if isinstance(wkb, str):
207             wkb = unhexlify(wkb)
208
209         if len(wkb) != 97:
210             raise ValueError("WKB must be a bounding box polygon")
211         if wkb.startswith(WKB_BBOX_HEADER_LE):
212             x1, y1, _, _, x2, y2 = unpack('<dddddd', wkb[17:65])
213         elif wkb.startswith(WKB_BBOX_HEADER_BE):
214             x1, y1, _, _, x2, y2 = unpack('>dddddd', wkb[17:65])
215         else:
216             raise ValueError("WKB has wrong header")
217
218         return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
219
220
221     @staticmethod
222     def from_point(pt: Point, buffer: float) -> 'Bbox':
223         """ Return a Bbox around the point with the buffer added to all sides.
224         """
225         return Bbox(pt[0] - buffer, pt[1] - buffer,
226                     pt[0] + buffer, pt[1] + buffer)
227
228
229     @staticmethod
230     def from_param(inp: Any) -> 'Bbox':
231         """ Return a Bbox from an input parameter. The box may be
232             given as a Bbox, a string or a list or strings or integer.
233             Raises a UsageError if the format is incorrect.
234         """
235         if isinstance(inp, Bbox):
236             return inp
237
238         seq: Sequence[str]
239         if isinstance(inp, str):
240             seq = inp.split(',')
241         elif isinstance(inp, abc.Sequence):
242             seq = inp
243
244         if len(seq) != 4:
245             raise UsageError('Bounding box parameter needs 4 coordinates.')
246         try:
247             x1, y1, x2, y2 = filter(math.isfinite, map(float, seq))
248         except ValueError as exc:
249             raise UsageError('Bounding box parameter needs to be numbers.') from exc
250
251         if x1 < -180.0 or x1 > 180.0 or y1 < -90.0 or y1 > 90.0 \
252            or x2 < -180.0 or x2 > 180.0 or y2 < -90.0 or y2 > 90.0:
253             raise UsageError('Bounding box coordinates invalid.')
254
255         if x1 == x2 or y1 == y2:
256             raise UsageError('Bounding box with invalid parameters.')
257
258         return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
259
260
261 class GeometryFormat(enum.Flag):
262     """ Geometry output formats supported by Nominatim.
263     """
264     NONE = 0
265     GEOJSON = enum.auto()
266     KML = enum.auto()
267     SVG = enum.auto()
268     TEXT = enum.auto()
269
270
271 class DataLayer(enum.Flag):
272     """ Layer types that can be selected for reverse and forward search.
273     """
274     POI = enum.auto()
275     ADDRESS = enum.auto()
276     RAILWAY = enum.auto()
277     MANMADE = enum.auto()
278     NATURAL = enum.auto()
279
280
281 def format_country(cc: Any) -> List[str]:
282     """ Extract a list of country codes from the input which may be either
283         a string or list of strings. Filters out all values that are not
284         a two-letter string.
285     """
286     clist: Sequence[str]
287     if isinstance(cc, str):
288         clist = cc.split(',')
289     elif isinstance(cc, abc.Sequence):
290         clist = cc
291     else:
292         raise UsageError("Parameter 'country' needs to be a comma-separated list "
293                          "or a Python list of strings.")
294
295     return [cc.lower() for cc in clist if isinstance(cc, str) and len(cc) == 2]
296
297
298 def format_excluded(ids: Any) -> List[int]:
299     """ Extract a list of place ids from the input which may be either
300         a string or a list of strings or ints. Ignores empty value but
301         throws a UserError on anything that cannot be converted to int.
302     """
303     plist: Sequence[str]
304     if isinstance(ids, str):
305         plist = [s.strip() for s in ids.split(',')]
306     elif isinstance(ids, abc.Sequence):
307         plist = ids
308     else:
309         raise UsageError("Parameter 'excluded' needs to be a comma-separated list "
310                          "or a Python list of numbers.")
311     if not all(isinstance(i, int) or
312                (isinstance(i, str) and (not i or i.isdigit())) for i in plist):
313         raise UsageError("Parameter 'excluded' only takes place IDs.")
314
315     return [int(id) for id in plist if id] or [0]
316
317
318 def format_categories(categories: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
319     """ Extract a list of categories. Currently a noop.
320     """
321     return categories
322
323 TParam = TypeVar('TParam', bound='LookupDetails') # pylint: disable=invalid-name
324
325 @dataclasses.dataclass
326 class LookupDetails:
327     """ Collection of parameters that define the amount of details
328         returned with a lookup or details result.
329     """
330     geometry_output: GeometryFormat = GeometryFormat.NONE
331     """ Add the full geometry of the place to the result. Multiple
332         formats may be selected. Note that geometries can become quite large.
333     """
334     address_details: bool = False
335     """ Get detailed information on the places that make up the address
336         for the result.
337     """
338     linked_places: bool = False
339     """ Get detailed information on the places that link to the result.
340     """
341     parented_places: bool = False
342     """ Get detailed information on all places that this place is a parent
343         for, i.e. all places for which it provides the address details.
344         Only POI places can have parents.
345     """
346     keywords: bool = False
347     """ Add information about the search terms used for this place.
348     """
349     geometry_simplification: float = 0.0
350     """ Simplification factor for a geometry in degrees WGS. A factor of
351         0.0 means the original geometry is kept. The higher the value, the
352         more the geometry gets simplified.
353     """
354
355     @classmethod
356     def from_kwargs(cls: Type[TParam], kwargs: Dict[str, Any]) -> TParam:
357         """ Load the data fields of the class from a dictionary.
358             Unknown entries in the dictionary are ignored, missing ones
359             get the default setting.
360
361             The function supports type checking and throws a UsageError
362             when the value does not fit.
363         """
364         def _check_field(v: Any, field: 'dataclasses.Field[Any]') -> Any:
365             if v is None:
366                 return field.default_factory() \
367                        if field.default_factory != dataclasses.MISSING \
368                        else field.default
369             if field.metadata and 'transform' in field.metadata:
370                 return field.metadata['transform'](v)
371             if not isinstance(v, field.type):
372                 raise UsageError(f"Parameter '{field.name}' needs to be of {field.type!s}.")
373             return v
374
375         return cls(**{f.name: _check_field(kwargs[f.name], f)
376                       for f in dataclasses.fields(cls) if f.name in kwargs})
377
378
379 @dataclasses.dataclass
380 class ReverseDetails(LookupDetails):
381     """ Collection of parameters for the reverse call.
382     """
383     max_rank: int = dataclasses.field(default=30,
384                                       metadata={'transform': lambda v: max(0, min(v, 30))}
385                                      )
386     """ Highest address rank to return.
387     """
388     layers: DataLayer = DataLayer.ADDRESS | DataLayer.POI
389     """ Filter which kind of data to include.
390     """
391
392 @dataclasses.dataclass
393 class SearchDetails(LookupDetails):
394     """ Collection of parameters for the search call.
395     """
396     max_results: int = 10
397     """ Maximum number of results to be returned. The actual number of results
398         may be less.
399     """
400     min_rank: int = dataclasses.field(default=0,
401                                       metadata={'transform': lambda v: max(0, min(v, 30))}
402                                      )
403     """ Lowest address rank to return.
404     """
405     max_rank: int = dataclasses.field(default=30,
406                                       metadata={'transform': lambda v: max(0, min(v, 30))}
407                                      )
408     """ Highest address rank to return.
409     """
410     layers: Optional[DataLayer] = dataclasses.field(default=None,
411                                                     metadata={'transform': lambda r : r})
412     """ Filter which kind of data to include. When 'None' (the default) then
413         filtering by layers is disabled.
414     """
415     countries: List[str] = dataclasses.field(default_factory=list,
416                                              metadata={'transform': format_country})
417     """ Restrict search results to the given countries. An empty list (the
418         default) will disable this filter.
419     """
420     excluded: List[int] = dataclasses.field(default_factory=list,
421                                             metadata={'transform': format_excluded})
422     """ List of OSM objects to exclude from the results. Currenlty only
423         works when the internal place ID is given.
424         An empty list (the default) will disable this filter.
425     """
426     viewbox: Optional[Bbox] = dataclasses.field(default=None,
427                                                 metadata={'transform': Bbox.from_param})
428     """ Focus the search on a given map area.
429     """
430     bounded_viewbox: bool = False
431     """ Use 'viewbox' as a filter and restrict results to places within the
432         given area.
433     """
434     near: Optional[Point] = dataclasses.field(default=None,
435                                               metadata={'transform': Point.from_param})
436     """ Order results by distance to the given point.
437     """
438     near_radius: Optional[float] = dataclasses.field(default=None,
439                                               metadata={'transform': lambda r : r})
440     """ Use near point as a filter and drop results outside the given
441         radius. Radius is given in degrees WSG84.
442     """
443     categories: List[Tuple[str, str]] = dataclasses.field(default_factory=list,
444                                                           metadata={'transform': format_categories})
445     """ Restrict search to places with one of the given class/type categories.
446         An empty list (the default) will disable this filter.
447     """
448
449     def __post_init__(self) -> None:
450         if self.viewbox is not None:
451             xext = (self.viewbox.maxlon - self.viewbox.minlon)/2
452             yext = (self.viewbox.maxlat - self.viewbox.minlat)/2
453             self.viewbox_x2 = Bbox(self.viewbox.minlon - xext, self.viewbox.minlat - yext,
454                                    self.viewbox.maxlon + xext, self.viewbox.maxlat + yext)
455         else:
456             self.viewbox_x2 = None
457
458
459     def restrict_min_max_rank(self, new_min: int, new_max: int) -> None:
460         """ Change the min_rank and max_rank fields to respect the
461             given boundaries.
462         """
463         assert new_min <= new_max
464         self.min_rank = max(self.min_rank, new_min)
465         self.max_rank = min(self.max_rank, new_max)
466
467
468     def is_impossible(self) -> bool:
469         """ Check if the parameter configuration is contradictionary and
470             cannot yield any results.
471         """
472         return (self.min_rank > self.max_rank
473                 or (self.bounded_viewbox
474                     and self.viewbox is not None and self.near is not None
475                     and self.viewbox.contains(self.near))
476                 or self.layers is not None and not self.layers)
477
478
479     def layer_enabled(self, layer: DataLayer) -> bool:
480         """ Check if the given layer has been choosen. Also returns
481             true when layer restriction has been disabled completely.
482         """
483         return self.layers is None or bool(self.layers & layer)