a2a

package
v1.0.0-alpha.32 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2026 License: MIT Imports: 14 Imported by: 0

README

A2A Protocol Adapter

The A2A (Agent-to-Agent) adapter is a SemStreams input component that enables external agents to delegate tasks using the A2A protocol specification. It receives task requests via HTTP or SLIM transport, converts them to SemStreams TaskMessages, and publishes them to NATS for processing by the agentic system.

Overview

The A2A adapter acts as a protocol bridge between external agent systems and SemStreams' internal agentic architecture. It handles authentication, task mapping, agent card generation, and bidirectional communication with external agents.

Key Features
  • A2A Protocol Compliance: Full implementation of A2A task submission, status queries, and cancellation endpoints
  • Multi-Transport Support: HTTP (RESTful) and SLIM (MLS-encrypted group messaging)
  • Agent Card Generation: Automatic generation of A2A agent cards from OASF records
  • DID Authentication: Decentralized identifier-based authentication for agent-to-agent trust
  • Task Mapping: Bidirectional conversion between A2A tasks and SemStreams TaskMessages
  • NATS Integration: Publishes tasks to JetStream for reliable processing

Architecture

flowchart TD
    A[External Agent] -->|A2A Task Request| B[A2A Adapter]
    B -->|Authenticate| C{DID Validation}
    C -->|Valid| D[Task Mapper]
    C -->|Invalid| E[Return 401]
    D -->|Convert to TaskMessage| F[NATS JetStream]
    F -->|agent.task.a2a.*| G[Agent Dispatch]
    G -->|Process Task| H[Agent Loop]
    H -->|agent.complete.*| I[Response Listener]
    I -->|Convert to A2A Result| J[External Agent]

    K[OASF Records] -->|Generate| L[Agent Card]
    L -->|Serve| M[/.well-known/agent.json]

    style B fill:#4a90e2
    style F fill:#50c878
    style L fill:#f39c12

Configuration

HTTP Transport
{
  "transport": "http",
  "listen_address": ":8080",
  "agent_card_path": "/.well-known/agent.json",
  "enable_authentication": true,
  "request_timeout": "30s",
  "max_concurrent_tasks": 10,
  "oasf_bucket": "OASF_RECORDS",
  "ports": {
    "inputs": [
      {
        "name": "a2a_requests",
        "subject": "a2a.request.>",
        "type": "nats",
        "description": "Incoming A2A task requests"
      }
    ],
    "outputs": [
      {
        "name": "task_messages",
        "subject": "agent.task.a2a.>",
        "type": "jetstream",
        "stream_name": "AGENT_TASKS",
        "required": true,
        "description": "Task messages to agent dispatch"
      },
      {
        "name": "a2a_responses",
        "subject": "a2a.response.>",
        "type": "nats",
        "description": "Outgoing A2A task responses"
      }
    ]
  }
}
SLIM Transport
{
  "transport": "slim",
  "slim_group_id": "did:agntcy:group:tenant-123",
  "enable_authentication": true,
  "request_timeout": "30s",
  "max_concurrent_tasks": 10
}
Configuration Options
Field Type Default Description
transport string "http" Transport type: "http" or "slim"
listen_address string ":8080" HTTP listen address (HTTP transport only)
agent_card_path string "/.well-known/agent.json" Agent card endpoint path
slim_group_id string - SLIM group DID (SLIM transport only, required if transport=slim)
request_timeout duration "30s" Request processing timeout
max_concurrent_tasks int 10 Maximum concurrent task executions
enable_authentication bool true Enable DID-based authentication
allowed_agents []string [] Allowed agent DIDs (empty = all authenticated agents)
oasf_bucket string "OASF_RECORDS" KV bucket for OASF records (agent card generation)

HTTP Endpoints

Agent Card

GET /.well-known/agent.json

Returns the agent card describing this agent's capabilities.

Response:

{
  "name": "SemStreams Agent",
  "description": "A2A-compatible agent powered by SemStreams",
  "url": "http://localhost:8080",
  "version": "1.0",
  "capabilities": [
    {
      "name": "task-execution",
      "description": "Execute delegated tasks"
    }
  ],
  "defaultInputModes": ["text"],
  "defaultOutputModes": ["text"],
  "authentication": {
    "schemes": ["did"],
    "credentials": {
      "did": "did:example:agent123"
    }
  },
  "skills": [
    {
      "id": "skill-001",
      "name": "Data Analysis",
      "description": "Analyze structured and unstructured data"
    }
  ]
}
Send Task

POST /tasks/send

Submit a new task for execution.

Headers:

  • Authorization: DID or bearer token (if authentication enabled)
  • X-Agent-DID: Alternative header for agent DID
  • Content-Type: application/json

Request Body:

{
  "id": "task-123",
  "sessionId": "session-456",
  "status": {
    "state": "submitted",
    "timestamp": "2025-01-15T10:30:00Z"
  },
  "message": {
    "role": "user",
    "parts": [
      {
        "type": "text",
        "text": "Analyze this dataset and provide insights"
      }
    ]
  },
  "metadata": {
    "role": "architect",
    "model": "gpt-4"
  }
}

Response: 202 Accepted

{
  "id": "task-123",
  "status": {
    "state": "submitted",
    "message": "Task accepted for processing",
    "timestamp": "2025-01-15T10:30:01Z"
  }
}
Get Task Status

GET /tasks/get?id=<task_id>

Query the status of a submitted task.

Response:

{
  "id": "task-123",
  "status": {
    "state": "working",
    "message": "Task is being processed",
    "timestamp": "2025-01-15T10:30:15Z"
  }
}

Task States:

  • submitted: Task accepted, queued for processing
  • working: Task in progress
  • completed: Task successfully completed
  • failed: Task execution failed
  • canceled: Task canceled by requester
Cancel Task

POST /tasks/cancel

Request cancellation of a running task.

Headers:

  • Authorization: DID or bearer token (if authentication enabled)
  • Content-Type: application/json

Request Body:

{
  "id": "task-123"
}

Response:

{
  "id": "task-123",
  "status": {
    "state": "canceled",
    "message": "Task cancellation requested",
    "timestamp": "2025-01-15T10:31:00Z"
  }
}

Task Mapping

The adapter translates between A2A task format and SemStreams TaskMessage format.

A2A Task → TaskMessage
// A2A Task (input)
{
  "id": "task-123",
  "sessionId": "session-456",
  "message": {
    "role": "user",
    "parts": [{"type": "text", "text": "Analyze data"}]
  },
  "metadata": {
    "role": "architect",
    "model": "gpt-4"
  }
}

// SemStreams TaskMessage (output)
{
  "task_id": "task-123",
  "prompt": "Analyze data",
  "role": "architect",
  "model": "gpt-4",
  "channel_type": "a2a",
  "channel_id": "session-456",
  "user_id": "did:example:agent123"
}

Mapping Rules:

  • task.idtask_id
  • task.message.parts[].textprompt (concatenated with newlines)
  • task.metadata.rolerole (with fallback to "general")
  • task.metadata.modelmodel (with fallback to "default")
  • task.sessionIdchannel_id
  • Requester DID (from auth) → user_id
  • Fixed channel_type = "a2a"
TaskResult → A2A Result
// SemStreams result
TaskID: "task-123"
Result: "Analysis complete: 3 key insights found"
Error: nil

// A2A TaskResult (output)
{
  "task_id": "task-123",
  "status": {
    "state": "completed",
    "timestamp": "2025-01-15T10:35:00Z"
  },
  "artifacts": [
    {
      "name": "result",
      "description": "Task execution result",
      "parts": [
        {
          "type": "text",
          "text": "Analysis complete: 3 key insights found"
        }
      ],
      "index": 0
    }
  ]
}

Agent Card Generation

Agent cards are automatically generated from OASF (Open Agent Skills Format) records stored in the OASF_RECORDS KV bucket. The card describes the agent's capabilities for discovery by external agents.

OASF Record → Agent Card
// OASF Record (input from processor/oasf-generator)
{
  "name": "DataAnalysisAgent",
  "description": "Expert in data analysis and visualization",
  "skills": [
    {
      "id": "skill-001",
      "name": "Statistical Analysis",
      "description": "Perform statistical analysis on datasets",
      "confidence": 0.95
    },
    {
      "id": "skill-002",
      "name": "Data Visualization",
      "description": "Create charts and visualizations",
      "confidence": 0.90
    }
  ]
}

// Agent Card (output)
{
  "name": "DataAnalysisAgent",
  "description": "Expert in data analysis and visualization",
  "url": "http://localhost:8080",
  "version": "1.0",
  "capabilities": [
    {"name": "Statistical Analysis", "description": "Perform statistical analysis on datasets"},
    {"name": "Data Visualization", "description": "Create charts and visualizations"}
  ],
  "skills": [
    {
      "id": "skill-001",
      "name": "Statistical Analysis",
      "description": "Perform statistical analysis on datasets"
    },
    {
      "id": "skill-002",
      "name": "Data Visualization",
      "description": "Create charts and visualizations"
    }
  ],
  "defaultInputModes": ["text"],
  "defaultOutputModes": ["text"],
  "authentication": {
    "schemes": ["did"],
    "credentials": {"did": "did:example:agent123"}
  }
}
Updating Agent Cards
// Generate card from OASF record
generator := NewAgentCardGenerator("http://localhost:8080", "YourOrg")
generator.AgentDID = "did:example:agent123"

card, err := generator.GenerateFromOASF(oasfRecord)
if err != nil {
    log.Fatal(err)
}

// Update component's cached card
component.UpdateAgentCard(card)

NATS Topology

Published Messages

Subject Pattern: agent.task.a2a.<task_id>

Published to the AGENT_TASKS JetStream stream for durable task processing.

Message Structure:

{
  "type": {
    "domain": "agentic",
    "category": "task",
    "version": "v1"
  },
  "payload": {
    "task_id": "task-123",
    "prompt": "Analyze this dataset",
    "role": "architect",
    "model": "gpt-4",
    "channel_type": "a2a",
    "channel_id": "session-456",
    "user_id": "did:example:agent123"
  }
}
Subscribed Messages

Subject Pattern: agent.complete.*

Listens for task completion events to send responses back to external agents.

Expected Message Structure:

{
  "type": {
    "domain": "agentic",
    "category": "complete",
    "version": "v1"
  },
  "payload": {
    "task_id": "task-123",
    "result": "Analysis complete: 3 key insights found",
    "status": "success"
  }
}

Authentication

When enable_authentication is true, the adapter validates incoming requests using DID-based authentication.

Authentication Flow
sequenceDiagram
    participant EA as External Agent
    participant A2A as A2A Adapter
    participant NATS as NATS JetStream

    EA->>A2A: POST /tasks/send<br/>(Authorization: did:example:agent123)
    A2A->>A2A: Extract DID from headers
    A2A->>A2A: Validate DID (check allowed_agents)
    alt Valid DID
        A2A->>A2A: Convert to TaskMessage
        A2A->>NATS: Publish to agent.task.a2a.*
        A2A->>EA: 202 Accepted
    else Invalid DID
        A2A->>EA: 401 Unauthorized
    end
DID Headers

The adapter checks for DIDs in the following order:

  1. Authorization header (e.g., Authorization: did:example:agent123)
  2. X-Agent-DID header (e.g., X-Agent-DID: did:example:agent123)
Allowed Agents

If allowed_agents is configured, only DIDs in the list can submit tasks:

{
  "enable_authentication": true,
  "allowed_agents": [
    "did:example:agent123",
    "did:example:agent456"
  ]
}

If allowed_agents is empty, all authenticated agents are allowed.

Usage Example

Component Registration
package main

import (
    "github.com/c360studio/semstreams/component"
    "github.com/c360studio/semstreams/input/a2a"
)

func main() {
    registry := component.NewRegistry()

    // Register A2A adapter
    if err := a2a.Register(registry); err != nil {
        log.Fatalf("Failed to register A2A adapter: %v", err)
    }

    // Component is now available as "a2a-adapter"
}
Flow Configuration
components:
  - name: external-agent-gateway
    type: a2a-adapter
    config:
      transport: http
      listen_address: ":8080"
      enable_authentication: true
      allowed_agents:
        - "did:example:trusted-agent"
      request_timeout: "60s"
      max_concurrent_tasks: 20
Programmatic Usage
package main

import (
    "context"
    "encoding/json"
    "log"

    "github.com/c360studio/semstreams/component"
    "github.com/c360studio/semstreams/input/a2a"
    "github.com/c360studio/semstreams/natsclient"
)

func main() {
    // Create NATS client
    natsClient, err := natsclient.NewClient("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }

    // Configure component
    config := a2a.DefaultConfig()
    config.ListenAddress = ":9090"
    config.EnableAuthentication = true

    rawConfig, _ := json.Marshal(config)

    // Create dependencies
    deps := component.Dependencies{
        NATSClient: natsClient,
    }

    // Create component
    comp, err := a2a.NewComponent(rawConfig, deps)
    if err != nil {
        log.Fatal(err)
    }

    // Initialize and start
    a2aComp := comp.(*a2a.Component)
    if err := a2aComp.Initialize(); err != nil {
        log.Fatal(err)
    }

    ctx := context.Background()
    if err := a2aComp.Start(ctx); err != nil {
        log.Fatal(err)
    }

    log.Println("A2A adapter running on :9090")

    // Run until interrupted
    select {}
}

Testing

Unit Tests

Run unit tests for the package:

task test -- ./input/a2a/...
Integration Tests

Integration tests require Docker (testcontainers for NATS):

task test:integration -- ./input/a2a/...
Test Coverage
go test -cover ./input/a2a/...
Manual Testing with curl

Start the adapter, then test endpoints:

# Get agent card
curl http://localhost:8080/.well-known/agent.json

# Send a task
curl -X POST http://localhost:8080/tasks/send \
  -H "Content-Type: application/json" \
  -H "X-Agent-DID: did:example:test-agent" \
  -d '{
    "id": "test-task-001",
    "sessionId": "test-session",
    "status": {"state": "submitted"},
    "message": {
      "role": "user",
      "parts": [{"type": "text", "text": "Hello from A2A!"}]
    }
  }'

# Get task status
curl http://localhost:8080/tasks/get?id=test-task-001

# Cancel task
curl -X POST http://localhost:8080/tasks/cancel \
  -H "Content-Type: application/json" \
  -H "X-Agent-DID: did:example:test-agent" \
  -d '{"id": "test-task-001"}'
Testing with Authentication Disabled

For local testing, disable authentication:

{
  "enable_authentication": false
}

This allows requests without DIDs, useful for development and debugging.

  • input/slim: SLIM bridge for encrypted cross-organizational agent messaging
  • output/directory-bridge: Registers agents with AGNTCY directories for discovery
  • processor/oasf-generator: Generates OASF records used for agent card creation
  • processor/agentic-dispatch: Routes TaskMessages to appropriate agent roles
  • agentic/identity: DID and verifiable credential management

Protocol References

Limitations and Future Work

Current Limitations
  • Task status queries return placeholder data (TODO: integrate with task storage)
  • Task cancellation is acknowledged but not propagated to running tasks
  • SLIM transport is not yet implemented (placeholder)
  • DID signature verification is simplified (production needs full verification)
Planned Features
  • Full SLIM transport implementation for encrypted agent messaging
  • Task state persistence in NATS KV for status queries
  • Task cancellation propagation via agent.cancel.* subjects
  • Complete DID verification with public key validation
  • Agent card caching with TTL and automatic refresh
  • Webhook support for push-based task completion notifications
  • Rate limiting per agent DID
  • Task priority levels from A2A metadata

Performance Considerations

  • Concurrent Tasks: max_concurrent_tasks limits simultaneous task processing to prevent resource exhaustion
  • Request Timeout: request_timeout ensures requests don't block indefinitely
  • Body Size Limit: HTTP requests limited to 1MB to prevent DoS attacks
  • NATS Backpressure: JetStream provides durable queuing and backpressure handling

Security Considerations

  • Authentication: Enable enable_authentication in production environments
  • Allowed Agents: Use allowed_agents to whitelist trusted agent DIDs
  • HTTPS: Deploy behind a reverse proxy with TLS termination
  • DID Verification: Implement full signature verification for production use
  • Rate Limiting: Consider adding rate limiting per agent DID
  • Input Validation: All task inputs are validated before processing
  • NATS Security: Use NATS authentication and TLS for internal communication

Troubleshooting

Adapter Won't Start

Error: failed to listen on :8080: address already in use

Solution: Port 8080 is already bound. Change listen_address or stop the conflicting service.

Authentication Failures

Error: 401 Unauthorized

Solution: Ensure requests include valid DID headers (Authorization or X-Agent-DID).

Tasks Not Appearing in NATS

Check:

  1. NATS client is connected: nc.Status()
  2. JetStream stream exists: nats stream ls
  3. Subject matches expected pattern: agent.task.a2a.*
Agent Card Not Updating

Solution: Agent cards are generated from OASF records. Ensure OASF_RECORDS KV bucket contains valid records. Call UpdateAgentCard() after generating a new card.

Contributing

When contributing to the A2A adapter, follow SemStreams development standards:

  • Add tests for new features (unit + integration)
  • Update this README for API or configuration changes
  • Follow Go coding standards and pass task lint
  • Document complex mapping logic in code comments
  • Update protocol references if A2A spec changes

License

Part of the SemStreams project. See root LICENSE file.

Documentation

Overview

Package a2a provides an input component that implements the A2A (Agent-to-Agent) protocol for receiving task delegations from external agents.

Overview

The A2A adapter enables agent-to-agent communication following the A2A protocol specification. It receives task requests from external agents and converts them to SemStreams TaskMessages for processing by the agentic system.

Architecture

┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│  External Agent │────▶│   A2A Adapter    │────▶│  NATS/JetStream │
│  (A2A Client)   │     │  (SemStreams)    │     │  (Agent Dispatch)│
└─────────────────┘     └──────────────────┘     └─────────────────┘
                               │
                               ▼
                        ┌──────────────────┐
                        │   Agent Card     │
                        │  (Capabilities)  │
                        └──────────────────┘

Components

The package consists of several key components:

  • Component: The main LifecycleComponent that handles A2A requests
  • TaskMapper: Translates between A2A tasks and SemStreams TaskMessages
  • AgentCardGenerator: Generates A2A agent cards from OASF records
  • Config: Configuration for transport, authentication, and endpoints

Configuration

Example configuration for HTTP transport:

{
  "transport": "http",
  "listen_address": ":8080",
  "agent_card_path": "/.well-known/agent.json",
  "enable_authentication": true,
  "request_timeout": "30s",
  "max_concurrent_tasks": 10
}

Example configuration for SLIM transport:

{
  "transport": "slim",
  "slim_group_id": "did:agntcy:group:tenant-123",
  "enable_authentication": true
}

Transport Options

The adapter supports two transport mechanisms:

  • HTTP: RESTful endpoints for task submission and agent card retrieval
  • SLIM: MLS-encrypted group messaging via the SLIM bridge

A2A Protocol Support

The adapter implements the following A2A protocol endpoints:

  • GET /.well-known/agent.json: Returns the agent card
  • POST /tasks/send: Submit a new task
  • GET /tasks/get?id=<id>: Get task status
  • POST /tasks/cancel: Cancel a running task

Agent Card Generation

Agent cards are automatically generated from OASF records stored in the OASF_RECORDS KV bucket. The card includes:

  • Agent name and description
  • Capabilities derived from OASF skills
  • Authentication methods (DID-based)
  • Supported input/output modes

Task Flow

Inbound (A2A → NATS):

  1. External agent submits task via A2A endpoint
  2. Adapter validates authentication (DID verification)
  3. Task converted to agentic.TaskMessage
  4. Published to agent.task.a2a.{task_id}

Outbound (NATS → A2A):

  1. Agent completes task, publishes to agent.complete.*
  2. Adapter converts result to A2A TaskResult
  3. Response sent back to requesting agent

Authentication

When EnableAuthentication is true, the adapter verifies incoming requests using DID-based authentication. The requester's DID can be provided via:

  • Authorization header
  • X-Agent-DID header
  • Signed message body (for SLIM transport)

Usage

Register the component with the component registry:

import a2a "github.com/c360studio/semstreams/input/a2a"

func init() {
    a2a.Register(registry)
}

See Also

  • input/slim: SLIM bridge for encrypted cross-organizational messaging
  • output/directory-bridge: Registers agents with AGNTCY directories
  • processor/oasf-generator: Generates OASF records for agent card
  • agentic/identity: DID and verifiable credential management
  • docs/concepts/23-a2a-protocol.md: A2A integration guide

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewComponent

func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)

NewComponent creates a new A2A adapter component.

func Register

func Register(registry RegistryInterface) error

Register registers the A2A adapter input component with the given registry.

func SerializeAgentCard

func SerializeAgentCard(card *AgentCard) ([]byte, error)

SerializeAgentCard serializes an agent card to JSON.

Types

type AgentCard

type AgentCard struct {
	// Name is the agent's display name.
	Name string `json:"name"`

	// Description describes what the agent does.
	Description string `json:"description"`

	// URL is the agent's A2A endpoint.
	URL string `json:"url"`

	// Version is the agent card schema version.
	Version string `json:"version"`

	// Provider contains information about the agent provider.
	Provider *Provider `json:"provider,omitempty"`

	// Capabilities lists what the agent can do.
	Capabilities []Capability `json:"capabilities"`

	// Authentication describes supported auth methods.
	Authentication *Authentication `json:"authentication,omitempty"`

	// DefaultInputModes lists supported input types.
	DefaultInputModes []string `json:"defaultInputModes,omitempty"`

	// DefaultOutputModes lists supported output types.
	DefaultOutputModes []string `json:"defaultOutputModes,omitempty"`

	// Skills lists specific skills the agent has.
	Skills []Skill `json:"skills,omitempty"`
}

AgentCard represents an A2A Agent Card. This is the publicly-accessible description of an agent's capabilities.

func ParseAgentCard

func ParseAgentCard(data []byte) (*AgentCard, error)

ParseAgentCard parses a JSON agent card.

type AgentCardGenerator

type AgentCardGenerator struct {
	// BaseURL is the base URL for the agent's A2A endpoint.
	BaseURL string

	// ProviderOrg is the provider organization name.
	ProviderOrg string

	// ProviderURL is the provider's website URL.
	ProviderURL string

	// AgentDID is the agent's DID for authentication.
	AgentDID string
}

AgentCardGenerator generates A2A agent cards from OASF records.

func NewAgentCardGenerator

func NewAgentCardGenerator(baseURL, providerOrg string) *AgentCardGenerator

NewAgentCardGenerator creates a new agent card generator.

func (*AgentCardGenerator) GenerateFromOASF

func (g *AgentCardGenerator) GenerateFromOASF(record *OASFRecord) (*AgentCard, error)

GenerateFromOASF generates an agent card from an OASF record.

type Artifact

type Artifact struct {
	// Name is the artifact name.
	Name string `json:"name"`

	// Description describes the artifact.
	Description string `json:"description,omitempty"`

	// Parts contains the artifact content.
	Parts []MessagePart `json:"parts"`

	// Index is the artifact position in the output sequence.
	Index int `json:"index,omitempty"`

	// Metadata contains additional artifact metadata.
	Metadata map[string]any `json:"metadata,omitempty"`
}

Artifact represents an output artifact from a task.

type Authentication

type Authentication struct {
	// Schemes lists supported auth schemes.
	Schemes []string `json:"schemes"` // e.g., ["did", "bearer"]

	// Credentials contains credential information.
	Credentials *Credentials `json:"credentials,omitempty"`
}

Authentication describes supported authentication methods.

type Capability

type Capability struct {
	// Name is the capability name.
	Name string `json:"name"`

	// Description describes the capability.
	Description string `json:"description,omitempty"`
}

Capability describes a capability the agent has.

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component implements the A2A adapter input component. It receives A2A task requests and publishes them to NATS for agent processing.

func (*Component) ConfigSchema

func (c *Component) ConfigSchema() component.ConfigSchema

ConfigSchema returns the configuration schema.

func (*Component) DataFlow

func (c *Component) DataFlow() component.FlowMetrics

DataFlow returns current data flow metrics.

func (*Component) Health

func (c *Component) Health() component.HealthStatus

Health returns the current health status.

func (*Component) Initialize

func (c *Component) Initialize() error

Initialize prepares the component.

func (*Component) InputPorts

func (c *Component) InputPorts() []component.Port

InputPorts returns configured input port definitions.

func (*Component) Meta

func (c *Component) Meta() component.Metadata

Meta returns component metadata.

func (*Component) OutputPorts

func (c *Component) OutputPorts() []component.Port

OutputPorts returns configured output port definitions.

func (*Component) Start

func (c *Component) Start(ctx context.Context) error

Start begins processing A2A requests.

func (*Component) Stop

func (c *Component) Stop(_ time.Duration) error

Stop gracefully stops the component.

func (*Component) UpdateAgentCard

func (c *Component) UpdateAgentCard(card *AgentCard)

UpdateAgentCard updates the cached agent card.

type Config

type Config struct {
	// Ports defines the input/output port configuration.
	Ports *component.PortConfig `json:"ports" schema:"type:ports,description:Port configuration,category:basic"`

	// Transport specifies the A2A transport mechanism.
	// Supported values: "http", "slim"
	Transport string `json:"transport" schema:"type:string,description:A2A transport type,category:basic,default:http"`

	// ListenAddress is the address to listen on for incoming A2A requests.
	// Only used when transport is "http".
	ListenAddress string `json:"listen_address" schema:"type:string,description:HTTP listen address,category:basic,default::8080"`

	// AgentCardPath is the path to serve the agent card.
	AgentCardPath string `` /* 131-byte string literal not displayed */

	// SLIMGroupID is the SLIM group for A2A communication.
	// Only used when transport is "slim".
	SLIMGroupID string `json:"slim_group_id" schema:"type:string,description:SLIM group for A2A,category:advanced"`

	// RequestTimeout is the timeout for processing A2A requests.
	RequestTimeout string `json:"request_timeout" schema:"type:string,description:Request processing timeout,category:advanced,default:30s"`

	// MaxConcurrentTasks is the maximum number of concurrent task executions.
	MaxConcurrentTasks int `json:"max_concurrent_tasks" schema:"type:int,description:Maximum concurrent tasks,category:advanced,default:10"`

	// EnableAuthentication enables DID-based authentication for requests.
	EnableAuthentication bool `json:"enable_authentication" schema:"type:bool,description:Enable DID authentication,category:security,default:true"`

	// AllowedAgents is a list of DIDs allowed to send tasks.
	// Empty list allows all authenticated agents.
	AllowedAgents []string `json:"allowed_agents" schema:"type:array,description:Allowed agent DIDs,category:security"`

	// OASFBucket is the KV bucket containing OASF records for agent card generation.
	OASFBucket string `json:"oasf_bucket" schema:"type:string,description:OASF records KV bucket,category:advanced,default:OASF_RECORDS"`

	// ConsumerNameSuffix adds a suffix to consumer names for uniqueness in tests.
	ConsumerNameSuffix string `json:"consumer_name_suffix" schema:"type:string,description:Suffix for consumer names,category:advanced"`

	// DeleteConsumerOnStop enables consumer cleanup on stop (for testing).
	DeleteConsumerOnStop bool `` /* 128-byte string literal not displayed */
}

Config defines the configuration for the A2A adapter component.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns the default configuration.

func (*Config) GetRequestTimeout

func (c *Config) GetRequestTimeout() time.Duration

GetRequestTimeout returns the request timeout duration.

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration.

type Credentials

type Credentials struct {
	// DID is the agent's DID.
	DID string `json:"did,omitempty"`

	// PublicKeyJWK is the agent's public key in JWK format.
	PublicKeyJWK json.RawMessage `json:"publicKeyJwk,omitempty"`
}

Credentials contains credential configuration.

type FilePart

type FilePart struct {
	// Name is the filename.
	Name string `json:"name"`

	// MimeType is the content type.
	MimeType string `json:"mimeType"`

	// URI is the file location.
	URI string `json:"uri,omitempty"`

	// Bytes is base64-encoded file content.
	Bytes string `json:"bytes,omitempty"`
}

FilePart represents a file in a message.

type MessagePart

type MessagePart struct {
	// Type identifies the part type.
	Type string `json:"type"` // text, file, data

	// Text contains text content (for type="text").
	Text string `json:"text,omitempty"`

	// File contains file information (for type="file").
	File *FilePart `json:"file,omitempty"`

	// Data contains structured data (for type="data").
	Data json.RawMessage `json:"data,omitempty"`
}

MessagePart represents a part of a task message.

type OASFRecord

type OASFRecord struct {
	Name          string      `json:"name"`
	Version       string      `json:"version"`
	SchemaVersion string      `json:"schema_version"`
	Authors       []string    `json:"authors,omitempty"`
	CreatedAt     string      `json:"created_at"`
	Description   string      `json:"description"`
	Skills        []OASFSkill `json:"skills,omitempty"`
	Domains       []string    `json:"domains,omitempty"`
}

OASFRecord represents an OASF record (from processor/oasf-generator). This is used to generate agent cards.

type OASFSkill

type OASFSkill struct {
	ID          string   `json:"id"`
	Name        string   `json:"name"`
	Description string   `json:"description,omitempty"`
	Confidence  float64  `json:"confidence,omitempty"`
	Permissions []string `json:"permissions,omitempty"`
}

OASFSkill represents a skill in an OASF record.

type Provider

type Provider struct {
	// Organization is the provider's organization name.
	Organization string `json:"organization"`

	// URL is the provider's website.
	URL string `json:"url,omitempty"`
}

Provider contains information about the agent provider.

type RegistryInterface

type RegistryInterface interface {
	RegisterWithConfig(component.RegistrationConfig) error
}

RegistryInterface defines the minimal interface needed for registration.

type Skill

type Skill struct {
	// ID is the skill identifier.
	ID string `json:"id"`

	// Name is the skill display name.
	Name string `json:"name"`

	// Description describes what the skill does.
	Description string `json:"description,omitempty"`

	// InputSchema describes the expected input format.
	InputSchema json.RawMessage `json:"inputSchema,omitempty"`

	// OutputSchema describes the output format.
	OutputSchema json.RawMessage `json:"outputSchema,omitempty"`
}

Skill describes a specific skill from the OASF record.

type Task

type Task struct {
	// ID is the unique task identifier.
	ID string `json:"id"`

	// SessionID groups related tasks in a conversation.
	SessionID string `json:"sessionId,omitempty"`

	// Status is the current task status.
	Status TaskStatus `json:"status"`

	// Message contains the task input.
	Message TaskMessage `json:"message"`

	// Artifacts are the task outputs.
	Artifacts []Artifact `json:"artifacts,omitempty"`

	// History contains previous messages in the session.
	History []TaskMessage `json:"history,omitempty"`

	// Metadata contains additional task metadata.
	Metadata map[string]any `json:"metadata,omitempty"`
}

Task represents an A2A task request. Based on the A2A protocol specification.

type TaskMapper

type TaskMapper struct {
	// contains filtered or unexported fields
}

TaskMapper translates between A2A tasks and SemStreams messages.

func NewTaskMapper

func NewTaskMapper() *TaskMapper

NewTaskMapper creates a new task mapper.

func (*TaskMapper) CreateTaskStatusUpdate

func (m *TaskMapper) CreateTaskStatusUpdate(taskID string, state string, message string) *Task

CreateTaskStatusUpdate creates a status update for a task.

func (*TaskMapper) FromTaskResult

func (m *TaskMapper) FromTaskResult(taskID string, result string, err error) *TaskResult

FromTaskResult converts a SemStreams result to an A2A task result.

func (*TaskMapper) ParseTask

func (m *TaskMapper) ParseTask(data []byte) (*Task, error)

ParseTask parses a JSON task request.

func (*TaskMapper) SerializeTaskResult

func (m *TaskMapper) SerializeTaskResult(result *TaskResult) ([]byte, error)

SerializeTaskResult serializes a task result to JSON.

func (*TaskMapper) ToTaskMessage

func (m *TaskMapper) ToTaskMessage(task *Task, requesterDID string) (*agentic.TaskMessage, error)

ToTaskMessage converts an A2A task to a SemStreams TaskMessage.

type TaskMessage

type TaskMessage struct {
	// Role identifies the message sender role.
	Role string `json:"role"` // user, agent

	// Parts contains the message content parts.
	Parts []MessagePart `json:"parts"`

	// Metadata contains additional message metadata.
	Metadata map[string]any `json:"metadata,omitempty"`
}

TaskMessage represents a message in an A2A task.

type TaskResult

type TaskResult struct {
	// TaskID is the task identifier.
	TaskID string `json:"task_id"`

	// Status is the final task status.
	Status TaskStatus `json:"status"`

	// Artifacts are the task outputs.
	Artifacts []Artifact `json:"artifacts,omitempty"`

	// Error contains error information if the task failed.
	Error string `json:"error,omitempty"`
}

TaskResult represents the result of a completed A2A task.

type TaskStatus

type TaskStatus struct {
	// State is the current state of the task.
	State string `json:"state"` // submitted, working, completed, failed, canceled

	// Message provides additional status information.
	Message string `json:"message,omitempty"`

	// Timestamp is when the status was last updated.
	Timestamp time.Time `json:"timestamp,omitempty"`
}

TaskStatus represents the status of an A2A task.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL