stars

package
v0.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 19, 2025 License: MIT Imports: 7 Imported by: 0

README

Stars (群星)

Stars 是 Aster 框架中的多 Agent 协作组件,负责管理多个 Agent 之间的协作、通信和任务执行。

概念

Stars (群星) 就像夜空中的群星一样,多个 Agent 组成一个协作单元,共同完成任务。Stars 提供:

  • 动态成员管理:Agent 可以随时加入或离开
  • 角色分工:Leader 和 Worker 两种角色
  • 消息通信:广播和点对点消息
  • 任务执行:Leader-Worker 协作模式

核心概念

角色 (Role)

Stars 支持两种角色:

  • Leader (领导者):负责协调和决策,接收任务并分配给 Workers
  • Worker (工作者):负责执行具体任务
成员 (Member)

每个成员包含:

  • AgentID: Agent 的唯一标识
  • Role: 角色(Leader 或 Worker)
  • Tags: 能力标签(可选)
消息 (Message)

消息包含:

  • From: 发送者 Agent ID
  • To: 接收者 Agent ID(空表示广播)
  • Text: 消息内容
  • Time: 发送时间

核心 API

创建 Stars
stars := stars.New(cosmos, "DevTeam")
添加成员
// 添加 Leader
stars.Join("leader-1", stars.RoleLeader)

// 添加 Workers
stars.Join("worker-1", stars.RoleWorker)
stars.Join("worker-2", stars.RoleWorker)
移除成员
stars.Leave("worker-1")
查看成员
members := stars.Members()
for _, m := range members {
    fmt.Printf("%s (%s)\n", m.AgentID, m.Role)
}
发送消息
// 点对点消息
stars.Send(ctx, "leader-1", "worker-1", "请处理任务 A")

// 广播消息
stars.Broadcast(ctx, "开始新的迭代")
执行任务
// 使用 Leader-Worker 模式执行任务
for event, err := range stars.Run(ctx, "开发用户认证功能") {
    if err != nil {
        log.Printf("Error: %v", err)
        continue
    }

    fmt.Printf("[%s] %s: %s\n",
        event.AgentID,
        event.Type,
        event.Content)
}
查看历史
history := stars.History()
for _, msg := range history {
    if msg.To == "" {
        fmt.Printf("[广播] %s: %s\n", msg.From, msg.Text)
    } else {
        fmt.Printf("%s → %s: %s\n", msg.From, msg.To, msg.Text)
    }
}

使用场景

1. 开发团队协作
// 创建开发团队
devTeam := stars.New(cosmos, "DevTeam")

// 添加成员
devTeam.Join("tech-lead", stars.RoleLeader)
devTeam.Join("frontend-dev", stars.RoleWorker)
devTeam.Join("backend-dev", stars.RoleWorker)
devTeam.Join("qa-engineer", stars.RoleWorker)

// 执行开发任务
for event := range devTeam.Run(ctx, "开发新功能") {
    // 处理事件
}
2. 数据处理流水线
// 创建数据处理团队
pipeline := stars.New(cosmos, "DataPipeline")

pipeline.Join("coordinator", stars.RoleLeader)
pipeline.Join("collector", stars.RoleWorker)
pipeline.Join("processor", stars.RoleWorker)
pipeline.Join("analyzer", stars.RoleWorker)

// 协调数据处理
pipeline.Run(ctx, "处理今日数据")
3. 客服团队
// 创建客服团队
support := stars.New(cosmos, "SupportTeam")

support.Join("supervisor", stars.RoleLeader)
support.Join("agent-1", stars.RoleWorker)
support.Join("agent-2", stars.RoleWorker)
support.Join("agent-3", stars.RoleWorker)

// 处理客户请求
support.Run(ctx, "处理客户咨询")

协作模式

Leader-Worker 模式

Stars 使用 Leader-Worker 模式进行任务执行:

  1. 任务分配:Leader 接收任务
  2. 任务分解:Leader 将任务分解为子任务
  3. 任务执行:Workers 执行子任务
  4. 结果汇总:Leader 汇总结果
┌─────────┐
│  Task   │
└────┬────┘
     │
     ↓
┌─────────┐
│ Leader  │ ← 接收任务,分解并分配
└────┬────┘
     │
     ├──→ Worker 1 ← 执行子任务 A
     ├──→ Worker 2 ← 执行子任务 B
     └──→ Worker 3 ← 执行子任务 C
     │
     ↓
┌─────────┐
│ Result  │ ← Leader 汇总结果
└─────────┘

与 Room 的区别

Stars 是 Room 的增强版本,主要改进:

Room Stars
Room Stars (群星)
无角色系统 Leader/Worker 角色
简单消息路由 完整的协作模式
无任务执行 Run() 方法执行任务

示例

查看 examples/stars 目录获取完整示例:

  • basic/: 基本使用示例
  • dynamic/: 动态成员管理示例

最佳实践

  1. 明确角色分工:确保有且只有一个 Leader

    // 先添加 Leader
    stars.Join("leader-1", stars.RoleLeader)
    
    // 再添加 Workers
    stars.Join("worker-1", stars.RoleWorker)
    stars.Join("worker-2", stars.RoleWorker)
    
  2. 使用有意义的名称:给 Stars 和 Agent 起有意义的名称

    devTeam := stars.New(cosmos, "DevTeam")
    supportTeam := stars.New(cosmos, "SupportTeam")
    
  3. 动态调整成员:根据负载动态添加或移除 Workers

    // 负载高时添加 Worker
    if load > threshold {
        stars.Join(newWorkerID, stars.RoleWorker)
    }
    
    // 负载低时移除 Worker
    if load < threshold {
        stars.Leave(idleWorkerID)
    }
    
  4. 消息历史管理:定期清理或持久化消息历史

    // Stars 自动限制历史记录为最近 100 条
    history := stars.History()
    
  5. 错误处理:处理 Run() 返回的错误

    for event, err := range stars.Run(ctx, task) {
        if err != nil {
            log.Printf("Error: %v", err)
            // 决定是否继续或中断
            continue
        }
        // 处理事件
    }
    
  6. 异步消息:消息发送是异步的,不会阻塞

    // 发送消息后立即返回
    stars.Send(ctx, from, to, message)
    stars.Broadcast(ctx, message)
    

未来扩展

Stars 的设计支持未来扩展:

  • 更多角色类型:Coordinator、Observer、Specialist 等
  • 更多协作模式:Democratic(民主投票)、Swarm(集群自组织)等
  • 消息持久化:将消息历史持久化到数据库
  • 更丰富的监控:成员状态、任务进度等

当前版本保持简单实用,专注于核心功能。

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AccessPolicy

type AccessPolicy struct {
	ReadAllow   []string `json:"read_allow"`   // 允许读取的Agent
	WriteAllow  []string `json:"write_allow"`  // 允许写入的Agent
	DeleteAllow []string `json:"delete_allow"` // 允许删除的Agent
	AdminAllow  []string `json:"admin_allow"`  // 允许管理的Agent
	Public      bool     `json:"public"`       // 是否公开
}

AccessPolicy 访问策略

type AgentConnection

type AgentConnection struct {
	ID               string                 `json:"id"`
	Name             string                 `json:"name"`
	Address          string                 `json:"address"`
	SubscribedStates []string               `json:"subscribed_states"`
	ConnectionType   ConnectionType         `json:"connection_type"`
	Status           ConnectionStatus       `json:"status"`
	LastHeartbeat    time.Time              `json:"last_heartbeat"`
	Metadata         map[string]interface{} `json:"metadata"`

	// 同步状态
	SyncStatus     map[string]SyncStatus `json:"sync_status"`
	PendingUpdates []StateUpdate         `json:"pending_updates"`
	ConflictCount  int64                 `json:"conflict_count"`
	// contains filtered or unexported fields
}

AgentConnection Agent连接

type ConflictDetector

type ConflictDetector struct {
	// contains filtered or unexported fields
}

ConflictDetector 冲突检测器

func NewConflictDetector

func NewConflictDetector(eventBus EventBus) *ConflictDetector

NewConflictDetector 创建冲突检测器

func (*ConflictDetector) AddRule

func (cd *ConflictDetector) AddRule(rule ConflictRule)

AddRule 添加冲突规则

func (*ConflictDetector) DetectConflict

func (cd *ConflictDetector) DetectConflict(event StateEvent, currentState *SharedState) (*ConflictInfo, error)

DetectConflict 检测冲突

func (*ConflictDetector) ResolveConflict

func (cd *ConflictDetector) ResolveConflict(conflict *ConflictInfo, strategy ConflictResolution) (*SharedState, error)

ResolveConflict 解决冲突

type ConflictInfo

type ConflictInfo struct {
	ConflictID  string                  `json:"conflict_id"`
	Type        ConflictType            `json:"type"`
	Description string                  `json:"description"`
	States      map[string]*SharedState `json:"states"`
	Operations  []PendingOperation      `json:"operations"`
	Resolution  ConflictResolution      `json:"resolution"`
	ResolvedAt  time.Time               `json:"resolved_at,omitempty"`
	ResolvedBy  string                  `json:"resolved_by,omitempty"`
	Metadata    map[string]interface{}  `json:"metadata"`
}

ConflictInfo 冲突信息

type ConflictResolution

type ConflictResolution string

ConflictResolution 冲突解决策略

const (
	ConflictResolutionLastWrite ConflictResolution = "last_write" // 最后写入优先
	ConflictResolutionMerge     ConflictResolution = "merge"      // 合并冲突
	ConflictResolutionReject    ConflictResolution = "reject"     // 拒绝冲突
	ConflictResolutionCustom    ConflictResolution = "custom"     // 自定义策略
)

type ConflictRule

type ConflictRule interface {
	DetectConflict(event StateEvent, currentState *SharedState) (*ConflictInfo, error)
	ResolveConflict(conflict *ConflictInfo, strategy ConflictResolution) (*SharedState, error)
}

ConflictRule 冲突规则

type ConflictType

type ConflictType string

ConflictType 冲突类型

const (
	ConflictTypeWriteWrite ConflictType = "write_write" // 写-写冲突
	ConflictTypeReadWrite  ConflictType = "read_write"  // 读-写冲突
	ConflictTypeVersion    ConflictType = "version"     // 版本冲突
	ConflictTypeSchema     ConflictType = "schema"      // 模式冲突
	ConflictTypeCustom     ConflictType = "custom"      // 自定义冲突
)

type ConnectionStatus

type ConnectionStatus string

ConnectionStatus 连接状态

const (
	ConnectionStatusConnected    ConnectionStatus = "connected"    // 已连接
	ConnectionStatusDisconnected ConnectionStatus = "disconnected" // 已断开
	ConnectionStatusConnecting   ConnectionStatus = "connecting"   // 连接中
	ConnectionStatusError        ConnectionStatus = "error"        // 错误
	ConnectionStatusReconnecting ConnectionStatus = "reconnecting" // 重连中
)

type ConnectionType

type ConnectionType string

ConnectionType 连接类型

const (
	ConnectionTypePush          ConnectionType = "push"          // 推送模式
	ConnectionTypePull          ConnectionType = "pull"          // 拉取模式
	ConnectionTypeBidirectional ConnectionType = "bidirectional" // 双向模式
)

type ConsistencyLevel

type ConsistencyLevel string

ConsistencyLevel 一致性级别

const (
	ConsistencyLevelStrong   ConsistencyLevel = "strong"   // 强一致性
	ConsistencyLevelWeak     ConsistencyLevel = "weak"     // 弱一致性
	ConsistencyLevelEventual ConsistencyLevel = "eventual" // 最终一致性
)

type DatabasePersistence

type DatabasePersistence struct {
	// contains filtered or unexported fields
}

DatabasePersistence 数据库持久化实现

func NewDatabasePersistence

func NewDatabasePersistence(config *PersistenceConfig) (*DatabasePersistence, error)

NewDatabasePersistence 创建数据库持久化

func (*DatabasePersistence) Backup

func (dp *DatabasePersistence) Backup(ctx context.Context, path string) error

Backup 备份数据库到指定路径

func (*DatabasePersistence) Close

func (dp *DatabasePersistence) Close() error

Close 关闭数据库连接

func (*DatabasePersistence) DeleteState

func (dp *DatabasePersistence) DeleteState(ctx context.Context, stateID string) error

DeleteState 从数据库删除状态

func (*DatabasePersistence) ListStates

func (dp *DatabasePersistence) ListStates(ctx context.Context, filters map[string]interface{}) ([]*SharedState, error)

ListStates 列出数据库中的状态

func (*DatabasePersistence) LoadState

func (dp *DatabasePersistence) LoadState(ctx context.Context, stateID string) (*SharedState, error)

LoadState 从数据库加载状态

func (*DatabasePersistence) Restore

func (dp *DatabasePersistence) Restore(ctx context.Context, path string) error

Restore 从指定路径恢复数据库

func (*DatabasePersistence) SaveState

func (dp *DatabasePersistence) SaveState(ctx context.Context, state *SharedState) error

SaveState 保存状态到数据库

func (*DatabasePersistence) UpdateState

func (dp *DatabasePersistence) UpdateState(ctx context.Context, state *SharedState) error

UpdateState 更新数据库中的状态

type DefaultConflictRule

type DefaultConflictRule struct{}

DefaultConflictRule 默认冲突规则

func (*DefaultConflictRule) DetectConflict

func (dcr *DefaultConflictRule) DetectConflict(event StateEvent, currentState *SharedState) (*ConflictInfo, error)

DetectConflict 检测冲突

func (*DefaultConflictRule) ResolveConflict

func (dcr *DefaultConflictRule) ResolveConflict(conflict *ConflictInfo, strategy ConflictResolution) (*SharedState, error)

ResolveConflict 解决冲突

type Event

type Event struct {
	AgentID string    // 产生事件的 Agent ID
	Type    string    // 事件类型
	Content string    // 事件内容
	Time    time.Time // 事件时间
}

Event 群星事件

type EventBus

type EventBus interface {
	Subscribe(eventType EventType, handler EventHandler) error
	Unsubscribe(eventType EventType, handler EventHandler) error
	Publish(event StateEvent) error
	PublishAsync(event StateEvent) error
	Close() error
}

EventBus 事件总线接口

type EventHandler

type EventHandler interface {
	Handle(ctx context.Context, event StateEvent) error
}

EventHandler 事件处理器

type EventType

type EventType string

EventType 事件类型

const (
	// 状态事件
	EventTypeStateCreated EventType = "state_created"
	EventTypeStateUpdated EventType = "state_updated"
	EventTypeStateDeleted EventType = "state_deleted"
	EventTypeStateRead    EventType = "state_read"

	// 同步事件
	EventTypeSyncStarted   EventType = "sync_started"
	EventTypeSyncCompleted EventType = "sync_completed"
	EventTypeSyncFailed    EventType = "sync_failed"
	EventTypeSyncConflict  EventType = "sync_conflict"

	// 冲突事件
	EventTypeConflictDetected EventType = "conflict_detected"
	EventTypeConflictResolved EventType = "conflict_resolved"

	// Agent事件
	EventTypeAgentConnected    EventType = "agent_connected"
	EventTypeAgentDisconnected EventType = "agent_disconnected"
	EventTypeAgentHeartbeat    EventType = "agent_heartbeat"

	// 系统事件
	EventTypeSystemStarted  EventType = "system_started"
	EventTypeSystemShutdown EventType = "system_shutdown"
	EventTypeConfigChanged  EventType = "config_changed"
)

type FilePersistence

type FilePersistence struct {
	// contains filtered or unexported fields
}

FilePersistence 文件持久化实现

func NewFilePersistence

func NewFilePersistence(config *PersistenceConfig) (*FilePersistence, error)

NewFilePersistence 创建文件持久化

func (*FilePersistence) Backup

func (fp *FilePersistence) Backup(ctx context.Context, path string) error

Backup 备份到指定路径

func (*FilePersistence) Close

func (fp *FilePersistence) Close() error

Close 关闭文件持久化

func (*FilePersistence) DeleteState

func (fp *FilePersistence) DeleteState(ctx context.Context, stateID string) error

DeleteState 从文件删除状态

func (*FilePersistence) ListStates

func (fp *FilePersistence) ListStates(ctx context.Context, filters map[string]interface{}) ([]*SharedState, error)

ListStates 列出文件中的状态

func (*FilePersistence) LoadState

func (fp *FilePersistence) LoadState(ctx context.Context, stateID string) (*SharedState, error)

LoadState 从文件加载状态

func (*FilePersistence) Restore

func (fp *FilePersistence) Restore(ctx context.Context, path string) error

Restore 从指定路径恢复

func (*FilePersistence) SaveState

func (fp *FilePersistence) SaveState(ctx context.Context, state *SharedState) error

SaveState 保存状态到文件

func (*FilePersistence) UpdateState

func (fp *FilePersistence) UpdateState(ctx context.Context, state *SharedState) error

UpdateState 更新文件中的状态

type InMemoryEventBus

type InMemoryEventBus struct {
	// contains filtered or unexported fields
}

InMemoryEventBus 内存事件总线

func NewInMemoryEventBus

func NewInMemoryEventBus(bufferSize int) *InMemoryEventBus

NewInMemoryEventBus 创建内存事件总线

func (*InMemoryEventBus) Close

func (bus *InMemoryEventBus) Close() error

Close 关闭事件总线

func (*InMemoryEventBus) Publish

func (bus *InMemoryEventBus) Publish(event StateEvent) error

Publish 同步发布事件

func (*InMemoryEventBus) PublishAsync

func (bus *InMemoryEventBus) PublishAsync(event StateEvent) error

PublishAsync 异步发布事件

func (*InMemoryEventBus) Subscribe

func (bus *InMemoryEventBus) Subscribe(eventType EventType, handler EventHandler) error

Subscribe 订阅事件

func (*InMemoryEventBus) Unsubscribe

func (bus *InMemoryEventBus) Unsubscribe(eventType EventType, handler EventHandler) error

Unsubscribe 取消订阅事件

type Member

type Member struct {
	AgentID string   // Agent ID
	Role    Role     // 角色
	Tags    []string // 能力标签(可选)
}

Member 群星成员

type MemoryPersistence

type MemoryPersistence struct {
	// contains filtered or unexported fields
}

MemoryPersistence 内存持久化实现

func NewMemoryPersistence

func NewMemoryPersistence() *MemoryPersistence

NewMemoryPersistence 创建内存持久化

func (*MemoryPersistence) Backup

func (mp *MemoryPersistence) Backup(ctx context.Context, path string) error

Backup 备份

func (*MemoryPersistence) Close

func (mp *MemoryPersistence) Close() error

Close 关闭

func (*MemoryPersistence) DeleteState

func (mp *MemoryPersistence) DeleteState(ctx context.Context, stateID string) error

DeleteState 删除状态

func (*MemoryPersistence) ListStates

func (mp *MemoryPersistence) ListStates(ctx context.Context, filters map[string]interface{}) ([]*SharedState, error)

ListStates 列出状态

func (*MemoryPersistence) LoadState

func (mp *MemoryPersistence) LoadState(ctx context.Context, stateID string) (*SharedState, error)

LoadState 加载状态

func (*MemoryPersistence) Restore

func (mp *MemoryPersistence) Restore(ctx context.Context, path string) error

Restore 恢复

func (*MemoryPersistence) SaveState

func (mp *MemoryPersistence) SaveState(ctx context.Context, state *SharedState) error

SaveState 保存状态

func (*MemoryPersistence) UpdateState

func (mp *MemoryPersistence) UpdateState(ctx context.Context, state *SharedState) error

UpdateState 更新状态

type Message

type Message struct {
	From string    // 发送者 Agent ID
	To   string    // 接收者 Agent ID(空表示广播)
	Text string    // 消息内容
	Time time.Time // 发送时间
}

Message 群星消息

type OperationType

type OperationType string

OperationType 操作类型

const (
	OperationTypeSet    OperationType = "set"    // 设置值
	OperationTypeDelete OperationType = "delete" // 删除值
	OperationTypeMerge  OperationType = "merge"  // 合并值
	OperationTypeCAS    OperationType = "cas"    // Compare-And-Set
)

type PendingOperation

type PendingOperation struct {
	ID        string                 `json:"id"`
	Type      OperationType          `json:"type"`
	Key       string                 `json:"key"`
	Value     interface{}            `json:"value"`
	OldValue  interface{}            `json:"old_value"`
	Timestamp time.Time              `json:"timestamp"`
	AgentID   string                 `json:"agent_id"`
	Retry     int                    `json:"retry"`
	Metadata  map[string]interface{} `json:"metadata"`
}

PendingOperation 待处理操作

type PersistenceConfig

type PersistenceConfig struct {
	Type             PersistenceType          `json:"type"`
	ConnectionString string                   `json:"connection_string"`
	Timeout          time.Duration            `json:"timeout"`
	MaxConnections   int                      `json:"max_connections"`
	BatchSize        int                      `json:"batch_size"`
	Compression      bool                     `json:"compression"`
	Encryption       bool                     `json:"encryption"`
	EncryptionKey    string                   `json:"encryption_key,omitempty"`
	Retention        map[string]time.Duration `json:"retention"`
}

PersistenceConfig 持久化配置

type PersistenceLayer

type PersistenceLayer interface {
	SaveState(ctx context.Context, state *SharedState) error
	LoadState(ctx context.Context, stateID string) (*SharedState, error)
	UpdateState(ctx context.Context, state *SharedState) error
	DeleteState(ctx context.Context, stateID string) error
	ListStates(ctx context.Context, filters map[string]interface{}) ([]*SharedState, error)
	Backup(ctx context.Context, path string) error
	Restore(ctx context.Context, path string) error
	Close() error
}

PersistenceLayer 持久化层接口

func NewPersistenceLayer

func NewPersistenceLayer(config *PersistenceConfig) (PersistenceLayer, error)

NewPersistenceLayer 创建持久化层

type PersistenceType

type PersistenceType string

PersistenceType 持久化类型

const (
	PersistenceTypeMemory   PersistenceType = "memory"   // 内存持久化
	PersistenceTypeFile     PersistenceType = "file"     // 文件持久化
	PersistenceTypeDatabase PersistenceType = "database" // 数据库持久化
	PersistenceTypeRedis    PersistenceType = "redis"    // Redis持久化
)

type RedisPersistence

type RedisPersistence struct {
	// contains filtered or unexported fields
}

RedisPersistence Redis持久化实现

func NewRedisPersistence

func NewRedisPersistence(config *PersistenceConfig) (*RedisPersistence, error)

NewRedisPersistence 创建Redis持久化

func (*RedisPersistence) Backup

func (rp *RedisPersistence) Backup(ctx context.Context, path string) error

Backup 备份Redis到指定路径

func (*RedisPersistence) Close

func (rp *RedisPersistence) Close() error

Close 关闭Redis连接

func (*RedisPersistence) DeleteState

func (rp *RedisPersistence) DeleteState(ctx context.Context, stateID string) error

DeleteState 从Redis删除状态

func (*RedisPersistence) ListStates

func (rp *RedisPersistence) ListStates(ctx context.Context, filters map[string]interface{}) ([]*SharedState, error)

ListStates 列出Redis中的状态

func (*RedisPersistence) LoadState

func (rp *RedisPersistence) LoadState(ctx context.Context, stateID string) (*SharedState, error)

LoadState 从Redis加载状态

func (*RedisPersistence) Restore

func (rp *RedisPersistence) Restore(ctx context.Context, path string) error

Restore 从指定路径恢复Redis

func (*RedisPersistence) SaveState

func (rp *RedisPersistence) SaveState(ctx context.Context, state *SharedState) error

SaveState 保存状态到Redis

func (*RedisPersistence) UpdateState

func (rp *RedisPersistence) UpdateState(ctx context.Context, state *SharedState) error

UpdateState 更新Redis中的状态

type Role

type Role string

Role Agent 在群星中的角色

const (
	// RoleLeader 领导者 - 负责协调和决策
	RoleLeader Role = "leader"
	// RoleWorker 工作者 - 负责执行任务
	RoleWorker Role = "worker"
)

type SharedState

type SharedState struct {
	// 基本信息
	ID      string    `json:"id"`
	Name    string    `json:"name"`
	Type    StateType `json:"type"`
	Owners  []string  `json:"owners"`  // 状态拥有者(Agent列表)
	Readers []string  `json:"readers"` // 状态读取者

	// 状态数据
	Data      map[string]interface{} `json:"data"`
	Version   int64                  `json:"version"`
	Timestamp time.Time              `json:"timestamp"`

	// 元数据
	Metadata map[string]interface{} `json:"metadata"`
	Tags     []string               `json:"tags"`

	// 同步信息
	LastSync   time.Time          `json:"last_sync"`
	SyncStatus SyncStatus         `json:"sync_status"`
	PendingOps []PendingOperation `json:"pending_ops"`

	// 访问控制
	AccessPolicy AccessPolicy `json:"access_policy"`
	// contains filtered or unexported fields
}

SharedState 共享状态

type Stars

type Stars struct {
	// contains filtered or unexported fields
}

Stars 群星 - 多 Agent 协作单元 Stars 是 Aster 框架中的多 Agent 协作组件, 负责管理多个 Agent 之间的协作、通信和任务执行。

func New

func New(cosmos *cosmos.Cosmos, name string) *Stars

New 创建群星协作组

func (*Stars) Broadcast

func (s *Stars) Broadcast(ctx context.Context, text string) error

Broadcast 广播消息给所有成员

func (*Stars) History

func (s *Stars) History() []Message

History 获取消息历史

func (*Stars) ID

func (s *Stars) ID() string

ID 获取群星 ID

func (*Stars) Join

func (s *Stars) Join(agentID string, role Role) error

Join 添加成员到群星

func (*Stars) Leave

func (s *Stars) Leave(agentID string) error

Leave 从群星中移除成员

func (*Stars) Members

func (s *Stars) Members() []Member

Members 获取所有成员

func (*Stars) Name

func (s *Stars) Name() string

Name 获取群星名称

func (*Stars) Run

func (s *Stars) Run(ctx context.Context, task string) iter.Seq2[*Event, error]

Run 执行任务(Leader-Worker 模式) 返回一个迭代器,流式返回执行事件

func (*Stars) Send

func (s *Stars) Send(ctx context.Context, from, to, text string) error

Send 发送点对点消息

func (*Stars) Size

func (s *Stars) Size() int

Size 获取成员数量

type StateEvent

type StateEvent struct {
	Type      EventType              `json:"type"`
	StateID   string                 `json:"state_id"`
	State     *SharedState           `json:"state,omitempty"`
	AgentID   string                 `json:"agent_id,omitempty"`
	Operation *PendingOperation      `json:"operation,omitempty"`
	Conflict  *ConflictInfo          `json:"conflict,omitempty"`
	Timestamp time.Time              `json:"timestamp"`
	Metadata  map[string]interface{} `json:"metadata"`
}

StateEvent 状态事件

type StateEventHandler

type StateEventHandler struct {
	// contains filtered or unexported fields
}

StateEventHandler 状态事件处理器

func (*StateEventHandler) Handle

func (seh *StateEventHandler) Handle(ctx context.Context, event StateEvent) error

Handle 处理状态事件

type StateManager

type StateManager struct {
	// contains filtered or unexported fields
}

StateManager 状态管理器

func NewStateManager

func NewStateManager(config *StateManagerConfig, eventBus EventBus, persistence PersistenceLayer) *StateManager

NewStateManager 创建状态管理器

func (*StateManager) CreateState

func (sm *StateManager) CreateState(ctx context.Context, stateID, name string, stateType StateType, owners []string) (*SharedState, error)

CreateState 创建共享状态

func (*StateManager) DeleteState

func (sm *StateManager) DeleteState(ctx context.Context, stateID, agentID string) error

DeleteState 删除共享状态

func (*StateManager) GetMetrics

func (sm *StateManager) GetMetrics() *StateMetrics

GetMetrics 获取状态管理指标

func (*StateManager) GetState

func (sm *StateManager) GetState(stateID string) (*SharedState, error)

GetState 获取共享状态

func (*StateManager) UpdateState

func (sm *StateManager) UpdateState(ctx context.Context, stateID string, agentID string, updates map[string]interface{}, metadata map[string]interface{}) error

UpdateState 更新共享状态

type StateManagerConfig

type StateManagerConfig struct {
	// 同步配置
	SyncInterval time.Duration `json:"sync_interval"`
	MaxRetries   int           `json:"max_retries"`
	RetryDelay   time.Duration `json:"retry_delay"`

	// 持久化配置
	EnablePersistence bool            `json:"enable_persistence"`
	PersistenceType   PersistenceType `json:"persistence_type"`
	FlushInterval     time.Duration   `json:"flush_interval"`

	// 一致性配置
	ConsistencyLevel   ConsistencyLevel   `json:"consistency_level"`
	ConflictResolution ConflictResolution `json:"conflict_resolution"`

	// 监控配置
	EnableMetrics bool `json:"enable_metrics"`
	EnableAudit   bool `json:"enable_audit"`
}

StateManagerConfig 状态管理器配置

type StateMetrics

type StateMetrics struct {
	TotalStates       int64         `json:"total_states"`
	ActiveStates      int64         `json:"active_states"`
	PendingOperations int64         `json:"pending_operations"`
	SyncErrors        int64         `json:"sync_errors"`
	Conflicts         int64         `json:"conflicts"`
	ReadOps           int64         `json:"read_ops"`
	WriteOps          int64         `json:"write_ops"`
	DeleteOps         int64         `json:"delete_ops"`
	AverageSyncTime   time.Duration `json:"average_sync_time"`
	LastSyncTime      time.Time     `json:"last_sync_time"`
}

StateMetrics 状态指标

type StateSyncManager

type StateSyncManager struct {
	// contains filtered or unexported fields
}

StateSyncManager 状态同步管理器

func NewStateSyncManager

func NewStateSyncManager(stateManager *StateManager, eventBus EventBus, config *SyncConfig) *StateSyncManager

NewStateSyncManager 创建状态同步管理器

func (*StateSyncManager) ConnectAgent

func (ssm *StateSyncManager) ConnectAgent(agentID, name, address string, connType ConnectionType, states []string) (*AgentConnection, error)

ConnectAgent 连接Agent

func (*StateSyncManager) CreateSyncTask

func (ssm *StateSyncManager) CreateSyncTask(taskType SyncTaskType, stateID string, targetAgents []string, updates []StateUpdate) (*SyncTask, error)

CreateSyncTask 创建同步任务

func (*StateSyncManager) DisconnectAgent

func (ssm *StateSyncManager) DisconnectAgent(agentID string) error

DisconnectAgent 断开Agent连接

func (*StateSyncManager) GetAgentConnection

func (ssm *StateSyncManager) GetAgentConnection(agentID string) (*AgentConnection, error)

GetAgentConnection 获取Agent连接

func (*StateSyncManager) GetMetrics

func (ssm *StateSyncManager) GetMetrics() *SyncMetrics

GetMetrics 获取同步指标

func (*StateSyncManager) ListAgentConnections

func (ssm *StateSyncManager) ListAgentConnections() map[string]*AgentConnection

ListAgentConnections 列出所有Agent连接

type StateType

type StateType string

StateType 状态类型

const (
	StateTypeGlobal   StateType = "global"   // 全局状态
	StateTypeSession  StateType = "session"  // 会话状态
	StateTypeWorkflow StateType = "workflow" // 工作流状态
	StateTypeTask     StateType = "task"     // 任务状态
	StateTypeAgent    StateType = "agent"    // Agent状态
	StateTypeResource StateType = "resource" // 资源状态
)

type StateUpdate

type StateUpdate struct {
	ID        string                 `json:"id"`
	StateID   string                 `json:"state_id"`
	AgentID   string                 `json:"agent_id"`
	Type      OperationType          `json:"type"`
	Key       string                 `json:"key"`
	OldValue  interface{}            `json:"old_value"`
	NewValue  interface{}            `json:"new_value"`
	Version   int64                  `json:"version"`
	Timestamp time.Time              `json:"timestamp"`
	Metadata  map[string]interface{} `json:"metadata"`
}

StateUpdate 状态更新

type SyncConfig

type SyncConfig struct {
	// 基础配置
	SyncInterval time.Duration `json:"sync_interval"`
	MaxRetries   int           `json:"max_retries"`
	RetryDelay   time.Duration `json:"retry_delay"`
	Timeout      time.Duration `json:"timeout"`

	// 批处理配置
	BatchSize     int           `json:"batch_size"`
	FlushInterval time.Duration `json:"flush_interval"`

	// 一致性配置
	ConsistencyLevel ConsistencyLevel   `json:"consistency_level"`
	ConflictStrategy ConflictResolution `json:"conflict_strategy"`

	// 压缩配置
	Compression     bool   `json:"compression"`
	CompressionType string `json:"compression_type"`

	// 安全配置
	Encryption    bool   `json:"encryption"`
	EncryptionKey string `json:"encryption_key"`

	// 性能配置
	MaxConcurrentSyncs int `json:"max_concurrent_syncs"`
	QueueBufferSize    int `json:"queue_buffer_size"`
}

SyncConfig 同步配置

type SyncMetrics

type SyncMetrics struct {
	TotalSyncs        int64         `json:"total_syncs"`
	SuccessfulSyncs   int64         `json:"successful_syncs"`
	FailedSyncs       int64         `json:"failed_syncs"`
	Conflicts         int64         `json:"conflicts"`
	PendingTasks      int64         `json:"pending_tasks"`
	ActiveConnections int64         `json:"active_connections"`
	AverageLatency    time.Duration `json:"average_latency"`
	LastSyncTime      time.Time     `json:"last_sync_time"`
	ThroughputPerSec  float64       `json:"throughput_per_sec"`
}

SyncMetrics 同步指标

type SyncStatus

type SyncStatus string

SyncStatus 同步状态

const (
	SyncStatusSynced   SyncStatus = "synced"   // 已同步
	SyncStatusPending  SyncStatus = "pending"  // 待同步
	SyncStatusConflict SyncStatus = "conflict" // 冲突
	SyncStatusError    SyncStatus = "error"    // 错误
	SyncStatusSyncing  SyncStatus = "syncing"  // 同步中
)

type SyncTask

type SyncTask struct {
	ID           string                 `json:"id"`
	Type         SyncTaskType           `json:"type"`
	Priority     int                    `json:"priority"`
	StateID      string                 `json:"state_id"`
	TargetAgents []string               `json:"target_agents"`
	Updates      []StateUpdate          `json:"updates"`
	CreatedAt    time.Time              `json:"created_at"`
	StartedAt    time.Time              `json:"started_at,omitempty"`
	CompletedAt  time.Time              `json:"completed_at,omitempty"`
	Status       TaskStatus             `json:"status"`
	RetryCount   int                    `json:"retry_count"`
	MaxRetries   int                    `json:"max_retries"`
	Error        string                 `json:"error,omitempty"`
	Metadata     map[string]interface{} `json:"metadata"`
}

SyncTask 同步任务

type SyncTaskType

type SyncTaskType string

SyncTaskType 同步任务类型

const (
	SyncTaskTypeBroadcast SyncTaskType = "broadcast"  // 广播同步
	SyncTaskTypeUnicast   SyncTaskType = "unicast"    // 单播同步
	SyncTaskTypeMulticast SyncTaskType = "multicast"  // 多播同步
	SyncTaskTypeFullSync  SyncTaskType = "full_sync"  // 全量同步
	SyncTaskTypeDeltaSync SyncTaskType = "delta_sync" // 增量同步
)

type TaskStatus

type TaskStatus string

TaskStatus 任务状态

const (
	TaskStatusPending   TaskStatus = "pending"   // 待执行
	TaskStatusRunning   TaskStatus = "running"   // 执行中
	TaskStatusCompleted TaskStatus = "completed" // 已完成
	TaskStatusFailed    TaskStatus = "failed"    // 失败
	TaskStatusCancelled TaskStatus = "cancelled" // 已取消
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL