]> git.openstreetmap.org Git - nominatim.git/blob - tests/steps/db_results.py
c3ac9445fe55738c8e87f9f78dc855c91c0c2ba8
[nominatim.git] / tests / steps / db_results.py
1 """ Steps for checking the DB after import and update tests.
2
3     There are two groups of test here. The first group tests
4     the contents of db tables directly, the second checks
5     query results by using the command line query tool.
6 """
7
8 from nose.tools import *
9 from lettuce import *
10 import psycopg2
11 import psycopg2.extensions
12 import psycopg2.extras
13 import os
14 import subprocess
15 import random
16 import json
17 import re
18 import logging
19 from collections import OrderedDict
20
21 logger = logging.getLogger(__name__)
22
23 @step(u'table placex contains as names for (N|R|W)(\d+)')
24 def check_placex_names(step, osmtyp, osmid):
25     """ Check for the exact content of the name hstore in placex.
26     """
27     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
28     cur.execute('SELECT name FROM placex where osm_type = %s and osm_id =%s', (osmtyp, int(osmid)))
29     for line in cur:
30         names = dict(line['name'])
31         for name in step.hashes:
32             assert_in(name['k'], names)
33             assert_equals(names[name['k']], name['v'])
34             del names[name['k']]
35         assert_equals(len(names), 0)
36
37
38
39 @step(u'table ([a-z_]+) contains$')
40 def check_placex_content(step, tablename):
41     """ check that the given lines are in the given table
42         Entries are searched by osm_type/osm_id and then all
43         given columns are tested. If there is more than one
44         line for an OSM object, they must match in these columns.
45     """
46     try:
47         cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
48         for line in step.hashes:
49             osmtype, osmid, cls = world.split_id(line['object'])
50             q = 'SELECT *'
51             if tablename == 'placex':
52                 q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon"
53             q = q + ", ST_GeometryType(geometry) as geometrytype"
54             q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,)
55             if cls is None:
56                 params = (osmtype, osmid)
57             else:
58                 q = q + ' and class = %s'
59                 params = (osmtype, osmid, cls)
60             cur.execute(q, params)
61             assert(cur.rowcount > 0)
62             for res in cur:
63                 for k,v in line.iteritems():
64                     if not k == 'object':
65                         assert_in(k, res)
66                         if type(res[k]) is dict:
67                             val = world.make_hash(v)
68                             assert_equals(res[k], val)
69                         elif k in ('parent_place_id', 'linked_place_id'):
70                             pid = world.get_placeid(v)
71                             assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k]))
72                         elif k == 'centroid':
73                             world.match_geometry((res['clat'], res['clon']), v)
74                         else:
75                             assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v))
76     finally:
77         cur.close()
78         world.conn.commit()
79
80 @step(u'table (placex?) has no entry for (N|R|W)(\d+)(:\w+)?')
81 def check_placex_missing(step, tablename, osmtyp, osmid, placeclass):
82     cur = world.conn.cursor()
83     try:
84         q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, )
85         args = [osmtyp, int(osmid)]
86         if placeclass is not None:
87             q = q + ' and class = %s'
88             args.append(placeclass[1:])
89         cur.execute(q, args)
90         numres = cur.fetchone()[0]
91         assert_equals (numres, 0)
92     finally:
93         cur.close()
94         world.conn.commit()
95
96 @step(u'search_name table contains$')
97 def check_search_name_content(step):
98     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
99     for line in step.hashes:
100         placeid = world.get_placeid(line['place_id'])
101         cur.execute('SELECT * FROM search_name WHERE place_id = %s', (placeid,))
102         assert(cur.rowcount > 0)
103         for res in cur:
104             for k,v in line.iteritems():
105                 if k in ('search_rank', 'address_rank'):
106                     assert_equals(int(v), res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
107                 elif k in ('importance'):
108                     assert_equals(float(v), res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
109                 elif k in ('name_vector', 'nameaddress_vector'):
110                     terms = [x.strip().replace('#', ' ') for x in v.split(',')]
111                     cur.execute('SELECT word_id, word_token FROM word, (SELECT unnest(%s) as term) t WHERE word_token = make_standard_name(t.term)', (terms,))
112                     assert cur.rowcount >= len(terms)
113                     for wid in cur:
114                         assert_in(wid['word_id'], res[k], "Missing term for %s/%s: %s" % (line['place_id'], k, wid['word_token']))
115                 elif k in ('country_code'):
116                     assert_equals(v, res[k], "Results for '%s'/'%s' differ: '%s' != '%d'" % (line['place_id'], k, v, res[k]))
117                 elif k == 'place_id':
118                     pass
119                 else:
120                     raise Exception("Cannot handle field %s in search_name table" % (k, ))
121
122 @step(u'way (\d+) expands to housenumbers')
123 def check_interpolated_housenumbers(step, nodeid):
124     """Check that the exact set of housenumbers has been entered in
125        placex for the given source node. Expected are two columns:
126        housenumber and centroid
127     """
128     numbers = {}
129     for line in step.hashes:
130         assert line["housenumber"] not in numbers
131         numbers[line["housenumber"]] = line["centroid"]
132     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
133     cur.execute("""SELECT DISTINCT housenumber,
134                           ST_X(centroid) as clat, ST_Y(centroid) as clon
135                    FROM placex WHERE osm_type = 'W' and osm_id = %s
136                                  and class = 'place' and type = 'address'""",
137                    (int(nodeid),))
138     assert_equals(len(numbers), cur.rowcount)
139     for r in cur:
140         assert_in(r["housenumber"], numbers)
141         world.match_geometry((r['clat'], r['clon']), numbers[r["housenumber"]])
142         del numbers[r["housenumber"]]
143
144 @step(u'way (\d+) expands exactly to housenumbers ([0-9,]*)')
145 def check_interpolated_housenumber_list(step, nodeid, numberlist):
146     """ Checks that the interpolated house numbers corresponds
147         to the given list.
148     """
149     expected = numberlist.split(',');
150     cur = world.conn.cursor()
151     cur.execute("""SELECT housenumber FROM placex
152                    WHERE osm_type = 'W' and osm_id = %s
153                      and class = 'place' and type = 'address'""", (int(nodeid),))
154     for r in cur:
155         assert_in(r[0], expected, "Unexpected house number %s for node %s." % (r[0], nodeid))
156         expected.remove(r[0])
157     assert_equals(0, len(expected), "Missing house numbers for way %s: %s" % (nodeid, expected))
158
159 @step(u'way (\d+) expands to no housenumbers')
160 def check_no_interpolated_housenumber_list(step, nodeid):
161     """ Checks that the interpolated house numbers corresponds
162         to the given list.
163     """
164     cur = world.conn.cursor()
165     cur.execute("""SELECT housenumber FROM placex
166                    WHERE osm_type = 'W' and osm_id = %s
167                      and class = 'place' and type = 'address'""", (int(nodeid),))
168     res = [r[0] for r in cur]
169     assert_equals(0, len(res), "Unexpected house numbers for way %s: %s" % (nodeid, res))
170
171 @step(u'table search_name has no entry for (.*)')
172 def check_placex_missing(step, osmid):
173     """ Checks if there is an entry in the search index for the
174         given place object.
175     """
176     cur = world.conn.cursor()
177     placeid = world.get_placeid(osmid)
178     cur.execute('SELECT count(*) FROM search_name WHERE place_id =%s', (placeid,))
179     numres = cur.fetchone()[0]
180     assert_equals (numres, 0)
181