]> git.openstreetmap.org Git - nominatim.git/blob - test/bdd/steps/steps_db_ops.py
Merge pull request #3463 from lonvia/sqlalchemy14-with-psycopg
[nominatim.git] / test / bdd / steps / steps_db_ops.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 import logging
8 from itertools import chain
9
10 import psycopg2.extras
11
12 from place_inserter import PlaceColumn
13 from table_compare import NominatimID, DBRow
14
15 from nominatim_db.indexer import indexer
16 from nominatim_db.tokenizer import factory as tokenizer_factory
17
18 def check_database_integrity(context):
19     """ Check some generic constraints on the tables.
20     """
21     with context.db.cursor() as cur:
22         # place_addressline should not have duplicate (place_id, address_place_id)
23         cur.execute("""SELECT count(*) FROM
24                         (SELECT place_id, address_place_id, count(*) as c
25                          FROM place_addressline GROUP BY place_id, address_place_id) x
26                        WHERE c > 1""")
27         assert cur.fetchone()[0] == 0, "Duplicates found in place_addressline"
28
29         # word table must not have empty word_tokens
30         if context.nominatim.tokenizer != 'legacy':
31             cur.execute("SELECT count(*) FROM word WHERE word_token = ''")
32             assert cur.fetchone()[0] == 0, "Empty word tokens found in word table"
33
34
35
36 ################################ GIVEN ##################################
37
38 @given("the (?P<named>named )?places")
39 def add_data_to_place_table(context, named):
40     """ Add entries into the place table. 'named places' makes sure that
41         the entries get a random name when none is explicitly given.
42     """
43     with context.db.cursor() as cur:
44         cur.execute('ALTER TABLE place DISABLE TRIGGER place_before_insert')
45         for row in context.table:
46             PlaceColumn(context).add_row(row, named is not None).db_insert(cur)
47         cur.execute('ALTER TABLE place ENABLE TRIGGER place_before_insert')
48
49 @given("the relations")
50 def add_data_to_planet_relations(context):
51     """ Add entries into the osm2pgsql relation middle table. This is needed
52         for tests on data that looks up members.
53     """
54     with context.db.cursor() as cur:
55         cur.execute("SELECT value FROM osm2pgsql_properties WHERE property = 'db_format'")
56         row = cur.fetchone()
57         if row is None or row[0] == '1':
58             for r in context.table:
59                 last_node = 0
60                 last_way = 0
61                 parts = []
62                 if r['members']:
63                     members = []
64                     for m in r['members'].split(','):
65                         mid = NominatimID(m)
66                         if mid.typ == 'N':
67                             parts.insert(last_node, int(mid.oid))
68                             last_node += 1
69                             last_way += 1
70                         elif mid.typ == 'W':
71                             parts.insert(last_way, int(mid.oid))
72                             last_way += 1
73                         else:
74                             parts.append(int(mid.oid))
75
76                         members.extend((mid.typ.lower() + mid.oid, mid.cls or ''))
77                 else:
78                     members = None
79
80                 tags = chain.from_iterable([(h[5:], r[h]) for h in r.headings if h.startswith("tags+")])
81
82                 cur.execute("""INSERT INTO planet_osm_rels (id, way_off, rel_off, parts, members, tags)
83                                VALUES (%s, %s, %s, %s, %s, %s)""",
84                             (r['id'], last_node, last_way, parts, members, list(tags)))
85         else:
86             for r in context.table:
87                 if r['members']:
88                     members = []
89                     for m in r['members'].split(','):
90                         mid = NominatimID(m)
91                         members.append({'ref': mid.oid, 'role': mid.cls or '', 'type': mid.typ})
92                 else:
93                     members = []
94
95                 tags = {h[5:]: r[h] for h in r.headings if h.startswith("tags+")}
96
97                 cur.execute("""INSERT INTO planet_osm_rels (id, tags, members)
98                                VALUES (%s, %s, %s)""",
99                             (r['id'], psycopg2.extras.Json(tags),
100                              psycopg2.extras.Json(members)))
101
102 @given("the ways")
103 def add_data_to_planet_ways(context):
104     """ Add entries into the osm2pgsql way middle table. This is necessary for
105         tests on that that looks up node ids in this table.
106     """
107     with context.db.cursor() as cur:
108         cur.execute("SELECT value FROM osm2pgsql_properties WHERE property = 'db_format'")
109         row = cur.fetchone()
110         json_tags = row is not None and row[0] != '1'
111         for r in context.table:
112             if json_tags:
113                 tags = psycopg2.extras.Json({h[5:]: r[h] for h in r.headings if h.startswith("tags+")})
114             else:
115                 tags = list(chain.from_iterable([(h[5:], r[h])
116                                                  for h in r.headings if h.startswith("tags+")]))
117             nodes = [ int(x.strip()) for x in r['nodes'].split(',') ]
118
119             cur.execute("INSERT INTO planet_osm_ways (id, nodes, tags) VALUES (%s, %s, %s)",
120                         (r['id'], nodes, tags))
121
122 ################################ WHEN ##################################
123
124 @when("importing")
125 def import_and_index_data_from_place_table(context):
126     """ Import data previously set up in the place table.
127     """
128     context.nominatim.run_nominatim('import', '--continue', 'load-data',
129                                               '--index-noanalyse', '-q',
130                                               '--offline')
131
132     check_database_integrity(context)
133
134     # Remove the output of the input, when all was right. Otherwise it will be
135     # output when there are errors that had nothing to do with the import
136     # itself.
137     context.log_capture.buffer.clear()
138
139 @when("updating places")
140 def update_place_table(context):
141     """ Update the place table with the given data. Also runs all triggers
142         related to updates and reindexes the new data.
143     """
144     context.nominatim.run_nominatim('refresh', '--functions')
145     with context.db.cursor() as cur:
146         for row in context.table:
147             col = PlaceColumn(context).add_row(row, False)
148             col.db_delete(cur)
149             col.db_insert(cur)
150         cur.execute('SELECT flush_deleted_places()')
151
152     context.nominatim.reindex_placex(context.db)
153     check_database_integrity(context)
154
155     # Remove the output of the input, when all was right. Otherwise it will be
156     # output when there are errors that had nothing to do with the import
157     # itself.
158     context.log_capture.buffer.clear()
159
160
161 @when("updating postcodes")
162 def update_postcodes(context):
163     """ Rerun the calculation of postcodes.
164     """
165     context.nominatim.run_nominatim('refresh', '--postcodes')
166
167 @when("marking for delete (?P<oids>.*)")
168 def delete_places(context, oids):
169     """ Remove entries from the place table. Multiple ids may be given
170         separated by commas. Also runs all triggers
171         related to updates and reindexes the new data.
172     """
173     context.nominatim.run_nominatim('refresh', '--functions')
174     with context.db.cursor() as cur:
175         cur.execute('TRUNCATE place_to_be_deleted')
176         for oid in oids.split(','):
177             NominatimID(oid).query_osm_id(cur, 'DELETE FROM place WHERE {}')
178         cur.execute('SELECT flush_deleted_places()')
179
180     context.nominatim.reindex_placex(context.db)
181
182     # Remove the output of the input, when all was right. Otherwise it will be
183     # output when there are errors that had nothing to do with the import
184     # itself.
185     context.log_capture.buffer.clear()
186
187 ################################ THEN ##################################
188
189 @then("(?P<table>placex|place) contains(?P<exact> exactly)?")
190 def check_place_contents(context, table, exact):
191     """ Check contents of place/placex tables. Each row represents a table row
192         and all data must match. Data not present in the expected table, may
193         be arbitrary. The rows are identified via the 'object' column which must
194         have an identifier of the form '<NRW><osm id>[:<class>]'. When multiple
195         rows match (for example because 'class' was left out and there are
196         multiple entries for the given OSM object) then all must match. All
197         expected rows are expected to be present with at least one database row.
198         When 'exactly' is given, there must not be additional rows in the database.
199     """
200     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
201         expected_content = set()
202         for row in context.table:
203             nid = NominatimID(row['object'])
204             query = 'SELECT *, ST_AsText(geometry) as geomtxt, ST_GeometryType(geometry) as geometrytype'
205             if table == 'placex':
206                 query += ' ,ST_X(centroid) as cx, ST_Y(centroid) as cy'
207             query += " FROM %s WHERE {}" % (table, )
208             nid.query_osm_id(cur, query)
209             assert cur.rowcount > 0, "No rows found for " + row['object']
210
211             for res in cur:
212                 if exact:
213                     expected_content.add((res['osm_type'], res['osm_id'], res['class']))
214
215                 DBRow(nid, res, context).assert_row(row, ['object'])
216
217         if exact:
218             cur.execute('SELECT osm_type, osm_id, class from {}'.format(table))
219             actual = set([(r[0], r[1], r[2]) for r in cur])
220             assert expected_content == actual, \
221                    f"Missing entries: {expected_content - actual}\n" \
222                    f"Not expected in table: {actual - expected_content}"
223
224
225 @then("(?P<table>placex|place) has no entry for (?P<oid>.*)")
226 def check_place_has_entry(context, table, oid):
227     """ Ensure that no database row for the given object exists. The ID
228         must be of the form '<NRW><osm id>[:<class>]'.
229     """
230     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
231         NominatimID(oid).query_osm_id(cur, "SELECT * FROM %s where {}" % table)
232         assert cur.rowcount == 0, \
233                "Found {} entries for ID {}".format(cur.rowcount, oid)
234
235
236 @then("search_name contains(?P<exclude> not)?")
237 def check_search_name_contents(context, exclude):
238     """ Check contents of place/placex tables. Each row represents a table row
239         and all data must match. Data not present in the expected table, may
240         be arbitrary. The rows are identified via the 'object' column which must
241         have an identifier of the form '<NRW><osm id>[:<class>]'. All
242         expected rows are expected to be present with at least one database row.
243     """
244     tokenizer = tokenizer_factory.get_tokenizer_for_db(context.nominatim.get_test_config())
245
246     with tokenizer.name_analyzer() as analyzer:
247         with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
248             for row in context.table:
249                 nid = NominatimID(row['object'])
250                 nid.row_by_place_id(cur, 'search_name',
251                                     ['ST_X(centroid) as cx', 'ST_Y(centroid) as cy'])
252                 assert cur.rowcount > 0, "No rows found for " + row['object']
253
254                 for res in cur:
255                     db_row = DBRow(nid, res, context)
256                     for name, value in zip(row.headings, row.cells):
257                         if name in ('name_vector', 'nameaddress_vector'):
258                             items = [x.strip() for x in value.split(',')]
259                             tokens = analyzer.get_word_token_info(items)
260
261                             if not exclude:
262                                 assert len(tokens) >= len(items), \
263                                        "No word entry found for {}. Entries found: {!s}".format(value, len(tokens))
264                             for word, token, wid in tokens:
265                                 if exclude:
266                                     assert wid not in res[name], \
267                                            "Found term for {}/{}: {}".format(nid, name, wid)
268                                 else:
269                                     assert wid in res[name], \
270                                            "Missing term for {}/{}: {}".format(nid, name, wid)
271                         elif name != 'object':
272                             assert db_row.contains(name, value), db_row.assert_msg(name, value)
273
274 @then("search_name has no entry for (?P<oid>.*)")
275 def check_search_name_has_entry(context, oid):
276     """ Check that there is noentry in the search_name table for the given
277         objects. IDs are in format '<NRW><osm id>[:<class>]'.
278     """
279     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
280         NominatimID(oid).row_by_place_id(cur, 'search_name')
281
282         assert cur.rowcount == 0, \
283                "Found {} entries for ID {}".format(cur.rowcount, oid)
284
285 @then("location_postcode contains exactly")
286 def check_location_postcode(context):
287     """ Check full contents for location_postcode table. Each row represents a table row
288         and all data must match. Data not present in the expected table, may
289         be arbitrary. The rows are identified via 'country' and 'postcode' columns.
290         All rows must be present as excepted and there must not be additional
291         rows.
292     """
293     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
294         cur.execute("SELECT *, ST_AsText(geometry) as geomtxt FROM location_postcode")
295         assert cur.rowcount == len(list(context.table)), \
296             "Postcode table has {} rows, expected {}.".format(cur.rowcount, len(list(context.table)))
297
298         results = {}
299         for row in cur:
300             key = (row['country_code'], row['postcode'])
301             assert key not in results, "Postcode table has duplicate entry: {}".format(row)
302             results[key] = DBRow((row['country_code'],row['postcode']), row, context)
303
304         for row in context.table:
305             db_row = results.get((row['country'],row['postcode']))
306             assert db_row is not None, \
307                 f"Missing row for country '{row['country']}' postcode '{row['postcode']}'."
308
309             db_row.assert_row(row, ('country', 'postcode'))
310
311 @then("there are(?P<exclude> no)? word tokens for postcodes (?P<postcodes>.*)")
312 def check_word_table_for_postcodes(context, exclude, postcodes):
313     """ Check that the tokenizer produces postcode tokens for the given
314         postcodes. The postcodes are a comma-separated list of postcodes.
315         Whitespace matters.
316     """
317     nctx = context.nominatim
318     tokenizer = tokenizer_factory.get_tokenizer_for_db(nctx.get_test_config())
319     with tokenizer.name_analyzer() as ana:
320         plist = [ana.normalize_postcode(p) for p in postcodes.split(',')]
321
322     plist.sort()
323
324     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
325         if nctx.tokenizer != 'legacy':
326             cur.execute("SELECT word FROM word WHERE type = 'P' and word = any(%s)",
327                         (plist,))
328         else:
329             cur.execute("""SELECT word FROM word WHERE word = any(%s)
330                              and class = 'place' and type = 'postcode'""",
331                         (plist,))
332
333         found = [row[0] for row in cur]
334         assert len(found) == len(set(found)), f"Duplicate rows for postcodes: {found}"
335
336     if exclude:
337         assert len(found) == 0, f"Unexpected postcodes: {found}"
338     else:
339         assert set(found) == set(plist), \
340         f"Missing postcodes {set(plist) - set(found)}. Found: {found}"
341
342 @then("place_addressline contains")
343 def check_place_addressline(context):
344     """ Check the contents of the place_addressline table. Each row represents
345         a table row and all data must match. Data not present in the expected
346         table, may be arbitrary. The rows are identified via the 'object' column,
347         representing the addressee and the 'address' column, representing the
348         address item.
349     """
350     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
351         for row in context.table:
352             nid = NominatimID(row['object'])
353             pid = nid.get_place_id(cur)
354             apid = NominatimID(row['address']).get_place_id(cur)
355             cur.execute(""" SELECT * FROM place_addressline
356                             WHERE place_id = %s AND address_place_id = %s""",
357                         (pid, apid))
358             assert cur.rowcount > 0, \
359                         "No rows found for place %s and address %s" % (row['object'], row['address'])
360
361             for res in cur:
362                 DBRow(nid, res, context).assert_row(row, ('address', 'object'))
363
364 @then("place_addressline doesn't contain")
365 def check_place_addressline_exclude(context):
366     """ Check that the place_addressline doesn't contain any entries for the
367         given addressee/address item pairs.
368     """
369     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
370         for row in context.table:
371             pid = NominatimID(row['object']).get_place_id(cur)
372             apid = NominatimID(row['address']).get_place_id(cur, allow_empty=True)
373             if apid is not None:
374                 cur.execute(""" SELECT * FROM place_addressline
375                                 WHERE place_id = %s AND address_place_id = %s""",
376                             (pid, apid))
377                 assert cur.rowcount == 0, \
378                     "Row found for place %s and address %s" % (row['object'], row['address'])
379
380 @then("W(?P<oid>\d+) expands to(?P<neg> no)? interpolation")
381 def check_location_property_osmline(context, oid, neg):
382     """ Check that the given way is present in the interpolation table.
383     """
384     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
385         cur.execute("""SELECT *, ST_AsText(linegeo) as geomtxt
386                        FROM location_property_osmline
387                        WHERE osm_id = %s AND startnumber IS NOT NULL""",
388                     (oid, ))
389
390         if neg:
391             assert cur.rowcount == 0, "Interpolation found for way {}.".format(oid)
392             return
393
394         todo = list(range(len(list(context.table))))
395         for res in cur:
396             for i in todo:
397                 row = context.table[i]
398                 if (int(row['start']) == res['startnumber']
399                     and int(row['end']) == res['endnumber']):
400                     todo.remove(i)
401                     break
402             else:
403                 assert False, "Unexpected row " + str(res)
404
405             DBRow(oid, res, context).assert_row(row, ('start', 'end'))
406
407         assert not todo, f"Unmatched lines in table: {list(context.table[i] for i in todo)}"
408
409 @then("location_property_osmline contains(?P<exact> exactly)?")
410 def check_place_contents(context, exact):
411     """ Check contents of the interpolation table. Each row represents a table row
412         and all data must match. Data not present in the expected table, may
413         be arbitrary. The rows are identified via the 'object' column which must
414         have an identifier of the form '<osm id>[:<startnumber>]'. When multiple
415         rows match (for example because 'startnumber' was left out and there are
416         multiple entries for the given OSM object) then all must match. All
417         expected rows are expected to be present with at least one database row.
418         When 'exactly' is given, there must not be additional rows in the database.
419     """
420     with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
421         expected_content = set()
422         for row in context.table:
423             if ':' in row['object']:
424                 nid, start = row['object'].split(':', 2)
425                 start = int(start)
426             else:
427                 nid, start = row['object'], None
428
429             query = """SELECT *, ST_AsText(linegeo) as geomtxt,
430                               ST_GeometryType(linegeo) as geometrytype
431                        FROM location_property_osmline WHERE osm_id=%s"""
432
433             if ':' in row['object']:
434                 query += ' and startnumber = %s'
435                 params = [int(val) for val in row['object'].split(':', 2)]
436             else:
437                 params = (int(row['object']), )
438
439             cur.execute(query, params)
440             assert cur.rowcount > 0, "No rows found for " + row['object']
441
442             for res in cur:
443                 if exact:
444                     expected_content.add((res['osm_id'], res['startnumber']))
445
446                 DBRow(nid, res, context).assert_row(row, ['object'])
447
448         if exact:
449             cur.execute('SELECT osm_id, startnumber from location_property_osmline')
450             actual = set([(r[0], r[1]) for r in cur])
451             assert expected_content == actual, \
452                    f"Missing entries: {expected_content - actual}\n" \
453                    f"Not expected in table: {actual - expected_content}"
454