Documentation
¶
Overview ¶
包 federation 提供跨组织智能体联合协作与任务编排能力。
概述 ¶
federation 解决的核心问题是:当多个独立部署的 Agent 节点需要协同完成任务时, 如何安全地发现彼此、分发任务并汇总结果。它通过联邦网络将分布式节点组织为 一个逻辑整体,支持跨组织边界的智能体协作。
核心模型 ¶
本包围绕以下类型展开:
- FederatedNode:联邦网络中的节点,携带 endpoint、公钥、能力列表与在线状态
- FederatedTask:跨节点分发的任务,包含优先级、超时、所需能力与执行结果
- Orchestrator:联邦编排器,负责节点注册、任务提交、分发与结果收集
- TaskHandler:任务处理函数,由各节点注册以响应特定类型的联邦任务
节点状态通过 NodeStatus 枚举管理(online / offline / degraded), 任务生命周期通过 TaskStatus 枚举跟踪(pending / running / completed / failed)。
主要能力 ¶
- 节点发现与注册:动态加入或移除联邦节点
- 能力匹配路由:根据 RequiredCaps 自动筛选具备对应能力的目标节点
- 并行任务分发:向多个目标节点并发下发任务,汇总各节点执行结果
- 心跳健康检测:周期性检查节点存活状态,自动标记离线节点
- 本地/远程透明执行:本地节点直接调用 handler,远程节点通过 HTTPS 转发
- TLS 安全通信:节点间通信默认启用 TLS 加密
与其他包协同 ¶
federation 可与 agent/handoff 配合,将跨组织的任务交接建模为联邦任务分发。 同时可与 agent/k8s 结合,在 Kubernetes 集群间构建联邦网络。
Index ¶
- type AgentRegistration
- type BridgeConfig
- type DiscoveryBridge
- type DiscoveryRegistry
- type DiscoveryRegistryAdapter
- func (a *DiscoveryRegistryAdapter) RegisterAgent(ctx context.Context, info *AgentRegistration) error
- func (a *DiscoveryRegistryAdapter) UnregisterAgent(ctx context.Context, agentID string) error
- func (a *DiscoveryRegistryAdapter) UpdateAgentStatus(ctx context.Context, agentID string, status string) error
- type FederatedNode
- type FederatedTask
- type FederationConfig
- type NodeStatus
- type Orchestrator
- func (o *Orchestrator) GetTask(taskID string) (*FederatedTask, bool)
- func (o *Orchestrator) ListNodes() []*FederatedNode
- func (o *Orchestrator) RegisterHandler(taskType string, handler TaskHandler)
- func (o *Orchestrator) RegisterNode(node *FederatedNode)
- func (o *Orchestrator) SetOnNodeRegister(fn func(node *FederatedNode))
- func (o *Orchestrator) SetOnNodeStatusChange(fn func(nodeID string, status NodeStatus))
- func (o *Orchestrator) SetOnNodeUnregister(fn func(nodeID string))
- func (o *Orchestrator) Start(ctx context.Context) error
- func (o *Orchestrator) Stop()
- func (o *Orchestrator) SubmitTask(ctx context.Context, task *FederatedTask) error
- func (o *Orchestrator) UnregisterNode(nodeID string)
- type TaskHandler
- type TaskStatus
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AgentRegistration ¶ added in v1.0.0
type AgentRegistration struct {
ID string
Name string
Endpoint string
Capabilities []string
Organization string
Metadata map[string]string
}
AgentRegistration is the local representation for registering an agent.
type BridgeConfig ¶ added in v1.0.0
type BridgeConfig struct {
SyncInterval time.Duration // how often to sync all nodes
AutoSync bool // auto-sync on node registration
}
BridgeConfig holds configuration for the DiscoveryBridge.
func DefaultBridgeConfig ¶ added in v1.0.0
func DefaultBridgeConfig() BridgeConfig
DefaultBridgeConfig returns a BridgeConfig with sensible defaults.
type DiscoveryBridge ¶ added in v1.0.0
type DiscoveryBridge struct {
// contains filtered or unexported fields
}
DiscoveryBridge syncs federated nodes with the discovery registry.
func NewDiscoveryBridge ¶ added in v1.0.0
func NewDiscoveryBridge( orchestrator *Orchestrator, registry DiscoveryRegistry, config BridgeConfig, logger *zap.Logger, ) *DiscoveryBridge
NewDiscoveryBridge creates a new DiscoveryBridge.
func (*DiscoveryBridge) Start ¶ added in v1.0.0
func (b *DiscoveryBridge) Start(ctx context.Context) error
Start begins periodic sync and auto-sync on node changes.
func (*DiscoveryBridge) Stop ¶ added in v1.0.0
func (b *DiscoveryBridge) Stop()
Stop stops the bridge. It is safe to call multiple times.
func (*DiscoveryBridge) SyncAllNodes ¶ added in v1.0.0
func (b *DiscoveryBridge) SyncAllNodes(ctx context.Context) error
SyncAllNodes syncs all known federated nodes to discovery.
func (*DiscoveryBridge) SyncNode ¶ added in v1.0.0
func (b *DiscoveryBridge) SyncNode(ctx context.Context, node *FederatedNode) error
SyncNode registers a single federated node's capabilities in discovery.
type DiscoveryRegistry ¶ added in v1.0.0
type DiscoveryRegistry interface {
RegisterAgent(ctx context.Context, info *AgentRegistration) error
UnregisterAgent(ctx context.Context, agentID string) error
UpdateAgentStatus(ctx context.Context, agentID string, status string) error
}
DiscoveryRegistry is a local interface matching the subset of discovery.CapabilityRegistry needed for federation sync. Using local interface pattern (§15).
type DiscoveryRegistryAdapter ¶ added in v1.0.0
type DiscoveryRegistryAdapter struct {
// contains filtered or unexported fields
}
DiscoveryRegistryAdapter adapts discovery.DiscoveryService to the DiscoveryRegistry interface used by the federation bridge.
func NewDiscoveryRegistryAdapter ¶ added in v1.0.0
func NewDiscoveryRegistryAdapter(service *discovery.DiscoveryService) *DiscoveryRegistryAdapter
NewDiscoveryRegistryAdapter creates a new adapter wrapping the given DiscoveryService.
func (*DiscoveryRegistryAdapter) RegisterAgent ¶ added in v1.0.0
func (a *DiscoveryRegistryAdapter) RegisterAgent(ctx context.Context, info *AgentRegistration) error
RegisterAgent converts an AgentRegistration to discovery.AgentInfo and registers it via the discovery service.
func (*DiscoveryRegistryAdapter) UnregisterAgent ¶ added in v1.0.0
func (a *DiscoveryRegistryAdapter) UnregisterAgent(ctx context.Context, agentID string) error
UnregisterAgent removes an agent from the discovery service.
func (*DiscoveryRegistryAdapter) UpdateAgentStatus ¶ added in v1.0.0
func (a *DiscoveryRegistryAdapter) UpdateAgentStatus(ctx context.Context, agentID string, status string) error
UpdateAgentStatus updates an agent's status in the discovery registry.
type FederatedNode ¶
type FederatedNode struct {
ID string `json:"id"`
Name string `json:"name"`
Endpoint string `json:"endpoint"`
PublicKey string `json:"public_key,omitempty"`
Capabilities []string `json:"capabilities"`
Metadata map[string]string `json:"metadata,omitempty"`
Status NodeStatus `json:"status"`
LastSeen time.Time `json:"last_seen"`
}
联邦 节点代表联邦的一个节点.
type FederatedTask ¶
type FederatedTask struct {
ID string `json:"id"`
Type string `json:"type"`
Payload any `json:"payload"`
SourceNode string `json:"source_node"`
TargetNodes []string `json:"target_nodes,omitempty"`
Priority int `json:"priority"`
Timeout time.Duration `json:"timeout"`
RequiredCaps []string `json:"required_capabilities,omitempty"`
CreatedAt time.Time `json:"created_at"`
Status TaskStatus `json:"status"`
Results map[string]any `json:"results,omitempty"`
}
联邦 这项任务是分配给各联邦的任务。
type FederationConfig ¶
type FederationConfig struct {
NodeID string
NodeName string
ListenAddr string
TLSConfig *tls.Config
HeartbeatInterval time.Duration
TaskTimeout time.Duration
}
Federation Config配置了联邦管弦乐团.
type NodeStatus ¶
type NodeStatus string
节点状态代表一个联邦节点的地位.
const ( NodeStatusOnline NodeStatus = "online" NodeStatusOffline NodeStatus = "offline" NodeStatusDegraded NodeStatus = "degraded" )
type Orchestrator ¶
type Orchestrator struct {
// contains filtered or unexported fields
}
Orchestrator管理联邦代理合作.
func NewOrchestrator ¶
func NewOrchestrator(config FederationConfig, logger *zap.Logger) *Orchestrator
新奥尔良创造了一个新的联邦管弦乐团.
func (*Orchestrator) GetTask ¶
func (o *Orchestrator) GetTask(taskID string) (*FederatedTask, bool)
GetTask 以 ID 检索任务 。
func (*Orchestrator) ListNodes ¶
func (o *Orchestrator) ListNodes() []*FederatedNode
ListNodes 返回所有已注册的节点 。
func (*Orchestrator) RegisterHandler ¶
func (o *Orchestrator) RegisterHandler(taskType string, handler TaskHandler)
登记 Handler 登记任务处理器 。
func (*Orchestrator) RegisterNode ¶
func (o *Orchestrator) RegisterNode(node *FederatedNode)
注册点在联邦登记一个节点。
func (*Orchestrator) SetOnNodeRegister ¶ added in v1.0.0
func (o *Orchestrator) SetOnNodeRegister(fn func(node *FederatedNode))
SetOnNodeRegister sets a callback invoked after a node is registered.
func (*Orchestrator) SetOnNodeStatusChange ¶ added in v1.0.0
func (o *Orchestrator) SetOnNodeStatusChange(fn func(nodeID string, status NodeStatus))
SetOnNodeStatusChange sets a callback invoked when a node's status changes.
func (*Orchestrator) SetOnNodeUnregister ¶ added in v1.0.0
func (o *Orchestrator) SetOnNodeUnregister(fn func(nodeID string))
SetOnNodeUnregister sets a callback invoked after a node is unregistered.
func (*Orchestrator) SubmitTask ¶
func (o *Orchestrator) SubmitTask(ctx context.Context, task *FederatedTask) error
向联邦提交任务。
func (*Orchestrator) UnregisterNode ¶
func (o *Orchestrator) UnregisterNode(nodeID string)
UnregisterNode从联邦中删除一个节点.
type TaskHandler ¶
type TaskHandler func(ctx context.Context, task *FederatedTask) (any, error)
特劳斯·汉德勒处理联邦任务.
type TaskStatus ¶
type TaskStatus string
任务状态代表联合任务状态.
const ( TaskStatusPending TaskStatus = "pending" TaskStatusRunning TaskStatus = "running" TaskStatusCompleted TaskStatus = "completed" TaskStatusFailed TaskStatus = "failed" )