Agent Server API Reference (Legacy)¶
Legacy API
This documents the v1 API. For new projects, use serve(agent) from the Core API.
See Migration Guide to upgrade existing code.
The Agent Server module provides a FastAPI-based server for hosting DCAF agents with RESTful endpoints.
Table of Contents¶
- Overview
- create_chat_app()
- API Endpoints
- Request/Response Formats
- Streaming
- Error Handling
- Channel Routing
- Examples
Overview¶
The Agent Server wraps any AgentProtocol-compliant agent in a FastAPI application with standardized endpoints for chat interactions.
Import¶
from dcaf.agent_server import create_chat_app, AgentProtocol
# Or from the main module
from dcaf import create_chat_app, AgentProtocol
Features¶
- REST API: Standard HTTP endpoints
- WebSocket: Bidirectional streaming via
/api/chat-ws - Streaming: NDJSON streaming support
- Validation: Pydantic schema validation
- Logging: Built-in request/response logging
- Health Check: Endpoint for monitoring
- Channel Routing: Optional Slack integration
AgentProtocol Interface¶
Any agent passed to create_chat_app() must satisfy the AgentProtocol:
from typing import Protocol, runtime_checkable
@runtime_checkable
class AgentProtocol(Protocol):
def invoke(self, messages: dict[str, list[dict[str, Any]]]) -> AgentMessage: ...
Required:
invoke(messages) -> AgentMessage: Process messages and return a response. Can be sync or async.
Optional:
invoke_stream(messages) -> Iterator[StreamEvent]: Stream responses. If not implemented, streaming endpoints fall back toinvoke()and wrap the response in stream events.
Backwards Compatibility
The protocol only requires invoke(). V1 agents that don't implement invoke_stream() will still work with all endpoints—streaming endpoints automatically fall back to wrapping the invoke() response in stream events.
create_chat_app()¶
Create a FastAPI application from an agent.
Parameters¶
| Parameter | Type | Required | Description |
|---|---|---|---|
agent |
AgentProtocol |
Yes | Agent implementation |
router |
ChannelResponseRouter |
No | Channel-specific routing |
Returns¶
FastAPI - A configured FastAPI application instance.
Raises¶
TypeError- If agent doesn't satisfyAgentProtocol
Example¶
from dcaf.agent_server import create_chat_app
from dcaf.agents import ToolCallingAgent
from dcaf.llm import BedrockLLM
import uvicorn
# Create agent
llm = BedrockLLM()
agent = ToolCallingAgent(llm=llm, tools=[...], system_prompt="...")
# Create app
app = create_chat_app(agent)
# Run server
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
With Channel Router¶
from dcaf.agent_server import create_chat_app
from dcaf.channel_routing import SlackResponseRouter
from dcaf.llm import BedrockLLM
llm = BedrockLLM()
# Create router for Slack
router = SlackResponseRouter(
llm_client=llm,
agent_name="MyBot",
agent_description="A helpful assistant"
)
# Create app with router
app = create_chat_app(agent, router=router)
API Endpoints¶
V1 vs V2 Code Paths
This module (dcaf.agent_server) provides the V1 code path. The V1 endpoints are:
| V1 Endpoint | Description |
|---|---|
POST /api/sendMessage |
Synchronous chat (V1) |
POST /api/sendMessageStream |
Streaming chat (V1) |
For a unified server with both V1 and V2 endpoints, use dcaf.core.create_app() instead:
| Endpoint | Code Path | Description |
|---|---|---|
POST /api/chat |
V2 | Synchronous chat with _request_fields forwarding |
POST /api/chat-stream |
V2 | Streaming chat with _request_fields forwarding |
WS /api/chat-ws |
V2 | WebSocket bidirectional streaming |
POST /api/sendMessage |
V1 | Legacy synchronous chat |
POST /api/sendMessageStream |
V1 | Legacy streaming chat |
See ADR-006: Strangler Fig Migration for the rationale.
GET /health¶
Health check endpoint for monitoring.
Response¶
Example¶
POST /api/sendMessage¶
Deprecated
This endpoint is deprecated. Use POST /api/chat instead for new integrations.
The endpoint remains fully functional for backwards compatibility.
Send a message to the agent and receive a response.
Request Body¶
{
"messages": [
{
"role": "user",
"content": "Hello!",
"data": {},
"platform_context": {
"tenant_name": "production",
"user_id": "user123"
}
}
],
"source": "help-desk"
}
Fields¶
| Field | Type | Required | Description |
|---|---|---|---|
messages |
array |
Yes | Array of message objects |
source |
string |
No | Message source (e.g., "slack", "help-desk") |
Response¶
{
"role": "assistant",
"content": "Hello! How can I help you today?",
"data": {
"cmds": [],
"executed_cmds": [],
"tool_calls": [],
"executed_tool_calls": [],
"url_configs": []
},
"meta_data": {},
"timestamp": null,
"user": null,
"agent": null
}
Example¶
curl -X POST http://localhost:8000/api/sendMessage \
-H "Content-Type: application/json" \
-d '{
"messages": [
{"role": "user", "content": "What is the weather in NYC?"}
]
}'
Error Responses¶
| Code | Description |
|---|---|
400 |
Missing messages field |
422 |
Validation error in messages |
500 |
Agent error or invalid response |
POST /api/sendMessageStream¶
Deprecated
This endpoint is deprecated. Use POST /api/chat-stream instead for new integrations.
The endpoint remains fully functional for backwards compatibility.
Stream a response from the agent.
Request Body¶
Same as /api/sendMessage.
Response¶
NDJSON (Newline-delimited JSON) stream:
{"type":"text_delta","text":"Hello"}
{"type":"text_delta","text":" there!"}
{"type":"tool_calls","tool_calls":[...]}
{"type":"done","stop_reason":"end_turn"}
Example¶
curl -X POST http://localhost:8000/api/sendMessageStream \
-H "Content-Type: application/json" \
-d '{"messages": [{"role": "user", "content": "Tell me a story"}]}'
WS /api/chat-ws¶
Bidirectional streaming chat over WebSocket. The connection stays open for multiple conversation turns.
Client Frame¶
Each text frame from the client is a JSON object with the same shape as the HTTP endpoints:
Server Frames¶
The server streams back the same event types as /api/sendMessageStream (text_delta, tool_calls, done, error, etc.), one JSON object per text frame. Each turn ends with a done event, after which the client can send the next turn.
Error Behavior¶
Errors (invalid JSON, missing fields, agent exceptions) are sent as error events without closing the connection. The client can continue sending messages after receiving an error.
Example¶
import asyncio
import json
import websockets
async def chat():
async with websockets.connect("ws://localhost:8000/api/chat-ws") as ws:
await ws.send(json.dumps({
"messages": [{"role": "user", "content": "Hello"}]
}))
async for frame in ws:
event = json.loads(frame)
if event["type"] == "text_delta":
print(event["text"], end="", flush=True)
elif event["type"] == "done":
print("\n--- Done ---")
break
elif event["type"] == "error":
print(f"\nError: {event['error']}")
break
asyncio.run(chat())
Request/Response Formats¶
Message Object¶
{
"role": "user" | "assistant",
"content": "Message text",
"data": {
"cmds": [...], # Suggested commands
"executed_cmds": [...], # Executed commands
"tool_calls": [...], # Tools needing approval
"executed_tool_calls": [...] # Executed tools
},
"platform_context": { # Only for user messages
"tenant_name": "string",
"user_id": "string",
"k8s_namespace": "string",
"duplo_base_url": "string",
"duplo_token": "string",
"kubeconfig": "base64-string",
"aws_credentials": {...}
},
"timestamp": "ISO-8601",
"user": {"name": "string", "id": "string"},
"agent": {"name": "string", "id": "string"}
}
AgentMessage Response¶
class AgentMessage(BaseModel):
role: Literal["assistant"] = "assistant"
content: str = ""
data: Data = Data()
meta_data: Dict[str, Any] = {}
timestamp: Optional[datetime] = None
user: Optional[User] = None
agent: Optional[Agent] = None
Data Object¶
class Data(BaseModel):
cmds: List[Command] = [] # Suggested terminal commands
executed_cmds: List[ExecutedCommand] = []
tool_calls: List[ToolCall] = [] # Tools needing approval
executed_tool_calls: List[ExecutedToolCall] = []
url_configs: List[URLConfig] = []
Streaming¶
Stream Event Types¶
DCAF supports 7 event types for streaming:
1. text_delta¶
Streaming text tokens from the LLM.
2. tool_calls¶
Tool calls requiring user approval.
{
"type": "tool_calls",
"tool_calls": [
{
"id": "tool-123",
"name": "delete_file",
"input": {"path": "/tmp/file.txt"},
"execute": false,
"tool_description": "Delete a file",
"input_description": {...}
}
]
}
3. executed_tool_calls¶
Tools that were executed.
{
"type": "executed_tool_calls",
"executed_tool_calls": [
{
"id": "tool-456",
"name": "get_weather",
"input": {"location": "NYC"},
"output": "72°F, sunny"
}
]
}
4. commands¶
Terminal commands for approval.
{
"type": "commands",
"commands": [
{
"command": "kubectl get pods",
"execute": false,
"files": []
}
]
}
5. executed_commands¶
Commands that were executed.
{
"type": "executed_commands",
"executed_cmds": [
{
"command": "ls -la",
"output": "total 0\ndrwxr-xr-x ..."
}
]
}
6. done¶
Stream completed successfully.
7. error¶
Error during streaming.
Consuming Streams¶
Python¶
import requests
response = requests.post(
"http://localhost:8000/api/sendMessageStream",
json={"messages": [{"role": "user", "content": "Hello"}]},
stream=True
)
for line in response.iter_lines():
if line:
import json
event = json.loads(line)
if event["type"] == "text_delta":
print(event["text"], end="", flush=True)
elif event["type"] == "done":
print("\n[Done]")
break
elif event["type"] == "error":
print(f"\n[Error: {event['error']}]")
break
JavaScript¶
const response = await fetch('/api/sendMessageStream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
messages: [{ role: 'user', content: 'Hello' }]
})
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { value, done } = await reader.read();
if (done) break;
const lines = decoder.decode(value).split('\n');
for (const line of lines) {
if (line.trim()) {
const event = JSON.parse(line);
console.log(event);
}
}
}
Error Handling¶
Error Responses¶
# 400 Bad Request - Missing messages
{
"detail": "'messages' field missing from request body"
}
# 422 Unprocessable Entity - Validation error
{
"detail": [
{
"loc": ["body", "messages", 0, "role"],
"msg": "value is not a valid enumeration member",
"type": "type_error.enum"
}
]
}
# 500 Internal Server Error - Agent error
{
"detail": "Agent returned invalid Message: ..."
}
Handling Errors in Client¶
import requests
try:
response = requests.post(
"http://localhost:8000/api/sendMessage",
json={"messages": [{"role": "user", "content": "Hi"}]},
timeout=30
)
response.raise_for_status()
data = response.json()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 400:
print("Bad request:", e.response.json())
elif e.response.status_code == 422:
print("Validation error:", e.response.json())
elif e.response.status_code == 500:
print("Server error:", e.response.json())
except requests.exceptions.Timeout:
print("Request timed out")
except requests.exceptions.ConnectionError:
print("Connection error")
Channel Routing¶
The Agent Server supports channel-specific routing, particularly for Slack.
Slack Integration¶
from dcaf.agent_server import create_chat_app
from dcaf.channel_routing import SlackResponseRouter
from dcaf.llm import BedrockLLM
llm = BedrockLLM()
# Create Slack router
router = SlackResponseRouter(
llm_client=llm,
agent_name="DuploBot",
agent_description="A helpful DuploCloud assistant",
model_id="us.anthropic.claude-3-5-haiku-20241022-v1:0" # Fast model for routing
)
# Create app with router
app = create_chat_app(agent, router=router)
How Routing Works¶
- Request includes
"source": "slack" - Router's
should_agent_respond()is called - If
False, returns empty response (agent stays silent) - If
True, proceeds with agent invocation
Request with Source¶
Examples¶
Example 1: Basic Server¶
from dcaf.agent_server import create_chat_app, AgentProtocol
from dcaf.schemas.messages import AgentMessage
import uvicorn
class SimpleAgent(AgentProtocol):
def invoke(self, messages):
return AgentMessage(content="Hello from SimpleAgent!")
agent = SimpleAgent()
app = create_chat_app(agent)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
Example 2: Production Server¶
from dcaf.agent_server import create_chat_app
from dcaf.agents import ToolCallingAgent
from dcaf.llm import BedrockLLM
from dcaf.tools import tool
import uvicorn
import dotenv
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load environment
dotenv.load_dotenv()
# Define tools
@tool(schema={...}, requires_approval=False)
def my_tool(param: str) -> str:
return f"Result: {param}"
# Create components
llm = BedrockLLM(region_name="us-east-1")
agent = ToolCallingAgent(
llm=llm,
tools=[my_tool],
system_prompt="You are a helpful assistant."
)
# Create app
app = create_chat_app(agent)
# Add middleware (optional)
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
if __name__ == "__main__":
uvicorn.run(
app,
host="0.0.0.0",
port=8000,
log_level="info",
workers=1 # Use 1 worker for stateful agents
)
Example 3: Docker Deployment¶
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# main.py
from dcaf.agent_server import create_chat_app
from dcaf.agents import ToolCallingAgent
from dcaf.llm import BedrockLLM
import dotenv
dotenv.load_dotenv()
llm = BedrockLLM()
agent = ToolCallingAgent(llm=llm, tools=[], system_prompt="...")
app = create_chat_app(agent)
Example 4: Testing the Server¶
# test_server.py
import pytest
from fastapi.testclient import TestClient
from main import app
client = TestClient(app)
def test_health():
response = client.get("/health")
assert response.status_code == 200
assert response.json() == {"status": "ok"}
def test_send_message():
response = client.post(
"/api/sendMessage",
json={
"messages": [
{"role": "user", "content": "Hello!"}
]
}
)
assert response.status_code == 200
data = response.json()
assert "content" in data
assert data["role"] == "assistant"
def test_missing_messages():
response = client.post(
"/api/sendMessage",
json={}
)
assert response.status_code == 400
def test_invalid_role():
response = client.post(
"/api/sendMessage",
json={
"messages": [
{"role": "invalid", "content": "Hello!"}
]
}
)
assert response.status_code == 422