# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Conversion from token assignment to an abstract DB search.
"""
from typing import Optional, List, Tuple, Iterator
import heapq

from nominatim.api.types import SearchDetails, DataLayer
from nominatim.api.search.query import QueryStruct, TokenType, TokenRange, BreakType
from nominatim.api.search.token_assignment import TokenAssignment
import nominatim.api.search.db_search_fields as dbf
import nominatim.api.search.db_searches as dbs
21 """ Build the abstract search queries from token assignments.

    def __init__(self, query: QueryStruct, details: SearchDetails) -> None:
        self.query = query
        self.details = details


    @property
    def configured_for_country(self) -> bool:
        """ Return true if the search details are configured to
            allow countries in the result.
        """
        return self.details.min_rank <= 4 and self.details.max_rank >= 4 \
               and self.details.layer_enabled(DataLayer.ADDRESS)


    @property
    def configured_for_postcode(self) -> bool:
        """ Return true if the search details are configured to
            allow postcodes in the result.
        """
        return self.details.min_rank <= 5 and self.details.max_rank >= 11 \
               and self.details.layer_enabled(DataLayer.ADDRESS)


    @property
    def configured_for_housenumbers(self) -> bool:
        """ Return true if the search details are configured to
            allow addresses in the result.
        """
        return self.details.max_rank >= 30 \
               and self.details.layer_enabled(DataLayer.ADDRESS)


    def build(self, assignment: TokenAssignment) -> Iterator[dbs.AbstractSearch]:
        """ Yield all possible abstract searches for the given token assignment.
        """
        sdata = self.get_search_data(assignment)
        if sdata is None:
            return

        categories = self.get_search_categories(assignment)

        if assignment.name is None:
            if categories and not sdata.postcodes:
                sdata.qualifiers = categories
                categories = None
                builder = self.build_poi_search(sdata)
            else:
                builder = self.build_special_search(sdata, assignment.address,
                                                    bool(categories))
        else:
            builder = self.build_name_search(sdata, assignment.name, assignment.address,
                                             bool(categories))
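
        # A category that was not used up as a qualifier is handled as a
        # "near" restriction: every inner search gets wrapped into a
        # NearSearch for the category.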
        if categories:
            penalty = min(categories.penalties)
            categories.penalties = [p - penalty for p in categories.penalties]
            for search in builder:
                yield dbs.NearSearch(penalty, categories, search)
        else:
            yield from builder


    def build_poi_search(self, sdata: dbf.SearchData) -> Iterator[dbs.AbstractSearch]:
        """ Build abstract search query for a simple category search.
            This kind of search requires an additional geographic constraint.
        """
        if not sdata.housenumbers \
           and ((self.details.viewbox and self.details.bounded_viewbox) or self.details.near):
            yield dbs.PoiSearch(sdata)


    def build_special_search(self, sdata: dbf.SearchData,
                             address: List[TokenRange],
                             is_category: bool) -> Iterator[dbs.AbstractSearch]:
        """ Build abstract search queries for searches that do not involve
            a named place.
        """
        if sdata.qualifiers or sdata.housenumbers:
            # No special searches over housenumbers or qualifiers supported.
            return

        if sdata.countries and not address and not sdata.postcodes \
           and self.configured_for_country:
            yield dbs.CountrySearch(sdata)

        if sdata.postcodes and (is_category or self.configured_for_postcode):
            if address:
                sdata.lookups = [dbf.FieldLookup('nameaddress_vector',
                                                 [t.token for r in address
                                                  for t in self.query.get_partials_list(r)],
                                                 'restrict')]
            yield dbs.PostcodeSearch(0.4, sdata)


    def build_name_search(self, sdata: dbf.SearchData,
                          name: TokenRange, address: List[TokenRange],
                          is_category: bool) -> Iterator[dbs.AbstractSearch]:
        """ Build abstract search queries for simple name or address searches.
        """
        if is_category or not sdata.housenumbers or self.configured_for_housenumbers:
            sdata.rankings.append(self.get_name_ranking(name))
            name_penalty = sdata.rankings[-1].normalize_penalty()
            for penalty, count, lookup in self.yield_lookups(name, address):
                sdata.lookups = lookup
                yield dbs.PlaceSearch(penalty + name_penalty, sdata, count)


    def yield_lookups(self, name: TokenRange, address: List[TokenRange])\
            -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]:
        """ Yield all variants of how the given name and address should best
            be searched for. This takes into account how frequent the terms
            are and tries to find a lookup that optimizes index use.
        """
        penalty = 0.0 # extra penalty currently unused
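
        # All partial terms of a field must match, so the expected number
        # of results is bounded by the count of the rarest partial term.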
        name_partials = self.query.get_partials_list(name)
        exp_name_count = min(t.count for t in name_partials)
        addr_partials = []
        for trange in address:
            addr_partials.extend(self.query.get_partials_list(trange))
        addr_tokens = [t.token for t in addr_partials]
        partials_indexed = all(t.is_indexed for t in name_partials) \
                           and all(t.is_indexed for t in addr_partials)
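
        # Choose lookup strategies by expected cost: prefer an index lookup
        # over the rarer of the name and address terms and fall back to
        # full-name lookups when all partial terms are too frequent.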
        if (len(name_partials) > 3 or exp_name_count < 1000) and partials_indexed:
            # Lookup by name partials, use address partials to restrict results.
            lookup = [dbf.FieldLookup('name_vector',
                                      [t.token for t in name_partials], 'lookup_all')]
            if addr_tokens:
                lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'restrict'))
            yield penalty, exp_name_count, lookup
            return

        exp_addr_count = min(t.count for t in addr_partials) if addr_partials else exp_name_count
        if exp_addr_count < 1000 and partials_indexed:
            # Lookup by address partials and restrict results through name terms.
            yield penalty, exp_addr_count,\
                  [dbf.FieldLookup('name_vector', [t.token for t in name_partials], 'restrict'),
                   dbf.FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all')]

        # Partial terms too frequent. Try looking up by rare full names first.
        name_fulls = self.query.get_tokens(name, TokenType.WORD)
        rare_names = list(filter(lambda t: t.count < 1000, name_fulls))
        # At this point drop unindexed partials from the address.
        # This might yield wrong results, nothing we can do about that.
        if not partials_indexed:
            addr_tokens = [t.token for t in addr_partials if t.is_indexed]
            penalty += 1.2 * sum(t.penalty for t in addr_partials if not t.is_indexed)

        if rare_names:
            # Any of the full names applies with all of the partials from the address
            lookup = [dbf.FieldLookup('name_vector', [t.token for t in rare_names], 'lookup_any')]
            if addr_tokens:
                lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'restrict'))
            yield penalty, sum(t.count for t in rare_names), lookup

        # To catch remaining results, lookup by name and address
        if all(t.is_indexed for t in name_partials):
            lookup = [dbf.FieldLookup('name_vector',
                                      [t.token for t in name_partials], 'lookup_all')]
        else:
            # we don't have the partials, try with the non-rare names
            non_rare_names = [t.token for t in name_fulls if t.count >= 1000]
            if not non_rare_names:
                return
            lookup = [dbf.FieldLookup('name_vector', non_rare_names, 'lookup_any')]
        if addr_tokens:
            lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all'))
        yield penalty + 0.1 * max(0, 5 - len(name_partials) - len(addr_tokens)),\
              min(exp_name_count, exp_addr_count), lookup


    def get_name_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
        """ Create a ranking expression for a name term in the given range.
        """
        name_fulls = self.query.get_tokens(trange, TokenType.WORD)
        ranks = [dbf.RankedTokens(t.penalty, [t.token]) for t in name_fulls]
        ranks.sort(key=lambda r: r.penalty)
        # Fallback, sum of penalty for partials
        name_partials = self.query.get_partials_list(trange)
        default = sum(t.penalty for t in name_partials) + 0.2
        return dbf.FieldRanking('name_vector', default, ranks)


    def get_addr_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
        """ Create a ranking expression for an address term
            in the given range.
        """
        todo: List[Tuple[int, int, dbf.RankedTokens]] = []
        heapq.heappush(todo, (0, trange.start, dbf.RankedTokens(0.0, [])))
        ranks: List[dbf.RankedTokens] = []
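
        # Best-first expansion over the query nodes: each heap entry holds
        # the negated number of consumed tokens, the current node position
        # and the ranked tokens collected so far, so longer partial
        # assignments are expanded first.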
        while todo: # pylint: disable=too-many-nested-blocks
            neglen, pos, rank = heapq.heappop(todo)
            for tlist in self.query.nodes[pos].starting:
                if tlist.ttype in (TokenType.PARTIAL, TokenType.WORD):
                    if tlist.end < trange.end:
                        chgpenalty = PENALTY_WORDCHANGE[self.query.nodes[tlist.end].btype]
                        if tlist.ttype == TokenType.PARTIAL:
                            penalty = rank.penalty + chgpenalty \
                                      + max(t.penalty for t in tlist.tokens)
                            heapq.heappush(todo, (neglen - 1, tlist.end,
                                                  dbf.RankedTokens(penalty, rank.tokens)))
                        else:
                            for t in tlist.tokens:
                                heapq.heappush(todo, (neglen - 1, tlist.end,
                                                      rank.with_token(t, chgpenalty)))
                    elif tlist.end == trange.end:
                        if tlist.ttype == TokenType.PARTIAL:
                            ranks.append(dbf.RankedTokens(rank.penalty
                                                          + max(t.penalty for t in tlist.tokens),
                                                          rank.tokens))
                        else:
                            ranks.extend(rank.with_token(t, 0.0) for t in tlist.tokens)
                        if len(ranks) >= 10:
                            # Too many variants, bail out and only add
                            # Worst-case Fallback: sum of penalty of partials
                            name_partials = self.query.get_partials_list(trange)
                            default = sum(t.penalty for t in name_partials) + 0.2
                            ranks.append(dbf.RankedTokens(rank.penalty + default, []))
                            # Bail out of outer loop
                            todo.clear()
                            break
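
        # Compute the default penalty from the variant with the fewest
        # tokens, then keep only the ten shortest variants.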
        ranks.sort(key=lambda r: len(r.tokens))
        default = ranks[0].penalty + 0.3
        del ranks[10:]
        ranks.sort(key=lambda r: r.penalty)

        return dbf.FieldRanking('nameaddress_vector', default, ranks)


    def get_search_data(self, assignment: TokenAssignment) -> Optional[dbf.SearchData]:
        """ Collect the tokens for the non-name search fields in the
            assignment.
        """
        sdata = dbf.SearchData()
        sdata.penalty = assignment.penalty
        if assignment.country:
            tokens = self.query.get_tokens(assignment.country, TokenType.COUNTRY)
            if self.details.countries:
                tokens = [t for t in tokens if t.lookup_word in self.details.countries]
                if not tokens:
                    return None
            sdata.set_strings('countries', tokens)
        elif self.details.countries:
            sdata.countries = dbf.WeightedStrings(self.details.countries,
                                                  [0.0] * len(self.details.countries))
        if assignment.housenumber:
            sdata.set_strings('housenumbers',
                              self.query.get_tokens(assignment.housenumber,
                                                    TokenType.HOUSENUMBER))
        if assignment.postcode:
            sdata.set_strings('postcodes',
                              self.query.get_tokens(assignment.postcode,
                                                    TokenType.POSTCODE))
        if assignment.qualifier:
            sdata.set_qualifiers(self.query.get_tokens(assignment.qualifier,
                                                       TokenType.QUALIFIER))

        if assignment.address:
            sdata.set_ranking([self.get_addr_ranking(r) for r in assignment.address])
        else:
            sdata.rankings = []

        return sdata


    def get_search_categories(self,
                              assignment: TokenAssignment) -> Optional[dbf.WeightedCategories]:
        """ Collect tokens for category search or use the categories
            requested per parameter.
            Returns None if no category search is requested.
        """
        if assignment.category:
            tokens = [t for t in self.query.get_tokens(assignment.category,
                                                       TokenType.CATEGORY)
                      if not self.details.categories
                         or t.get_category() in self.details.categories]
            return dbf.WeightedCategories([t.get_category() for t in tokens],
                                          [t.penalty for t in tokens])
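
        # No category term in the query itself: fall back to the categories
        # requested via parameters, all weighted equally.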
        if self.details.categories:
            return dbf.WeightedCategories(self.details.categories,
                                          [0.0] * len(self.details.categories))

        return None
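

# Penalty applied in get_addr_ranking() when a ranking variant continues
# with a further token after a break of the given type.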
PENALTY_WORDCHANGE = {
    BreakType.START: 0.0,
    BreakType.END: 0.0,
    BreakType.PHRASE: 0.0,
    BreakType.WORD: 0.1,
    BreakType.PART: 0.2,
    BreakType.TOKEN: 0.4
}