]> git.openstreetmap.org Git - nominatim.git/blobdiff - test/bdd/steps/steps_api_queries.py
use context management when processing Tiger data
[nominatim.git] / test / bdd / steps / steps_api_queries.py
index ebda5ec6e4ae8e57571265f4040cc3733eb5699a..22517338bab04f664198222680f273ae358882a1 100644 (file)
@@ -1,3 +1,9 @@
+# SPDX-License-Identifier: GPL-2.0-only
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2022 by the Nominatim developer community.
+# For a full list of authors see the git log.
 """ Steps that run queries against the API.
 
     Queries may either be run directly via PHP using the query script
 """ Steps that run queries against the API.
 
     Queries may either be run directly via PHP using the query script
@@ -11,6 +17,8 @@ from urllib.parse import urlencode
 
 from utils import run_script
 from http_responses import GenericResponse, SearchResponse, ReverseResponse, StatusResponse
 
 from utils import run_script
 from http_responses import GenericResponse, SearchResponse, ReverseResponse, StatusResponse
+from check_functions import Bbox
+from table_compare import NominatimID
 
 LOG = logging.getLogger(__name__)
 
 
 LOG = logging.getLogger(__name__)
 
@@ -53,30 +61,8 @@ def compare(operator, op1, op2):
         raise Exception("unknown operator '%s'" % operator)
 
 
         raise Exception("unknown operator '%s'" % operator)
 
 
-@when(u'searching for "(?P<query>.*)"(?P<dups> with dups)?')
-def query_cmd(context, query, dups):
-    """ Query directly via PHP script.
-    """
-    cmd = ['/usr/bin/env', 'php']
-    cmd.append(os.path.join(context.nominatim.build_dir, 'utils', 'query.php'))
-    if query:
-        cmd.extend(['--search', query])
-    # add more parameters in table form
-    if context.table:
-        for h in context.table.headings:
-            value = context.table[0][h].strip()
-            if value:
-                cmd.extend(('--' + h, value))
-
-    if dups:
-        cmd.extend(('--dedupe', '0'))
-
-    outp, err = run_script(cmd, cwd=context.nominatim.build_dir)
-
-    context.response = SearchResponse(outp, 'json')
-
 def send_api_query(endpoint, params, fmt, context):
 def send_api_query(endpoint, params, fmt, context):
-    if fmt is not None:
+    if fmt is not None and fmt.strip() != 'debug':
         params['format'] = fmt.strip()
     if context.table:
         if context.table.headings[0] == 'param':
         params['format'] = fmt.strip()
     if context.table:
         if context.table.headings[0] == 'param':
@@ -102,8 +88,9 @@ def send_api_query(endpoint, params, fmt, context):
 
     cmd = ['/usr/bin/env', 'php-cgi', '-f']
     if context.nominatim.code_coverage_path:
 
     cmd = ['/usr/bin/env', 'php-cgi', '-f']
     if context.nominatim.code_coverage_path:
+        env['XDEBUG_MODE'] = 'coverage'
         env['COV_SCRIPT_FILENAME'] = env['SCRIPT_FILENAME']
         env['COV_SCRIPT_FILENAME'] = env['SCRIPT_FILENAME']
-        env['COV_PHP_DIR'] = os.path.join(context.nominatim.src_dir, "lib")
+        env['COV_PHP_DIR'] = context.nominatim.src_dir
         env['COV_TEST_NAME'] = '%s:%s' % (context.scenario.filename, context.scenario.line)
         env['SCRIPT_FILENAME'] = \
                 os.path.join(os.path.split(__file__)[0], 'cgi-with-coverage.php')
         env['COV_TEST_NAME'] = '%s:%s' % (context.scenario.filename, context.scenario.line)
         env['SCRIPT_FILENAME'] = \
                 os.path.join(os.path.split(__file__)[0], 'cgi-with-coverage.php')
@@ -145,15 +132,12 @@ def website_search_request(context, fmt, query, addr):
         params['q'] = query
     if addr is not None:
         params['addressdetails'] = '1'
         params['q'] = query
     if addr is not None:
         params['addressdetails'] = '1'
+    if fmt and fmt.strip() == 'debug':
+        params['debug'] = '1'
 
     outp, status = send_api_query('search', params, fmt, context)
 
 
     outp, status = send_api_query('search', params, fmt, context)
 
-    if fmt is None or fmt == 'jsonv2 ':
-        outfmt = 'json'
-    else:
-        outfmt = fmt.strip()
-
-    context.response = SearchResponse(outp, outfmt, status)
+    context.response = SearchResponse(outp, fmt or 'json', status)
 
 @when(u'sending (?P<fmt>\S+ )?reverse coordinates (?P<lat>.+)?,(?P<lon>.+)?')
 def website_reverse_request(context, fmt, lat, lon):
 
 @when(u'sending (?P<fmt>\S+ )?reverse coordinates (?P<lat>.+)?,(?P<lon>.+)?')
 def website_reverse_request(context, fmt, lat, lon):
@@ -162,75 +146,67 @@ def website_reverse_request(context, fmt, lat, lon):
         params['lat'] = lat
     if lon is not None:
         params['lon'] = lon
         params['lat'] = lat
     if lon is not None:
         params['lon'] = lon
+    if fmt and fmt.strip() == 'debug':
+        params['debug'] = '1'
 
     outp, status = send_api_query('reverse', params, fmt, context)
 
 
     outp, status = send_api_query('reverse', params, fmt, context)
 
-    if fmt is None:
-        outfmt = 'xml'
-    elif fmt == 'jsonv2 ':
-        outfmt = 'json'
-    else:
-        outfmt = fmt.strip()
+    context.response = ReverseResponse(outp, fmt or 'xml', status)
+
+@when(u'sending (?P<fmt>\S+ )?reverse point (?P<nodeid>.+)')
+def website_reverse_request(context, fmt, nodeid):
+    params = {}
+    if fmt and fmt.strip() == 'debug':
+        params['debug'] = '1'
+    params['lon'], params['lat'] = (f'{c:f}' for c in context.osm.grid_node(int(nodeid)))
+
+
+    outp, status = send_api_query('reverse', params, fmt, context)
+
+    context.response = ReverseResponse(outp, fmt or 'xml', status)
+
 
 
-    context.response = ReverseResponse(outp, outfmt, status)
 
 @when(u'sending (?P<fmt>\S+ )?details query for (?P<query>.*)')
 def website_details_request(context, fmt, query):
     params = {}
     if query[0] in 'NWR':
 
 @when(u'sending (?P<fmt>\S+ )?details query for (?P<query>.*)')
 def website_details_request(context, fmt, query):
     params = {}
     if query[0] in 'NWR':
-        params['osmtype'] = query[0]
-        params['osmid'] = query[1:]
+        nid = NominatimID(query)
+        params['osmtype'] = nid.typ
+        params['osmid'] = nid.oid
+        if nid.cls:
+            params['class'] = nid.cls
     else:
         params['place_id'] = query
     outp, status = send_api_query('details', params, fmt, context)
 
     else:
         params['place_id'] = query
     outp, status = send_api_query('details', params, fmt, context)
 
-    if fmt is None:
-        outfmt = 'json'
-    else:
-        outfmt = fmt.strip()
-
-    context.response = GenericResponse(outp, outfmt, status)
+    context.response = GenericResponse(outp, fmt or 'json', status)
 
 @when(u'sending (?P<fmt>\S+ )?lookup query for (?P<query>.*)')
 def website_lookup_request(context, fmt, query):
     params = { 'osm_ids' : query }
     outp, status = send_api_query('lookup', params, fmt, context)
 
 
 @when(u'sending (?P<fmt>\S+ )?lookup query for (?P<query>.*)')
 def website_lookup_request(context, fmt, query):
     params = { 'osm_ids' : query }
     outp, status = send_api_query('lookup', params, fmt, context)
 
-    if fmt == 'json ':
-        outfmt = 'json'
-    elif fmt == 'jsonv2 ':
-        outfmt = 'json'
-    elif fmt == 'geojson ':
-        outfmt = 'geojson'
-    elif fmt == 'geocodejson ':
-        outfmt = 'geocodejson'
-    else:
-        outfmt = 'xml'
-
-    context.response = SearchResponse(outp, outfmt, status)
+    context.response = SearchResponse(outp, fmt or 'xml', status)
 
 @when(u'sending (?P<fmt>\S+ )?status query')
 def website_status_request(context, fmt):
     params = {}
     outp, status = send_api_query('status', params, fmt, context)
 
 
 @when(u'sending (?P<fmt>\S+ )?status query')
 def website_status_request(context, fmt):
     params = {}
     outp, status = send_api_query('status', params, fmt, context)
 
-    if fmt is None:
-        outfmt = 'text'
-    else:
-        outfmt = fmt.strip()
-
-    context.response = StatusResponse(outp, outfmt, status)
+    context.response = StatusResponse(outp, fmt or 'text', status)
 
 @step(u'(?P<operator>less than|more than|exactly|at least|at most) (?P<number>\d+) results? (?:is|are) returned')
 def validate_result_number(context, operator, number):
     assert context.response.errorcode == 200
     numres = len(context.response.result)
     assert compare(operator, numres, int(number)), \
 
 @step(u'(?P<operator>less than|more than|exactly|at least|at most) (?P<number>\d+) results? (?:is|are) returned')
 def validate_result_number(context, operator, number):
     assert context.response.errorcode == 200
     numres = len(context.response.result)
     assert compare(operator, numres, int(number)), \
-        "Bad number of results: expected %s %s, got %d." % (operator, number, numres)
+        "Bad number of results: expected {} {}, got {}.".format(operator, number, numres)
 
 @then(u'a HTTP (?P<status>\d+) is returned')
 def check_http_return_status(context, status):
 
 @then(u'a HTTP (?P<status>\d+) is returned')
 def check_http_return_status(context, status):
-    assert context.response.errorcode == int(status)
+    assert context.response.errorcode == int(status), \
+           "Return HTTP status is {}.".format(context.response.errorcode)
 
 @then(u'the page contents equals "(?P<text>.+)"')
 def check_page_content_equals(context, text):
 
 @then(u'the page contents equals "(?P<text>.+)"')
 def check_page_content_equals(context, text):
@@ -263,16 +239,20 @@ def check_header_attr(context):
 def check_header_no_attr(context, neg, attrs):
     for attr in attrs.split(','):
         if neg:
 def check_header_no_attr(context, neg, attrs):
     for attr in attrs.split(','):
         if neg:
-            assert attr not in context.response.header
+            assert attr not in context.response.header, \
+                   "Unexpected attribute {}. Full response:\n{}".format(
+                       attr, json.dumps(context.response.header, sort_keys=True, indent=2))
         else:
         else:
-            assert attr in context.response.header
+            assert attr in context.response.header, \
+                   "No attribute {}. Full response:\n{}".format(
+                       attr, json.dumps(context.response.header, sort_keys=True, indent=2))
 
 @then(u'results contain')
 def step_impl(context):
     context.execute_steps("then at least 1 result is returned")
 
     for line in context.table:
 
 @then(u'results contain')
 def step_impl(context):
     context.execute_steps("then at least 1 result is returned")
 
     for line in context.table:
-        context.response.match_row(line)
+        context.response.match_row(line, context=context)
 
 @then(u'result (?P<lid>\d+ )?has (?P<neg>not )?attributes (?P<attrs>.*)')
 def validate_attributes(context, lid, neg, attrs):
 
 @then(u'result (?P<lid>\d+ )?has (?P<neg>not )?attributes (?P<attrs>.*)')
 def validate_attributes(context, lid, neg, attrs):
@@ -286,26 +266,24 @@ def validate_attributes(context, lid, neg, attrs):
     for i in idx:
         for attr in attrs.split(','):
             if neg:
     for i in idx:
         for attr in attrs.split(','):
             if neg:
-                assert attr not in context.response.result[i]
+                assert attr not in context.response.result[i],\
+                       "Unexpected attribute {}. Full response:\n{}".format(
+                           attr, json.dumps(context.response.result[i], sort_keys=True, indent=2))
             else:
             else:
-                assert attr in context.response.result[i]
+                assert attr in context.response.result[i], \
+                       "No attribute {}. Full response:\n{}".format(
+                           attr, json.dumps(context.response.result[i], sort_keys=True, indent=2))
 
 @then(u'result addresses contain')
 def step_impl(context):
     context.execute_steps("then at least 1 result is returned")
 
 
 @then(u'result addresses contain')
 def step_impl(context):
     context.execute_steps("then at least 1 result is returned")
 
-    if 'ID' not in context.table.headings:
-        addr_parts = context.response.property_list('address')
-
     for line in context.table:
     for line in context.table:
-        if 'ID' in context.table.headings:
-            addr_parts = [dict(context.response.result[int(line['ID'])]['address'])]
+        idx = int(line['ID']) if 'ID' in line.headings else None
 
 
-        for h in context.table.headings:
-            if h != 'ID':
-                for p in addr_parts:
-                    assert h in p
-                    assert p[h] == line[h], "Bad address value for %s" % h
+        for name, value in zip(line.headings, line.cells):
+            if name != 'ID':
+                context.response.assert_address_field(idx, name, value)
 
 @then(u'address of result (?P<lid>\d+) has(?P<neg> no)? types (?P<attrs>.*)')
 def check_address(context, lid, neg, attrs):
 
 @then(u'address of result (?P<lid>\d+) has(?P<neg> no)? types (?P<attrs>.*)')
 def check_address(context, lid, neg, attrs):
@@ -323,12 +301,11 @@ def check_address(context, lid, neg, attrs):
 def check_address(context, lid, complete):
     context.execute_steps("then more than %s results are returned" % lid)
 
 def check_address(context, lid, complete):
     context.execute_steps("then more than %s results are returned" % lid)
 
-    addr_parts = dict(context.response.result[int(lid)]['address'])
+    lid = int(lid)
+    addr_parts = dict(context.response.result[lid]['address'])
 
     for line in context.table:
 
     for line in context.table:
-        assert line['type'] in addr_parts
-        assert addr_parts[line['type']] == line['value'], \
-                     "Bad address value for %s" % line['type']
+        context.response.assert_address_field(lid, line['type'], line['value'])
         del addr_parts[line['type']]
 
     if complete == 'is':
         del addr_parts[line['type']]
 
     if complete == 'is':
@@ -340,41 +317,30 @@ def step_impl(context, lid, coords):
         context.execute_steps("then at least 1 result is returned")
         bboxes = context.response.property_list('boundingbox')
     else:
         context.execute_steps("then at least 1 result is returned")
         bboxes = context.response.property_list('boundingbox')
     else:
-        context.execute_steps("then more than %sresults are returned" % lid)
-        bboxes = [ context.response.result[int(lid)]['boundingbox']]
+        context.execute_steps("then more than {}results are returned".format(lid))
+        bboxes = [context.response.result[int(lid)]['boundingbox']]
 
 
-    coord = [ float(x) for x in coords.split(',') ]
+    expected = Bbox(coords)
 
     for bbox in bboxes:
 
     for bbox in bboxes:
-        if isinstance(bbox, str):
-            bbox = bbox.split(',')
-        bbox = [ float(x) for x in bbox ]
-
-        assert bbox[0] >= coord[0]
-        assert bbox[1] <= coord[1]
-        assert bbox[2] >= coord[2]
-        assert bbox[3] <= coord[3]
+        assert bbox in expected, "Bbox {} is not contained in {}.".format(bbox, expected)
 
 @then(u'result (?P<lid>\d+ )?has centroid in (?P<coords>[\d,.-]+)')
 def step_impl(context, lid, coords):
     if lid is None:
         context.execute_steps("then at least 1 result is returned")
 
 @then(u'result (?P<lid>\d+ )?has centroid in (?P<coords>[\d,.-]+)')
 def step_impl(context, lid, coords):
     if lid is None:
         context.execute_steps("then at least 1 result is returned")
-        bboxes = zip(context.response.property_list('lat'),
-                     context.response.property_list('lon'))
+        centroids = zip(context.response.property_list('lon'),
+                        context.response.property_list('lat'))
     else:
     else:
-        context.execute_steps("then more than %sresults are returned" % lid)
+        context.execute_steps("then more than %sresults are returned".format(lid))
         res = context.response.result[int(lid)]
         res = context.response.result[int(lid)]
-        bboxes = [ (res['lat'], res['lon']) ]
+        centroids = [(res['lon'], res['lat'])]
 
 
-    coord = [ float(x) for x in coords.split(',') ]
+    expected = Bbox(coords)
 
 
-    for lat, lon in bboxes:
-        lat = float(lat)
-        lon = float(lon)
-        assert lat >= coord[0]
-        assert lat <= coord[1]
-        assert lon >= coord[2]
-        assert lon <= coord[3]
+    for centroid in centroids:
+        assert centroid in expected,\
+               "Centroid {} is not inside {}.".format(centroid, expected)
 
 @then(u'there are(?P<neg> no)? duplicates')
 def check_for_duplicates(context, neg):
 
 @then(u'there are(?P<neg> no)? duplicates')
 def check_for_duplicates(context, neg):