X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/04857d32cd0fb39178500c537831707477701029..556bb2386d5882e4e6aa8bdbcc2661f12d66b726:/test/bdd/steps/steps_api_queries.py diff --git a/test/bdd/steps/steps_api_queries.py b/test/bdd/steps/steps_api_queries.py index 99469587..b15e296f 100644 --- a/test/bdd/steps/steps_api_queries.py +++ b/test/bdd/steps/steps_api_queries.py @@ -1,17 +1,25 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# This file is part of Nominatim. (https://nominatim.org) +# +# Copyright (C) 2022 by the Nominatim developer community. +# For a full list of authors see the git log. """ Steps that run queries against the API. Queries may either be run directly via PHP using the query script or via the HTTP interface using php-cgi. """ +from pathlib import Path import json import os import re import logging +import asyncio from urllib.parse import urlencode from utils import run_script from http_responses import GenericResponse, SearchResponse, ReverseResponse, StatusResponse -from check_functions import Bbox +from check_functions import Bbox, check_for_attributes from table_compare import NominatimID LOG = logging.getLogger(__name__) @@ -40,6 +48,15 @@ BASE_SERVER_ENV = { } +def make_todo_list(context, result_id): + if result_id is None: + context.execute_steps("then at least 1 result is returned") + return range(len(context.response.result)) + + context.execute_steps(f"then more than {result_id}results are returned") + return (int(result_id.strip()), ) + + def compare(operator, op1, op2): if operator == 'less than': return op1 < op2 @@ -66,6 +83,17 @@ def send_api_query(endpoint, params, fmt, context): for h in context.table.headings: params[h] = context.table[0][h] + if context.nominatim.api_engine is None: + return send_api_query_php(endpoint, params, context) + + return asyncio.run(context.nominatim.api_engine(endpoint, params, + Path(context.nominatim.website_dir.name), + context.nominatim.test_env, + getattr(context, 'http_headers', {}))) + + + +def send_api_query_php(endpoint, params, context): env = dict(BASE_SERVER_ENV) env['QUERY_STRING'] = urlencode(params) @@ -147,6 +175,20 @@ def website_reverse_request(context, fmt, lat, lon): context.response = ReverseResponse(outp, fmt or 'xml', status) +@when(u'sending (?P\S+ )?reverse point (?P.+)') +def website_reverse_request(context, fmt, nodeid): + params = {} + if fmt and fmt.strip() == 'debug': + params['debug'] = '1' + params['lon'], params['lat'] = (f'{c:f}' for c in context.osm.grid_node(int(nodeid))) + + + outp, status = send_api_query('reverse', params, fmt, context) + + context.response = ReverseResponse(outp, fmt or 'xml', status) + + + @when(u'sending (?P\S+ )?details query for (?P.*)') def website_details_request(context, fmt, query): params = {} @@ -215,44 +257,27 @@ def check_header_attr(context): line['attr'], line['value'], context.response.header[line['attr']]) + @then(u'result header has (?Pnot )?attributes (?P.*)') def check_header_no_attr(context, neg, attrs): - for attr in attrs.split(','): - if neg: - assert attr not in context.response.header, \ - "Unexpected attribute {}. Full response:\n{}".format( - attr, json.dumps(context.response.header, sort_keys=True, indent=2)) - else: - assert attr in context.response.header, \ - "No attribute {}. Full response:\n{}".format( - attr, json.dumps(context.response.header, sort_keys=True, indent=2)) + check_for_attributes(context.response.header, attrs, + 'absent' if neg else 'present') + @then(u'results contain') def step_impl(context): context.execute_steps("then at least 1 result is returned") for line in context.table: - context.response.match_row(line) + context.response.match_row(line, context=context) + @then(u'result (?P\d+ )?has (?Pnot )?attributes (?P.*)') def validate_attributes(context, lid, neg, attrs): - if lid is None: - idx = range(len(context.response.result)) - context.execute_steps("then at least 1 result is returned") - else: - idx = [int(lid.strip())] - context.execute_steps("then more than %sresults are returned" % lid) - - for i in idx: - for attr in attrs.split(','): - if neg: - assert attr not in context.response.result[i],\ - "Unexpected attribute {}. Full response:\n{}".format( - attr, json.dumps(context.response.result[i], sort_keys=True, indent=2)) - else: - assert attr in context.response.result[i], \ - "No attribute {}. Full response:\n{}".format( - attr, json.dumps(context.response.result[i], sort_keys=True, indent=2)) + for i in make_todo_list(context, lid): + check_for_attributes(context.response.result[i], attrs, + 'absent' if neg else 'present') + @then(u'result addresses contain') def step_impl(context): @@ -291,36 +316,28 @@ def check_address(context, lid, complete): if complete == 'is': assert len(addr_parts) == 0, "Additional address parts found: %s" % str(addr_parts) -@then(u'result (?P\d+ )?has bounding box in (?P[\d,.-]+)') -def step_impl(context, lid, coords): - if lid is None: - context.execute_steps("then at least 1 result is returned") - bboxes = context.response.property_list('boundingbox') - else: - context.execute_steps("then more than {}results are returned".format(lid)) - bboxes = [context.response.result[int(lid)]['boundingbox']] +@then(u'result (?P\d+ )?has bounding box in (?P[\d,.-]+)') +def check_bounding_box_in_area(context, lid, coords): expected = Bbox(coords) - for bbox in bboxes: - assert bbox in expected, "Bbox {} is not contained in {}.".format(bbox, expected) + for idx in make_todo_list(context, lid): + res = context.response.result[idx] + check_for_attributes(res, 'boundingbox') + context.response.check_row(idx, res['boundingbox'] in expected, + f"Bbox is not contained in {expected}") -@then(u'result (?P\d+ )?has centroid in (?P[\d,.-]+)') -def step_impl(context, lid, coords): - if lid is None: - context.execute_steps("then at least 1 result is returned") - centroids = zip(context.response.property_list('lon'), - context.response.property_list('lat')) - else: - context.execute_steps("then more than %sresults are returned".format(lid)) - res = context.response.result[int(lid)] - centroids = [(res['lon'], res['lat'])] +@then(u'result (?P\d+ )?has centroid in (?P[\d,.-]+)') +def check_centroid_in_area(context, lid, coords): expected = Bbox(coords) - for centroid in centroids: - assert centroid in expected,\ - "Centroid {} is not inside {}.".format(centroid, expected) + for idx in make_todo_list(context, lid): + res = context.response.result[idx] + check_for_attributes(res, 'lat,lon') + context.response.check_row(idx, (res['lon'], res['lat']) in expected, + f"Centroid is not inside {expected}") + @then(u'there are(?P no)? duplicates') def check_for_duplicates(context, neg): @@ -340,3 +357,4 @@ def check_for_duplicates(context, neg): assert not has_dupe, "Found duplicate for %s" % (dup, ) else: assert has_dupe, "No duplicates found" +