]> git.openstreetmap.org Git - nominatim.git/blob - test/bdd/steps/steps_api_queries.py
add support for new middle table format of osm2pgsql
[nominatim.git] / test / bdd / steps / steps_api_queries.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """ Steps that run queries against the API.
8
9     Queries may either be run directly via PHP using the query script
10     or via the HTTP interface using php-cgi.
11 """
12 from pathlib import Path
13 import json
14 import os
15 import re
16 import logging
17 import asyncio
18 import xml.etree.ElementTree as ET
19 from urllib.parse import urlencode
20
21 from utils import run_script
22 from http_responses import GenericResponse, SearchResponse, ReverseResponse, StatusResponse
23 from check_functions import Bbox, check_for_attributes
24 from table_compare import NominatimID
25
26 LOG = logging.getLogger(__name__)
27
28 BASE_SERVER_ENV = {
29     'HTTP_HOST' : 'localhost',
30     'HTTP_USER_AGENT' : 'Mozilla/5.0 (X11; Linux x86_64; rv:51.0) Gecko/20100101 Firefox/51.0',
31     'HTTP_ACCEPT' : 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
32     'HTTP_ACCEPT_ENCODING' : 'gzip, deflate',
33     'HTTP_CONNECTION' : 'keep-alive',
34     'SERVER_SIGNATURE' : '<address>Nominatim BDD Tests</address>',
35     'SERVER_SOFTWARE' : 'Nominatim test',
36     'SERVER_NAME' : 'localhost',
37     'SERVER_ADDR' : '127.0.1.1',
38     'SERVER_PORT' : '80',
39     'REMOTE_ADDR' : '127.0.0.1',
40     'DOCUMENT_ROOT' : '/var/www',
41     'REQUEST_SCHEME' : 'http',
42     'CONTEXT_PREFIX' : '/',
43     'SERVER_ADMIN' : 'webmaster@localhost',
44     'REMOTE_PORT' : '49319',
45     'GATEWAY_INTERFACE' : 'CGI/1.1',
46     'SERVER_PROTOCOL' : 'HTTP/1.1',
47     'REQUEST_METHOD' : 'GET',
48     'REDIRECT_STATUS' : 'CGI'
49 }
50
51
52 def make_todo_list(context, result_id):
53     if result_id is None:
54         context.execute_steps("then at least 1 result is returned")
55         return range(len(context.response.result))
56
57     context.execute_steps(f"then more than {result_id}results are returned")
58     return (int(result_id.strip()), )
59
60
61 def compare(operator, op1, op2):
62     if operator == 'less than':
63         return op1 < op2
64     elif operator == 'more than':
65         return op1 > op2
66     elif operator == 'exactly':
67         return op1 == op2
68     elif operator == 'at least':
69         return op1 >= op2
70     elif operator == 'at most':
71         return op1 <= op2
72     else:
73         raise ValueError(f"Unknown operator '{operator}'")
74
75
76 def send_api_query(endpoint, params, fmt, context):
77     if fmt is not None:
78         if fmt.strip() == 'debug':
79             params['debug'] = '1'
80         else:
81             params['format'] = fmt.strip()
82
83     if context.table:
84         if context.table.headings[0] == 'param':
85             for line in context.table:
86                 params[line['param']] = line['value']
87         else:
88             for h in context.table.headings:
89                 params[h] = context.table[0][h]
90
91     if context.nominatim.api_engine is None:
92         return send_api_query_php(endpoint, params, context)
93
94     return asyncio.run(context.nominatim.api_engine(endpoint, params,
95                                                     Path(context.nominatim.website_dir.name),
96                                                     context.nominatim.test_env,
97                                                     getattr(context, 'http_headers', {})))
98
99
100
101 def send_api_query_php(endpoint, params, context):
102     env = dict(BASE_SERVER_ENV)
103     env['QUERY_STRING'] = urlencode(params)
104
105     env['SCRIPT_NAME'] = f'/{endpoint}.php'
106     env['REQUEST_URI'] = f"{env['SCRIPT_NAME']}?{env['QUERY_STRING']}"
107     env['CONTEXT_DOCUMENT_ROOT'] = os.path.join(context.nominatim.website_dir.name, 'website')
108     env['SCRIPT_FILENAME'] = os.path.join(env['CONTEXT_DOCUMENT_ROOT'],
109                                           f'{endpoint}.php')
110
111     LOG.debug("Environment:" + json.dumps(env, sort_keys=True, indent=2))
112
113     if hasattr(context, 'http_headers'):
114         for k, v in context.http_headers.items():
115             env['HTTP_' + k.upper().replace('-', '_')] = v
116
117     cmd = ['/usr/bin/env', 'php-cgi', '-f']
118     if context.nominatim.code_coverage_path:
119         env['XDEBUG_MODE'] = 'coverage'
120         env['COV_SCRIPT_FILENAME'] = env['SCRIPT_FILENAME']
121         env['COV_PHP_DIR'] = context.nominatim.src_dir
122         env['COV_TEST_NAME'] = f"{context.scenario.filename}:{context.scenario.line}"
123         env['SCRIPT_FILENAME'] = \
124                 os.path.join(os.path.split(__file__)[0], 'cgi-with-coverage.php')
125         cmd.append(env['SCRIPT_FILENAME'])
126         env['PHP_CODE_COVERAGE_FILE'] = context.nominatim.next_code_coverage_file()
127     else:
128         cmd.append(env['SCRIPT_FILENAME'])
129
130     for k,v in params.items():
131         cmd.append(f"{k}={v}")
132
133     outp, err = run_script(cmd, cwd=context.nominatim.website_dir.name, env=env)
134
135     assert len(err) == 0, f"Unexpected PHP error: {err}"
136
137     if outp.startswith('Status: '):
138         status = int(outp[8:11])
139     else:
140         status = 200
141
142     content_start = outp.find('\r\n\r\n')
143
144     return outp[content_start + 4:], status
145
146 @given(u'the HTTP header')
147 def add_http_header(context):
148     if not hasattr(context, 'http_headers'):
149         context.http_headers = {}
150
151     for h in context.table.headings:
152         context.http_headers[h] = context.table[0][h]
153
154
155 @when(u'sending (?P<fmt>\S+ )?search query "(?P<query>.*)"(?P<addr> with address)?')
156 def website_search_request(context, fmt, query, addr):
157     params = {}
158     if query:
159         params['q'] = query
160     if addr is not None:
161         params['addressdetails'] = '1'
162
163     outp, status = send_api_query('search', params, fmt, context)
164
165     context.response = SearchResponse(outp, fmt or 'json', status)
166
167
168 @when('sending v1/reverse at (?P<lat>[\d.-]*),(?P<lon>[\d.-]*)(?: with format (?P<fmt>.+))?')
169 def api_endpoint_v1_reverse(context, lat, lon, fmt):
170     params = {}
171     if lat is not None:
172         params['lat'] = lat
173     if lon is not None:
174         params['lon'] = lon
175     if fmt is None:
176         fmt = 'jsonv2'
177     elif fmt == "''":
178         fmt = None
179
180     outp, status = send_api_query('reverse', params, fmt, context)
181     context.response = ReverseResponse(outp, fmt or 'xml', status)
182
183
184 @when('sending v1/reverse N(?P<nodeid>\d+)(?: with format (?P<fmt>.+))?')
185 def api_endpoint_v1_reverse_from_node(context, nodeid, fmt):
186     params = {}
187     params['lon'], params['lat'] = (f'{c:f}' for c in context.osm.grid_node(int(nodeid)))
188
189     outp, status = send_api_query('reverse', params, fmt, context)
190     context.response = ReverseResponse(outp, fmt or 'xml', status)
191
192
193 @when(u'sending (?P<fmt>\S+ )?details query for (?P<query>.*)')
194 def website_details_request(context, fmt, query):
195     params = {}
196     if query[0] in 'NWR':
197         nid = NominatimID(query)
198         params['osmtype'] = nid.typ
199         params['osmid'] = nid.oid
200         if nid.cls:
201             params['class'] = nid.cls
202     else:
203         params['place_id'] = query
204     outp, status = send_api_query('details', params, fmt, context)
205
206     context.response = GenericResponse(outp, fmt or 'json', status)
207
208 @when(u'sending (?P<fmt>\S+ )?lookup query for (?P<query>.*)')
209 def website_lookup_request(context, fmt, query):
210     params = { 'osm_ids' : query }
211     outp, status = send_api_query('lookup', params, fmt, context)
212
213     context.response = SearchResponse(outp, fmt or 'xml', status)
214
215 @when(u'sending (?P<fmt>\S+ )?status query')
216 def website_status_request(context, fmt):
217     params = {}
218     outp, status = send_api_query('status', params, fmt, context)
219
220     context.response = StatusResponse(outp, fmt or 'text', status)
221
222 @step(u'(?P<operator>less than|more than|exactly|at least|at most) (?P<number>\d+) results? (?:is|are) returned')
223 def validate_result_number(context, operator, number):
224     context.execute_steps("Then a HTTP 200 is returned")
225     numres = len(context.response.result)
226     assert compare(operator, numres, int(number)), \
227            f"Bad number of results: expected {operator} {number}, got {numres}."
228
229 @then(u'a HTTP (?P<status>\d+) is returned')
230 def check_http_return_status(context, status):
231     assert context.response.errorcode == int(status), \
232            f"Return HTTP status is {context.response.errorcode}."\
233            f" Full response:\n{context.response.page}"
234
235 @then(u'the page contents equals "(?P<text>.+)"')
236 def check_page_content_equals(context, text):
237     assert context.response.page == text
238
239 @then(u'the result is valid (?P<fmt>\w+)')
240 def step_impl(context, fmt):
241     context.execute_steps("Then a HTTP 200 is returned")
242     if fmt.strip() == 'html':
243         try:
244             tree = ET.fromstring(context.response.page)
245         except Exception as ex:
246             assert False, f"Could not parse page: {ex}\n{context.response.page}"
247
248         assert tree.tag == 'html'
249         body = tree.find('./body')
250         assert body is not None
251         assert body.find('.//script') is None
252     else:
253         assert context.response.format == fmt
254
255
256 @then(u'a (?P<fmt>\w+) user error is returned')
257 def check_page_error(context, fmt):
258     context.execute_steps("Then a HTTP 400 is returned")
259     assert context.response.format == fmt
260
261     if fmt == 'xml':
262         assert re.search(r'<error>.+</error>', context.response.page, re.DOTALL) is not None
263     else:
264         assert re.search(r'({"error":)', context.response.page, re.DOTALL) is not None
265
266 @then(u'result header contains')
267 def check_header_attr(context):
268     context.execute_steps("Then a HTTP 200 is returned")
269     for line in context.table:
270         assert line['attr'] in context.response.header, \
271                f"Field '{line['attr']}' missing in header. Full header:\n{context.response.header}"
272         value = context.response.header[line['attr']]
273         assert re.fullmatch(line['value'], value) is not None, \
274                f"Attribute '{line['attr']}': expected: '{line['value']}', got '{value}'"
275
276
277 @then(u'result header has (?P<neg>not )?attributes (?P<attrs>.*)')
278 def check_header_no_attr(context, neg, attrs):
279     check_for_attributes(context.response.header, attrs,
280                          'absent' if neg else 'present')
281
282
283 @then(u'results contain(?: in field (?P<field>.*))?')
284 def step_impl(context, field):
285     context.execute_steps("then at least 1 result is returned")
286
287     for line in context.table:
288         context.response.match_row(line, context=context, field=field)
289
290
291 @then(u'result (?P<lid>\d+ )?has (?P<neg>not )?attributes (?P<attrs>.*)')
292 def validate_attributes(context, lid, neg, attrs):
293     for i in make_todo_list(context, lid):
294         check_for_attributes(context.response.result[i], attrs,
295                              'absent' if neg else 'present')
296
297
298 @then(u'result addresses contain')
299 def step_impl(context):
300     context.execute_steps("then at least 1 result is returned")
301
302     for line in context.table:
303         idx = int(line['ID']) if 'ID' in line.headings else None
304
305         for name, value in zip(line.headings, line.cells):
306             if name != 'ID':
307                 context.response.assert_address_field(idx, name, value)
308
309 @then(u'address of result (?P<lid>\d+) has(?P<neg> no)? types (?P<attrs>.*)')
310 def check_address(context, lid, neg, attrs):
311     context.execute_steps(f"then more than {lid} results are returned")
312
313     addr_parts = context.response.result[int(lid)]['address']
314
315     for attr in attrs.split(','):
316         if neg:
317             assert attr not in addr_parts
318         else:
319             assert attr in addr_parts
320
321 @then(u'address of result (?P<lid>\d+) (?P<complete>is|contains)')
322 def check_address(context, lid, complete):
323     context.execute_steps(f"then more than {lid} results are returned")
324
325     lid = int(lid)
326     addr_parts = dict(context.response.result[lid]['address'])
327
328     for line in context.table:
329         context.response.assert_address_field(lid, line['type'], line['value'])
330         del addr_parts[line['type']]
331
332     if complete == 'is':
333         assert len(addr_parts) == 0, f"Additional address parts found: {addr_parts!s}"
334
335
336 @then(u'result (?P<lid>\d+ )?has bounding box in (?P<coords>[\d,.-]+)')
337 def check_bounding_box_in_area(context, lid, coords):
338     expected = Bbox(coords)
339
340     for idx in make_todo_list(context, lid):
341         res = context.response.result[idx]
342         check_for_attributes(res, 'boundingbox')
343         context.response.check_row(idx, res['boundingbox'] in expected,
344                                    f"Bbox is not contained in {expected}")
345
346
347 @then(u'result (?P<lid>\d+ )?has centroid in (?P<coords>[\d,.-]+)')
348 def check_centroid_in_area(context, lid, coords):
349     expected = Bbox(coords)
350
351     for idx in make_todo_list(context, lid):
352         res = context.response.result[idx]
353         check_for_attributes(res, 'lat,lon')
354         context.response.check_row(idx, (res['lon'], res['lat']) in expected,
355                                    f"Centroid is not inside {expected}")
356
357
358 @then(u'there are(?P<neg> no)? duplicates')
359 def check_for_duplicates(context, neg):
360     context.execute_steps("then at least 1 result is returned")
361
362     resarr = set()
363     has_dupe = False
364
365     for res in context.response.result:
366         dup = (res['osm_type'], res['class'], res['type'], res['display_name'])
367         if dup in resarr:
368             has_dupe = True
369             break
370         resarr.add(dup)
371
372     if neg:
373         assert not has_dupe, f"Found duplicate for {dup}"
374     else:
375         assert has_dupe, "No duplicates found"
376