X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/da20881096fb4f38ed0226279f4ce1cf6f95e94f..d17ec56e5462dd00cefcbd9537195f9b67ee11eb:/test/bdd/steps/http_responses.py?ds=sidebyside diff --git a/test/bdd/steps/http_responses.py b/test/bdd/steps/http_responses.py index 161e29fd..dcce1b58 100644 --- a/test/bdd/steps/http_responses.py +++ b/test/bdd/steps/http_responses.py @@ -1,13 +1,22 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# This file is part of Nominatim. (https://nominatim.org) +# +# Copyright (C) 2023 by the Nominatim developer community. +# For a full list of authors see the git log. """ Classes wrapping HTTP responses from the Nominatim API. """ -from collections import OrderedDict import re import json import xml.etree.ElementTree as ET from check_functions import Almost +OSM_TYPE = {'N' : 'node', 'W' : 'way', 'R' : 'relation', + 'n' : 'node', 'w' : 'way', 'r' : 'relation', + 'node' : 'n', 'way' : 'w', 'relation' : 'r'} + def _geojson_result_to_json_result(geojson_result): result = geojson_result['properties'] result['geojson'] = geojson_result['geometry'] @@ -50,7 +59,7 @@ class GenericResponse: self.result = [] self.header = dict() - if errorcode == 200: + if errorcode == 200 and fmt != 'debug': getattr(self, '_parse_' + fmt)() def _parse_json(self): @@ -60,15 +69,16 @@ class GenericResponse: else: code = m.group(2) self.header['json_func'] = m.group(1) - self.result = json.JSONDecoder(object_pairs_hook=OrderedDict).decode(code) - if isinstance(self.result, OrderedDict): - self.result = [self.result] + self.result = json.JSONDecoder().decode(code) + if isinstance(self.result, dict): + if 'error' in self.result: + self.result = [] + else: + self.result = [self.result] def _parse_geojson(self): self._parse_json() - if 'error' in self.result[0]: - self.result = [] - else: + if self.result: self.result = list(map(_geojson_result_to_json_result, self.result[0]['features'])) def _parse_geocodejson(self): @@ -91,11 +101,57 @@ class GenericResponse: elif value.startswith("^"): assert re.fullmatch(value, self.result[idx][field]), \ BadRowValueAssert(self, idx, field, value) + elif isinstance(self.result[idx][field], dict): + assert self.result[idx][field] == eval('{' + value + '}'), \ + BadRowValueAssert(self, idx, field, value) else: assert str(self.result[idx][field]) == str(value), \ BadRowValueAssert(self, idx, field, value) - def match_row(self, row): + + def assert_subfield(self, idx, path, value): + assert path + + field = self.result[idx] + for p in path: + assert isinstance(field, dict) + assert p in field + field = field[p] + + if isinstance(value, float): + assert Almost(value) == float(field) + elif value.startswith("^"): + assert re.fullmatch(value, field) + elif isinstance(field, dict): + assert field, eval('{' + value + '}') + else: + assert str(field) == str(value) + + + def assert_address_field(self, idx, field, value): + """ Check that result rows`idx` has a field `field` with value `value` + in its address. If idx is None, then all results are checked. + """ + if idx is None: + todo = range(len(self.result)) + else: + todo = [int(idx)] + + for idx in todo: + assert 'address' in self.result[idx], \ + "Result row {} has no field 'address'.\nFull row: {}"\ + .format(idx, json.dumps(self.result[idx], indent=4)) + + address = self.result[idx]['address'] + assert field in address, \ + "Result row {} has no field '{}' in address.\nFull address: {}"\ + .format(idx, field, json.dumps(address, indent=4)) + + assert address[field] == value, \ + "\nBad value for row {} field '{}' in address. Expected: {}, got: {}.\nFull address: {}"""\ + .format(idx, field, value, address[field], json.dumps(address, indent=4)) + + def match_row(self, row, context=None): """ Match the result fields against the given behave table row. """ if 'ID' in row.headings: @@ -108,12 +164,26 @@ class GenericResponse: if name == 'ID': pass elif name == 'osm': - self.assert_field(i, 'osm_type', value[0]) + assert 'osm_type' in self.result[i], \ + "Result row {} has no field 'osm_type'.\nFull row: {}"\ + .format(i, json.dumps(self.result[i], indent=4)) + assert self.result[i]['osm_type'] in (OSM_TYPE[value[0]], value[0]), \ + BadRowValueAssert(self, i, 'osm_type', value) self.assert_field(i, 'osm_id', value[1:]) + elif name == 'osm_type': + assert self.result[i]['osm_type'] in (OSM_TYPE[value[0]], value[0]), \ + BadRowValueAssert(self, i, 'osm_type', value) elif name == 'centroid': - lon, lat = value.split(' ') + if ' ' in value: + lon, lat = value.split(' ') + elif context is not None: + lon, lat = context.osm.grid_node(int(value)) + else: + raise RuntimeError("Context needed when using grid coordinates") self.assert_field(i, 'lat', float(lat)) self.assert_field(i, 'lon', float(lon)) + elif '+' in name: + self.assert_subfield(i, name.split('+'), value) else: self.assert_field(i, name, value)