+ assert_db_column(res, h, row[h], context)
+
+ if exact:
+ cur.execute('SELECT osm_type, osm_id, class from place')
+ eq_(expected_content, set([(r[0], r[1], r[2]) for r in cur]))
+
+ context.db.commit()
+
@then("search_name contains(?P<exclude> not)?")
def check_search_name_contents(context, exclude):
    """Compare rows of the search_name table with the step's table.

    For every table row the place named in the 'object' column is looked
    up and all search_name rows with that place id are checked heading by
    heading.  The headings 'name_vector' and 'nameaddress_vector' hold a
    comma-separated list of terms ('#' is replaced by a space) which are
    resolved to word ids via the word table.  Without the optional ' not'
    every resolved word id must be in the vector column, with it none of
    them may be.  All other headings are handed to assert_db_column().
    """
    vector_columns = ('name_vector', 'nameaddress_vector')
    cur = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)

    for row in context.table:
        pid = NominatimID(row['object']).get_place_id(cur)
        cur.execute("""SELECT *, ST_X(centroid) as cx, ST_Y(centroid) as cy
                       FROM search_name WHERE place_id = %s""", (pid, ))
        assert_less(0, cur.rowcount, "No rows found for " + row['object'])

        for res in cur:
            for h in row.headings:
                if h not in vector_columns:
                    assert_db_column(res, h, row[h], context)
                    continue

                terms = [term.strip().replace('#', ' ')
                         for term in row[h].split(',')]
                # A separate cursor is used so that the iteration over
                # 'cur' is not disturbed by the word lookup.
                word_cur = context.db.cursor()
                word_cur.execute("""SELECT word_id, word_token
                                    FROM word, (SELECT unnest(%s) as term) t
                                    WHERE word_token = make_standard_name(t.term)""",
                                 (terms,))

                if exclude:
                    for word_id, word_token in word_cur:
                        assert_not_in(word_id, res[h],
                                      "Found term for %s/%s: %s"
                                      % (pid, h, word_token))
                else:
                    # Each term must have at least one entry in the word table.
                    ok_(word_cur.rowcount >= len(terms),
                        "No word entry found for " + row[h])
                    for word_id, word_token in word_cur:
                        assert_in(word_id, res[h],
                                  "Missing term for %s/%s: %s"
                                  % (pid, h, word_token))

    context.db.commit()
+
@then("location_postcode contains exactly")
def check_location_postcode(context):
    """Check that location_postcode matches the rows of the step's table.

    The table must have as many rows as location_postcode.  Each table row
    is identified by its 'country' and 'postcode' columns and must match at
    least one database row (on country_code/postcode); all remaining
    headings of the table row are then compared against every matching
    database row with assert_db_column().

    Raises AssertionError when the row counts differ, when an expected
    row has no counterpart in the database or when a column check fails.
    """
    cur = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)

    cur.execute("SELECT *, ST_AsText(geometry) as geomtxt FROM location_postcode")
    expected_rows = list(context.table)
    eq_(cur.rowcount, len(expected_rows),
        "Postcode table has %d rows, expected %d rows."
        % (cur.rowcount, len(expected_rows)))

    db_rows = list(cur)
    for row in expected_rows:
        matches = [res for res in db_rows
                   if res['country_code'] == row['country']
                   and res['postcode'] == row['postcode']]
        # Equal row counts alone do not prove that this particular
        # country/postcode pair exists, so require an explicit match.
        assert matches, \
            "No postcode row found for country '%s' and postcode '%s'." \
            % (row['country'], row['postcode'])

        for res in matches:
            for h in row.headings:
                if h not in ('country', 'postcode'):
                    assert_db_column(res, h, row[h], context)
+
@then("word contains(?P<exclude> not)?")
def check_word_table(context, exclude):
    """Check for the presence or absence of rows in the word table.

    Each row of the step's table is turned into a query where every
    heading is a column that must equal the cell value.  Without the
    optional ' not' at least one row must match, with it none may.
    """
    cur = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)

    for row in context.table:
        # Headings become column names, cell values are passed as parameters.
        conditions = ' AND '.join("%s = %%s" % h for h in row.headings)
        values = [row[h] for h in row.headings]
        cur.execute("SELECT * from word WHERE %s" % conditions, values)

        shown = '/'.join(values)
        if exclude:
            eq_(0, cur.rowcount, "Row still in word table: %s" % shown)
        else:
            assert_greater(cur.rowcount, 0, "Row not in word table: %s" % shown)
+
@then("place_addressline contains")
def check_place_addressline(context):
    """Check rows of place_addressline against the step's table.

    The 'object' and 'address' columns of each table row name the place
    and the address place.  At least one place_addressline row must exist
    for that pair, and every such row is compared on all remaining
    headings with assert_db_column().
    """
    cur = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)

    for row in context.table:
        place_id = NominatimID(row['object']).get_place_id(cur)
        address_id = NominatimID(row['address']).get_place_id(cur)
        cur.execute(""" SELECT * FROM place_addressline
                        WHERE place_id = %s AND address_place_id = %s""",
                    (place_id, address_id))
        assert_less(0, cur.rowcount,
                    "No rows found for place %s and address %s"
                    % (row['object'], row['address']))

        # The two identifying columns were already used for the lookup.
        columns = [h for h in row.headings if h not in ('address', 'object')]
        for res in cur:
            for h in columns:
                assert_db_column(res, h, row[h], context)

    context.db.commit()
+
@then("place_addressline doesn't contain")
def check_place_addressline_exclude(context):
    """Check that place_addressline has no row for the given pairs.

    Each row of the step's table names a place ('object') and an address
    place ('address'); no place_addressline row may link the two.
    """
    cur = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)

    for row in context.table:
        place, address = row['object'], row['address']
        place_id = NominatimID(place).get_place_id(cur)
        address_id = NominatimID(address).get_place_id(cur)
        cur.execute(""" SELECT * FROM place_addressline
                        WHERE place_id = %s AND address_place_id = %s""",
                    (place_id, address_id))
        eq_(0, cur.rowcount,
            "Row found for place %s and address %s" % (place, address))

    context.db.commit()
+
@then(r"(?P<oid>\w+) expands to(?P<neg> no)? interpolation")
def check_location_property_osmline(context, oid, neg):
    """Check the rows location_property_osmline holds for a way.

    Only rows of the way with a non-NULL startnumber are considered.
    With ' no' in the step text there must be no such row.  Otherwise
    every database row must match exactly one still unmatched row of the
    step's table on 'start'/'end', the other headings of that table row
    are compared with the database row, and no table row may be left
    over at the end.

    Raises AssertionError when the object is not a way, when an
    unexpected database row turns up, when a column check fails or when
    table rows remain unmatched.
    """
    cur = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)
    nid = NominatimID(oid)

    eq_('W', nid.typ, "interpolation must be a way")

    cur.execute("""SELECT *, ST_AsText(linegeo) as geomtxt
                   FROM location_property_osmline
                   WHERE osm_id = %s AND startnumber IS NOT NULL""",
                (nid.oid, ))

    if neg:
        eq_(0, cur.rowcount,
            "Found %d interpolation rows for %s, expected none."
            % (cur.rowcount, oid))
        return

    # Indexes of the table rows that have not been matched yet.
    todo = list(range(len(list(context.table))))
    for res in cur:
        for i in todo:
            row = context.table[i]
            if (int(row['start']) == res['startnumber']
                    and int(row['end']) == res['endnumber']):
                # Safe although 'todo' is being iterated: the loop is
                # left immediately afterwards.
                todo.remove(i)
                break
        else:
            # Raised explicitly so the check survives 'python -O'.
            raise AssertionError("Unexpected row %s" % (str(res)))

        for h in row.headings:
            if h in ('start', 'end'):
                continue
            elif h == 'parent_place_id':
                compare_place_id(row[h], res[h], h, context)
            else:
                assert_db_column(res, h, row[h], context)

    eq_(todo, [], "Table rows without matching interpolation: %s" % (todo, ))
+
+
@then("(?P<table>placex|place) has no entry for (?P<oid>.*)")
def check_placex_has_entry(context, table, oid):
    """Check that the given table (placex or place) has no row for an object.

    The WHERE clause and its parameters come from
    NominatimID.table_select(); the query must return no rows.
    """
    cur = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)
    condition, condition_params = NominatimID(oid).table_select()
    # 'table' is restricted to placex/place by the step pattern.
    query = "SELECT * FROM %s where %s" % (table, condition)
    cur.execute(query, condition_params)
    eq_(0, cur.rowcount)
    context.db.commit()
+
+@then("search_name has no entry for (?P<oid>.*)")
+def check_search_name_has_entry(context, oid):
+ cur = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)
+ pid = NominatimID(oid).get_place_id(cur)
+ cur.execute("SELECT * FROM search_name WHERE place_id = %s", (pid, ))
+ eq_(0, cur.rowcount)