# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
-Convertion from token assignment to an abstract DB search.
+Conversion from token assignment to an abstract DB search.
"""
from typing import Optional, List, Tuple, Iterator, Dict
import heapq
sdata.lookups.append(dbf.FieldLookup('nameaddress_vector',
list(partials), lookups.LookupAll))
else:
+ addr_fulls = [t.token for t
+ in self.query.get_tokens(address[0], TokenType.WORD)]
+ if len(addr_fulls) > 5:
+ return
sdata.lookups.append(
- dbf.FieldLookup('nameaddress_vector',
- [t.token for t
- in self.query.get_tokens(address[0], TokenType.WORD)],
- lookups.LookupAny))
+ dbf.FieldLookup('nameaddress_vector', addr_fulls, lookups.LookupAny))
sdata.housenumbers = dbf.WeightedStrings([], [])
yield dbs.PlaceSearch(0.05, sdata, expected_count)
name_fulls = self.query.get_tokens(name, TokenType.WORD)
if name_fulls:
fulls_count = sum(t.count for t in name_fulls)
- # At this point drop unindexed partials from the address.
- # This might yield wrong results, nothing we can do about that.
- if not partials_indexed:
- addr_tokens = [t.token for t in addr_partials if t.is_indexed]
+ if len(name_partials) == 1:
+ penalty += min(0.5, max(0, (exp_count - 50 * fulls_count) / (2000 * fulls_count)))
+ if partials_indexed:
penalty += 1.2 * sum(t.penalty for t in addr_partials if not t.is_indexed)
- # Any of the full names applies with all of the partials from the address
- yield penalty, fulls_count / (2**len(addr_tokens)),\
- dbf.lookup_by_any_name([t.token for t in name_fulls],
- addr_tokens,
- fulls_count > 30000 / max(1, len(addr_tokens)))
+
+ yield penalty,fulls_count / (2**len(addr_tokens)), \
+ self.get_full_name_ranking(name_fulls, addr_partials,
+ fulls_count > 30000 / max(1, len(addr_tokens)))
# To catch remaining results, lookup by name and address
# We only do this if there is a reasonable number of results expected.
exp_count = exp_count / (2**len(addr_tokens)) if addr_tokens else exp_count
if exp_count < 10000 and all(t.is_indexed for t in name_partials.values()):
- lookup = [dbf.FieldLookup('name_vector', list(name_partials.keys()), lookups.LookupAll)]
- if addr_tokens:
- lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, lookups.LookupAll))
penalty += 0.35 * max(1 if name_fulls else 0.1,
5 - len(name_partials) - len(addr_tokens))
- yield penalty, exp_count, lookup
+ yield penalty, exp_count,\
+ self.get_name_address_ranking(list(name_partials.keys()), addr_partials)
+
+
+ def get_name_address_ranking(self, name_tokens: List[int],
+ addr_partials: List[Token]) -> List[dbf.FieldLookup]:
+ """ Create a ranking expression looking up by name and address.
+
+ The returned list always starts with a 'name_vector' lookup
+ over 'name_tokens' using lookups.LookupAll. Indexed tokens from
+ 'addr_partials' whose 'addr_count' is above 20000 are collected
+ into a 'nameaddress_vector' entry using lookups.Restrict, the
+ other collected address tokens into a 'nameaddress_vector' entry
+ using lookups.LookupAll. Either address entry is only appended
+ when its token list is not empty.
+ """
+ lookup = [dbf.FieldLookup('name_vector', name_tokens, lookups.LookupAll)]
+
+ # Split the address partials into two groups by their address count.
+ # NOTE(review): indentation is lost in this excerpt; the 'else' below
+ # presumably pairs with the 'addr_count' test, so that unindexed
+ # tokens are dropped entirely - confirm against the real file.
+ addr_restrict_tokens = []
+ addr_lookup_tokens = []
+ for t in addr_partials:
+ if t.is_indexed:
+ if t.addr_count > 20000:
+ addr_restrict_tokens.append(t.token)
+ else:
+ addr_lookup_tokens.append(t.token)
+
+ # Empty token lists are left out of the result.
+ if addr_restrict_tokens:
+ lookup.append(dbf.FieldLookup('nameaddress_vector',
+ addr_restrict_tokens, lookups.Restrict))
+ if addr_lookup_tokens:
+ lookup.append(dbf.FieldLookup('nameaddress_vector',
+ addr_lookup_tokens, lookups.LookupAll))
+
+ return lookup
+
+
+ def get_full_name_ranking(self, name_fulls: List[Token], addr_partials: List[Token],
+ use_lookup: bool) -> List[dbf.FieldLookup]:
+ """ Create a ranking expression with full name terms and
+ additional address lookup. When 'use_lookup' is true, then
+ address lookups will use the index, when the occurrences are not
+ too many.
+
+ With 'use_lookup' set, indexed address tokens with an
+ 'addr_count' above 20000 go into the restrict list and the
+ other collected tokens into the lookup list. Without it, all
+ indexed address tokens go into the restrict list and the lookup
+ list stays empty. The result is what dbf.lookup_by_any_name()
+ returns for the tokens of 'name_fulls' and these two lists.
+ """
+ # At this point drop unindexed partials from the address.
+ # This might yield wrong results, nothing we can do about that.
+ if use_lookup:
+ # NOTE(review): indentation is lost in this excerpt; the inner 'else'
+ # presumably pairs with the 'addr_count' test, which matches the
+ # comment above about dropping unindexed partials - confirm.
+ addr_restrict_tokens = []
+ addr_lookup_tokens = []
+ for t in addr_partials:
+ if t.is_indexed:
+ if t.addr_count > 20000:
+ addr_restrict_tokens.append(t.token)
+ else:
+ addr_lookup_tokens.append(t.token)
+ else:
+ # No index lookup wanted: every indexed address token only restricts.
+ addr_restrict_tokens = [t.token for t in addr_partials if t.is_indexed]
+ addr_lookup_tokens = []
+
+ return dbf.lookup_by_any_name([t.token for t in name_fulls],
+ addr_restrict_tokens, addr_lookup_tokens)
def get_name_ranking(self, trange: TokenRange,