]> git.openstreetmap.org Git - nominatim.git/blobdiff - test/bdd/steps/table_compare.py
Merge remote-tracking branch 'upstream/master'
[nominatim.git] / test / bdd / steps / table_compare.py
index 2e71d943a83cc38587e212c8960cbb8df652f877..4284fad962607796c59560dbda2bceb687375a1e 100644 (file)
@@ -1,9 +1,18 @@
+# SPDX-License-Identifier: GPL-2.0-only
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2022 by the Nominatim developer community.
+# For a full list of authors see the git log.
 """
 Functions to facilitate accessing and comparing the content of DB tables.
 """
 import re
 import json
 
+import psycopg
+from psycopg import sql as pysql
+
 from steps.check_functions import Almost
 
 ID_REGEX = re.compile(r"(?P<typ>[NRW])(?P<oid>\d+)(:(?P<cls>\w+))?")
@@ -56,15 +65,18 @@ class NominatimID:
                     ','.join(['*'] + (extra_columns or [])), table)
         cur.execute(query, (pid, ))
 
-    def get_place_id(self, cur):
+    def get_place_id(self, cur, allow_empty=False):
         """ Look up the place id for the ID. Throws an assertion if the ID
             is not unique.
         """
         self.query_osm_id(cur, "SELECT place_id FROM placex WHERE {}")
+        if cur.rowcount == 0 and allow_empty:
+            return None
+
         assert cur.rowcount == 1, \
                "Place ID {!s} not unique. Found {} entries.".format(self, cur.rowcount)
 
-        return cur.fetchone()[0]
+        return cur.fetchone()['place_id']
 
 
 class DBRow:
@@ -143,20 +155,26 @@ class DBRow:
 
     def _has_centroid(self, expected):
         if expected == 'in geometry':
-            with self.context.db.cursor() as cur:
-                cur.execute("""SELECT ST_Within(ST_SetSRID(ST_Point({cx}, {cy}), 4326),
-                                        ST_SetSRID('{geomtxt}'::geometry, 4326))""".format(**self.db_row))
+            with self.context.db.cursor(row_factory=psycopg.rows.tuple_row) as cur:
+                cur.execute("""SELECT ST_Within(ST_SetSRID(ST_Point(%(cx)s, %(cy)s), 4326),
+                                        ST_SetSRID(%(geomtxt)s::geometry, 4326))""",
+                            (self.db_row))
                 return cur.fetchone()[0]
 
-        x, y = expected.split(' ')
+        if ' ' in expected:
+            x, y = expected.split(' ')
+        else:
+            x, y = self.context.osm.grid_node(int(expected))
+
         return Almost(float(x)) == self.db_row['cx'] and Almost(float(y)) == self.db_row['cy']
 
     def _has_geometry(self, expected):
-        geom = self.context.osm.parse_geometry(expected, self.context.scene)
-        with self.context.db.cursor() as cur:
-            cur.execute("""SELECT ST_Equals(ST_SnapToGrid({}, 0.00001, 0.00001),
-                                   ST_SnapToGrid(ST_SetSRID('{}'::geometry, 4326), 0.00001, 0.00001))""".format(
-                            geom, self.db_row['geomtxt']))
+        geom = self.context.osm.parse_geometry(expected)
+        with self.context.db.cursor(row_factory=psycopg.rows.tuple_row) as cur:
+            cur.execute(pysql.SQL("""SELECT ST_Equals(ST_SnapToGrid({}, 0.00001, 0.00001),
+                                   ST_SnapToGrid(ST_SetSRID({}::geometry, 4326), 0.00001, 0.00001))""")
+                             .format(pysql.SQL(geom),
+                                     pysql.Literal(self.db_row['geomtxt'])))
             return cur.fetchone()[0]
 
     def assert_msg(self, name, value):
@@ -196,7 +214,7 @@ class DBRow:
             if actual == 0:
                 return "place ID 0"
 
-            with self.context.db.cursor() as cur:
+            with self.context.db.cursor(row_factory=psycopg.rows.tuple_row) as cur:
                 cur.execute("""SELECT osm_type, osm_id, class
                                FROM placex WHERE place_id = %s""",
                             (actual, ))