- def __len__(self) -> int:
- return len(self.files)
-
-
-def handle_threaded_sql_statements(pool: WorkerPool, fd: TextIO,
- analyzer: AbstractAnalyzer) -> None:
- """ Handles sql statement with multiplexing
- """
- lines = 0
- # Using pool of database connections to execute sql statements
-
- sql = "SELECT tiger_line_import(%s, %s, %s, %s, %s, %s)"
-
- for row in csv.DictReader(fd, delimiter=';'):
- try:
- address = dict(street=row['street'], postcode=row['postcode'])
- args = ('SRID=4326;' + row['geometry'],
- int(row['from']), int(row['to']), row['interpolation'],
- Json(analyzer.process_place(PlaceInfo({'address': address}))),
- analyzer.normalize_postcode(row['postcode']))
- except ValueError:
- continue
- pool.next_free_worker().perform(sql, args=args)
-
- lines += 1
- if lines == 1000:
- print('.', end='', flush=True)
- lines = 0
+ def __iter__(self) -> Iterator[Dict[str, Any]]:
+ """ Iterate over the lines in each file.
+ """
+ for fname in self.files:
+ fd = self.get_file(fname)
+ yield from csv.DictReader(fd, delimiter=';')