@then("place contains(?P<exact> exactly)?")
def check_placex_contents(context, exact):
    """Verify that rows of the `place` table match the step's data table.

    Each line of ``context.table`` names an object in its ``object`` column;
    every database row selected for that object is compared column by column
    against the remaining headings:

    * ``name`` / ``extratags``: ``-`` means the column must be NULL, anything
      else is wrapped in braces and evaluated as a dict literal.
    * ``name+KEY`` / ``extratags+KEY``: compares a single key of the column.
    * ``linked_place_id`` / ``parent_place_id``: ``0`` means literal 0, ``-``
      means NULL, anything else is resolved to a place id via ``NominatimID``.
    * every other heading is delegated to ``assert_db_column``.

    When ``exact`` is truthy, the set of (osm_type, osm_id, class) triples of
    all rows in `place` must additionally equal the set of triples matched
    through the data table.

    NOTE(review): despite the function name, this step queries `place`,
    not `placex`.
    """
    cursor = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)
    query = """SELECT *, ST_AsText(geometry) as geomtxt,
               ST_GeometryType(geometry) as geometrytype
               FROM place where %s"""

    seen = set()
    for line in context.table:
        obj = line['object']
        condition, args = NominatimID(obj).table_select()
        # The WHERE fragment comes from NominatimID; values stay parameterized.
        cursor.execute(query % condition, args)
        assert_less(0, cursor.rowcount, "No rows found for " + obj)

        for record in cursor:
            if exact:
                seen.add((record['osm_type'], record['osm_id'], record['class']))

            for column in line.headings:
                expected = line[column]
                msg = "%s: %s" % (obj, column)

                if column in ('name', 'extratags'):
                    if expected == '-':
                        assert_is_none(record[column], msg)
                    else:
                        # NOTE(review): eval() executes arbitrary Python taken
                        # from the feature file; this assumes feature files
                        # are trusted test input.
                        as_dict = eval('{' + expected + '}')
                        assert_equals(as_dict, record[column], msg)
                elif column.startswith('name+'):
                    key = column[5:]
                    assert_equals(record['name'][key], expected, msg)
                elif column.startswith('extratags+'):
                    key = column[10:]
                    assert_equals(record['extratags'][key], expected, msg)
                elif column in ('linked_place_id', 'parent_place_id'):
                    if expected == '0':
                        assert_equals(0, record[column], msg)
                    elif expected == '-':
                        assert_is_none(record[column], msg)
                    else:
                        # A separate cursor keeps the outer result set intact.
                        place_id = NominatimID(expected).get_place_id(
                            context.db.cursor())
                        assert_equals(place_id, record[column], msg)
                else:
                    assert_db_column(record, column, expected, context)

    if exact:
        cursor.execute('SELECT osm_type, osm_id, class from place')
        eq_(seen, {(r[0], r[1], r[2]) for r in cursor})

    context.db.commit()
+
@then("search_name contains")
def check_search_name_contents(context):
    """Verify that rows of the `search_name` table match the step's data table.

    Each line of ``context.table`` names an object in its ``object`` column,
    which is resolved to a place id. For every `search_name` row with that
    place id the remaining headings are checked:

    * ``name_vector`` / ``nameaddress_vector``: the cell is a comma-separated
      list of terms (``#`` stands for a space). Each term is looked up in the
      `word` table through ``make_standard_name`` and every word id found
      must be contained in the row's vector column.
    * every other heading is delegated to ``assert_db_column``.
    """
    vector_columns = ('name_vector', 'nameaddress_vector')
    cursor = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)

    for line in context.table:
        place_id = NominatimID(line['object']).get_place_id(cursor)
        cursor.execute("""SELECT *, ST_X(centroid) as cx, ST_Y(centroid) as cy
                          FROM search_name WHERE place_id = %s""", (place_id, ))
        assert_less(0, cursor.rowcount, "No rows found for " + line['object'])

        for record in cursor:
            for column in line.headings:
                if column not in vector_columns:
                    assert_db_column(record, column, line[column], context)
                    continue

                terms = [part.strip().replace('#', ' ')
                         for part in line[column].split(',')]
                # Separate cursor so the outer result set is not disturbed.
                word_cursor = context.db.cursor()
                word_cursor.execute("""SELECT word_id, word_token
                                       FROM word, (SELECT unnest(%s) as term) t
                                       WHERE word_token = make_standard_name(t.term)""",
                                    (terms,))
                # Every term must resolve to at least one word entry.
                ok_(word_cursor.rowcount >= len(terms))
                for word_id, word_token in word_cursor:
                    assert_in(word_id, record[column],
                              "Missing term for %s/%s: %s" % (place_id, column, word_token))

    context.db.commit()
+
@then(r"(?P<oid>\w+) expands to(?P<neg> no)? interpolation")
def check_location_property_osmline(context, oid, neg):
    """Verify the `location_property_osmline` rows created for a way.

    ``oid`` is parsed with ``NominatimID`` and must denote a way (type ``W``).
    When ``neg`` is truthy (the step says "expands to no interpolation") the
    table must contain no row for that OSM id and the check ends there.

    Otherwise every database row must match exactly one line of
    ``context.table`` by its ``start``/``end`` house numbers, and every line
    of the table must be matched by a database row. The remaining headings
    are compared as follows:

    * ``parent_place_id``: ``0`` means literal 0, ``-`` means NULL, anything
      else is resolved to a place id via ``NominatimID``.
    * every other heading is delegated to ``assert_db_column``.

    Raises ``AssertionError`` when a database row matches no remaining
    expected line or when any comparison fails.

    NOTE(review): unlike the ``contains`` steps in this file, no commit is
    issued on the connection here; confirm whether that is intentional.
    """
    cur = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)
    nid = NominatimID(oid)

    eq_('W', nid.typ, "interpolation must be a way")

    cur.execute("""SELECT *, ST_AsText(linegeo) as geomtxt
                   FROM location_property_osmline WHERE osm_id = %s""",
                (nid.oid, ))

    if neg:
        eq_(0, cur.rowcount)
        return

    # Indexes of the expected lines that have not been matched yet.
    todo = list(range(len(list(context.table))))
    for res in cur:
        for i in todo:
            row = context.table[i]
            if (int(row['start']) == res['startnumber']
                    and int(row['end']) == res['endnumber']):
                # Mutating `todo` is safe: the loop is left immediately.
                todo.remove(i)
                break
        else:
            # Raised explicitly so the check survives `python -O`, which
            # strips `assert` statements.
            raise AssertionError("Unexpected row %s" % (str(res)))

        for h in row.headings:
            if h in ('start', 'end'):
                # Already used to pair the database row with this line.
                continue
            elif h == 'parent_place_id':
                if row[h] == '0':
                    eq_(0, res[h])
                elif row[h] == '-':
                    assert_is_none(res[h])
                else:
                    eq_(NominatimID(row[h]).get_place_id(context.db.cursor()),
                        res[h])
            else:
                assert_db_column(res, h, row[h], context)

    # Every expected line must have been consumed by some database row.
    eq_(todo, [])
+
+
+@then("(?P<table>placex|place) has no entry for (?P<oid>.*)")
+def check_placex_has_entry(context, table, oid):