1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
"""
Tests for the Python web frameworks adaptor, v1 API.
"""
from collections import namedtuple
import json
import xml.etree.ElementTree as ET
from pathlib import Path

import pytest

from nominatim.config import Configuration
import nominatim.api.v1.server_glue as glue
import nominatim.api as napi
import nominatim.api.logging as loglib
class FakeError(BaseException):
    """Exception created by the fake adaptor in place of a framework error.

    The message and the HTTP status are kept as attributes so that tests
    can inspect them. The string form is ``'<status> -- <msg>'``, which
    lets ``pytest.raises(..., match=...)`` check both in one pattern.
    """

    def __init__(self, msg, status):
        """Remember the error message and the HTTP status code.

        Args:
            msg: Text of the error.
            status: HTTP status code that goes with the error.
        """
        super().__init__(msg, status)
        self.msg = msg
        self.status = status

    def __str__(self):
        """Return status and message in the form ``'<status> -- <msg>'``."""
        return f'{self.status} -- {self.msg}'
# Simple record standing in for a framework response object:
# the HTTP status, the response body and the content type.
FakeResponse = namedtuple('FakeResponse', 'status output content_type')
class FakeAdaptor(glue.ASGIAdaptor):
    """In-memory adaptor implementation used by the tests.

    Request parameters and headers are plain dictionaries. Errors are
    created as ``FakeError`` and responses as ``FakeResponse``.
    """

    def __init__(self, params=None, headers=None, config=None):
        """Set up the fake request.

        Args:
            params: Dictionary of request parameters. When omitted, a new
                empty dictionary is created for this instance, so writes
                to ``adaptor.params`` cannot leak into other instances.
            headers: Dictionary of request headers, handled like `params`.
            config: Configuration object to use. When omitted (or falsy),
                ``Configuration(None)`` is created.
        """
        self.params = {} if params is None else params
        self.headers = {} if headers is None else headers
        self._config = config or Configuration(None)

    def get(self, name, default=None):
        """Return the request parameter `name` or `default` when unset."""
        return self.params.get(name, default)

    def get_header(self, name, default=None):
        """Return the request header `name` or `default` when unset."""
        return self.headers.get(name, default)

    def error(self, msg, status=400):
        """Create (but do not raise) a ``FakeError`` for the given message."""
        return FakeError(msg, status)

    def create_response(self, status, output):
        """Wrap status, output and the current content type in a ``FakeResponse``."""
        return FakeResponse(status, output, self.content_type)

    def config(self):
        """Return the configuration object set up in the constructor."""
        # NOTE(review): presumably required by the ASGIAdaptor interface so
        # that the base class can read settings - confirm against server_glue.
        return self._config
61 # ASGIAdaptor.get_int/bool()
@pytest.mark.parametrize('func', ['get_int', 'get_bool'])
def test_adaptor_get_int_missing_but_required(func):
    """Requesting an absent parameter without a default gives a 400 error."""
    getter = getattr(FakeAdaptor(), func)

    with pytest.raises(FakeError, match='^400 -- .*missing'):
        getter('something')
@pytest.mark.parametrize('func, val', [('get_int', 23), ('get_bool', True)])
def test_adaptor_get_int_missing_with_default(func, val):
    """An absent parameter falls back to the default handed in."""
    getter = getattr(FakeAdaptor(), func)
    result = getter('something', val)

    assert result == val
@pytest.mark.parametrize('inp', ['0', '234', '-4566953498567934876'])
def test_adaptor_get_int_success(inp):
    """Integer strings are converted, with and without a default given."""
    expected = int(inp)

    without_default = FakeAdaptor(params={'foo': inp}).get_int('foo')
    assert without_default == expected

    with_default = FakeAdaptor(params={'foo': inp}).get_int('foo', 4)
    assert with_default == expected
@pytest.mark.parametrize('inp', ['rs', '4.5', '6f'])
def test_adaptor_get_int_bad_number(inp):
    """Values that are not integers are rejected with a 400 error."""
    adaptor = FakeAdaptor(params={'foo': inp})

    with pytest.raises(FakeError, match='^400 -- .*must be a number'):
        adaptor.get_int('foo')
@pytest.mark.parametrize('inp', ['1', 'true', 'whatever', 'false'])
def test_adaptor_get_bool_trueish(inp):
    """All listed values, including the string 'false', count as true."""
    adaptor = FakeAdaptor(params={'foo': inp})
    flag = adaptor.get_bool('foo')

    assert flag
def test_adaptor_get_bool_falsish():
    """The value '0' is read as false."""
    adaptor = FakeAdaptor(params={'foo': '0'})
    flag = adaptor.get_bool('foo')

    assert not flag
95 # ASGIAdaptor.parse_format()
def test_adaptor_parse_format_use_default():
    """Without a format parameter the default format is chosen."""
    adaptor = FakeAdaptor()
    chosen = adaptor.parse_format(napi.StatusResult, 'text')

    assert chosen == 'text'
    assert adaptor.content_type == 'text/plain; charset=utf-8'
def test_adaptor_parse_format_use_configured():
    """A format given as request parameter wins over the default."""
    request_params = {'format': 'json'}
    adaptor = FakeAdaptor(params=request_params)
    chosen = adaptor.parse_format(napi.StatusResult, 'text')

    assert chosen == 'json'
    assert adaptor.content_type == 'application/json'
def test_adaptor_parse_format_invalid_value():
    """An unknown format name is rejected with a 400 error."""
    request_params = {'format': '@!#'}
    adaptor = FakeAdaptor(params=request_params)
    expect_error = pytest.raises(FakeError, match='^400 -- .*must be one of')

    with expect_error:
        adaptor.parse_format(napi.StatusResult, 'text')
118 # ASGIAdaptor.get_accepted_languages()
def test_accepted_languages_from_param():
    """The 'accept-language' request parameter sets the languages."""
    adaptor = FakeAdaptor(params={'accept-language': 'de'})
    languages = adaptor.get_accepted_languages()

    assert languages == 'de'
def test_accepted_languages_from_header():
    """The 'http_accept_language' header sets the languages."""
    adaptor = FakeAdaptor(headers={'http_accept_language': 'de'})
    languages = adaptor.get_accepted_languages()

    assert languages == 'de'
def test_accepted_languages_from_default(monkeypatch):
    """Without parameter or header, NOMINATIM_DEFAULT_LANGUAGE is used."""
    monkeypatch.setenv('NOMINATIM_DEFAULT_LANGUAGE', 'de')

    # Build the adaptor (and with it the configuration) only after the
    # variable is set, so the configuration cannot miss it.
    a = FakeAdaptor()

    assert a.get_accepted_languages() == 'de'
def test_accepted_languages_param_over_header():
    """The request parameter takes precedence over the header."""
    request_params = {'accept-language': 'de'}
    request_headers = {'http_accept_language': 'en'}
    adaptor = FakeAdaptor(params=request_params, headers=request_headers)

    languages = adaptor.get_accepted_languages()

    assert languages == 'de'
def test_accepted_languages_header_over_default(monkeypatch):
    """The header takes precedence over NOMINATIM_DEFAULT_LANGUAGE."""
    monkeypatch.setenv('NOMINATIM_DEFAULT_LANGUAGE', 'en')
    request_headers = {'http_accept_language': 'de'}
    adaptor = FakeAdaptor(headers=request_headers)

    languages = adaptor.get_accepted_languages()

    assert languages == 'de'
148 # ASGIAdaptor.raise_error()
class TestAdaptorRaiseError:
    """Check how ``raise_error()`` formats the message per content type."""

    @pytest.fixture(autouse=True)
    def init_adaptor(self):
        """Create a fresh adaptor with debugging set up before each test."""
        self.adaptor = FakeAdaptor()
        self.adaptor.setup_debugging()

    def run_raise_error(self, msg, status):
        """Call ``raise_error()`` and return the ``FakeError`` it raised.

        Args:
            msg: Error message handed to ``raise_error()``.
            status: HTTP status handed to ``raise_error()``.

        Returns:
            The raised ``FakeError`` instance.
        """
        with pytest.raises(FakeError) as excinfo:
            self.adaptor.raise_error(msg, status=status)

        # Hand the exception back so tests can inspect msg and status.
        return excinfo.value

    def test_without_content_set(self):
        """With no content type chosen, the message is passed on as is."""
        err = self.run_raise_error('TEST', 404)

        assert self.adaptor.content_type == 'text/plain; charset=utf-8'
        assert err.msg == 'TEST'
        assert err.status == 404

    def test_json(self):
        """With JSON content, the error is a JSON object under 'error'."""
        self.adaptor.content_type = 'application/json'

        err = self.run_raise_error('TEST', 501)

        content = json.loads(err.msg)['error']
        assert content['code'] == 501
        assert content['message'] == 'TEST'

    def test_xml(self):
        """With XML content, the error is an <error> XML document."""
        self.adaptor.content_type = 'text/xml; charset=utf-8'

        err = self.run_raise_error('this!', 503)

        content = ET.fromstring(err.msg)

        assert content.tag == 'error'
        assert content.find('code').text == '503'
        assert content.find('message').text == 'this!'
def test_raise_error_during_debug():
    """With debug=1, the error message is the HTML debug log."""
    a = FakeAdaptor(params={'debug': '1'})
    # NOTE(review): debugging must be set up explicitly, as the
    # TestAdaptorRaiseError fixture does; presumably this is what switches
    # the output to HTML - confirm against ASGIAdaptor.setup_debugging().
    a.setup_debugging()
    loglib.log().section('Ongoing')

    with pytest.raises(FakeError) as excinfo:
        a.raise_error('bad state')

    content = ET.fromstring(excinfo.value.msg)

    assert content.tag == 'html'

    # Both the section logged before the error and the error text
    # must show up in the output.
    assert '>Ongoing<' in excinfo.value.msg
    assert 'bad state' in excinfo.value.msg
210 # ASGIAdaptor.build_response
def test_build_response_without_content_type():
    """A response built without a chosen format is plain text, status 200."""
    adaptor = FakeAdaptor()
    response = adaptor.build_response('attention')

    assert isinstance(response, FakeResponse)
    assert response.status == 200
    assert response.output == 'attention'
    assert response.content_type == 'text/plain; charset=utf-8'
def test_build_response_with_status():
    """An explicit status is passed through to the response."""
    adaptor = FakeAdaptor(params={'format': 'json'})
    adaptor.parse_format(napi.StatusResult, 'text')
    body = 'stuff\nmore stuff'

    response = adaptor.build_response(body, status=404)

    assert isinstance(response, FakeResponse)
    assert response.status == 404
    assert response.output == 'stuff\nmore stuff'
    assert response.content_type == 'application/json'
def test_build_response_jsonp_with_json():
    """With JSON format, a json_callback wraps the output as JSONP."""
    request_params = {'format': 'json', 'json_callback': 'test.func'}
    adaptor = FakeAdaptor(params=request_params)
    adaptor.parse_format(napi.StatusResult, 'text')

    response = adaptor.build_response('{}')

    assert isinstance(response, FakeResponse)
    assert response.status == 200
    assert response.output == 'test.func({})'
    assert response.content_type == 'application/javascript'
def test_build_response_jsonp_without_json():
    """With a non-JSON format, the json_callback parameter has no effect."""
    request_params = {'format': 'text', 'json_callback': 'test.func'}
    adaptor = FakeAdaptor(params=request_params)
    adaptor.parse_format(napi.StatusResult, 'text')

    response = adaptor.build_response('{}')

    assert isinstance(response, FakeResponse)
    assert response.status == 200
    assert response.output == '{}'
    assert response.content_type == 'text/plain; charset=utf-8'
@pytest.mark.parametrize('param', ['alert(); func', '\\n', '', 'a b'])
def test_build_response_jsonp_bad_format(param):
    """Callback names of the listed kinds are rejected with a 400 error."""
    request_params = {'format': 'json', 'json_callback': param}
    adaptor = FakeAdaptor(params=request_params)
    adaptor.parse_format(napi.StatusResult, 'text')

    with pytest.raises(FakeError, match='^400 -- .*Invalid'):
        adaptor.build_response('{}')
class TestStatusEndpoint:
    """Tests for ``status_endpoint()`` with the API status call patched out."""

    @pytest.fixture(autouse=True)
    def patch_status_func(self, monkeypatch):
        """Replace ``NominatimAPIAsync.status`` with a canned result.

        The replacement returns whatever the test stored in ``self.status``.
        """
        async def _status(*args, **kwargs):
            return self.status

        monkeypatch.setattr(napi.NominatimAPIAsync, 'status', _status)

    @pytest.mark.asyncio
    async def test_status_without_params(self):
        """An OK status without parameters gives a plain-text 200 response."""
        a = FakeAdaptor()
        self.status = napi.StatusResult(0, 'foo')

        resp = await glue.status_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)

        assert isinstance(resp, FakeResponse)
        assert resp.status == 200
        assert resp.content_type == 'text/plain; charset=utf-8'

    @pytest.mark.asyncio
    async def test_status_with_error(self):
        """An error status in text format gives a 500 response."""
        a = FakeAdaptor()
        self.status = napi.StatusResult(405, 'foo')

        resp = await glue.status_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)

        assert isinstance(resp, FakeResponse)
        assert resp.status == 500
        assert resp.content_type == 'text/plain; charset=utf-8'

    @pytest.mark.asyncio
    async def test_status_json_with_error(self):
        """An error status in JSON format still gives a 200 response."""
        a = FakeAdaptor(params={'format': 'json'})
        self.status = napi.StatusResult(405, 'foo')

        resp = await glue.status_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)

        assert isinstance(resp, FakeResponse)
        assert resp.status == 200
        assert resp.content_type == 'application/json'

    @pytest.mark.asyncio
    async def test_status_bad_format(self):
        """An unknown format raises an error."""
        a = FakeAdaptor(params={'format': 'foo'})
        self.status = napi.StatusResult(0, 'foo')

        with pytest.raises(FakeError):
            await glue.status_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
class TestDetailsEndpoint:
    """Tests for ``details_endpoint()`` with the API details call patched out."""

    @pytest.fixture(autouse=True)
    def patch_lookup_func(self, monkeypatch):
        """Replace ``NominatimAPIAsync.details`` with a recording fake.

        The fake stores the arguments it was called with in
        ``self.lookup_args`` and returns ``self.result``.
        """
        # NOTE(review): the category tuple is a placeholder value; the tests
        # do not read it - confirm against the DetailedResult constructor.
        self.result = napi.DetailedResult(napi.SourceTable.PLACEX,
                                          ('place', 'thing'),
                                          napi.Point(1.0, 2.0))
        self.lookup_args = []

        async def _lookup(*args, **kwargs):
            # args[0] is the API object the patched method is called on.
            self.lookup_args.extend(args[1:])
            return self.result

        monkeypatch.setattr(napi.NominatimAPIAsync, 'details', _lookup)

    @pytest.mark.asyncio
    async def test_details_no_params(self):
        """A request without any place reference is a 400 error."""
        a = FakeAdaptor()

        with pytest.raises(FakeError, match='^400 -- .*Missing'):
            await glue.details_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)

    @pytest.mark.asyncio
    async def test_details_by_place_id(self):
        """The place_id parameter is handed on as an integer place ID."""
        a = FakeAdaptor(params={'place_id': '4573'})

        await glue.details_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)

        assert self.lookup_args[0].place_id == 4573

    @pytest.mark.asyncio
    async def test_details_by_osm_id(self):
        """The osmtype and osmid parameters are handed on as an OSM reference."""
        a = FakeAdaptor(params={'osmtype': 'N', 'osmid': '45'})

        await glue.details_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)

        assert self.lookup_args[0].osm_type == 'N'
        assert self.lookup_args[0].osm_id == 45
        assert self.lookup_args[0].osm_class is None

    @pytest.mark.asyncio
    async def test_details_with_debugging(self):
        """With debug=1 the response is an HTML document."""
        a = FakeAdaptor(params={'osmtype': 'N', 'osmid': '45', 'debug': '1'})

        resp = await glue.details_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
        content = ET.fromstring(resp.output)

        assert resp.content_type == 'text/html; charset=utf-8'
        assert content.tag == 'html'

    @pytest.mark.asyncio
    async def test_details_no_result(self):
        """When the API finds nothing, the endpoint raises a 404 error."""
        a = FakeAdaptor(params={'place_id': '4573'})
        # Make the patched details call report that nothing was found.
        self.result = None

        with pytest.raises(FakeError, match='^404 -- .*found'):
            await glue.details_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
class TestLookupEndpoint:
    """Tests for ``lookup_endpoint()`` with the API lookup call patched out."""

    @pytest.fixture(autouse=True)
    def patch_lookup_func(self, monkeypatch):
        """Replace ``NominatimAPIAsync.lookup`` with a canned result list.

        The fake always returns the entries of ``self.results``,
        whatever it is asked to look up.
        """
        # NOTE(review): the category tuple is a placeholder value; the tests
        # do not read it - confirm against the SearchResult constructor.
        self.results = [napi.SearchResult(napi.SourceTable.PLACEX,
                                          ('place', 'thing'),
                                          napi.Point(1.0, 2.0))]

        async def _lookup(*args, **kwargs):
            return napi.SearchResults(self.results)

        monkeypatch.setattr(napi.NominatimAPIAsync, 'lookup', _lookup)

    # The tests hand their parameters to the constructor instead of writing
    # into a.params afterwards, so each adaptor has a dictionary of its own.

    @pytest.mark.asyncio
    async def test_lookup_no_params(self):
        """Without osm_ids the result is an empty JSON list."""
        a = FakeAdaptor(params={'format': 'json'})

        res = await glue.lookup_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)

        assert res.output == '[]'

    @pytest.mark.asyncio
    @pytest.mark.parametrize('param', ['w', 'bad', ''])
    async def test_lookup_bad_params(self, param):
        """A malformed entry in osm_ids does not make the request fail."""
        a = FakeAdaptor(params={'format': 'json',
                                'osm_ids': f'W34,{param},N33333'})

        res = await glue.lookup_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)

        assert len(json.loads(res.output)) == 1

    @pytest.mark.asyncio
    @pytest.mark.parametrize('param', ['p234234', '4563'])
    async def test_lookup_bad_osm_type(self, param):
        """An entry with a bad or absent OSM type does not make the request fail."""
        a = FakeAdaptor(params={'format': 'json',
                                'osm_ids': f'W34,{param},N33333'})

        res = await glue.lookup_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)

        assert len(json.loads(res.output)) == 1

    @pytest.mark.asyncio
    async def test_lookup_working(self):
        """Well-formed osm_ids give the canned result as JSON."""
        a = FakeAdaptor(params={'format': 'json', 'osm_ids': 'N23,W34'})

        res = await glue.lookup_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)

        assert len(json.loads(res.output)) == 1