Application Layer¶
The application layer coordinates domain logic and infrastructure through use cases and ports. It contains no business logic of its own; that belongs in the domain layer.
Overview¶
The application layer includes:
- Ports: Interfaces (protocols) for external systems
- Services: Application services that orchestrate operations
- DTOs: Data transfer objects for communication
Ports¶
Ports define how the application interacts with external systems. They are implemented by adapters.
AgentRuntime¶
The primary port for LLM framework integration.
from dcaf.core.application.ports import AgentRuntime
from typing import Iterator, List, Optional, Protocol

class AgentRuntime(Protocol):
    """Port that adapters implement."""

    def invoke(
        self,
        messages: List[Message],
        tools: List[Tool],
        system_prompt: Optional[str] = None,
    ) -> AgentResponse:
        """Synchronous agent invocation."""
        ...

    def invoke_stream(
        self,
        messages: List[Message],
        tools: List[Tool],
        system_prompt: Optional[str] = None,
    ) -> Iterator[StreamEvent]:
        """Streaming agent invocation."""
        ...
Implementations:
- AgnoAdapter - Agno framework
- LangChainAdapter - LangChain framework (future)
- BedrockDirectAdapter - Direct Bedrock access (future)
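Because AgentRuntime is a Protocol, an adapter only needs matching method signatures; it does not have to inherit from the port. The following is a minimal sketch of that idea, not one of the adapters listed above. `EchoRuntime` is hypothetical, and `Message`, `Tool`, `AgentResponse`, and `StreamEvent` are simplified stand-ins for the real dcaf types so that the example runs on its own.

```python
from dataclasses import dataclass, field
from typing import Iterator, List, Optional, Protocol, runtime_checkable


# Simplified stand-ins for illustration only; the real dcaf types will differ.
@dataclass
class Message:
    role: str
    content: str


@dataclass
class Tool:
    name: str


@dataclass
class AgentResponse:
    text: Optional[str] = None


@dataclass
class StreamEvent:
    event_type: str
    data: dict = field(default_factory=dict)


@runtime_checkable
class AgentRuntime(Protocol):
    def invoke(
        self,
        messages: List[Message],
        tools: List[Tool],
        system_prompt: Optional[str] = None,
    ) -> AgentResponse: ...

    def invoke_stream(
        self,
        messages: List[Message],
        tools: List[Tool],
        system_prompt: Optional[str] = None,
    ) -> Iterator[StreamEvent]: ...


class EchoRuntime:
    """Hypothetical adapter: echoes the last message instead of calling an LLM."""

    def invoke(self, messages, tools, system_prompt=None):
        last = messages[-1].content if messages else ""
        return AgentResponse(text=f"echo: {last}")

    def invoke_stream(self, messages, tools, system_prompt=None):
        # Emit the same reply one word at a time.
        reply = self.invoke(messages, tools, system_prompt).text or ""
        for word in reply.split():
            yield StreamEvent("text_delta", {"text": word})


runtime: AgentRuntime = EchoRuntime()  # structural typing: no inheritance needed
response = runtime.invoke([Message("user", "hello")], tools=[])
chunks = [e.data["text"] for e in runtime.invoke_stream([Message("user", "hello")], [])]
```

A fake like this can be handy in tests, since the services depend only on the port and never on a concrete framework.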
ConversationRepository¶
Persistence port for conversations.
from typing import Optional, Protocol

from dcaf.core.application.ports import ConversationRepository

class ConversationRepository(Protocol):
    def get(self, id: ConversationId) -> Optional[Conversation]: ...
    def save(self, conversation: Conversation) -> None: ...
    def delete(self, id: ConversationId) -> bool: ...
    def exists(self, id: ConversationId) -> bool: ...
    def get_or_create(self, id: ConversationId) -> Conversation: ...
Implementations:
- InMemoryConversationRepository - For testing and simple use cases
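To make the contract concrete, here is a rough sketch of a dictionary-backed repository that satisfies the five methods of the port. It is not the source of InMemoryConversationRepository. `DictConversationRepository` is a hypothetical name, `ConversationId` and `Conversation` are stand-in types, and storing the conversation inside `get_or_create` is an assumption about the intended semantics.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


# Stand-in domain types for illustration; the real dcaf classes will differ.
@dataclass(frozen=True)
class ConversationId:
    value: str


@dataclass
class Conversation:
    id: ConversationId
    messages: List[str] = field(default_factory=list)


class DictConversationRepository:
    """Hypothetical repository that keeps conversations in a dict."""

    def __init__(self) -> None:
        self._store: Dict[ConversationId, Conversation] = {}

    def get(self, id: ConversationId) -> Optional[Conversation]:
        return self._store.get(id)

    def save(self, conversation: Conversation) -> None:
        self._store[conversation.id] = conversation

    def delete(self, id: ConversationId) -> bool:
        # True only if something was actually removed.
        return self._store.pop(id, None) is not None

    def exists(self, id: ConversationId) -> bool:
        return id in self._store

    def get_or_create(self, id: ConversationId) -> Conversation:
        conversation = self._store.get(id)
        if conversation is None:
            # Assumption: a newly created conversation is stored immediately.
            conversation = Conversation(id=id)
            self._store[id] = conversation
        return conversation


repo = DictConversationRepository()
conv_id = ConversationId("conv-123")
conversation = repo.get_or_create(conv_id)
```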
ApprovalCallback¶
Port for requesting human approval.
from typing import List, Protocol

from dcaf.core.application.ports import ApprovalCallback, ApprovalDecision

class ApprovalCallback(Protocol):
    def request_approval(
        self,
        tool_calls: List[ToolCall],
    ) -> List[ApprovalDecision]:
        """Request approval for tool calls."""
        ...

    def notify_execution_result(
        self,
        tool_call_id: str,
        result: str,
        success: bool,
    ) -> None:
        """Notify of execution results."""
        ...
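One way to implement this port is a callback that decides without a human, for example in automated tests. The sketch below approves only tools on an allowlist. `AllowlistApprovalCallback` is hypothetical, and the fields of the stand-in `ToolCall` and `ApprovalDecision` classes are assumptions, since the real dcaf definitions are not shown here.

```python
from dataclasses import dataclass
from typing import List, Optional, Set, Tuple


# Stand-in types; field names are assumptions made for this example.
@dataclass
class ToolCall:
    id: str
    name: str


@dataclass
class ApprovalDecision:
    tool_call_id: str
    approved: bool
    rejection_reason: Optional[str] = None


class AllowlistApprovalCallback:
    """Hypothetical callback: approves only tools named in an allowlist."""

    def __init__(self, allowed_tools: Set[str]) -> None:
        self._allowed = allowed_tools
        self.results: List[Tuple[str, str, bool]] = []

    def request_approval(self, tool_calls: List[ToolCall]) -> List[ApprovalDecision]:
        decisions = []
        for tc in tool_calls:
            if tc.name in self._allowed:
                decisions.append(ApprovalDecision(tc.id, approved=True))
            else:
                reason = f"{tc.name} is not allowlisted"
                decisions.append(ApprovalDecision(tc.id, False, reason))
        return decisions

    def notify_execution_result(self, tool_call_id: str, result: str, success: bool) -> None:
        # Keep the outcome so a test or audit log can inspect it later.
        self.results.append((tool_call_id, result, success))


callback = AllowlistApprovalCallback({"kubectl"})
decisions = callback.request_approval(
    [ToolCall("tc-1", "kubectl"), ToolCall("tc-2", "terraform")]
)
callback.notify_execution_result("tc-1", "3 pods running", True)
```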
EventPublisher¶
Port for publishing domain events.
from typing import List, Protocol

from dcaf.core.application.ports import EventPublisher

class EventPublisher(Protocol):
    def publish(self, event: DomainEvent) -> None: ...
    def publish_all(self, events: List[DomainEvent]) -> None: ...
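A publisher that only records what it receives is often enough for tests. The sketch below shows one possible shape. `RecordingEventPublisher` is a hypothetical name, `DomainEvent` is a stand-in type, and nothing here describes how the library's own fake publisher is implemented.

```python
from dataclasses import dataclass
from typing import List


# Stand-in event type for illustration; real domain events carry more data.
@dataclass(frozen=True)
class DomainEvent:
    name: str


class RecordingEventPublisher:
    """Hypothetical publisher that stores events in memory."""

    def __init__(self) -> None:
        self.published: List[DomainEvent] = []

    def publish(self, event: DomainEvent) -> None:
        self.published.append(event)

    def publish_all(self, events: List[DomainEvent]) -> None:
        # Delegate to publish() so both paths behave the same way.
        for event in events:
            self.publish(event)


publisher = RecordingEventPublisher()
publisher.publish(DomainEvent("conversation_started"))
publisher.publish_all([DomainEvent("tool_call_requested"), DomainEvent("tool_call_approved")])
names = [event.name for event in publisher.published]
```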
Services¶
Application services implement the use cases: each one orchestrates a business operation across the domain and the ports.
AgentService¶
The main use case for agent execution.
from dcaf.core.application.services import AgentService
from dcaf.core.application.dto import AgentRequest
# Setup
service = AgentService(
runtime=agno_adapter,
conversations=conversation_repo,
events=event_publisher,
approval_policy=ApprovalPolicy(),
)
# Execute synchronously
response = service.execute(AgentRequest(
content="What pods are running?",
tools=[kubectl_tool],
conversation_id="conv-123", # Optional, creates new if not provided
context={"tenant_name": "my-tenant"},
))
# Handle response
if response.has_pending_approvals:
    # Tool calls need approval
    for tc in response.pending_tool_calls:
        print(f"Approve {tc.name}? {tc.input}")
else:
    # Response is complete
    print(response.text)
Streaming:
from dcaf.core.application.dto import StreamEventType

# Execute with streaming (request is an AgentRequest, built as in the example above)
for event in service.execute_stream(request):
    if event.event_type == StreamEventType.TEXT_DELTA:
        print(event.data["text"], end="")
    elif event.event_type == StreamEventType.MESSAGE_END:
        final_response = event.data["response"]
Resume After Approval:
# After user approves tool calls
response = service.resume(
conversation_id="conv-123",
tools=[kubectl_tool],
)
ApprovalService¶
Handles approval decisions for pending tool calls.
from dcaf.core.application.services import ApprovalService
from dcaf.core.application.dto import ApprovalRequest, ToolCallApproval
service = ApprovalService(
conversations=conversation_repo,
events=event_publisher,
)
# Approve specific tool calls
response = service.execute(ApprovalRequest(
conversation_id="conv-123",
approvals=[
ToolCallApproval(tool_call_id="tc-1", approved=True),
ToolCallApproval(tool_call_id="tc-2", approved=False, rejection_reason="Too risky"),
],
))
# Convenience methods
response = service.approve_single("conv-123", "tc-1")
response = service.reject_single("conv-123", "tc-2", "Not needed")
response = service.approve_all("conv-123")
response = service.reject_all("conv-123", "User cancelled")
DTOs¶
Data Transfer Objects for communication between layers.
AgentRequest¶
Request DTO for agent execution.
from dcaf.core.application.dto import AgentRequest
request = AgentRequest(
content="What pods are running?",
conversation_id="conv-123",
context={"tenant_name": "my-tenant", "k8s_namespace": "default"},
tools=[kubectl_tool, aws_tool],
system_prompt="You are a helpful DevOps assistant.",
stream=False,
# Tracing fields (optional)
user_id="user-123",
session_id="session-abc",
run_id="run-xyz",
request_id="req-456",
)
# Access helpers
conv_id = request.get_conversation_id() # ConversationId value object
context = request.get_platform_context() # PlatformContext with tracing merged in
Tracing Fields:
| Field | Description |
|---|---|
| user_id | User identifier for tracking and analytics |
| session_id | Groups related runs into a session |
| run_id | Unique identifier for this execution |
| request_id | HTTP request correlation ID |
See Tracing and Observability Guide for details.
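The idea of merging tracing fields into the platform context can be illustrated with plain dictionaries. This is only a guess at the behaviour, not the implementation of `get_platform_context()`. The `merge_tracing` helper is hypothetical, and two of its rules are assumptions: unset fields are skipped, and tracing values override context keys of the same name.

```python
from typing import Any, Dict, Optional


def merge_tracing(
    context: Dict[str, Any],
    *,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    run_id: Optional[str] = None,
    request_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: return a copy of context with tracing fields added."""
    tracing = {
        "user_id": user_id,
        "session_id": session_id,
        "run_id": run_id,
        "request_id": request_id,
    }
    merged = dict(context)  # copy, so the caller's dict is left untouched
    # Assumption: unset (None) tracing fields are not written into the context.
    merged.update({key: value for key, value in tracing.items() if value is not None})
    return merged


original = {"tenant_name": "my-tenant"}
merged = merge_tracing(original, user_id="user-123", run_id="run-xyz")
```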
AgentResponse¶
Response DTO from agent execution.
from dcaf.core.application.dto import AgentResponse
# Properties
response.conversation_id # str
response.text # Optional[str]
response.tool_calls # List[ToolCallDTO]
response.has_pending_approvals # bool
response.is_complete # bool
# Helpers
response.pending_tool_calls # Tool calls awaiting approval
response.approved_tool_calls # Tool calls that were approved
response.executed_tool_calls # Tool calls that completed
# Serialization
response.to_dict()
ToolCallDTO¶
DTO representing a tool call.
from dcaf.core.application.dto import ToolCallDTO
tc = ToolCallDTO(
id="tc-123",
name="kubectl",
input={"command": "get pods"},
description="Execute kubectl commands",
intent="List running pods",
requires_approval=True,
status="pending", # pending, approved, completed, rejected, failed
result=None,
error=None,
)
# From domain entity
tc = ToolCallDTO.from_tool_call(tool_call_entity)
StreamEvent¶
Streaming event for real-time responses.
from dcaf.core.application.dto import StreamEvent, StreamEventType
# Event types
StreamEventType.TEXT_DELTA # Text chunk
StreamEventType.TOOL_USE_START # Tool call starting
StreamEventType.TOOL_USE_DELTA # Tool call input chunk
StreamEventType.TOOL_USE_END # Tool call complete
StreamEventType.MESSAGE_START # Message starting
StreamEventType.MESSAGE_END # Message complete with response
StreamEventType.ERROR # Error occurred
# Factory methods
event = StreamEvent.text_delta("Hello")
event = StreamEvent.tool_use_start("tc-123", "kubectl")
event = StreamEvent.error("Connection failed", code="TIMEOUT")
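A consumer typically dispatches on the event type and accumulates text deltas until the message ends. The sketch below shows that loop with stand-in `StreamEvent` and `StreamEventType` definitions so it runs without dcaf. The `"text"` key follows the streaming example earlier on this page; the `"message"` key on error events and the `collect_text` helper are assumptions.

```python
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any, Dict, Iterable, List


# Stand-ins for illustration; only a subset of the real event types is modelled.
class StreamEventType(Enum):
    TEXT_DELTA = auto()
    TOOL_USE_START = auto()
    MESSAGE_END = auto()
    ERROR = auto()


@dataclass
class StreamEvent:
    event_type: StreamEventType
    data: Dict[str, Any] = field(default_factory=dict)


def collect_text(events: Iterable[StreamEvent]) -> str:
    """Hypothetical helper: join text deltas until the message ends."""
    parts: List[str] = []
    for event in events:
        if event.event_type is StreamEventType.TEXT_DELTA:
            parts.append(event.data["text"])
        elif event.event_type is StreamEventType.ERROR:
            # Assumption: error events carry a human-readable "message".
            raise RuntimeError(event.data.get("message", "stream error"))
        elif event.event_type is StreamEventType.MESSAGE_END:
            break
        # Other event types (such as tool use) are ignored in this sketch.
    return "".join(parts)


events = [
    StreamEvent(StreamEventType.TEXT_DELTA, {"text": "Hello, "}),
    StreamEvent(StreamEventType.TOOL_USE_START, {"name": "kubectl"}),
    StreamEvent(StreamEventType.TEXT_DELTA, {"text": "world"}),
    StreamEvent(StreamEventType.MESSAGE_END),
    StreamEvent(StreamEventType.TEXT_DELTA, {"text": " (ignored)"}),
]
text = collect_text(events)
```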
Wiring It Together¶
Here's a complete example of setting up the application layer:
from dcaf.core.adapters.outbound.agno import AgnoAdapter
from dcaf.core.adapters.outbound.persistence import InMemoryConversationRepository
from dcaf.core.application.services import AgentService, ApprovalService
from dcaf.core.domain.services import ApprovalPolicy
from dcaf.core.testing import FakeEventPublisher
# Create adapters
runtime = AgnoAdapter(
model_id="anthropic.claude-3-sonnet-20240229-v1:0",
provider="bedrock",
)
conversations = InMemoryConversationRepository()
events = FakeEventPublisher() # Or a real implementation
# Create policy
policy = ApprovalPolicy()
# Create use cases
execute_agent = AgentService(
runtime=runtime,
conversations=conversations,
events=events,
approval_policy=policy,
)
approve_tool_call = ApprovalService(
conversations=conversations,
events=events,
)
Best Practices¶
- Use DTOs at boundaries: Don't pass domain entities directly to external layers
- Keep use cases thin: They orchestrate, they don't contain business logic
- Inject dependencies: Use constructor injection for all ports
- Handle events: Publish domain events for audit trails and side effects
- Validate inputs: DTOs should validate their inputs
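As an illustration of the last point, a DTO can reject bad input at construction time so that invalid requests never reach a use case. The sketch below uses a frozen dataclass with `__post_init__`. `ValidatedAgentRequest` is a hypothetical class, not the library's AgentRequest, and the specific rules are assumptions chosen for the example.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class ValidatedAgentRequest:
    """Hypothetical request DTO that validates itself on creation."""

    content: str
    conversation_id: Optional[str] = None
    context: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Assumed rule: a request must carry some non-whitespace content.
        if not self.content.strip():
            raise ValueError("content must not be empty")
        # Assumed rule: None means "create a new conversation"; "" is a mistake.
        if self.conversation_id is not None and not self.conversation_id:
            raise ValueError("conversation_id must not be an empty string")


request = ValidatedAgentRequest(content="What pods are running?")

try:
    ValidatedAgentRequest(content="   ")
    rejected = False
except ValueError:
    rejected = True
```

Validating in the DTO keeps the use case thin: by the time a service receives the request, it can assume the basic shape is sound.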