r |= r.pop('geocoding')
- def assert_subfield(self, idx, path, value):
- assert path
-
- field = self.result[idx]
- for p in path:
- assert isinstance(field, dict)
- assert p in field
- field = field[p]
-
- if isinstance(value, float):
- assert Almost(value) == float(field)
- elif value.startswith("^"):
- assert re.fullmatch(value, field)
- elif isinstance(field, dict):
- assert field, eval('{' + value + '}')
- else:
- assert str(field) == str(value)
-
-
def assert_address_field(self, idx, field, value):
""" Check that result rows`idx` has a field `field` with value `value`
in its address. If idx is None, then all results are checked.
self.check_row_field(idx, field, value, base=address)
- def match_row(self, row, context=None):
+ def match_row(self, row, context=None, field=None):
""" Match the result fields against the given behave table row.
"""
if 'ID' in row.headings:
todo = range(len(self.result))
for i in todo:
+ subdict = self.result[i]
+ if field is not None:
+ for key in field.split('.'):
+ self.check_row(i, key in subdict, f"Missing subfield {key}")
+ subdict = subdict[key]
+ self.check_row(i, isinstance(subdict, dict),
+ f"Subfield {key} not a dict")
+
for name, value in zip(row.headings, row.cells):
if name == 'ID':
pass
elif name == 'osm':
- self.check_row_field(i, 'osm_type', OsmType(value[0]))
- self.check_row_field(i, 'osm_id', Field(value[1:]))
+ self.check_row_field(i, 'osm_type', OsmType(value[0]), base=subdict)
+ self.check_row_field(i, 'osm_id', Field(value[1:]), base=subdict)
elif name == 'centroid':
if ' ' in value:
lon, lat = value.split(' ')
lon, lat = context.osm.grid_node(int(value))
else:
raise RuntimeError("Context needed when using grid coordinates")
- self.check_row_field(i, 'lat', Field(float(lat)))
- self.check_row_field(i, 'lon', Field(float(lon)))
- elif '+' in name:
- self.assert_subfield(i, name.split('+'), value)
+ self.check_row_field(i, 'lat', Field(float(lat)), base=subdict)
+ self.check_row_field(i, 'lon', Field(float(lon)), base=subdict)
else:
- self.check_row_field(i, name, Field(value))
+ self.check_row_field(i, name, Field(value), base=subdict)
def check_row(self, idx, check, msg):