else:
assert cur.rowcount > 0, "Row not in word table: %s" % '/'.join(values)
+
+@then("there are(?P<exclude> no)? word tokens for postcodes (?P<postcodes>.*)")
+def check_word_table_for_postcodes(context, exclude, postcodes):
+ """ Check that the tokenizer produces postcode tokens for the given
+ postcodes. The postcodes are a comma-separated list of postcodes.
+ Whitespace matters.
+ """
+ nctx = context.nominatim
+ tokenizer = tokenizer_factory.get_tokenizer_for_db(nctx.get_test_config())
+ with tokenizer.name_analyzer() as ana:
+ plist = [ana.normalize_postcode(p) for p in postcodes.split(',')]
+
+ plist.sort()
+
+ with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
+ if nctx.tokenizer == 'legacy_icu':
+ cur.execute("""SELECT info->>'postcode' FROM word
+ WHERE type = 'P' and info->>'postcode' = any(%s)""",
+ (plist,))
+ else:
+ cur.execute("""SELECT word FROM word WHERE word = any(%s)
+ and class = 'place' and type = 'postcode'""",
+ (plist,))
+
+ found = [row[0] for row in cur]
+ assert len(found) == len(set(found)), f"Duplicate rows for postcodes: {found}"
+
+ if exclude:
+ assert len(found) == 0, f"Unexpected postcodes: {found}"
+ else:
+ assert set(found) == set(plist), \
+ f"Missing postcodes {set(plist) - set(found)}. Found: {found}"
+
@then("place_addressline contains")
def check_place_addressline(context):
""" Check the contents of the place_addressline table. Each row represents