Skip to content

A2A (Agent-to-Agent) Protocol

DCAF supports the A2A (Agent-to-Agent) protocol developed by Google, enabling agents to discover and communicate with each other using standardized HTTP/JSON-RPC interfaces.


Overview

A2A is an open protocol for agent-to-agent communication that enables:

  • 🔍 Agent Discovery: Agents expose a card describing their capabilities
  • 📡 Task Execution: Agents can send tasks to other agents
  • Async Support: Long-running tasks can execute asynchronously
  • 🌐 Standard Protocol: Uses HTTP, JSON-RPC, and SSE (Server-Sent Events)

Why A2A?

Traditional monolithic agents try to do everything. With A2A, you can:

  • Specialize: Build focused agents that excel at specific domains (K8s, AWS, databases)
  • Compose: Combine specialist agents into powerful multi-agent systems
  • Scale: Distribute work across multiple agents
  • Interoperate: Work with agents from other frameworks that support A2A

Quick Start

Server: Expose an Agent via A2A

from dcaf.core import Agent, serve
from dcaf.tools import tool

@tool(description="List Kubernetes pods")
def list_pods(namespace: str = "default") -> str:
    return kubectl(f"get pods -n {namespace}")

# Create agent with A2A identity
agent = Agent(
    name="k8s-assistant",              # A2A name (required for A2A)
    description="Kubernetes helper",   # A2A description
    tools=[list_pods],
)

# Enable A2A alongside regular REST API
serve(agent, port=8000, a2a=True)

When A2A is enabled, these endpoints are added:

Endpoint Purpose
GET /.well-known/agent.json Agent card (discovery)
POST /a2a/tasks/send Receive tasks
GET /a2a/tasks/{id} Task status

Client: Call a Remote Agent

from dcaf.core.a2a import RemoteAgent

# Connect to remote agent
k8s = RemoteAgent(url="http://k8s-agent:8000")

# Send a task
result = k8s.send("What pods are failing in production?")
print(result.text)

# Check agent capabilities
print(f"Agent: {k8s.name}")           # "k8s-assistant"
print(f"Skills: {k8s.skills}")        # ["list_pods", ...]

Agent Card (Discovery)

When you expose an agent via A2A, it automatically generates an Agent Card that describes its capabilities. You can also provide a custom card for full control over the A2A discovery metadata.

Auto-Generated Card

By default, the card is generated from your Agent instance:

{
  "name": "k8s-assistant",
  "description": "Manages Kubernetes clusters",
  "url": "http://k8s-agent:8000",
  "skills": ["list_pods", "delete_pod", "describe_pod"],
  "version": "1.0",
  "metadata": {
    "framework": "dcaf",
    "model": "anthropic.claude-3-sonnet-20240229-v1:0",
    "provider": "bedrock"
  }
}

Custom Agent Card

For full control over the agent card — including fields from the A2A spec that DCAF doesn't auto-generate — pass a2a_agent_card to serve() or create_app().

Using an AgentCard instance

from dcaf.core import Agent, serve
from dcaf.core.a2a.models import AgentCard

agent = Agent(name="my-agent", tools=[...])

custom_card = AgentCard(
    name="ci-cd-agent",
    description="Jenkins CI/CD assistant",
    url="",  # Set automatically from request URL
    skills=["fetch_logs", "trigger_build"],
    version="2.0",
    metadata={"org": "duplocloud", "team": "platform"},
)

serve(agent, a2a=True, a2a_agent_card=custom_card)

Using a dict (full A2A spec compliance)

Pass a dict to include arbitrary fields from the A2A spec without being limited to the AgentCard model:

from dcaf.core import Agent, create_app

agent = Agent(name="my-agent", tools=[...])

app = create_app(agent, a2a=True, a2a_agent_card={
    "name": "ci-cd-agent",
    "description": "Jenkins CI/CD assistant",
    "skills": ["fetch_logs", "trigger_build"],
    "authentication": {"schemes": ["bearer"]},
    "capabilities": {"streaming": True, "pushNotifications": False},
    "provider": {"organization": "DuploCloud"},
})

Note

The url field is always set dynamically from the incoming request's base URL, regardless of whether you provide a custom card or use auto-generation.

Accessing Agent Cards

from dcaf.core.a2a import RemoteAgent

remote = RemoteAgent(url="http://k8s-agent:8000")

# Card is fetched automatically on first access
print(remote.card.name)         # "k8s-assistant"
print(remote.card.description)  # "Manages Kubernetes clusters"
print(remote.card.skills)       # ["list_pods", "delete_pod", ...]

Multi-Agent Patterns

Pattern 1: Peer-to-Peer

Agents directly communicate with each other.

# === k8s_agent.py ===
from dcaf.core import Agent, serve

agent = Agent(
    name="k8s-assistant",
    tools=[list_pods, delete_pod],
)
serve(agent, port=8001, a2a=True)


# === aws_agent.py ===
from dcaf.core import Agent, serve

agent = Agent(
    name="aws-assistant",
    tools=[list_ec2, describe_vpc],
)
serve(agent, port=8002, a2a=True)


# === client.py ===
from dcaf.core.a2a import RemoteAgent

k8s = RemoteAgent(url="http://localhost:8001")
aws = RemoteAgent(url="http://localhost:8002")

# Call each agent directly
k8s_result = k8s.send("List pods")
aws_result = aws.send("List EC2 instances")

Pattern 2: Orchestration

An orchestrator agent routes requests to specialist agents.

from dcaf.core import Agent
from dcaf.core.a2a import RemoteAgent

# Connect to specialist agents
k8s = RemoteAgent(url="http://k8s-agent:8000")
aws = RemoteAgent(url="http://aws-agent:8000")
db = RemoteAgent(url="http://db-agent:8000")

# Orchestrator uses remote agents as tools
orchestrator = Agent(
    name="orchestrator",
    tools=[
        k8s.as_tool(),  # Wraps remote agent as a tool
        aws.as_tool(),
        db.as_tool(),
    ],
    system="""You are an orchestrator that routes requests to specialist agents.
    Use k8s_assistant for Kubernetes questions.
    Use aws_assistant for AWS questions.
    Use db_assistant for database questions."""
)

# The LLM decides which specialist to call
response = orchestrator.run([
    {"role": "user", "content": "How many pods are running and what's my AWS bill?"}
])
# The orchestrator will call both k8s and aws agents

RemoteAgent API Reference

Constructor

RemoteAgent(
    url: str,
    name: str | None = None,
    adapter: A2AClientAdapter | None = None,
)

Parameters:

  • url: Base URL of the remote agent (e.g., "http://k8s-agent:8000")
  • name: Optional name override (uses card name if not provided)
  • adapter: Optional custom A2A client adapter (default: Agno)

Methods

send()

Send a message and wait for response (synchronous).

def send(
    message: str,
    context: dict | None = None,
    timeout: float = 60.0,
) -> TaskResult

Example:

result = remote.send(
    "List pods in production",
    context={"tenant_name": "prod"},
    timeout=120.0,
)
print(result.text)
print(result.status)  # "completed", "failed", "pending"

send_async()

Send a message asynchronously (returns immediately).

def send_async(
    message: str,
    context: dict | None = None,
) -> str  # Returns task_id

Example:

task_id = remote.send_async("Analyze all pod logs")

# Later, check status
result = remote.get_task_status(task_id)
if result.status == "completed":
    print(result.text)

as_tool()

Convert the remote agent to a tool for use by other agents.

def as_tool() -> Tool

Example:

k8s = RemoteAgent(url="http://k8s-agent:8000")

# Use as a tool
orchestrator = Agent(
    tools=[k8s.as_tool()],
    system="Use k8s_assistant for Kubernetes questions"
)

Properties

remote.card         # AgentCard: Agent metadata
remote.name         # str: Agent name
remote.description  # str: Agent description
remote.skills       # list[str]: List of tool names

TaskResult

Response from a remote agent.

@dataclass
class TaskResult:
    task_id: str              # ID of the task
    text: str                 # Response text
    status: str               # "completed", "failed", "pending"
    artifacts: list[dict]     # Structured outputs
    error: str | None         # Error message (if failed)
    metadata: dict            # Additional metadata

Example:

result = remote.send("List pods")

print(result.task_id)    # "task_abc123"
print(result.text)       # "Here are the pods: nginx-abc, redis-xyz..."
print(result.status)     # "completed"
print(result.artifacts)  # []

Server Configuration

Basic Setup

from dcaf.core import Agent, serve

agent = Agent(
    name="my-agent",              # Required for A2A
    description="Does X, Y, Z",   # Recommended for A2A
    tools=[...],
)

# Enable A2A
serve(agent, port=8000, a2a=True)

Advanced Setup

For more control over the FastAPI app:

from dcaf.core import Agent, create_app
import uvicorn

agent = Agent(name="my-agent", tools=[...])

# Create app with A2A enabled
app = create_app(agent, a2a=True)

# Add custom middleware, etc.
@app.middleware("http")
async def log_requests(request, call_next):
    print(f"Request: {request.url}")
    return await call_next(request)

# Run with custom configuration
uvicorn.run(app, host="0.0.0.0", port=8000)

Custom A2A Routes

You can also manually add A2A routes:

from dcaf.core import Agent, create_app
from dcaf.core.a2a import create_a2a_routes

agent = Agent(name="my-agent", tools=[...])

# Create base app without A2A
app = create_app(agent, a2a=False)

# Add A2A routes manually
for router in create_a2a_routes(agent):
    app.include_router(router)

Complete Example: Multi-Agent System

This example shows a complete multi-agent system with three specialist agents and an orchestrator.

Specialist Agent 1: Kubernetes

# k8s_agent.py
from dcaf.core import Agent, serve
from dcaf.tools import tool
import subprocess

def kubectl(cmd: str) -> str:
    result = subprocess.run(
        f"kubectl {cmd}",
        shell=True,
        capture_output=True,
        text=True
    )
    return result.stdout or result.stderr

@tool(description="List pods in a namespace")
def list_pods(namespace: str = "default") -> str:
    return kubectl(f"get pods -n {namespace}")

@tool(requires_approval=True, description="Delete a pod")
def delete_pod(name: str, namespace: str = "default") -> str:
    return kubectl(f"delete pod {name} -n {namespace}")

agent = Agent(
    name="k8s-assistant",
    description="Manages Kubernetes clusters",
    tools=[list_pods, delete_pod],
)

if __name__ == "__main__":
    serve(agent, port=8001, a2a=True)

Specialist Agent 2: AWS

# aws_agent.py
from dcaf.core import Agent, serve
from dcaf.tools import tool
import boto3

@tool(description="List EC2 instances")
def list_ec2() -> str:
    ec2 = boto3.client('ec2')
    response = ec2.describe_instances()
    instances = []
    for reservation in response['Reservations']:
        for instance in reservation['Instances']:
            instances.append(f"{instance['InstanceId']} - {instance['State']['Name']}")
    return "\n".join(instances)

agent = Agent(
    name="aws-assistant",
    description="Manages AWS resources",
    tools=[list_ec2],
)

if __name__ == "__main__":
    serve(agent, port=8002, a2a=True)

Orchestrator Agent

# orchestrator.py
from dcaf.core import Agent, serve
from dcaf.core.a2a import RemoteAgent

# Connect to specialist agents
k8s = RemoteAgent(url="http://localhost:8001")
aws = RemoteAgent(url="http://localhost:8002")

# Orchestrator routes to specialists
orchestrator = Agent(
    name="orchestrator",
    description="Routes requests to specialist agents",
    tools=[k8s.as_tool(), aws.as_tool()],
    system="""You are an intelligent orchestrator for infrastructure management.

    You have access to two specialist agents:
    - k8s_assistant: For Kubernetes questions (pods, deployments, services)
    - aws_assistant: For AWS questions (EC2, VPC, billing)

    Route each question to the appropriate specialist. You can call both if needed.
    """,
)

if __name__ == "__main__":
    serve(orchestrator, port=8000, a2a=True)

Usage

# client.py
from dcaf.core.a2a import RemoteAgent

# Connect to orchestrator
orchestrator = RemoteAgent(url="http://localhost:8000")

# Ask a question - orchestrator will route to k8s agent
result = orchestrator.send("How many pods are running in production?")
print(result.text)

# Ask another - orchestrator will route to aws agent
result = orchestrator.send("List my EC2 instances")
print(result.text)

# Ask complex - orchestrator will call both
result = orchestrator.send("Give me a status of my infrastructure")
print(result.text)

Testing

Testing A2A Server

import pytest
from dcaf.core import Agent
from dcaf.core.a2a import RemoteAgent, generate_agent_card

def test_agent_card_generation():
    agent = Agent(
        name="test-agent",
        description="Test agent",
        tools=[my_tool],
    )

    card = generate_agent_card(agent, "http://localhost:8000")

    assert card.name == "test-agent"
    assert card.description == "Test agent"
    assert "my_tool" in card.skills

def test_a2a_task_execution():
    # Start agent with A2A in a fixture
    # ...

    remote = RemoteAgent(url="http://localhost:8000")
    result = remote.send("Test message")

    assert result.status == "completed"
    assert len(result.text) > 0

Integration Testing

import pytest
from dcaf.core import Agent, create_app
from fastapi.testclient import TestClient

@pytest.fixture
def a2a_agent():
    agent = Agent(
        name="test",
        description="Test agent",
        tools=[my_tool],
    )
    return create_app(agent, a2a=True)

def test_agent_card_endpoint(a2a_agent):
    client = TestClient(a2a_agent)
    response = client.get("/.well-known/agent.json")

    assert response.status_code == 200
    data = response.json()
    assert data["name"] == "test"
    assert "my_tool" in data["skills"]

def test_task_send_endpoint(a2a_agent):
    client = TestClient(a2a_agent)
    response = client.post(
        "/a2a/tasks/send",
        json={
            "id": "task_123",
            "message": "Test message",
            "context": {},
            "status": "pending",
        }
    )

    assert response.status_code == 200
    data = response.json()
    assert data["status"] in ["completed", "pending", "failed"]

Troubleshooting

Common Issues

Issue: "No A2A adapter available"

RuntimeError: No A2A adapter available. Install agno with: pip install agno

Solution: Install required dependencies:

pip install httpx  # For A2A client
pip install agno   # For Agno adapter (optional)

Issue: "Cannot reach agent at http://..."

ConnectionError: Cannot reach agent at http://k8s-agent:8000

Solution: - Check that the agent is running: curl http://k8s-agent:8000/.well-known/agent.json - Check network connectivity - Verify the URL is correct

Issue: Agent card missing tools

# Agent card shows: "skills": []

Solution: Make sure tools are provided when creating the agent:

agent = Agent(
    name="my-agent",
    tools=[tool1, tool2],  # Must provide tools
)

Client Configuration

Environment Variables

The A2A client (used by RemoteAgent) supports timeout configuration via environment variables:

Variable Default Description
BOTO3_READ_TIMEOUT 20 Read timeout in seconds for HTTP operations
BOTO3_CONNECT_TIMEOUT 10 Connection timeout in seconds

These are the same variables used by the Bedrock LLM, allowing unified timeout configuration across your application.

Example:

# Set longer timeouts for slow networks
export BOTO3_READ_TIMEOUT=60
export BOTO3_CONNECT_TIMEOUT=30
from dcaf.core.a2a import RemoteAgent

# Client will use environment-configured timeouts
remote = RemoteAgent(url="http://k8s-agent:8000")
result = remote.send("List pods")

Best Practices

1. Name Your Agents

Always provide a meaningful name for A2A agents:

# Good
agent = Agent(
    name="k8s-prod-assistant",
    description="Manages production Kubernetes cluster",
    ...
)

# Bad
agent = Agent(...)  # Default name: "dcaf-agent"

2. Provide Clear Descriptions

Help other agents understand what your agent does:

agent = Agent(
    name="k8s-assistant",
    description="Manages Kubernetes clusters. Can list, describe, delete pods and deployments.",
    ...
)

3. Handle Errors Gracefully

from dcaf.core.a2a import RemoteAgent

remote = RemoteAgent(url="http://k8s-agent:8000")

try:
    result = remote.send("List pods", timeout=30.0)
    if result.status == "failed":
        print(f"Task failed: {result.error}")
    else:
        print(result.text)
except ConnectionError:
    print("Agent is not available")
except TimeoutError:
    print("Task timed out")

4. Use Async for Long-Running Tasks

# For tasks that take > 30 seconds
task_id = remote.send_async("Analyze all logs")

# Poll for completion
import time
while True:
    result = remote.get_task_status(task_id)
    if result.status in ["completed", "failed"]:
        break
    time.sleep(5)

5. Secure Your A2A Endpoints

from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer

security = HTTPBearer()

async def verify_token(credentials = Depends(security)):
    if credentials.credentials != "your-secret-token":
        raise HTTPException(status_code=401)

# Add to routes
app = create_app(agent, a2a=True)

@app.middleware("http")
async def auth_middleware(request, call_next):
    # Implement your auth logic
    return await call_next(request)

See Also