"""
Convertion from token assignment to an abstract DB search.
"""
-from typing import Optional, List, Tuple, Iterator
+from typing import Optional, List, Tuple, Iterator, Dict
import heapq
from nominatim.api.types import SearchDetails, DataLayer
from nominatim.api.search.token_assignment import TokenAssignment
import nominatim.api.search.db_search_fields as dbf
import nominatim.api.search.db_searches as dbs
+import nominatim.api.search.db_search_lookups as lookups
def wrap_near_search(categories: List[Tuple[str, str]],
if sdata is None:
return
- categories = self.get_search_categories(assignment)
+ near_items = self.get_near_items(assignment)
+ if near_items is not None and not near_items:
+ return # impossible compbination of near items and category parameter
if assignment.name is None:
- if categories and not sdata.postcodes:
- sdata.qualifiers = categories
- categories = None
+ if near_items and not sdata.postcodes:
+ sdata.qualifiers = near_items
+ near_items = None
builder = self.build_poi_search(sdata)
elif assignment.housenumber:
hnr_tokens = self.query.get_tokens(assignment.housenumber,
builder = self.build_housenumber_search(sdata, hnr_tokens, assignment.address)
else:
builder = self.build_special_search(sdata, assignment.address,
- bool(categories))
+ bool(near_items))
else:
builder = self.build_name_search(sdata, assignment.name, assignment.address,
- bool(categories))
+ bool(near_items))
- if categories:
- penalty = min(categories.penalties)
- categories.penalties = [p - penalty for p in categories.penalties]
+ if near_items:
+ penalty = min(near_items.penalties)
+ near_items.penalties = [p - penalty for p in near_items.penalties]
for search in builder:
- yield dbs.NearSearch(penalty + assignment.penalty, categories, search)
+ search_penalty = search.penalty
+ search.penalty = 0.0
+ yield dbs.NearSearch(penalty + assignment.penalty + search_penalty,
+ near_items, search)
else:
for search in builder:
search.penalty += assignment.penalty
sdata.lookups = [dbf.FieldLookup('nameaddress_vector',
[t.token for r in address
for t in self.query.get_partials_list(r)],
- 'restrict')]
+ lookups.Restrict)]
penalty += 0.2
yield dbs.PostcodeSearch(penalty, sdata)
""" Build a simple address search for special entries where the
housenumber is the main name token.
"""
- sdata.lookups = [dbf.FieldLookup('name_vector', [t.token for t in hnrs], 'lookup_any')]
+ sdata.lookups = [dbf.FieldLookup('name_vector', [t.token for t in hnrs], lookups.LookupAny)]
+ expected_count = sum(t.count for t in hnrs)
partials = [t for trange in address
for t in self.query.get_partials_list(trange)]
- if len(partials) != 1 or partials[0].count < 10000:
+ if expected_count < 8000:
sdata.lookups.append(dbf.FieldLookup('nameaddress_vector',
- [t.token for t in partials], 'lookup_all'))
+ [t.token for t in partials], lookups.Restrict))
+ elif len(partials) != 1 or partials[0].count < 10000:
+ sdata.lookups.append(dbf.FieldLookup('nameaddress_vector',
+ [t.token for t in partials], lookups.LookupAll))
else:
sdata.lookups.append(
dbf.FieldLookup('nameaddress_vector',
[t.token for t
in self.query.get_tokens(address[0], TokenType.WORD)],
- 'lookup_any'))
+ lookups.LookupAny))
sdata.housenumbers = dbf.WeightedStrings([], [])
- yield dbs.PlaceSearch(0.05, sdata, sum(t.count for t in hnrs))
+ yield dbs.PlaceSearch(0.05, sdata, expected_count)
def build_name_search(self, sdata: dbf.SearchData,
# Partial term to frequent. Try looking up by rare full names first.
name_fulls = self.query.get_tokens(name, TokenType.WORD)
- fulls_count = sum(t.count for t in name_fulls)
- # At this point drop unindexed partials from the address.
- # This might yield wrong results, nothing we can do about that.
- if not partials_indexed:
- addr_tokens = [t.token for t in addr_partials if t.is_indexed]
- penalty += 1.2 * sum(t.penalty for t in addr_partials if not t.is_indexed)
- # Any of the full names applies with all of the partials from the address
- yield penalty, fulls_count / (2**len(addr_partials)),\
- dbf.lookup_by_any_name([t.token for t in name_fulls], addr_tokens,
- 'restrict' if fulls_count < 10000 else 'lookup_all')
+ if name_fulls:
+ fulls_count = sum(t.count for t in name_fulls)
+ # At this point drop unindexed partials from the address.
+ # This might yield wrong results, nothing we can do about that.
+ if not partials_indexed:
+ addr_tokens = [t.token for t in addr_partials if t.is_indexed]
+ penalty += 1.2 * sum(t.penalty for t in addr_partials if not t.is_indexed)
+ # Any of the full names applies with all of the partials from the address
+ yield penalty, fulls_count / (2**len(addr_partials)),\
+ dbf.lookup_by_any_name([t.token for t in name_fulls],
+ addr_tokens, fulls_count > 10000)
# To catch remaining results, lookup by name and address
# We only do this if there is a reasonable number of results expected.
exp_count = exp_count / (2**len(addr_partials)) if addr_partials else exp_count
if exp_count < 10000 and all(t.is_indexed for t in name_partials):
- lookup = [dbf.FieldLookup('name_vector', name_tokens, 'lookup_all')]
+ lookup = [dbf.FieldLookup('name_vector', name_tokens, lookups.LookupAll)]
if addr_tokens:
- lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all'))
+ lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, lookups.LookupAll))
penalty += 0.35 * max(0, 5 - len(name_partials) - len(addr_tokens))
yield penalty, exp_count, lookup
self.query.get_tokens(assignment.postcode,
TokenType.POSTCODE))
if assignment.qualifier:
- sdata.set_qualifiers(self.query.get_tokens(assignment.qualifier,
- TokenType.QUALIFIER))
+ tokens = self.query.get_tokens(assignment.qualifier, TokenType.QUALIFIER)
+ if self.details.categories:
+ tokens = [t for t in tokens if t.get_category() in self.details.categories]
+ if not tokens:
+ return None
+ sdata.set_qualifiers(tokens)
+ elif self.details.categories:
+ sdata.qualifiers = dbf.WeightedCategories(self.details.categories,
+ [0.0] * len(self.details.categories))
if assignment.address:
sdata.set_ranking([self.get_addr_ranking(r) for r in assignment.address])
return sdata
- def get_search_categories(self,
- assignment: TokenAssignment) -> Optional[dbf.WeightedCategories]:
- """ Collect tokens for category search or use the categories
+ def get_near_items(self, assignment: TokenAssignment) -> Optional[dbf.WeightedCategories]:
+ """ Collect tokens for near items search or use the categories
requested per parameter.
Returns None if no category search is requested.
"""
- if assignment.category:
- tokens = [t for t in self.query.get_tokens(assignment.category,
- TokenType.CATEGORY)
- if not self.details.categories
- or t.get_category() in self.details.categories]
- return dbf.WeightedCategories([t.get_category() for t in tokens],
- [t.penalty for t in tokens])
-
- if self.details.categories:
- return dbf.WeightedCategories(self.details.categories,
- [0.0] * len(self.details.categories))
+ if assignment.near_item:
+ tokens: Dict[Tuple[str, str], float] = {}
+ for t in self.query.get_tokens(assignment.near_item, TokenType.NEAR_ITEM):
+ cat = t.get_category()
+ # The category of a near search will be that of near_item.
+ # Thus, if search is restricted to a category parameter,
+ # the two sets must intersect.
+ if (not self.details.categories or cat in self.details.categories)\
+ and t.penalty < tokens.get(cat, 1000.0):
+ tokens[cat] = t.penalty
+ return dbf.WeightedCategories(list(tokens.keys()), list(tokens.values()))
return None