]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/types.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / nominatim / api / types.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Complex datatypes used by the Nominatim API.
9 """
10 from typing import Optional, Union, Tuple, NamedTuple, TypeVar, Type, Dict, \
11                    Any, List, Sequence
12 from collections import abc
13 import dataclasses
14 import enum
15 import math
16 from struct import unpack
17 from binascii import unhexlify
18
19 from nominatim.errors import UsageError
20
21 # pylint: disable=no-member,too-many-boolean-expressions,too-many-instance-attributes
22
23 @dataclasses.dataclass
24 class PlaceID:
25     """ Reference an object by Nominatim's internal ID.
26     """
27     place_id: int
28
29
30 @dataclasses.dataclass
31 class OsmID:
32     """ Reference by the OSM ID and potentially the basic category.
33     """
34     osm_type: str
35     osm_id: int
36     osm_class: Optional[str] = None
37
38     def __post_init__(self) -> None:
39         if self.osm_type not in ('N', 'W', 'R'):
40             raise ValueError(f"Illegal OSM type '{self.osm_type}'. Must be one of N, W, R.")
41
42
43 PlaceRef = Union[PlaceID, OsmID]
44
45
46 class Point(NamedTuple):
47     """ A geographic point in WGS84 projection.
48     """
49     x: float
50     y: float
51
52
53     @property
54     def lat(self) -> float:
55         """ Return the latitude of the point.
56         """
57         return self.y
58
59
60     @property
61     def lon(self) -> float:
62         """ Return the longitude of the point.
63         """
64         return self.x
65
66
67     def to_geojson(self) -> str:
68         """ Return the point in GeoJSON format.
69         """
70         return f'{{"type": "Point","coordinates": [{self.x}, {self.y}]}}'
71
72
73     @staticmethod
74     def from_wkb(wkb: Union[str, bytes]) -> 'Point':
75         """ Create a point from EWKB as returned from the database.
76         """
77         if isinstance(wkb, str):
78             wkb = unhexlify(wkb)
79         if len(wkb) != 25:
80             raise ValueError(f"Point wkb has unexpected length {len(wkb)}")
81         if wkb[0] == 0:
82             gtype, srid, x, y = unpack('>iidd', wkb[1:])
83         elif wkb[0] == 1:
84             gtype, srid, x, y = unpack('<iidd', wkb[1:])
85         else:
86             raise ValueError("WKB has unknown endian value.")
87
88         if gtype != 0x20000001:
89             raise ValueError("WKB must be a point geometry.")
90         if srid != 4326:
91             raise ValueError("Only WGS84 WKB supported.")
92
93         return Point(x, y)
94
95
96     @staticmethod
97     def from_param(inp: Any) -> 'Point':
98         """ Create a point from an input parameter. The parameter
99             may be given as a point, a string or a sequence of
100             strings or floats. Raises a UsageError if the format is
101             not correct.
102         """
103         if isinstance(inp, Point):
104             return inp
105
106         seq: Sequence[str]
107         if isinstance(inp, str):
108             seq = inp.split(',')
109         elif isinstance(inp, abc.Sequence):
110             seq = inp
111
112         if len(seq) != 2:
113             raise UsageError('Point parameter needs 2 coordinates.')
114         try:
115             x, y = filter(math.isfinite, map(float, seq))
116         except ValueError as exc:
117             raise UsageError('Point parameter needs to be numbers.') from exc
118
119         if x < -180.0 or x > 180.0 or y < -90.0 or y > 90.0:
120             raise UsageError('Point coordinates invalid.')
121
122         return Point(x, y)
123
124
125     def to_wkt(self) -> str:
126         """ Return the WKT representation of the point.
127         """
128         return f'POINT({self.x} {self.y})'
129
130
131
132 AnyPoint = Union[Point, Tuple[float, float]]
133
134 WKB_BBOX_HEADER_LE = b'\x01\x03\x00\x00\x20\xE6\x10\x00\x00\x01\x00\x00\x00\x05\x00\x00\x00'
135 WKB_BBOX_HEADER_BE = b'\x00\x20\x00\x00\x03\x00\x00\x10\xe6\x00\x00\x00\x01\x00\x00\x00\x05'
136
137 class Bbox:
138     """ A bounding box in WSG84 projection.
139
140         The coordinates are available as an array in the 'coord'
141         property in the order (minx, miny, maxx, maxy).
142     """
143     def __init__(self, minx: float, miny: float, maxx: float, maxy: float) -> None:
144         self.coords = (minx, miny, maxx, maxy)
145
146
147     @property
148     def minlat(self) -> float:
149         """ Southern-most latitude, corresponding to the minimum y coordinate.
150         """
151         return self.coords[1]
152
153
154     @property
155     def maxlat(self) -> float:
156         """ Northern-most latitude, corresponding to the maximum y coordinate.
157         """
158         return self.coords[3]
159
160
161     @property
162     def minlon(self) -> float:
163         """ Western-most longitude, corresponding to the minimum x coordinate.
164         """
165         return self.coords[0]
166
167
168     @property
169     def maxlon(self) -> float:
170         """ Eastern-most longitude, corresponding to the maximum x coordinate.
171         """
172         return self.coords[2]
173
174
175     @property
176     def area(self) -> float:
177         """ Return the area of the box in WGS84.
178         """
179         return (self.coords[2] - self.coords[0]) * (self.coords[3] - self.coords[1])
180
181
182     def contains(self, pt: Point) -> bool:
183         """ Check if the point is inside or on the boundary of the box.
184         """
185         return self.coords[0] <= pt[0] and self.coords[1] <= pt[1]\
186                and self.coords[2] >= pt[0] and self.coords[3] >= pt[1]
187
188
189     def to_wkt(self) -> str:
190         """ Return the WKT representation of the Bbox. This
191             is a simple polygon with four points.
192         """
193         return 'POLYGON(({0} {1},{0} {3},{2} {3},{2} {1},{0} {1}))'\
194                   .format(*self.coords) # pylint: disable=consider-using-f-string
195
196
197     @staticmethod
198     def from_wkb(wkb: Union[None, str, bytes]) -> 'Optional[Bbox]':
199         """ Create a Bbox from a bounding box polygon as returned by
200             the database. Return s None if the input value is None.
201         """
202         if wkb is None:
203             return None
204
205         if isinstance(wkb, str):
206             wkb = unhexlify(wkb)
207
208         if len(wkb) != 97:
209             raise ValueError("WKB must be a bounding box polygon")
210         if wkb.startswith(WKB_BBOX_HEADER_LE):
211             x1, y1, _, _, x2, y2 = unpack('<dddddd', wkb[17:65])
212         elif wkb.startswith(WKB_BBOX_HEADER_BE):
213             x1, y1, _, _, x2, y2 = unpack('>dddddd', wkb[17:65])
214         else:
215             raise ValueError("WKB has wrong header")
216
217         return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
218
219
220     @staticmethod
221     def from_point(pt: Point, buffer: float) -> 'Bbox':
222         """ Return a Bbox around the point with the buffer added to all sides.
223         """
224         return Bbox(pt[0] - buffer, pt[1] - buffer,
225                     pt[0] + buffer, pt[1] + buffer)
226
227
228     @staticmethod
229     def from_param(inp: Any) -> 'Bbox':
230         """ Return a Bbox from an input parameter. The box may be
231             given as a Bbox, a string or a list or strings or integer.
232             Raises a UsageError if the format is incorrect.
233         """
234         if isinstance(inp, Bbox):
235             return inp
236
237         seq: Sequence[str]
238         if isinstance(inp, str):
239             seq = inp.split(',')
240         elif isinstance(inp, abc.Sequence):
241             seq = inp
242
243         if len(seq) != 4:
244             raise UsageError('Bounding box parameter needs 4 coordinates.')
245         try:
246             x1, y1, x2, y2 = filter(math.isfinite, map(float, seq))
247         except ValueError as exc:
248             raise UsageError('Bounding box parameter needs to be numbers.') from exc
249
250         if x1 < -180.0 or x1 > 180.0 or y1 < -90.0 or y1 > 90.0 \
251            or x2 < -180.0 or x2 > 180.0 or y2 < -90.0 or y2 > 90.0:
252             raise UsageError('Bounding box coordinates invalid.')
253
254         if x1 == x2 or y1 == y2:
255             raise UsageError('Bounding box with invalid parameters.')
256
257         return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
258
259
260 class GeometryFormat(enum.Flag):
261     """ Geometry output formats supported by Nominatim.
262     """
263     NONE = 0
264     GEOJSON = enum.auto()
265     KML = enum.auto()
266     SVG = enum.auto()
267     TEXT = enum.auto()
268
269
270 class DataLayer(enum.Flag):
271     """ Layer types that can be selected for reverse and forward search.
272     """
273     POI = enum.auto()
274     ADDRESS = enum.auto()
275     RAILWAY = enum.auto()
276     MANMADE = enum.auto()
277     NATURAL = enum.auto()
278
279
280 def format_country(cc: Any) -> List[str]:
281     """ Extract a list of country codes from the input which may be either
282         a string or list of strings. Filters out all values that are not
283         a two-letter string.
284     """
285     clist: Sequence[str]
286     if isinstance(cc, str):
287         clist = cc.split(',')
288     elif isinstance(cc, abc.Sequence):
289         clist = cc
290     else:
291         raise UsageError("Parameter 'country' needs to be a comma-separated list "
292                          "or a Python list of strings.")
293
294     return [cc.lower() for cc in clist if isinstance(cc, str) and len(cc) == 2]
295
296
297 def format_excluded(ids: Any) -> List[int]:
298     """ Extract a list of place ids from the input which may be either
299         a string or a list of strings or ints. Ignores empty value but
300         throws a UserError on anything that cannot be converted to int.
301     """
302     plist: Sequence[str]
303     if isinstance(ids, str):
304         plist = [s.strip() for s in ids.split(',')]
305     elif isinstance(ids, abc.Sequence):
306         plist = ids
307     else:
308         raise UsageError("Parameter 'excluded' needs to be a comma-separated list "
309                          "or a Python list of numbers.")
310     if not all(isinstance(i, int) or
311                (isinstance(i, str) and (not i or i.isdigit())) for i in plist):
312         raise UsageError("Parameter 'excluded' only takes place IDs.")
313
314     return [int(id) for id in plist if id] or [0]
315
316
317 def format_categories(categories: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
318     """ Extract a list of categories. Currently a noop.
319     """
320     return categories
321
322 TParam = TypeVar('TParam', bound='LookupDetails') # pylint: disable=invalid-name
323
324 @dataclasses.dataclass
325 class LookupDetails:
326     """ Collection of parameters that define the amount of details
327         returned with a lookup or details result.
328     """
329     geometry_output: GeometryFormat = GeometryFormat.NONE
330     """ Add the full geometry of the place to the result. Multiple
331         formats may be selected. Note that geometries can become quite large.
332     """
333     address_details: bool = False
334     """ Get detailed information on the places that make up the address
335         for the result.
336     """
337     linked_places: bool = False
338     """ Get detailed information on the places that link to the result.
339     """
340     parented_places: bool = False
341     """ Get detailed information on all places that this place is a parent
342         for, i.e. all places for which it provides the address details.
343         Only POI places can have parents.
344     """
345     keywords: bool = False
346     """ Add information about the search terms used for this place.
347     """
348     geometry_simplification: float = 0.0
349     """ Simplification factor for a geometry in degrees WGS. A factor of
350         0.0 means the original geometry is kept. The higher the value, the
351         more the geometry gets simplified.
352     """
353
354     @classmethod
355     def from_kwargs(cls: Type[TParam], kwargs: Dict[str, Any]) -> TParam:
356         """ Load the data fields of the class from a dictionary.
357             Unknown entries in the dictionary are ignored, missing ones
358             get the default setting.
359
360             The function supports type checking and throws a UsageError
361             when the value does not fit.
362         """
363         def _check_field(v: Any, field: 'dataclasses.Field[Any]') -> Any:
364             if v is None:
365                 return field.default_factory() \
366                        if field.default_factory != dataclasses.MISSING \
367                        else field.default
368             if field.metadata and 'transform' in field.metadata:
369                 return field.metadata['transform'](v)
370             if not isinstance(v, field.type):
371                 raise UsageError(f"Parameter '{field.name}' needs to be of {field.type!s}.")
372             return v
373
374         return cls(**{f.name: _check_field(kwargs[f.name], f)
375                       for f in dataclasses.fields(cls) if f.name in kwargs})
376
377
378 @dataclasses.dataclass
379 class ReverseDetails(LookupDetails):
380     """ Collection of parameters for the reverse call.
381     """
382     max_rank: int = dataclasses.field(default=30,
383                                       metadata={'transform': lambda v: max(0, min(v, 30))}
384                                      )
385     """ Highest address rank to return.
386     """
387     layers: DataLayer = DataLayer.ADDRESS | DataLayer.POI
388     """ Filter which kind of data to include.
389     """
390
391 @dataclasses.dataclass
392 class SearchDetails(LookupDetails):
393     """ Collection of parameters for the search call.
394     """
395     max_results: int = 10
396     """ Maximum number of results to be returned. The actual number of results
397         may be less.
398     """
399     min_rank: int = dataclasses.field(default=0,
400                                       metadata={'transform': lambda v: max(0, min(v, 30))}
401                                      )
402     """ Lowest address rank to return.
403     """
404     max_rank: int = dataclasses.field(default=30,
405                                       metadata={'transform': lambda v: max(0, min(v, 30))}
406                                      )
407     """ Highest address rank to return.
408     """
409     layers: Optional[DataLayer] = dataclasses.field(default=None,
410                                                     metadata={'transform': lambda r : r})
411     """ Filter which kind of data to include. When 'None' (the default) then
412         filtering by layers is disabled.
413     """
414     countries: List[str] = dataclasses.field(default_factory=list,
415                                              metadata={'transform': format_country})
416     """ Restrict search results to the given countries. An empty list (the
417         default) will disable this filter.
418     """
419     excluded: List[int] = dataclasses.field(default_factory=list,
420                                             metadata={'transform': format_excluded})
421     """ List of OSM objects to exclude from the results. Currenlty only
422         works when the internal place ID is given.
423         An empty list (the default) will disable this filter.
424     """
425     viewbox: Optional[Bbox] = dataclasses.field(default=None,
426                                                 metadata={'transform': Bbox.from_param})
427     """ Focus the search on a given map area.
428     """
429     bounded_viewbox: bool = False
430     """ Use 'viewbox' as a filter and restrict results to places within the
431         given area.
432     """
433     near: Optional[Point] = dataclasses.field(default=None,
434                                               metadata={'transform': Point.from_param})
435     """ Order results by distance to the given point.
436     """
437     near_radius: Optional[float] = dataclasses.field(default=None,
438                                               metadata={'transform': lambda r : r})
439     """ Use near point as a filter and drop results outside the given
440         radius. Radius is given in degrees WSG84.
441     """
442     categories: List[Tuple[str, str]] = dataclasses.field(default_factory=list,
443                                                           metadata={'transform': format_categories})
444     """ Restrict search to places with one of the given class/type categories.
445         An empty list (the default) will disable this filter.
446     """
447     viewbox_x2: Optional[Bbox] = None
448
449     def __post_init__(self) -> None:
450         if self.viewbox is not None:
451             xext = (self.viewbox.maxlon - self.viewbox.minlon)/2
452             yext = (self.viewbox.maxlat - self.viewbox.minlat)/2
453             self.viewbox_x2 = Bbox(self.viewbox.minlon - xext, self.viewbox.minlat - yext,
454                                    self.viewbox.maxlon + xext, self.viewbox.maxlat + yext)
455
456
457     def restrict_min_max_rank(self, new_min: int, new_max: int) -> None:
458         """ Change the min_rank and max_rank fields to respect the
459             given boundaries.
460         """
461         assert new_min <= new_max
462         self.min_rank = max(self.min_rank, new_min)
463         self.max_rank = min(self.max_rank, new_max)
464
465
466     def is_impossible(self) -> bool:
467         """ Check if the parameter configuration is contradictionary and
468             cannot yield any results.
469         """
470         return (self.min_rank > self.max_rank
471                 or (self.bounded_viewbox
472                     and self.viewbox is not None and self.near is not None
473                     and self.viewbox.contains(self.near))
474                 or self.layers is not None and not self.layers)
475
476
477     def layer_enabled(self, layer: DataLayer) -> bool:
478         """ Check if the given layer has been choosen. Also returns
479             true when layer restriction has been disabled completely.
480         """
481         return self.layers is None or bool(self.layers & layer)