1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
"""
Conversion from token assignment to an abstract DB search.
"""
from typing import Optional, List, Tuple, Iterator
import heapq

from nominatim.api.types import SearchDetails, DataLayer
from nominatim.api.search.query import QueryStruct, Token, TokenType, TokenRange, BreakType
from nominatim.api.search.token_assignment import TokenAssignment
import nominatim.api.search.db_search_fields as dbf
import nominatim.api.search.db_searches as dbs
def wrap_near_search(categories: List[Tuple[str, str]],
                     search: dbs.AbstractSearch) -> dbs.NearSearch:
    """ Create a new search that wraps the given search in a search
        for near places of the given category.

        Each of the given categories is weighted with a penalty of 0.0.
        The penalty of the new search is taken over from the wrapped
        search.
    """
    return dbs.NearSearch(penalty=search.penalty,
                          categories=dbf.WeightedCategories(categories,
                                                            [0.0] * len(categories)),
                          # NOTE(review): the final argument was cut off in the
                          # listing; passing the wrapped search is the only
                          # remaining use of the 'search' parameter - confirm.
                          search=search)
def build_poi_search(category: List[Tuple[str, str]],
                     countries: Optional[List[str]]) -> dbs.PoiSearch:
    """ Create a new search for places by the given category, possibly
        constrained to the given countries.

        All categories and countries are weighted with a penalty of 0.0.
        When 'countries' is None or empty, the search gets an empty
        country list.
    """
    # 'countries' is optional, so len() must only be applied when it is set.
    if countries:
        ccs = dbf.WeightedStrings(countries, [0.0] * len(countries))
    else:
        ccs = dbf.WeightedStrings([], [])

    # Ad-hoc search data carrying only the category and country restrictions.
    class _PoiData(dbf.SearchData):
        # NOTE(review): the 'penalty' and 'countries' attributes were elided
        # in the listing and have been reconstructed - confirm.
        penalty = 0.0
        qualifiers = dbf.WeightedCategories(category, [0.0] * len(category))
        countries = ccs

    return dbs.PoiSearch(_PoiData())
# NOTE(review): the class header line was elided in the listing; the name
# 'SearchBuilder' has been reconstructed - confirm against the callers.
class SearchBuilder:
    """ Build the abstract search queries from token assignments.
    """

    def __init__(self, query: QueryStruct, details: SearchDetails) -> None:
        # The tokenized query, used by all builder functions to look up tokens.
        self.query = query
        # Search parameters that restrict the kind of results wanted.
        self.details = details


    @property
    def configured_for_country(self) -> bool:
        """ Return true if the search details are configured to
            allow countries in the result.

            This requires that rank 4 lies within the requested rank
            range and that the address layer is enabled.
        """
        return self.details.min_rank <= 4 and self.details.max_rank >= 4 \
               and self.details.layer_enabled(DataLayer.ADDRESS)


    @property
    def configured_for_postcode(self) -> bool:
        """ Return true if the search details are configured to
            allow postcodes in the result.

            This requires a minimum rank of at most 5, a maximum rank
            of at least 11 and that the address layer is enabled.
        """
        return self.details.min_rank <= 5 and self.details.max_rank >= 11\
               and self.details.layer_enabled(DataLayer.ADDRESS)


    @property
    def configured_for_housenumbers(self) -> bool:
        """ Return true if the search details are configured to
            allow addresses in the result.

            This requires a maximum rank of at least 30 and that the
            address layer is enabled.
        """
        return self.details.max_rank >= 30 \
               and self.details.layer_enabled(DataLayer.ADDRESS)


    def build(self, assignment: TokenAssignment) -> Iterator[dbs.AbstractSearch]:
        """ Yield all possible abstract searches for the given token assignment.

            When a category search is requested in addition to the
            place search, every search is wrapped into a near search
            for the categories.
        """
        sdata = self.get_search_data(assignment)
        # get_search_data() is declared to return an Optional.
        if sdata is None:
            return

        categories = self.get_search_categories(assignment)

        if assignment.name is None:
            if categories and not sdata.postcodes:
                # The category itself is what is searched for.
                sdata.qualifiers = categories
                # NOTE(review): elided in the listing. Resetting the categories
                # keeps the POI search from being wrapped into a near search
                # further down - confirm.
                categories = None
                builder = self.build_poi_search(sdata)
            elif assignment.housenumber:
                hnr_tokens = self.query.get_tokens(assignment.housenumber,
                                                   TokenType.HOUSENUMBER)
                builder = self.build_housenumber_search(sdata, hnr_tokens, assignment.address)
            else:
                builder = self.build_special_search(sdata, assignment.address,
                                                    bool(categories))
        else:
            builder = self.build_name_search(sdata, assignment.name, assignment.address,
                                             bool(categories))

        if categories:
            # Move the smallest category penalty into the penalty of the
            # near search and keep only the differences per category.
            penalty = min(categories.penalties)
            categories.penalties = [p - penalty for p in categories.penalties]
            for search in builder:
                yield dbs.NearSearch(penalty, categories, search)
        else:
            yield from builder


    def build_poi_search(self, sdata: dbf.SearchData) -> Iterator[dbs.AbstractSearch]:
        """ Build abstract search query for a simple category search.
            This kind of search requires an additional geographic constraint.

            Nothing is yielded when the search data contains housenumbers
            or when neither a bounded viewbox nor a near point is set.
        """
        if not sdata.housenumbers \
           and ((self.details.viewbox and self.details.bounded_viewbox) or self.details.near):
            yield dbs.PoiSearch(sdata)


    def build_special_search(self, sdata: dbf.SearchData,
                             address: List[TokenRange],
                             is_category: bool) -> Iterator[dbs.AbstractSearch]:
        """ Build abstract search queries for searches that do not involve
            a named place.

            This may yield a country search and/or a postcode search.
        """
        if sdata.qualifiers:
            # No special searches over qualifiers supported.
            return

        if sdata.countries and not address and not sdata.postcodes \
           and self.configured_for_country:
            yield dbs.CountrySearch(sdata)

        if sdata.postcodes and (is_category or self.configured_for_postcode):
            # Postcode searches without a country get a small extra penalty.
            penalty = 0.0 if sdata.countries else 0.1
            if address:
                # NOTE(review): the lookup type and the extra penalty for
                # address terms were elided in the listing - confirm.
                sdata.lookups = [dbf.FieldLookup('nameaddress_vector',
                                                 [t.token for r in address
                                                  for t in self.query.get_partials_list(r)],
                                                 'restrict')]
                penalty += 0.2
            yield dbs.PostcodeSearch(penalty, sdata)


    def build_housenumber_search(self, sdata: dbf.SearchData, hnrs: List[Token],
                                 address: List[TokenRange]) -> Iterator[dbs.AbstractSearch]:
        """ Build a simple address search for special entries where the
            housenumber is the main name token.

            The housenumber tokens are looked up in the name vector.
            The housenumber restriction in the search data is emptied.
        """
        sdata.lookups = [dbf.FieldLookup('name_vector', [t.token for t in hnrs], 'lookup_any')]

        partials = [t for trange in address
                    for t in self.query.get_partials_list(trange)]

        if len(partials) != 1 or partials[0].count < 10000:
            sdata.lookups.append(dbf.FieldLookup('nameaddress_vector',
                                                 [t.token for t in partials], 'lookup_all'))
        else:
            # Exactly one partial with a count of 10000 or more: use the
            # full words of the first address term instead.
            # NOTE(review): the lookup type of this branch was elided in
            # the listing - confirm.
            sdata.lookups.append(
                dbf.FieldLookup('nameaddress_vector',
                                [t.token for t
                                 in self.query.get_tokens(address[0], TokenType.WORD)],
                                'lookup_any'))

        sdata.housenumbers = dbf.WeightedStrings([], [])
        yield dbs.PlaceSearch(0.05, sdata, sum(t.count for t in hnrs))


    def build_name_search(self, sdata: dbf.SearchData,
                          name: TokenRange, address: List[TokenRange],
                          is_category: bool) -> Iterator[dbs.AbstractSearch]:
        """ Build abstract search queries for simple name or address searches.

            Yields one place search for each lookup variant returned by
            yield_lookups(). Nothing is yielded for a housenumber search
            when the search details do not allow housenumbers, unless
            a category search is requested.
        """
        if is_category or not sdata.housenumbers or self.configured_for_housenumbers:
            ranking = self.get_name_ranking(name)
            name_penalty = ranking.normalize_penalty()
            # NOTE(review): this guard was elided in the listing - confirm.
            if ranking.rankings:
                sdata.rankings.append(ranking)
            # The same search data object is reused for each variant,
            # only the lookups are exchanged.
            for penalty, count, lookup in self.yield_lookups(name, address):
                sdata.lookups = lookup
                yield dbs.PlaceSearch(penalty + name_penalty, sdata, count)


    def yield_lookups(self, name: TokenRange, address: List[TokenRange])\
                          -> Iterator[Tuple[float, int, List[dbf.FieldLookup]]]:
        """ Yield all variants how the given name and address should best
            be searched for. This takes into account how frequent the terms
            are and tries to find a lookup that optimizes index use.

            Each variant is a tuple of extra penalty, expected result
            count and the list of field lookups to use.
        """
        penalty = 0.0 # extra penalty
        name_partials = self.query.get_partials_list(name)
        name_tokens = [t.token for t in name_partials]

        addr_partials = [t for r in address for t in self.query.get_partials_list(r)]
        addr_tokens = [t.token for t in addr_partials]

        partials_indexed = all(t.is_indexed for t in name_partials) \
                           and all(t.is_indexed for t in addr_partials)
        exp_count = min(t.count for t in name_partials)

        if (len(name_partials) > 3 or exp_count < 1000) and partials_indexed:
            yield penalty, exp_count, dbf.lookup_by_names(name_tokens, addr_tokens)
            return

        exp_count = min(exp_count, min(t.count for t in addr_partials)) \
                    if addr_partials else exp_count
        if exp_count < 1000 and partials_indexed:
            # Lookup by address partials and restrict results through name terms.
            # Give this a small penalty because lookups in the address index are
            # more expensive
            yield penalty + exp_count/5000, exp_count,\
                  dbf.lookup_by_addr(name_tokens, addr_tokens)
            return

        # Partial term too frequent. Try looking up by rare full names first.
        name_fulls = self.query.get_tokens(name, TokenType.WORD)
        rare_names = [t for t in name_fulls if t.count < 10000]
        # At this point drop unindexed partials from the address.
        # This might yield wrong results, nothing we can do about that.
        if not partials_indexed:
            addr_tokens = [t.token for t in addr_partials if t.is_indexed]
            penalty += 1.2 * sum(t.penalty for t in addr_partials if not t.is_indexed)
        if rare_names:
            # Any of the full names applies with all of the partials from the address
            yield penalty, sum(t.count for t in rare_names),\
                  dbf.lookup_by_any_name([t.token for t in rare_names], addr_tokens)

        # To catch remaining results, lookup by name and address
        # We only do this if there is a reasonable number of results expected.
        if exp_count < 10000:
            if all(t.is_indexed for t in name_partials):
                lookup = [dbf.FieldLookup('name_vector', name_tokens, 'lookup_all')]
            else:
                # we don't have the partials, try with the non-rare names
                non_rare_names = [t.token for t in name_fulls if t.count >= 10000]
                if not non_rare_names:
                    return
                lookup = [dbf.FieldLookup('name_vector', non_rare_names, 'lookup_any')]
            if addr_tokens:
                lookup.append(dbf.FieldLookup('nameaddress_vector', addr_tokens, 'lookup_all'))
            # Searches with less than 5 terms in total get an extra penalty.
            penalty += 0.1 * max(0, 5 - len(name_partials) - len(addr_tokens))
            if len(rare_names) == len(name_fulls):
                # if there already was a search for all full tokens,
                # avoid this if anything has been found
                # NOTE(review): the penalty value was elided in the
                # listing - confirm.
                penalty += 0.25
            yield penalty, exp_count, lookup


    def get_name_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
        """ Create a ranking expression for a name term in the given range.

            Each full word token of the range becomes a ranking entry
            with the penalty of the token. The default penalty is
            computed from the partial tokens of the range.
        """
        name_fulls = self.query.get_tokens(trange, TokenType.WORD)
        ranks = [dbf.RankedTokens(t.penalty, [t.token]) for t in name_fulls]
        ranks.sort(key=lambda r: r.penalty)
        # Fallback, sum of penalty for partials
        name_partials = self.query.get_partials_list(trange)
        default = sum(t.penalty for t in name_partials) + 0.2
        return dbf.FieldRanking('name_vector', default, ranks)


    def get_addr_ranking(self, trange: TokenRange) -> dbf.FieldRanking:
        """ Create a list of ranking expressions for an address term
            for the given ranges.

            The function walks through all combinations of partial and
            full word tokens that cover the range from start to end and
            creates a ranking entry for each of them.
        """
        # Heap of unfinished paths: (negative number of steps taken,
        # position in the query, tokens and penalty collected so far).
        # heapq pops the smallest entry first, so the paths with the most
        # steps are continued first.
        todo: List[Tuple[int, int, dbf.RankedTokens]] = []
        heapq.heappush(todo, (0, trange.start, dbf.RankedTokens(0.0, [])))
        ranks: List[dbf.RankedTokens] = []

        while todo: # pylint: disable=too-many-nested-blocks
            neglen, pos, rank = heapq.heappop(todo)
            for tlist in self.query.nodes[pos].starting:
                if tlist.ttype in (TokenType.PARTIAL, TokenType.WORD):
                    if tlist.end < trange.end:
                        # Token list ends within the range, continue the path.
                        chgpenalty = PENALTY_WORDCHANGE[self.query.nodes[tlist.end].btype]
                        if tlist.ttype == TokenType.PARTIAL:
                            # Partials add their penalty but no token.
                            penalty = rank.penalty + chgpenalty \
                                      + max(t.penalty for t in tlist.tokens)
                            heapq.heappush(todo, (neglen - 1, tlist.end,
                                                  dbf.RankedTokens(penalty, rank.tokens)))
                        else:
                            for t in tlist.tokens:
                                heapq.heappush(todo, (neglen - 1, tlist.end,
                                                      rank.with_token(t, chgpenalty)))
                    elif tlist.end == trange.end:
                        # Token list completes the range, the path is finished.
                        if tlist.ttype == TokenType.PARTIAL:
                            ranks.append(dbf.RankedTokens(rank.penalty
                                                          + max(t.penalty for t in tlist.tokens),
                                                          rank.tokens))
                        else:
                            ranks.extend(rank.with_token(t, 0.0) for t in tlist.tokens)
                        # NOTE(review): the limit was elided in the
                        # listing - confirm.
                        if len(ranks) >= 10:
                            # Too many variants, bail out and only add
                            # Worst-case Fallback: sum of penalty of partials
                            name_partials = self.query.get_partials_list(trange)
                            default = sum(t.penalty for t in name_partials) + 0.2
                            ranks.append(dbf.RankedTokens(rank.penalty + default, []))
                            # Bail out of outer loop
                            todo.clear()
                            break

        # The entry with the fewest tokens determines the default penalty.
        ranks.sort(key=lambda r: len(r.tokens))
        default = ranks[0].penalty + 0.3
        # NOTE(review): removal of the entry used for the default was elided
        # in the listing - confirm.
        del ranks[0]
        ranks.sort(key=lambda r: r.penalty)

        return dbf.FieldRanking('nameaddress_vector', default, ranks)


    def get_search_data(self, assignment: TokenAssignment) -> Optional[dbf.SearchData]:
        """ Collect the tokens for the non-name search fields in the
            assignment.

            Returns None when the assignment contains a country term
            but none of its tokens is in the list of countries the
            search is restricted to.
        """
        sdata = dbf.SearchData()
        sdata.penalty = assignment.penalty
        if assignment.country:
            tokens = self.query.get_tokens(assignment.country, TokenType.COUNTRY)
            if self.details.countries:
                tokens = [t for t in tokens if t.lookup_word in self.details.countries]
                if not tokens:
                    return None
            sdata.set_strings('countries', tokens)
        elif self.details.countries:
            sdata.countries = dbf.WeightedStrings(self.details.countries,
                                                  [0.0] * len(self.details.countries))
        if assignment.housenumber:
            sdata.set_strings('housenumbers',
                              self.query.get_tokens(assignment.housenumber,
                                                    TokenType.HOUSENUMBER))
        if assignment.postcode:
            sdata.set_strings('postcodes',
                              self.query.get_tokens(assignment.postcode,
                                                    TokenType.POSTCODE))
        if assignment.qualifier:
            sdata.set_qualifiers(self.query.get_tokens(assignment.qualifier,
                                                       TokenType.QUALIFIER))

        if assignment.address:
            sdata.set_ranking([self.get_addr_ranking(r) for r in assignment.address])
        else:
            # NOTE(review): this branch was elided in the listing - confirm.
            sdata.rankings = []

        return sdata


    def get_search_categories(self,
                              assignment: TokenAssignment) -> Optional[dbf.WeightedCategories]:
        """ Collect tokens for category search or use the categories
            requested per parameter.
            Returns None if no category search is requested.

            Category tokens from the query are filtered by the categories
            from the search details, when those are set.
        """
        if assignment.category:
            tokens = [t for t in self.query.get_tokens(assignment.category,
                                                       TokenType.CATEGORY)
                      if not self.details.categories
                         or t.get_category() in self.details.categories]
            return dbf.WeightedCategories([t.get_category() for t in tokens],
                                          [t.penalty for t in tokens])

        if self.details.categories:
            return dbf.WeightedCategories(self.details.categories,
                                          [0.0] * len(self.details.categories))

        return None
374 PENALTY_WORDCHANGE = {
375 BreakType.START: 0.0,
377 BreakType.PHRASE: 0.0,