import heapq
from ..types import SearchDetails, DataLayer
-from .query import QueryStruct, Token, TokenType, TokenRange, BreakType
+from . import query as qmod
from .token_assignment import TokenAssignment
from . import db_search_fields as dbf
from . import db_searches as dbs
""" Build the abstract search queries from token assignments.
"""
- def __init__(self, query: QueryStruct, details: SearchDetails) -> None:
+ def __init__(self, query: qmod.QueryStruct, details: SearchDetails) -> None:
self.query = query
self.details = details
builder = self.build_poi_search(sdata)
elif assignment.housenumber:
hnr_tokens = self.query.get_tokens(assignment.housenumber,
- TokenType.HOUSENUMBER)
+ qmod.TokenType.HOUSENUMBER)
builder = self.build_housenumber_search(sdata, hnr_tokens, assignment.address)
else:
builder = self.build_special_search(sdata, assignment.address,
yield dbs.PoiSearch(sdata)
def build_special_search(self, sdata: dbf.SearchData,
- address: List[TokenRange],
+ address: List[qmod.TokenRange],
is_category: bool) -> Iterator[dbs.AbstractSearch]:
""" Build abstract search queries for searches that do not involve
a named place.
[t.token for r in address
for t in self.query.get_partials_list(r)],
lookups.Restrict)]
- penalty += 0.2
yield dbs.PostcodeSearch(penalty, sdata)
- def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[Token],
- address: List[TokenRange]) -> Iterator[dbs.AbstractSearch]:
+ def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[qmod.Token],
+ address: List[qmod.TokenRange]) -> Iterator[dbs.AbstractSearch]:
""" Build a simple address search for special entries where the
housenumber is the main name token.
"""
list(partials), lookups.LookupAll))
else:
addr_fulls = [t.token for t
- in self.query.get_tokens(address[0], TokenType.WORD)]
+ in self.query.get_tokens(address[0], qmod.TokenType.WORD)]
if len(addr_fulls) > 5:
return
sdata.lookups.append(
yield dbs.PlaceSearch(0.05, sdata, expected_count)
def build_name_search(self, sdata: dbf.SearchData,
- name: TokenRange, address: List[TokenRange],
+ name: qmod.TokenRange, address: List[qmod.TokenRange],
is_category: bool) -> Iterator[dbs.AbstractSearch]:
""" Build abstract search queries for simple name or address searches.
"""
sdata.lookups = lookup
yield dbs.PlaceSearch(penalty + name_penalty, sdata, count)
- def yield_lookups(self, name: TokenRange, address: List[TokenRange]
+ def yield_lookups(self, name: qmod.TokenRange, address: List[qmod.TokenRange]
) -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]:
""" Yield all variants how the given name and address should best
be searched for. This takes into account how frequent the terms
addr_count = min(t.addr_count for t in addr_partials) if addr_partials else 30000
# Partial term too frequent. Try looking up by rare full names first.
- name_fulls = self.query.get_tokens(name, TokenType.WORD)
+ name_fulls = self.query.get_tokens(name, qmod.TokenType.WORD)
if name_fulls:
fulls_count = sum(t.count for t in name_fulls)
self.get_name_address_ranking(list(name_partials.keys()), addr_partials)
def get_name_address_ranking(self, name_tokens: List[int],
- addr_partials: List[Token]) -> List[dbf.FieldLookup]:
+ addr_partials: List[qmod.Token]) -> List[dbf.FieldLookup]:
""" Create a ranking expression looking up by name and address.
"""
lookup = [dbf.FieldLookup('name_vector', name_tokens, lookups.LookupAll)]
return lookup
- def get_full_name_ranking(self, name_fulls: List[Token], addr_partials: List[Token],
+ def get_full_name_ranking(self, name_fulls: List[qmod.Token], addr_partials: List[qmod.Token],
use_lookup: bool) -> List[dbf.FieldLookup]:
""" Create a ranking expression with full name terms and
additional address lookup. When 'use_lookup' is true, then
return dbf.lookup_by_any_name([t.token for t in name_fulls],
addr_restrict_tokens, addr_lookup_tokens)
- def get_name_ranking(self, trange: TokenRange,
+ def get_name_ranking(self, trange: qmod.TokenRange,
db_field: str = 'name_vector') -> dbf.FieldRanking:
""" Create a ranking expression for a name term in the given range.
"""
- name_fulls = self.query.get_tokens(trange, TokenType.WORD)
+ name_fulls = self.query.get_tokens(trange, qmod.TokenType.WORD)
ranks = [dbf.RankedTokens(t.penalty, [t.token]) for t in name_fulls]
ranks.sort(key=lambda r: r.penalty)
# Fallback, sum of penalty for partials
default = sum(t.penalty for t in name_partials) + 0.2
return dbf.FieldRanking(db_field, default, ranks)
- def get_addr_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
+ def get_addr_ranking(self, trange: qmod.TokenRange) -> dbf.FieldRanking:
""" Create a list of ranking expressions for an address term
for the given ranges.
"""
while todo:
neglen, pos, rank = heapq.heappop(todo)
for tlist in self.query.nodes[pos].starting:
- if tlist.ttype in (TokenType.PARTIAL, TokenType.WORD):
+ if tlist.ttype in (qmod.TokenType.PARTIAL, qmod.TokenType.WORD):
if tlist.end < trange.end:
chgpenalty = PENALTY_WORDCHANGE[self.query.nodes[tlist.end].btype]
- if tlist.ttype == TokenType.PARTIAL:
+ if tlist.ttype == qmod.TokenType.PARTIAL:
penalty = rank.penalty + chgpenalty \
+ max(t.penalty for t in tlist.tokens)
heapq.heappush(todo, (neglen - 1, tlist.end,
heapq.heappush(todo, (neglen - 1, tlist.end,
rank.with_token(t, chgpenalty)))
elif tlist.end == trange.end:
- if tlist.ttype == TokenType.PARTIAL:
+ if tlist.ttype == qmod.TokenType.PARTIAL:
ranks.append(dbf.RankedTokens(rank.penalty
+ max(t.penalty for t in tlist.tokens),
rank.tokens))
if assignment.housenumber:
sdata.set_strings('housenumbers',
self.query.get_tokens(assignment.housenumber,
- TokenType.HOUSENUMBER))
+ qmod.TokenType.HOUSENUMBER))
if assignment.postcode:
sdata.set_strings('postcodes',
self.query.get_tokens(assignment.postcode,
- TokenType.POSTCODE))
+ qmod.TokenType.POSTCODE))
if assignment.qualifier:
tokens = self.get_qualifier_tokens(assignment.qualifier)
if not tokens:
return sdata
- def get_country_tokens(self, trange: TokenRange) -> List[Token]:
+ def get_country_tokens(self, trange: qmod.TokenRange) -> List[qmod.Token]:
""" Return the list of country tokens for the given range,
optionally filtered by the country list from the details
parameters.
"""
- tokens = self.query.get_tokens(trange, TokenType.COUNTRY)
+ tokens = self.query.get_tokens(trange, qmod.TokenType.COUNTRY)
if self.details.countries:
tokens = [t for t in tokens if t.lookup_word in self.details.countries]
return tokens
- def get_qualifier_tokens(self, trange: TokenRange) -> List[Token]:
+ def get_qualifier_tokens(self, trange: qmod.TokenRange) -> List[qmod.Token]:
""" Return the list of qualifier tokens for the given range,
optionally filtered by the qualifier list from the details
parameters.
"""
- tokens = self.query.get_tokens(trange, TokenType.QUALIFIER)
+ tokens = self.query.get_tokens(trange, qmod.TokenType.QUALIFIER)
if self.details.categories:
tokens = [t for t in tokens if t.get_category() in self.details.categories]
"""
if assignment.near_item:
tokens: Dict[Tuple[str, str], float] = {}
- for t in self.query.get_tokens(assignment.near_item, TokenType.NEAR_ITEM):
+ for t in self.query.get_tokens(assignment.near_item, qmod.TokenType.NEAR_ITEM):
cat = t.get_category()
# The category of a near search will be that of near_item.
# Thus, if search is restricted to a category parameter,
# Penalty keyed by the break type of a query node. get_addr_ranking() looks
# up the btype of the node at which a token list ends (only when that end
# lies before the end of the address range) and uses the value found here as
# the extra penalty for continuing the ranking across that break.
PENALTY_WORDCHANGE = {
- BreakType.START: 0.0,
- BreakType.END: 0.0,
- BreakType.PHRASE: 0.0,
- BreakType.WORD: 0.1,
- BreakType.PART: 0.2,
- BreakType.TOKEN: 0.4
+ qmod.BREAK_START: 0.0,
+ qmod.BREAK_END: 0.0,
+ qmod.BREAK_PHRASE: 0.0,
+ qmod.BREAK_SOFT_PHRASE: 0.0,
+ qmod.BREAK_WORD: 0.1,
+ qmod.BREAK_PART: 0.2,
+ qmod.BREAK_TOKEN: 0.4
}