Streaming Guide¶
This guide covers how to use streaming responses in DCAF for real-time, incremental updates from agents.
Table of Contents¶
- Overview
- Stream Event Types
- Server-Side Streaming
- Client-Side Consumption
- Error Handling
- Best Practices
Overview¶
DCAF supports streaming responses through the /api/sendMessageStream endpoint. Streaming provides:
- Real-time updates as the LLM generates tokens
- Progressive UI updates for better user experience
- Tool execution visibility before completion
- Early termination capability
Streaming Format¶
DCAF uses NDJSON (Newline-Delimited JSON) for streaming:
{"type":"text_delta","text":"Hello"}
{"type":"text_delta","text":" there!"}
{"type":"done","stop_reason":"end_turn"}
Each line is a complete JSON object that can be parsed independently.
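Because each line stands alone, a client can split on newlines and parse each line independently. A minimal sketch in plain Python (no DCAF imports assumed):

```python
import json

def parse_ndjson(stream_text: str) -> list[dict]:
    """Parse NDJSON: one JSON object per non-empty line."""
    events = []
    for line in stream_text.splitlines():
        if line.strip():
            events.append(json.loads(line))
    return events

sample = '{"type":"text_delta","text":"Hello"}\n{"type":"done","stop_reason":"end_turn"}\n'
events = parse_ndjson(sample)
# events[0]["text"] == "Hello"; events[1]["type"] == "done"
```

Real network chunks may not align with line boundaries; see Best Practices for the buffering this requires.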
Stream Event Types¶
DCAF supports 10 event types:
1. text_delta¶
Streaming text tokens from the LLM.
Use: Append text to the displayed response.
2. intermittent_update¶
Interim status updates emitted while the agent is working — before any text is produced.
Emitted at two points:
- When the LLM begins its reasoning phase → "Thinking..."
- When a tool invocation starts → "Calling tool: <name>"
Fields:
| Field | Type | Description |
|---|---|---|
| text | string | Human-readable status label |
| content | object | Arbitrary metadata for the update (defaults to {}) |
Use: Display a transient indicator (spinner, status bar) so users know the agent is active. Use content to carry structured data (e.g. tool inputs, step counts) for richer UI.
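An illustrative event of this type (the field values below are hypothetical):

```json
{
  "type": "intermittent_update",
  "text": "Calling tool: get_pods",
  "content": {}
}
```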
3. tool_calls¶
Tool calls requiring user approval.
{
"type": "tool_calls",
"tool_calls": [
{
"id": "toolu_123",
"name": "delete_file",
"input": {"path": "/tmp/file.txt"},
"execute": false,
"tool_description": "Delete a file",
"input_description": {...}
}
]
}
Use: Display approval UI for tools.
4. executed_tool_calls¶
Tools that were executed automatically.
{
"type": "executed_tool_calls",
"executed_tool_calls": [
{
"id": "toolu_456",
"name": "get_status",
"input": {"service": "web-app"},
"output": "Service is running"
}
]
}
Use: Display executed tool information.
5. commands¶
Terminal commands for approval.
{
"type": "commands",
"commands": [
{
"command": "kubectl get pods -n production",
"execute": false,
"files": null
}
]
}
Use: Display command approval UI.
6. executed_commands¶
Commands that were executed.
{
"type": "executed_commands",
"executed_cmds": [
{
"command": "kubectl get pods",
"output": "NAME READY STATUS\nweb-123 1/1 Running"
}
]
}
Use: Display command output.
7. approvals¶
Unified approval items for user review. Supports tool calls, commands, and custom approval types through a type discriminator.
{
"type": "approvals",
"approvals": [
{
"id": "appr_123",
"type": "tool_call",
"name": "delete_pod",
"input": {"name": "nginx-1", "namespace": "default"},
"execute": false,
"description": "Delete Kubernetes pod nginx-1",
"intent": "Remove failing pod as requested"
}
]
}
Use: Display unified approval UI. The type field determines which UI variant to render.
Mutual exclusivity with legacy events
For every tool call or command, the server emits both an approvals event (unified) and the corresponding legacy event (tool_calls or commands) for backward compatibility. If your client handles approvals, you must not also handle tool_calls or commands — doing so will render duplicate approval dialogs for the same action. Pick one format and ignore the other:
- Legacy clients: handle tool_calls and commands, ignore approvals
- Unified clients: handle approvals, ignore tool_calls and commands
When sending the approval decision back, use the matching format: data.approvals for unified clients, data.tool_calls / data.cmds for legacy clients.
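One way to enforce this choice on the client is a single filter applied to every incoming event before dispatching it. A sketch (the flag name and helper are hypothetical, not part of DCAF):

```python
# Pick ONE approval format at startup and drop the duplicate events.
USE_UNIFIED_APPROVALS = True

LEGACY_APPROVAL_TYPES = {"tool_calls", "commands"}

def should_handle(event: dict) -> bool:
    """Return False for the redundant approval representation."""
    if USE_UNIFIED_APPROVALS:
        return event.get("type") not in LEGACY_APPROVAL_TYPES
    return event.get("type") != "approvals"
```

All other event types pass through unchanged; only the duplicated approval events are filtered.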
8. executed_approvals¶
Approval items that were executed after user approval.
{
"type": "executed_approvals",
"executed_approvals": [
{
"id": "appr_123",
"type": "tool_call",
"name": "delete_pod",
"input": {"name": "nginx-1", "namespace": "default"},
"output": "pod \"nginx-1\" deleted"
}
]
}
Use: Display executed approval results.
9. done¶
Stream completed successfully.
Stop reasons:
- end_turn - Normal completion
- tool_use - Stopped for tool execution
- max_tokens - Token limit reached
Use: Finalize UI, enable user input.
10. error¶
Error during streaming.
Use: Display error message, offer retry.
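An illustrative error event (the message text is an example):

```json
{"type": "error", "error": "LLM request failed: rate limit exceeded"}
```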
Server-Side Streaming¶
Agent with Streaming Support¶
Agents that support streaming implement invoke_stream:
from dcaf.schemas.events import (
TextDeltaEvent,
ToolCallsEvent,
ExecutedToolCallsEvent,
ApprovalsEvent,
ExecutedApprovalsEvent,
DoneEvent,
ErrorEvent
)
from typing import Generator
class StreamingAgent:
def invoke_stream(
self,
        messages: list[dict]
) -> Generator:
"""Stream response events."""
try:
# Stream text deltas
for chunk in self._generate_response(messages):
yield TextDeltaEvent(text=chunk)
# Check for tool calls
if self.pending_tool_calls:
yield ToolCallsEvent(tool_calls=self.pending_tool_calls)
# Yield done event
yield DoneEvent(stop_reason="end_turn")
except Exception as e:
yield ErrorEvent(error=str(e))
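To serve these events over /api/sendMessageStream, the server writes each one as a single NDJSON line. A framework-agnostic sketch (it assumes event objects expose a dict via a `model_dump()`-style method, as Pydantic models do; adapt to your actual event classes and HTTP framework):

```python
import json
from typing import Iterable, Iterator

def to_ndjson(events: Iterable) -> Iterator[str]:
    """Serialize a stream of event objects to NDJSON lines.

    Accepts plain dicts or objects with a model_dump() method
    (e.g. Pydantic models). Each yielded string is one complete
    line, ready to write to a chunked HTTP response body.
    """
    for event in events:
        data = event.model_dump() if hasattr(event, "model_dump") else event
        yield json.dumps(data) + "\n"
```

Each yielded line can be passed directly to a streaming response body (for example, a FastAPI StreamingResponse with media type application/x-ndjson).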
Using BedrockLLM Streaming¶
from dcaf.llm import BedrockLLM
from dcaf.schemas.events import TextDeltaEvent, DoneEvent
llm = BedrockLLM()
def stream_response(messages):
for event in llm.invoke_stream(
messages=messages,
model_id="us.anthropic.claude-3-5-sonnet-20240620-v1:0",
max_tokens=1000
):
if "contentBlockDelta" in event:
delta = event["contentBlockDelta"].get("delta", {})
if "text" in delta:
yield TextDeltaEvent(text=delta["text"])
elif "messageStop" in event:
reason = event["messageStop"].get("stopReason", "end_turn")
yield DoneEvent(stop_reason=reason)
Client-Side Consumption¶
Python Client¶
import requests
import json
def stream_chat(messages: list):
"""Stream responses from the agent."""
response = requests.post(
"http://localhost:8000/api/sendMessageStream",
json={"messages": messages},
stream=True
)
accumulated_text = ""
for line in response.iter_lines():
if line:
event = json.loads(line.decode('utf-8'))
event_type = event.get("type")
if event_type == "intermittent_update":
print(f"\r[{event.get('text')}]", end="", flush=True)
elif event_type == "text_delta":
text = event.get("text", "")
accumulated_text += text
print(text, end="", flush=True)
elif event_type == "tool_calls":
print("\n[Tool calls pending approval]")
for tc in event.get("tool_calls", []):
print(f" - {tc['name']}: {tc['input']}")
elif event_type == "executed_tool_calls":
for etc in event.get("executed_tool_calls", []):
print(f"\n[Executed: {etc['name']}]")
print(f" Output: {etc['output']}")
elif event_type == "commands":
print("\n[Commands pending approval]")
for cmd in event.get("commands", []):
print(f" $ {cmd['command']}")
elif event_type == "executed_commands":
for cmd in event.get("executed_cmds", []):
print(f"\n[Executed: {cmd['command']}]")
print(f" Output: {cmd['output']}")
elif event_type == "approvals":
print("\n[Approvals pending]")
for appr in event.get("approvals", []):
print(f" - [{appr['type']}] {appr['name']}: {appr['input']}")
elif event_type == "executed_approvals":
for appr in event.get("executed_approvals", []):
print(f"\n[Executed {appr['type']}: {appr['name']}]")
print(f" Output: {appr['output']}")
elif event_type == "done":
print(f"\n[Done: {event.get('stop_reason')}]")
return accumulated_text
elif event_type == "error":
print(f"\n[Error: {event.get('error')}]")
raise Exception(event.get("error"))
return accumulated_text
# Usage
result = stream_chat([
{"role": "user", "content": "Tell me about Kubernetes"}
])
JavaScript/TypeScript Client¶
async function streamChat(messages) {
const response = await fetch('/api/sendMessageStream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let accumulatedText = '';
let buffer = '';
while (true) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// Process complete lines
const lines = buffer.split('\n');
buffer = lines.pop(); // Keep incomplete line in buffer
for (const line of lines) {
if (!line.trim()) continue;
try {
const event = JSON.parse(line);
switch (event.type) {
case 'intermittent_update':
showStatus(event.text);
break;
case 'text_delta':
accumulatedText += event.text;
updateDisplay(event.text);
break;
case 'tool_calls':
showToolApproval(event.tool_calls);
break;
case 'executed_tool_calls':
showExecutedTools(event.executed_tool_calls);
break;
case 'commands':
showCommandApproval(event.commands);
break;
case 'executed_commands':
showExecutedCommands(event.executed_cmds);
break;
case 'approvals':
showApprovalUI(event.approvals);
break;
case 'executed_approvals':
showExecutedApprovals(event.executed_approvals);
break;
case 'done':
finalize(event.stop_reason);
return accumulatedText;
case 'error':
handleError(event.error);
throw new Error(event.error);
}
} catch (e) {
console.error('Parse error:', e);
}
}
}
return accumulatedText;
}
function updateDisplay(text) {
const display = document.getElementById('response');
display.textContent += text;
}
function showToolApproval(toolCalls) {
// Render tool approval UI
for (const tc of toolCalls) {
console.log(`Tool: ${tc.name}`, tc.input);
}
}
React Hook¶
import { useState, useCallback } from 'react';
function useStreamingChat() {
const [text, setText] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const [status, setStatus] = useState('');
const [toolCalls, setToolCalls] = useState([]);
const [error, setError] = useState(null);
const sendMessage = useCallback(async (messages) => {
setText('');
setToolCalls([]);
setError(null);
setIsStreaming(true);
try {
const response = await fetch('/api/sendMessageStream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop();
for (const line of lines) {
if (!line.trim()) continue;
const event = JSON.parse(line);
switch (event.type) {
case 'intermittent_update':
setStatus(event.text);
break;
case 'text_delta':
setStatus('');
setText(prev => prev + event.text);
break;
case 'tool_calls':
setToolCalls(event.tool_calls);
break;
case 'error':
setError(event.error);
break;
}
}
}
} catch (e) {
      setError(e.message);
} finally {
setIsStreaming(false);
}
}, []);
return { text, isStreaming, status, toolCalls, error, sendMessage };
}
// Usage in component
function ChatComponent() {
const { text, isStreaming, status, toolCalls, error, sendMessage } = useStreamingChat();
const [input, setInput] = useState('');
const handleSubmit = () => {
sendMessage([{ role: 'user', content: input }]);
setInput('');
};
return (
<div>
<div className="response">
{status && <div className="status-indicator">{status}</div>}
{text}
{isStreaming && !status && <span className="cursor">▌</span>}
</div>
{toolCalls.length > 0 && (
<div className="tool-calls">
{toolCalls.map(tc => (
<ToolApproval key={tc.id} toolCall={tc} />
))}
</div>
)}
{error && <div className="error">{error}</div>}
<input
value={input}
onChange={e => setInput(e.target.value)}
disabled={isStreaming}
/>
<button onClick={handleSubmit} disabled={isStreaming}>
Send
</button>
</div>
);
}
cURL¶
# Stream response
curl -N -X POST http://localhost:8000/api/sendMessageStream \
-H "Content-Type: application/json" \
-d '{"messages": [{"role": "user", "content": "Tell me a story"}]}'
# Output (each line is a separate event):
# {"type":"text_delta","text":"Once"}
# {"type":"text_delta","text":" upon"}
# {"type":"text_delta","text":" a"}
# {"type":"text_delta","text":" time"}
# ...
# {"type":"done","stop_reason":"end_turn"}
Error Handling¶
Client-Side Error Handling¶
import requests
import json
import time
def safe_stream_chat(messages):
"""Stream with comprehensive error handling."""
try:
response = requests.post(
"http://localhost:8000/api/sendMessageStream",
json={"messages": messages},
stream=True,
timeout=60
)
response.raise_for_status()
for line in response.iter_lines():
if not line:
continue
try:
event = json.loads(line.decode('utf-8'))
except json.JSONDecodeError as e:
print(f"JSON parse error: {e}")
continue
event_type = event.get("type")
if event_type == "error":
error_msg = event.get("error", "Unknown error")
raise StreamError(error_msg)
elif event_type == "text_delta":
yield event.get("text", "")
elif event_type == "done":
return
except requests.exceptions.Timeout:
raise StreamError("Request timed out")
except requests.exceptions.ConnectionError:
raise StreamError("Connection failed")
except requests.exceptions.HTTPError as e:
raise StreamError(f"HTTP error: {e}")
class StreamError(Exception):
pass
# Usage with retry
def stream_with_retry(messages, max_retries=3):
for attempt in range(max_retries):
try:
result = ""
for chunk in safe_stream_chat(messages):
result += chunk
print(chunk, end="", flush=True)
return result
except StreamError as e:
if attempt < max_retries - 1:
print(f"\nRetrying ({attempt + 1}/{max_retries})...")
time.sleep(2 ** attempt)
else:
raise
Graceful Degradation¶
def chat_with_fallback(messages):
"""Try streaming, fall back to regular request."""
try:
# Try streaming first
result = ""
for chunk in safe_stream_chat(messages):
result += chunk
print(chunk, end="", flush=True)
return result
except StreamError:
# Fall back to non-streaming
print("\nStreaming failed, using regular request...")
response = requests.post(
"http://localhost:8000/api/sendMessage",
json={"messages": messages}
)
return response.json()["content"]
Best Practices¶
1. Buffer Incomplete Lines¶
NDJSON may arrive in chunks that don't align with line boundaries:
buffer = ""
for chunk in response.iter_content():
buffer += chunk.decode('utf-8')
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
if line.strip():
event = json.loads(line)
process_event(event)
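To see why buffering matters, here is the same loop run over chunks that split a JSON object mid-line (a self-contained sketch; the chunk boundaries are artificial):

```python
import json

def parse_chunks(chunks) -> list[dict]:
    """Reassemble NDJSON events from arbitrarily split byte chunks."""
    events, buffer = [], ""
    for chunk in chunks:
        buffer += chunk.decode("utf-8")
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            if line.strip():
                events.append(json.loads(line))
    return events

# The event boundaries do NOT align with the chunk boundaries:
chunks = [
    b'{"type":"text_del',
    b'ta","text":"Hi"}\n{"type":"done"',
    b',"stop_reason":"end_turn"}\n',
]
events = parse_chunks(chunks)
# events[0]["text"] == "Hi"; events[1]["type"] == "done"
```

Parsing each chunk directly without the buffer would raise a JSONDecodeError on the first partial line.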
2. Handle Partial Updates¶
Don't assume events arrive in a specific order:
let state = {
text: '',
status: '',
toolCalls: [],
executedTools: [],
commands: [],
executedCommands: [],
approvals: [],
executedApprovals: [],
done: false,
error: null
};
function processEvent(event) {
switch (event.type) {
case 'intermittent_update':
state.status = event.text;
break;
case 'text_delta':
state.status = ''; // clear status once text starts arriving
state.text += event.text;
break;
case 'tool_calls':
state.toolCalls.push(...event.tool_calls);
break;
case 'executed_tool_calls':
state.executedTools.push(...event.executed_tool_calls);
break;
case 'commands':
state.commands.push(...event.commands);
break;
case 'executed_commands':
state.executedCommands.push(...event.executed_cmds);
break;
case 'approvals':
state.approvals.push(...event.approvals);
break;
case 'executed_approvals':
state.executedApprovals.push(...event.executed_approvals);
break;
case 'done':
state.done = true;
break;
case 'error':
state.error = event.error;
break;
}
updateUI(state);
}
3. Implement Timeouts¶
import time
def stream_with_timeout(messages, timeout=60):
"""Stream with overall timeout."""
start_time = time.time()
for chunk in safe_stream_chat(messages):
if time.time() - start_time > timeout:
raise TimeoutError("Stream timeout exceeded")
yield chunk
4. Show Progress Indicators¶
Use status events to display meaningful, real-time feedback rather than a generic spinner:
function StreamingResponse({ text, isStreaming, status }) {
return (
<div className="response">
{status && (
<span className="streaming-indicator">
<span className="spinner" />
<span className="status">{status}</span>
{/* e.g. "Thinking..." or "Calling tool: get_pods" */}
</span>
)}
{text}
{isStreaming && !status && <span className="cursor">▌</span>}
</div>
);
}
5. Enable Cancellation¶
const controller = new AbortController();
// Start streaming
fetch('/api/sendMessageStream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages }),
signal: controller.signal
});
// Cancel if needed
document.getElementById('cancel').onclick = () => {
controller.abort();
};
6. Batch UI Updates¶
let pendingText = '';
let updateScheduled = false;
function processTextDelta(text) {
pendingText += text;
if (!updateScheduled) {
updateScheduled = true;
requestAnimationFrame(() => {
document.getElementById('response').textContent += pendingText;
pendingText = '';
updateScheduled = false;
});
}
}