import psycopg2.extensions
import psycopg2.extras
import os
-import subprocess
import random
import json
import re
@step(u'table placex contains as names for (N|R|W)(\d+)')
def check_placex_names(step, osmtyp, osmid):
- """ Check for the exact content of the name hstaore in placex.
+ """ Check for the exact content of the name hstore in placex.
"""
cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
cur.execute('SELECT name FROM placex where osm_type = %s and osm_id =%s', (osmtyp, int(osmid)))
given columns are tested. If there is more than one
line for an OSM object, they must match in these columns.
"""
- cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
- for line in step.hashes:
- osmtype, osmid, cls = world.split_id(line['object'])
- q = 'SELECT *'
- if tablename == 'placex':
- q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon"
- q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,)
- if cls is None:
- params = (osmtype, osmid)
- else:
- q = q + ' and class = %s'
- params = (osmtype, osmid, cls)
- cur.execute(q, params)
- assert(cur.rowcount > 0)
- for res in cur:
- for k,v in line.iteritems():
- if not k == 'object':
- assert_in(k, res)
- if type(res[k]) is dict:
- val = world.make_hash(v)
- assert_equals(res[k], val)
- elif k in ('parent_place_id', 'linked_place_id'):
- pid = world.get_placeid(v)
- assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k]))
- elif k == 'centroid':
- world.match_geometry((res['clat'], res['clon']), v)
- else:
- assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v))
+ try:
+ cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+ for line in step.hashes:
+ osmtype, osmid, cls = world.split_id(line['object'])
+ q = 'SELECT *'
+ if tablename == 'placex':
+ q = q + ", ST_X(centroid) as clat, ST_Y(centroid) as clon"
+ q = q + ", ST_GeometryType(geometry) as geometrytype"
+ q = q + ' FROM %s where osm_type = %%s and osm_id = %%s' % (tablename,)
+ if cls is None:
+ params = (osmtype, osmid)
+ else:
+ q = q + ' and class = %s'
+ params = (osmtype, osmid, cls)
+ cur.execute(q, params)
+ assert(cur.rowcount > 0)
+ for res in cur:
+ for k,v in line.iteritems():
+ if not k == 'object':
+ assert_in(k, res)
+ if type(res[k]) is dict:
+ val = world.make_hash(v)
+ assert_equals(res[k], val)
+ elif k in ('parent_place_id', 'linked_place_id'):
+ pid = world.get_placeid(v)
+ assert_equals(pid, res[k], "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, pid, res[k]))
+ elif k == 'centroid':
+ world.match_geometry((res['clat'], res['clon']), v)
+ else:
+ assert_equals(str(res[k]), v, "Results for '%s'/'%s' differ: '%s' != '%s'" % (line['object'], k, str(res[k]), v))
+ finally:
+ cur.close()
+ world.conn.commit()
@step(u'table (placex?) has no entry for (N|R|W)(\d+)(:\w+)?')
def check_placex_missing(step, tablename, osmtyp, osmid, placeclass):
+ """ Check that the given table has no row for the OSM object,
+ optionally restricted to the given class.
+ """
cur = world.conn.cursor()
- q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, )
- args = [osmtyp, int(osmid)]
- if placeclass is not None:
- q = q + ' and class = %s'
- args.append(placeclass[1:])
- cur.execute(q, args)
- numres = cur.fetchone()[0]
- assert_equals (numres, 0)
+ try:
+ q = 'SELECT count(*) FROM %s where osm_type = %%s and osm_id = %%s' % (tablename, )
+ args = [osmtyp, int(osmid)]
+ if placeclass is not None:
+ q = q + ' and class = %s'
+ # placeclass is the optional (:\w+) group of the step pattern; [1:] drops its leading ':'
+ args.append(placeclass[1:])
+ cur.execute(q, args)
+ numres = cur.fetchone()[0]
+ assert_equals (numres, 0)
+ finally:
+ # runs even when the assertion above fails
+ cur.close()
+ world.conn.commit()
@step(u'search_name table contains$')
def check_search_name_content(step):
else:
raise Exception("Cannot handle field %s in search_name table" % (k, ))
+@step(u'way (\d+) expands to housenumbers')
+def check_interpolated_housenumbers(step, nodeid):
+    """ Check that exactly the given set of housenumbers has been entered
+        in placex for the given way. The step table is expected to have
+        two columns: housenumber and centroid.
+
+        `nodeid` is the OSM id captured from the step text; despite its
+        name it is matched against placex rows with osm_type 'W'.
+    """
+    # Map every expected housenumber to its expected centroid.
+    numbers = {}
+    for line in step.hashes:
+        # a housenumber may appear only once in the step table
+        assert line["housenumber"] not in numbers
+        numbers[line["housenumber"]] = line["centroid"]
+    cur = world.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+    try:
+        cur.execute("""SELECT DISTINCT housenumber,
+ ST_X(centroid) as clat, ST_Y(centroid) as clon
+ FROM placex WHERE osm_type = 'W' and osm_id = %s
+ and class = 'place' and type = 'address'""",
+                    (int(nodeid),))
+        assert_equals(len(numbers), cur.rowcount)
+        for r in cur:
+            assert_in(r["housenumber"], numbers)
+            world.match_geometry((r['clat'], r['clon']), numbers[r["housenumber"]])
+            # Drop matched entries so that a housenumber returned a second
+            # time (e.g. with a different centroid) fails the check above.
+            del numbers[r["housenumber"]]
+    finally:
+        # Close the cursor and commit even when an assertion fails, like
+        # the other table checks do.
+        cur.close()
+        world.conn.commit()
+
+@step(u'way (\d+) expands exactly to housenumbers ([0-9,]*)')
+def check_interpolated_housenumber_list(step, nodeid, numberlist):
+    """ Check that the housenumbers in placex for the given way correspond
+        exactly to the given comma-separated list. A number listed several
+        times must also be found that many times.
+
+        `nodeid` is the OSM id captured from the step text; despite its
+        name it is matched against placex rows with osm_type 'W'.
+    """
+    # Skip empty items: ''.split(',') yields [''], which would make an
+    # empty list always fail with a bogus "missing" entry.
+    expected = [number for number in numberlist.split(',') if number]
+    cur = world.conn.cursor()
+    try:
+        cur.execute("""SELECT housenumber FROM placex
+ WHERE osm_type = 'W' and osm_id = %s
+ and class = 'place' and type = 'address'""", (int(nodeid),))
+        for r in cur:
+            assert_in(r[0], expected, "Unexpected house number %s for way %s." % (r[0], nodeid))
+            # consume one expected entry per returned row
+            expected.remove(r[0])
+        assert_equals(0, len(expected), "Missing house numbers for way %s: %s" % (nodeid, expected))
+    finally:
+        # Close the cursor and commit even when an assertion fails, like
+        # the other table checks do.
+        cur.close()
+        world.conn.commit()
+
+@step(u'way (\d+) expands to no housenumbers')
+def check_no_interpolated_housenumber_list(step, nodeid):
+    """ Check that placex contains no interpolated housenumbers
+        (class 'place', type 'address') for the given way.
+
+        `nodeid` is the OSM id captured from the step text; despite its
+        name it is matched against placex rows with osm_type 'W'.
+    """
+    cur = world.conn.cursor()
+    try:
+        cur.execute("""SELECT housenumber FROM placex
+ WHERE osm_type = 'W' and osm_id = %s
+ and class = 'place' and type = 'address'""", (int(nodeid),))
+        # collect the offending numbers so the failure message can list them
+        res = [r[0] for r in cur]
+        assert_equals(0, len(res), "Unexpected house numbers for way %s: %s" % (nodeid, res))
+    finally:
+        # Close the cursor and commit even when the assertion fails, like
+        # the other table checks do.
+        cur.close()
+        world.conn.commit()
@step(u'table search_name has no entry for (.*)')
def check_placex_missing(step, osmid):