]> git.openstreetmap.org Git - nominatim.git/blob - test/bdd/steps/steps_api_queries.py
Merge pull request #3515 from lonvia/custom-result-formatting
[nominatim.git] / test / bdd / steps / steps_api_queries.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """ Steps that run queries against the API.
8
9     Queries may either be run directly via PHP using the query script
10     or via the HTTP interface using php-cgi.
11 """
12 from pathlib import Path
13 import json
14 import os
15 import re
16 import logging
17 import asyncio
18 import xml.etree.ElementTree as ET
19 from urllib.parse import urlencode
20
21 from utils import run_script
22 from http_responses import GenericResponse, SearchResponse, ReverseResponse, StatusResponse
23 from check_functions import Bbox, check_for_attributes
24 from table_compare import NominatimID
25
26 LOG = logging.getLogger(__name__)
27
28 BASE_SERVER_ENV = {
29     'HTTP_HOST' : 'localhost',
30     'HTTP_USER_AGENT' : 'Mozilla/5.0 (X11; Linux x86_64; rv:51.0) Gecko/20100101 Firefox/51.0',
31     'HTTP_ACCEPT' : 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
32     'HTTP_ACCEPT_ENCODING' : 'gzip, deflate',
33     'HTTP_CONNECTION' : 'keep-alive',
34     'SERVER_SIGNATURE' : '<address>Nominatim BDD Tests</address>',
35     'SERVER_SOFTWARE' : 'Nominatim test',
36     'SERVER_NAME' : 'localhost',
37     'SERVER_ADDR' : '127.0.1.1',
38     'SERVER_PORT' : '80',
39     'REMOTE_ADDR' : '127.0.0.1',
40     'DOCUMENT_ROOT' : '/var/www',
41     'REQUEST_SCHEME' : 'http',
42     'CONTEXT_PREFIX' : '/',
43     'SERVER_ADMIN' : 'webmaster@localhost',
44     'REMOTE_PORT' : '49319',
45     'GATEWAY_INTERFACE' : 'CGI/1.1',
46     'SERVER_PROTOCOL' : 'HTTP/1.1',
47     'REQUEST_METHOD' : 'GET',
48     'REDIRECT_STATUS' : 'CGI'
49 }
50
51
52 def make_todo_list(context, result_id):
53     if result_id is None:
54         context.execute_steps("then at least 1 result is returned")
55         return range(len(context.response.result))
56
57     context.execute_steps(f"then more than {result_id}results are returned")
58     return (int(result_id.strip()), )
59
60
61 def compare(operator, op1, op2):
62     if operator == 'less than':
63         return op1 < op2
64     elif operator == 'more than':
65         return op1 > op2
66     elif operator == 'exactly':
67         return op1 == op2
68     elif operator == 'at least':
69         return op1 >= op2
70     elif operator == 'at most':
71         return op1 <= op2
72     else:
73         raise ValueError(f"Unknown operator '{operator}'")
74
75
76 def send_api_query(endpoint, params, fmt, context):
77     if fmt is not None:
78         if fmt.strip() == 'debug':
79             params['debug'] = '1'
80         else:
81             params['format'] = fmt.strip()
82
83     if context.table:
84         if context.table.headings[0] == 'param':
85             for line in context.table:
86                 params[line['param']] = line['value']
87         else:
88             for h in context.table.headings:
89                 params[h] = context.table[0][h]
90
91     if context.nominatim.api_engine is None:
92         return send_api_query_php(endpoint, params, context)
93
94     return asyncio.run(context.nominatim.api_engine(endpoint, params,
95                                                     Path(context.nominatim.website_dir.name),
96                                                     context.nominatim.test_env,
97                                                     getattr(context, 'http_headers', {})))
98
99
100
101 def send_api_query_php(endpoint, params, context):
102     env = dict(BASE_SERVER_ENV)
103     env['QUERY_STRING'] = urlencode(params)
104
105     env['SCRIPT_NAME'] = f'/{endpoint}.php'
106     env['REQUEST_URI'] = f"{env['SCRIPT_NAME']}?{env['QUERY_STRING']}"
107     env['CONTEXT_DOCUMENT_ROOT'] = os.path.join(context.nominatim.website_dir.name, 'website')
108     env['SCRIPT_FILENAME'] = os.path.join(env['CONTEXT_DOCUMENT_ROOT'],
109                                           f'{endpoint}.php')
110
111     LOG.debug("Environment:" + json.dumps(env, sort_keys=True, indent=2))
112
113     if hasattr(context, 'http_headers'):
114         for k, v in context.http_headers.items():
115             env['HTTP_' + k.upper().replace('-', '_')] = v
116
117     cmd = ['/usr/bin/env', 'php-cgi', '-f', env['SCRIPT_FILENAME']]
118
119     for k,v in params.items():
120         cmd.append(f"{k}={v}")
121
122     outp, err = run_script(cmd, cwd=context.nominatim.website_dir.name, env=env)
123
124     assert len(err) == 0, f"Unexpected PHP error: {err}"
125
126     if outp.startswith('Status: '):
127         status = int(outp[8:11])
128     else:
129         status = 200
130
131     content_start = outp.find('\r\n\r\n')
132
133     return outp[content_start + 4:], status
134
135 @given(u'the HTTP header')
136 def add_http_header(context):
137     if not hasattr(context, 'http_headers'):
138         context.http_headers = {}
139
140     for h in context.table.headings:
141         context.http_headers[h] = context.table[0][h]
142
143
144 @when(u'sending (?P<fmt>\S+ )?search query "(?P<query>.*)"(?P<addr> with address)?')
145 def website_search_request(context, fmt, query, addr):
146     params = {}
147     if query:
148         params['q'] = query
149     if addr is not None:
150         params['addressdetails'] = '1'
151
152     outp, status = send_api_query('search', params, fmt, context)
153
154     context.response = SearchResponse(outp, fmt or 'json', status)
155
156
157 @when('sending v1/reverse at (?P<lat>[\d.-]*),(?P<lon>[\d.-]*)(?: with format (?P<fmt>.+))?')
158 def api_endpoint_v1_reverse(context, lat, lon, fmt):
159     params = {}
160     if lat is not None:
161         params['lat'] = lat
162     if lon is not None:
163         params['lon'] = lon
164     if fmt is None:
165         fmt = 'jsonv2'
166     elif fmt == "''":
167         fmt = None
168
169     outp, status = send_api_query('reverse', params, fmt, context)
170     context.response = ReverseResponse(outp, fmt or 'xml', status)
171
172
173 @when('sending v1/reverse N(?P<nodeid>\d+)(?: with format (?P<fmt>.+))?')
174 def api_endpoint_v1_reverse_from_node(context, nodeid, fmt):
175     params = {}
176     params['lon'], params['lat'] = (f'{c:f}' for c in context.osm.grid_node(int(nodeid)))
177
178     outp, status = send_api_query('reverse', params, fmt, context)
179     context.response = ReverseResponse(outp, fmt or 'xml', status)
180
181
182 @when(u'sending (?P<fmt>\S+ )?details query for (?P<query>.*)')
183 def website_details_request(context, fmt, query):
184     params = {}
185     if query[0] in 'NWR':
186         nid = NominatimID(query)
187         params['osmtype'] = nid.typ
188         params['osmid'] = nid.oid
189         if nid.cls:
190             params['class'] = nid.cls
191     else:
192         params['place_id'] = query
193     outp, status = send_api_query('details', params, fmt, context)
194
195     context.response = GenericResponse(outp, fmt or 'json', status)
196
197 @when(u'sending (?P<fmt>\S+ )?lookup query for (?P<query>.*)')
198 def website_lookup_request(context, fmt, query):
199     params = { 'osm_ids' : query }
200     outp, status = send_api_query('lookup', params, fmt, context)
201
202     context.response = SearchResponse(outp, fmt or 'xml', status)
203
204 @when(u'sending (?P<fmt>\S+ )?status query')
205 def website_status_request(context, fmt):
206     params = {}
207     outp, status = send_api_query('status', params, fmt, context)
208
209     context.response = StatusResponse(outp, fmt or 'text', status)
210
211 @step(u'(?P<operator>less than|more than|exactly|at least|at most) (?P<number>\d+) results? (?:is|are) returned')
212 def validate_result_number(context, operator, number):
213     context.execute_steps("Then a HTTP 200 is returned")
214     numres = len(context.response.result)
215     assert compare(operator, numres, int(number)), \
216            f"Bad number of results: expected {operator} {number}, got {numres}."
217
218 @then(u'a HTTP (?P<status>\d+) is returned')
219 def check_http_return_status(context, status):
220     assert context.response.errorcode == int(status), \
221            f"Return HTTP status is {context.response.errorcode}."\
222            f" Full response:\n{context.response.page}"
223
224 @then(u'the page contents equals "(?P<text>.+)"')
225 def check_page_content_equals(context, text):
226     assert context.response.page == text
227
228 @then(u'the result is valid (?P<fmt>\w+)')
229 def step_impl(context, fmt):
230     context.execute_steps("Then a HTTP 200 is returned")
231     if fmt.strip() == 'html':
232         try:
233             tree = ET.fromstring(context.response.page)
234         except Exception as ex:
235             assert False, f"Could not parse page: {ex}\n{context.response.page}"
236
237         assert tree.tag == 'html'
238         body = tree.find('./body')
239         assert body is not None
240         assert body.find('.//script') is None
241     else:
242         assert context.response.format == fmt
243
244
245 @then(u'a (?P<fmt>\w+) user error is returned')
246 def check_page_error(context, fmt):
247     context.execute_steps("Then a HTTP 400 is returned")
248     assert context.response.format == fmt
249
250     if fmt == 'xml':
251         assert re.search(r'<error>.+</error>', context.response.page, re.DOTALL) is not None
252     else:
253         assert re.search(r'({"error":)', context.response.page, re.DOTALL) is not None
254
255 @then(u'result header contains')
256 def check_header_attr(context):
257     context.execute_steps("Then a HTTP 200 is returned")
258     for line in context.table:
259         assert line['attr'] in context.response.header, \
260                f"Field '{line['attr']}' missing in header. Full header:\n{context.response.header}"
261         value = context.response.header[line['attr']]
262         assert re.fullmatch(line['value'], value) is not None, \
263                f"Attribute '{line['attr']}': expected: '{line['value']}', got '{value}'"
264
265
266 @then(u'result header has (?P<neg>not )?attributes (?P<attrs>.*)')
267 def check_header_no_attr(context, neg, attrs):
268     check_for_attributes(context.response.header, attrs,
269                          'absent' if neg else 'present')
270
271
272 @then(u'results contain(?: in field (?P<field>.*))?')
273 def step_impl(context, field):
274     context.execute_steps("then at least 1 result is returned")
275
276     for line in context.table:
277         context.response.match_row(line, context=context, field=field)
278
279
280 @then(u'result (?P<lid>\d+ )?has (?P<neg>not )?attributes (?P<attrs>.*)')
281 def validate_attributes(context, lid, neg, attrs):
282     for i in make_todo_list(context, lid):
283         check_for_attributes(context.response.result[i], attrs,
284                              'absent' if neg else 'present')
285
286
287 @then(u'result addresses contain')
288 def step_impl(context):
289     context.execute_steps("then at least 1 result is returned")
290
291     for line in context.table:
292         idx = int(line['ID']) if 'ID' in line.headings else None
293
294         for name, value in zip(line.headings, line.cells):
295             if name != 'ID':
296                 context.response.assert_address_field(idx, name, value)
297
298 @then(u'address of result (?P<lid>\d+) has(?P<neg> no)? types (?P<attrs>.*)')
299 def check_address(context, lid, neg, attrs):
300     context.execute_steps(f"then more than {lid} results are returned")
301
302     addr_parts = context.response.result[int(lid)]['address']
303
304     for attr in attrs.split(','):
305         if neg:
306             assert attr not in addr_parts
307         else:
308             assert attr in addr_parts
309
310 @then(u'address of result (?P<lid>\d+) (?P<complete>is|contains)')
311 def check_address(context, lid, complete):
312     context.execute_steps(f"then more than {lid} results are returned")
313
314     lid = int(lid)
315     addr_parts = dict(context.response.result[lid]['address'])
316
317     for line in context.table:
318         context.response.assert_address_field(lid, line['type'], line['value'])
319         del addr_parts[line['type']]
320
321     if complete == 'is':
322         assert len(addr_parts) == 0, f"Additional address parts found: {addr_parts!s}"
323
324
325 @then(u'result (?P<lid>\d+ )?has bounding box in (?P<coords>[\d,.-]+)')
326 def check_bounding_box_in_area(context, lid, coords):
327     expected = Bbox(coords)
328
329     for idx in make_todo_list(context, lid):
330         res = context.response.result[idx]
331         check_for_attributes(res, 'boundingbox')
332         context.response.check_row(idx, res['boundingbox'] in expected,
333                                    f"Bbox is not contained in {expected}")
334
335
336 @then(u'result (?P<lid>\d+ )?has centroid in (?P<coords>[\d,.-]+)')
337 def check_centroid_in_area(context, lid, coords):
338     expected = Bbox(coords)
339
340     for idx in make_todo_list(context, lid):
341         res = context.response.result[idx]
342         check_for_attributes(res, 'lat,lon')
343         context.response.check_row(idx, (res['lon'], res['lat']) in expected,
344                                    f"Centroid is not inside {expected}")
345
346
347 @then(u'there are(?P<neg> no)? duplicates')
348 def check_for_duplicates(context, neg):
349     context.execute_steps("then at least 1 result is returned")
350
351     resarr = set()
352     has_dupe = False
353
354     for res in context.response.result:
355         dup = (res['osm_type'], res['class'], res['type'], res['display_name'])
356         if dup in resarr:
357             has_dupe = True
358             break
359         resarr.add(dup)
360
361     if neg:
362         assert not has_dupe, f"Found duplicate for {dup}"
363     else:
364         assert has_dupe, "No duplicates found"
365