]> git.openstreetmap.org Git - nominatim.git/blob - test/bdd/steps/steps_api_queries.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / test / bdd / steps / steps_api_queries.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """ Steps that run queries against the API.
8
9     Queries may either be run directly via PHP using the query script
10     or via the HTTP interface using php-cgi.
11 """
12 from pathlib import Path
13 import json
14 import os
15 import re
16 import logging
17 import asyncio
18 import xml.etree.ElementTree as ET
19 from urllib.parse import urlencode
20
21 from utils import run_script
22 from http_responses import GenericResponse, SearchResponse, ReverseResponse, StatusResponse
23 from check_functions import Bbox, check_for_attributes
24 from table_compare import NominatimID
25
26 LOG = logging.getLogger(__name__)
27
28 BASE_SERVER_ENV = {
29     'HTTP_HOST' : 'localhost',
30     'HTTP_USER_AGENT' : 'Mozilla/5.0 (X11; Linux x86_64; rv:51.0) Gecko/20100101 Firefox/51.0',
31     'HTTP_ACCEPT' : 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
32     'HTTP_ACCEPT_ENCODING' : 'gzip, deflate',
33     'HTTP_CONNECTION' : 'keep-alive',
34     'SERVER_SIGNATURE' : '<address>Nominatim BDD Tests</address>',
35     'SERVER_SOFTWARE' : 'Nominatim test',
36     'SERVER_NAME' : 'localhost',
37     'SERVER_ADDR' : '127.0.1.1',
38     'SERVER_PORT' : '80',
39     'REMOTE_ADDR' : '127.0.0.1',
40     'DOCUMENT_ROOT' : '/var/www',
41     'REQUEST_SCHEME' : 'http',
42     'CONTEXT_PREFIX' : '/',
43     'SERVER_ADMIN' : 'webmaster@localhost',
44     'REMOTE_PORT' : '49319',
45     'GATEWAY_INTERFACE' : 'CGI/1.1',
46     'SERVER_PROTOCOL' : 'HTTP/1.1',
47     'REQUEST_METHOD' : 'GET',
48     'REDIRECT_STATUS' : 'CGI'
49 }
50
51
52 def make_todo_list(context, result_id):
53     if result_id is None:
54         context.execute_steps("then at least 1 result is returned")
55         return range(len(context.response.result))
56
57     context.execute_steps(f"then more than {result_id}results are returned")
58     return (int(result_id.strip()), )
59
60
61 def compare(operator, op1, op2):
62     if operator == 'less than':
63         return op1 < op2
64     elif operator == 'more than':
65         return op1 > op2
66     elif operator == 'exactly':
67         return op1 == op2
68     elif operator == 'at least':
69         return op1 >= op2
70     elif operator == 'at most':
71         return op1 <= op2
72     else:
73         raise ValueError(f"Unknown operator '{operator}'")
74
75
76 def send_api_query(endpoint, params, fmt, context):
77     if fmt is not None:
78         if fmt.strip() == 'debug':
79             params['debug'] = '1'
80         else:
81             params['format'] = fmt.strip()
82
83     if context.table:
84         if context.table.headings[0] == 'param':
85             for line in context.table:
86                 params[line['param']] = line['value']
87         else:
88             for h in context.table.headings:
89                 params[h] = context.table[0][h]
90
91     if context.nominatim.api_engine is None:
92         return send_api_query_php(endpoint, params, context)
93
94     return asyncio.run(context.nominatim.api_engine(endpoint, params,
95                                                     Path(context.nominatim.website_dir.name),
96                                                     context.nominatim.test_env,
97                                                     getattr(context, 'http_headers', {})))
98
99
100
101 def send_api_query_php(endpoint, params, context):
102     env = dict(BASE_SERVER_ENV)
103     env['QUERY_STRING'] = urlencode(params)
104
105     env['SCRIPT_NAME'] = f'/{endpoint}.php'
106     env['REQUEST_URI'] = f"{env['SCRIPT_NAME']}?{env['QUERY_STRING']}"
107     env['CONTEXT_DOCUMENT_ROOT'] = os.path.join(context.nominatim.website_dir.name, 'website')
108     env['SCRIPT_FILENAME'] = os.path.join(env['CONTEXT_DOCUMENT_ROOT'],
109                                           f'{endpoint}.php')
110
111     LOG.debug("Environment:" + json.dumps(env, sort_keys=True, indent=2))
112
113     if hasattr(context, 'http_headers'):
114         env.update(context.http_headers)
115
116     cmd = ['/usr/bin/env', 'php-cgi', '-f']
117     if context.nominatim.code_coverage_path:
118         env['XDEBUG_MODE'] = 'coverage'
119         env['COV_SCRIPT_FILENAME'] = env['SCRIPT_FILENAME']
120         env['COV_PHP_DIR'] = context.nominatim.src_dir
121         env['COV_TEST_NAME'] = f"{context.scenario.filename}:{context.scenario.line}"
122         env['SCRIPT_FILENAME'] = \
123                 os.path.join(os.path.split(__file__)[0], 'cgi-with-coverage.php')
124         cmd.append(env['SCRIPT_FILENAME'])
125         env['PHP_CODE_COVERAGE_FILE'] = context.nominatim.next_code_coverage_file()
126     else:
127         cmd.append(env['SCRIPT_FILENAME'])
128
129     for k,v in params.items():
130         cmd.append(f"{k}={v}")
131
132     outp, err = run_script(cmd, cwd=context.nominatim.website_dir.name, env=env)
133
134     assert len(err) == 0, f"Unexpected PHP error: {err}"
135
136     if outp.startswith('Status: '):
137         status = int(outp[8:11])
138     else:
139         status = 200
140
141     content_start = outp.find('\r\n\r\n')
142
143     return outp[content_start + 4:], status
144
145 @given(u'the HTTP header')
146 def add_http_header(context):
147     if not hasattr(context, 'http_headers'):
148         context.http_headers = {}
149
150     for h in context.table.headings:
151         envvar = 'HTTP_' + h.upper().replace('-', '_')
152         context.http_headers[envvar] = context.table[0][h]
153
154
155 @when(u'sending (?P<fmt>\S+ )?search query "(?P<query>.*)"(?P<addr> with address)?')
156 def website_search_request(context, fmt, query, addr):
157     params = {}
158     if query:
159         params['q'] = query
160     if addr is not None:
161         params['addressdetails'] = '1'
162
163     outp, status = send_api_query('search', params, fmt, context)
164
165     context.response = SearchResponse(outp, fmt or 'json', status)
166
167
168 @when('sending v1/reverse at (?P<lat>[\d.-]*),(?P<lon>[\d.-]*)(?: with format (?P<fmt>.+))?')
169 def api_endpoint_v1_reverse(context, lat, lon, fmt):
170     params = {}
171     if lat is not None:
172         params['lat'] = lat
173     if lon is not None:
174         params['lon'] = lon
175     if fmt is None:
176         fmt = 'jsonv2'
177     elif fmt == "''":
178         fmt = None
179
180     outp, status = send_api_query('reverse', params, fmt, context)
181     context.response = ReverseResponse(outp, fmt or 'xml', status)
182
183
184 @when('sending v1/reverse N(?P<nodeid>\d+)(?: with format (?P<fmt>.+))?')
185 def api_endpoint_v1_reverse_from_node(context, nodeid, fmt):
186     params = {}
187     params['lon'], params['lat'] = (f'{c:f}' for c in context.osm.grid_node(int(nodeid)))
188
189     outp, status = send_api_query('reverse', params, fmt, context)
190     context.response = ReverseResponse(outp, fmt or 'xml', status)
191
192
193 @when(u'sending (?P<fmt>\S+ )?details query for (?P<query>.*)')
194 def website_details_request(context, fmt, query):
195     params = {}
196     if query[0] in 'NWR':
197         nid = NominatimID(query)
198         params['osmtype'] = nid.typ
199         params['osmid'] = nid.oid
200         if nid.cls:
201             params['class'] = nid.cls
202     else:
203         params['place_id'] = query
204     outp, status = send_api_query('details', params, fmt, context)
205
206     context.response = GenericResponse(outp, fmt or 'json', status)
207
208 @when(u'sending (?P<fmt>\S+ )?lookup query for (?P<query>.*)')
209 def website_lookup_request(context, fmt, query):
210     params = { 'osm_ids' : query }
211     outp, status = send_api_query('lookup', params, fmt, context)
212
213     context.response = SearchResponse(outp, fmt or 'xml', status)
214
215 @when(u'sending (?P<fmt>\S+ )?status query')
216 def website_status_request(context, fmt):
217     params = {}
218     outp, status = send_api_query('status', params, fmt, context)
219
220     context.response = StatusResponse(outp, fmt or 'text', status)
221
222 @step(u'(?P<operator>less than|more than|exactly|at least|at most) (?P<number>\d+) results? (?:is|are) returned')
223 def validate_result_number(context, operator, number):
224     context.execute_steps("Then a HTTP 200 is returned")
225     numres = len(context.response.result)
226     assert compare(operator, numres, int(number)), \
227            f"Bad number of results: expected {operator} {number}, got {numres}."
228
229 @then(u'a HTTP (?P<status>\d+) is returned')
230 def check_http_return_status(context, status):
231     assert context.response.errorcode == int(status), \
232            f"Return HTTP status is {context.response.errorcode}."
233
234 @then(u'the page contents equals "(?P<text>.+)"')
235 def check_page_content_equals(context, text):
236     assert context.response.page == text
237
238 @then(u'the result is valid (?P<fmt>\w+)')
239 def step_impl(context, fmt):
240     context.execute_steps("Then a HTTP 200 is returned")
241     if fmt.strip() == 'html':
242         try:
243             tree = ET.fromstring(context.response.page)
244         except Exception as ex:
245             assert False, f"Could not parse page:\n{context.response.page}"
246
247         assert tree.tag == 'html'
248         body = tree.find('./body')
249         assert body is not None
250         assert body.find('.//script') is None
251     else:
252         assert context.response.format == fmt
253
254
255 @then(u'a (?P<fmt>\w+) user error is returned')
256 def check_page_error(context, fmt):
257     context.execute_steps("Then a HTTP 400 is returned")
258     assert context.response.format == fmt
259
260     if fmt == 'xml':
261         assert re.search(r'<error>.+</error>', context.response.page, re.DOTALL) is not None
262     else:
263         assert re.search(r'({"error":)', context.response.page, re.DOTALL) is not None
264
265 @then(u'result header contains')
266 def check_header_attr(context):
267     for line in context.table:
268         value = context.response.header[line['attr']]
269         assert re.fullmatch(line['value'], value) is not None, \
270                f"Attribute '{line['attr']}': expected: '{line['value']}', got '{value}'"
271
272
273 @then(u'result header has (?P<neg>not )?attributes (?P<attrs>.*)')
274 def check_header_no_attr(context, neg, attrs):
275     check_for_attributes(context.response.header, attrs,
276                          'absent' if neg else 'present')
277
278
279 @then(u'results contain(?: in field (?P<field>.*))?')
280 def step_impl(context, field):
281     context.execute_steps("then at least 1 result is returned")
282
283     for line in context.table:
284         context.response.match_row(line, context=context, field=field)
285
286
287 @then(u'result (?P<lid>\d+ )?has (?P<neg>not )?attributes (?P<attrs>.*)')
288 def validate_attributes(context, lid, neg, attrs):
289     for i in make_todo_list(context, lid):
290         check_for_attributes(context.response.result[i], attrs,
291                              'absent' if neg else 'present')
292
293
294 @then(u'result addresses contain')
295 def step_impl(context):
296     context.execute_steps("then at least 1 result is returned")
297
298     for line in context.table:
299         idx = int(line['ID']) if 'ID' in line.headings else None
300
301         for name, value in zip(line.headings, line.cells):
302             if name != 'ID':
303                 context.response.assert_address_field(idx, name, value)
304
305 @then(u'address of result (?P<lid>\d+) has(?P<neg> no)? types (?P<attrs>.*)')
306 def check_address(context, lid, neg, attrs):
307     context.execute_steps(f"then more than {lid} results are returned")
308
309     addr_parts = context.response.result[int(lid)]['address']
310
311     for attr in attrs.split(','):
312         if neg:
313             assert attr not in addr_parts
314         else:
315             assert attr in addr_parts
316
317 @then(u'address of result (?P<lid>\d+) (?P<complete>is|contains)')
318 def check_address(context, lid, complete):
319     context.execute_steps(f"then more than {lid} results are returned")
320
321     lid = int(lid)
322     addr_parts = dict(context.response.result[lid]['address'])
323
324     for line in context.table:
325         context.response.assert_address_field(lid, line['type'], line['value'])
326         del addr_parts[line['type']]
327
328     if complete == 'is':
329         assert len(addr_parts) == 0, f"Additional address parts found: {addr_parts!s}"
330
331
332 @then(u'result (?P<lid>\d+ )?has bounding box in (?P<coords>[\d,.-]+)')
333 def check_bounding_box_in_area(context, lid, coords):
334     expected = Bbox(coords)
335
336     for idx in make_todo_list(context, lid):
337         res = context.response.result[idx]
338         check_for_attributes(res, 'boundingbox')
339         context.response.check_row(idx, res['boundingbox'] in expected,
340                                    f"Bbox is not contained in {expected}")
341
342
343 @then(u'result (?P<lid>\d+ )?has centroid in (?P<coords>[\d,.-]+)')
344 def check_centroid_in_area(context, lid, coords):
345     expected = Bbox(coords)
346
347     for idx in make_todo_list(context, lid):
348         res = context.response.result[idx]
349         check_for_attributes(res, 'lat,lon')
350         context.response.check_row(idx, (res['lon'], res['lat']) in expected,
351                                    f"Centroid is not inside {expected}")
352
353
354 @then(u'there are(?P<neg> no)? duplicates')
355 def check_for_duplicates(context, neg):
356     context.execute_steps("then at least 1 result is returned")
357
358     resarr = set()
359     has_dupe = False
360
361     for res in context.response.result:
362         dup = (res['osm_type'], res['class'], res['type'], res['display_name'])
363         if dup in resarr:
364             has_dupe = True
365             break
366         resarr.add(dup)
367
368     if neg:
369         assert not has_dupe, f"Found duplicate for {dup}"
370     else:
371         assert has_dupe, "No duplicates found"
372