]> git.openstreetmap.org Git - nominatim.git/blobdiff - nominatim/api/types.py
Merge pull request #3110 from lonvia/sql-lambda-queries
[nominatim.git] / nominatim / api / types.py
index 9042e707db920ac46b2139156b3e53fcb5b6006e..43e83c1f689dfadda882a85aa79fe496cf0cfecf 100644 (file)
@@ -14,9 +14,7 @@ import dataclasses
 import enum
 import math
 from struct import unpack
 import enum
 import math
 from struct import unpack
-
-from geoalchemy2 import WKTElement
-import geoalchemy2.functions
+from binascii import unhexlify
 
 from nominatim.errors import UsageError
 
 
 from nominatim.errors import UsageError
 
@@ -73,11 +71,13 @@ class Point(NamedTuple):
 
 
     @staticmethod
 
 
     @staticmethod
-    def from_wkb(wkb: bytes) -> 'Point':
+    def from_wkb(wkb: Union[str, bytes]) -> 'Point':
         """ Create a point from EWKB as returned from the database.
         """
         """ Create a point from EWKB as returned from the database.
         """
+        if isinstance(wkb, str):
+            wkb = unhexlify(wkb)
         if len(wkb) != 25:
         if len(wkb) != 25:
-            raise ValueError("Point wkb has unexpected length")
+            raise ValueError(f"Point wkb has unexpected length {len(wkb)}")
         if wkb[0] == 0:
             gtype, srid, x, y = unpack('>iidd', wkb[1:])
         elif wkb[0] == 1:
         if wkb[0] == 0:
             gtype, srid, x, y = unpack('>iidd', wkb[1:])
         elif wkb[0] == 1:
@@ -122,10 +122,10 @@ class Point(NamedTuple):
         return Point(x, y)
 
 
         return Point(x, y)
 
 
-    def sql_value(self) -> WKTElement:
-        """ Create an SQL expression for the point.
+    def to_wkt(self) -> str:
+        """ Return the WKT representation of the point.
         """
         """
-        return WKTElement(f'POINT({self.x} {self.y})', srid=4326)
+        return f'POINT({self.x} {self.y})'
 
 
 
 
 
 
@@ -179,12 +179,6 @@ class Bbox:
         return (self.coords[2] - self.coords[0]) * (self.coords[3] - self.coords[1])
 
 
         return (self.coords[2] - self.coords[0]) * (self.coords[3] - self.coords[1])
 
 
-    def sql_value(self) -> Any:
-        """ Create an SQL expression for the box.
-        """
-        return geoalchemy2.functions.ST_MakeEnvelope(*self.coords, 4326)
-
-
     def contains(self, pt: Point) -> bool:
         """ Check if the point is inside or on the boundary of the box.
         """
     def contains(self, pt: Point) -> bool:
         """ Check if the point is inside or on the boundary of the box.
         """
@@ -192,14 +186,25 @@ class Bbox:
                and self.coords[2] >= pt[0] and self.coords[3] >= pt[1]
 
 
                and self.coords[2] >= pt[0] and self.coords[3] >= pt[1]
 
 
+    def to_wkt(self) -> str:
+        """ Return the WKT representation of the Bbox. This
+            is a simple polygon with four points.
+        """
+        return 'POLYGON(({0} {1},{0} {3},{2} {3},{2} {1},{0} {1}))'\
+                  .format(*self.coords) # pylint: disable=consider-using-f-string
+
+
     @staticmethod
     @staticmethod
-    def from_wkb(wkb: Optional[bytes]) -> 'Optional[Bbox]':
+    def from_wkb(wkb: Union[None, str, bytes]) -> 'Optional[Bbox]':
         """ Create a Bbox from a bounding box polygon as returned by
             the database. Return s None if the input value is None.
         """
         if wkb is None:
             return None
 
         """ Create a Bbox from a bounding box polygon as returned by
             the database. Return s None if the input value is None.
         """
         if wkb is None:
             return None
 
+        if isinstance(wkb, str):
+            wkb = unhexlify(wkb)
+
         if len(wkb) != 97:
             raise ValueError("WKB must be a bounding box polygon")
         if wkb.startswith(WKB_BBOX_HEADER_LE):
         if len(wkb) != 97:
             raise ValueError("WKB must be a bounding box polygon")
         if wkb.startswith(WKB_BBOX_HEADER_LE):
@@ -296,16 +301,17 @@ def format_excluded(ids: Any) -> List[int]:
     """
     plist: Sequence[str]
     if isinstance(ids, str):
     """
     plist: Sequence[str]
     if isinstance(ids, str):
-        plist = ids.split(',')
+        plist = [s.strip() for s in ids.split(',')]
     elif isinstance(ids, abc.Sequence):
         plist = ids
     else:
         raise UsageError("Parameter 'excluded' needs to be a comma-separated list "
                          "or a Python list of numbers.")
     elif isinstance(ids, abc.Sequence):
         plist = ids
     else:
         raise UsageError("Parameter 'excluded' needs to be a comma-separated list "
                          "or a Python list of numbers.")
-    if any(not isinstance(i, int) or (isinstance(i, str) and not i.isdigit()) for i in plist):
+    if not all(isinstance(i, int) or
+               (isinstance(i, str) and (not i or i.isdigit())) for i in plist):
         raise UsageError("Parameter 'excluded' only takes place IDs.")
 
         raise UsageError("Parameter 'excluded' only takes place IDs.")
 
-    return [int(id) for id in plist if id]
+    return [int(id) for id in plist if id] or [0]
 
 
 def format_categories(categories: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
 
 
 def format_categories(categories: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
@@ -400,7 +406,8 @@ class SearchDetails(LookupDetails):
                                      )
     """ Highest address rank to return.
     """
                                      )
     """ Highest address rank to return.
     """
-    layers: Optional[DataLayer] = None
+    layers: Optional[DataLayer] = dataclasses.field(default=None,
+                                                    metadata={'transform': lambda r : r})
     """ Filter which kind of data to include. When 'None' (the default) then
         filtering by layers is disabled.
     """
     """ Filter which kind of data to include. When 'None' (the default) then
         filtering by layers is disabled.
     """
@@ -427,7 +434,8 @@ class SearchDetails(LookupDetails):
                                               metadata={'transform': Point.from_param})
     """ Order results by distance to the given point.
     """
                                               metadata={'transform': Point.from_param})
     """ Order results by distance to the given point.
     """
-    near_radius: Optional[float] = None
+    near_radius: Optional[float] = dataclasses.field(default=None,
+                                              metadata={'transform': lambda r : r})
     """ Use near point as a filter and drop results outside the given
         radius. Radius is given in degrees WSG84.
     """
     """ Use near point as a filter and drop results outside the given
         radius. Radius is given in degrees WSG84.
     """
@@ -436,6 +444,7 @@ class SearchDetails(LookupDetails):
     """ Restrict search to places with one of the given class/type categories.
         An empty list (the default) will disable this filter.
     """
     """ Restrict search to places with one of the given class/type categories.
         An empty list (the default) will disable this filter.
     """
+    viewbox_x2: Optional[Bbox] = None
 
     def __post_init__(self) -> None:
         if self.viewbox is not None:
 
     def __post_init__(self) -> None:
         if self.viewbox is not None: