]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/types.py
use backwards-compatible asyncio timeout implementation
[nominatim.git] / nominatim / api / types.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Complex datatypes used by the Nominatim API.
9 """
10 from typing import Optional, Union, Tuple, NamedTuple, TypeVar, Type, Dict, \
11                    Any, List, Sequence
12 from collections import abc
13 import dataclasses
14 import enum
15 import math
16 from struct import unpack
17 from binascii import unhexlify
18
19 from nominatim.errors import UsageError
20
21 # pylint: disable=no-member,too-many-boolean-expressions,too-many-instance-attributes
22
23 @dataclasses.dataclass
24 class PlaceID:
25     """ Reference an object by Nominatim's internal ID.
26     """
27     place_id: int
28
29
30 @dataclasses.dataclass
31 class OsmID:
32     """ Reference by the OSM ID and potentially the basic category.
33     """
34     osm_type: str
35     osm_id: int
36     osm_class: Optional[str] = None
37
38     def __post_init__(self) -> None:
39         if self.osm_type not in ('N', 'W', 'R'):
40             raise ValueError(f"Illegal OSM type '{self.osm_type}'. Must be one of N, W, R.")
41
42
43 PlaceRef = Union[PlaceID, OsmID]
44
45
46 class Point(NamedTuple):
47     """ A geographic point in WGS84 projection.
48     """
49     x: float
50     y: float
51
52
53     @property
54     def lat(self) -> float:
55         """ Return the latitude of the point.
56         """
57         return self.y
58
59
60     @property
61     def lon(self) -> float:
62         """ Return the longitude of the point.
63         """
64         return self.x
65
66
67     def to_geojson(self) -> str:
68         """ Return the point in GeoJSON format.
69         """
70         return f'{{"type": "Point","coordinates": [{self.x}, {self.y}]}}'
71
72
73     @staticmethod
74     def from_wkb(wkb: Union[str, bytes]) -> 'Point':
75         """ Create a point from EWKB as returned from the database.
76         """
77         if isinstance(wkb, str):
78             wkb = unhexlify(wkb)
79         if len(wkb) != 25:
80             raise ValueError(f"Point wkb has unexpected length {len(wkb)}")
81         if wkb[0] == 0:
82             gtype, srid, x, y = unpack('>iidd', wkb[1:])
83         elif wkb[0] == 1:
84             gtype, srid, x, y = unpack('<iidd', wkb[1:])
85         else:
86             raise ValueError("WKB has unknown endian value.")
87
88         if gtype != 0x20000001:
89             raise ValueError("WKB must be a point geometry.")
90         if srid != 4326:
91             raise ValueError("Only WGS84 WKB supported.")
92
93         return Point(x, y)
94
95
96     @staticmethod
97     def from_param(inp: Any) -> 'Point':
98         """ Create a point from an input parameter. The parameter
99             may be given as a point, a string or a sequence of
100             strings or floats. Raises a UsageError if the format is
101             not correct.
102         """
103         if isinstance(inp, Point):
104             return inp
105
106         seq: Sequence[str]
107         if isinstance(inp, str):
108             seq = inp.split(',')
109         elif isinstance(inp, abc.Sequence):
110             seq = inp
111
112         if len(seq) != 2:
113             raise UsageError('Point parameter needs 2 coordinates.')
114         try:
115             x, y = filter(math.isfinite, map(float, seq))
116         except ValueError as exc:
117             raise UsageError('Point parameter needs to be numbers.') from exc
118
119         if x < -180.0 or x > 180.0 or y < -90.0 or y > 90.0:
120             raise UsageError('Point coordinates invalid.')
121
122         return Point(x, y)
123
124
125     def to_wkt(self) -> str:
126         """ Return the WKT representation of the point.
127         """
128         return f'POINT({self.x} {self.y})'
129
130
131
132 AnyPoint = Union[Point, Tuple[float, float]]
133
134 WKB_BBOX_HEADER_LE = b'\x01\x03\x00\x00\x20\xE6\x10\x00\x00\x01\x00\x00\x00\x05\x00\x00\x00'
135 WKB_BBOX_HEADER_BE = b'\x00\x20\x00\x00\x03\x00\x00\x10\xe6\x00\x00\x00\x01\x00\x00\x00\x05'
136
137 class Bbox:
138     """ A bounding box in WSG84 projection.
139
140         The coordinates are available as an array in the 'coord'
141         property in the order (minx, miny, maxx, maxy).
142     """
143     def __init__(self, minx: float, miny: float, maxx: float, maxy: float) -> None:
144         self.coords = (minx, miny, maxx, maxy)
145
146
147     @property
148     def minlat(self) -> float:
149         """ Southern-most latitude, corresponding to the minimum y coordinate.
150         """
151         return self.coords[1]
152
153
154     @property
155     def maxlat(self) -> float:
156         """ Northern-most latitude, corresponding to the maximum y coordinate.
157         """
158         return self.coords[3]
159
160
161     @property
162     def minlon(self) -> float:
163         """ Western-most longitude, corresponding to the minimum x coordinate.
164         """
165         return self.coords[0]
166
167
168     @property
169     def maxlon(self) -> float:
170         """ Eastern-most longitude, corresponding to the maximum x coordinate.
171         """
172         return self.coords[2]
173
174
175     @property
176     def area(self) -> float:
177         """ Return the area of the box in WGS84.
178         """
179         return (self.coords[2] - self.coords[0]) * (self.coords[3] - self.coords[1])
180
181
182     def contains(self, pt: Point) -> bool:
183         """ Check if the point is inside or on the boundary of the box.
184         """
185         return self.coords[0] <= pt[0] and self.coords[1] <= pt[1]\
186                and self.coords[2] >= pt[0] and self.coords[3] >= pt[1]
187
188
189     def to_wkt(self) -> str:
190         """ Return the WKT representation of the Bbox. This
191             is a simple polygon with four points.
192         """
193         return 'POLYGON(({0} {1},{0} {3},{2} {3},{2} {1},{0} {1}))'\
194                   .format(*self.coords) # pylint: disable=consider-using-f-string
195
196
197     @staticmethod
198     def from_wkb(wkb: Union[None, str, bytes]) -> 'Optional[Bbox]':
199         """ Create a Bbox from a bounding box polygon as returned by
200             the database. Return s None if the input value is None.
201         """
202         if wkb is None:
203             return None
204
205         if isinstance(wkb, str):
206             wkb = unhexlify(wkb)
207
208         if len(wkb) != 97:
209             raise ValueError("WKB must be a bounding box polygon")
210         if wkb.startswith(WKB_BBOX_HEADER_LE):
211             x1, y1, _, _, x2, y2 = unpack('<dddddd', wkb[17:65])
212         elif wkb.startswith(WKB_BBOX_HEADER_BE):
213             x1, y1, _, _, x2, y2 = unpack('>dddddd', wkb[17:65])
214         else:
215             raise ValueError("WKB has wrong header")
216
217         return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
218
219
220     @staticmethod
221     def from_point(pt: Point, buffer: float) -> 'Bbox':
222         """ Return a Bbox around the point with the buffer added to all sides.
223         """
224         return Bbox(pt[0] - buffer, pt[1] - buffer,
225                     pt[0] + buffer, pt[1] + buffer)
226
227
228     @staticmethod
229     def from_param(inp: Any) -> 'Bbox':
230         """ Return a Bbox from an input parameter. The box may be
231             given as a Bbox, a string or a list or strings or integer.
232             Raises a UsageError if the format is incorrect.
233         """
234         if isinstance(inp, Bbox):
235             return inp
236
237         seq: Sequence[str]
238         if isinstance(inp, str):
239             seq = inp.split(',')
240         elif isinstance(inp, abc.Sequence):
241             seq = inp
242
243         if len(seq) != 4:
244             raise UsageError('Bounding box parameter needs 4 coordinates.')
245         try:
246             x1, y1, x2, y2 = filter(math.isfinite, map(float, seq))
247         except ValueError as exc:
248             raise UsageError('Bounding box parameter needs to be numbers.') from exc
249
250         x1 = min(180, max(-180, x1))
251         x2 = min(180, max(-180, x2))
252         y1 = min(90, max(-90, y1))
253         y2 = min(90, max(-90, y2))
254
255         if x1 == x2 or y1 == y2:
256             raise UsageError('Bounding box with invalid parameters.')
257
258         return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
259
260
261 class GeometryFormat(enum.Flag):
262     """ Geometry output formats supported by Nominatim.
263     """
264     NONE = 0
265     GEOJSON = enum.auto()
266     KML = enum.auto()
267     SVG = enum.auto()
268     TEXT = enum.auto()
269
270
271 class DataLayer(enum.Flag):
272     """ Layer types that can be selected for reverse and forward search.
273     """
274     POI = enum.auto()
275     ADDRESS = enum.auto()
276     RAILWAY = enum.auto()
277     MANMADE = enum.auto()
278     NATURAL = enum.auto()
279
280
281 def format_country(cc: Any) -> List[str]:
282     """ Extract a list of country codes from the input which may be either
283         a string or list of strings. Filters out all values that are not
284         a two-letter string.
285     """
286     clist: Sequence[str]
287     if isinstance(cc, str):
288         clist = cc.split(',')
289     elif isinstance(cc, abc.Sequence):
290         clist = cc
291     else:
292         raise UsageError("Parameter 'country' needs to be a comma-separated list "
293                          "or a Python list of strings.")
294
295     return [cc.lower() for cc in clist if isinstance(cc, str) and len(cc) == 2]
296
297
298 def format_excluded(ids: Any) -> List[int]:
299     """ Extract a list of place ids from the input which may be either
300         a string or a list of strings or ints. Ignores empty value but
301         throws a UserError on anything that cannot be converted to int.
302     """
303     plist: Sequence[str]
304     if isinstance(ids, str):
305         plist = [s.strip() for s in ids.split(',')]
306     elif isinstance(ids, abc.Sequence):
307         plist = ids
308     else:
309         raise UsageError("Parameter 'excluded' needs to be a comma-separated list "
310                          "or a Python list of numbers.")
311     if not all(isinstance(i, int) or
312                (isinstance(i, str) and (not i or i.isdigit())) for i in plist):
313         raise UsageError("Parameter 'excluded' only takes place IDs.")
314
315     return [int(id) for id in plist if id] or [0]
316
317
318 def format_categories(categories: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
319     """ Extract a list of categories. Currently a noop.
320     """
321     return categories
322
323 TParam = TypeVar('TParam', bound='LookupDetails') # pylint: disable=invalid-name
324
325 @dataclasses.dataclass
326 class LookupDetails:
327     """ Collection of parameters that define the amount of details
328         returned with a lookup or details result.
329     """
330     geometry_output: GeometryFormat = GeometryFormat.NONE
331     """ Add the full geometry of the place to the result. Multiple
332         formats may be selected. Note that geometries can become quite large.
333     """
334     address_details: bool = False
335     """ Get detailed information on the places that make up the address
336         for the result.
337     """
338     linked_places: bool = False
339     """ Get detailed information on the places that link to the result.
340     """
341     parented_places: bool = False
342     """ Get detailed information on all places that this place is a parent
343         for, i.e. all places for which it provides the address details.
344         Only POI places can have parents.
345     """
346     keywords: bool = False
347     """ Add information about the search terms used for this place.
348     """
349     geometry_simplification: float = 0.0
350     """ Simplification factor for a geometry in degrees WGS. A factor of
351         0.0 means the original geometry is kept. The higher the value, the
352         more the geometry gets simplified.
353     """
354
355     @classmethod
356     def from_kwargs(cls: Type[TParam], kwargs: Dict[str, Any]) -> TParam:
357         """ Load the data fields of the class from a dictionary.
358             Unknown entries in the dictionary are ignored, missing ones
359             get the default setting.
360
361             The function supports type checking and throws a UsageError
362             when the value does not fit.
363         """
364         def _check_field(v: Any, field: 'dataclasses.Field[Any]') -> Any:
365             if v is None:
366                 return field.default_factory() \
367                        if field.default_factory != dataclasses.MISSING \
368                        else field.default
369             if field.metadata and 'transform' in field.metadata:
370                 return field.metadata['transform'](v)
371             if not isinstance(v, field.type):
372                 raise UsageError(f"Parameter '{field.name}' needs to be of {field.type!s}.")
373             return v
374
375         return cls(**{f.name: _check_field(kwargs[f.name], f)
376                       for f in dataclasses.fields(cls) if f.name in kwargs})
377
378
379 @dataclasses.dataclass
380 class ReverseDetails(LookupDetails):
381     """ Collection of parameters for the reverse call.
382     """
383     max_rank: int = dataclasses.field(default=30,
384                                       metadata={'transform': lambda v: max(0, min(v, 30))}
385                                      )
386     """ Highest address rank to return.
387     """
388     layers: DataLayer = DataLayer.ADDRESS | DataLayer.POI
389     """ Filter which kind of data to include.
390     """
391
392 @dataclasses.dataclass
393 class SearchDetails(LookupDetails):
394     """ Collection of parameters for the search call.
395     """
396     max_results: int = 10
397     """ Maximum number of results to be returned. The actual number of results
398         may be less.
399     """
400     min_rank: int = dataclasses.field(default=0,
401                                       metadata={'transform': lambda v: max(0, min(v, 30))}
402                                      )
403     """ Lowest address rank to return.
404     """
405     max_rank: int = dataclasses.field(default=30,
406                                       metadata={'transform': lambda v: max(0, min(v, 30))}
407                                      )
408     """ Highest address rank to return.
409     """
410     layers: Optional[DataLayer] = dataclasses.field(default=None,
411                                                     metadata={'transform': lambda r : r})
412     """ Filter which kind of data to include. When 'None' (the default) then
413         filtering by layers is disabled.
414     """
415     countries: List[str] = dataclasses.field(default_factory=list,
416                                              metadata={'transform': format_country})
417     """ Restrict search results to the given countries. An empty list (the
418         default) will disable this filter.
419     """
420     excluded: List[int] = dataclasses.field(default_factory=list,
421                                             metadata={'transform': format_excluded})
422     """ List of OSM objects to exclude from the results. Currenlty only
423         works when the internal place ID is given.
424         An empty list (the default) will disable this filter.
425     """
426     viewbox: Optional[Bbox] = dataclasses.field(default=None,
427                                                 metadata={'transform': Bbox.from_param})
428     """ Focus the search on a given map area.
429     """
430     bounded_viewbox: bool = False
431     """ Use 'viewbox' as a filter and restrict results to places within the
432         given area.
433     """
434     near: Optional[Point] = dataclasses.field(default=None,
435                                               metadata={'transform': Point.from_param})
436     """ Order results by distance to the given point.
437     """
438     near_radius: Optional[float] = dataclasses.field(default=None,
439                                               metadata={'transform': lambda r : r})
440     """ Use near point as a filter and drop results outside the given
441         radius. Radius is given in degrees WSG84.
442     """
443     categories: List[Tuple[str, str]] = dataclasses.field(default_factory=list,
444                                                           metadata={'transform': format_categories})
445     """ Restrict search to places with one of the given class/type categories.
446         An empty list (the default) will disable this filter.
447     """
448     viewbox_x2: Optional[Bbox] = None
449
450     def __post_init__(self) -> None:
451         if self.viewbox is not None:
452             xext = (self.viewbox.maxlon - self.viewbox.minlon)/2
453             yext = (self.viewbox.maxlat - self.viewbox.minlat)/2
454             self.viewbox_x2 = Bbox(self.viewbox.minlon - xext, self.viewbox.minlat - yext,
455                                    self.viewbox.maxlon + xext, self.viewbox.maxlat + yext)
456
457
458     def restrict_min_max_rank(self, new_min: int, new_max: int) -> None:
459         """ Change the min_rank and max_rank fields to respect the
460             given boundaries.
461         """
462         assert new_min <= new_max
463         self.min_rank = max(self.min_rank, new_min)
464         self.max_rank = min(self.max_rank, new_max)
465
466
467     def is_impossible(self) -> bool:
468         """ Check if the parameter configuration is contradictionary and
469             cannot yield any results.
470         """
471         return (self.min_rank > self.max_rank
472                 or (self.bounded_viewbox
473                     and self.viewbox is not None and self.near is not None
474                     and self.viewbox.contains(self.near))
475                 or self.layers is not None and not self.layers)
476
477
478     def layer_enabled(self, layer: DataLayer) -> bool:
479         """ Check if the given layer has been choosen. Also returns
480             true when layer restriction has been disabled completely.
481         """
482         return self.layers is None or bool(self.layers & layer)