if self.pid is None:
return "<null>"
+ if self.pid == 0:
+ return "place ID 0"
+
cur = self.conn.cursor()
cur.execute("""SELECT osm_type, osm_id, class
FROM placex WHERE place_id = %s""",
LazyFmt("Bad place id in column %s. Expected: %s, got: %s.",
column, expected, PlaceObjName(result, context.db)))
+def check_database_integrity(context):
+    """ Check generic constraints on the database tables.
+
+        At the moment there is a single check: place_addressline must not
+        hold more than one row for the same (place_id, address_place_id)
+        pair. The check fails through eq_() when such duplicates exist.
+    """
+    dup_cursor = context.db.cursor()
+    # The inner query counts the rows per (place_id, address_place_id) pair,
+    # the outer one counts the pairs that appear more than once.
+    dup_cursor.execute("""SELECT count(*) FROM
+                          (SELECT place_id, address_place_id, count(*) as c
+                           FROM place_addressline GROUP BY place_id, address_place_id) x
+                          WHERE c > 1""")
+    num_duplicate_pairs = dup_cursor.fetchone()[0]
+    eq_(0, num_duplicate_pairs, "Duplicates found in place_addressline")
+
+
class NominatimID:
""" Splits a unique identifier for places into its components.
As place_ids cannot be used for testing, we use a unique
return
if column.startswith('centroid'):
- fac = float(column[9:]) if column.startswith('centroid*') else 1.0
- x, y = value.split(' ')
- assert_almost_equal(float(x) * fac, row['cx'], "Bad x coordinate")
- assert_almost_equal(float(y) * fac, row['cy'], "Bad y coordinate")
+ if value == 'in geometry':
+ query = """SELECT ST_Within(ST_SetSRID(ST_Point({}, {}), 4326),
+ ST_SetSRID('{}'::geometry, 4326))""".format(
+ row['cx'], row['cy'], row['geomtxt'])
+ cur = context.db.cursor()
+ cur.execute(query)
+ eq_(cur.fetchone()[0], True, "(Row %s failed: %s)" % (column, query))
+ else:
+ fac = float(column[9:]) if column.startswith('centroid*') else 1.0
+ x, y = value.split(' ')
+ assert_almost_equal(float(x) * fac, row['cx'], msg="Bad x coordinate")
+ assert_almost_equal(float(y) * fac, row['cy'], msg="Bad y coordinate")
elif column == 'geometry':
geom = context.osm.parse_geometry(value, context.scene)
cur = context.db.cursor()
and ST_GeometryType(geometry) = 'ST_LineString'""")
context.db.commit()
context.nominatim.run_setup_script('calculate-postcodes', 'index', 'index-noanalyse')
+ check_database_integrity(context)
@when("updating places")
def update_place_table(context):
if cur.rowcount == 0:
break
+ check_database_integrity(context)
+
+@when("updating postcodes")
+def update_postcodes(context):
+    """ Run the 'calculate-postcodes' update script.
+    """
+    script_runner = context.nominatim
+    script_runner.run_update_script('calculate-postcodes')
+
@when("marking for delete (?P<oids>.*)")
def delete_places(context, oids):
context.nominatim.run_setup_script(
for res in cur:
for h in row.headings:
if h in ('name_vector', 'nameaddress_vector'):
- terms = [x.strip().replace('#', ' ') for x in row[h].split(',')]
+ terms = [x.strip() for x in row[h].split(',') if not x.strip().startswith('#')]
+ words = [x.strip()[1:] for x in row[h].split(',') if x.strip().startswith('#')]
subcur = context.db.cursor()
- subcur.execute("""SELECT word_id, word_token
- FROM word, (SELECT unnest(%s) as term) t
- WHERE word_token = make_standard_name(t.term)""",
- (terms,))
+ subcur.execute(""" SELECT word_id, word_token
+ FROM word, (SELECT unnest(%s::TEXT[]) as term) t
+ WHERE word_token = make_standard_name(t.term)
+ and class is null and country_code is null
+ and operator is null
+ UNION
+ SELECT word_id, word_token
+ FROM word, (SELECT unnest(%s::TEXT[]) as term) t
+ WHERE word_token = ' ' || make_standard_name(t.term)
+ and class is null and country_code is null
+ and operator is null
+ """,
+ (terms, words))
if not exclude:
ok_(subcur.rowcount >= len(terms),
"No word entry found for " + row[h])
context.db.commit()
+@then("location_postcode contains exactly")
+def check_location_postcode(context):
+    """ Compare the location_postcode table against the step's table.
+
+        The number of rows in location_postcode must equal the number of
+        rows in the step table. Each expected row is then matched against
+        the database rows with the same country code and postcode; all
+        remaining columns of the expected row are verified with
+        assert_db_column(). An expected row without any matching database
+        row makes the step fail.
+    """
+    cur = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)
+
+    cur.execute("SELECT *, ST_AsText(geometry) as geomtxt FROM location_postcode")
+    expected_rows = list(context.table)
+    eq_(cur.rowcount, len(expected_rows),
+        "Postcode table has %d rows, expected %d rows."
+        % (cur.rowcount, len(expected_rows)))
+
+    db_rows = list(cur)
+    for row in expected_rows:
+        matches = [db_row for db_row in db_rows
+                   if db_row['country_code'] == row['country']
+                   and db_row['postcode'] == row['postcode']]
+        # Equal row counts do not prove that this particular postcode is
+        # present, so a missing match has to fail explicitly.
+        ok_(len(matches) > 0,
+            "No row found in location_postcode for country '%s', postcode '%s'."
+            % (row['country'], row['postcode']))
+        for db_row in matches:
+            for h in row.headings:
+                if h not in ('country', 'postcode'):
+                    assert_db_column(db_row, h, row[h], context)
+
+@then("word contains(?P<exclude> not)?")
+def check_word_table(context, exclude):
+    """ Check for the presence or absence of rows in the word table.
+
+        Every row of the step table is turned into a query where each
+        heading is a column name that must equal the given cell value.
+        Without `exclude` at least one row must match, with `exclude`
+        there must be no matching row.
+    """
+    cur = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)
+
+    for row in context.table:
+        columns = row.headings
+        # Column names go verbatim into the SQL, values are bound parameters.
+        conditions = ["%s = %%s" % col for col in columns]
+        values = [row[col] for col in columns]
+        cur.execute("SELECT * from word WHERE %s" % ' AND '.join(conditions), values)
+        described = '/'.join(values)
+        if not exclude:
+            assert_greater(cur.rowcount, 0,
+                           "Row not in word table: %s" % described)
+        else:
+            eq_(0, cur.rowcount,
+                "Row still in word table: %s" % described)
+
@then("place_addressline contains")
def check_place_addressline(context):
cur = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)
context.db.commit()
+@then("place_addressline doesn't contain")
+def check_place_addressline_exclude(context):
+    """ Check that place_addressline has no row for the listed pairs.
+
+        Each row of the step table names an 'object' and an 'address'.
+        Both are resolved to place ids via NominatimID and the step fails
+        when place_addressline holds a row connecting the two.
+    """
+    cur = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)
+
+    for row in context.table:
+        place_id = NominatimID(row['object']).get_place_id(cur)
+        address_place_id = NominatimID(row['address']).get_place_id(cur)
+        id_pair = (place_id, address_place_id)
+        cur.execute(""" SELECT * FROM place_addressline
+                        WHERE place_id = %s AND address_place_id = %s""",
+                    id_pair)
+        failure_text = ("Row found for place %s and address %s"
+                        % (row['object'], row['address']))
+        eq_(0, cur.rowcount, failure_text)
+
+    context.db.commit()
+
@then("(?P<oid>\w+) expands to(?P<neg> no)? interpolation")
def check_location_property_osmline(context, oid, neg):
cur = context.db.cursor(cursor_factory=psycopg2.extras.DictCursor)