# SPDX-License-Identifier: GPL-2.0-only
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2022 by the Nominatim developer community.
# For a full list of authors see the git log.
from itertools import chain

import psycopg2.extras

from place_inserter import PlaceColumn
from table_compare import NominatimID, DBRow

from nominatim.indexer import indexer
from nominatim.tokenizer import factory as tokenizer_factory

def check_database_integrity(context):
    """ Check some generic constraints on the tables.
    """
    # place_addressline should not have duplicate (place_id, address_place_id)
    cur = context.db.cursor()
    cur.execute("""SELECT count(*) FROM
                    (SELECT place_id, address_place_id, count(*) as c
                     FROM place_addressline GROUP BY place_id, address_place_id) x
                   WHERE c > 1""")
    assert cur.fetchone()[0] == 0, "Duplicates found in place_addressline"

################################ GIVEN ##################################

@given("the (?P<named>named )?places")
def add_data_to_place_table(context, named):
    """ Add entries into the place table. 'named places' makes sure that
        the entries get a random name when none is explicitly given.
    """
    with context.db.cursor() as cur:
        cur.execute('ALTER TABLE place DISABLE TRIGGER place_before_insert')
        for row in context.table:
            PlaceColumn(context).add_row(row, named is not None).db_insert(cur)
        cur.execute('ALTER TABLE place ENABLE TRIGGER place_before_insert')
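
# For illustration only: the step above is typically driven by a feature-file
# table along these lines (the exact set of supported columns is defined by
# PlaceColumn, so treat this as a sketch):
#
#   Given the named places
#    | osm | class | type |
#    | N1  | place | town |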

@given("the relations")
def add_data_to_planet_relations(context):
    """ Add entries into the osm2pgsql relation middle table. This is needed
        for tests on data that looks up members.
    """
    with context.db.cursor() as cur:
        for r in context.table:
            last_node = 0
            last_way = 0
            parts = []
            if r['members']:
                members = []
                for m in r['members'].split(','):
                    mid = NominatimID(m)
                    if mid.typ == 'N':
                        parts.insert(last_node, int(mid.oid))
                        last_node += 1
                        last_way += 1
                    elif mid.typ == 'W':
                        parts.insert(last_way, int(mid.oid))
                        last_way += 1
                    else:
                        parts.append(int(mid.oid))
                    members.extend((mid.typ.lower() + mid.oid, mid.cls or ''))
            else:
                members = None
            tags = chain.from_iterable([(h[5:], r[h]) for h in r.headings if h.startswith("tags+")])

            cur.execute("""INSERT INTO planet_osm_rels (id, way_off, rel_off, parts, members, tags)
                           VALUES (%s, %s, %s, %s, %s, %s)""",
                        (r['id'], last_node, last_way, parts, members, list(tags)))
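
# For illustration only (not taken from a feature file): with the loop above, a
# relation whose members column reads "N1,N2,W3,R4" ends up stored as
#   parts   = [1, 2, 3, 4]                              (node ids, then way ids, then the rest)
#   way_off = 2                                         (index where the way ids start)
#   rel_off = 3                                         (index where the remaining member ids start)
#   members = ['n1', '', 'n2', '', 'w3', '', 'r4', '']  (flattened (id, role) pairs)
# mirroring the layout of the osm2pgsql middle table.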

@given("the ways")
def add_data_to_planet_ways(context):
    """ Add entries into the osm2pgsql way middle table. This is necessary for
        tests on data that looks up node ids in this table.
    """
    with context.db.cursor() as cur:
        for r in context.table:
            tags = chain.from_iterable([(h[5:], r[h]) for h in r.headings if h.startswith("tags+")])
            nodes = [int(x.strip()) for x in r['nodes'].split(',')]

            cur.execute("INSERT INTO planet_osm_ways (id, nodes, tags) VALUES (%s, %s, %s)",
                        (r['id'], nodes, list(tags)))
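
# A sketch of a table for the ways step above (any "tags+<key>" heading becomes
# a tag on the inserted way):
#
#   Given the ways
#    | id | nodes      | tags+highway |
#    | 1  | 1, 2, 3, 4 | residential  |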

################################ WHEN ##################################

@when("importing")
def import_and_index_data_from_place_table(context):
    """ Import data previously set up in the place table.
    """
    context.nominatim.run_nominatim('import', '--continue', 'load-data',
                                    '--index-noanalyse', '-q')

    check_database_integrity(context)

@when("updating places")
def update_place_table(context):
    """ Update the place table with the given data. Also runs all triggers
        related to updates and reindexes the new data.
    """
    context.nominatim.run_nominatim('refresh', '--functions')
    with context.db.cursor() as cur:
        for row in context.table:
            PlaceColumn(context).add_row(row, False).db_insert(cur)

    context.nominatim.reindex_placex(context.db)
    check_database_integrity(context)

@when("updating postcodes")
def update_postcodes(context):
    """ Rerun the calculation of postcodes.
    """
    context.nominatim.run_nominatim('refresh', '--postcodes')

@when("marking for delete (?P<oids>.*)")
def delete_places(context, oids):
    """ Remove entries from the place table. Multiple ids may be given
        separated by commas. Also runs all triggers related to updates
        and reindexes the remaining data.
    """
    context.nominatim.run_nominatim('refresh', '--functions')
    with context.db.cursor() as cur:
        for oid in oids.split(','):
            NominatimID(oid).query_osm_id(cur, 'DELETE FROM place WHERE {}')

    context.nominatim.reindex_placex(context.db)

################################ THEN ##################################

@then("(?P<table>placex|place) contains(?P<exact> exactly)?")
def check_place_contents(context, table, exact):
    """ Check contents of the place/placex tables. Each row represents a table
        row and all data must match. Data not present in the expected table may
        be arbitrary. The rows are identified via the 'object' column which must
        have an identifier of the form '<NRW><osm id>[:<class>]'. When multiple
        rows match (for example because 'class' was left out and there are
        multiple entries for the given OSM object) then all must match. All
        expected rows are expected to be present with at least one database row.
        When 'exactly' is given, there must not be additional rows in the database.
    """
    with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
        expected_content = set()
        for row in context.table:
            nid = NominatimID(row['object'])
            query = 'SELECT *, ST_AsText(geometry) as geomtxt, ST_GeometryType(geometry) as geometrytype'
            if table == 'placex':
                query += ', ST_X(centroid) as cx, ST_Y(centroid) as cy'
            query += " FROM %s WHERE {}" % (table, )
            nid.query_osm_id(cur, query)
            assert cur.rowcount > 0, "No rows found for " + row['object']

            for res in cur:
                if exact:
                    expected_content.add((res['osm_type'], res['osm_id'], res['class']))

                DBRow(nid, res, context).assert_row(row, ['object'])

        if exact:
            cur.execute('SELECT osm_type, osm_id, class from {}'.format(table))
            assert expected_content == set([(r[0], r[1], r[2]) for r in cur])
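
# Note: the 'object' column above holds NominatimID strings of the documented
# form '<NRW><osm id>[:<class>]', e.g. 'N1' for node 1, 'W93' for way 93, or
# 'N1:amenity' for node 1 restricted to rows of class 'amenity'.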

@then("(?P<table>placex|place) has no entry for (?P<oid>.*)")
def check_place_has_entry(context, table, oid):
    """ Ensure that no database row exists for the given object. The ID
        must be of the form '<NRW><osm id>[:<class>]'.
    """
    with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
        NominatimID(oid).query_osm_id(cur, "SELECT * FROM %s where {}" % table)
        assert cur.rowcount == 0, \
               "Found {} entries for ID {}".format(cur.rowcount, oid)

@then("search_name contains(?P<exclude> not)?")
def check_search_name_contents(context, exclude):
    """ Check contents of the search_name table. Each row represents a table
        row and all data must match. Data not present in the expected table may
        be arbitrary. The rows are identified via the 'object' column which must
        have an identifier of the form '<NRW><osm id>[:<class>]'. All
        expected rows are expected to be present with at least one database row.
    """
    tokenizer = tokenizer_factory.get_tokenizer_for_db(context.nominatim.get_test_config())

    with tokenizer.name_analyzer() as analyzer:
        with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            for row in context.table:
                nid = NominatimID(row['object'])
                nid.row_by_place_id(cur, 'search_name',
                                    ['ST_X(centroid) as cx', 'ST_Y(centroid) as cy'])
                assert cur.rowcount > 0, "No rows found for " + row['object']

                for res in cur:
                    db_row = DBRow(nid, res, context)
                    for name, value in zip(row.headings, row.cells):
                        if name in ('name_vector', 'nameaddress_vector'):
                            items = [x.strip() for x in value.split(',')]
                            tokens = analyzer.get_word_token_info(items)

                            if not exclude:
                                assert len(tokens) >= len(items), \
                                       "No word entry found for {}. Entries found: {!s}".format(value, len(tokens))
                            for word, token, wid in tokens:
                                if exclude:
                                    assert wid not in res[name], \
                                           "Found term for {}/{}: {}".format(nid, name, wid)
                                else:
                                    assert wid in res[name], \
                                           "Missing term for {}/{}: {}".format(nid, name, wid)
                        elif name != 'object':
                            assert db_row.contains(name, value), db_row.assert_msg(name, value)

@then("search_name has no entry for (?P<oid>.*)")
def check_search_name_has_entry(context, oid):
    """ Check that there is no entry in the search_name table for the given
        objects. IDs are in format '<NRW><osm id>[:<class>]'.
    """
    with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
        NominatimID(oid).row_by_place_id(cur, 'search_name')

        assert cur.rowcount == 0, \
               "Found {} entries for ID {}".format(cur.rowcount, oid)

@then("location_postcode contains exactly")
def check_location_postcode(context):
    """ Check the full contents of the location_postcode table. Each row
        represents a table row and all data must match. Data not present in the
        expected table may be arbitrary. The rows are identified via the
        'country' and 'postcode' columns. All rows must be present as expected
        and there must not be additional rows.
    """
    with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
        cur.execute("SELECT *, ST_AsText(geometry) as geomtxt FROM location_postcode")
        assert cur.rowcount == len(list(context.table)), \
            "Postcode table has {} rows, expected {}.".format(cur.rowcount, len(list(context.table)))

        results = {}
        for row in cur:
            key = (row['country_code'], row['postcode'])
            assert key not in results, "Postcode table has duplicate entry: {}".format(row)
            results[key] = DBRow((row['country_code'], row['postcode']), row, context)

        for row in context.table:
            db_row = results.get((row['country'], row['postcode']))
            assert db_row is not None, \
                f"Missing row for country '{row['country']}' postcode '{row['postcode']}'."

            db_row.assert_row(row, ('country', 'postcode'))

@then("there are(?P<exclude> no)? word tokens for postcodes (?P<postcodes>.*)")
def check_word_table_for_postcodes(context, exclude, postcodes):
    """ Check that the tokenizer produces postcode tokens for the given
        postcodes. The postcodes are a comma-separated list of postcodes.
    """
    nctx = context.nominatim
    tokenizer = tokenizer_factory.get_tokenizer_for_db(nctx.get_test_config())
    with tokenizer.name_analyzer() as ana:
        plist = [ana.normalize_postcode(p) for p in postcodes.split(',')]

    with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
        if nctx.tokenizer == 'icu':
            # The ICU tokenizer stores postcodes as word entries of type 'P'.
            cur.execute("SELECT word FROM word WHERE type = 'P' and word = any(%s)",
                        (plist,))
        else:
            # The legacy tokenizer uses class 'place' and type 'postcode'.
            cur.execute("""SELECT word FROM word WHERE word = any(%s)
                             and class = 'place' and type = 'postcode'""",
                        (plist,))

        found = [row[0] for row in cur]
        assert len(found) == len(set(found)), f"Duplicate rows for postcodes: {found}"

    if exclude:
        assert len(found) == 0, f"Unexpected postcodes: {found}"
    else:
        assert set(found) == set(plist), \
               f"Missing postcodes {set(plist) - set(found)}. Found: {found}"

@then("place_addressline contains")
def check_place_addressline(context):
    """ Check the contents of the place_addressline table. Each row represents
        a table row and all data must match. Data not present in the expected
        table may be arbitrary. The rows are identified via the 'object' column,
        representing the addressee, and the 'address' column, representing the
        address item.
    """
    with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
        for row in context.table:
            nid = NominatimID(row['object'])
            pid = nid.get_place_id(cur)
            apid = NominatimID(row['address']).get_place_id(cur)
            cur.execute(""" SELECT * FROM place_addressline
                            WHERE place_id = %s AND address_place_id = %s""",
                        (pid, apid))
            assert cur.rowcount > 0, \
                   "No rows found for place %s and address %s" % (row['object'], row['address'])

            for res in cur:
                DBRow(nid, res, context).assert_row(row, ('address', 'object'))

@then("place_addressline doesn't contain")
def check_place_addressline_exclude(context):
    """ Check that the place_addressline table doesn't contain any entries for
        the given addressee/address item pairs.
    """
    with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
        for row in context.table:
            pid = NominatimID(row['object']).get_place_id(cur)
            apid = NominatimID(row['address']).get_place_id(cur, allow_empty=True)
            if apid is not None:
                cur.execute(""" SELECT * FROM place_addressline
                                WHERE place_id = %s AND address_place_id = %s""",
                            (pid, apid))
                assert cur.rowcount == 0, \
                       "Row found for place %s and address %s" % (row['object'], row['address'])

@then(r"W(?P<oid>\d+) expands to(?P<neg> no)? interpolation")
def check_location_property_osmline(context, oid, neg):
    """ Check that the given way is present in the interpolation table.
    """
    with context.db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
        cur.execute("""SELECT *, ST_AsText(linegeo) as geomtxt
                       FROM location_property_osmline
                       WHERE osm_id = %s AND startnumber IS NOT NULL""",
                    (oid, ))

        if neg:
            assert cur.rowcount == 0, "Interpolation found for way {}.".format(oid)
            return

        todo = list(range(len(list(context.table))))
        for res in cur:
            for i in todo:
                row = context.table[i]
                if (int(row['start']) == res['startnumber']
                        and int(row['end']) == res['endnumber']):
                    todo.remove(i)
                    break
            else:
                assert False, "Unexpected row " + str(res)

            DBRow(oid, res, context).assert_row(row, ('start', 'end'))

        assert not todo