]> git.openstreetmap.org Git - nominatim.git/blob - tests/steps/db_results.py
71a30927b3314076ff7427a59b7ed71ce3b7f5c8
[nominatim.git] / tests / steps / db_results.py
1 """ Steps for checking the DB after import and update tests.
2
3     There are two groups of test here. The first group tests
4     the contents of db tables directly, the second checks
5     query results by using the command line query tool.
6 """
7
8 from nose.tools import *
9 from lettuce import *
10 import psycopg2
11 import psycopg2.extensions
12 import psycopg2.extras
13 import os
14 import random
15 import json
16 import re
17 import logging
18 from collections import OrderedDict
19
20 logger = logging.getLogger(__name__)
21
22 @step(u'table placex contains as names for (N|R|W)(\d+)')
23 def check_placex_names(step, osmtyp, osmid):
24     """ Check for the exact content of the name hstore in placex.
25     """
26     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
27     cur.execute('SELECT name FROM placex where osm_type = %s and osm_id =%s', (osmtyp, int(osmid)))
28     for line in cur:
29         names = dict(line['name'])
30         for name in step.hashes:
31             assert_in(name['k'], names)
32             assert_equals(names[name['k']], name['v'])
33             del names[name['k']]
34         assert_equals(len(names), 0)
35
36
37
38 @step(u'table ([a-z_]+) contains$')
39 def check_placex_content(step, tablename):
40     """ check that the given lines are in the given table
41         Entries are searched by osm_type/osm_id and then all
42         given columns are tested. If there is more than one
43         line for an OSM object, they must match in these columns.
44     """
45     try:
46         cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
47         for line in step.hashes:
48             osmtype, osmid, cls = world.split_id(line['object'])
49             q = 'SELECT *'
50             if tablename == 'placex':
51                 q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon"
52             q = q + ", ST_GeometryType(geometry) as geometrytype"
53             q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,)
54             if cls is None:
55                 params = (osmtype, osmid)
56             else:
57                 q = q + ' and class = %s'
58                 params = (osmtype, osmid, cls)
59             cur.execute(q, params)
60             assert(cur.rowcount > 0)
61             for res in cur:
62                 for k,v in line.iteritems():
63                     if not k == 'object':
64                         assert_in(k, res)
65                         if type(res[k]) is dict:
66                             val = world.make_hash(v)
67                             assert_equals(res[k], val)
68                         elif k in ('parent_place_id', 'linked_place_id'):
69                             pid = world.get_placeid(v)
70                             assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k]))
71                         elif k == 'centroid':
72                             world.match_geometry((res['clat'], res['clon']), v)
73                         else:
74                             assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v))
75     finally:
76         cur.close()
77         world.conn.commit()
78
79 @step(u'table (placex?) has no entry for (N|R|W)(\d+)(:\w+)?')
80 def check_placex_missing(step, tablename, osmtyp, osmid, placeclass):
81     cur = world.conn.cursor()
82     try:
83         q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, )
84         args = [osmtyp, int(osmid)]
85         if placeclass is not None:
86             q = q + ' and class = %s'
87             args.append(placeclass[1:])
88         cur.execute(q, args)
89         numres = cur.fetchone()[0]
90         assert_equals (numres, 0)
91     finally:
92         cur.close()
93         world.conn.commit()
94
95 @step(u'search_name table contains$')
96 def check_search_name_content(step):
97     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
98     for line in step.hashes:
99         placeid = world.get_placeid(line['place_id'])
100         cur.execute('SELECT * FROM search_name WHERE place_id = %s', (placeid,))
101         assert(cur.rowcount > 0)
102         for res in cur:
103             for k,v in line.iteritems():
104                 if k in ('search_rank', 'address_rank'):
105                     assert_equals(int(v), res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
106                 elif k in ('importance'):
107                     assert_equals(float(v), res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
108                 elif k in ('name_vector', 'nameaddress_vector'):
109                     terms = [x.strip().replace('#', ' ') for x in v.split(',')]
110                     cur.execute('SELECT word_id, word_token FROM word, (SELECT unnest(%s) as term) t WHERE word_token = make_standard_name(t.term)', (terms,))
111                     assert cur.rowcount >= len(terms)
112                     for wid in cur:
113                         assert_in(wid['word_id'], res[k], "Missing term for %s/%s: %s" % (line['place_id'], k, wid['word_token']))
114                 elif k in ('country_code'):
115                     assert_equals(v, res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
116                 elif k == 'place_id':
117                     pass
118                 else:
119                     raise Exception("Cannot handle field %s in search_name table" % (k, ))
120
121 @step(u'way (\d+) expands to housenumbers')
122 def check_interpolated_housenumbers(step, nodeid):
123     """Check that the exact set of housenumbers has been entered in
124        placex for the given source node. Expected are two columns:
125        housenumber and centroid
126     """
127     numbers = {}
128     for line in step.hashes:
129         assert line["housenumber"] not in numbers
130         numbers[line["housenumber"]] = line["centroid"]
131     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
132     cur.execute("""SELECT DISTINCT housenumber,
133                           ST_X(centroid) as clat, ST_Y(centroid) as clon
134                    FROM placex WHERE osm_type = 'W' and osm_id = %s
135                                  and class = 'place' and type = 'address'""",
136                    (int(nodeid),))
137     assert_equals(len(numbers), cur.rowcount)
138     for r in cur:
139         assert_in(r["housenumber"], numbers)
140         world.match_geometry((r['clat'], r['clon']), numbers[r["housenumber"]])
141         del numbers[r["housenumber"]]
142
143 @step(u'way (\d+) expands exactly to housenumbers ([0-9,]*)')
144 def check_interpolated_housenumber_list(step, nodeid, numberlist):
145     """ Checks that the interpolated house numbers corresponds
146         to the given list.
147     """
148     expected = numberlist.split(',');
149     cur = world.conn.cursor()
150     cur.execute("""SELECT housenumber FROM placex
151                    WHERE osm_type = 'W' and osm_id = %s
152                      and class = 'place' and type = 'address'""", (int(nodeid),))
153     for r in cur:
154         assert_in(r[0], expected, "Unexpected house number %s for node %s." % (r[0], nodeid))
155         expected.remove(r[0])
156     assert_equals(0, len(expected), "Missing house numbers for way %s: %s" % (nodeid, expected))
157
158 @step(u'way (\d+) expands to no housenumbers')
159 def check_no_interpolated_housenumber_list(step, nodeid):
160     """ Checks that the interpolated house numbers corresponds
161         to the given list.
162     """
163     cur = world.conn.cursor()
164     cur.execute("""SELECT housenumber FROM placex
165                    WHERE osm_type = 'W' and osm_id = %s
166                      and class = 'place' and type = 'address'""", (int(nodeid),))
167     res = [r[0] for r in cur]
168     assert_equals(0, len(res), "Unexpected house numbers for way %s: %s" % (nodeid, res))
169
170 @step(u'table search_name has no entry for (.*)')
171 def check_placex_missing(step, osmid):
172     """ Checks if there is an entry in the search index for the
173         given place object.
174     """
175     cur = world.conn.cursor()
176     placeid = world.get_placeid(osmid)
177     cur.execute('SELECT count(*) FROM search_name WHERE place_id =%s', (placeid,))
178     numres = cur.fetchone()[0]
179     assert_equals (numres, 0)
180