]> git.openstreetmap.org Git - nominatim.git/blobdiff - tests/steps/db_results.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / tests / steps / db_results.py
index 2b44215e7e0aa328f421421559c2b60b186dbf44..c3ac9445fe55738c8e87f9f78dc855c91c0c2ba8 100644 (file)
@@ -22,7 +22,7 @@ logger = logging.getLogger(__name__)
 
 @step(u'table placex contains as names for (N|R|W)(\d+)')
 def check_placex_names(step, osmtyp, osmid):
-    """ Check for the exact content of the name hstaore in placex.
+    """ Check for the exact content of the name hstore in placex.
     """
     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
     cur.execute('SELECT name FROM placex where osm_type = %s and osm_id =%s', (osmtyp, int(osmid)))
@@ -43,46 +43,55 @@ def check_placex_content(step, tablename):
         given columns are tested. If there is more than one
         line for an OSM object, they must match in these columns.
     """
-    cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
-    for line in step.hashes:
-        osmtype, osmid, cls = world.split_id(line['object'])
-        q = 'SELECT *'
-        if tablename == 'placex':
-            q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon"
-        q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,)
-        if cls is None:
-            params = (osmtype, osmid)
-        else:
-            q = q + ' and class = %s'
-            params = (osmtype, osmid, cls)
-        cur.execute(q, params)
-        assert(cur.rowcount > 0)
-        for res in cur:
-            for k,v in line.iteritems():
-                if not k == 'object':
-                    assert_in(k, res)
-                    if type(res[k]) is dict:
-                        val = world.make_hash(v)
-                        assert_equals(res[k], val)
-                    elif k in ('parent_place_id', 'linked_place_id'):
-                        pid = world.get_placeid(v)
-                        assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k]))
-                    elif k == 'centroid':
-                        world.match_geometry((res['clat'], res['clon']), v)
-                    else:
-                        assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v))
+    try:
+        cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+        for line in step.hashes:
+            osmtype, osmid, cls = world.split_id(line['object'])
+            q = 'SELECT *'
+            if tablename == 'placex':
+                q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon"
+            q = q + ", ST_GeometryType(geometry) as geometrytype"
+            q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,)
+            if cls is None:
+                params = (osmtype, osmid)
+            else:
+                q = q + ' and class = %s'
+                params = (osmtype, osmid, cls)
+            cur.execute(q, params)
+            assert(cur.rowcount > 0)
+            for res in cur:
+                for k,v in line.iteritems():
+                    if not k == 'object':
+                        assert_in(k, res)
+                        if type(res[k]) is dict:
+                            val = world.make_hash(v)
+                            assert_equals(res[k], val)
+                        elif k in ('parent_place_id', 'linked_place_id'):
+                            pid = world.get_placeid(v)
+                            assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k]))
+                        elif k == 'centroid':
+                            world.match_geometry((res['clat'], res['clon']), v)
+                        else:
+                            assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v))
+    finally:
+        cur.close()
+        world.conn.commit()
 
 @step(u'table (placex?) has no entry for (N|R|W)(\d+)(:\w+)?')
 def check_placex_missing(step, tablename, osmtyp, osmid, placeclass):
     cur = world.conn.cursor()
-    q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, )
-    args = [osmtyp, int(osmid)]
-    if placeclass is not None:
-        q = q + ' and class = %s'
-        args.append(placeclass[1:])
-    cur.execute(q, args)
-    numres = cur.fetchone()[0]
-    assert_equals (numres, 0)
+    try:
+        q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, )
+        args = [osmtyp, int(osmid)]
+        if placeclass is not None:
+            q = q + ' and class = %s'
+            args.append(placeclass[1:])
+        cur.execute(q, args)
+        numres = cur.fetchone()[0]
+        assert_equals (numres, 0)
+    finally:
+        cur.close()
+        world.conn.commit()
 
 @step(u'search_name table contains$')
 def check_search_name_content(step):
@@ -110,10 +119,10 @@ def check_search_name_content(step):
                 else:
                     raise Exception("Cannot handle field %s in search_name table" % (k, ))
 
-@step(u'node (\d+) expands to housenumbers')
+@step(u'way (\d+) expands to housenumbers')
 def check_interpolated_housenumbers(step, nodeid):
     """Check that the exact set of housenumbers has been entered in
-       placex for the given source node. Expected are tow columns:
+       placex for the given source node. Expected are two columns:
        housenumber and centroid
     """
     numbers = {}
@@ -123,7 +132,8 @@ def check_interpolated_housenumbers(step, nodeid):
     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
     cur.execute("""SELECT DISTINCT housenumber,
                           ST_X(centroid) as clat, ST_Y(centroid) as clon
-                   FROM placex WHERE osm_type = 'N' and osm_id = %s""",
+                   FROM placex WHERE osm_type = 'W' and osm_id = %s
+                                 and class = 'place' and type = 'address'""",
                    (int(nodeid),))
     assert_equals(len(numbers), cur.rowcount)
     for r in cur:
@@ -131,6 +141,32 @@ def check_interpolated_housenumbers(step, nodeid):
         world.match_geometry((r['clat'], r['clon']), numbers[r["housenumber"]])
         del numbers[r["housenumber"]]
 
+@step(u'way (\d+) expands exactly to housenumbers ([0-9,]*)')
+def check_interpolated_housenumber_list(step, nodeid, numberlist):
+    """ Checks that the interpolated house numbers corresponds
+        to the given list.
+    """
+    expected = numberlist.split(',');
+    cur = world.conn.cursor()
+    cur.execute("""SELECT housenumber FROM placex
+                   WHERE osm_type = 'W' and osm_id = %s
+                     and class = 'place' and type = 'address'""", (int(nodeid),))
+    for r in cur:
+        assert_in(r[0], expected, "Unexpected house number %s for node %s." % (r[0], nodeid))
+        expected.remove(r[0])
+    assert_equals(0, len(expected), "Missing house numbers for way %s: %s" % (nodeid, expected))
+
+@step(u'way (\d+) expands to no housenumbers')
+def check_no_interpolated_housenumber_list(step, nodeid):
+    """ Checks that the interpolated house numbers corresponds
+        to the given list.
+    """
+    cur = world.conn.cursor()
+    cur.execute("""SELECT housenumber FROM placex
+                   WHERE osm_type = 'W' and osm_id = %s
+                     and class = 'place' and type = 'address'""", (int(nodeid),))
+    res = [r[0] for r in cur]
+    assert_equals(0, len(res), "Unexpected house numbers for way %s: %s" % (nodeid, res))
 
 @step(u'table search_name has no entry for (.*)')
 def check_placex_missing(step, osmid):