federation

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

包 federation 提供跨组织智能体联合协作与任务编排能力。

概述

federation 解决的核心问题是:当多个独立部署的 Agent 节点需要协同完成任务时, 如何安全地发现彼此、分发任务并汇总结果。它通过联邦网络将分布式节点组织为 一个逻辑整体,支持跨组织边界的智能体协作。

核心模型

本包围绕以下类型展开:

  • FederatedNode:联邦网络中的节点,携带 endpoint、公钥、能力列表与在线状态
  • FederatedTask:跨节点分发的任务,包含优先级、超时、所需能力与执行结果
  • Orchestrator:联邦编排器,负责节点注册、任务提交、分发与结果收集
  • TaskHandler:任务处理函数,由各节点注册以响应特定类型的联邦任务

节点状态通过 NodeStatus 枚举管理(online / offline / degraded), 任务生命周期通过 TaskStatus 枚举跟踪(pending / running / completed / failed)。

主要能力

  • 节点发现与注册:动态加入或移除联邦节点
  • 能力匹配路由:根据 RequiredCaps 自动筛选具备对应能力的目标节点
  • 并行任务分发:向多个目标节点并发下发任务,汇总各节点执行结果
  • 心跳健康检测:周期性检查节点存活状态,自动标记离线节点
  • 本地/远程透明执行:本地节点直接调用 handler,远程节点通过 HTTPS 转发
  • TLS 安全通信:节点间通信默认启用 TLS 加密

与其他包协同

federation 可与 agent/handoff 配合,将跨组织的任务交接建模为联邦任务分发。 同时可与 agent/k8s 结合,在 Kubernetes 集群间构建联邦网络。

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentRegistration added in v1.0.0

type AgentRegistration struct {
	ID           string
	Name         string
	Endpoint     string
	Capabilities []string
	Organization string
	Metadata     map[string]string
}

AgentRegistration is the local representation for registering an agent.

type BridgeConfig added in v1.0.0

type BridgeConfig struct {
	SyncInterval time.Duration // how often to sync all nodes
	AutoSync     bool          // auto-sync on node registration
}

BridgeConfig holds configuration for the DiscoveryBridge.

func DefaultBridgeConfig added in v1.0.0

func DefaultBridgeConfig() BridgeConfig

DefaultBridgeConfig returns a BridgeConfig with sensible defaults.

type DiscoveryBridge added in v1.0.0

type DiscoveryBridge struct {
	// contains filtered or unexported fields
}

DiscoveryBridge syncs federated nodes with the discovery registry.

func NewDiscoveryBridge added in v1.0.0

func NewDiscoveryBridge(
	orchestrator *Orchestrator,
	registry DiscoveryRegistry,
	config BridgeConfig,
	logger *zap.Logger,
) *DiscoveryBridge

NewDiscoveryBridge creates a new DiscoveryBridge.

func (*DiscoveryBridge) Start added in v1.0.0

func (b *DiscoveryBridge) Start(ctx context.Context) error

Start begins periodic sync and auto-sync on node changes.

func (*DiscoveryBridge) Stop added in v1.0.0

func (b *DiscoveryBridge) Stop()

Stop stops the bridge. It is safe to call multiple times.

func (*DiscoveryBridge) SyncAllNodes added in v1.0.0

func (b *DiscoveryBridge) SyncAllNodes(ctx context.Context) error

SyncAllNodes syncs all known federated nodes to discovery.

func (*DiscoveryBridge) SyncNode added in v1.0.0

func (b *DiscoveryBridge) SyncNode(ctx context.Context, node *FederatedNode) error

SyncNode registers a single federated node's capabilities in discovery.

type DiscoveryRegistry added in v1.0.0

type DiscoveryRegistry interface {
	RegisterAgent(ctx context.Context, info *AgentRegistration) error
	UnregisterAgent(ctx context.Context, agentID string) error
	UpdateAgentStatus(ctx context.Context, agentID string, status string) error
}

DiscoveryRegistry is a local interface matching the subset of discovery.CapabilityRegistry needed for federation sync. Using local interface pattern (§15).

type DiscoveryRegistryAdapter added in v1.0.0

type DiscoveryRegistryAdapter struct {
	// contains filtered or unexported fields
}

DiscoveryRegistryAdapter adapts discovery.DiscoveryService to the DiscoveryRegistry interface used by the federation bridge.

func NewDiscoveryRegistryAdapter added in v1.0.0

func NewDiscoveryRegistryAdapter(service *discovery.DiscoveryService) *DiscoveryRegistryAdapter

NewDiscoveryRegistryAdapter creates a new adapter wrapping the given DiscoveryService.

func (*DiscoveryRegistryAdapter) RegisterAgent added in v1.0.0

func (a *DiscoveryRegistryAdapter) RegisterAgent(ctx context.Context, info *AgentRegistration) error

RegisterAgent converts an AgentRegistration to discovery.AgentInfo and registers it via the discovery service.

func (*DiscoveryRegistryAdapter) UnregisterAgent added in v1.0.0

func (a *DiscoveryRegistryAdapter) UnregisterAgent(ctx context.Context, agentID string) error

UnregisterAgent removes an agent from the discovery service.

func (*DiscoveryRegistryAdapter) UpdateAgentStatus added in v1.0.0

func (a *DiscoveryRegistryAdapter) UpdateAgentStatus(ctx context.Context, agentID string, status string) error

UpdateAgentStatus updates an agent's status in the discovery registry.

type FederatedNode

type FederatedNode struct {
	ID           string            `json:"id"`
	Name         string            `json:"name"`
	Endpoint     string            `json:"endpoint"`
	PublicKey    string            `json:"public_key,omitempty"`
	Capabilities []string          `json:"capabilities"`
	Metadata     map[string]string `json:"metadata,omitempty"`
	Status       NodeStatus        `json:"status"`
	LastSeen     time.Time         `json:"last_seen"`
}

联邦 节点代表联邦的一个节点.

type FederatedTask

type FederatedTask struct {
	ID           string         `json:"id"`
	Type         string         `json:"type"`
	Payload      any            `json:"payload"`
	SourceNode   string         `json:"source_node"`
	TargetNodes  []string       `json:"target_nodes,omitempty"`
	Priority     int            `json:"priority"`
	Timeout      time.Duration  `json:"timeout"`
	RequiredCaps []string       `json:"required_capabilities,omitempty"`
	CreatedAt    time.Time      `json:"created_at"`
	Status       TaskStatus     `json:"status"`
	Results      map[string]any `json:"results,omitempty"`
}

联邦 这项任务是分配给各联邦的任务。

type FederationConfig

type FederationConfig struct {
	NodeID            string
	NodeName          string
	ListenAddr        string
	TLSConfig         *tls.Config
	HeartbeatInterval time.Duration
	TaskTimeout       time.Duration
}

Federation Config配置了联邦管弦乐团.

type NodeStatus

type NodeStatus string

节点状态代表一个联邦节点的地位.

const (
	NodeStatusOnline   NodeStatus = "online"
	NodeStatusOffline  NodeStatus = "offline"
	NodeStatusDegraded NodeStatus = "degraded"
)

type Orchestrator

type Orchestrator struct {
	// contains filtered or unexported fields
}

Orchestrator管理联邦代理合作.

func NewOrchestrator

func NewOrchestrator(config FederationConfig, logger *zap.Logger) *Orchestrator

新奥尔良创造了一个新的联邦管弦乐团.

func (*Orchestrator) GetTask

func (o *Orchestrator) GetTask(taskID string) (*FederatedTask, bool)

GetTask 以 ID 检索任务 。

func (*Orchestrator) ListNodes

func (o *Orchestrator) ListNodes() []*FederatedNode

ListNodes 返回所有已注册的节点 。

func (*Orchestrator) RegisterHandler

func (o *Orchestrator) RegisterHandler(taskType string, handler TaskHandler)

登记 Handler 登记任务处理器 。

func (*Orchestrator) RegisterNode

func (o *Orchestrator) RegisterNode(node *FederatedNode)

注册点在联邦登记一个节点。

func (*Orchestrator) SetOnNodeRegister added in v1.0.0

func (o *Orchestrator) SetOnNodeRegister(fn func(node *FederatedNode))

SetOnNodeRegister sets a callback invoked after a node is registered.

func (*Orchestrator) SetOnNodeStatusChange added in v1.0.0

func (o *Orchestrator) SetOnNodeStatusChange(fn func(nodeID string, status NodeStatus))

SetOnNodeStatusChange sets a callback invoked when a node's status changes.

func (*Orchestrator) SetOnNodeUnregister added in v1.0.0

func (o *Orchestrator) SetOnNodeUnregister(fn func(nodeID string))

SetOnNodeUnregister sets a callback invoked after a node is unregistered.

func (*Orchestrator) Start

func (o *Orchestrator) Start(ctx context.Context) error

开始指挥

func (*Orchestrator) Stop

func (o *Orchestrator) Stop()

停止停止指挥器。

func (*Orchestrator) SubmitTask

func (o *Orchestrator) SubmitTask(ctx context.Context, task *FederatedTask) error

向联邦提交任务。

func (*Orchestrator) UnregisterNode

func (o *Orchestrator) UnregisterNode(nodeID string)

UnregisterNode从联邦中删除一个节点.

type TaskHandler

type TaskHandler func(ctx context.Context, task *FederatedTask) (any, error)

特劳斯·汉德勒处理联邦任务.

type TaskStatus

type TaskStatus string

任务状态代表联合任务状态.

const (
	TaskStatusPending   TaskStatus = "pending"
	TaskStatusRunning   TaskStatus = "running"
	TaskStatusCompleted TaskStatus = "completed"
	TaskStatusFailed    TaskStatus = "failed"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL