Documentation
¶
Overview ¶
Package a2a is Harbor's southbound A2A wire driver.
It implements `distributed.RemoteTransport` against the full A2A v1 spec (vendored at `docs/specifications/a2a.proto`, pinned in `docs/specifications/README.md`). Phase 22 froze the Go shapes (`internal/distributed/a2a/types.go`) and the contract surface (`internal/distributed/remote.go`); Phase 29 ships the wire realisation: JSON-RPC 2.0 over HTTPS for unary calls, Server-Sent Events for streaming, and `GET <peer>/.well-known/agent-card.json` for discovery.
Driver registration. `init()` calls `distributed.RegisterRemoteTransport("a2a", factory)` so `distributed.OpenRemoteTransport` resolves `"a2a"` after the `cmd/harbor/main.go` blank import fires.
Configuration. The driver reads its peer list from `Dependencies.Cfg.Tools.A2APeers` (Phase 29 introduces `config.ToolsConfig` — see `internal/config`). Each peer carries (URL, TrustTier, LatencyTierMS, AllowInsecureLoopback, AgentCardTTL). HTTPS is required by default; HTTP is accepted only when the host is loopback or `AllowInsecureLoopback` is set. The driver builds its internal `Registry` from this list at construction.
Discovery. Before a peer's first call, the driver fetches its AgentCard via `GET <peer>/.well-known/agent-card.json`, validates it against the Phase 22 Go shapes, caches it (per-peer TTL, default 10 minutes), and locates the JSONRPC AgentInterface (the peer's HTTP endpoint for JSON-RPC calls). Peers that advertise no JSONRPC interface fail with `ErrNoJSONRPCInterface`.
Identity propagation (AGENTS.md §6 rule 9). Every outbound call reads `identity.Identity` from `ctx` and stamps the triple onto the request headers (`X-Harbor-Tenant` / `X-Harbor-User` / `X-Harbor-Session`). The wire driver does NOT validate identity at its boundary — Phase 22's contract puts that responsibility on the runtime above; the wire driver propagates verbatim so a missing identity surfaces at the peer.
Reliability shell (D-024). The wire driver does NOT add its own retry/timeout shell. `ToolPolicy` (Phase 26) wraps every Tool invocation that routes through this driver; double-wrapping is forbidden per D-024.
Concurrent reuse (D-025). The driver is safe for N concurrent goroutines against a single instance: the `*http.Client` is shared, the JSON-RPC ID counter is atomic, the AgentCard cache + Registry take an RWMutex on reads, and per-call state (request body, response stream, parsed envelope) lives on the goroutine stack.
Index ¶
- Constants
- Variables
- func New(deps distributed.Dependencies, opts ...Option) (distributed.RemoteTransport, error)
- type Option
- type PeerSpec
- type Registry
- func (r *Registry) AddPeer(spec PeerSpec) error
- func (r *Registry) PeerSpec(url string) (PeerSpec, bool)
- func (r *Registry) Peers() []string
- func (r *Registry) RemovePeer(url string)
- func (r *Registry) Resolve(capability string) ([]Route, error)
- func (r *Registry) UpdateCapabilities(url string, ids, tags []string) error
- type Route
Constants ¶
const ( MethodSendMessage = "message/send" MethodSendStreamingMessage = "message/stream" MethodGetTask = "tasks/get" MethodListTasks = "tasks/list" MethodCancelTask = "tasks/cancel" MethodSubscribeToTask = "tasks/subscribe" MethodCreateTaskPushNotificationConfig = "tasks/pushNotificationConfig/set" MethodGetTaskPushNotificationConfig = "tasks/pushNotificationConfig/get" MethodListTaskPushNotificationConfigs = "tasks/pushNotificationConfig/list" MethodDeleteTaskPushNotificationConfig = "tasks/pushNotificationConfig/delete" MethodGetExtendedAgentCard = "agent/getAuthenticatedExtendedCard" )
Canonical A2A JSON-RPC method names. The wire driver speaks these over POST against the peer's JSON-RPC endpoint (the AgentInterface URL discovered from the AgentCard).
const AgentCardPath = "/.well-known/agent-card.json"
AgentCardPath is the canonical discovery path appended to a peer's base URL. Per the A2A spec: `GET <peer>/.well-known/agent-card.json`.
const DriverName = "a2a"
DriverName is the registry key for `distributed.OpenRemoteTransport`.
Variables ¶
var ( // ErrInsecureScheme — a peer URL uses http:// but its host is not // loopback and `AllowInsecureLoopback` is not set. AGENTS.md §7 // requires HTTPS for non-localhost peers. ErrInsecureScheme = errors.New("a2a: insecure HTTP scheme; require HTTPS or loopback") // ErrPeerNotAllowed — a call targeted a peer URL that is not // registered with the driver. The driver enforces an explicit // allowlist; "discover-on-call" is intentionally not supported. ErrPeerNotAllowed = errors.New("a2a: peer URL is not in the registered allowlist") // ErrNoJSONRPCInterface — the discovered AgentCard declares no // AgentInterface with `ProtocolBinding == "JSONRPC"`. Phase 29 only // implements the JSON-RPC binding; HTTP+JSON and gRPC bindings on // the same card are read-only metadata. ErrNoJSONRPCInterface = errors.New("a2a: AgentCard exposes no JSONRPC interface") // ErrAgentCardSchemaInvalid — the fetched AgentCard JSON failed to // parse against the Phase 22 Go shapes. ErrAgentCardSchemaInvalid = errors.New("a2a: AgentCard schema invalid") // ErrJSONRPCError — the peer returned a JSON-RPC error envelope. // The wrapping `*jsonRPCError` carries `code` + `message` + `data`. ErrJSONRPCError = errors.New("a2a: JSON-RPC error returned by peer") // ErrSSEStreamMalformed — an SSE frame did not parse to an // `a2a.StreamResponse`. The wrapped detail names the offending // frame number. ErrSSEStreamMalformed = errors.New("a2a: SSE stream malformed") // ErrSSELineTooLong — an SSE line exceeded sseMaxLineBytes. A // hostile peer streaming an unterminated line cannot force // unbounded buffer growth; the parser bails loudly instead. ErrSSELineTooLong = errors.New("a2a: SSE line exceeds maximum length") // ErrInvalidPeerURL — a configured peer URL did not parse. ErrInvalidPeerURL = errors.New("a2a: invalid peer URL") )
Sentinel errors. Callers compare via errors.Is.
Functions ¶
func New ¶
func New(deps distributed.Dependencies, opts ...Option) (distributed.RemoteTransport, error)
New constructs a wire RemoteTransport. Returns an error when:
- Any configured peer has an invalid URL or violates the HTTPS-only rule.
- No peers are configured AND no `WithRegistry` override is supplied (the driver is then not callable — fail at construction).
Types ¶
type Option ¶
type Option func(*driverConfig)
Option configures the wire driver at construction.
func WithAgentCardTTL ¶
WithAgentCardTTL overrides the AgentCard cache TTL (default 10min).
func WithHTTPClient ¶
WithHTTPClient overrides the default *http.Client. Useful for tests that bind to httptest.Server (an explicit InsecureSkipVerify or per-test transport tuning lives on the supplied client).
func WithRegistry ¶
WithRegistry seeds the driver's route registry with a fixed peer list (skipping the `Dependencies.Cfg.Tools.A2APeers` lookup). The caller retains ownership; the driver does not Close the registry.
type PeerSpec ¶
type PeerSpec struct {
// URL is the peer's base URL — e.g. "https://agent.example".
// The wire driver appends "/.well-known/agent-card.json" for
// discovery and the JSON-RPC method paths for calls.
URL string
// TrustTier is an operator-set integer in [1, 5]: 1 = untrusted
// (third-party with no contract), 5 = first-party (in-org).
// Higher values rank higher; the validator rejects values
// outside [1, 5] at config-load time.
TrustTier int
// LatencyTierMS is the operator's hint at the peer's expected
// p50 latency in milliseconds. Smaller values rank higher.
LatencyTierMS int
// AllowInsecureLoopback opts a loopback HTTP peer into the
// driver. See security.go for the precise rule.
AllowInsecureLoopback bool
// AgentCardTTL overrides the driver-level AgentCard cache TTL.
// Zero falls back to the driver default.
AgentCardTTL time.Duration
// Capabilities is the discovered capability vocabulary: the
// union of `AgentSkill.ID` and the entries of `AgentSkill.Tags`
// across the peer's skills. Filled in when the wire driver
// completes discovery; empty until then. Resolve filters by
// exact string equality.
Capabilities []string
}
PeerSpec is a discovered peer's contact info + tier hints. PeerSpec is the input shape to `Registry.AddPeer`; the registry copies it into an internal record so PeerSpec is safe to construct, register, and discard.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry maps capability strings (`AgentSkill.ID` or a tag) to a scored list of peer candidates. Read-mostly: AddPeer takes the write lock, Resolve takes the read lock. Internally synchronized; safe for N concurrent goroutines (D-025).
func NewRegistry ¶
func NewRegistry() *Registry
NewRegistry constructs an empty route-scoring registry.
func (*Registry) AddPeer ¶
AddPeer registers spec under spec.URL. Replaces any prior record at the same URL. Returns ErrInvalidPeerURL when spec.URL is empty; returns a tier-violation error when TrustTier is outside [1, 5] or LatencyTierMS is negative.
func (*Registry) Peers ¶
Peers returns a snapshot of every registered peer URL. Order is sorted for determinism.
func (*Registry) RemovePeer ¶
RemovePeer unregisters the peer at url. No-op if absent.
type Route ¶
type Route struct {
// PeerURL is the resolved peer's base URL.
PeerURL string
// TrustTier is the operator-supplied trust tier ([1, 5]).
TrustTier int
// LatencyTierMS is the operator-supplied latency tier hint.
LatencyTierMS int
// CapabilityScore is 1 when the capability matches an
// `AgentSkill.ID`; 0 when it matches only via a tag.
CapabilityScore int
// CompositeScore is the deterministic ranking value (higher is
// better).
CompositeScore float64
}
Route is a scored candidate returned by Registry.Resolve. Ordered slices land highest-score-first.