- while place_threads > 0:
- for key, _ in sel.select(1):
- conn = key.data
- sel.unregister(conn)
- try:
- conn.wait()
- except Exception as exc: # pylint: disable=broad-except
- LOG.info('Wrong SQL statement: %s', exc)
- conn.close()
- place_threads -= 1
-
-def add_tiger_data(data_dir, config, threads):
+ sql = "SELECT tiger_line_import(%s, %s, %s, %s, %s, %s)"
+
+ for row in csv.DictReader(fd, delimiter=';'):
+ try:
+ address = dict(street=row['street'], postcode=row['postcode'])
+ args = ('SRID=4326;' + row['geometry'],
+ int(row['from']), int(row['to']), row['interpolation'],
+ psycopg2.extras.Json(analyzer.process_place(dict(address=address))),
+ analyzer.normalize_postcode(row['postcode']))
+ except ValueError:
+ continue
+ pool.next_free_worker().perform(sql, args=args)
+
+ lines += 1
+ if lines == 1000:
+ print('.', end='', flush=True)
+ lines = 0
+
+
+def add_tiger_data(data_dir, config, threads, tokenizer):