1 # SPDX-License-Identifier: GPL-2.0-only
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Functions to facilitate accessing and comparing the content of DB tables.
14 from psycopg import sql as pysql
16 from steps.check_functions import Almost
# Pattern for test identifiers: OSM type letter, numeric OSM id and an
# optional ':<class>' suffix.
ID_REGEX = re.compile(r"(?P<typ>[NRW])(?P<oid>\d+)(:(?P<cls>\w+))?")


class NominatimID:
    """ Splits a unique identifier for places into its components.
        As place_ids cannot be used for testing, we use a unique
        identifier instead that is of the form <osmtype><osmid>[:<class>].
    """

    def __init__(self, oid):
        """ Parse `oid` into the attributes `typ`, `oid` and `cls`.

            `oid` may be None, in which case all three attributes stay None.
            `cls` is also None when the identifier has no ':<class>' part.
            Raises an AssertionError when a non-None `oid` does not have
            the expected form.
        """
        self.typ = self.oid = self.cls = None

        # Without this guard fullmatch() would raise a TypeError for None
        # and the defaults set above could never be observed.
        if oid is not None:
            m = ID_REGEX.fullmatch(oid)
            assert m is not None, \
                "ID '{}' not of form <osmtype><osmid>[:<class>]".format(oid)

            self.typ = m.group('typ')
            self.oid = m.group('oid')
            self.cls = m.group('cls')

    def __str__(self):
        """ Return the identifier in its textual form, with the class
            suffix only when a class was given.
        """
        if self.cls is None:
            return self.typ + self.oid

        return '{self.typ}{self.oid}:{self.cls}'.format(self=self)

    def query_osm_id(self, cur, query):
        """ Run a query on cursor `cur` using osm ID, type and class. The
            `query` string must contain exactly one placeholder '{}' where
            the 'where' query should go.

            The class is only added to the condition when the identifier
            has one. Results are left on the cursor.
        """
        where = 'osm_type = %s and osm_id = %s'
        params = [self.typ, self.oid]

        if self.cls is not None:
            where += ' and class = %s'
            params.append(self.cls)

        cur.execute(query.format(where), params)

    def row_by_place_id(self, cur, table, extra_columns=None):
        """ Get a row by place_id from the given table using cursor `cur`.
            extra_columns may contain a list of additional elements for the
            select part of the query.

            The place_id is looked up through get_place_id() first, so the
            same uniqueness assertion applies. The result is left on the
            cursor; nothing is fetched or returned here.
        """
        pid = self.get_place_id(cur)
        # `table` and `extra_columns` are interpolated verbatim: they are
        # SQL fragments, not values, and cannot be passed as parameters.
        query = "SELECT {} FROM {} WHERE place_id = %s".format(
            ','.join(['*'] + (extra_columns or [])), table)
        cur.execute(query, (pid, ))

    def get_place_id(self, cur, allow_empty=False):
        """ Look up the place id for the ID. Throws an assertion if the ID
            is not unique.

            When `allow_empty` is true and no row matches, None is returned
            instead. The cursor must yield rows that can be indexed by
            column name.
        """
        self.query_osm_id(cur, "SELECT place_id FROM placex WHERE {}")
        if cur.rowcount == 0 and allow_empty:
            return None

        assert cur.rowcount == 1, \
            "Place ID {!s} not unique. Found {} entries.".format(self, cur.rowcount)

        return cur.fetchone()['place_id']
83 """ Represents a row from a database and offers comparison functions.
85 def __init__(self, nid, db_row, context):
88 self.context = context
90 def assert_row(self, row, exclude_columns):
91 """ Check that all columns of the given behave row are contained
92 in the database row. Exclude behave rows with the names given
93 in the `exclude_columns` list.
95 for name, value in zip(row.headings, row.cells):
96 if name not in exclude_columns:
97 assert self.contains(name, value), self.assert_msg(name, value)
99 def contains(self, name, expected):
100 """ Check that the DB row contains a column `name` with the given value.
103 column, field = name.split('+', 1)
104 return self._contains_hstore_value(column, field, expected)
106 if name == 'geometry':
107 return self._has_geometry(expected)
109 if name not in self.db_row:
112 actual = self.db_row[name]
115 return actual is None
117 if name == 'name' and ':' not in expected:
118 return self._compare_column(actual[name], expected)
120 if 'place_id' in name:
121 return self._compare_place_id(actual, expected)
123 if name == 'centroid':
124 return self._has_centroid(expected)
126 return self._compare_column(actual, expected)
128 def _contains_hstore_value(self, column, field, expected):
132 if column not in self.db_row:
136 return self.db_row[column] is None or field not in self.db_row[column]
138 if self.db_row[column] is None:
141 return self._compare_column(self.db_row[column].get(field), expected)
143 def _compare_column(self, actual, expected):
144 if isinstance(actual, dict):
145 return actual == eval('{' + expected + '}')
147 return str(actual) == expected
149 def _compare_place_id(self, actual, expected):
153 with self.context.db.cursor() as cur:
154 return NominatimID(expected).get_place_id(cur) == actual
156 def _has_centroid(self, expected):
157 if expected == 'in geometry':
158 with self.context.db.cursor(row_factory=psycopg.rows.tuple_row) as cur:
159 cur.execute("""SELECT ST_Within(ST_SetSRID(ST_Point(%(cx)s, %(cy)s), 4326),
160 ST_SetSRID(%(geomtxt)s::geometry, 4326))""",
162 return cur.fetchone()[0]
165 x, y = expected.split(' ')
167 x, y = self.context.osm.grid_node(int(expected))
169 return Almost(float(x)) == self.db_row['cx'] and Almost(float(y)) == self.db_row['cy']
171 def _has_geometry(self, expected):
172 geom = self.context.osm.parse_geometry(expected)
173 with self.context.db.cursor(row_factory=psycopg.rows.tuple_row) as cur:
174 cur.execute(pysql.SQL("""SELECT ST_Equals(ST_SnapToGrid({}, 0.00001, 0.00001),
175 ST_SnapToGrid(ST_SetSRID({}::geometry, 4326), 0.00001, 0.00001))""")
176 .format(pysql.SQL(geom),
177 pysql.Literal(self.db_row['geomtxt'])))
178 return cur.fetchone()[0]
180 def assert_msg(self, name, value):
181 """ Return a string with an informative message for a failed compare.
183 msg = "\nBad column '{}' in row '{!s}'.".format(name, self.nid)
184 actual = self._get_actual(name)
185 if actual is not None:
186 msg += " Expected: {}, got: {}.".format(value, actual)
188 msg += " No such column."
190 return msg + "\nFull DB row: {}".format(json.dumps(dict(self.db_row), indent=4, default=str))
192 def _get_actual(self, name):
194 column, field = name.split('+', 1)
197 return (self.db_row.get(column) or {}).get(field)
199 if name == 'geometry':
200 return self.db_row['geomtxt']
202 if name not in self.db_row:
205 if name == 'centroid':
206 return "POINT({cx} {cy})".format(**self.db_row)
208 actual = self.db_row[name]
210 if 'place_id' in name:
217 with self.context.db.cursor(row_factory=psycopg.rows.tuple_row) as cur:
218 cur.execute("""SELECT osm_type, osm_id, class
219 FROM placex WHERE place_id = %s""",
222 if cur.rowcount == 1:
223 return "{0[0]}{0[1]}:{0[2]}".format(cur.fetchone())
225 return "[place ID {} not found]".format(actual)