]> git.openstreetmap.org Git - nominatim.git/blob - tests/steps/db_results.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / tests / steps / db_results.py
1 """ Steps for checking the DB after import and update tests.
2
3     There are two groups of test here. The first group tests
4     the contents of db tables directly, the second checks
5     query results by using the command line query tool.
6 """
7
8 from nose.tools import *
9 from lettuce import *
10 import psycopg2
11 import psycopg2.extensions
12 import psycopg2.extras
13 import os
14 import random
15 import json
16 import re
17 import logging
18 from collections import OrderedDict
19
20 logger = logging.getLogger(__name__)
21
22 @step(u'table placex contains as names for (N|R|W)(\d+)')
23 def check_placex_names(step, osmtyp, osmid):
24     """ Check for the exact content of the name hstore in placex.
25     """
26     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
27     cur.execute('SELECT name FROM placex where osm_type = %s and osm_id =%s', (osmtyp, int(osmid)))
28     for line in cur:
29         names = dict(line['name'])
30         for name in step.hashes:
31             assert_in(name['k'], names)
32             assert_equals(names[name['k']], name['v'])
33             del names[name['k']]
34         assert_equals(len(names), 0)
35
36
37
38
39 @step(u'table ([a-z_]+) contains$')
40 def check_placex_content(step, tablename):
41     """ check that the given lines are in the given table
42         Entries are searched by osm_type/osm_id and then all
43         given columns are tested. If there is more than one
44         line for an OSM object, they must match in these columns.
45     """
46     try:
47         cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
48         for line in step.hashes:
49             osmtype, osmid, cls = world.split_id(line['object'])
50             q = 'SELECT *'
51             if tablename == 'placex':
52                 q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon"
53             if tablename == 'location_property_osmline':
54                 q = q + ' FROM %s where osm_id = %%s' % (tablename,)
55             else:
56                 q = q + ", ST_GeometryType(geometry) as geometrytype"
57                 q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,)
58             if cls is None:
59                 if tablename == 'location_property_osmline':
60                     params = (osmid,)
61                 else:
62                     params = (osmtype, osmid)
63             else:
64                 q = q + ' and class = %s'
65                 if tablename == 'location_property_osmline':
66                     params = (osmid, cls)
67                 else:
68                     params = (osmtype, osmid, cls)
69             cur.execute(q, params)
70             assert(cur.rowcount > 0)
71             for res in cur:
72                 for k,v in line.iteritems():
73                     if not k == 'object':
74                         assert_in(k, res)
75                         if type(res[k]) is dict:
76                             val = world.make_hash(v)
77                             assert_equals(res[k], val)
78                         elif k in ('parent_place_id', 'linked_place_id'):
79                             pid = world.get_placeid(v)
80                             assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k]))
81                         elif k == 'centroid':
82                             world.match_geometry((res['clat'], res['clon']), v)
83                         else:
84                             assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v))
85     finally:
86         cur.close()
87         world.conn.commit()
88
89 @step(u'table (placex?) has no entry for (N|R|W)(\d+)(:\w+)?')
90 def check_placex_missing(step, tablename, osmtyp, osmid, placeclass):
91     cur = world.conn.cursor()
92     try:
93         q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, )
94         args = [osmtyp, int(osmid)]
95         if placeclass is not None:
96             q = q + ' and class = %s'
97             args.append(placeclass[1:])
98         cur.execute(q, args)
99         numres = cur.fetchone()[0]
100         assert_equals (numres, 0)
101     finally:
102         cur.close()
103         world.conn.commit()
104
105 @step(u'table location_property_osmline has no entry for W(\d+)?')
106 def check_osmline_missing(step, osmid):
107     cur = world.conn.cursor()
108     try:
109         q = 'SELECT count(*) FROM location_property_osmline where osm_id = %s' % (osmid, )
110         cur.execute(q)
111         numres = cur.fetchone()[0]
112         assert_equals (numres, 0)
113     finally:
114         cur.close()
115         world.conn.commit()
116
117 @step(u'search_name table contains$')
118 def check_search_name_content(step):
119     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
120     for line in step.hashes:
121         placeid = world.get_placeid(line['place_id'])
122         cur.execute('SELECT * FROM search_name WHERE place_id = %s', (placeid,))
123         assert(cur.rowcount > 0)
124         for res in cur:
125             for k,v in line.iteritems():
126                 if k in ('search_rank', 'address_rank'):
127                     assert_equals(int(v), res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
128                 elif k in ('importance'):
129                     assert_equals(float(v), res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
130                 elif k in ('name_vector', 'nameaddress_vector'):
131                     terms = [x.strip().replace('#', ' ') for x in v.split(',')]
132                     cur.execute('SELECT word_id, word_token FROM word, (SELECT unnest(%s) as term) t WHERE word_token = make_standard_name(t.term)', (terms,))
133                     assert cur.rowcount >= len(terms)
134                     for wid in cur:
135                         assert_in(wid['word_id'], res[k], "Missing term for %s/%s: %s" % (line['place_id'], k, wid['word_token']))
136                 elif k in ('country_code'):
137                     assert_equals(v, res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
138                 elif k == 'place_id':
139                     pass
140                 else:
141                     raise Exception("Cannot handle field %s in search_name table" % (k, ))
142
143 @step(u'way (\d+) expands to lines')
144 def check_interpolation_lines(step, wayid):
145     """Check that the correct interpolation line has been entered in
146        location_property_osmline for the given source line/nodes.
147        Expected are three columns:
148        startnumber, endnumber and linegeo
149     """
150     lines = []
151     for line in step.hashes:
152         lines.append((line["startnumber"], line["endnumber"], line["geometry"]))
153     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
154     cur.execute("""SELECT startnumber::text, endnumber::text, st_astext(linegeo) as geometry
155                    FROM location_property_osmline WHERE osm_id = %s""",
156                    (int(wayid),))
157     assert_equals(len(lines), cur.rowcount)
158     for r in cur:
159         linegeo = str(str(r["geometry"].split('(')[1]).split(')')[0]).replace(',', ', ')
160         exp = (r["startnumber"], r["endnumber"], linegeo)
161         assert_in(exp, lines)
162         lines.remove(exp)
163
164 @step(u'way (\d+) expands exactly to housenumbers ([0-9,]*)')
165 def check_interpolated_housenumber_list(step, nodeid, numberlist):
166     """ Checks that the interpolated house numbers corresponds
167         to the given list.
168     """
169     expected = numberlist.split(',');
170     cur = world.conn.cursor()
171     cur.execute("""SELECT housenumber FROM placex
172                    WHERE osm_type = 'W' and osm_id = %s
173                    and class = 'place' and type = 'address'""", (int(nodeid),))
174     for r in cur:
175         assert_in(r[0], expected, "Unexpected house number %s for node %s." % (r[0], nodeid))
176         expected.remove(r[0])
177     assert_equals(0, len(expected), "Missing house numbers for way %s: %s" % (nodeid, expected))
178
179 @step(u'way (\d+) expands to no housenumbers')
180 def check_no_interpolated_housenumber_list(step, nodeid):
181     """ Checks that the interpolated house numbers corresponds
182         to the given list.
183     """
184     cur = world.conn.cursor()
185     cur.execute("""SELECT housenumber FROM placex
186                    WHERE osm_type = 'W' and osm_id = %s
187                      and class = 'place' and type = 'address'""", (int(nodeid),))
188     res = [r[0] for r in cur]
189     assert_equals(0, len(res), "Unexpected house numbers for way %s: %s" % (nodeid, res))
190
191 @step(u'table search_name has no entry for (.*)')
192 def check_placex_missing(step, osmid):
193     """ Checks if there is an entry in the search index for the
194         given place object.
195     """
196     cur = world.conn.cursor()
197     placeid = world.get_placeid(osmid)
198     cur.execute('SELECT count(*) FROM search_name WHERE place_id =%s', (placeid,))
199     numres = cur.fetchone()[0]
200     assert_equals (numres, 0)
201