]> git.openstreetmap.org Git - nominatim.git/blobdiff - tests/steps/db_results.py
Remove interpolation lines from placex and save them in an extra table.
[nominatim.git] / tests / steps / db_results.py
index 2b44215e7e0aa328f421421559c2b60b186dbf44..7fe4da30557b0657448ba020c4e859ab744483eb 100644 (file)
@@ -11,7 +11,6 @@ import psycopg2
 import psycopg2.extensions
 import psycopg2.extras
 import os
-import subprocess
 import random
 import json
 import re
@@ -22,7 +21,7 @@ logger = logging.getLogger(__name__)
 
 @step(u'table placex contains as names for (N|R|W)(\d+)')
 def check_placex_names(step, osmtyp, osmid):
-    """ Check for the exact content of the name hstaore in placex.
+    """ Check for the exact content of the name hstore in placex.
     """
     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
     cur.execute('SELECT name FROM placex where osm_type = %s and osm_id =%s', (osmtyp, int(osmid)))
@@ -36,6 +35,7 @@ def check_placex_names(step, osmtyp, osmid):
 
 
 
+
 @step(u'table ([a-z_]+) contains$')
 def check_placex_content(step, tablename):
     """ check that the given lines are in the given table
@@ -43,46 +43,75 @@ def check_placex_content(step, tablename):
         given columns are tested. If there is more than one
         line for an OSM object, they must match in these columns.
     """
-    cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
-    for line in step.hashes:
-        osmtype, osmid, cls = world.split_id(line['object'])
-        q = 'SELECT *'
-        if tablename == 'placex':
-            q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon"
-        q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,)
-        if cls is None:
-            params = (osmtype, osmid)
-        else:
-            q = q + ' and class = %s'
-            params = (osmtype, osmid, cls)
-        cur.execute(q, params)
-        assert(cur.rowcount > 0)
-        for res in cur:
-            for k,v in line.iteritems():
-                if not k == 'object':
-                    assert_in(k, res)
-                    if type(res[k]) is dict:
-                        val = world.make_hash(v)
-                        assert_equals(res[k], val)
-                    elif k in ('parent_place_id', 'linked_place_id'):
-                        pid = world.get_placeid(v)
-                        assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k]))
-                    elif k == 'centroid':
-                        world.match_geometry((res['clat'], res['clon']), v)
-                    else:
-                        assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v))
+    try:
+        cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+        for line in step.hashes:
+            osmtype, osmid, cls = world.split_id(line['object'])
+            q = 'SELECT *'
+            if tablename == 'placex':
+                q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon"
+            if tablename == 'location_property_osmline':
+                q = q + ' FROM %s where osm_id = %%s' % (tablename,)
+            else:
+                q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,)
+            if cls is None:
+                if tablename == 'location_property_osmline':
+                    params = (osmid,)
+                else:
+                    params = (osmtype, osmid)
+            else:
+                q = q + ' and class = %s'
+                if tablename == 'location_property_osmline':
+                    params = (osmid, cls)
+                else:
+                    params = (osmtype, osmid, cls)
+            cur.execute(q, params)
+            assert(cur.rowcount > 0)
+            for res in cur:
+                for k,v in line.iteritems():
+                    if not k == 'object':
+                        assert_in(k, res)
+                        if type(res[k]) is dict:
+                            val = world.make_hash(v)
+                            assert_equals(res[k], val)
+                        elif k in ('parent_place_id', 'linked_place_id'):
+                            pid = world.get_placeid(v)
+                            assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k]))
+                        elif k == 'centroid':
+                            world.match_geometry((res['clat'], res['clon']), v)
+                        else:
+                            assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v))
+    finally:
+        cur.close()
+        world.conn.commit()
 
 @step(u'table (placex?) has no entry for (N|R|W)(\d+)(:\w+)?')
 def check_placex_missing(step, tablename, osmtyp, osmid, placeclass):
     cur = world.conn.cursor()
-    q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, )
-    args = [osmtyp, int(osmid)]
-    if placeclass is not None:
-        q = q + ' and class = %s'
-        args.append(placeclass[1:])
-    cur.execute(q, args)
-    numres = cur.fetchone()[0]
-    assert_equals (numres, 0)
+    try:
+        q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, )
+        args = [osmtyp, int(osmid)]
+        if placeclass is not None:
+            q = q + ' and class = %s'
+            args.append(placeclass[1:])
+        cur.execute(q, args)
+        numres = cur.fetchone()[0]
+        assert_equals (numres, 0)
+    finally:
+        cur.close()
+        world.conn.commit()
+
+@step(u'table location_property_osmline has no entry for W(\d+)?')
+def check_osmline_missing(step, osmid):
+    cur = world.conn.cursor()
+    try:
+        q = 'SELECT count(*) FROM location_property_osmline where osm_id = %s' % (osmid, )
+        cur.execute(q)
+        numres = cur.fetchone()[0]
+        assert_equals (numres, 0)
+    finally:
+        cur.close()
+        world.conn.commit()
 
 @step(u'search_name table contains$')
 def check_search_name_content(step):
@@ -110,10 +139,10 @@ def check_search_name_content(step):
                 else:
                     raise Exception("Cannot handle field %s in search_name table" % (k, ))
 
-@step(u'node (\d+) expands to housenumbers')
+@step(u'way (\d+) expands to housenumbers')
 def check_interpolated_housenumbers(step, nodeid):
     """Check that the exact set of housenumbers has been entered in
-       placex for the given source node. Expected are tow columns:
+       placex for the given source node. Expected are two columns:
        housenumber and centroid
     """
     numbers = {}
@@ -123,14 +152,64 @@ def check_interpolated_housenumbers(step, nodeid):
     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
     cur.execute("""SELECT DISTINCT housenumber,
                           ST_X(centroid) as clat, ST_Y(centroid) as clon
-                   FROM placex WHERE osm_type = 'N' and osm_id = %s""",
+                   FROM placex WHERE osm_type = 'W' and osm_id = %s
+                                 and class = 'place' and type = 'address'""",
                    (int(nodeid),))
+                   
     assert_equals(len(numbers), cur.rowcount)
     for r in cur:
         assert_in(r["housenumber"], numbers)
         world.match_geometry((r['clat'], r['clon']), numbers[r["housenumber"]])
         del numbers[r["housenumber"]]
 
+@step(u'way (\d+) expands to lines')
+def check_interpolation_lines(step, wayid):
+    """Check that the correct interpolation line has been entered in
+       location_property_osmline for the given source line/nodes. Expected are three columns:
+       startnumber, endnumber and linegeo
+    """
+    lines = {}
+    for line in step.hashes:
+        assert line["startnumber"] not in lines
+        lines[line["startnumber"]] = {'endnumber': line["endnumber"], 'geometry': line["geometry"]}
+    cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+    cur.execute("""SELECT startnumber::text, endnumber::text, st_astext(linegeo) as geometry
+                   FROM location_property_osmline WHERE osm_id = %s""",
+                   (int(wayid),))
+    assert_equals(len(lines), cur.rowcount)
+    for r in cur:
+        assert_in(r["startnumber"], lines)
+        assert_equals(r["endnumber"], lines[r["startnumber"]]["endnumber"])
+        linegeo = str(str(r["geometry"].split('(')[1]).split(')')[0]).replace(',', ', ')
+        assert_equals(linegeo, lines[r["startnumber"]]["geometry"])
+        del lines[r["startnumber"]]
+
+@step(u'way (\d+) expands exactly to housenumbers ([0-9,]*)')
+def check_interpolated_housenumber_list(step, nodeid, numberlist):
+    """ Checks that the interpolated house numbers corresponds
+        to the given list.
+    """
+    expected = numberlist.split(',');
+    cur = world.conn.cursor()
+    cur.execute("""SELECT housenumber FROM placex
+                   WHERE osm_type = 'W' and osm_id = %s
+                   and class = 'place' and type = 'address'""", (int(nodeid),))
+    for r in cur:
+        assert_in(r[0], expected, "Unexpected house number %s for node %s." % (r[0], nodeid))
+        expected.remove(r[0])
+    assert_equals(0, len(expected), "Missing house numbers for way %s: %s" % (nodeid, expected))
+
+@step(u'way (\d+) expands to no housenumbers')
+def check_no_interpolated_housenumber_list(step, nodeid):
+    """ Checks that the interpolated house numbers corresponds
+        to the given list.
+    """
+    cur = world.conn.cursor()
+    cur.execute("""SELECT housenumber FROM placex
+                   WHERE osm_type = 'W' and osm_id = %s
+                     and class = 'place' and type = 'address'""", (int(nodeid),))
+    res = [r[0] for r in cur]
+    assert_equals(0, len(res), "Unexpected house numbers for way %s: %s" % (nodeid, res))
 
 @step(u'table search_name has no entry for (.*)')
 def check_placex_missing(step, osmid):