1 """ Steps for checking the DB after import and update tests.
    There are two groups of tests here. The first group tests
    the contents of db tables directly, the second checks
    query results by using the command line query tool.
8 from nose.tools import *
11 import psycopg2.extensions
12 import psycopg2.extras
19 from collections import OrderedDict
21 logger = logging.getLogger(__name__)
@step(u'table placex contains as names for (N|R|W)(\\d+)')
def check_placex_names(step, osmtyp, osmid):
    """ Check for the exact content of the name hstore in placex.

        The step table lists the expected names in the columns 'k'
        (key) and 'v' (value). Every placex row found for the OSM
        object must contain exactly these pairs: a missing key, a
        differing value or an additional name fails the step.

        step   - lettuce step; its hashes hold the expected names.
        osmtyp - OSM object type as matched by the step ('N', 'R', 'W').
        osmid  - OSM id as matched by the step (string of digits).
    """
    cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    cur.execute('SELECT name FROM placex where osm_type = %s and osm_id =%s', (osmtyp, int(osmid)))
    # Without this check a missing object would pass the step vacuously.
    assert cur.rowcount > 0, "No placex entry for %s%s" % (osmtyp, osmid)
    for line in cur:
        # Work on a copy so that verified keys can be ticked off.
        # A NULL name column is treated like an empty set of names.
        names = dict(line['name'] or {})
        for name in step.hashes:
            assert_in(name['k'], names)
            assert_equals(names[name['k']], name['v'])
            del names[name['k']]
        # Whatever is left was not listed in the step table.
        assert_equals(len(names), 0,
                      "Unexpected names for %s%s: %s" % (osmtyp, osmid, names))
@step(u'table ([a-z_]+) contains$')
def check_placex_content(step, tablename):
    """ Check that the given lines are in the given table.

        Entries are searched by osm_type/osm_id (and class, when the
        object id carries one) and then all given columns are tested.
        If there is more than one line for an OSM object, they must
        all match in these columns.

        step      - lettuce step; each hash needs an 'object' entry
                    that world.split_id() can parse, all other entries
                    are column name -> expected value.
        tablename - table to query. The step pattern restricts it to
                    [a-z_]+, which is why it may be interpolated into
                    the SQL string.
    """
    cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    for line in step.hashes:
        osmtype, osmid, cls = world.split_id(line['object'])
        q = 'SELECT *'
        if tablename == 'placex':
            # Expose the centroid coordinates for the 'centroid' check below.
            q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon"
        # Only the table name is substituted here; the doubled %%s
        # survive as placeholders for cur.execute().
        q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,)
        if cls is None:
            params = (osmtype, osmid)
        else:
            q = q + ' and class = %s'
            params = (osmtype, osmid, cls)
        cur.execute(q, params)
        assert cur.rowcount > 0, \
            "No entry in table '%s' for '%s'" % (tablename, line['object'])
        columns = [col[0] for col in cur.description]
        # Fetch everything up front so the result set cannot be disturbed
        # by queries that the world helpers may issue.
        rows = cur.fetchall()

        for res in rows:
            for k, v in line.items():
                if k == 'object':
                    # Only used to look up the row, not a column.
                    continue
                assert_in(k, columns,
                          "Table '%s' has no column '%s'" % (tablename, k))
                if isinstance(res[k], dict):
                    val = world.make_hash(v)
                    assert_equals(res[k], val)
                elif k in ('parent_place_id', 'linked_place_id'):
                    pid = world.get_placeid(v)
                    assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k]))
                elif k == 'centroid':
                    # clat/clon only exist for placex (see query above).
                    world.match_geometry((res['clat'], res['clon']), v)
                else:
                    assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v))
@step(u'table (placex?) has no entry for (N|R|W)(\\d+)(:\\w+)?')
def check_placex_missing(step, tablename, osmtyp, osmid, placeclass):
    """ Check that the table place or placex has no row for an object.

        step       - lettuce step (unused).
        tablename  - 'place' or 'placex', as matched by the step pattern.
        osmtyp     - OSM object type ('N', 'R' or 'W').
        osmid      - OSM id as a string of digits.
        placeclass - optional ':<class>' suffix from the step, or None.
                     When given, only rows of that class are counted.
    """
    cur = world.conn.cursor()
    q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, )
    args = [osmtyp, int(osmid)]
    if placeclass is not None:
        q = q + ' and class = %s'
        # Strip the leading ':' captured by the step pattern.
        args.append(placeclass[1:])
    # The query has to be run before its result can be fetched.
    cur.execute(q, args)
    numres = cur.fetchone()[0]
    assert_equals(numres, 0,
                  "Found %s unexpected row(s) in %s for %s%s%s"
                  % (numres, tablename, osmtyp, osmid, placeclass or ''))
@step(u'search_name table contains$')
def check_search_name_content(step):
    """ Check the content of the search_name table.

        Each line of the step table needs a 'place_id' entry, which is
        resolved with world.get_placeid(). All other entries are
        compared against every search_name row of that place:

        search_rank, address_rank      - compared as integers
        importance                     - compared as float
        country_code                   - compared as string
        name_vector, nameaddress_vector - comma-separated terms ('#'
            stands for a space); the word ids of all terms must be
            contained in the column

        Any other column name raises an Exception.
    """
    cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    for line in step.hashes:
        placeid = world.get_placeid(line['place_id'])
        cur.execute('SELECT * FROM search_name WHERE place_id = %s', (placeid,))
        assert cur.rowcount > 0, \
            "No search_name entry for '%s'" % (line['place_id'],)
        # Fetch the rows first: the cursor is reused below for the word
        # lookup, which would otherwise replace this result set.
        rows = cur.fetchall()

        for res in rows:
            for k, v in line.items():
                if k == 'place_id':
                    # Only used to look up the row.
                    continue
                if k in ('search_rank', 'address_rank'):
                    assert_equals(int(v), res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['place_id'], k, v, res[k]))
                elif k == 'importance':
                    assert_equals(float(v), res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['place_id'], k, v, res[k]))
                elif k in ('name_vector', 'nameaddress_vector'):
                    terms = [x.strip().replace('#', ' ') for x in v.split(',')]
                    cur.execute('SELECT word_id, word_token FROM word, (SELECT unnest(%s) as term) t WHERE word_token = make_standard_name(t.term)', (terms,))
                    # Each term must resolve to at least one word.
                    assert cur.rowcount >= len(terms)
                    for wid in cur.fetchall():
                        assert_in(wid['word_id'], res[k], "Missing term for %s/%s: %s" % (line['place_id'], k, wid['word_token']))
                elif k == 'country_code':
                    assert_equals(v, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['place_id'], k, v, res[k]))
                else:
                    raise Exception("Cannot handle field %s in search_name table" % (k, ))
@step(u'table search_name has no entry for (.*)')
def check_placex_missing(step, osmid):
    """ Check that there is no entry in the search index for the
        given place object.

        step  - lettuce step (unused).
        osmid - object identifier as written in the step; it is
                resolved to a place id with world.get_placeid().
    """
    # NOTE(review): this function shares its name with the step for the
    # place/placex tables earlier in this file and rebinds that
    # module-level name. Presumably both steps stay registered through
    # their decorators - confirm before renaming either one.
    cur = world.conn.cursor()
    placeid = world.get_placeid(osmid)
    cur.execute('SELECT count(*) FROM search_name WHERE place_id =%s', (placeid,))
    numres = cur.fetchone()[0]
    assert_equals(numres, 0,
                  "Found %s unexpected search_name row(s) for '%s'"
                  % (numres, osmid))