# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Complex datatypes used by the Nominatim API.
"""
from typing import Optional, Union, Tuple, NamedTuple, TypeVar, Type, Dict, \
                   Any, List
from collections import abc
import dataclasses
import enum
import math
from struct import unpack
from binascii import unhexlify

import sqlalchemy as sa

from nominatim.errors import UsageError
# pylint: disable=no-member,too-many-boolean-expressions,too-many-instance-attributes
@dataclasses.dataclass
class PlaceID:
    """ Reference an object by Nominatim's internal ID.
    """
    # NOTE(review): the class header and its field were missing from the
    # damaged source. The class name is taken from the PlaceRef alias;
    # the field name is assumed -- confirm against the upstream file.
    place_id: int
@dataclasses.dataclass
class OsmID:
    """ Reference by the OSM ID and potentially the basic category.

        The OSM type must be one of 'N' (node), 'W' (way) or
        'R' (relation). A ValueError is raised on construction for
        any other value.
    """
    osm_type: str
    # NOTE(review): this field was missing from the damaged source; its
    # name and position are assumed -- confirm against the upstream file.
    osm_id: int
    osm_class: Optional[str] = None

    def __post_init__(self) -> None:
        """ Reject OSM types other than N, W and R.
        """
        if self.osm_type not in ('N', 'W', 'R'):
            raise ValueError(f"Illegal OSM type '{self.osm_type}'. Must be one of N, W, R.")
# A place may be referenced either by its internal ID or by its OSM object.
PlaceRef = Union[PlaceID, OsmID]
class Point(NamedTuple):
    """ A geographic point in WGS84 projection.

        'x' holds the longitude and 'y' the latitude.
    """
    x: float
    y: float


    @property
    def lat(self) -> float:
        """ Return the latitude of the point.
        """
        return self.y


    @property
    def lon(self) -> float:
        """ Return the longitude of the point.
        """
        return self.x


    def to_geojson(self) -> str:
        """ Return the point in GeoJSON format.
        """
        return f'{{"type": "Point","coordinates": [{self.x}, {self.y}]}}'


    @staticmethod
    def from_wkb(wkb: Union[str, bytes]) -> 'Point':
        """ Create a point from EWKB as returned from the database.

            The input may be given as raw bytes or as a hex-encoded
            string. Raises a ValueError when the data is not a point
            geometry with SRID 4326.
        """
        if isinstance(wkb, str):
            wkb = unhexlify(wkb)
        # 1 byte-order byte + two 4-byte integers + two 8-byte doubles.
        if len(wkb) != 25:
            raise ValueError(f"Point wkb has unexpected length {len(wkb)}")
        if wkb[0] == 0:
            gtype, srid, x, y = unpack('>iidd', wkb[1:])
        elif wkb[0] == 1:
            gtype, srid, x, y = unpack('<iidd', wkb[1:])
        else:
            raise ValueError("WKB has unknown endian value.")

        if gtype != 0x20000001:
            raise ValueError("WKB must be a point geometry.")
        if srid != 4326:
            raise ValueError("Only WGS84 WKB supported.")

        return Point(x, y)


    @staticmethod
    def from_param(inp: Any) -> 'Point':
        """ Create a point from an input parameter. The parameter
            may be given as a point, a string or a sequence of
            strings or floats. Raises a UsageError if the format is
            not correct.
        """
        if isinstance(inp, Point):
            return inp

        if isinstance(inp, str):
            seq = inp.split(',')
        elif isinstance(inp, abc.Sequence):
            seq = inp
        else:
            # Without this branch 'seq' would be unbound further down.
            raise UsageError('Point parameter needs to be a comma-separated list '
                             'or a Python list of numbers.')

        if len(seq) != 2:
            raise UsageError('Point parameter needs 2 coordinates.')
        try:
            # Non-finite values are filtered out, which makes the
            # unpacking fail with a ValueError. float() raises a TypeError
            # for unsupported types such as None.
            x, y = filter(math.isfinite, map(float, seq))
        except (TypeError, ValueError) as exc:
            raise UsageError('Point parameter needs to be numbers.') from exc

        if x < -180.0 or x > 180.0 or y < -90.0 or y > 90.0:
            raise UsageError('Point coordinates invalid.')

        return Point(x, y)


    def to_wkt(self) -> str:
        """ Return the WKT representation of the point.
        """
        return f'POINT({self.x} {self.y})'
# A point given either as a Point or as a plain (x, y) tuple of floats.
AnyPoint = Union[Point, Tuple[float, float]]
# EWKB prefix of a bounding-box polygon: byte order, geometry type
# (polygon with SRID flag), SRID 4326, one ring, five points.
WKB_BBOX_HEADER_LE = b'\x01\x03\x00\x00\x20\xE6\x10\x00\x00\x01\x00\x00\x00\x05\x00\x00\x00'
WKB_BBOX_HEADER_BE = b'\x00\x20\x00\x00\x03\x00\x00\x10\xe6\x00\x00\x00\x01\x00\x00\x00\x05'

class Bbox:
    """ A bounding box in WGS84 projection.

        The coordinates are available as a tuple in the 'coords'
        attribute in the order (minx, miny, maxx, maxy).
    """
    def __init__(self, minx: float, miny: float, maxx: float, maxy: float) -> None:
        self.coords = (minx, miny, maxx, maxy)


    @property
    def minlat(self) -> float:
        """ Southern-most latitude, corresponding to the minimum y coordinate.
        """
        return self.coords[1]


    @property
    def maxlat(self) -> float:
        """ Northern-most latitude, corresponding to the maximum y coordinate.
        """
        return self.coords[3]


    @property
    def minlon(self) -> float:
        """ Western-most longitude, corresponding to the minimum x coordinate.
        """
        return self.coords[0]


    @property
    def maxlon(self) -> float:
        """ Eastern-most longitude, corresponding to the maximum x coordinate.
        """
        return self.coords[2]


    @property
    def area(self) -> float:
        """ Return the area of the box in WGS84.
        """
        return (self.coords[2] - self.coords[0]) * (self.coords[3] - self.coords[1])


    def contains(self, pt: 'Point') -> bool:
        """ Check if the point is inside or on the boundary of the box.
        """
        return self.coords[0] <= pt[0] and self.coords[1] <= pt[1]\
               and self.coords[2] >= pt[0] and self.coords[3] >= pt[1]


    def to_wkt(self) -> str:
        """ Return the WKT representation of the Bbox. This
            is a simple polygon with four points.
        """
        return 'POLYGON(({0} {1},{0} {3},{2} {3},{2} {1},{0} {1}))'.format(*self.coords)


    @staticmethod
    def from_wkb(wkb: Union[None, str, bytes]) -> 'Optional[Bbox]':
        """ Create a Bbox from a bounding box polygon as returned by
            the database. Returns None if the input value is None.

            The input may be given as raw bytes or as a hex-encoded
            string. Raises a ValueError when the data does not have the
            size or the header of a bounding box polygon.
        """
        if wkb is None:
            return None

        if isinstance(wkb, str):
            wkb = unhexlify(wkb)

        # 17 bytes of header followed by five points of two doubles each.
        if len(wkb) != 97:
            raise ValueError("WKB must be a bounding box polygon")
        # The first and the third point of the ring are used as the
        # two opposite corners; the second point is skipped.
        if wkb.startswith(WKB_BBOX_HEADER_LE):
            x1, y1, _, _, x2, y2 = unpack('<dddddd', wkb[17:65])
        elif wkb.startswith(WKB_BBOX_HEADER_BE):
            x1, y1, _, _, x2, y2 = unpack('>dddddd', wkb[17:65])
        else:
            raise ValueError("WKB has wrong header")

        return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))


    @staticmethod
    def from_point(pt: 'Point', buffer: float) -> 'Bbox':
        """ Return a Bbox around the point with the buffer added to all sides.
        """
        return Bbox(pt[0] - buffer, pt[1] - buffer,
                    pt[0] + buffer, pt[1] + buffer)


    @staticmethod
    def from_param(inp: Any) -> 'Bbox':
        """ Return a Bbox from an input parameter. The box may be
            given as a Bbox, a string or a list of strings or numbers.
            Raises a UsageError if the format is incorrect.
        """
        if isinstance(inp, Bbox):
            return inp

        if isinstance(inp, str):
            seq = inp.split(',')
        elif isinstance(inp, abc.Sequence):
            seq = inp
        else:
            # Without this branch 'seq' would be unbound further down.
            raise UsageError('Bounding box parameter needs to be a comma-separated list '
                             'or a Python list of numbers.')

        if len(seq) != 4:
            raise UsageError('Bounding box parameter needs 4 coordinates.')
        try:
            # Non-finite values are filtered out, which makes the
            # unpacking fail with a ValueError. float() raises a TypeError
            # for unsupported types such as None.
            x1, y1, x2, y2 = filter(math.isfinite, map(float, seq))
        except (TypeError, ValueError) as exc:
            raise UsageError('Bounding box parameter needs to be numbers.') from exc

        if x1 < -180.0 or x1 > 180.0 or y1 < -90.0 or y1 > 90.0 \
           or x2 < -180.0 or x2 > 180.0 or y2 < -90.0 or y2 > 90.0:
            raise UsageError('Bounding box coordinates invalid.')

        # A box without extent in one direction is rejected.
        if x1 == x2 or y1 == y2:
            raise UsageError('Bounding box with invalid parameters.')

        return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
class GeometryFormat(enum.Flag):
    """ Geometry output formats supported by Nominatim.

        The members are flags and can be combined with '|'.
    """
    # The empty flag set; used as the default for geometry output.
    NONE = 0
    GEOJSON = enum.auto()
    # NOTE(review): the members below were lost from the damaged source
    # and are assumed -- confirm against the upstream file.
    KML = enum.auto()
    SVG = enum.auto()
    TEXT = enum.auto()
class DataLayer(enum.Flag):
    """ Layer types that can be selected for reverse and forward search.

        The members are flags and can be combined with '|'.
    """
    POI = enum.auto()
    ADDRESS = enum.auto()
    RAILWAY = enum.auto()
    MANMADE = enum.auto()
    NATURAL = enum.auto()
def format_country(cc: Any) -> List[str]:
    """ Extract a list of country codes from the input which may be either
        a string or list of strings. Filters out all values that are not
        strings of exactly two characters. The remaining codes are
        returned in lower case.

        Raises a UsageError when the input is neither a string nor
        a sequence.
    """
    if isinstance(cc, str):
        clist = cc.split(',')
    elif isinstance(cc, abc.Sequence):
        clist = cc
    else:
        raise UsageError("Parameter 'country' needs to be a comma-separated list "
                         "or a Python list of strings.")

    return [code.lower() for code in clist if isinstance(code, str) and len(code) == 2]
def format_excluded(ids: Any) -> List[int]:
    """ Extract a list of place ids from the input which may be either
        a string or a list of strings or ints. Ignores empty values but
        throws a UsageError on anything that cannot be converted to int.

        When no ID remains, the list [0] is returned.
    """
    if isinstance(ids, str):
        plist = [s.strip() for s in ids.split(',')]
    elif isinstance(ids, abc.Sequence):
        plist = ids
    else:
        raise UsageError("Parameter 'excluded' needs to be a comma-separated list "
                         "or a Python list of numbers.")
    # isdecimal() instead of isdigit(): isdigit() also accepts characters
    # like superscript digits, which int() then rejects with a ValueError.
    if not all(isinstance(i, int) or
               (isinstance(i, str) and (not i or i.isdecimal())) for i in plist):
        raise UsageError("Parameter 'excluded' only takes place IDs.")

    return [int(pid) for pid in plist if pid] or [0]
def format_categories(categories: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """ Extract a list of categories. Currently a noop: the input
        is returned unchanged.
    """
    return categories
# Type variable for LookupDetails and its subclasses, used by from_kwargs().
TParam = TypeVar('TParam', bound='LookupDetails') # pylint: disable=invalid-name
@dataclasses.dataclass
class LookupDetails:
    """ Collection of parameters that define the amount of details
        returned with a lookup or details result.
    """
    geometry_output: GeometryFormat = GeometryFormat.NONE
    """ Add the full geometry of the place to the result. Multiple
        formats may be selected. Note that geometries can become quite large.
    """
    address_details: bool = False
    """ Get detailed information on the places that make up the address
        of the result.
    """
    linked_places: bool = False
    """ Get detailed information on the places that link to the result.
    """
    parented_places: bool = False
    """ Get detailed information on all places that this place is a parent
        for, i.e. all places for which it provides the address details.
        Only POI places can have parents.
    """
    keywords: bool = False
    """ Add information about the search terms used for this place.
    """
    geometry_simplification: float = 0.0
    """ Simplification factor for a geometry in degrees WGS. A factor of
        0.0 means the original geometry is kept. The higher the value, the
        more the geometry gets simplified.
    """

    @classmethod
    def from_kwargs(cls: Type[TParam], kwargs: Dict[str, Any]) -> TParam:
        """ Load the data fields of the class from a dictionary.
            Unknown entries in the dictionary are ignored, missing ones
            get the default setting.

            The function supports type checking and throws a UsageError
            when the value does not fit.
        """
        def _check_field(v: Any, field: 'dataclasses.Field[Any]') -> Any:
            # An explicit None selects the default of the field.
            if v is None:
                return field.default_factory() \
                       if field.default_factory != dataclasses.MISSING \
                       else field.default
            # Fields with a transform function do their own validation.
            if field.metadata and 'transform' in field.metadata:
                return field.metadata['transform'](v)
            if not isinstance(v, field.type):
                raise UsageError(f"Parameter '{field.name}' needs to be of {field.type!s}.")
            return v

        return cls(**{f.name: _check_field(kwargs[f.name], f)
                      for f in dataclasses.fields(cls) if f.name in kwargs})
@dataclasses.dataclass
class ReverseDetails(LookupDetails):
    """ Collection of parameters for the reverse call.
    """
    max_rank: int = dataclasses.field(default=30,
                                      metadata={'transform': lambda v: max(0, min(v, 30))}
                                     )
    """ Highest address rank to return. Values given through
        from_kwargs() are clamped to the range 0 to 30.
    """
    layers: DataLayer = DataLayer.ADDRESS | DataLayer.POI
    """ Filter which kind of data to include.
    """
@dataclasses.dataclass
class SearchDetails(LookupDetails):
    """ Collection of parameters for the search call.
    """
    max_results: int = 10
    """ Maximum number of results to be returned. The actual number of results
        may be less.
    """
    min_rank: int = dataclasses.field(default=0,
                                      metadata={'transform': lambda v: max(0, min(v, 30))}
                                     )
    """ Lowest address rank to return. Values given through
        from_kwargs() are clamped to the range 0 to 30.
    """
    max_rank: int = dataclasses.field(default=30,
                                      metadata={'transform': lambda v: max(0, min(v, 30))}
                                     )
    """ Highest address rank to return. Values given through
        from_kwargs() are clamped to the range 0 to 30.
    """
    layers: Optional[DataLayer] = dataclasses.field(default=None,
                                                    metadata={'transform': lambda r : r})
    """ Filter which kind of data to include. When 'None' (the default) then
        filtering by layers is disabled.
    """
    countries: List[str] = dataclasses.field(default_factory=list,
                                             metadata={'transform': format_country})
    """ Restrict search results to the given countries. An empty list (the
        default) will disable this filter.
    """
    excluded: List[int] = dataclasses.field(default_factory=list,
                                            metadata={'transform': format_excluded})
    """ List of OSM objects to exclude from the results. Currently only
        works when the internal place ID is given.
        An empty list (the default) will disable this filter.
    """
    viewbox: Optional[Bbox] = dataclasses.field(default=None,
                                                metadata={'transform': Bbox.from_param})
    """ Focus the search on a given map area.
    """
    bounded_viewbox: bool = False
    """ Use 'viewbox' as a filter and restrict results to places within the
        given area.
    """
    near: Optional[Point] = dataclasses.field(default=None,
                                              metadata={'transform': Point.from_param})
    """ Order results by distance to the given point.
    """
    near_radius: Optional[float] = dataclasses.field(default=None,
                                                     metadata={'transform': lambda r : r})
    """ Use near point as a filter and drop results outside the given
        radius. Radius is given in degrees WGS84.
    """
    categories: List[Tuple[str, str]] = dataclasses.field(default_factory=list,
                                                          metadata={'transform': format_categories})
    """ Restrict search to places with one of the given class/type categories.
        An empty list (the default) will disable this filter.
    """

    def __post_init__(self) -> None:
        """ Compute 'viewbox_x2', the viewbox extended by half of its
            width and height on every side, or None without a viewbox.
        """
        if self.viewbox is not None:
            xext = (self.viewbox.maxlon - self.viewbox.minlon)/2
            yext = (self.viewbox.maxlat - self.viewbox.minlat)/2
            self.viewbox_x2 = Bbox(self.viewbox.minlon - xext, self.viewbox.minlat - yext,
                                   self.viewbox.maxlon + xext, self.viewbox.maxlat + yext)
        else:
            self.viewbox_x2 = None


    def restrict_min_max_rank(self, new_min: int, new_max: int) -> None:
        """ Change the min_rank and max_rank fields to respect the
            given boundaries. The rank range can only become narrower.
        """
        assert new_min <= new_max
        self.min_rank = max(self.min_rank, new_min)
        self.max_rank = min(self.max_rank, new_max)


    def is_impossible(self) -> bool:
        """ Check if the parameter configuration is contradictory and
            cannot yield any results.
        """
        # NOTE(review): a bounded viewbox is reported as contradictory
        # when 'near' lies INSIDE the box. That looks inverted -- confirm
        # the intended condition; the logic is kept as it was.
        return (self.min_rank > self.max_rank
                or (self.bounded_viewbox
                    and self.viewbox is not None and self.near is not None
                    and self.viewbox.contains(self.near))
                or (self.layers is not None and not self.layers))


    def layer_enabled(self, layer: DataLayer) -> bool:
        """ Check if the given layer has been chosen. Also returns
            true when layer restriction has been disabled completely.
        """
        return self.layers is None or bool(self.layers & layer)