1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Complex datatypes used by the Nominatim API.
10 from typing import Optional, Union, Tuple, NamedTuple, TypeVar, Type, Dict, \
12 from collections import abc
16 from struct import unpack
18 from geoalchemy2 import WKTElement
19 import geoalchemy2.functions
21 from nominatim.errors import UsageError
23 # pylint: disable=no-member,too-many-boolean-expressions,too-many-instance-attributes
25 @dataclasses.dataclass
27 """ Reference an object by Nominatim's internal ID.
32 @dataclasses.dataclass
34 """ Reference by the OSM ID and potentially the basic category.
38 osm_class: Optional[str] = None
def __post_init__(self) -> None:
    """ Validate the OSM type once the dataclass fields have been set.

        Raises a ValueError unless 'osm_type' is one of 'N', 'W' or 'R'.
    """
    type_is_known = self.osm_type in ('N', 'W', 'R')
    if not type_is_known:
        raise ValueError(f"Illegal OSM type '{self.osm_type}'. Must be one of N, W, R.")
# A place may be referenced either by its internal place ID or by its OSM ID.
PlaceRef = Union[PlaceID, OsmID]
48 class Point(NamedTuple):
49 """ A geographic point in WGS84 projection.
56 def lat(self) -> float:
57 """ Return the latitude of the point.
63 def lon(self) -> float:
64 """ Return the longitude of the point.
def to_geojson(self) -> str:
    """ Serialise the point as a GeoJSON 'Point' geometry string with
        the coordinates in x, y order.
    """
    x_coord, y_coord = self.x, self.y
    return f'{{"type": "Point","coordinates": [{x_coord}, {y_coord}]}}'
76 def from_wkb(wkb: bytes) -> 'Point':
77 """ Create a point from EWKB as returned from the database.
80 raise ValueError("Point wkb has unexpected length")
82 gtype, srid, x, y = unpack('>iidd', wkb[1:])
84 gtype, srid, x, y = unpack('<iidd', wkb[1:])
86 raise ValueError("WKB has unknown endian value.")
88 if gtype != 0x20000001:
89 raise ValueError("WKB must be a point geometry.")
91 raise ValueError("Only WGS84 WKB supported.")
97 def from_param(inp: Any) -> 'Point':
98 """ Create a point from an input parameter. The parameter
99 may be given as a point, a string or a sequence of
100 strings or floats. Raises a UsageError if the format is
103 if isinstance(inp, Point):
107 if isinstance(inp, str):
109 elif isinstance(inp, abc.Sequence):
113 raise UsageError('Point parameter needs 2 coordinates.')
115 x, y = filter(math.isfinite, map(float, seq))
116 except ValueError as exc:
117 raise UsageError('Point parameter needs to be numbers.') from exc
119 if x < -180.0 or x > 180.0 or y < -90.0 or y > 90.0:
120 raise UsageError('Point coordinates invalid.')
def sql_value(self) -> WKTElement:
    """ Build the geometry element that represents the point in SQL
        queries: a WKT 'POINT' tagged with SRID 4326.
    """
    wkt = f'POINT({self.x} {self.y})'
    return WKTElement(wkt, srid=4326)
# Type alias for values that are either a Point or a plain tuple of two floats.
AnyPoint = Union[Point, Tuple[float, float]]

# 17-byte EWKB prefixes that Bbox.from_wkb() matches before reading the
# coordinates from byte 17 onwards. Each one consists of the byte-order
# marker (0x01 little endian, 0x00 big endian), the geometry type 0x20000003
# (polygon with the SRID flag set), the SRID 4326 (0x10E6), a ring count
# of 1 and a point count of 5.
WKB_BBOX_HEADER_LE = b'\x01\x03\x00\x00\x20\xE6\x10\x00\x00\x01\x00\x00\x00\x05\x00\x00\x00'
WKB_BBOX_HEADER_BE = b'\x00\x20\x00\x00\x03\x00\x00\x10\xe6\x00\x00\x00\x01\x00\x00\x00\x05'
138 """ A bounding box in WGS84 projection.
140 The coordinates are available as a tuple in the 'coords'
141 attribute in the order (minx, miny, maxx, maxy).
def __init__(self, minx: float, miny: float, maxx: float, maxy: float) -> None:
    """ Create a box from its four boundary values. They are kept in
        'coords' as the tuple (minx, miny, maxx, maxy).
    """
    lower_corner = (minx, miny)
    upper_corner = (maxx, maxy)
    self.coords = lower_corner + upper_corner
def minlat(self) -> float:
    """ Return the latitude of the southern edge of the box, which is
        the minimum y coordinate.
    """
    south = self.coords[1]
    return south
def maxlat(self) -> float:
    """ Return the latitude of the northern edge of the box, which is
        the maximum y coordinate.
    """
    north = self.coords[3]
    return north
def minlon(self) -> float:
    """ Return the longitude of the western edge of the box, which is
        the minimum x coordinate.
    """
    west = self.coords[0]
    return west
def maxlon(self) -> float:
    """ Return the longitude of the eastern edge of the box, which is
        the maximum x coordinate.
    """
    east = self.coords[2]
    return east
def area(self) -> float:
    """ Return the area of the box, computed as its extent in x
        multiplied by its extent in y.
    """
    width = self.coords[2] - self.coords[0]
    height = self.coords[3] - self.coords[1]
    return width * height
def sql_value(self) -> Any:
    """ Build the SQL expression for the box: an ST_MakeEnvelope call
        with the four stored coordinates and SRID 4326.
    """
    envelope_args = (*self.coords, 4326)
    return geoalchemy2.functions.ST_MakeEnvelope(*envelope_args)
def contains(self, pt: Point) -> bool:
    """ Tell whether the point lies within the box. Points exactly on
        the boundary count as inside.
    """
    x_in_range = self.coords[0] <= pt[0] <= self.coords[2]
    return x_in_range and self.coords[1] <= pt[1] <= self.coords[3]
196 def from_wkb(wkb: Optional[bytes]) -> 'Optional[Bbox]':
197 """ Create a Bbox from a bounding box polygon as returned by
198 the database. Returns None if the input value is None.
204 raise ValueError("WKB must be a bounding box polygon")
205 if wkb.startswith(WKB_BBOX_HEADER_LE):
206 x1, y1, _, _, x2, y2 = unpack('<dddddd', wkb[17:65])
207 elif wkb.startswith(WKB_BBOX_HEADER_BE):
208 x1, y1, _, _, x2, y2 = unpack('>dddddd', wkb[17:65])
210 raise ValueError("WKB has wrong header")
212 return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
def from_point(pt: Point, buffer: float) -> 'Bbox':
    """ Create the box that extends 'buffer' units from the point in
        each of the four directions.
    """
    x_coord, y_coord = pt[0], pt[1]
    return Bbox(x_coord - buffer, y_coord - buffer,
                x_coord + buffer, y_coord + buffer)
224 def from_param(inp: Any) -> 'Bbox':
225 """ Return a Bbox from an input parameter. The box may be
226 given as a Bbox, a string or a list of strings or numbers.
227 Raises a UsageError if the format is incorrect.
229 if isinstance(inp, Bbox):
233 if isinstance(inp, str):
235 elif isinstance(inp, abc.Sequence):
239 raise UsageError('Bounding box parameter needs 4 coordinates.')
241 x1, y1, x2, y2 = filter(math.isfinite, map(float, seq))
242 except ValueError as exc:
243 raise UsageError('Bounding box parameter needs to be numbers.') from exc
245 if x1 < -180.0 or x1 > 180.0 or y1 < -90.0 or y1 > 90.0 \
246 or x2 < -180.0 or x2 > 180.0 or y2 < -90.0 or y2 > 90.0:
247 raise UsageError('Bounding box coordinates invalid.')
249 if x1 == x2 or y1 == y2:
250 raise UsageError('Bounding box with invalid parameters.')
252 return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
255 class GeometryFormat(enum.Flag):
256 """ Geometry output formats supported by Nominatim.
259 GEOJSON = enum.auto()
265 class DataLayer(enum.Flag):
266 """ Layer types that can be selected for reverse and forward search.
269 ADDRESS = enum.auto()
270 RAILWAY = enum.auto()
271 MANMADE = enum.auto()
272 NATURAL = enum.auto()
275 def format_country(cc: Any) -> List[str]:
276 """ Extract a list of country codes from the input which may be either
277 a string or list of strings. Filters out all values that are not
281 if isinstance(cc, str):
282 clist = cc.split(',')
283 elif isinstance(cc, abc.Sequence):
286 raise UsageError("Parameter 'country' needs to be a comma-separated list "
287 "or a Python list of strings.")
289 return [cc.lower() for cc in clist if isinstance(cc, str) and len(cc) == 2]
292 def format_excluded(ids: Any) -> List[int]:
293 """ Extract a list of place ids from the input which may be either
294 a string or a list of strings or ints. Ignores empty value but
295 throws a UsageError on anything that cannot be converted to int.
298 if isinstance(ids, str):
299 plist = [s.strip() for s in ids.split(',')]
300 elif isinstance(ids, abc.Sequence):
303 raise UsageError("Parameter 'excluded' needs to be a comma-separated list "
304 "or a Python list of numbers.")
305 if not all(isinstance(i, int) or
306 (isinstance(i, str) and (not i or i.isdigit())) for i in plist):
307 raise UsageError("Parameter 'excluded' only takes place IDs.")
309 return [int(id) for id in plist if id] or [0]
312 def format_categories(categories: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
313 """ Extract a list of categories. Currently a noop.
# Type variable bound to LookupDetails so that from_kwargs() is typed to
# return an instance of the class it is called on.
TParam = TypeVar('TParam', bound='LookupDetails') # pylint: disable=invalid-name
319 @dataclasses.dataclass
321 """ Collection of parameters that define the amount of details
322 returned with a lookup or details result.
324 geometry_output: GeometryFormat = GeometryFormat.NONE
325 """ Add the full geometry of the place to the result. Multiple
326 formats may be selected. Note that geometries can become quite large.
328 address_details: bool = False
329 """ Get detailed information on the places that make up the address
332 linked_places: bool = False
333 """ Get detailed information on the places that link to the result.
335 parented_places: bool = False
336 """ Get detailed information on all places that this place is a parent
337 for, i.e. all places for which it provides the address details.
338 Only POI places can have parents.
340 keywords: bool = False
341 """ Add information about the search terms used for this place.
343 geometry_simplification: float = 0.0
344 """ Simplification factor for a geometry in degrees WGS. A factor of
345 0.0 means the original geometry is kept. The higher the value, the
346 more the geometry gets simplified.
350 def from_kwargs(cls: Type[TParam], kwargs: Dict[str, Any]) -> TParam:
351 """ Load the data fields of the class from a dictionary.
352 Unknown entries in the dictionary are ignored, missing ones
353 get the default setting.
355 The function supports type checking and throws a UsageError
356 when the value does not fit.
358 def _check_field(v: Any, field: 'dataclasses.Field[Any]') -> Any:
360 return field.default_factory() \
361 if field.default_factory != dataclasses.MISSING \
363 if field.metadata and 'transform' in field.metadata:
364 return field.metadata['transform'](v)
365 if not isinstance(v, field.type):
366 raise UsageError(f"Parameter '{field.name}' needs to be of {field.type!s}.")
369 return cls(**{f.name: _check_field(kwargs[f.name], f)
370 for f in dataclasses.fields(cls) if f.name in kwargs})
@dataclasses.dataclass
class ReverseDetails(LookupDetails):
    """ Parameters that control a reverse call, in addition to the
        settings inherited from LookupDetails.
    """
    max_rank: int = dataclasses.field(
        default=30,
        metadata={'transform': lambda rank: max(0, min(rank, 30))},
    )
    """ Highest address rank to return. The 'transform' function limits
        a given value to the range 0 to 30.
    """
    layers: DataLayer = DataLayer.ADDRESS | DataLayer.POI
    """ Kinds of data to include. Defaults to the address and POI layers.
    """
@dataclasses.dataclass
class SearchDetails(LookupDetails):
    """ Parameters that control a search call, in addition to the
        settings inherited from LookupDetails.
    """
    max_results: int = 10
    """ Upper limit for the number of results to be returned.
    """
    min_rank: int = dataclasses.field(
        default=0,
        metadata={'transform': lambda rank: max(0, min(rank, 30))},
    )
    """ Lowest address rank to return. The 'transform' function limits
        a given value to the range 0 to 30.
    """
    max_rank: int = dataclasses.field(
        default=30,
        metadata={'transform': lambda rank: max(0, min(rank, 30))},
    )
    """ Highest address rank to return. The 'transform' function limits
        a given value to the range 0 to 30.
    """
    layers: Optional[DataLayer] = dataclasses.field(
        default=None,
        metadata={'transform': lambda value: value},
    )
    """ Kinds of data to include. 'None' (the default) switches off
        filtering by layers.
    """
    countries: List[str] = dataclasses.field(
        default_factory=list,
        metadata={'transform': format_country},
    )
    """ Countries the search results are restricted to. The empty list
        (the default) disables this filter.
    """
    excluded: List[int] = dataclasses.field(
        default_factory=list,
        metadata={'transform': format_excluded},
    )
    """ OSM objects to leave out of the results. Currently only works
        when the internal place ID is given. The empty list (the
        default) disables this filter.
    """
    viewbox: Optional[Bbox] = dataclasses.field(
        default=None,
        metadata={'transform': Bbox.from_param},
    )
    """ Map area the search should focus on.
    """
    bounded_viewbox: bool = False
    """ Use 'viewbox' as a filter that restricts the results to places
        within it.
    """
    near: Optional[Point] = dataclasses.field(
        default=None,
        metadata={'transform': Point.from_param},
    )
    """ Point used to order the results by distance.
    """
    near_radius: Optional[float] = dataclasses.field(
        default=None,
        metadata={'transform': lambda radius: radius},
    )
    """ Use the near point as a filter and drop results outside the
        given radius. The radius is given in degrees WGS84.
    """
    categories: List[Tuple[str, str]] = dataclasses.field(
        default_factory=list,
        metadata={'transform': format_categories},
    )
    """ Class/type categories the search is restricted to. The empty
        list (the default) disables this filter.
    """


    def __post_init__(self) -> None:
        """ Derive 'viewbox_x2' from the viewbox: a box with the same
            centre that is extended by half the width and half the
            height on each side.
        """
        # NOTE(review): 'viewbox_x2' is only set when a viewbox was given;
        # without one the attribute does not exist on the instance.
        box = self.viewbox
        if box is None:
            return

        half_width = (box.maxlon - box.minlon)/2
        half_height = (box.maxlat - box.minlat)/2
        self.viewbox_x2 = Bbox(box.minlon - half_width, box.minlat - half_height,
                               box.maxlon + half_width, box.maxlat + half_height)


    def restrict_min_max_rank(self, new_min: int, new_max: int) -> None:
        """ Narrow the range of min_rank and max_rank so that it also
            lies within new_min and new_max. The range is never widened.
        """
        assert new_min <= new_max
        if new_min > self.min_rank:
            self.min_rank = new_min
        if new_max < self.max_rank:
            self.max_rank = new_max


    def is_impossible(self) -> bool:
        """ Check if the parameter configuration is contradictory and
            cannot yield any results.
        """
        if self.min_rank > self.max_rank:
            return True

        # NOTE(review): this reports the configuration as impossible when the
        # near point lies INSIDE the bounded viewbox - confirm that the
        # condition is not meant to be negated.
        if (self.bounded_viewbox
                and self.viewbox is not None and self.near is not None
                and self.viewbox.contains(self.near)):
            return True

        # A layer restriction that selects no layer at all matches nothing.
        return self.layers is not None and not self.layers


    def layer_enabled(self, layer: DataLayer) -> bool:
        """ Check if the given layer has been chosen. Also returns
            true when the layer restriction is disabled completely.
        """
        if self.layers is None:
            return True

        return bool(self.layers & layer)