]> git.openstreetmap.org Git - nominatim.git/blobdiff - nominatim/api/search/db_search_fields.py
Added missing return types to functions
[nominatim.git] / nominatim / api / search / db_search_fields.py
index 9fcc2c4e521e9aa3ba55207edcf438af79a26949..59af826086db86027f2c808dee51824fb17e72ff 100644 (file)
@@ -7,13 +7,13 @@
 """
 Data structures for more complex fields in abstract search descriptions.
 """
-from typing import List, Tuple, cast
+from typing import List, Tuple, Iterator, cast, Dict
 import dataclasses
 
 import sqlalchemy as sa
 from sqlalchemy.dialects.postgresql import ARRAY
 
-from nominatim.typing import SaFromClause, SaColumn
+from nominatim.typing import SaFromClause, SaColumn, SaExpression
 from nominatim.api.search.query import Token
 
 @dataclasses.dataclass
@@ -27,6 +27,21 @@ class WeightedStrings:
         return bool(self.values)
 
 
+    def __iter__(self) -> Iterator[Tuple[str, float]]:
+        return iter(zip(self.values, self.penalties))
+
+
+    def get_penalty(self, value: str, default: float = 1000.0) -> float:
+        """ Get the penalty for the given value. Returns the given default
+            if the value does not exist.
+        """
+        try:
+            return self.penalties[self.values.index(value)]
+        except ValueError:
+            pass
+        return default
+
+
 @dataclasses.dataclass
 class WeightedCategories:
     """ A list of class/type tuples together with a penalty.
@@ -38,6 +53,36 @@ class WeightedCategories:
         return bool(self.values)
 
 
+    def __iter__(self) -> Iterator[Tuple[Tuple[str, str], float]]:
+        return iter(zip(self.values, self.penalties))
+
+
+    def get_penalty(self, value: Tuple[str, str], default: float = 1000.0) -> float:
+        """ Get the penalty for the given value. Returns the given default
+            if the value does not exist.
+        """
+        try:
+            return self.penalties[self.values.index(value)]
+        except ValueError:
+            pass
+        return default
+
+
+    def sql_restrict(self, table: SaFromClause) -> SaExpression:
+        """ Return an SQLAlcheny expression that restricts the
+            class and type columns of the given table to the values
+            in the list.
+            Must not be used with an empty list.
+        """
+        assert self.values
+        if len(self.values) == 1:
+            return sa.and_(table.c.class_ == self.values[0][0],
+                           table.c.type == self.values[0][1])
+
+        return sa.or_(*(sa.and_(table.c.class_ == c, table.c.type == t)
+                        for c, t in self.values))
+
+
 @dataclasses.dataclass(order=True)
 class RankedTokens:
     """ List of tokens together with the penalty of using it.
@@ -84,10 +129,11 @@ class FieldRanking:
         """
         assert self.rankings
 
-        col = table.c[self.column]
-
-        return sa.case(*((col.contains(r.tokens),r.penalty) for r in self.rankings),
-                       else_=self.default)
+        return sa.func.weigh_search(table.c[self.column],
+                                    [f"{{{','.join((str(s) for s in r.tokens))}}}"
+                                     for r in self.rankings],
+                                    [r.penalty for r in self.rankings],
+                                    self.default)
 
 
 @dataclasses.dataclass
@@ -149,10 +195,17 @@ class SearchData:
         """ Set the qulaifier field from the given tokens.
         """
         if tokens:
-            min_penalty = min(t.penalty for t in tokens)
+            categories: Dict[Tuple[str, str], float] = {}
+            min_penalty = 1000.0
+            for t in tokens:
+                if t.penalty < min_penalty:
+                    min_penalty = t.penalty
+                cat = t.get_category()
+                if t.penalty < categories.get(cat, 1000.0):
+                    categories[cat] = t.penalty
             self.penalty += min_penalty
-            self.qualifiers = WeightedCategories([t.get_category() for t in tokens],
-                                                 [t.penalty - min_penalty for t in tokens])
+            self.qualifiers = WeightedCategories(list(categories.keys()),
+                                                 list(categories.values()))
 
 
     def set_ranking(self, rankings: List[FieldRanking]) -> None:
@@ -165,3 +218,35 @@ class SearchData:
                 self.rankings.append(ranking)
             else:
                 self.penalty += ranking.default
+
+
+def lookup_by_names(name_tokens: List[int], addr_tokens: List[int]) -> List[FieldLookup]:
+    """ Create a lookup list where name tokens are looked up via index
+        and potential address tokens are used to restrict the search further.
+    """
+    lookup = [FieldLookup('name_vector', name_tokens, 'lookup_all')]
+    if addr_tokens:
+        lookup.append(FieldLookup('nameaddress_vector', addr_tokens, 'restrict'))
+
+    return lookup
+
+
+def lookup_by_any_name(name_tokens: List[int], addr_tokens: List[int],
+                       lookup_type: str) -> List[FieldLookup]:
+    """ Create a lookup list where name tokens are looked up via index
+        and only one of the name tokens must be present.
+        Potential address tokens are used to restrict the search further.
+    """
+    lookup = [FieldLookup('name_vector', name_tokens, 'lookup_any')]
+    if addr_tokens:
+        lookup.append(FieldLookup('nameaddress_vector', addr_tokens, lookup_type))
+
+    return lookup
+
+
+def lookup_by_addr(name_tokens: List[int], addr_tokens: List[int]) -> List[FieldLookup]:
+    """ Create a lookup list where address tokens are looked up via index
+        and the name tokens are only used to restrict the search further.
+    """
+    return [FieldLookup('name_vector', name_tokens, 'restrict'),
+            FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all')]