X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/df65c10360344f2deb01bbe0536c8b2bc1657fb4..a413aae8a3962be4d623844d867604df68a9a211:/nominatim/api/types.py diff --git a/nominatim/api/types.py b/nominatim/api/types.py index 89b81111..c7e15843 100644 --- a/nominatim/api/types.py +++ b/nominatim/api/types.py @@ -7,9 +7,20 @@ """ Complex datatypes used by the Nominatim API. """ -from typing import Optional, Union, NamedTuple +from typing import Optional, Union, Tuple, NamedTuple, TypeVar, Type, Dict, \ + Any, List, Sequence +from collections import abc import dataclasses import enum +import math +from struct import unpack + +from geoalchemy2 import WKTElement +import geoalchemy2.functions + +from nominatim.errors import UsageError + +# pylint: disable=no-member,too-many-boolean-expressions,too-many-instance-attributes @dataclasses.dataclass class PlaceID: @@ -55,6 +66,192 @@ class Point(NamedTuple): return self.x + def to_geojson(self) -> str: + """ Return the point in GeoJSON format. + """ + return f'{{"type": "Point","coordinates": [{self.x}, {self.y}]}}' + + + @staticmethod + def from_wkb(wkb: bytes) -> 'Point': + """ Create a point from EWKB as returned from the database. + """ + if len(wkb) != 25: + raise ValueError("Point wkb has unexpected length") + if wkb[0] == 0: + gtype, srid, x, y = unpack('>iidd', wkb[1:]) + elif wkb[0] == 1: + gtype, srid, x, y = unpack(' 'Point': + """ Create a point from an input parameter. The parameter + may be given as a point, a string or a sequence of + strings or floats. Raises a UsageError if the format is + not correct. + """ + if isinstance(inp, Point): + return inp + + seq: Sequence[str] + if isinstance(inp, str): + seq = inp.split(',') + elif isinstance(inp, abc.Sequence): + seq = inp + + if len(seq) != 2: + raise UsageError('Point parameter needs 2 coordinates.') + try: + x, y = filter(math.isfinite, map(float, seq)) + except ValueError as exc: + raise UsageError('Point parameter needs to be numbers.') from exc + + if x < -180.0 or x > 180.0 or y < -90.0 or y > 90.0: + raise UsageError('Point coordinates invalid.') + + return Point(x, y) + + + def sql_value(self) -> WKTElement: + """ Create an SQL expression for the point. + """ + return WKTElement(f'POINT({self.x} {self.y})', srid=4326) + + + +AnyPoint = Union[Point, Tuple[float, float]] + +WKB_BBOX_HEADER_LE = b'\x01\x03\x00\x00\x20\xE6\x10\x00\x00\x01\x00\x00\x00\x05\x00\x00\x00' +WKB_BBOX_HEADER_BE = b'\x00\x20\x00\x00\x03\x00\x00\x10\xe6\x00\x00\x00\x01\x00\x00\x00\x05' + +class Bbox: + """ A bounding box in WSG84 projection. + + The coordinates are available as an array in the 'coord' + property in the order (minx, miny, maxx, maxy). + """ + def __init__(self, minx: float, miny: float, maxx: float, maxy: float) -> None: + self.coords = (minx, miny, maxx, maxy) + + + @property + def minlat(self) -> float: + """ Southern-most latitude, corresponding to the minimum y coordinate. + """ + return self.coords[1] + + + @property + def maxlat(self) -> float: + """ Northern-most latitude, corresponding to the maximum y coordinate. + """ + return self.coords[3] + + + @property + def minlon(self) -> float: + """ Western-most longitude, corresponding to the minimum x coordinate. + """ + return self.coords[0] + + + @property + def maxlon(self) -> float: + """ Eastern-most longitude, corresponding to the maximum x coordinate. + """ + return self.coords[2] + + + @property + def area(self) -> float: + """ Return the area of the box in WGS84. + """ + return (self.coords[2] - self.coords[0]) * (self.coords[3] - self.coords[1]) + + + def sql_value(self) -> Any: + """ Create an SQL expression for the box. + """ + return geoalchemy2.functions.ST_MakeEnvelope(*self.coords, 4326) + + + def contains(self, pt: Point) -> bool: + """ Check if the point is inside or on the boundary of the box. + """ + return self.coords[0] <= pt[0] and self.coords[1] <= pt[1]\ + and self.coords[2] >= pt[0] and self.coords[3] >= pt[1] + + + @staticmethod + def from_wkb(wkb: Optional[bytes]) -> 'Optional[Bbox]': + """ Create a Bbox from a bounding box polygon as returned by + the database. Return s None if the input value is None. + """ + if wkb is None: + return None + + if len(wkb) != 97: + raise ValueError("WKB must be a bounding box polygon") + if wkb.startswith(WKB_BBOX_HEADER_LE): + x1, y1, _, _, x2, y2 = unpack('dddddd', wkb[17:65]) + else: + raise ValueError("WKB has wrong header") + + return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2)) + + + @staticmethod + def from_point(pt: Point, buffer: float) -> 'Bbox': + """ Return a Bbox around the point with the buffer added to all sides. + """ + return Bbox(pt[0] - buffer, pt[1] - buffer, + pt[0] + buffer, pt[1] + buffer) + + + @staticmethod + def from_param(inp: Any) -> 'Bbox': + """ Return a Bbox from an input parameter. The box may be + given as a Bbox, a string or a list or strings or integer. + Raises a UsageError if the format is incorrect. + """ + if isinstance(inp, Bbox): + return inp + + seq: Sequence[str] + if isinstance(inp, str): + seq = inp.split(',') + elif isinstance(inp, abc.Sequence): + seq = inp + + if len(seq) != 4: + raise UsageError('Bounding box parameter needs 4 coordinates.') + try: + x1, y1, x2, y2 = filter(math.isfinite, map(float, seq)) + except ValueError as exc: + raise UsageError('Bounding box parameter needs to be numbers.') from exc + + if x1 < -180.0 or x1 > 180.0 or y1 < -90.0 or y1 > 90.0 \ + or x2 < -180.0 or x2 > 180.0 or y2 < -90.0 or y2 > 90.0: + raise UsageError('Bounding box coordinates invalid.') + + if x1 == x2 or y1 == y2: + raise UsageError('Bounding box with invalid parameters.') + + return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2)) + + class GeometryFormat(enum.Flag): """ Geometry output formats supported by Nominatim. """ @@ -65,10 +262,63 @@ class GeometryFormat(enum.Flag): TEXT = enum.auto() +class DataLayer(enum.Flag): + """ Layer types that can be selected for reverse and forward search. + """ + POI = enum.auto() + ADDRESS = enum.auto() + RAILWAY = enum.auto() + MANMADE = enum.auto() + NATURAL = enum.auto() + + +def format_country(cc: Any) -> List[str]: + """ Extract a list of country codes from the input which may be either + a string or list of strings. Filters out all values that are not + a two-letter string. + """ + clist: Sequence[str] + if isinstance(cc, str): + clist = cc.split(',') + elif isinstance(cc, abc.Sequence): + clist = cc + else: + raise UsageError("Parameter 'country' needs to be a comma-separated list " + "or a Python list of strings.") + + return [cc.lower() for cc in clist if isinstance(cc, str) and len(cc) == 2] + + +def format_excluded(ids: Any) -> List[int]: + """ Extract a list of place ids from the input which may be either + a string or a list of strings or ints. Ignores empty value but + throws a UserError on anything that cannot be converted to int. + """ + plist: Sequence[str] + if isinstance(ids, str): + plist = [s.strip() for s in ids.split(',')] + elif isinstance(ids, abc.Sequence): + plist = ids + else: + raise UsageError("Parameter 'excluded' needs to be a comma-separated list " + "or a Python list of numbers.") + if not all(isinstance(i, int) or (isinstance(i, str) and i.isdigit()) for i in plist): + raise UsageError("Parameter 'excluded' only takes place IDs.") + + return [int(id) for id in plist if id] + + +def format_categories(categories: List[Tuple[str, str]]) -> List[Tuple[str, str]]: + """ Extract a list of categories. Currently a noop. + """ + return categories + +TParam = TypeVar('TParam', bound='LookupDetails') # pylint: disable=invalid-name + @dataclasses.dataclass class LookupDetails: """ Collection of parameters that define the amount of details - returned with a search result. + returned with a lookup or details result. """ geometry_output: GeometryFormat = GeometryFormat.NONE """ Add the full geometry of the place to the result. Multiple @@ -89,3 +339,136 @@ class LookupDetails: keywords: bool = False """ Add information about the search terms used for this place. """ + geometry_simplification: float = 0.0 + """ Simplification factor for a geometry in degrees WGS. A factor of + 0.0 means the original geometry is kept. The higher the value, the + more the geometry gets simplified. + """ + + @classmethod + def from_kwargs(cls: Type[TParam], kwargs: Dict[str, Any]) -> TParam: + """ Load the data fields of the class from a dictionary. + Unknown entries in the dictionary are ignored, missing ones + get the default setting. + + The function supports type checking and throws a UsageError + when the value does not fit. + """ + def _check_field(v: Any, field: 'dataclasses.Field[Any]') -> Any: + if v is None: + return field.default_factory() \ + if field.default_factory != dataclasses.MISSING \ + else field.default + if field.metadata and 'transform' in field.metadata: + return field.metadata['transform'](v) + if not isinstance(v, field.type): + raise UsageError(f"Parameter '{field.name}' needs to be of {field.type!s}.") + return v + + return cls(**{f.name: _check_field(kwargs[f.name], f) + for f in dataclasses.fields(cls) if f.name in kwargs}) + + +@dataclasses.dataclass +class ReverseDetails(LookupDetails): + """ Collection of parameters for the reverse call. + """ + max_rank: int = dataclasses.field(default=30, + metadata={'transform': lambda v: max(0, min(v, 30))} + ) + """ Highest address rank to return. + """ + layers: DataLayer = DataLayer.ADDRESS | DataLayer.POI + """ Filter which kind of data to include. + """ + +@dataclasses.dataclass +class SearchDetails(LookupDetails): + """ Collection of parameters for the search call. + """ + max_results: int = 10 + """ Maximum number of results to be returned. The actual number of results + may be less. + """ + min_rank: int = dataclasses.field(default=0, + metadata={'transform': lambda v: max(0, min(v, 30))} + ) + """ Lowest address rank to return. + """ + max_rank: int = dataclasses.field(default=30, + metadata={'transform': lambda v: max(0, min(v, 30))} + ) + """ Highest address rank to return. + """ + layers: Optional[DataLayer] = dataclasses.field(default=None, + metadata={'transform': lambda r : r}) + """ Filter which kind of data to include. When 'None' (the default) then + filtering by layers is disabled. + """ + countries: List[str] = dataclasses.field(default_factory=list, + metadata={'transform': format_country}) + """ Restrict search results to the given countries. An empty list (the + default) will disable this filter. + """ + excluded: List[int] = dataclasses.field(default_factory=list, + metadata={'transform': format_excluded}) + """ List of OSM objects to exclude from the results. Currenlty only + works when the internal place ID is given. + An empty list (the default) will disable this filter. + """ + viewbox: Optional[Bbox] = dataclasses.field(default=None, + metadata={'transform': Bbox.from_param}) + """ Focus the search on a given map area. + """ + bounded_viewbox: bool = False + """ Use 'viewbox' as a filter and restrict results to places within the + given area. + """ + near: Optional[Point] = dataclasses.field(default=None, + metadata={'transform': Point.from_param}) + """ Order results by distance to the given point. + """ + near_radius: Optional[float] = dataclasses.field(default=None, + metadata={'transform': lambda r : r}) + """ Use near point as a filter and drop results outside the given + radius. Radius is given in degrees WSG84. + """ + categories: List[Tuple[str, str]] = dataclasses.field(default_factory=list, + metadata={'transform': format_categories}) + """ Restrict search to places with one of the given class/type categories. + An empty list (the default) will disable this filter. + """ + + def __post_init__(self) -> None: + if self.viewbox is not None: + xext = (self.viewbox.maxlon - self.viewbox.minlon)/2 + yext = (self.viewbox.maxlat - self.viewbox.minlat)/2 + self.viewbox_x2 = Bbox(self.viewbox.minlon - xext, self.viewbox.minlat - yext, + self.viewbox.maxlon + xext, self.viewbox.maxlat + yext) + + + def restrict_min_max_rank(self, new_min: int, new_max: int) -> None: + """ Change the min_rank and max_rank fields to respect the + given boundaries. + """ + assert new_min <= new_max + self.min_rank = max(self.min_rank, new_min) + self.max_rank = min(self.max_rank, new_max) + + + def is_impossible(self) -> bool: + """ Check if the parameter configuration is contradictionary and + cannot yield any results. + """ + return (self.min_rank > self.max_rank + or (self.bounded_viewbox + and self.viewbox is not None and self.near is not None + and self.viewbox.contains(self.near)) + or self.layers is not None and not self.layers) + + + def layer_enabled(self, layer: DataLayer) -> bool: + """ Check if the given layer has been choosen. Also returns + true when layer restriction has been disabled completely. + """ + return self.layers is None or bool(self.layers & layer)