-
class PlaceFetcher:
    """ Asynchronous connection that fetches place details for processing.

        The object can be used as a context manager: leaving the `with`
        block waits for the connection and then closes it.
    """

    def __init__(self, dsn: str, setup_conn: Connection) -> None:
        """ Open the fetching connection for `dsn` and register the hstore
            type on it.

            `setup_conn` is only used to look up the OIDs of the hstore
            types.
        """
        # Accumulated time (seconds, from time.time()) that get_batch()
        # spent waiting on the connection.
        self.wait_time = 0.0
        self.current_ids: Optional[DictCursorResults] = None
        self.conn: Optional[DBConnection] = None

        # need to fetch those manually because register_hstore cannot
        # fetch them on an asynchronous connection below.
        # The lookups run before the connection is opened so that a failing
        # lookup cannot leave an open connection behind.
        hstore_oid = execute_scalar(setup_conn, "SELECT 'hstore'::regtype::oid")
        hstore_array_oid = execute_scalar(setup_conn, "SELECT 'hstore[]'::regtype::oid")

        self.conn = DBConnection(dsn, cursor_factory=psycopg2.extras.DictCursor)
        try:
            psycopg2.extras.register_hstore(self.conn.conn, oid=hstore_oid,
                                            array_oid=hstore_array_oid)
        except Exception:
            # The caller never receives the object when the constructor
            # fails, so the connection has to be released here.
            self.close()
            raise

    def close(self) -> None:
        """ Close the underlying asynchronous connection.

            Calling the function on an already closed fetcher is a no-op.
        """
        if self.conn:
            self.conn.close()
            self.conn = None

    def fetch_next_batch(self, cur: Cursor, runner: runners.Runner) -> bool:
        """ Send a request for the next batch of places.
            If details for the places are required, they will be fetched
            asynchronously.

            Up to 100 rows are read from `cur` and handed to
            `runner.get_place_details()` together with the connection.

            Returns true if there is still data available.
        """
        ids = cast(Optional[DictCursorResults], cur.fetchmany(100))

        if not ids:
            # Nothing left to read: make sure get_batch() reports an
            # empty batch instead of stale data.
            self.current_ids = None
            return False

        assert self.conn is not None
        self.current_ids = runner.get_place_details(self.conn, ids)

        return True

    def get_batch(self) -> DictCursorResults:
        """ Get the next batch of data, previously requested with
            `fetch_next_batch`.

            Returns an empty list when no batch is pending.
        """
        assert self.conn is not None
        assert self.conn.cursor is not None

        # An empty (but not None) result is taken to mean that the runner
        # sent a query on the connection whose rows still need collecting
        # -- presumably the asynchronous case; verify against the runners.
        if self.current_ids is not None and not self.current_ids:
            tstart = time.time()
            self.conn.wait()
            self.wait_time += time.time() - tstart
            self.current_ids = cast(Optional[DictCursorResults],
                                    self.conn.cursor.fetchall())

        return self.current_ids if self.current_ids is not None else []

    def __enter__(self) -> 'PlaceFetcher':
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        """ Wait for the connection, then close it.

            The connection is closed even when waiting raises. A fetcher
            that was already closed inside the `with` block is left alone.
        """
        if self.conn is None:
            return

        try:
            self.conn.wait()
        finally:
            self.close()
-
-