May be empty when no result was found.
def localize(self, locales: Locales) -> None:
    """ Call localize() with the given locales on every result
        contained in this collection.
    """
    for entry in self:
        entry.localize(locales)
def _filter_geometries(row: SaRow) -> Dict[str, str]:
return {k[9:]: v for k, v in row._mapping.items() # pylint: disable=W0212
def _format_reverse_xml(results: napi.ReverseResults, options: Mapping[str, Any]) -> str:
    """ Format reverse results as XML by delegating to
        format_xml.format_base_xml() with the 'reversegeocode' label.

        The 'query' entry of the options (empty string when absent) is
        handed on as the 'querystring' extra, replacing the former
        'TODO' placeholder.
    """
    extra = {'querystring': options.get('query', '')}
    return format_xml.format_base_xml(results, options, True, 'reversegeocode', extra)
@dispatch.format_func(napi.ReverseResults, 'geojson')
@dispatch.format_func(napi.SearchResults, 'xml')
def _format_search_xml(results: napi.SearchResults, options: Mapping[str, Any]) -> str:
    """ Format search results as XML by delegating to
        format_xml.format_base_xml() with the 'searchresults' label.

        The 'query' option (empty string when absent) is handed on as
        'querystring'. The options 'more_url', 'exclude_place_ids' and
        'viewbox' are added to the extras only when they are truthy.
    """
    extra = {'querystring': options.get('query', '')}
    # Unset or empty optional attributes are left out completely.
    extra.update({attr: options[attr]
                  for attr in ('more_url', 'exclude_place_ids', 'viewbox')
                  if options.get(attr)})
    return format_xml.format_base_xml(results, options, False, 'searchresults',
                                      extra)
@dispatch.format_func(napi.SearchResults, 'geojson')
Helper functions for parsing parameters and outputting data
specifically for the v1 version of the API.
+from typing import Tuple, Optional, Any, Dict, Iterable
+from itertools import chain
+import re
from nominatim.api.results import SearchResult, SearchResults, SourceTable
+from nominatim.api.types import SearchDetails, GeometryFormat
REVERSE_MAX_RANKS = [2, 2, 2, # 0-2 Continent/Sea
4, 4, # 3-4 Country
return REVERSE_MAX_RANKS[max(0, min(18, zoom))]
# Maps the supported feature type names to their (minimum rank, maximum rank).
FEATURE_TYPE_TO_RANK: Dict[Optional[str], Tuple[int, int]] = {
    'country': (4, 4),
    'state': (8, 8),
    'city': (14, 16),
    'settlement': (8, 20),
}


def feature_type_to_rank(feature_type: Optional[str]) -> Tuple[int, int]:
    """ Convert a feature type parameter to a tuple of
        minimum rank and maximum rank.

        Feature types that are not listed in FEATURE_TYPE_TO_RANK
        (including None and the empty string) yield (0, 30).
    """
    return FEATURE_TYPE_TO_RANK.get(feature_type, (0, 30))
#pylint: disable=too-many-arguments
def extend_query_parts(queryparts: Dict[str, Any], details: Dict[str, Any],
                       feature_type: Optional[str],
                       namedetails: bool, extratags: bool,
                       excluded: Iterable[str]) -> None:
    """ Add parameters from details dictionary to the query parts
        dictionary which is suitable as URL parameter dictionary.

        'queryparts' is modified in place. 'details' is parsed with
        SearchDetails.from_kwargs(); 'excluded' is prepended to the
        excluded entries from the details in 'exclude_place_ids'.
        'feature_type' is only added when it is a key of
        FEATURE_TYPE_TO_RANK.
    """
    parsed = SearchDetails.from_kwargs(details)
    if parsed.geometry_output != GeometryFormat.NONE:
        for flag in parsed.geometry_output:
            # NOTE(review): the flag expression was lost in the source text;
            # this assumes each GeometryFormat member is named after its
            # polygon_* URL parameter - confirm against GeometryFormat.
            assert flag.name
            queryparts[f'polygon_{flag.name.lower()}'] = '1'
    if parsed.address_details:
        queryparts['addressdetails'] = '1'
    if namedetails:
        queryparts['namedetails'] = '1'
    if extratags:
        queryparts['extratags'] = '1'
    if parsed.geometry_simplification > 0.0:
        queryparts['polygon_threshold'] = f"{parsed.geometry_simplification:.6g}"
    # 10 is treated as the default limit and therefore not spelled out.
    if parsed.max_results != 10:
        queryparts['limit'] = str(parsed.max_results)
    if parsed.countries:
        queryparts['countrycodes'] = ','.join(parsed.countries)
    queryparts['exclude_place_ids'] = \
        ','.join(chain(excluded, map(str, parsed.excluded)))
    if parsed.viewbox:
        queryparts['viewbox'] = ','.join(f"{c:.7g}" for c in parsed.viewbox.coords)
    if parsed.bounded_viewbox:
        queryparts['bounded'] = '1'
    # A missing 'dedupe' entry counts as enabled instead of raising KeyError.
    if not details.get('dedupe', True):
        queryparts['dedupe'] = '0'
    if feature_type in FEATURE_TYPE_TO_RANK:
        queryparts['featureType'] = feature_type
def deduplicate_results(results: SearchResults, max_results: int) -> SearchResults:
""" Remove results that look like duplicates.
and result.category == ('boundary', 'postal_code') \
and result.names is not None \
and result.names.get('ref') == postcode
def _deg(axis: str) -> str:
    """ Return a regex fragment for decimal degrees of the given axis
        ('lat' or 'lon'), captured in the group '<axis>_deg'.
    """
    return f"(?P<{axis}_deg>\\d+\\.\\d+)°?"


def _deg_min(axis: str) -> str:
    """ Return a regex fragment for integer degrees followed by optional
        minutes, captured in the groups '<axis>_deg' and '<axis>_min'.
    """
    return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>[\\d.]+)?[′']*"


def _deg_min_sec(axis: str) -> str:
    """ Return a regex fragment for integer degrees and minutes followed
        by optional seconds, captured in the groups '<axis>_deg',
        '<axis>_min' and '<axis>_sec'.
    """
    return f"(?P<{axis}_deg>\\d+)[°\\s]+(?P<{axis}_min>\\d+)[′'\\s]+(?P<{axis}_sec>[\\d.]+)?[\"″]*"


# Supported coordinate notations, tried in this order. Each pattern also
# captures free text before ('pre') and after ('post') the coordinate.
COORD_REGEX = [re.compile(r'(?:(?P<pre>.*?)\s+)??' + r + r'(?:\s+(?P<post>.*))?') for r in (
    r"(?P<ns>[NS])\s*" + _deg('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg('lon'),
    _deg('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg('lon') + r"\s*(?P<ew>[EW])",
    r"(?P<ns>[NS])\s*" + _deg_min('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg_min('lon'),
    _deg_min('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg_min('lon') + r"\s*(?P<ew>[EW])",
    r"(?P<ns>[NS])\s*" + _deg_min_sec('lat') + r"[\s,]+" + r"(?P<ew>[EW])\s*" + _deg_min_sec('lon'),
    _deg_min_sec('lat') + r"\s*(?P<ns>[NS])[\s,]+" + _deg_min_sec('lon') + r"\s*(?P<ew>[EW])",
    r"\[?(?P<lat_deg>[+-]?\d+\.\d+)[\s,]+(?P<lon_deg>[+-]?\d+\.\d+)\]?"
)]


def _axis_to_degrees(groups: Dict[str, Any], axis: str) -> float:
    """ Combine the degree, minute and second groups of one axis into
        decimal degrees. Minute and second groups that are missing from
        the pattern or did not take part in the match count as zero.
        Raises ValueError when a group does not hold a valid number.
    """
    # groupdict() reports optional groups without a match as None, so a
    # plain .get() default is not enough to fall back to zero.
    return float(groups[f'{axis}_deg']) \
           + float(groups.get(f'{axis}_min') or 0.0) / 60.0 \
           + float(groups.get(f'{axis}_sec') or 0.0) / 3600.0


def extract_coords_from_query(query: str) -> Tuple[str, Optional[float], Optional[float]]:
    """ Look for something that is formatted like a coordinate at the
        beginning or end of the query. If found, extract the coordinate and
        return the remaining query (or the empty string if the query
        consisted of nothing but a coordinate).

        Returns a tuple of (remaining query, x, y) where x is the longitude
        and y the latitude in decimal degrees. When no coordinate is found,
        the unchanged query and None for both x and y are returned.

        Only the first match will be returned.
    """
    for regex in COORD_REGEX:
        match = regex.fullmatch(query)
        if match is None:
            continue
        groups = match.groupdict()
        if groups['pre'] and groups['post']:
            # Text on both sides: the coordinate sits in the middle of
            # the query, which is not accepted.
            continue
        try:
            x = _axis_to_degrees(groups, 'lon')
            y = _axis_to_degrees(groups, 'lat')
        except ValueError:
            # The pattern tolerates sequences like '1.2.3' which are
            # not numbers. Do not treat those as coordinates.
            continue
        if groups.get('ew') == 'W':
            x = -x
        if groups.get('ns') == 'S':
            y = -y
        return groups['pre'] or groups['post'] or '', x, y

    return query, None, None
CATEGORY_REGEX = re.compile(r'(?P<pre>.*?)\[(?P<cls>[a-zA-Z_]+)=(?P<typ>[a-zA-Z_]+)\](?P<post>.*)')


def extract_category_from_query(query: str) -> Tuple[str, Optional[str], Optional[str]]:
    """ Extract a hidden category specification of the form '[key=value]' from
        the query. If found, extract key and value and
        return the remaining query (or the empty string if the query
        consisted of nothing but a category).

        Returns a tuple of (remaining query, key, value). When the query
        contains no category, the unchanged query and None for key and
        value are returned.

        Only the first match will be returned.
    """
    match = CATEGORY_REGEX.search(query)
    if match is not None:
        # Join the text on both sides of the category with a single space
        # and drop the padding left over when one side is empty.
        remainder = (match.group('pre').strip() + ' ' + match.group('post').strip()).strip()
        return remainder, match.group('cls'), match.group('typ')

    return query, None, None
from typing import Optional, Any, Type, Callable, NoReturn, Dict, cast
from functools import reduce
import abc
+import dataclasses
import math
+from urllib.parse import urlencode
+from nominatim.errors import UsageError
from nominatim.config import Configuration
import nominatim.api as napi
import nominatim.api.logging as loglib
fmt = params.parse_format(napi.ReverseResults, 'xml')
debug = params.setup_debugging()
coord = napi.Point(params.get_float('lon'), params.get_float('lat'))
- locales = napi.Locales.from_accept_languages(params.get_accepted_languages())
details = params.parse_geometry_details(fmt)
details['max_rank'] = helpers.zoom_to_rank(params.get_int('zoom', 18))
if debug:
return params.build_response(loglib.get_and_disable())
- fmt_options = {'extratags': params.get_bool('extratags', False),
+ if fmt == 'xml':
+ queryparts = {'lat': str(, 'lon': str(coord.lon), 'format': 'xml'}
+ zoom = params.get('zoom', None)
+ if zoom:
+ queryparts['zoom'] = zoom
+ query = urlencode(queryparts)
+ else:
+ query = ''
+ fmt_options = {'query': query,
+ 'extratags': params.get_bool('extratags', False),
'namedetails': params.get_bool('namedetails', False),
'addressdetails': params.get_bool('addressdetails', True)}
if result:
- result.localize(locales)
+ result.localize(napi.Locales.from_accept_languages(params.get_accepted_languages()))
output = formatting.format_result(napi.ReverseResults([result] if result else []),
fmt, fmt_options)
fmt = params.parse_format(napi.SearchResults, 'xml')
debug = params.setup_debugging()
- locales = napi.Locales.from_accept_languages(params.get_accepted_languages())
details = params.parse_geometry_details(fmt)
places = []
'namedetails': params.get_bool('namedetails', False),
'addressdetails': params.get_bool('addressdetails', True)}
- for result in results:
- result.localize(locales)
+ results.localize(napi.Locales.from_accept_languages(params.get_accepted_languages()))
output = formatting.format_result(results, fmt, fmt_options)
return params.build_response(output)
async def _unstructured_search(query: str, api: napi.NominatimAPIAsync,
                              details: Dict[str, Any]) -> napi.SearchResults:
    """ Run a free-text search and return the results.

        An empty query yields empty results. A coordinate found in the
        query is added to the details as 'near' point with a 'near_radius'
        of 0.1 (the details dictionary is modified in place). If nothing
        but the coordinate is left, the reverse function of the API is
        used instead and its result converted into a search result. A
        '[key=value]' term in the query triggers a category search;
        everything else goes to the general search function of the API.
    """
    if not query:
        return napi.SearchResults()

    # Extract special format for coordinates from query.
    query, x, y = helpers.extract_coords_from_query(query)
    if x is not None:
        assert y is not None
        details['near'] = napi.Point(x, y)
        details['near_radius'] = 0.1

    # If no query is left, revert to reverse search.
    if x is not None and not query:
        result = await api.reverse(details['near'], **details)
        if not result:
            return napi.SearchResults()

        # Copy over every field of SearchResult that the reverse result
        # also provides.
        # NOTE(review): the field expressions were lost in the source text;
        # 'f.name' is reconstructed from the dataclasses.fields() loop.
        return napi.SearchResults(
                  [napi.SearchResult(**{f.name: getattr(result, f.name)
                                        for f in dataclasses.fields(napi.SearchResult)
                                        if hasattr(result, f.name)})])

    query, cls, typ = helpers.extract_category_from_query(query)
    if cls is not None:
        assert typ is not None
        return await api.search_category([(cls, typ)], near_query=query, **details)

    # NOTE(review): call target reconstructed; the tests patch
    # NominatimAPIAsync.search for the free-text case.
    return await api.search(query, **details)
async def search_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
    """ Server glue for /search endpoint. See API docs for details.

        Runs a free-text search when the parameter 'q' is present and a
        structured address search otherwise, then localizes, optionally
        deduplicates and formats the results.
    """
    fmt = params.parse_format(napi.SearchResults, 'jsonv2')
    debug = params.setup_debugging()
    details = params.parse_geometry_details(fmt)

    details['countries'] = params.get('countrycodes', None)
    details['excluded'] = params.get('exclude_place_ids', None)
    details['viewbox'] = params.get('viewbox', None) or params.get('viewboxlbrt', None)
    details['bounded_viewbox'] = params.get_bool('bounded', False)
    details['dedupe'] = params.get_bool('dedupe', True)

    # The requested limit is clamped to 1..50. With deduplication enabled
    # up to 10 additional results are requested.
    max_results = max(1, min(50, params.get_int('limit', 10)))
    if details['dedupe']:
        details['max_results'] = max_results + min(10, max_results)
    else:
        details['max_results'] = max_results

    min_rank, max_rank = helpers.feature_type_to_rank(params.get('featureType', ''))
    details['min_rank'] = min_rank
    details['max_rank'] = max_rank

    query = params.get('q', None)
    queryparts = {}
    try:
        if query is None:
            # Structured search: collect the address parts that were given.
            for key in ('amenity', 'street', 'city', 'county', 'state', 'postalcode', 'country'):
                details[key] = params.get(key, None)
                if details[key]:
                    queryparts[key] = details[key]
            query = ', '.join(queryparts.values())

            results = await api.search_address(**details)
        else:
            queryparts['q'] = query
            results = await _unstructured_search(query, api, details)
    except UsageError as err:
        params.raise_error(str(err))

    results.localize(napi.Locales.from_accept_languages(params.get_accepted_languages()))

    if details['dedupe'] and len(results) > 1:
        results = helpers.deduplicate_results(results, max_results)

    if debug:
        return params.build_response(loglib.get_and_disable())

    if fmt == 'xml':
        # The XML output carries a URL for requesting further results.
        helpers.extend_query_parts(queryparts, details,
                                   params.get('featureType', ''),
                                   params.get_bool('namedetails', False),
                                   params.get_bool('extratags', False),
                                   (str(r.place_id) for r in results if r.place_id))
        queryparts['format'] = fmt
        moreurl = urlencode(queryparts)
    else:
        moreurl = ''

    fmt_options = {
        'query': query,
        'more_url': moreurl,
        'exclude_place_ids': queryparts.get('exclude_place_ids'),
        'viewbox': queryparts.get('viewbox'),
        'extratags': params.get_bool('extratags', False),
        'namedetails': params.get_bool('namedetails', False),
        'addressdetails': params.get_bool('addressdetails', False),
    }

    return params.build_response(formatting.format_result(results, fmt, fmt_options))
EndpointFunc = Callable[[napi.NominatimAPIAsync, ASGIAdaptor], Any]
('status', status_endpoint),
('details', details_endpoint),
('reverse', reverse_endpoint),
- ('lookup', lookup_endpoint)
+ ('lookup', lookup_endpoint),
+ ('search', search_endpoint)
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+# This file is part of Nominatim. (https://nominatim.org)
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+Tests for the helper functions for v1 API.
+import pytest
+import nominatim.api.v1.helpers as helper
@pytest.mark.parametrize('inp', ['', 'abc', '12 23', 'abc -78.90, 12.456 def'])
def test_extract_coords_no_coords(inp):
    """ Queries without an accepted coordinate are handed back unchanged.
    """
    remainder, lon, lat = helper.extract_coords_from_query(inp)

    assert remainder == inp
    assert lon is None
    assert lat is None
def test_extract_coords_null_island():
    """ A coordinate of zero is still recognised as a coordinate.
    """
    extracted = helper.extract_coords_from_query('0.0 -0.0')

    assert ('', 0.0, 0.0) == extracted
def test_extract_coords_with_text_before():
    """ Text in front of the coordinate is returned as remaining query.
    """
    extracted = helper.extract_coords_from_query('abc -78.90, 12.456')

    assert ('abc', 12.456, -78.90) == extracted
def test_extract_coords_with_text_after():
    """ Text behind the coordinate is returned as remaining query.
    """
    extracted = helper.extract_coords_from_query('-78.90, 12.456 abc')

    assert ('abc', 12.456, -78.90) == extracted
@pytest.mark.parametrize('inp', [' [12.456,-78.90] ', ' 12.456,-78.90 '])
def test_extract_coords_with_spaces(inp):
    """ Whitespace around the coordinate does not end up in the query.
    """
    extracted = helper.extract_coords_from_query(inp)

    assert ('', -78.90, 12.456) == extracted
@pytest.mark.parametrize('inp', ['40 26.767 N 79 58.933 W',
                                 '40° 26.767′ N 79° 58.933′ W',
                                 "40° 26.767' N 79° 58.933' W",
                                 "40° 26.767'\n"
                                 " N 79° 58.933' W",
                                 'N 40 26.767, W 79 58.933',
                                 'N 40°26.767′, W 79°58.933′',
                                 ' N 40°26.767′, W 79°58.933′',
                                 "N 40°26.767', W 79°58.933'",
                                 '40 26 46 N 79 58 56 W',
                                 '40° 26′ 46″ N 79° 58′ 56″ W',
                                 '40° 26′ 46.00″ N 79° 58′ 56.00″ W',
                                 '40°26′46″N 79°58′56″W',
                                 'N 40 26 46 W 79 58 56',
                                 'N 40° 26′ 46″, W 79° 58′ 56″',
                                 'N 40° 26\' 46", W 79° 58\' 56"',
                                 'N 40° 26\' 46", W 79° 58\' 56"',
                                 '40.446 -79.982',
                                 '40.446,-79.982',
                                 '40.446° N 79.982° W',
                                 'N 40.446° W 79.982°',
                                 '[40.446 -79.982]',
                                 '[40.446,\v-79.982]',
                                 ' 40.446 , -79.982 ',
                                 ' 40.446 , -79.982 ',
                                 ' 40.446 , -79.982 ',
                                 ' 40.446\v, -79.982 '])
def test_extract_coords_formats(inp):
    """ Each notation is recognised on its own, with text in front and
        with text behind, and always yields the same coordinate.
    """
    variants = ((inp, ''),
                ('foo bar ' + inp, 'foo bar'),
                (inp + ' x', 'x'))

    for text, remainder in variants:
        query, x, y = helper.extract_coords_from_query(text)

        assert query == remainder
        assert pytest.approx(x, abs=0.001) == -79.982
        assert pytest.approx(y, abs=0.001) == 40.446
def test_extract_coords_formats_southeast():
    """ The markers S and E give a negative latitude and a positive
        longitude.
    """
    remainder, lon, lat = helper.extract_coords_from_query('S 40 26.767, E 79 58.933')

    assert remainder == ''
    assert pytest.approx(lon, abs=0.001) == 79.982
    assert pytest.approx(lat, abs=0.001) == -40.446
@pytest.mark.parametrize('inp', ['[shop=fish] foo bar',
                                 'foo [shop=fish] bar',
                                 'foo [shop=fish]bar',
                                 'foo bar [shop=fish]'])
def test_extract_category_good(inp):
    """ The category is found at any position and the text around it is
        joined with a single space.
    """
    remainder, key, value = helper.extract_category_from_query(inp)

    assert remainder == 'foo bar'
    assert key == 'shop'
    assert value == 'fish'
def test_extract_category_only():
    """ A query made up of nothing but a category leaves an empty query.
    """
    extracted = helper.extract_category_from_query('[shop=market]')

    assert extracted == ('', 'shop', 'market')
@pytest.mark.parametrize('inp', ['house []', 'nothing', '[352]'])
def test_extract_category_no_match(inp):
    """ Brackets without a key=value term are not taken as a category.
    """
    extracted = helper.extract_category_from_query(inp)

    assert extracted == (inp, None, None)
class FakeAdaptor(glue.ASGIAdaptor):

    def __init__(self, params=None, headers=None, config=None):
        # The defaults are None rather than {} so that instances never
        # share one dictionary through the function defaults.
        self.params = params or {}
        self.headers = headers or {}
        self._config = config or Configuration(None)
await glue.details_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
# reverse_endpoint()

class TestReverseEndPoint:
    """ Tests for the glue code of the reverse endpoint. The reverse
        function of the API is replaced with a stub that returns
        self.result.
    """

    @pytest.fixture(autouse=True)
    def patch_reverse_func(self, monkeypatch):
        self.result = napi.ReverseResult(napi.SourceTable.PLACEX,
                                         ('place', 'thing'),
                                         napi.Point(1.0, 2.0))

        async def _reverse(*args, **kwargs):
            return self.result

        monkeypatch.setattr(napi.NominatimAPIAsync, 'reverse', _reverse)


    @pytest.mark.asyncio
    @pytest.mark.parametrize('params', [{}, {'lat': '3.4'}, {'lon': '6.7'}])
    async def test_reverse_no_params(self, params):
        adaptor = FakeAdaptor()
        adaptor.params = params
        adaptor.params['format'] = 'xml'

        with pytest.raises(FakeError, match='^400 -- (?s:.*)missing'):
            await glue.reverse_endpoint(napi.NominatimAPIAsync(Path('/invalid')), adaptor)


    # NOTE(review): this test shares its name with the next method and is
    # therefore replaced by it when the class body is executed.
    @pytest.mark.asyncio
    @pytest.mark.parametrize('params', [{'lat': '45.6', 'lon': '4563'}])
    async def test_reverse_success(self, params):
        adaptor = FakeAdaptor()
        adaptor.params = params
        adaptor.params['format'] = 'json'

        response = await glue.reverse_endpoint(napi.NominatimAPIAsync(Path('/invalid')),
                                               adaptor)

        assert response == ''


    @pytest.mark.asyncio
    async def test_reverse_success(self):
        adaptor = FakeAdaptor()
        adaptor.params['lat'] = '56.3'
        adaptor.params['lon'] = '6.8'

        assert await glue.reverse_endpoint(napi.NominatimAPIAsync(Path('/invalid')), adaptor)


    @pytest.mark.asyncio
    async def test_reverse_from_search(self):
        adaptor = FakeAdaptor()
        adaptor.params['q'] = '34.6 2.56'
        adaptor.params['format'] = 'json'

        response = await glue.search_endpoint(napi.NominatimAPIAsync(Path('/invalid')),
                                              adaptor)

        assert len(json.loads(response.output)) == 1
# lookup_endpoint()
class TestLookupEndpoint:
res = await glue.lookup_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
assert len(json.loads(res.output)) == 1
# search_endpoint()

class TestSearchEndPointSearch:
    """ Tests for the free-text path of the search endpoint. The search
        function of the API is replaced with a stub that returns
        self.results.
    """

    @pytest.fixture(autouse=True)
    def patch_lookup_func(self, monkeypatch):
        self.results = [napi.SearchResult(napi.SourceTable.PLACEX,
                                          ('place', 'thing'),
                                          napi.Point(1.0, 2.0))]

        async def _search(*args, **kwargs):
            return napi.SearchResults(self.results)

        monkeypatch.setattr(napi.NominatimAPIAsync, 'search', _search)


    @pytest.mark.asyncio
    async def test_search_free_text(self):
        adaptor = FakeAdaptor()
        adaptor.params['q'] = 'something'

        response = await glue.search_endpoint(napi.NominatimAPIAsync(Path('/invalid')),
                                              adaptor)

        assert len(json.loads(response.output)) == 1


    @pytest.mark.asyncio
    async def test_search_free_text_xml(self):
        adaptor = FakeAdaptor()
        adaptor.params['q'] = 'something'
        adaptor.params['format'] = 'xml'

        response = await glue.search_endpoint(napi.NominatimAPIAsync(Path('/invalid')),
                                              adaptor)

        assert response.status == 200
        assert response.output.index('something') > 0


    @pytest.mark.asyncio
    async def test_search_free_and_structured(self):
        adaptor = FakeAdaptor()
        adaptor.params['q'] = 'something'
        adaptor.params['city'] = 'ignored'

        response = await glue.search_endpoint(napi.NominatimAPIAsync(Path('/invalid')),
                                              adaptor)

        assert len(json.loads(response.output)) == 1


    @pytest.mark.asyncio
    @pytest.mark.parametrize('dedupe,numres', [(True, 1), (False, 2)])
    async def test_search_dedupe(self, dedupe, numres):
        # Hand out the same result twice.
        self.results = self.results * 2
        adaptor = FakeAdaptor()
        adaptor.params['q'] = 'something'
        if not dedupe:
            adaptor.params['dedupe'] = '0'

        response = await glue.search_endpoint(napi.NominatimAPIAsync(Path('/invalid')),
                                              adaptor)

        assert len(json.loads(response.output)) == numres
class TestSearchEndPointSearchAddress:
    """ Tests for the structured path of the search endpoint. The
        search_address function of the API is replaced with a stub that
        returns self.results.
    """

    @pytest.fixture(autouse=True)
    def patch_lookup_func(self, monkeypatch):
        self.results = [napi.SearchResult(napi.SourceTable.PLACEX,
                                          ('place', 'thing'),
                                          napi.Point(1.0, 2.0))]

        async def _search(*args, **kwargs):
            return napi.SearchResults(self.results)

        monkeypatch.setattr(napi.NominatimAPIAsync, 'search_address', _search)


    @pytest.mark.asyncio
    async def test_search_structured(self):
        adaptor = FakeAdaptor()
        adaptor.params['street'] = 'something'

        response = await glue.search_endpoint(napi.NominatimAPIAsync(Path('/invalid')),
                                              adaptor)

        assert len(json.loads(response.output)) == 1
class TestSearchEndPointSearchCategory:
    """ Tests for the category path of the search endpoint. The
        search_category function of the API is replaced with a stub that
        returns self.results.
    """

    @pytest.fixture(autouse=True)
    def patch_lookup_func(self, monkeypatch):
        self.results = [napi.SearchResult(napi.SourceTable.PLACEX,
                                          ('place', 'thing'),
                                          napi.Point(1.0, 2.0))]

        async def _search(*args, **kwargs):
            return napi.SearchResults(self.results)

        monkeypatch.setattr(napi.NominatimAPIAsync, 'search_category', _search)


    @pytest.mark.asyncio
    async def test_search_category(self):
        adaptor = FakeAdaptor()
        adaptor.params['q'] = '[shop=fog]'

        response = await glue.search_endpoint(napi.NominatimAPIAsync(Path('/invalid')),
                                              adaptor)

        assert len(json.loads(response.output)) == 1