return self
+ def size(self) -> int:
+ """ Return the number of bytes the buffer currently contains.
+ """
+ return self.buffer.tell()
+
def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
if self.buffer is not None:
self.buffer.close()
def copy_out(self, cur: Cursor, table: str, columns: Optional[Iterable[str]] = None) -> None:
""" Copy all collected data into the given table.
+
+ The buffer is empty and reusable after this operation.
"""
if self.buffer.tell() > 0:
self.buffer.seek(0)
cur.copy_from(self.buffer, table, columns=columns)
+ self.buffer = io.StringIO()