import heapq
from ..types import SearchDetails, DataLayer
- from .query import QueryStruct, Token, TokenType, TokenRange, BreakType
+ from . import query as qmod
from .token_assignment import TokenAssignment
from . import db_search_fields as dbf
from . import db_searches as dbs
""" Build the abstract search queries from token assignments.
"""
- def __init__(self, query: QueryStruct, details: SearchDetails) -> None:
+ def __init__(self, query: qmod.QueryStruct, details: SearchDetails) -> None:
self.query = query
self.details = details
builder = self.build_poi_search(sdata)
elif assignment.housenumber:
hnr_tokens = self.query.get_tokens(assignment.housenumber,
- TokenType.HOUSENUMBER)
+ qmod.TOKEN_HOUSENUMBER)
builder = self.build_housenumber_search(sdata, hnr_tokens, assignment.address)
else:
builder = self.build_special_search(sdata, assignment.address,
yield dbs.PoiSearch(sdata)
def build_special_search(self, sdata: dbf.SearchData,
- address: List[TokenRange],
+ address: List[qmod.TokenRange],
is_category: bool) -> Iterator[dbs.AbstractSearch]:
""" Build abstract search queries for searches that do not involve
a named place.
lookups.Restrict)]
yield dbs.PostcodeSearch(penalty, sdata)
- def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[Token],
- address: List[TokenRange]) -> Iterator[dbs.AbstractSearch]:
+ def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[qmod.Token],
+ address: List[qmod.TokenRange]) -> Iterator[dbs.AbstractSearch]:
""" Build a simple address search for special entries where the
housenumber is the main name token.
"""
list(partials), lookups.LookupAll))
else:
addr_fulls = [t.token for t
- in self.query.get_tokens(address[0], TokenType.WORD)]
+ in self.query.get_tokens(address[0], qmod.TOKEN_WORD)]
if len(addr_fulls) > 5:
return
sdata.lookups.append(
yield dbs.PlaceSearch(0.05, sdata, expected_count)
def build_name_search(self, sdata: dbf.SearchData,
- name: TokenRange, address: List[TokenRange],
+ name: qmod.TokenRange, address: List[qmod.TokenRange],
is_category: bool) -> Iterator[dbs.AbstractSearch]:
""" Build abstract search queries for simple name or address searches.
"""
sdata.lookups = lookup
yield dbs.PlaceSearch(penalty + name_penalty, sdata, count)
- def yield_lookups(self, name: TokenRange, address: List[TokenRange]
+ def yield_lookups(self, name: qmod.TokenRange, address: List[qmod.TokenRange]
) -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]:
""" Yield all variants how the given name and address should best
be searched for. This takes into account how frequent the terms
yield penalty, exp_count, dbf.lookup_by_names(list(name_partials.keys()), addr_tokens)
return
- addr_count = min(t.addr_count for t in addr_partials) if addr_partials else 30000
+ addr_count = min(t.addr_count for t in addr_partials) if addr_partials else 50000
# Partial term too frequent. Try looking up by rare full names first.
- name_fulls = self.query.get_tokens(name, TokenType.WORD)
+ name_fulls = self.query.get_tokens(name, qmod.TOKEN_WORD)
if name_fulls:
fulls_count = sum(t.count for t in name_fulls)
- if fulls_count < 50000 or addr_count < 30000:
+ if fulls_count < 80000 or addr_count < 50000:
yield penalty, fulls_count / (2**len(addr_tokens)), \
self.get_full_name_ranking(name_fulls, addr_partials,
fulls_count > 30000 / max(1, len(addr_tokens)))
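# A rough worked example of the thresholds above: with fulls_count = 60000
# and two address partials this branch is taken (60000 < 80000), the expected
# result count is estimated as 60000 / 2**2 = 15000, and full-name lookup is
# enabled because 60000 > 30000 / 2.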
self.get_name_address_ranking(list(name_partials.keys()), addr_partials)
def get_name_address_ranking(self, name_tokens: List[int],
- addr_partials: List[Token]) -> List[dbf.FieldLookup]:
+ addr_partials: List[qmod.Token]) -> List[dbf.FieldLookup]:
""" Create a ranking expression looking up by name and address.
"""
lookup = [dbf.FieldLookup('name_vector', name_tokens, lookups.LookupAll)]
return lookup
- def get_full_name_ranking(self, name_fulls: List[Token], addr_partials: List[Token],
+ def get_full_name_ranking(self, name_fulls: List[qmod.Token], addr_partials: List[qmod.Token],
use_lookup: bool) -> List[dbf.FieldLookup]:
""" Create a ranking expression with full name terms and
additional address lookup. When 'use_lookup' is true, then
# This might yield wrong results; nothing we can do about that.
if use_lookup:
addr_restrict_tokens = []
- addr_lookup_tokens = []
- for t in addr_partials:
- if t.addr_count > 20000:
- addr_restrict_tokens.append(t.token)
- else:
- addr_lookup_tokens.append(t.token)
+ addr_lookup_tokens = [t.token for t in addr_partials]
else:
addr_restrict_tokens = [t.token for t in addr_partials]
addr_lookup_tokens = []
return dbf.lookup_by_any_name([t.token for t in name_fulls],
addr_restrict_tokens, addr_lookup_tokens)
- def get_name_ranking(self, trange: TokenRange,
+ def get_name_ranking(self, trange: qmod.TokenRange,
db_field: str = 'name_vector') -> dbf.FieldRanking:
""" Create a ranking expression for a name term in the given range.
"""
- name_fulls = self.query.get_tokens(trange, TokenType.WORD)
+ name_fulls = self.query.get_tokens(trange, qmod.TOKEN_WORD)
ranks = [dbf.RankedTokens(t.penalty, [t.token]) for t in name_fulls]
ranks.sort(key=lambda r: r.penalty)
# Fallback, sum of penalty for partials
default = sum(t.penalty for t in name_partials) + 0.2
return dbf.FieldRanking(db_field, default, ranks)
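# Illustration (hypothetical tokens): two full-name readings with penalties
# 0.0 and 0.1 produce ranks [RankedTokens(0.0, [t1]), RankedTokens(0.1, [t2])];
# a result matching neither full name falls back to the default penalty,
# i.e. the summed partial penalties plus 0.2.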
- def get_addr_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
+ def get_addr_ranking(self, trange: qmod.TokenRange) -> dbf.FieldRanking:
""" Create a list of ranking expressions for an address term
for the given ranges.
"""
while todo:
neglen, pos, rank = heapq.heappop(todo)
for tlist in self.query.nodes[pos].starting:
- if tlist.ttype in (TokenType.PARTIAL, TokenType.WORD):
+ if tlist.ttype in (qmod.TOKEN_PARTIAL, qmod.TOKEN_WORD):
if tlist.end < trange.end:
chgpenalty = PENALTY_WORDCHANGE[self.query.nodes[tlist.end].btype]
- if tlist.ttype == TokenType.PARTIAL:
+ if tlist.ttype == qmod.TOKEN_PARTIAL:
penalty = rank.penalty + chgpenalty \
+ max(t.penalty for t in tlist.tokens)
heapq.heappush(todo, (neglen - 1, tlist.end,
heapq.heappush(todo, (neglen - 1, tlist.end,
rank.with_token(t, chgpenalty)))
elif tlist.end == trange.end:
- if tlist.ttype == TokenType.PARTIAL:
+ if tlist.ttype == qmod.TOKEN_PARTIAL:
ranks.append(dbf.RankedTokens(rank.penalty
+ max(t.penalty for t in tlist.tokens),
rank.tokens))
if assignment.housenumber:
sdata.set_strings('housenumbers',
self.query.get_tokens(assignment.housenumber,
- TokenType.HOUSENUMBER))
+ qmod.TOKEN_HOUSENUMBER))
if assignment.postcode:
sdata.set_strings('postcodes',
self.query.get_tokens(assignment.postcode,
- TokenType.POSTCODE))
+ qmod.TOKEN_POSTCODE))
if assignment.qualifier:
tokens = self.get_qualifier_tokens(assignment.qualifier)
if not tokens:
return sdata
- def get_country_tokens(self, trange: TokenRange) -> List[Token]:
+ def get_country_tokens(self, trange: qmod.TokenRange) -> List[qmod.Token]:
""" Return the list of country tokens for the given range,
optionally filtered by the country list from the details
parameters.
"""
- tokens = self.query.get_tokens(trange, TokenType.COUNTRY)
+ tokens = self.query.get_tokens(trange, qmod.TOKEN_COUNTRY)
if self.details.countries:
tokens = [t for t in tokens if t.lookup_word in self.details.countries]
return tokens
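# e.g. with details.countries set to ['de', 'ch'] (hypothetical), only
# country tokens whose lookup_word is 'de' or 'ch' survive the filter.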
- def get_qualifier_tokens(self, trange: TokenRange) -> List[Token]:
+ def get_qualifier_tokens(self, trange: qmod.TokenRange) -> List[qmod.Token]:
""" Return the list of qualifier tokens for the given range,
optionally filtered by the qualifier list from the details
parameters.
"""
- tokens = self.query.get_tokens(trange, TokenType.QUALIFIER)
+ tokens = self.query.get_tokens(trange, qmod.TOKEN_QUALIFIER)
if self.details.categories:
tokens = [t for t in tokens if t.get_category() in self.details.categories]
"""
if assignment.near_item:
tokens: Dict[Tuple[str, str], float] = {}
- for t in self.query.get_tokens(assignment.near_item, TokenType.NEAR_ITEM):
+ for t in self.query.get_tokens(assignment.near_item, qmod.TOKEN_NEAR_ITEM):
cat = t.get_category()
# The category of a near search will be that of near_item.
# Thus, if search is restricted to a category parameter,
PENALTY_WORDCHANGE = {
- BreakType.START: 0.0,
- BreakType.END: 0.0,
- BreakType.PHRASE: 0.0,
- BreakType.SOFT_PHRASE: 0.0,
- BreakType.WORD: 0.1,
- BreakType.PART: 0.2,
- BreakType.TOKEN: 0.4
+ qmod.BREAK_START: 0.0,
+ qmod.BREAK_END: 0.0,
+ qmod.BREAK_PHRASE: 0.0,
+ qmod.BREAK_SOFT_PHRASE: 0.0,
+ qmod.BREAK_WORD: 0.1,
+ qmod.BREAK_PART: 0.2,
+ qmod.BREAK_TOKEN: 0.4
}
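# Penalty applied in get_addr_ranking for starting a new token at a node
# with the given break type: free at phrase boundaries, 0.1 at a plain word
# break, and more expensive when the break lies inside an original word
# (PART and TOKEN breaks).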
DB_TO_TOKEN_TYPE = {
- 'W': qmod.TokenType.WORD,
- 'w': qmod.TokenType.PARTIAL,
- 'H': qmod.TokenType.HOUSENUMBER,
- 'P': qmod.TokenType.POSTCODE,
- 'C': qmod.TokenType.COUNTRY
+ 'W': qmod.TOKEN_WORD,
+ 'w': qmod.TOKEN_PARTIAL,
+ 'H': qmod.TOKEN_HOUSENUMBER,
+ 'P': qmod.TOKEN_POSTCODE,
+ 'C': qmod.TOKEN_COUNTRY
}
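# Maps the type column of word table rows to query token types; special
# phrase rows ('S') are handled separately when tokens are added to the
# query (see below).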
PENALTY_IN_TOKEN_BREAK = {
- qmod.BreakType.START: 0.5,
- qmod.BreakType.END: 0.5,
- qmod.BreakType.PHRASE: 0.5,
- qmod.BreakType.SOFT_PHRASE: 0.5,
- qmod.BreakType.WORD: 0.1,
- qmod.BreakType.PART: 0.0,
- qmod.BreakType.TOKEN: 0.0
+ qmod.BREAK_START: 0.5,
+ qmod.BREAK_END: 0.5,
+ qmod.BREAK_PHRASE: 0.5,
+ qmod.BREAK_SOFT_PHRASE: 0.5,
+ qmod.BREAK_WORD: 0.1,
+ qmod.BREAK_PART: 0.0,
+ qmod.BREAK_TOKEN: 0.0
}
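# Penalty that a query part contributes for the break following it when
# multi-part words are assembled in extract_words below: TOKEN and PART
# breaks are free, a plain word break costs 0.1, phrase-level breaks 0.5.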
@dataclasses.dataclass
class QueryPart:
""" Normalized and transliterated form of a single term in the query.
+
When the term came out of a split during the transliteration,
the normalized string is the full word before transliteration.
- The word number keeps track of the word before transliteration
- and can be used to identify partial transliterated terms.
+ Check the subsequent break type to figure out if the word is
+ continued.
+
Penalty is the break penalty for the break following the token.
"""
token: str
normalized: str
- word_number: int
penalty: float
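# Illustrative sketch (hypothetical data): if transliteration splits one
# source word into the terms 'foo' and 'bar', both parts keep the full
# normalized word and are separated by a BREAK_TOKEN node:
#   QueryPart('foo', 'foobar', PENALTY_IN_TOKEN_BREAK[qmod.BREAK_TOKEN])
#   QueryPart('bar', 'foobar', PENALTY_IN_TOKEN_BREAK[qmod.BREAK_TOKEN])
# (the penalty of the last part is later overwritten with that of the break
# which actually follows the word).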
WordDict = Dict[str, List[qmod.TokenRange]]
- def yield_words(terms: List[QueryPart], start: int) -> Iterator[Tuple[str, qmod.TokenRange]]:
- """ Return all combinations of words in the terms list after the
- given position.
+ def extract_words(terms: List[QueryPart], start: int, words: WordDict) -> None:
+ """ Add all combinations of words in the terms list after the
+ given position to the word list.
"""
total = len(terms)
+ base_penalty = PENALTY_IN_TOKEN_BREAK[qmod.BREAK_WORD]
for first in range(start, total):
word = terms[first].token
- penalty = PENALTY_IN_TOKEN_BREAK[qmod.BreakType.WORD]
- yield word, qmod.TokenRange(first, first + 1, penalty=penalty)
+ penalty = base_penalty
+ words[word].append(qmod.TokenRange(first, first + 1, penalty=penalty))
for last in range(first + 1, min(first + 20, total)):
word = ' '.join((word, terms[last].token))
penalty += terms[last - 1].penalty
- yield word, qmod.TokenRange(first, last + 1, penalty=penalty)
+ words[word].append(qmod.TokenRange(first, last + 1, penalty=penalty))
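# For example, the terms 'new', 'york', 'city' starting at position 0 add
# entries for 'new', 'new york', 'new york city', 'york', 'york city' and
# 'city'; multi-part words additionally accumulate the break penalties of
# the parts they span.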
@dataclasses.dataclass
log().section('Analyze query (using ICU tokenizer)')
for func in self.preprocessors:
phrases = func(phrases)
+
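+ # Presumably a protection against queries that consist of nothing but a
+ # long run of one- and two-letter terms, which would blow up the number
+ # of candidate words below: such input gets no normalized phrases.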
+ if len(phrases) == 1 \
+ and phrases[0].text.count(' ') > 3 \
+ and max(len(s) for s in phrases[0].text.split()) < 3:
+ normalized = []
+
query = qmod.QueryStruct(phrases)
log().var_dump('Normalized query', query.source)
if row.type == 'S':
if row.info['op'] in ('in', 'near'):
if trange.start == 0:
- query.add_token(trange, qmod.TokenType.NEAR_ITEM, token)
+ query.add_token(trange, qmod.TOKEN_NEAR_ITEM, token)
else:
if trange.start == 0 and trange.end == query.num_token_slots():
- query.add_token(trange, qmod.TokenType.NEAR_ITEM, token)
+ query.add_token(trange, qmod.TOKEN_NEAR_ITEM, token)
else:
- query.add_token(trange, qmod.TokenType.QUALIFIER, token)
+ query.add_token(trange, qmod.TOKEN_QUALIFIER, token)
else:
query.add_token(trange, DB_TO_TOKEN_TYPE[row.type], token)
"""
parts: QueryParts = []
phrase_start = 0
- words = defaultdict(list)
- wordnr = 0
+ words: WordDict = defaultdict(list)
for phrase in query.source:
query.nodes[-1].ptype = phrase.ptype
phrase_split = re.split('([ :-])', phrase.text)
if trans:
for term in trans.split(' '):
if term:
- parts.append(QueryPart(term, word, wordnr,
- PENALTY_IN_TOKEN_BREAK[qmod.BreakType.TOKEN]))
- query.add_node(qmod.BreakType.TOKEN, phrase.ptype)
- query.nodes[-1].btype = qmod.BreakType(breakchar)
- parts[-1].penalty = PENALTY_IN_TOKEN_BREAK[qmod.BreakType(breakchar)]
- wordnr += 1
+ parts.append(QueryPart(term, word,
+ PENALTY_IN_TOKEN_BREAK[qmod.BREAK_TOKEN]))
+ query.add_node(qmod.BREAK_TOKEN, phrase.ptype)
+ query.nodes[-1].btype = breakchar
+ parts[-1].penalty = PENALTY_IN_TOKEN_BREAK[breakchar]
- for word, wrange in yield_words(parts, phrase_start):
- words[word].append(wrange)
+ extract_words(parts, phrase_start, words)
phrase_start = len(parts)
- query.nodes[-1].btype = qmod.BreakType.END
+ query.nodes[-1].btype = qmod.BREAK_END
return parts, words
"""
for part, node, i in zip(parts, query.nodes, range(1000)):
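# Purely numeric terms of up to four digits that received no house-number
# reading from the word table get a fallback house-number token with a
# penalty of 0.5.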
if len(part.token) <= 4 and part.token.isdigit()\
- and not node.has_tokens(i+1, qmod.TokenType.HOUSENUMBER):
- query.add_token(qmod.TokenRange(i, i+1), qmod.TokenType.HOUSENUMBER,
+ and not node.has_tokens(i+1, qmod.TOKEN_HOUSENUMBER):
+ query.add_token(qmod.TokenRange(i, i+1), qmod.TOKEN_HOUSENUMBER,
ICUToken(penalty=0.5, token=0,
count=1, addr_count=1, lookup_word=part.token,
word_token=part.token, info=None))
""" Add penalties to tokens that depend on presence of other token.
"""
for i, node, tlist in query.iter_token_lists():
- if tlist.ttype == qmod.TokenType.POSTCODE:
+ if tlist.ttype == qmod.TOKEN_POSTCODE:
for repl in node.starting:
- if repl.end == tlist.end and repl.ttype != qmod.TokenType.POSTCODE \
- and (repl.ttype != qmod.TokenType.HOUSENUMBER
+ if repl.end == tlist.end and repl.ttype != qmod.TOKEN_POSTCODE \
+ and (repl.ttype != qmod.TOKEN_HOUSENUMBER
or len(tlist.tokens[0].lookup_word) > 4):
repl.add_penalty(0.39)
- elif (tlist.ttype == qmod.TokenType.HOUSENUMBER
+ elif (tlist.ttype == qmod.TOKEN_HOUSENUMBER
and len(tlist.tokens[0].lookup_word) <= 3):
if any(c.isdigit() for c in tlist.tokens[0].lookup_word):
for repl in node.starting:
- if repl.end == tlist.end and repl.ttype != qmod.TokenType.HOUSENUMBER:
+ if repl.end == tlist.end and repl.ttype != qmod.TOKEN_HOUSENUMBER:
repl.add_penalty(0.5 - tlist.tokens[0].penalty)
- elif tlist.ttype not in (qmod.TokenType.COUNTRY, qmod.TokenType.PARTIAL):
+ elif tlist.ttype not in (qmod.TOKEN_COUNTRY, qmod.TOKEN_PARTIAL):
norm = parts[i].normalized
for j in range(i + 1, tlist.end):
- if parts[j - 1].word_number != parts[j].word_number:
+ if query.nodes[j].btype != qmod.BREAK_TOKEN:
norm += ' ' + parts[j].normalized
for token in tlist.tokens:
cast(ICUToken, token).rematch(norm)
def _dump_transliterated(query: qmod.QueryStruct, parts: QueryParts) -> str:
- out = query.nodes[0].btype.value
+ out = query.nodes[0].btype
for node, part in zip(query.nodes[1:], parts):
- out += part.token + node.btype.value
+ out += part.token + node.btype
return out
for tlist in node.starting:
for token in tlist.tokens:
t = cast(ICUToken, token)
- yield [tlist.ttype.name, t.token, t.word_token or '',
+ yield [tlist.ttype, t.token, t.word_token or '',
t.lookup_word or '', t.penalty, t.count, t.info]