# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
-Convertion from token assignment to an abstract DB search.
+Conversion from token assignment to an abstract DB search.
"""
from typing import Optional, List, Tuple, Iterator, Dict
import heapq
sdata.lookups.append(dbf.FieldLookup('nameaddress_vector',
list(partials), lookups.LookupAll))
else:
+ addr_fulls = [t.token for t
+ in self.query.get_tokens(address[0], TokenType.WORD)]
+ if len(addr_fulls) > 5:
+ return
sdata.lookups.append(
- dbf.FieldLookup('nameaddress_vector',
- [t.token for t
- in self.query.get_tokens(address[0], TokenType.WORD)],
- lookups.LookupAny))
+ dbf.FieldLookup('nameaddress_vector', addr_fulls, lookups.LookupAny))
sdata.housenumbers = dbf.WeightedStrings([], [])
yield dbs.PlaceSearch(0.05, sdata, expected_count)
# Any of the full names applies with all of the partials from the address
yield penalty, fulls_count / (2**len(addr_tokens)),\
dbf.lookup_by_any_name([t.token for t in name_fulls],
- addr_tokens, fulls_count > 10000)
+ addr_tokens,
+ fulls_count > 30000 / max(1, len(addr_tokens)))
# To catch remaining results, lookup by name and address
# We only do this if there is a reasonable number of results expected.
lookup = [dbf.FieldLookup('name_vector', list(name_partials.keys()), lookups.LookupAll)]
if addr_tokens:
lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, lookups.LookupAll))
- penalty += 0.35 * max(0, 5 - len(name_partials) - len(addr_tokens))
+ penalty += 0.35 * max(1 if name_fulls else 0.1,
+ 5 - len(name_partials) - len(addr_tokens))
yield penalty, exp_count, lookup
- def get_name_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
+ def get_name_ranking(self, trange: TokenRange,
+ db_field: str = 'name_vector') -> dbf.FieldRanking:
""" Create a ranking expression for a name term in the given range.
"""
name_fulls = self.query.get_tokens(trange, TokenType.WORD)
# Fallback, sum of penalty for partials
name_partials = self.query.get_partials_list(trange)
default = sum(t.penalty for t in name_partials) + 0.2
- return dbf.FieldRanking('name_vector', default, ranks)
+ return dbf.FieldRanking(db_field, default, ranks)
def get_addr_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
sdata = dbf.SearchData()
sdata.penalty = assignment.penalty
if assignment.country:
- tokens = self.query.get_tokens(assignment.country, TokenType.COUNTRY)
- if self.details.countries:
- tokens = [t for t in tokens if t.lookup_word in self.details.countries]
- if not tokens:
- return None
+ tokens = self.get_country_tokens(assignment.country)
+ if not tokens:
+ return None
sdata.set_strings('countries', tokens)
elif self.details.countries:
sdata.countries = dbf.WeightedStrings(self.details.countries,
self.query.get_tokens(assignment.postcode,
TokenType.POSTCODE))
if assignment.qualifier:
- tokens = self.query.get_tokens(assignment.qualifier, TokenType.QUALIFIER)
- if self.details.categories:
- tokens = [t for t in tokens if t.get_category() in self.details.categories]
- if not tokens:
- return None
+ tokens = self.get_qualifier_tokens(assignment.qualifier)
+ if not tokens:
+ return None
sdata.set_qualifiers(tokens)
elif self.details.categories:
sdata.qualifiers = dbf.WeightedCategories(self.details.categories,
[0.0] * len(self.details.categories))
if assignment.address:
- sdata.set_ranking([self.get_addr_ranking(r) for r in assignment.address])
+ if not assignment.name and assignment.housenumber:
+ # housenumber search: the first address item must be ranked
+ # like a name, otherwise the penalties are not comparable
+ # with those of normal searches.
+ sdata.set_ranking([self.get_name_ranking(assignment.address[0],
+ db_field='nameaddress_vector')]
+ + [self.get_addr_ranking(r) for r in assignment.address[1:]])
+ else:
+ sdata.set_ranking([self.get_addr_ranking(r) for r in assignment.address])
else:
sdata.rankings = []
return sdata
+ def get_country_tokens(self, trange: TokenRange) -> List[Token]:
+ """ Return the list of country tokens for the given range,
+ optionally filtered by the country list from the details
+ parameters.
+
+ When the details carry a non-empty country list, only tokens
+ whose `lookup_word` appears in that list are kept, so the
+ result may be empty.
+ """
+ tokens = self.query.get_tokens(trange, TokenType.COUNTRY)
+ # A falsy country list leaves the tokens unfiltered.
+ if self.details.countries:
+ tokens = [t for t in tokens if t.lookup_word in self.details.countries]
+
+ return tokens
+
+
+ def get_qualifier_tokens(self, trange: TokenRange) -> List[Token]:
+ """ Return the list of qualifier tokens for the given range,
+ optionally filtered by the category list from the details
+ parameters.
+
+ When the details carry a non-empty category list, only tokens
+ whose `get_category()` result appears in that list are kept,
+ so the result may be empty.
+ """
+ tokens = self.query.get_tokens(trange, TokenType.QUALIFIER)
+ # A falsy category list leaves the tokens unfiltered.
+ if self.details.categories:
+ tokens = [t for t in tokens if t.get_category() in self.details.categories]
+
+ return tokens
+
+
def get_near_items(self, assignment: TokenAssignment) -> Optional[dbf.WeightedCategories]:
""" Collect tokens for near items search or use the categories
requested per parameter.