]> git.openstreetmap.org Git - nominatim.git/blobdiff - test/bdd/steps/table_compare.py
Merge pull request #3463 from lonvia/sqlalchemy14-with-psycopg
[nominatim.git] / test / bdd / steps / table_compare.py
index 51d1b645db053d4ce008a8a33124831af18d581e..cf2e12f127871390126379fb348f2eb2cfeceec1 100644 (file)
@@ -1,3 +1,9 @@
+# SPDX-License-Identifier: GPL-2.0-only
+#
+# This file is part of Nominatim. (https://nominatim.org)
+#
+# Copyright (C) 2022 by the Nominatim developer community.
+# For a full list of authors see the git log.
 """
 Functions to facilitate accessing and comparing the content of DB tables.
 """
 """
 Functions to facilitate accessing and comparing the content of DB tables.
 """
@@ -56,11 +62,14 @@ class NominatimID:
                     ','.join(['*'] + (extra_columns or [])), table)
         cur.execute(query, (pid, ))
 
                     ','.join(['*'] + (extra_columns or [])), table)
         cur.execute(query, (pid, ))
 
-    def get_place_id(self, cur):
+    def get_place_id(self, cur, allow_empty=False):
         """ Look up the place id for the ID. Throws an assertion if the ID
             is not unique.
         """
         self.query_osm_id(cur, "SELECT place_id FROM placex WHERE {}")
         """ Look up the place id for the ID. Throws an assertion if the ID
             is not unique.
         """
         self.query_osm_id(cur, "SELECT place_id FROM placex WHERE {}")
+        if cur.rowcount == 0 and allow_empty:
+            return None
+
         assert cur.rowcount == 1, \
                "Place ID {!s} not unique. Found {} entries.".format(self, cur.rowcount)
 
         assert cur.rowcount == 1, \
                "Place ID {!s} not unique. Found {} entries.".format(self, cur.rowcount)
 
@@ -148,11 +157,15 @@ class DBRow:
                                         ST_SetSRID('{geomtxt}'::geometry, 4326))""".format(**self.db_row))
                 return cur.fetchone()[0]
 
                                         ST_SetSRID('{geomtxt}'::geometry, 4326))""".format(**self.db_row))
                 return cur.fetchone()[0]
 
-        x, y = expected.split(' ')
+        if ' ' in expected:
+            x, y = expected.split(' ')
+        else:
+            x, y = self.context.osm.grid_node(int(expected))
+
         return Almost(float(x)) == self.db_row['cx'] and Almost(float(y)) == self.db_row['cy']
 
     def _has_geometry(self, expected):
         return Almost(float(x)) == self.db_row['cx'] and Almost(float(y)) == self.db_row['cy']
 
     def _has_geometry(self, expected):
-        geom = self.context.osm.parse_geometry(expected, self.context.scene)
+        geom = self.context.osm.parse_geometry(expected)
         with self.context.db.cursor() as cur:
             cur.execute("""SELECT ST_Equals(ST_SnapToGrid({}, 0.00001, 0.00001),
                                    ST_SnapToGrid(ST_SetSRID('{}'::geometry, 4326), 0.00001, 0.00001))""".format(
         with self.context.db.cursor() as cur:
             cur.execute("""SELECT ST_Equals(ST_SnapToGrid({}, 0.00001, 0.00001),
                                    ST_SnapToGrid(ST_SetSRID('{}'::geometry, 4326), 0.00001, 0.00001))""".format(
@@ -196,7 +209,7 @@ class DBRow:
             if actual == 0:
                 return "place ID 0"
 
             if actual == 0:
                 return "place ID 0"
 
-            with self.context.db.cursor():
+            with self.context.db.cursor() as cur:
                 cur.execute("""SELECT osm_type, osm_id, class
                                FROM placex WHERE place_id = %s""",
                             (actual, ))
                 cur.execute("""SELECT osm_type, osm_id, class
                                FROM placex WHERE place_id = %s""",
                             (actual, ))