X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/671af4cff24ed671bed414160a73ea3541907d29..157414a053cde0133a001964264254a62f231be5:/src/nominatim_api/search/db_search_builder.py diff --git a/src/nominatim_api/search/db_search_builder.py b/src/nominatim_api/search/db_search_builder.py index a6335c13..2c5d0d18 100644 --- a/src/nominatim_api/search/db_search_builder.py +++ b/src/nominatim_api/search/db_search_builder.py @@ -11,7 +11,7 @@ from typing import Optional, List, Tuple, Iterator, Dict import heapq from ..types import SearchDetails, DataLayer -from .query import QueryStruct, Token, TokenType, TokenRange, BreakType +from . import query as qmod from .token_assignment import TokenAssignment from . import db_search_fields as dbf from . import db_searches as dbs @@ -51,7 +51,7 @@ class SearchBuilder: """ Build the abstract search queries from token assignments. """ - def __init__(self, query: QueryStruct, details: SearchDetails) -> None: + def __init__(self, query: qmod.QueryStruct, details: SearchDetails) -> None: self.query = query self.details = details @@ -97,7 +97,7 @@ class SearchBuilder: builder = self.build_poi_search(sdata) elif assignment.housenumber: hnr_tokens = self.query.get_tokens(assignment.housenumber, - TokenType.HOUSENUMBER) + qmod.TOKEN_HOUSENUMBER) builder = self.build_housenumber_search(sdata, hnr_tokens, assignment.address) else: builder = self.build_special_search(sdata, assignment.address, @@ -128,7 +128,7 @@ class SearchBuilder: yield dbs.PoiSearch(sdata) def build_special_search(self, sdata: dbf.SearchData, - address: List[TokenRange], + address: List[qmod.TokenRange], is_category: bool) -> Iterator[dbs.AbstractSearch]: """ Build abstract search queries for searches that do not involve a named place. @@ -148,11 +148,10 @@ class SearchBuilder: [t.token for r in address for t in self.query.get_partials_list(r)], lookups.Restrict)] - penalty += 0.2 yield dbs.PostcodeSearch(penalty, sdata) - def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[Token], - address: List[TokenRange]) -> Iterator[dbs.AbstractSearch]: + def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[qmod.Token], + address: List[qmod.TokenRange]) -> Iterator[dbs.AbstractSearch]: """ Build a simple address search for special entries where the housenumber is the main name token. """ @@ -174,7 +173,7 @@ class SearchBuilder: list(partials), lookups.LookupAll)) else: addr_fulls = [t.token for t - in self.query.get_tokens(address[0], TokenType.WORD)] + in self.query.get_tokens(address[0], qmod.TOKEN_WORD)] if len(addr_fulls) > 5: return sdata.lookups.append( @@ -184,7 +183,7 @@ class SearchBuilder: yield dbs.PlaceSearch(0.05, sdata, expected_count) def build_name_search(self, sdata: dbf.SearchData, - name: TokenRange, address: List[TokenRange], + name: qmod.TokenRange, address: List[qmod.TokenRange], is_category: bool) -> Iterator[dbs.AbstractSearch]: """ Build abstract search queries for simple name or address searches. """ @@ -197,7 +196,7 @@ class SearchBuilder: sdata.lookups = lookup yield dbs.PlaceSearch(penalty + name_penalty, sdata, count) - def yield_lookups(self, name: TokenRange, address: List[TokenRange] + def yield_lookups(self, name: qmod.TokenRange, address: List[qmod.TokenRange] ) -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]: """ Yield all variants how the given name and address should best be searched for. This takes into account how frequent the terms @@ -217,7 +216,7 @@ class SearchBuilder: addr_count = min(t.addr_count for t in addr_partials) if addr_partials else 30000 # Partial term to frequent. Try looking up by rare full names first. - name_fulls = self.query.get_tokens(name, TokenType.WORD) + name_fulls = self.query.get_tokens(name, qmod.TOKEN_WORD) if name_fulls: fulls_count = sum(t.count for t in name_fulls) @@ -236,7 +235,7 @@ class SearchBuilder: self.get_name_address_ranking(list(name_partials.keys()), addr_partials) def get_name_address_ranking(self, name_tokens: List[int], - addr_partials: List[Token]) -> List[dbf.FieldLookup]: + addr_partials: List[qmod.Token]) -> List[dbf.FieldLookup]: """ Create a ranking expression looking up by name and address. """ lookup = [dbf.FieldLookup('name_vector', name_tokens, lookups.LookupAll)] @@ -258,7 +257,7 @@ class SearchBuilder: return lookup - def get_full_name_ranking(self, name_fulls: List[Token], addr_partials: List[Token], + def get_full_name_ranking(self, name_fulls: List[qmod.Token], addr_partials: List[qmod.Token], use_lookup: bool) -> List[dbf.FieldLookup]: """ Create a ranking expression with full name terms and additional address lookup. When 'use_lookup' is true, then @@ -282,11 +281,11 @@ class SearchBuilder: return dbf.lookup_by_any_name([t.token for t in name_fulls], addr_restrict_tokens, addr_lookup_tokens) - def get_name_ranking(self, trange: TokenRange, + def get_name_ranking(self, trange: qmod.TokenRange, db_field: str = 'name_vector') -> dbf.FieldRanking: """ Create a ranking expression for a name term in the given range. """ - name_fulls = self.query.get_tokens(trange, TokenType.WORD) + name_fulls = self.query.get_tokens(trange, qmod.TOKEN_WORD) ranks = [dbf.RankedTokens(t.penalty, [t.token]) for t in name_fulls] ranks.sort(key=lambda r: r.penalty) # Fallback, sum of penalty for partials @@ -294,7 +293,7 @@ class SearchBuilder: default = sum(t.penalty for t in name_partials) + 0.2 return dbf.FieldRanking(db_field, default, ranks) - def get_addr_ranking(self, trange: TokenRange) -> dbf.FieldRanking: + def get_addr_ranking(self, trange: qmod.TokenRange) -> dbf.FieldRanking: """ Create a list of ranking expressions for an address term for the given ranges. """ @@ -305,10 +304,10 @@ class SearchBuilder: while todo: neglen, pos, rank = heapq.heappop(todo) for tlist in self.query.nodes[pos].starting: - if tlist.ttype in (TokenType.PARTIAL, TokenType.WORD): + if tlist.ttype in (qmod.TOKEN_PARTIAL, qmod.TOKEN_WORD): if tlist.end < trange.end: chgpenalty = PENALTY_WORDCHANGE[self.query.nodes[tlist.end].btype] - if tlist.ttype == TokenType.PARTIAL: + if tlist.ttype == qmod.TOKEN_PARTIAL: penalty = rank.penalty + chgpenalty \ + max(t.penalty for t in tlist.tokens) heapq.heappush(todo, (neglen - 1, tlist.end, @@ -318,7 +317,7 @@ class SearchBuilder: heapq.heappush(todo, (neglen - 1, tlist.end, rank.with_token(t, chgpenalty))) elif tlist.end == trange.end: - if tlist.ttype == TokenType.PARTIAL: + if tlist.ttype == qmod.TOKEN_PARTIAL: ranks.append(dbf.RankedTokens(rank.penalty + max(t.penalty for t in tlist.tokens), rank.tokens)) @@ -358,11 +357,11 @@ class SearchBuilder: if assignment.housenumber: sdata.set_strings('housenumbers', self.query.get_tokens(assignment.housenumber, - TokenType.HOUSENUMBER)) + qmod.TOKEN_HOUSENUMBER)) if assignment.postcode: sdata.set_strings('postcodes', self.query.get_tokens(assignment.postcode, - TokenType.POSTCODE)) + qmod.TOKEN_POSTCODE)) if assignment.qualifier: tokens = self.get_qualifier_tokens(assignment.qualifier) if not tokens: @@ -387,23 +386,23 @@ class SearchBuilder: return sdata - def get_country_tokens(self, trange: TokenRange) -> List[Token]: + def get_country_tokens(self, trange: qmod.TokenRange) -> List[qmod.Token]: """ Return the list of country tokens for the given range, optionally filtered by the country list from the details parameters. """ - tokens = self.query.get_tokens(trange, TokenType.COUNTRY) + tokens = self.query.get_tokens(trange, qmod.TOKEN_COUNTRY) if self.details.countries: tokens = [t for t in tokens if t.lookup_word in self.details.countries] return tokens - def get_qualifier_tokens(self, trange: TokenRange) -> List[Token]: + def get_qualifier_tokens(self, trange: qmod.TokenRange) -> List[qmod.Token]: """ Return the list of qualifier tokens for the given range, optionally filtered by the qualifier list from the details parameters. """ - tokens = self.query.get_tokens(trange, TokenType.QUALIFIER) + tokens = self.query.get_tokens(trange, qmod.TOKEN_QUALIFIER) if self.details.categories: tokens = [t for t in tokens if t.get_category() in self.details.categories] @@ -416,7 +415,7 @@ class SearchBuilder: """ if assignment.near_item: tokens: Dict[Tuple[str, str], float] = {} - for t in self.query.get_tokens(assignment.near_item, TokenType.NEAR_ITEM): + for t in self.query.get_tokens(assignment.near_item, qmod.TOKEN_NEAR_ITEM): cat = t.get_category() # The category of a near search will be that of near_item. # Thus, if search is restricted to a category parameter, @@ -430,11 +429,11 @@ class SearchBuilder: PENALTY_WORDCHANGE = { - BreakType.START: 0.0, - BreakType.END: 0.0, - BreakType.PHRASE: 0.0, - BreakType.SOFT_PHRASE: 0.0, - BreakType.WORD: 0.1, - BreakType.PART: 0.2, - BreakType.TOKEN: 0.4 + qmod.BREAK_START: 0.0, + qmod.BREAK_END: 0.0, + qmod.BREAK_PHRASE: 0.0, + qmod.BREAK_SOFT_PHRASE: 0.0, + qmod.BREAK_WORD: 0.1, + qmod.BREAK_PART: 0.2, + qmod.BREAK_TOKEN: 0.4 }