Building Custom Agents¶
This guide shows how to build agents with custom logic using DCAF.
When to Use Custom Functions¶
For simple use cases, pass an Agent directly to serve(); no custom function is needed.
Use custom functions when you need:
- Multiple LLM calls
- Complex branching logic
- Custom pre/post processing
- Integration with external systems
- Different models for different tasks
Quick Start¶
from dcaf.core import Agent, Session, serve
from dcaf.core.primitives import AgentResult
from dcaf.tools import tool
# Tool: report the current time as an ISO-8601 string.
# datetime.now() is called without a timezone, so the value is naive local time.
@tool(description="Get current time")
def get_time() -> str:
    # Function-scope import keeps the tool self-contained.
    import datetime as _dt

    now = _dt.datetime.now()
    return now.isoformat()
def my_agent(messages: list, context: dict, session: Session) -> AgentResult:
    """Count invocations in the session, then answer with a tool-enabled agent.

    Args:
        messages: Conversation history, forwarded unchanged to ``Agent.run``.
        context: Request context; not used by this example.
        session: Session store; its ``call_count`` entry is incremented on
            every call.

    Returns:
        An ``AgentResult`` carrying the agent's reply text and the output of
        ``session.to_dict()``.
    """
    # Bump the per-session invocation counter (a missing key counts as 0).
    previous_calls = session.get("call_count", 0)
    session.set("call_count", previous_calls + 1)

    assistant = Agent(tools=[get_time], system="You are a helpful assistant.")

    # The agent is handed the dict form of the session, not the Session object.
    reply = assistant.run(messages, session=session.to_dict())

    # Export the session again after the run so the result carries it.
    return AgentResult(text=reply.text, session=session.to_dict())


# Hand the custom function to DCAF's serve() entry point.
serve(my_agent)
Note: The session parameter is optional for backward compatibility. Functions that accept only (messages, context) still work.
The Pattern¶
Custom functions return AgentResult¶
from dcaf.core.primitives import AgentResult, ToolApproval, ToolResult
# Simple text response: only the reply text is set.
return AgentResult(text="Here are your pods: nginx, redis, api")
# With session data: a session dict is returned alongside the reply text.
return AgentResult(
text="Here are your pods: nginx, redis, api",
session={"last_query": "pods", "count": 3},
)
# Needs approval: each tool call awaiting approval is described by a
# ToolApproval entry (id, name, input, description) in pending_tools.
return AgentResult(
text="I need approval to delete the pod.",
pending_tools=[
ToolApproval(
id="tc_123",
name="delete_pod",
input={"name": "nginx-abc"},
description="Delete pod nginx-abc from production",
)
],
)
# Tool was executed: each completed tool call is reported as a ToolResult
# entry (id, name, input, output) in executed_tools.
return AgentResult(
text="Pod deleted successfully.",
executed_tools=[
ToolResult(
id="tc_123",
name="delete_pod",
input={"name": "nginx-abc"},
output="pod 'nginx-abc' deleted",
)
],
)
Use from_agent_response() for convenience¶
from dcaf.core import Agent, Session, serve
from dcaf.core.primitives import from_agent_response, AgentResult
def my_agent(messages: list, context: dict, session: Session) -> AgentResult:
    """Run a tool-enabled agent and convert its response with the helper.

    Args:
        messages: Conversation history, forwarded unchanged to ``Agent.run``.
        context: Request context; not used by this example.
        session: Session store; its dict form is passed to the agent.

    Returns:
        Whatever ``from_agent_response`` builds from the agent's response.
    """
    # `[...]` is a placeholder for your own tool list.
    reply = Agent(tools=[...]).run(messages, session=session.to_dict())
    return from_agent_response(reply)


serve(my_agent)
Multi-Call Patterns¶
Pattern 1: Sequential Calls¶
Each call depends on the result of the previous one:
from dcaf.core import Agent, Session
from dcaf.core.primitives import AgentResult, ToolApproval


def research_agent(messages: list, context: dict, session: Session) -> AgentResult:
    """Classify the latest question, then handle it with a matching pipeline.

    The question is classified by a first LLM call; depending on the label it
    is researched and summarized ("factual"), executed with tools ("action"),
    or answered conversationally (anything else).

    Args:
        messages: Conversation history; the last entry's ``"content"`` is
            treated as the user's question.
        context: Request context; not used by this example.
        session: Session store; the first 50 characters of every question are
            appended to its ``research_topics`` list.

    Returns:
        An ``AgentResult`` with the reply text and the session dict. On the
        "action" path it also carries the executor's pending tool approvals.
    """
    user_question = messages[-1]["content"]

    # Track research topics in session (truncated to 50 characters each).
    topics = session.get("research_topics", [])
    topics.append(user_question[:50])
    session.set("research_topics", topics)

    # Step 1: Classify the question.
    classifier = Agent(system="Classify as: factual, opinion, or action. Reply with one word.")
    classification = classifier.run(messages)
    # Lowercase once; an empty or None reply falls through to the default branch.
    label = (classification.text or "").lower()

    # Step 2: Handle based on classification.
    if "factual" in label:
        # Research the topic.
        researcher = Agent(system="Provide detailed factual information.")
        research = researcher.run([{"role": "user", "content": f"Research: {user_question}"}])

        # Step 3: Summarize the research output.
        summarizer = Agent(system="Provide a concise summary.")
        summary = summarizer.run([{"role": "user", "content": f"Summarize: {research.text}"}])
        return AgentResult(text=summary.text, session=session.to_dict())

    if "action" in label:
        # Use tools (`[...]` is a placeholder for your own tool list).
        executor = Agent(tools=[...], system="Execute the requested action.")
        result = executor.run(messages, session=session.to_dict())
        return AgentResult(
            text=result.text,
            # Re-wrap each pending call as a ToolApproval for the result.
            pending_tools=[
                ToolApproval(id=p.id, name=p.name, input=p.input, description=p.description)
                for p in result.pending_tools
            ],
            session=session.to_dict(),
        )

    # Simple response for every other classification.
    responder = Agent(system="Be helpful and friendly.")
    result = responder.run(messages)
    return AgentResult(text=result.text, session=session.to_dict())
Pattern 2: Parallel Calls¶
When calls are independent, run them in parallel:
from concurrent.futures import ThreadPoolExecutor
from dcaf.core import Agent, AgentResult
# Worker pool created once at module level and reused by every call.
executor = ThreadPoolExecutor(max_workers=5)


def parallel_agent(messages: list, context: dict) -> AgentResult:
    """Run three independent LLM calls in parallel and synthesize the results.

    Args:
        messages: Conversation history; not used by this example, since the
            three sub-questions are fixed.
        context: Request context; not used by this example.

    Returns:
        An ``AgentResult`` whose text is the synthesizer's summary of the
        weather, news, and stocks answers.
    """

    def ask(system: str, question: str) -> str:
        """Run one single-turn agent and return its reply text."""
        agent = Agent(system=system)
        return agent.run([{"role": "user", "content": question}]).text

    # Define parallel tasks: label -> (system prompt, question).
    tasks = {
        "Weather": ("Answer weather questions briefly.", "What's the weather?"),
        "News": ("Summarize recent news briefly.", "What's the news?"),
        "Stocks": ("Summarize stock market briefly.", "How are stocks?"),
    }

    # Submit everything first so the calls overlap, then collect the results.
    futures = {
        label: executor.submit(ask, system, question)
        for label, (system, question) in tasks.items()
    }

    # Combine results; Future.result() blocks until that task has finished.
    combined = "\n" + "".join(
        f"{label}: {future.result()}\n" for label, future in futures.items()
    )

    # Final synthesis.
    synthesizer = Agent(system="Create a cohesive summary from the information provided.")
    final = synthesizer.run([{"role": "user", "content": f"Summarize this:\n{combined}"}])
    return AgentResult(text=final.text)
Pattern 3: Agentic Loop¶
Let the LLM decide when it's done:
from dcaf.core import Agent, AgentResult, ToolApproval
from dcaf.tools import tool
# Tool: echoes the final result back with a "DONE:" prefix.
@tool(description="Mark the task as complete")
def finish(result: str) -> str:
    marker = "DONE:"
    return f"{marker} {result}"
def agentic_loop(messages: list, context: dict) -> AgentResult:
    """Re-run the agent until it reports completion, needs approval, or times out.

    Args:
        messages: Initial conversation history; it is copied, so the caller's
            list is not modified.
        context: Request context; not used by this example.

    Returns:
        An ``AgentResult`` with the finished text (``"DONE:"`` marker removed),
        or the pending tool approvals, or ``"Max iterations reached."`` after
        ``MAX_ITERATIONS`` rounds.
    """
    MAX_ITERATIONS = 10

    # NOTE: `search` and `calculate` are not defined in this snippet; supply
    # your own tools alongside `finish`.
    tools = [search, calculate, finish]
    agent = Agent(tools=tools, system="Complete the task. Call finish() when done.")

    conversation = list(messages)
    # NOTE(review): nothing in this loop appends to all_executed, so
    # executed_tools is always reported as empty. Confirm how the agent
    # response exposes executed tool calls and collect them here.
    all_executed = []

    for _ in range(MAX_ITERATIONS):
        response = agent.run(conversation)
        # The reply text may be empty/None (e.g. when only approvals are
        # pending), so normalise it before searching for the marker.
        text = response.text or ""

        # Check if agent called finish.
        if "DONE:" in text:
            return AgentResult(
                text=text.replace("DONE:", "").strip(),
                executed_tools=all_executed,
            )

        # Check for pending approvals.
        if response.pending_tools:
            return AgentResult(
                text=response.text,
                pending_tools=[
                    ToolApproval(id=p.id, name=p.name, input=p.input, description=p.description)
                    for p in response.pending_tools
                ],
                executed_tools=all_executed,
            )

        # Add response to conversation and continue.
        conversation.append({"role": "assistant", "content": response.text})
        conversation.append({"role": "user", "content": "Continue with the task."})

    return AgentResult(text="Max iterations reached.", executed_tools=all_executed)
Pattern 4: Pre/Post Processing¶
Add logic before and after LLM calls:
from dcaf.core import Agent, AgentResult, ToolApproval


def processing_agent(messages: list, context: dict) -> AgentResult:
    """Wrap a single LLM call with tenant-aware pre- and post-processing.

    NOTE: ``get_tenant_info``, ``sanitize_messages``, ``get_tools_for_tenant``,
    ``log_interaction``, ``contains_sensitive_info``, ``redact_sensitive`` and
    ``is_rate_limited`` are not defined in this snippet; supply your own.

    Args:
        messages: Conversation history; sanitized before it reaches the LLM.
        context: Request context; must contain ``tenant_name``.

    Returns:
        An ``AgentResult`` with the (possibly redacted) reply text and any
        pending tool approvals, or an error/rate-limit message when the
        request is rejected before the LLM call.
    """
    # ==================
    # PRE-PROCESSING
    # ==================
    # Validate tenant
    tenant = context.get("tenant_name")
    if not tenant:
        return AgentResult(text="Error: No tenant specified.")

    # Apply rate limiting before any work is done, so a limited tenant does
    # not consume an LLM call or trigger tool execution.
    if is_rate_limited(tenant):
        return AgentResult(text="Rate limit exceeded. Please try again later.")

    # Enrich with tenant info
    tenant_info = get_tenant_info(tenant)
    enriched_system = (
        f"\nYou are helping tenant: {tenant}\n"
        f"Cluster: {tenant_info['cluster']}\n"
        f"Namespace: {tenant_info['namespace']}\n"
    )

    # Filter messages (e.g., remove PII)
    safe_messages = sanitize_messages(messages)

    # ==================
    # LLM CALL
    # ==================
    agent = Agent(
        tools=get_tools_for_tenant(tenant),
        system=enriched_system,
    )
    response = agent.run(safe_messages)

    # ==================
    # POST-PROCESSING
    # ==================
    # Audit logging (receives the original, unsanitized messages)
    log_interaction(tenant, messages, response)

    # Validate response
    response_text = response.text
    if contains_sensitive_info(response_text):
        response_text = redact_sensitive(response_text)

    return AgentResult(
        text=response_text,
        pending_tools=[
            ToolApproval(id=p.id, name=p.name, input=p.input, description=p.description)
            for p in response.pending_tools
        ],
    )
Using Different Models¶
Create agents with different configurations for different tasks:
from dcaf.core import Agent, AgentResult
def smart_routing_agent(messages: list, context: dict) -> AgentResult:
    """Pick a model by query complexity, then answer with it.

    A first agent classifies the conversation; a reply containing "complex"
    routes to the tool-enabled opus agent, anything else to the haiku agent.

    Args:
        messages: Conversation history, passed to both the classifier and the
            selected agent.
        context: Request context; not used by this example.

    Returns:
        An ``AgentResult`` containing only the selected agent's reply text.
    """
    # Fast model for classification
    verdict = Agent(
        model="anthropic.claude-3-haiku",
        system="Classify the complexity: simple or complex. Reply with one word.",
    ).run(messages)

    if "complex" in verdict.text.lower():
        # Smart model for complex tasks (`[...]` is a placeholder tool list)
        worker = Agent(
            model="anthropic.claude-3-opus",
            tools=[...],
            system="Handle complex queries thoroughly.",
        )
    else:
        # Fast model for simple tasks
        worker = Agent(
            model="anthropic.claude-3-haiku",
            system="Answer simply and briefly.",
        )

    return AgentResult(text=worker.run(messages).text)
Complete Example¶
"""
Multi-step Kubernetes agent using DCAF.
"""
from dcaf.core import Agent, AgentResult, ToolApproval, ToolResult, serve
from dcaf.tools import tool
# Define tools
# Tool: list the pods in a namespace through the `kubectl` helper.
# NOTE: `kubectl` is not defined in this snippet; supply your own helper.
@tool(description="List pods in a namespace")
def list_pods(namespace: str = "default") -> str:
    import shlex

    # The argument is chosen by the model, so quote it before splicing it
    # into the command string; ordinary names pass through unchanged.
    return kubectl(f"get pods -n {shlex.quote(namespace)}")
# Tool: delete a pod through the `kubectl` helper; flagged as requiring approval.
# NOTE: `kubectl` is not defined in this snippet; supply your own helper.
@tool(requires_approval=True, description="Delete a pod")
def delete_pod(name: str, namespace: str = "default") -> str:
    import shlex

    # Both arguments are chosen by the model, so quote them before splicing
    # them into the command string; ordinary names pass through unchanged.
    return kubectl(f"delete pod {shlex.quote(name)} -n {shlex.quote(namespace)}")
# Tools shared by the query and action agents below.
TOOLS = [list_pods, delete_pod]


def k8s_agent(messages: list, context: dict) -> AgentResult:
    """Kubernetes assistant with approval flow.

    The latest user message is classified as query, action, or other, and the
    full conversation is then handled by an agent configured for that intent.

    Args:
        messages: Conversation history; the last entry's ``"content"`` is the
            message that gets classified.
        context: Request context; not used by this example.

    Returns:
        An ``AgentResult`` with the reply text. Only the "action" path
        forwards pending tool approvals, with a fallback text when the reply
        text is empty.
    """
    latest = messages[-1]["content"]

    # Step 1: Classify intent from the latest message only.
    intent = Agent(
        system="Classify as: query, action, or other. Reply with one word."
    ).run([{"role": "user", "content": latest}])
    label = intent.text.lower()

    # Step 2: Handle based on intent ("query" is checked before "action").
    if "query" in label:
        # Just answer questions using tools
        answer = Agent(
            tools=TOOLS,
            system="You are a Kubernetes assistant. Answer questions about the cluster.",
        ).run(messages)
        return AgentResult(text=answer.text)

    if "action" in label:
        # Actions may need approval
        outcome = Agent(
            tools=TOOLS,
            system="You are a Kubernetes assistant. Help with cluster management.",
        ).run(messages)
        reply_text = outcome.text or "I need approval for this action."
        approvals = [
            ToolApproval(id=p.id, name=p.name, input=p.input, description=p.description)
            for p in outcome.pending_tools
        ]
        return AgentResult(text=reply_text, pending_tools=approvals)

    # General conversation
    chat = Agent(system="You are a friendly Kubernetes assistant.").run(messages)
    return AgentResult(text=chat.text)


if __name__ == "__main__":
    serve(k8s_agent, port=8000)
See Also¶
- Server Documentation - Running agents as REST APIs
- Core Overview - The simple Agent class
- Building Tools - Creating tools with @tool decorator