]> git.openstreetmap.org Git - nominatim.git/blob - test/bdd/steps/table_compare.py
add migration for new interpolation table layout
[nominatim.git] / test / bdd / steps / table_compare.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2022 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Functions to facilitate accessing and comparing the content of DB tables.
9 """
10 import re
11 import json
12
13 from steps.check_functions import Almost
14
15 ID_REGEX = re.compile(r"(?P<typ>[NRW])(?P<oid>\d+)(:(?P<cls>\w+))?")
16
17 class NominatimID:
18     """ Splits a unique identifier for places into its components.
19         As place_ids cannot be used for testing, we use a unique
20         identifier instead that is of the form <osmtype><osmid>[:<class>].
21     """
22
23     def __init__(self, oid):
24         self.typ = self.oid = self.cls = None
25
26         if oid is not None:
27             m = ID_REGEX.fullmatch(oid)
28             assert m is not None, \
29                    "ID '{}' not of form <osmtype><osmid>[:<class>]".format(oid)
30
31             self.typ = m.group('typ')
32             self.oid = m.group('oid')
33             self.cls = m.group('cls')
34
35     def __str__(self):
36         if self.cls is None:
37             return self.typ + self.oid
38
39         return '{self.typ}{self.oid}:{self.cls}'.format(self=self)
40
41     def query_osm_id(self, cur, query):
42         """ Run a query on cursor `cur` using osm ID, type and class. The
43             `query` string must contain exactly one placeholder '{}' where
44             the 'where' query should go.
45         """
46         where = 'osm_type = %s and osm_id = %s'
47         params = [self.typ, self. oid]
48
49         if self.cls is not None:
50             where += ' and class = %s'
51             params.append(self.cls)
52
53         cur.execute(query.format(where), params)
54
55     def row_by_place_id(self, cur, table, extra_columns=None):
56         """ Get a row by place_id from the given table using cursor `cur`.
57             extra_columns may contain a list additional elements for the select
58             part of the query.
59         """
60         pid = self.get_place_id(cur)
61         query = "SELECT {} FROM {} WHERE place_id = %s".format(
62                     ','.join(['*'] + (extra_columns or [])), table)
63         cur.execute(query, (pid, ))
64
65     def get_place_id(self, cur):
66         """ Look up the place id for the ID. Throws an assertion if the ID
67             is not unique.
68         """
69         self.query_osm_id(cur, "SELECT place_id FROM placex WHERE {}")
70         assert cur.rowcount == 1, \
71                "Place ID {!s} not unique. Found {} entries.".format(self, cur.rowcount)
72
73         return cur.fetchone()[0]
74
75
76 class DBRow:
77     """ Represents a row from a database and offers comparison functions.
78     """
79     def __init__(self, nid, db_row, context):
80         self.nid = nid
81         self.db_row = db_row
82         self.context = context
83
84     def assert_row(self, row, exclude_columns):
85         """ Check that all columns of the given behave row are contained
86             in the database row. Exclude behave rows with the names given
87             in the `exclude_columns` list.
88         """
89         for name, value in zip(row.headings, row.cells):
90             if name not in exclude_columns:
91                 assert self.contains(name, value), self.assert_msg(name, value)
92
93     def contains(self, name, expected):
94         """ Check that the DB row contains a column `name` with the given value.
95         """
96         if '+' in name:
97             column, field = name.split('+', 1)
98             return self._contains_hstore_value(column, field, expected)
99
100         if name == 'geometry':
101             return self._has_geometry(expected)
102
103         if name not in self.db_row:
104             return False
105
106         actual = self.db_row[name]
107
108         if expected == '-':
109             return actual is None
110
111         if name == 'name' and ':' not in expected:
112             return self._compare_column(actual[name], expected)
113
114         if 'place_id' in name:
115             return self._compare_place_id(actual, expected)
116
117         if name == 'centroid':
118             return self._has_centroid(expected)
119
120         return self._compare_column(actual, expected)
121
122     def _contains_hstore_value(self, column, field, expected):
123         if column == 'addr':
124             column = 'address'
125
126         if column not in self.db_row:
127             return False
128
129         if expected == '-':
130             return self.db_row[column] is None or field not in self.db_row[column]
131
132         if self.db_row[column] is None:
133             return False
134
135         return self._compare_column(self.db_row[column].get(field), expected)
136
137     def _compare_column(self, actual, expected):
138         if isinstance(actual, dict):
139             return actual == eval('{' + expected + '}')
140
141         return str(actual) == expected
142
143     def _compare_place_id(self, actual, expected):
144        if expected == '0':
145             return actual == 0
146
147        with self.context.db.cursor() as cur:
148             return NominatimID(expected).get_place_id(cur) == actual
149
150     def _has_centroid(self, expected):
151         if expected == 'in geometry':
152             with self.context.db.cursor() as cur:
153                 cur.execute("""SELECT ST_Within(ST_SetSRID(ST_Point({cx}, {cy}), 4326),
154                                         ST_SetSRID('{geomtxt}'::geometry, 4326))""".format(**self.db_row))
155                 return cur.fetchone()[0]
156
157         x, y = expected.split(' ')
158         return Almost(float(x)) == self.db_row['cx'] and Almost(float(y)) == self.db_row['cy']
159
160     def _has_geometry(self, expected):
161         geom = self.context.osm.parse_geometry(expected, self.context.scene)
162         with self.context.db.cursor() as cur:
163             cur.execute("""SELECT ST_Equals(ST_SnapToGrid({}, 0.00001, 0.00001),
164                                    ST_SnapToGrid(ST_SetSRID('{}'::geometry, 4326), 0.00001, 0.00001))""".format(
165                             geom, self.db_row['geomtxt']))
166             return cur.fetchone()[0]
167
168     def assert_msg(self, name, value):
169         """ Return a string with an informative message for a failed compare.
170         """
171         msg = "\nBad column '{}' in row '{!s}'.".format(name, self.nid)
172         actual = self._get_actual(name)
173         if actual is not None:
174             msg += " Expected: {}, got: {}.".format(value, actual)
175         else:
176             msg += " No such column."
177
178         return msg + "\nFull DB row: {}".format(json.dumps(dict(self.db_row), indent=4, default=str))
179
180     def _get_actual(self, name):
181         if '+' in name:
182             column, field = name.split('+', 1)
183             if column == 'addr':
184                 column = 'address'
185             return (self.db_row.get(column) or {}).get(field)
186
187         if name == 'geometry':
188             return self.db_row['geomtxt']
189
190         if name not in self.db_row:
191             return None
192
193         if name == 'centroid':
194             return "POINT({cx} {cy})".format(**self.db_row)
195
196         actual = self.db_row[name]
197
198         if 'place_id' in name:
199             if actual is None:
200                 return '<null>'
201
202             if actual == 0:
203                 return "place ID 0"
204
205             with self.context.db.cursor() as cur:
206                 cur.execute("""SELECT osm_type, osm_id, class
207                                FROM placex WHERE place_id = %s""",
208                             (actual, ))
209
210                 if cur.rowcount == 1:
211                     return "{0[0]}{0[1]}:{0[2]}".format(cur.fetchone())
212
213                 return "[place ID {} not found]".format(actual)
214
215         return actual