Documentation
¶
Overview ¶
Package a2a provides an input component that implements the A2A (Agent-to-Agent) protocol for receiving task delegations from external agents.
Overview ¶
The A2A adapter enables agent-to-agent communication following the A2A protocol specification. It receives task requests from external agents and converts them to SemStreams TaskMessages for processing by the agentic system.
Architecture ¶
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ External Agent │────▶│ A2A Adapter │────▶│ NATS/JetStream │
│ (A2A Client) │ │ (SemStreams) │ │ (Agent Dispatch)│
└─────────────────┘ └──────────────────┘ └─────────────────┘
│
▼
┌──────────────────┐
│ Agent Card │
│ (Capabilities) │
└──────────────────┘
Components ¶
The package consists of several key components:
- Component: The main LifecycleComponent that handles A2A requests
- TaskMapper: Translates between A2A tasks and SemStreams TaskMessages
- AgentCardGenerator: Generates A2A agent cards from OASF records
- Config: Configuration for transport, authentication, and endpoints
Configuration ¶
Example configuration for HTTP transport:
{
"transport": "http",
"listen_address": ":8080",
"agent_card_path": "/.well-known/agent.json",
"enable_authentication": true,
"request_timeout": "30s",
"max_concurrent_tasks": 10
}
Example configuration for SLIM transport:
{
"transport": "slim",
"slim_group_id": "did:agntcy:group:tenant-123",
"enable_authentication": true
}
Transport Options ¶
The adapter supports two transport mechanisms:
- HTTP: RESTful endpoints for task submission and agent card retrieval
- SLIM: MLS-encrypted group messaging via the SLIM bridge
A2A Protocol Support ¶
The adapter implements the following A2A protocol endpoints:
- GET /.well-known/agent.json: Returns the agent card
- POST /tasks/send: Submit a new task
- GET /tasks/get?id=<id>: Get task status
- POST /tasks/cancel: Cancel a running task
Agent Card Generation ¶
Agent cards are automatically generated from OASF records stored in the OASF_RECORDS KV bucket. The card includes:
- Agent name and description
- Capabilities derived from OASF skills
- Authentication methods (DID-based)
- Supported input/output modes
Task Flow ¶
Inbound (A2A → NATS):
- External agent submits task via A2A endpoint
- Adapter validates authentication (DID verification)
- Task converted to agentic.TaskMessage
- Published to agent.task.a2a.{task_id}
Outbound (NATS → A2A):
- Agent completes task, publishes to agent.complete.*
- Adapter converts result to A2A TaskResult
- Response sent back to requesting agent
Authentication ¶
When EnableAuthentication is true, the adapter verifies incoming requests using DID-based authentication. The requester's DID can be provided via:
- Authorization header
- X-Agent-DID header
- Signed message body (for SLIM transport)
Usage ¶
Register the component with the component registry:
import a2a "github.com/c360studio/semstreams/input/a2a"
func init() {
a2a.Register(registry)
}
See Also ¶
- input/slim: SLIM bridge for encrypted cross-organizational messaging
- output/directory-bridge: Registers agents with AGNTCY directories
- processor/oasf-generator: Generates OASF records for agent card
- agentic/identity: DID and verifiable credential management
- docs/concepts/23-a2a-protocol.md: A2A integration guide
Index ¶
- func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry RegistryInterface) error
- func SerializeAgentCard(card *AgentCard) ([]byte, error)
- type AgentCard
- type AgentCardGenerator
- type Artifact
- type Authentication
- type Capability
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) Meta() component.Metadata
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) Stop(_ time.Duration) error
- func (c *Component) UpdateAgentCard(card *AgentCard)
- type Config
- type Credentials
- type FilePart
- type MessagePart
- type OASFRecord
- type OASFSkill
- type Provider
- type RegistryInterface
- type Skill
- type Task
- type TaskMapper
- func (m *TaskMapper) CreateTaskStatusUpdate(taskID string, state string, message string) *Task
- func (m *TaskMapper) FromTaskResult(taskID string, result string, err error) *TaskResult
- func (m *TaskMapper) ParseTask(data []byte) (*Task, error)
- func (m *TaskMapper) SerializeTaskResult(result *TaskResult) ([]byte, error)
- func (m *TaskMapper) ToTaskMessage(task *Task, requesterDID string) (*agentic.TaskMessage, error)
- type TaskMessage
- type TaskResult
- type TaskStatus
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewComponent ¶
func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewComponent creates a new A2A adapter component.
func Register ¶
func Register(registry RegistryInterface) error
Register registers the A2A adapter input component with the given registry.
func SerializeAgentCard ¶
SerializeAgentCard serializes an agent card to JSON.
Types ¶
type AgentCard ¶
type AgentCard struct {
// Name is the agent's display name.
Name string `json:"name"`
// Description describes what the agent does.
Description string `json:"description"`
// URL is the agent's A2A endpoint.
URL string `json:"url"`
// Version is the agent card schema version.
Version string `json:"version"`
// Provider contains information about the agent provider.
Provider *Provider `json:"provider,omitempty"`
// Capabilities lists what the agent can do.
Capabilities []Capability `json:"capabilities"`
// Authentication describes supported auth methods.
Authentication *Authentication `json:"authentication,omitempty"`
// DefaultInputModes lists supported input types.
DefaultInputModes []string `json:"defaultInputModes,omitempty"`
// DefaultOutputModes lists supported output types.
DefaultOutputModes []string `json:"defaultOutputModes,omitempty"`
// Skills lists specific skills the agent has.
Skills []Skill `json:"skills,omitempty"`
}
AgentCard represents an A2A Agent Card. This is the publicly-accessible description of an agent's capabilities.
func ParseAgentCard ¶
ParseAgentCard parses a JSON agent card.
type AgentCardGenerator ¶
type AgentCardGenerator struct {
// BaseURL is the base URL for the agent's A2A endpoint.
BaseURL string
// ProviderOrg is the provider organization name.
ProviderOrg string
// ProviderURL is the provider's website URL.
ProviderURL string
// AgentDID is the agent's DID for authentication.
AgentDID string
}
AgentCardGenerator generates A2A agent cards from OASF records.
func NewAgentCardGenerator ¶
func NewAgentCardGenerator(baseURL, providerOrg string) *AgentCardGenerator
NewAgentCardGenerator creates a new agent card generator.
func (*AgentCardGenerator) GenerateFromOASF ¶
func (g *AgentCardGenerator) GenerateFromOASF(record *OASFRecord) (*AgentCard, error)
GenerateFromOASF generates an agent card from an OASF record.
type Artifact ¶
type Artifact struct {
// Name is the artifact name.
Name string `json:"name"`
// Description describes the artifact.
Description string `json:"description,omitempty"`
// Parts contains the artifact content.
Parts []MessagePart `json:"parts"`
// Index is the artifact position in the output sequence.
Index int `json:"index,omitempty"`
// Metadata contains additional artifact metadata.
Metadata map[string]any `json:"metadata,omitempty"`
}
Artifact represents an output artifact from a task.
type Authentication ¶
type Authentication struct {
// Schemes lists supported auth schemes.
Schemes []string `json:"schemes"` // e.g., ["did", "bearer"]
// Credentials contains credential information.
Credentials *Credentials `json:"credentials,omitempty"`
}
Authentication describes supported authentication methods.
type Capability ¶
type Capability struct {
// Name is the capability name.
Name string `json:"name"`
// Description describes the capability.
Description string `json:"description,omitempty"`
}
Capability describes a capability the agent has.
type Component ¶
type Component struct {
// contains filtered or unexported fields
}
Component implements the A2A adapter input component. It receives A2A task requests and publishes them to NATS for agent processing.
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema.
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics.
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns the current health status.
func (*Component) Initialize ¶
Initialize prepares the component.
func (*Component) InputPorts ¶
InputPorts returns configured input port definitions.
func (*Component) OutputPorts ¶
OutputPorts returns configured output port definitions.
func (*Component) UpdateAgentCard ¶
UpdateAgentCard updates the cached agent card.
type Config ¶
type Config struct {
// Ports defines the input/output port configuration.
Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`
// Transport specifies the A2A transport mechanism.
// Supported values: "http", "slim"
Transport string `json:"transport" schema:"type:string,description:A2A transport type,category:basic,default:http"`
// ListenAddress is the address to listen on for incoming A2A requests.
// Only used when transport is "http".
ListenAddress string `json:"listen_address" schema:"type:string,description:HTTP listen address,category:basic,default::8080"`
// AgentCardPath is the path to serve the agent card.
AgentCardPath string `` /* 131-byte string literal not displayed */
// SLIMGroupID is the SLIM group for A2A communication.
// Only used when transport is "slim".
SLIMGroupID string `json:"slim_group_id" schema:"type:string,description:SLIM group for A2A,category:advanced"`
// RequestTimeout is the timeout for processing A2A requests.
RequestTimeout string `json:"request_timeout" schema:"type:string,description:Request processing timeout,category:advanced,default:30s"`
// MaxConcurrentTasks is the maximum number of concurrent task executions.
MaxConcurrentTasks int `json:"max_concurrent_tasks" schema:"type:int,description:Maximum concurrent tasks,category:advanced,default:10"`
// EnableAuthentication enables DID-based authentication for requests.
EnableAuthentication bool `json:"enable_authentication" schema:"type:bool,description:Enable DID authentication,category:security,default:true"`
// AllowedAgents is a list of DIDs allowed to send tasks.
// Empty list allows all authenticated agents.
AllowedAgents []string `json:"allowed_agents" schema:"type:array,description:Allowed agent DIDs,category:security"`
// OASFBucket is the KV bucket containing OASF records for agent card generation.
OASFBucket string `json:"oasf_bucket" schema:"type:string,description:OASF records KV bucket,category:advanced,default:OASF_RECORDS"`
// ConsumerNameSuffix adds a suffix to consumer names for uniqueness in tests.
ConsumerNameSuffix string `json:"consumer_name_suffix" schema:"type:string,description:Suffix for consumer names,category:advanced"`
// DeleteConsumerOnStop enables consumer cleanup on stop (for testing).
DeleteConsumerOnStop bool `` /* 128-byte string literal not displayed */
}
Config defines the configuration for the A2A adapter component.
func (*Config) GetRequestTimeout ¶
GetRequestTimeout returns the request timeout duration.
type Credentials ¶
type Credentials struct {
// DID is the agent's DID.
DID string `json:"did,omitempty"`
// PublicKeyJWK is the agent's public key in JWK format.
PublicKeyJWK json.RawMessage `json:"publicKeyJwk,omitempty"`
}
Credentials contains credential configuration.
type FilePart ¶
type FilePart struct {
// Name is the filename.
Name string `json:"name"`
// MimeType is the content type.
MimeType string `json:"mimeType"`
// URI is the file location.
URI string `json:"uri,omitempty"`
// Bytes is base64-encoded file content.
Bytes string `json:"bytes,omitempty"`
}
FilePart represents a file in a message.
type MessagePart ¶
type MessagePart struct {
// Type identifies the part type.
Type string `json:"type"` // text, file, data
// Text contains text content (for type="text").
Text string `json:"text,omitempty"`
// File contains file information (for type="file").
File *FilePart `json:"file,omitempty"`
// Data contains structured data (for type="data").
Data json.RawMessage `json:"data,omitempty"`
}
MessagePart represents a part of a task message.
type OASFRecord ¶
type OASFRecord struct {
Name string `json:"name"`
Version string `json:"version"`
SchemaVersion string `json:"schema_version"`
Authors []string `json:"authors,omitempty"`
CreatedAt string `json:"created_at"`
Description string `json:"description"`
Skills []OASFSkill `json:"skills,omitempty"`
Domains []string `json:"domains,omitempty"`
}
OASFRecord represents an OASF record (from processor/oasf-generator). This is used to generate agent cards.
type OASFSkill ¶
type OASFSkill struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Confidence float64 `json:"confidence,omitempty"`
Permissions []string `json:"permissions,omitempty"`
}
OASFSkill represents a skill in an OASF record.
type Provider ¶
type Provider struct {
// Organization is the provider's organization name.
Organization string `json:"organization"`
// URL is the provider's website.
URL string `json:"url,omitempty"`
}
Provider contains information about the agent provider.
type RegistryInterface ¶
type RegistryInterface interface {
RegisterWithConfig(component.RegistrationConfig) error
}
RegistryInterface defines the minimal interface needed for registration.
type Skill ¶
type Skill struct {
// ID is the skill identifier.
ID string `json:"id"`
// Name is the skill display name.
Name string `json:"name"`
// Description describes what the skill does.
Description string `json:"description,omitempty"`
// InputSchema describes the expected input format.
InputSchema json.RawMessage `json:"inputSchema,omitempty"`
// OutputSchema describes the output format.
OutputSchema json.RawMessage `json:"outputSchema,omitempty"`
}
Skill describes a specific skill from the OASF record.
type Task ¶
type Task struct {
// ID is the unique task identifier.
ID string `json:"id"`
// SessionID groups related tasks in a conversation.
SessionID string `json:"sessionId,omitempty"`
// Status is the current task status.
Status TaskStatus `json:"status"`
// Message contains the task input.
Message TaskMessage `json:"message"`
// Artifacts are the task outputs.
Artifacts []Artifact `json:"artifacts,omitempty"`
// History contains previous messages in the session.
History []TaskMessage `json:"history,omitempty"`
// Metadata contains additional task metadata.
Metadata map[string]any `json:"metadata,omitempty"`
}
Task represents an A2A task request. Based on the A2A protocol specification.
type TaskMapper ¶
type TaskMapper struct {
// contains filtered or unexported fields
}
TaskMapper translates between A2A tasks and SemStreams messages.
func (*TaskMapper) CreateTaskStatusUpdate ¶
func (m *TaskMapper) CreateTaskStatusUpdate(taskID string, state string, message string) *Task
CreateTaskStatusUpdate creates a status update for a task.
func (*TaskMapper) FromTaskResult ¶
func (m *TaskMapper) FromTaskResult(taskID string, result string, err error) *TaskResult
FromTaskResult converts a SemStreams result to an A2A task result.
func (*TaskMapper) ParseTask ¶
func (m *TaskMapper) ParseTask(data []byte) (*Task, error)
ParseTask parses a JSON task request.
func (*TaskMapper) SerializeTaskResult ¶
func (m *TaskMapper) SerializeTaskResult(result *TaskResult) ([]byte, error)
SerializeTaskResult serializes a task result to JSON.
func (*TaskMapper) ToTaskMessage ¶
func (m *TaskMapper) ToTaskMessage(task *Task, requesterDID string) (*agentic.TaskMessage, error)
ToTaskMessage converts an A2A task to a SemStreams TaskMessage.
type TaskMessage ¶
type TaskMessage struct {
// Role identifies the message sender role.
Role string `json:"role"` // user, agent
// Parts contains the message content parts.
Parts []MessagePart `json:"parts"`
// Metadata contains additional message metadata.
Metadata map[string]any `json:"metadata,omitempty"`
}
TaskMessage represents a message in an A2A task.
type TaskResult ¶
type TaskResult struct {
// TaskID is the task identifier.
TaskID string `json:"task_id"`
// Status is the final task status.
Status TaskStatus `json:"status"`
// Artifacts are the task outputs.
Artifacts []Artifact `json:"artifacts,omitempty"`
// Error contains error information if the task failed.
Error string `json:"error,omitempty"`
}
TaskResult represents the result of a completed A2A task.
type TaskStatus ¶
type TaskStatus struct {
// State is the current state of the task.
State string `json:"state"` // submitted, working, completed, failed, canceled
// Message provides additional status information.
Message string `json:"message,omitempty"`
// Timestamp is when the status was last updated.
Timestamp time.Time `json:"timestamp,omitempty"`
}
TaskStatus represents the status of an A2A task.