]> git.openstreetmap.org Git - nominatim.git/blob - test/bdd/steps/table_compare.py
f0d27ba595494df92938263c146c82d584086c50
[nominatim.git] / test / bdd / steps / table_compare.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Functions to facilitate accessing and comparing the content of DB tables.
9 """
10 import math
11 import re
12 import json
13
14 import psycopg
15 from psycopg import sql as pysql
16
17 ID_REGEX = re.compile(r"(?P<typ>[NRW])(?P<oid>\d+)(:(?P<cls>\w+))?")
18
19 class NominatimID:
20     """ Splits a unique identifier for places into its components.
21         As place_ids cannot be used for testing, we use a unique
22         identifier instead that is of the form <osmtype><osmid>[:<class>].
23     """
24
25     def __init__(self, oid):
26         self.typ = self.oid = self.cls = None
27
28         if oid is not None:
29             m = ID_REGEX.fullmatch(oid)
30             assert m is not None, \
31                    "ID '{}' not of form <osmtype><osmid>[:<class>]".format(oid)
32
33             self.typ = m.group('typ')
34             self.oid = m.group('oid')
35             self.cls = m.group('cls')
36
37     def __str__(self):
38         if self.cls is None:
39             return self.typ + self.oid
40
41         return '{self.typ}{self.oid}:{self.cls}'.format(self=self)
42
43     def query_osm_id(self, cur, query):
44         """ Run a query on cursor `cur` using osm ID, type and class. The
45             `query` string must contain exactly one placeholder '{}' where
46             the 'where' query should go.
47         """
48         where = 'osm_type = %s and osm_id = %s'
49         params = [self.typ, self. oid]
50
51         if self.cls is not None:
52             where += ' and class = %s'
53             params.append(self.cls)
54
55         cur.execute(query.format(where), params)
56
57     def row_by_place_id(self, cur, table, extra_columns=None):
58         """ Get a row by place_id from the given table using cursor `cur`.
59             extra_columns may contain a list additional elements for the select
60             part of the query.
61         """
62         pid = self.get_place_id(cur)
63         query = "SELECT {} FROM {} WHERE place_id = %s".format(
64                     ','.join(['*'] + (extra_columns or [])), table)
65         cur.execute(query, (pid, ))
66
67     def get_place_id(self, cur, allow_empty=False):
68         """ Look up the place id for the ID. Throws an assertion if the ID
69             is not unique.
70         """
71         self.query_osm_id(cur, "SELECT place_id FROM placex WHERE {}")
72         if cur.rowcount == 0 and allow_empty:
73             return None
74
75         assert cur.rowcount == 1, \
76                "Place ID {!s} not unique. Found {} entries.".format(self, cur.rowcount)
77
78         return cur.fetchone()['place_id']
79
80
81 class DBRow:
82     """ Represents a row from a database and offers comparison functions.
83     """
84     def __init__(self, nid, db_row, context):
85         self.nid = nid
86         self.db_row = db_row
87         self.context = context
88
89     def assert_row(self, row, exclude_columns):
90         """ Check that all columns of the given behave row are contained
91             in the database row. Exclude behave rows with the names given
92             in the `exclude_columns` list.
93         """
94         for name, value in zip(row.headings, row.cells):
95             if name not in exclude_columns:
96                 assert self.contains(name, value), self.assert_msg(name, value)
97
98     def contains(self, name, expected):
99         """ Check that the DB row contains a column `name` with the given value.
100         """
101         if '+' in name:
102             column, field = name.split('+', 1)
103             return self._contains_hstore_value(column, field, expected)
104
105         if name == 'geometry':
106             return self._has_geometry(expected)
107
108         if name not in self.db_row:
109             return False
110
111         actual = self.db_row[name]
112
113         if expected == '-':
114             return actual is None
115
116         if name == 'name' and ':' not in expected:
117             return self._compare_column(actual[name], expected)
118
119         if 'place_id' in name:
120             return self._compare_place_id(actual, expected)
121
122         if name == 'centroid':
123             return self._has_centroid(expected)
124
125         return self._compare_column(actual, expected)
126
127     def _contains_hstore_value(self, column, field, expected):
128         if column == 'addr':
129             column = 'address'
130
131         if column not in self.db_row:
132             return False
133
134         if expected == '-':
135             return self.db_row[column] is None or field not in self.db_row[column]
136
137         if self.db_row[column] is None:
138             return False
139
140         return self._compare_column(self.db_row[column].get(field), expected)
141
142     def _compare_column(self, actual, expected):
143         if isinstance(actual, dict):
144             return actual == eval('{' + expected + '}')
145
146         return str(actual) == expected
147
148     def _compare_place_id(self, actual, expected):
149        if expected == '0':
150             return actual == 0
151
152        with self.context.db.cursor() as cur:
153             return NominatimID(expected).get_place_id(cur) == actual
154
155     def _has_centroid(self, expected):
156         if expected == 'in geometry':
157             with self.context.db.cursor(row_factory=psycopg.rows.tuple_row) as cur:
158                 cur.execute("""SELECT ST_Within(ST_SetSRID(ST_Point(%(cx)s, %(cy)s), 4326),
159                                         ST_SetSRID(%(geomtxt)s::geometry, 4326))""",
160                             (self.db_row))
161                 return cur.fetchone()[0]
162
163         if ' ' in expected:
164             x, y = expected.split(' ')
165         else:
166             x, y = self.context.osm.grid_node(int(expected))
167
168         return math.isclose(float(x), self.db_row['cx']) and math.isclose(float(y), self.db_row['cy'])
169
170     def _has_geometry(self, expected):
171         geom = self.context.osm.parse_geometry(expected)
172         with self.context.db.cursor(row_factory=psycopg.rows.tuple_row) as cur:
173             cur.execute(pysql.SQL("""SELECT ST_Equals(ST_SnapToGrid({}, 0.00001, 0.00001),
174                                    ST_SnapToGrid(ST_SetSRID({}::geometry, 4326), 0.00001, 0.00001))""")
175                              .format(pysql.SQL(geom),
176                                      pysql.Literal(self.db_row['geomtxt'])))
177             return cur.fetchone()[0]
178
179     def assert_msg(self, name, value):
180         """ Return a string with an informative message for a failed compare.
181         """
182         msg = "\nBad column '{}' in row '{!s}'.".format(name, self.nid)
183         actual = self._get_actual(name)
184         if actual is not None:
185             msg += " Expected: {}, got: {}.".format(value, actual)
186         else:
187             msg += " No such column."
188
189         return msg + "\nFull DB row: {}".format(json.dumps(dict(self.db_row), indent=4, default=str))
190
191     def _get_actual(self, name):
192         if '+' in name:
193             column, field = name.split('+', 1)
194             if column == 'addr':
195                 column = 'address'
196             return (self.db_row.get(column) or {}).get(field)
197
198         if name == 'geometry':
199             return self.db_row['geomtxt']
200
201         if name not in self.db_row:
202             return None
203
204         if name == 'centroid':
205             return "POINT({cx} {cy})".format(**self.db_row)
206
207         actual = self.db_row[name]
208
209         if 'place_id' in name:
210             if actual is None:
211                 return '<null>'
212
213             if actual == 0:
214                 return "place ID 0"
215
216             with self.context.db.cursor(row_factory=psycopg.rows.tuple_row) as cur:
217                 cur.execute("""SELECT osm_type, osm_id, class
218                                FROM placex WHERE place_id = %s""",
219                             (actual, ))
220
221                 if cur.rowcount == 1:
222                     return "{0[0]}{0[1]}:{0[2]}".format(cur.fetchone())
223
224                 return "[place ID {} not found]".format(actual)
225
226         return actual