]> git.openstreetmap.org Git - nominatim.git/blobdiff - tests/steps/db_results.py
Remove interpolation lines from placex and save them in an extra table.
[nominatim.git] / tests / steps / db_results.py
index 2b44215e7e0aa328f421421559c2b60b186dbf44..7fe4da30557b0657448ba020c4e859ab744483eb 100644 (file)
@@ -11,7 +11,6 @@ import psycopg2
 import psycopg2.extensions
 import psycopg2.extras
 import os
 import psycopg2.extensions
 import psycopg2.extras
 import os
-import subprocess
 import random
 import json
 import re
 import random
 import json
 import re
@@ -22,7 +21,7 @@ logger = logging.getLogger(__name__)
 
 @step(u'table placex contains as names for (N|R|W)(\d+)')
 def check_placex_names(step, osmtyp, osmid):
 
 @step(u'table placex contains as names for (N|R|W)(\d+)')
 def check_placex_names(step, osmtyp, osmid):
-    """ Check for the exact content of the name hstaore in placex.
+    """ Check for the exact content of the name hstore in placex.
     """
     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
     cur.execute('SELECT name FROM placex where osm_type = %s and osm_id =%s', (osmtyp, int(osmid)))
     """
     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
     cur.execute('SELECT name FROM placex where osm_type = %s and osm_id =%s', (osmtyp, int(osmid)))
@@ -36,6 +35,7 @@ def check_placex_names(step, osmtyp, osmid):
 
 
 
 
 
 
+
 @step(u'table ([a-z_]+) contains$')
 def check_placex_content(step, tablename):
     """ check that the given lines are in the given table
 @step(u'table ([a-z_]+) contains$')
 def check_placex_content(step, tablename):
     """ check that the given lines are in the given table
@@ -43,46 +43,75 @@ def check_placex_content(step, tablename):
         given columns are tested. If there is more than one
         line for an OSM object, they must match in these columns.
     """
         given columns are tested. If there is more than one
         line for an OSM object, they must match in these columns.
     """
-    cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
-    for line in step.hashes:
-        osmtype, osmid, cls = world.split_id(line['object'])
-        q = 'SELECT *'
-        if tablename == 'placex':
-            q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon"
-        q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,)
-        if cls is None:
-            params = (osmtype, osmid)
-        else:
-            q = q + ' and class = %s'
-            params = (osmtype, osmid, cls)
-        cur.execute(q, params)
-        assert(cur.rowcount > 0)
-        for res in cur:
-            for k,v in line.iteritems():
-                if not k == 'object':
-                    assert_in(k, res)
-                    if type(res[k]) is dict:
-                        val = world.make_hash(v)
-                        assert_equals(res[k], val)
-                    elif k in ('parent_place_id', 'linked_place_id'):
-                        pid = world.get_placeid(v)
-                        assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k]))
-                    elif k == 'centroid':
-                        world.match_geometry((res['clat'], res['clon']), v)
-                    else:
-                        assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v))
+    try:
+        cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+        for line in step.hashes:
+            osmtype, osmid, cls = world.split_id(line['object'])
+            q = 'SELECT *'
+            if tablename == 'placex':
+                q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon"
+            if tablename == 'location_property_osmline':
+                q = q + ' FROM %s where osm_id = %%s' % (tablename,)
+            else:
+                q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,)
+            if cls is None:
+                if tablename == 'location_property_osmline':
+                    params = (osmid,)
+                else:
+                    params = (osmtype, osmid)
+            else:
+                q = q + ' and class = %s'
+                if tablename == 'location_property_osmline':
+                    params = (osmid, cls)
+                else:
+                    params = (osmtype, osmid, cls)
+            cur.execute(q, params)
+            assert(cur.rowcount > 0)
+            for res in cur:
+                for k,v in line.iteritems():
+                    if not k == 'object':
+                        assert_in(k, res)
+                        if type(res[k]) is dict:
+                            val = world.make_hash(v)
+                            assert_equals(res[k], val)
+                        elif k in ('parent_place_id', 'linked_place_id'):
+                            pid = world.get_placeid(v)
+                            assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k]))
+                        elif k == 'centroid':
+                            world.match_geometry((res['clat'], res['clon']), v)
+                        else:
+                            assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v))
+    finally:
+        cur.close()
+        world.conn.commit()
 
 @step(u'table (placex?) has no entry for (N|R|W)(\d+)(:\w+)?')
 def check_placex_missing(step, tablename, osmtyp, osmid, placeclass):
     cur = world.conn.cursor()
 
 @step(u'table (placex?) has no entry for (N|R|W)(\d+)(:\w+)?')
 def check_placex_missing(step, tablename, osmtyp, osmid, placeclass):
     cur = world.conn.cursor()
-    q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, )
-    args = [osmtyp, int(osmid)]
-    if placeclass is not None:
-        q = q + ' and class = %s'
-        args.append(placeclass[1:])
-    cur.execute(q, args)
-    numres = cur.fetchone()[0]
-    assert_equals (numres, 0)
+    try:
+        q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, )
+        args = [osmtyp, int(osmid)]
+        if placeclass is not None:
+            q = q + ' and class = %s'
+            args.append(placeclass[1:])
+        cur.execute(q, args)
+        numres = cur.fetchone()[0]
+        assert_equals (numres, 0)
+    finally:
+        cur.close()
+        world.conn.commit()
+
+@step(u'table location_property_osmline has no entry for W(\d+)?')
+def check_osmline_missing(step, osmid):
+    cur = world.conn.cursor()
+    try:
+        q = 'SELECT count(*) FROM location_property_osmline where osm_id = %s' % (osmid, )
+        cur.execute(q)
+        numres = cur.fetchone()[0]
+        assert_equals (numres, 0)
+    finally:
+        cur.close()
+        world.conn.commit()
 
 @step(u'search_name table contains$')
 def check_search_name_content(step):
 
 @step(u'search_name table contains$')
 def check_search_name_content(step):
@@ -110,10 +139,10 @@ def check_search_name_content(step):
                 else:
                     raise Exception("Cannot handle field %s in search_name table" % (k, ))
 
                 else:
                     raise Exception("Cannot handle field %s in search_name table" % (k, ))
 
-@step(u'node (\d+) expands to housenumbers')
+@step(u'way (\d+) expands to housenumbers')
 def check_interpolated_housenumbers(step, nodeid):
     """Check that the exact set of housenumbers has been entered in
 def check_interpolated_housenumbers(step, nodeid):
     """Check that the exact set of housenumbers has been entered in
-       placex for the given source node. Expected are tow columns:
+       placex for the given source node. Expected are two columns:
        housenumber and centroid
     """
     numbers = {}
        housenumber and centroid
     """
     numbers = {}
@@ -123,14 +152,64 @@ def check_interpolated_housenumbers(step, nodeid):
     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
     cur.execute("""SELECT DISTINCT housenumber,
                           ST_X(centroid) as clat, ST_Y(centroid) as clon
     cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
     cur.execute("""SELECT DISTINCT housenumber,
                           ST_X(centroid) as clat, ST_Y(centroid) as clon
-                   FROM placex WHERE osm_type = 'N' and osm_id = %s""",
+                   FROM placex WHERE osm_type = 'W' and osm_id = %s
+                                 and class = 'place' and type = 'address'""",
                    (int(nodeid),))
                    (int(nodeid),))
+                   
     assert_equals(len(numbers), cur.rowcount)
     for r in cur:
         assert_in(r["housenumber"], numbers)
         world.match_geometry((r['clat'], r['clon']), numbers[r["housenumber"]])
         del numbers[r["housenumber"]]
 
     assert_equals(len(numbers), cur.rowcount)
     for r in cur:
         assert_in(r["housenumber"], numbers)
         world.match_geometry((r['clat'], r['clon']), numbers[r["housenumber"]])
         del numbers[r["housenumber"]]
 
+@step(u'way (\d+) expands to lines')
+def check_interpolation_lines(step, wayid):
+    """Check that the correct interpolation line has been entered in
+       location_property_osmline for the given source line/nodes. Expected are three columns:
+       startnumber, endnumber and linegeo
+    """
+    lines = {}
+    for line in step.hashes:
+        assert line["startnumber"] not in lines
+        lines[line["startnumber"]] = {'endnumber': line["endnumber"], 'geometry': line["geometry"]}
+    cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+    cur.execute("""SELECT startnumber::text, endnumber::text, st_astext(linegeo) as geometry
+                   FROM location_property_osmline WHERE osm_id = %s""",
+                   (int(wayid),))
+    assert_equals(len(lines), cur.rowcount)
+    for r in cur:
+        assert_in(r["startnumber"], lines)
+        assert_equals(r["endnumber"], lines[r["startnumber"]]["endnumber"])
+        linegeo = str(str(r["geometry"].split('(')[1]).split(')')[0]).replace(',', ', ')
+        assert_equals(linegeo, lines[r["startnumber"]]["geometry"])
+        del lines[r["startnumber"]]
+
+@step(u'way (\d+) expands exactly to housenumbers ([0-9,]*)')
+def check_interpolated_housenumber_list(step, nodeid, numberlist):
+    """ Checks that the interpolated house numbers corresponds
+        to the given list.
+    """
+    expected = numberlist.split(',');
+    cur = world.conn.cursor()
+    cur.execute("""SELECT housenumber FROM placex
+                   WHERE osm_type = 'W' and osm_id = %s
+                   and class = 'place' and type = 'address'""", (int(nodeid),))
+    for r in cur:
+        assert_in(r[0], expected, "Unexpected house number %s for node %s." % (r[0], nodeid))
+        expected.remove(r[0])
+    assert_equals(0, len(expected), "Missing house numbers for way %s: %s" % (nodeid, expected))
+
+@step(u'way (\d+) expands to no housenumbers')
+def check_no_interpolated_housenumber_list(step, nodeid):
+    """ Checks that the interpolated house numbers corresponds
+        to the given list.
+    """
+    cur = world.conn.cursor()
+    cur.execute("""SELECT housenumber FROM placex
+                   WHERE osm_type = 'W' and osm_id = %s
+                     and class = 'place' and type = 'address'""", (int(nodeid),))
+    res = [r[0] for r in cur]
+    assert_equals(0, len(res), "Unexpected house numbers for way %s: %s" % (nodeid, res))
 
 @step(u'table search_name has no entry for (.*)')
 def check_placex_missing(step, osmid):
 
 @step(u'table search_name has no entry for (.*)')
 def check_placex_missing(step, osmid):