]> git.openstreetmap.org Git - nominatim.git/blob - tests/steps/api_result.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / tests / steps / api_result.py
1 """ Steps for checking the results of queries.
2 """
3
4 from nose.tools import *
5 from lettuce import *
6 from tidylib import tidy_document
7 from collections import OrderedDict
8 import json
9 import logging
10 import re
11 from xml.dom.minidom import parseString
12
13 logger = logging.getLogger(__name__)
14
15 def _parse_xml():
16     """ Puts the DOM structure into more convenient python
17         with a similar structure as the json document, so
18         that the same the semantics can be used. It does not
19         check if the content is valid (or at least not more than
20         necessary to transform it into a dict structure).
21     """
22     page = parseString(world.page).documentElement
23
24     # header info
25     world.result_header = OrderedDict(page.attributes.items())
26     logger.debug('Result header: %r' % (world.result_header))
27     world.results = []
28
29     # results
30     if page.nodeName == 'searchresults':
31         for node in page.childNodes:
32             if node.nodeName != "#text":
33                 assert_equals(node.nodeName, 'place', msg="Unexpected element '%s'" % node.nodeName)
34                 newresult = OrderedDict(node.attributes.items())
35                 assert_not_in('address', newresult)
36                 assert_not_in('geokml', newresult)
37                 address = OrderedDict()
38                 for sub in node.childNodes:
39                     if sub.nodeName == 'geokml':
40                         newresult['geokml'] = sub.childNodes[0].toxml()
41                     elif sub.nodeName == '#text':
42                         pass
43                     else:
44                         address[sub.nodeName] = sub.firstChild.nodeValue.strip()
45                 if address:
46                     newresult['address'] = address
47                 world.results.append(newresult)
48     elif page.nodeName == 'reversegeocode':
49         haserror = False
50         address = {}
51         for node in page.childNodes:
52             if node.nodeName == 'result':
53                 assert_equals(len(world.results), 0)
54                 assert (not haserror)
55                 world.results.append(OrderedDict(node.attributes.items()))
56                 assert_not_in('display_name', world.results[0])
57                 assert_not_in('address', world.results[0])
58                 world.results[0]['display_name'] = node.firstChild.nodeValue.strip()
59             elif node.nodeName == 'error':
60                 assert_equals(len(world.results), 0)
61                 haserror = True
62             elif node.nodeName == 'addressparts':
63                 assert (not haserror)
64                 address = OrderedDict()
65                 for sub in node.childNodes:
66                     address[sub.nodeName] = sub.firstChild.nodeValue.strip()
67                 world.results[0]['address'] = address
68             elif node.nodeName == "#text":
69                 pass
70             else:
71                 assert False, "Unknown content '%s' in XML" % node.nodeName
72     else:
73         assert False, "Unknown document node name %s in XML" % page.nodeName
74
75     logger.debug("The following was parsed out of XML:")
76     logger.debug(world.results)
77
78 @step(u'a HTTP (\d+) is returned')
79 def api_result_http_error(step, error):
80     assert_equals(world.returncode, int(error))
81
82 @step(u'the result is valid( \w+)?')
83 def api_result_is_valid(step, fmt):
84     assert_equals(world.returncode, 200)
85
86     if world.response_format == 'html':
87         document, errors = tidy_document(world.page, 
88                              options={'char-encoding' : 'utf8'})
89         assert(len(errors) == 0), "Errors found in HTML document:\n%s" % errors
90         world.results = document
91     elif world.response_format == 'xml':
92         _parse_xml()
93     elif world.response_format == 'json':
94         world.results = json.JSONDecoder(object_pairs_hook=OrderedDict).decode(world.page)
95     else:
96         assert False, "Unknown page format: %s" % (world.response_format)
97
98     if fmt:
99         assert_equals (fmt.strip(), world.response_format)
100
101
102 def compare(operator, op1, op2):
103     if operator == 'less than':
104         return op1 < op2
105     elif operator == 'more than':
106         return op1 > op2
107     elif operator == 'exactly':
108         return op1 == op2
109     elif operator == 'at least':
110         return op1 >= op2
111     elif operator == 'at most':
112         return op1 <= op2
113     else:
114         raise Exception("unknown operator '%s'" % operator)
115
116 @step(u'(less than|more than|exactly|at least|at most) (\d+) results? (?:is|are) returned')
117 def validate_result_number(step, operator, number):
118     step.given('the result is valid')
119     numres = len(world.results)
120     assert compare(operator, numres, int(number)), \
121         "Bad number of results: expected %s %s, got %d." % (operator, number, numres)
122
123 @step(u'result (\d+) has( not)? attributes (\S+)')
124 def search_check_for_result_attribute(step, num, invalid, attrs):
125     num = int(num)
126     step.given('at least %d results are returned' % (num + 1))
127     res = world.results[num]
128     for attr in attrs.split(','):
129         if invalid:
130             assert_not_in(attr.strip(), res)
131         else:
132             assert_in(attr.strip(),res)
133         
134 @step(u'there is a json wrapper "([^"]*)"')
135 def api_result_check_json_wrapper(step, wrapper):
136     step.given('the result is valid json')
137     assert_equals(world.json_callback, wrapper)
138
139 @step(u'result header contains')
140 def api_result_header_contains(step):
141     step.given('the result is valid')
142     for line in step.hashes:
143         assert_in(line['attr'], world.result_header)
144         m = re.match("%s$" % (line['value'],), world.result_header[line['attr']])
145
146 @step(u'result header has no attribute (.*)')
147 def api_result_header_contains_not(step, attr):
148     step.given('the result is valid')
149     assert_not_in(attr, world.result_header)
150
151 @step(u'results contain$')
152 def api_result_contains(step):
153     step.given('at least 1 result is returned')
154     for line in step.hashes:
155         if 'ID' in line:
156             reslist = (world.results[int(line['ID'])],)
157         else:
158             reslist = world.results
159         for k,v in line.iteritems():
160             if k == 'latlon':
161                 for curres in reslist:
162                     world.match_geometry((float(curres['lat']), float(curres['lon'])), v)
163             elif k != 'ID':
164                 for curres in reslist:
165                     assert_in(k, curres)
166                     if v[0] in '<>=':
167                         # mathematical operation
168                         evalexp = '%s %s' % (curres[k], v)
169                         res = eval(evalexp)
170                         logger.debug('Evaluating: %s = %s' % (res, evalexp))
171                         assert_true(res, "Evaluation failed: %s" % (evalexp, ))
172                     else:
173                         # regex match
174                         m = re.match("%s$" % (v,), curres[k])
175                         assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
176
177
178 @step(u'result addresses contain$')
179 def api_result_address_contains(step):
180     step.given('the result is valid')
181     for line in step.hashes:
182         if 'ID' in line:
183             reslist = (world.results[int(line['ID'])],)
184         else:
185             reslist = world.results
186         for k,v in line.iteritems():
187             if k != 'ID':
188                 for res in reslist:
189                     curres = res['address']
190                     assert_in(k, curres)
191                     m = re.match("%s$" % (v,), curres[k])
192                     assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
193
194
195 @step(u'address of result (\d+) contains')
196 def api_result_address_exact(step, resid):
197     resid = int(resid)
198     step.given('at least %d results are returned' % (resid + 1))
199     addr = world.results[resid]['address']
200     for line in step.hashes:
201         assert_in(line['type'], addr)
202         assert_equals(line['value'], addr[line['type']])
203
204 @step(u'address of result (\d+) does not contain (.*)')
205 def api_result_address_details_missing(step, resid, types):
206     resid = int(resid)
207     step.given('at least %d results are returned' % (resid + 1))
208     addr = world.results[resid]['address']
209     for t in types.split(','):
210         assert_not_in(t.strip(), addr)
211
212
213 @step(u'address of result (\d+) is')
214 def api_result_address_exact(step, resid):
215     resid = int(resid)
216     step.given('at least %d results are returned' % (resid + 1))
217     result = world.results[resid]
218     linenr = 0
219     assert_equals(len(step.hashes), len(result['address']))
220     for k,v in result['address'].iteritems():
221         assert_equals(step.hashes[linenr]['type'], k)
222         assert_equals(step.hashes[linenr]['value'], v)
223         linenr += 1
224
225
226 @step('there are( no)? duplicates')
227 def api_result_check_for_duplicates(step, nodups=None):
228     step.given('at least 1 result is returned')
229     resarr = []
230     for res in world.results:
231         resarr.append((res['osm_type'], res['class'],
232                         res['type'], res['display_name']))
233
234     if nodups is None:
235         assert len(resarr) > len(set(resarr))
236     else:
237         assert_equal(len(resarr), len(set(resarr)))