# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
-Convertion from token assignment to an abstract DB search.
+Conversion from token assignment to an abstract DB search.
"""
from typing import Optional, List, Tuple, Iterator, Dict
import heapq
sdata.lookups = [dbf.FieldLookup('name_vector', [t.token for t in hnrs], lookups.LookupAny)]
expected_count = sum(t.count for t in hnrs)
- partials = {t.token: t.count for trange in address
+ partials = {t.token: t.addr_count for trange in address
for t in self.query.get_partials_list(trange)}
if expected_count < 8000:
sdata.lookups.append(dbf.FieldLookup('nameaddress_vector',
list(partials), lookups.LookupAll))
else:
+ addr_fulls = [t.token for t
+ in self.query.get_tokens(address[0], TokenType.WORD)]
+ if len(addr_fulls) > 5:
+ return
sdata.lookups.append(
- dbf.FieldLookup('nameaddress_vector',
- [t.token for t
- in self.query.get_tokens(address[0], TokenType.WORD)],
- lookups.LookupAny))
+ dbf.FieldLookup('nameaddress_vector', addr_fulls, lookups.LookupAny))
sdata.housenumbers = dbf.WeightedStrings([], [])
yield dbs.PlaceSearch(0.05, sdata, expected_count)
yield penalty, exp_count, dbf.lookup_by_names(list(name_partials.keys()), addr_tokens)
return
+ addr_count = min(t.addr_count for t in addr_partials) if addr_partials else 30000
# Partial term to frequent. Try looking up by rare full names first.
name_fulls = self.query.get_tokens(name, TokenType.WORD)
if name_fulls:
fulls_count = sum(t.count for t in name_fulls)
- # At this point drop unindexed partials from the address.
- # This might yield wrong results, nothing we can do about that.
- if not partials_indexed:
- addr_tokens = [t.token for t in addr_partials if t.is_indexed]
+ if partials_indexed:
penalty += 1.2 * sum(t.penalty for t in addr_partials if not t.is_indexed)
- # Any of the full names applies with all of the partials from the address
- yield penalty, fulls_count / (2**len(addr_tokens)),\
- dbf.lookup_by_any_name([t.token for t in name_fulls],
- addr_tokens,
- fulls_count > 30000 / max(1, len(addr_tokens)))
+
+ if fulls_count < 50000 or addr_count < 30000:
+ yield penalty,fulls_count / (2**len(addr_tokens)), \
+ self.get_full_name_ranking(name_fulls, addr_partials,
+ fulls_count > 30000 / max(1, len(addr_tokens)))
# To catch remaining results, lookup by name and address
# We only do this if there is a reasonable number of results expected.
exp_count = exp_count / (2**len(addr_tokens)) if addr_tokens else exp_count
- if exp_count < 10000 and all(t.is_indexed for t in name_partials.values()):
- lookup = [dbf.FieldLookup('name_vector', list(name_partials.keys()), lookups.LookupAll)]
- if addr_tokens:
- lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, lookups.LookupAll))
+ if exp_count < 10000 and addr_count < 20000\
+ and all(t.is_indexed for t in name_partials.values()):
penalty += 0.35 * max(1 if name_fulls else 0.1,
5 - len(name_partials) - len(addr_tokens))
- yield penalty, exp_count, lookup
+ yield penalty, exp_count,\
+ self.get_name_address_ranking(list(name_partials.keys()), addr_partials)
+
+
+ def get_name_address_ranking(self, name_tokens: List[int],
+ addr_partials: List[Token]) -> List[dbf.FieldLookup]:
+ """ Create a ranking expression looking up by name and address.
+ """
+ lookup = [dbf.FieldLookup('name_vector', name_tokens, lookups.LookupAll)]
+
+ addr_restrict_tokens = []
+ addr_lookup_tokens = []
+ for t in addr_partials:
+ if t.is_indexed:
+ if t.addr_count > 20000:
+ addr_restrict_tokens.append(t.token)
+ else:
+ addr_lookup_tokens.append(t.token)
+
+ if addr_restrict_tokens:
+ lookup.append(dbf.FieldLookup('nameaddress_vector',
+ addr_restrict_tokens, lookups.Restrict))
+ if addr_lookup_tokens:
+ lookup.append(dbf.FieldLookup('nameaddress_vector',
+ addr_lookup_tokens, lookups.LookupAll))
+
+ return lookup
+
+
+ def get_full_name_ranking(self, name_fulls: List[Token], addr_partials: List[Token],
+ use_lookup: bool) -> List[dbf.FieldLookup]:
+ """ Create a ranking expression with full name terms and
+ additional address lookup. When 'use_lookup' is true, then
+ address lookups will use the index, when the occurrences are not
+ too many.
+ """
+ # At this point drop unindexed partials from the address.
+ # This might yield wrong results, nothing we can do about that.
+ if use_lookup:
+ addr_restrict_tokens = []
+ addr_lookup_tokens = []
+ for t in addr_partials:
+ if t.is_indexed:
+ if t.addr_count > 20000:
+ addr_restrict_tokens.append(t.token)
+ else:
+ addr_lookup_tokens.append(t.token)
+ else:
+ addr_restrict_tokens = [t.token for t in addr_partials if t.is_indexed]
+ addr_lookup_tokens = []
+ return dbf.lookup_by_any_name([t.token for t in name_fulls],
+ addr_restrict_tokens, addr_lookup_tokens)
- def get_name_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
+
+ def get_name_ranking(self, trange: TokenRange,
+ db_field: str = 'name_vector') -> dbf.FieldRanking:
""" Create a ranking expression for a name term in the given range.
"""
name_fulls = self.query.get_tokens(trange, TokenType.WORD)
# Fallback, sum of penalty for partials
name_partials = self.query.get_partials_list(trange)
default = sum(t.penalty for t in name_partials) + 0.2
- return dbf.FieldRanking('name_vector', default, ranks)
+ return dbf.FieldRanking(db_field, default, ranks)
def get_addr_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
sdata = dbf.SearchData()
sdata.penalty = assignment.penalty
if assignment.country:
- tokens = self.query.get_tokens(assignment.country, TokenType.COUNTRY)
- if self.details.countries:
- tokens = [t for t in tokens if t.lookup_word in self.details.countries]
- if not tokens:
- return None
+ tokens = self.get_country_tokens(assignment.country)
+ if not tokens:
+ return None
sdata.set_strings('countries', tokens)
elif self.details.countries:
sdata.countries = dbf.WeightedStrings(self.details.countries,
self.query.get_tokens(assignment.postcode,
TokenType.POSTCODE))
if assignment.qualifier:
- tokens = self.query.get_tokens(assignment.qualifier, TokenType.QUALIFIER)
- if self.details.categories:
- tokens = [t for t in tokens if t.get_category() in self.details.categories]
- if not tokens:
- return None
+ tokens = self.get_qualifier_tokens(assignment.qualifier)
+ if not tokens:
+ return None
sdata.set_qualifiers(tokens)
elif self.details.categories:
sdata.qualifiers = dbf.WeightedCategories(self.details.categories,
[0.0] * len(self.details.categories))
if assignment.address:
- sdata.set_ranking([self.get_addr_ranking(r) for r in assignment.address])
+ if not assignment.name and assignment.housenumber:
+ # housenumber search: the first item needs to be handled like
+ # a name in ranking or penalties are not comparable with
+ # normal searches.
+ sdata.set_ranking([self.get_name_ranking(assignment.address[0],
+ db_field='nameaddress_vector')]
+ + [self.get_addr_ranking(r) for r in assignment.address[1:]])
+ else:
+ sdata.set_ranking([self.get_addr_ranking(r) for r in assignment.address])
else:
sdata.rankings = []
return sdata
+ def get_country_tokens(self, trange: TokenRange) -> List[Token]:
+ """ Return the list of country tokens for the given range,
+ optionally filtered by the country list from the details
+ parameters.
+ """
+ tokens = self.query.get_tokens(trange, TokenType.COUNTRY)
+ if self.details.countries:
+ tokens = [t for t in tokens if t.lookup_word in self.details.countries]
+
+ return tokens
+
+
+ def get_qualifier_tokens(self, trange: TokenRange) -> List[Token]:
+ """ Return the list of qualifier tokens for the given range,
+ optionally filtered by the qualifier list from the details
+ parameters.
+ """
+ tokens = self.query.get_tokens(trange, TokenType.QUALIFIER)
+ if self.details.categories:
+ tokens = [t for t in tokens if t.get_category() in self.details.categories]
+
+ return tokens
+
+
def get_near_items(self, assignment: TokenAssignment) -> Optional[dbf.WeightedCategories]:
""" Collect tokens for near items search or use the categories
requested per parameter.