def handle_threaded_sql_statements(pool: WorkerPool, fd: TextIO,
                                   analyzer: AbstractAnalyzer) -> None:
    """ Feed the rows of a semicolon-separated CSV stream to the worker pool.

        Every row read from `fd` is converted into the six arguments of a
        `tiger_line_import()` SQL call, which is then handed to the next
        free worker of `pool`. Rows for which building the arguments raises
        a ValueError (e.g. a non-numeric 'from' or 'to' column) are skipped.
        A dot is printed for every 1000 rows passed on to the pool.
    """
    statement = "SELECT tiger_line_import(%s, %s, %s, %s, %s, %s)"
    dispatched = 0

    for record in csv.DictReader(fd, delimiter=';'):
        try:
            addr = {'street': record['street'], 'postcode': record['postcode']}
            geometry = 'SRID=4326;' + record['geometry']
            first = int(record['from'])
            last = int(record['to'])
            interpolation = record['interpolation']
            place = Json(analyzer.process_place(PlaceInfo({'address': addr})))
            postcode = analyzer.normalize_postcode(record['postcode'])
        except ValueError:
            # Malformed row: drop it without counting it towards progress.
            continue

        worker = pool.next_free_worker()
        worker.perform(statement,
                       args=(geometry, first, last, interpolation, place, postcode))

        # Progress indicator: one dot per 1000 dispatched rows.
        dispatched += 1
        if dispatched == 1000:
            print('.', end='', flush=True)
            dispatched = 0
-
-
-def add_tiger_data(data_dir: str, config: Configuration, threads: int,
- tokenizer: AbstractTokenizer) -> int:
+async def add_tiger_data(data_dir: str, config: Configuration, threads: int,
+ tokenizer: AbstractTokenizer) -> int: