]> git.openstreetmap.org Git - nominatim.git/blob - tests/steps/api_result.py
switch query log from CSV to simple space-separated format
[nominatim.git] / tests / steps / api_result.py
1 """ Steps for checking the results of queries.
2 """
3
4 from nose.tools import *
5 from lettuce import *
6 from tidylib import tidy_document
7 from collections import OrderedDict
8 import json
9 import logging
10 import re
11 from xml.dom.minidom import parseString
12
13 logger = logging.getLogger(__name__)
14
15 def _parse_xml():
16     """ Puts the DOM structure into more convenient python
17         with a similar structure as the json document, so
18         that the same the semantics can be used. It does not
19         check if the content is valid (or at least not more than
20         necessary to transform it into a dict structure).
21     """
22     page = parseString(world.page).documentElement
23
24     # header info
25     world.result_header = OrderedDict(page.attributes.items())
26     logger.debug('Result header: %r' % (world.result_header))
27     world.results = []
28
29     # results
30     if page.nodeName == 'searchresults':
31         for node in page.childNodes:
32             if node.nodeName != "#text":
33                 assert_equals(node.nodeName, 'place', msg="Unexpected element '%s'" % node.nodeName)
34                 newresult = OrderedDict(node.attributes.items())
35                 assert_not_in('address', newresult)
36                 assert_not_in('geokml', newresult)
37                 address = OrderedDict()
38                 for sub in node.childNodes:
39                     if sub.nodeName == 'geokml':
40                         newresult['geokml'] = sub.childNodes[0].toxml()
41                     elif sub.nodeName == '#text':
42                         pass
43                     else:
44                         address[sub.nodeName] = sub.firstChild.nodeValue.strip()
45                 if address:
46                     newresult['address'] = address
47                 world.results.append(newresult)
48     elif page.nodeName == 'reversegeocode':
49         haserror = False
50         address = {}
51         for node in page.childNodes:
52             if node.nodeName == 'result':
53                 assert_equals(len(world.results), 0)
54                 assert (not haserror)
55                 world.results.append(OrderedDict(node.attributes.items()))
56                 assert_not_in('display_name', world.results[0])
57                 assert_not_in('address', world.results[0])
58                 world.results[0]['display_name'] = node.firstChild.nodeValue.strip()
59             elif node.nodeName == 'error':
60                 assert_equals(len(world.results), 0)
61                 haserror = True
62             elif node.nodeName == 'addressparts':
63                 assert (not haserror)
64                 address = OrderedDict()
65                 for sub in node.childNodes:
66                     address[sub.nodeName] = sub.firstChild.nodeValue.strip()
67                 world.results[0]['address'] = address
68             elif node.nodeName == "#text":
69                 pass
70             else:
71                 assert False, "Unknown content '%s' in XML" % node.nodeName
72     else:
73         assert False, "Unknown document node name %s in XML" % page.nodeName
74
75     logger.debug("The following was parsed out of XML:")
76     logger.debug(world.results)
77
78 @step(u'a HTTP (\d+) is returned')
79 def api_result_http_error(step, error):
80     assert_equals(world.returncode, int(error))
81
82 @step(u'the result is valid( \w+)?')
83 def api_result_is_valid(step, fmt):
84     assert_equals(world.returncode, 200)
85
86     if world.response_format == 'html':
87         document, errors = tidy_document(world.page, 
88                              options={'char-encoding' : 'utf8'})
89         assert(len(errors) == 0), "Errors found in HTML document:\n%s" % errors
90         world.results = document
91     elif world.response_format == 'xml':
92         _parse_xml()
93     elif world.response_format == 'json':
94         world.results = json.JSONDecoder(object_pairs_hook=OrderedDict).decode(world.page)
95     else:
96         assert False, "Unknown page format: %s" % (world.response_format)
97
98     if fmt:
99         assert_equals (fmt.strip(), world.response_format)
100
101
102 def compare(operator, op1, op2):
103     if operator == 'less than':
104         return op1 < op2
105     elif operator == 'more than':
106         return op1 > op2
107     elif operator == 'exactly':
108         return op1 == op2
109     elif operator == 'at least':
110         return op1 >= op2
111     elif operator == 'at most':
112         return op1 <= op2
113     else:
114         raise Exception("unknown operator '%s'" % operator)
115
116 @step(u'(less than|more than|exactly|at least|at most) (\d+) results? (?:is|are) returned')
117 def validate_result_number(step, operator, number):
118     step.given('the result is valid')
119     numres = len(world.results)
120     assert compare(operator, numres, int(number)), \
121         "Bad number of results: expected %s %s, got %d." % (operator, number, numres)
122
123 @step(u'result (\d+) has( not)? attributes (\S+)')
124 def search_check_for_result_attribute(step, num, invalid, attrs):
125     num = int(num)
126     step.given('at least %d results are returned' % (num + 1))
127     res = world.results[num]
128     for attr in attrs.split(','):
129         if invalid:
130             assert_not_in(attr.strip(), res)
131         else:
132             assert_in(attr.strip(),res)
133         
134 @step(u'there is a json wrapper "([^"]*)"')
135 def api_result_check_json_wrapper(step, wrapper):
136     step.given('the result is valid json')
137     assert_equals(world.json_callback, wrapper)
138
139 @step(u'result header contains')
140 def api_result_header_contains(step):
141     step.given('the result is valid')
142     for line in step.hashes:
143         assert_in(line['attr'], world.result_header)
144         m = re.match("%s$" % (line['value'],), world.result_header[line['attr']])
145
146 @step(u'results contain$')
147 def api_result_contains(step):
148     step.given('at least 1 result is returned')
149     for line in step.hashes:
150         if 'ID' in line:
151             reslist = (world.results[int(line['ID'])],)
152         else:
153             reslist = world.results
154         for k,v in line.iteritems():
155             if k == 'latlon':
156                 for curres in reslist:
157                     world.match_geometry((float(curres['lat']), float(curres['lon'])), v)
158             elif k != 'ID':
159                 for curres in reslist:
160                     assert_in(k, curres)
161                     if v[0] in '<>=':
162                         # mathematical operation
163                         evalexp = '%s %s' % (curres[k], v)
164                         res = eval(evalexp)
165                         logger.debug('Evaluating: %s = %s' % (res, evalexp))
166                         assert_true(res, "Evaluation failed: %s" % (evalexp, ))
167                     else:
168                         # regex match
169                         m = re.match("%s$" % (v,), curres[k])
170                         assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
171
172
173 @step(u'result addresses contain$')
174 def api_result_address_contains(step):
175     step.given('the result is valid')
176     for line in step.hashes:
177         if 'ID' in line:
178             reslist = (world.results[int(line['ID'])],)
179         else:
180             reslist = world.results
181         for k,v in line.iteritems():
182             if k != 'ID':
183                 for res in reslist:
184                     curres = res['address']
185                     assert_in(k, curres)
186                     m = re.match("%s$" % (v,), curres[k])
187                     assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
188
189
190 @step(u'address of result (\d+) contains')
191 def api_result_address_exact(step, resid):
192     resid = int(resid)
193     step.given('at least %d results are returned' % (resid + 1))
194     addr = world.results[resid]['address']
195     for line in step.hashes:
196         assert_in(line['type'], addr)
197         assert_equals(line['value'], addr[line['type']])
198
199 @step(u'address of result (\d+) does not contain (.*)')
200 def api_result_address_details_missing(step, resid, types):
201     resid = int(resid)
202     step.given('at least %d results are returned' % (resid + 1))
203     addr = world.results[resid]['address']
204     for t in types.split(','):
205         assert_not_in(t.strip(), addr)
206
207
208 @step(u'address of result (\d+) is')
209 def api_result_address_exact(step, resid):
210     resid = int(resid)
211     step.given('at least %d results are returned' % (resid + 1))
212     result = world.results[resid]
213     linenr = 0
214     assert_equals(len(step.hashes), len(result['address']))
215     for k,v in result['address'].iteritems():
216         assert_equals(step.hashes[linenr]['type'], k)
217         assert_equals(step.hashes[linenr]['value'], v)
218         linenr += 1
219
220
221 @step('there are( no)? duplicates')
222 def api_result_check_for_duplicates(step, nodups=None):
223     step.given('at least 1 result is returned')
224     resarr = []
225     for res in world.results:
226         resarr.append((res['osm_type'], res['class'],
227                         res['type'], res['display_name']))
228
229     if nodups is None:
230         assert len(resarr) > len(set(resarr))
231     else:
232         assert_equal(len(resarr), len(set(resarr)))