Generic part of the server implementation of the v1 API.
Combine with the scaffolding provided for the various Python ASGI frameworks.
"""
-from typing import Optional, Any, Type, Callable
+from typing import Optional, Any, Type, Callable, NoReturn
import abc
from nominatim.config import Configuration
CONTENT_TYPE = {
'text': 'text/plain; charset=utf-8',
'xml': 'text/xml; charset=utf-8',
- 'jsonp': 'application/javascript',
'debug': 'text/html; charset=utf-8'
}
""" Adapter class for the different ASGI frameworks.
Wraps functionality over concrete requests and responses.
"""
+ content_type: str = 'text/plain; charset=utf-8'
@abc.abstractmethod
def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
@abc.abstractmethod
- def create_response(self, status: int, output: str, content_type: str) -> Any:
+ def create_response(self, status: int, output: str) -> Any:
""" Create a response from the given parameters. The result will
be returned by the endpoint functions. The adaptor may also
return None when the response is created internally with some
"""
- def build_response(self, output: str, media_type: str, status: int = 200) -> Any:
+ def build_response(self, output: str, status: int = 200) -> Any:
""" Create a response from the given output. Wraps a JSONP function
around the response, if necessary.
"""
- if media_type == 'json' and status == 200:
+ # JSONP wrapping is only considered for successful JSON responses.
+ if self.content_type == 'application/json' and status == 200:
jsonp = self.get('json_callback')
if jsonp is not None:
+ # Every dot-separated part of the callback must be an identifier.
if any(not part.isidentifier() for part in jsonp.split('.')):
- raise self.error('Invalid json_callback value')
+ self.raise_error('Invalid json_callback value')
output = f"{jsonp}({output})"
- media_type = 'jsonp'
+ # The wrapped payload is a script, so the content type changes with it.
+ self.content_type = 'application/javascript'
- return self.create_response(status, output,
- CONTENT_TYPE.get(media_type, 'application/json'))
+ return self.create_response(status, output)
+
+
+ def raise_error(self, msg: str, status: int = 400) -> NoReturn:
+     """ Raise an exception resulting in the given HTTP status and
+         message. The message will be formatted according to the
+         output format chosen by the request.
+
+         For XML and JSON output the message text is escaped before it
+         is embedded, so that the error document stays well-formed even
+         when the message contains quotes, backslashes, '<' or '&'.
+         In debug (HTML) mode the error is appended to the debug log and
+         the collected log becomes the message.
+
+         Never returns: always raises the exception built by `self.error()`.
+     """
+     # Imported locally so that the method does not depend on additional
+     # module-level imports.
+     import json
+     from xml.sax.saxutils import escape as xml_escape
+
+     if self.content_type == 'text/xml; charset=utf-8':
+         safe_msg = xml_escape(msg)
+         msg = f"""<?xml version="1.0" encoding="UTF-8" ?>
+ <error>
+ <code>{status}</code>
+ <message>{safe_msg}</message>
+ </error>
+ """
+     elif self.content_type == 'application/json':
+         # json.dumps() supplies the surrounding quotes and all escaping.
+         quoted_msg = json.dumps(msg, ensure_ascii=False)
+         msg = f"""{{"error":{{"code":{status},"message":{quoted_msg}}}}}"""
+     elif self.content_type == 'text/html; charset=utf-8':
+         loglib.log().section('Execution error')
+         loglib.log().var_dump('Status', status)
+         loglib.log().var_dump('Message', msg)
+         msg = loglib.get_and_disable()
+
+     raise self.error(msg, status)
def get_int(self, name: str, default: Optional[int] = None) -> int:
if default is not None:
return default
- raise self.error(f"Parameter '{name}' missing.")
+ self.raise_error(f"Parameter '{name}' missing.")
try:
- return int(value)
- except ValueError as exc:
- raise self.error(f"Parameter '{name}' must be a number.") from exc
+ intval = int(value)
+ except ValueError:
+ self.raise_error(f"Parameter '{name}' must be a number.")
+ return intval
def get_bool(self, name: str, default: Optional[bool] = None) -> bool:
""" Return an input parameter as bool. Only '0' is accepted as
if default is not None:
return default
- raise self.error(f"Parameter '{name}' missing.")
+ self.raise_error(f"Parameter '{name}' missing.")
return value != '0'
def get_accepted_languages(self) -> str:
- """ Return the accepted langauges.
+ """ Return the accepted languages.
"""
return self.get('accept-language')\
or self.get_header('http_accept_language')\
"""
if self.get_bool('debug', False):
loglib.set_log_output('html')
+ self.content_type = 'text/html; charset=utf-8'
return True
return False
-def parse_format(params: ASGIAdaptor, result_type: Type[Any], default: str) -> str:
- """ Get and check the 'format' parameter and prepare the formatter.
- `fmtter` is a formatter and `default` the
- format value to assume when no parameter is present.
- """
- fmt = params.get('format', default=default)
- assert fmt is not None
+ def parse_format(self, result_type: Type[Any], default: str) -> str:
+ """ Get and check the 'format' parameter and prepare the formatter.
+ `result_type` is the type of result to be returned by the function
+ and `default` the format value to assume when no parameter is present.
+
+ Returns the chosen format name. As a side effect the adaptor's
+ `content_type` is set to the entry registered for the format in
+ CONTENT_TYPE, falling back to 'application/json'. A format that
+ is not supported for `result_type` is reported via `raise_error()`.
+ """
+ fmt = self.get('format', default=default)
+ # NOTE(review): presumably get() never yields None when a str default
+ # is given; the assert narrows the Optional return type.
+ assert fmt is not None
- if not formatting.supports_format(result_type, fmt):
- raise params.error("Parameter 'format' must be one of: " +
- ', '.join(formatting.list_formats(result_type)))
+ if not formatting.supports_format(result_type, fmt):
+ self.raise_error("Parameter 'format' must be one of: " +
+ ', '.join(formatting.list_formats(result_type)))
- return fmt
+ self.content_type = CONTENT_TYPE.get(fmt, 'application/json')
+ return fmt
async def status_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
"""
result = await api.status()
- fmt = parse_format(params, napi.StatusResult, 'text')
+ fmt = params.parse_format(napi.StatusResult, 'text')
if fmt == 'text' and result.status:
status_code = 500
else:
status_code = 200
- return params.build_response(formatting.format_result(result, fmt, {}), fmt,
+ return params.build_response(formatting.format_result(result, fmt, {}),
status=status_code)
async def details_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /details endpoint. See API docs for details.
"""
+ fmt = params.parse_format(napi.DetailedResult, 'json')
place_id = params.get_int('place_id', 0)
place: napi.PlaceRef
if place_id:
else:
osmtype = params.get('osmtype')
if osmtype is None:
- raise params.error("Missing ID parameter 'place_id' or 'osmtype'.")
+ params.raise_error("Missing ID parameter 'place_id' or 'osmtype'.")
place = napi.OsmID(osmtype, params.get_int('osmid'), params.get('class'))
debug = params.setup_debugging()
result = await api.lookup(place, details)
if debug:
- return params.build_response(loglib.get_and_disable(), 'debug')
+ return params.build_response(loglib.get_and_disable())
if result is None:
- raise params.error('No place with that OSM ID found.', status=404)
+ params.raise_error('No place with that OSM ID found.', status=404)
- output = formatting.format_result(
- result, 'json',
+ output = formatting.format_result(result, fmt,
{'locales': locales,
'group_hierarchy': params.get_bool('group_hierarchy', False),
'icon_base_url': params.config().MAPICON_URL})
- return params.build_response(output, 'json')
+ return params.build_response(output)
EndpointFunc = Callable[[napi.NominatimAPIAsync, ASGIAdaptor], Any]
--- /dev/null
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2023 by the Nominatim developer community.
+# For a full list of authors see the git log.
+"""
+Tests for the Python web frameworks adaptor, v1 API.
+"""
+from collections import namedtuple
+import json
+import xml.etree.ElementTree as ET
+from pathlib import Path
+
+import pytest
+
+from nominatim.config import Configuration
+import nominatim.api.v1.server_glue as glue
+import nominatim.api as napi
+import nominatim.api.logging as loglib
+
+class FakeError(BaseException):
+    """ Exception type handed out by the fake adaptor's error() function.
+        Keeps the message and the status it was created with.
+    """
+
+    def __init__(self, msg, status):
+        self.status = status
+        self.msg = msg
+
+    def __str__(self):
+        return '{} -- {}'.format(self.status, self.msg)
+
+# Plain record of what the fake adaptor was asked to send back.
+FakeResponse = namedtuple('FakeResponse', 'status output content_type')
+
+class FakeAdaptor(glue.ASGIAdaptor):
+    """ In-memory implementation of the adaptor interface for testing.
+        Parameters and headers are looked up in plain dictionaries,
+        errors are created as `FakeError` and responses as `FakeResponse`.
+    """
+
+    def __init__(self, params=None, headers=None, config=None):
+        # Use a fresh dict per instance. A mutable default argument would
+        # be shared between all adaptors created without that argument.
+        self.params = {} if params is None else params
+        self.headers = {} if headers is None else headers
+        self._config = config or Configuration(None)
+
+
+    def get(self, name, default=None):
+        """ Return the request parameter `name` or `default` when unset. """
+        return self.params.get(name, default)
+
+
+    def get_header(self, name, default=None):
+        """ Return the header `name` or `default` when unset. """
+        return self.headers.get(name, default)
+
+
+    def error(self, msg, status=400):
+        """ Create (but do not raise) the exception for the given error. """
+        return FakeError(msg, status)
+
+
+    def create_response(self, status, output):
+        """ Bundle status, output and the current content type. """
+        return FakeResponse(status, output, self.content_type)
+
+
+    def config(self):
+        """ Return the configuration the adaptor was created with. """
+        return self._config
+
+
+# ASGIAdaptor.get_int/bool()
+
+@pytest.mark.parametrize('func', ['get_int', 'get_bool'])
+def test_adaptor_get_int_missing_but_required(func):
+    """ A getter without default reports a missing parameter as 400. """
+    adaptor = FakeAdaptor()
+
+    with pytest.raises(FakeError, match='^400 -- .*missing'):
+        getattr(adaptor, func)('something')
+
+
+@pytest.mark.parametrize('func, val', [('get_int', 23), ('get_bool', True)])
+def test_adaptor_get_int_missing_with_default(func, val):
+    """ A getter hands back the default when the parameter is missing. """
+    getter = getattr(FakeAdaptor(), func)
+
+    assert getter('something', val) == val
+
+
+@pytest.mark.parametrize('inp', ['0', '234', '-4566953498567934876'])
+def test_adaptor_get_int_success(inp):
+    """ A numeric parameter is converted, with or without a default. """
+    expected = int(inp)
+
+    # First without a default, then with a default of 4.
+    for extra_args in ((), (4,)):
+        adaptor = FakeAdaptor(params={'foo': inp})
+        assert adaptor.get_int('foo', *extra_args) == expected
+
+
+@pytest.mark.parametrize('inp', ['rs', '4.5', '6f'])
+def test_adaptor_get_int_bad_number(inp):
+    """ A value that is not an integer is rejected with a 400 error. """
+    adaptor = FakeAdaptor(params={'foo': inp})
+
+    with pytest.raises(FakeError, match='^400 -- .*must be a number'):
+        adaptor.get_int('foo')
+
+
+@pytest.mark.parametrize('inp', ['1', 'true', 'whatever', 'false'])
+def test_adaptor_get_bool_trueish(inp):
+    """ Any of the given values is read as a true value. """
+    adaptor = FakeAdaptor(params={'foo': inp})
+    value = adaptor.get_bool('foo')
+
+    assert value
+
+
+def test_adaptor_get_bool_falsish():
+    """ The value '0' is read as a false value. """
+    adaptor = FakeAdaptor(params={'foo': '0'})
+    value = adaptor.get_bool('foo')
+
+    assert not value
+
+
+# ASGIAdaptor.parse_format()
+
+def test_adaptor_parse_format_use_default():
+    """ Without a 'format' parameter the given default is used. """
+    adaptor = FakeAdaptor()
+
+    chosen = adaptor.parse_format(napi.StatusResult, 'text')
+
+    assert chosen == 'text'
+    assert adaptor.content_type == 'text/plain; charset=utf-8'
+
+
+def test_adaptor_parse_format_use_configured():
+    """ A 'format' parameter takes precedence over the default. """
+    adaptor = FakeAdaptor(params={'format': 'json'})
+
+    chosen = adaptor.parse_format(napi.StatusResult, 'text')
+
+    assert chosen == 'json'
+    assert adaptor.content_type == 'application/json'
+
+
+def test_adaptor_parse_format_invalid_value():
+    """ An unknown 'format' value results in a 400 error. """
+    bad_request = {'format': '@!#'}
+    adaptor = FakeAdaptor(params=bad_request)
+
+    with pytest.raises(FakeError, match='^400 -- .*must be one of'):
+        adaptor.parse_format(napi.StatusResult, 'text')
+
+
+# ASGIAdaptor.get_accepted_languages()
+
+def test_accepted_languages_from_param():
+    """ The 'accept-language' request parameter is used. """
+    adaptor = FakeAdaptor(params={'accept-language': 'de'})
+    languages = adaptor.get_accepted_languages()
+
+    assert languages == 'de'
+
+
+def test_accepted_languages_from_header():
+    """ The 'http_accept_language' header is used. """
+    adaptor = FakeAdaptor(headers={'http_accept_language': 'de'})
+    languages = adaptor.get_accepted_languages()
+
+    assert languages == 'de'
+
+
+def test_accepted_languages_from_default(monkeypatch):
+    """ Without parameter and header, NOMINATIM_DEFAULT_LANGUAGE is used. """
+    monkeypatch.setenv('NOMINATIM_DEFAULT_LANGUAGE', 'de')
+
+    languages = FakeAdaptor().get_accepted_languages()
+
+    assert languages == 'de'
+
+
+def test_accepted_languages_param_over_header():
+    """ The request parameter wins when a header is given as well. """
+    request_params = {'accept-language': 'de'}
+    request_headers = {'http_accept_language': 'en'}
+    adaptor = FakeAdaptor(params=request_params, headers=request_headers)
+
+    assert adaptor.get_accepted_languages() == 'de'
+
+
+def test_accepted_languages_header_over_default(monkeypatch):
+    """ The header wins over NOMINATIM_DEFAULT_LANGUAGE. """
+    monkeypatch.setenv('NOMINATIM_DEFAULT_LANGUAGE', 'en')
+    adaptor = FakeAdaptor(headers={'http_accept_language': 'de'})
+
+    languages = adaptor.get_accepted_languages()
+
+    assert languages == 'de'
+
+
+# ASGIAdaptor.raise_error()
+
+class TestAdaptorRaiseError:
+    """ Tests for the message formatting of raise_error(), run on an
+        adaptor without request parameters.
+    """
+
+    @pytest.fixture(autouse=True)
+    def init_adaptor(self):
+        adaptor = FakeAdaptor()
+        adaptor.setup_debugging()
+        self.adaptor = adaptor
+
+
+    def run_raise_error(self, msg, status):
+        """ Call raise_error() and return the exception it raised. """
+        with pytest.raises(FakeError) as excinfo:
+            self.adaptor.raise_error(msg, status=status)
+
+        return excinfo.value
+
+
+    def test_without_content_set(self):
+        raised = self.run_raise_error('TEST', 404)
+
+        assert self.adaptor.content_type == 'text/plain; charset=utf-8'
+        assert raised.msg == 'TEST'
+        assert raised.status == 404
+
+
+    def test_json(self):
+        self.adaptor.content_type = 'application/json'
+
+        raised = self.run_raise_error('TEST', 501)
+        payload = json.loads(raised.msg)['error']
+
+        assert payload['code'] == 501
+        assert payload['message'] == 'TEST'
+
+
+    def test_xml(self):
+        self.adaptor.content_type = 'text/xml; charset=utf-8'
+
+        raised = self.run_raise_error('this!', 503)
+        root = ET.fromstring(raised.msg)
+
+        assert root.tag == 'error'
+        assert root.find('code').text == '503'
+        assert root.find('message').text == 'this!'
+
+
+def test_raise_error_during_debug():
+    """ In debug mode the error message is the HTML debug log, which
+        contains both the earlier log section and the error text.
+    """
+    adaptor = FakeAdaptor(params={'debug': '1'})
+    adaptor.setup_debugging()
+    loglib.log().section('Ongoing')
+
+    with pytest.raises(FakeError) as excinfo:
+        adaptor.raise_error('bad state')
+
+    report = excinfo.value.msg
+    root = ET.fromstring(report)
+
+    assert root.tag == 'html'
+
+    for fragment in ('>Ongoing<', 'bad state'):
+        assert fragment in report
+
+
+# ASGIAdaptor.build_response
+
+def test_build_response_without_content_type():
+    """ Without a parsed format the response is sent as plain text. """
+    adaptor = FakeAdaptor()
+
+    resp = adaptor.build_response('attention')
+
+    assert isinstance(resp, FakeResponse)
+    assert resp == FakeResponse(200, 'attention', 'text/plain; charset=utf-8')
+
+
+def test_build_response_with_status():
+    """ An explicit status is passed through and the output is unchanged. """
+    body = 'stuff\nmore stuff'
+    adaptor = FakeAdaptor(params={'format': 'json'})
+    adaptor.parse_format(napi.StatusResult, 'text')
+
+    resp = adaptor.build_response(body, status=404)
+
+    assert isinstance(resp, FakeResponse)
+    assert resp == FakeResponse(404, body, 'application/json')
+
+
+def test_build_response_jsonp_with_json():
+    """ JSON output is wrapped into the requested callback function. """
+    request_params = {'format': 'json', 'json_callback': 'test.func'}
+    adaptor = FakeAdaptor(params=request_params)
+    adaptor.parse_format(napi.StatusResult, 'text')
+
+    resp = adaptor.build_response('{}')
+
+    assert isinstance(resp, FakeResponse)
+    assert resp == FakeResponse(200, 'test.func({})', 'application/javascript')
+
+
+def test_build_response_jsonp_without_json():
+    """ The callback parameter is ignored for non-JSON output. """
+    request_params = {'format': 'text', 'json_callback': 'test.func'}
+    adaptor = FakeAdaptor(params=request_params)
+    adaptor.parse_format(napi.StatusResult, 'text')
+
+    resp = adaptor.build_response('{}')
+
+    assert isinstance(resp, FakeResponse)
+    assert resp == FakeResponse(200, '{}', 'text/plain; charset=utf-8')
+
+
+@pytest.mark.parametrize('param', ['alert(); func', '\\n', '', 'a b'])
+def test_build_response_jsonp_bad_format(param):
+    """ A callback name that is not a dotted identifier yields a 400 error. """
+    request_params = {'format': 'json', 'json_callback': param}
+    adaptor = FakeAdaptor(params=request_params)
+    adaptor.parse_format(napi.StatusResult, 'text')
+
+    with pytest.raises(FakeError, match='^400 -- .*Invalid'):
+        adaptor.build_response('{}')
+
+
+# status_endpoint()
+
+class TestStatusEndpoint:
+    """ Tests for status_endpoint() with the API's status() call patched. """
+
+    @pytest.fixture(autouse=True)
+    def patch_status_func(self, monkeypatch):
+        # The replacement returns whatever the test stored in self.status.
+        async def _status(*args, **kwargs):
+            return self.status
+
+        monkeypatch.setattr(napi.NominatimAPIAsync, 'status', _status)
+
+
+    async def _call_endpoint(self, adaptor):
+        """ Run the endpoint for the given adaptor on a throw-away API object. """
+        api = napi.NominatimAPIAsync(Path('/invalid'))
+        return await glue.status_endpoint(api, adaptor)
+
+
+    @pytest.mark.asyncio
+    async def test_status_without_params(self):
+        adaptor = FakeAdaptor()
+        self.status = napi.StatusResult(0, 'foo')
+
+        resp = await self._call_endpoint(adaptor)
+
+        assert isinstance(resp, FakeResponse)
+        assert resp.status == 200
+        assert resp.content_type == 'text/plain; charset=utf-8'
+
+
+    @pytest.mark.asyncio
+    async def test_status_with_error(self):
+        adaptor = FakeAdaptor()
+        self.status = napi.StatusResult(405, 'foo')
+
+        resp = await self._call_endpoint(adaptor)
+
+        assert isinstance(resp, FakeResponse)
+        assert resp.status == 500
+        assert resp.content_type == 'text/plain; charset=utf-8'
+
+
+    @pytest.mark.asyncio
+    async def test_status_json_with_error(self):
+        adaptor = FakeAdaptor(params={'format': 'json'})
+        self.status = napi.StatusResult(405, 'foo')
+
+        resp = await self._call_endpoint(adaptor)
+
+        assert isinstance(resp, FakeResponse)
+        assert resp.status == 200
+        assert resp.content_type == 'application/json'
+
+
+    @pytest.mark.asyncio
+    async def test_status_bad_format(self):
+        adaptor = FakeAdaptor(params={'format': 'foo'})
+        self.status = napi.StatusResult(0, 'foo')
+
+        with pytest.raises(FakeError):
+            await self._call_endpoint(adaptor)
+
+
+# details_endpoint()
+
+class TestDetailsEndpoint:
+    """ Tests for details_endpoint() with the API's lookup() call patched. """
+
+    @pytest.fixture(autouse=True)
+    def patch_lookup_func(self, monkeypatch):
+        self.lookup_args = []
+        self.result = napi.DetailedResult(napi.SourceTable.PLACEX,
+                                          ('place', 'thing'),
+                                          napi.Point(1.0, 2.0))
+
+        # Records all positional arguments after the first and returns
+        # the result currently stored on the test instance.
+        async def _lookup(*args, **kwargs):
+            self.lookup_args.extend(args[1:])
+            return self.result
+
+        monkeypatch.setattr(napi.NominatimAPIAsync, 'lookup', _lookup)
+
+
+    async def _call_endpoint(self, adaptor):
+        """ Run the endpoint for the given adaptor on a throw-away API object. """
+        api = napi.NominatimAPIAsync(Path('/invalid'))
+        return await glue.details_endpoint(api, adaptor)
+
+
+    @pytest.mark.asyncio
+    async def test_details_no_params(self):
+        adaptor = FakeAdaptor()
+
+        with pytest.raises(FakeError, match='^400 -- .*Missing'):
+            await self._call_endpoint(adaptor)
+
+
+    @pytest.mark.asyncio
+    async def test_details_by_place_id(self):
+        adaptor = FakeAdaptor(params={'place_id': '4573'})
+
+        await self._call_endpoint(adaptor)
+
+        place = self.lookup_args[0]
+        assert place.place_id == 4573
+
+
+    @pytest.mark.asyncio
+    async def test_details_by_osm_id(self):
+        adaptor = FakeAdaptor(params={'osmtype': 'N', 'osmid': '45'})
+
+        await self._call_endpoint(adaptor)
+
+        place = self.lookup_args[0]
+        assert place.osm_type == 'N'
+        assert place.osm_id == 45
+        assert place.osm_class is None
+
+
+    @pytest.mark.asyncio
+    async def test_details_with_debugging(self):
+        request_params = {'osmtype': 'N', 'osmid': '45', 'debug': '1'}
+        adaptor = FakeAdaptor(params=request_params)
+
+        resp = await self._call_endpoint(adaptor)
+        root = ET.fromstring(resp.output)
+
+        assert resp.content_type == 'text/html; charset=utf-8'
+        assert root.tag == 'html'
+
+
+    @pytest.mark.asyncio
+    async def test_details_no_result(self):
+        adaptor = FakeAdaptor(params={'place_id': '4573'})
+        self.result = None
+
+        with pytest.raises(FakeError, match='^404 -- .*found'):
+            await self._call_endpoint(adaptor)