]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/types.py
switch reverse() to new Geometry datatype
[nominatim.git] / nominatim / api / types.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Complex datatypes used by the Nominatim API.
9 """
10 from typing import Optional, Union, Tuple, NamedTuple, TypeVar, Type, Dict, \
11                    Any, List, Sequence
12 from collections import abc
13 import dataclasses
14 import enum
15 import math
16 from struct import unpack
17 from binascii import unhexlify
18
19 import sqlalchemy as sa
20
21 from nominatim.errors import UsageError
22
23 # pylint: disable=no-member,too-many-boolean-expressions,too-many-instance-attributes
24
25 @dataclasses.dataclass
26 class PlaceID:
27     """ Reference an object by Nominatim's internal ID.
28     """
29     place_id: int
30
31
32 @dataclasses.dataclass
33 class OsmID:
34     """ Reference by the OSM ID and potentially the basic category.
35     """
36     osm_type: str
37     osm_id: int
38     osm_class: Optional[str] = None
39
40     def __post_init__(self) -> None:
41         if self.osm_type not in ('N', 'W', 'R'):
42             raise ValueError(f"Illegal OSM type '{self.osm_type}'. Must be one of N, W, R.")
43
44
45 PlaceRef = Union[PlaceID, OsmID]
46
47
48 class Point(NamedTuple):
49     """ A geographic point in WGS84 projection.
50     """
51     x: float
52     y: float
53
54
55     @property
56     def lat(self) -> float:
57         """ Return the latitude of the point.
58         """
59         return self.y
60
61
62     @property
63     def lon(self) -> float:
64         """ Return the longitude of the point.
65         """
66         return self.x
67
68
69     def to_geojson(self) -> str:
70         """ Return the point in GeoJSON format.
71         """
72         return f'{{"type": "Point","coordinates": [{self.x}, {self.y}]}}'
73
74
75     @staticmethod
76     def from_wkb(wkb: Union[str, bytes]) -> 'Point':
77         """ Create a point from EWKB as returned from the database.
78         """
79         if isinstance(wkb, str):
80             wkb = unhexlify(wkb)
81         if len(wkb) != 25:
82             raise ValueError("Point wkb has unexpected length")
83         if wkb[0] == 0:
84             gtype, srid, x, y = unpack('>iidd', wkb[1:])
85         elif wkb[0] == 1:
86             gtype, srid, x, y = unpack('<iidd', wkb[1:])
87         else:
88             raise ValueError("WKB has unknown endian value.")
89
90         if gtype != 0x20000001:
91             raise ValueError("WKB must be a point geometry.")
92         if srid != 4326:
93             raise ValueError("Only WGS84 WKB supported.")
94
95         return Point(x, y)
96
97
98     @staticmethod
99     def from_param(inp: Any) -> 'Point':
100         """ Create a point from an input parameter. The parameter
101             may be given as a point, a string or a sequence of
102             strings or floats. Raises a UsageError if the format is
103             not correct.
104         """
105         if isinstance(inp, Point):
106             return inp
107
108         seq: Sequence[str]
109         if isinstance(inp, str):
110             seq = inp.split(',')
111         elif isinstance(inp, abc.Sequence):
112             seq = inp
113
114         if len(seq) != 2:
115             raise UsageError('Point parameter needs 2 coordinates.')
116         try:
117             x, y = filter(math.isfinite, map(float, seq))
118         except ValueError as exc:
119             raise UsageError('Point parameter needs to be numbers.') from exc
120
121         if x < -180.0 or x > 180.0 or y < -90.0 or y > 90.0:
122             raise UsageError('Point coordinates invalid.')
123
124         return Point(x, y)
125
126
127     def sql_value(self) -> str:
128         """ Create an SQL expression for the point.
129         """
130         return f'POINT({self.x} {self.y})'
131
132
133
134 AnyPoint = Union[Point, Tuple[float, float]]
135
136 WKB_BBOX_HEADER_LE = b'\x01\x03\x00\x00\x20\xE6\x10\x00\x00\x01\x00\x00\x00\x05\x00\x00\x00'
137 WKB_BBOX_HEADER_BE = b'\x00\x20\x00\x00\x03\x00\x00\x10\xe6\x00\x00\x00\x01\x00\x00\x00\x05'
138
139 class Bbox:
140     """ A bounding box in WSG84 projection.
141
142         The coordinates are available as an array in the 'coord'
143         property in the order (minx, miny, maxx, maxy).
144     """
145     def __init__(self, minx: float, miny: float, maxx: float, maxy: float) -> None:
146         self.coords = (minx, miny, maxx, maxy)
147
148
149     @property
150     def minlat(self) -> float:
151         """ Southern-most latitude, corresponding to the minimum y coordinate.
152         """
153         return self.coords[1]
154
155
156     @property
157     def maxlat(self) -> float:
158         """ Northern-most latitude, corresponding to the maximum y coordinate.
159         """
160         return self.coords[3]
161
162
163     @property
164     def minlon(self) -> float:
165         """ Western-most longitude, corresponding to the minimum x coordinate.
166         """
167         return self.coords[0]
168
169
170     @property
171     def maxlon(self) -> float:
172         """ Eastern-most longitude, corresponding to the maximum x coordinate.
173         """
174         return self.coords[2]
175
176
177     @property
178     def area(self) -> float:
179         """ Return the area of the box in WGS84.
180         """
181         return (self.coords[2] - self.coords[0]) * (self.coords[3] - self.coords[1])
182
183
184     def sql_value(self) -> Any:
185         """ Create an SQL expression for the box.
186         """
187         return sa.func.ST_MakeEnvelope(*self.coords, 4326)
188
189
190     def contains(self, pt: Point) -> bool:
191         """ Check if the point is inside or on the boundary of the box.
192         """
193         return self.coords[0] <= pt[0] and self.coords[1] <= pt[1]\
194                and self.coords[2] >= pt[0] and self.coords[3] >= pt[1]
195
196
197     @staticmethod
198     def from_wkb(wkb: Union[None, str, bytes]) -> 'Optional[Bbox]':
199         """ Create a Bbox from a bounding box polygon as returned by
200             the database. Return s None if the input value is None.
201         """
202         if wkb is None:
203             return None
204
205         if isinstance(wkb, str):
206             wkb = unhexlify(wkb)
207
208         if len(wkb) != 97:
209             raise ValueError("WKB must be a bounding box polygon")
210         if wkb.startswith(WKB_BBOX_HEADER_LE):
211             x1, y1, _, _, x2, y2 = unpack('<dddddd', wkb[17:65])
212         elif wkb.startswith(WKB_BBOX_HEADER_BE):
213             x1, y1, _, _, x2, y2 = unpack('>dddddd', wkb[17:65])
214         else:
215             raise ValueError("WKB has wrong header")
216
217         return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
218
219
220     @staticmethod
221     def from_point(pt: Point, buffer: float) -> 'Bbox':
222         """ Return a Bbox around the point with the buffer added to all sides.
223         """
224         return Bbox(pt[0] - buffer, pt[1] - buffer,
225                     pt[0] + buffer, pt[1] + buffer)
226
227
228     @staticmethod
229     def from_param(inp: Any) -> 'Bbox':
230         """ Return a Bbox from an input parameter. The box may be
231             given as a Bbox, a string or a list or strings or integer.
232             Raises a UsageError if the format is incorrect.
233         """
234         if isinstance(inp, Bbox):
235             return inp
236
237         seq: Sequence[str]
238         if isinstance(inp, str):
239             seq = inp.split(',')
240         elif isinstance(inp, abc.Sequence):
241             seq = inp
242
243         if len(seq) != 4:
244             raise UsageError('Bounding box parameter needs 4 coordinates.')
245         try:
246             x1, y1, x2, y2 = filter(math.isfinite, map(float, seq))
247         except ValueError as exc:
248             raise UsageError('Bounding box parameter needs to be numbers.') from exc
249
250         if x1 < -180.0 or x1 > 180.0 or y1 < -90.0 or y1 > 90.0 \
251            or x2 < -180.0 or x2 > 180.0 or y2 < -90.0 or y2 > 90.0:
252             raise UsageError('Bounding box coordinates invalid.')
253
254         if x1 == x2 or y1 == y2:
255             raise UsageError('Bounding box with invalid parameters.')
256
257         return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
258
259
260 class GeometryFormat(enum.Flag):
261     """ Geometry output formats supported by Nominatim.
262     """
263     NONE = 0
264     GEOJSON = enum.auto()
265     KML = enum.auto()
266     SVG = enum.auto()
267     TEXT = enum.auto()
268
269
270 class DataLayer(enum.Flag):
271     """ Layer types that can be selected for reverse and forward search.
272     """
273     POI = enum.auto()
274     ADDRESS = enum.auto()
275     RAILWAY = enum.auto()
276     MANMADE = enum.auto()
277     NATURAL = enum.auto()
278
279
280 def format_country(cc: Any) -> List[str]:
281     """ Extract a list of country codes from the input which may be either
282         a string or list of strings. Filters out all values that are not
283         a two-letter string.
284     """
285     clist: Sequence[str]
286     if isinstance(cc, str):
287         clist = cc.split(',')
288     elif isinstance(cc, abc.Sequence):
289         clist = cc
290     else:
291         raise UsageError("Parameter 'country' needs to be a comma-separated list "
292                          "or a Python list of strings.")
293
294     return [cc.lower() for cc in clist if isinstance(cc, str) and len(cc) == 2]
295
296
297 def format_excluded(ids: Any) -> List[int]:
298     """ Extract a list of place ids from the input which may be either
299         a string or a list of strings or ints. Ignores empty value but
300         throws a UserError on anything that cannot be converted to int.
301     """
302     plist: Sequence[str]
303     if isinstance(ids, str):
304         plist = [s.strip() for s in ids.split(',')]
305     elif isinstance(ids, abc.Sequence):
306         plist = ids
307     else:
308         raise UsageError("Parameter 'excluded' needs to be a comma-separated list "
309                          "or a Python list of numbers.")
310     if not all(isinstance(i, int) or
311                (isinstance(i, str) and (not i or i.isdigit())) for i in plist):
312         raise UsageError("Parameter 'excluded' only takes place IDs.")
313
314     return [int(id) for id in plist if id] or [0]
315
316
317 def format_categories(categories: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
318     """ Extract a list of categories. Currently a noop.
319     """
320     return categories
321
322 TParam = TypeVar('TParam', bound='LookupDetails') # pylint: disable=invalid-name
323
324 @dataclasses.dataclass
325 class LookupDetails:
326     """ Collection of parameters that define the amount of details
327         returned with a lookup or details result.
328     """
329     geometry_output: GeometryFormat = GeometryFormat.NONE
330     """ Add the full geometry of the place to the result. Multiple
331         formats may be selected. Note that geometries can become quite large.
332     """
333     address_details: bool = False
334     """ Get detailed information on the places that make up the address
335         for the result.
336     """
337     linked_places: bool = False
338     """ Get detailed information on the places that link to the result.
339     """
340     parented_places: bool = False
341     """ Get detailed information on all places that this place is a parent
342         for, i.e. all places for which it provides the address details.
343         Only POI places can have parents.
344     """
345     keywords: bool = False
346     """ Add information about the search terms used for this place.
347     """
348     geometry_simplification: float = 0.0
349     """ Simplification factor for a geometry in degrees WGS. A factor of
350         0.0 means the original geometry is kept. The higher the value, the
351         more the geometry gets simplified.
352     """
353
354     @classmethod
355     def from_kwargs(cls: Type[TParam], kwargs: Dict[str, Any]) -> TParam:
356         """ Load the data fields of the class from a dictionary.
357             Unknown entries in the dictionary are ignored, missing ones
358             get the default setting.
359
360             The function supports type checking and throws a UsageError
361             when the value does not fit.
362         """
363         def _check_field(v: Any, field: 'dataclasses.Field[Any]') -> Any:
364             if v is None:
365                 return field.default_factory() \
366                        if field.default_factory != dataclasses.MISSING \
367                        else field.default
368             if field.metadata and 'transform' in field.metadata:
369                 return field.metadata['transform'](v)
370             if not isinstance(v, field.type):
371                 raise UsageError(f"Parameter '{field.name}' needs to be of {field.type!s}.")
372             return v
373
374         return cls(**{f.name: _check_field(kwargs[f.name], f)
375                       for f in dataclasses.fields(cls) if f.name in kwargs})
376
377
378 @dataclasses.dataclass
379 class ReverseDetails(LookupDetails):
380     """ Collection of parameters for the reverse call.
381     """
382     max_rank: int = dataclasses.field(default=30,
383                                       metadata={'transform': lambda v: max(0, min(v, 30))}
384                                      )
385     """ Highest address rank to return.
386     """
387     layers: DataLayer = DataLayer.ADDRESS | DataLayer.POI
388     """ Filter which kind of data to include.
389     """
390
391 @dataclasses.dataclass
392 class SearchDetails(LookupDetails):
393     """ Collection of parameters for the search call.
394     """
395     max_results: int = 10
396     """ Maximum number of results to be returned. The actual number of results
397         may be less.
398     """
399     min_rank: int = dataclasses.field(default=0,
400                                       metadata={'transform': lambda v: max(0, min(v, 30))}
401                                      )
402     """ Lowest address rank to return.
403     """
404     max_rank: int = dataclasses.field(default=30,
405                                       metadata={'transform': lambda v: max(0, min(v, 30))}
406                                      )
407     """ Highest address rank to return.
408     """
409     layers: Optional[DataLayer] = dataclasses.field(default=None,
410                                                     metadata={'transform': lambda r : r})
411     """ Filter which kind of data to include. When 'None' (the default) then
412         filtering by layers is disabled.
413     """
414     countries: List[str] = dataclasses.field(default_factory=list,
415                                              metadata={'transform': format_country})
416     """ Restrict search results to the given countries. An empty list (the
417         default) will disable this filter.
418     """
419     excluded: List[int] = dataclasses.field(default_factory=list,
420                                             metadata={'transform': format_excluded})
421     """ List of OSM objects to exclude from the results. Currenlty only
422         works when the internal place ID is given.
423         An empty list (the default) will disable this filter.
424     """
425     viewbox: Optional[Bbox] = dataclasses.field(default=None,
426                                                 metadata={'transform': Bbox.from_param})
427     """ Focus the search on a given map area.
428     """
429     bounded_viewbox: bool = False
430     """ Use 'viewbox' as a filter and restrict results to places within the
431         given area.
432     """
433     near: Optional[Point] = dataclasses.field(default=None,
434                                               metadata={'transform': Point.from_param})
435     """ Order results by distance to the given point.
436     """
437     near_radius: Optional[float] = dataclasses.field(default=None,
438                                               metadata={'transform': lambda r : r})
439     """ Use near point as a filter and drop results outside the given
440         radius. Radius is given in degrees WSG84.
441     """
442     categories: List[Tuple[str, str]] = dataclasses.field(default_factory=list,
443                                                           metadata={'transform': format_categories})
444     """ Restrict search to places with one of the given class/type categories.
445         An empty list (the default) will disable this filter.
446     """
447
448     def __post_init__(self) -> None:
449         if self.viewbox is not None:
450             xext = (self.viewbox.maxlon - self.viewbox.minlon)/2
451             yext = (self.viewbox.maxlat - self.viewbox.minlat)/2
452             self.viewbox_x2 = Bbox(self.viewbox.minlon - xext, self.viewbox.minlat - yext,
453                                    self.viewbox.maxlon + xext, self.viewbox.maxlat + yext)
454
455
456     def restrict_min_max_rank(self, new_min: int, new_max: int) -> None:
457         """ Change the min_rank and max_rank fields to respect the
458             given boundaries.
459         """
460         assert new_min <= new_max
461         self.min_rank = max(self.min_rank, new_min)
462         self.max_rank = min(self.max_rank, new_max)
463
464
465     def is_impossible(self) -> bool:
466         """ Check if the parameter configuration is contradictionary and
467             cannot yield any results.
468         """
469         return (self.min_rank > self.max_rank
470                 or (self.bounded_viewbox
471                     and self.viewbox is not None and self.near is not None
472                     and self.viewbox.contains(self.near))
473                 or self.layers is not None and not self.layers)
474
475
476     def layer_enabled(self, layer: DataLayer) -> bool:
477         """ Check if the given layer has been choosen. Also returns
478             true when layer restriction has been disabled completely.
479         """
480         return self.layers is None or bool(self.layers & layer)