]> git.openstreetmap.org Git - nominatim.git/blobdiff - nominatim/api/types.py
replace CASE construct with plpgsql function
[nominatim.git] / nominatim / api / types.py
index ff7457ec01ce3bd52df97176966d2c8ce3ae4306..60495dd08ac2a530614499217dd19ac1f414e162 100644 (file)
@@ -14,6 +14,9 @@ import dataclasses
 import enum
 import math
 from struct import unpack
 import enum
 import math
 from struct import unpack
+from binascii import unhexlify
+
+import sqlalchemy as sa
 
 from nominatim.errors import UsageError
 
 
 from nominatim.errors import UsageError
 
@@ -70,11 +73,13 @@ class Point(NamedTuple):
 
 
     @staticmethod
 
 
     @staticmethod
-    def from_wkb(wkb: bytes) -> 'Point':
+    def from_wkb(wkb: Union[str, bytes]) -> 'Point':
         """ Create a point from EWKB as returned from the database.
         """
         """ Create a point from EWKB as returned from the database.
         """
+        if isinstance(wkb, str):
+            wkb = unhexlify(wkb)
         if len(wkb) != 25:
         if len(wkb) != 25:
-            raise ValueError("Point wkb has unexpected length")
+            raise ValueError(f"Point wkb has unexpected length {len(wkb)}")
         if wkb[0] == 0:
             gtype, srid, x, y = unpack('>iidd', wkb[1:])
         elif wkb[0] == 1:
         if wkb[0] == 0:
             gtype, srid, x, y = unpack('>iidd', wkb[1:])
         elif wkb[0] == 1:
@@ -119,6 +124,12 @@ class Point(NamedTuple):
         return Point(x, y)
 
 
         return Point(x, y)
 
 
+    def to_wkt(self) -> str:
+        """ Return the WKT representation of the point.
+        """
+        return f'POINT({self.x} {self.y})'
+
+
 
 AnyPoint = Union[Point, Tuple[float, float]]
 
 
 AnyPoint = Union[Point, Tuple[float, float]]
 
@@ -163,20 +174,38 @@ class Bbox:
         return self.coords[2]
 
 
         return self.coords[2]
 
 
+    @property
+    def area(self) -> float:
+        """ Return the area of the box in WGS84.
+        """
+        return (self.coords[2] - self.coords[0]) * (self.coords[3] - self.coords[1])
+
+
     def contains(self, pt: Point) -> bool:
         """ Check if the point is inside or on the boundary of the box.
         """
         return self.coords[0] <= pt[0] and self.coords[1] <= pt[1]\
                and self.coords[2] >= pt[0] and self.coords[3] >= pt[1]
 
     def contains(self, pt: Point) -> bool:
         """ Check if the point is inside or on the boundary of the box.
         """
         return self.coords[0] <= pt[0] and self.coords[1] <= pt[1]\
                and self.coords[2] >= pt[0] and self.coords[3] >= pt[1]
 
+
+    def to_wkt(self) -> str:
+        """ Return the WKT representation of the Bbox. This
+            is a simple polygon with four points.
+        """
+        return 'POLYGON(({0} {1},{0} {3},{2} {3},{2} {1},{0} {1}))'.format(*self.coords)
+
+
     @staticmethod
     @staticmethod
-    def from_wkb(wkb: Optional[bytes]) -> 'Optional[Bbox]':
+    def from_wkb(wkb: Union[None, str, bytes]) -> 'Optional[Bbox]':
         """ Create a Bbox from a bounding box polygon as returned by
             the database. Return s None if the input value is None.
         """
         if wkb is None:
             return None
 
         """ Create a Bbox from a bounding box polygon as returned by
             the database. Return s None if the input value is None.
         """
         if wkb is None:
             return None
 
+        if isinstance(wkb, str):
+            wkb = unhexlify(wkb)
+
         if len(wkb) != 97:
             raise ValueError("WKB must be a bounding box polygon")
         if wkb.startswith(WKB_BBOX_HEADER_LE):
         if len(wkb) != 97:
             raise ValueError("WKB must be a bounding box polygon")
         if wkb.startswith(WKB_BBOX_HEADER_LE):
@@ -273,16 +302,17 @@ def format_excluded(ids: Any) -> List[int]:
     """
     plist: Sequence[str]
     if isinstance(ids, str):
     """
     plist: Sequence[str]
     if isinstance(ids, str):
-        plist = ids.split(',')
+        plist = [s.strip() for s in ids.split(',')]
     elif isinstance(ids, abc.Sequence):
         plist = ids
     else:
         raise UsageError("Parameter 'excluded' needs to be a comma-separated list "
                          "or a Python list of numbers.")
     elif isinstance(ids, abc.Sequence):
         plist = ids
     else:
         raise UsageError("Parameter 'excluded' needs to be a comma-separated list "
                          "or a Python list of numbers.")
-    if any(not isinstance(i, int) or (isinstance(i, str) and not i.isdigit()) for i in plist):
+    if not all(isinstance(i, int) or
+               (isinstance(i, str) and (not i or i.isdigit())) for i in plist):
         raise UsageError("Parameter 'excluded' only takes place IDs.")
 
         raise UsageError("Parameter 'excluded' only takes place IDs.")
 
-    return [int(id) for id in plist if id]
+    return [int(id) for id in plist if id] or [0]
 
 
 def format_categories(categories: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
 
 
 def format_categories(categories: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
@@ -377,7 +407,8 @@ class SearchDetails(LookupDetails):
                                      )
     """ Highest address rank to return.
     """
                                      )
     """ Highest address rank to return.
     """
-    layers: Optional[DataLayer] = None
+    layers: Optional[DataLayer] = dataclasses.field(default=None,
+                                                    metadata={'transform': lambda r : r})
     """ Filter which kind of data to include. When 'None' (the default) then
         filtering by layers is disabled.
     """
     """ Filter which kind of data to include. When 'None' (the default) then
         filtering by layers is disabled.
     """
@@ -404,7 +435,8 @@ class SearchDetails(LookupDetails):
                                               metadata={'transform': Point.from_param})
     """ Order results by distance to the given point.
     """
                                               metadata={'transform': Point.from_param})
     """ Order results by distance to the given point.
     """
-    near_radius: Optional[float] = None
+    near_radius: Optional[float] = dataclasses.field(default=None,
+                                              metadata={'transform': lambda r : r})
     """ Use near point as a filter and drop results outside the given
         radius. Radius is given in degrees WSG84.
     """
     """ Use near point as a filter and drop results outside the given
         radius. Radius is given in degrees WSG84.
     """
@@ -418,8 +450,10 @@ class SearchDetails(LookupDetails):
         if self.viewbox is not None:
             xext = (self.viewbox.maxlon - self.viewbox.minlon)/2
             yext = (self.viewbox.maxlat - self.viewbox.minlat)/2
         if self.viewbox is not None:
             xext = (self.viewbox.maxlon - self.viewbox.minlon)/2
             yext = (self.viewbox.maxlat - self.viewbox.minlat)/2
-            self.viewbox_x2 = Bbox(self.viewbox.minlon - xext, self.viewbox.maxlon - yext,
+            self.viewbox_x2 = Bbox(self.viewbox.minlon - xext, self.viewbox.minlat - yext,
                                    self.viewbox.maxlon + xext, self.viewbox.maxlat + yext)
                                    self.viewbox.maxlon + xext, self.viewbox.maxlat + yext)
+        else:
+            self.viewbox_x2 = None
 
 
     def restrict_min_max_rank(self, new_min: int, new_max: int) -> None:
 
 
     def restrict_min_max_rank(self, new_min: int, new_max: int) -> None: