X-Git-Url: https://git.openstreetmap.org./nominatim.git/blobdiff_plain/b70b2213a542595f3a890445103031e5b72e847b..a1a03a9b7965edf4f0dbbd6ac770dbe60190cbcb:/tests/steps/db_results.py?ds=inline diff --git a/tests/steps/db_results.py b/tests/steps/db_results.py index 9da1ad6b..2566418e 100644 --- a/tests/steps/db_results.py +++ b/tests/steps/db_results.py @@ -22,7 +22,7 @@ logger = logging.getLogger(__name__) @step(u'table placex contains as names for (N|R|W)(\d+)') def check_placex_names(step, osmtyp, osmid): - """ Check for the exact content of the name hstaore in placex. + """ Check for the exact content of the name hstore in placex. """ cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute('SELECT name FROM placex where osm_type = %s and osm_id =%s', (osmtyp, int(osmid))) @@ -43,46 +43,55 @@ def check_placex_content(step, tablename): given columns are tested. If there is more than one line for an OSM object, they must match in these columns. """ - cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) - for line in step.hashes: - osmtype, osmid, cls = world.split_id(line['object']) - q = 'SELECT *' - if tablename == 'placex': - q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon" - q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,) - if cls is None: - params = (osmtype, osmid) - else: - q = q + ' and class = %s' - params = (osmtype, osmid, cls) - cur.execute(q, params) - assert(cur.rowcount > 0) - for res in cur: - for k,v in line.iteritems(): - if not k == 'object': - assert_in(k, res) - if type(res[k]) is dict: - val = world.make_hash(v) - assert_equals(res[k], val) - elif k in ('parent_place_id', 'linked_place_id'): - pid = world.get_placeid(v) - assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k])) - elif k == 'centroid': - world.match_geometry((res['clat'], res['clon']), v) - else: - assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v)) + try: + cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) + for line in step.hashes: + osmtype, osmid, cls = world.split_id(line['object']) + q = 'SELECT *' + if tablename == 'placex': + q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon" + q = q + ", ST_GeometryType(geometry) as geometrytype" + q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,) + if cls is None: + params = (osmtype, osmid) + else: + q = q + ' and class = %s' + params = (osmtype, osmid, cls) + cur.execute(q, params) + assert(cur.rowcount > 0) + for res in cur: + for k,v in line.iteritems(): + if not k == 'object': + assert_in(k, res) + if type(res[k]) is dict: + val = world.make_hash(v) + assert_equals(res[k], val) + elif k in ('parent_place_id', 'linked_place_id'): + pid = world.get_placeid(v) + assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k])) + elif k == 'centroid': + world.match_geometry((res['clat'], res['clon']), v) + else: + assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v)) + finally: + cur.close() + world.conn.commit() @step(u'table (placex?) has no entry for (N|R|W)(\d+)(:\w+)?') def check_placex_missing(step, tablename, osmtyp, osmid, placeclass): cur = world.conn.cursor() - q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, ) - args = [osmtyp, int(osmid)] - if placeclass is not None: - q = q + ' and class = %s' - args.append(placeclass[1:]) - cur.execute(q, args) - numres = cur.fetchone()[0] - assert_equals (numres, 0) + try: + q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, ) + args = [osmtyp, int(osmid)] + if placeclass is not None: + q = q + ' and class = %s' + args.append(placeclass[1:]) + cur.execute(q, args) + numres = cur.fetchone()[0] + assert_equals (numres, 0) + finally: + cur.close() + world.conn.commit() @step(u'search_name table contains$') def check_search_name_content(step): @@ -110,6 +119,27 @@ def check_search_name_content(step): else: raise Exception("Cannot handle field %s in search_name table" % (k, )) +@step(u'node (\d+) expands to housenumbers') +def check_interpolated_housenumbers(step, nodeid): + """Check that the exact set of housenumbers has been entered in + placex for the given source node. Expected are tow columns: + housenumber and centroid + """ + numbers = {} + for line in step.hashes: + assert line["housenumber"] not in numbers + numbers[line["housenumber"]] = line["centroid"] + cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) + cur.execute("""SELECT DISTINCT housenumber, + ST_X(centroid) as clat, ST_Y(centroid) as clon + FROM placex WHERE osm_type = 'N' and osm_id = %s""", + (int(nodeid),)) + assert_equals(len(numbers), cur.rowcount) + for r in cur: + assert_in(r["housenumber"], numbers) + world.match_geometry((r['clat'], r['clon']), numbers[r["housenumber"]]) + del numbers[r["housenumber"]] + @step(u'table search_name has no entry for (.*)') def check_placex_missing(step, osmid):