]> git.openstreetmap.org Git - nominatim.git/commitdiff
bdd: move nominitim id reader to separate file
authorSarah Hoffmann <lonvia@denofr.de>
Tue, 5 Jan 2021 15:00:48 +0000 (16:00 +0100)
committerSarah Hoffmann <lonvia@denofr.de>
Tue, 5 Jan 2021 15:00:48 +0000 (16:00 +0100)
test/bdd/steps/steps_db_ops.py
test/bdd/steps/table_compare.py [new file with mode: 0644]

index 6bee88c12cd265863e445f3c7d920d82e3d079ac..0df9fec2c050e95ff6a654ce1b2a6014e7a8c1fd 100644 (file)
@@ -3,6 +3,7 @@ import psycopg2.extras
 
 from check_functions import Almost
 from place_inserter import PlaceColumn
 
 from check_functions import Almost
 from place_inserter import PlaceColumn
+from table_compare import NominatimID
 
 class PlaceObjName(object):
 
 
 class PlaceObjName(object):
 
@@ -51,53 +52,6 @@ def check_database_integrity(context):
     assert cur.fetchone()[0] == 0, "Duplicates found in place_addressline"
 
 
     assert cur.fetchone()[0] == 0, "Duplicates found in place_addressline"
 
 
-class NominatimID:
-    """ Splits a unique identifier for places into its components.
-        As place_ids cannot be used for testing, we use a unique
-        identifier instead that is of the form <osmtype><osmid>[:<class>].
-    """
-
-    id_regex = re.compile(r"(?P<tp>[NRW])(?P<id>\d+)(:(?P<cls>\w+))?")
-
-    def __init__(self, oid):
-        self.typ = self.oid = self.cls = None
-
-        if oid is not None:
-            m = self.id_regex.fullmatch(oid)
-            assert m is not None, "ID '%s' not of form <osmtype><osmid>[:<class>]" % oid
-
-            self.typ = m.group('tp')
-            self.oid = m.group('id')
-            self.cls = m.group('cls')
-
-    def __str__(self):
-        if self.cls is None:
-            return self.typ + self.oid
-
-        return '%s%d:%s' % (self.typ, self.oid, self.cls)
-
-    def table_select(self):
-        """ Return where clause and parameter list to select the object
-            from a Nominatim table.
-        """
-        where = 'osm_type = %s and osm_id = %s'
-        params = [self.typ, self. oid]
-
-        if self.cls is not None:
-            where += ' and class = %s'
-            params.append(self.cls)
-
-        return where, params
-
-    def get_place_id(self, cur):
-        where, params = self.table_select()
-        cur.execute("SELECT place_id FROM placex WHERE %s" % where, params)
-        assert cur.rowcount == 1, \
-            "Expected exactly 1 entry in placex for %s found %s" % (str(self), cur.rowcount)
-
-        return cur.fetchone()[0]
-
-
 def assert_db_column(row, column, value, context):
     if column == 'object':
         return
 def assert_db_column(row, column, value, context):
     if column == 'object':
         return
@@ -218,8 +172,7 @@ def delete_places(context, oids):
         'create-functions', 'create-partition-functions', 'enable-diff-updates')
     with context.db.cursor() as cur:
         for oid in oids.split(','):
         'create-functions', 'create-partition-functions', 'enable-diff-updates')
     with context.db.cursor() as cur:
         for oid in oids.split(','):
-            where, params = NominatimID(oid).table_select()
-            cur.execute("DELETE FROM place WHERE " + where, params)
+            NominatimID(oid).query_osm_id(cur, 'DELETE FROM place WHERE {}')
 
     context.nominatim.reindex_placex(context.db)
 
 
     context.nominatim.reindex_placex(context.db)
 
@@ -230,12 +183,10 @@ def check_placex_contents(context, exact):
     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
         expected_content = set()
         for row in context.table:
     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
         expected_content = set()
         for row in context.table:
-            nid = NominatimID(row['object'])
-            where, params = nid.table_select()
-            cur.execute("""SELECT *, ST_AsText(geometry) as geomtxt,
-                           ST_X(centroid) as cx, ST_Y(centroid) as cy
-                           FROM placex where %s""" % where,
-                        params)
+            NominatimID(row['object']).query_osm_id(cur,
+                """SELECT *, ST_AsText(geometry) as geomtxt,
+                          ST_X(centroid) as cx, ST_Y(centroid) as cy
+                   FROM placex WHERE {}""")
             assert cur.rowcount > 0, "No rows found for " + row['object']
 
             for res in cur:
             assert cur.rowcount > 0, "No rows found for " + row['object']
 
             for res in cur:
@@ -275,12 +226,10 @@ def check_placex_contents(context, exact):
     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
         expected_content = set()
         for row in context.table:
     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
         expected_content = set()
         for row in context.table:
-            nid = NominatimID(row['object'])
-            where, params = nid.table_select()
-            cur.execute("""SELECT *, ST_AsText(geometry) as geomtxt,
-                           ST_GeometryType(geometry) as geometrytype
-                           FROM place where %s""" % where,
-                        params)
+            NominatimID(row['object']).query_osm_id(cur,
+                """SELECT *, ST_AsText(geometry) as geomtxt,
+                          ST_GeometryType(geometry) as geometrytype
+                   FROM place WHERE {}""")
             assert cur.rowcount > 0, "No rows found for " + row['object']
 
             for res in cur:
             assert cur.rowcount > 0, "No rows found for " + row['object']
 
             for res in cur:
@@ -317,9 +266,9 @@ def check_placex_contents(context, exact):
 def check_search_name_contents(context, exclude):
     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
         for row in context.table:
 def check_search_name_contents(context, exclude):
     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
         for row in context.table:
-            pid = NominatimID(row['object']).get_place_id(cur)
-            cur.execute("""SELECT *, ST_X(centroid) as cx, ST_Y(centroid) as cy
-                           FROM search_name WHERE place_id = %s""", (pid, ))
+            NominatimID(row['object']).query_place_id(cur,
+                """SELECT *, ST_X(centroid) as cx, ST_Y(centroid) as cy
+                     FROM search_name WHERE place_id = %s""")
             assert cur.rowcount > 0, "No rows found for " + row['object']
 
             for res in cur:
             assert cur.rowcount > 0, "No rows found for " + row['object']
 
             for res in cur:
@@ -346,9 +295,9 @@ def check_search_name_contents(context, exclude):
                                     "No word entry found for " + row[h] + ". Entries found: " + str(subcur.rowcount)
                             for wid in subcur:
                                 if exclude:
                                     "No word entry found for " + row[h] + ". Entries found: " + str(subcur.rowcount)
                             for wid in subcur:
                                 if exclude:
-                                    assert wid[0] not in res[h], "Found term for %s/%s: %s" % (pid, h, wid[1])
+                                    assert wid[0] not in res[h], "Found term for %s/%s: %s" % (row['object'], h, wid[1])
                                 else:
                                 else:
-                                    assert wid[0] in res[h], "Missing term for %s/%s: %s" % (pid, h, wid[1])
+                                    assert wid[0] in res[h], "Missing term for %s/%s: %s" % (row['object'], h, wid[1])
                     else:
                         assert_db_column(res, h, row[h], context)
 
                     else:
                         assert_db_column(res, h, row[h], context)
 
@@ -454,14 +403,14 @@ def check_location_property_osmline(context, oid, neg):
 @then("(?P<table>placex|place) has no entry for (?P<oid>.*)")
 def check_placex_has_entry(context, table, oid):
     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
 @then("(?P<table>placex|place) has no entry for (?P<oid>.*)")
 def check_placex_has_entry(context, table, oid):
     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
-        nid = NominatimID(oid)
-        where, params = nid.table_select()
-        cur.execute("SELECT * FROM %s where %s" % (table, where), params)
-        assert cur.rowcount == 0
+        NominatimID(oid).query_osm_id(cur, "SELECT * FROM %s where {}" % table)
+        assert cur.rowcount == 0, \
+               "Found {} entries for ID {}".format(cur.rowcount, oid)
 
 @then("search_name has no entry for (?P<oid>.*)")
 def check_search_name_has_entry(context, oid):
     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
 
 @then("search_name has no entry for (?P<oid>.*)")
 def check_search_name_has_entry(context, oid):
     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
-        pid = NominatimID(oid).get_place_id(cur)
-        cur.execute("SELECT * FROM search_name WHERE place_id = %s", (pid, ))
-        assert cur.rowcount == 0
+        NominatimID(oid).query_place_id(cur,
+            "SELECT * FROM search_name WHERE place_id = %s")
+        assert cur.rowcount == 0, \
+               "Found {} entries for ID {}".format(cur.rowcount, oid)
diff --git a/test/bdd/steps/table_compare.py b/test/bdd/steps/table_compare.py
new file mode 100644 (file)
index 0000000..dfc261d
--- /dev/null
@@ -0,0 +1,62 @@
+"""
+Functions to facilitate accessing and comparing the content of DB tables.
+"""
+import re
+
+ID_REGEX = re.compile(r"(?P<typ>[NRW])(?P<oid>\d+)(:(?P<cls>\w+))?")
+
+class NominatimID:
+    """ Splits a unique identifier for places into its components.
+        As place_ids cannot be used for testing, we use a unique
+        identifier instead that is of the form <osmtype><osmid>[:<class>].
+    """
+
+    def __init__(self, oid):
+        self.typ = self.oid = self.cls = None
+
+        if oid is not None:
+            m = ID_REGEX.fullmatch(oid)
+            assert m is not None, \
+                   "ID '{}' not of form <osmtype><osmid>[:<class>]".format(oid)
+
+            self.typ = m.group('typ')
+            self.oid = m.group('oid')
+            self.cls = m.group('cls')
+
+    def __str__(self):
+        if self.cls is None:
+            return self.typ + self.oid
+
+        return '{self.typ}{self.oid}:{self.cls}'.format(self=self)
+
+    def query_osm_id(self, cur, query):
+        """ Run a query on cursor `cur` using osm ID, type and class. The
+            `query` string must contain exactly one placeholder '{}' where
+            the 'where' query should go.
+        """
+        where = 'osm_type = %s and osm_id = %s'
+        params = [self.typ, self. oid]
+
+        if self.cls is not None:
+            where += ' and class = %s'
+            params.append(self.cls)
+
+        return cur.execute(query.format(where), params)
+
+    def query_place_id(self, cur, query):
+        """ Run a query on cursor `cur` using the place ID. The `query` string
+            must contain exactly one placeholder '%s' where the 'where' query
+            should go.
+        """
+        pid = self.get_place_id(cur)
+        return cur.execute(query, (pid, ))
+
+    def get_place_id(self, cur):
+        """ Look up the place id for the ID. Throws an assertion if the ID
+            is not unique.
+        """
+        self.query_osm_id(cur, "SELECT place_id FROM placex WHERE {}")
+        assert cur.rowcount == 1, \
+               "Place ID {!s} not unique. Found {} entries.".format(self, cur.rowcount)
+
+        return cur.fetchone()[0]