]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/types.py
Merge pull request #3093 from lonvia/remove-sanic
[nominatim.git] / nominatim / api / types.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Complex datatypes used by the Nominatim API.
9 """
10 from typing import Optional, Union, Tuple, NamedTuple, TypeVar, Type, Dict, \
11                    Any, List, Sequence
12 from collections import abc
13 import dataclasses
14 import enum
15 import math
16 from struct import unpack
17
18 from geoalchemy2 import WKTElement
19 import geoalchemy2.functions
20
21 from nominatim.errors import UsageError
22
23 # pylint: disable=no-member,too-many-boolean-expressions,too-many-instance-attributes
24
25 @dataclasses.dataclass
26 class PlaceID:
27     """ Reference an object by Nominatim's internal ID.
28     """
29     place_id: int
30
31
32 @dataclasses.dataclass
33 class OsmID:
34     """ Reference by the OSM ID and potentially the basic category.
35     """
36     osm_type: str
37     osm_id: int
38     osm_class: Optional[str] = None
39
40     def __post_init__(self) -> None:
41         if self.osm_type not in ('N', 'W', 'R'):
42             raise ValueError(f"Illegal OSM type '{self.osm_type}'. Must be one of N, W, R.")
43
44
45 PlaceRef = Union[PlaceID, OsmID]
46
47
48 class Point(NamedTuple):
49     """ A geographic point in WGS84 projection.
50     """
51     x: float
52     y: float
53
54
55     @property
56     def lat(self) -> float:
57         """ Return the latitude of the point.
58         """
59         return self.y
60
61
62     @property
63     def lon(self) -> float:
64         """ Return the longitude of the point.
65         """
66         return self.x
67
68
69     def to_geojson(self) -> str:
70         """ Return the point in GeoJSON format.
71         """
72         return f'{{"type": "Point","coordinates": [{self.x}, {self.y}]}}'
73
74
75     @staticmethod
76     def from_wkb(wkb: bytes) -> 'Point':
77         """ Create a point from EWKB as returned from the database.
78         """
79         if len(wkb) != 25:
80             raise ValueError("Point wkb has unexpected length")
81         if wkb[0] == 0:
82             gtype, srid, x, y = unpack('>iidd', wkb[1:])
83         elif wkb[0] == 1:
84             gtype, srid, x, y = unpack('<iidd', wkb[1:])
85         else:
86             raise ValueError("WKB has unknown endian value.")
87
88         if gtype != 0x20000001:
89             raise ValueError("WKB must be a point geometry.")
90         if srid != 4326:
91             raise ValueError("Only WGS84 WKB supported.")
92
93         return Point(x, y)
94
95
96     @staticmethod
97     def from_param(inp: Any) -> 'Point':
98         """ Create a point from an input parameter. The parameter
99             may be given as a point, a string or a sequence of
100             strings or floats. Raises a UsageError if the format is
101             not correct.
102         """
103         if isinstance(inp, Point):
104             return inp
105
106         seq: Sequence[str]
107         if isinstance(inp, str):
108             seq = inp.split(',')
109         elif isinstance(inp, abc.Sequence):
110             seq = inp
111
112         if len(seq) != 2:
113             raise UsageError('Point parameter needs 2 coordinates.')
114         try:
115             x, y = filter(math.isfinite, map(float, seq))
116         except ValueError as exc:
117             raise UsageError('Point parameter needs to be numbers.') from exc
118
119         if x < -180.0 or x > 180.0 or y < -90.0 or y > 90.0:
120             raise UsageError('Point coordinates invalid.')
121
122         return Point(x, y)
123
124
125     def sql_value(self) -> WKTElement:
126         """ Create an SQL expression for the point.
127         """
128         return WKTElement(f'POINT({self.x} {self.y})', srid=4326)
129
130
131
132 AnyPoint = Union[Point, Tuple[float, float]]
133
134 WKB_BBOX_HEADER_LE = b'\x01\x03\x00\x00\x20\xE6\x10\x00\x00\x01\x00\x00\x00\x05\x00\x00\x00'
135 WKB_BBOX_HEADER_BE = b'\x00\x20\x00\x00\x03\x00\x00\x10\xe6\x00\x00\x00\x01\x00\x00\x00\x05'
136
137 class Bbox:
138     """ A bounding box in WSG84 projection.
139
140         The coordinates are available as an array in the 'coord'
141         property in the order (minx, miny, maxx, maxy).
142     """
143     def __init__(self, minx: float, miny: float, maxx: float, maxy: float) -> None:
144         self.coords = (minx, miny, maxx, maxy)
145
146
147     @property
148     def minlat(self) -> float:
149         """ Southern-most latitude, corresponding to the minimum y coordinate.
150         """
151         return self.coords[1]
152
153
154     @property
155     def maxlat(self) -> float:
156         """ Northern-most latitude, corresponding to the maximum y coordinate.
157         """
158         return self.coords[3]
159
160
161     @property
162     def minlon(self) -> float:
163         """ Western-most longitude, corresponding to the minimum x coordinate.
164         """
165         return self.coords[0]
166
167
168     @property
169     def maxlon(self) -> float:
170         """ Eastern-most longitude, corresponding to the maximum x coordinate.
171         """
172         return self.coords[2]
173
174
175     @property
176     def area(self) -> float:
177         """ Return the area of the box in WGS84.
178         """
179         return (self.coords[2] - self.coords[0]) * (self.coords[3] - self.coords[1])
180
181
182     def sql_value(self) -> Any:
183         """ Create an SQL expression for the box.
184         """
185         return geoalchemy2.functions.ST_MakeEnvelope(*self.coords, 4326)
186
187
188     def contains(self, pt: Point) -> bool:
189         """ Check if the point is inside or on the boundary of the box.
190         """
191         return self.coords[0] <= pt[0] and self.coords[1] <= pt[1]\
192                and self.coords[2] >= pt[0] and self.coords[3] >= pt[1]
193
194
195     @staticmethod
196     def from_wkb(wkb: Optional[bytes]) -> 'Optional[Bbox]':
197         """ Create a Bbox from a bounding box polygon as returned by
198             the database. Return s None if the input value is None.
199         """
200         if wkb is None:
201             return None
202
203         if len(wkb) != 97:
204             raise ValueError("WKB must be a bounding box polygon")
205         if wkb.startswith(WKB_BBOX_HEADER_LE):
206             x1, y1, _, _, x2, y2 = unpack('<dddddd', wkb[17:65])
207         elif wkb.startswith(WKB_BBOX_HEADER_BE):
208             x1, y1, _, _, x2, y2 = unpack('>dddddd', wkb[17:65])
209         else:
210             raise ValueError("WKB has wrong header")
211
212         return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
213
214
215     @staticmethod
216     def from_point(pt: Point, buffer: float) -> 'Bbox':
217         """ Return a Bbox around the point with the buffer added to all sides.
218         """
219         return Bbox(pt[0] - buffer, pt[1] - buffer,
220                     pt[0] + buffer, pt[1] + buffer)
221
222
223     @staticmethod
224     def from_param(inp: Any) -> 'Bbox':
225         """ Return a Bbox from an input parameter. The box may be
226             given as a Bbox, a string or a list or strings or integer.
227             Raises a UsageError if the format is incorrect.
228         """
229         if isinstance(inp, Bbox):
230             return inp
231
232         seq: Sequence[str]
233         if isinstance(inp, str):
234             seq = inp.split(',')
235         elif isinstance(inp, abc.Sequence):
236             seq = inp
237
238         if len(seq) != 4:
239             raise UsageError('Bounding box parameter needs 4 coordinates.')
240         try:
241             x1, y1, x2, y2 = filter(math.isfinite, map(float, seq))
242         except ValueError as exc:
243             raise UsageError('Bounding box parameter needs to be numbers.') from exc
244
245         if x1 < -180.0 or x1 > 180.0 or y1 < -90.0 or y1 > 90.0 \
246            or x2 < -180.0 or x2 > 180.0 or y2 < -90.0 or y2 > 90.0:
247             raise UsageError('Bounding box coordinates invalid.')
248
249         if x1 == x2 or y1 == y2:
250             raise UsageError('Bounding box with invalid parameters.')
251
252         return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
253
254
255 class GeometryFormat(enum.Flag):
256     """ Geometry output formats supported by Nominatim.
257     """
258     NONE = 0
259     GEOJSON = enum.auto()
260     KML = enum.auto()
261     SVG = enum.auto()
262     TEXT = enum.auto()
263
264
265 class DataLayer(enum.Flag):
266     """ Layer types that can be selected for reverse and forward search.
267     """
268     POI = enum.auto()
269     ADDRESS = enum.auto()
270     RAILWAY = enum.auto()
271     MANMADE = enum.auto()
272     NATURAL = enum.auto()
273
274
275 def format_country(cc: Any) -> List[str]:
276     """ Extract a list of country codes from the input which may be either
277         a string or list of strings. Filters out all values that are not
278         a two-letter string.
279     """
280     clist: Sequence[str]
281     if isinstance(cc, str):
282         clist = cc.split(',')
283     elif isinstance(cc, abc.Sequence):
284         clist = cc
285     else:
286         raise UsageError("Parameter 'country' needs to be a comma-separated list "
287                          "or a Python list of strings.")
288
289     return [cc.lower() for cc in clist if isinstance(cc, str) and len(cc) == 2]
290
291
292 def format_excluded(ids: Any) -> List[int]:
293     """ Extract a list of place ids from the input which may be either
294         a string or a list of strings or ints. Ignores empty value but
295         throws a UserError on anything that cannot be converted to int.
296     """
297     plist: Sequence[str]
298     if isinstance(ids, str):
299         plist = [s.strip() for s in ids.split(',')]
300     elif isinstance(ids, abc.Sequence):
301         plist = ids
302     else:
303         raise UsageError("Parameter 'excluded' needs to be a comma-separated list "
304                          "or a Python list of numbers.")
305     if not all(isinstance(i, int) or
306                (isinstance(i, str) and (not i or i.isdigit())) for i in plist):
307         raise UsageError("Parameter 'excluded' only takes place IDs.")
308
309     return [int(id) for id in plist if id] or [0]
310
311
312 def format_categories(categories: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
313     """ Extract a list of categories. Currently a noop.
314     """
315     return categories
316
317 TParam = TypeVar('TParam', bound='LookupDetails') # pylint: disable=invalid-name
318
319 @dataclasses.dataclass
320 class LookupDetails:
321     """ Collection of parameters that define the amount of details
322         returned with a lookup or details result.
323     """
324     geometry_output: GeometryFormat = GeometryFormat.NONE
325     """ Add the full geometry of the place to the result. Multiple
326         formats may be selected. Note that geometries can become quite large.
327     """
328     address_details: bool = False
329     """ Get detailed information on the places that make up the address
330         for the result.
331     """
332     linked_places: bool = False
333     """ Get detailed information on the places that link to the result.
334     """
335     parented_places: bool = False
336     """ Get detailed information on all places that this place is a parent
337         for, i.e. all places for which it provides the address details.
338         Only POI places can have parents.
339     """
340     keywords: bool = False
341     """ Add information about the search terms used for this place.
342     """
343     geometry_simplification: float = 0.0
344     """ Simplification factor for a geometry in degrees WGS. A factor of
345         0.0 means the original geometry is kept. The higher the value, the
346         more the geometry gets simplified.
347     """
348
349     @classmethod
350     def from_kwargs(cls: Type[TParam], kwargs: Dict[str, Any]) -> TParam:
351         """ Load the data fields of the class from a dictionary.
352             Unknown entries in the dictionary are ignored, missing ones
353             get the default setting.
354
355             The function supports type checking and throws a UsageError
356             when the value does not fit.
357         """
358         def _check_field(v: Any, field: 'dataclasses.Field[Any]') -> Any:
359             if v is None:
360                 return field.default_factory() \
361                        if field.default_factory != dataclasses.MISSING \
362                        else field.default
363             if field.metadata and 'transform' in field.metadata:
364                 return field.metadata['transform'](v)
365             if not isinstance(v, field.type):
366                 raise UsageError(f"Parameter '{field.name}' needs to be of {field.type!s}.")
367             return v
368
369         return cls(**{f.name: _check_field(kwargs[f.name], f)
370                       for f in dataclasses.fields(cls) if f.name in kwargs})
371
372
373 @dataclasses.dataclass
374 class ReverseDetails(LookupDetails):
375     """ Collection of parameters for the reverse call.
376     """
377     max_rank: int = dataclasses.field(default=30,
378                                       metadata={'transform': lambda v: max(0, min(v, 30))}
379                                      )
380     """ Highest address rank to return.
381     """
382     layers: DataLayer = DataLayer.ADDRESS | DataLayer.POI
383     """ Filter which kind of data to include.
384     """
385
386 @dataclasses.dataclass
387 class SearchDetails(LookupDetails):
388     """ Collection of parameters for the search call.
389     """
390     max_results: int = 10
391     """ Maximum number of results to be returned. The actual number of results
392         may be less.
393     """
394     min_rank: int = dataclasses.field(default=0,
395                                       metadata={'transform': lambda v: max(0, min(v, 30))}
396                                      )
397     """ Lowest address rank to return.
398     """
399     max_rank: int = dataclasses.field(default=30,
400                                       metadata={'transform': lambda v: max(0, min(v, 30))}
401                                      )
402     """ Highest address rank to return.
403     """
404     layers: Optional[DataLayer] = dataclasses.field(default=None,
405                                                     metadata={'transform': lambda r : r})
406     """ Filter which kind of data to include. When 'None' (the default) then
407         filtering by layers is disabled.
408     """
409     countries: List[str] = dataclasses.field(default_factory=list,
410                                              metadata={'transform': format_country})
411     """ Restrict search results to the given countries. An empty list (the
412         default) will disable this filter.
413     """
414     excluded: List[int] = dataclasses.field(default_factory=list,
415                                             metadata={'transform': format_excluded})
416     """ List of OSM objects to exclude from the results. Currenlty only
417         works when the internal place ID is given.
418         An empty list (the default) will disable this filter.
419     """
420     viewbox: Optional[Bbox] = dataclasses.field(default=None,
421                                                 metadata={'transform': Bbox.from_param})
422     """ Focus the search on a given map area.
423     """
424     bounded_viewbox: bool = False
425     """ Use 'viewbox' as a filter and restrict results to places within the
426         given area.
427     """
428     near: Optional[Point] = dataclasses.field(default=None,
429                                               metadata={'transform': Point.from_param})
430     """ Order results by distance to the given point.
431     """
432     near_radius: Optional[float] = dataclasses.field(default=None,
433                                               metadata={'transform': lambda r : r})
434     """ Use near point as a filter and drop results outside the given
435         radius. Radius is given in degrees WSG84.
436     """
437     categories: List[Tuple[str, str]] = dataclasses.field(default_factory=list,
438                                                           metadata={'transform': format_categories})
439     """ Restrict search to places with one of the given class/type categories.
440         An empty list (the default) will disable this filter.
441     """
442
443     def __post_init__(self) -> None:
444         if self.viewbox is not None:
445             xext = (self.viewbox.maxlon - self.viewbox.minlon)/2
446             yext = (self.viewbox.maxlat - self.viewbox.minlat)/2
447             self.viewbox_x2 = Bbox(self.viewbox.minlon - xext, self.viewbox.minlat - yext,
448                                    self.viewbox.maxlon + xext, self.viewbox.maxlat + yext)
449
450
451     def restrict_min_max_rank(self, new_min: int, new_max: int) -> None:
452         """ Change the min_rank and max_rank fields to respect the
453             given boundaries.
454         """
455         assert new_min <= new_max
456         self.min_rank = max(self.min_rank, new_min)
457         self.max_rank = min(self.max_rank, new_max)
458
459
460     def is_impossible(self) -> bool:
461         """ Check if the parameter configuration is contradictionary and
462             cannot yield any results.
463         """
464         return (self.min_rank > self.max_rank
465                 or (self.bounded_viewbox
466                     and self.viewbox is not None and self.near is not None
467                     and self.viewbox.contains(self.near))
468                 or self.layers is not None and not self.layers)
469
470
471     def layer_enabled(self, layer: DataLayer) -> bool:
472         """ Check if the given layer has been choosen. Also returns
473             true when layer restriction has been disabled completely.
474         """
475         return self.layers is None or bool(self.layers & layer)