]> git.openstreetmap.org Git - nominatim.git/blob - tests/steps/db_results.py
tests: permit duplicate entries in interpolation table
[nominatim.git] / tests / steps / db_results.py
1 """ Steps for checking the DB after import and update tests.
2
3     There are two groups of test here. The first group tests
4     the contents of db tables directly, the second checks
5     query results by using the command line query tool.
6 """
7
8 from nose.tools import *
9 from lettuce import *
10 import psycopg2
11 import psycopg2.extensions
12 import psycopg2.extras
13 import os
14 import random
15 import json
16 import re
17 import logging
18 from collections import OrderedDict
19
20 logger = logging.getLogger(__name__)
21
22 @step(u'table placex contains as names for (N|R|W)(\d+)')
23 def check_placex_names(step, osmtyp, osmid):
24     """ Check for the exact content of the name hstore in placex.
25     """
26     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
27     cur.execute('SELECT name FROM placex where osm_type = %s and osm_id =%s', (osmtyp, int(osmid)))
28     for line in cur:
29         names = dict(line['name'])
30         for name in step.hashes:
31             assert_in(name['k'], names)
32             assert_equals(names[name['k']], name['v'])
33             del names[name['k']]
34         assert_equals(len(names), 0)
35
36
37
38
39 @step(u'table ([a-z_]+) contains$')
40 def check_placex_content(step, tablename):
41     """ check that the given lines are in the given table
42         Entries are searched by osm_type/osm_id and then all
43         given columns are tested. If there is more than one
44         line for an OSM object, they must match in these columns.
45     """
46     try:
47         cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
48         for line in step.hashes:
49             osmtype, osmid, cls = world.split_id(line['object'])
50             q = 'SELECT *'
51             if tablename == 'placex':
52                 q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon"
53             if tablename == 'location_property_osmline':
54                 q = q + ' FROM %s where osm_id = %%s' % (tablename,)
55             else:
56                 q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,)
57             if cls is None:
58                 if tablename == 'location_property_osmline':
59                     params = (osmid,)
60                 else:
61                     params = (osmtype, osmid)
62             else:
63                 q = q + ' and class = %s'
64                 if tablename == 'location_property_osmline':
65                     params = (osmid, cls)
66                 else:
67                     params = (osmtype, osmid, cls)
68             cur.execute(q, params)
69             assert(cur.rowcount > 0)
70             for res in cur:
71                 for k,v in line.iteritems():
72                     if not k == 'object':
73                         assert_in(k, res)
74                         if type(res[k]) is dict:
75                             val = world.make_hash(v)
76                             assert_equals(res[k], val)
77                         elif k in ('parent_place_id', 'linked_place_id'):
78                             pid = world.get_placeid(v)
79                             assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k]))
80                         elif k == 'centroid':
81                             world.match_geometry((res['clat'], res['clon']), v)
82                         else:
83                             assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v))
84     finally:
85         cur.close()
86         world.conn.commit()
87
88 @step(u'table (placex?) has no entry for (N|R|W)(\d+)(:\w+)?')
89 def check_placex_missing(step, tablename, osmtyp, osmid, placeclass):
90     cur = world.conn.cursor()
91     try:
92         q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, )
93         args = [osmtyp, int(osmid)]
94         if placeclass is not None:
95             q = q + ' and class = %s'
96             args.append(placeclass[1:])
97         cur.execute(q, args)
98         numres = cur.fetchone()[0]
99         assert_equals (numres, 0)
100     finally:
101         cur.close()
102         world.conn.commit()
103
104 @step(u'table location_property_osmline has no entry for W(\d+)?')
105 def check_osmline_missing(step, osmid):
106     cur = world.conn.cursor()
107     try:
108         q = 'SELECT count(*) FROM location_property_osmline where osm_id = %s' % (osmid, )
109         cur.execute(q)
110         numres = cur.fetchone()[0]
111         assert_equals (numres, 0)
112     finally:
113         cur.close()
114         world.conn.commit()
115
116 @step(u'search_name table contains$')
117 def check_search_name_content(step):
118     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
119     for line in step.hashes:
120         placeid = world.get_placeid(line['place_id'])
121         cur.execute('SELECT * FROM search_name WHERE place_id = %s', (placeid,))
122         assert(cur.rowcount > 0)
123         for res in cur:
124             for k,v in line.iteritems():
125                 if k in ('search_rank', 'address_rank'):
126                     assert_equals(int(v), res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
127                 elif k in ('importance'):
128                     assert_equals(float(v), res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
129                 elif k in ('name_vector', 'nameaddress_vector'):
130                     terms = [x.strip().replace('#', ' ') for x in v.split(',')]
131                     cur.execute('SELECT word_id, word_token FROM word, (SELECT unnest(%s) as term) t WHERE word_token = make_standard_name(t.term)', (terms,))
132                     assert cur.rowcount >= len(terms)
133                     for wid in cur:
134                         assert_in(wid['word_id'], res[k], "Missing term for %s/%s: %s" % (line['place_id'], k, wid['word_token']))
135                 elif k in ('country_code'):
136                     assert_equals(v, res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
137                 elif k == 'place_id':
138                     pass
139                 else:
140                     raise Exception("Cannot handle field %s in search_name table" % (k, ))
141
142 @step(u'way (\d+) expands to lines')
143 def check_interpolation_lines(step, wayid):
144     """Check that the correct interpolation line has been entered in
145        location_property_osmline for the given source line/nodes.
146        Expected are three columns:
147        startnumber, endnumber and linegeo
148     """
149     lines = []
150     for line in step.hashes:
151         lines.append((line["startnumber"], line["endnumber"], line["geometry"]))
152     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
153     cur.execute("""SELECT startnumber::text, endnumber::text, st_astext(linegeo) as geometry
154                    FROM location_property_osmline WHERE osm_id = %s""",
155                    (int(wayid),))
156     assert_equals(len(lines), cur.rowcount)
157     for r in cur:
158         linegeo = str(str(r["geometry"].split('(')[1]).split(')')[0]).replace(',', ', ')
159         exp = (r["startnumber"], r["endnumber"], linegeo)
160         assert_in(exp, lines)
161         lines.remove(exp)
162
163 @step(u'way (\d+) expands exactly to housenumbers ([0-9,]*)')
164 def check_interpolated_housenumber_list(step, nodeid, numberlist):
165     """ Checks that the interpolated house numbers corresponds
166         to the given list.
167     """
168     expected = numberlist.split(',');
169     cur = world.conn.cursor()
170     cur.execute("""SELECT housenumber FROM placex
171                    WHERE osm_type = 'W' and osm_id = %s
172                    and class = 'place' and type = 'address'""", (int(nodeid),))
173     for r in cur:
174         assert_in(r[0], expected, "Unexpected house number %s for node %s." % (r[0], nodeid))
175         expected.remove(r[0])
176     assert_equals(0, len(expected), "Missing house numbers for way %s: %s" % (nodeid, expected))
177
178 @step(u'way (\d+) expands to no housenumbers')
179 def check_no_interpolated_housenumber_list(step, nodeid):
180     """ Checks that the interpolated house numbers corresponds
181         to the given list.
182     """
183     cur = world.conn.cursor()
184     cur.execute("""SELECT housenumber FROM placex
185                    WHERE osm_type = 'W' and osm_id = %s
186                      and class = 'place' and type = 'address'""", (int(nodeid),))
187     res = [r[0] for r in cur]
188     assert_equals(0, len(res), "Unexpected house numbers for way %s: %s" % (nodeid, res))
189
190 @step(u'table search_name has no entry for (.*)')
191 def check_placex_missing(step, osmid):
192     """ Checks if there is an entry in the search index for the
193         given place object.
194     """
195     cur = world.conn.cursor()
196     placeid = world.get_placeid(osmid)
197     cur.execute('SELECT count(*) FROM search_name WHERE place_id =%s', (placeid,))
198     numres = cur.fetchone()[0]
199     assert_equals (numres, 0)
200