Adapters¶
Adapters translate between our domain and external systems. Each framework gets its own cohesive module containing all related code.
Overview¶
The adapters layer includes:
- Inbound Adapters: Handle incoming requests (HTTP, CLI)
- Outbound Adapters: Implement ports for external services (LLM frameworks, databases)
Agno Adapter¶
The Agno adapter provides integration with the Agno SDK for agent orchestration with Claude models on AWS Bedrock and other providers.
Note: This adapter uses the real Agno SDK (pip install agno). For Bedrock, ensure that valid AWS credentials are configured.
Location¶
dcaf/core/adapters/outbound/agno/
├── __init__.py
├── adapter.py # AgnoAdapter
├── tool_converter.py # AgnoToolConverter
├── message_converter.py # AgnoMessageConverter
└── types.py # Agno-specific types
Features¶
The Agno adapter includes production-proven patterns for reliability:
| Feature | Description |
|---|---|
| Async Support | Uses aioboto3 for non-blocking AWS calls |
| Message Filtering | Removes tool messages to prevent Bedrock errors |
| Alternation Validation | Ensures user/assistant message alternation |
| Parallel Tool Workaround | Limits concurrent tool calls to prevent bugs |
| Metrics Extraction | Captures tokens, duration, and timing |
| Region Inference | Extracts region from ARN-style model IDs |
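To illustrate the Region Inference row, the sketch below pulls the region out of an ARN-style model ID. It relies only on the standard ARN layout (`arn:partition:service:region:account-id:resource`). The function name `infer_region_from_model_id` and the choice to return `None` for plain model IDs are assumptions, not the adapter's actual code.

```python
from typing import Optional


def infer_region_from_model_id(model_id: str) -> Optional[str]:
    """Return the region embedded in an ARN-style model ID, or None.

    Hypothetical helper: the real adapter may parse model IDs differently.
    """
    if not model_id.startswith("arn:"):
        return None  # Plain model IDs carry no region
    # ARN layout: arn:partition:service:region:account-id:resource
    parts = model_id.split(":", 5)
    if len(parts) < 6 or not parts[3]:
        return None  # Malformed ARN or empty region field
    return parts[3]
```

A caller could fall back to the configured `aws_region` whenever this returns `None`.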
Usage¶
from dcaf.core.adapters.outbound.agno import AgnoAdapter

# Create adapter with AWS profile
adapter = AgnoAdapter(
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    provider="bedrock",
    aws_profile="my-profile",
    aws_region="us-west-2",
    max_tokens=4096,
    temperature=0.1,
)

# Async invocation (preferred for FastAPI)
response = await adapter.ainvoke(
    messages=conversation.messages,
    tools=[my_tool],
    system_prompt="You are helpful.",
)

# Sync invocation (wraps async internally)
response = adapter.invoke(
    messages=conversation.messages,
    tools=[my_tool],
)
AgnoAdapter¶
Implements the AgentRuntime port with both sync and async interfaces.
class AgnoAdapter:
    def __init__(
        self,
        model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0",
        provider: str = "bedrock",
        max_tokens: int = 4096,
        temperature: float = 0.1,
        # AWS configuration
        aws_profile: Optional[str] = None,
        aws_region: Optional[str] = None,
        aws_access_key: Optional[str] = None,
        aws_secret_key: Optional[str] = None,
        # Generic API key (for non-AWS providers)
        api_key: Optional[str] = None,
        # Behavior flags
        tool_call_limit: Optional[int] = None,
        disable_history: bool = False,
        disable_tool_filtering: bool = False,
    ): ...

    # Async methods (preferred)
    async def ainvoke(
        self,
        messages: List[Any],
        tools: List[Any],
        system_prompt: Optional[str] = None,
    ) -> AgentResponse: ...

    async def ainvoke_stream(
        self,
        messages: List[Any],
        tools: List[Any],
        system_prompt: Optional[str] = None,
    ) -> AsyncIterator[StreamEvent]: ...

    # Sync methods (wrap async internally)
    def invoke(
        self,
        messages: List[Message],
        tools: List[Tool],
        system_prompt: Optional[str] = None,
    ) -> AgentResponse: ...

    def invoke_stream(
        self,
        messages: List[Message],
        tools: List[Tool],
        system_prompt: Optional[str] = None,
    ) -> Iterator[StreamEvent]: ...

    # Cleanup
    async def cleanup(self) -> None: ...
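The sync methods are described as wrapping the async ones. One common way to do that is `asyncio.run`, sketched below with a hypothetical `SketchRuntime` class that returns a plain string instead of an `AgentResponse`. Whether `AgnoAdapter` uses `asyncio.run` or another mechanism is not stated here, so treat this as an illustration of the pattern only.

```python
import asyncio
from typing import Any, List, Optional


class SketchRuntime:
    """Hypothetical stand-in for an adapter with paired sync/async methods."""

    async def ainvoke(
        self,
        messages: List[Any],
        tools: List[Any],
        system_prompt: Optional[str] = None,
    ) -> str:
        await asyncio.sleep(0)  # Placeholder for a non-blocking model call
        return f"handled {len(messages)} message(s)"

    def invoke(
        self,
        messages: List[Any],
        tools: List[Any],
        system_prompt: Optional[str] = None,
    ) -> str:
        # asyncio.run() raises RuntimeError if this thread already has a
        # running event loop, so async callers should use ainvoke() instead.
        return asyncio.run(self.ainvoke(messages, tools, system_prompt))
```

This is also why the async path is the better fit inside FastAPI handlers: a sync wrapper of this kind cannot be called from code that is already running on the event loop.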
Tracing and Observability¶
The Agno adapter supports distributed tracing through the platform_context parameter. Tracing IDs are passed to the Agno SDK and included in response metadata.
Supported Tracing Fields:
| Field | Agno Parameter | Description |
|---|---|---|
| user_id | user_id | User identifier |
| session_id | session_id | Session grouping runs |
| run_id | run_id | Unique execution ID |
| request_id | metadata.request_id | HTTP correlation ID |
| tenant_id | metadata.tenant_id | Tenant identifier |
Usage:
# Via AgentRequest (recommended)
request = AgentRequest(
    content="What pods are running?",
    user_id="user-123",
    session_id="session-abc",
    run_id="run-xyz",
    request_id="req-456",
    tools=[kubectl_tool],
)

# Via platform_context dict
response = await adapter.ainvoke(
    messages=messages,
    tools=tools,
    platform_context={
        "user_id": "user-123",
        "session_id": "session-abc",
        "run_id": "run-xyz",
        "request_id": "req-456",
        "tenant_id": "tenant-1",
    },
)

# Tracing IDs returned in response metadata
print(response.metadata)
# {'run_id': 'run-xyz', 'session_id': 'session-abc', ...}
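The field mapping in the tracing table can be sketched as a small function that splits a `platform_context` dict into top-level Agno parameters and metadata entries. The name `split_tracing_fields` and the decision to ignore unknown keys are assumptions made for illustration; the adapter's internal handling may differ.

```python
from typing import Any, Dict, Tuple

# Fields the tracing table lists as top-level Agno parameters
TOP_LEVEL_FIELDS = ("user_id", "session_id", "run_id")
# Fields the tracing table lists under Agno's metadata
METADATA_FIELDS = ("request_id", "tenant_id")


def split_tracing_fields(
    platform_context: Dict[str, Any],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split platform_context into (top-level params, metadata).

    Hypothetical helper: keys outside the two tuples are ignored.
    """
    params = {k: platform_context[k] for k in TOP_LEVEL_FIELDS if k in platform_context}
    metadata = {k: platform_context[k] for k in METADATA_FIELDS if k in platform_context}
    return params, metadata
```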
Debug Mode:
Enable Agno's verbose debug logging:
# Option 1: Set Python log level to DEBUG
LOG_LEVEL=DEBUG python your_agent.py
# Option 2: Set AGNO_DEBUG directly
AGNO_DEBUG=true python your_agent.py
See the Tracing and Observability Guide for complete documentation.
Environment Variables¶
The adapter supports configuration via environment variables:
| Variable | Default | Description |
|---|---|---|
| AWS_REGION | us-west-2 | Default AWS region |
| AGNO_TOOL_CALL_LIMIT | 1 | Max concurrent tool calls |
| AGNO_DISABLE_HISTORY | false | Disable message history |
| DISABLE_TOOL_FILTERING | false | Disable tool message filtering |
| LOG_LEVEL | INFO | Python log level (DEBUG enables Agno verbose mode) |
| AGNO_DEBUG | false | Enable Agno debug mode directly |
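As a rough sketch of how these variables and their documented defaults could be read, the example below loads them into a small settings object. The `EnvSettings` and `load_env_settings` names, and the rule that `true`, `1`, and `yes` count as truthy, are assumptions; the adapter's own parsing may differ.

```python
import os
from dataclasses import dataclass
from typing import Mapping


def _as_bool(value: str) -> bool:
    """Treat 'true', '1', and 'yes' (any case) as True; an assumed convention."""
    return value.strip().lower() in {"true", "1", "yes"}


@dataclass
class EnvSettings:
    aws_region: str
    tool_call_limit: int
    disable_history: bool
    disable_tool_filtering: bool
    debug: bool


def load_env_settings(env: Mapping[str, str] = os.environ) -> EnvSettings:
    """Read the documented variables, falling back to the documented defaults."""
    log_level = env.get("LOG_LEVEL", "INFO").upper()
    return EnvSettings(
        aws_region=env.get("AWS_REGION", "us-west-2"),
        tool_call_limit=int(env.get("AGNO_TOOL_CALL_LIMIT", "1")),
        disable_history=_as_bool(env.get("AGNO_DISABLE_HISTORY", "false")),
        disable_tool_filtering=_as_bool(env.get("DISABLE_TOOL_FILTERING", "false")),
        # Either LOG_LEVEL=DEBUG or AGNO_DEBUG=true turns on verbose mode
        debug=log_level == "DEBUG" or _as_bool(env.get("AGNO_DEBUG", "false")),
    )
```

Passing the mapping as a parameter keeps the function easy to test without touching the real process environment.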
Bedrock Compatibility¶
The adapter includes workarounds for Bedrock-specific issues:
1. Message Filtering
Tool-related messages are filtered from history to prevent ValidationException:
# These message types are automatically filtered:
# - Messages with content: null
# - Messages with empty string content
# - Messages with content: [...] (tool blocks)
2. Message Alternation
Bedrock requires strict user/assistant alternation:
# Automatically fixed:
# - Leading assistant messages removed
# - Consecutive same-role messages deduplicated
3. Parallel Tool Prevention
A bug in Agno/Bedrock causes errors with parallel tool calls:
# Workarounds applied:
# - tool_call_limit=1 (default)
# - System prompt instruction to call tools one at a time
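The first two workarounds can be sketched as plain functions over a message list. This assumes each message is a dict with `role` and `content` keys, and that "deduplicated" means keeping the most recent message of a same-role run; both are assumptions, as are the function names.

```python
from typing import Any, Dict, List

Message = Dict[str, Any]  # Assumed shape: {"role": ..., "content": ...}


def filter_tool_messages(messages: List[Message]) -> List[Message]:
    """Keep only messages whose content is a non-empty string."""
    # Drops None, "" and list-valued (tool block) content alike
    return [
        m for m in messages
        if isinstance(m.get("content"), str) and m["content"]
    ]


def enforce_alternation(messages: List[Message]) -> List[Message]:
    """Drop leading assistant messages and collapse same-role runs."""
    result: List[Message] = []
    for message in messages:
        if not result and message["role"] == "assistant":
            continue  # History must not open with an assistant turn
        if result and result[-1]["role"] == message["role"]:
            # Assumption: keep the most recent message of a same-role run
            result[-1] = message
            continue
        result.append(message)
    return result


def prepare_history(messages: List[Message]) -> List[Message]:
    """Filter first, then repair alternation on what remains."""
    return enforce_alternation(filter_tool_messages(messages))
```

Filtering runs first because removing tool messages can itself leave two same-role messages next to each other, which the alternation pass then repairs.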
Metrics¶
The adapter extracts metrics from each run:
@dataclass
class AgnoMetrics:
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0
    duration: float = 0.0
    time_to_first_token: Optional[float] = None
Metrics are logged automatically:
📊 Agno Metrics: tokens=1234 (in=100, out=1134), duration=2.345s
🔧 Agno Tools: Executed 2 tool call(s)
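For reference, a log line in the format shown above can be produced from the metrics dataclass with a small formatter. The `format_metrics` function is a hypothetical helper (the emoji prefix is omitted), and the dataclass is repeated only so the sketch runs on its own.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class AgnoMetrics:
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0
    duration: float = 0.0
    time_to_first_token: Optional[float] = None


def format_metrics(metrics: AgnoMetrics) -> str:
    """Render metrics in the same layout as the log line above."""
    return (
        f"Agno Metrics: tokens={metrics.total_tokens} "
        f"(in={metrics.input_tokens}, out={metrics.output_tokens}), "
        f"duration={metrics.duration:.3f}s"
    )
```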
AgnoToolConverter¶
Converts dcaf Tools to Agno format.
from dcaf.core.adapters.outbound.agno import AgnoToolConverter
converter = AgnoToolConverter()
# Convert single tool
agno_tool = converter.to_agno(dcaf_tool)
# Convert list of tools
agno_tools = converter.to_agno_list(dcaf_tools)
AgnoMessageConverter¶
Converts messages bidirectionally.
from dcaf.core.adapters.outbound.agno import AgnoMessageConverter
converter = AgnoMessageConverter()
# Convert to Agno format
agno_messages = converter.to_agno(dcaf_messages)
# Convert from Agno response
response = converter.from_agno(agno_response, conversation_id)
# Convert streaming events
stream_event = converter.stream_event_from_agno(agno_event)
Persistence Adapters¶
InMemoryConversationRepository¶
Simple in-memory implementation for testing and development.
from dcaf.core.adapters.outbound.persistence import InMemoryConversationRepository
repo = InMemoryConversationRepository()
# Save conversation
repo.save(conversation)
# Retrieve
loaded = repo.get(conversation.id)
# Check existence
exists = repo.exists(conversation.id)
# Get or create
conv = repo.get_or_create(ConversationId("new-id"))
# Delete
deleted = repo.delete(conversation.id)
# Utility methods
repo.clear() # Clear all conversations
repo.count() # Get count
repo.all() # Get all conversations
Thread Safety: Uses a reentrant lock for concurrent access.
Limitations:
- Data is lost when the process ends
- Not suitable for distributed systems
- Use for testing and single-instance deployments only
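The reentrant lock matters when one locked method calls another. The sketch below uses a hypothetical `LockedInMemoryStore` (not the real repository class) to show the pattern: `get_or_create` holds the lock while calling `get` and `save`, which would deadlock with a plain `threading.Lock`.

```python
import threading
from typing import Any, Callable, Dict, Optional


class LockedInMemoryStore:
    """Hypothetical in-memory store guarded by a reentrant lock."""

    def __init__(self) -> None:
        self._items: Dict[str, Any] = {}
        self._lock = threading.RLock()

    def save(self, key: str, value: Any) -> None:
        with self._lock:
            self._items[key] = value

    def get(self, key: str) -> Optional[Any]:
        with self._lock:
            return self._items.get(key)

    def get_or_create(self, key: str, factory: Callable[[], Any]) -> Any:
        # The lock is held across the check and the write, so two threads
        # cannot both create the item. The nested get()/save() calls
        # re-acquire the same lock, which only an RLock permits.
        with self._lock:
            existing = self.get(key)
            if existing is None:
                existing = factory()
                self.save(key, existing)
            return existing

    def count(self) -> int:
        with self._lock:
            return len(self._items)
```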
Adding a New Framework Adapter¶
Follow these steps to add support for a new LLM framework (e.g., LangChain).
Step 1: Create the Module Structure¶
dcaf/core/adapters/outbound/langchain/
├── __init__.py
├── adapter.py
├── tool_converter.py
├── message_converter.py
└── types.py
Step 2: Define Types¶
# types.py
from typing import TypedDict, List, Dict, Any


class LangChainMessage(TypedDict):
    role: str
    content: str


class LangChainTool(TypedDict):
    name: str
    description: str
    parameters: Dict[str, Any]
Step 3: Implement Tool Converter¶
# tool_converter.py
from typing import List

from dcaf.tools import Tool


class LangChainToolConverter:
    def to_langchain(self, tool: Tool) -> dict:
        return {
            "name": tool.name,
            "description": tool.description,
            "parameters": tool.schema.get("input_schema", {}),
        }

    def to_langchain_list(self, tools: List[Tool]) -> List[dict]:
        return [self.to_langchain(t) for t in tools]
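Because the converter only reads `name`, `description`, and `schema`, it can be unit tested without the framework or the real `Tool` class. The sketch below uses a hypothetical `FakeTool` stand-in and repeats the converter so that it runs on its own; the test function names are illustrative.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class FakeTool:
    """Hypothetical stand-in exposing only the attributes the converter reads."""
    name: str
    description: str
    schema: Dict[str, Any] = field(default_factory=dict)


class LangChainToolConverter:
    """Copy of the converter above, repeated so the sketch is self-contained."""

    def to_langchain(self, tool: Any) -> Dict[str, Any]:
        return {
            "name": tool.name,
            "description": tool.description,
            "parameters": tool.schema.get("input_schema", {}),
        }

    def to_langchain_list(self, tools: List[Any]) -> List[Dict[str, Any]]:
        return [self.to_langchain(t) for t in tools]


def test_maps_all_fields() -> None:
    tool = FakeTool("kubectl", "Run kubectl", {"input_schema": {"type": "object"}})
    converted = LangChainToolConverter().to_langchain(tool)
    assert converted == {
        "name": "kubectl",
        "description": "Run kubectl",
        "parameters": {"type": "object"},
    }


def test_missing_input_schema_defaults_to_empty() -> None:
    converted = LangChainToolConverter().to_langchain(FakeTool("noop", "Does nothing"))
    assert converted["parameters"] == {}
```

Both functions follow the pytest naming convention, so a test runner would pick them up without extra configuration.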
Step 4: Implement Message Converter¶
# message_converter.py
from typing import List

from dcaf.core.domain.entities import Message, MessageRole
from dcaf.core.application.dto import AgentResponse


class LangChainMessageConverter:
    def to_langchain(self, messages: List[Message]) -> List[dict]:
        return [
            {
                "role": self._convert_role(m.role),
                "content": m.text or "",
            }
            for m in messages
        ]

    def from_langchain(
        self,
        response: dict,
        conversation_id: str,
    ) -> AgentResponse:
        # Parse LangChain response format
        ...

    def _convert_role(self, role: MessageRole) -> str:
        mapping = {
            MessageRole.USER: "human",
            MessageRole.ASSISTANT: "ai",
            MessageRole.SYSTEM: "system",
        }
        # Unknown roles fall back to "human"
        return mapping.get(role, "human")
Step 5: Implement Adapter¶
# adapter.py
from typing import List, Optional

from dcaf.core.application.dto import AgentResponse
from dcaf.core.application.ports import AgentRuntime
from dcaf.core.domain.entities import Message
from dcaf.tools import Tool

from .tool_converter import LangChainToolConverter
from .message_converter import LangChainMessageConverter


class LangChainAdapter:
    def __init__(self, model: str):
        self._tool_converter = LangChainToolConverter()
        self._message_converter = LangChainMessageConverter()
        # Initialize LangChain components here; invoke() below expects
        # self._chain to be assigned at this point

    def invoke(
        self,
        messages: List[Message],
        tools: List[Tool],
        system_prompt: Optional[str] = None,
    ) -> AgentResponse:
        # 1. Convert to LangChain format
        lc_messages = self._message_converter.to_langchain(messages)
        lc_tools = self._tool_converter.to_langchain_list(tools)
        # 2. Call LangChain
        response = self._chain.invoke(lc_messages, tools=lc_tools)
        # 3. Convert back
        return self._message_converter.from_langchain(
            response,
            conversation_id="..."
        )
Step 6: Export from __init__.py¶
# __init__.py
from .adapter import LangChainAdapter
from .tool_converter import LangChainToolConverter
from .message_converter import LangChainMessageConverter

__all__ = [
    "LangChainAdapter",
    "LangChainToolConverter",
    "LangChainMessageConverter",
]
Best Practices¶
- Isolate framework code: All framework-specific code stays in its adapter folder
- Don't leak abstractions: Convert to/from domain types at adapter boundaries
- Handle errors gracefully: Catch framework exceptions and convert to domain exceptions
- Support streaming: Implement both sync and streaming methods
- Test converters independently: Unit test converters without the full framework
- Document framework requirements: Note which framework version is supported
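As a minimal sketch of the "handle errors gracefully" practice, the example below catches a framework exception at the adapter boundary and re-raises it as a domain exception. `AgentRuntimeError`, `FrameworkTimeout`, and both functions are hypothetical names used for illustration; the real exception hierarchy may differ.

```python
class AgentRuntimeError(Exception):
    """Hypothetical domain exception raised by adapters."""


class FrameworkTimeout(Exception):
    """Stand-in for an exception raised by the wrapped framework."""


def call_framework(fail: bool) -> str:
    """Simulates a framework call that may fail."""
    if fail:
        raise FrameworkTimeout("upstream timed out")
    return "ok"


def invoke_safely(fail: bool = False) -> str:
    """Translate framework errors into domain errors at the boundary."""
    try:
        return call_framework(fail)
    except FrameworkTimeout as exc:
        # "raise ... from exc" keeps the original exception as __cause__,
        # so the framework traceback is still available for debugging.
        raise AgentRuntimeError(f"model call failed: {exc}") from exc
```

Callers then depend only on the domain exception type, which keeps framework imports out of the application layer.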