class _PoiData(dbf.SearchData):
penalty = 0.0
qualifiers = dbf.WeightedCategories(category, [0.0] * len(category))
- countries=ccs
+ countries = ccs
return dbs.PoiSearch(_PoiData())
self.query = query
self.details = details
-
@property
def configured_for_country(self) -> bool:
""" Return true if the search details are configured to
allow countries in the result.
"""
return self.details.min_rank <= 4 and self.details.max_rank >= 4 \
- and self.details.layer_enabled(DataLayer.ADDRESS)
-
+ and self.details.layer_enabled(DataLayer.ADDRESS)
@property
def configured_for_postcode(self) -> bool:
allow postcodes in the result.
"""
return self.details.min_rank <= 5 and self.details.max_rank >= 11\
- and self.details.layer_enabled(DataLayer.ADDRESS)
-
+ and self.details.layer_enabled(DataLayer.ADDRESS)
@property
def configured_for_housenumbers(self) -> bool:
allow addresses in the result.
"""
return self.details.max_rank >= 30 \
- and self.details.layer_enabled(DataLayer.ADDRESS)
-
+ and self.details.layer_enabled(DataLayer.ADDRESS)
def build(self, assignment: TokenAssignment) -> Iterator[dbs.AbstractSearch]:
""" Yield all possible abstract searches for the given token assignment.
near_items = self.get_near_items(assignment)
if near_items is not None and not near_items:
- return # impossible compbination of near items and category parameter
+ return # impossible combination of near items and category parameter
if assignment.name is None:
if near_items and not sdata.postcodes:
search.penalty += assignment.penalty
yield search
-
def build_poi_search(self, sdata: dbf.SearchData) -> Iterator[dbs.AbstractSearch]:
""" Build abstract search query for a simple category search.
This kind of search requires an additional geographic constraint.
and ((self.details.viewbox and self.details.bounded_viewbox) or self.details.near):
yield dbs.PoiSearch(sdata)
-
def build_special_search(self, sdata: dbf.SearchData,
address: List[TokenRange],
is_category: bool) -> Iterator[dbs.AbstractSearch]:
penalty += 0.2
yield dbs.PostcodeSearch(penalty, sdata)
-
def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[Token],
address: List[TokenRange]) -> Iterator[dbs.AbstractSearch]:
""" Build a simple address search for special entries where the
expected_count = sum(t.count for t in hnrs)
partials = {t.token: t.addr_count for trange in address
- for t in self.query.get_partials_list(trange)}
+ for t in self.query.get_partials_list(trange)}
+
+ if not partials:
+ # can happen when none of the partials is indexed
+ return
if expected_count < 8000:
sdata.lookups.append(dbf.FieldLookup('nameaddress_vector',
sdata.housenumbers = dbf.WeightedStrings([], [])
yield dbs.PlaceSearch(0.05, sdata, expected_count)
-
def build_name_search(self, sdata: dbf.SearchData,
name: TokenRange, address: List[TokenRange],
is_category: bool) -> Iterator[dbs.AbstractSearch]:
sdata.lookups = lookup
yield dbs.PlaceSearch(penalty + name_penalty, sdata, count)
-
- def yield_lookups(self, name: TokenRange, address: List[TokenRange])\
- -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]:
+ def yield_lookups(self, name: TokenRange, address: List[TokenRange]
+ ) -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]:
""" Yield all variants how the given name and address should best
be searched for. This takes into account how frequent the terms
are and tries to find a lookup that optimizes index use.
"""
- penalty = 0.0 # extra penalty
+ penalty = 0.0 # extra penalty
name_partials = {t.token: t for t in self.query.get_partials_list(name)}
addr_partials = [t for r in address for t in self.query.get_partials_list(r)]
addr_tokens = list({t.token for t in addr_partials})
- partials_indexed = all(t.is_indexed for t in name_partials.values()) \
- and all(t.is_indexed for t in addr_partials)
exp_count = min(t.count for t in name_partials.values()) / (2**(len(name_partials) - 1))
- if (len(name_partials) > 3 or exp_count < 8000) and partials_indexed:
+ if (len(name_partials) > 3 or exp_count < 8000):
yield penalty, exp_count, dbf.lookup_by_names(list(name_partials.keys()), addr_tokens)
return
name_fulls = self.query.get_tokens(name, TokenType.WORD)
if name_fulls:
fulls_count = sum(t.count for t in name_fulls)
- if partials_indexed:
- penalty += 1.2 * sum(t.penalty for t in addr_partials if not t.is_indexed)
if fulls_count < 50000 or addr_count < 30000:
- yield penalty,fulls_count / (2**len(addr_tokens)), \
+ yield penalty, fulls_count / (2**len(addr_tokens)), \
self.get_full_name_ranking(name_fulls, addr_partials,
fulls_count > 30000 / max(1, len(addr_tokens)))
# To catch remaining results, lookup by name and address
# We only do this if there is a reasonable number of results expected.
exp_count = exp_count / (2**len(addr_tokens)) if addr_tokens else exp_count
- if exp_count < 10000 and addr_count < 20000\
- and all(t.is_indexed for t in name_partials.values()):
+ if exp_count < 10000 and addr_count < 20000:
penalty += 0.35 * max(1 if name_fulls else 0.1,
5 - len(name_partials) - len(addr_tokens))
- yield penalty, exp_count,\
- self.get_name_address_ranking(list(name_partials.keys()), addr_partials)
-
+ yield penalty, exp_count, \
+ self.get_name_address_ranking(list(name_partials.keys()), addr_partials)
def get_name_address_ranking(self, name_tokens: List[int],
addr_partials: List[Token]) -> List[dbf.FieldLookup]:
addr_restrict_tokens = []
addr_lookup_tokens = []
for t in addr_partials:
- if t.is_indexed:
- if t.addr_count > 20000:
- addr_restrict_tokens.append(t.token)
- else:
- addr_lookup_tokens.append(t.token)
+ if t.addr_count > 20000:
+ addr_restrict_tokens.append(t.token)
+ else:
+ addr_lookup_tokens.append(t.token)
if addr_restrict_tokens:
lookup.append(dbf.FieldLookup('nameaddress_vector',
return lookup
-
def get_full_name_ranking(self, name_fulls: List[Token], addr_partials: List[Token],
use_lookup: bool) -> List[dbf.FieldLookup]:
""" Create a ranking expression with full name terms and
addr_restrict_tokens = []
addr_lookup_tokens = []
for t in addr_partials:
- if t.is_indexed:
- if t.addr_count > 20000:
- addr_restrict_tokens.append(t.token)
- else:
- addr_lookup_tokens.append(t.token)
+ if t.addr_count > 20000:
+ addr_restrict_tokens.append(t.token)
+ else:
+ addr_lookup_tokens.append(t.token)
else:
- addr_restrict_tokens = [t.token for t in addr_partials if t.is_indexed]
+ addr_restrict_tokens = [t.token for t in addr_partials]
addr_lookup_tokens = []
return dbf.lookup_by_any_name([t.token for t in name_fulls],
addr_restrict_tokens, addr_lookup_tokens)
-
def get_name_ranking(self, trange: TokenRange,
db_field: str = 'name_vector') -> dbf.FieldRanking:
""" Create a ranking expression for a name term in the given range.
default = sum(t.penalty for t in name_partials) + 0.2
return dbf.FieldRanking(db_field, default, ranks)
-
def get_addr_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
""" Create a list of ranking expressions for an address term
for the given ranges.
heapq.heappush(todo, (0, trange.start, dbf.RankedTokens(0.0, [])))
ranks: List[dbf.RankedTokens] = []
- while todo: # pylint: disable=too-many-nested-blocks
+ while todo:
neglen, pos, rank = heapq.heappop(todo)
for tlist in self.query.nodes[pos].starting:
if tlist.ttype in (TokenType.PARTIAL, TokenType.WORD):
return dbf.FieldRanking('nameaddress_vector', default, ranks)
-
def get_search_data(self, assignment: TokenAssignment) -> Optional[dbf.SearchData]:
""" Collect the tokens for the non-name search fields in the
assignment.
return sdata
-
def get_country_tokens(self, trange: TokenRange) -> List[Token]:
""" Return the list of country tokens for the given range,
optionally filtered by the country list from the details
return tokens
-
def get_qualifier_tokens(self, trange: TokenRange) -> List[Token]:
""" Return the list of qualifier tokens for the given range,
optionally filtered by the qualifier list from the details
return tokens
-
def get_near_items(self, assignment: TokenAssignment) -> Optional[dbf.WeightedCategories]:
""" Collect tokens for near items search or use the categories
requested per parameter.