import heapq
from nominatim.api.types import SearchDetails, DataLayer
-from nominatim.api.search.query import QueryStruct, TokenType, TokenRange, BreakType
+from nominatim.api.search.query import QueryStruct, Token, TokenType, TokenRange, BreakType
from nominatim.api.search.token_assignment import TokenAssignment
import nominatim.api.search.db_search_fields as dbf
import nominatim.api.search.db_searches as dbs
from nominatim.api.logging import log
+
+def wrap_near_search(categories: List[Tuple[str, str]],
+ search: dbs.AbstractSearch) -> dbs.NearSearch:
+ """ Create a new search that wraps the given search in a search
+ for near places of the given category.
+ """
+ return dbs.NearSearch(penalty=search.penalty,
+ categories=dbf.WeightedCategories(categories,
+ [0.0] * len(categories)),
+ search=search)
+
+
+def build_poi_search(category: List[Tuple[str, str]],
+ countries: Optional[List[str]]) -> dbs.PoiSearch:
+ """ Create a new search for places by the given category, possibly
+ constraint to the given countries.
+ """
+ if countries:
+ ccs = dbf.WeightedStrings(countries, [0.0] * len(countries))
+ else:
+ ccs = dbf.WeightedStrings([], [])
+
+ class _PoiData(dbf.SearchData):
+ penalty = 0.0
+ qualifiers = dbf.WeightedCategories(category, [0.0] * len(category))
+ countries=ccs
+
+ return dbs.PoiSearch(_PoiData())
+
+
class SearchBuilder:
""" Build the abstract search queries from token assignments.
"""
sdata.qualifiers = categories
categories = None
builder = self.build_poi_search(sdata)
+ elif assignment.housenumber:
+ hnr_tokens = self.query.get_tokens(assignment.housenumber,
+ TokenType.HOUSENUMBER)
+ builder = self.build_housenumber_search(sdata, hnr_tokens, assignment.address)
else:
builder = self.build_special_search(sdata, assignment.address,
bool(categories))
""" Build abstract search queries for searches that do not involve
a named place.
"""
- if sdata.qualifiers or sdata.housenumbers:
- # No special searches over housenumbers or qualifiers supported.
+ if sdata.qualifiers:
+ # No special searches over qualifiers supported.
return
if sdata.countries and not address and not sdata.postcodes \
yield dbs.CountrySearch(sdata)
if sdata.postcodes and (is_category or self.configured_for_postcode):
+ penalty = 0.0 if sdata.countries else 0.1
if address:
sdata.lookups = [dbf.FieldLookup('nameaddress_vector',
[t.token for r in address
for t in self.query.get_partials_list(r)],
'restrict')]
- yield dbs.PostcodeSearch(0.4, sdata)
+ penalty += 0.2
+ yield dbs.PostcodeSearch(penalty, sdata)
+
+
+ def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[Token],
+ address: List[TokenRange]) -> Iterator[dbs.AbstractSearch]:
+ """ Build a simple address search for special entries where the
+ housenumber is the main name token.
+ """
+ partial_tokens: List[int] = []
+ for trange in address:
+ partial_tokens.extend(t.token for t in self.query.get_partials_list(trange))
+
+ sdata.lookups = [dbf.FieldLookup('name_vector', [t.token for t in hnrs], 'lookup_any'),
+ dbf.FieldLookup('nameaddress_vector', partial_tokens, 'lookup_all')
+ ]
+ sdata.housenumbers = dbf.WeightedStrings([], [])
+ yield dbs.PlaceSearch(0.05, sdata, sum(t.count for t in hnrs))
def build_name_search(self, sdata: dbf.SearchData,
""" Build abstract search queries for simple name or address searches.
"""
if is_category or not sdata.housenumbers or self.configured_for_housenumbers:
- sdata.rankings.append(self.get_name_ranking(name))
- name_penalty = sdata.rankings[-1].normalize_penalty()
+ ranking = self.get_name_ranking(name)
+ name_penalty = ranking.normalize_penalty()
+ if ranking.rankings:
+ sdata.rankings.append(ranking)
for penalty, count, lookup in self.yield_lookups(name, address):
sdata.lookups = lookup
yield dbs.PlaceSearch(penalty + name_penalty, sdata, count)
exp_addr_count = min(t.count for t in addr_partials) if addr_partials else exp_name_count
if exp_addr_count < 1000 and partials_indexed:
# Lookup by address partials and restrict results through name terms.
- yield penalty, exp_addr_count,\
+ # Give this a small penalty because lookups in the address index are
+ # more expensive
+ yield penalty + exp_addr_count/5000, exp_addr_count,\
[dbf.FieldLookup('name_vector', [t.token for t in name_partials], 'restrict'),
dbf.FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all')]
return
# Partial term too frequent. Try looking up by rare full names first.
name_fulls = self.query.get_tokens(name, TokenType.WORD)
- rare_names = list(filter(lambda t: t.count < 1000, name_fulls))
+ rare_names = list(filter(lambda t: t.count < 10000, name_fulls))
# At this point drop unindexed partials from the address.
# This might yield wrong results, nothing we can do about that.
if not partials_indexed:
addr_tokens = [t.token for t in addr_partials if t.is_indexed]
- log().var_dump('before', penalty)
penalty += 1.2 * sum(t.penalty for t in addr_partials if not t.is_indexed)
- log().var_dump('after', penalty)
if rare_names:
# Any of the full names applies with all of the partials from the address
lookup = [dbf.FieldLookup('name_vector', [t.token for t in rare_names], 'lookup_any')]
yield penalty, sum(t.count for t in rare_names), lookup
# To catch remaining results, lookup by name and address
- if all(t.is_indexed for t in name_partials):
- lookup = [dbf.FieldLookup('name_vector',
- [t.token for t in name_partials], 'lookup_all')]
- else:
- # we don't have the partials, try with the non-rare names
- non_rare_names = [t.token for t in name_fulls if t.count >= 1000]
- if not non_rare_names:
- return
- lookup = [dbf.FieldLookup('name_vector', non_rare_names, 'lookup_any')]
- if addr_tokens:
- lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all'))
- yield penalty + 0.1 * max(0, 5 - len(name_partials) - len(addr_tokens)),\
- min(exp_name_count, exp_addr_count), lookup
+ # We only do this if there is a reasonable number of results expected.
+ if min(exp_name_count, exp_addr_count) < 10000:
+ if all(t.is_indexed for t in name_partials):
+ lookup = [dbf.FieldLookup('name_vector',
+ [t.token for t in name_partials], 'lookup_all')]
+ else:
+ # we don't have the partials, try with the non-rare names
+ non_rare_names = [t.token for t in name_fulls if t.count >= 1000]
+ if not non_rare_names:
+ return
+ lookup = [dbf.FieldLookup('name_vector', non_rare_names, 'lookup_any')]
+ if addr_tokens:
+ lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all'))
+ penalty += 0.1 * max(0, 5 - len(name_partials) - len(addr_tokens))
+ if len(rare_names) == len(name_fulls):
+ # if there already was a search for all full tokens,
+ # avoid this if anything has been found
+ penalty += 0.25
+ yield penalty, min(exp_name_count, exp_addr_count), lookup
def get_name_ranking(self, trange: TokenRange) -> dbf.FieldRanking: