X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/a74e736283c7871a2c47494f18cc65da6b43bccb..8557105c4096f4e6e96de3f188009859122a6d91:/test/bdd/steps/steps_api_queries.py?ds=sidebyside diff --git a/test/bdd/steps/steps_api_queries.py b/test/bdd/steps/steps_api_queries.py index 7b3597da..1df1d523 100644 --- a/test/bdd/steps/steps_api_queries.py +++ b/test/bdd/steps/steps_api_queries.py @@ -1,17 +1,26 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# This file is part of Nominatim. (https://nominatim.org) +# +# Copyright (C) 2022 by the Nominatim developer community. +# For a full list of authors see the git log. """ Steps that run queries against the API. Queries may either be run directly via PHP using the query script or via the HTTP interface using php-cgi. """ +from pathlib import Path import json import os import re import logging +import asyncio from urllib.parse import urlencode from utils import run_script from http_responses import GenericResponse, SearchResponse, ReverseResponse, StatusResponse from check_functions import Bbox +from table_compare import NominatimID LOG = logging.getLogger(__name__) @@ -54,30 +63,8 @@ def compare(operator, op1, op2): raise Exception("unknown operator '%s'" % operator) -@when(u'searching for "(?P.*)"(?P with dups)?') -def query_cmd(context, query, dups): - """ Query directly via PHP script. - """ - cmd = ['/usr/bin/env', 'php'] - cmd.append(os.path.join(context.nominatim.build_dir, 'utils', 'query.php')) - if query: - cmd.extend(['--search', query]) - # add more parameters in table form - if context.table: - for h in context.table.headings: - value = context.table[0][h].strip() - if value: - cmd.extend(('--' + h, value)) - - if dups: - cmd.extend(('--dedupe', '0')) - - outp, err = run_script(cmd, cwd=context.nominatim.build_dir) - - context.response = SearchResponse(outp, 'json') - def send_api_query(endpoint, params, fmt, context): - if fmt is not None: + if fmt is not None and fmt.strip() != 'debug': params['format'] = fmt.strip() if context.table: if context.table.headings[0] == 'param': @@ -87,6 +74,17 @@ def send_api_query(endpoint, params, fmt, context): for h in context.table.headings: params[h] = context.table[0][h] + if context.nominatim.api_engine is None: + return send_api_query_php(endpoint, params, context) + + return asyncio.run(context.nominatim.api_engine(endpoint, params, + Path(context.nominatim.website_dir.name), + context.nominatim.test_env, + getattr(context, 'http_headers', {}))) + + + +def send_api_query_php(endpoint, params, context): env = dict(BASE_SERVER_ENV) env['QUERY_STRING'] = urlencode(params) @@ -103,8 +101,9 @@ def send_api_query(endpoint, params, fmt, context): cmd = ['/usr/bin/env', 'php-cgi', '-f'] if context.nominatim.code_coverage_path: + env['XDEBUG_MODE'] = 'coverage' env['COV_SCRIPT_FILENAME'] = env['SCRIPT_FILENAME'] - env['COV_PHP_DIR'] = os.path.join(context.nominatim.src_dir, "lib") + env['COV_PHP_DIR'] = context.nominatim.src_dir env['COV_TEST_NAME'] = '%s:%s' % (context.scenario.filename, context.scenario.line) env['SCRIPT_FILENAME'] = \ os.path.join(os.path.split(__file__)[0], 'cgi-with-coverage.php') @@ -146,6 +145,8 @@ def website_search_request(context, fmt, query, addr): params['q'] = query if addr is not None: params['addressdetails'] = '1' + if fmt and fmt.strip() == 'debug': + params['debug'] = '1' outp, status = send_api_query('search', params, fmt, context) @@ -158,17 +159,36 @@ def website_reverse_request(context, fmt, lat, lon): params['lat'] = lat if lon is not None: params['lon'] = lon + if fmt and fmt.strip() == 'debug': + params['debug'] = '1' outp, status = send_api_query('reverse', params, fmt, context) context.response = ReverseResponse(outp, fmt or 'xml', status) +@when(u'sending (?P\S+ )?reverse point (?P.+)') +def website_reverse_request(context, fmt, nodeid): + params = {} + if fmt and fmt.strip() == 'debug': + params['debug'] = '1' + params['lon'], params['lat'] = (f'{c:f}' for c in context.osm.grid_node(int(nodeid))) + + + outp, status = send_api_query('reverse', params, fmt, context) + + context.response = ReverseResponse(outp, fmt or 'xml', status) + + + @when(u'sending (?P\S+ )?details query for (?P.*)') def website_details_request(context, fmt, query): params = {} if query[0] in 'NWR': - params['osmtype'] = query[0] - params['osmid'] = query[1:] + nid = NominatimID(query) + params['osmtype'] = nid.typ + params['osmid'] = nid.oid + if nid.cls: + params['class'] = nid.cls else: params['place_id'] = query outp, status = send_api_query('details', params, fmt, context) @@ -198,7 +218,8 @@ def validate_result_number(context, operator, number): @then(u'a HTTP (?P\d+) is returned') def check_http_return_status(context, status): - assert context.response.errorcode == int(status) + assert context.response.errorcode == int(status), \ + "Return HTTP status is {}.".format(context.response.errorcode) @then(u'the page contents equals "(?P.+)"') def check_page_content_equals(context, text): @@ -231,16 +252,20 @@ def check_header_attr(context): def check_header_no_attr(context, neg, attrs): for attr in attrs.split(','): if neg: - assert attr not in context.response.header + assert attr not in context.response.header, \ + "Unexpected attribute {}. Full response:\n{}".format( + attr, json.dumps(context.response.header, sort_keys=True, indent=2)) else: - assert attr in context.response.header + assert attr in context.response.header, \ + "No attribute {}. Full response:\n{}".format( + attr, json.dumps(context.response.header, sort_keys=True, indent=2)) @then(u'results contain') def step_impl(context): context.execute_steps("then at least 1 result is returned") for line in context.table: - context.response.match_row(line) + context.response.match_row(line, context=context) @then(u'result (?P\d+ )?has (?Pnot )?attributes (?P.*)') def validate_attributes(context, lid, neg, attrs): @@ -254,9 +279,13 @@ def validate_attributes(context, lid, neg, attrs): for i in idx: for attr in attrs.split(','): if neg: - assert attr not in context.response.result[i] + assert attr not in context.response.result[i],\ + "Unexpected attribute {}. Full response:\n{}".format( + attr, json.dumps(context.response.result[i], sort_keys=True, indent=2)) else: - assert attr in context.response.result[i] + assert attr in context.response.result[i], \ + "No attribute {}. Full response:\n{}".format( + attr, json.dumps(context.response.result[i], sort_keys=True, indent=2)) @then(u'result addresses contain') def step_impl(context):