"""
Implementation of classes for API access via libraries.
"""
-from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence
+from typing import Mapping, Optional, Any, AsyncIterator, Dict, Sequence, List, Tuple
import asyncio
import contextlib
from pathlib import Path
import sqlalchemy as sa
import sqlalchemy.ext.asyncio as sa_asyncio
-
+from nominatim.errors import UsageError
from nominatim.db.sqlalchemy_schema import SearchTables
from nominatim.db.async_core_library import PGCORE_LIB, PGCORE_ERROR
from nominatim.config import Configuration
from nominatim.api.status import get_status, StatusResult
from nominatim.api.lookup import get_detailed_place, get_simple_place
from nominatim.api.reverse import ReverseGeocoder
+from nominatim.api.search import ForwardGeocoder, Phrase, PhraseType, make_query_analyzer
import nominatim.api.types as ntyp
from nominatim.api.results import DetailedResult, ReverseResult, SearchResults
Returns None if there is no entry under the given ID.
"""
+ details = ntyp.LookupDetails.from_kwargs(params)
async with self.begin() as conn:
- return await get_detailed_place(conn, place,
- ntyp.LookupDetails.from_kwargs(params))
+ if details.keywords:
+ await make_query_analyzer(conn)
+ return await get_detailed_place(conn, place, details)
async def lookup(self, places: Sequence[ntyp.PlaceRef], **params: Any) -> SearchResults:
"""
details = ntyp.LookupDetails.from_kwargs(params)
async with self.begin() as conn:
+ if details.keywords:
+ await make_query_analyzer(conn)
return SearchResults(filter(None,
[await get_simple_place(conn, p, details) for p in places]))
# There are no results to be expected outside valid coordinates.
return None
+ details = ntyp.ReverseDetails.from_kwargs(params)
async with self.begin() as conn:
- geocoder = ReverseGeocoder(conn, ntyp.ReverseDetails.from_kwargs(params))
+ if details.keywords:
+ await make_query_analyzer(conn)
+ geocoder = ReverseGeocoder(conn, details)
return await geocoder.lookup(coord)
+ async def search(self, query: str, **params: Any) -> SearchResults:
+ """ Find a place by free-text search. Also known as forward geocoding.
+ """
+ query = query.strip()
+ if not query:
+ raise UsageError('Nothing to search for.')
+
+ async with self.begin() as conn:
+ geocoder = ForwardGeocoder(conn, ntyp.SearchDetails.from_kwargs(params))
+ phrases = [Phrase(PhraseType.NONE, p.strip()) for p in query.split(',')]
+ return await geocoder.lookup(phrases)
+
+
+ # pylint: disable=too-many-arguments,too-many-branches
+ async def search_address(self, amenity: Optional[str] = None,
+ street: Optional[str] = None,
+ city: Optional[str] = None,
+ county: Optional[str] = None,
+ state: Optional[str] = None,
+ country: Optional[str] = None,
+ postalcode: Optional[str] = None,
+ **params: Any) -> SearchResults:
+ """ Find an address using structured search.
+ """
+ async with self.begin() as conn:
+ details = ntyp.SearchDetails.from_kwargs(params)
+
+ phrases: List[Phrase] = []
+
+ if amenity:
+ phrases.append(Phrase(PhraseType.AMENITY, amenity))
+ if street:
+ phrases.append(Phrase(PhraseType.STREET, street))
+ if city:
+ phrases.append(Phrase(PhraseType.CITY, city))
+ if county:
+ phrases.append(Phrase(PhraseType.COUNTY, county))
+ if state:
+ phrases.append(Phrase(PhraseType.STATE, state))
+ if postalcode:
+ phrases.append(Phrase(PhraseType.POSTCODE, postalcode))
+ if country:
+ phrases.append(Phrase(PhraseType.COUNTRY, country))
+
+ if not phrases:
+ raise UsageError('Nothing to search for.')
+
+ if amenity or street:
+ details.restrict_min_max_rank(26, 30)
+ elif city:
+ details.restrict_min_max_rank(13, 25)
+ elif county:
+ details.restrict_min_max_rank(10, 12)
+ elif state:
+ details.restrict_min_max_rank(5, 9)
+ elif postalcode:
+ details.restrict_min_max_rank(5, 11)
+ else:
+ details.restrict_min_max_rank(4, 4)
+
+ if 'layers' not in params:
+ details.layers = ntyp.DataLayer.ADDRESS
+ if amenity:
+ details.layers |= ntyp.DataLayer.POI
+
+ geocoder = ForwardGeocoder(conn, details)
+ return await geocoder.lookup(phrases)
+
+
+ async def search_category(self, categories: List[Tuple[str, str]],
+ near_query: Optional[str] = None,
+ **params: Any) -> SearchResults:
+ """ Find an object of a certain category near another place.
+ The near place may either be given as an unstructured search
+ query in itself or as coordinates.
+ """
+ if not categories:
+ return SearchResults()
+
+ details = ntyp.SearchDetails.from_kwargs(params)
+ async with self.begin() as conn:
+ if near_query:
+ phrases = [Phrase(PhraseType.NONE, p) for p in near_query.split(',')]
+ else:
+ phrases = []
+ if details.keywords:
+ await make_query_analyzer(conn)
+
+ geocoder = ForwardGeocoder(conn, details)
+ return await geocoder.lookup_pois(categories, phrases)
+
+
+
class NominatimAPI:
""" API loader, synchronous version.
"""
no place matches the given criteria.
"""
return self._loop.run_until_complete(self._async_api.reverse(coord, **params))
+
+
+ def search(self, query: str, **params: Any) -> SearchResults:
+ """ Find a place by free-text search. Also known as forward geocoding.
+ """
+ return self._loop.run_until_complete(
+ self._async_api.search(query, **params))
+
+
+ # pylint: disable=too-many-arguments
+ def search_address(self, amenity: Optional[str] = None,
+ street: Optional[str] = None,
+ city: Optional[str] = None,
+ county: Optional[str] = None,
+ state: Optional[str] = None,
+ country: Optional[str] = None,
+ postalcode: Optional[str] = None,
+ **params: Any) -> SearchResults:
+ """ Find an address using structured search.
+ """
+ return self._loop.run_until_complete(
+ self._async_api.search_address(amenity, street, city, county,
+ state, country, postalcode, **params))
+
+
+ def search_category(self, categories: List[Tuple[str, str]],
+ near_query: Optional[str] = None,
+ **params: Any) -> SearchResults:
+ """ Find an object of a certain category near another place.
+ The near place may either be given as an unstructured search
+ query in itself or as a geographic area through the
+ viewbox or near parameters.
+ """
+ return self._loop.run_until_complete(
+ self._async_api.search_category(categories, near_query, **params))
"""
Functions for specialised logging with HTML output.
"""
-from typing import Any, Iterator, Optional, List, cast
+from typing import Any, Iterator, Optional, List, Tuple, cast
from contextvars import ContextVar
import textwrap
import io
CODE_HIGHLIGHT = False
+def _debug_name(res: Any) -> str:
+ if res.names:
+ return cast(str, res.names.get('name', next(iter(res.names.values()))))
+
+ return f"Hnr {res.housenumber}" if res.housenumber is not None else '[NONE]'
+
+
class BaseLogger:
""" Interface for logging function.
"""
+ def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
+ """ Print a list of search results generated by the generator function.
+ """
+
+
def sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> None:
""" Print the SQL for the given statement.
"""
self._write('</tbody></table>')
+ def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
+ """ Print a list of search results generated by the generator function.
+ """
+ def format_osm(osm_object: Optional[Tuple[str, int]]) -> str:
+ if not osm_object:
+ return '-'
+
+ t, i = osm_object
+ if t == 'N':
+ fullt = 'node'
+ elif t == 'W':
+ fullt = 'way'
+ elif t == 'R':
+ fullt = 'relation'
+ else:
+ return f'{t}{i}'
+
+ return f'<a href="https://www.openstreetmap.org/{fullt}/{i}">{t}{i}</a>'
+
+ self._write(f'<h5>{heading}</h5><p><dl>')
+ total = 0
+ for rank, res in results:
+ self._write(f'<dt>[{rank:.3f}]</dt> <dd>{res.source_table.name}(')
+ self._write(f"{_debug_name(res)}, type=({','.join(res.category)}), ")
+ self._write(f"rank={res.rank_address}, ")
+ self._write(f"osm={format_osm(res.osm_object)}, ")
+ self._write(f'cc={res.country_code}, ')
+ self._write(f'importance={res.importance or -1:.5f})</dd>')
+ total += 1
+ self._write(f'</dl><b>TOTAL:</b> {total}</p>')
+
+
def sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> None:
sqlstr = self.format_sql(conn, statement)
if CODE_HIGHLIGHT:
self._write('-'*tablewidth + '\n')
+ def result_dump(self, heading: str, results: Iterator[Tuple[Any, Any]]) -> None:
+ self._write(f'{heading}:\n')
+ total = 0
+ for rank, res in results:
+ self._write(f'[{rank:.3f}] {res.source_table.name}(')
+ self._write(f"{_debug_name(res)}, type=({','.join(res.category)}), ")
+ self._write(f"rank={res.rank_address}, ")
+ self._write(f"osm={''.join(map(str, res.osm_object or []))}, ")
+ self._write(f'cc={res.country_code}, ')
+ self._write(f'importance={res.importance or -1:.5f})\n')
+ total += 1
+ self._write(f'TOTAL: {total}\n\n')
+
+
def sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> None:
sqlstr = '\n| '.join(textwrap.wrap(self.format_sql(conn, statement), width=78))
self._write(f"| {sqlstr}\n\n")
if indexed_date is not None:
result.indexed_date = indexed_date.replace(tzinfo=dt.timezone.utc)
- await nres.add_result_details(conn, result, details)
+ await nres.add_result_details(conn, [result], details)
return result
async def get_simple_place(conn: SearchConnection, place: ntyp.PlaceRef,
- details: ntyp.LookupDetails) -> Optional[nres.SearchResult]:
+ details: ntyp.LookupDetails) -> Optional[nres.SearchResult]:
""" Retrieve a place as a simple search result from the database.
"""
log().function('get_simple_place', place=place, details=details)
assert result is not None
result.bbox = getattr(row, 'bbox', None)
- await nres.add_result_details(conn, result, details)
+ await nres.add_result_details(conn, [result], details)
return result
internal use only. That's why they are implemented as free-standing functions
instead of member functions.
"""
-from typing import Optional, Tuple, Dict, Sequence, TypeVar, Type, List
+from typing import Optional, Tuple, Dict, Sequence, TypeVar, Type, List, Any
import enum
import dataclasses
import datetime as dt
from nominatim.api.connection import SearchConnection
from nominatim.api.logging import log
from nominatim.api.localization import Locales
-from nominatim.api.search.query_analyzer_factory import make_query_analyzer
# This file defines complex result data classes.
# pylint: disable=too-many-instance-attributes
"""
return self.importance or (0.7500001 - (self.rank_search/40.0))
+
BaseResultT = TypeVar('BaseResultT', bound=BaseResult)
@dataclasses.dataclass
country_code=row.country_code)
-async def add_result_details(conn: SearchConnection, result: BaseResult,
+async def add_result_details(conn: SearchConnection, results: List[BaseResultT],
details: LookupDetails) -> None:
""" Retrieve more details from the database according to the
parameters specified in 'details'.
"""
- log().section('Query details for result')
- if details.address_details:
- log().comment('Query address details')
- await complete_address_details(conn, result)
- if details.linked_places:
- log().comment('Query linked places')
- await complete_linked_places(conn, result)
- if details.parented_places:
- log().comment('Query parent places')
- await complete_parented_places(conn, result)
- if details.keywords:
- log().comment('Query keywords')
- await complete_keywords(conn, result)
+ if results:
+ log().section('Query details for result')
+ if details.address_details:
+ log().comment('Query address details')
+ await complete_address_details(conn, results)
+ if details.linked_places:
+ log().comment('Query linked places')
+ for result in results:
+ await complete_linked_places(conn, result)
+ if details.parented_places:
+ log().comment('Query parent places')
+ for result in results:
+ await complete_parented_places(conn, result)
+ if details.keywords:
+ log().comment('Query keywords')
+ for result in results:
+ await complete_keywords(conn, result)
def _result_row_to_address_row(row: SaRow) -> AddressLine:
distance=row.distance)
-async def complete_address_details(conn: SearchConnection, result: BaseResult) -> None:
+async def complete_address_details(conn: SearchConnection, results: List[BaseResultT]) -> None:
""" Retrieve information about places that make up the address of the result.
"""
- housenumber = -1
- if result.source_table in (SourceTable.TIGER, SourceTable.OSMLINE):
- if result.housenumber is not None:
- housenumber = int(result.housenumber)
- elif result.extratags is not None and 'startnumber' in result.extratags:
- # details requests do not come with a specific house number
- housenumber = int(result.extratags['startnumber'])
-
- sfn = sa.func.get_addressdata(result.place_id, housenumber)\
- .table_valued( # type: ignore[no-untyped-call]
- sa.column('place_id', type_=sa.Integer),
- 'osm_type',
- sa.column('osm_id', type_=sa.BigInteger),
- sa.column('name', type_=conn.t.types.Composite),
- 'class', 'type', 'place_type',
- sa.column('admin_level', type_=sa.Integer),
- sa.column('fromarea', type_=sa.Boolean),
- sa.column('isaddress', type_=sa.Boolean),
- sa.column('rank_address', type_=sa.SmallInteger),
- sa.column('distance', type_=sa.Float))
- sql = sa.select(sfn).order_by(sa.column('rank_address').desc(),
- sa.column('isaddress').desc())
-
- result.address_rows = AddressLines()
+ def get_hnr(result: BaseResult) -> Tuple[int, int]:
+ housenumber = -1
+ if result.source_table in (SourceTable.TIGER, SourceTable.OSMLINE):
+ if result.housenumber is not None:
+ housenumber = int(result.housenumber)
+ elif result.extratags is not None and 'startnumber' in result.extratags:
+ # details requests do not come with a specific house number
+ housenumber = int(result.extratags['startnumber'])
+ assert result.place_id
+ return result.place_id, housenumber
+
+ data: List[Tuple[Any, ...]] = [get_hnr(r) for r in results if r.place_id]
+
+ if not data:
+ return
+
+ values = sa.values(sa.column('place_id', type_=sa.Integer),
+ sa.column('housenumber', type_=sa.Integer),
+ name='places',
+ literal_binds=True).data(data)
+
+ sfn = sa.func.get_addressdata(values.c.place_id, values.c.housenumber)\
+ .table_valued( # type: ignore[no-untyped-call]
+ sa.column('place_id', type_=sa.Integer),
+ 'osm_type',
+ sa.column('osm_id', type_=sa.BigInteger),
+ sa.column('name', type_=conn.t.types.Composite),
+ 'class', 'type', 'place_type',
+ sa.column('admin_level', type_=sa.Integer),
+ sa.column('fromarea', type_=sa.Boolean),
+ sa.column('isaddress', type_=sa.Boolean),
+ sa.column('rank_address', type_=sa.SmallInteger),
+ sa.column('distance', type_=sa.Float),
+ joins_implicitly=True)
+
+ sql = sa.select(values.c.place_id.label('result_place_id'), sfn)\
+ .order_by(values.c.place_id,
+ sa.column('rank_address').desc(),
+ sa.column('isaddress').desc())
+
+ current_result = None
for row in await conn.execute(sql):
- result.address_rows.append(_result_row_to_address_row(row))
+ if current_result is None or row.result_place_id != current_result.place_id:
+ for result in results:
+ if result.place_id == row.result_place_id:
+ current_result = result
+ break
+ else:
+ assert False
+ current_result.address_rows = AddressLines()
+ current_result.address_rows.append(_result_row_to_address_row(row))
# pylint: disable=consider-using-f-string
async def complete_keywords(conn: SearchConnection, result: BaseResult) -> None:
""" Retrieve information about the search terms used for this place.
+
+ Requires that the query analyzer was initialised to get access to
+ the word table.
"""
t = conn.t.search_name
sql = sa.select(t.c.name_vector, t.c.nameaddress_vector)\
result.name_keywords = []
result.address_keywords = []
- await make_query_analyzer(conn)
t = conn.t.meta.tables['word']
sel = sa.select(t.c.word_id, t.c.word_token, t.c.word)
result.distance = row.distance
if hasattr(row, 'bbox'):
result.bbox = Bbox.from_wkb(row.bbox.data)
- await nres.add_result_details(self.conn, result, self.params)
+ await nres.add_result_details(self.conn, [result], self.params)
return result
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Module for forward search.
+"""
+# pylint: disable=useless-import-alias
+
+from .geocoder import (ForwardGeocoder as ForwardGeocoder)
+from .query import (Phrase as Phrase,
+ PhraseType as PhraseType)
+from .query_analyzer_factory import (make_query_analyzer as make_query_analyzer)
import nominatim.api.search.db_searches as dbs
from nominatim.api.logging import log
+
+def wrap_near_search(categories: List[Tuple[str, str]],
+ search: dbs.AbstractSearch) -> dbs.NearSearch:
+ """ Create a new search that wraps the given search in a search
+ for near places of the given category.
+ """
+ return dbs.NearSearch(penalty=search.penalty,
+ categories=dbf.WeightedCategories(categories,
+ [0.0] * len(categories)),
+ search=search)
+
+
+def build_poi_search(category: List[Tuple[str, str]],
+ countries: Optional[List[str]]) -> dbs.PoiSearch:
+ """ Create a new search for places by the given category, possibly
+ constraint to the given countries.
+ """
+ if countries:
+ ccs = dbf.WeightedStrings(countries, [0.0] * len(countries))
+ else:
+ ccs = dbf.WeightedStrings([], [])
+
+ class _PoiData(dbf.SearchData):
+ penalty = 0.0
+ qualifiers = dbf.WeightedCategories(category, [0.0] * len(category))
+ countries=ccs
+
+ return dbs.PoiSearch(_PoiData())
+
+
class SearchBuilder:
""" Build the abstract search queries from token assignments.
"""
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Public interface to the search code.
+"""
+from typing import List, Any, Optional, Iterator, Tuple
+import itertools
+
+from nominatim.api.connection import SearchConnection
+from nominatim.api.types import SearchDetails
+from nominatim.api.results import SearchResults, add_result_details
+from nominatim.api.search.token_assignment import yield_token_assignments
+from nominatim.api.search.db_search_builder import SearchBuilder, build_poi_search, wrap_near_search
+from nominatim.api.search.db_searches import AbstractSearch
+from nominatim.api.search.query_analyzer_factory import make_query_analyzer, AbstractQueryAnalyzer
+from nominatim.api.search.query import Phrase, QueryStruct
+from nominatim.api.logging import log
+
+class ForwardGeocoder:
+ """ Main class responsible for place search.
+ """
+
+ def __init__(self, conn: SearchConnection, params: SearchDetails) -> None:
+ self.conn = conn
+ self.params = params
+ self.query_analyzer: Optional[AbstractQueryAnalyzer] = None
+
+
+ @property
+ def limit(self) -> int:
+ """ Return the configured maximum number of search results.
+ """
+ return self.params.max_results
+
+
+ async def build_searches(self,
+ phrases: List[Phrase]) -> Tuple[QueryStruct, List[AbstractSearch]]:
+ """ Analyse the query and return the tokenized query and list of
+ possible searches over it.
+ """
+ if self.query_analyzer is None:
+ self.query_analyzer = await make_query_analyzer(self.conn)
+
+ query = await self.query_analyzer.analyze_query(phrases)
+
+ searches: List[AbstractSearch] = []
+ if query.num_token_slots() > 0:
+ # 2. Compute all possible search interpretations
+ log().section('Compute abstract searches')
+ search_builder = SearchBuilder(query, self.params)
+ num_searches = 0
+ for assignment in yield_token_assignments(query):
+ searches.extend(search_builder.build(assignment))
+ log().table_dump('Searches for assignment',
+ _dump_searches(searches, query, num_searches))
+ num_searches = len(searches)
+ searches.sort(key=lambda s: s.penalty)
+
+ return query, searches
+
+
+ async def execute_searches(self, query: QueryStruct,
+ searches: List[AbstractSearch]) -> SearchResults:
+ """ Run the abstract searches against the database until a result
+ is found.
+ """
+ log().section('Execute database searches')
+ results = SearchResults()
+
+ num_results = 0
+ min_ranking = 1000.0
+ prev_penalty = 0.0
+ for i, search in enumerate(searches):
+ if search.penalty > prev_penalty and (search.penalty > min_ranking or i > 20):
+ break
+ log().table_dump(f"{i + 1}. Search", _dump_searches([search], query))
+ for result in await search.lookup(self.conn, self.params):
+ results.append(result)
+ min_ranking = min(min_ranking, result.ranking + 0.5, search.penalty + 0.3)
+ log().result_dump('Results', ((r.accuracy, r) for r in results[num_results:]))
+ num_results = len(results)
+ prev_penalty = search.penalty
+
+ if results:
+ min_ranking = min(r.ranking for r in results)
+ results = SearchResults(r for r in results if r.ranking < min_ranking + 0.5)
+
+ if results:
+ min_rank = min(r.rank_search for r in results)
+
+ results = SearchResults(r for r in results
+ if r.ranking + 0.05 * (r.rank_search - min_rank)
+ < min_ranking + 0.5)
+
+ results.sort(key=lambda r: r.accuracy - r.calculated_importance())
+ results = SearchResults(results[:self.limit])
+
+ return results
+
+
+ async def lookup_pois(self, categories: List[Tuple[str, str]],
+ phrases: List[Phrase]) -> SearchResults:
+ """ Look up places by category. If phrase is given, a place search
+ over the phrase will be executed first and places close to the
+ results returned.
+ """
+ log().function('forward_lookup_pois', categories=categories, params=self.params)
+
+ if phrases:
+ query, searches = await self.build_searches(phrases)
+
+ if query:
+ searches = [wrap_near_search(categories, s) for s in searches[:50]]
+ results = await self.execute_searches(query, searches)
+ else:
+ results = SearchResults()
+ else:
+ search = build_poi_search(categories, self.params.countries)
+ results = await search.lookup(self.conn, self.params)
+
+ await add_result_details(self.conn, results, self.params)
+ log().result_dump('Final Results', ((r.accuracy, r) for r in results))
+
+ return results
+
+
+ async def lookup(self, phrases: List[Phrase]) -> SearchResults:
+ """ Look up a single free-text query.
+ """
+ log().function('forward_lookup', phrases=phrases, params=self.params)
+ results = SearchResults()
+
+ if self.params.is_impossible():
+ return results
+
+ query, searches = await self.build_searches(phrases)
+
+ if searches:
+ # Execute SQL until an appropriate result is found.
+ results = await self.execute_searches(query, searches[:50])
+ await add_result_details(self.conn, results, self.params)
+ log().result_dump('Final Results', ((r.accuracy, r) for r in results))
+
+ return results
+
+
+# pylint: disable=invalid-name,too-many-locals
+def _dump_searches(searches: List[AbstractSearch], query: QueryStruct,
+ start: int = 0) -> Iterator[Optional[List[Any]]]:
+ yield ['Penalty', 'Lookups', 'Housenr', 'Postcode', 'Countries', 'Qualifier', 'Rankings']
+
+ def tk(tl: List[int]) -> str:
+ tstr = [f"{query.find_lookup_word_by_id(t)}({t})" for t in tl]
+
+ return f"[{','.join(tstr)}]"
+
+ def fmt_ranking(f: Any) -> str:
+ if not f:
+ return ''
+ ranks = ','.join((f"{tk(r.tokens)}^{r.penalty:.3g}" for r in f.rankings))
+ if len(ranks) > 100:
+ ranks = ranks[:100] + '...'
+ return f"{f.column}({ranks},def={f.default:.3g})"
+
+ def fmt_lookup(l: Any) -> str:
+ if not l:
+ return ''
+
+ return f"{l.lookup_type}({l.column}{tk(l.tokens)})"
+
+
+ def fmt_cstr(c: Any) -> str:
+ if not c:
+ return ''
+
+ return f'{c[0]}^{c[1]}'
+
+ for search in searches[start:]:
+ fields = ('name_lookups', 'name_ranking', 'countries', 'housenumbers',
+ 'postcodes', 'qualifier')
+ iters = itertools.zip_longest([f"{search.penalty:.3g}"],
+ *(getattr(search, attr, []) for attr in fields),
+ fillvalue= '')
+ for penalty, lookup, rank, cc, hnr, pc, qual in iters:
+ yield [penalty, fmt_lookup(lookup), fmt_cstr(hnr),
+ fmt_cstr(pc), fmt_cstr(cc), fmt_cstr(qual), fmt_ranking(rank)]
+ yield None
"""
Factory for creating a query analyzer for the configured tokenizer.
"""
-from typing import List, cast
+from typing import List, cast, TYPE_CHECKING
from abc import ABC, abstractmethod
from pathlib import Path
import importlib
from nominatim.api.logging import log
from nominatim.api.connection import SearchConnection
-from nominatim.api.search.query import Phrase, QueryStruct
+
+if TYPE_CHECKING:
+ from nominatim.api.search.query import Phrase, QueryStruct
class AbstractQueryAnalyzer(ABC):
""" Class for analysing incomming queries.
"""
@abstractmethod
- async def analyze_query(self, phrases: List[Phrase]) -> QueryStruct:
+ async def analyze_query(self, phrases: List['Phrase']) -> 'QueryStruct':
""" Analyze the given phrases and return the tokenized query.
"""
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Tests for search API calls.
+
+These tests make sure that all Python code is correct and executable.
+Functional tests can be found in the BDD test suite.
+"""
+import json
+
+import pytest
+
+import sqlalchemy as sa
+
+import nominatim.api as napi
+import nominatim.api.logging as loglib
+
+@pytest.fixture(autouse=True)
+def setup_icu_tokenizer(apiobj):
+ """ Setup the propoerties needed for using the ICU tokenizer.
+ """
+ apiobj.add_data('properties',
+ [{'property': 'tokenizer', 'value': 'icu'},
+ {'property': 'tokenizer_import_normalisation', 'value': ':: lower();'},
+ {'property': 'tokenizer_import_transliteration', 'value': "'1' > '/1/'; 'ä' > 'ä '"},
+ ])
+
+
+def test_search_no_content(apiobj, table_factory):
+ table_factory('word',
+ definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB')
+
+ assert apiobj.api.search('foo') == []
+
+
+def test_search_simple_word(apiobj, table_factory):
+ table_factory('word',
+ definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB',
+ content=[(55, 'test', 'W', 'test', None),
+ (2, 'test', 'w', 'test', None)])
+
+ apiobj.add_placex(place_id=444, class_='place', type='village',
+ centroid=(1.3, 0.7))
+ apiobj.add_search_name(444, names=[2, 55])
+
+ results = apiobj.api.search('TEST')
+
+ assert [r.place_id for r in results] == [444]
+
+
+@pytest.mark.parametrize('logtype', ['text', 'html'])
+def test_search_with_debug(apiobj, table_factory, logtype):
+ table_factory('word',
+ definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB',
+ content=[(55, 'test', 'W', 'test', None),
+ (2, 'test', 'w', 'test', None)])
+
+ apiobj.add_placex(place_id=444, class_='place', type='village',
+ centroid=(1.3, 0.7))
+ apiobj.add_search_name(444, names=[2, 55])
+
+ loglib.set_log_output(logtype)
+ results = apiobj.api.search('TEST')
+
+ assert loglib.get_and_disable()
+
+
+def test_address_no_content(apiobj, table_factory):
+ table_factory('word',
+ definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB')
+
+ assert apiobj.api.search_address(amenity='hotel',
+ street='Main St 34',
+ city='Happyville',
+ county='Wideland',
+ state='Praerie',
+ postalcode='55648',
+ country='xx') == []
+
+
+@pytest.mark.parametrize('atype,address,search', [('street', 26, 26),
+ ('city', 16, 18),
+ ('county', 12, 12),
+ ('state', 8, 8)])
+def test_address_simple_places(apiobj, table_factory, atype, address, search):
+ table_factory('word',
+ definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB',
+ content=[(55, 'test', 'W', 'test', None),
+ (2, 'test', 'w', 'test', None)])
+
+ apiobj.add_placex(place_id=444,
+ rank_address=address, rank_search=search,
+ centroid=(1.3, 0.7))
+ apiobj.add_search_name(444, names=[2, 55], address_rank=address, search_rank=search)
+
+ results = apiobj.api.search_address(**{atype: 'TEST'})
+
+ assert [r.place_id for r in results] == [444]
+
+
+def test_address_country(apiobj, table_factory):
+ table_factory('word',
+ definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB',
+ content=[(None, 'ro', 'C', 'ro', None)])
+ apiobj.add_country('ro', 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))')
+ apiobj.add_country_name('ro', {'name': 'România'})
+
+ assert len(apiobj.api.search_address(country='ro')) == 1
+
+
+def test_category_no_categories(apiobj, table_factory):
+ table_factory('word',
+ definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB')
+
+ assert apiobj.api.search_category([], near_query='Berlin') == []
+
+
+def test_category_no_content(apiobj, table_factory):
+ table_factory('word',
+ definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB')
+
+ assert apiobj.api.search_category([('amenity', 'restaurant')]) == []
+
+
+def test_category_simple_restaurant(apiobj, table_factory):
+ table_factory('word',
+ definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB')
+
+ apiobj.add_placex(place_id=444, class_='amenity', type='restaurant',
+ centroid=(1.3, 0.7))
+ apiobj.add_search_name(444, names=[2, 55], address_rank=16, search_rank=18)
+
+ results = apiobj.api.search_category([('amenity', 'restaurant')],
+ near=(1.3, 0.701), near_radius=0.015)
+
+ assert [r.place_id for r in results] == [444]
+
+
+def test_category_with_search_phrase(apiobj, table_factory):
+ table_factory('word',
+ definition='word_id INT, word_token TEXT, type TEXT, word TEXT, info JSONB',
+ content=[(55, 'test', 'W', 'test', None),
+ (2, 'test', 'w', 'test', None)])
+
+ apiobj.add_placex(place_id=444, class_='place', type='village',
+ rank_address=16, rank_search=18,
+ centroid=(1.3, 0.7))
+ apiobj.add_search_name(444, names=[2, 55], address_rank=16, search_rank=18)
+ apiobj.add_placex(place_id=95, class_='amenity', type='restaurant',
+ centroid=(1.3, 0.7003))
+
+ results = apiobj.api.search_category([('amenity', 'restaurant')],
+ near_query='TEST')
+
+ assert [r.place_id for r in results] == [95]