]> git.openstreetmap.org Git - nominatim.git/blob - tests/steps/db_results.py
Merge remote-tracking branch 'markus/master' into master
[nominatim.git] / tests / steps / db_results.py
1 """ Steps for checking the DB after import and update tests.
2
3     There are two groups of test here. The first group tests
4     the contents of db tables directly, the second checks
5     query results by using the command line query tool.
6 """
7
8 from nose.tools import *
9 from lettuce import *
10 import psycopg2
11 import psycopg2.extensions
12 import psycopg2.extras
13 import os
14 import random
15 import json
16 import re
17 import logging
18 from collections import OrderedDict
19
20 logger = logging.getLogger(__name__)
21
22 @step(u'table placex contains as names for (N|R|W)(\d+)')
23 def check_placex_names(step, osmtyp, osmid):
24     """ Check for the exact content of the name hstore in placex.
25     """
26     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
27     cur.execute('SELECT name FROM placex where osm_type = %s and osm_id =%s', (osmtyp, int(osmid)))
28     for line in cur:
29         names = dict(line['name'])
30         for name in step.hashes:
31             assert_in(name['k'], names)
32             assert_equals(names[name['k']], name['v'])
33             del names[name['k']]
34         assert_equals(len(names), 0)
35
36
37
38
39 @step(u'table ([a-z_]+) contains$')
40 def check_placex_content(step, tablename):
41     """ check that the given lines are in the given table
42         Entries are searched by osm_type/osm_id and then all
43         given columns are tested. If there is more than one
44         line for an OSM object, they must match in these columns.
45     """
46     try:
47         cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
48         for line in step.hashes:
49             osmtype, osmid, cls = world.split_id(line['object'])
50             q = 'SELECT *'
51             if tablename == 'placex':
52                 q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon"
53             if tablename == 'location_property_osmline':
54                 q = q + ' FROM %s where osm_id = %%s' % (tablename,)
55             else:
56                 q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,)
57             if cls is None:
58                 if tablename == 'location_property_osmline':
59                     params = (osmid,)
60                 else:
61                     params = (osmtype, osmid)
62             else:
63                 q = q + ' and class = %s'
64                 if tablename == 'location_property_osmline':
65                     params = (osmid, cls)
66                 else:
67                     params = (osmtype, osmid, cls)
68             cur.execute(q, params)
69             assert(cur.rowcount > 0)
70             for res in cur:
71                 for k,v in line.iteritems():
72                     if not k == 'object':
73                         assert_in(k, res)
74                         if type(res[k]) is dict:
75                             val = world.make_hash(v)
76                             assert_equals(res[k], val)
77                         elif k in ('parent_place_id', 'linked_place_id'):
78                             pid = world.get_placeid(v)
79                             assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k]))
80                         elif k == 'centroid':
81                             world.match_geometry((res['clat'], res['clon']), v)
82                         else:
83                             assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v))
84     finally:
85         cur.close()
86         world.conn.commit()
87
88 @step(u'table (placex?) has no entry for (N|R|W)(\d+)(:\w+)?')
89 def check_placex_missing(step, tablename, osmtyp, osmid, placeclass):
90     cur = world.conn.cursor()
91     try:
92         q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, )
93         args = [osmtyp, int(osmid)]
94         if placeclass is not None:
95             q = q + ' and class = %s'
96             args.append(placeclass[1:])
97         cur.execute(q, args)
98         numres = cur.fetchone()[0]
99         assert_equals (numres, 0)
100     finally:
101         cur.close()
102         world.conn.commit()
103
104 @step(u'table location_property_osmline has no entry for W(\d+)?')
105 def check_osmline_missing(step, osmid):
106     cur = world.conn.cursor()
107     try:
108         q = 'SELECT count(*) FROM location_property_osmline where osm_id = %s' % (osmid, )
109         cur.execute(q)
110         numres = cur.fetchone()[0]
111         assert_equals (numres, 0)
112     finally:
113         cur.close()
114         world.conn.commit()
115
116 @step(u'search_name table contains$')
117 def check_search_name_content(step):
118     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
119     for line in step.hashes:
120         placeid = world.get_placeid(line['place_id'])
121         cur.execute('SELECT * FROM search_name WHERE place_id = %s', (placeid,))
122         assert(cur.rowcount > 0)
123         for res in cur:
124             for k,v in line.iteritems():
125                 if k in ('search_rank', 'address_rank'):
126                     assert_equals(int(v), res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
127                 elif k in ('importance'):
128                     assert_equals(float(v), res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
129                 elif k in ('name_vector', 'nameaddress_vector'):
130                     terms = [x.strip().replace('#', ' ') for x in v.split(',')]
131                     cur.execute('SELECT word_id, word_token FROM word, (SELECT unnest(%s) as term) t WHERE word_token = make_standard_name(t.term)', (terms,))
132                     assert cur.rowcount >= len(terms)
133                     for wid in cur:
134                         assert_in(wid['word_id'], res[k], "Missing term for %s/%s: %s" % (line['place_id'], k, wid['word_token']))
135                 elif k in ('country_code'):
136                     assert_equals(v, res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
137                 elif k == 'place_id':
138                     pass
139                 else:
140                     raise Exception("Cannot handle field %s in search_name table" % (k, ))
141
142 @step(u'way (\d+) expands to housenumbers')
143 def check_interpolated_housenumbers(step, nodeid):
144     """Check that the exact set of housenumbers has been entered in
145        placex for the given source node. Expected are two columns:
146        housenumber and centroid
147     """
148     numbers = {}
149     for line in step.hashes:
150         assert line["housenumber"] not in numbers
151         numbers[line["housenumber"]] = line["centroid"]
152     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
153     cur.execute("""SELECT DISTINCT housenumber,
154                           ST_X(centroid) as clat, ST_Y(centroid) as clon
155                    FROM placex WHERE osm_type = 'W' and osm_id = %s
156                                  and class = 'place' and type = 'address'""",
157                    (int(nodeid),))
158                    
159     assert_equals(len(numbers), cur.rowcount)
160     for r in cur:
161         assert_in(r["housenumber"], numbers)
162         world.match_geometry((r['clat'], r['clon']), numbers[r["housenumber"]])
163         del numbers[r["housenumber"]]
164
165 @step(u'way (\d+) expands to lines')
166 def check_interpolation_lines(step, wayid):
167     """Check that the correct interpolation line has been entered in
168        location_property_osmline for the given source line/nodes. Expected are three columns:
169        startnumber, endnumber and linegeo
170     """
171     lines = {}
172     for line in step.hashes:
173         assert line["startnumber"] not in lines
174         lines[line["startnumber"]] = {'endnumber': line["endnumber"], 'geometry': line["geometry"]}
175     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
176     cur.execute("""SELECT startnumber::text, endnumber::text, st_astext(linegeo) as geometry
177                    FROM location_property_osmline WHERE osm_id = %s""",
178                    (int(wayid),))
179     assert_equals(len(lines), cur.rowcount)
180     for r in cur:
181         assert_in(r["startnumber"], lines)
182         assert_equals(r["endnumber"], lines[r["startnumber"]]["endnumber"])
183         linegeo = str(str(r["geometry"].split('(')[1]).split(')')[0]).replace(',', ', ')
184         assert_equals(linegeo, lines[r["startnumber"]]["geometry"])
185         del lines[r["startnumber"]]
186
187 @step(u'way (\d+) expands exactly to housenumbers ([0-9,]*)')
188 def check_interpolated_housenumber_list(step, nodeid, numberlist):
189     """ Checks that the interpolated house numbers corresponds
190         to the given list.
191     """
192     expected = numberlist.split(',');
193     cur = world.conn.cursor()
194     cur.execute("""SELECT housenumber FROM placex
195                    WHERE osm_type = 'W' and osm_id = %s
196                    and class = 'place' and type = 'address'""", (int(nodeid),))
197     for r in cur:
198         assert_in(r[0], expected, "Unexpected house number %s for node %s." % (r[0], nodeid))
199         expected.remove(r[0])
200     assert_equals(0, len(expected), "Missing house numbers for way %s: %s" % (nodeid, expected))
201
202 @step(u'way (\d+) expands to no housenumbers')
203 def check_no_interpolated_housenumber_list(step, nodeid):
204     """ Checks that the interpolated house numbers corresponds
205         to the given list.
206     """
207     cur = world.conn.cursor()
208     cur.execute("""SELECT housenumber FROM placex
209                    WHERE osm_type = 'W' and osm_id = %s
210                      and class = 'place' and type = 'address'""", (int(nodeid),))
211     res = [r[0] for r in cur]
212     assert_equals(0, len(res), "Unexpected house numbers for way %s: %s" % (nodeid, res))
213
214 @step(u'table search_name has no entry for (.*)')
215 def check_placex_missing(step, osmid):
216     """ Checks if there is an entry in the search index for the
217         given place object.
218     """
219     cur = world.conn.cursor()
220     placeid = world.get_placeid(osmid)
221     cur.execute('SELECT count(*) FROM search_name WHERE place_id =%s', (placeid,))
222     numres = cur.fetchone()[0]
223     assert_equals (numres, 0)
224