- while place_threads > 0:
- for key, _ in sel.select(1):
- conn = key.data
- sel.unregister(conn)
- try:
- conn.wait()
- except Exception as exc: # pylint: disable=broad-except
- LOG.info('Wrong SQL statement: %s', exc)
- conn.close()
- place_threads -= 1
-
-def add_tiger_data(dsn, data_dir, threads, config, sqllib_dir):
- """ Import tiger data from directory or tar file
- """
+ sql = "SELECT tiger_line_import(%s, %s, %s, %s, %s, %s)"
+
+ for row in csv.DictReader(fd, delimiter=';'):
+ try:
+ address = dict(street=row['street'], postcode=row['postcode'])
+ args = ('SRID=4326;' + row['geometry'],
+ int(row['from']), int(row['to']), row['interpolation'],
+ psycopg2.extras.Json(analyzer.process_place(dict(address=address))),
+ analyzer.normalize_postcode(row['postcode']))
+ except ValueError:
+ continue
+ pool.next_free_worker().perform(sql, args=args)