]> git.openstreetmap.org Git - nominatim.git/blobdiff - test/bdd/steps/steps_db_ops.py
bdd: move nominitim id reader to separate file
[nominatim.git] / test / bdd / steps / steps_db_ops.py
index 6bee88c12cd265863e445f3c7d920d82e3d079ac..0df9fec2c050e95ff6a654ce1b2a6014e7a8c1fd 100644 (file)
@@ -3,6 +3,7 @@ import psycopg2.extras
 
 from check_functions import Almost
 from place_inserter import PlaceColumn
+from table_compare import NominatimID
 
 class PlaceObjName(object):
 
@@ -51,53 +52,6 @@ def check_database_integrity(context):
     assert cur.fetchone()[0] == 0, "Duplicates found in place_addressline"
 
 
-class NominatimID:
-    """ Splits a unique identifier for places into its components.
-        As place_ids cannot be used for testing, we use a unique
-        identifier instead that is of the form <osmtype><osmid>[:<class>].
-    """
-
-    id_regex = re.compile(r"(?P<tp>[NRW])(?P<id>\d+)(:(?P<cls>\w+))?")
-
-    def __init__(self, oid):
-        self.typ = self.oid = self.cls = None
-
-        if oid is not None:
-            m = self.id_regex.fullmatch(oid)
-            assert m is not None, "ID '%s' not of form <osmtype><osmid>[:<class>]" % oid
-
-            self.typ = m.group('tp')
-            self.oid = m.group('id')
-            self.cls = m.group('cls')
-
-    def __str__(self):
-        if self.cls is None:
-            return self.typ + self.oid
-
-        return '%s%d:%s' % (self.typ, self.oid, self.cls)
-
-    def table_select(self):
-        """ Return where clause and parameter list to select the object
-            from a Nominatim table.
-        """
-        where = 'osm_type = %s and osm_id = %s'
-        params = [self.typ, self. oid]
-
-        if self.cls is not None:
-            where += ' and class = %s'
-            params.append(self.cls)
-
-        return where, params
-
-    def get_place_id(self, cur):
-        where, params = self.table_select()
-        cur.execute("SELECT place_id FROM placex WHERE %s" % where, params)
-        assert cur.rowcount == 1, \
-            "Expected exactly 1 entry in placex for %s found %s" % (str(self), cur.rowcount)
-
-        return cur.fetchone()[0]
-
-
 def assert_db_column(row, column, value, context):
     if column == 'object':
         return
@@ -218,8 +172,7 @@ def delete_places(context, oids):
         'create-functions', 'create-partition-functions', 'enable-diff-updates')
     with context.db.cursor() as cur:
         for oid in oids.split(','):
-            where, params = NominatimID(oid).table_select()
-            cur.execute("DELETE FROM place WHERE " + where, params)
+            NominatimID(oid).query_osm_id(cur, 'DELETE FROM place WHERE {}')
 
     context.nominatim.reindex_placex(context.db)
 
@@ -230,12 +183,10 @@ def check_placex_contents(context, exact):
     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
         expected_content = set()
         for row in context.table:
-            nid = NominatimID(row['object'])
-            where, params = nid.table_select()
-            cur.execute("""SELECT *, ST_AsText(geometry) as geomtxt,
-                           ST_X(centroid) as cx, ST_Y(centroid) as cy
-                           FROM placex where %s""" % where,
-                        params)
+            NominatimID(row['object']).query_osm_id(cur,
+                """SELECT *, ST_AsText(geometry) as geomtxt,
+                          ST_X(centroid) as cx, ST_Y(centroid) as cy
+                   FROM placex WHERE {}""")
             assert cur.rowcount > 0, "No rows found for " + row['object']
 
             for res in cur:
@@ -275,12 +226,10 @@ def check_placex_contents(context, exact):
     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
         expected_content = set()
         for row in context.table:
-            nid = NominatimID(row['object'])
-            where, params = nid.table_select()
-            cur.execute("""SELECT *, ST_AsText(geometry) as geomtxt,
-                           ST_GeometryType(geometry) as geometrytype
-                           FROM place where %s""" % where,
-                        params)
+            NominatimID(row['object']).query_osm_id(cur,
+                """SELECT *, ST_AsText(geometry) as geomtxt,
+                          ST_GeometryType(geometry) as geometrytype
+                   FROM place WHERE {}""")
             assert cur.rowcount > 0, "No rows found for " + row['object']
 
             for res in cur:
@@ -317,9 +266,9 @@ def check_placex_contents(context, exact):
 def check_search_name_contents(context, exclude):
     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
         for row in context.table:
-            pid = NominatimID(row['object']).get_place_id(cur)
-            cur.execute("""SELECT *, ST_X(centroid) as cx, ST_Y(centroid) as cy
-                           FROM search_name WHERE place_id = %s""", (pid, ))
+            NominatimID(row['object']).query_place_id(cur,
+                """SELECT *, ST_X(centroid) as cx, ST_Y(centroid) as cy
+                     FROM search_name WHERE place_id = %s""")
             assert cur.rowcount > 0, "No rows found for " + row['object']
 
             for res in cur:
@@ -346,9 +295,9 @@ def check_search_name_contents(context, exclude):
                                     "No word entry found for " + row[h] + ". Entries found: " + str(subcur.rowcount)
                             for wid in subcur:
                                 if exclude:
-                                    assert wid[0] not in res[h], "Found term for %s/%s: %s" % (pid, h, wid[1])
+                                    assert wid[0] not in res[h], "Found term for %s/%s: %s" % (row['object'], h, wid[1])
                                 else:
-                                    assert wid[0] in res[h], "Missing term for %s/%s: %s" % (pid, h, wid[1])
+                                    assert wid[0] in res[h], "Missing term for %s/%s: %s" % (row['object'], h, wid[1])
                     else:
                         assert_db_column(res, h, row[h], context)
 
@@ -454,14 +403,14 @@ def check_location_property_osmline(context, oid, neg):
 @then("(?P<table>placex|place) has no entry for (?P<oid>.*)")
 def check_placex_has_entry(context, table, oid):
     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
-        nid = NominatimID(oid)
-        where, params = nid.table_select()
-        cur.execute("SELECT * FROM %s where %s" % (table, where), params)
-        assert cur.rowcount == 0
+        NominatimID(oid).query_osm_id(cur, "SELECT * FROM %s where {}" % table)
+        assert cur.rowcount == 0, \
+               "Found {} entries for ID {}".format(cur.rowcount, oid)
 
 @then("search_name has no entry for (?P<oid>.*)")
 def check_search_name_has_entry(context, oid):
     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
-        pid = NominatimID(oid).get_place_id(cur)
-        cur.execute("SELECT * FROM search_name WHERE place_id = %s", (pid, ))
-        assert cur.rowcount == 0
+        NominatimID(oid).query_place_id(cur,
+            "SELECT * FROM search_name WHERE place_id = %s")
+        assert cur.rowcount == 0, \
+               "Found {} entries for ID {}".format(cur.rowcount, oid)