Documentation
¶
Index ¶
- Variables
- func FormatHistoryForAgent(history []HistoryEntry, options *HistoryFormatOptions) string
- func InjectHistoryToAgent(a *agent.Agent, historyContext string) string
- func RestoreAgentInstructions(a *agent.Agent)
- type CancellationRecord
- type Condition
- type ConditionConfig
- type ConditionFunc
- type Config
- type ExecutionContext
- func (ec *ExecutionContext) AddMessage(msg *types.Message)
- func (ec *ExecutionContext) AddMessages(msgs []*types.Message)
- func (ec *ExecutionContext) ApplySessionState(snapshot map[string]interface{})
- func (ec *ExecutionContext) ClearMessages()
- func (ec *ExecutionContext) ExportSessionState() map[string]interface{}
- func (ec *ExecutionContext) Get(key string) (interface{}, bool)
- func (ec *ExecutionContext) GetHistoryContext() string
- func (ec *ExecutionContext) GetHistoryCount() int
- func (ec *ExecutionContext) GetHistoryInput(index int) string
- func (ec *ExecutionContext) GetHistoryOutput(index int) string
- func (ec *ExecutionContext) GetLastHistoryEntry() *HistoryEntry
- func (ec *ExecutionContext) GetMessages() []*types.Message
- func (ec *ExecutionContext) GetSessionState(key string) (interface{}, bool)
- func (ec *ExecutionContext) GetWorkflowHistory() []HistoryEntry
- func (ec *ExecutionContext) HasHistory() bool
- func (ec *ExecutionContext) MergeMetadata(metadata map[string]interface{})
- func (ec *ExecutionContext) Set(key string, value interface{})
- func (ec *ExecutionContext) SetHistoryContext(context string)
- func (ec *ExecutionContext) SetRunContextMetadata(runCtx map[string]interface{})
- func (ec *ExecutionContext) SetSessionState(key string, value interface{})
- func (ec *ExecutionContext) SetWorkflowHistory(history []HistoryEntry)
- type HistoryEntry
- type HistoryFormatOptions
- type Loop
- type LoopConditionFunc
- type LoopConfig
- type MemoryStorage
- func (m *MemoryStorage) Clear(ctx context.Context, olderThan time.Duration) (int, error)
- func (m *MemoryStorage) Close() error
- func (m *MemoryStorage) CreateSession(ctx context.Context, sessionID, workflowID, userID string) (*WorkflowSession, error)
- func (m *MemoryStorage) DeleteSession(ctx context.Context, sessionID string) error
- func (m *MemoryStorage) GetSession(ctx context.Context, sessionID string) (*WorkflowSession, error)
- func (m *MemoryStorage) GetStats(ctx context.Context) (*SessionStats, error)
- func (m *MemoryStorage) GetWorkflowStats(ctx context.Context, workflowID string) (*SessionStats, error)
- func (m *MemoryStorage) ListSessions(ctx context.Context, workflowID string, limit, offset int) ([]*WorkflowSession, error)
- func (m *MemoryStorage) ListUserSessions(ctx context.Context, userID string, limit, offset int) ([]*WorkflowSession, error)
- func (m *MemoryStorage) UpdateSession(ctx context.Context, session *WorkflowSession) error
- type Node
- type NodeType
- type Parallel
- type ParallelConfig
- type Router
- type RouterConfig
- type RouterFunc
- type RunOption
- func WithMediaPayload(payload interface{}) RunOption
- func WithMetadata(metadata map[string]interface{}) RunOption
- func WithResumeFrom(stepID string) RunOption
- func WithRunContext(rc *run.RunContext) RunOption
- func WithSessionState(state map[string]interface{}) RunOption
- func WithUserID(userID string) RunOption
- type RunStatus
- type SessionFilter
- type SessionState
- func (ss *SessionState) Clear()
- func (ss *SessionState) Clone() *SessionState
- func (ss *SessionState) Delete(key string)
- func (ss *SessionState) Get(key string) (interface{}, bool)
- func (ss *SessionState) GetAll() map[string]interface{}
- func (ss *SessionState) Merge(other *SessionState)
- func (ss *SessionState) Set(key string, value interface{})
- func (ss *SessionState) ToMap() map[string]interface{}
- type SessionStats
- type Step
- type StepConfig
- type StorageStats
- type Workflow
- type WorkflowHistoryConfig
- type WorkflowMetrics
- type WorkflowRun
- func (r *WorkflowRun) AddEvents(events run.Events)
- func (r *WorkflowRun) AddMessage(msg *types.Message)
- func (r *WorkflowRun) ApplyCancellation(reason, stepID string, snapshot map[string]interface{})
- func (r *WorkflowRun) Duration() time.Duration
- func (r *WorkflowRun) IsCompleted() bool
- func (r *WorkflowRun) IsSuccessful() bool
- func (r *WorkflowRun) MarkCancelled()
- func (r *WorkflowRun) MarkCompleted(output string)
- func (r *WorkflowRun) MarkFailed(err error)
- func (r *WorkflowRun) MarkStarted()
- type WorkflowSession
- func (s *WorkflowSession) AddCancellation(record *CancellationRecord)
- func (s *WorkflowSession) AddRun(run *WorkflowRun)
- func (s *WorkflowSession) Clear()
- func (s *WorkflowSession) CountCompletedRuns() int
- func (s *WorkflowSession) CountFailedRuns() int
- func (s *WorkflowSession) CountRuns() int
- func (s *WorkflowSession) CountSuccessfulRuns() int
- func (s *WorkflowSession) GetCancellations() []*CancellationRecord
- func (s *WorkflowSession) GetHistory(numRuns int) []HistoryEntry
- func (s *WorkflowSession) GetHistoryContext(numRuns int) string
- func (s *WorkflowSession) GetHistoryMessages(numRuns int) []*types.Message
- func (s *WorkflowSession) GetLastRun() *WorkflowRun
- func (s *WorkflowSession) GetMetadata(key string) (interface{}, bool)
- func (s *WorkflowSession) GetRuns() []*WorkflowRun
- func (s *WorkflowSession) SetMetadata(key string, value interface{})
- type WorkflowStorage
Constants ¶
This section is empty.
Variables ¶
// Sentinel errors for session handling. Callers should compare against them
// with errors.Is rather than matching on the message text.
var (
	// ErrSessionNotFound is returned when a session is not found.
	ErrSessionNotFound = errors.New("session not found")

	// ErrSessionExists is returned when trying to create a session that
	// already exists.
	ErrSessionExists = errors.New("session already exists")

	// ErrInvalidSessionID is returned when the session ID is invalid.
	ErrInvalidSessionID = errors.New("invalid session ID")

	// ErrInvalidWorkflowID is returned when the workflow ID is invalid.
	ErrInvalidWorkflowID = errors.New("invalid workflow ID")
)
Functions ¶
func FormatHistoryForAgent ¶
func FormatHistoryForAgent(history []HistoryEntry, options *HistoryFormatOptions) string
FormatHistoryForAgent formats history context for agent use with flexible options FormatHistoryForAgent 使用灵活的选项格式化历史上下文供 agent 使用
func InjectHistoryToAgent ¶
func InjectHistoryToAgent(a *agent.Agent, historyContext string) string
InjectHistoryToAgent injects history context into the agent's temporary instructions. It returns the original instructions so they can be restored later (if needed). InjectHistoryToAgent 将历史上下文注入到 agent 的临时指令中,返回原始指令用于后续恢复(如果需要)。
func RestoreAgentInstructions ¶
func RestoreAgentInstructions(a *agent.Agent)
RestoreAgentInstructions explicitly restores the agent's original instructions. Note: Agent.Run() already auto-clears temporary instructions via defer, so this function is mainly for explicit restoration scenarios. RestoreAgentInstructions 显式恢复 agent 的原始指令。注意:Agent.Run() 已经通过 defer 自动清除临时指令,此函数主要用于显式恢复场景。
Types ¶
type CancellationRecord ¶
// CancellationRecord captures workflow cancellation details.
// Each field is encoded to JSON under the key named in its struct tag.
type CancellationRecord struct {
// RunID identifies the run this record refers to.
RunID string `json:"run_id"`
// Reason is the stated reason for the cancellation.
Reason string `json:"reason"`
// StepID is a step identifier; omitted from JSON when empty.
// NOTE(review): presumably the step being processed when the run was cancelled — confirm.
StepID string `json:"step_id,omitempty"`
// Snapshot is a free-form key/value snapshot; omitted from JSON when empty.
Snapshot map[string]interface{} `json:"snapshot,omitempty"`
// OccurredAt is the time associated with the cancellation.
// NOTE(review): who populates it is not visible here — confirm.
OccurredAt time.Time `json:"occurred_at"`
}
CancellationRecord captures workflow cancellation details.
type Condition ¶
// Condition represents a conditional branching node.
type Condition struct {
// ID and Name identify the node.
ID string
Name string
// Condition is the predicate evaluated against the execution context.
Condition ConditionFunc
// TrueNode and FalseNode are the two candidate branches.
// NOTE(review): presumably TrueNode runs when Condition returns true and
// FalseNode otherwise — confirm against Execute.
TrueNode Node
FalseNode Node
}
Condition represents a conditional branching node
func NewCondition ¶
func NewCondition(config ConditionConfig) (*Condition, error)
NewCondition creates a new condition node
func (*Condition) Execute ¶
func (c *Condition) Execute(ctx context.Context, execCtx *ExecutionContext) (*ExecutionContext, error)
Execute evaluates the condition and executes the appropriate branch
type ConditionConfig ¶
// ConditionConfig contains condition configuration. It is the argument type
// of NewCondition and declares the same five fields as Condition.
type ConditionConfig struct {
// ID and Name identify the node to be created.
ID string
Name string
// Condition is the predicate the node evaluates.
Condition ConditionFunc
// TrueNode and FalseNode are the two candidate branches.
// NOTE(review): which fields NewCondition requires is not visible here — confirm.
TrueNode Node
FalseNode Node
}
ConditionConfig contains condition configuration
type ConditionFunc ¶
// ConditionFunc is a function that evaluates a condition: it receives the
// current *ExecutionContext and reports the outcome as a bool.
type ConditionFunc func(ctx *ExecutionContext) bool
ConditionFunc is a function that evaluates a condition
type Config ¶
// Config contains workflow configuration.
type Config struct {
// ID and Name identify the workflow.
ID string
Name string
// Steps holds the nodes that make up the workflow.
// NOTE(review): presumably executed in slice order — confirm.
Steps []Node
// Logger is the structured logger for the workflow.
// NOTE(review): behavior when nil is not visible here — confirm.
Logger *slog.Logger
// EnableHistory enables workflow history tracking.
EnableHistory bool `json:"enable_history"`
// HistoryStore is the storage backend for workflow history.
// The "-" JSON tag excludes it from JSON encoding.
HistoryStore WorkflowStorage `json:"-"`
// NumHistoryRuns is the number of recent runs to include in the history context.
NumHistoryRuns int `json:"num_history_runs"`
// AddHistoryToSteps automatically adds history context to all steps.
AddHistoryToSteps bool `json:"add_history_to_steps"`
}
Config contains workflow configuration Config 包含工作流配置
type ExecutionContext ¶
// ExecutionContext holds the execution state and data.
// Each field is encoded to JSON under the key named in its struct tag.
type ExecutionContext struct {
// Input is the input for the workflow execution.
Input string `json:"input"`
// Output is the output from the workflow execution.
Output string `json:"output"`
// Data holds temporary execution data.
Data map[string]interface{} `json:"data"`
// Metadata holds additional metadata.
Metadata map[string]interface{} `json:"metadata"`
// SessionState holds session-level state that persists across steps.
// Omitted from JSON when nil.
SessionState *SessionState `json:"session_state,omitempty"`
// SessionID is the unique session identifier. Omitted from JSON when empty.
SessionID string `json:"session_id,omitempty"`
// UserID is the user identifier for multi-tenant scenarios.
// Omitted from JSON when empty.
UserID string `json:"user_id,omitempty"`
// WorkflowHistory contains recent workflow run history.
// Omitted from JSON when empty.
WorkflowHistory []HistoryEntry `json:"workflow_history,omitempty"`
// HistoryContext is the formatted history context string.
// Omitted from JSON when empty.
HistoryContext string `json:"history_context,omitempty"`
}
ExecutionContext holds the execution state and data ExecutionContext 保存执行状态和数据
func NewExecutionContext ¶
func NewExecutionContext(input string) *ExecutionContext
NewExecutionContext creates a new execution context NewExecutionContext 创建新的执行上下文
func NewExecutionContextWithSession ¶
func NewExecutionContextWithSession(input, sessionID, userID string) *ExecutionContext
NewExecutionContextWithSession creates a new execution context with session info NewExecutionContextWithSession 创建带会话信息的新执行上下文
func (*ExecutionContext) AddMessage ¶
func (ec *ExecutionContext) AddMessage(msg *types.Message)
AddMessage adds a message to the history AddMessage 添加消息到历史
func (*ExecutionContext) AddMessages ¶
func (ec *ExecutionContext) AddMessages(msgs []*types.Message)
AddMessages adds multiple messages to the history AddMessages 批量添加消息
func (*ExecutionContext) ApplySessionState ¶
func (ec *ExecutionContext) ApplySessionState(snapshot map[string]interface{})
ApplySessionState initializes the session state from a snapshot. ApplySessionState 用快照初始化会话状态。
func (*ExecutionContext) ClearMessages ¶
func (ec *ExecutionContext) ClearMessages()
ClearMessages clears all message history ClearMessages 清空消息历史
func (*ExecutionContext) ExportSessionState ¶
func (ec *ExecutionContext) ExportSessionState() map[string]interface{}
ExportSessionState exports a snapshot of the current session state. ExportSessionState 导出当前会话状态快照。
func (*ExecutionContext) Get ¶
func (ec *ExecutionContext) Get(key string) (interface{}, bool)
Get retrieves a value from the context data Get 从上下文数据检索值
func (*ExecutionContext) GetHistoryContext ¶
func (ec *ExecutionContext) GetHistoryContext() string
GetHistoryContext returns the formatted history context GetHistoryContext 获取格式化的历史上下文
func (*ExecutionContext) GetHistoryCount ¶
func (ec *ExecutionContext) GetHistoryCount() int
GetHistoryCount returns the number of history entries GetHistoryCount 获取历史记录数量
func (*ExecutionContext) GetHistoryInput ¶
func (ec *ExecutionContext) GetHistoryInput(index int) string
GetHistoryInput returns the input at the specified index. Index 0 means the earliest history entry; -1 means the most recent. GetHistoryInput 获取指定索引的历史输入;index 0 表示最早的历史,-1 表示最近的历史。
func (*ExecutionContext) GetHistoryOutput ¶
func (ec *ExecutionContext) GetHistoryOutput(index int) string
GetHistoryOutput returns the output at the specified index GetHistoryOutput 获取指定索引的历史输出
func (*ExecutionContext) GetLastHistoryEntry ¶
func (ec *ExecutionContext) GetLastHistoryEntry() *HistoryEntry
GetLastHistoryEntry returns the last history entry GetLastHistoryEntry 获取最后一个历史条目
func (*ExecutionContext) GetMessages ¶
func (ec *ExecutionContext) GetMessages() []*types.Message
GetMessages retrieves message history from session state GetMessages 获取消息历史
func (*ExecutionContext) GetSessionState ¶
func (ec *ExecutionContext) GetSessionState(key string) (interface{}, bool)
GetSessionState retrieves a value from the session state GetSessionState 从会话状态检索值
func (*ExecutionContext) GetWorkflowHistory ¶
func (ec *ExecutionContext) GetWorkflowHistory() []HistoryEntry
GetWorkflowHistory returns the workflow history GetWorkflowHistory 获取工作流历史
func (*ExecutionContext) HasHistory ¶
func (ec *ExecutionContext) HasHistory() bool
HasHistory checks if there is any history HasHistory 检查是否有历史记录
func (*ExecutionContext) MergeMetadata ¶
func (ec *ExecutionContext) MergeMetadata(metadata map[string]interface{})
MergeMetadata merges metadata into execution context.
func (*ExecutionContext) Set ¶
func (ec *ExecutionContext) Set(key string, value interface{})
Set stores a value in the context data Set 在上下文数据中存储值
func (*ExecutionContext) SetHistoryContext ¶
func (ec *ExecutionContext) SetHistoryContext(context string)
SetHistoryContext sets the formatted history context SetHistoryContext 设置格式化的历史上下文
func (*ExecutionContext) SetRunContextMetadata ¶
func (ec *ExecutionContext) SetRunContextMetadata(runCtx map[string]interface{})
SetRunContextMetadata stores the run context payload into metadata under the "run_context" key so callers can inspect correlation identifiers (run_id, session_id, workflow_id, user_id, etc.) without reaching into context.Context.
func (*ExecutionContext) SetSessionState ¶
func (ec *ExecutionContext) SetSessionState(key string, value interface{})
SetSessionState stores a value in the session state SetSessionState 在会话状态中存储值
func (*ExecutionContext) SetWorkflowHistory ¶
func (ec *ExecutionContext) SetWorkflowHistory(history []HistoryEntry)
SetWorkflowHistory sets the workflow history SetWorkflowHistory 设置工作流历史
type HistoryEntry ¶
// HistoryEntry represents a single entry in the workflow history.
type HistoryEntry struct {
// Input is the input for this history entry.
Input string `json:"input"`
// Output is the output for this history entry.
Output string `json:"output"`
// Timestamp is when this entry was created.
Timestamp time.Time `json:"timestamp"`
}
HistoryEntry represents a single entry in the workflow history HistoryEntry 表示工作流历史中的单个条目
type HistoryFormatOptions ¶
// HistoryFormatOptions defines options for formatting history context.
// A pointer to it is the second argument of FormatHistoryForAgent.
type HistoryFormatOptions struct {
// Header is the opening tag for history context.
Header string
// Footer is the closing tag for history context.
Footer string
// IncludeInput controls whether to include input in history.
IncludeInput bool
// IncludeOutput controls whether to include output in history.
IncludeOutput bool
// IncludeTimestamp controls whether to include timestamps.
IncludeTimestamp bool
// InputLabel is the label for the input field.
InputLabel string
// OutputLabel is the label for the output field.
OutputLabel string
}
HistoryFormatOptions defines options for formatting history context HistoryFormatOptions 定义格式化历史上下文的选项
func DefaultHistoryFormatOptions ¶
func DefaultHistoryFormatOptions() *HistoryFormatOptions
DefaultHistoryFormatOptions returns default formatting options DefaultHistoryFormatOptions 返回默认格式化选项
type Loop ¶
// Loop represents a loop node that repeats execution.
type Loop struct {
// ID and Name identify the node.
ID string
Name string
// Body is the node that is executed repeatedly.
Body Node
// Condition determines whether to continue looping.
Condition LoopConditionFunc
// MaxIteration is an iteration limit.
// NOTE(review): presumably an upper bound on Body executions; the meaning
// of zero or negative values is not visible here — confirm against Execute.
MaxIteration int
}
Loop represents a loop node that repeats execution
func (*Loop) Execute ¶
func (l *Loop) Execute(ctx context.Context, execCtx *ExecutionContext) (*ExecutionContext, error)
Execute runs the loop
type LoopConditionFunc ¶
// LoopConditionFunc determines whether to continue looping. It receives the
// current *ExecutionContext and an iteration number and returns a bool.
// NOTE(review): whether iteration counts from 0 or 1 is not visible here — confirm.
type LoopConditionFunc func(ctx *ExecutionContext, iteration int) bool
LoopConditionFunc determines whether to continue looping
type LoopConfig ¶
// LoopConfig contains loop configuration. It declares the same five fields
// as Loop.
type LoopConfig struct {
// ID and Name identify the node to be created.
ID string
Name string
// Body is the node to repeat.
Body Node
// Condition determines whether to continue looping.
Condition LoopConditionFunc
// MaxIteration is an iteration limit.
// NOTE(review): default and zero-value handling are not visible here — confirm.
MaxIteration int
}
LoopConfig contains loop configuration
type MemoryStorage ¶
// MemoryStorage implements WorkflowStorage using in-memory storage.
// Create one with NewMemoryStorage; all of its state is unexported.
type MemoryStorage struct {
// contains filtered or unexported fields
}
MemoryStorage implements WorkflowStorage using in-memory storage MemoryStorage 使用内存存储实现 WorkflowStorage
func NewMemoryStorage ¶
func NewMemoryStorage(maxSize int) *MemoryStorage
NewMemoryStorage creates a new memory-based workflow storage. maxSize specifies the maximum number of sessions to store (0 = unlimited). NewMemoryStorage 创建新的基于内存的工作流存储;maxSize 指定要存储的最大会话数(0 = 无限制)。
func (*MemoryStorage) Clear ¶
func (m *MemoryStorage) Clear(ctx context.Context, olderThan time.Duration) (int, error)
Clear removes all sessions older than the specified duration Clear 删除早于指定持续时间的所有会话
func (*MemoryStorage) Close ¶
func (m *MemoryStorage) Close() error
Close closes the storage and releases any resources Close 关闭存储并释放任何资源
func (*MemoryStorage) CreateSession ¶
func (m *MemoryStorage) CreateSession(ctx context.Context, sessionID, workflowID, userID string) (*WorkflowSession, error)
CreateSession creates a new workflow session CreateSession 创建新的工作流会话
func (*MemoryStorage) DeleteSession ¶
func (m *MemoryStorage) DeleteSession(ctx context.Context, sessionID string) error
DeleteSession deletes a workflow session by ID DeleteSession 通过 ID 删除工作流会话
func (*MemoryStorage) GetSession ¶
func (m *MemoryStorage) GetSession(ctx context.Context, sessionID string) (*WorkflowSession, error)
GetSession retrieves a workflow session by ID GetSession 通过 ID 检索工作流会话
func (*MemoryStorage) GetStats ¶
func (m *MemoryStorage) GetStats(ctx context.Context) (*SessionStats, error)
GetStats returns statistics about the stored sessions GetStats 返回有关存储会话的统计信息
func (*MemoryStorage) GetWorkflowStats ¶
func (m *MemoryStorage) GetWorkflowStats(ctx context.Context, workflowID string) (*SessionStats, error)
GetWorkflowStats returns statistics for a specific workflow GetWorkflowStats 返回特定工作流的统计信息
func (*MemoryStorage) ListSessions ¶
func (m *MemoryStorage) ListSessions(ctx context.Context, workflowID string, limit, offset int) ([]*WorkflowSession, error)
ListSessions lists all sessions for a given workflow ListSessions 列出给定工作流的所有会话
func (*MemoryStorage) ListUserSessions ¶
func (m *MemoryStorage) ListUserSessions(ctx context.Context, userID string, limit, offset int) ([]*WorkflowSession, error)
ListUserSessions lists all sessions for a given user ListUserSessions 列出给定用户的所有会话
func (*MemoryStorage) UpdateSession ¶
func (m *MemoryStorage) UpdateSession(ctx context.Context, session *WorkflowSession) error
UpdateSession updates an existing workflow session UpdateSession 更新现有的工作流会话
type Node ¶
// Node represents a node in the workflow graph.
type Node interface {
// Execute runs the node against input and returns an execution context
// together with any error.
// NOTE(review): whether the returned context may alias input is not
// visible here — confirm per implementation.
Execute(ctx context.Context, input *ExecutionContext) (*ExecutionContext, error)
// GetID returns the node's identifier.
GetID() string
// GetType returns the node's NodeType.
GetType() NodeType
}
Node represents a node in the workflow graph
type Parallel ¶
Parallel represents a node that executes multiple nodes concurrently
func NewParallel ¶
func NewParallel(config ParallelConfig) (*Parallel, error)
NewParallel creates a new parallel node
func (*Parallel) Execute ¶
func (p *Parallel) Execute(ctx context.Context, execCtx *ExecutionContext) (*ExecutionContext, error)
Execute runs all child nodes in parallel Execute 并行运行所有子节点
type ParallelConfig ¶
ParallelConfig contains parallel configuration
type Router ¶
// Router represents a dynamic routing node.
type Router struct {
// ID and Name identify the node.
ID string
Name string
// Router is the function that determines which node to execute.
Router RouterFunc
// Routes maps string keys to candidate nodes.
// NOTE(review): presumably the string returned by Router is looked up in
// Routes; handling of an unknown key is not visible here — confirm.
Routes map[string]Node
}
Router represents a dynamic routing node
func NewRouter ¶
func NewRouter(config RouterConfig) (*Router, error)
NewRouter creates a new router node
func (*Router) Execute ¶
func (r *Router) Execute(ctx context.Context, execCtx *ExecutionContext) (*ExecutionContext, error)
Execute evaluates the router and executes the selected route
type RouterConfig ¶
// RouterConfig contains router configuration. It is the argument type of
// NewRouter and declares the same four fields as Router.
type RouterConfig struct {
// ID and Name identify the node to be created.
ID string
Name string
// Router is the function that determines which node to execute.
Router RouterFunc
// Routes maps string keys to candidate nodes.
// NOTE(review): which fields NewRouter requires is not visible here — confirm.
Routes map[string]Node
}
RouterConfig contains router configuration
type RouterFunc ¶
// RouterFunc determines which node to execute: it receives the current
// *ExecutionContext and returns a string.
// NOTE(review): presumably the result is a key into Router.Routes — confirm.
type RouterFunc func(ctx *ExecutionContext) string
RouterFunc determines which node to execute
type RunOption ¶
// RunOption configures workflow run behaviour. It is a function that receives
// a pointer to the package's unexported runOptions value; the With*
// constructors (WithUserID, WithMetadata, ...) return values of this type.
type RunOption func(*runOptions)
RunOption configures workflow run behaviour.
func WithMediaPayload ¶
func WithMediaPayload(payload interface{}) RunOption
WithMediaPayload attaches media payload to the execution context.
func WithMetadata ¶
func WithMetadata(metadata map[string]interface{}) RunOption
WithMetadata injects arbitrary metadata into the execution context.
func WithResumeFrom ¶
func WithResumeFrom(stepID string) RunOption
WithResumeFrom instructs the workflow to resume from the specified step ID.
func WithRunContext ¶
func WithRunContext(rc *run.RunContext) RunOption
WithRunContext injects a caller-provided run context, allowing workflows to reuse correlation identifiers from upstream orchestrators.
func WithSessionState ¶
func WithSessionState(state map[string]interface{}) RunOption
WithSessionState injects a session state snapshot for the execution context.
func WithUserID ¶
func WithUserID(userID string) RunOption
WithUserID sets the user ID for the workflow execution context.
type RunStatus ¶
// RunStatus represents the status of a workflow run. It is a string type, so
// a status is stored and serialized as its literal text.
type RunStatus string

const (
	// RunStatusPending indicates the run is pending.
	RunStatusPending RunStatus = "pending"

	// RunStatusRunning indicates the run is in progress.
	RunStatusRunning RunStatus = "running"

	// RunStatusCompleted indicates the run completed successfully.
	RunStatusCompleted RunStatus = "completed"

	// RunStatusFailed indicates the run failed with an error.
	RunStatusFailed RunStatus = "failed"

	// RunStatusCancelled indicates the run was cancelled.
	RunStatusCancelled RunStatus = "cancelled"
)
type SessionFilter ¶
// SessionFilter defines filter criteria for querying sessions.
// NOTE(review): whether zero-valued fields mean "no constraint" is stated only
// for Limit; confirm for the others.
type SessionFilter struct {
// WorkflowID filters sessions by workflow ID.
WorkflowID string
// UserID filters sessions by user ID.
UserID string
// CreatedAfter filters sessions created after this time.
CreatedAfter time.Time
// CreatedBefore filters sessions created before this time.
CreatedBefore time.Time
// Limit is the maximum number of sessions to return (0 = no limit).
Limit int
// Offset is the number of sessions to skip.
Offset int
}
SessionFilter defines filter criteria for querying sessions SessionFilter 定义查询会话的过滤条件
type SessionState ¶
// SessionState provides thread-safe session state management for workflows.
// Create one with NewSessionState or FromMap; all of its state is unexported
// and is reached through its methods (Get, Set, Delete, ...).
type SessionState struct {
// contains filtered or unexported fields
}
SessionState provides thread-safe session state management for workflows SessionState 为工作流提供线程安全的会话状态管理
func FromMap ¶
func FromMap(data map[string]interface{}) *SessionState
FromMap creates a session state from a map FromMap 从map创建会话状态
func MergeParallelSessionStates ¶
func MergeParallelSessionStates(original *SessionState, modified []*SessionState) *SessionState
MergeParallelSessionStates merges multiple session states from parallel execution. It collects all changes from the parallel branches and merges them. MergeParallelSessionStates 合并来自并行执行的多个会话状态;此函数收集并行分支的所有变更并合并它们。
func NewSessionState ¶
func NewSessionState() *SessionState
NewSessionState creates a new session state NewSessionState 创建新的会话状态
func (*SessionState) Clear ¶
func (ss *SessionState) Clear()
Clear removes all keys from the session state Clear 清空会话状态中的所有键
func (*SessionState) Clone ¶
func (ss *SessionState) Clone() *SessionState
Clone creates a deep copy of the session state Clone 创建会话状态的深拷贝
func (*SessionState) Delete ¶
func (ss *SessionState) Delete(key string)
Delete removes a key from the session state Delete 从会话状态中删除键
func (*SessionState) Get ¶
func (ss *SessionState) Get(key string) (interface{}, bool)
Get retrieves a value from the session state Get 从会话状态检索值
func (*SessionState) GetAll ¶
func (ss *SessionState) GetAll() map[string]interface{}
GetAll returns a copy of all session state data GetAll 返回所有会话状态数据的副本
func (*SessionState) Merge ¶
func (ss *SessionState) Merge(other *SessionState)
Merge merges another session state into this one. Later values overwrite earlier ones (last-write-wins). Merge 将另一个会话状态合并到当前状态;后面的值覆盖前面的值(最后写入获胜)。
func (*SessionState) Set ¶
func (ss *SessionState) Set(key string, value interface{})
Set stores a value in the session state Set 在会话状态中存储值
func (*SessionState) ToMap ¶
func (ss *SessionState) ToMap() map[string]interface{}
ToMap returns the session state as a regular map (not thread-safe) ToMap 将会话状态作为常规map返回(不是线程安全的)
type SessionStats ¶
// SessionStats provides statistics about workflow sessions. It is the result
// type of StorageStats.GetStats and StorageStats.GetWorkflowStats.
type SessionStats struct {
// TotalSessions is the total number of sessions.
TotalSessions int
// TotalRuns is the total number of workflow runs across all sessions.
TotalRuns int
// CompletedRuns is the number of completed runs.
CompletedRuns int
// SuccessfulRuns is the number of successful runs.
SuccessfulRuns int
// FailedRuns is the number of failed runs.
FailedRuns int
// AverageDuration is the average duration of completed runs.
AverageDuration time.Duration
}
SessionStats provides statistics about workflow sessions SessionStats 提供有关工作流会话的统计信息
type Step ¶
// Step represents a basic workflow step that executes an agent.
type Step struct {
// ID and Name identify the step.
ID string
Name string
// Agent is the agent this step executes.
Agent *agent.Agent
// Description is free-form descriptive text for the step.
Description string
// contains filtered or unexported fields
}
Step represents a basic workflow step that executes an agent Step 代表执行 agent 的基本工作流步骤
func (*Step) Execute ¶
func (s *Step) Execute(ctx context.Context, execCtx *ExecutionContext) (*ExecutionContext, error)
Execute runs the step Execute 执行步骤
type StepConfig ¶
// StepConfig contains step configuration.
type StepConfig struct {
// ID and Name identify the step to be created.
ID string
Name string
// Agent is the agent the step executes.
Agent *agent.Agent
// Description is free-form descriptive text for the step.
Description string
// AddHistoryToStep enables or disables history for this step.
// nil means inherit the setting from the workflow.
AddHistoryToStep *bool `json:"add_history_to_step,omitempty"`
// NumHistoryRuns specifies the history count for this step.
// nil means use the workflow default.
NumHistoryRuns *int `json:"num_history_runs,omitempty"`
}
StepConfig contains step configuration StepConfig 包含步骤配置
type StorageStats ¶
// StorageStats is an optional interface for storage implementations that
// support statistics.
type StorageStats interface {
// GetStats returns statistics about the stored sessions.
GetStats(ctx context.Context) (*SessionStats, error)
// GetWorkflowStats returns statistics for a specific workflow, selected by
// workflowID.
GetWorkflowStats(ctx context.Context, workflowID string) (*SessionStats, error)
}
StorageStats is an optional interface for storage implementations that support statistics StorageStats 是支持统计信息的存储实现的可选接口
type Workflow ¶
// Workflow represents a multi-step process.
type Workflow struct {
// ID and Name identify the workflow.
ID string
Name string
// Steps holds the nodes that make up the workflow.
// NOTE(review): presumably executed in slice order — confirm.
Steps []Node
// contains filtered or unexported fields
}
Workflow represents a multi-step process Workflow 代表多步骤流程
type WorkflowHistoryConfig ¶
// WorkflowHistoryConfig contains workflow-level history configuration.
type WorkflowHistoryConfig struct {
// AddHistoryToSteps enables automatic history injection to all steps.
AddHistoryToSteps bool `json:"add_history_to_steps"`
// NumHistoryRuns is the default number of history runs to include.
NumHistoryRuns int `json:"num_history_runs"`
}
WorkflowHistoryConfig contains workflow-level history configuration WorkflowHistoryConfig 包含工作流级别的历史配置
type WorkflowMetrics ¶
// WorkflowMetrics tracks runtime metrics for a workflow execution. Per the
// package documentation it is safe for concurrent access, since workflow steps
// may run in parallel and query duration while the workflow is still running.
// Create one with NewWorkflowMetrics; all of its state is unexported.
type WorkflowMetrics struct {
// contains filtered or unexported fields
}
WorkflowMetrics tracks runtime metrics for a workflow execution. The structure is safe for concurrent access as workflow steps may run in parallel and attempt to query duration information while the workflow is still running.
func NewWorkflowMetrics ¶
func NewWorkflowMetrics() *WorkflowMetrics
NewWorkflowMetrics constructs a metrics tracker with zeroed values.
func (*WorkflowMetrics) Duration ¶
func (m *WorkflowMetrics) Duration() time.Duration
Duration returns the current or final duration. When the workflow is still running it reports the elapsed time since Start was called.
func (*WorkflowMetrics) Snapshot ¶
func (m *WorkflowMetrics) Snapshot() map[string]interface{}
Snapshot returns a copy of the metrics in a map that can be safely embedded into metadata payloads.
func (*WorkflowMetrics) Start ¶
func (m *WorkflowMetrics) Start()
Start marks the workflow as started. Subsequent calls are no-ops so callers do not need to guard repeated invocations.
func (*WorkflowMetrics) Stop ¶
func (m *WorkflowMetrics) Stop() time.Duration
Stop marks the workflow as completed and returns the final duration. The method is idempotent; calling Stop multiple times returns the same duration.
type WorkflowRun ¶
// WorkflowRun represents a single execution of a workflow.
// Each field is encoded to JSON under the key named in its struct tag.
type WorkflowRun struct {
// RunID is the unique identifier for this run.
RunID string `json:"run_id"`
// SessionID is the session this run belongs to.
SessionID string `json:"session_id"`
// WorkflowID is the ID of the workflow that was executed.
WorkflowID string `json:"workflow_id"`
// Input is the input provided to the workflow.
Input string `json:"input"`
// Output is the final output of the workflow.
Output string `json:"output"`
// Messages contains the conversation history for this run.
Messages []*types.Message `json:"messages,omitempty"`
// Status indicates the current status of the run.
Status RunStatus `json:"status"`
// Error contains the error message if the run failed.
Error string `json:"error,omitempty"`
// StartedAt is the timestamp when the run started.
StartedAt time.Time `json:"started_at"`
// CompletedAt is the timestamp when the run completed.
// NOTE(review): encoding/json's omitempty does not omit struct values such
// as time.Time, so a zero CompletedAt is still emitted — confirm this is
// intended.
CompletedAt time.Time `json:"completed_at,omitempty"`
// Metadata contains additional metadata for this run.
Metadata map[string]interface{} `json:"metadata,omitempty"`
// CancellationReason provides context when run is cancelled.
CancellationReason string `json:"cancellation_reason,omitempty"`
// CancellationSnapshot stores the last known snapshot before cancellation.
CancellationSnapshot map[string]interface{} `json:"cancellation_snapshot,omitempty"`
// LastStepID indicates the last step identifier processed before cancellation/failure.
LastStepID string `json:"last_step_id,omitempty"`
// ResumedFrom records the step the run resumed from, if applicable.
ResumedFrom string `json:"resumed_from,omitempty"`
// Events captures structured run output events for observability.
Events run.Events `json:"events,omitempty"`
}
WorkflowRun represents a single execution of a workflow WorkflowRun 表示工作流的单次执行记录
func NewWorkflowRun ¶
func NewWorkflowRun(runID, sessionID, workflowID, input string) *WorkflowRun
NewWorkflowRun creates a new workflow run with the given parameters NewWorkflowRun 使用给定参数创建新的工作流运行记录
func (*WorkflowRun) AddEvents ¶
func (r *WorkflowRun) AddEvents(events run.Events)
AddEvents appends structured run events to the workflow run metadata.
func (*WorkflowRun) AddMessage ¶
func (r *WorkflowRun) AddMessage(msg *types.Message)
AddMessage adds a message to the run's conversation history AddMessage 将消息添加到运行的对话历史中
func (*WorkflowRun) ApplyCancellation ¶
func (r *WorkflowRun) ApplyCancellation(reason, stepID string, snapshot map[string]interface{})
ApplyCancellation enriches cancellation metadata.
func (*WorkflowRun) Duration ¶
func (r *WorkflowRun) Duration() time.Duration
Duration returns the duration of the run Duration 返回运行的持续时间
func (*WorkflowRun) IsCompleted ¶
func (r *WorkflowRun) IsCompleted() bool
IsCompleted returns true if the run has completed (success, failure, or cancelled) IsCompleted 如果运行已完成(成功、失败或取消)则返回 true
func (*WorkflowRun) IsSuccessful ¶
func (r *WorkflowRun) IsSuccessful() bool
IsSuccessful returns true if the run completed successfully IsSuccessful 如果运行成功完成则返回 true
func (*WorkflowRun) MarkCancelled ¶
func (r *WorkflowRun) MarkCancelled()
MarkCancelled marks the run as cancelled MarkCancelled 将运行标记为已取消
func (*WorkflowRun) MarkCompleted ¶
func (r *WorkflowRun) MarkCompleted(output string)
MarkCompleted marks the run as completed with the given output. MarkCompleted 使用给定输出将运行标记为已完成。
func (*WorkflowRun) MarkFailed ¶
func (r *WorkflowRun) MarkFailed(err error)
MarkFailed marks the run as failed with the given error. MarkFailed 使用给定错误将运行标记为失败。
func (*WorkflowRun) MarkStarted ¶
func (r *WorkflowRun) MarkStarted()
MarkStarted marks the run as started. MarkStarted 将运行标记为已开始。
type WorkflowSession ¶
// WorkflowSession groups the workflow runs that belong to one session,
// together with ownership, timestamps, free-form metadata, and any
// cancellation records captured for later recovery.
type WorkflowSession struct {
// SessionID is the unique identifier for this session.
// Serialized as "session_id".
SessionID string `json:"session_id"`
// WorkflowID is the ID of the workflow this session belongs to.
// Serialized as "workflow_id".
WorkflowID string `json:"workflow_id"`
// UserID is the user who owns this session.
// Omitted from the JSON output when empty.
UserID string `json:"user_id,omitempty"`
// Runs contains all workflow runs in this session.
// The tag has no omitempty, so the "runs" key is always emitted.
Runs []*WorkflowRun `json:"runs"`
// CreatedAt is the timestamp when the session was created.
// Serialized as "created_at".
CreatedAt time.Time `json:"created_at"`
// UpdatedAt is the timestamp when the session was last updated.
// Serialized as "updated_at".
UpdatedAt time.Time `json:"updated_at"`
// Metadata contains additional session metadata.
// Omitted from the JSON output when the map is nil or empty.
Metadata map[string]interface{} `json:"metadata,omitempty"`
// Cancellations captures cancellation snapshots for later recovery.
// Omitted from the JSON output when the slice is nil or empty.
Cancellations []*CancellationRecord `json:"cancellations,omitempty"`
// contains filtered or unexported fields
// NOTE(review): the unexported fields are not visible here; they presumably
// hold synchronization state, since GetRuns is documented as returning a
// thread-safe copy — confirm against the source.
}
WorkflowSession manages multiple workflow runs for a single session. WorkflowSession 管理单个会话的多个工作流运行。
func NewWorkflowSession ¶
func NewWorkflowSession(sessionID, workflowID, userID string) *WorkflowSession
NewWorkflowSession creates a new workflow session. NewWorkflowSession 创建新的工作流会话。
func (*WorkflowSession) AddCancellation ¶
func (s *WorkflowSession) AddCancellation(record *CancellationRecord)
AddCancellation records a cancellation snapshot.
func (*WorkflowSession) AddRun ¶
func (s *WorkflowSession) AddRun(run *WorkflowRun)
AddRun adds a workflow run to the session. AddRun 将工作流运行添加到会话。
func (*WorkflowSession) Clear ¶
func (s *WorkflowSession) Clear()
Clear removes all runs from the session. Clear 移除会话中的所有运行。
func (*WorkflowSession) CountCompletedRuns ¶
func (s *WorkflowSession) CountCompletedRuns() int
CountCompletedRuns returns the number of completed runs. CountCompletedRuns 返回已完成运行的数量。
func (*WorkflowSession) CountFailedRuns ¶
func (s *WorkflowSession) CountFailedRuns() int
CountFailedRuns returns the number of failed runs. CountFailedRuns 返回失败运行的数量。
func (*WorkflowSession) CountRuns ¶
func (s *WorkflowSession) CountRuns() int
CountRuns returns the total number of runs in the session. CountRuns 返回会话中运行的总数。
func (*WorkflowSession) CountSuccessfulRuns ¶
func (s *WorkflowSession) CountSuccessfulRuns() int
CountSuccessfulRuns returns the number of successful runs. CountSuccessfulRuns 返回成功运行的数量。
func (*WorkflowSession) GetCancellations ¶
func (s *WorkflowSession) GetCancellations() []*CancellationRecord
GetCancellations returns a copy of cancellation records.
func (*WorkflowSession) GetHistory ¶
func (s *WorkflowSession) GetHistory(numRuns int) []HistoryEntry
GetHistory returns the most recent N completed runs as history entries. GetHistory 返回最近 N 个已完成的运行作为历史条目。
func (*WorkflowSession) GetHistoryContext ¶
func (s *WorkflowSession) GetHistoryContext(numRuns int) string
GetHistoryContext returns formatted workflow history context for agent use. GetHistoryContext 返回格式化的工作流历史上下文供 agent 使用。
func (*WorkflowSession) GetHistoryMessages ¶
func (s *WorkflowSession) GetHistoryMessages(numRuns int) []*types.Message
GetHistoryMessages returns conversation messages from recent runs. GetHistoryMessages 返回最近运行的对话消息。
func (*WorkflowSession) GetLastRun ¶
func (s *WorkflowSession) GetLastRun() *WorkflowRun
GetLastRun returns the most recent run in the session. GetLastRun 返回会话中最近的运行。
func (*WorkflowSession) GetMetadata ¶
func (s *WorkflowSession) GetMetadata(key string) (interface{}, bool)
GetMetadata retrieves a metadata value. GetMetadata 检索元数据值。
func (*WorkflowSession) GetRuns ¶
func (s *WorkflowSession) GetRuns() []*WorkflowRun
GetRuns returns all runs in the session (thread-safe copy). GetRuns 返回会话中的所有运行(线程安全副本)。
func (*WorkflowSession) SetMetadata ¶
func (s *WorkflowSession) SetMetadata(key string, value interface{})
SetMetadata sets a metadata value. SetMetadata 设置元数据值。
type WorkflowStorage ¶
// WorkflowStorage defines the interface for storing and retrieving workflow
// sessions. Every method except Close takes a context.Context as its first
// argument.
type WorkflowStorage interface {
// CreateSession creates a new workflow session for the given session,
// workflow, and user IDs.
// Returns ErrSessionExists if a session with the same ID already exists.
CreateSession(ctx context.Context, sessionID, workflowID, userID string) (*WorkflowSession, error)
// GetSession retrieves a workflow session by ID.
// Returns ErrSessionNotFound if the session does not exist.
GetSession(ctx context.Context, sessionID string) (*WorkflowSession, error)
// UpdateSession updates an existing workflow session.
// Returns ErrSessionNotFound if the session does not exist.
UpdateSession(ctx context.Context, session *WorkflowSession) error
// DeleteSession deletes a workflow session by ID.
// Returns ErrSessionNotFound if the session does not exist.
DeleteSession(ctx context.Context, sessionID string) error
// ListSessions lists all sessions for a given workflow.
// Returns an empty slice if no sessions are found.
// NOTE(review): limit and offset presumably paginate the result; how zero
// or negative values are treated is not specified here — confirm per
// implementation.
ListSessions(ctx context.Context, workflowID string, limit, offset int) ([]*WorkflowSession, error)
// ListUserSessions lists all sessions for a given user.
// Returns an empty slice if no sessions are found.
// NOTE(review): limit and offset presumably paginate the result, as in
// ListSessions — confirm per implementation.
ListUserSessions(ctx context.Context, userID string, limit, offset int) ([]*WorkflowSession, error)
// Clear removes all sessions older than the specified duration.
// NOTE(review): the int result is presumably the number of sessions
// removed, and which timestamp "older than" is measured against is not
// stated — confirm per implementation.
Clear(ctx context.Context, olderThan time.Duration) (int, error)
// Close closes the storage and releases any resources.
Close() error
}
WorkflowStorage defines the interface for storing and retrieving workflow sessions. WorkflowStorage 定义用于存储和检索工作流会话的接口。