X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/c42273a4db2d7b4fe05a0be9210901d35e038887..97ac036df54f5a9e6f1d344e397f4e27aeccda9a:/nominatim/api/search/db_search_fields.py?ds=sidebyside diff --git a/nominatim/api/search/db_search_fields.py b/nominatim/api/search/db_search_fields.py index 9fcc2c4e..59af8260 100644 --- a/nominatim/api/search/db_search_fields.py +++ b/nominatim/api/search/db_search_fields.py @@ -7,13 +7,13 @@ """ Data structures for more complex fields in abstract search descriptions. """ -from typing import List, Tuple, cast +from typing import List, Tuple, Iterator, cast, Dict import dataclasses import sqlalchemy as sa from sqlalchemy.dialects.postgresql import ARRAY -from nominatim.typing import SaFromClause, SaColumn +from nominatim.typing import SaFromClause, SaColumn, SaExpression from nominatim.api.search.query import Token @dataclasses.dataclass @@ -27,6 +27,21 @@ class WeightedStrings: return bool(self.values) + def __iter__(self) -> Iterator[Tuple[str, float]]: + return iter(zip(self.values, self.penalties)) + + + def get_penalty(self, value: str, default: float = 1000.0) -> float: + """ Get the penalty for the given value. Returns the given default + if the value does not exist. + """ + try: + return self.penalties[self.values.index(value)] + except ValueError: + pass + return default + + @dataclasses.dataclass class WeightedCategories: """ A list of class/type tuples together with a penalty. @@ -38,6 +53,36 @@ class WeightedCategories: return bool(self.values) + def __iter__(self) -> Iterator[Tuple[Tuple[str, str], float]]: + return iter(zip(self.values, self.penalties)) + + + def get_penalty(self, value: Tuple[str, str], default: float = 1000.0) -> float: + """ Get the penalty for the given value. Returns the given default + if the value does not exist. 
+ """ + try: + return self.penalties[self.values.index(value)] + except ValueError: + pass + return default + + + def sql_restrict(self, table: SaFromClause) -> SaExpression: + """ Return an SQLAlchemy expression that restricts the + class and type columns of the given table to the values + in the list. + Must not be used with an empty list. + """ + assert self.values + if len(self.values) == 1: + return sa.and_(table.c.class_ == self.values[0][0], + table.c.type == self.values[0][1]) + + return sa.or_(*(sa.and_(table.c.class_ == c, table.c.type == t) + for c, t in self.values)) + + @dataclasses.dataclass(order=True) class RankedTokens: """ List of tokens together with the penalty of using it. @@ -84,10 +129,11 @@ class FieldRanking: """ assert self.rankings - col = table.c[self.column] - - return sa.case(*((col.contains(r.tokens),r.penalty) for r in self.rankings), - else_=self.default) + return sa.func.weigh_search(table.c[self.column], + [f"{{{','.join((str(s) for s in r.tokens))}}}" + for r in self.rankings], + [r.penalty for r in self.rankings], + self.default) @dataclasses.dataclass @@ -149,10 +195,17 @@ class SearchData: """ Set the qualifier field from the given tokens. 
""" if tokens: - min_penalty = min(t.penalty for t in tokens) + categories: Dict[Tuple[str, str], float] = {} + min_penalty = 1000.0 + for t in tokens: + if t.penalty < min_penalty: + min_penalty = t.penalty + cat = t.get_category() + if t.penalty < categories.get(cat, 1000.0): + categories[cat] = t.penalty self.penalty += min_penalty - self.qualifiers = WeightedCategories([t.get_category() for t in tokens], - [t.penalty - min_penalty for t in tokens]) + self.qualifiers = WeightedCategories(list(categories.keys()), + list(categories.values())) def set_ranking(self, rankings: List[FieldRanking]) -> None: @@ -165,3 +218,35 @@ class SearchData: self.rankings.append(ranking) else: self.penalty += ranking.default + + +def lookup_by_names(name_tokens: List[int], addr_tokens: List[int]) -> List[FieldLookup]: + """ Create a lookup list where name tokens are looked up via index + and potential address tokens are used to restrict the search further. + """ + lookup = [FieldLookup('name_vector', name_tokens, 'lookup_all')] + if addr_tokens: + lookup.append(FieldLookup('nameaddress_vector', addr_tokens, 'restrict')) + + return lookup + + +def lookup_by_any_name(name_tokens: List[int], addr_tokens: List[int], + lookup_type: str) -> List[FieldLookup]: + """ Create a lookup list where name tokens are looked up via index + and only one of the name tokens must be present. + Potential address tokens are used to restrict the search further. + """ + lookup = [FieldLookup('name_vector', name_tokens, 'lookup_any')] + if addr_tokens: + lookup.append(FieldLookup('nameaddress_vector', addr_tokens, lookup_type)) + + return lookup + + +def lookup_by_addr(name_tokens: List[int], addr_tokens: List[int]) -> List[FieldLookup]: + """ Create a lookup list where address tokens are looked up via index + and the name tokens are only used to restrict the search further. + """ + return [FieldLookup('name_vector', name_tokens, 'restrict'), + FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all')]