]> git.openstreetmap.org Git - nominatim.git/blobdiff - test/bdd/steps/steps_db_ops.py
enable flake for bdd test code
[nominatim.git] / test / bdd / steps / steps_db_ops.py
index fb8431d5ffae827c7c681878d57a3b94bc3375b5..8b62cbc6b3d6bd06baaf001a7daeadce97c22e05 100644 (file)
@@ -2,9 +2,8 @@
 #
 # This file is part of Nominatim. (https://nominatim.org)
 #
-# Copyright (C) 2024 by the Nominatim developer community.
+# Copyright (C) 2025 by the Nominatim developer community.
 # For a full list of authors see the git log.
-import logging
 from itertools import chain
 
 import psycopg
@@ -13,9 +12,9 @@ from psycopg import sql as pysql
 from place_inserter import PlaceColumn
 from table_compare import NominatimID, DBRow
 
-from nominatim_db.indexer import indexer
 from nominatim_db.tokenizer import factory as tokenizer_factory
 
+
 def check_database_integrity(context):
     """ Check some generic constraints on the tables.
     """
@@ -31,10 +30,9 @@ def check_database_integrity(context):
         cur.execute("SELECT count(*) FROM word WHERE word_token = ''")
         assert cur.fetchone()[0] == 0, "Empty word tokens found in word table"
 
+# GIVEN ##################################
 
 
-################################ GIVEN ##################################
-
 @given("the (?P<named>named )?places")
 def add_data_to_place_table(context, named):
     """ Add entries into the place table. 'named places' makes sure that
@@ -46,6 +44,7 @@ def add_data_to_place_table(context, named):
             PlaceColumn(context).add_row(row, named is not None).db_insert(cur)
         cur.execute('ALTER TABLE place ENABLE TRIGGER place_before_insert')
 
+
 @given("the relations")
 def add_data_to_planet_relations(context):
     """ Add entries into the osm2pgsql relation middle table. This is needed
@@ -77,9 +76,11 @@ def add_data_to_planet_relations(context):
                 else:
                     members = None
 
-                tags = chain.from_iterable([(h[5:], r[h]) for h in r.headings if h.startswith("tags+")])
+                tags = chain.from_iterable([(h[5:], r[h]) for h in r.headings
+                                            if h.startswith("tags+")])
 
-                cur.execute("""INSERT INTO planet_osm_rels (id, way_off, rel_off, parts, members, tags)
+                cur.execute("""INSERT INTO planet_osm_rels (id, way_off, rel_off,
+                                                            parts, members, tags)
                                VALUES (%s, %s, %s, %s, %s, %s)""",
                             (r['id'], last_node, last_way, parts, members, list(tags)))
         else:
@@ -99,6 +100,7 @@ def add_data_to_planet_relations(context):
                             (r['id'], psycopg.types.json.Json(tags),
                              psycopg.types.json.Json(members)))
 
+
 @given("the ways")
 def add_data_to_planet_ways(context):
     """ Add entries into the osm2pgsql way middle table. This is necessary for
@@ -110,16 +112,18 @@ def add_data_to_planet_ways(context):
         json_tags = row is not None and row['value'] != '1'
         for r in context.table:
             if json_tags:
-                tags = psycopg.types.json.Json({h[5:]: r[h] for h in r.headings if h.startswith("tags+")})
+                tags = psycopg.types.json.Json({h[5:]: r[h] for h in r.headings
+                                                if h.startswith("tags+")})
             else:
                 tags = list(chain.from_iterable([(h[5:], r[h])
                                                  for h in r.headings if h.startswith("tags+")]))
-            nodes = [ int(x.strip()) for x in r['nodes'].split(',') ]
+            nodes = [int(x.strip()) for x in r['nodes'].split(',')]
 
             cur.execute("INSERT INTO planet_osm_ways (id, nodes, tags) VALUES (%s, %s, %s)",
                         (r['id'], nodes, tags))
 
-################################ WHEN ##################################
+# WHEN ##################################
+
 
 @when("importing")
 def import_and_index_data_from_place_table(context):
@@ -136,6 +140,7 @@ def import_and_index_data_from_place_table(context):
     # itself.
     context.log_capture.buffer.clear()
 
+
 @when("updating places")
 def update_place_table(context):
     """ Update the place table with the given data. Also runs all triggers
@@ -164,6 +169,7 @@ def update_postcodes(context):
     """
     context.nominatim.run_nominatim('refresh', '--postcodes')
 
+
 @when("marking for delete (?P<oids>.*)")
 def delete_places(context, oids):
     """ Remove entries from the place table. Multiple ids may be given
@@ -184,7 +190,8 @@ def delete_places(context, oids):
     # itself.
     context.log_capture.buffer.clear()
 
-################################ THEN ##################################
+# THEN ##################################
+
 
 @then("(?P<table>placex|place) contains(?P<exact> exactly)?")
 def check_place_contents(context, table, exact):
@@ -201,7 +208,8 @@ def check_place_contents(context, table, exact):
         expected_content = set()
         for row in context.table:
             nid = NominatimID(row['object'])
-            query = 'SELECT *, ST_AsText(geometry) as geomtxt, ST_GeometryType(geometry) as geometrytype'
+            query = """SELECT *, ST_AsText(geometry) as geomtxt,
+                              ST_GeometryType(geometry) as geometrytype """
             if table == 'placex':
                 query += ' ,ST_X(centroid) as cx, ST_Y(centroid) as cy'
             query += " FROM %s WHERE {}" % (table, )
@@ -261,17 +269,18 @@ def check_search_name_contents(context, exclude):
 
                             if not exclude:
                                 assert len(tokens) >= len(items), \
-                                       "No word entry found for {}. Entries found: {!s}".format(value, len(tokens))
+                                    f"No word entry found for {value}. Entries found: {len(tokens)}"
                             for word, token, wid in tokens:
                                 if exclude:
                                     assert wid not in res[name], \
-                                           "Found term for {}/{}: {}".format(nid, name, wid)
+                                        "Found term for {}/{}: {}".format(nid, name, wid)
                                 else:
                                     assert wid in res[name], \
-                                           "Missing term for {}/{}: {}".format(nid, name, wid)
+                                        "Missing term for {}/{}: {}".format(nid, name, wid)
                         elif name != 'object':
                             assert db_row.contains(name, value), db_row.assert_msg(name, value)
 
+
 @then("search_name has no entry for (?P<oid>.*)")
 def check_search_name_has_entry(context, oid):
     """ Check that there is noentry in the search_name table for the given
@@ -283,6 +292,7 @@ def check_search_name_has_entry(context, oid):
         assert cur.rowcount == 0, \
                "Found {} entries for ID {}".format(cur.rowcount, oid)
 
+
 @then("location_postcode contains exactly")
 def check_location_postcode(context):
     """ Check full contents for location_postcode table. Each row represents a table row
@@ -294,21 +304,22 @@ def check_location_postcode(context):
     with context.db.cursor() as cur:
         cur.execute("SELECT *, ST_AsText(geometry) as geomtxt FROM location_postcode")
         assert cur.rowcount == len(list(context.table)), \
-            "Postcode table has {} rows, expected {}.".format(cur.rowcount, len(list(context.table)))
+            "Postcode table has {cur.rowcount} rows, expected {len(list(context.table))}."
 
         results = {}
         for row in cur:
             key = (row['country_code'], row['postcode'])
             assert key not in results, "Postcode table has duplicate entry: {}".format(row)
-            results[key] = DBRow((row['country_code'],row['postcode']), row, context)
+            results[key] = DBRow((row['country_code'], row['postcode']), row, context)
 
         for row in context.table:
-            db_row = results.get((row['country'],row['postcode']))
+            db_row = results.get((row['country'], row['postcode']))
             assert db_row is not None, \
                 f"Missing row for country '{row['country']}' postcode '{row['postcode']}'."
 
             db_row.assert_row(row, ('country', 'postcode'))
 
+
 @then("there are(?P<exclude> no)? word tokens for postcodes (?P<postcodes>.*)")
 def check_word_table_for_postcodes(context, exclude, postcodes):
     """ Check that the tokenizer produces postcode tokens for the given
@@ -333,7 +344,8 @@ def check_word_table_for_postcodes(context, exclude, postcodes):
         assert len(found) == 0, f"Unexpected postcodes: {found}"
     else:
         assert set(found) == set(plist), \
-        f"Missing postcodes {set(plist) - set(found)}. Found: {found}"
+            f"Missing postcodes {set(plist) - set(found)}. Found: {found}"
+
 
 @then("place_addressline contains")
 def check_place_addressline(context):
@@ -352,11 +364,12 @@ def check_place_addressline(context):
                             WHERE place_id = %s AND address_place_id = %s""",
                         (pid, apid))
             assert cur.rowcount > 0, \
-                        "No rows found for place %s and address %s" % (row['object'], row['address'])
+                f"No rows found for place {row['object']} and address {row['address']}."
 
             for res in cur:
                 DBRow(nid, res, context).assert_row(row, ('address', 'object'))
 
+
 @then("place_addressline doesn't contain")
 def check_place_addressline_exclude(context):
     """ Check that the place_addressline doesn't contain any entries for the
@@ -371,9 +384,10 @@ def check_place_addressline_exclude(context):
                                 WHERE place_id = %s AND address_place_id = %s""",
                             (pid, apid))
                 assert cur.rowcount == 0, \
-                    "Row found for place %s and address %s" % (row['object'], row['address'])
+                    f"Row found for place {row['object']} and address {row['address']}."
+
 
-@then("W(?P<oid>\d+) expands to(?P<neg> no)? interpolation")
+@then(r"W(?P<oid>\d+) expands to(?P<neg> no)? interpolation")
 def check_location_property_osmline(context, oid, neg):
     """ Check that the given way is present in the interpolation table.
     """
@@ -392,7 +406,7 @@ def check_location_property_osmline(context, oid, neg):
             for i in todo:
                 row = context.table[i]
                 if (int(row['start']) == res['startnumber']
-                    and int(row['end']) == res['endnumber']):
+                        and int(row['end']) == res['endnumber']):
                     todo.remove(i)
                     break
             else:
@@ -402,8 +416,9 @@ def check_location_property_osmline(context, oid, neg):
 
         assert not todo, f"Unmatched lines in table: {list(context.table[i] for i in todo)}"
 
+
 @then("location_property_osmline contains(?P<exact> exactly)?")
-def check_place_contents(context, exact):
+def check_osmline_contents(context, exact):
     """ Check contents of the interpolation table. Each row represents a table row
         and all data must match. Data not present in the expected table, may
         be arbitrary. The rows are identified via the 'object' column which must
@@ -447,4 +462,3 @@ def check_place_contents(context, exact):
             assert expected_content == actual, \
                    f"Missing entries: {expected_content - actual}\n" \
                    f"Not expected in table: {actual - expected_content}"
-