]> git.openstreetmap.org Git - nominatim.git/blob - tests/steps/api_result.py
tests for update of search_name table
[nominatim.git] / tests / steps / api_result.py
1 """ Steps for checking the results of queries.
2 """
3
4 from nose.tools import *
5 from lettuce import *
6 from tidylib import tidy_document
7 from collections import OrderedDict
8 import json
9 import logging
10 import re
11 from xml.dom.minidom import parseString
12
13 logger = logging.getLogger(__name__)
14
15 def _parse_xml():
16     """ Puts the DOM structure into more convenient python
17         with a similar structure as the json document, so
18         that the same the semantics can be used. It does not
19         check if the content is valid (or at least not more than
20         necessary to transform it into a dict structure).
21     """
22     page = parseString(world.page).documentElement
23
24     # header info
25     world.result_header = OrderedDict(page.attributes.items())
26     logger.debug('Result header: %r' % (world.result_header))
27     world.results = []
28
29     # results
30     if page.nodeName == 'searchresults' or page.nodeName == 'lookupresults':
31         for node in page.childNodes:
32             if node.nodeName != "#text":
33                 assert_equals(node.nodeName, 'place', msg="Unexpected element '%s'" % node.nodeName)
34                 newresult = OrderedDict(node.attributes.items())
35                 assert_not_in('address', newresult)
36                 assert_not_in('geokml', newresult)
37                 address = OrderedDict()
38                 for sub in node.childNodes:
39                     if sub.nodeName == 'geokml':
40                         newresult['geokml'] = sub.childNodes[0].toxml()
41                     elif sub.nodeName == '#text':
42                         pass
43                     else:
44                         address[sub.nodeName] = sub.firstChild.nodeValue.strip()
45                 if address:
46                     newresult['address'] = address
47                 world.results.append(newresult)
48     elif page.nodeName == 'reversegeocode':
49         haserror = False
50         address = {}
51         for node in page.childNodes:
52             if node.nodeName == 'result':
53                 assert_equals(len(world.results), 0)
54                 assert (not haserror)
55                 world.results.append(OrderedDict(node.attributes.items()))
56                 assert_not_in('display_name', world.results[0])
57                 assert_not_in('address', world.results[0])
58                 world.results[0]['display_name'] = node.firstChild.nodeValue.strip()
59             elif node.nodeName == 'error':
60                 assert_equals(len(world.results), 0)
61                 haserror = True
62             elif node.nodeName == 'addressparts':
63                 assert (not haserror)
64                 address = OrderedDict()
65                 for sub in node.childNodes:
66                     address[sub.nodeName] = sub.firstChild.nodeValue.strip()
67                 world.results[0]['address'] = address
68             elif node.nodeName == "#text":
69                 pass
70             else:
71                 assert False, "Unknown content '%s' in XML" % node.nodeName
72     else:
73         assert False, "Unknown document node name %s in XML" % page.nodeName
74
75     logger.debug("The following was parsed out of XML:")
76     logger.debug(world.results)
77
78 @step(u'a HTTP (\d+) is returned')
79 def api_result_http_error(step, error):
80     assert_equals(world.returncode, int(error))
81
82 @step(u'the result is valid( \w+)?')
83 def api_result_is_valid(step, fmt):
84     assert_equals(world.returncode, 200)
85
86     if world.response_format == 'html':
87         document, errors = tidy_document(world.page, 
88                              options={'char-encoding' : 'utf8'})
89         assert(len(errors) == 0), "Errors found in HTML document:\n%s" % errors
90         world.results = document
91     elif world.response_format == 'xml':
92         _parse_xml()
93     elif world.response_format == 'json':
94         world.results = json.JSONDecoder(object_pairs_hook=OrderedDict).decode(world.page)
95     else:
96         assert False, "Unknown page format: %s" % (world.response_format)
97
98     if fmt:
99         assert_equals (fmt.strip(), world.response_format)
100
101
102 def compare(operator, op1, op2):
103     if operator == 'less than':
104         return op1 < op2
105     elif operator == 'more than':
106         return op1 > op2
107     elif operator == 'exactly':
108         return op1 == op2
109     elif operator == 'at least':
110         return op1 >= op2
111     elif operator == 'at most':
112         return op1 <= op2
113     else:
114         raise Exception("unknown operator '%s'" % operator)
115
116 @step(u'(less than|more than|exactly|at least|at most) (\d+) results? (?:is|are) returned')
117 def validate_result_number(step, operator, number):
118     step.given('the result is valid')
119     numres = len(world.results)
120     assert compare(operator, numres, int(number)), \
121         "Bad number of results: expected %s %s, got %d." % (operator, number, numres)
122
123 @step(u'result (\d+) has( not)? attributes (\S+)')
124 def search_check_for_result_attribute(step, num, invalid, attrs):
125     num = int(num)
126     step.given('at least %d results are returned' % (num + 1))
127     res = world.results[num]
128     for attr in attrs.split(','):
129         if invalid:
130             assert_not_in(attr.strip(), res)
131         else:
132             assert_in(attr.strip(),res)
133         
134 @step(u'there is a json wrapper "([^"]*)"')
135 def api_result_check_json_wrapper(step, wrapper):
136     step.given('the result is valid json')
137     assert_equals(world.json_callback, wrapper)
138
139 @step(u'result header contains')
140 def api_result_header_contains(step):
141     step.given('the result is valid')
142     for line in step.hashes:
143         assert_in(line['attr'], world.result_header)
144         m = re.match("%s$" % (line['value'],), world.result_header[line['attr']])
145
146 @step(u'result header has no attribute (.*)')
147 def api_result_header_contains_not(step, attr):
148     step.given('the result is valid')
149     assert_not_in(attr, world.result_header)
150
151 @step(u'results contain$')
152 def api_result_contains(step):
153     step.given('at least 1 result is returned')
154     for line in step.hashes:
155         if 'ID' in line:
156             reslist = (world.results[int(line['ID'])],)
157         else:
158             reslist = world.results
159         for k,v in line.iteritems():
160             if k == 'latlon':
161                 for curres in reslist:
162                     world.match_geometry((float(curres['lat']), float(curres['lon'])), v)
163             elif k != 'ID':
164                 for curres in reslist:
165                     assert_in(k, curres)
166                     if v[0] in '<>=':
167                         # mathematical operation
168                         evalexp = '%s %s' % (curres[k], v)
169                         res = eval(evalexp)
170                         logger.debug('Evaluating: %s = %s' % (res, evalexp))
171                         assert_true(res, "Evaluation failed: %s" % (evalexp, ))
172                     else:
173                         # regex match
174                         m = re.match("%s$" % (v,), curres[k])
175                         assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
176
177
178 @step(u'result addresses contain$')
179 def api_result_address_contains(step):
180     step.given('the result is valid')
181     for line in step.hashes:
182         if 'ID' in line:
183             reslist = (world.results[int(line['ID'])],)
184         else:
185             reslist = world.results
186         for k,v in line.iteritems():
187             if k != 'ID':
188                 for res in reslist:
189                     curres = res['address']
190                     assert_in(k, curres)
191                     m = re.match("%s$" % (v,), curres[k])
192                     assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
193
194
195 @step(u'address of result (\d+) contains')
196 def api_result_address_exact(step, resid):
197     resid = int(resid)
198     step.given('at least %d results are returned' % (resid + 1))
199     addr = world.results[resid]['address']
200     for line in step.hashes:
201         assert_in(line['type'], addr)
202         m = re.match("%s$" % line['value'], addr[line['type']])
203         assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (
204                                   line['type'], line['value'], addr[line['type']]))
205         #assert_equals(line['value'], addr[line['type']])
206
207 @step(u'address of result (\d+) does not contain (.*)')
208 def api_result_address_details_missing(step, resid, types):
209     resid = int(resid)
210     step.given('at least %d results are returned' % (resid + 1))
211     addr = world.results[resid]['address']
212     for t in types.split(','):
213         assert_not_in(t.strip(), addr)
214
215
216 @step(u'address of result (\d+) is')
217 def api_result_address_exact(step, resid):
218     resid = int(resid)
219     step.given('at least %d results are returned' % (resid + 1))
220     result = world.results[resid]
221     linenr = 0
222     assert_equals(len(step.hashes), len(result['address']))
223     for k,v in result['address'].iteritems():
224         assert_equals(step.hashes[linenr]['type'], k)
225         assert_equals(step.hashes[linenr]['value'], v)
226         linenr += 1
227
228
229 @step('there are( no)? duplicates')
230 def api_result_check_for_duplicates(step, nodups=None):
231     step.given('at least 1 result is returned')
232     resarr = []
233     for res in world.results:
234         resarr.append((res['osm_type'], res['class'],
235                         res['type'], res['display_name']))
236
237     if nodups is None:
238         assert len(resarr) > len(set(resarr))
239     else:
240         assert_equal(len(resarr), len(set(resarr)))