+ elif h.startswith('addr+'):
+ if row[h] == '-':
+ if res['address'] is not None:
+ assert_not_in(h[5:], res['address'])
+ else:
+ assert_in(h[5:], res['address'], "column " + h)
+ assert_equals(res['address'][h[5:]], row[h],
+ "column %s" % h)
+ elif h in ('linked_place_id', 'parent_place_id'):
+ compare_place_id(row[h], res[h], h, context)
+ else:
+ assert_db_column(res, h, row[h], context)
+
+ if exact:
+ cur.execute('SELECT osm_type, osm_id, class from placex')
+ eq_(expected_content, set([(r[0], r[1], r[2]) for r in cur]))
+
+ context.db.commit()
+
def _assert_place_hstore_entry(column, key, expected, msg):
    """Assert that the dict-like DB value `column` maps `key` to `expected`.

    A NULL column (None) or a missing key is reported as an assertion
    failure carrying `msg`, rather than surfacing as a bare
    TypeError/KeyError without any hint about the offending table cell.
    """
    ok_(column is not None, msg)
    assert_in(key, column, msg)
    assert_equals(column[key], expected, msg)


@then("place contains(?P<exact> exactly)?")
def check_placex_contents(context, exact):
    """Check rows of the `place` table against the table of the step.

    Despite its name, this step queries the `place` table.

    Every row of the step table must have an 'object' column, which is
    parsed with NominatimID to select the database rows to inspect. At
    least one database row must match, and each matching row is checked
    against all headings of the table row:

    * 'name', 'extratags', 'address': '-' requires the column to be None;
      otherwise the cell is evaluated as the inside of a dict literal and
      compared with the whole column.
    * 'name+KEY', 'extratags+KEY': entry KEY of the column must equal the
      cell.
    * 'addr+KEY': '-' requires KEY to be absent from 'address' (a None
      address passes); otherwise entry KEY must equal the cell.
    * 'linked_place_id', 'parent_place_id': delegated to compare_place_id().
    * anything else: delegated to assert_db_column().

    When `exact` is set (the step text contained ' exactly'), the
    (osm_type, osm_id, class) triples of all matched rows must additionally
    equal the triples of the complete `place` table.

    The database transaction is committed at the end.
    """
    cur = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)

    expected_content = set()
    for row in context.table:
        where, params = NominatimID(row['object']).table_select()
        cur.execute("""SELECT *, ST_AsText(geometry) as geomtxt,
                       ST_GeometryType(geometry) as geometrytype
                       FROM place where %s""" % where,
                    params)
        assert_less(0, cur.rowcount, "No rows found for " + row['object'])

        for res in cur:
            if exact:
                expected_content.add((res['osm_type'], res['osm_id'], res['class']))
            for h in row.headings:
                msg = "%s: %s" % (row['object'], h)
                if h in ('name', 'extratags', 'address'):
                    if row[h] == '-':
                        assert_is_none(res[h], msg)
                    else:
                        # NOTE(review): eval() runs arbitrary expressions
                        # taken from the feature table. Acceptable only as
                        # long as feature files are trusted; consider
                        # ast.literal_eval if cells are always plain literals.
                        vdict = eval('{' + row[h] + '}')
                        assert_equals(vdict, res[h], msg)
                elif h.startswith(('name+', 'extratags+')):
                    # The heading has the form '<column>+<key>'.
                    column, _, key = h.partition('+')
                    _assert_place_hstore_entry(res[column], key, row[h], msg)
                elif h.startswith('addr+'):
                    key = h[5:]
                    if row[h] == '-':
                        # A NULL address trivially does not contain the key.
                        if res['address'] is not None:
                            assert_not_in(key, res['address'], msg)
                    else:
                        _assert_place_hstore_entry(res['address'], key,
                                                   row[h], msg)
                elif h in ('linked_place_id', 'parent_place_id'):
                    compare_place_id(row[h], res[h], h, context)
                else:
                    assert_db_column(res, h, row[h], context)

    if exact:
        cur.execute('SELECT osm_type, osm_id, class from place')
        eq_(expected_content, {(r[0], r[1], r[2]) for r in cur})

    context.db.commit()
+
+@then("search_name contains")
+def check_search_name_contents(context):
+ cur = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)
+
+ for row in context.table:
+ pid = NominatimID(row['object']).get_place_id(cur)
+ cur.execute("""SELECT *, ST_X(centroid) as cx, ST_Y(centroid) as cy
+ FROM search_name WHERE place_id = %s""", (pid, ))
+ assert_less(0, cur.rowcount, "No rows found for " + row['object'])
+
+ for res in cur:
+ for h in row.headings:
+ if h in ('name_vector', 'nameaddress_vector'):
+ terms = [x.strip().replace('#', ' ') for x in row[h].split(',')]
+ subcur = context.db.cursor()
+ subcur.execute("""SELECT word_id, word_token
+ FROM word, (SELECT unnest(%s) as term) t
+ WHERE word_token = make_standard_name(t.term)""",
+ (terms,))
+ ok_(subcur.rowcount >= len(terms),
+ "No word entry found for " + row[h])
+ for wid in subcur:
+ assert_in(wid[0], res[h],
+ "Missing term for %s/%s: %s" % (pid, h, wid[1]))