a2a

package
v1.3.1
Published: Jun 11, 2026 | License: Apache-2.0 | Imports: 19 | Imported by: 0

Documentation

Overview

Package a2a is Harbor's southbound A2A wire driver.

It implements `distributed.RemoteTransport` against the full A2A v1 spec (vendored at `docs/specifications/a2a.proto`, pinned in `docs/specifications/README.md`). Phase 22 froze the Go shapes (`internal/distributed/a2a/types.go`) and the contract surface (`internal/distributed/remote.go`); Phase 29 ships the wire realisation: JSON-RPC 2.0 over HTTPS for unary calls, Server-Sent Events for streaming, and `GET <peer>/.well-known/agent-card.json` for discovery.

Driver registration. `init()` calls `distributed.RegisterRemoteTransport("a2a", factory)` so `distributed.OpenRemoteTransport` resolves `"a2a"` after the `cmd/harbor/main.go` blank import fires.

Configuration. The driver reads its peer list from `Dependencies.Cfg.Tools.A2APeers` (Phase 29 introduces `config.ToolsConfig` — see `internal/config`). Each peer carries (URL, TrustTier, LatencyTierMS, AllowInsecureLoopback, AgentCardTTL). HTTPS is required by default; HTTP is accepted only when the host is loopback or `AllowInsecureLoopback` is set. The driver builds its internal `Registry` from this list at construction.
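The scheme rule as worded above can be sketched with `net/url` and `net`. This follows the sentence literally (HTTP passes when the host is loopback or the flag is set); the precise rule lives in `security.go` and may be stricter. `checkScheme` and `isLoopback` are hypothetical helper names, the error values are local copies of the package sentinels, and treating an unknown scheme as an invalid URL is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/url"
)

// Local copies of the package sentinels, so the sketch compiles on its own.
var (
	errInsecureScheme = errors.New("a2a: insecure HTTP scheme; require HTTPS or loopback")
	errInvalidPeerURL = errors.New("a2a: invalid peer URL")
)

// isLoopback reports whether host is "localhost" or a loopback IP literal.
func isLoopback(host string) bool {
	if host == "localhost" {
		return true
	}
	ip := net.ParseIP(host)
	return ip != nil && ip.IsLoopback()
}

// checkScheme applies the rule as stated in the overview. It is an
// approximation; the driver's security.go holds the authoritative rule.
func checkScheme(raw string, allowInsecureLoopback bool) error {
	u, err := url.Parse(raw)
	if err != nil || u.Host == "" {
		return errInvalidPeerURL
	}
	switch u.Scheme {
	case "https":
		return nil
	case "http":
		if isLoopback(u.Hostname()) || allowInsecureLoopback {
			return nil
		}
		return errInsecureScheme
	default:
		// Assumption: any other scheme is rejected as an invalid URL.
		return errInvalidPeerURL
	}
}

func main() {
	fmt.Println(checkScheme("https://agent.example", false))
	fmt.Println(checkScheme("http://127.0.0.1:8080", false))
	fmt.Println(checkScheme("http://agent.example", false))
}
```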

Discovery. Before a peer's first call, the driver fetches its AgentCard via `GET <peer>/.well-known/agent-card.json`, validates it against the Phase 22 Go shapes, caches it (per-peer TTL, default 10 minutes), and locates the JSONRPC AgentInterface (the peer's HTTP endpoint for JSON-RPC calls). Peers that advertise no JSONRPC interface fail with `ErrNoJSONRPCInterface`.

Identity propagation (AGENTS.md §6 rule 9). Every outbound call reads `identity.Identity` from `ctx` and stamps the triple onto the request headers (`X-Harbor-Tenant` / `X-Harbor-User` / `X-Harbor-Session`). The wire driver does NOT validate identity at its boundary — Phase 22's contract puts that responsibility on the runtime above; the wire driver propagates verbatim so a missing identity surfaces at the peer.

Reliability shell (D-024). The wire driver does NOT add its own retry/timeout shell. `ToolPolicy` (Phase 26) wraps every Tool invocation that routes through this driver; double-wrapping is forbidden per D-024.

Concurrent reuse (D-025). The driver is safe for N concurrent goroutines against a single instance: the `*http.Client` is shared, the JSON-RPC ID counter is atomic, the AgentCard cache and the Registry are each guarded by an RWMutex (reads take the read lock), and per-call state (request body, response stream, parsed envelope) lives on the goroutine stack.

Constants

const (
	MethodSendMessage                      = "message/send"
	MethodSendStreamingMessage             = "message/stream"
	MethodGetTask                          = "tasks/get"
	MethodListTasks                        = "tasks/list"
	MethodCancelTask                       = "tasks/cancel"
	MethodSubscribeToTask                  = "tasks/subscribe"
	MethodCreateTaskPushNotificationConfig = "tasks/pushNotificationConfig/set"
	MethodGetTaskPushNotificationConfig    = "tasks/pushNotificationConfig/get"
	MethodListTaskPushNotificationConfigs  = "tasks/pushNotificationConfig/list"
	MethodDeleteTaskPushNotificationConfig = "tasks/pushNotificationConfig/delete"
	MethodGetExtendedAgentCard             = "agent/getAuthenticatedExtendedCard"
)

Canonical A2A JSON-RPC method names. The wire driver speaks these over POST against the peer's JSON-RPC endpoint (the AgentInterface URL discovered from the AgentCard).
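For orientation, a JSON-RPC 2.0 request carrying one of these method names might be built as follows. The envelope fields (`jsonrpc`, `id`, `method`, `params`) come from the JSON-RPC 2.0 specification; the `client` type, the `newRequest` helper, and the `params` payload are illustrative assumptions, not the driver's internals. The atomic counter mirrors the ID scheme described in the overview.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync/atomic"
)

const MethodGetTask = "tasks/get"

// rpcRequest is the JSON-RPC 2.0 request envelope.
type rpcRequest struct {
	JSONRPC string `json:"jsonrpc"`
	ID      int64  `json:"id"`
	Method  string `json:"method"`
	Params  any    `json:"params,omitempty"`
}

// client is a hypothetical holder for the shared request ID counter.
type client struct {
	nextID atomic.Int64
}

// newRequest marshals an envelope with a fresh ID. The atomic add makes
// IDs unique across goroutines without taking a lock.
func (c *client) newRequest(method string, params any) ([]byte, error) {
	return json.Marshal(rpcRequest{
		JSONRPC: "2.0",
		ID:      c.nextID.Add(1),
		Method:  method,
		Params:  params,
	})
}

func main() {
	c := &client{}
	// The params shape here is illustrative only.
	body, err := c.newRequest(MethodGetTask, map[string]string{"id": "task-123"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
	// prints {"jsonrpc":"2.0","id":1,"method":"tasks/get","params":{"id":"task-123"}}
}
```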

const AgentCardPath = "/.well-known/agent-card.json"

AgentCardPath is the canonical discovery path appended to a peer's base URL. Per the A2A spec: `GET <peer>/.well-known/agent-card.json`.

const DriverName = "a2a"

DriverName is the registry key for `distributed.OpenRemoteTransport`.

Variables

var (
	// ErrInsecureScheme — a peer URL uses http:// but its host is not
	// loopback and `AllowInsecureLoopback` is not set. AGENTS.md §7
	// requires HTTPS for non-localhost peers.
	ErrInsecureScheme = errors.New("a2a: insecure HTTP scheme; require HTTPS or loopback")

	// ErrPeerNotAllowed — a call targeted a peer URL that is not
	// registered with the driver. The driver enforces an explicit
	// allowlist; "discover-on-call" is intentionally not supported.
	ErrPeerNotAllowed = errors.New("a2a: peer URL is not in the registered allowlist")

	// ErrNoJSONRPCInterface — the discovered AgentCard declares no
	// AgentInterface with `ProtocolBinding == "JSONRPC"`. Phase 29 only
	// implements the JSON-RPC binding; HTTP+JSON and gRPC bindings on
	// the same card are read-only metadata.
	ErrNoJSONRPCInterface = errors.New("a2a: AgentCard exposes no JSONRPC interface")

	// ErrAgentCardSchemaInvalid — the fetched AgentCard JSON failed to
	// parse against the Phase 22 Go shapes.
	ErrAgentCardSchemaInvalid = errors.New("a2a: AgentCard schema invalid")

	// ErrJSONRPCError — the peer returned a JSON-RPC error envelope.
	// The wrapping `*jsonRPCError` carries `code` + `message` + `data`.
	ErrJSONRPCError = errors.New("a2a: JSON-RPC error returned by peer")

	// ErrSSEStreamMalformed — an SSE frame did not parse to an
	// `a2a.StreamResponse`. The wrapped detail names the offending
	// frame number.
	ErrSSEStreamMalformed = errors.New("a2a: SSE stream malformed")

	// ErrSSELineTooLong — an SSE line exceeded sseMaxLineBytes. A
	// hostile peer streaming an unterminated line cannot force
	// unbounded buffer growth; the parser bails loudly instead.
	ErrSSELineTooLong = errors.New("a2a: SSE line exceeds maximum length")

	// ErrInvalidPeerURL — a configured peer URL did not parse.
	ErrInvalidPeerURL = errors.New("a2a: invalid peer URL")
)

Sentinel errors. Callers compare via errors.Is.

Functions

func New

New constructs a wire RemoteTransport. Returns an error when:

  • Any configured peer has an invalid URL or violates the HTTPS-only rule.
  • No peers are configured AND no `WithRegistry` override is supplied (the driver is then not callable — fail at construction).

Types

type Option

type Option func(*driverConfig)

Option configures the wire driver at construction.

func WithAgentCardTTL

func WithAgentCardTTL(d time.Duration) Option

WithAgentCardTTL overrides the AgentCard cache TTL (default 10min).

func WithHTTPClient

func WithHTTPClient(c *http.Client) Option

WithHTTPClient overrides the default *http.Client. Useful for tests that bind to httptest.Server: any InsecureSkipVerify setting or per-test transport tuning belongs on the supplied client, not on the driver.

func WithRegistry

func WithRegistry(r *Registry) Option

WithRegistry seeds the driver's route registry with a fixed peer list (skipping the `Dependencies.Cfg.Tools.A2APeers` lookup). The caller retains ownership; the driver does not Close the registry.

type PeerSpec

type PeerSpec struct {
	// URL is the peer's base URL, e.g. "https://agent.example".
	// The wire driver appends "/.well-known/agent-card.json" for
	// discovery; JSON-RPC calls are POSTed to the AgentInterface URL
	// advertised in the peer's AgentCard.
	URL string
	// TrustTier is an operator-set integer in [1, 5]: 1 = untrusted
	// (third-party with no contract), 5 = first-party (in-org).
	// Higher values rank higher; the validator rejects values
	// outside [1, 5] at config-load time.
	TrustTier int
	// LatencyTierMS is the operator's hint at the peer's expected
	// p50 latency in milliseconds. Smaller values rank higher.
	LatencyTierMS int
	// AllowInsecureLoopback opts a loopback HTTP peer into the
	// driver. See security.go for the precise rule.
	AllowInsecureLoopback bool
	// AgentCardTTL overrides the driver-level AgentCard cache TTL.
	// Zero falls back to the driver default.
	AgentCardTTL time.Duration
	// Capabilities is the discovered capability vocabulary: the
	// union of `AgentSkill.ID` and the entries of `AgentSkill.Tags`
	// across the peer's skills. Filled in when the wire driver
	// completes discovery; empty until then. Resolve filters by
	// exact string equality.
	Capabilities []string
}

PeerSpec is a discovered peer's contact info + tier hints. PeerSpec is the input shape to `Registry.AddPeer`; the registry copies it into an internal record so PeerSpec is safe to construct, register, and discard.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry maps capability strings (`AgentSkill.ID` or a tag) to a scored list of peer candidates. Read-mostly: AddPeer takes the write lock, Resolve takes the read lock. Internally synchronized; safe for N concurrent goroutines (D-025).

func NewRegistry

func NewRegistry() *Registry

NewRegistry constructs an empty route-scoring registry.

func (*Registry) AddPeer

func (r *Registry) AddPeer(spec PeerSpec) error

AddPeer registers spec under spec.URL. Replaces any prior record at the same URL. Returns ErrInvalidPeerURL when spec.URL is empty; returns a tier-violation error when TrustTier is outside [1, 5] or LatencyTierMS is negative.

func (*Registry) PeerSpec

func (r *Registry) PeerSpec(url string) (PeerSpec, bool)

PeerSpec returns the stored spec for url and a presence bool.

func (*Registry) Peers

func (r *Registry) Peers() []string

Peers returns a snapshot of every registered peer URL. The slice is sorted so the order is deterministic.

func (*Registry) RemovePeer

func (r *Registry) RemovePeer(url string)

RemovePeer unregisters the peer at url. No-op if absent.

func (*Registry) Resolve

func (r *Registry) Resolve(capability string) ([]Route, error)

Resolve returns the ranked candidate set for capability. Highest CompositeScore first; ties broken by lower LatencyTierMS, then by URL ascending so the result is deterministic. Empty capability returns every peer (uniform CapabilityScore=0).
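The documented ordering maps onto a three-level comparator. The sketch below covers only the sort; how `CompositeScore` is computed is not described on this page, so the scores in `main` are made-up inputs, and `rank` is a hypothetical helper operating on a local copy of the `Route` fields it needs.

```go
package main

import (
	"fmt"
	"sort"
)

// Route is a local copy of the fields the ordering depends on.
type Route struct {
	PeerURL        string
	LatencyTierMS  int
	CompositeScore float64
}

// rank sorts in place: highest CompositeScore first, then lower
// LatencyTierMS, then PeerURL ascending as the final tie-break.
func rank(routes []Route) {
	sort.Slice(routes, func(i, j int) bool {
		a, b := routes[i], routes[j]
		if a.CompositeScore != b.CompositeScore {
			return a.CompositeScore > b.CompositeScore
		}
		if a.LatencyTierMS != b.LatencyTierMS {
			return a.LatencyTierMS < b.LatencyTierMS
		}
		return a.PeerURL < b.PeerURL
	})
}

func main() {
	routes := []Route{
		{PeerURL: "https://c.example", LatencyTierMS: 50, CompositeScore: 0.8},
		{PeerURL: "https://b.example", LatencyTierMS: 20, CompositeScore: 0.8},
		{PeerURL: "https://a.example", LatencyTierMS: 20, CompositeScore: 0.8},
		{PeerURL: "https://d.example", LatencyTierMS: 90, CompositeScore: 0.9},
	}
	rank(routes)
	for _, r := range routes {
		fmt.Println(r.PeerURL)
	}
	// prints d, a, b, c (one URL per line)
}
```

Because the URL tie-break is total over distinct peers, the result does not depend on the input order or on sort stability.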

func (*Registry) UpdateCapabilities

func (r *Registry) UpdateCapabilities(url string, ids, tags []string) error

UpdateCapabilities replaces the capability set for a registered peer. Used by the wire driver after discovery completes. Returns ErrPeerNotAllowed when url is not registered.
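Keeping IDs and tags as separate arguments lets the registry tell an `AgentSkill.ID` match from a tag-only match, which is what `Route.CapabilityScore` reports. A minimal sketch of that lookup, with the hypothetical `peerCaps` type standing in for the registry's unexported per-peer record:

```go
package main

import "fmt"

// peerCaps is a hypothetical per-peer record holding the two capability
// sets separately so that a match can be scored.
type peerCaps struct {
	ids  map[string]struct{}
	tags map[string]struct{}
}

func newPeerCaps(ids, tags []string) peerCaps {
	p := peerCaps{ids: map[string]struct{}{}, tags: map[string]struct{}{}}
	for _, id := range ids {
		p.ids[id] = struct{}{}
	}
	for _, t := range tags {
		p.tags[t] = struct{}{}
	}
	return p
}

// score uses exact string equality: 1 for an ID match, 0 for a tag-only
// match. The bool reports whether the capability matched at all.
func (p peerCaps) score(capability string) (int, bool) {
	if _, ok := p.ids[capability]; ok {
		return 1, true
	}
	if _, ok := p.tags[capability]; ok {
		return 0, true
	}
	return 0, false
}

func main() {
	p := newPeerCaps([]string{"summarize"}, []string{"text", "nlp"})
	fmt.Println(p.score("summarize")) // prints 1 true
	fmt.Println(p.score("nlp"))       // prints 0 true
	fmt.Println(p.score("Summarize")) // prints 0 false
}
```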

type Route

type Route struct {
	// PeerURL is the resolved peer's base URL.
	PeerURL string
	// TrustTier is the operator-supplied trust tier ([1, 5]).
	TrustTier int
	// LatencyTierMS is the operator-supplied latency tier hint.
	LatencyTierMS int
	// CapabilityScore is 1 when the capability matches an
	// `AgentSkill.ID`; 0 when it matches only via a tag.
	CapabilityScore int
	// CompositeScore is the deterministic ranking value (higher is
	// better).
	CompositeScore float64
}

Route is a scored candidate returned by Registry.Resolve. Resolve returns routes ordered highest score first.
