]> git.openstreetmap.org Git - nominatim.git/blob - test/bdd/steps/steps_api_queries.py
docs: external osm2pgsql is not longer optional
[nominatim.git] / test / bdd / steps / steps_api_queries.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """ Steps that run queries against the API.
8 """
9 from pathlib import Path
10 import json
11 import os
12 import re
13 import logging
14 import asyncio
15 import xml.etree.ElementTree as ET
16 from urllib.parse import urlencode
17
18 from utils import run_script
19 from http_responses import GenericResponse, SearchResponse, ReverseResponse, StatusResponse
20 from check_functions import Bbox, check_for_attributes
21 from table_compare import NominatimID
22
23 LOG = logging.getLogger(__name__)
24
25
26 def make_todo_list(context, result_id):
27     if result_id is None:
28         context.execute_steps("then at least 1 result is returned")
29         return range(len(context.response.result))
30
31     context.execute_steps(f"then more than {result_id}results are returned")
32     return (int(result_id.strip()), )
33
34
35 def compare(operator, op1, op2):
36     if operator == 'less than':
37         return op1 < op2
38     elif operator == 'more than':
39         return op1 > op2
40     elif operator == 'exactly':
41         return op1 == op2
42     elif operator == 'at least':
43         return op1 >= op2
44     elif operator == 'at most':
45         return op1 <= op2
46     else:
47         raise ValueError(f"Unknown operator '{operator}'")
48
49
50 def send_api_query(endpoint, params, fmt, context):
51     if fmt is not None:
52         if fmt.strip() == 'debug':
53             params['debug'] = '1'
54         else:
55             params['format'] = fmt.strip()
56
57     if context.table:
58         if context.table.headings[0] == 'param':
59             for line in context.table:
60                 params[line['param']] = line['value']
61         else:
62             for h in context.table.headings:
63                 params[h] = context.table[0][h]
64
65     return asyncio.run(context.nominatim.api_engine(endpoint, params,
66                                                     Path(context.nominatim.website_dir.name),
67                                                     context.nominatim.test_env,
68                                                     getattr(context, 'http_headers', {})))
69
70
71 @given(u'the HTTP header')
72 def add_http_header(context):
73     if not hasattr(context, 'http_headers'):
74         context.http_headers = {}
75
76     for h in context.table.headings:
77         context.http_headers[h] = context.table[0][h]
78
79
80 @when(u'sending (?P<fmt>\S+ )?search query "(?P<query>.*)"(?P<addr> with address)?')
81 def website_search_request(context, fmt, query, addr):
82     params = {}
83     if query:
84         params['q'] = query
85     if addr is not None:
86         params['addressdetails'] = '1'
87
88     outp, status = send_api_query('search', params, fmt, context)
89
90     context.response = SearchResponse(outp, fmt or 'json', status)
91
92
93 @when('sending v1/reverse at (?P<lat>[\d.-]*),(?P<lon>[\d.-]*)(?: with format (?P<fmt>.+))?')
94 def api_endpoint_v1_reverse(context, lat, lon, fmt):
95     params = {}
96     if lat is not None:
97         params['lat'] = lat
98     if lon is not None:
99         params['lon'] = lon
100     if fmt is None:
101         fmt = 'jsonv2'
102     elif fmt == "''":
103         fmt = None
104
105     outp, status = send_api_query('reverse', params, fmt, context)
106     context.response = ReverseResponse(outp, fmt or 'xml', status)
107
108
109 @when('sending v1/reverse N(?P<nodeid>\d+)(?: with format (?P<fmt>.+))?')
110 def api_endpoint_v1_reverse_from_node(context, nodeid, fmt):
111     params = {}
112     params['lon'], params['lat'] = (f'{c:f}' for c in context.osm.grid_node(int(nodeid)))
113
114     outp, status = send_api_query('reverse', params, fmt, context)
115     context.response = ReverseResponse(outp, fmt or 'xml', status)
116
117
118 @when(u'sending (?P<fmt>\S+ )?details query for (?P<query>.*)')
119 def website_details_request(context, fmt, query):
120     params = {}
121     if query[0] in 'NWR':
122         nid = NominatimID(query)
123         params['osmtype'] = nid.typ
124         params['osmid'] = nid.oid
125         if nid.cls:
126             params['class'] = nid.cls
127     else:
128         params['place_id'] = query
129     outp, status = send_api_query('details', params, fmt, context)
130
131     context.response = GenericResponse(outp, fmt or 'json', status)
132
133 @when(u'sending (?P<fmt>\S+ )?lookup query for (?P<query>.*)')
134 def website_lookup_request(context, fmt, query):
135     params = { 'osm_ids' : query }
136     outp, status = send_api_query('lookup', params, fmt, context)
137
138     context.response = SearchResponse(outp, fmt or 'xml', status)
139
140 @when(u'sending (?P<fmt>\S+ )?status query')
141 def website_status_request(context, fmt):
142     params = {}
143     outp, status = send_api_query('status', params, fmt, context)
144
145     context.response = StatusResponse(outp, fmt or 'text', status)
146
147 @step(u'(?P<operator>less than|more than|exactly|at least|at most) (?P<number>\d+) results? (?:is|are) returned')
148 def validate_result_number(context, operator, number):
149     context.execute_steps("Then a HTTP 200 is returned")
150     numres = len(context.response.result)
151     assert compare(operator, numres, int(number)), \
152            f"Bad number of results: expected {operator} {number}, got {numres}."
153
154 @then(u'a HTTP (?P<status>\d+) is returned')
155 def check_http_return_status(context, status):
156     assert context.response.errorcode == int(status), \
157            f"Return HTTP status is {context.response.errorcode}."\
158            f" Full response:\n{context.response.page}"
159
160 @then(u'the page contents equals "(?P<text>.+)"')
161 def check_page_content_equals(context, text):
162     assert context.response.page == text
163
164 @then(u'the result is valid (?P<fmt>\w+)')
165 def step_impl(context, fmt):
166     context.execute_steps("Then a HTTP 200 is returned")
167     if fmt.strip() == 'html':
168         try:
169             tree = ET.fromstring(context.response.page)
170         except Exception as ex:
171             assert False, f"Could not parse page: {ex}\n{context.response.page}"
172
173         assert tree.tag == 'html'
174         body = tree.find('./body')
175         assert body is not None
176         assert body.find('.//script') is None
177     else:
178         assert context.response.format == fmt
179
180
181 @then(u'a (?P<fmt>\w+) user error is returned')
182 def check_page_error(context, fmt):
183     context.execute_steps("Then a HTTP 400 is returned")
184     assert context.response.format == fmt
185
186     if fmt == 'xml':
187         assert re.search(r'<error>.+</error>', context.response.page, re.DOTALL) is not None
188     else:
189         assert re.search(r'({"error":)', context.response.page, re.DOTALL) is not None
190
191 @then(u'result header contains')
192 def check_header_attr(context):
193     context.execute_steps("Then a HTTP 200 is returned")
194     for line in context.table:
195         assert line['attr'] in context.response.header, \
196                f"Field '{line['attr']}' missing in header. Full header:\n{context.response.header}"
197         value = context.response.header[line['attr']]
198         assert re.fullmatch(line['value'], value) is not None, \
199                f"Attribute '{line['attr']}': expected: '{line['value']}', got '{value}'"
200
201
202 @then(u'result header has (?P<neg>not )?attributes (?P<attrs>.*)')
203 def check_header_no_attr(context, neg, attrs):
204     check_for_attributes(context.response.header, attrs,
205                          'absent' if neg else 'present')
206
207
208 @then(u'results contain(?: in field (?P<field>.*))?')
209 def step_impl(context, field):
210     context.execute_steps("then at least 1 result is returned")
211
212     for line in context.table:
213         context.response.match_row(line, context=context, field=field)
214
215
216 @then(u'result (?P<lid>\d+ )?has (?P<neg>not )?attributes (?P<attrs>.*)')
217 def validate_attributes(context, lid, neg, attrs):
218     for i in make_todo_list(context, lid):
219         check_for_attributes(context.response.result[i], attrs,
220                              'absent' if neg else 'present')
221
222
223 @then(u'result addresses contain')
224 def step_impl(context):
225     context.execute_steps("then at least 1 result is returned")
226
227     for line in context.table:
228         idx = int(line['ID']) if 'ID' in line.headings else None
229
230         for name, value in zip(line.headings, line.cells):
231             if name != 'ID':
232                 context.response.assert_address_field(idx, name, value)
233
234 @then(u'address of result (?P<lid>\d+) has(?P<neg> no)? types (?P<attrs>.*)')
235 def check_address(context, lid, neg, attrs):
236     context.execute_steps(f"then more than {lid} results are returned")
237
238     addr_parts = context.response.result[int(lid)]['address']
239
240     for attr in attrs.split(','):
241         if neg:
242             assert attr not in addr_parts
243         else:
244             assert attr in addr_parts
245
246 @then(u'address of result (?P<lid>\d+) (?P<complete>is|contains)')
247 def check_address(context, lid, complete):
248     context.execute_steps(f"then more than {lid} results are returned")
249
250     lid = int(lid)
251     addr_parts = dict(context.response.result[lid]['address'])
252
253     for line in context.table:
254         context.response.assert_address_field(lid, line['type'], line['value'])
255         del addr_parts[line['type']]
256
257     if complete == 'is':
258         assert len(addr_parts) == 0, f"Additional address parts found: {addr_parts!s}"
259
260
261 @then(u'result (?P<lid>\d+ )?has bounding box in (?P<coords>[\d,.-]+)')
262 def check_bounding_box_in_area(context, lid, coords):
263     expected = Bbox(coords)
264
265     for idx in make_todo_list(context, lid):
266         res = context.response.result[idx]
267         check_for_attributes(res, 'boundingbox')
268         context.response.check_row(idx, res['boundingbox'] in expected,
269                                    f"Bbox is not contained in {expected}")
270
271
272 @then(u'result (?P<lid>\d+ )?has centroid in (?P<coords>[\d,.-]+)')
273 def check_centroid_in_area(context, lid, coords):
274     expected = Bbox(coords)
275
276     for idx in make_todo_list(context, lid):
277         res = context.response.result[idx]
278         check_for_attributes(res, 'lat,lon')
279         context.response.check_row(idx, (res['lon'], res['lat']) in expected,
280                                    f"Centroid is not inside {expected}")
281
282
283 @then(u'there are(?P<neg> no)? duplicates')
284 def check_for_duplicates(context, neg):
285     context.execute_steps("then at least 1 result is returned")
286
287     resarr = set()
288     has_dupe = False
289
290     for res in context.response.result:
291         dup = (res['osm_type'], res['class'], res['type'], res['display_name'])
292         if dup in resarr:
293             has_dupe = True
294             break
295         resarr.add(dup)
296
297     if neg:
298         assert not has_dupe, f"Found duplicate for {dup}"
299     else:
300         assert has_dupe, "No duplicates found"
301