# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
-Convertion from token assignment to an abstract DB search.
+Conversion from token assignment to an abstract DB search.
"""
from typing import Optional, List, Tuple, Iterator, Dict
import heapq
from nominatim.api.search.token_assignment import TokenAssignment
import nominatim.api.search.db_search_fields as dbf
import nominatim.api.search.db_searches as dbs
+import nominatim.api.search.db_search_lookups as lookups
def wrap_near_search(categories: List[Tuple[str, str]],
penalty = min(near_items.penalties)
near_items.penalties = [p - penalty for p in near_items.penalties]
for search in builder:
- yield dbs.NearSearch(penalty + assignment.penalty, near_items, search)
+ search_penalty = search.penalty
+ search.penalty = 0.0
+ yield dbs.NearSearch(penalty + assignment.penalty + search_penalty,
+ near_items, search)
else:
for search in builder:
search.penalty += assignment.penalty
sdata.lookups = [dbf.FieldLookup('nameaddress_vector',
[t.token for r in address
for t in self.query.get_partials_list(r)],
- 'restrict')]
+ lookups.Restrict)]
penalty += 0.2
yield dbs.PostcodeSearch(penalty, sdata)
""" Build a simple address search for special entries where the
housenumber is the main name token.
"""
- sdata.lookups = [dbf.FieldLookup('name_vector', [t.token for t in hnrs], 'lookup_any')]
+ sdata.lookups = [dbf.FieldLookup('name_vector', [t.token for t in hnrs], lookups.LookupAny)]
+ expected_count = sum(t.count for t in hnrs)
- partials = [t for trange in address
- for t in self.query.get_partials_list(trange)]
+ partials = {t.token: t.count for trange in address
+ for t in self.query.get_partials_list(trange)}
- if len(partials) != 1 or partials[0].count < 10000:
+ if expected_count < 8000:
sdata.lookups.append(dbf.FieldLookup('nameaddress_vector',
- [t.token for t in partials], 'lookup_all'))
+ list(partials), lookups.Restrict))
+ elif len(partials) != 1 or list(partials.values())[0] < 10000:
+ sdata.lookups.append(dbf.FieldLookup('nameaddress_vector',
+ list(partials), lookups.LookupAll))
else:
+ addr_fulls = [t.token for t
+ in self.query.get_tokens(address[0], TokenType.WORD)]
+ if len(addr_fulls) > 5:
+ return
sdata.lookups.append(
- dbf.FieldLookup('nameaddress_vector',
- [t.token for t
- in self.query.get_tokens(address[0], TokenType.WORD)],
- 'lookup_any'))
+ dbf.FieldLookup('nameaddress_vector', addr_fulls, lookups.LookupAny))
sdata.housenumbers = dbf.WeightedStrings([], [])
- yield dbs.PlaceSearch(0.05, sdata, sum(t.count for t in hnrs))
+ yield dbs.PlaceSearch(0.05, sdata, expected_count)
def build_name_search(self, sdata: dbf.SearchData,
are and tries to find a lookup that optimizes index use.
"""
penalty = 0.0 # extra penalty
- name_partials = self.query.get_partials_list(name)
- name_tokens = [t.token for t in name_partials]
+ name_partials = {t.token: t for t in self.query.get_partials_list(name)}
addr_partials = [t for r in address for t in self.query.get_partials_list(r)]
- addr_tokens = [t.token for t in addr_partials]
+ addr_tokens = list({t.token for t in addr_partials})
- partials_indexed = all(t.is_indexed for t in name_partials) \
+ partials_indexed = all(t.is_indexed for t in name_partials.values()) \
and all(t.is_indexed for t in addr_partials)
- exp_count = min(t.count for t in name_partials) / (2**(len(name_partials) - 1))
+ exp_count = min(t.count for t in name_partials.values()) / (2**(len(name_partials) - 1))
if (len(name_partials) > 3 or exp_count < 8000) and partials_indexed:
- yield penalty, exp_count, dbf.lookup_by_names(name_tokens, addr_tokens)
+ yield penalty, exp_count, dbf.lookup_by_names(list(name_partials.keys()), addr_tokens)
return
# Partial term too frequent. Try looking up by rare full names first.
name_fulls = self.query.get_tokens(name, TokenType.WORD)
- fulls_count = sum(t.count for t in name_fulls)
- # At this point drop unindexed partials from the address.
- # This might yield wrong results, nothing we can do about that.
- if not partials_indexed:
- addr_tokens = [t.token for t in addr_partials if t.is_indexed]
- penalty += 1.2 * sum(t.penalty for t in addr_partials if not t.is_indexed)
- # Any of the full names applies with all of the partials from the address
- yield penalty, fulls_count / (2**len(addr_partials)),\
- dbf.lookup_by_any_name([t.token for t in name_fulls], addr_tokens,
- 'restrict' if fulls_count < 10000 else 'lookup_all')
+ if name_fulls:
+ fulls_count = sum(t.count for t in name_fulls)
+ # At this point drop unindexed partials from the address.
+ # This might yield wrong results, nothing we can do about that.
+ if not partials_indexed:
+ addr_tokens = [t.token for t in addr_partials if t.is_indexed]
+ penalty += 1.2 * sum(t.penalty for t in addr_partials if not t.is_indexed)
+ # Any of the full names applies with all of the partials from the address
+ yield penalty, fulls_count / (2**len(addr_tokens)),\
+ dbf.lookup_by_any_name([t.token for t in name_fulls],
+ addr_tokens,
+ fulls_count > 30000 / max(1, len(addr_tokens)))
# To catch remaining results, lookup by name and address
# We only do this if there is a reasonable number of results expected.
- exp_count = exp_count / (2**len(addr_partials)) if addr_partials else exp_count
- if exp_count < 10000 and all(t.is_indexed for t in name_partials):
- lookup = [dbf.FieldLookup('name_vector', name_tokens, 'lookup_all')]
+ exp_count = exp_count / (2**len(addr_tokens)) if addr_tokens else exp_count
+ if exp_count < 10000 and all(t.is_indexed for t in name_partials.values()):
+ lookup = [dbf.FieldLookup('name_vector', list(name_partials.keys()), lookups.LookupAll)]
if addr_tokens:
- lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all'))
- penalty += 0.35 * max(0, 5 - len(name_partials) - len(addr_tokens))
+ lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, lookups.LookupAll))
+ penalty += 0.35 * max(1 if name_fulls else 0.1,
+ 5 - len(name_partials) - len(addr_tokens))
yield penalty, exp_count, lookup
- def get_name_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
+ def get_name_ranking(self, trange: TokenRange,
+ db_field: str = 'name_vector') -> dbf.FieldRanking:
""" Create a ranking expression for a name term in the given range.
"""
name_fulls = self.query.get_tokens(trange, TokenType.WORD)
# Fallback, sum of penalty for partials
name_partials = self.query.get_partials_list(trange)
default = sum(t.penalty for t in name_partials) + 0.2
- return dbf.FieldRanking('name_vector', default, ranks)
+ return dbf.FieldRanking(db_field, default, ranks)
def get_addr_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
sdata = dbf.SearchData()
sdata.penalty = assignment.penalty
if assignment.country:
- tokens = self.query.get_tokens(assignment.country, TokenType.COUNTRY)
- if self.details.countries:
- tokens = [t for t in tokens if t.lookup_word in self.details.countries]
- if not tokens:
- return None
+ tokens = self.get_country_tokens(assignment.country)
+ if not tokens:
+ return None
sdata.set_strings('countries', tokens)
elif self.details.countries:
sdata.countries = dbf.WeightedStrings(self.details.countries,
self.query.get_tokens(assignment.postcode,
TokenType.POSTCODE))
if assignment.qualifier:
- tokens = self.query.get_tokens(assignment.qualifier, TokenType.QUALIFIER)
- if self.details.categories:
- tokens = [t for t in tokens if t.get_category() in self.details.categories]
- if not tokens:
- return None
+ tokens = self.get_qualifier_tokens(assignment.qualifier)
+ if not tokens:
+ return None
sdata.set_qualifiers(tokens)
elif self.details.categories:
sdata.qualifiers = dbf.WeightedCategories(self.details.categories,
[0.0] * len(self.details.categories))
if assignment.address:
- sdata.set_ranking([self.get_addr_ranking(r) for r in assignment.address])
+ if not assignment.name and assignment.housenumber:
+ # housenumber search: the first item needs to be handled like
+ # a name in ranking or penalties are not comparable with
+ # normal searches.
+ sdata.set_ranking([self.get_name_ranking(assignment.address[0],
+ db_field='nameaddress_vector')]
+ + [self.get_addr_ranking(r) for r in assignment.address[1:]])
+ else:
+ sdata.set_ranking([self.get_addr_ranking(r) for r in assignment.address])
else:
sdata.rankings = []
return sdata
+ def get_country_tokens(self, trange: TokenRange) -> List[Token]:
+ """ Return the list of country tokens for the given range,
+ optionally filtered by the country list from the details
+ parameters.
+
+ The tokens are read from the query with type COUNTRY. When
+ `self.details.countries` is non-empty, only tokens whose
+ `lookup_word` is contained in that list are kept. The result
+ may therefore be an empty list.
+ """
+ tokens = self.query.get_tokens(trange, TokenType.COUNTRY)
+ if self.details.countries:
+ tokens = [t for t in tokens if t.lookup_word in self.details.countries]
+
+ return tokens
+
+
+ def get_qualifier_tokens(self, trange: TokenRange) -> List[Token]:
+ """ Return the list of qualifier tokens for the given range,
+ optionally filtered by the qualifier list from the details
+ parameters.
+
+ The tokens are read from the query with type QUALIFIER. When
+ `self.details.categories` is non-empty, only tokens whose
+ `get_category()` result is contained in that list are kept.
+ The result may therefore be an empty list.
+ """
+ tokens = self.query.get_tokens(trange, TokenType.QUALIFIER)
+ if self.details.categories:
+ tokens = [t for t in tokens if t.get_category() in self.details.categories]
+
+ return tokens
+
+
def get_near_items(self, assignment: TokenAssignment) -> Optional[dbf.WeightedCategories]:
""" Collect tokens for near items search or use the categories
requested per parameter.