"""
Complex datatypes used by the Nominatim API.
"""
-from typing import Optional, Union, Tuple, NamedTuple, TypeVar, Type, Dict, Any
+from typing import Optional, Union, Tuple, NamedTuple, TypeVar, Type, Dict, \
+ Any, List, Sequence
+from collections import abc
import dataclasses
import enum
+import math
from struct import unpack
+from binascii import unhexlify
+
+import sqlalchemy as sa
from nominatim.errors import UsageError
+# pylint: disable=no-member,too-many-boolean-expressions,too-many-instance-attributes
+
@dataclasses.dataclass
class PlaceID:
""" Reference an object by Nominatim's internal ID.
@staticmethod
- def from_wkb(wkb: bytes) -> 'Point':
+ def from_wkb(wkb: Union[str, bytes]) -> 'Point':
""" Create a point from EWKB as returned from the database.
"""
+ if isinstance(wkb, str):
+ wkb = unhexlify(wkb)
if len(wkb) != 25:
- raise ValueError("Point wkb has unexpected length")
+ raise ValueError(f"Point wkb has unexpected length {len(wkb)}")
if wkb[0] == 0:
gtype, srid, x, y = unpack('>iidd', wkb[1:])
elif wkb[0] == 1:
return Point(x, y)
@staticmethod
def from_param(inp: Any) -> 'Point':
    """ Create a point from an input parameter.

        The parameter may be given as a Point (returned unchanged),
        a comma-separated string 'x,y' or a sequence of two strings
        or numbers.

        Raises a UsageError if the input has an unsupported type, does
        not consist of exactly two finite numbers, or if the coordinates
        are outside of [-180, 180] for x or [-90, 90] for y.
    """
    if isinstance(inp, Point):
        return inp

    seq: Sequence[Any]
    if isinstance(inp, str):
        seq = inp.split(',')
    elif isinstance(inp, abc.Sequence):
        seq = inp
    else:
        # Without this branch 'seq' would be unbound and the caller would
        # see an UnboundLocalError instead of a proper usage error.
        raise UsageError('Point parameter needs to be a string or a sequence.')

    if len(seq) != 2:
        raise UsageError('Point parameter needs 2 coordinates.')

    try:
        x, y = (float(value) for value in seq)
    except (TypeError, ValueError) as exc:
        # float() raises TypeError for non-numeric objects such as None.
        raise UsageError('Point parameter needs to be numbers.') from exc

    # NaN compares false against everything and would slip through the
    # range check below, so reject non-finite values explicitly.
    if not (math.isfinite(x) and math.isfinite(y)):
        raise UsageError('Point parameter needs to be numbers.')

    if not -180.0 <= x <= 180.0 or not -90.0 <= y <= 90.0:
        raise UsageError('Point coordinates invalid.')

    return Point(x, y)
+
+
def to_wkt(self) -> str:
    """ Format the point as a WKT 'POINT(x y)' string.
    """
    first, second = self.x, self.y
    return f'POINT({first} {second})'
+
+
+
AnyPoint = Union[Point, Tuple[float, float]]
WKB_BBOX_HEADER_LE = b'\x01\x03\x00\x00\x20\xE6\x10\x00\x00\x01\x00\x00\x00\x05\x00\x00\x00'
return self.coords[2]
@property
def area(self) -> float:
    """ Return the area of the box in WGS84, computed as the product
        of the two coordinate extents.
    """
    width = self.coords[2] - self.coords[0]
    height = self.coords[3] - self.coords[1]
    return width * height
+
+
+ def contains(self, pt: Point) -> bool:
+ """ Check if the point is inside or on the boundary of the box.
+ """
+ return self.coords[0] <= pt[0] and self.coords[1] <= pt[1]\
+ and self.coords[2] >= pt[0] and self.coords[3] >= pt[1]
+
+
def to_wkt(self) -> str:
    """ Format the box as a WKT polygon: a single closed ring that
        visits the four corners and returns to the first one.
    """
    ring_template = 'POLYGON(({0} {1},{0} {3},{2} {3},{2} {1},{0} {1}))'
    return ring_template.format(*self.coords)
+
+
@staticmethod
- def from_wkb(wkb: Optional[bytes]) -> 'Optional[Bbox]':
+ def from_wkb(wkb: Union[None, str, bytes]) -> 'Optional[Bbox]':
""" Create a Bbox from a bounding box polygon as returned by
the database. Return s None if the input value is None.
"""
if wkb is None:
return None
+ if isinstance(wkb, str):
+ wkb = unhexlify(wkb)
+
if len(wkb) != 97:
raise ValueError("WKB must be a bounding box polygon")
if wkb.startswith(WKB_BBOX_HEADER_LE):
pt[0] + buffer, pt[1] + buffer)
@staticmethod
def from_param(inp: Any) -> 'Bbox':
    """ Return a Bbox from an input parameter.

        The box may be given as a Bbox (returned unchanged), a
        comma-separated string 'x1,y1,x2,y2' or a sequence of four
        strings or numbers. The two corners may be given in any order.

        Raises a UsageError if the input has an unsupported type, does
        not consist of exactly four finite numbers, if a coordinate is
        out of range or if the box has no extent in one direction.
    """
    if isinstance(inp, Bbox):
        return inp

    seq: Sequence[Any]
    if isinstance(inp, str):
        seq = inp.split(',')
    elif isinstance(inp, abc.Sequence):
        seq = inp
    else:
        # Without this branch 'seq' would be unbound and the caller would
        # see an UnboundLocalError instead of a proper usage error.
        raise UsageError('Bounding box parameter needs to be a string or a sequence.')

    if len(seq) != 4:
        raise UsageError('Bounding box parameter needs 4 coordinates.')

    try:
        x1, y1, x2, y2 = (float(value) for value in seq)
    except (TypeError, ValueError) as exc:
        # float() raises TypeError for non-numeric objects such as None.
        raise UsageError('Bounding box parameter needs to be numbers.') from exc

    # NaN compares false against everything and would slip through the
    # range check below, so reject non-finite values explicitly.
    if not all(math.isfinite(value) for value in (x1, y1, x2, y2)):
        raise UsageError('Bounding box parameter needs to be numbers.')

    if not -180.0 <= x1 <= 180.0 or not -90.0 <= y1 <= 90.0 \
       or not -180.0 <= x2 <= 180.0 or not -90.0 <= y2 <= 90.0:
        raise UsageError('Bounding box coordinates invalid.')

    # A box that is degenerate in either direction has no area.
    if x1 == x2 or y1 == y2:
        raise UsageError('Bounding box with invalid parameters.')

    return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
+
+
class GeometryFormat(enum.Flag):
""" Geometry output formats supported by Nominatim.
"""
NATURAL = enum.auto()
def format_country(cc: Any) -> List[str]:
    """ Build a list of lower-cased country codes from the input.

        The input may be a comma-separated string or a sequence. Entries
        that are not strings of exactly two characters are dropped
        silently. Any other input type raises a UsageError.
    """
    if not isinstance(cc, (str, abc.Sequence)):
        raise UsageError("Parameter 'country' needs to be a comma-separated list "
                         "or a Python list of strings.")

    candidates: Sequence[str] = cc.split(',') if isinstance(cc, str) else cc

    codes: List[str] = []
    for entry in candidates:
        if isinstance(entry, str) and len(entry) == 2:
            codes.append(entry.lower())

    return codes
+
+
def format_excluded(ids: Any) -> List[int]:
    """ Extract a list of place ids from the input which may be either
        a comma-separated string or a sequence of strings or ints.

        Empty values are ignored. Raises a UsageError when the input has
        an unsupported type or contains anything that cannot be converted
        to an int. When no IDs remain, the list [0] is returned.
    """
    plist: Sequence[Any]
    if isinstance(ids, str):
        plist = [s.strip() for s in ids.split(',')]
    elif isinstance(ids, abc.Sequence):
        plist = ids
    else:
        raise UsageError("Parameter 'excluded' needs to be a comma-separated list "
                         "or a Python list of numbers.")

    # str.isdecimal() is used instead of str.isdigit(): the latter also
    # accepts characters like superscript digits which int() rejects with
    # a ValueError.
    if not all(isinstance(i, int) or
               (isinstance(i, str) and (not i or i.isdecimal())) for i in plist):
        raise UsageError("Parameter 'excluded' only takes place IDs.")

    # Falsy entries (empty strings, 0) carry no information and are dropped.
    return [int(place_id) for place_id in plist if place_id] or [0]
+
+
def format_categories(categories: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """ Transform hook for the list of categories. No conversion is
        applied at the moment: the input is handed back unchanged.
    """
    unchanged = categories
    return unchanged
+
TParam = TypeVar('TParam', bound='LookupDetails') # pylint: disable=invalid-name
@dataclasses.dataclass
layers: DataLayer = DataLayer.ADDRESS | DataLayer.POI
""" Filter which kind of data to include.
"""
+
@dataclasses.dataclass
class SearchDetails(LookupDetails):
    """ Collection of parameters for the search call.
    """
    max_results: int = 10
    """ Maximum number of results to be returned. The actual number of results
        may be less.
    """
    min_rank: int = dataclasses.field(default=0,
                                      metadata={'transform': lambda v: max(0, min(v, 30))}
                                     )
    """ Lowest address rank to return.
    """
    max_rank: int = dataclasses.field(default=30,
                                      metadata={'transform': lambda v: max(0, min(v, 30))}
                                     )
    """ Highest address rank to return.
    """
    layers: Optional[DataLayer] = dataclasses.field(default=None,
                                                    metadata={'transform': lambda r: r})
    """ Filter which kind of data to include. When 'None' (the default) then
        filtering by layers is disabled.
    """
    countries: List[str] = dataclasses.field(default_factory=list,
                                             metadata={'transform': format_country})
    """ Restrict search results to the given countries. An empty list (the
        default) will disable this filter.
    """
    excluded: List[int] = dataclasses.field(default_factory=list,
                                            metadata={'transform': format_excluded})
    """ List of OSM objects to exclude from the results. Currently only
        works when the internal place ID is given.
        An empty list (the default) will disable this filter.
    """
    viewbox: Optional[Bbox] = dataclasses.field(default=None,
                                                metadata={'transform': Bbox.from_param})
    """ Focus the search on a given map area.
    """
    bounded_viewbox: bool = False
    """ Use 'viewbox' as a filter and restrict results to places within the
        given area.
    """
    near: Optional[Point] = dataclasses.field(default=None,
                                              metadata={'transform': Point.from_param})
    """ Order results by distance to the given point.
    """
    near_radius: Optional[float] = dataclasses.field(default=None,
                                                     metadata={'transform': lambda r: r})
    """ Use near point as a filter and drop results outside the given
        radius. Radius is given in degrees WGS84.
    """
    categories: List[Tuple[str, str]] = dataclasses.field(default_factory=list,
                                                          metadata={'transform': format_categories})
    """ Restrict search to places with one of the given class/type categories.
        An empty list (the default) will disable this filter.
    """

    def __post_init__(self) -> None:
        # Derive 'viewbox_x2': the viewbox grown by half its extent on each
        # side, i.e. a box of twice the width and height around the same
        # centre. It is None when no viewbox is set.
        if self.viewbox is not None:
            xext = (self.viewbox.maxlon - self.viewbox.minlon)/2
            yext = (self.viewbox.maxlat - self.viewbox.minlat)/2
            self.viewbox_x2 = Bbox(self.viewbox.minlon - xext, self.viewbox.minlat - yext,
                                   self.viewbox.maxlon + xext, self.viewbox.maxlat + yext)
        else:
            self.viewbox_x2 = None


    def restrict_min_max_rank(self, new_min: int, new_max: int) -> None:
        """ Change the min_rank and max_rank fields to respect the
            given boundaries. The range is only ever narrowed, never
            widened.
        """
        assert new_min <= new_max
        self.min_rank = max(self.min_rank, new_min)
        self.max_rank = min(self.max_rank, new_max)


    def is_impossible(self) -> bool:
        """ Check if the parameter configuration is contradictory and
            cannot yield any results.

            This is the case when the rank range is empty, when a layer
            filter is set but selects no layer, or when results are
            restricted to the viewbox and to a radius around the near
            point while the radius does not reach the viewbox.
        """
        if self.min_rank > self.max_rank:
            return True

        if self.layers is not None and not self.layers:
            return True

        # A near point on its own only influences ordering. A contradiction
        # with a bounded viewbox can therefore only arise when the radius
        # filter is active and its circle misses the box. A near point
        # inside the viewbox is always satisfiable.
        # NOTE(review): assumes the radius filter is a plain distance in
        # degrees measured against the viewbox - confirm with the query code.
        if self.bounded_viewbox and self.viewbox is not None \
           and self.near is not None and self.near_radius is not None:
            xdist = max(self.viewbox.minlon - self.near[0], 0.0,
                        self.near[0] - self.viewbox.maxlon)
            ydist = max(self.viewbox.minlat - self.near[1], 0.0,
                        self.near[1] - self.viewbox.maxlat)
            return math.hypot(xdist, ydist) > self.near_radius

        return False


    def layer_enabled(self, layer: DataLayer) -> bool:
        """ Check if the given layer has been chosen. Also returns
            true when layer restriction has been disabled completely.
        """
        return self.layers is None or bool(self.layers & layer)