# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Conversion from token assignment to an abstract DB search.
"""
from typing import Optional, List, Tuple, Iterator
import heapq
from nominatim.api.types import SearchDetails, DataLayer
from nominatim.api.search.query import QueryStruct, Token, TokenType, TokenRange, BreakType
from nominatim.api.search.token_assignment import TokenAssignment
import nominatim.api.search.db_search_fields as dbf
import nominatim.api.search.db_searches as dbs

def wrap_near_search(categories: List[Tuple[str, str]],
                     search: dbs.AbstractSearch) -> dbs.NearSearch:
    """ Create a new search that wraps the given search in a search
        for near places of the given category.
    """
    return dbs.NearSearch(penalty=search.penalty,
                          categories=dbf.WeightedCategories(categories,
                                                            [0.0] * len(categories)),
                          search=search)

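# Minimal usage sketch (illustrative only, not part of this module):
# `base_search` stands in for any AbstractSearch built elsewhere.
#
#   near = wrap_near_search([('amenity', 'restaurant')], base_search)
#
# The wrapper inherits the penalty of the wrapped search and gives every
# category a weight of 0.0, so the category itself does not change the
# ranking between the wrapped results.
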
def build_poi_search(category: List[Tuple[str, str]],
                     countries: Optional[List[str]]) -> dbs.PoiSearch:
    """ Create a new search for places of the given category, possibly
        constrained to the given countries.
    """
    if countries:
        ccs = dbf.WeightedStrings(countries, [0.0] * len(countries))
    else:
        ccs = dbf.WeightedStrings([], [])

    class _PoiData(dbf.SearchData):
        penalty = 0.0
        qualifiers = dbf.WeightedCategories(category, [0.0] * len(category))
        countries = ccs

    return dbs.PoiSearch(_PoiData())

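# Illustrative sketch: a pure category ("POI") search, optionally limited to
# a set of countries. The country codes are assumed to be the lower-case
# two-letter codes used elsewhere in Nominatim.
#
#   search = build_poi_search([('amenity', 'post_box')], ['de', 'ch'])
#
# The inner _PoiData carries zero-weighted qualifiers and country codes, so
# all given categories are searched with equal preference.
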
class SearchBuilder:
    """ Build the abstract search queries from token assignments.
    """

    def __init__(self, query: QueryStruct, details: SearchDetails) -> None:
        self.query = query
        self.details = details

    @property
    def configured_for_country(self) -> bool:
        """ Return true if the search details are configured to
            allow countries in the result.
        """
        return self.details.min_rank <= 4 and self.details.max_rank >= 4 \
               and self.details.layer_enabled(DataLayer.ADDRESS)

    @property
    def configured_for_postcode(self) -> bool:
        """ Return true if the search details are configured to
            allow postcodes in the result.
        """
        return self.details.min_rank <= 5 and self.details.max_rank >= 11 \
               and self.details.layer_enabled(DataLayer.ADDRESS)

    @property
    def configured_for_housenumbers(self) -> bool:
        """ Return true if the search details are configured to
            allow addresses in the result.
        """
        return self.details.max_rank >= 30 \
               and self.details.layer_enabled(DataLayer.ADDRESS)

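    # Note on the rank bounds used above: in Nominatim's address ranking,
    # countries sit at rank 4 and house numbers at rank 30, so the requested
    # rank window of the SearchDetails must include those ranks for the
    # corresponding search to be able to return anything. Sketch (assuming
    # SearchDetails accepts these keyword arguments):
    #
    #   details = SearchDetails(min_rank=0, max_rank=30)
    #   SearchBuilder(query, details).configured_for_housenumbers
    #   # -> True, provided the address layer is enabled
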
    def build(self, assignment: TokenAssignment) -> Iterator[dbs.AbstractSearch]:
        """ Yield all possible abstract searches for the given token assignment.
        """
        sdata = self.get_search_data(assignment)
        if sdata is None:
            return

        categories = self.get_search_categories(assignment)

        if assignment.name is None:
            if categories and not sdata.postcodes:
                sdata.qualifiers = categories
                categories = None
                builder = self.build_poi_search(sdata)
            elif assignment.housenumber:
                hnr_tokens = self.query.get_tokens(assignment.housenumber,
                                                   TokenType.HOUSENUMBER)
                builder = self.build_housenumber_search(sdata, hnr_tokens, assignment.address)
            else:
                builder = self.build_special_search(sdata, assignment.address,
                                                    bool(categories))
        else:
            builder = self.build_name_search(sdata, assignment.name, assignment.address,
                                             bool(categories))

        if categories:
            penalty = min(categories.penalties)
            categories.penalties = [p - penalty for p in categories.penalties]
            for search in builder:
                yield dbs.NearSearch(penalty, categories, search)
        else:
            yield from builder

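    # Worked example for the category handling above (hypothetical numbers):
    # with category penalties [0.3, 0.5], min(...) is 0.3, the penalties are
    # rescaled to [0.0, 0.2] and each produced search is wrapped as
    # NearSearch(0.3, categories, search), keeping the penalties relative.
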
    def build_poi_search(self, sdata: dbf.SearchData) -> Iterator[dbs.AbstractSearch]:
        """ Build the abstract search query for a simple category search.
            This kind of search requires an additional geographic constraint.
        """
        if not sdata.housenumbers \
           and ((self.details.viewbox and self.details.bounded_viewbox) or self.details.near):
            yield dbs.PoiSearch(sdata)

    def build_special_search(self, sdata: dbf.SearchData,
                             address: List[TokenRange],
                             is_category: bool) -> Iterator[dbs.AbstractSearch]:
        """ Build abstract search queries for searches that do not involve
            a named place.
        """
        if sdata.qualifiers:
            # No special searches over qualifiers supported.
            return

        if sdata.countries and not address and not sdata.postcodes \
           and self.configured_for_country:
            yield dbs.CountrySearch(sdata)

        if sdata.postcodes and (is_category or self.configured_for_postcode):
            penalty = 0.0 if sdata.countries else 0.1
            if address:
                # Restrict the postcode search to the given address terms.
                sdata.lookups = [dbf.FieldLookup('nameaddress_vector',
                                                 [t.token for r in address
                                                  for t in self.query.get_partials_list(r)],
                                                 'restrict')]
                penalty += 0.2
            yield dbs.PostcodeSearch(penalty, sdata)

    def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[Token],
                                 address: List[TokenRange]) -> Iterator[dbs.AbstractSearch]:
        """ Build a simple address search for special entries where the
            housenumber is the main name token.
        """
        sdata.lookups = [dbf.FieldLookup('name_vector', [t.token for t in hnrs], 'lookup_any')]

        partials = [t for trange in address
                    for t in self.query.get_partials_list(trange)]

        if len(partials) != 1 or partials[0].count < 10000:
            sdata.lookups.append(dbf.FieldLookup('nameaddress_vector',
                                                 [t.token for t in partials], 'lookup_all'))
        else:
            sdata.lookups.append(
                dbf.FieldLookup('nameaddress_vector',
                                [t.token for t
                                 in self.query.get_tokens(address[0], TokenType.WORD)],
                                'lookup_any'))

        sdata.housenumbers = dbf.WeightedStrings([], [])
        yield dbs.PlaceSearch(0.05, sdata, sum(t.count for t in hnrs))

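    # The count threshold of 10000 mirrors the one in yield_lookups(): when
    # the single address partial is very frequent, scanning its index entry
    # would be expensive, so the lookup falls back to the rarer full-word
    # tokens of the first address part instead.
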
    def build_name_search(self, sdata: dbf.SearchData,
                          name: TokenRange, address: List[TokenRange],
                          is_category: bool) -> Iterator[dbs.AbstractSearch]:
        """ Build abstract search queries for simple name or address searches.
        """
        if is_category or not sdata.housenumbers or self.configured_for_housenumbers:
            ranking = self.get_name_ranking(name)
            name_penalty = ranking.normalize_penalty()
            if ranking.rankings:
                sdata.rankings.append(ranking)
            for penalty, count, lookup in self.yield_lookups(name, address):
                sdata.lookups = lookup
                yield dbs.PlaceSearch(penalty + name_penalty, sdata, count)

    def yield_lookups(self, name: TokenRange, address: List[TokenRange])\
            -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]:
        """ Yield all variants of how the given name and address should best
            be searched for. This takes into account how frequent the terms
            are and tries to find a lookup that optimizes index use.
        """
        penalty = 0.0  # extra penalty
        name_partials = self.query.get_partials_list(name)
        name_tokens = [t.token for t in name_partials]

        addr_partials = [t for r in address for t in self.query.get_partials_list(r)]
        addr_tokens = [t.token for t in addr_partials]

        partials_indexed = all(t.is_indexed for t in name_partials) \
                           and all(t.is_indexed for t in addr_partials)
        exp_count = min(t.count for t in name_partials)

        if (len(name_partials) > 3 or exp_count < 1000) and partials_indexed:
            yield penalty, exp_count, dbf.lookup_by_names(name_tokens, addr_tokens)
            return

        exp_count = min(exp_count, min(t.count for t in addr_partials)) \
                    if addr_partials else exp_count

        # Partial terms too frequent. Try looking up by rare full names first.
        name_fulls = self.query.get_tokens(name, TokenType.WORD)
        rare_names = list(filter(lambda t: t.count < 10000, name_fulls))
        # At this point drop unindexed partials from the address.
        # This might yield wrong results, nothing we can do about that.
        if not partials_indexed:
            addr_tokens = [t.token for t in addr_partials if t.is_indexed]
            penalty += 1.2 * sum(t.penalty for t in addr_partials if not t.is_indexed)
        if rare_names:
            # Any of the full names applies with all of the partials from the address.
            yield penalty, sum(t.count for t in rare_names),\
                  dbf.lookup_by_any_name([t.token for t in rare_names], addr_tokens)

        # To catch remaining results, lookup by name and address.
        # We only do this if there is a reasonable number of results expected.
        if exp_count < 10000:
            if all(t.is_indexed for t in name_partials):
                lookup = [dbf.FieldLookup('name_vector', name_tokens, 'lookup_all')]
            else:
                # we don't have the partials, try with the non-rare names
                non_rare_names = [t.token for t in name_fulls if t.count >= 10000]
                if not non_rare_names:
                    return
                lookup = [dbf.FieldLookup('name_vector', non_rare_names, 'lookup_any')]
            if addr_tokens:
                lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all'))
            penalty += 0.1 * max(0, 5 - len(name_partials) - len(addr_tokens))
            if len(rare_names) == len(name_fulls):
                # if there already was a search over all full tokens,
                # avoid this one if anything has been found
                penalty += 0.25
            yield penalty, exp_count, lookup

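    # Lookup strategy recap, as a sketch with hypothetical token counts:
    #
    #   name partials 'potsdamer'(800), 'platz'(200000)
    #     -> exp_count = 800 < 1000: a single lookup_by_names on the
    #        partial-word index suffices.
    #   name partial 'platz'(200000) with a rare full name (count 50)
    #     -> partials too frequent: lookup_by_any_name over the full-name
    #        tokens, with the address partials as an additional filter.
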
    def get_name_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
        """ Create a ranking expression for a name term in the given range.
        """
        name_fulls = self.query.get_tokens(trange, TokenType.WORD)
        ranks = [dbf.RankedTokens(t.penalty, [t.token]) for t in name_fulls]
        ranks.sort(key=lambda r: r.penalty)
        # Fallback, sum of penalties of the partials
        name_partials = self.query.get_partials_list(trange)
        default = sum(t.penalty for t in name_partials) + 0.2
        return dbf.FieldRanking('name_vector', default, ranks)

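    # Example with hypothetical penalties: two full-word tokens with
    # penalties 0.0 and 0.1 yield ranks [(0.0, [t1]), (0.1, [t2])]; partials
    # with penalties 0.0 and 0.1 give the fallback default 0.1 + 0.2 = 0.3.
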
    def get_addr_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
        """ Create a list of ranking expressions for an address term
            in the given range.
        """
        todo: List[Tuple[int, int, dbf.RankedTokens]] = []
        heapq.heappush(todo, (0, trange.start, dbf.RankedTokens(0.0, [])))
        ranks: List[dbf.RankedTokens] = []

        while todo: # pylint: disable=too-many-nested-blocks
            neglen, pos, rank = heapq.heappop(todo)
            for tlist in self.query.nodes[pos].starting:
                if tlist.ttype in (TokenType.PARTIAL, TokenType.WORD):
                    if tlist.end < trange.end:
                        chgpenalty = PENALTY_WORDCHANGE[self.query.nodes[tlist.end].btype]
                        if tlist.ttype == TokenType.PARTIAL:
                            penalty = rank.penalty + chgpenalty \
                                      + max(t.penalty for t in tlist.tokens)
                            heapq.heappush(todo, (neglen - 1, tlist.end,
                                                  dbf.RankedTokens(penalty, rank.tokens)))
                        else:
                            for t in tlist.tokens:
                                heapq.heappush(todo, (neglen - 1, tlist.end,
                                                      rank.with_token(t, chgpenalty)))
                    elif tlist.end == trange.end:
                        if tlist.ttype == TokenType.PARTIAL:
                            ranks.append(dbf.RankedTokens(rank.penalty
                                                          + max(t.penalty for t in tlist.tokens),
                                                          rank.tokens))
                        else:
                            ranks.extend(rank.with_token(t, 0.0) for t in tlist.tokens)
                        if len(ranks) >= 10:
                            # Too many variants, bail out and only add the
                            # worst-case fallback: sum of penalties of the partials.
                            name_partials = self.query.get_partials_list(trange)
                            default = sum(t.penalty for t in name_partials) + 0.2
                            ranks.append(dbf.RankedTokens(rank.penalty + default, []))
                            # Bail out of the outer loop as well.
                            todo.clear()
                            break

        ranks.sort(key=lambda r: len(r.tokens))
        default = ranks[0].penalty + 0.3
        del ranks[5:]
        ranks.sort(key=lambda r: r.penalty)

        return dbf.FieldRanking('nameaddress_vector', default, ranks)

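    # get_addr_ranking() is effectively a best-first search over the token
    # graph: the heap is ordered by 'neglen', the negated number of words
    # consumed so far, so paths that cover more of the range are expanded
    # first, and PENALTY_WORDCHANGE charges extra whenever a lookup word
    # ends on a weak break in the query.
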
    def get_search_data(self, assignment: TokenAssignment) -> Optional[dbf.SearchData]:
        """ Collect the tokens for the non-name search fields in the
            assignment.
        """
        sdata = dbf.SearchData()
        sdata.penalty = assignment.penalty
        if assignment.country:
            tokens = self.query.get_tokens(assignment.country, TokenType.COUNTRY)
            if self.details.countries:
                tokens = [t for t in tokens if t.lookup_word in self.details.countries]
                if not tokens:
                    return None
            sdata.set_strings('countries', tokens)
        elif self.details.countries:
            sdata.countries = dbf.WeightedStrings(self.details.countries,
                                                  [0.0] * len(self.details.countries))
        if assignment.housenumber:
            sdata.set_strings('housenumbers',
                              self.query.get_tokens(assignment.housenumber,
                                                    TokenType.HOUSENUMBER))
        if assignment.postcode:
            sdata.set_strings('postcodes',
                              self.query.get_tokens(assignment.postcode,
                                                    TokenType.POSTCODE))
        if assignment.qualifier:
            sdata.set_qualifiers(self.query.get_tokens(assignment.qualifier,
                                                       TokenType.QUALIFIER))

        if assignment.address:
            sdata.set_ranking([self.get_addr_ranking(r) for r in assignment.address])
        else:
            sdata.rankings = []

        return sdata

    def get_search_categories(self,
                              assignment: TokenAssignment) -> Optional[dbf.WeightedCategories]:
        """ Collect tokens for category search or use the categories
            requested per parameter.
            Returns None if no category search is requested.
        """
        if assignment.category:
            tokens = [t for t in self.query.get_tokens(assignment.category,
                                                       TokenType.CATEGORY)
                      if not self.details.categories
                         or t.get_category() in self.details.categories]
            return dbf.WeightedCategories([t.get_category() for t in tokens],
                                          [t.penalty for t in tokens])

        if self.details.categories:
            return dbf.WeightedCategories(self.details.categories,
                                          [0.0] * len(self.details.categories))

        return None

PENALTY_WORDCHANGE = {
    BreakType.START: 0.0,
    BreakType.END: 0.0,
    BreakType.PHRASE: 0.0,
    BreakType.WORD: 0.1,
    BreakType.PART: 0.2,
    BreakType.TOKEN: 0.4
}
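
# Illustrative reading of the table (values as defined above): a lookup word
# may end exactly on a phrase or string boundary for free, pays 0.1 when it
# ends on an ordinary word break, 0.2 when it ends inside a hyphenated word
# and 0.4 when it ends in the middle of a run of characters that were
# tokenized together.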