1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Complex datatypes used by the Nominatim API.
10 from typing import Optional, Union, Tuple, NamedTuple, TypeVar, Type, Dict, \
12 from collections import abc
16 from struct import unpack
17 from binascii import unhexlify
19 from nominatim.errors import UsageError
21 # pylint: disable=no-member,too-many-boolean-expressions,too-many-instance-attributes
23 @dataclasses.dataclass
25 """ Reference an object by Nominatim's internal ID.
30 @dataclasses.dataclass
32 """ Reference by the OSM ID and potentially the basic category.
36 osm_class: Optional[str] = None
38 def __post_init__(self) -> None:
39 if self.osm_type not in ('N', 'W', 'R'):
40 raise ValueError(f"Illegal OSM type '{self.osm_type}'. Must be one of N, W, R.")
# Any of the supported ways to reference a place: either through
# Nominatim's internal ID (PlaceID) or through its OSM ID (OsmID).
PlaceRef = Union[PlaceID, OsmID]
class Point(NamedTuple):
    """ A geographic point in WGS84 projection.

        The point is a plain tuple (x, y), where x is the longitude
        and y is the latitude.
    """
    x: float
    y: float


    @property
    def lat(self) -> float:
        """ Return the latitude of the point.
        """
        return self.y


    @property
    def lon(self) -> float:
        """ Return the longitude of the point.
        """
        return self.x


    def to_geojson(self) -> str:
        """ Return the point in GeoJSON format.
        """
        return f'{{"type": "Point","coordinates": [{self.x}, {self.y}]}}'


    @staticmethod
    def from_wkb(wkb: Union[str, bytes]) -> 'Point':
        """ Create a point from EWKB as returned from the database.

            The input may be the raw bytes or their hexadecimal string
            representation. Raises a ValueError when the data is not
            a point geometry with SRID 4326.
        """
        if isinstance(wkb, str):
            wkb = unhexlify(wkb)
        # 1 byte endian marker + 4 bytes type + 4 bytes SRID + 2 * 8 bytes coordinates
        if len(wkb) != 25:
            raise ValueError(f"Point wkb has unexpected length {len(wkb)}")
        if wkb[0] == 0:
            gtype, srid, x, y = unpack('>iidd', wkb[1:])
        elif wkb[0] == 1:
            gtype, srid, x, y = unpack('<iidd', wkb[1:])
        else:
            raise ValueError("WKB has unknown endian value.")

        # Geometry type 1 (point) combined with the EWKB 'has SRID' flag.
        if gtype != 0x20000001:
            raise ValueError("WKB must be a point geometry.")
        if srid != 4326:
            raise ValueError("Only WGS84 WKB supported.")

        return Point(x, y)


    @staticmethod
    def from_param(inp: Any) -> 'Point':
        """ Create a point from an input parameter. The parameter
            may be given as a point, a string or a sequence of
            strings or floats. Raises a UsageError if the format is
            not correct.
        """
        if isinstance(inp, Point):
            return inp

        if isinstance(inp, str):
            parts = inp.split(',')
        elif isinstance(inp, abc.Sequence):
            parts = list(inp)
        else:
            # Without this branch any other input type would run into
            # an unbound variable instead of a proper usage error.
            raise UsageError('Point parameter needs to be a string '
                             'or a sequence of coordinates.')

        if len(parts) != 2:
            raise UsageError('Point parameter needs 2 coordinates.')
        try:
            # Non-finite values are filtered out, so that the unpacking
            # fails with a ValueError for 'nan' and 'inf' as well.
            x, y = filter(math.isfinite, map(float, parts))
        except ValueError as exc:
            raise UsageError('Point parameter needs to be numbers.') from exc

        if x < -180.0 or x > 180.0 or y < -90.0 or y > 90.0:
            raise UsageError('Point coordinates invalid.')

        return Point(x, y)


    def to_wkt(self) -> str:
        """ Return the WKT representation of the point.
        """
        return f'POINT({self.x} {self.y})'
# A point given either as a Point or as a plain (x, y) tuple of floats.
AnyPoint = Union[Point, Tuple[float, float]]

# EWKB headers of a bounding box polygon in little-endian (LE) and
# big-endian (BE) byte order. Each header consists of the byte-order marker,
# the geometry type (polygon, with the SRID flag set), the SRID 4326,
# the number of rings (1) and the number of points in the ring (5).
WKB_BBOX_HEADER_LE = b'\x01\x03\x00\x00\x20\xE6\x10\x00\x00\x01\x00\x00\x00\x05\x00\x00\x00'
WKB_BBOX_HEADER_BE = b'\x00\x20\x00\x00\x03\x00\x00\x10\xe6\x00\x00\x00\x01\x00\x00\x00\x05'
class Bbox:
    """ A bounding box in WGS84 projection.

        The coordinates are available as a tuple in the 'coords'
        attribute in the order (minx, miny, maxx, maxy).
    """
    def __init__(self, minx: float, miny: float, maxx: float, maxy: float) -> None:
        self.coords = (minx, miny, maxx, maxy)


    @property
    def minlat(self) -> float:
        """ Southern-most latitude, corresponding to the minimum y coordinate.
        """
        return self.coords[1]


    @property
    def maxlat(self) -> float:
        """ Northern-most latitude, corresponding to the maximum y coordinate.
        """
        return self.coords[3]


    @property
    def minlon(self) -> float:
        """ Western-most longitude, corresponding to the minimum x coordinate.
        """
        return self.coords[0]


    @property
    def maxlon(self) -> float:
        """ Eastern-most longitude, corresponding to the maximum x coordinate.
        """
        return self.coords[2]


    @property
    def area(self) -> float:
        """ Return the area of the box in WGS84, i.e. the product of
            its extent in x and y direction.
        """
        return (self.coords[2] - self.coords[0]) * (self.coords[3] - self.coords[1])


    def contains(self, pt: 'Point') -> bool:
        """ Check if the point is inside or on the boundary of the box.
        """
        return self.coords[0] <= pt[0] and self.coords[1] <= pt[1]\
               and self.coords[2] >= pt[0] and self.coords[3] >= pt[1]


    def to_wkt(self) -> str:
        """ Return the WKT representation of the Bbox. This
            is a simple polygon with four points.
        """
        return 'POLYGON(({0} {1},{0} {3},{2} {3},{2} {1},{0} {1}))'\
                  .format(*self.coords) # pylint: disable=consider-using-f-string


    @staticmethod
    def from_wkb(wkb: Union[None, str, bytes]) -> 'Optional[Bbox]':
        """ Create a Bbox from a bounding box polygon as returned by
            the database. Returns None if the input value is None.

            The input may be the raw bytes or their hexadecimal string
            representation. Raises a ValueError when the data is not
            a polygon with the expected bounding box header.
        """
        if wkb is None:
            return None

        if isinstance(wkb, str):
            wkb = unhexlify(wkb)

        # 17 bytes of header followed by 5 points with 2 doubles each.
        if len(wkb) != 97:
            raise ValueError("WKB must be a bounding box polygon")
        # Only the first and the third point of the ring are needed.
        # They are opposite corners of the box.
        if wkb.startswith(WKB_BBOX_HEADER_LE):
            x1, y1, _, _, x2, y2 = unpack('<dddddd', wkb[17:65])
        elif wkb.startswith(WKB_BBOX_HEADER_BE):
            x1, y1, _, _, x2, y2 = unpack('>dddddd', wkb[17:65])
        else:
            raise ValueError("WKB has wrong header")

        return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))


    @staticmethod
    def from_point(pt: 'Point', buffer: float) -> 'Bbox':
        """ Return a Bbox around the point with the buffer added to all sides.
        """
        return Bbox(pt[0] - buffer, pt[1] - buffer,
                    pt[0] + buffer, pt[1] + buffer)


    @staticmethod
    def from_param(inp: Any) -> 'Bbox':
        """ Return a Bbox from an input parameter. The box may be
            given as a Bbox, a string or a list of strings or numbers.
            Raises a UsageError if the format is incorrect.
        """
        if isinstance(inp, Bbox):
            return inp

        if isinstance(inp, str):
            parts = inp.split(',')
        elif isinstance(inp, abc.Sequence):
            parts = list(inp)
        else:
            # Without this branch any other input type would run into
            # an unbound variable instead of a proper usage error.
            raise UsageError('Bounding box parameter needs to be a string '
                             'or a sequence of coordinates.')

        if len(parts) != 4:
            raise UsageError('Bounding box parameter needs 4 coordinates.')
        try:
            # Non-finite values are filtered out, so that the unpacking
            # fails with a ValueError for 'nan' and 'inf' as well.
            x1, y1, x2, y2 = filter(math.isfinite, map(float, parts))
        except ValueError as exc:
            raise UsageError('Bounding box parameter needs to be numbers.') from exc

        if x1 < -180.0 or x1 > 180.0 or y1 < -90.0 or y1 > 90.0 \
           or x2 < -180.0 or x2 > 180.0 or y2 < -90.0 or y2 > 90.0:
            raise UsageError('Bounding box coordinates invalid.')

        # A box without extent in one of the directions is not accepted.
        if x1 == x2 or y1 == y2:
            raise UsageError('Bounding box with invalid parameters.')

        return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
260 class GeometryFormat(enum.Flag):
261 """ Geometry output formats supported by Nominatim.
264 GEOJSON = enum.auto()
class DataLayer(enum.Flag):
    """ Layer types that can be selected for reverse and forward search.

        The members are bit flags and may be combined with '|'.
    """
    POI = 1 << 0
    ADDRESS = 1 << 1
    RAILWAY = 1 << 2
    MANMADE = 1 << 3
    NATURAL = 1 << 4
def format_country(cc: Any) -> List[str]:
    """ Extract a list of country codes from the input which may be either
        a comma-separated string or a list of strings. Whitespace around
        the items of a comma-separated string is ignored. Filters out all
        values that are not a string of length two and returns the
        remaining ones in lower case.

        Raises a UsageError when the input is neither a string nor
        a sequence.
    """
    if isinstance(cc, str):
        # Strip the items, so that 'de, fr' does not silently lose ' fr'.
        candidates = [part.strip() for part in cc.split(',')]
    elif isinstance(cc, abc.Sequence):
        candidates = list(cc)
    else:
        raise UsageError("Parameter 'country' needs to be a comma-separated list "
                         "or a Python list of strings.")

    return [code.lower() for code in candidates
            if isinstance(code, str) and len(code) == 2]
def format_excluded(ids: Any) -> List[int]:
    """ Extract a list of place ids from the input which may be either
        a comma-separated string or a list of strings or ints. Empty
        values are ignored. When no ID remains, the list [0] is returned.

        Raises a UsageError when the input is neither a string nor a
        sequence or when one of the values cannot be converted to int.
    """
    if isinstance(ids, str):
        items = [part.strip() for part in ids.split(',')]
    elif isinstance(ids, abc.Sequence):
        items = list(ids)
    else:
        raise UsageError("Parameter 'excluded' needs to be a comma-separated list "
                         "or a Python list of numbers.")

    # isdecimal() instead of isdigit(): the latter also accepts characters
    # like superscript digits, which int() refuses with a ValueError.
    if not all(isinstance(item, int)
               or (isinstance(item, str) and (not item or item.isdecimal()))
               for item in items):
        raise UsageError("Parameter 'excluded' only takes place IDs.")

    return [int(item) for item in items if item] or [0]
def format_categories(categories: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """ Prepare a list of categories for use as a search parameter.
        No conversion is done at the moment, the input list is handed
        back as it is.
    """
    return categories
# Type variable for LookupDetails and its subclasses. It allows the
# constructor function from_kwargs() to be typed such that it returns
# an instance of the class it was called on.
TParam = TypeVar('TParam', bound='LookupDetails') # pylint: disable=invalid-name
@dataclasses.dataclass
class LookupDetails:
    """ Collection of parameters that define the amount of details
        returned with a lookup or details result.
    """
    geometry_output: GeometryFormat = GeometryFormat.NONE
    """ Add the full geometry of the place to the result. Multiple
        formats may be selected. Note that geometries can become quite large.
    """
    address_details: bool = False
    """ Get detailed information on the places that make up the address
        of the result.
    """
    linked_places: bool = False
    """ Get detailed information on the places that link to the result.
    """
    parented_places: bool = False
    """ Get detailed information on all places that this place is a parent
        for, i.e. all places for which it provides the address details.
        Only POI places can have parents.
    """
    keywords: bool = False
    """ Add information about the search terms used for this place.
    """
    geometry_simplification: float = 0.0
    """ Simplification factor for a geometry in degrees WGS. A factor of
        0.0 means the original geometry is kept. The higher the value, the
        more the geometry gets simplified.
    """

    @classmethod
    def from_kwargs(cls: Type[TParam], kwargs: Dict[str, Any]) -> TParam:
        """ Load the data fields of the class from a dictionary.
            Unknown entries in the dictionary are ignored, missing ones
            get the default setting. Entries with a value of None get
            the default setting as well.

            Fields that define a 'transform' function in their metadata
            are set to the result of calling the function on the value.
            All other values are type-checked against the declared type
            of the field. A UsageError is thrown when the value does not
            fit. Integers are accepted for float fields.
        """
        def _check_field(v: Any, field: 'dataclasses.Field[Any]') -> Any:
            if v is None:
                if field.default_factory is not dataclasses.MISSING:
                    return field.default_factory()
                return field.default
            if field.metadata and 'transform' in field.metadata:
                return field.metadata['transform'](v)
            # An int is a valid value wherever a float is expected.
            # Booleans are ints, too, but certainly not meant here.
            if field.type is float and isinstance(v, int) and not isinstance(v, bool):
                return float(v)
            if not isinstance(v, field.type):
                raise UsageError(f"Parameter '{field.name}' needs to be of {field.type!s}.")
            return v

        return cls(**{f.name: _check_field(kwargs[f.name], f)
                      for f in dataclasses.fields(cls) if f.name in kwargs})
@dataclasses.dataclass
class ReverseDetails(LookupDetails):
    """ Parameters available for the reverse call in addition to
        the ones inherited from LookupDetails.
    """
    max_rank: int = dataclasses.field(
        default=30,
        metadata={'transform': lambda v: max(0, min(v, 30))})
    """ Highest address rank to return. The transform function restricts
        the value to the range between 0 and 30.
    """
    layers: DataLayer = DataLayer.ADDRESS | DataLayer.POI
    """ Filter which kind of data to include.
    """
@dataclasses.dataclass
class SearchDetails(LookupDetails):
    """ Parameters available for the search call in addition to
        the ones inherited from LookupDetails.
    """
    max_results: int = 10
    """ Maximum number of results to be returned.
    """
    min_rank: int = dataclasses.field(
        default=0,
        metadata={'transform': lambda v: max(0, min(v, 30))})
    """ Lowest address rank to return. The transform function restricts
        the value to the range between 0 and 30.
    """
    max_rank: int = dataclasses.field(
        default=30,
        metadata={'transform': lambda v: max(0, min(v, 30))})
    """ Highest address rank to return. The transform function restricts
        the value to the range between 0 and 30.
    """
    layers: Optional[DataLayer] = dataclasses.field(
        default=None,
        metadata={'transform': lambda r : r})
    """ Filter which kind of data to include. When 'None' (the default) then
        filtering by layers is disabled.
    """
    countries: List[str] = dataclasses.field(
        default_factory=list,
        metadata={'transform': format_country})
    """ Restrict search results to the given countries. An empty list (the
        default) will disable this filter.
    """
    excluded: List[int] = dataclasses.field(
        default_factory=list,
        metadata={'transform': format_excluded})
    """ List of OSM objects to exclude from the results. Currently only
        works when the internal place ID is given.
        An empty list (the default) will disable this filter.
    """
    viewbox: Optional[Bbox] = dataclasses.field(
        default=None,
        metadata={'transform': Bbox.from_param})
    """ Focus the search on a given map area.
    """
    bounded_viewbox: bool = False
    """ Use 'viewbox' as a filter and restrict results to places within
        that area.
    """
    near: Optional[Point] = dataclasses.field(
        default=None,
        metadata={'transform': Point.from_param})
    """ Order results by distance to the given point.
    """
    near_radius: Optional[float] = dataclasses.field(
        default=None,
        metadata={'transform': lambda r : r})
    """ Use near point as a filter and drop results outside the given
        radius. Radius is given in degrees WGS84.
    """
    categories: List[Tuple[str, str]] = dataclasses.field(
        default_factory=list,
        metadata={'transform': format_categories})
    """ Restrict search to places with one of the given class/type categories.
        An empty list (the default) will disable this filter.
    """
    # Enlarged version of 'viewbox'. Computed in __post_init__()
    # whenever a viewbox is set.
    viewbox_x2: Optional[Bbox] = None

    def __post_init__(self) -> None:
        box = self.viewbox
        if box is None:
            return

        # Extend the viewbox by half of its size on each side, which
        # results in a box of twice the width and height.
        half_width = (box.maxlon - box.minlon)/2
        half_height = (box.maxlat - box.minlat)/2
        self.viewbox_x2 = Bbox(box.minlon - half_width, box.minlat - half_height,
                               box.maxlon + half_width, box.maxlat + half_height)


    def restrict_min_max_rank(self, new_min: int, new_max: int) -> None:
        """ Narrow the range given by the min_rank and max_rank fields
            so that it does not go beyond the given boundaries. The
            range can only shrink, it is never extended.
        """
        assert new_min <= new_max
        self.min_rank = max(self.min_rank, new_min)
        self.max_rank = min(self.max_rank, new_max)


    def is_impossible(self) -> bool:
        """ Check if the parameter configuration is contradictory and
            cannot yield any results.
        """
        if self.min_rank > self.max_rank:
            return True

        # NOTE(review): this reports a conflict when the bounded viewbox
        # contains the near point. One might expect the opposite
        # condition here - to be confirmed.
        if self.bounded_viewbox \
           and self.viewbox is not None and self.near is not None \
           and self.viewbox.contains(self.near):
            return True

        # A layer filter without any layer selected can never match.
        return self.layers is not None and not self.layers


    def layer_enabled(self, layer: DataLayer) -> bool:
        """ Check if the given layer has been chosen. Also returns
            true when layer restriction has been disabled completely.
        """
        if self.layers is None:
            return True

        return bool(self.layers & layer)