1 """ Steps for checking the results of queries.
4 from nose.tools import *
6 from tidylib import tidy_document
7 from collections import OrderedDict
11 from xml.dom.minidom import parseString
13 logger = logging.getLogger(__name__)
def _parse_extratags(parent):
    """ Collect the <tag key=".." value=".."/> children of `parent`
        into a plain dict. Any child that is not a <tag> element, or a
        <tag> without both attributes, fails an assertion.
    """
    tags = {}
    for tag in parent.childNodes:
        assert_equals(tag.nodeName, 'tag')
        attrs = dict(tag.attributes.items())
        assert_in('key', attrs)
        assert_in('value', attrs)
        tags[attrs['key']] = attrs['value']
    return tags


def _parse_namedetails(parent):
    """ Collect the <name desc="..">text</name> children of `parent`
        into a dict mapping the `desc` attribute to the stripped text.
    """
    names = {}
    for tag in parent.childNodes:
        assert_equals(tag.nodeName, 'name')
        attrs = dict(tag.attributes.items())
        assert_in('desc', attrs)
        names[attrs['desc']] = tag.firstChild.nodeValue.strip()
    return names


def _parse_place(node):
    """ Turn one <place> element of a search/lookup response into an
        ordered dict made of its XML attributes plus the content of its
        geokml, extratags, namedetails and address child elements.
    """
    newresult = OrderedDict(node.attributes.items())
    # These keys are filled from child elements below, so they must not
    # already exist as attributes of the element.
    for reserved in ('address', 'geokml', 'extratags', 'namedetails'):
        assert_not_in(reserved, newresult)

    address = OrderedDict()
    for sub in node.childNodes:
        if sub.nodeName == 'geokml':
            newresult['geokml'] = sub.childNodes[0].toxml()
        elif sub.nodeName == 'extratags':
            newresult['extratags'] = _parse_extratags(sub)
        elif sub.nodeName == 'namedetails':
            newresult['namedetails'] = _parse_namedetails(sub)
        elif sub.nodeName == '#text':
            continue
        else:
            # Every remaining child element is treated as an address part.
            address[sub.nodeName] = sub.firstChild.nodeValue.strip()

    # NOTE(review): the guard on this assignment is missing from the
    # reviewed source; only adding a non-empty address keeps the
    # "has not attributes address" checks meaningful - confirm.
    if address:
        newresult['address'] = address
    return newresult


# NOTE(review): the `def` line of this function is missing from the
# reviewed source; the name `_parse_xml` is reconstructed - confirm.
def _parse_xml():
    """ Puts the DOM structure into more convenient python
        with a similar structure as the json document, so
        that the same semantics can be used. It does not
        check if the content is valid (or at least not more than
        necessary to transform it into a dict structure).

        Reads the raw document from `world.page` and stores the root
        element's attributes in `world.result_header` and the list of
        result dicts in `world.results`. Raises AssertionError when the
        document contains unexpected elements.
    """
    page = parseString(world.page).documentElement

    # header info
    world.result_header = OrderedDict(page.attributes.items())
    logger.debug('Result header: %r', world.result_header)
    # Start from an empty list; the branches below append to it and
    # assert on its length.
    world.results = []

    # results
    if page.nodeName in ('searchresults', 'lookupresults'):
        for node in page.childNodes:
            if node.nodeName == '#text':
                continue
            assert_equals(node.nodeName, 'place',
                          msg="Unexpected element '%s'" % node.nodeName)
            world.results.append(_parse_place(node))
    elif page.nodeName == 'reversegeocode':
        # An <error> element and result data must not appear together.
        has_error = False
        for node in page.childNodes:
            if node.nodeName == 'result':
                assert_equals(len(world.results), 0)
                assert_false(has_error, "result found in an error response")
                world.results.append(OrderedDict(node.attributes.items()))
                assert_not_in('display_name', world.results[0])
                assert_not_in('address', world.results[0])
                world.results[0]['display_name'] = node.firstChild.nodeValue.strip()
            elif node.nodeName == 'error':
                assert_equals(len(world.results), 0)
                has_error = True
            elif node.nodeName == 'addressparts':
                assert_false(has_error, "addressparts found in an error response")
                address = OrderedDict()
                for sub in node.childNodes:
                    address[sub.nodeName] = sub.firstChild.nodeValue.strip()
                world.results[0]['address'] = address
            elif node.nodeName == 'extratags':
                world.results[0]['extratags'] = _parse_extratags(node)
            elif node.nodeName == 'namedetails':
                world.results[0]['namedetails'] = _parse_namedetails(node)
            elif node.nodeName == "geokml":
                # Unlike the search branch, the DOM node itself is stored.
                world.results[0]['geokml'] = node
            elif node.nodeName == "#text":
                continue
            else:
                raise AssertionError("Unknown content '%s' in XML" % node.nodeName)
    else:
        raise AssertionError("Unknown document node name %s in XML" % page.nodeName)

    logger.debug("The following was parsed out of XML:")
    logger.debug(world.results)
@step(u'a HTTP (\d+) is returned')
def api_result_http_error(step, error):
    """ Check that the return code stored in `world.returncode` equals
        the number given in the step text.
    """
    actual_code = world.returncode
    assert_equals(actual_code, int(error))
@step(u'the result is valid( \w+)?')
def api_result_is_valid(step, fmt):
    """ Check that the last request returned HTTP 200 and decode
        `world.page` according to `world.response_format`.

        The decoded content is stored in `world.results`. `fmt` is the
        optional format word from the step text (None when omitted);
        when given it must equal `world.response_format`.
    """
    assert_equals(world.returncode, 200)

    if world.response_format == 'html':
        # Only the tidied document is kept; tidy's error report is
        # not asserted on.
        document, _errors = tidy_document(world.page,
                                          options={'char-encoding': 'utf8'})
        world.results = document
    elif world.response_format == 'xml':
        # NOTE(review): the body of this branch is missing from the
        # reviewed source; delegating to the module's XML-to-dict helper
        # is the evident intent - confirm the helper's name.
        _parse_xml()
    elif world.response_format == 'json':
        world.results = json.JSONDecoder(object_pairs_hook=OrderedDict).decode(world.page)
        if world.request_type == 'reverse':
            # Wrap the single reverse result so that it can be indexed
            # like a list of results.
            world.results = (world.results,)
    else:
        raise AssertionError("Unknown page format: %s" % (world.response_format,))

    # The format group is optional, so `fmt` is None for the plain
    # 'the result is valid' step used by the other step definitions.
    if fmt:
        assert_equals(fmt.strip(), world.response_format)
def compare(operator, op1, op2):
    """ Compare `op1` with `op2` using the relation named by `operator`.

        `operator` is one of 'less than', 'more than', 'exactly',
        'at least' or 'at most'. Returns the boolean result of the
        comparison and raises an exception for any other name.
    """
    if operator == 'less than':
        return op1 < op2
    if operator == 'more than':
        return op1 > op2
    if operator == 'exactly':
        return op1 == op2
    if operator == 'at least':
        return op1 >= op2
    if operator == 'at most':
        return op1 <= op2
    # ValueError is still an Exception, so existing handlers keep working.
    raise ValueError("unknown operator '%s'" % operator)
@step(u'(less than|more than|exactly|at least|at most) (\d+) results? (?:is|are) returned')
def validate_result_number(step, operator, number):
    """ Validate the result first, then check the length of
        `world.results` against `number` using the relation named in
        the step text.
    """
    step.given('the result is valid')
    found = len(world.results)
    matches = compare(operator, found, int(number))
    assert matches, \
        "Bad number of results: expected %s %s, got %d." % (operator, number, found)
@step(u'result (\d+) has( not)? attributes (\S+)')
def search_check_for_result_attribute(step, num, invalid, attrs):
    """ Check that result number `num` contains (or, with 'not', does
        not contain) each of the comma-separated attribute names.
    """
    # The step regex delivers the index as text.
    num = int(num)
    step.given('at least %d results are returned' % (num + 1))
    res = world.results[num]
    # `invalid` holds the optional ' not' group, None when absent.
    check = assert_not_in if invalid else assert_in
    for attr in attrs.split(','):
        check(attr.strip(), res)
@step(u'there is a json wrapper "([^"]*)"')
def api_result_check_json_wrapper(step, wrapper):
    """ Validate the result as json, then check that
        `world.json_callback` equals the quoted name from the step.
    """
    step.given('the result is valid json')
    callback = world.json_callback
    assert_equals(callback, wrapper)
@step(u'result header contains')
def api_result_header_contains(step):
    """ Check the entries of `world.result_header` against the step
        table: every row names an attribute (`attr`) and a regular
        expression (`value`) that must match the whole attribute value.
    """
    step.given('the result is valid')
    for line in step.hashes:
        assert_in(line['attr'], world.result_header)
        actual = world.result_header[line['attr']]
        m = re.match("%s$" % (line['value'],), actual)
        # The match result was previously computed but never checked.
        assert_is_not_none(m, msg="attribute '%s': expected: '%s', got '%s'" % (
                           line['attr'], line['value'], actual))
@step(u'result header has no attribute (.*)')
def api_result_header_contains_not(step, attr):
    """ Validate the result, then check that `attr` is not a key of
        `world.result_header`.
    """
    step.given('the result is valid')
    header = world.result_header
    assert_not_in(attr, header)
@step(u'results contain$')
def api_result_contains(step):
    """ Check fields of the results against the step table.

        Each table row is applied to the result selected by its `ID`
        column or, without one, to every entry of `world.results`.
        The remaining columns name result fields; a cell is either a
        comparison that is evaluated against the field value or a
        regular expression that must match the whole field value.
    """
    step.given('at least 1 result is returned')
    for line in step.hashes:
        if 'ID' in line:
            reslist = (world.results[int(line['ID'])],)
        else:
            reslist = world.results
        for k, v in line.items():
            # NOTE(review): both branch conditions are missing from the
            # reviewed source. 'latlon' as the geometry column and
            # skipping the 'ID' selector are reconstructed - confirm.
            if k == 'latlon':
                for curres in reslist:
                    world.match_geometry((float(curres['lat']), float(curres['lon'])), v)
            elif k != 'ID':
                for curres in reslist:
                    assert_in(k, curres)
                    # NOTE(review): presumably cells starting with a
                    # comparison operator are expressions - confirm.
                    if v.startswith(('<', '>', '=')):
                        # mathematical operation
                        evalexp = '%s %s' % (curres[k], v)
                        # SECURITY: eval() runs text built from the
                        # feature table and the server response. Only
                        # acceptable while both are trusted test data.
                        res = eval(evalexp)
                        logger.debug('Evaluating: %s = %s' % (res, evalexp))
                        assert_true(res, "Evaluation failed: %s" % (evalexp, ))
                    else:
                        # regex match against the complete value
                        m = re.match("%s$" % (v,), curres[k])
                        assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
# NOTE(review): a later step definition in this file reuses the function
# name `api_result_address_contains`; the registration through @step is
# unaffected, but the module-level name ends up bound to the later one.
@step(u'results contain valid boundingboxes$')
def api_result_address_contains(step):
    """ Check that the `boundingbox` of every result consists of four
        comma-separated decimal numbers.
    """
    step.given('the result is valid')
    bbox_pattern = r'^(-?\d+\.\d+),(-?\d+\.\d+),(-?\d+\.\d+),(-?\d+\.\d+)$'
    for curres in world.results:
        bb = curres['boundingbox']
        if world.response_format == 'json':
            # NOTE(review): the body of this branch is missing from the
            # reviewed source; presumably json delivers the box as a
            # sequence of strings that has to be joined - confirm.
            bb = ','.join(bb)
        m = re.match(bbox_pattern, bb)
        assert_is_not_none(m, msg="invalid boundingbox: %s." % (curres['boundingbox'],))
@step(u'result addresses contain$')
def api_result_address_contains(step):
    """ Check the `address` part of the results against the step table.

        Each row is applied to the result selected by its `ID` column
        or, without one, to every entry of `world.results`. Every other
        column names an address field whose value must completely match
        the regular expression in the cell.
    """
    step.given('the result is valid')
    for line in step.hashes:
        if 'ID' in line:
            reslist = (world.results[int(line['ID'])],)
        else:
            reslist = world.results
        for k, v in line.items():
            # NOTE(review): the condition is missing from the reviewed
            # source; skipping the 'ID' selector column is inferred.
            if k == 'ID':
                continue
            for res in reslist:
                curres = res['address']
                assert_in(k, curres)
                m = re.match("%s$" % (v,), curres[k])
                assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
# NOTE(review): a later step definition in this file reuses the function
# name `api_result_address_exact`; the registration through @step is
# unaffected, but the module-level name ends up bound to the later one.
@step(u'address of result (\d+) contains')
def api_result_address_exact(step, resid):
    """ Check that the address of result `resid` contains every
        `type` listed in the step table and that its value completely
        matches the regular expression given in the `value` column.
    """
    # The step regex delivers the index as text.
    resid = int(resid)
    step.given('at least %d results are returned' % (resid + 1))
    addr = world.results[resid]['address']
    for line in step.hashes:
        assert_in(line['type'], addr)
        m = re.match("%s$" % line['value'], addr[line['type']])
        assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (
                           line['type'], line['value'], addr[line['type']]))
@step(u'address of result (\d+) does not contain (.*)')
def api_result_address_details_missing(step, resid, types):
    """ Check that none of the comma-separated address `types` appears
        in the address of result `resid`.
    """
    # The step regex delivers the index as text.
    resid = int(resid)
    step.given('at least %d results are returned' % (resid + 1))
    addr = world.results[resid]['address']
    for t in types.split(','):
        assert_not_in(t.strip(), addr)
@step(u'address of result (\d+) is')
def api_result_address_exact(step, resid):
    """ Check that the address of result `resid` consists of exactly
        the `type`/`value` pairs of the step table, in the same order.
    """
    # The step regex delivers the index as text.
    resid = int(resid)
    step.given('at least %d results are returned' % (resid + 1))
    result = world.results[resid]
    # Equal length first, so the pairwise walk below covers every entry
    # of both the table and the address.
    assert_equals(len(step.hashes), len(result['address']))
    for expected, (k, v) in zip(step.hashes, result['address'].items()):
        assert_equals(expected['type'], k)
        assert_equals(expected['value'], v)
@step('there are( no)? duplicates')
def api_result_check_for_duplicates(step, nodups=None):
    """ Check whether `world.results` contains duplicates.

        Two results count as duplicates when their osm_type, class,
        type and display_name are all equal. `nodups` is the optional
        ' no' group of the step text: without it at least one duplicate
        is expected, with it none may exist.
    """
    step.given('at least 1 result is returned')
    resarr = [(res['osm_type'], res['class'], res['type'], res['display_name'])
              for res in world.results]
    unique_count = len(set(resarr))
    if nodups is None:
        assert_true(len(resarr) > unique_count,
                    "Expected duplicates but all %d results are unique." % len(resarr))
    else:
        assert_equal(len(resarr), unique_count)