1 """ Steps for checking the results of queries.
4 from nose.tools import *
6 from tidylib import tidy_document
7 from collections import OrderedDict
11 from xml.dom.minidom import parseString
13 logger = logging.getLogger(__name__)
16 """ Puts the DOM structure into more convenient python
17 with a similar structure as the json document, so
18 that the same semantics can be used. It does not
19 check if the content is valid (or at least not more than
20 necessary to transform it into a dict structure).
22 page = parseString(world.page).documentElement
25 world.result_header = OrderedDict(page.attributes.items())
26 logger.debug('Result header: %r' % (world.result_header))
30 if page.nodeName == 'searchresults' or page.nodeName == 'lookupresults':
31 for node in page.childNodes:
32 if node.nodeName != "#text":
33 assert_equals(node.nodeName, 'place', msg="Unexpected element '%s'" % node.nodeName)
34 newresult = OrderedDict(node.attributes.items())
35 assert_not_in('address', newresult)
36 assert_not_in('geokml', newresult)
37 assert_not_in('extratags', newresult)
38 assert_not_in('namedetails', newresult)
39 address = OrderedDict()
40 for sub in node.childNodes:
41 if sub.nodeName == 'geokml':
42 newresult['geokml'] = sub.childNodes[0].toxml()
43 elif sub.nodeName == 'extratags':
44 newresult['extratags'] = {}
45 for tag in sub.childNodes:
46 assert_equals(tag.nodeName, 'tag')
47 attrs = dict(tag.attributes.items())
48 assert_in('key', attrs)
49 assert_in('value', attrs)
50 newresult['extratags'][attrs['key']] = attrs['value']
51 elif sub.nodeName == 'namedetails':
52 newresult['namedetails'] = {}
53 for tag in sub.childNodes:
54 assert_equals(tag.nodeName, 'name')
55 attrs = dict(tag.attributes.items())
56 assert_in('desc', attrs)
57 newresult['namedetails'][attrs['desc']] = tag.firstChild.nodeValue.strip()
59 elif sub.nodeName == '#text':
62 address[sub.nodeName] = sub.firstChild.nodeValue.strip()
64 newresult['address'] = address
65 world.results.append(newresult)
66 elif page.nodeName == 'reversegeocode':
69 for node in page.childNodes:
70 if node.nodeName == 'result':
71 assert_equals(len(world.results), 0)
73 world.results.append(OrderedDict(node.attributes.items()))
74 assert_not_in('display_name', world.results[0])
75 assert_not_in('address', world.results[0])
76 world.results[0]['display_name'] = node.firstChild.nodeValue.strip()
77 elif node.nodeName == 'error':
78 assert_equals(len(world.results), 0)
80 elif node.nodeName == 'addressparts':
82 address = OrderedDict()
83 for sub in node.childNodes:
84 address[sub.nodeName] = sub.firstChild.nodeValue.strip()
85 world.results[0]['address'] = address
86 elif node.nodeName == 'extratags':
87 world.results[0]['extratags'] = {}
88 for tag in node.childNodes:
89 assert_equals(tag.nodeName, 'tag')
90 attrs = dict(tag.attributes.items())
91 assert_in('key', attrs)
92 assert_in('value', attrs)
93 world.results[0]['extratags'][attrs['key']] = attrs['value']
94 elif node.nodeName == 'namedetails':
95 world.results[0]['namedetails'] = {}
96 for tag in node.childNodes:
97 assert_equals(tag.nodeName, 'name')
98 attrs = dict(tag.attributes.items())
99 assert_in('desc', attrs)
100 world.results[0]['namedetails'][attrs['desc']] = tag.firstChild.nodeValue.strip()
101 elif node.nodeName == "geokml":
102 world.results[0]['geokml'] = node
103 elif node.nodeName == "#text":
106 assert False, "Unknown content '%s' in XML" % node.nodeName
108 assert False, "Unknown document node name %s in XML" % page.nodeName
110 logger.debug("The following was parsed out of XML:")
111 logger.debug(world.results)
@step(u'a HTTP (\d+) is returned')
def api_result_http_error(step, error):
    """ Check that the recorded return code equals the status code
        named in the step sentence.

        error -- the status code captured by the step regex, as text.
    """
    expected_status = int(error)
    assert_equals(world.returncode, expected_status)
# Step: verify the return code and decode world.page into world.results
# according to world.response_format.
117 @step(u'the result is valid( \w+)?')
118 def api_result_is_valid(step, fmt):
# Any return code other than 200 fails the step before decoding starts.
119 assert_equals(world.returncode, 200)
# HTML: the page is run through tidy and the tidied document becomes
# world.results. The tidy error list is collected but not checked, since
# the assertion on it is commented out.
121 if world.response_format == 'html':
122 document, errors = tidy_document(world.page,
123 options={'char-encoding' : 'utf8'})
124 # assert(len(errors) == 0), "Errors found in HTML document:\n%s" % errors
125 world.results = document
# NOTE(review): no statement for the XML branch is visible in this view --
# confirm what it calls.
126 elif world.response_format == 'xml':
# JSON: objects are decoded into OrderedDict instances.
128 elif world.response_format == 'json':
129 world.results = json.JSONDecoder(object_pairs_hook=OrderedDict).decode(world.page)
# A reverse request's decoded document is wrapped in a 1-tuple --
# presumably so it can be indexed like a result list; confirm.
130 if world.request_type == 'reverse':
131 world.results = (world.results,)
133 assert False, "Unknown page format: %s" % (world.response_format)
# fmt is the optional format word captured by the step regex, including its
# leading space, hence strip().
# NOTE(review): fmt is presumably None when the word is absent -- confirm a
# guard protects this line.
136 assert_equals (fmt.strip(), world.response_format)
def compare(operator, op1, op2):
    """ Apply the comparison named by `operator` to op1 and op2.

        operator -- one of 'less than', 'more than', 'exactly',
                    'at least' or 'at most'.
        op1, op2 -- the left and right operands.

        Returns the boolean result of the comparison.
        Raises Exception when the operator name is not one of the above.
    """
    if operator == 'less than':
        return op1 < op2
    elif operator == 'more than':
        return op1 > op2
    elif operator == 'exactly':
        return op1 == op2
    elif operator == 'at least':
        return op1 >= op2
    elif operator == 'at most':
        return op1 <= op2
    else:
        raise Exception("unknown operator '%s'" % operator)
@step(u'(less than|more than|exactly|at least|at most) (\d+) results? (?:is|are) returned')
def validate_result_number(step, operator, number):
    """ Check the number of entries in world.results against the
        count named in the step sentence.

        operator -- the comparison phrase captured by the step regex.
        number   -- the count captured by the step regex, as text.
    """
    # The 'the result is valid' step is invoked first through step.given.
    step.given('the result is valid')
    found = len(world.results)
    wanted = int(number)
    matches = compare(operator, found, wanted)
    assert matches, \
        "Bad number of results: expected %s %s, got %d." % (operator, number, found)
@step(u'result (\d+) has( not)? attributes (\S+)')
def search_check_for_result_attribute(step, num, invalid, attrs):
    """ Check that one result has, or does not have, a list of attributes.

        num     -- index of the result to inspect, captured as text.
        invalid -- the optional ' not' capture; when it is set the
                   attributes must be absent, otherwise present.
        attrs   -- comma-separated attribute names.
    """
    # The capture is text, but it is used in arithmetic and as an index.
    num = int(num)
    step.given('at least %d results are returned' % (num + 1))
    res = world.results[num]
    for attr in attrs.split(','):
        name = attr.strip()
        if invalid:
            assert_not_in(name, res)
        else:
            assert_in(name, res)
@step(u'there is a json wrapper "([^"]*)"')
def api_result_check_json_wrapper(step, wrapper):
    """ Check that world.json_callback equals the wrapper name quoted
        in the step sentence.
    """
    # The 'the result is valid json' step is invoked first through step.given.
    step.given('the result is valid json')
    callback = world.json_callback
    assert_equals(callback, wrapper)
@step(u'result header contains')
def api_result_header_contains(step):
    """ Check attributes of the result header against the step table.

        Each table row names a header attribute ('attr') and a regular
        expression ('value') that the attribute's value must match in
        full.
    """
    step.given('the result is valid')
    for line in step.hashes:
        assert_in(line['attr'], world.result_header)
        actual = world.result_header[line['attr']]
        # re.match anchors the start and the appended "$" anchors the end.
        m = re.match("%s$" % (line['value'],), actual)
        # Without this assertion a mismatching value would go unnoticed.
        assert_is_not_none(m, msg="attribute %s does not match: %s$ != %s." % (
                              line['attr'], line['value'], actual))
@step(u'result header has no attribute (.*)')
def api_result_header_contains_not(step, attr):
    """ Check that the result header does not contain the attribute
        named in the step sentence.
    """
    # The 'the result is valid' step is invoked first through step.given.
    step.given('the result is valid')
    header = world.result_header
    assert_not_in(attr, header)
# Step: compare the columns of every row of the step table against the
# returned results.
188 @step(u'results contain$')
189 def api_result_contains(step):
190 step.given('at least 1 result is returned')
191 for line in step.hashes:
# A row's 'ID' value, converted to int, selects a single result by index ...
193 reslist = (world.results[int(line['ID'])],)
# ... otherwise reslist is the complete result list.
# NOTE(review): the condition choosing between these two assignments is not
# visible in this view -- confirm.
195 reslist = world.results
196 for k,v in line.iteritems():
# The result's lat/lon pair, converted to floats, is passed to
# world.match_geometry together with the table value.
198 for curres in reslist:
199 world.match_geometry((float(curres['lat']), float(curres['lon'])), v)
201 for curres in reslist:
204 # mathematical operation
# The expression text is "<result value> <table value>".
# NOTE(review): how `res` is computed from it is not visible in this view.
# If the text is evaluated as code, the table value must come from a
# trusted feature file -- confirm.
205 evalexp = '%s %s' % (curres[k], v)
207 logger.debug('Evaluating: %s = %s' % (res, evalexp))
208 assert_true(res, "Evaluation failed: %s" % (evalexp, ))
# The table value is used as a regular expression: re.match anchors the
# start and the appended "$" anchors the end.
211 m = re.match("%s$" % (v,), curres[k])
212 assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
# Step: compare the columns of every row of the step table against the
# 'address' entry of the returned results.
215 @step(u'result addresses contain$')
216 def api_result_address_contains(step):
217 step.given('the result is valid')
218 for line in step.hashes:
# A row's 'ID' value, converted to int, selects a single result by index ...
220 reslist = (world.results[int(line['ID'])],)
# ... otherwise reslist is the complete result list.
# NOTE(review): the condition choosing between these two assignments is not
# visible in this view -- confirm.
222 reslist = world.results
223 for k,v in line.iteritems():
# NOTE(review): the loop that binds `res` is not visible in this view --
# presumably it iterates over reslist; confirm.
226 curres = res['address']
# The table value is used as a regular expression: re.match anchors the
# start and the appended "$" anchors the end.
228 m = re.match("%s$" % (v,), curres[k])
229 assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
@step(u'address of result (\d+) contains')
def api_result_address_exact(step, resid):
    """ Check that the address of one result contains the entries listed
        in the step table.

        resid -- index of the result to inspect, captured as text.

        Each table row names an address part ('type') and a regular
        expression ('value') that the part's value must match in full.
    """
    # The capture is text, but it is used in arithmetic and as an index.
    resid = int(resid)
    step.given('at least %d results are returned' % (resid + 1))
    addr = world.results[resid]['address']
    for line in step.hashes:
        assert_in(line['type'], addr)
        # re.match anchors the start and the appended "$" anchors the end.
        m = re.match("%s$" % line['value'], addr[line['type']])
        assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (
                              line['type'], line['value'], addr[line['type']]))
@step(u'address of result (\d+) does not contain (.*)')
def api_result_address_details_missing(step, resid, types):
    """ Check that the address of one result has none of the listed parts.

        resid -- index of the result to inspect, captured as text.
        types -- comma-separated names of address parts that must be absent.
    """
    # The capture is text, but it is used in arithmetic and as an index.
    resid = int(resid)
    step.given('at least %d results are returned' % (resid + 1))
    addr = world.results[resid]['address']
    for t in types.split(','):
        assert_not_in(t.strip(), addr)
@step(u'address of result (\d+) is')
def api_result_address_exact(step, resid):
    """ Check that the address of one result equals the step table
        exactly: same number of entries, in the same order, with the
        same types and values.

        resid -- index of the result to inspect, captured as text.
    """
    # NOTE(review): the 'address of result N contains' step is defined under
    # this same function name; consider renaming one of the two.
    # The capture is text, but it is used in arithmetic and as an index.
    resid = int(resid)
    step.given('at least %d results are returned' % (resid + 1))
    result = world.results[resid]
    address = result['address']

    assert_equals(len(step.hashes), len(address))
    # Lengths are equal at this point, so zip pairs every table row with
    # the address entry at the same position.
    for row, (k, v) in zip(step.hashes, address.items()):
        assert_equals(row['type'], k)
        assert_equals(row['value'], v)
@step('there are( no)? duplicates')
def api_result_check_for_duplicates(step, nodups=None):
    """ Check whether the result list contains duplicated entries.

        nodups -- the optional ' no' capture; when it is set the results
                  must be free of duplicates, otherwise at least one
                  duplicate must exist.

        Two results count as duplicates when their osm_type, class, type
        and display_name are all equal.
    """
    step.given('at least 1 result is returned')
    resarr = [(res['osm_type'], res['class'],
               res['type'], res['display_name'])
              for res in world.results]
    # Converting to a set drops repeated tuples, so a shorter set means
    # that duplicates exist.
    unique_count = len(set(resarr))
    if nodups:
        assert_equal(len(resarr), unique_count)
    else:
        assert len(resarr) > unique_count