1 """ Steps for checking the results of queries.
4 from nose.tools import *
6 from tidylib import tidy_document
7 from collections import OrderedDict
11 from xml.dom.minidom import parseString
13 logger = logging.getLogger(__name__)
16 """ Puts the DOM structure into more convenient python
17 with a similar structure as the json document, so
18 that the same the semantics can be used. It does not
19 check if the content is valid (or at least not more than
20 necessary to transform it into a dict structure).
22 page = parseString(world.page).documentElement
25 world.result_header = OrderedDict(page.attributes.items())
26 logger.debug('Result header: %r' % (world.result_header))
30 if page.nodeName == 'searchresults' or page.nodeName == 'lookupresults':
31 for node in page.childNodes:
32 if node.nodeName != "#text":
33 assert_equals(node.nodeName, 'place', msg="Unexpected element '%s'" % node.nodeName)
34 newresult = OrderedDict(node.attributes.items())
35 assert_not_in('address', newresult)
36 assert_not_in('geokml', newresult)
37 assert_not_in('extratags', newresult)
38 assert_not_in('namedetails', newresult)
39 address = OrderedDict()
40 for sub in node.childNodes:
41 if sub.nodeName == 'geokml':
42 newresult['geokml'] = sub.childNodes[0].toxml()
43 elif sub.nodeName == 'extratags':
44 newresult['extratags'] = {}
45 for tag in sub.childNodes:
46 assert_equals(tag.nodeName, 'tag')
47 attrs = dict(tag.attributes.items())
48 assert_in('key', attrs)
49 assert_in('value', attrs)
50 newresult['extratags'][attrs['key']] = attrs['value']
51 elif sub.nodeName == 'namedetails':
52 newresult['namedetails'] = {}
53 for tag in sub.childNodes:
54 assert_equals(tag.nodeName, 'name')
55 attrs = dict(tag.attributes.items())
56 assert_in('desc', attrs)
57 newresult['namedetails'][attrs['desc']] = tag.firstChild.nodeValue.strip()
59 elif sub.nodeName == '#text':
62 address[sub.nodeName] = sub.firstChild.nodeValue.strip()
64 newresult['address'] = address
65 world.results.append(newresult)
66 elif page.nodeName == 'reversegeocode':
69 for node in page.childNodes:
70 if node.nodeName == 'result':
71 assert_equals(len(world.results), 0)
73 world.results.append(OrderedDict(node.attributes.items()))
74 assert_not_in('display_name', world.results[0])
75 assert_not_in('address', world.results[0])
76 world.results[0]['display_name'] = node.firstChild.nodeValue.strip()
77 elif node.nodeName == 'error':
78 assert_equals(len(world.results), 0)
80 elif node.nodeName == 'addressparts':
82 address = OrderedDict()
83 for sub in node.childNodes:
84 address[sub.nodeName] = sub.firstChild.nodeValue.strip()
85 world.results[0]['address'] = address
86 elif node.nodeName == 'extratags':
87 world.results[0]['extratags'] = {}
88 for tag in node.childNodes:
89 assert_equals(tag.nodeName, 'tag')
90 attrs = dict(tag.attributes.items())
91 assert_in('key', attrs)
92 assert_in('value', attrs)
93 world.results[0]['extratags'][attrs['key']] = attrs['value']
94 elif node.nodeName == 'namedetails':
95 world.results[0]['namedetails'] = {}
96 for tag in node.childNodes:
97 assert_equals(tag.nodeName, 'name')
98 attrs = dict(tag.attributes.items())
99 assert_in('desc', attrs)
100 world.results[0]['namedetails'][attrs['desc']] = tag.firstChild.nodeValue.strip()
101 elif node.nodeName == "#text":
104 assert False, "Unknown content '%s' in XML" % node.nodeName
106 assert False, "Unknown document node name %s in XML" % page.nodeName
108 logger.debug("The following was parsed out of XML:")
109 logger.debug(world.results)
111 @step(u'a HTTP (\d+) is returned')
112 def api_result_http_error(step, error):
113 assert_equals(world.returncode, int(error))
115 @step(u'the result is valid( \w+)?')
116 def api_result_is_valid(step, fmt):
117 assert_equals(world.returncode, 200)
119 if world.response_format == 'html':
120 document, errors = tidy_document(world.page,
121 options={'char-encoding' : 'utf8'})
122 # assert(len(errors) == 0), "Errors found in HTML document:\n%s" % errors
123 world.results = document
124 elif world.response_format == 'xml':
126 elif world.response_format == 'json':
127 world.results = json.JSONDecoder(object_pairs_hook=OrderedDict).decode(world.page)
128 if world.request_type == 'reverse':
129 world.results = (world.results,)
131 assert False, "Unknown page format: %s" % (world.response_format)
134 assert_equals (fmt.strip(), world.response_format)
137 def compare(operator, op1, op2):
138 if operator == 'less than':
140 elif operator == 'more than':
142 elif operator == 'exactly':
144 elif operator == 'at least':
146 elif operator == 'at most':
149 raise Exception("unknown operator '%s'" % operator)
151 @step(u'(less than|more than|exactly|at least|at most) (\d+) results? (?:is|are) returned')
152 def validate_result_number(step, operator, number):
153 step.given('the result is valid')
154 numres = len(world.results)
155 assert compare(operator, numres, int(number)), \
156 "Bad number of results: expected %s %s, got %d." % (operator, number, numres)
158 @step(u'result (\d+) has( not)? attributes (\S+)')
159 def search_check_for_result_attribute(step, num, invalid, attrs):
161 step.given('at least %d results are returned' % (num + 1))
162 res = world.results[num]
163 for attr in attrs.split(','):
165 assert_not_in(attr.strip(), res)
167 assert_in(attr.strip(),res)
169 @step(u'there is a json wrapper "([^"]*)"')
170 def api_result_check_json_wrapper(step, wrapper):
171 step.given('the result is valid json')
172 assert_equals(world.json_callback, wrapper)
174 @step(u'result header contains')
175 def api_result_header_contains(step):
176 step.given('the result is valid')
177 for line in step.hashes:
178 assert_in(line['attr'], world.result_header)
179 m = re.match("%s$" % (line['value'],), world.result_header[line['attr']])
181 @step(u'result header has no attribute (.*)')
182 def api_result_header_contains_not(step, attr):
183 step.given('the result is valid')
184 assert_not_in(attr, world.result_header)
186 @step(u'results contain$')
187 def api_result_contains(step):
188 step.given('at least 1 result is returned')
189 for line in step.hashes:
191 reslist = (world.results[int(line['ID'])],)
193 reslist = world.results
194 for k,v in line.iteritems():
196 for curres in reslist:
197 world.match_geometry((float(curres['lat']), float(curres['lon'])), v)
199 for curres in reslist:
202 # mathematical operation
203 evalexp = '%s %s' % (curres[k], v)
205 logger.debug('Evaluating: %s = %s' % (res, evalexp))
206 assert_true(res, "Evaluation failed: %s" % (evalexp, ))
209 m = re.match("%s$" % (v,), curres[k])
210 assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
213 @step(u'result addresses contain$')
214 def api_result_address_contains(step):
215 step.given('the result is valid')
216 for line in step.hashes:
218 reslist = (world.results[int(line['ID'])],)
220 reslist = world.results
221 for k,v in line.iteritems():
224 curres = res['address']
226 m = re.match("%s$" % (v,), curres[k])
227 assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
230 @step(u'address of result (\d+) contains')
231 def api_result_address_exact(step, resid):
233 step.given('at least %d results are returned' % (resid + 1))
234 addr = world.results[resid]['address']
235 for line in step.hashes:
236 assert_in(line['type'], addr)
237 m = re.match("%s$" % line['value'], addr[line['type']])
238 assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (
239 line['type'], line['value'], addr[line['type']]))
240 #assert_equals(line['value'], addr[line['type']])
242 @step(u'address of result (\d+) does not contain (.*)')
243 def api_result_address_details_missing(step, resid, types):
245 step.given('at least %d results are returned' % (resid + 1))
246 addr = world.results[resid]['address']
247 for t in types.split(','):
248 assert_not_in(t.strip(), addr)
251 @step(u'address of result (\d+) is')
252 def api_result_address_exact(step, resid):
254 step.given('at least %d results are returned' % (resid + 1))
255 result = world.results[resid]
257 assert_equals(len(step.hashes), len(result['address']))
258 for k,v in result['address'].iteritems():
259 assert_equals(step.hashes[linenr]['type'], k)
260 assert_equals(step.hashes[linenr]['value'], v)
264 @step('there are( no)? duplicates')
265 def api_result_check_for_duplicates(step, nodups=None):
266 step.given('at least 1 result is returned')
268 for res in world.results:
269 resarr.append((res['osm_type'], res['class'],
270 res['type'], res['display_name']))
273 assert len(resarr) > len(set(resarr))
275 assert_equal(len(resarr), len(set(resarr)))