"""
Public interface to the search code.
"""
-from typing import List, Any, Optional, Iterator, Tuple
+from typing import List, Any, Optional, Iterator, Tuple, Dict
import itertools
import re
import datetime as dt
from nominatim.api.connection import SearchConnection
from nominatim.api.types import SearchDetails
-from nominatim.api.results import SearchResults, add_result_details
+from nominatim.api.results import SearchResult, SearchResults, add_result_details
from nominatim.api.search.token_assignment import yield_token_assignments
from nominatim.api.search.db_search_builder import SearchBuilder, build_poi_search, wrap_near_search
from nominatim.api.search.db_searches import AbstractSearch
log().table_dump('Searches for assignment',
_dump_searches(searches, query, num_searches))
num_searches = len(searches)
- searches.sort(key=lambda s: s.penalty)
+ searches.sort(key=lambda s: (s.penalty, s.SEARCH_PRIO))
return query, searches
is found.
"""
log().section('Execute database searches')
- results = SearchResults()
+ results: Dict[Any, SearchResult] = {}
+
end_time = dt.datetime.now() + self.timeout
- num_results = 0
- min_ranking = 1000.0
+ min_ranking = searches[0].penalty + 2.0
prev_penalty = 0.0
for i, search in enumerate(searches):
if search.penalty > prev_penalty and (search.penalty > min_ranking or i > 20):
break
log().table_dump(f"{i + 1}. Search", _dump_searches([search], query))
- for result in await search.lookup(self.conn, self.params):
- results.append(result)
- min_ranking = min(min_ranking, result.ranking + 0.5, search.penalty + 0.3)
- log().result_dump('Results', ((r.accuracy, r) for r in results[num_results:]))
- num_results = len(results)
+ log().var_dump('Params', self.params)
+ lookup_results = await search.lookup(self.conn, self.params)
+ for result in lookup_results:
+ rhash = (result.source_table, result.place_id,
+ result.housenumber, result.country_code)
+ prevresult = results.get(rhash)
+ if prevresult:
+ prevresult.accuracy = min(prevresult.accuracy, result.accuracy)
+ else:
+ results[rhash] = result
+ min_ranking = min(min_ranking, result.accuracy * 1.2)
+ log().result_dump('Results', ((r.accuracy, r) for r in lookup_results))
prev_penalty = search.penalty
if dt.datetime.now() >= end_time:
break
+ return SearchResults(results.values())
+
+
+ def pre_filter_results(self, results: SearchResults) -> SearchResults:
+ """ Remove results that are significantly worse than the
+ best match.
+ """
+ if results:
+ max_ranking = min(r.ranking for r in results) + 0.5
+ results = SearchResults(r for r in results if r.ranking < max_ranking)
+
return results
limit to the configured number of results.
"""
if results:
- min_ranking = min(r.ranking for r in results)
- results = SearchResults(r for r in results if r.ranking < min_ranking + 0.5)
results.sort(key=lambda r: r.ranking)
-
- if results:
min_rank = results[0].rank_search
+ min_ranking = results[0].ranking
results = SearchResults(r for r in results
- if r.ranking + 0.05 * (r.rank_search - min_rank)
+ if r.ranking + 0.03 * (r.rank_search - min_rank)
< min_ranking + 0.5)
results = SearchResults(results[:self.limit])
return
for result in results:
- if not result.display_name:
+ # Negative importance indicates ordering by distance, which is
+ # more important than word matching.
+ if not result.display_name\
+ or (result.importance is not None and result.importance < 0):
continue
distance = 0.0
- norm = self.query_analyzer.normalize_text(result.display_name)
+ norm = self.query_analyzer.normalize_text(' '.join((result.display_name,
+ result.country_code or '')))
words = set((w for w in norm.split(' ') if w))
if not words:
continue
distance += len(qword)
else:
distance += (1.0 - wdist) * len(qword)
- result.accuracy += distance * 0.5 / sum(len(w) for w in qwords)
+ # Compensate for the fact that country names do not get a
+ # match penalty yet by the tokenizer.
+ # Temporary hack that needs to be removed!
+ if result.rank_address == 4:
+ distance *= 2
+ result.accuracy += distance * 0.4 / sum(len(w) for w in qwords)
async def lookup_pois(self, categories: List[Tuple[str, str]],
if query:
searches = [wrap_near_search(categories, s) for s in searches[:50]]
results = await self.execute_searches(query, searches)
+ results = self.pre_filter_results(results)
await add_result_details(self.conn, results, self.params)
log().result_dump('Preliminary Results', ((r.accuracy, r) for r in results))
results = self.sort_and_cut_results(results)
if searches:
# Execute SQL until an appropriate result is found.
results = await self.execute_searches(query, searches[:50])
+ results = self.pre_filter_results(results)
await add_result_details(self.conn, results, self.params)
log().result_dump('Preliminary Results', ((r.accuracy, r) for r in results))
self.rerank_by_query(query, results)