# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
-Convertion from token assignment to an abstract DB search.
+Conversion from token assignment to an abstract DB search.
"""
-from typing import Optional, List, Tuple, Iterator
+from typing import Optional, List, Tuple, Iterator, Dict
import heapq
from nominatim.api.types import SearchDetails, DataLayer
-from nominatim.api.search.query import QueryStruct, TokenType, TokenRange, BreakType
+from nominatim.api.search.query import QueryStruct, Token, TokenType, TokenRange, BreakType
from nominatim.api.search.token_assignment import TokenAssignment
import nominatim.api.search.db_search_fields as dbf
import nominatim.api.search.db_searches as dbs
-from nominatim.api.logging import log
+import nominatim.api.search.db_search_lookups as lookups
+
+
+def wrap_near_search(categories: List[Tuple[str, str]],
+ search: dbs.AbstractSearch) -> dbs.NearSearch:
+ """ Create a new search that wraps the given search in a search
+ for near places of the given category.
+ """
+ return dbs.NearSearch(penalty=search.penalty,
+ categories=dbf.WeightedCategories(categories,
+ [0.0] * len(categories)),
+ search=search)
+
+
+def build_poi_search(category: List[Tuple[str, str]],
+ countries: Optional[List[str]]) -> dbs.PoiSearch:
+ """ Create a new search for places by the given category, possibly
+ constraint to the given countries.
+ """
+ if countries:
+ ccs = dbf.WeightedStrings(countries, [0.0] * len(countries))
+ else:
+ ccs = dbf.WeightedStrings([], [])
+
+ class _PoiData(dbf.SearchData):
+ penalty = 0.0
+ qualifiers = dbf.WeightedCategories(category, [0.0] * len(category))
+ countries=ccs
+
+ return dbs.PoiSearch(_PoiData())
+
class SearchBuilder:
""" Build the abstract search queries from token assignments.
if sdata is None:
return
- categories = self.get_search_categories(assignment)
+ near_items = self.get_near_items(assignment)
+ if near_items is not None and not near_items:
+ return # impossible compbination of near items and category parameter
if assignment.name is None:
- if categories and not sdata.postcodes:
- sdata.qualifiers = categories
- categories = None
+ if near_items and not sdata.postcodes:
+ sdata.qualifiers = near_items
+ near_items = None
builder = self.build_poi_search(sdata)
+ elif assignment.housenumber:
+ hnr_tokens = self.query.get_tokens(assignment.housenumber,
+ TokenType.HOUSENUMBER)
+ builder = self.build_housenumber_search(sdata, hnr_tokens, assignment.address)
else:
builder = self.build_special_search(sdata, assignment.address,
- bool(categories))
+ bool(near_items))
else:
builder = self.build_name_search(sdata, assignment.name, assignment.address,
- bool(categories))
+ bool(near_items))
- if categories:
- penalty = min(categories.penalties)
- categories.penalties = [p - penalty for p in categories.penalties]
+ if near_items:
+ penalty = min(near_items.penalties)
+ near_items.penalties = [p - penalty for p in near_items.penalties]
for search in builder:
- yield dbs.NearSearch(penalty, categories, search)
+ search_penalty = search.penalty
+ search.penalty = 0.0
+ yield dbs.NearSearch(penalty + assignment.penalty + search_penalty,
+ near_items, search)
else:
- yield from builder
+ for search in builder:
+ search.penalty += assignment.penalty
+ yield search
def build_poi_search(self, sdata: dbf.SearchData) -> Iterator[dbs.AbstractSearch]:
""" Build abstract search queries for searches that do not involve
a named place.
"""
- if sdata.qualifiers or sdata.housenumbers:
- # No special searches over housenumbers or qualifiers supported.
+ if sdata.qualifiers:
+ # No special searches over qualifiers supported.
return
if sdata.countries and not address and not sdata.postcodes \
yield dbs.CountrySearch(sdata)
if sdata.postcodes and (is_category or self.configured_for_postcode):
+ penalty = 0.0 if sdata.countries else 0.1
if address:
sdata.lookups = [dbf.FieldLookup('nameaddress_vector',
[t.token for r in address
for t in self.query.get_partials_list(r)],
- 'restrict')]
- yield dbs.PostcodeSearch(0.4, sdata)
+ lookups.Restrict)]
+ penalty += 0.2
+ yield dbs.PostcodeSearch(penalty, sdata)
+
+
+ def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[Token],
+ address: List[TokenRange]) -> Iterator[dbs.AbstractSearch]:
+ """ Build a simple address search for special entries where the
+ housenumber is the main name token.
+ """
+ sdata.lookups = [dbf.FieldLookup('name_vector', [t.token for t in hnrs], lookups.LookupAny)]
+ expected_count = sum(t.count for t in hnrs)
+
+ partials = {t.token: t.count for trange in address
+ for t in self.query.get_partials_list(trange)}
+
+ if expected_count < 8000:
+ sdata.lookups.append(dbf.FieldLookup('nameaddress_vector',
+ list(partials), lookups.Restrict))
+ elif len(partials) != 1 or list(partials.values())[0] < 10000:
+ sdata.lookups.append(dbf.FieldLookup('nameaddress_vector',
+ list(partials), lookups.LookupAll))
+ else:
+ addr_fulls = [t.token for t
+ in self.query.get_tokens(address[0], TokenType.WORD)]
+ if len(addr_fulls) > 5:
+ return
+ sdata.lookups.append(
+ dbf.FieldLookup('nameaddress_vector', addr_fulls, lookups.LookupAny))
+
+ sdata.housenumbers = dbf.WeightedStrings([], [])
+ yield dbs.PlaceSearch(0.05, sdata, expected_count)
def build_name_search(self, sdata: dbf.SearchData,
""" Build abstract search queries for simple name or address searches.
"""
if is_category or not sdata.housenumbers or self.configured_for_housenumbers:
- sdata.rankings.append(self.get_name_ranking(name))
- name_penalty = sdata.rankings[-1].normalize_penalty()
+ ranking = self.get_name_ranking(name)
+ name_penalty = ranking.normalize_penalty()
+ if ranking.rankings:
+ sdata.rankings.append(ranking)
for penalty, count, lookup in self.yield_lookups(name, address):
sdata.lookups = lookup
yield dbs.PlaceSearch(penalty + name_penalty, sdata, count)
be searched for. This takes into account how frequent the terms
are and tries to find a lookup that optimizes index use.
"""
- penalty = 0.0 # extra penalty currently unused
-
- name_partials = self.query.get_partials_list(name)
- exp_name_count = min(t.count for t in name_partials)
- addr_partials = []
- for trange in address:
- addr_partials.extend(self.query.get_partials_list(trange))
- addr_tokens = [t.token for t in addr_partials]
- partials_indexed = all(t.is_indexed for t in name_partials) \
- and all(t.is_indexed for t in addr_partials)
+ penalty = 0.0 # extra penalty
+ name_partials = {t.token: t for t in self.query.get_partials_list(name)}
- if (len(name_partials) > 3 or exp_name_count < 1000) and partials_indexed:
- # Lookup by name partials, use address partials to restrict results.
- lookup = [dbf.FieldLookup('name_vector',
- [t.token for t in name_partials], 'lookup_all')]
- if addr_tokens:
- lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'restrict'))
- yield penalty, exp_name_count, lookup
- return
+ addr_partials = [t for r in address for t in self.query.get_partials_list(r)]
+ addr_tokens = list({t.token for t in addr_partials})
- exp_addr_count = min(t.count for t in addr_partials) if addr_partials else exp_name_count
- if exp_addr_count < 1000 and partials_indexed:
- # Lookup by address partials and restrict results through name terms.
- yield penalty, exp_addr_count,\
- [dbf.FieldLookup('name_vector', [t.token for t in name_partials], 'restrict'),
- dbf.FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all')]
+ partials_indexed = all(t.is_indexed for t in name_partials.values()) \
+ and all(t.is_indexed for t in addr_partials)
+ exp_count = min(t.count for t in name_partials.values()) / (2**(len(name_partials) - 1))
+
+ if (len(name_partials) > 3 or exp_count < 8000) and partials_indexed:
+ yield penalty, exp_count, dbf.lookup_by_names(list(name_partials.keys()), addr_tokens)
return
# Partial term to frequent. Try looking up by rare full names first.
name_fulls = self.query.get_tokens(name, TokenType.WORD)
- rare_names = list(filter(lambda t: t.count < 1000, name_fulls))
- # At this point drop unindexed partials from the address.
- # This might yield wrong results, nothing we can do about that.
- if not partials_indexed:
- addr_tokens = [t.token for t in addr_partials if t.is_indexed]
- log().var_dump('before', penalty)
- penalty += 1.2 * sum(t.penalty for t in addr_partials if not t.is_indexed)
- log().var_dump('after', penalty)
- if rare_names:
+ if name_fulls:
+ fulls_count = sum(t.count for t in name_fulls)
+ # At this point drop unindexed partials from the address.
+ # This might yield wrong results, nothing we can do about that.
+ if not partials_indexed:
+ addr_tokens = [t.token for t in addr_partials if t.is_indexed]
+ penalty += 1.2 * sum(t.penalty for t in addr_partials if not t.is_indexed)
# Any of the full names applies with all of the partials from the address
- lookup = [dbf.FieldLookup('name_vector', [t.token for t in rare_names], 'lookup_any')]
- if addr_tokens:
- lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'restrict'))
- yield penalty, sum(t.count for t in rare_names), lookup
+ yield penalty, fulls_count / (2**len(addr_tokens)),\
+ dbf.lookup_by_any_name([t.token for t in name_fulls],
+ addr_tokens,
+ fulls_count > 30000 / max(1, len(addr_tokens)))
# To catch remaining results, lookup by name and address
- if all(t.is_indexed for t in name_partials):
- lookup = [dbf.FieldLookup('name_vector',
- [t.token for t in name_partials], 'lookup_all')]
- else:
- # we don't have the partials, try with the non-rare names
- non_rare_names = [t.token for t in name_fulls if t.count >= 1000]
- if not non_rare_names:
- return
- lookup = [dbf.FieldLookup('name_vector', non_rare_names, 'lookup_any')]
- if addr_tokens:
- lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all'))
- yield penalty + 0.1 * max(0, 5 - len(name_partials) - len(addr_tokens)),\
- min(exp_name_count, exp_addr_count), lookup
+ # We only do this if there is a reasonable number of results expected.
+ exp_count = exp_count / (2**len(addr_tokens)) if addr_tokens else exp_count
+ if exp_count < 10000 and all(t.is_indexed for t in name_partials.values()):
+ lookup = [dbf.FieldLookup('name_vector', list(name_partials.keys()), lookups.LookupAll)]
+ if addr_tokens:
+ lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, lookups.LookupAll))
+ penalty += 0.35 * max(1 if name_fulls else 0.1,
+ 5 - len(name_partials) - len(addr_tokens))
+ yield penalty, exp_count, lookup
- def get_name_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
+ def get_name_ranking(self, trange: TokenRange,
+ db_field: str = 'name_vector') -> dbf.FieldRanking:
""" Create a ranking expression for a name term in the given range.
"""
name_fulls = self.query.get_tokens(trange, TokenType.WORD)
# Fallback, sum of penalty for partials
name_partials = self.query.get_partials_list(trange)
default = sum(t.penalty for t in name_partials) + 0.2
- return dbf.FieldRanking('name_vector', default, ranks)
+ return dbf.FieldRanking(db_field, default, ranks)
def get_addr_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
sdata = dbf.SearchData()
sdata.penalty = assignment.penalty
if assignment.country:
- tokens = self.query.get_tokens(assignment.country, TokenType.COUNTRY)
- if self.details.countries:
- tokens = [t for t in tokens if t.lookup_word in self.details.countries]
- if not tokens:
- return None
+ tokens = self.get_country_tokens(assignment.country)
+ if not tokens:
+ return None
sdata.set_strings('countries', tokens)
elif self.details.countries:
sdata.countries = dbf.WeightedStrings(self.details.countries,
self.query.get_tokens(assignment.postcode,
TokenType.POSTCODE))
if assignment.qualifier:
- sdata.set_qualifiers(self.query.get_tokens(assignment.qualifier,
- TokenType.QUALIFIER))
+ tokens = self.get_qualifier_tokens(assignment.qualifier)
+ if not tokens:
+ return None
+ sdata.set_qualifiers(tokens)
+ elif self.details.categories:
+ sdata.qualifiers = dbf.WeightedCategories(self.details.categories,
+ [0.0] * len(self.details.categories))
if assignment.address:
- sdata.set_ranking([self.get_addr_ranking(r) for r in assignment.address])
+ if not assignment.name and assignment.housenumber:
+ # housenumber search: the first item needs to be handled like
+ # a name in ranking or penalties are not comparable with
+ # normal searches.
+ sdata.set_ranking([self.get_name_ranking(assignment.address[0],
+ db_field='nameaddress_vector')]
+ + [self.get_addr_ranking(r) for r in assignment.address[1:]])
+ else:
+ sdata.set_ranking([self.get_addr_ranking(r) for r in assignment.address])
else:
sdata.rankings = []
return sdata
- def get_search_categories(self,
- assignment: TokenAssignment) -> Optional[dbf.WeightedCategories]:
- """ Collect tokens for category search or use the categories
- requested per parameter.
- Returns None if no category search is requested.
+ def get_country_tokens(self, trange: TokenRange) -> List[Token]:
+ """ Return the list of country tokens for the given range,
+ optionally filtered by the country list from the details
+ parameters.
"""
- if assignment.category:
- tokens = [t for t in self.query.get_tokens(assignment.category,
- TokenType.CATEGORY)
- if not self.details.categories
- or t.get_category() in self.details.categories]
- return dbf.WeightedCategories([t.get_category() for t in tokens],
- [t.penalty for t in tokens])
+ tokens = self.query.get_tokens(trange, TokenType.COUNTRY)
+ if self.details.countries:
+ tokens = [t for t in tokens if t.lookup_word in self.details.countries]
+ return tokens
+
+
+ def get_qualifier_tokens(self, trange: TokenRange) -> List[Token]:
+ """ Return the list of qualifier tokens for the given range,
+ optionally filtered by the qualifier list from the details
+ parameters.
+ """
+ tokens = self.query.get_tokens(trange, TokenType.QUALIFIER)
if self.details.categories:
- return dbf.WeightedCategories(self.details.categories,
- [0.0] * len(self.details.categories))
+ tokens = [t for t in tokens if t.get_category() in self.details.categories]
+
+ return tokens
+
+
+ def get_near_items(self, assignment: TokenAssignment) -> Optional[dbf.WeightedCategories]:
+ """ Collect tokens for near items search or use the categories
+ requested per parameter.
+ Returns None if no category search is requested.
+ """
+ if assignment.near_item:
+ tokens: Dict[Tuple[str, str], float] = {}
+ for t in self.query.get_tokens(assignment.near_item, TokenType.NEAR_ITEM):
+ cat = t.get_category()
+ # The category of a near search will be that of near_item.
+ # Thus, if search is restricted to a category parameter,
+ # the two sets must intersect.
+ if (not self.details.categories or cat in self.details.categories)\
+ and t.penalty < tokens.get(cat, 1000.0):
+ tokens[cat] = t.penalty
+ return dbf.WeightedCategories(list(tokens.keys()), list(tokens.values()))
return None