From: Sarah Hoffmann Date: Mon, 4 Jan 2021 15:29:15 +0000 (+0100) Subject: bdd: move geoemtry creation into separate file X-Git-Tag: v3.7.0~59^2~15 X-Git-Url: https://git.openstreetmap.org./nominatim.git/commitdiff_plain/f727620859cacfa39146642b31ad446762212262 bdd: move geoemtry creation into separate file Also renames the OsmDataFactory in the more appropriate GeometryFactory and modernizes code for python3. --- diff --git a/test/bdd/environment.py b/test/bdd/environment.py index d95fea4a..b6f6b986 100644 --- a/test/bdd/environment.py +++ b/test/bdd/environment.py @@ -6,6 +6,8 @@ import psycopg2.extras import subprocess import tempfile +from steps.geometry_factory import GeometryFactory + logger = logging.getLogger(__name__) userconfig = { @@ -209,83 +211,6 @@ class NominatimEnvironment(object): assert (proc.returncode == 0), "Script '%s' failed:\n%s\n%s\n" % (script, outp, outerr) -class OSMDataFactory(object): - - def __init__(self): - scriptpath = os.path.dirname(os.path.abspath(__file__)) - self.scene_path = os.environ.get('SCENE_PATH', - os.path.join(scriptpath, '..', 'scenes', 'data')) - self.scene_cache = {} - self.clear_grid() - - def parse_geometry(self, geom, scene): - if geom.find(':') >= 0: - return "ST_SetSRID(%s, 4326)" % self.get_scene_geometry(scene, geom) - - if geom.find(',') < 0: - out = "POINT(%s)" % self.mk_wkt_point(geom) - elif geom.find('(') < 0: - line = ','.join([self.mk_wkt_point(x) for x in geom.split(',')]) - out = "LINESTRING(%s)" % line - else: - inner = geom.strip('() ') - line = ','.join([self.mk_wkt_point(x) for x in inner.split(',')]) - out = "POLYGON((%s))" % line - - return "ST_SetSRID('%s'::geometry, 4326)" % out - - def mk_wkt_point(self, point): - geom = point.strip() - if geom.find(' ') >= 0: - return geom - else: - pt = self.grid_node(int(geom)) - assert pt is not None, "Bad scenario: Point '{}' not found in grid".format(geom) - return "%f %f" % pt - - def get_scene_geometry(self, default_scene, name): - geoms = [] - for 
from pathlib import Path
import os


class GeometryFactory:
    """ Provides functions to create geometries from scenes and data grids.

        This class replaces the former OSMDataFactory of
        test/bdd/environment.py, where before_all() creates one instance
        and stores it as `context.osm`.

        A scene is a file '<name>.wkt' in the scene directory. Each
        non-empty line has the form '<object name> | <WKT>'. The scene
        directory is taken from the environment variable SCENE_PATH and
        defaults to 'scenes/data' three directory levels above this file.

        A grid maps node ids to (x, y) coordinates. It is filled with
        add_grid_node() and emptied with clear_grid().

        Errors in a scenario description are reported as AssertionError.
    """

    def __init__(self):
        # parents[2] of <dir>/bdd/steps/<this file> is <dir>. This is the
        # same directory the '..' chain on the file path used to yield.
        defpath = Path(__file__).resolve().parents[2] / 'scenes' / 'data'
        # Either a str (from the environment) or a Path (the default).
        self.scene_path = os.environ.get('SCENE_PATH', defpath)
        # Parsed scenes by scene name, filled lazily by load_scene().
        self.scene_cache = {}
        self.clear_grid()

    def parse_geometry(self, geom, scene):
        """ Create a WKT SQL term for the given geometry.
            The function understands the following formats:

              [<scene>]:<name>
                 Geometry from a scene. If the scene is omitted, use the
                 default scene.
              <P>
                 Point geometry
              <P>,...,<P>
                 Line geometry
              (<P>,...,<P>)
                 Polygon geometry

            <P> may either be a coordinate of the form '<x> <y>' or a single
            number. In the latter case it must refer to a point in
            a previously defined grid.

            `scene` is the name of the default scene and may be None when
            `geom` does not refer to it.

            Returns the SQL expression as a string. The geometry is always
            wrapped in ST_SetSRID(..., 4326).
        """
        if ':' in geom:
            return "ST_SetSRID({}, 4326)".format(self.get_scene_geometry(scene, geom))

        if ',' not in geom:
            out = "POINT({})".format(self.mk_wkt_point(geom))
        elif '(' not in geom:
            out = "LINESTRING({})".format(self.mk_wkt_points(geom))
        else:
            out = "POLYGON(({}))".format(self.mk_wkt_points(geom.strip('() ')))

        return "ST_SetSRID('{}'::geometry, 4326)".format(out)

    def mk_wkt_point(self, point):
        """ Parse a point description.
            The point may either consist of 'x y' coordinates or a number
            that refers to a grid setup.

            Coordinates are returned unchanged (minus surrounding
            whitespace), grid references are replaced by the coordinates
            of the grid node.

            Raises an AssertionError when the point is neither a coordinate
            pair nor a number or when the number is not part of the grid.
        """
        geom = point.strip()
        if ' ' in geom:
            return geom

        # Raise explicitly instead of using assert statements: those are
        # removed when Python runs with -O and the errors would go unnoticed.
        try:
            nodeid = int(geom)
        except ValueError as err:
            raise AssertionError(
                "Scenario error: Point '{}' is not a number".format(geom)) from err

        pt = self.grid_node(nodeid)
        if pt is None:
            raise AssertionError(
                "Scenario error: Point '{}' not found in grid".format(geom))

        return "{} {}".format(*pt)

    def mk_wkt_points(self, geom):
        """ Parse a list of points.
            The list must be a comma-separated list of points. Points
            in coordinate and grid format may be mixed.

            Returns the comma-separated list of the parsed points.
        """
        return ','.join(self.mk_wkt_point(x) for x in geom.split(','))

    def get_scene_geometry(self, default_scene, name):
        """ Load the geometry from a scene.

            `name` is a '+'-separated list of object references of the
            form '<scene>:<object>' or ':<object>'. The second form looks
            up the object in `default_scene`.

            Returns a SQL term for the geometry. When more than one object
            is given, the geometries are combined with
            ST_LineMerge(ST_Collect()).

            Raises an AssertionError when the default scene is needed but
            None, a ValueError when a reference contains no ':' and a
            KeyError when the object does not exist in the scene.
        """
        geoms = []
        for part in name.split('+'):
            oname = part.strip()
            if oname.startswith(':'):
                if default_scene is None:
                    raise AssertionError("Scenario error: You need to set a scene")
                scene_name, obj_name = default_scene, oname[1:]
            else:
                # Split at the first colon only. Everything after it is
                # the object name.
                scene_name, sep, obj_name = oname.partition(':')
                if not sep:
                    raise ValueError(
                        "Scene object '{}' must have the form "
                        "[<scene>]:<name>".format(oname))

            wkt = self.load_scene(scene_name)[obj_name]
            geoms.append("'{}'::geometry".format(wkt))

        if len(geoms) == 1:
            return geoms[0]

        return 'ST_LineMerge(ST_Collect(ARRAY[{}]))'.format(','.join(geoms))

    def load_scene(self, name):
        """ Load a scene from a file.

            Reads the file '<name>.wkt' from the scene directory and
            returns a dictionary of object name to WKT string. Empty lines
            are skipped. The result is cached, so that each scene file is
            read only once.

            Raises a ValueError when a non-empty line has no '|' separator.
        """
        if name in self.scene_cache:
            return self.scene_cache[name]

        scene = {}
        scene_file = Path(self.scene_path) / "{}.wkt".format(name)
        with scene_file.open('r', encoding='utf-8') as fd:
            for line in fd:
                if not line.strip():
                    continue
                # Split at the first bar only. The rest of the line is
                # the geometry.
                obj, sep, wkt = line.partition('|')
                if not sep:
                    raise ValueError(
                        "Scene file '{}': line without '|': {!r}".format(
                            scene_file, line))
                scene[obj.strip()] = wkt.strip()
        self.scene_cache[name] = scene

        return scene

    def clear_grid(self):
        """ Remove all nodes from the grid.
        """
        self.grid = {}

    def add_grid_node(self, nodeid, x, y):
        """ Add a node with the given id and coordinates to the grid.
            An existing node with the same id is overwritten.
        """
        self.grid[nodeid] = (x, y)

    def grid_node(self, nodeid):
        """ Return the (x, y) tuple of the grid node with the given id
            or None if there is no such node.
        """
        return self.grid.get(nodeid)