]> git.openstreetmap.org Git - nominatim.git/blob - tests/steps/api_result.py
Merge pull request #345 from netsgnut/updated-tiger-mips
[nominatim.git] / tests / steps / api_result.py
1 """ Steps for checking the results of queries.
2 """
3
4 from nose.tools import *
5 from lettuce import *
6 from tidylib import tidy_document
7 from collections import OrderedDict
8 import json
9 import logging
10 import re
11 from xml.dom.minidom import parseString
12
13 logger = logging.getLogger(__name__)
14
15 def _parse_xml():
16     """ Puts the DOM structure into more convenient python
17         with a similar structure as the json document, so
18         that the same the semantics can be used. It does not
19         check if the content is valid (or at least not more than
20         necessary to transform it into a dict structure).
21     """
22     page = parseString(world.page).documentElement
23
24     # header info
25     world.result_header = OrderedDict(page.attributes.items())
26     logger.debug('Result header: %r' % (world.result_header))
27     world.results = []
28
29     # results
30     if page.nodeName == 'searchresults' or page.nodeName == 'lookupresults':
31         for node in page.childNodes:
32             if node.nodeName != "#text":
33                 assert_equals(node.nodeName, 'place', msg="Unexpected element '%s'" % node.nodeName)
34                 newresult = OrderedDict(node.attributes.items())
35                 assert_not_in('address', newresult)
36                 assert_not_in('geokml', newresult)
37                 assert_not_in('extratags', newresult)
38                 assert_not_in('namedetails', newresult)
39                 address = OrderedDict()
40                 for sub in node.childNodes:
41                     if sub.nodeName == 'geokml':
42                         newresult['geokml'] = sub.childNodes[0].toxml()
43                     elif sub.nodeName == 'extratags':
44                         newresult['extratags'] = {}
45                         for tag in sub.childNodes:
46                             assert_equals(tag.nodeName, 'tag')
47                             attrs = dict(tag.attributes.items())
48                             assert_in('key', attrs)
49                             assert_in('value', attrs)
50                             newresult['extratags'][attrs['key']] = attrs['value']
51                     elif sub.nodeName == 'namedetails':
52                         newresult['namedetails'] = {}
53                         for tag in sub.childNodes:
54                             assert_equals(tag.nodeName, 'name')
55                             attrs = dict(tag.attributes.items())
56                             assert_in('desc', attrs)
57                             newresult['namedetails'][attrs['desc']] = tag.firstChild.nodeValue.strip()
58
59                     elif sub.nodeName == '#text':
60                         pass
61                     else:
62                         address[sub.nodeName] = sub.firstChild.nodeValue.strip()
63                 if address:
64                     newresult['address'] = address
65                 world.results.append(newresult)
66     elif page.nodeName == 'reversegeocode':
67         haserror = False
68         address = {}
69         for node in page.childNodes:
70             if node.nodeName == 'result':
71                 assert_equals(len(world.results), 0)
72                 assert (not haserror)
73                 world.results.append(OrderedDict(node.attributes.items()))
74                 assert_not_in('display_name', world.results[0])
75                 assert_not_in('address', world.results[0])
76                 world.results[0]['display_name'] = node.firstChild.nodeValue.strip()
77             elif node.nodeName == 'error':
78                 assert_equals(len(world.results), 0)
79                 haserror = True
80             elif node.nodeName == 'addressparts':
81                 assert (not haserror)
82                 address = OrderedDict()
83                 for sub in node.childNodes:
84                     address[sub.nodeName] = sub.firstChild.nodeValue.strip()
85                 world.results[0]['address'] = address
86             elif node.nodeName == 'extratags':
87                 world.results[0]['extratags'] = {}
88                 for tag in node.childNodes:
89                     assert_equals(tag.nodeName, 'tag')
90                     attrs = dict(tag.attributes.items())
91                     assert_in('key', attrs)
92                     assert_in('value', attrs)
93                     world.results[0]['extratags'][attrs['key']] = attrs['value']
94             elif node.nodeName == 'namedetails':
95                 world.results[0]['namedetails'] = {}
96                 for tag in node.childNodes:
97                     assert_equals(tag.nodeName, 'name')
98                     attrs = dict(tag.attributes.items())
99                     assert_in('desc', attrs)
100                     world.results[0]['namedetails'][attrs['desc']] = tag.firstChild.nodeValue.strip()
101             elif node.nodeName == "#text":
102                 pass
103             else:
104                 assert False, "Unknown content '%s' in XML" % node.nodeName
105     else:
106         assert False, "Unknown document node name %s in XML" % page.nodeName
107
108     logger.debug("The following was parsed out of XML:")
109     logger.debug(world.results)
110
111 @step(u'a HTTP (\d+) is returned')
112 def api_result_http_error(step, error):
113     assert_equals(world.returncode, int(error))
114
115 @step(u'the result is valid( \w+)?')
116 def api_result_is_valid(step, fmt):
117     assert_equals(world.returncode, 200)
118
119     if world.response_format == 'html':
120         document, errors = tidy_document(world.page, 
121                              options={'char-encoding' : 'utf8'})
122         assert(len(errors) == 0), "Errors found in HTML document:\n%s" % errors
123         world.results = document
124     elif world.response_format == 'xml':
125         _parse_xml()
126     elif world.response_format == 'json':
127         world.results = json.JSONDecoder(object_pairs_hook=OrderedDict).decode(world.page)
128         if world.request_type == 'reverse':
129             world.results = (world.results,)
130     else:
131         assert False, "Unknown page format: %s" % (world.response_format)
132
133     if fmt:
134         assert_equals (fmt.strip(), world.response_format)
135
136
137 def compare(operator, op1, op2):
138     if operator == 'less than':
139         return op1 < op2
140     elif operator == 'more than':
141         return op1 > op2
142     elif operator == 'exactly':
143         return op1 == op2
144     elif operator == 'at least':
145         return op1 >= op2
146     elif operator == 'at most':
147         return op1 <= op2
148     else:
149         raise Exception("unknown operator '%s'" % operator)
150
151 @step(u'(less than|more than|exactly|at least|at most) (\d+) results? (?:is|are) returned')
152 def validate_result_number(step, operator, number):
153     step.given('the result is valid')
154     numres = len(world.results)
155     assert compare(operator, numres, int(number)), \
156         "Bad number of results: expected %s %s, got %d." % (operator, number, numres)
157
158 @step(u'result (\d+) has( not)? attributes (\S+)')
159 def search_check_for_result_attribute(step, num, invalid, attrs):
160     num = int(num)
161     step.given('at least %d results are returned' % (num + 1))
162     res = world.results[num]
163     for attr in attrs.split(','):
164         if invalid:
165             assert_not_in(attr.strip(), res)
166         else:
167             assert_in(attr.strip(),res)
168         
169 @step(u'there is a json wrapper "([^"]*)"')
170 def api_result_check_json_wrapper(step, wrapper):
171     step.given('the result is valid json')
172     assert_equals(world.json_callback, wrapper)
173
174 @step(u'result header contains')
175 def api_result_header_contains(step):
176     step.given('the result is valid')
177     for line in step.hashes:
178         assert_in(line['attr'], world.result_header)
179         m = re.match("%s$" % (line['value'],), world.result_header[line['attr']])
180
181 @step(u'result header has no attribute (.*)')
182 def api_result_header_contains_not(step, attr):
183     step.given('the result is valid')
184     assert_not_in(attr, world.result_header)
185
186 @step(u'results contain$')
187 def api_result_contains(step):
188     step.given('at least 1 result is returned')
189     for line in step.hashes:
190         if 'ID' in line:
191             reslist = (world.results[int(line['ID'])],)
192         else:
193             reslist = world.results
194         for k,v in line.iteritems():
195             if k == 'latlon':
196                 for curres in reslist:
197                     world.match_geometry((float(curres['lat']), float(curres['lon'])), v)
198             elif k != 'ID':
199                 for curres in reslist:
200                     assert_in(k, curres)
201                     if v[0] in '<>=':
202                         # mathematical operation
203                         evalexp = '%s %s' % (curres[k], v)
204                         res = eval(evalexp)
205                         logger.debug('Evaluating: %s = %s' % (res, evalexp))
206                         assert_true(res, "Evaluation failed: %s" % (evalexp, ))
207                     else:
208                         # regex match
209                         m = re.match("%s$" % (v,), curres[k])
210                         assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
211
212
213 @step(u'result addresses contain$')
214 def api_result_address_contains(step):
215     step.given('the result is valid')
216     for line in step.hashes:
217         if 'ID' in line:
218             reslist = (world.results[int(line['ID'])],)
219         else:
220             reslist = world.results
221         for k,v in line.iteritems():
222             if k != 'ID':
223                 for res in reslist:
224                     curres = res['address']
225                     assert_in(k, curres)
226                     m = re.match("%s$" % (v,), curres[k])
227                     assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
228
229
230 @step(u'address of result (\d+) contains')
231 def api_result_address_exact(step, resid):
232     resid = int(resid)
233     step.given('at least %d results are returned' % (resid + 1))
234     addr = world.results[resid]['address']
235     for line in step.hashes:
236         assert_in(line['type'], addr)
237         m = re.match("%s$" % line['value'], addr[line['type']])
238         assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (
239                                   line['type'], line['value'], addr[line['type']]))
240         #assert_equals(line['value'], addr[line['type']])
241
242 @step(u'address of result (\d+) does not contain (.*)')
243 def api_result_address_details_missing(step, resid, types):
244     resid = int(resid)
245     step.given('at least %d results are returned' % (resid + 1))
246     addr = world.results[resid]['address']
247     for t in types.split(','):
248         assert_not_in(t.strip(), addr)
249
250
251 @step(u'address of result (\d+) is')
252 def api_result_address_exact(step, resid):
253     resid = int(resid)
254     step.given('at least %d results are returned' % (resid + 1))
255     result = world.results[resid]
256     linenr = 0
257     assert_equals(len(step.hashes), len(result['address']))
258     for k,v in result['address'].iteritems():
259         assert_equals(step.hashes[linenr]['type'], k)
260         assert_equals(step.hashes[linenr]['value'], v)
261         linenr += 1
262
263
264 @step('there are( no)? duplicates')
265 def api_result_check_for_duplicates(step, nodups=None):
266     step.given('at least 1 result is returned')
267     resarr = []
268     for res in world.results:
269         resarr.append((res['osm_type'], res['class'],
270                         res['type'], res['display_name']))
271
272     if nodups is None:
273         assert len(resarr) > len(set(resarr))
274     else:
275         assert_equal(len(resarr), len(set(resarr)))