multiagent

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2025 License: Apache-2.0 Imports: 15 Imported by: 0

README

multiagent 多智能体协作系统

本模块是 goagent 框架的多智能体协作系统,提供多智能体交互、任务调度、消息通信和分布式协作能力。

目录

架构设计

系统架构图
graph TB
    subgraph MultiAgentSystem["MultiAgentSystem 多智能体系统"]
        A1["Agent 1<br/>Leader"]
        A2["Agent 2<br/>Worker"]
        A3["Agent 3<br/>Worker"]
        A4["Agent 4<br/>Validator"]
        MR["MessageRouter<br/>消息路由"]
        MQ["MessageQueue<br/>消息队列"]
        SM["SessionManager<br/>会话管理"]
    end

    subgraph Communication["通信层"]
        MC["MemoryCommunicator<br/>内存通信"]
        NC["NATSCommunicator<br/>分布式通信"]
        CS["ChannelStore<br/>Channel存储"]
    end

    subgraph TaskExecution["任务执行引擎"]
        Parallel["executeParallelTask<br/>并行执行"]
        Sequential["executeSequentialTask<br/>顺序执行"]
        Hierarchical["executeHierarchicalTask<br/>分层执行"]
        Consensus["executeConsensusTask<br/>共识执行"]
        Pipeline["executePipelineTask<br/>流水线执行"]
    end

    A1 --> MR
    A2 --> MR
    A3 --> MR
    A4 --> MR
    MR --> MQ

    MC --> CS
    NC --> NATS["NATS Server"]

    MultiAgentSystem --> TaskExecution
    MultiAgentSystem --> Communication

    style MultiAgentSystem fill:#e1f5ff
    style Communication fill:#fff3e0
    style TaskExecution fill:#e8f5e9
组件关系图
classDiagram
    class CollaborativeAgent {
        <<interface>>
        +GetRole() Role
        +SetRole(role Role)
        +ReceiveMessage(ctx, message) error
        +SendMessage(ctx, message) error
        +Collaborate(ctx, task) (*Assignment, error)
        +Vote(ctx, proposal) (bool, error)
    }

    class Communicator {
        <<interface>>
        +Send(ctx, to, message) error
        +Receive(ctx) (*AgentMessage, error)
        +Broadcast(ctx, message) error
        +Subscribe(ctx, topic) (<-chan, error)
        +Unsubscribe(ctx, topic) error
        +Close() error
    }

    class ChannelStore {
        <<interface>>
        +GetOrCreateChannel(agentID) chan
        +GetChannel(agentID) chan
        +ListChannels() []string
        +Subscribe(topic) <-chan
        +Unsubscribe(topic)
        +Close() error
    }

    class MultiAgentSystem {
        +RegisterAgent(id, agent) error
        +UnregisterAgent(id) error
        +CreateTeam(team) error
        +ExecuteTask(ctx, task) (*Task, error)
        +SendMessage(message) error
        +Close() error
    }

    CollaborativeAgent --> MultiAgentSystem : 注册
    Communicator --> ChannelStore : 使用
    MultiAgentSystem --> Communicator : 通信

核心组件

1. MultiAgentSystem 系统管理器

多智能体系统的核心管理器:

type MultiAgentSystem struct {
    agents      map[string]CollaborativeAgent
    teams       map[string]*Team
    messageQueue chan Message
    router      *MessageRouter
    logger      loggercore.Logger
}
方法 说明
RegisterAgent(id, agent) 注册智能体
UnregisterAgent(id) 注销智能体
CreateTeam(team) 创建团队
ExecuteTask(ctx, task) 执行协作任务
SendMessage(message) 发送消息
Close() 关闭系统
2. CollaborativeAgent 协作智能体

协作智能体接口定义:

type CollaborativeAgent interface {
    core.Agent
    GetRole() Role
    SetRole(role Role)
    ReceiveMessage(ctx context.Context, message Message) error
    SendMessage(ctx context.Context, message Message) error
    Collaborate(ctx context.Context, task *CollaborativeTask) (*Assignment, error)
    Vote(ctx context.Context, proposal interface{}) (bool, error)
}
3. Communicator 通信器

支持两种通信实现:

通信器 说明 适用场景
MemoryCommunicator 内存通信 单机多智能体
NATSCommunicator NATS 分布式 分布式部署
4. CollaborativeTask 协作任务

任务定义和状态:

type CollaborativeTask struct {
    ID          string
    Name        string
    Description string
    Type        CollaborationType
    Input       interface{}
    Output      interface{}
    Status      TaskStatus
    Assignments map[string]Assignment
    Results     map[string]interface{}
    StartTime   time.Time
    EndTime     time.Time
}

协作模式

五种协作类型
graph TB
    subgraph CollaborationTypes["协作类型"]
        direction LR
        Parallel["Parallel<br/>并行协作"]
        Sequential["Sequential<br/>顺序协作"]
        Hierarchical["Hierarchical<br/>分层协作"]
        Consensus["Consensus<br/>共识协作"]
        Pipeline["Pipeline<br/>管道协作"]
    end

    subgraph Characteristics["特点"]
        P_Char["独立执行<br/>结果合并"]
        S_Char["链式处理<br/>顺序依赖"]
        H_Char["层级分明<br/>职责清晰"]
        C_Char["投票决策<br/>达成共识"]
        Pi_Char["流式处理<br/>阶段划分"]
    end

    Parallel --> P_Char
    Sequential --> S_Char
    Hierarchical --> H_Char
    Consensus --> C_Char
    Pipeline --> Pi_Char
1. Parallel 并行协作

所有智能体同时执行独立子任务:

graph LR
    Task["协作任务"] --> A1["Agent 1"]
    Task --> A2["Agent 2"]
    Task --> A3["Agent 3"]
    A1 --> Merge["结果合并"]
    A2 --> Merge
    A3 --> Merge
2. Sequential 顺序协作

按顺序执行,前一个的输出作为后一个的输入:

graph LR
    Input["输入"] --> A1["Agent 1<br/>采集"]
    A1 --> A2["Agent 2<br/>处理"]
    A2 --> A3["Agent 3<br/>分析"]
    A3 --> Output["输出"]
3. Hierarchical 分层协作

Leader 分配任务,Worker 执行,Validator 验证:

graph TB
    Leader["Leader<br/>领导者"] -->|"分配任务"| Worker1["Worker 1"]
    Leader -->|"分配任务"| Worker2["Worker 2"]
    Worker1 -->|"提交结果"| Validator["Validator<br/>验证者"]
    Worker2 -->|"提交结果"| Validator
    Validator -->|"验证报告"| Leader
4. Consensus 共识协作

多个智能体投票决策:

graph TB
    Proposal["提案"] --> V1["Agent 1<br/>投票"]
    Proposal --> V2["Agent 2<br/>投票"]
    Proposal --> V3["Agent 3<br/>投票"]
    V1 -->|"同意"| Count["统计票数"]
    V2 -->|"同意"| Count
    V3 -->|"反对"| Count
    Count -->|"2/3 > 60%"| Result["提案通过"]
5. Pipeline 管道协作

数据流经多个处理阶段:

graph LR
    Source["数据源"] --> E["Extract<br/>抽取"]
    E -->|"原始数据"| T["Transform<br/>转换"]
    T -->|"清洗数据"| L["Load<br/>加载"]
    L --> Target["目标存储"]

    style E fill:#e1f5fe
    style T fill:#fff3e0
    style L fill:#e8f5e9
协作模式选择
flowchart TD
    Start["选择协作类型"] --> Q1{"任务间有依赖吗?"}

    Q1 -->|"无依赖"| Q2{"需要投票决策吗?"}
    Q1 -->|"有依赖"| Q3{"是流式数据吗?"}

    Q2 -->|"是"| Consensus["Consensus<br/>共识协作"]
    Q2 -->|"否"| Parallel["Parallel<br/>并行协作"]

    Q3 -->|"是"| Pipeline["Pipeline<br/>管道协作"]
    Q3 -->|"否"| Q4{"需要层级管理吗?"}

    Q4 -->|"是"| Hierarchical["Hierarchical<br/>分层协作"]
    Q4 -->|"否"| Sequential["Sequential<br/>顺序协作"]

角色系统

六种角色定义
graph TB
    subgraph Roles["智能体角色"]
        L["Leader<br/>领导者"]
        W["Worker<br/>工作者"]
        C["Coordinator<br/>协调者"]
        S["Specialist<br/>专家"]
        V["Validator<br/>验证者"]
        O["Observer<br/>观察者"]
    end

    subgraph Responsibilities["职责"]
        L1["制定计划<br/>分配任务<br/>验证结果"]
        W1["执行分配任务<br/>返回结果"]
        C1["管理通信<br/>同步状态<br/>协调执行"]
        S1["提供专业知识<br/>领域分析<br/>建议优化"]
        V1["验证正确性<br/>检查质量<br/>评估风险"]
        O1["监控进度<br/>收集指标<br/>报告状态"]
    end

    L --> L1
    W --> W1
    C --> C1
    S --> S1
    V --> V1
    O --> O1
角色常量
const (
    RoleLeader      = "leader"      // 领导者:制定计划和决策
    RoleWorker      = "worker"      // 工作者:执行任务
    RoleCoordinator = "coordinator" // 协调者:管理通信与同步
    RoleSpecialist  = "specialist"  // 专家:提供领域专业知识
    RoleValidator   = "validator"   // 验证者:验证结果
    RoleObserver    = "observer"    // 观察者:监控与报告
)

使用方法

创建多智能体系统
// 创建系统
logger := /* ... */
system := multiagent.NewMultiAgentSystem(logger)

// 创建智能体
leader := multiagent.NewBaseCollaborativeAgent(
    "leader1", "Leader", multiagent.RoleLeader, system)
worker1 := multiagent.NewBaseCollaborativeAgent(
    "worker1", "Worker 1", multiagent.RoleWorker, system)
worker2 := multiagent.NewBaseCollaborativeAgent(
    "worker2", "Worker 2", multiagent.RoleWorker, system)
validator := multiagent.NewBaseCollaborativeAgent(
    "validator1", "Validator", multiagent.RoleValidator, system)

// 注册智能体
system.RegisterAgent("leader1", leader)
system.RegisterAgent("worker1", worker1)
system.RegisterAgent("worker2", worker2)
system.RegisterAgent("validator1", validator)

// 优雅关闭
defer system.Close()
执行协作任务
// 创建并行任务
task := &multiagent.CollaborativeTask{
    ID:          "task-001",
    Name:        "数据处理",
    Type:        multiagent.CollaborationTypeParallel,
    Input: map[string]interface{}{
        "data": "example",
    },
    Assignments: make(map[string]multiagent.Assignment),
}

// 执行任务
ctx := context.Background()
result, err := system.ExecuteTask(ctx, task)
if err != nil {
    log.Fatal(err)
}

// 访问结果
for agentID, assignment := range result.Assignments {
    fmt.Printf("Agent %s: %v\n", agentID, assignment.Result)
}
分层协作示例
// 创建分层任务
task := &multiagent.CollaborativeTask{
    ID:   "hierarchical-task",
    Name: "项目管理",
    Type: multiagent.CollaborationTypeHierarchical,
    Input: map[string]interface{}{
        "project": "AI Agent 开发",
        "deadline": "2024-12-31",
    },
    Assignments: make(map[string]multiagent.Assignment),
}

// Leader 会先制定计划,然后分配给 Workers
result, err := system.ExecuteTask(ctx, task)
共识决策示例
// 创建共识任务
task := &multiagent.CollaborativeTask{
    ID:   "consensus-task",
    Name: "架构决策",
    Type: multiagent.CollaborationTypeConsensus,
    Input: map[string]interface{}{
        "proposal": "采用微服务架构",
        "threshold": 0.6, // 60% 通过
    },
    Assignments: make(map[string]multiagent.Assignment),
}

// 所有智能体投票
result, err := system.ExecuteTask(ctx, task)
fmt.Printf("投票结果: %v\n", result.Output)
消息通信
// 发送消息
message := multiagent.Message{
    ID:        "msg-001",
    From:      "leader1",
    To:        "worker1",
    Type:      multiagent.MessageTypeCommand,
    Content:   "开始处理数据",
    Priority:  1,
    Timestamp: time.Now(),
}
system.SendMessage(message)

// 智能体接收消息
agent.ReceiveMessage(ctx, message)
分布式部署(NATS)
// 创建 NATS 通信器
config := &multiagent.NATSConfig{
    URL:       "nats://localhost:4222",
    ClusterID: "goagent-cluster",
}
communicator, err := multiagent.NewNATSCommunicator("agent1", config)
if err != nil {
    log.Fatal(err)
}
defer communicator.Close()

// 发送消息
msg := &multiagent.AgentMessage{
    From:    "agent1",
    To:      "agent2",
    Type:    multiagent.MessageTypeRequest,
    Payload: "Hello",
}
communicator.Send(ctx, "agent2", msg)

// 订阅主题
ch, _ := communicator.Subscribe(ctx, "notifications")
for msg := range ch {
    fmt.Printf("收到消息: %v\n", msg)
}

API 参考

MultiAgentSystem API
// 创建系统
NewMultiAgentSystem(log Logger, opts ...SystemOption) *MultiAgentSystem

// 智能体管理
RegisterAgent(id string, agent CollaborativeAgent) error
UnregisterAgent(id string) error

// 团队管理
CreateTeam(team *Team) error

// 任务执行
ExecuteTask(ctx context.Context, task *CollaborativeTask) (*CollaborativeTask, error)

// 消息通信
SendMessage(message Message) error

// 生命周期
Close() error
CollaborativeAgent API
// 角色管理
GetRole() Role
SetRole(role Role)

// 消息通信
ReceiveMessage(ctx context.Context, message Message) error
SendMessage(ctx context.Context, message Message) error

// 协作参与
Collaborate(ctx context.Context, task *CollaborativeTask) (*Assignment, error)

// 共识投票
Vote(ctx context.Context, proposal interface{}) (bool, error)

// 状态管理
GetState(key string) (interface{}, bool)
SetState(key string, value interface{})
DeleteState(key string)
Communicator API
// 内存通信器
NewMemoryCommunicator(agentID string) *MemoryCommunicator
NewMemoryCommunicatorWithStore(agentID string, store ChannelStore) *MemoryCommunicator

// NATS 通信器
NewNATSCommunicator(agentID string, config *NATSConfig) (*NATSCommunicator, error)

// 通信操作
Send(ctx context.Context, to string, message *AgentMessage) error
Receive(ctx context.Context) (*AgentMessage, error)
Broadcast(ctx context.Context, message *AgentMessage) error
Subscribe(ctx context.Context, topic string) (<-chan *AgentMessage, error)
Unsubscribe(ctx context.Context, topic string) error
Router 和 Session API
// 消息路由
NewMessageRouter() *MessageRouter
RegisterRoute(pattern string, handler RouteHandler) error
RegisterPatternRoute(pattern string, handler RouteHandler) error
Route(ctx context.Context, message *AgentMessage) (*AgentMessage, error)

// 会话管理
NewSessionManager() *SessionManager
CreateSession(participants []string) (*AgentSession, error)
GetSession(sessionID string) (*AgentSession, error)
AddMessage(sessionID string, message *AgentMessage) error
CloseSession(sessionID string) error

代码结构

multiagent/
├── system.go               # 多智能体系统核心管理器
├── collaborative_agent.go  # 协作智能体实现
├── communication.go        # 通信接口与消息定义
├── communicator_memory.go  # 内存通信器
├── communicator_nats.go    # NATS 分布式通信器
├── router.go               # 消息路由器
├── channel_store.go        # Channel 存储接口
├── collaborative_agent_test.go
├── communicator_test.go
├── router_test.go
└── system_test.go

消息类型

const (
    MessageTypeRequest      = "request"      // 请求
    MessageTypeResponse     = "response"     // 响应
    MessageTypeBroadcast    = "broadcast"    // 广播
    MessageTypeNotification = "notification" // 通知
    MessageTypeCommand      = "command"      // 命令
    MessageTypeReport       = "report"       // 报告
    MessageTypeVote         = "vote"         // 投票
)

任务状态

const (
    TaskStatusPending   = "pending"   // 等待中
    TaskStatusAssigned  = "assigned"  // 已分配
    TaskStatusExecuting = "executing" // 执行中
    TaskStatusCompleted = "completed" // 已完成
    TaskStatusFailed    = "failed"    // 失败
    TaskStatusCancelled = "cancelled" // 已取消
)

扩展阅读

Documentation

Overview

Package multiagent provides multi-agent collaboration capabilities

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func SetDefaultChannelStore

func SetDefaultChannelStore(store ChannelStore)

SetDefaultChannelStore 设置全局默认的 ChannelStore(用于第三方实现)

Types

type AgentMessage

type AgentMessage struct {
	ID           string                 `json:"id"`
	From         string                 `json:"from"`
	To           string                 `json:"to"`
	Topic        string                 `json:"topic"`
	Type         MessageType            `json:"type"`
	Payload      interface{}            `json:"payload"`
	Metadata     map[string]string      `json:"metadata"`
	Timestamp    time.Time              `json:"timestamp"`
	TraceContext propagation.MapCarrier `json:"trace_context,omitempty"` // 追踪上下文
}

AgentMessage 消息(为避免与 Message 冲突)

func NewAgentMessage

func NewAgentMessage(from, to string, msgType MessageType, payload interface{}) *AgentMessage

NewAgentMessage 创建新消息

type AgentSession

type AgentSession struct {
	ID           string
	Participants []string
	Messages     []*AgentMessage
	State        map[string]interface{}
	CreatedAt    string
	UpdatedAt    string
}

AgentSession Agent 会话

type Assignment

type Assignment struct {
	AgentID   string      `json:"agent_id"`
	Role      Role        `json:"role"`
	Subtask   interface{} `json:"subtask"`
	Status    TaskStatus  `json:"status"`
	Result    interface{} `json:"result,omitempty"`
	StartTime time.Time   `json:"start_time,omitempty"`
	EndTime   time.Time   `json:"end_time,omitempty"`
}

Assignment represents an agent's assignment in a task

type BaseCollaborativeAgent

type BaseCollaborativeAgent struct {
	*core.BaseAgent
	// contains filtered or unexported fields
}

BaseCollaborativeAgent provides a base implementation of CollaborativeAgent

func NewBaseCollaborativeAgent

func NewBaseCollaborativeAgent(id, description string, role Role, system *MultiAgentSystem) *BaseCollaborativeAgent

NewBaseCollaborativeAgent creates a new base collaborative agent

func (*BaseCollaborativeAgent) ClearState added in v0.6.0

func (a *BaseCollaborativeAgent) ClearState()

ClearState clears all state (thread-safe)

func (*BaseCollaborativeAgent) Collaborate

Collaborate participates in a collaborative task

func (*BaseCollaborativeAgent) DeleteState added in v0.6.0

func (a *BaseCollaborativeAgent) DeleteState(key string)

DeleteState removes a value from the agent's state (thread-safe)

func (*BaseCollaborativeAgent) Execute

Execute implements the Agent interface

func (*BaseCollaborativeAgent) GetMessageTimeout added in v0.6.0

func (a *BaseCollaborativeAgent) GetMessageTimeout() time.Duration

GetMessageTimeout returns the message timeout duration

func (*BaseCollaborativeAgent) GetRole

func (a *BaseCollaborativeAgent) GetRole() Role

GetRole returns the agent's role

func (*BaseCollaborativeAgent) GetState added in v0.6.0

func (a *BaseCollaborativeAgent) GetState(key string) (interface{}, bool)

GetState returns a value from the agent's state (thread-safe)

func (*BaseCollaborativeAgent) ReceiveMessage

func (a *BaseCollaborativeAgent) ReceiveMessage(ctx context.Context, message Message) error

ReceiveMessage handles incoming messages

func (*BaseCollaborativeAgent) SendMessage

func (a *BaseCollaborativeAgent) SendMessage(ctx context.Context, message Message) error

SendMessage sends a message to another agent

func (*BaseCollaborativeAgent) SetMessageTimeout added in v0.6.0

func (a *BaseCollaborativeAgent) SetMessageTimeout(timeout time.Duration)

SetMessageTimeout sets the message timeout duration

func (*BaseCollaborativeAgent) SetRole

func (a *BaseCollaborativeAgent) SetRole(role Role)

SetRole sets the agent's role

func (*BaseCollaborativeAgent) SetState added in v0.6.0

func (a *BaseCollaborativeAgent) SetState(key string, value interface{})

SetState sets a value in the agent's state (thread-safe)

func (*BaseCollaborativeAgent) Vote

func (a *BaseCollaborativeAgent) Vote(ctx context.Context, proposal interface{}) (bool, error)

Vote participates in consensus decision making

type ChannelStore

type ChannelStore interface {
	// GetOrCreateChannel 获取或创建指定 agent 的消息 channel
	GetOrCreateChannel(agentID string) chan *AgentMessage

	// GetChannel 获取指定 agent 的消息 channel,如果不存在返回 nil
	GetChannel(agentID string) chan *AgentMessage

	// ListChannels 列出所有 channel 的 agent ID
	ListChannels() []string

	// Subscribe 订阅主题,返回接收消息的 channel
	Subscribe(topic string) <-chan *AgentMessage

	// Unsubscribe 取消订阅主题
	Unsubscribe(topic string)

	// GetSubscribers 获取指定主题的所有订阅者 channel
	GetSubscribers(topic string) []chan *AgentMessage

	// Close 关闭存储(可选的清理操作)
	Close() error
}

ChannelStore 定义 channel 存储接口,允许第三方实现不同的存储后端 例如:内存、Redis、NATS、Kafka 等

type CollaborationType

type CollaborationType string

CollaborationType defines the type of collaboration

const (
	CollaborationTypeParallel     CollaborationType = "parallel"
	CollaborationTypeSequential   CollaborationType = "sequential"
	CollaborationTypeHierarchical CollaborationType = "hierarchical"
	CollaborationTypeConsensus    CollaborationType = "consensus"
	CollaborationTypePipeline     CollaborationType = "pipeline"
)

type CollaborativeAgent

type CollaborativeAgent interface {
	core.Agent

	// GetRole returns the agent's role
	GetRole() Role

	// SetRole sets the agent's role
	SetRole(role Role)

	// ReceiveMessage handles incoming messages
	ReceiveMessage(ctx context.Context, message Message) error

	// SendMessage sends a message to another agent
	SendMessage(ctx context.Context, message Message) error

	// Collaborate participates in a collaborative task
	Collaborate(ctx context.Context, task *CollaborativeTask) (*Assignment, error)

	// Vote participates in consensus decision making
	Vote(ctx context.Context, proposal interface{}) (bool, error)
}

CollaborativeAgent interface for agents that can collaborate

type CollaborativeTask

type CollaborativeTask struct {
	ID          string                 `json:"id"`
	Name        string                 `json:"name"`
	Description string                 `json:"description"`
	Type        CollaborationType      `json:"type"`
	Input       interface{}            `json:"input"`
	Output      interface{}            `json:"output,omitempty"`
	Status      TaskStatus             `json:"status"`
	Assignments map[string]Assignment  `json:"assignments"`
	Results     map[string]interface{} `json:"results"`
	StartTime   time.Time              `json:"start_time"`
	EndTime     time.Time              `json:"end_time,omitempty"`
	Metadata    map[string]interface{} `json:"metadata,omitempty"`
}

CollaborativeTask represents a task for multi-agent collaboration

type Communicator

type Communicator interface {
	// Send 发送消息
	Send(ctx context.Context, to string, message *AgentMessage) error

	// Receive 接收消息
	Receive(ctx context.Context) (*AgentMessage, error)

	// Broadcast 广播消息
	Broadcast(ctx context.Context, message *AgentMessage) error

	// Subscribe 订阅主题
	Subscribe(ctx context.Context, topic string) (<-chan *AgentMessage, error)

	// Unsubscribe 取消订阅
	Unsubscribe(ctx context.Context, topic string) error

	// Close 关闭
	Close() error
}

Communicator Agent 通信器接口

type InMemoryChannelStore

type InMemoryChannelStore struct {
	// contains filtered or unexported fields
}

InMemoryChannelStore 内存实现的 ChannelStore

func NewInMemoryChannelStore

func NewInMemoryChannelStore() *InMemoryChannelStore

NewInMemoryChannelStore 创建内存 channel 存储

func (*InMemoryChannelStore) Close

func (s *InMemoryChannelStore) Close() error

Close 实现 ChannelStore 接口

func (*InMemoryChannelStore) GetChannel

func (s *InMemoryChannelStore) GetChannel(agentID string) chan *AgentMessage

GetChannel 实现 ChannelStore 接口

func (*InMemoryChannelStore) GetOrCreateChannel

func (s *InMemoryChannelStore) GetOrCreateChannel(agentID string) chan *AgentMessage

GetOrCreateChannel 实现 ChannelStore 接口

func (*InMemoryChannelStore) GetSubscribers

func (s *InMemoryChannelStore) GetSubscribers(topic string) []chan *AgentMessage

GetSubscribers 实现 ChannelStore 接口

func (*InMemoryChannelStore) ListChannels

func (s *InMemoryChannelStore) ListChannels() []string

ListChannels 实现 ChannelStore 接口

func (*InMemoryChannelStore) Subscribe

func (s *InMemoryChannelStore) Subscribe(topic string) <-chan *AgentMessage

Subscribe 实现 ChannelStore 接口

func (*InMemoryChannelStore) Unsubscribe

func (s *InMemoryChannelStore) Unsubscribe(topic string)

Unsubscribe 实现 ChannelStore 接口

type MemoryCommunicator

type MemoryCommunicator struct {
	// contains filtered or unexported fields
}

MemoryCommunicator 内存通信器(单机多Agent)

func NewMemoryCommunicator

func NewMemoryCommunicator(agentID string) *MemoryCommunicator

NewMemoryCommunicator 创建内存通信器,使用全局默认的 ChannelStore

func NewMemoryCommunicatorWithStore

func NewMemoryCommunicatorWithStore(agentID string, store ChannelStore) *MemoryCommunicator

NewMemoryCommunicatorWithStore 创建内存通信器,使用指定的 ChannelStore

func (*MemoryCommunicator) Broadcast

func (c *MemoryCommunicator) Broadcast(ctx context.Context, message *AgentMessage) error

Broadcast 广播消息

func (*MemoryCommunicator) Close

func (c *MemoryCommunicator) Close() error

Close 关闭

func (*MemoryCommunicator) Receive

func (c *MemoryCommunicator) Receive(ctx context.Context) (*AgentMessage, error)

Receive 接收消息

func (*MemoryCommunicator) Send

func (c *MemoryCommunicator) Send(ctx context.Context, to string, message *AgentMessage) error

Send 发送消息

func (*MemoryCommunicator) Subscribe

func (c *MemoryCommunicator) Subscribe(ctx context.Context, topic string) (<-chan *AgentMessage, error)

Subscribe 订阅主题

func (*MemoryCommunicator) Unsubscribe

func (c *MemoryCommunicator) Unsubscribe(ctx context.Context, topic string) error

Unsubscribe 取消订阅

type Message

type Message struct {
	ID        string                 `json:"id"`
	From      string                 `json:"from"`
	To        string                 `json:"to"`
	Type      MessageType            `json:"type"`
	Content   interface{}            `json:"content"`
	Priority  int                    `json:"priority"`
	Timestamp time.Time              `json:"timestamp"`
	ReplyTo   string                 `json:"reply_to,omitempty"`
	Metadata  map[string]interface{} `json:"metadata,omitempty"`
}

Message represents a message between agents

type MessageRouter

type MessageRouter struct {
	// contains filtered or unexported fields
}

MessageRouter 消息路由器

func NewMessageRouter

func NewMessageRouter() *MessageRouter

NewAgentMessageRouter 创建消息路由器

func (*MessageRouter) RegisterPatternRoute

func (r *MessageRouter) RegisterPatternRoute(pattern string, handler RouteHandler) error

RegisterPatternRoute 注册模式路由(正则)

func (*MessageRouter) RegisterRoute

func (r *MessageRouter) RegisterRoute(pattern string, handler RouteHandler) error

RegisterRoute 注册路由

func (*MessageRouter) Route

func (r *MessageRouter) Route(ctx context.Context, message *AgentMessage) (*AgentMessage, error)

Route 路由消息

func (*MessageRouter) UnregisterRoute

func (r *MessageRouter) UnregisterRoute(pattern string)

UnregisterRoute 注销路由

type MessageType

type MessageType string

MessageType defines the type of message

const (
	MessageTypeRequest      MessageType = "request"
	MessageTypeResponse     MessageType = "response"
	MessageTypeBroadcast    MessageType = "broadcast"
	MessageTypeNotification MessageType = "notification"
	MessageTypeCommand      MessageType = "command"
	MessageTypeReport       MessageType = "report"
	MessageTypeVote         MessageType = "vote"
)

type MultiAgentSystem

type MultiAgentSystem struct {
	// contains filtered or unexported fields
}

MultiAgentSystem manages multiple agents working together

func NewMultiAgentSystem

func NewMultiAgentSystem(log loggercore.Logger, opts ...SystemOption) *MultiAgentSystem

NewMultiAgentSystem creates a new multi-agent system

func (*MultiAgentSystem) Close added in v0.6.0

func (s *MultiAgentSystem) Close() error

Close 优雅关闭多智能体系统

func (*MultiAgentSystem) CreateTeam

func (s *MultiAgentSystem) CreateTeam(team *Team) error

CreateTeam creates a new team of agents

func (*MultiAgentSystem) ExecuteTask

ExecuteTask executes a collaborative task

func (*MultiAgentSystem) RegisterAgent

func (s *MultiAgentSystem) RegisterAgent(id string, agent CollaborativeAgent) error

RegisterAgent registers an agent in the system

func (*MultiAgentSystem) SendMessage

func (s *MultiAgentSystem) SendMessage(message Message) error

SendMessage sends a message between agents

func (*MultiAgentSystem) UnregisterAgent

func (s *MultiAgentSystem) UnregisterAgent(id string) error

UnregisterAgent removes an agent from the system

type NATSCommunicator

type NATSCommunicator struct {
	// contains filtered or unexported fields
}

NATSCommunicator NATS 通信器(分布式)

func NewNATSCommunicator

func NewNATSCommunicator(agentID string, config *NATSConfig) (*NATSCommunicator, error)

NewNATSCommunicator 创建 NATS 通信器

func (*NATSCommunicator) Broadcast

func (c *NATSCommunicator) Broadcast(ctx context.Context, message *AgentMessage) error

Broadcast 广播消息

func (*NATSCommunicator) Close

func (c *NATSCommunicator) Close() error

Close 关闭

func (*NATSCommunicator) Receive

func (c *NATSCommunicator) Receive(ctx context.Context) (*AgentMessage, error)

Receive 接收消息

func (*NATSCommunicator) Send

func (c *NATSCommunicator) Send(ctx context.Context, to string, message *AgentMessage) error

Send 发送消息

func (*NATSCommunicator) Subscribe

func (c *NATSCommunicator) Subscribe(ctx context.Context, topic string) (<-chan *AgentMessage, error)

Subscribe 订阅主题

func (*NATSCommunicator) Unsubscribe

func (c *NATSCommunicator) Unsubscribe(ctx context.Context, topic string) error

Unsubscribe 取消订阅

type NATSConfig

type NATSConfig struct {
	URL         string
	ClusterID   string
	Credentials string
	TLS         *tls.Config
}

NATSConfig NATS 配置

type NegotiatingAgent

type NegotiatingAgent struct {
	*BaseCollaborativeAgent
	// contains filtered or unexported fields
}

NegotiatingAgent demonstrates an agent that can negotiate

func NewNegotiatingAgent

func NewNegotiatingAgent(id string, system *MultiAgentSystem) *NegotiatingAgent

NewNegotiatingAgent creates a new negotiating agent

func (*NegotiatingAgent) Negotiate

func (a *NegotiatingAgent) Negotiate(ctx context.Context, proposal interface{}, partners []string) (interface{}, error)

Negotiate conducts negotiation with other agents

type NegotiationRound

type NegotiationRound struct {
	Round    int                    `json:"round"`
	Proposal interface{}            `json:"proposal"`
	Offers   map[string]interface{} `json:"offers"`
	Accepted bool                   `json:"accepted"`
}

NegotiationRound represents a round of negotiation

type PipelineStage added in v0.6.0

type PipelineStage struct {
	Name        string                 `json:"name"`                  // 阶段名称
	Description string                 `json:"description,omitempty"` // 阶段描述
	Config      map[string]interface{} `json:"config,omitempty"`      // 阶段配置参数
}

PipelineStage 定义 Pipeline 任务的阶段结构 支持更直观的阶段配置,包含名称、描述和配置参数

type Plan added in v0.7.0

type Plan struct {
	Tasks       []TaskAssignment `json:"tasks"`
	Strategy    string           `json:"strategy,omitempty"`
	Description string           `json:"description,omitempty"`
}

Plan represents a structured plan from a leader agent

type Role

type Role string

Role defines the role of an agent in collaboration

const (
	RoleLeader      Role = "leader"
	RoleWorker      Role = "worker"
	RoleCoordinator Role = "coordinator"
	RoleSpecialist  Role = "specialist"
	RoleValidator   Role = "validator"
	RoleObserver    Role = "observer"
)

type RouteHandler

type RouteHandler func(ctx context.Context, message *AgentMessage) (*AgentMessage, error)

RouteHandler 路由处理器

type SessionManager

type SessionManager struct {
	// contains filtered or unexported fields
}

SessionManager 会话管理器

func NewSessionManager

func NewSessionManager() *SessionManager

NewSessionManager 创建会话管理器

func (*SessionManager) AddMessage

func (m *SessionManager) AddMessage(sessionID string, message *AgentMessage) error

AddMessage 添加消息到会话

func (*SessionManager) CloseSession

func (m *SessionManager) CloseSession(sessionID string) error

CloseSession 关闭会话

func (*SessionManager) CreateSession

func (m *SessionManager) CreateSession(participants []string) (*AgentSession, error)

CreateSession 创建会话

func (*SessionManager) GetSession

func (m *SessionManager) GetSession(sessionID string) (*AgentSession, error)

GetSession 获取会话

type SpecializedAgent

type SpecializedAgent struct {
	*BaseCollaborativeAgent
	// contains filtered or unexported fields
}

SpecializedAgent demonstrates a specialized collaborative agent

func NewSpecializedAgent

func NewSpecializedAgent(id, specialization string, system *MultiAgentSystem) *SpecializedAgent

NewSpecializedAgent creates a new specialized agent

func (*SpecializedAgent) Collaborate

func (a *SpecializedAgent) Collaborate(ctx context.Context, task *CollaborativeTask) (*Assignment, error)

Collaborate overrides base collaboration with specialized logic

type SystemOption

type SystemOption func(*MultiAgentSystem)

SystemOption configures the multi-agent system

func WithMaxAgents

func WithMaxAgents(max int) SystemOption

WithMaxAgents sets the maximum number of agents

func WithTimeout

func WithTimeout(timeout time.Duration) SystemOption

WithTimeout sets the default timeout

type TaskAssignment added in v0.7.0

type TaskAssignment struct {
	WorkerID string      `json:"worker_id"`
	Task     interface{} `json:"task"`
}

TaskAssignment represents a task assigned to a worker

type TaskStatus

type TaskStatus string

TaskStatus represents the status of a task

const (
	TaskStatusPending   TaskStatus = "pending"
	TaskStatusAssigned  TaskStatus = "assigned"
	TaskStatusExecuting TaskStatus = "executing"
	TaskStatusCompleted TaskStatus = "completed"
	TaskStatusFailed    TaskStatus = "failed"
	TaskStatusCancelled TaskStatus = "cancelled"
)

type Team

type Team struct {
	ID           string                 `json:"id"`
	Name         string                 `json:"name"`
	Leader       string                 `json:"leader"`
	Members      []string               `json:"members"`
	Purpose      string                 `json:"purpose"`
	Capabilities []string               `json:"capabilities"`
	Metadata     map[string]interface{} `json:"metadata,omitempty"`
}

Team represents a team of agents

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL