- # place_addressline should not have duplicate (place_id, address_place_id)
- cur = context.db.cursor()
- cur.execute("""SELECT count(*) FROM
- (SELECT place_id, address_place_id, count(*) as c
- FROM place_addressline GROUP BY place_id, address_place_id) x
- WHERE c > 1""")
- assert cur.fetchone()[0] == 0, "Duplicates found in place_addressline"
-
-
-class NominatimID:
- """ Splits a unique identifier for places into its components.
- As place_ids cannot be used for testing, we use a unique
- identifier instead that is of the form <osmtype><osmid>[:<class>].
- """
-
- id_regex = re.compile(r"(?P<tp>[NRW])(?P<id>\d+)(:(?P<cls>\w+))?")
-
- def __init__(self, oid):
- self.typ = self.oid = self.cls = None
-
- if oid is not None:
- m = self.id_regex.fullmatch(oid)
- assert m is not None, "ID '%s' not of form <osmtype><osmid>[:<class>]" % oid
-
- self.typ = m.group('tp')
- self.oid = m.group('id')
- self.cls = m.group('cls')
-
- def __str__(self):
- if self.cls is None:
- return self.typ + self.oid
-
- return '%s%d:%s' % (self.typ, self.oid, self.cls)
-
- def table_select(self):
- """ Return where clause and parameter list to select the object
- from a Nominatim table.
- """
- where = 'osm_type = %s and osm_id = %s'
- params = [self.typ, self. oid]
-
- if self.cls is not None:
- where += ' and class = %s'
- params.append(self.cls)
-
- return where, params
-
- def get_place_id(self, cur):
- where, params = self.table_select()
- cur.execute("SELECT place_id FROM placex WHERE %s" % where, params)
- assert cur.rowcount == 1, \
- "Expected exactly 1 entry in placex for %s found %s" % (str(self), cur.rowcount)
+ with context.db.cursor() as cur:
+ # place_addressline should not have duplicate (place_id, address_place_id)
+ cur.execute("""SELECT count(*) FROM
+ (SELECT place_id, address_place_id, count(*) as c
+ FROM place_addressline GROUP BY place_id, address_place_id) x
+ WHERE c > 1""")
+ assert cur.fetchone()[0] == 0, "Duplicates found in place_addressline"