communication

package
v0.0.13
Published: Dec 7, 2025 License: MIT Imports: 11 Imported by: 0

README

A2A (Agent-to-Agent) Package

This package implements Google's Agent-to-Agent (a2a) protocol for dspy-go, enabling agent composition and interoperability.

✅ Implementation Complete

Core Features
  • In-process agent composition - Parent agents with sub-agents (like ADK Python)
  • a2a message protocol - Message, Task, Artifact types
  • Bidirectional conversion - dspy-go ↔ a2a format
  • HTTP server - Optional JSON-RPC over HTTP (for remote agents)
  • Comprehensive tests - 60+ test cases covering all functionality

Package Structure

pkg/agents/communication/
├── protocol.go           # Core types (Message, Task, Part, AgentCard)
├── converters.go         # Bidirectional message conversion
├── agent_executor.go     # In-process agent composition ⭐
├── server.go             # HTTP server for remote access
├── *_test.go             # Comprehensive test suite
└── README.md             # This file

Quick Start

1. In-Process Composition

Compose agents hierarchically, similar to ADK Python's sub_agents:

import (
    "fmt"
    "log"

    a2a "github.com/darwishdev/dspy-go/pkg/agents/communication"
)

// searchAgent, reasoningAgent, and parent are your own agents.Agent values;
// ctx is a context.Context.

// Wrap the sub-agents
searchExec := a2a.NewExecutor(searchAgent)
reasoningExec := a2a.NewExecutor(reasoningAgent)

// Create parent agent with sub-agents
parentExec := a2a.NewExecutor(parent).
    WithSubAgent("search", searchExec).
    WithSubAgent("reasoning", reasoningExec)

// Parent calls sub-agent (in-process via a2a protocol)
msg := a2a.NewUserMessage("What is the capital of France?")
artifact, err := parentExec.CallSubAgent(ctx, "search", msg)
if err != nil {
    log.Fatal(err)
}

fmt.Println(a2a.ExtractTextFromArtifact(artifact))

2. Direct Execution

Execute an agent with a2a messages:

executor := a2a.NewExecutor(myAgent)

msg := a2a.NewUserMessage("Explain quantum physics")
task, err := executor.SendMessage(ctx, msg)
if err != nil {
    log.Fatal(err)
}

// Access results
for _, artifact := range task.Artifacts {
    fmt.Println(a2a.ExtractTextFromArtifact(artifact))
}

3. Multi-Level Hierarchy

Build complex agent systems:

// Level 3: Specialist agents
calculator := a2a.NewExecutor(calculatorAgent)
webSearch := a2a.NewExecutor(searchAgent)

// Level 2: Tool orchestrator
toolAgent := a2a.NewExecutor(toolOrchestratorAgent).
    WithSubAgent("calculator", calculator).
    WithSubAgent("search", webSearch)

// Level 1: Main orchestrator
mainAgent := a2a.NewExecutor(mainOrchestratorAgent).
    WithSubAgent("tools", toolAgent)

// Deep composition works automatically
result, err := mainAgent.CallSubAgentSimple(ctx, "tools", "Calculate 2+2")
if err != nil {
    log.Fatal(err)
}
fmt.Println(result)

4. HTTP Server (Optional, for Remote Access)

Expose your agent via HTTP:

server, err := a2a.NewServer(myAgent, a2a.ServerConfig{
    Host:        "localhost",
    Port:        8080,
    Name:        "MyAgent",
    Description: "A helpful agent",
})
if err != nil {
    log.Fatal(err)
}

if err := server.Start(ctx); err != nil {
    log.Fatal(err)
}
// AgentCard: http://localhost:8080/.well-known/agent.json
// JSON-RPC: http://localhost:8080/rpc

Development

# Run tests
go test ./pkg/agents/communication/...

# Run with coverage
go test -cover ./pkg/agents/communication/...

# Build examples
go build ./examples/a2a_composition/...

Documentation

Overview

Package a2a implements Google's Agent-to-Agent (a2a) protocol for dspy-go. This package provides JSON-RPC over HTTP transport for interoperability with Python ADK agents and other a2a-compatible agents.

Constants

const (
	RPCErrorCodeParseError     = -32700
	RPCErrorCodeInvalidRequest = -32600
	RPCErrorCodeMethodNotFound = -32601
	RPCErrorCodeInvalidParams  = -32602
	RPCErrorCodeInternalError  = -32603
)

Standard JSON-RPC error codes.

Variables

This section is empty.

Functions

func CapabilitiesToToolMetadata

func CapabilitiesToToolMetadata(capabilities []Capability) []*core.ToolMetadata

CapabilitiesToToolMetadata converts a2a Capabilities to dspy-go ToolMetadata. Only function-type capabilities are converted. Note: The returned metadata describes remote tools but doesn't include implementation. Use this when discovering remote agent capabilities via AgentCard.
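
The filtering rule described above can be sketched in isolation. This is a minimal illustration, not the package's code: `Capability` is a trimmed local mirror of the documented type, and `toolMetadata` and `functionCapabilities` are hypothetical names, since the fields of `core.ToolMetadata` are not shown on this page.

```go
package main

import "fmt"

// Capability is a trimmed local mirror of the documented a2a type
// (Schema is omitted to keep the sketch short).
type Capability struct {
	Name        string
	Description string
	Type        string // "function", "tool", "service"
}

// toolMetadata is a hypothetical stand-in for core.ToolMetadata; the real
// type's fields are not listed in this documentation.
type toolMetadata struct {
	Name        string
	Description string
}

// functionCapabilities keeps only function-type capabilities, which is the
// behavior the documentation describes for the real converter.
func functionCapabilities(caps []Capability) []*toolMetadata {
	out := make([]*toolMetadata, 0, len(caps))
	for _, c := range caps {
		if c.Type != "function" {
			continue
		}
		out = append(out, &toolMetadata{Name: c.Name, Description: c.Description})
	}
	return out
}

func main() {
	caps := []Capability{
		{Name: "search", Description: "Web search", Type: "function"},
		{Name: "billing", Description: "Billing backend", Type: "service"},
	}
	for _, m := range functionCapabilities(caps) {
		fmt.Println(m.Name)
	}
}
```

The result carries descriptions only; invoking a remote tool would still go through the remote agent.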

func CreateSimpleInput

func CreateSimpleInput(question string) map[string]interface{}

CreateSimpleInput creates agent input from a simple text question.

func CreateSimpleOutput

func CreateSimpleOutput(answer string) map[string]interface{}

CreateSimpleOutput creates agent output from a simple text answer.

func ExtractTextFromArtifact

func ExtractTextFromArtifact(artifact Artifact) string

ExtractTextFromArtifact extracts all text content from an artifact.

func ExtractTextFromMessage

func ExtractTextFromMessage(msg *Message) string

ExtractTextFromMessage extracts all text content from a message. Returns concatenated text from all text parts, separated by newlines.
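
A minimal sketch of the documented joining behavior, using trimmed local mirrors of `Message` and `Part` so it compiles on its own. `extractText` is a hypothetical name, and returning an empty string for a nil message is an assumption of this sketch rather than confirmed package behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// Part and Message are trimmed local mirrors of the documented types.
type Part struct {
	Type string // "text", "file", or "data"
	Text string
}

type Message struct {
	Parts []Part
}

// extractText collects every text part and joins them with newlines.
// The nil check is an assumption of this sketch.
func extractText(msg *Message) string {
	if msg == nil {
		return ""
	}
	var texts []string
	for _, p := range msg.Parts {
		if p.Type == "text" {
			texts = append(texts, p.Text)
		}
	}
	return strings.Join(texts, "\n")
}

func main() {
	msg := &Message{Parts: []Part{
		{Type: "text", Text: "first"},
		{Type: "data"},
		{Type: "text", Text: "second"},
	}}
	fmt.Println(extractText(msg))
}
```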

func MessageToAgentInput

func MessageToAgentInput(msg *Message) (map[string]interface{}, error)

MessageToAgentInput converts an a2a Message to dspy-go agent input format. Text parts are extracted and mapped to input fields based on metadata or position. The first text part without metadata defaults to the "question" field.
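
The mapping described above might look roughly like the sketch below. The metadata key `"field"` and the function name `toAgentInput` are assumptions made for illustration; the documentation does not name the key the package actually reads, nor what happens to later unlabeled parts (this sketch ignores them).

```go
package main

import "fmt"

// Part is a trimmed local mirror of the documented type.
type Part struct {
	Type     string
	Text     string
	Metadata map[string]interface{}
}

// toAgentInput maps text parts to input fields. The "field" metadata key is
// an assumption of this sketch, not a name confirmed by the package docs.
func toAgentInput(parts []Part) map[string]interface{} {
	input := make(map[string]interface{})
	for _, p := range parts {
		if p.Type != "text" {
			continue
		}
		// Labeled parts go to the field named in their metadata.
		if name, ok := p.Metadata["field"].(string); ok && name != "" {
			input[name] = p.Text
			continue
		}
		// The first unlabeled text part becomes the default "question" field.
		if _, exists := input["question"]; !exists {
			input["question"] = p.Text
		}
	}
	return input
}

func main() {
	input := toAgentInput([]Part{
		{Type: "text", Text: "What is Go?"},
		{Type: "text", Text: "Official docs", Metadata: map[string]interface{}{"field": "context"}},
	})
	fmt.Println(input["question"], "|", input["context"])
}
```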

Types

type A2AExecutor

type A2AExecutor struct {
	// contains filtered or unexported fields
}

A2AExecutor wraps a dspy-go agent and enables a2a protocol communication. It supports sub-agents for hierarchical agent composition without HTTP.

Similar to ADK Python's LLMAgent with sub_agents parameter:

parent = LLMAgent(model=llm, sub_agents=[agent1, agent2])

Usage:

executor := a2a.NewExecutor(myAgent).
    WithSubAgent("search", searchExecutor).
    WithSubAgent("reasoning", reasoningExecutor)

task, err := executor.SendMessage(ctx, message)

func NewExecutor

func NewExecutor(agent agents.Agent) *A2AExecutor

NewExecutor creates a new a2a executor wrapping the given agent.

func NewExecutorWithConfig

func NewExecutorWithConfig(agent agents.Agent, config ExecutorConfig) *A2AExecutor

NewExecutorWithConfig creates a new executor with custom configuration.

func NewExecutorWithSubAgents

func NewExecutorWithSubAgents(agent agents.Agent, subAgents map[string]*A2AExecutor) *A2AExecutor

NewExecutorWithSubAgents creates an executor with multiple sub-agents at once.

Example:

executor := a2a.NewExecutorWithSubAgents(parent, map[string]*a2a.A2AExecutor{
    "search": searchExecutor,
    "reasoning": reasoningExecutor,
})

func (*A2AExecutor) CallSubAgent

func (e *A2AExecutor) CallSubAgent(ctx context.Context, name string, msg *Message) (Artifact, error)

CallSubAgent calls a registered sub-agent with an a2a message. This enables hierarchical agent composition:

parent.CallSubAgent(ctx, "search", userMessage)
parent.CallSubAgent(ctx, "reasoning", searchResults)

The communication uses a2a protocol but happens in-process (no HTTP).

func (*A2AExecutor) CallSubAgentSimple

func (e *A2AExecutor) CallSubAgentSimple(ctx context.Context, name string, text string) (string, error)

CallSubAgentSimple is a convenience method that creates a simple text message and calls the sub-agent.

func (*A2AExecutor) Execute

func (e *A2AExecutor) Execute(ctx context.Context, msg *Message) (Artifact, error)

Execute provides a simpler interface that returns just the artifact. This is useful when you don't need the full task tracking.

func (*A2AExecutor) GetAgentCard

func (e *A2AExecutor) GetAgentCard() AgentCard

GetAgentCard returns an AgentCard describing this executor. Useful for discovery and introspection.

func (*A2AExecutor) GetCapabilities

func (e *A2AExecutor) GetCapabilities() []Capability

GetCapabilities returns the capabilities of this executor. This includes both the wrapped agent's tools and registered sub-agents.

func (*A2AExecutor) GetSubAgent

func (e *A2AExecutor) GetSubAgent(name string) (*A2AExecutor, bool)

GetSubAgent retrieves a sub-agent by name.

func (*A2AExecutor) ListSubAgents

func (e *A2AExecutor) ListSubAgents() []string

ListSubAgents returns all registered sub-agent names.

func (*A2AExecutor) Name

func (e *A2AExecutor) Name() string

Name returns the executor's name.

func (*A2AExecutor) SendMessage

func (e *A2AExecutor) SendMessage(ctx context.Context, msg *Message) (*Task, error)

SendMessage executes the agent with an a2a message and returns a task. This is the primary way to interact with an a2a-wrapped agent.

The message is converted to agent input, the agent executes, and the output is converted to an a2a artifact.

func (*A2AExecutor) UnwrapAgent

func (e *A2AExecutor) UnwrapAgent() agents.Agent

UnwrapAgent returns the underlying wrapped agent. Useful for accessing agent-specific functionality.

func (*A2AExecutor) WithSubAgent

func (e *A2AExecutor) WithSubAgent(name string, subAgent *A2AExecutor) *A2AExecutor

WithSubAgent registers a sub-agent that can be called by this agent. The sub-agent becomes available as a capability.

type AgentCard

type AgentCard struct {
	Name         string                 `json:"name"`
	Description  string                 `json:"description"`
	URL          string                 `json:"url"` // RPC endpoint URL
	Version      string                 `json:"version"`
	Capabilities []Capability           `json:"capabilities,omitempty"`
	Metadata     map[string]interface{} `json:"metadata,omitempty"`
}

AgentCard describes an agent's capabilities and endpoint. This is served at /.well-known/agent.json for discovery.
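
To illustrate discovery, the sketch below serves a card at the well-known path with the standard library and reads it back through an in-memory recorder. `AgentCard` is a trimmed local mirror of the documented struct; `cardHandler` and `fetchCard` are hypothetical helpers and say nothing about how `Server` is implemented internally.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// AgentCard is a trimmed local mirror of the documented type
// (Capabilities and Metadata are omitted for brevity).
type AgentCard struct {
	Name        string `json:"name"`
	Description string `json:"description"`
	URL         string `json:"url"`
	Version     string `json:"version"`
}

// cardHandler serves the card as JSON at the discovery path.
func cardHandler(card AgentCard) http.Handler {
	mux := http.NewServeMux()
	mux.HandleFunc("/.well-known/agent.json", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(card); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
	})
	return mux
}

// fetchCard issues an in-memory GET against the handler and decodes the card.
func fetchCard(h http.Handler) (AgentCard, error) {
	var card AgentCard
	req := httptest.NewRequest(http.MethodGet, "/.well-known/agent.json", nil)
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	if rec.Code != http.StatusOK {
		return card, fmt.Errorf("unexpected status: %d", rec.Code)
	}
	err := json.NewDecoder(rec.Body).Decode(&card)
	return card, err
}

func main() {
	h := cardHandler(AgentCard{
		Name:        "MyAgent",
		Description: "A helpful agent",
		URL:         "http://localhost:8080/rpc",
		Version:     "0.1.0", // illustrative value
	})
	card, err := fetchCard(h)
	if err != nil {
		panic(err)
	}
	fmt.Println(card.Name, card.URL)
}
```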

type Artifact

type Artifact struct {
	ArtifactID string                 `json:"artifactId"`
	Parts      []Part                 `json:"parts"`
	Metadata   map[string]interface{} `json:"metadata,omitempty"`
}

Artifact represents output from a task.

func AgentOutputToArtifact

func AgentOutputToArtifact(output map[string]interface{}) (Artifact, error)

AgentOutputToArtifact converts dspy-go agent output to an a2a Artifact. Similar to AgentOutputToMessage but wraps the result in an Artifact structure.

func CreateErrorArtifact

func CreateErrorArtifact(err error) Artifact

CreateErrorArtifact creates an a2a Artifact representing an error.

func NewArtifact

func NewArtifact(parts ...Part) Artifact

NewArtifact creates a new artifact with the given parts.

func NewArtifactWithMetadata

func NewArtifactWithMetadata(metadata map[string]interface{}, parts ...Part) Artifact

NewArtifactWithMetadata creates a new artifact with parts and metadata.

type Capability

type Capability struct {
	Name        string                 `json:"name"`
	Description string                 `json:"description"`
	Type        string                 `json:"type"` // "function", "tool", "service"
	Schema      map[string]interface{} `json:"schema,omitempty"`
}

Capability describes a specific capability of an agent.

func NewCapability

func NewCapability(name, description, capType string) Capability

NewCapability creates a new capability.

func ToolsToCapabilities

func ToolsToCapabilities(tools []core.Tool) []Capability

ToolsToCapabilities converts dspy-go Tools to a2a Capabilities. Each tool becomes a function-type capability with its schema.

func (Capability) WithSchema

func (c Capability) WithSchema(schema map[string]interface{}) Capability

WithSchema adds a schema to the capability.

type Duration

type Duration time.Duration

Duration is a wrapper around time.Duration for easier JSON unmarshaling.
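
One common way to make such a wrapper JSON-friendly is a custom `UnmarshalJSON` that accepts duration strings. The sketch below assumes that approach; the accepted formats (a string such as "1h", or a number of nanoseconds) are assumptions, not confirmed behavior of this package, and `config` and `parseConfig` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Duration is a local mirror of the documented wrapper type.
type Duration time.Duration

// UnmarshalJSON accepts a string such as "90m" or a number of nanoseconds.
// Both formats are assumptions of this sketch.
func (d *Duration) UnmarshalJSON(b []byte) error {
	var v interface{}
	if err := json.Unmarshal(b, &v); err != nil {
		return err
	}
	switch value := v.(type) {
	case float64:
		*d = Duration(time.Duration(value))
		return nil
	case string:
		parsed, err := time.ParseDuration(value)
		if err != nil {
			return err
		}
		*d = Duration(parsed)
		return nil
	default:
		return fmt.Errorf("invalid duration: %s", string(b))
	}
}

// config is a hypothetical struct standing in for a JSON-loaded ServerConfig.
type config struct {
	MaxTaskAge Duration `json:"maxTaskAge"`
}

// parseConfig decodes a JSON document into config.
func parseConfig(raw string) (config, error) {
	var c config
	err := json.Unmarshal([]byte(raw), &c)
	return c, err
}

func main() {
	c, err := parseConfig(`{"maxTaskAge": "1h"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(time.Duration(c.MaxTaskAge)) // prints 1h0m0s
}
```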

type ExecutorConfig

type ExecutorConfig struct {
	Name string // Agent name for identification
}

ExecutorConfig holds configuration for the executor.

type FilePart

type FilePart struct {
	URI      string `json:"uri,omitempty"`   // URI reference to file
	Bytes    string `json:"bytes,omitempty"` // base64-encoded file content
	MimeType string `json:"mimeType"`        // MIME type (e.g., "image/png")
}

FilePart represents a file attachment. Files can be referenced by URI or embedded as base64-encoded bytes.
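
Because `Bytes` holds base64 text rather than raw bytes, callers encode before building the part. The sketch below shows that step with local mirrors of the types; `filePartFromRaw` and `decodeFile` are hypothetical helpers, and the use of standard (padded) base64 is an assumption.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// FilePart and Part are trimmed local mirrors of the documented types.
type FilePart struct {
	URI      string `json:"uri,omitempty"`
	Bytes    string `json:"bytes,omitempty"`
	MimeType string `json:"mimeType"`
}

type Part struct {
	Type string    `json:"type"`
	File *FilePart `json:"file,omitempty"`
}

// filePartFromRaw encodes raw bytes with standard base64 (an assumption of
// this sketch) and wraps them in a file part.
func filePartFromRaw(raw []byte, mimeType string) Part {
	return Part{
		Type: "file",
		File: &FilePart{
			Bytes:    base64.StdEncoding.EncodeToString(raw),
			MimeType: mimeType,
		},
	}
}

// decodeFile recovers the raw bytes from an embedded file part.
func decodeFile(p Part) ([]byte, error) {
	if p.File == nil {
		return nil, fmt.Errorf("part has no file")
	}
	return base64.StdEncoding.DecodeString(p.File.Bytes)
}

func main() {
	part := filePartFromRaw([]byte("hello"), "text/plain")
	fmt.Println(part.File.Bytes) // prints aGVsbG8=
	raw, err := decodeFile(part)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw)) // prints hello
}
```

With the real package, the encoded string would be what gets passed to NewFilePartFromBytes.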

type JSONRPCRequest

type JSONRPCRequest struct {
	JSONRPC string                 `json:"jsonrpc"` // Must be "2.0"
	Method  string                 `json:"method"`
	Params  map[string]interface{} `json:"params,omitempty"`
	ID      interface{}            `json:"id"` // string, number, or null
}

JSONRPCRequest represents a JSON-RPC 2.0 request.

func NewJSONRPCRequest

func NewJSONRPCRequest(method string, params map[string]interface{}) *JSONRPCRequest

NewJSONRPCRequest creates a new JSON-RPC request.

type JSONRPCResponse

type JSONRPCResponse struct {
	JSONRPC string      `json:"jsonrpc"` // Must be "2.0"
	Result  interface{} `json:"result,omitempty"`
	Error   *RPCError   `json:"error,omitempty"`
	ID      interface{} `json:"id"`
}

JSONRPCResponse represents a JSON-RPC 2.0 response.
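
To show what the envelope looks like on the wire, the sketch below marshals an error response using local mirrors of the documented structs. `errorResponse` is a hypothetical stand-in for NewJSONRPCError, and `encode` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the documented types, so the sketch compiles on its own.
type RPCError struct {
	Code    int         `json:"code"`
	Message string      `json:"message"`
	Data    interface{} `json:"data,omitempty"`
}

type JSONRPCResponse struct {
	JSONRPC string      `json:"jsonrpc"`
	Result  interface{} `json:"result,omitempty"`
	Error   *RPCError   `json:"error,omitempty"`
	ID      interface{} `json:"id"`
}

// RPCErrorCodeMethodNotFound matches the documented constant.
const RPCErrorCodeMethodNotFound = -32601

// errorResponse is a hypothetical stand-in for NewJSONRPCError.
func errorResponse(id interface{}, code int, message string) *JSONRPCResponse {
	return &JSONRPCResponse{
		JSONRPC: "2.0",
		Error:   &RPCError{Code: code, Message: message},
		ID:      id,
	}
}

// encode marshals a response to its wire form, or "" on failure.
func encode(resp *JSONRPCResponse) string {
	b, err := json.Marshal(resp)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	fmt.Println(encode(errorResponse(1, RPCErrorCodeMethodNotFound, "method not found")))
	// prints {"jsonrpc":"2.0","error":{"code":-32601,"message":"method not found"},"id":1}
}
```

The `omitempty` tags keep `result` out of error responses and `error` out of successful ones, so a response carries exactly one of the two.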

func NewJSONRPCError

func NewJSONRPCError(id interface{}, code int, message string) *JSONRPCResponse

NewJSONRPCError creates an error JSON-RPC response.

func NewJSONRPCResponse

func NewJSONRPCResponse(id interface{}, result interface{}) *JSONRPCResponse

NewJSONRPCResponse creates a successful JSON-RPC response.

type Message

type Message struct {
	MessageID string                 `json:"messageId"`
	Role      Role                   `json:"role"`
	Parts     []Part                 `json:"parts"`
	ContextID string                 `json:"contextId,omitempty"`
	Metadata  map[string]interface{} `json:"metadata,omitempty"`
}

Message represents a message in the a2a protocol. Messages contain one or more parts and can maintain context across multiple exchanges.

func AgentOutputToMessage

func AgentOutputToMessage(output map[string]interface{}) (*Message, error)

AgentOutputToMessage converts dspy-go agent output to an a2a Message. Each output field becomes a text part with metadata indicating the field name. Internal fields (prefixed with "_") are excluded from conversion.
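
The conversion rule above can be sketched as follows. `Part` is a trimmed local mirror, `outputToParts` is a hypothetical name, and the `"field"` metadata key is an assumption, since the documentation does not name the key. Keys are sorted here only to make the output deterministic.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Part is a trimmed local mirror of the documented type.
type Part struct {
	Type     string
	Text     string
	Metadata map[string]interface{}
}

// outputToParts turns each public output field into a text part and skips
// internal fields prefixed with "_". The "field" metadata key is an
// assumption of this sketch.
func outputToParts(output map[string]interface{}) []Part {
	keys := make([]string, 0, len(output))
	for k := range output {
		if strings.HasPrefix(k, "_") {
			continue
		}
		keys = append(keys, k)
	}
	sort.Strings(keys) // map iteration order is random; sort for stable output

	parts := make([]Part, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, Part{
			Type:     "text",
			Text:     fmt.Sprint(output[k]),
			Metadata: map[string]interface{}{"field": k},
		})
	}
	return parts
}

func main() {
	parts := outputToParts(map[string]interface{}{
		"answer":     "Paris",
		"confidence": 0.9,
		"_trace":     "internal",
	})
	for _, p := range parts {
		fmt.Printf("%s=%s\n", p.Metadata["field"], p.Text)
	}
}
```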

func CreateErrorMessage

func CreateErrorMessage(err error) *Message

CreateErrorMessage creates an a2a Message representing an error.

func NewAgentMessage

func NewAgentMessage(text string) *Message

NewAgentMessage creates an agent message with text.

func NewMessage

func NewMessage(role Role, parts ...Part) *Message

NewMessage creates a new message with the given role and parts.

func NewUserMessage

func NewUserMessage(text string) *Message

NewUserMessage creates a user message with text.

func (*Message) AddPart

func (m *Message) AddPart(part Part) *Message

AddPart adds a part to the message.

func (*Message) WithContext

func (m *Message) WithContext(contextID string) *Message

WithContext sets the context ID for this message.

type Part

type Part struct {
	Type     string                 `json:"type"` // "text", "file", or "data"
	Text     string                 `json:"text,omitempty"`
	File     *FilePart              `json:"file,omitempty"`
	Data     map[string]interface{} `json:"data,omitempty"`
	Metadata map[string]interface{} `json:"metadata,omitempty"`
}

Part represents a piece of content in a message. Parts can be text, files, or structured data.

func NewDataPart

func NewDataPart(data map[string]interface{}) Part

NewDataPart creates a structured data part.

func NewFilePart

func NewFilePart(uri, mimeType string) Part

NewFilePart creates a file part from a URI.

func NewFilePartFromBytes

func NewFilePartFromBytes(bytes, mimeType string) Part

NewFilePartFromBytes creates a file part from base64-encoded bytes.

func NewTextPart

func NewTextPart(text string) Part

NewTextPart creates a text part.

func NewTextPartWithMetadata

func NewTextPartWithMetadata(text string, metadata map[string]interface{}) Part

NewTextPartWithMetadata creates a text part with metadata.

type RPCError

type RPCError struct {
	Code    int         `json:"code"`
	Message string      `json:"message"`
	Data    interface{} `json:"data,omitempty"`
}

RPCError represents a JSON-RPC error.

func NewRPCError

func NewRPCError(code int, message string) *RPCError

NewRPCError creates a new RPC error.

func (*RPCError) Error

func (e *RPCError) Error() string

Error implements the error interface.

func (*RPCError) WithData

func (e *RPCError) WithData(data interface{}) *RPCError

WithData adds data to the RPC error.

type Role

type Role string

Role represents the sender of a message.

const (
	RoleUser  Role = "user"
	RoleAgent Role = "agent"
)

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server exposes a dspy-go agent via the a2a protocol over HTTP.

func NewServer

func NewServer(agent agents.Agent, config ServerConfig) (*Server, error)

NewServer creates a new a2a server wrapping the given agent.

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

Shutdown gracefully stops the server.

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

Start begins serving the a2a protocol.

type ServerConfig

type ServerConfig struct {
	Host        string   // Host address (e.g., "localhost", "0.0.0.0")
	Port        int      // Port number (e.g., 8080)
	Name        string   // Agent name for AgentCard
	Description string   // Agent description for AgentCard
	Version     string   // Agent version
	PathPrefix  string   // Optional path prefix (e.g., "/api/v1")
	MaxTaskAge  Duration // How long to keep completed tasks (default: 1 hour)
}

ServerConfig holds configuration for the a2a server.

type Task

type Task struct {
	ID        string                 `json:"id"`
	ContextID string                 `json:"contextId,omitempty"`
	Status    TaskStatus             `json:"status"`
	History   []Message              `json:"history,omitempty"`
	Artifacts []Artifact             `json:"artifacts,omitempty"`
	Metadata  map[string]interface{} `json:"metadata,omitempty"`
	// contains filtered or unexported fields
}

Task represents a running or completed task.

func NewTask

func NewTask() *Task

NewTask creates a new task with submitted status.

func (*Task) AddArtifact

func (t *Task) AddArtifact(artifact Artifact)

AddArtifact adds an artifact to the task.

func (*Task) GetArtifacts

func (t *Task) GetArtifacts() []Artifact

GetArtifacts returns a copy of the artifacts (thread-safe).

func (*Task) GetStatus

func (t *Task) GetStatus() TaskStatus

GetStatus returns a copy of the current status (thread-safe).

func (*Task) UpdateStatus

func (t *Task) UpdateStatus(state TaskState)

UpdateStatus updates the task status.
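
The "thread-safe copy" wording on the getters suggests a mutex-guarded struct. The sketch below shows one way that pattern is commonly written. It is an assumption about the approach, not the package's code: the types are simplified local stand-ins, with artifacts reduced to strings.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// TaskStatus and Task are simplified local stand-ins for the documented types.
type TaskStatus struct {
	State     string
	Timestamp string // RFC3339 format
}

type Task struct {
	mu        sync.RWMutex
	status    TaskStatus
	artifacts []string
}

// UpdateStatus replaces the status and stamps it with the current time.
func (t *Task) UpdateStatus(state string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.status = TaskStatus{State: state, Timestamp: time.Now().UTC().Format(time.RFC3339)}
}

// GetStatus returns a copy of the status under a read lock.
func (t *Task) GetStatus() TaskStatus {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.status
}

// AddArtifact appends an artifact under the write lock.
func (t *Task) AddArtifact(a string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.artifacts = append(t.artifacts, a)
}

// GetArtifacts returns a copy so callers cannot mutate the task's slice.
func (t *Task) GetArtifacts() []string {
	t.mu.RLock()
	defer t.mu.RUnlock()
	out := make([]string, len(t.artifacts))
	copy(out, t.artifacts)
	return out
}

func main() {
	task := &Task{}
	task.UpdateStatus("working")
	task.AddArtifact("partial result")

	got := task.GetArtifacts()
	got[0] = "mutated by caller"

	fmt.Println(task.GetStatus().State)
	fmt.Println(task.GetArtifacts()[0]) // still "partial result"
}
```

Returning copies means a caller can hold or modify the result while another goroutine keeps updating the task.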

type TaskArtifactUpdateEvent

type TaskArtifactUpdateEvent struct {
	TaskID    string                 `json:"taskId"`
	Artifact  Artifact               `json:"artifact"`
	ContextID string                 `json:"contextId,omitempty"`
	Metadata  map[string]interface{} `json:"metadata,omitempty"`
	LastChunk bool                   `json:"lastChunk"` // true if this is the last artifact chunk
	Append    bool                   `json:"append"`    // true if this should be appended to previous artifact
}

TaskArtifactUpdateEvent is sent when a task produces an artifact.
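
A consumer of these events might assemble chunked artifacts along the lines below. This is a sketch based only on the field comments for `Append` and `LastChunk`; the types are simplified local stand-ins with parts reduced to strings, and `assemble` is a hypothetical helper.

```go
package main

import "fmt"

// Artifact and TaskArtifactUpdateEvent are simplified local stand-ins for
// the documented types, with parts reduced to plain strings.
type Artifact struct {
	ArtifactID string
	Parts      []string
}

type TaskArtifactUpdateEvent struct {
	TaskID    string
	Artifact  Artifact
	LastChunk bool
	Append    bool
}

// assemble folds a stream of events into artifacts. An event with Append set
// extends the previous artifact; otherwise it starts a new one. complete
// reports whether a LastChunk event was seen.
func assemble(events []TaskArtifactUpdateEvent) (artifacts []Artifact, complete bool) {
	for _, ev := range events {
		if ev.Append && len(artifacts) > 0 {
			last := &artifacts[len(artifacts)-1]
			last.Parts = append(last.Parts, ev.Artifact.Parts...)
		} else {
			// Copy the parts so later appends never touch the event's slice.
			artifacts = append(artifacts, Artifact{
				ArtifactID: ev.Artifact.ArtifactID,
				Parts:      append([]string(nil), ev.Artifact.Parts...),
			})
		}
		if ev.LastChunk {
			complete = true
		}
	}
	return artifacts, complete
}

func main() {
	events := []TaskArtifactUpdateEvent{
		{TaskID: "t1", Artifact: Artifact{ArtifactID: "a1", Parts: []string{"Hel"}}},
		{TaskID: "t1", Artifact: Artifact{ArtifactID: "a1", Parts: []string{"lo"}}, Append: true, LastChunk: true},
	}
	artifacts, complete := assemble(events)
	fmt.Println(len(artifacts), artifacts[0].Parts, complete)
}
```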

func NewTaskArtifactUpdateEvent

func NewTaskArtifactUpdateEvent(taskID string, artifact Artifact, lastChunk bool) *TaskArtifactUpdateEvent

NewTaskArtifactUpdateEvent creates an artifact update event.

type TaskState

type TaskState string

TaskState represents the current state of a task.

const (
	TaskStateSubmitted     TaskState = "submitted"      // Task has been submitted
	TaskStateWorking       TaskState = "working"        // Task is being processed
	TaskStateCompleted     TaskState = "completed"      // Task completed successfully
	TaskStateFailed        TaskState = "failed"         // Task failed with error
	TaskStateInputRequired TaskState = "input_required" // Task requires user input
	TaskStateAuthRequired  TaskState = "auth_required"  // Task requires authentication
)

func (TaskState) IsTerminal

func (s TaskState) IsTerminal() bool

IsTerminal returns true if the state is terminal (completed or failed).
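
The terminal check follows directly from the constants above. The sketch below mirrors them locally and adds a hypothetical helper, `firstTerminal`, to show how a caller might use the check when scanning observed states.

```go
package main

import "fmt"

// TaskState and its constants mirror the documented declarations.
type TaskState string

const (
	TaskStateSubmitted     TaskState = "submitted"
	TaskStateWorking       TaskState = "working"
	TaskStateCompleted     TaskState = "completed"
	TaskStateFailed        TaskState = "failed"
	TaskStateInputRequired TaskState = "input_required"
	TaskStateAuthRequired  TaskState = "auth_required"
)

// IsTerminal reports whether the state is completed or failed, as documented.
func (s TaskState) IsTerminal() bool {
	return s == TaskStateCompleted || s == TaskStateFailed
}

// firstTerminal is a hypothetical helper: it returns the first terminal state
// in a sequence of observed states, or false if none was reached.
func firstTerminal(states []TaskState) (TaskState, bool) {
	for _, s := range states {
		if s.IsTerminal() {
			return s, true
		}
	}
	return "", false
}

func main() {
	observed := []TaskState{TaskStateSubmitted, TaskStateWorking, TaskStateInputRequired, TaskStateCompleted}
	state, ok := firstTerminal(observed)
	fmt.Println(state, ok) // prints completed true
}
```

Note that input_required and auth_required are not terminal: a task in either state is paused, not finished.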

type TaskStatus

type TaskStatus struct {
	State     TaskState `json:"state"`
	Message   *Message  `json:"message,omitempty"`
	Timestamp string    `json:"timestamp"` // RFC3339 format
}

TaskStatus represents the current status of a task.

func NewTaskStatus

func NewTaskStatus(state TaskState) TaskStatus

NewTaskStatus creates a new task status with the current timestamp.

func (TaskStatus) WithMessage

func (ts TaskStatus) WithMessage(msg *Message) TaskStatus

WithMessage adds a message to the task status.

type TaskStatusUpdateEvent

type TaskStatusUpdateEvent struct {
	TaskID    string                 `json:"taskId"`
	Status    TaskStatus             `json:"status"`
	ContextID string                 `json:"contextId,omitempty"`
	Metadata  map[string]interface{} `json:"metadata,omitempty"`
	Final     bool                   `json:"final"` // true if this is the final status update
}

TaskStatusUpdateEvent is sent when a task's status changes.

func NewTaskStatusUpdateEvent

func NewTaskStatusUpdateEvent(taskID string, status TaskStatus, final bool) *TaskStatusUpdateEvent

NewTaskStatusUpdateEvent creates a status update event.
