]> git.openstreetmap.org Git - nominatim.git/blob - test/bdd/steps/steps_api_queries.py
78967ad14e10faf8ae41a5ef13744fe70c185975
[nominatim.git] / test / bdd / steps / steps_api_queries.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """ Steps that run queries against the API.
8
9     Queries may either be run directly via PHP using the query script
10     or via the HTTP interface using php-cgi.
11 """
12 from pathlib import Path
13 import json
14 import os
15 import re
16 import logging
17 import asyncio
18 import xml.etree.ElementTree as ET
19 from urllib.parse import urlencode
20
21 from utils import run_script
22 from http_responses import GenericResponse, SearchResponse, ReverseResponse, StatusResponse
23 from check_functions import Bbox, check_for_attributes
24 from table_compare import NominatimID
25
26 LOG = logging.getLogger(__name__)
27
28 BASE_SERVER_ENV = {
29     'HTTP_HOST' : 'localhost',
30     'HTTP_USER_AGENT' : 'Mozilla/5.0 (X11; Linux x86_64; rv:51.0) Gecko/20100101 Firefox/51.0',
31     'HTTP_ACCEPT' : 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
32     'HTTP_ACCEPT_ENCODING' : 'gzip, deflate',
33     'HTTP_CONNECTION' : 'keep-alive',
34     'SERVER_SIGNATURE' : '<address>Nominatim BDD Tests</address>',
35     'SERVER_SOFTWARE' : 'Nominatim test',
36     'SERVER_NAME' : 'localhost',
37     'SERVER_ADDR' : '127.0.1.1',
38     'SERVER_PORT' : '80',
39     'REMOTE_ADDR' : '127.0.0.1',
40     'DOCUMENT_ROOT' : '/var/www',
41     'REQUEST_SCHEME' : 'http',
42     'CONTEXT_PREFIX' : '/',
43     'SERVER_ADMIN' : 'webmaster@localhost',
44     'REMOTE_PORT' : '49319',
45     'GATEWAY_INTERFACE' : 'CGI/1.1',
46     'SERVER_PROTOCOL' : 'HTTP/1.1',
47     'REQUEST_METHOD' : 'GET',
48     'REDIRECT_STATUS' : 'CGI'
49 }
50
51
52 def make_todo_list(context, result_id):
53     if result_id is None:
54         context.execute_steps("then at least 1 result is returned")
55         return range(len(context.response.result))
56
57     context.execute_steps(f"then more than {result_id}results are returned")
58     return (int(result_id.strip()), )
59
60
61 def compare(operator, op1, op2):
62     if operator == 'less than':
63         return op1 < op2
64     elif operator == 'more than':
65         return op1 > op2
66     elif operator == 'exactly':
67         return op1 == op2
68     elif operator == 'at least':
69         return op1 >= op2
70     elif operator == 'at most':
71         return op1 <= op2
72     else:
73         raise ValueError(f"Unknown operator '{operator}'")
74
75
76 def send_api_query(endpoint, params, fmt, context):
77     if fmt is not None:
78         if fmt.strip() == 'debug':
79             params['debug'] = '1'
80         else:
81             params['format'] = fmt.strip()
82
83     if context.table:
84         if context.table.headings[0] == 'param':
85             for line in context.table:
86                 params[line['param']] = line['value']
87         else:
88             for h in context.table.headings:
89                 params[h] = context.table[0][h]
90
91     if context.nominatim.api_engine is None:
92         return send_api_query_php(endpoint, params, context)
93
94     return asyncio.run(context.nominatim.api_engine(endpoint, params,
95                                                     Path(context.nominatim.website_dir.name),
96                                                     context.nominatim.test_env,
97                                                     getattr(context, 'http_headers', {})))
98
99
100
101 def send_api_query_php(endpoint, params, context):
102     env = dict(BASE_SERVER_ENV)
103     env['QUERY_STRING'] = urlencode(params)
104
105     env['SCRIPT_NAME'] = f'/{endpoint}.php'
106     env['REQUEST_URI'] = f"{env['SCRIPT_NAME']}?{env['QUERY_STRING']}"
107     env['CONTEXT_DOCUMENT_ROOT'] = os.path.join(context.nominatim.website_dir.name, 'website')
108     env['SCRIPT_FILENAME'] = os.path.join(env['CONTEXT_DOCUMENT_ROOT'],
109                                           f'{endpoint}.php')
110
111     LOG.debug("Environment:" + json.dumps(env, sort_keys=True, indent=2))
112
113     if hasattr(context, 'http_headers'):
114         env.update(context.http_headers)
115
116     cmd = ['/usr/bin/env', 'php-cgi', '-f']
117     if context.nominatim.code_coverage_path:
118         env['XDEBUG_MODE'] = 'coverage'
119         env['COV_SCRIPT_FILENAME'] = env['SCRIPT_FILENAME']
120         env['COV_PHP_DIR'] = context.nominatim.src_dir
121         env['COV_TEST_NAME'] = f"{context.scenario.filename}:{context.scenario.line}"
122         env['SCRIPT_FILENAME'] = \
123                 os.path.join(os.path.split(__file__)[0], 'cgi-with-coverage.php')
124         cmd.append(env['SCRIPT_FILENAME'])
125         env['PHP_CODE_COVERAGE_FILE'] = context.nominatim.next_code_coverage_file()
126     else:
127         cmd.append(env['SCRIPT_FILENAME'])
128
129     for k,v in params.items():
130         cmd.append(f"{k}={v}")
131
132     outp, err = run_script(cmd, cwd=context.nominatim.website_dir.name, env=env)
133
134     assert len(err) == 0, f"Unexpected PHP error: {err}"
135
136     if outp.startswith('Status: '):
137         status = int(outp[8:11])
138     else:
139         status = 200
140
141     content_start = outp.find('\r\n\r\n')
142
143     return outp[content_start + 4:], status
144
145 @given(u'the HTTP header')
146 def add_http_header(context):
147     if not hasattr(context, 'http_headers'):
148         context.http_headers = {}
149
150     for h in context.table.headings:
151         envvar = 'HTTP_' + h.upper().replace('-', '_')
152         context.http_headers[envvar] = context.table[0][h]
153
154
155 @when(u'sending (?P<fmt>\S+ )?search query "(?P<query>.*)"(?P<addr> with address)?')
156 def website_search_request(context, fmt, query, addr):
157     params = {}
158     if query:
159         params['q'] = query
160     if addr is not None:
161         params['addressdetails'] = '1'
162
163     outp, status = send_api_query('search', params, fmt, context)
164
165     context.response = SearchResponse(outp, fmt or 'json', status)
166
167 @when(u'sending (?P<fmt>\S+ )?reverse coordinates (?P<lat>.+)?,(?P<lon>.+)?')
168 def website_reverse_request(context, fmt, lat, lon):
169     params = {}
170     if lat is not None:
171         params['lat'] = lat
172     if lon is not None:
173         params['lon'] = lon
174
175     outp, status = send_api_query('reverse', params, fmt, context)
176
177     context.response = ReverseResponse(outp, fmt or 'xml', status)
178
179 @when(u'sending (?P<fmt>\S+ )?reverse point (?P<nodeid>.+)')
180 def website_reverse_request(context, fmt, nodeid):
181     params = {}
182     params['lon'], params['lat'] = (f'{c:f}' for c in context.osm.grid_node(int(nodeid)))
183
184
185     outp, status = send_api_query('reverse', params, fmt, context)
186
187     context.response = ReverseResponse(outp, fmt or 'xml', status)
188
189
190
191 @when(u'sending (?P<fmt>\S+ )?details query for (?P<query>.*)')
192 def website_details_request(context, fmt, query):
193     params = {}
194     if query[0] in 'NWR':
195         nid = NominatimID(query)
196         params['osmtype'] = nid.typ
197         params['osmid'] = nid.oid
198         if nid.cls:
199             params['class'] = nid.cls
200     else:
201         params['place_id'] = query
202     outp, status = send_api_query('details', params, fmt, context)
203
204     context.response = GenericResponse(outp, fmt or 'json', status)
205
206 @when(u'sending (?P<fmt>\S+ )?lookup query for (?P<query>.*)')
207 def website_lookup_request(context, fmt, query):
208     params = { 'osm_ids' : query }
209     outp, status = send_api_query('lookup', params, fmt, context)
210
211     context.response = SearchResponse(outp, fmt or 'xml', status)
212
213 @when(u'sending (?P<fmt>\S+ )?status query')
214 def website_status_request(context, fmt):
215     params = {}
216     outp, status = send_api_query('status', params, fmt, context)
217
218     context.response = StatusResponse(outp, fmt or 'text', status)
219
220 @step(u'(?P<operator>less than|more than|exactly|at least|at most) (?P<number>\d+) results? (?:is|are) returned')
221 def validate_result_number(context, operator, number):
222     context.execute_steps("Then a HTTP 200 is returned")
223     numres = len(context.response.result)
224     assert compare(operator, numres, int(number)), \
225            f"Bad number of results: expected {operator} {number}, got {numres}."
226
227 @then(u'a HTTP (?P<status>\d+) is returned')
228 def check_http_return_status(context, status):
229     assert context.response.errorcode == int(status), \
230            f"Return HTTP status is {context.response.errorcode}."
231
232 @then(u'the page contents equals "(?P<text>.+)"')
233 def check_page_content_equals(context, text):
234     assert context.response.page == text
235
236 @then(u'the result is valid (?P<fmt>\w+)')
237 def step_impl(context, fmt):
238     context.execute_steps("Then a HTTP 200 is returned")
239     if fmt.strip() == 'html':
240         try:
241             tree = ET.fromstring(context.response.page)
242         except Exception as ex:
243             assert False, f"Could not parse page:\n{context.response.page}"
244
245         assert tree.tag == 'html'
246         body = tree.find('./body')
247         assert body is not None
248         assert body.find('.//script') is None
249     else:
250         assert context.response.format == fmt
251
252
253 @then(u'a (?P<fmt>\w+) user error is returned')
254 def check_page_error(context, fmt):
255     context.execute_steps("Then a HTTP 400 is returned")
256     assert context.response.format == fmt
257
258     if fmt == 'xml':
259         assert re.search(r'<error>.+</error>', context.response.page, re.DOTALL) is not None
260     else:
261         assert re.search(r'({"error":)', context.response.page, re.DOTALL) is not None
262
263 @then(u'result header contains')
264 def check_header_attr(context):
265     for line in context.table:
266         value = context.response.header[line['attr']]
267         assert re.fullmatch(line['value'], value) is not None, \
268                f"Attribute '{line['attr']}': expected: '{line['value']}', got '{value}'"
269
270
271 @then(u'result header has (?P<neg>not )?attributes (?P<attrs>.*)')
272 def check_header_no_attr(context, neg, attrs):
273     check_for_attributes(context.response.header, attrs,
274                          'absent' if neg else 'present')
275
276
277 @then(u'results contain(?: in field (?P<field>.*))?')
278 def step_impl(context, field):
279     context.execute_steps("then at least 1 result is returned")
280
281     for line in context.table:
282         context.response.match_row(line, context=context, field=field)
283
284
285 @then(u'result (?P<lid>\d+ )?has (?P<neg>not )?attributes (?P<attrs>.*)')
286 def validate_attributes(context, lid, neg, attrs):
287     for i in make_todo_list(context, lid):
288         check_for_attributes(context.response.result[i], attrs,
289                              'absent' if neg else 'present')
290
291
292 @then(u'result addresses contain')
293 def step_impl(context):
294     context.execute_steps("then at least 1 result is returned")
295
296     for line in context.table:
297         idx = int(line['ID']) if 'ID' in line.headings else None
298
299         for name, value in zip(line.headings, line.cells):
300             if name != 'ID':
301                 context.response.assert_address_field(idx, name, value)
302
303 @then(u'address of result (?P<lid>\d+) has(?P<neg> no)? types (?P<attrs>.*)')
304 def check_address(context, lid, neg, attrs):
305     context.execute_steps(f"then more than {lid} results are returned")
306
307     addr_parts = context.response.result[int(lid)]['address']
308
309     for attr in attrs.split(','):
310         if neg:
311             assert attr not in addr_parts
312         else:
313             assert attr in addr_parts
314
315 @then(u'address of result (?P<lid>\d+) (?P<complete>is|contains)')
316 def check_address(context, lid, complete):
317     context.execute_steps(f"then more than {lid} results are returned")
318
319     lid = int(lid)
320     addr_parts = dict(context.response.result[lid]['address'])
321
322     for line in context.table:
323         context.response.assert_address_field(lid, line['type'], line['value'])
324         del addr_parts[line['type']]
325
326     if complete == 'is':
327         assert len(addr_parts) == 0, f"Additional address parts found: {addr_parts!s}"
328
329
330 @then(u'result (?P<lid>\d+ )?has bounding box in (?P<coords>[\d,.-]+)')
331 def check_bounding_box_in_area(context, lid, coords):
332     expected = Bbox(coords)
333
334     for idx in make_todo_list(context, lid):
335         res = context.response.result[idx]
336         check_for_attributes(res, 'boundingbox')
337         context.response.check_row(idx, res['boundingbox'] in expected,
338                                    f"Bbox is not contained in {expected}")
339
340
341 @then(u'result (?P<lid>\d+ )?has centroid in (?P<coords>[\d,.-]+)')
342 def check_centroid_in_area(context, lid, coords):
343     expected = Bbox(coords)
344
345     for idx in make_todo_list(context, lid):
346         res = context.response.result[idx]
347         check_for_attributes(res, 'lat,lon')
348         context.response.check_row(idx, (res['lon'], res['lat']) in expected,
349                                    f"Centroid is not inside {expected}")
350
351
352 @then(u'there are(?P<neg> no)? duplicates')
353 def check_for_duplicates(context, neg):
354     context.execute_steps("then at least 1 result is returned")
355
356     resarr = set()
357     has_dupe = False
358
359     for res in context.response.result:
360         dup = (res['osm_type'], res['class'], res['type'], res['display_name'])
361         if dup in resarr:
362             has_dupe = True
363             break
364         resarr.add(dup)
365
366     if neg:
367         assert not has_dupe, f"Found duplicate for {dup}"
368     else:
369         assert has_dupe, "No duplicates found"
370