#
# This file is part of Nominatim. (https://nominatim.org)
#
-# Copyright (C) 2024 by the Nominatim developer community.
+# Copyright (C) 2025 by the Nominatim developer community.
# For a full list of authors see the git log.
-import logging
from itertools import chain
import psycopg
from place_inserter import PlaceColumn
from table_compare import NominatimID, DBRow
-from nominatim_db.indexer import indexer
from nominatim_db.tokenizer import factory as tokenizer_factory
+
def check_database_integrity(context):
""" Check some generic constraints on the tables.
"""
assert cur.fetchone()[0] == 0, "Duplicates found in place_addressline"
# word table must not have empty word_tokens
- if context.nominatim.tokenizer != 'legacy':
- cur.execute("SELECT count(*) FROM word WHERE word_token = ''")
- assert cur.fetchone()[0] == 0, "Empty word tokens found in word table"
-
+ cur.execute("SELECT count(*) FROM word WHERE word_token = ''")
+ assert cur.fetchone()[0] == 0, "Empty word tokens found in word table"
-################################ GIVEN ##################################
+# GIVEN ##################################
+
@given("the (?P<named>named )?places")
def add_data_to_place_table(context, named):
PlaceColumn(context).add_row(row, named is not None).db_insert(cur)
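+        # Re-enable the insert trigger; it is assumed to have been disabled
+        # above so the rows could be inserted without preprocessing.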
cur.execute('ALTER TABLE place ENABLE TRIGGER place_before_insert')
+
@given("the relations")
def add_data_to_planet_relations(context):
""" Add entries into the osm2pgsql relation middle table. This is needed
else:
members = None
- tags = chain.from_iterable([(h[5:], r[h]) for h in r.headings if h.startswith("tags+")])
+ tags = chain.from_iterable([(h[5:], r[h]) for h in r.headings
+ if h.startswith("tags+")])
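+                # Assumption: this mirrors osm2pgsql's legacy (pre-jsonb) middle
+                # format, where member ids are packed into 'parts' and
+                # way_off/rel_off mark where way and relation members start.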
- cur.execute("""INSERT INTO planet_osm_rels (id, way_off, rel_off, parts, members, tags)
+ cur.execute("""INSERT INTO planet_osm_rels (id, way_off, rel_off,
+ parts, members, tags)
VALUES (%s, %s, %s, %s, %s, %s)""",
(r['id'], last_node, last_way, parts, members, list(tags)))
else:
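+            # Assumption: this branch handles the newer jsonb middle format,
+            # where tags and members are stored as json columns.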
(r['id'], psycopg.types.json.Json(tags),
psycopg.types.json.Json(members)))
+
@given("the ways")
def add_data_to_planet_ways(context):
""" Add entries into the osm2pgsql way middle table. This is necessary for
json_tags = row is not None and row['value'] != '1'
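+        # Assumption: a stored format value other than '1' means the newer jsonb
+        # middle tables are in use, so tags are written as json rather than a
+        # flat key/value array.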
for r in context.table:
if json_tags:
- tags = psycopg.types.json.Json({h[5:]: r[h] for h in r.headings if h.startswith("tags+")})
+ tags = psycopg.types.json.Json({h[5:]: r[h] for h in r.headings
+ if h.startswith("tags+")})
else:
tags = list(chain.from_iterable([(h[5:], r[h])
for h in r.headings if h.startswith("tags+")]))
- nodes = [ int(x.strip()) for x in r['nodes'].split(',') ]
+ nodes = [int(x.strip()) for x in r['nodes'].split(',')]
cur.execute("INSERT INTO planet_osm_ways (id, nodes, tags) VALUES (%s, %s, %s)",
(r['id'], nodes, tags))
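+
+# Illustrative usage of the step above (example only, not from the source):
+#   Given the ways
+#     | id | nodes   | tags+highway |
+#     | 1  | 1,2,3,4 | residential  |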
-################################ WHEN ##################################
+# WHEN ##################################
+
@when("importing")
def import_and_index_data_from_place_table(context):
# itself.
context.log_capture.buffer.clear()
+
@when("updating places")
def update_place_table(context):
""" Update the place table with the given data. Also runs all triggers
"""
context.nominatim.run_nominatim('refresh', '--postcodes')
+
@when("marking for delete (?P<oids>.*)")
def delete_places(context, oids):
""" Remove entries from the place table. Multiple ids may be given
# itself.
context.log_capture.buffer.clear()
-################################ THEN ##################################
+# THEN ##################################
+
@then("(?P<table>placex|place) contains(?P<exact> exactly)?")
def check_place_contents(context, table, exact):
expected_content = set()
for row in context.table:
nid = NominatimID(row['object'])
- query = 'SELECT *, ST_AsText(geometry) as geomtxt, ST_GeometryType(geometry) as geometrytype'
+ query = """SELECT *, ST_AsText(geometry) as geomtxt,
+ ST_GeometryType(geometry) as geometrytype """
if table == 'placex':
query += ' ,ST_X(centroid) as cx, ST_Y(centroid) as cy'
query += " FROM %s WHERE {}" % (table, )
if not exclude:
assert len(tokens) >= len(items), \
- "No word entry found for {}. Entries found: {!s}".format(value, len(tokens))
+ f"No word entry found for {value}. Entries found: {len(tokens)}"
for word, token, wid in tokens:
if exclude:
assert wid not in res[name], \
- "Found term for {}/{}: {}".format(nid, name, wid)
+                    f"Found term for {nid}/{name}: {wid}"
else:
assert wid in res[name], \
- "Missing term for {}/{}: {}".format(nid, name, wid)
+                    f"Missing term for {nid}/{name}: {wid}"
elif name != 'object':
assert db_row.contains(name, value), db_row.assert_msg(name, value)
+
@then("search_name has no entry for (?P<oid>.*)")
def check_search_name_has_entry(context, oid):
""" Check that there is noentry in the search_name table for the given
assert cur.rowcount == 0, \
"Found {} entries for ID {}".format(cur.rowcount, oid)
+
@then("location_postcode contains exactly")
def check_location_postcode(context):
""" Check full contents for location_postcode table. Each row represents a table row
with context.db.cursor() as cur:
cur.execute("SELECT *, ST_AsText(geometry) as geomtxt FROM location_postcode")
assert cur.rowcount == len(list(context.table)), \
- "Postcode table has {} rows, expected {}.".format(cur.rowcount, len(list(context.table)))
+            f"Postcode table has {cur.rowcount} rows, expected {len(list(context.table))}."
results = {}
for row in cur:
key = (row['country_code'], row['postcode'])
assert key not in results, "Postcode table has duplicate entry: {}".format(row)
- results[key] = DBRow((row['country_code'],row['postcode']), row, context)
+ results[key] = DBRow((row['country_code'], row['postcode']), row, context)
for row in context.table:
- db_row = results.get((row['country'],row['postcode']))
+ db_row = results.get((row['country'], row['postcode']))
assert db_row is not None, \
f"Missing row for country '{row['country']}' postcode '{row['postcode']}'."
db_row.assert_row(row, ('country', 'postcode'))
+
@then("there are(?P<exclude> no)? word tokens for postcodes (?P<postcodes>.*)")
def check_word_table_for_postcodes(context, exclude, postcodes):
""" Check that the tokenizer produces postcode tokens for the given
plist.sort()
with context.db.cursor() as cur:
- if nctx.tokenizer != 'legacy':
- cur.execute("SELECT word FROM word WHERE type = 'P' and word = any(%s)",
- (plist,))
- else:
- cur.execute("""SELECT word FROM word WHERE word = any(%s)
- and class = 'place' and type = 'postcode'""",
- (plist,))
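+        # The ICU tokenizer stores postcode tokens with type 'P' in the word table.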
+ cur.execute("SELECT word FROM word WHERE type = 'P' and word = any(%s)",
+ (plist,))
found = [row['word'] for row in cur]
assert len(found) == len(set(found)), f"Duplicate rows for postcodes: {found}"
assert len(found) == 0, f"Unexpected postcodes: {found}"
else:
assert set(found) == set(plist), \
- f"Missing postcodes {set(plist) - set(found)}. Found: {found}"
+ f"Missing postcodes {set(plist) - set(found)}. Found: {found}"
+
@then("place_addressline contains")
def check_place_addressline(context):
WHERE place_id = %s AND address_place_id = %s""",
(pid, apid))
assert cur.rowcount > 0, \
- "No rows found for place %s and address %s" % (row['object'], row['address'])
+ f"No rows found for place {row['object']} and address {row['address']}."
for res in cur:
DBRow(nid, res, context).assert_row(row, ('address', 'object'))
+
@then("place_addressline doesn't contain")
def check_place_addressline_exclude(context):
""" Check that the place_addressline doesn't contain any entries for the
WHERE place_id = %s AND address_place_id = %s""",
(pid, apid))
assert cur.rowcount == 0, \
- "Row found for place %s and address %s" % (row['object'], row['address'])
+ f"Row found for place {row['object']} and address {row['address']}."
+
-@then("W(?P<oid>\d+) expands to(?P<neg> no)? interpolation")
+@then(r"W(?P<oid>\d+) expands to(?P<neg> no)? interpolation")
def check_location_property_osmline(context, oid, neg):
""" Check that the given way is present in the interpolation table.
"""
for i in todo:
row = context.table[i]
if (int(row['start']) == res['startnumber']
- and int(row['end']) == res['endnumber']):
+ and int(row['end']) == res['endnumber']):
todo.remove(i)
break
else:
assert not todo, f"Unmatched lines in table: {list(context.table[i] for i in todo)}"
+
@then("location_property_osmline contains(?P<exact> exactly)?")
-def check_place_contents(context, exact):
+def check_osmline_contents(context, exact):
""" Check contents of the interpolation table. Each row represents a table row
        and all data must match. Data not present in the expected table may
be arbitrary. The rows are identified via the 'object' column which must
assert expected_content == actual, \
f"Missing entries: {expected_content - actual}\n" \
f"Not expected in table: {actual - expected_content}"
-