]> git.openstreetmap.org Git - nominatim.git/blobdiff - tests/steps/db_results.py
Merge branch 'interpolations'
[nominatim.git] / tests / steps / db_results.py
index f65e992462366373143519d03e1adeb7bf2a11ec..53374e71e463f2f92033536bf07ee1c98c31070a 100644 (file)
@@ -11,7 +11,6 @@ import psycopg2
 import psycopg2.extensions
 import psycopg2.extras
 import os
-import subprocess
 import random
 import json
 import re
@@ -22,7 +21,7 @@ logger = logging.getLogger(__name__)
 
 @step(u'table placex contains as names for (N|R|W)(\d+)')
 def check_placex_names(step, osmtyp, osmid):
-    """ Check for the exact content of the name hstaore in placex.
+    """ Check for the exact content of the name hstore in placex.
     """
     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
     cur.execute('SELECT name FROM placex where osm_type = %s and osm_id =%s', (osmtyp, int(osmid)))
@@ -36,6 +35,7 @@ def check_placex_names(step, osmtyp, osmid):
 
 
 
+
 @step(u'table ([a-z_]+) contains$')
 def check_placex_content(step, tablename):
     """ check that the given lines are in the given table
@@ -43,47 +43,76 @@ def check_placex_content(step, tablename):
         given columns are tested. If there is more than one
         line for an OSM object, they must match in these columns.
     """
-    cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
-    for line in step.hashes:
-        osmtype, osmid, cls = world.split_id(line['object'])
-        q = 'SELECT *'
-        if tablename == 'placex':
-            q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon"
-        q = q + ", ST_GeometryType(geometry) as geometrytype"
-        q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,)
-        if cls is None:
-            params = (osmtype, osmid)
-        else:
-            q = q + ' and class = %s'
-            params = (osmtype, osmid, cls)
-        cur.execute(q, params)
-        assert(cur.rowcount > 0)
-        for res in cur:
-            for k,v in line.iteritems():
-                if not k == 'object':
-                    assert_in(k, res)
-                    if type(res[k]) is dict:
-                        val = world.make_hash(v)
-                        assert_equals(res[k], val)
-                    elif k in ('parent_place_id', 'linked_place_id'):
-                        pid = world.get_placeid(v)
-                        assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k]))
-                    elif k == 'centroid':
-                        world.match_geometry((res['clat'], res['clon']), v)
-                    else:
-                        assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v))
+    try:
+        cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+        for line in step.hashes:
+            osmtype, osmid, cls = world.split_id(line['object'])
+            q = 'SELECT *'
+            if tablename == 'placex':
+                q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon"
+            if tablename == 'location_property_osmline':
+                q = q + ' FROM %s where osm_id = %%s' % (tablename,)
+            else:
+                q = q + ", ST_GeometryType(geometry) as geometrytype"
+                q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,)
+            if cls is None:
+                if tablename == 'location_property_osmline':
+                    params = (osmid,)
+                else:
+                    params = (osmtype, osmid)
+            else:
+                q = q + ' and class = %s'
+                if tablename == 'location_property_osmline':
+                    params = (osmid, cls)
+                else:
+                    params = (osmtype, osmid, cls)
+            cur.execute(q, params)
+            assert(cur.rowcount > 0)
+            for res in cur:
+                for k,v in line.iteritems():
+                    if not k == 'object':
+                        assert_in(k, res)
+                        if type(res[k]) is dict:
+                            val = world.make_hash(v)
+                            assert_equals(res[k], val)
+                        elif k in ('parent_place_id', 'linked_place_id'):
+                            pid = world.get_placeid(v)
+                            assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k]))
+                        elif k == 'centroid':
+                            world.match_geometry((res['clat'], res['clon']), v)
+                        else:
+                            assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v))
+    finally:
+        cur.close()
+        world.conn.commit()
 
 @step(u'table (placex?) has no entry for (N|R|W)(\d+)(:\w+)?')
 def check_placex_missing(step, tablename, osmtyp, osmid, placeclass):
     cur = world.conn.cursor()
-    q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, )
-    args = [osmtyp, int(osmid)]
-    if placeclass is not None:
-        q = q + ' and class = %s'
-        args.append(placeclass[1:])
-    cur.execute(q, args)
-    numres = cur.fetchone()[0]
-    assert_equals (numres, 0)
+    try:
+        q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, )
+        args = [osmtyp, int(osmid)]
+        if placeclass is not None:
+            q = q + ' and class = %s'
+            args.append(placeclass[1:])
+        cur.execute(q, args)
+        numres = cur.fetchone()[0]
+        assert_equals (numres, 0)
+    finally:
+        cur.close()
+        world.conn.commit()
+
+@step(u'table location_property_osmline has no entry for W(\d+)?')
+def check_osmline_missing(step, osmid):
+    cur = world.conn.cursor()
+    try:
+        q = 'SELECT count(*) FROM location_property_osmline where osm_id = %s' % (osmid, )
+        cur.execute(q)
+        numres = cur.fetchone()[0]
+        assert_equals (numres, 0)
+    finally:
+        cur.close()
+        world.conn.commit()
 
 @step(u'search_name table contains$')
 def check_search_name_content(step):
@@ -111,27 +140,53 @@ def check_search_name_content(step):
                 else:
                     raise Exception("Cannot handle field %s in search_name table" % (k, ))
 
-@step(u'node (\d+) expands to housenumbers')
-def check_interpolated_housenumbers(step, nodeid):
-    """Check that the exact set of housenumbers has been entered in
-       placex for the given source node. Expected are tow columns:
-       housenumber and centroid
+@step(u'way (\d+) expands to lines')
+def check_interpolation_lines(step, wayid):
+    """Check that the correct interpolation line has been entered in
+       location_property_osmline for the given source line/nodes.
+       Expected are three columns:
+       startnumber, endnumber and linegeo
     """
-    numbers = {}
+    lines = []
     for line in step.hashes:
-        assert line["housenumber"] not in numbers
-        numbers[line["housenumber"]] = line["centroid"]
+        lines.append((line["startnumber"], line["endnumber"], line["geometry"]))
     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
-    cur.execute("""SELECT DISTINCT housenumber,
-                          ST_X(centroid) as clat, ST_Y(centroid) as clon
-                   FROM placex WHERE osm_type = 'N' and osm_id = %s""",
-                   (int(nodeid),))
-    assert_equals(len(numbers), cur.rowcount)
+    cur.execute("""SELECT startnumber::text, endnumber::text, st_astext(linegeo) as geometry
+                   FROM location_property_osmline WHERE osm_id = %s""",
+                   (int(wayid),))
+    assert_equals(len(lines), cur.rowcount)
+    for r in cur:
+        linegeo = str(str(r["geometry"].split('(')[1]).split(')')[0]).replace(',', ', ')
+        exp = (r["startnumber"], r["endnumber"], linegeo)
+        assert_in(exp, lines)
+        lines.remove(exp)
+
+@step(u'way (\d+) expands exactly to housenumbers ([0-9,]*)')
+def check_interpolated_housenumber_list(step, nodeid, numberlist):
+    """ Checks that the interpolated house numbers corresponds
+        to the given list.
+    """
+    expected = numberlist.split(',');
+    cur = world.conn.cursor()
+    cur.execute("""SELECT housenumber FROM placex
+                   WHERE osm_type = 'W' and osm_id = %s
+                   and class = 'place' and type = 'address'""", (int(nodeid),))
     for r in cur:
-        assert_in(r["housenumber"], numbers)
-        world.match_geometry((r['clat'], r['clon']), numbers[r["housenumber"]])
-        del numbers[r["housenumber"]]
+        assert_in(r[0], expected, "Unexpected house number %s for node %s." % (r[0], nodeid))
+        expected.remove(r[0])
+    assert_equals(0, len(expected), "Missing house numbers for way %s: %s" % (nodeid, expected))
 
+@step(u'way (\d+) expands to no housenumbers')
+def check_no_interpolated_housenumber_list(step, nodeid):
+    """ Checks that the interpolated house numbers corresponds
+        to the given list.
+    """
+    cur = world.conn.cursor()
+    cur.execute("""SELECT housenumber FROM placex
+                   WHERE osm_type = 'W' and osm_id = %s
+                     and class = 'place' and type = 'address'""", (int(nodeid),))
+    res = [r[0] for r in cur]
+    assert_equals(0, len(res), "Unexpected house numbers for way %s: %s" % (nodeid, res))
 
 @step(u'table search_name has no entry for (.*)')
 def check_placex_missing(step, osmid):