X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/c68a8c9f2d4898bbc0fa5a13200b888093c0e808..21c590416854b1245866b96772fde61050c0e01a:/tests/steps/db_results.py?ds=sidebyside diff --git a/tests/steps/db_results.py b/tests/steps/db_results.py index f65e9924..c3ac9445 100644 --- a/tests/steps/db_results.py +++ b/tests/steps/db_results.py @@ -22,7 +22,7 @@ logger = logging.getLogger(__name__) @step(u'table placex contains as names for (N|R|W)(\d+)') def check_placex_names(step, osmtyp, osmid): - """ Check for the exact content of the name hstaore in placex. + """ Check for the exact content of the name hstore in placex. """ cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute('SELECT name FROM placex where osm_type = %s and osm_id =%s', (osmtyp, int(osmid))) @@ -43,47 +43,55 @@ def check_placex_content(step, tablename): given columns are tested. If there is more than one line for an OSM object, they must match in these columns. """ - cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) - for line in step.hashes: - osmtype, osmid, cls = world.split_id(line['object']) - q = 'SELECT *' - if tablename == 'placex': - q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon" - q = q + ", ST_GeometryType(geometry) as geometrytype" - q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,) - if cls is None: - params = (osmtype, osmid) - else: - q = q + ' and class = %s' - params = (osmtype, osmid, cls) - cur.execute(q, params) - assert(cur.rowcount > 0) - for res in cur: - for k,v in line.iteritems(): - if not k == 'object': - assert_in(k, res) - if type(res[k]) is dict: - val = world.make_hash(v) - assert_equals(res[k], val) - elif k in ('parent_place_id', 'linked_place_id'): - pid = world.get_placeid(v) - assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k])) - elif k == 'centroid': - world.match_geometry((res['clat'], res['clon']), v) - else: - assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v)) + try: + cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) + for line in step.hashes: + osmtype, osmid, cls = world.split_id(line['object']) + q = 'SELECT *' + if tablename == 'placex': + q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon" + q = q + ", ST_GeometryType(geometry) as geometrytype" + q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,) + if cls is None: + params = (osmtype, osmid) + else: + q = q + ' and class = %s' + params = (osmtype, osmid, cls) + cur.execute(q, params) + assert(cur.rowcount > 0) + for res in cur: + for k,v in line.iteritems(): + if not k == 'object': + assert_in(k, res) + if type(res[k]) is dict: + val = world.make_hash(v) + assert_equals(res[k], val) + elif k in ('parent_place_id', 'linked_place_id'): + pid = world.get_placeid(v) + assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k])) + elif k == 'centroid': + world.match_geometry((res['clat'], res['clon']), v) + else: + assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v)) + finally: + cur.close() + world.conn.commit() @step(u'table (placex?) has no entry for (N|R|W)(\d+)(:\w+)?') def check_placex_missing(step, tablename, osmtyp, osmid, placeclass): cur = world.conn.cursor() - q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, ) - args = [osmtyp, int(osmid)] - if placeclass is not None: - q = q + ' and class = %s' - args.append(placeclass[1:]) - cur.execute(q, args) - numres = cur.fetchone()[0] - assert_equals (numres, 0) + try: + q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, ) + args = [osmtyp, int(osmid)] + if placeclass is not None: + q = q + ' and class = %s' + args.append(placeclass[1:]) + cur.execute(q, args) + numres = cur.fetchone()[0] + assert_equals (numres, 0) + finally: + cur.close() + world.conn.commit() @step(u'search_name table contains$') def check_search_name_content(step): @@ -111,10 +119,10 @@ def check_search_name_content(step): else: raise Exception("Cannot handle field %s in search_name table" % (k, )) -@step(u'node (\d+) expands to housenumbers') +@step(u'way (\d+) expands to housenumbers') def check_interpolated_housenumbers(step, nodeid): """Check that the exact set of housenumbers has been entered in - placex for the given source node. Expected are tow columns: + placex for the given source node. Expected are two columns: housenumber and centroid """ numbers = {} @@ -124,7 +132,8 @@ def check_interpolated_housenumbers(step, nodeid): cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute("""SELECT DISTINCT housenumber, ST_X(centroid) as clat, ST_Y(centroid) as clon - FROM placex WHERE osm_type = 'N' and osm_id = %s""", + FROM placex WHERE osm_type = 'W' and osm_id = %s + and class = 'place' and type = 'address'""", (int(nodeid),)) assert_equals(len(numbers), cur.rowcount) for r in cur: @@ -132,6 +141,32 @@ def check_interpolated_housenumbers(step, nodeid): world.match_geometry((r['clat'], r['clon']), numbers[r["housenumber"]]) del numbers[r["housenumber"]] +@step(u'way (\d+) expands exactly to housenumbers ([0-9,]*)') +def check_interpolated_housenumber_list(step, nodeid, numberlist): + """ Checks that the interpolated house numbers corresponds + to the given list. + """ + expected = numberlist.split(','); + cur = world.conn.cursor() + cur.execute("""SELECT housenumber FROM placex + WHERE osm_type = 'W' and osm_id = %s + and class = 'place' and type = 'address'""", (int(nodeid),)) + for r in cur: + assert_in(r[0], expected, "Unexpected house number %s for node %s." % (r[0], nodeid)) + expected.remove(r[0]) + assert_equals(0, len(expected), "Missing house numbers for way %s: %s" % (nodeid, expected)) + +@step(u'way (\d+) expands to no housenumbers') +def check_no_interpolated_housenumber_list(step, nodeid): + """ Checks that the interpolated house numbers corresponds + to the given list. + """ + cur = world.conn.cursor() + cur.execute("""SELECT housenumber FROM placex + WHERE osm_type = 'W' and osm_id = %s + and class = 'place' and type = 'address'""", (int(nodeid),)) + res = [r[0] for r in cur] + assert_equals(0, len(res), "Unexpected house numbers for way %s: %s" % (nodeid, res)) @step(u'table search_name has no entry for (.*)') def check_placex_missing(step, osmid):