X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/2d6f00945ab433f51be5df0576625e29d5b6300c..7879ad44cd1e36ae1235c3dcdfd9e477491813e4:/tests/steps/db_results.py?ds=sidebyside diff --git a/tests/steps/db_results.py b/tests/steps/db_results.py index 2b44215e..7fe4da30 100644 --- a/tests/steps/db_results.py +++ b/tests/steps/db_results.py @@ -11,7 +11,6 @@ import psycopg2 import psycopg2.extensions import psycopg2.extras import os -import subprocess import random import json import re @@ -22,7 +21,7 @@ logger = logging.getLogger(__name__) @step(u'table placex contains as names for (N|R|W)(\d+)') def check_placex_names(step, osmtyp, osmid): - """ Check for the exact content of the name hstaore in placex. + """ Check for the exact content of the name hstore in placex. """ cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute('SELECT name FROM placex where osm_type = %s and osm_id =%s', (osmtyp, int(osmid))) @@ -36,6 +35,7 @@ def check_placex_names(step, osmtyp, osmid): + @step(u'table ([a-z_]+) contains$') def check_placex_content(step, tablename): """ check that the given lines are in the given table @@ -43,46 +43,75 @@ def check_placex_content(step, tablename): given columns are tested. If there is more than one line for an OSM object, they must match in these columns. """ - cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) - for line in step.hashes: - osmtype, osmid, cls = world.split_id(line['object']) - q = 'SELECT *' - if tablename == 'placex': - q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon" - q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,) - if cls is None: - params = (osmtype, osmid) - else: - q = q + ' and class = %s' - params = (osmtype, osmid, cls) - cur.execute(q, params) - assert(cur.rowcount > 0) - for res in cur: - for k,v in line.iteritems(): - if not k == 'object': - assert_in(k, res) - if type(res[k]) is dict: - val = world.make_hash(v) - assert_equals(res[k], val) - elif k in ('parent_place_id', 'linked_place_id'): - pid = world.get_placeid(v) - assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k])) - elif k == 'centroid': - world.match_geometry((res['clat'], res['clon']), v) - else: - assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v)) + try: + cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) + for line in step.hashes: + osmtype, osmid, cls = world.split_id(line['object']) + q = 'SELECT *' + if tablename == 'placex': + q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon" + if tablename == 'location_property_osmline': + q = q + ' FROM %s where osm_id = %%s' % (tablename,) + else: + q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,) + if cls is None: + if tablename == 'location_property_osmline': + params = (osmid,) + else: + params = (osmtype, osmid) + else: + q = q + ' and class = %s' + if tablename == 'location_property_osmline': + params = (osmid, cls) + else: + params = (osmtype, osmid, cls) + cur.execute(q, params) + assert(cur.rowcount > 0) + for res in cur: + for k,v in line.iteritems(): + if not k == 'object': + assert_in(k, res) + if type(res[k]) is dict: + val = world.make_hash(v) + assert_equals(res[k], val) + elif k in ('parent_place_id', 'linked_place_id'): + pid = world.get_placeid(v) + assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k])) + elif k == 'centroid': + world.match_geometry((res['clat'], res['clon']), v) + else: + assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v)) + finally: + cur.close() + world.conn.commit() @step(u'table (placex?) has no entry for (N|R|W)(\d+)(:\w+)?') def check_placex_missing(step, tablename, osmtyp, osmid, placeclass): cur = world.conn.cursor() - q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, ) - args = [osmtyp, int(osmid)] - if placeclass is not None: - q = q + ' and class = %s' - args.append(placeclass[1:]) - cur.execute(q, args) - numres = cur.fetchone()[0] - assert_equals (numres, 0) + try: + q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, ) + args = [osmtyp, int(osmid)] + if placeclass is not None: + q = q + ' and class = %s' + args.append(placeclass[1:]) + cur.execute(q, args) + numres = cur.fetchone()[0] + assert_equals (numres, 0) + finally: + cur.close() + world.conn.commit() + +@step(u'table location_property_osmline has no entry for W(\d+)?') +def check_osmline_missing(step, osmid): + cur = world.conn.cursor() + try: + q = 'SELECT count(*) FROM location_property_osmline where osm_id = %s' % (osmid, ) + cur.execute(q) + numres = cur.fetchone()[0] + assert_equals (numres, 0) + finally: + cur.close() + world.conn.commit() @step(u'search_name table contains$') def check_search_name_content(step): @@ -110,10 +139,10 @@ def check_search_name_content(step): else: raise Exception("Cannot handle field %s in search_name table" % (k, )) -@step(u'node (\d+) expands to housenumbers') +@step(u'way (\d+) expands to housenumbers') def check_interpolated_housenumbers(step, nodeid): """Check that the exact set of housenumbers has been entered in - placex for the given source node. Expected are tow columns: + placex for the given source node. Expected are two columns: housenumber and centroid """ numbers = {} @@ -123,14 +152,64 @@ def check_interpolated_housenumbers(step, nodeid): cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute("""SELECT DISTINCT housenumber, ST_X(centroid) as clat, ST_Y(centroid) as clon - FROM placex WHERE osm_type = 'N' and osm_id = %s""", + FROM placex WHERE osm_type = 'W' and osm_id = %s + and class = 'place' and type = 'address'""", (int(nodeid),)) + assert_equals(len(numbers), cur.rowcount) for r in cur: assert_in(r["housenumber"], numbers) world.match_geometry((r['clat'], r['clon']), numbers[r["housenumber"]]) del numbers[r["housenumber"]] +@step(u'way (\d+) expands to lines') +def check_interpolation_lines(step, wayid): + """Check that the correct interpolation line has been entered in + location_property_osmline for the given source line/nodes. Expected are three columns: + startnumber, endnumber and linegeo + """ + lines = {} + for line in step.hashes: + assert line["startnumber"] not in lines + lines[line["startnumber"]] = {'endnumber': line["endnumber"], 'geometry': line["geometry"]} + cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) + cur.execute("""SELECT startnumber::text, endnumber::text, st_astext(linegeo) as geometry + FROM location_property_osmline WHERE osm_id = %s""", + (int(wayid),)) + assert_equals(len(lines), cur.rowcount) + for r in cur: + assert_in(r["startnumber"], lines) + assert_equals(r["endnumber"], lines[r["startnumber"]]["endnumber"]) + linegeo = str(str(r["geometry"].split('(')[1]).split(')')[0]).replace(',', ', ') + assert_equals(linegeo, lines[r["startnumber"]]["geometry"]) + del lines[r["startnumber"]] + +@step(u'way (\d+) expands exactly to housenumbers ([0-9,]*)') +def check_interpolated_housenumber_list(step, nodeid, numberlist): + """ Checks that the interpolated house numbers corresponds + to the given list. + """ + expected = numberlist.split(','); + cur = world.conn.cursor() + cur.execute("""SELECT housenumber FROM placex + WHERE osm_type = 'W' and osm_id = %s + and class = 'place' and type = 'address'""", (int(nodeid),)) + for r in cur: + assert_in(r[0], expected, "Unexpected house number %s for node %s." % (r[0], nodeid)) + expected.remove(r[0]) + assert_equals(0, len(expected), "Missing house numbers for way %s: %s" % (nodeid, expected)) + +@step(u'way (\d+) expands to no housenumbers') +def check_no_interpolated_housenumber_list(step, nodeid): + """ Checks that the interpolated house numbers corresponds + to the given list. + """ + cur = world.conn.cursor() + cur.execute("""SELECT housenumber FROM placex + WHERE osm_type = 'W' and osm_id = %s + and class = 'place' and type = 'address'""", (int(nodeid),)) + res = [r[0] for r in cur] + assert_equals(0, len(res), "Unexpected house numbers for way %s: %s" % (nodeid, res)) @step(u'table search_name has no entry for (.*)') def check_placex_missing(step, osmid):