1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Complex datatypes used by the Nominatim API.
10 from typing import Optional, Union, Tuple, NamedTuple, TypeVar, Type, Dict, \
12 from collections import abc
16 from struct import unpack
18 import sqlalchemy as sa
20 from nominatim.errors import UsageError
22 # pylint: disable=no-member,too-many-boolean-expressions,too-many-instance-attributes
24 @dataclasses.dataclass
26 """ Reference an object by Nominatim's internal ID.
31 @dataclasses.dataclass
33 """ Reference by the OSM ID and potentially the basic category.
37 osm_class: Optional[str] = None
def __post_init__(self) -> None:
    """ Validate the OSM type after the dataclass fields have been set.

        Raises a ValueError when 'osm_type' is not one of the
        single-letter codes 'N', 'W' or 'R'.
    """
    if self.osm_type not in ('N', 'W', 'R'):
        raise ValueError(f"Illegal OSM type '{self.osm_type}'. Must be one of N, W, R.")
44 PlaceRef = Union[PlaceID, OsmID]
47 class Point(NamedTuple):
48 """ A geographic point in WGS84 projection.
55 def lat(self) -> float:
56 """ Return the latitude of the point.
62 def lon(self) -> float:
63 """ Return the longitude of the point.
def to_geojson(self) -> str:
    """ Return the point in GeoJSON format.

        The result is a JSON string for a geometry of type 'Point'
        with the coordinates given in the order x, y.
    """
    return f'{{"type": "Point","coordinates": [{self.x}, {self.y}]}}'
75 def from_wkb(wkb: bytes) -> 'Point':
76 """ Create a point from EWKB as returned from the database.
79 raise ValueError("Point wkb has unexpected length")
81 gtype, srid, x, y = unpack('>iidd', wkb[1:])
83 gtype, srid, x, y = unpack('<iidd', wkb[1:])
85 raise ValueError("WKB has unknown endian value.")
87 if gtype != 0x20000001:
88 raise ValueError("WKB must be a point geometry.")
90 raise ValueError("Only WGS84 WKB supported.")
96 def from_param(inp: Any) -> 'Point':
97 """ Create a point from an input parameter. The parameter
98 may be given as a point, a string or a sequence of
99 strings or floats. Raises a UsageError if the format is
102 if isinstance(inp, Point):
106 if isinstance(inp, str):
108 elif isinstance(inp, abc.Sequence):
112 raise UsageError('Point parameter needs 2 coordinates.')
114 x, y = filter(math.isfinite, map(float, seq))
115 except ValueError as exc:
116 raise UsageError('Point parameter needs to be numbers.') from exc
118 if x < -180.0 or x > 180.0 or y < -90.0 or y > 90.0:
119 raise UsageError('Point coordinates invalid.')
def sql_value(self) -> str:
    """ Create an SQL expression for the point.

        The point is returned as a string of the form 'POINT(x y)'.
    """
    return f'POINT({self.x} {self.y})'
131 AnyPoint = Union[Point, Tuple[float, float]]
133 WKB_BBOX_HEADER_LE = b'\x01\x03\x00\x00\x20\xE6\x10\x00\x00\x01\x00\x00\x00\x05\x00\x00\x00'
134 WKB_BBOX_HEADER_BE = b'\x00\x20\x00\x00\x03\x00\x00\x10\xe6\x00\x00\x00\x01\x00\x00\x00\x05'
137 """ A bounding box in WGS84 projection.
139 The coordinates are available as an array in the 'coords'
140 property in the order (minx, miny, maxx, maxy).
def __init__(self, minx: float, miny: float, maxx: float, maxy: float) -> None:
    """ Create a box from its four extents.

        The values are stored unchanged in the 'coords' tuple in the
        order (minx, miny, maxx, maxy).
    """
    self.coords = (minx, miny, maxx, maxy)
def minlat(self) -> float:
    """ Southern-most latitude, corresponding to the minimum y coordinate.
    """
    return self.coords[1]
def maxlat(self) -> float:
    """ Northern-most latitude, corresponding to the maximum y coordinate.
    """
    return self.coords[3]
def minlon(self) -> float:
    """ Western-most longitude, corresponding to the minimum x coordinate.
    """
    return self.coords[0]
def maxlon(self) -> float:
    """ Eastern-most longitude, corresponding to the maximum x coordinate.
    """
    return self.coords[2]
def area(self) -> float:
    """ Return the area of the box in WGS84.

        The area is the product of the extent in x and the extent
        in y of the box, computed directly on the coordinate values.
    """
    width = self.coords[2] - self.coords[0]
    height = self.coords[3] - self.coords[1]
    return width * height
def sql_value(self) -> Any:
    """ Create an SQL expression for the box.

        The box is handed to the SQL function ST_MakeEnvelope with
        its four coordinates in the order of 'coords', followed by
        the SRID 4326.
    """
    return sa.func.ST_MakeEnvelope(*self.coords, 4326)
def contains(self, pt: Point) -> bool:
    """ Check if the point is inside or on the boundary of the box.

        The first element of 'pt' is compared against the x extent
        of the box, the second element against the y extent.
    """
    return self.coords[0] <= pt[0] <= self.coords[2] \
           and self.coords[1] <= pt[1] <= self.coords[3]
195 def from_wkb(wkb: Optional[bytes]) -> 'Optional[Bbox]':
196 """ Create a Bbox from a bounding box polygon as returned by
197 the database. Returns None if the input value is None.
203 raise ValueError("WKB must be a bounding box polygon")
204 if wkb.startswith(WKB_BBOX_HEADER_LE):
205 x1, y1, _, _, x2, y2 = unpack('<dddddd', wkb[17:65])
206 elif wkb.startswith(WKB_BBOX_HEADER_BE):
207 x1, y1, _, _, x2, y2 = unpack('>dddddd', wkb[17:65])
209 raise ValueError("WKB has wrong header")
211 return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
def from_point(pt: Point, buffer: float) -> 'Bbox':
    """ Return a Bbox around the point with the buffer added to all sides.

        The resulting box reaches from the point's coordinates minus
        'buffer' to the point's coordinates plus 'buffer' in both
        dimensions.
    """
    return Bbox(pt[0] - buffer, pt[1] - buffer,
                pt[0] + buffer, pt[1] + buffer)
223 def from_param(inp: Any) -> 'Bbox':
224 """ Return a Bbox from an input parameter. The box may be
225 given as a Bbox, a string or a list of strings or numbers.
226 Raises a UsageError if the format is incorrect.
228 if isinstance(inp, Bbox):
232 if isinstance(inp, str):
234 elif isinstance(inp, abc.Sequence):
238 raise UsageError('Bounding box parameter needs 4 coordinates.')
240 x1, y1, x2, y2 = filter(math.isfinite, map(float, seq))
241 except ValueError as exc:
242 raise UsageError('Bounding box parameter needs to be numbers.') from exc
244 if x1 < -180.0 or x1 > 180.0 or y1 < -90.0 or y1 > 90.0 \
245 or x2 < -180.0 or x2 > 180.0 or y2 < -90.0 or y2 > 90.0:
246 raise UsageError('Bounding box coordinates invalid.')
248 if x1 == x2 or y1 == y2:
249 raise UsageError('Bounding box with invalid parameters.')
251 return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
254 class GeometryFormat(enum.Flag):
255 """ Geometry output formats supported by Nominatim.
258 GEOJSON = enum.auto()
264 class DataLayer(enum.Flag):
265 """ Layer types that can be selected for reverse and forward search.
268 ADDRESS = enum.auto()
269 RAILWAY = enum.auto()
270 MANMADE = enum.auto()
271 NATURAL = enum.auto()
274 def format_country(cc: Any) -> List[str]:
275 """ Extract a list of country codes from the input which may be either
276 a string or list of strings. Filters out all values that are not
280 if isinstance(cc, str):
281 clist = cc.split(',')
282 elif isinstance(cc, abc.Sequence):
285 raise UsageError("Parameter 'country' needs to be a comma-separated list "
286 "or a Python list of strings.")
288 return [cc.lower() for cc in clist if isinstance(cc, str) and len(cc) == 2]
291 def format_excluded(ids: Any) -> List[int]:
292 """ Extract a list of place ids from the input which may be either
293 a string or a list of strings or ints. Ignores empty value but
294 throws a UsageError on anything that cannot be converted to int.
297 if isinstance(ids, str):
298 plist = [s.strip() for s in ids.split(',')]
299 elif isinstance(ids, abc.Sequence):
302 raise UsageError("Parameter 'excluded' needs to be a comma-separated list "
303 "or a Python list of numbers.")
304 if not all(isinstance(i, int) or
305 (isinstance(i, str) and (not i or i.isdigit())) for i in plist):
306 raise UsageError("Parameter 'excluded' only takes place IDs.")
308 return [int(id) for id in plist if id] or [0]
311 def format_categories(categories: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
312 """ Extract a list of categories. Currently a noop.
316 TParam = TypeVar('TParam', bound='LookupDetails') # pylint: disable=invalid-name
318 @dataclasses.dataclass
320 """ Collection of parameters that define the amount of details
321 returned with a lookup or details result.
323 geometry_output: GeometryFormat = GeometryFormat.NONE
324 """ Add the full geometry of the place to the result. Multiple
325 formats may be selected. Note that geometries can become quite large.
327 address_details: bool = False
328 """ Get detailed information on the places that make up the address
331 linked_places: bool = False
332 """ Get detailed information on the places that link to the result.
334 parented_places: bool = False
335 """ Get detailed information on all places that this place is a parent
336 for, i.e. all places for which it provides the address details.
337 Only POI places can have parents.
339 keywords: bool = False
340 """ Add information about the search terms used for this place.
342 geometry_simplification: float = 0.0
343 """ Simplification factor for a geometry in degrees WGS. A factor of
344 0.0 means the original geometry is kept. The higher the value, the
345 more the geometry gets simplified.
349 def from_kwargs(cls: Type[TParam], kwargs: Dict[str, Any]) -> TParam:
350 """ Load the data fields of the class from a dictionary.
351 Unknown entries in the dictionary are ignored, missing ones
352 get the default setting.
354 The function supports type checking and throws a UsageError
355 when the value does not fit.
357 def _check_field(v: Any, field: 'dataclasses.Field[Any]') -> Any:
359 return field.default_factory() \
360 if field.default_factory != dataclasses.MISSING \
362 if field.metadata and 'transform' in field.metadata:
363 return field.metadata['transform'](v)
364 if not isinstance(v, field.type):
365 raise UsageError(f"Parameter '{field.name}' needs to be of {field.type!s}.")
368 return cls(**{f.name: _check_field(kwargs[f.name], f)
369 for f in dataclasses.fields(cls) if f.name in kwargs})
@dataclasses.dataclass
class ReverseDetails(LookupDetails):
    """ Collection of parameters for the reverse call.
    """
    max_rank: int = dataclasses.field(default=30,
                                      metadata={'transform': lambda v: max(0, min(v, 30))}
                                     )
    """ Highest address rank to return. The transform clamps the
        given value to the range 0 to 30.
    """
    layers: DataLayer = DataLayer.ADDRESS | DataLayer.POI
    """ Filter which kind of data to include.
    """
385 @dataclasses.dataclass
386 class SearchDetails(LookupDetails):
387 """ Collection of parameters for the search call.
389 max_results: int = 10
390 """ Maximum number of results to be returned. The actual number of results
393 min_rank: int = dataclasses.field(default=0,
394 metadata={'transform': lambda v: max(0, min(v, 30))}
396 """ Lowest address rank to return.
398 max_rank: int = dataclasses.field(default=30,
399 metadata={'transform': lambda v: max(0, min(v, 30))}
401 """ Highest address rank to return.
403 layers: Optional[DataLayer] = dataclasses.field(default=None,
404 metadata={'transform': lambda r : r})
405 """ Filter which kind of data to include. When 'None' (the default) then
406 filtering by layers is disabled.
408 countries: List[str] = dataclasses.field(default_factory=list,
409 metadata={'transform': format_country})
410 """ Restrict search results to the given countries. An empty list (the
411 default) will disable this filter.
413 excluded: List[int] = dataclasses.field(default_factory=list,
414 metadata={'transform': format_excluded})
415 """ List of OSM objects to exclude from the results. Currently only
416 works when the internal place ID is given.
417 An empty list (the default) will disable this filter.
419 viewbox: Optional[Bbox] = dataclasses.field(default=None,
420 metadata={'transform': Bbox.from_param})
421 """ Focus the search on a given map area.
423 bounded_viewbox: bool = False
424 """ Use 'viewbox' as a filter and restrict results to places within the
427 near: Optional[Point] = dataclasses.field(default=None,
428 metadata={'transform': Point.from_param})
429 """ Order results by distance to the given point.
431 near_radius: Optional[float] = dataclasses.field(default=None,
432 metadata={'transform': lambda r : r})
433 """ Use near point as a filter and drop results outside the given
434 radius. Radius is given in degrees WGS84.
436 categories: List[Tuple[str, str]] = dataclasses.field(default_factory=list,
437 metadata={'transform': format_categories})
438 """ Restrict search to places with one of the given class/type categories.
439 An empty list (the default) will disable this filter.
def __post_init__(self) -> None:
    """ Derive the extended viewbox 'viewbox_x2' from 'viewbox'.

        The extended box has the same centre as the viewbox and twice
        its width and height. When no viewbox is set, the attribute
        is not created.
    """
    if self.viewbox is not None:
        # Growing the box by half its extent on every side doubles
        # the width and the height.
        xext = (self.viewbox.maxlon - self.viewbox.minlon)/2
        yext = (self.viewbox.maxlat - self.viewbox.minlat)/2
        self.viewbox_x2 = Bbox(self.viewbox.minlon - xext, self.viewbox.minlat - yext,
                               self.viewbox.maxlon + xext, self.viewbox.maxlat + yext)
def restrict_min_max_rank(self, new_min: int, new_max: int) -> None:
    """ Change the min_rank and max_rank fields to respect the
        given boundaries.

        The existing range is only ever narrowed: 'min_rank' is raised
        to 'new_min' when it is lower and 'max_rank' is lowered to
        'new_max' when it is higher. 'new_min' must not be larger
        than 'new_max'.
    """
    assert new_min <= new_max
    self.min_rank = max(self.min_rank, new_min)
    self.max_rank = min(self.max_rank, new_max)
def is_impossible(self) -> bool:
    """ Check if the parameter configuration is contradictory and
        cannot yield any results.

        This is the case when

        * 'min_rank' is larger than 'max_rank',
        * a layer restriction is set but selects no layer at all, or
        * results are restricted to the viewbox ('bounded_viewbox') as
          well as to the surroundings of 'near' ('near_radius') and no
          point within that radius lies inside the viewbox.

        A 'near' point inside the viewbox, or one given without a
        'near_radius', never makes the search impossible.
    """
    if self.min_rank > self.max_rank:
        return True

    if self.layers is not None and not self.layers:
        return True

    if self.bounded_viewbox and self.viewbox is not None \
       and self.near is not None and self.near_radius is not None:
        # Distance in degrees between the near point and the closest
        # point of the viewbox. It is zero when the point is inside.
        xdist = max(self.viewbox.minlon - self.near[0], 0.0,
                    self.near[0] - self.viewbox.maxlon)
        ydist = max(self.viewbox.minlat - self.near[1], 0.0,
                    self.near[1] - self.viewbox.maxlat)
        return bool((xdist * xdist + ydist * ydist) ** 0.5 > self.near_radius)

    return False
def layer_enabled(self, layer: DataLayer) -> bool:
    """ Check if the given layer has been chosen. Also returns
        true when layer restriction has been disabled completely,
        i.e. when 'layers' is None.
    """
    return self.layers is None or bool(self.layers & layer)