- fetcher = DBConnection(self.dsn, cursor_factory=psycopg2.extras.DictCursor)
- psycopg2.extras.register_hstore(fetcher.conn,
- oid=hstore_oid,
- array_oid=hstore_array_oid)
-
- with WorkerPool(self.dsn, self.num_threads) as pool:
- places = self._fetch_next_batch(cur, fetcher, runner)
- while places is not None:
- if not places:
- t0 = time.time()
- fetcher.wait()
- fetcher_wait += time.time() - t0
- places = fetcher.cursor.fetchall()
-
- # asynchronously get the next batch
- next_places = self._fetch_next_batch(cur, fetcher, runner)
-
- # And insert the curent batch
- for idx in range(0, len(places), batch):
- t0 = time.time()
- worker = pool.next_free_worker()
- pool_wait += time.time() - t0
- part = places[idx:idx+batch]
- LOG.debug("Processing places: %s", str(part))
- runner.index_places(worker, part)
- progress.add(len(part))
+ with PlaceFetcher(self.dsn, conn) as fetcher:
+ with WorkerPool(self.dsn, self.num_threads) as pool:
+ has_more = fetcher.fetch_next_batch(cur, runner)
+ while has_more:
+ places = fetcher.get_batch()