Server¶
DCAF Core provides simple utilities to expose your agent as a REST API server with minimal configuration.
Quick Start¶
from dcaf.core import Agent, serve
from dcaf.tools import tool
@tool(description="Get current time")
def get_time() -> str:
from datetime import datetime
return datetime.now().isoformat()
agent = Agent(tools=[get_time])
serve(agent) # Server at http://0.0.0.0:8000
That's it. Your agent is now accessible via HTTP.
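To verify, call the chat endpoint from another terminal (the request body format is covered under Request Format below):

curl -X POST http://localhost:8000/api/chat \
  -H "Content-Type: application/json" \
  -d '{"messages": [{"role": "user", "content": "What time is it?"}]}'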
Configuration¶
Port and Host¶
# Custom port
serve(agent, port=8080)
# Custom host and port
serve(agent, host="0.0.0.0", port=3000)
# Development mode with auto-reload
serve(agent, port=8000, reload=True)
Production Configuration¶
For production deployments, configure worker processes and keep-alive timeouts:
serve(
agent,
port=8000,
workers=4, # Multiple workers for parallelism
timeout_keep_alive=30, # Match your load balancer's idle timeout
log_level="warning",
)
| Parameter | Default | Description |
|---|---|---|
| `workers` | `1` | Number of worker processes. For production, use `(2 × cpu_cores) + 1`. |
| `timeout_keep_alive` | `5` | Keep-alive timeout in seconds. Set this to match or exceed your load balancer's idle timeout (e.g., AWS ALB defaults to 60s). |
reload and workers are mutually exclusive
You cannot use reload=True with workers > 1. Use workers=1 for development with hot reload.
Programmatic Control¶
If you need more control over the FastAPI app:
from dcaf.core import Agent, create_app
import uvicorn
agent = Agent(tools=[...])
app = create_app(agent)
# Add custom endpoints
@app.get("/custom")
def custom_endpoint():
return {"message": "Hello from custom endpoint"}
# Run with custom configuration. Note: uvicorn requires an import string
# (e.g., "main:app" if this file is main.py) to use more than one worker.
uvicorn.run("main:app", host="0.0.0.0", port=8000, workers=4)
Endpoints¶
| Endpoint | Method | Description |
|---|---|---|
| `/health` | GET | Health check (always responds immediately) |
| `/api/chat` | POST | Synchronous chat |
| `/api/chat-stream` | POST | Streaming chat (NDJSON) |
| `/api/chat-ws` | WebSocket | Bidirectional streaming chat |
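A quick liveness check:

curl http://localhost:8000/health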
Legacy Endpoints (V1 Code Path)¶
For backwards compatibility with existing v1 clients, the following endpoints are preserved:
| Legacy Endpoint | Preferred Endpoint | Code Path |
|---|---|---|
| `POST /api/sendMessage` | `POST /api/chat` | V1 (`dcaf.agent_server`) |
| `POST /api/sendMessageStream` | `POST /api/chat-stream` | V1 (`dcaf.agent_server`) |
Strangler Fig Migration (ADR-006)
Legacy endpoints use the V1 code path from dcaf.agent_server, while new endpoints use the V2 code path from dcaf.core. This follows the Strangler Fig migration pattern.
Key differences:
| Feature | V2 (`/api/chat`) | V1 (`/api/sendMessage`) |
|---|---|---|
| `_request_fields` forwarding | ✅ Yes | ❌ No |
| `meta_data.request_context` echo | ✅ Yes | ❌ No |
| WebSocket support | ✅ Yes | ❌ No |
| Response format | V2 `AgentMessage` | V1 `AgentMessage` |
Existing integrations continue to work without any code changes.
When to Use Each Endpoint
- New projects: Use `/api/chat`, `/api/chat-stream`, and `/api/chat-ws` (V2)
- Existing v1 integrations: Continue using `/api/sendMessage` and `/api/sendMessageStream`
- Unified server: `dcaf.core.create_app()` exposes both V1 and V2 endpoints simultaneously
Why the Rename? (ADR-007)¶
The endpoint names were changed for three reasons:
- Future-proofing: Lowercase URLs avoid case-sensitivity issues if security middleware is added later
- Semantic accuracy: "chat" better describes bidirectional conversation than "sendMessage"
- REST conventions: Lowercase, hyphenated paths follow RESTful best practices
See ADR-007: Lowercase Chat Endpoints for the full rationale.
Request Format¶
All chat endpoints accept the same request body:
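In its simplest form, the body carries a single user message:

{
  "messages": [
    {"role": "user", "content": "What time is it?"}
  ]
}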
With Conversation History¶
{
"messages": [
{"role": "user", "content": "What time is it?"},
{"role": "assistant", "content": "The current time is 2024-01-15T14:30:00"},
{"role": "user", "content": "What about in Tokyo?"}
]
}
Last Message is Current
The last message in the array is always treated as the current user message. All previous messages are conversation history.
With Platform Context¶
{
"messages": [
{
"role": "user",
"content": "List the pods",
"platform_context": {
"tenant_name": "acme-corp",
"k8s_namespace": "production"
}
}
]
}
Response Format¶
Synchronous Response (/api/chat)¶
{
"role": "assistant",
"content": "The current time is 2024-01-15T14:30:00",
"data": {
"tool_calls": [],
"executed_tool_calls": []
}
}
Tool Approval Required¶
When a tool requires approval:
{
"role": "assistant",
"content": "I need your approval to execute the following tools:",
"data": {
"tool_calls": [
{
"id": "tc_123",
"name": "delete_pod",
"input": {"name": "nginx-abc", "namespace": "production"},
"execute": false,
"tool_description": "Delete a Kubernetes pod",
"input_description": {}
}
]
}
}
Approving Tool Calls¶
To approve a tool call, send back the same tool call with execute: true:
{
"messages": [
{"role": "user", "content": "Delete the failing pod"},
{
"role": "assistant",
"content": "I need your approval...",
"data": {
"tool_calls": [
{
"id": "tc_123",
"name": "delete_pod",
"input": {"name": "nginx-abc"},
"execute": true
}
]
}
},
{"role": "user", "content": "Yes, approved"}
]
}
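As a minimal sketch, the full round-trip could look like this in Python (assuming the response shapes shown above; error handling omitted):

import httpx

BASE_URL = "http://localhost:8000"  # Assumed local server
messages = [{"role": "user", "content": "Delete the failing pod"}]

# Turn 1: the agent answers with tool calls awaiting approval
reply = httpx.post(f"{BASE_URL}/api/chat", json={"messages": messages}).json()
pending = reply.get("data", {}).get("tool_calls", [])

if pending:
    # Approve: flip execute to true and send the updated history back
    for tool_call in pending:
        tool_call["execute"] = True
    messages.append(reply)
    messages.append({"role": "user", "content": "Yes, approved"})
    final = httpx.post(f"{BASE_URL}/api/chat", json={"messages": messages}).json()
    print(final["content"])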
Streaming (/api/chat-stream)¶
The streaming endpoint returns NDJSON (newline-delimited JSON):
curl -X POST http://localhost:8000/api/chat-stream \
-H "Content-Type: application/json" \
-d '{"messages": [{"role": "user", "content": "Tell me about Kubernetes"}]}'
Response stream:
{"type": "text_delta", "text": "Kubernetes"}
{"type": "text_delta", "text": " is"}
{"type": "text_delta", "text": " a"}
{"type": "text_delta", "text": " container"}
{"type": "text_delta", "text": " orchestration"}
{"type": "text_delta", "text": " platform..."}
{"type": "done"}
Event Types¶
| Event Type | Description |
|---|---|
| `text_delta` | Text token(s) from the LLM |
| `tool_calls` | Tool calls requiring approval |
| `executed_tool_calls` | Results from executed tools |
| `done` | Stream completed successfully |
| `error` | An error occurred |
Handling Streams in Python¶
import json

import httpx
with httpx.stream(
"POST",
"http://localhost:8000/api/chat-stream",
json={"messages": [{"role": "user", "content": "Hello"}]},
) as response:
for line in response.iter_lines():
if line:
event = json.loads(line)
if event["type"] == "text_delta":
print(event["text"], end="", flush=True)
elif event["type"] == "done":
print("\n--- Done ---")
Handling Streams in JavaScript¶
const response = await fetch("http://localhost:8000/api/chat-stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
messages: [{ role: "user", content: "Hello" }]
})
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split("\n");
  buffer = lines.pop(); // Keep any partial line until the next chunk arrives
  for (const line of lines) {
    if (line) {
      const event = JSON.parse(line);
      if (event.type === "text_delta") {
        process.stdout.write(event.text);
      }
    }
  }
}
Async / Non-Blocking¶
All LLM calls run in a thread pool, so:
- Health checks always respond immediately (not blocked by long LLM calls)
- Multiple concurrent requests are handled properly
- Kubernetes liveness probes won't timeout during LLM calls
This is critical for production deployments where a 15-second LLM call could otherwise cause health check failures and container restarts.
WebSocket (/api/chat-ws)¶
The WebSocket endpoint provides bidirectional streaming chat over a persistent connection. Unlike the HTTP endpoints, a single WebSocket connection stays open for multiple conversation turns.
Connecting¶
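Connect any WebSocket client to ws://<host>:<port>/api/chat-ws. For example, with the websockets library used in the Python client below:

import asyncio
import websockets

async def main():
    # The connection stays open across multiple conversation turns
    async with websockets.connect("ws://localhost:8000/api/chat-ws") as ws:
        print("Connected")

asyncio.run(main())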
Message Format¶
Each client frame is a JSON object with the same shape as the HTTP endpoints:
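{"messages": [{"role": "user", "content": "What is Kubernetes?"}]}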
The server streams back event frames (same types as /api/chat-stream), ending each turn with a done event. The connection remains open for the next turn.
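For example, one turn might stream:

{"type": "text_delta", "text": "Kubernetes is"}
{"type": "text_delta", "text": " a container orchestration platform..."}
{"type": "done"}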
Python Client¶
import asyncio
import json
import websockets
async def chat():
async with websockets.connect("ws://localhost:8000/api/chat-ws") as ws:
# Turn 1
await ws.send(json.dumps({
"messages": [{"role": "user", "content": "What is Kubernetes?"}]
}))
async for frame in ws:
event = json.loads(frame)
if event["type"] == "text_delta":
print(event["text"], end="", flush=True)
elif event["type"] == "done":
print()
break
# Turn 2 — same connection
await ws.send(json.dumps({
"messages": [
{"role": "user", "content": "What is Kubernetes?"},
{"role": "assistant", "content": "Kubernetes is a container orchestration platform..."},
{"role": "user", "content": "How do I list pods?"},
]
}))
async for frame in ws:
event = json.loads(frame)
if event["type"] == "text_delta":
print(event["text"], end="", flush=True)
elif event["type"] == "done":
print()
break
asyncio.run(chat())
JavaScript Client¶
const ws = new WebSocket("ws://localhost:8000/api/chat-ws");
ws.onopen = () => {
ws.send(JSON.stringify({
messages: [{ role: "user", content: "Hello" }]
}));
};
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === "text_delta") {
process.stdout.write(data.text);
} else if (data.type === "done") {
console.log("\n--- Turn complete ---");
} else if (data.type === "error") {
console.error("Error:", data.error);
}
};
Error Handling¶
Errors during a turn are sent as error events without closing the connection. The client can continue sending messages after an error:
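For example (the message text here is illustrative):

{"type": "error", "error": "Tool execution failed: pod not found"}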
Invalid JSON or a missing `messages` field also returns an error event while keeping the connection alive.
Connection Stability¶
DCAF uses uvicorn's built-in WebSocket ping/pong mechanism to detect dead connections. By default, the server sends a ping frame every 20 seconds and expects a pong reply within 20 seconds. If no pong is received, the server closes the connection.
You can tune these values via serve():
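serve(
    agent,
    ws_ping_interval=30.0,  # Seconds between ping frames
    ws_ping_timeout=30.0,   # Seconds to wait for a pong reply
)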
Set to None to disable automatic pings:
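serve(agent, ws_ping_interval=None)  # No automatic pings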
Load Balancer Considerations
Many load balancers (e.g., AWS ALB, nginx) enforce their own idle timeouts, typically 60–120 seconds. Ensure ws_ping_interval is shorter than your load balancer's idle timeout so that ping frames keep the connection active.
Client-Side Reconnection¶
WebSocket connections can drop due to network issues, server restarts, or load balancer timeouts. Clients should implement reconnection logic:
import asyncio
import json
import websockets
from websockets.exceptions import ConnectionClosed
async def resilient_chat(url: str, message: str):
backoff = 1
while True:
try:
async with websockets.connect(url) as ws:
backoff = 1 # Reset on successful connect
await ws.send(json.dumps({
"messages": [{"role": "user", "content": message}]
}))
async for frame in ws:
event = json.loads(frame)
if event["type"] == "text_delta":
print(event["text"], end="", flush=True)
elif event["type"] == "done":
print()
return # Success
elif event["type"] == "error":
print(f"\nError: {event['error']}")
return
except (ConnectionClosed, OSError) as e:
print(f"\nConnection lost: {e}. Reconnecting in {backoff}s...")
await asyncio.sleep(backoff)
backoff = min(backoff * 2, 30) # Exponential backoff, max 30s
function createResilientWebSocket(url, onEvent) {
let backoff = 1000;
function connect() {
const ws = new WebSocket(url);
ws.onopen = () => { backoff = 1000; };
ws.onmessage = (msg) => {
const event = JSON.parse(msg.data);
onEvent(event);
};
ws.onclose = (e) => {
if (e.code !== 1000) { // Abnormal close
console.log(`Reconnecting in ${backoff}ms...`);
setTimeout(connect, backoff);
backoff = Math.min(backoff * 2, 30000);
}
};
ws.onerror = () => ws.close();
return ws;
}
return connect();
}
// Usage
createResilientWebSocket("ws://localhost:8000/api/chat-ws", (event) => {
if (event.type === "text_delta") process.stdout.write(event.text);
else if (event.type === "done") console.log("\n--- Done ---");
});
When to Use WebSocket vs HTTP¶
| Use Case | Recommended Endpoint |
|---|---|
| Single request/response | /api/chat |
| Streaming a single response | /api/chat-stream |
| Multi-turn conversation with streaming | /api/chat-ws |
| Real-time interactive UI | /api/chat-ws |
| Simple integration / cURL testing | /api/chat or /api/chat-stream |
Docker Deployment¶
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "main.py"]
With a production-ready entry point:
# main.py
import os
from dcaf.core import Agent, serve
from my_tools import list_pods, delete_pod
agent = Agent(
tools=[list_pods, delete_pod],
system_prompt="You are a Kubernetes assistant."
)
if __name__ == "__main__":
serve(
agent,
host="0.0.0.0",
port=int(os.getenv("PORT", 8000)),
workers=int(os.getenv("WORKERS", 4)),
timeout_keep_alive=int(os.getenv("KEEP_ALIVE", 30)),
log_level=os.getenv("LOG_LEVEL", "warning"),
)
Environment Variables
Use environment variables for configuration so you can tune without rebuilding:
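For example (image name illustrative):

docker run -p 8000:8000 \
  -e PORT=8000 \
  -e WORKERS=8 \
  -e KEEP_ALIVE=60 \
  -e LOG_LEVEL=info \
  my-agent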
Kubernetes Health Check¶
apiVersion: apps/v1
kind: Deployment
spec:
template:
spec:
containers:
- name: agent
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 5
periodSeconds: 10
timeoutSeconds: 5 # Safe: health endpoint is non-blocking
readinessProbe:
httpGet:
path: /health
port: 8000
periodSeconds: 5
API Reference¶
serve()¶
def serve(
agent: Agent | Callable,
port: int = 8000,
host: str = "0.0.0.0",
reload: bool = False,
log_level: str = "info",
workers: int = 1,
timeout_keep_alive: int = 5,
additional_routers: Sequence[APIRouter] | None = None,
channel_router: ChannelResponseRouter | None = None,
a2a: bool = False,
a2a_adapter: str = "agno",
a2a_agent_card: AgentCard | dict | None = None,
mcp: bool = False,
mcp_port: int = 8001,
mcp_transport: str = "sse",
ws_ping_interval: float | None = 20.0,
ws_ping_timeout: float | None = 20.0,
) -> None
Start a REST server for the agent.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `agent` | `Agent` or `Callable` | Required | Agent instance or callable `(messages, context) -> AgentResult` |
| `port` | `int` | `8000` | Port to listen on |
| `host` | `str` | `"0.0.0.0"` | Host to bind to |
| `reload` | `bool` | `False` | Enable auto-reload on code changes (development only) |
| `log_level` | `str` | `"info"` | Logging level (`debug`, `info`, `warning`, `error`) |
| `workers` | `int` | `1` | Number of worker processes for parallelism |
| `timeout_keep_alive` | `int` | `5` | Keep-alive timeout in seconds |
| `additional_routers` | `Sequence[APIRouter]` | `None` | Custom FastAPI routers to include |
| `channel_router` | `ChannelResponseRouter` | `None` | Channel response router for multi-agent environments. See Channel Routing. |
| `a2a` | `bool` | `False` | Enable A2A (Agent-to-Agent) protocol support |
| `a2a_adapter` | `str` | `"agno"` | A2A adapter to use |
| `a2a_agent_card` | `AgentCard` or `dict` | `None` | Custom agent card for A2A discovery. See A2A Agent Card. |
| `mcp` | `bool` | `False` | Enable MCP server alongside the HTTP server |
| `mcp_port` | `int` | `8001` | Port for the MCP server |
| `mcp_transport` | `str` | `"sse"` | MCP transport (`"sse"` or `"stdio"`) |
| `ws_ping_interval` | `float` or `None` | `20.0` | Seconds between WebSocket ping frames. Set to `None` to disable. |
| `ws_ping_timeout` | `float` or `None` | `20.0` | Seconds to wait for a pong reply before closing the connection. |
Raises:
- `ValueError` - If `reload=True` and `workers > 1` (mutually exclusive)
create_app()¶
def create_app(
agent: Agent | Callable,
additional_routers: Sequence[APIRouter] | None = None,
channel_router: ChannelResponseRouter | None = None,
a2a: bool = False,
a2a_adapter: str = "agno",
a2a_agent_card: AgentCard | dict | None = None,
) -> FastAPI
Create a FastAPI application without starting the server. Use this for programmatic control or custom uvicorn configuration.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `agent` | `Agent` or `Callable` | Required | Agent instance or callable `(messages, context) -> AgentResult` |
| `additional_routers` | `Sequence[APIRouter]` | `None` | Custom FastAPI routers to include |
| `channel_router` | `ChannelResponseRouter` | `None` | Channel response router for multi-agent environments. See Channel Routing. |
| `a2a` | `bool` | `False` | Enable A2A (Agent-to-Agent) protocol support |
| `a2a_adapter` | `str` | `"agno"` | A2A adapter to use |
| `a2a_agent_card` | `AgentCard` or `dict` | `None` | Custom agent card for A2A discovery. See A2A Agent Card. |
Channel Routing¶
Channel routing enables intelligent message filtering in multi-agent environments. When a channel_router is provided and the incoming request includes "source": "slack", the router determines whether the agent should respond before processing the message.
This is useful when multiple agents share a Slack channel — each agent uses its own router to decide if a message is relevant to its domain.
Setup¶
from dcaf.core import Agent, serve, SlackResponseRouter
from dcaf.llm import BedrockLLM
agent = Agent(
tools=[...],
system_prompt="You are a Kubernetes assistant.",
)
llm = BedrockLLM()
slack_router = SlackResponseRouter(
llm_client=llm,
agent_name="k8s-agent",
agent_description="""
Specialized Kubernetes and Helm expert. Responds to:
- Kubernetes resource management, kubectl commands
- Helm chart operations
- DuploCloud service management
Does NOT respond to: general cloud infra, CI/CD, database queries.
""",
)
serve(agent, channel_router=slack_router, port=8000)
With create_app()¶
from dcaf.core import Agent, create_app, SlackResponseRouter
from dcaf.llm import BedrockLLM
import uvicorn
agent = Agent(tools=[...])
llm = BedrockLLM()
router = SlackResponseRouter(
llm_client=llm,
agent_name="aws-agent",
agent_description="AWS cloud infrastructure specialist",
)
app = create_app(agent, channel_router=router)
uvicorn.run(app, host="0.0.0.0", port=8000)
How It Works¶
When a request arrives with "source": "slack" in the body:
- The `SlackResponseRouter.should_agent_respond()` method is called with the message thread.
- The router uses a fast LLM call (Claude 3.5 Haiku) to analyze the conversation and decide if the agent should respond.
- If the router decides not to respond, the endpoint returns an empty response immediately (or a `done` event for streaming).
- If the router decides to respond, the request proceeds to the agent normally.
Requests without "source": "slack" bypass routing entirely.
Slack Request Format¶
{
"messages": [
{
"role": "user",
"content": "My pods keep crashing with OOMKilled",
"user": {"name": "alice", "id": "U123"}
},
{
"role": "assistant",
"content": "Try increasing memory limits in your deployment.",
"agent": {"name": "k8s-agent", "id": "B456"}
},
{
"role": "user",
"content": "Actually, I think this is an AWS node issue",
"user": {"name": "alice", "id": "U123"}
}
],
"source": "slack"
}
Multi-Agent Example¶
Run multiple agents on different ports, each with its own router:
# k8s_server.py
from dcaf.core import Agent, serve, SlackResponseRouter
from dcaf.llm import BedrockLLM
llm = BedrockLLM()
agent = Agent(tools=[...], system_prompt="You are a Kubernetes expert.")
serve(
agent,
channel_router=SlackResponseRouter(
llm_client=llm,
agent_name="k8s-agent",
agent_description="Kubernetes and container orchestration specialist",
),
port=8000,
)
# aws_server.py
from dcaf.core import Agent, serve, SlackResponseRouter
from dcaf.llm import BedrockLLM
llm = BedrockLLM()
agent = Agent(tools=[...], system_prompt="You are an AWS infrastructure expert.")
serve(
agent,
channel_router=SlackResponseRouter(
llm_client=llm,
agent_name="aws-agent",
agent_description="AWS cloud infrastructure and services specialist",
),
port=8001,
)
When a Slack message arrives, each agent's router independently decides whether to respond based on the message content and the agent's description.
Custom Routers¶
You can implement your own router by extending ChannelResponseRouter:
from dcaf.core import ChannelResponseRouter
class KeywordRouter(ChannelResponseRouter):
def __init__(self, keywords: list[str]):
self.keywords = keywords
def should_agent_respond(self, messages: list) -> dict:
last_msg = next(
(m for m in reversed(messages) if m.get("role") == "user"),
None,
)
if not last_msg:
return {"should_respond": False, "reasoning": "No user message"}
content = last_msg.get("content", "").lower()
for kw in self.keywords:
if kw.lower() in content:
return {"should_respond": True, "reasoning": f"Matched: {kw}"}
return {"should_respond": False, "reasoning": "No matching keywords"}
serve(agent, channel_router=KeywordRouter(keywords=["kubernetes", "k8s", "pod"]))
See Also
For full API details on SlackResponseRouter (constructor parameters, decision criteria, testing patterns), see the Channel Routing API Reference.
See Also¶
- Core Overview - Introduction to DCAF Core
- Channel Routing API Reference - Full `SlackResponseRouter` and `ChannelResponseRouter` API details
- ADR-007: Lowercase Chat Endpoints - Why `/api/chat` instead of `/api/sendMessage`