]> git.openstreetmap.org Git - nominatim.git/blobdiff - test/bdd/steps/steps_api_queries.py
Merge pull request #3125 from lonvia/warm-to-python
[nominatim.git] / test / bdd / steps / steps_api_queries.py
index ebda5ec6e4ae8e57571265f4040cc3733eb5699a..3d3b16c76d4a365787413753bb719d96cbf44bf5 100644 (file)
@@ -1,16 +1,27 @@
+# SPDX-License-Identifier: GPL-2.0-only
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2022 by the Nominatim developer community.
+# For a full list of authors see the git log.
 """ Steps that run queries against the API.
 
     Queries may either be run directly via PHP using the query script
     or via the HTTP interface using php-cgi.
 """
 """ Steps that run queries against the API.
 
     Queries may either be run directly via PHP using the query script
     or via the HTTP interface using php-cgi.
 """
+from pathlib import Path
 import json
 import os
 import re
 import logging
 import json
 import os
 import re
 import logging
+import asyncio
+import xml.etree.ElementTree as ET
 from urllib.parse import urlencode
 
 from utils import run_script
 from http_responses import GenericResponse, SearchResponse, ReverseResponse, StatusResponse
 from urllib.parse import urlencode
 
 from utils import run_script
 from http_responses import GenericResponse, SearchResponse, ReverseResponse, StatusResponse
+from check_functions import Bbox, check_for_attributes
+from table_compare import NominatimID
 
 LOG = logging.getLogger(__name__)
 
 
 LOG = logging.getLogger(__name__)
 
@@ -38,6 +49,15 @@ BASE_SERVER_ENV = {
 }
 
 
 }
 
 
+def make_todo_list(context, result_id):
+    if result_id is None:
+        context.execute_steps("then at least 1 result is returned")
+        return range(len(context.response.result))
+
+    context.execute_steps(f"then more than {result_id}results are returned")
+    return (int(result_id.strip()), )
+
+
 def compare(operator, op1, op2):
     if operator == 'less than':
         return op1 < op2
 def compare(operator, op1, op2):
     if operator == 'less than':
         return op1 < op2
@@ -50,34 +70,16 @@ def compare(operator, op1, op2):
     elif operator == 'at most':
         return op1 <= op2
     else:
     elif operator == 'at most':
         return op1 <= op2
     else:
-        raise Exception("unknown operator '%s'" % operator)
-
-
-@when(u'searching for "(?P<query>.*)"(?P<dups> with dups)?')
-def query_cmd(context, query, dups):
-    """ Query directly via PHP script.
-    """
-    cmd = ['/usr/bin/env', 'php']
-    cmd.append(os.path.join(context.nominatim.build_dir, 'utils', 'query.php'))
-    if query:
-        cmd.extend(['--search', query])
-    # add more parameters in table form
-    if context.table:
-        for h in context.table.headings:
-            value = context.table[0][h].strip()
-            if value:
-                cmd.extend(('--' + h, value))
-
-    if dups:
-        cmd.extend(('--dedupe', '0'))
+        raise ValueError(f"Unknown operator '{operator}'")
 
 
-    outp, err = run_script(cmd, cwd=context.nominatim.build_dir)
-
-    context.response = SearchResponse(outp, 'json')
 
 def send_api_query(endpoint, params, fmt, context):
     if fmt is not None:
 
 def send_api_query(endpoint, params, fmt, context):
     if fmt is not None:
-        params['format'] = fmt.strip()
+        if fmt.strip() == 'debug':
+            params['debug'] = '1'
+        else:
+            params['format'] = fmt.strip()
+
     if context.table:
         if context.table.headings[0] == 'param':
             for line in context.table:
     if context.table:
         if context.table.headings[0] == 'param':
             for line in context.table:
@@ -86,25 +88,38 @@ def send_api_query(endpoint, params, fmt, context):
             for h in context.table.headings:
                 params[h] = context.table[0][h]
 
             for h in context.table.headings:
                 params[h] = context.table[0][h]
 
+    if context.nominatim.api_engine is None:
+        return send_api_query_php(endpoint, params, context)
+
+    return asyncio.run(context.nominatim.api_engine(endpoint, params,
+                                                    Path(context.nominatim.website_dir.name),
+                                                    context.nominatim.test_env,
+                                                    getattr(context, 'http_headers', {})))
+
+
+
+def send_api_query_php(endpoint, params, context):
     env = dict(BASE_SERVER_ENV)
     env['QUERY_STRING'] = urlencode(params)
 
     env = dict(BASE_SERVER_ENV)
     env['QUERY_STRING'] = urlencode(params)
 
-    env['SCRIPT_NAME'] = '/%s.php' % endpoint
-    env['REQUEST_URI'] = '%s?%s' % (env['SCRIPT_NAME'], env['QUERY_STRING'])
+    env['SCRIPT_NAME'] = f'/{endpoint}.php'
+    env['REQUEST_URI'] = f"{env['SCRIPT_NAME']}?{env['QUERY_STRING']}"
     env['CONTEXT_DOCUMENT_ROOT'] = os.path.join(context.nominatim.website_dir.name, 'website')
     env['SCRIPT_FILENAME'] = os.path.join(env['CONTEXT_DOCUMENT_ROOT'],
     env['CONTEXT_DOCUMENT_ROOT'] = os.path.join(context.nominatim.website_dir.name, 'website')
     env['SCRIPT_FILENAME'] = os.path.join(env['CONTEXT_DOCUMENT_ROOT'],
-                                          '%s.php' % endpoint)
+                                          f'{endpoint}.php')
 
     LOG.debug("Environment:" + json.dumps(env, sort_keys=True, indent=2))
 
     if hasattr(context, 'http_headers'):
 
     LOG.debug("Environment:" + json.dumps(env, sort_keys=True, indent=2))
 
     if hasattr(context, 'http_headers'):
-        env.update(context.http_headers)
+        for k, v in context.http_headers.items():
+            env['HTTP_' + k.upper().replace('-', '_')] = v
 
     cmd = ['/usr/bin/env', 'php-cgi', '-f']
     if context.nominatim.code_coverage_path:
 
     cmd = ['/usr/bin/env', 'php-cgi', '-f']
     if context.nominatim.code_coverage_path:
+        env['XDEBUG_MODE'] = 'coverage'
         env['COV_SCRIPT_FILENAME'] = env['SCRIPT_FILENAME']
         env['COV_SCRIPT_FILENAME'] = env['SCRIPT_FILENAME']
-        env['COV_PHP_DIR'] = os.path.join(context.nominatim.src_dir, "lib")
-        env['COV_TEST_NAME'] = '%s:%s' % (context.scenario.filename, context.scenario.line)
+        env['COV_PHP_DIR'] = context.nominatim.src_dir
+        env['COV_TEST_NAME'] = f"{context.scenario.filename}:{context.scenario.line}"
         env['SCRIPT_FILENAME'] = \
                 os.path.join(os.path.split(__file__)[0], 'cgi-with-coverage.php')
         cmd.append(env['SCRIPT_FILENAME'])
         env['SCRIPT_FILENAME'] = \
                 os.path.join(os.path.split(__file__)[0], 'cgi-with-coverage.php')
         cmd.append(env['SCRIPT_FILENAME'])
@@ -113,11 +128,11 @@ def send_api_query(endpoint, params, fmt, context):
         cmd.append(env['SCRIPT_FILENAME'])
 
     for k,v in params.items():
         cmd.append(env['SCRIPT_FILENAME'])
 
     for k,v in params.items():
-        cmd.append("%s=%s" % (k, v))
+        cmd.append(f"{k}={v}")
 
     outp, err = run_script(cmd, cwd=context.nominatim.website_dir.name, env=env)
 
 
     outp, err = run_script(cmd, cwd=context.nominatim.website_dir.name, env=env)
 
-    assert len(err) == 0, "Unexpected PHP error: %s" % (err)
+    assert len(err) == 0, f"Unexpected PHP error: {err}"
 
     if outp.startswith('Status: '):
         status = int(outp[8:11])
 
     if outp.startswith('Status: '):
         status = int(outp[8:11])
@@ -134,8 +149,7 @@ def add_http_header(context):
         context.http_headers = {}
 
     for h in context.table.headings:
         context.http_headers = {}
 
     for h in context.table.headings:
-        envvar = 'HTTP_' + h.upper().replace('-', '_')
-        context.http_headers[envvar] = context.table[0][h]
+        context.http_headers[h] = context.table[0][h]
 
 
 @when(u'sending (?P<fmt>\S+ )?search query "(?P<query>.*)"(?P<addr> with address)?')
 
 
 @when(u'sending (?P<fmt>\S+ )?search query "(?P<query>.*)"(?P<addr> with address)?')
@@ -148,89 +162,75 @@ def website_search_request(context, fmt, query, addr):
 
     outp, status = send_api_query('search', params, fmt, context)
 
 
     outp, status = send_api_query('search', params, fmt, context)
 
-    if fmt is None or fmt == 'jsonv2 ':
-        outfmt = 'json'
-    else:
-        outfmt = fmt.strip()
+    context.response = SearchResponse(outp, fmt or 'json', status)
 
 
-    context.response = SearchResponse(outp, outfmt, status)
 
 
-@when(u'sending (?P<fmt>\S+ )?reverse coordinates (?P<lat>.+)?,(?P<lon>.+)?')
-def website_reverse_request(context, fmt, lat, lon):
+@when('sending v1/reverse at (?P<lat>[\d.-]*),(?P<lon>[\d.-]*)(?: with format (?P<fmt>.+))?')
+def api_endpoint_v1_reverse(context, lat, lon, fmt):
     params = {}
     if lat is not None:
         params['lat'] = lat
     if lon is not None:
         params['lon'] = lon
     params = {}
     if lat is not None:
         params['lat'] = lat
     if lon is not None:
         params['lon'] = lon
+    if fmt is None:
+        fmt = 'jsonv2'
+    elif fmt == "''":
+        fmt = None
 
     outp, status = send_api_query('reverse', params, fmt, context)
 
     outp, status = send_api_query('reverse', params, fmt, context)
+    context.response = ReverseResponse(outp, fmt or 'xml', status)
 
 
-    if fmt is None:
-        outfmt = 'xml'
-    elif fmt == 'jsonv2 ':
-        outfmt = 'json'
-    else:
-        outfmt = fmt.strip()
 
 
-    context.response = ReverseResponse(outp, outfmt, status)
+@when('sending v1/reverse N(?P<nodeid>\d+)(?: with format (?P<fmt>.+))?')
+def api_endpoint_v1_reverse_from_node(context, nodeid, fmt):
+    params = {}
+    params['lon'], params['lat'] = (f'{c:f}' for c in context.osm.grid_node(int(nodeid)))
+
+    outp, status = send_api_query('reverse', params, fmt, context)
+    context.response = ReverseResponse(outp, fmt or 'xml', status)
+
 
 @when(u'sending (?P<fmt>\S+ )?details query for (?P<query>.*)')
 def website_details_request(context, fmt, query):
     params = {}
     if query[0] in 'NWR':
 
 @when(u'sending (?P<fmt>\S+ )?details query for (?P<query>.*)')
 def website_details_request(context, fmt, query):
     params = {}
     if query[0] in 'NWR':
-        params['osmtype'] = query[0]
-        params['osmid'] = query[1:]
+        nid = NominatimID(query)
+        params['osmtype'] = nid.typ
+        params['osmid'] = nid.oid
+        if nid.cls:
+            params['class'] = nid.cls
     else:
         params['place_id'] = query
     outp, status = send_api_query('details', params, fmt, context)
 
     else:
         params['place_id'] = query
     outp, status = send_api_query('details', params, fmt, context)
 
-    if fmt is None:
-        outfmt = 'json'
-    else:
-        outfmt = fmt.strip()
-
-    context.response = GenericResponse(outp, outfmt, status)
+    context.response = GenericResponse(outp, fmt or 'json', status)
 
 @when(u'sending (?P<fmt>\S+ )?lookup query for (?P<query>.*)')
 def website_lookup_request(context, fmt, query):
     params = { 'osm_ids' : query }
     outp, status = send_api_query('lookup', params, fmt, context)
 
 
 @when(u'sending (?P<fmt>\S+ )?lookup query for (?P<query>.*)')
 def website_lookup_request(context, fmt, query):
     params = { 'osm_ids' : query }
     outp, status = send_api_query('lookup', params, fmt, context)
 
-    if fmt == 'json ':
-        outfmt = 'json'
-    elif fmt == 'jsonv2 ':
-        outfmt = 'json'
-    elif fmt == 'geojson ':
-        outfmt = 'geojson'
-    elif fmt == 'geocodejson ':
-        outfmt = 'geocodejson'
-    else:
-        outfmt = 'xml'
-
-    context.response = SearchResponse(outp, outfmt, status)
+    context.response = SearchResponse(outp, fmt or 'xml', status)
 
 @when(u'sending (?P<fmt>\S+ )?status query')
 def website_status_request(context, fmt):
     params = {}
     outp, status = send_api_query('status', params, fmt, context)
 
 
 @when(u'sending (?P<fmt>\S+ )?status query')
 def website_status_request(context, fmt):
     params = {}
     outp, status = send_api_query('status', params, fmt, context)
 
-    if fmt is None:
-        outfmt = 'text'
-    else:
-        outfmt = fmt.strip()
-
-    context.response = StatusResponse(outp, outfmt, status)
+    context.response = StatusResponse(outp, fmt or 'text', status)
 
 @step(u'(?P<operator>less than|more than|exactly|at least|at most) (?P<number>\d+) results? (?:is|are) returned')
 def validate_result_number(context, operator, number):
 
 @step(u'(?P<operator>less than|more than|exactly|at least|at most) (?P<number>\d+) results? (?:is|are) returned')
 def validate_result_number(context, operator, number):
-    assert context.response.errorcode == 200
+    context.execute_steps("Then a HTTP 200 is returned")
     numres = len(context.response.result)
     assert compare(operator, numres, int(number)), \
     numres = len(context.response.result)
     assert compare(operator, numres, int(number)), \
-        "Bad number of results: expected %s %s, got %d." % (operator, number, numres)
+           f"Bad number of results: expected {operator} {number}, got {numres}."
 
 @then(u'a HTTP (?P<status>\d+) is returned')
 def check_http_return_status(context, status):
 
 @then(u'a HTTP (?P<status>\d+) is returned')
 def check_http_return_status(context, status):
-    assert context.response.errorcode == int(status)
+    assert context.response.errorcode == int(status), \
+           f"Return HTTP status is {context.response.errorcode}."\
+           f" Full response:\n{context.response.page}"
 
 @then(u'the page contents equals "(?P<text>.+)"')
 def check_page_content_equals(context, text):
 
 @then(u'the page contents equals "(?P<text>.+)"')
 def check_page_content_equals(context, text):
@@ -239,7 +239,19 @@ def check_page_content_equals(context, text):
 @then(u'the result is valid (?P<fmt>\w+)')
 def step_impl(context, fmt):
     context.execute_steps("Then a HTTP 200 is returned")
 @then(u'the result is valid (?P<fmt>\w+)')
 def step_impl(context, fmt):
     context.execute_steps("Then a HTTP 200 is returned")
-    assert context.response.format == fmt
+    if fmt.strip() == 'html':
+        try:
+            tree = ET.fromstring(context.response.page)
+        except Exception as ex:
+            assert False, f"Could not parse page:\n{context.response.page}"
+
+        assert tree.tag == 'html'
+        body = tree.find('./body')
+        assert body is not None
+        assert body.find('.//script') is None
+    else:
+        assert context.response.format == fmt
+
 
 @then(u'a (?P<fmt>\w+) user error is returned')
 def check_page_error(context, fmt):
 
 @then(u'a (?P<fmt>\w+) user error is returned')
 def check_page_error(context, fmt):
@@ -253,63 +265,50 @@ def check_page_error(context, fmt):
 
 @then(u'result header contains')
 def check_header_attr(context):
 
 @then(u'result header contains')
 def check_header_attr(context):
+    context.execute_steps("Then a HTTP 200 is returned")
     for line in context.table:
     for line in context.table:
-        assert re.fullmatch(line['value'], context.response.header[line['attr']]) is not None, \
-               "attribute '%s': expected: '%s', got '%s'" % (
-                    line['attr'], line['value'],
-                    context.response.header[line['attr']])
+        assert line['attr'] in context.response.header, \
+               f"Field '{line['attr']}' missing in header. Full header:\n{context.response.header}"
+        value = context.response.header[line['attr']]
+        assert re.fullmatch(line['value'], value) is not None, \
+               f"Attribute '{line['attr']}': expected: '{line['value']}', got '{value}'"
+
 
 @then(u'result header has (?P<neg>not )?attributes (?P<attrs>.*)')
 def check_header_no_attr(context, neg, attrs):
 
 @then(u'result header has (?P<neg>not )?attributes (?P<attrs>.*)')
 def check_header_no_attr(context, neg, attrs):
-    for attr in attrs.split(','):
-        if neg:
-            assert attr not in context.response.header
-        else:
-            assert attr in context.response.header
+    check_for_attributes(context.response.header, attrs,
+                         'absent' if neg else 'present')
 
 
-@then(u'results contain')
-def step_impl(context):
+
+@then(u'results contain(?: in field (?P<field>.*))?')
+def step_impl(context, field):
     context.execute_steps("then at least 1 result is returned")
 
     for line in context.table:
     context.execute_steps("then at least 1 result is returned")
 
     for line in context.table:
-        context.response.match_row(line)
+        context.response.match_row(line, context=context, field=field)
+
 
 @then(u'result (?P<lid>\d+ )?has (?P<neg>not )?attributes (?P<attrs>.*)')
 def validate_attributes(context, lid, neg, attrs):
 
 @then(u'result (?P<lid>\d+ )?has (?P<neg>not )?attributes (?P<attrs>.*)')
 def validate_attributes(context, lid, neg, attrs):
-    if lid is None:
-        idx = range(len(context.response.result))
-        context.execute_steps("then at least 1 result is returned")
-    else:
-        idx = [int(lid.strip())]
-        context.execute_steps("then more than %sresults are returned" % lid)
+    for i in make_todo_list(context, lid):
+        check_for_attributes(context.response.result[i], attrs,
+                             'absent' if neg else 'present')
 
 
-    for i in idx:
-        for attr in attrs.split(','):
-            if neg:
-                assert attr not in context.response.result[i]
-            else:
-                assert attr in context.response.result[i]
 
 @then(u'result addresses contain')
 def step_impl(context):
     context.execute_steps("then at least 1 result is returned")
 
 
 @then(u'result addresses contain')
 def step_impl(context):
     context.execute_steps("then at least 1 result is returned")
 
-    if 'ID' not in context.table.headings:
-        addr_parts = context.response.property_list('address')
-
     for line in context.table:
     for line in context.table:
-        if 'ID' in context.table.headings:
-            addr_parts = [dict(context.response.result[int(line['ID'])]['address'])]
+        idx = int(line['ID']) if 'ID' in line.headings else None
 
 
-        for h in context.table.headings:
-            if h != 'ID':
-                for p in addr_parts:
-                    assert h in p
-                    assert p[h] == line[h], "Bad address value for %s" % h
+        for name, value in zip(line.headings, line.cells):
+            if name != 'ID':
+                context.response.assert_address_field(idx, name, value)
 
 @then(u'address of result (?P<lid>\d+) has(?P<neg> no)? types (?P<attrs>.*)')
 def check_address(context, lid, neg, attrs):
 
 @then(u'address of result (?P<lid>\d+) has(?P<neg> no)? types (?P<attrs>.*)')
 def check_address(context, lid, neg, attrs):
-    context.execute_steps("then more than %s results are returned" % lid)
+    context.execute_steps(f"then more than {lid} results are returned")
 
     addr_parts = context.response.result[int(lid)]['address']
 
 
     addr_parts = context.response.result[int(lid)]['address']
 
@@ -321,60 +320,40 @@ def check_address(context, lid, neg, attrs):
 
 @then(u'address of result (?P<lid>\d+) (?P<complete>is|contains)')
 def check_address(context, lid, complete):
 
 @then(u'address of result (?P<lid>\d+) (?P<complete>is|contains)')
 def check_address(context, lid, complete):
-    context.execute_steps("then more than %s results are returned" % lid)
+    context.execute_steps(f"then more than {lid} results are returned")
 
 
-    addr_parts = dict(context.response.result[int(lid)]['address'])
+    lid = int(lid)
+    addr_parts = dict(context.response.result[lid]['address'])
 
     for line in context.table:
 
     for line in context.table:
-        assert line['type'] in addr_parts
-        assert addr_parts[line['type']] == line['value'], \
-                     "Bad address value for %s" % line['type']
+        context.response.assert_address_field(lid, line['type'], line['value'])
         del addr_parts[line['type']]
 
     if complete == 'is':
         del addr_parts[line['type']]
 
     if complete == 'is':
-        assert len(addr_parts) == 0, "Additional address parts found: %s" % str(addr_parts)
+        assert len(addr_parts) == 0, f"Additional address parts found: {addr_parts!s}"
 
 
-@then(u'result (?P<lid>\d+ )?has bounding box in (?P<coords>[\d,.-]+)')
-def step_impl(context, lid, coords):
-    if lid is None:
-        context.execute_steps("then at least 1 result is returned")
-        bboxes = context.response.property_list('boundingbox')
-    else:
-        context.execute_steps("then more than %sresults are returned" % lid)
-        bboxes = [ context.response.result[int(lid)]['boundingbox']]
 
 
-    coord = [ float(x) for x in coords.split(',') ]
+@then(u'result (?P<lid>\d+ )?has bounding box in (?P<coords>[\d,.-]+)')
+def check_bounding_box_in_area(context, lid, coords):
+    expected = Bbox(coords)
 
 
-    for bbox in bboxes:
-        if isinstance(bbox, str):
-            bbox = bbox.split(',')
-        bbox = [ float(x) for x in bbox ]
+    for idx in make_todo_list(context, lid):
+        res = context.response.result[idx]
+        check_for_attributes(res, 'boundingbox')
+        context.response.check_row(idx, res['boundingbox'] in expected,
+                                   f"Bbox is not contained in {expected}")
 
 
-        assert bbox[0] >= coord[0]
-        assert bbox[1] <= coord[1]
-        assert bbox[2] >= coord[2]
-        assert bbox[3] <= coord[3]
 
 @then(u'result (?P<lid>\d+ )?has centroid in (?P<coords>[\d,.-]+)')
 
 @then(u'result (?P<lid>\d+ )?has centroid in (?P<coords>[\d,.-]+)')
-def step_impl(context, lid, coords):
-    if lid is None:
-        context.execute_steps("then at least 1 result is returned")
-        bboxes = zip(context.response.property_list('lat'),
-                     context.response.property_list('lon'))
-    else:
-        context.execute_steps("then more than %sresults are returned" % lid)
-        res = context.response.result[int(lid)]
-        bboxes = [ (res['lat'], res['lon']) ]
+def check_centroid_in_area(context, lid, coords):
+    expected = Bbox(coords)
 
 
-    coord = [ float(x) for x in coords.split(',') ]
+    for idx in make_todo_list(context, lid):
+        res = context.response.result[idx]
+        check_for_attributes(res, 'lat,lon')
+        context.response.check_row(idx, (res['lon'], res['lat']) in expected,
+                                   f"Centroid is not inside {expected}")
 
 
-    for lat, lon in bboxes:
-        lat = float(lat)
-        lon = float(lon)
-        assert lat >= coord[0]
-        assert lat <= coord[1]
-        assert lon >= coord[2]
-        assert lon <= coord[3]
 
 @then(u'there are(?P<neg> no)? duplicates')
 def check_for_duplicates(context, neg):
 
 @then(u'there are(?P<neg> no)? duplicates')
 def check_for_duplicates(context, neg):
@@ -391,6 +370,7 @@ def check_for_duplicates(context, neg):
         resarr.add(dup)
 
     if neg:
         resarr.add(dup)
 
     if neg:
-        assert not has_dupe, "Found duplicate for %s" % (dup, )
+        assert not has_dupe, f"Found duplicate for {dup}"
     else:
         assert has_dupe, "No duplicates found"
     else:
         assert has_dupe, "No duplicates found"
+