X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/7219ee653276359d4485ff8b89e1841e42af71c5..556bb2386d5882e4e6aa8bdbcc2661f12d66b726:/test/bdd/steps/steps_api_queries.py diff --git a/test/bdd/steps/steps_api_queries.py b/test/bdd/steps/steps_api_queries.py index 7bf38d14..b15e296f 100644 --- a/test/bdd/steps/steps_api_queries.py +++ b/test/bdd/steps/steps_api_queries.py @@ -19,7 +19,7 @@ from urllib.parse import urlencode from utils import run_script from http_responses import GenericResponse, SearchResponse, ReverseResponse, StatusResponse -from check_functions import Bbox +from check_functions import Bbox, check_for_attributes from table_compare import NominatimID LOG = logging.getLogger(__name__) @@ -48,6 +48,15 @@ BASE_SERVER_ENV = { } +def make_todo_list(context, result_id): + if result_id is None: + context.execute_steps("then at least 1 result is returned") + return range(len(context.response.result)) + + context.execute_steps(f"then more than {result_id}results are returned") + return (int(result_id.strip()), ) + + def compare(operator, op1, op2): if operator == 'less than': return op1 < op2 @@ -79,7 +88,8 @@ def send_api_query(endpoint, params, fmt, context): return asyncio.run(context.nominatim.api_engine(endpoint, params, Path(context.nominatim.website_dir.name), - context.nominatim.test_env)) + context.nominatim.test_env, + getattr(context, 'http_headers', {}))) @@ -247,17 +257,12 @@ def check_header_attr(context): line['attr'], line['value'], context.response.header[line['attr']]) + @then(u'result header has (?P<neg>not )?attributes (?P<attrs>.*)') def check_header_no_attr(context, neg, attrs): - for attr in attrs.split(','): - if neg: - assert attr not in context.response.header, \ - "Unexpected attribute {}. Full response:\n{}".format( - attr, json.dumps(context.response.header, sort_keys=True, indent=2)) - else: - assert attr in context.response.header, \ - "No attribute {}. 
Full response:\n{}".format( - attr, json.dumps(context.response.header, sort_keys=True, indent=2)) + check_for_attributes(context.response.header, attrs, + 'absent' if neg else 'present') + @then(u'results contain') def step_impl(context): @@ -266,25 +271,13 @@ def step_impl(context): for line in context.table: context.response.match_row(line, context=context) + @then(u'result (?P<lid>\d+ )?has (?P<neg>not )?attributes (?P<attrs>.*)') def validate_attributes(context, lid, neg, attrs): - if lid is None: - idx = range(len(context.response.result)) - context.execute_steps("then at least 1 result is returned") - else: - idx = [int(lid.strip())] - context.execute_steps("then more than %sresults are returned" % lid) - - for i in idx: - for attr in attrs.split(','): - if neg: - assert attr not in context.response.result[i],\ - "Unexpected attribute {}. Full response:\n{}".format( - attr, json.dumps(context.response.result[i], sort_keys=True, indent=2)) - else: - assert attr in context.response.result[i], \ - "No attribute {}. 
Full response:\n{}".format( - attr, json.dumps(context.response.result[i], sort_keys=True, indent=2)) + for i in make_todo_list(context, lid): + check_for_attributes(context.response.result[i], attrs, + 'absent' if neg else 'present') + @then(u'result addresses contain') def step_impl(context): @@ -323,36 +316,28 @@ def check_address(context, lid, complete): if complete == 'is': assert len(addr_parts) == 0, "Additional address parts found: %s" % str(addr_parts) -@then(u'result (?P<lid>\d+ )?has bounding box in (?P<coords>[\d,.-]+)') -def step_impl(context, lid, coords): - if lid is None: - context.execute_steps("then at least 1 result is returned") - bboxes = context.response.property_list('boundingbox') - else: - context.execute_steps("then more than {}results are returned".format(lid)) - bboxes = [context.response.result[int(lid)]['boundingbox']] +@then(u'result (?P<lid>\d+ )?has bounding box in (?P<coords>[\d,.-]+)') +def check_bounding_box_in_area(context, lid, coords): expected = Bbox(coords) - for bbox in bboxes: - assert bbox in expected, "Bbox {} is not contained in {}.".format(bbox, expected) + for idx in make_todo_list(context, lid): + res = context.response.result[idx] + check_for_attributes(res, 'boundingbox') + context.response.check_row(idx, res['boundingbox'] in expected, + f"Bbox is not contained in {expected}") -@then(u'result (?P<lid>\d+ )?has centroid in (?P<coords>[\d,.-]+)') -def step_impl(context, lid, coords): - if lid is None: - context.execute_steps("then at least 1 result is returned") - centroids = zip(context.response.property_list('lon'), - context.response.property_list('lat')) - else: - context.execute_steps("then more than %sresults are returned".format(lid)) - res = context.response.result[int(lid)] - centroids = [(res['lon'], res['lat'])] +@then(u'result (?P<lid>\d+ )?has centroid in (?P<coords>[\d,.-]+)') +def check_centroid_in_area(context, lid, coords): expected = Bbox(coords) - for centroid in centroids: - assert centroid in expected,\ - "Centroid {} is not inside 
{}.".format(centroid, expected) + for idx in make_todo_list(context, lid): + res = context.response.result[idx] + check_for_attributes(res, 'lat,lon') + context.response.check_row(idx, (res['lon'], res['lat']) in expected, + f"Centroid is not inside {expected}") + @then(u'there are(?P<neg> no)? duplicates') def check_for_duplicates(context, neg): @@ -372,3 +357,4 @@ def check_for_duplicates(context, neg): assert not has_dupe, "Found duplicate for %s" % (dup, ) else: assert has_dupe, "No duplicates found" +