from .token_assignment import TokenAssignment
from . import db_search_fields as dbf
from . import db_searches as dbs
from .token_assignment import TokenAssignment
from . import db_search_fields as dbf
from . import db_searches as dbs
class _PoiData(dbf.SearchData):
penalty = 0.0
qualifiers = dbf.WeightedCategories(category, [0.0] * len(category))
class _PoiData(dbf.SearchData):
penalty = 0.0
qualifiers = dbf.WeightedCategories(category, [0.0] * len(category))
@property
def configured_for_country(self) -> bool:
""" Return true if the search details are configured to
allow countries in the result.
"""
return self.details.min_rank <= 4 and self.details.max_rank >= 4 \
@property
def configured_for_country(self) -> bool:
""" Return true if the search details are configured to
allow countries in the result.
"""
return self.details.min_rank <= 4 and self.details.max_rank >= 4 \
def build(self, assignment: TokenAssignment) -> Iterator[dbs.AbstractSearch]:
""" Yield all possible abstract searches for the given token assignment.
def build(self, assignment: TokenAssignment) -> Iterator[dbs.AbstractSearch]:
""" Yield all possible abstract searches for the given token assignment.
builder = self.build_poi_search(sdata)
elif assignment.housenumber:
hnr_tokens = self.query.get_tokens(assignment.housenumber,
builder = self.build_poi_search(sdata)
elif assignment.housenumber:
hnr_tokens = self.query.get_tokens(assignment.housenumber,
builder = self.build_housenumber_search(sdata, hnr_tokens, assignment.address)
else:
builder = self.build_special_search(sdata, assignment.address,
builder = self.build_housenumber_search(sdata, hnr_tokens, assignment.address)
else:
builder = self.build_special_search(sdata, assignment.address,
def build_poi_search(self, sdata: dbf.SearchData) -> Iterator[dbs.AbstractSearch]:
""" Build abstract search query for a simple category search.
This kind of search requires an additional geographic constraint.
def build_poi_search(self, sdata: dbf.SearchData) -> Iterator[dbs.AbstractSearch]:
""" Build abstract search query for a simple category search.
This kind of search requires an additional geographic constraint.
and ((self.details.viewbox and self.details.bounded_viewbox) or self.details.near):
yield dbs.PoiSearch(sdata)
and ((self.details.viewbox and self.details.bounded_viewbox) or self.details.near):
yield dbs.PoiSearch(sdata)
is_category: bool) -> Iterator[dbs.AbstractSearch]:
""" Build abstract search queries for searches that do not involve
a named place.
is_category: bool) -> Iterator[dbs.AbstractSearch]:
""" Build abstract search queries for searches that do not involve
a named place.
[t.token for r in address
for t in self.query.get_partials_list(r)],
lookups.Restrict)]
[t.token for r in address
for t in self.query.get_partials_list(r)],
lookups.Restrict)]
-
- def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[Token],
- address: List[TokenRange]) -> Iterator[dbs.AbstractSearch]:
+ def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[qmod.Token],
+ address: List[qmod.TokenRange]) -> Iterator[dbs.AbstractSearch]:
list(partials), lookups.LookupAll))
else:
addr_fulls = [t.token for t
list(partials), lookups.LookupAll))
else:
addr_fulls = [t.token for t
sdata.housenumbers = dbf.WeightedStrings([], [])
yield dbs.PlaceSearch(0.05, sdata, expected_count)
sdata.housenumbers = dbf.WeightedStrings([], [])
yield dbs.PlaceSearch(0.05, sdata, expected_count)
is_category: bool) -> Iterator[dbs.AbstractSearch]:
""" Build abstract search queries for simple name or address searches.
"""
is_category: bool) -> Iterator[dbs.AbstractSearch]:
""" Build abstract search queries for simple name or address searches.
"""
-
- def yield_lookups(self, name: TokenRange, address: List[TokenRange])\
- -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]:
+ def yield_lookups(self, name: qmod.TokenRange, address: List[qmod.TokenRange]
+ ) -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]:
""" Yield all variants how the given name and address should best
be searched for. This takes into account how frequent the terms
are and tries to find a lookup that optimizes index use.
"""
""" Yield all variants how the given name and address should best
be searched for. This takes into account how frequent the terms
are and tries to find a lookup that optimizes index use.
"""
name_partials = {t.token: t for t in self.query.get_partials_list(name)}
addr_partials = [t for r in address for t in self.query.get_partials_list(r)]
addr_tokens = list({t.token for t in addr_partials})
name_partials = {t.token: t for t in self.query.get_partials_list(name)}
addr_partials = [t for r in address for t in self.query.get_partials_list(r)]
addr_tokens = list({t.token for t in addr_partials})
yield penalty, exp_count, dbf.lookup_by_names(list(name_partials.keys()), addr_tokens)
return
yield penalty, exp_count, dbf.lookup_by_names(list(name_partials.keys()), addr_tokens)
return
- if fulls_count < 50000 or addr_count < 30000:
- yield penalty,fulls_count / (2**len(addr_tokens)), \
+ if fulls_count < 80000 or addr_count < 50000:
+ yield penalty, fulls_count / (2**len(addr_tokens)), \
self.get_full_name_ranking(name_fulls, addr_partials,
fulls_count > 30000 / max(1, len(addr_tokens)))
# To catch remaining results, lookup by name and address
# We only do this if there is a reasonable number of results expected.
exp_count = exp_count / (2**len(addr_tokens)) if addr_tokens else exp_count
self.get_full_name_ranking(name_fulls, addr_partials,
fulls_count > 30000 / max(1, len(addr_tokens)))
# To catch remaining results, lookup by name and address
# We only do this if there is a reasonable number of results expected.
exp_count = exp_count / (2**len(addr_tokens)) if addr_tokens else exp_count
penalty += 0.35 * max(1 if name_fulls else 0.1,
5 - len(name_partials) - len(addr_tokens))
penalty += 0.35 * max(1 if name_fulls else 0.1,
5 - len(name_partials) - len(addr_tokens))
- yield penalty, exp_count,\
- self.get_name_address_ranking(list(name_partials.keys()), addr_partials)
-
+ yield penalty, exp_count, \
+ self.get_name_address_ranking(list(name_partials.keys()), addr_partials)
""" Create a ranking expression looking up by name and address.
"""
lookup = [dbf.FieldLookup('name_vector', name_tokens, lookups.LookupAll)]
""" Create a ranking expression looking up by name and address.
"""
lookup = [dbf.FieldLookup('name_vector', name_tokens, lookups.LookupAll)]
- if t.is_indexed:
- if t.addr_count > 20000:
- addr_restrict_tokens.append(t.token)
- else:
- addr_lookup_tokens.append(t.token)
+ if t.addr_count > 20000:
+ addr_restrict_tokens.append(t.token)
+ else:
+ addr_lookup_tokens.append(t.token)
-
- def get_full_name_ranking(self, name_fulls: List[Token], addr_partials: List[Token],
+ def get_full_name_ranking(self, name_fulls: List[qmod.Token], addr_partials: List[qmod.Token],
use_lookup: bool) -> List[dbf.FieldLookup]:
""" Create a ranking expression with full name terms and
additional address lookup. When 'use_lookup' is true, then
use_lookup: bool) -> List[dbf.FieldLookup]:
""" Create a ranking expression with full name terms and
additional address lookup. When 'use_lookup' is true, then
# This might yield wrong results, nothing we can do about that.
if use_lookup:
addr_restrict_tokens = []
# This might yield wrong results, nothing we can do about that.
if use_lookup:
addr_restrict_tokens = []
- addr_lookup_tokens = []
- for t in addr_partials:
- if t.is_indexed:
- if t.addr_count > 20000:
- addr_restrict_tokens.append(t.token)
- else:
- addr_lookup_tokens.append(t.token)
+ addr_lookup_tokens = [t.token for t in addr_partials]
addr_lookup_tokens = []
return dbf.lookup_by_any_name([t.token for t in name_fulls],
addr_restrict_tokens, addr_lookup_tokens)
addr_lookup_tokens = []
return dbf.lookup_by_any_name([t.token for t in name_fulls],
addr_restrict_tokens, addr_lookup_tokens)
db_field: str = 'name_vector') -> dbf.FieldRanking:
""" Create a ranking expression for a name term in the given range.
"""
db_field: str = 'name_vector') -> dbf.FieldRanking:
""" Create a ranking expression for a name term in the given range.
"""
ranks = [dbf.RankedTokens(t.penalty, [t.token]) for t in name_fulls]
ranks.sort(key=lambda r: r.penalty)
# Fallback, sum of penalty for partials
ranks = [dbf.RankedTokens(t.penalty, [t.token]) for t in name_fulls]
ranks.sort(key=lambda r: r.penalty)
# Fallback, sum of penalty for partials
default = sum(t.penalty for t in name_partials) + 0.2
return dbf.FieldRanking(db_field, default, ranks)
default = sum(t.penalty for t in name_partials) + 0.2
return dbf.FieldRanking(db_field, default, ranks)
heapq.heappush(todo, (0, trange.start, dbf.RankedTokens(0.0, [])))
ranks: List[dbf.RankedTokens] = []
heapq.heappush(todo, (0, trange.start, dbf.RankedTokens(0.0, [])))
ranks: List[dbf.RankedTokens] = []
neglen, pos, rank = heapq.heappop(todo)
for tlist in self.query.nodes[pos].starting:
neglen, pos, rank = heapq.heappop(todo)
for tlist in self.query.nodes[pos].starting:
penalty = rank.penalty + chgpenalty \
+ max(t.penalty for t in tlist.tokens)
heapq.heappush(todo, (neglen - 1, tlist.end,
penalty = rank.penalty + chgpenalty \
+ max(t.penalty for t in tlist.tokens)
heapq.heappush(todo, (neglen - 1, tlist.end,
heapq.heappush(todo, (neglen - 1, tlist.end,
rank.with_token(t, chgpenalty)))
elif tlist.end == trange.end:
heapq.heappush(todo, (neglen - 1, tlist.end,
rank.with_token(t, chgpenalty)))
elif tlist.end == trange.end:
ranks.append(dbf.RankedTokens(rank.penalty
+ max(t.penalty for t in tlist.tokens),
rank.tokens))
ranks.append(dbf.RankedTokens(rank.penalty
+ max(t.penalty for t in tlist.tokens),
rank.tokens))
def get_search_data(self, assignment: TokenAssignment) -> Optional[dbf.SearchData]:
""" Collect the tokens for the non-name search fields in the
assignment.
def get_search_data(self, assignment: TokenAssignment) -> Optional[dbf.SearchData]:
""" Collect the tokens for the non-name search fields in the
assignment.
if assignment.housenumber:
sdata.set_strings('housenumbers',
self.query.get_tokens(assignment.housenumber,
if assignment.housenumber:
sdata.set_strings('housenumbers',
self.query.get_tokens(assignment.housenumber,
if assignment.postcode:
sdata.set_strings('postcodes',
self.query.get_tokens(assignment.postcode,
if assignment.postcode:
sdata.set_strings('postcodes',
self.query.get_tokens(assignment.postcode,
""" Return the list of country tokens for the given range,
optionally filtered by the country list from the details
parameters.
"""
""" Return the list of country tokens for the given range,
optionally filtered by the country list from the details
parameters.
"""
""" Return the list of qualifier tokens for the given range,
optionally filtered by the qualifier list from the details
parameters.
"""
""" Return the list of qualifier tokens for the given range,
optionally filtered by the qualifier list from the details
parameters.
"""
if self.details.categories:
tokens = [t for t in tokens if t.get_category() in self.details.categories]
return tokens
if self.details.categories:
tokens = [t for t in tokens if t.get_category() in self.details.categories]
return tokens
def get_near_items(self, assignment: TokenAssignment) -> Optional[dbf.WeightedCategories]:
""" Collect tokens for near items search or use the categories
requested per parameter.
def get_near_items(self, assignment: TokenAssignment) -> Optional[dbf.WeightedCategories]:
""" Collect tokens for near items search or use the categories
requested per parameter.
cat = t.get_category()
# The category of a near search will be that of near_item.
# Thus, if search is restricted to a category parameter,
cat = t.get_category()
# The category of a near search will be that of near_item.
# Thus, if search is restricted to a category parameter,