1 """ Steps for checking the DB after import and update tests.
    There are two groups of tests here. The first group tests
    the contents of db tables directly, the second checks
    query results by using the command line query tool.
"""
8 from nose.tools import *
11 import psycopg2.extensions
12 import psycopg2.extras
18 from collections import OrderedDict
20 logger = logging.getLogger(__name__)
@step(u'table placex contains as names for (N|R|W)(\d+)')
def check_placex_names(step, osmtyp, osmid):
    """ Check for the exact content of the name hstore in placex.

        The step table lists the expected names, one per row, in the
        columns 'k' (key) and 'v' (value). Every placex row found for
        the given OSM object must contain exactly these names: each
        listed key must be present with the listed value and no
        further keys may exist.
    """
    cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    try:
        cur.execute('SELECT name FROM placex where osm_type = %s and osm_id =%s', (osmtyp, int(osmid)))
        for line in cur:
            # Work on a copy so that matched entries can be ticked off.
            names = dict(line['name'])
            for name in step.hashes:
                assert_in(name['k'], names)
                assert_equals(names[name['k']], name['v'])
                del names[name['k']]
            # Anything left over is a name that was not expected.
            assert_equals(len(names), 0)
    finally:
        cur.close()
def _table_content_query(tablename, osmtype, osmid, cls):
    """ Build the lookup query for one OSM object in the given table.

        Returns a tuple (query, params) suitable for cursor.execute().
        location_property_osmline is searched by osm_id only, all
        other tables by osm_type and osm_id. When cls is not None the
        result is further restricted to that class.
    """
    q = 'SELECT *'
    if tablename == 'placex':
        q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon"
    if tablename == 'location_property_osmline':
        q = q + ' FROM %s where osm_id = %%s' % (tablename,)
        params = [osmid]
    else:
        q = q + ", ST_GeometryType(geometry) as geometrytype"
        q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,)
        params = [osmtype, osmid]

    if cls is not None:
        q = q + ' and class = %s'
        params.append(cls)

    return q, tuple(params)


@step(u'table ([a-z_]+) contains$')
def check_placex_content(step, tablename):
    """ check that the given lines are in the given table
        Entries are searched by osm_type/osm_id and then all
        given columns are tested. If there is more than one
        line for an OSM object, they must match in these columns.

        The 'object' column of the step table identifies the OSM
        object and is not compared against the database. Columns
        holding a dict in the result are compared against the parsed
        hash of the expected value, 'parent_place_id' and
        'linked_place_id' are resolved to place ids first, 'centroid'
        is matched as a geometry and everything else is compared as
        a string.
    """
    cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    try:
        for line in step.hashes:
            osmtype, osmid, cls = world.split_id(line['object'])
            q, params = _table_content_query(tablename, osmtype, osmid, cls)
            cur.execute(q, params)
            assert cur.rowcount > 0, \
                "No rows found in table '%s' for '%s'" % (tablename, line['object'])

            for res in cur:
                for k, v in line.items():
                    if k == 'object':
                        # Lookup key only, not a database column.
                        continue
                    assert_in(k, res)
                    if isinstance(res[k], dict):
                        val = world.make_hash(v)
                        assert_equals(res[k], val)
                    elif k in ('parent_place_id', 'linked_place_id'):
                        pid = world.get_placeid(v)
                        assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k]))
                    elif k == 'centroid':
                        # clat/clon are only selected for placex (see query).
                        world.match_geometry((res['clat'], res['clon']), v)
                    else:
                        assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v))
    finally:
        cur.close()
@step(u'table (placex?) has no entry for (N|R|W)(\d+)(:\w+)?')
def check_placex_missing(step, tablename, osmtyp, osmid, placeclass):
    """ Check that the table (place or placex) has no row for the
        given OSM object.

        placeclass is the optional ':<class>' suffix of the object id
        including the leading colon. When given, only rows of that
        class are counted.
    """
    cur = world.conn.cursor()
    try:
        # The table name cannot be bound as a parameter; the step regex
        # restricts it to 'place' or 'placex'.
        q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, )
        args = [osmtyp, int(osmid)]
        if placeclass is not None:
            q = q + ' and class = %s'
            # Strip the leading ':' captured by the step regex.
            args.append(placeclass[1:])
        cur.execute(q, args)
        numres = cur.fetchone()[0]
        assert_equals (numres, 0)
    finally:
        cur.close()
@step(u'table location_property_osmline has no entry for W(\d+)?')
def check_osmline_missing(step, osmid):
    """ Check that location_property_osmline has no row for the way
        with the given OSM id.
    """
    cur = world.conn.cursor()
    try:
        # Bind the id as a query parameter instead of formatting it
        # into the SQL text.
        cur.execute('SELECT count(*) FROM location_property_osmline where osm_id = %s',
                    (int(osmid),))
        numres = cur.fetchone()[0]
        assert_equals (numres, 0)
    finally:
        cur.close()
@step(u'search_name table contains$')
def check_search_name_content(step):
    """ Check the content of the search_name table.

        Each row of the step table must have a 'place_id' column that
        identifies the place. The other supported columns are
        'search_rank' and 'address_rank' (compared as integers),
        'importance' (compared as float), 'country_code' (compared
        as is) and 'name_vector'/'nameaddress_vector'. The vector
        columns take a comma-separated list of terms, where '#'
        stands for a space; the word id of every term must be
        contained in the vector. Any other column raises an Exception.
    """
    cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    try:
        for line in step.hashes:
            placeid = world.get_placeid(line['place_id'])
            cur.execute('SELECT * FROM search_name WHERE place_id = %s', (placeid,))
            assert cur.rowcount > 0, \
                "No search_name entry found for '%s'" % (line['place_id'], )
            # Fetch all rows up front: the cursor is reused for the word
            # lookup below, which would otherwise discard the pending rows.
            rows = cur.fetchall()

            for res in rows:
                for k, v in line.items():
                    if k in ('search_rank', 'address_rank'):
                        assert_equals(int(v), res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
                    elif k == 'importance':
                        assert_equals(float(v), res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['place_id'], k, v, res[k]))
                    elif k in ('name_vector', 'nameaddress_vector'):
                        terms = [x.strip().replace('#', ' ') for x in v.split(',')]
                        cur.execute('SELECT word_id, word_token FROM word, (SELECT unnest(%s) as term) t WHERE word_token = make_standard_name(t.term)', (terms,))
                        assert cur.rowcount >= len(terms)
                        for wid in cur.fetchall():
                            assert_in(wid['word_id'], res[k], "Missing term for %s/%s: %s" % (line['place_id'], k, wid['word_token']))
                    elif k == 'country_code':
                        # The country code is a string, so the message must
                        # not use a numeric format.
                        assert_equals(v, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['place_id'], k, v, res[k]))
                    elif k == 'place_id':
                        # Only used to find the row, nothing to compare.
                        pass
                    else:
                        raise Exception("Cannot handle field %s in search_name table" % (k, ))
    finally:
        cur.close()
@step(u'way (\d+) expands to lines')
def check_interpolation_lines(step, wayid):
    """Check that the correct interpolation line has been entered in
       location_property_osmline for the given source line/nodes.
       Expected are three columns:
       startnumber, endnumber and geometry

       The geometry column holds the coordinate list of the expected
       line as it appears between the parentheses of the WKT
       representation, with ', ' between points.
    """
    lines = []
    for line in step.hashes:
        lines.append((line["startnumber"], line["endnumber"], line["geometry"]))

    cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    try:
        cur.execute("""SELECT startnumber::text, endnumber::text, st_astext(linegeo) as geometry
                       FROM location_property_osmline WHERE osm_id = %s""",
                    (int(wayid),))
        assert_equals(len(lines), cur.rowcount)
        for r in cur:
            # Reduce 'LINESTRING(x y,x y)' to 'x y, x y'.
            linegeo = r["geometry"].split('(')[1].split(')')[0].replace(',', ', ')
            exp = (r["startnumber"], r["endnumber"], linegeo)
            assert_in(exp, lines)
            # Each expected line may only be matched by one result row.
            lines.remove(exp)
    finally:
        cur.close()
@step(u'way (\d+) expands exactly to housenumbers ([0-9,]*)')
def check_interpolated_housenumber_list(step, nodeid, numberlist):
    """ Checks that the interpolated house numbers of a way correspond
        exactly to the given comma-separated list of numbers.

        nodeid is the OSM id of the way (the name is kept for
        compatibility). Each number in the list must be found once
        among the placex address rows of the way and no other house
        number may exist.
    """
    # Drop empty items so that an empty list means 'no numbers'.
    expected = [num for num in numberlist.split(',') if num]
    cur = world.conn.cursor()
    try:
        cur.execute("""SELECT housenumber FROM placex
                       WHERE osm_type = 'W' and osm_id = %s
                             and class = 'place' and type = 'address'""", (int(nodeid),))
        for r in cur:
            assert_in(r[0], expected, "Unexpected house number %s for way %s." % (r[0], nodeid))
            expected.remove(r[0])
    finally:
        cur.close()
    assert_equals(0, len(expected), "Missing house numbers for way %s: %s" % (nodeid, expected))
@step(u'way (\d+) expands to no housenumbers')
def check_no_interpolated_housenumber_list(step, nodeid):
    """ Checks that no interpolated house numbers exist for a way.

        nodeid is the OSM id of the way (the name is kept for
        compatibility). The check fails when placex contains any
        row of class 'place' and type 'address' for that way.
    """
    cur = world.conn.cursor()
    try:
        cur.execute("""SELECT housenumber FROM placex
                       WHERE osm_type = 'W' and osm_id = %s
                             and class = 'place' and type = 'address'""", (int(nodeid),))
        res = [r[0] for r in cur]
    finally:
        cur.close()
    assert_equals(0, len(res), "Unexpected house numbers for way %s: %s" % (nodeid, res))
@step(u'table search_name has no entry for (.*)')
def check_placex_missing(step, osmid):
    """ Checks that there is no entry in the search index for the
        given place object.

        osmid is the object identifier as written in the feature
        file; it is resolved to a place id with world.get_placeid().
    """
    # NOTE: this function shares its name with the step that checks
    # place/placex. The name is kept so that existing references
    # continue to work.
    cur = world.conn.cursor()
    try:
        placeid = world.get_placeid(osmid)
        cur.execute('SELECT count(*) FROM search_name WHERE place_id =%s', (placeid,))
        numres = cur.fetchone()[0]
    finally:
        cur.close()
    assert_equals (numres, 0, "Found %s search_name entries for %s" % (numres, osmid))