Streaming Guide

This guide covers how to use streaming responses in DCAF for real-time, incremental updates from agents.


Table of Contents

  1. Overview
  2. Stream Event Types
  3. Server-Side Streaming
  4. Client-Side Consumption
  5. Error Handling
  6. Best Practices

Overview

DCAF supports streaming responses through the /api/sendMessageStream endpoint. Streaming provides:

  • Real-time updates as the LLM generates tokens
  • Progressive UI updates for better user experience
  • Tool execution visibility before completion
  • Early termination capability

Streaming Format

DCAF uses NDJSON (Newline-Delimited JSON) for streaming:

{"type":"text_delta","text":"Hello"}
{"type":"text_delta","text":" there!"}
{"type":"done","stop_reason":"end_turn"}

Each line is a complete JSON object that can be parsed independently.
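Because each line stands alone, a client can parse the stream with nothing more than a line split and `json.loads`. A minimal sketch (the sample events mirror the ones above):

```python
import json

# A sample NDJSON stream body: three events, one per line.
raw = (
    '{"type":"text_delta","text":"Hello"}\n'
    '{"type":"text_delta","text":" there!"}\n'
    '{"type":"done","stop_reason":"end_turn"}\n'
)

# Each non-empty line is a complete, independently parseable JSON object.
events = [json.loads(line) for line in raw.splitlines() if line.strip()]

# Reassemble the streamed text from the text_delta events.
text = "".join(e["text"] for e in events if e["type"] == "text_delta")
print(text)                        # Hello there!
print(events[-1]["stop_reason"])   # end_turn
```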


Stream Event Types

DCAF supports 10 event types:

1. text_delta

Streaming text tokens from the LLM.

{
    "type": "text_delta",
    "text": "Hello"
}

Use: Append text to the displayed response.

2. intermittent_update

Interim status updates emitted while the agent is working — before any text is produced.

{
    "type": "intermittent_update",
    "text": "Thinking...",
    "content": {}
}
{
    "type": "intermittent_update",
    "text": "Calling tool: get_pods",
    "content": {}
}

Emitted at two points:

  • When the LLM begins its reasoning phase → "Thinking..."
  • When a tool invocation starts → "Calling tool: <name>"

Fields:

Field     Type     Description
-----     ----     -----------
text      string   Human-readable status label
content   object   Arbitrary metadata for the update (defaults to {})
Use: Display a transient indicator (spinner, status bar) so users know the agent is active. Use content to carry structured data (e.g. tool inputs, step counts) for richer UI.

3. tool_calls

Tool calls requiring user approval.

{
    "type": "tool_calls",
    "tool_calls": [
        {
            "id": "toolu_123",
            "name": "delete_file",
            "input": {"path": "/tmp/file.txt"},
            "execute": false,
            "tool_description": "Delete a file",
            "input_description": {...}
        }
    ]
}

Use: Display approval UI for tools.

4. executed_tool_calls

Tools that were executed automatically.

{
    "type": "executed_tool_calls",
    "executed_tool_calls": [
        {
            "id": "toolu_456",
            "name": "get_status",
            "input": {"service": "web-app"},
            "output": "Service is running"
        }
    ]
}

Use: Display executed tool information.

5. commands

Terminal commands for approval.

{
    "type": "commands",
    "commands": [
        {
            "command": "kubectl get pods -n production",
            "execute": false,
            "files": null
        }
    ]
}

Use: Display command approval UI.

6. executed_commands

Commands that were executed.

{
    "type": "executed_commands",
    "executed_cmds": [
        {
            "command": "kubectl get pods",
            "output": "NAME        READY   STATUS\nweb-123   1/1     Running"
        }
    ]
}

Use: Display command output.

7. approvals

Unified approval items for user review. Supports tool calls, commands, and custom approval types through a type discriminator.

{
    "type": "approvals",
    "approvals": [
        {
            "id": "appr_123",
            "type": "tool_call",
            "name": "delete_pod",
            "input": {"name": "nginx-1", "namespace": "default"},
            "execute": false,
            "description": "Delete Kubernetes pod nginx-1",
            "intent": "Remove failing pod as requested"
        }
    ]
}

Use: Display unified approval UI. The type field determines which UI variant to render.

Mutual exclusivity with legacy events

For every tool call or command, the server emits both an approvals event (unified) and the corresponding legacy event (tool_calls or commands) for backward compatibility. If your client handles approvals, you must not also handle tool_calls or commands — doing so will render duplicate approval dialogs for the same action. Pick one format and ignore the other:

  • Legacy clients: handle tool_calls and commands, ignore approvals
  • Unified clients: handle approvals, ignore tool_calls and commands

When sending the approval decision back, use the matching format: data.approvals for unified clients, data.tool_calls / data.cmds for legacy clients.
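The ignore-one-format rule can be enforced with a small dispatch guard. This is an illustrative sketch for a unified client (`dispatch`, `handlers`, and `seen` are hypothetical names, not DCAF API):

```python
# Event types that duplicate "approvals" content for backward compatibility.
# A unified client drops them so each action is rendered exactly once.
LEGACY_TYPES = {"tool_calls", "commands"}

def dispatch(event, handlers):
    """Route one stream event, skipping legacy duplicates."""
    etype = event.get("type")
    if etype in LEGACY_TYPES:
        return  # same action arrives again as an "approvals" event
    handler = handlers.get(etype)
    if handler:
        handler(event)

seen = []
handlers = {"approvals": lambda e: seen.extend(e["approvals"])}

# The server emits both events for the same action; only one is handled.
dispatch({"type": "approvals",
          "approvals": [{"id": "appr_123", "type": "tool_call"}]}, handlers)
dispatch({"type": "tool_calls",
          "tool_calls": [{"id": "toolu_123"}]}, handlers)

print(len(seen))  # 1 — no duplicate approval dialog
```

A legacy client would invert the guard: skip `approvals` and keep `tool_calls`/`commands`.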

8. executed_approvals

Approval items that were executed after user approval.

{
    "type": "executed_approvals",
    "executed_approvals": [
        {
            "id": "appr_123",
            "type": "tool_call",
            "name": "delete_pod",
            "input": {"name": "nginx-1", "namespace": "default"},
            "output": "pod \"nginx-1\" deleted"
        }
    ]
}

Use: Display executed approval results.

9. done

Stream completed successfully.

{
    "type": "done",
    "stop_reason": "end_turn"
}

Stop reasons:

  • end_turn - Normal completion
  • tool_use - Stopped for tool execution
  • max_tokens - Token limit reached

Use: Finalize UI, enable user input.
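The three stop reasons call for different UI states: only end_turn should re-enable input, tool_use means approval events are pending in the same turn, and max_tokens may warrant offering a continuation. A sketch of that branching (`finalize` and its return labels are illustrative, not DCAF API):

```python
def finalize(event):
    """Map a done event's stop_reason to a next UI state."""
    reason = event.get("stop_reason", "end_turn")
    if reason == "tool_use":
        return "await_approvals"   # approval events follow in this turn
    if reason == "max_tokens":
        return "offer_continue"    # response was truncated
    return "enable_input"          # end_turn: normal completion

print(finalize({"type": "done", "stop_reason": "tool_use"}))  # await_approvals
```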

10. error

Error during streaming.

{
    "type": "error",
    "error": "Connection timeout"
}

Use: Display error message, offer retry.


Server-Side Streaming

Agent with Streaming Support

Agents that support streaming implement invoke_stream:

from dcaf.schemas.events import (
    TextDeltaEvent,
    ToolCallsEvent,
    ExecutedToolCallsEvent,
    ApprovalsEvent,
    ExecutedApprovalsEvent,
    DoneEvent,
    ErrorEvent
)
from typing import Generator

class StreamingAgent:
    def invoke_stream(
        self, 
        messages: dict
    ) -> Generator:
        """Stream response events."""
        try:
            # Stream text deltas
            for chunk in self._generate_response(messages):
                yield TextDeltaEvent(text=chunk)

            # Check for tool calls
            if self.pending_tool_calls:
                yield ToolCallsEvent(tool_calls=self.pending_tool_calls)

            # Yield done event
            yield DoneEvent(stop_reason="end_turn")

        except Exception as e:
            yield ErrorEvent(error=str(e))

Using BedrockLLM Streaming

from dcaf.llm import BedrockLLM
from dcaf.schemas.events import TextDeltaEvent, DoneEvent

llm = BedrockLLM()

def stream_response(messages):
    for event in llm.invoke_stream(
        messages=messages,
        model_id="us.anthropic.claude-3-5-sonnet-20240620-v1:0",
        max_tokens=1000
    ):
        if "contentBlockDelta" in event:
            delta = event["contentBlockDelta"].get("delta", {})
            if "text" in delta:
                yield TextDeltaEvent(text=delta["text"])

        elif "messageStop" in event:
            reason = event["messageStop"].get("stopReason", "end_turn")
            yield DoneEvent(stop_reason=reason)

Client-Side Consumption

Python Client

import requests
import json

def stream_chat(messages: list):
    """Stream responses from the agent."""
    response = requests.post(
        "http://localhost:8000/api/sendMessageStream",
        json={"messages": messages},
        stream=True
    )

    accumulated_text = ""

    for line in response.iter_lines():
        if line:
            event = json.loads(line.decode('utf-8'))
            event_type = event.get("type")

            if event_type == "intermittent_update":
                print(f"\r[{event.get('text')}]", end="", flush=True)

            elif event_type == "text_delta":
                text = event.get("text", "")
                accumulated_text += text
                print(text, end="", flush=True)

            elif event_type == "tool_calls":
                print("\n[Tool calls pending approval]")
                for tc in event.get("tool_calls", []):
                    print(f"  - {tc['name']}: {tc['input']}")

            elif event_type == "executed_tool_calls":
                for etc in event.get("executed_tool_calls", []):
                    print(f"\n[Executed: {etc['name']}]")
                    print(f"  Output: {etc['output']}")

            elif event_type == "commands":
                print("\n[Commands pending approval]")
                for cmd in event.get("commands", []):
                    print(f"  $ {cmd['command']}")

            elif event_type == "executed_commands":
                for cmd in event.get("executed_cmds", []):
                    print(f"\n[Executed: {cmd['command']}]")
                    print(f"  Output: {cmd['output']}")

            elif event_type == "approvals":
                print("\n[Approvals pending]")
                for appr in event.get("approvals", []):
                    print(f"  - [{appr['type']}] {appr['name']}: {appr['input']}")

            elif event_type == "executed_approvals":
                for appr in event.get("executed_approvals", []):
                    print(f"\n[Executed {appr['type']}: {appr['name']}]")
                    print(f"  Output: {appr['output']}")

            elif event_type == "done":
                print(f"\n[Done: {event.get('stop_reason')}]")
                return accumulated_text

            elif event_type == "error":
                print(f"\n[Error: {event.get('error')}]")
                raise Exception(event.get("error"))

    return accumulated_text

# Usage
result = stream_chat([
    {"role": "user", "content": "Tell me about Kubernetes"}
])

JavaScript/TypeScript Client

async function streamChat(messages) {
    const response = await fetch('/api/sendMessageStream', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ messages })
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    let accumulatedText = '';
    let buffer = '';

    while (true) {
        const { value, done } = await reader.read();

        if (done) break;

        buffer += decoder.decode(value, { stream: true });

        // Process complete lines
        const lines = buffer.split('\n');
        buffer = lines.pop(); // Keep incomplete line in buffer

        for (const line of lines) {
            if (!line.trim()) continue;

            // Parse separately from handling, so an error deliberately
            // thrown by the 'error' case isn't swallowed as a parse error.
            let event;
            try {
                event = JSON.parse(line);
            } catch (e) {
                console.error('Parse error:', e);
                continue; // skip the malformed line, keep streaming
            }

            switch (event.type) {
                case 'intermittent_update':
                    showStatus(event.text);
                    break;

                case 'text_delta':
                    accumulatedText += event.text;
                    updateDisplay(event.text);
                    break;

                case 'tool_calls':
                    showToolApproval(event.tool_calls);
                    break;

                case 'executed_tool_calls':
                    showExecutedTools(event.executed_tool_calls);
                    break;

                case 'commands':
                    showCommandApproval(event.commands);
                    break;

                case 'executed_commands':
                    showExecutedCommands(event.executed_cmds);
                    break;

                case 'approvals':
                    showApprovalUI(event.approvals);
                    break;

                case 'executed_approvals':
                    showExecutedApprovals(event.executed_approvals);
                    break;

                case 'done':
                    finalize(event.stop_reason);
                    return accumulatedText;

                case 'error':
                    handleError(event.error);
                    throw new Error(event.error);
            }
        }
    }

    return accumulatedText;
}

function updateDisplay(text) {
    const display = document.getElementById('response');
    display.textContent += text;
}

function showToolApproval(toolCalls) {
    // Render tool approval UI
    for (const tc of toolCalls) {
        console.log(`Tool: ${tc.name}`, tc.input);
    }
}

React Hook

import { useState, useCallback } from 'react';

function useStreamingChat() {
    const [text, setText] = useState('');
    const [isStreaming, setIsStreaming] = useState(false);
    const [status, setStatus] = useState('');
    const [toolCalls, setToolCalls] = useState([]);
    const [error, setError] = useState(null);

    const sendMessage = useCallback(async (messages) => {
        setText('');
        setToolCalls([]);
        setError(null);
        setIsStreaming(true);

        try {
            const response = await fetch('/api/sendMessageStream', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({ messages })
            });

            const reader = response.body.getReader();
            const decoder = new TextDecoder();

            let buffer = '';

            while (true) {
                const { value, done } = await reader.read();

                if (done) break;

                buffer += decoder.decode(value, { stream: true });
                const lines = buffer.split('\n');
                buffer = lines.pop();

                for (const line of lines) {
                    if (!line.trim()) continue;

                    const event = JSON.parse(line);

                    switch (event.type) {
                        case 'intermittent_update':
                            setStatus(event.text);
                            break;
                        case 'text_delta':
                            setStatus('');
                            setText(prev => prev + event.text);
                            break;
                        case 'tool_calls':
                            setToolCalls(event.tool_calls);
                            break;
                        case 'error':
                            setError(event.error);
                            break;
                    }
                }
            }
        } catch (e) {
            setError(e.message);
        } finally {
            setIsStreaming(false);
        }
    }, []);

    return { text, isStreaming, status, toolCalls, error, sendMessage };
}

// Usage in component
function ChatComponent() {
    const { text, isStreaming, status, toolCalls, error, sendMessage } = useStreamingChat();
    const [input, setInput] = useState('');

    const handleSubmit = () => {
        sendMessage([{ role: 'user', content: input }]);
        setInput('');
    };

    return (
        <div>
            <div className="response">
                {status && <div className="status-indicator">{status}</div>}
                {text}
                {isStreaming && !status && <span className="cursor"></span>}
            </div>

            {toolCalls.length > 0 && (
                <div className="tool-calls">
                    {toolCalls.map(tc => (
                        <ToolApproval key={tc.id} toolCall={tc} />
                    ))}
                </div>
            )}

            {error && <div className="error">{error}</div>}

            <input
                value={input}
                onChange={e => setInput(e.target.value)}
                disabled={isStreaming}
            />
            <button onClick={handleSubmit} disabled={isStreaming}>
                Send
            </button>
        </div>
    );
}

cURL

# Stream response
curl -N -X POST http://localhost:8000/api/sendMessageStream \
  -H "Content-Type: application/json" \
  -d '{"messages": [{"role": "user", "content": "Tell me a story"}]}'

# Output (each line is a separate event):
# {"type":"text_delta","text":"Once"}
# {"type":"text_delta","text":" upon"}
# {"type":"text_delta","text":" a"}
# {"type":"text_delta","text":" time"}
# ...
# {"type":"done","stop_reason":"end_turn"}

Error Handling

Client-Side Error Handling

import requests
import json

def safe_stream_chat(messages):
    """Stream with comprehensive error handling."""
    try:
        response = requests.post(
            "http://localhost:8000/api/sendMessageStream",
            json={"messages": messages},
            stream=True,
            timeout=60
        )
        response.raise_for_status()

        for line in response.iter_lines():
            if not line:
                continue

            try:
                event = json.loads(line.decode('utf-8'))
            except json.JSONDecodeError as e:
                print(f"JSON parse error: {e}")
                continue

            event_type = event.get("type")

            if event_type == "error":
                error_msg = event.get("error", "Unknown error")
                raise StreamError(error_msg)

            elif event_type == "text_delta":
                yield event.get("text", "")

            elif event_type == "done":
                return

    except requests.exceptions.Timeout:
        raise StreamError("Request timed out")

    except requests.exceptions.ConnectionError:
        raise StreamError("Connection failed")

    except requests.exceptions.HTTPError as e:
        raise StreamError(f"HTTP error: {e}")

class StreamError(Exception):
    pass

# Usage with retry
import time

def stream_with_retry(messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            result = ""
            for chunk in safe_stream_chat(messages):
                result += chunk
                print(chunk, end="", flush=True)
            return result
        except StreamError as e:
            if attempt < max_retries - 1:
                print(f"\nRetrying ({attempt + 1}/{max_retries})...")
                time.sleep(2 ** attempt)
            else:
                raise

Graceful Degradation

def chat_with_fallback(messages):
    """Try streaming, fall back to regular request."""
    try:
        # Try streaming first
        result = ""
        for chunk in safe_stream_chat(messages):
            result += chunk
            print(chunk, end="", flush=True)
        return result

    except StreamError:
        # Fall back to non-streaming
        print("\nStreaming failed, using regular request...")
        response = requests.post(
            "http://localhost:8000/api/sendMessage",
            json={"messages": messages}
        )
        return response.json()["content"]

Best Practices

1. Buffer Incomplete Lines

NDJSON may arrive in chunks that don't align with line boundaries. Buffer raw bytes and split on newlines, so a multi-byte UTF-8 character split across chunks is never decoded mid-sequence:

buffer = b""

for chunk in response.iter_content(chunk_size=1024):
    buffer += chunk

    while b'\n' in buffer:
        line, buffer = buffer.split(b'\n', 1)
        if line.strip():
            event = json.loads(line)  # json.loads accepts bytes
            process_event(event)

2. Handle Partial Updates

Don't assume events arrive in a specific order:

let state = {
    text: '',
    status: '',
    toolCalls: [],
    executedTools: [],
    commands: [],
    executedCommands: [],
    approvals: [],
    executedApprovals: [],
    done: false,
    error: null
};

function processEvent(event) {
    switch (event.type) {
        case 'intermittent_update':
            state.status = event.text;
            break;
        case 'text_delta':
            state.status = '';  // clear status once text starts arriving
            state.text += event.text;
            break;
        case 'tool_calls':
            state.toolCalls.push(...event.tool_calls);
            break;
        case 'executed_tool_calls':
            state.executedTools.push(...event.executed_tool_calls);
            break;
        case 'commands':
            state.commands.push(...event.commands);
            break;
        case 'executed_commands':
            state.executedCommands.push(...event.executed_cmds);
            break;
        case 'approvals':
            state.approvals.push(...event.approvals);
            break;
        case 'executed_approvals':
            state.executedApprovals.push(...event.executed_approvals);
            break;
        case 'done':
            state.done = true;
            break;
        case 'error':
            state.error = event.error;
            break;
    }

    updateUI(state);
}

3. Implement Timeouts

import time

def stream_with_timeout(messages, timeout=60):
    """Stream with overall timeout."""
    start_time = time.time()

    for chunk in safe_stream_chat(messages):
        if time.time() - start_time > timeout:
            raise TimeoutError("Stream timeout exceeded")
        yield chunk

4. Show Progress Indicators

Use status events to display meaningful, real-time feedback rather than a generic spinner:

function StreamingResponse({ text, isStreaming, status }) {
    return (
        <div className="response">
            {status && (
                <span className="streaming-indicator">
                    <span className="spinner" />
                    <span className="status">{status}</span>
                    {/* e.g. "Thinking..." or "Calling tool: get_pods" */}
                </span>
            )}
            {text}
            {isStreaming && !status && <span className="cursor"></span>}
        </div>
    );
}

5. Enable Cancellation

const controller = new AbortController();

// Start streaming
fetch('/api/sendMessageStream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ messages }),
    signal: controller.signal
});

// Cancel if needed
document.getElementById('cancel').onclick = () => {
    controller.abort();
};

6. Batch UI Updates

let pendingText = '';
let updateScheduled = false;

function processTextDelta(text) {
    pendingText += text;

    if (!updateScheduled) {
        updateScheduled = true;
        requestAnimationFrame(() => {
            document.getElementById('response').textContent += pendingText;
            pendingText = '';
            updateScheduled = false;
        });
    }
}

See Also