workflow

package
v1.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors for session lookup/creation and identifier validation.
// They are plain errors.New values, so callers can match them with errors.Is.
var (
	// ErrSessionNotFound is returned when a session is not found.
	ErrSessionNotFound = errors.New("session not found")

	// ErrSessionExists is returned when trying to create a session that already exists.
	ErrSessionExists = errors.New("session already exists")

	// ErrInvalidSessionID is returned when the session ID is invalid.
	ErrInvalidSessionID = errors.New("invalid session ID")

	// ErrInvalidWorkflowID is returned when the workflow ID is invalid.
	ErrInvalidWorkflowID = errors.New("invalid workflow ID")
)

Functions

func FormatHistoryForAgent

func FormatHistoryForAgent(history []HistoryEntry, options *HistoryFormatOptions) string

FormatHistoryForAgent formats history context for agent use with flexible options. FormatHistoryForAgent 使用灵活的选项格式化历史上下文供 agent 使用。

func InjectHistoryToAgent

func InjectHistoryToAgent(a *agent.Agent, historyContext string) string

InjectHistoryToAgent injects history context into the agent's temporary instructions and returns the original instructions for later restoration (if needed). InjectHistoryToAgent 将历史上下文注入到 agent 的临时指令中,并返回原始指令用于后续恢复(如果需要)。

func RestoreAgentInstructions

func RestoreAgentInstructions(a *agent.Agent)

RestoreAgentInstructions explicitly restores the agent's original instructions. Note: Agent.Run() already auto-clears temporary instructions via defer, so this function is mainly for explicit restoration scenarios. RestoreAgentInstructions 显式恢复 agent 的原始指令。注意:Agent.Run() 已经通过 defer 自动清除临时指令,此函数主要用于显式恢复场景。

Types

type CancellationRecord

// CancellationRecord captures workflow cancellation details.
// StepID and Snapshot are omitted from JSON when empty.
type CancellationRecord struct {
	// RunID is serialized as "run_id"; presumably the ID of the cancelled run — confirm.
	RunID      string                 `json:"run_id"`
	// Reason is serialized as "reason"; presumably the human-readable cancellation reason — confirm.
	Reason     string                 `json:"reason"`
	// StepID is serialized as "step_id"; presumably the step active at cancellation — confirm.
	StepID     string                 `json:"step_id,omitempty"`
	// Snapshot is serialized as "snapshot"; presumably the state captured at cancellation — confirm.
	Snapshot   map[string]interface{} `json:"snapshot,omitempty"`
	// OccurredAt is serialized as "occurred_at"; presumably when the cancellation happened — confirm.
	OccurredAt time.Time              `json:"occurred_at"`
}

CancellationRecord captures workflow cancellation details.

type Condition

// Condition represents a conditional branching node.
type Condition struct {
	// ID is the condition ID reported by GetID.
	ID        string
	// Name is the node's name.
	Name      string
	// Condition is the predicate evaluated by Execute.
	Condition ConditionFunc
	// TrueNode and FalseNode are the two branches Execute chooses between;
	// presumably TrueNode runs when Condition returns true — confirm.
	TrueNode  Node
	FalseNode Node
}

Condition represents a conditional branching node

func NewCondition

func NewCondition(config ConditionConfig) (*Condition, error)

NewCondition creates a new condition node

func (*Condition) Execute

func (c *Condition) Execute(ctx context.Context, execCtx *ExecutionContext) (*ExecutionContext, error)

Execute evaluates the condition and executes the appropriate branch

func (*Condition) GetID

func (c *Condition) GetID() string

GetID returns the condition ID

func (*Condition) GetType

func (c *Condition) GetType() NodeType

GetType returns the node type

type ConditionConfig

// ConditionConfig contains condition configuration and is the argument to
// NewCondition. Its fields have the same names and types as those of Condition.
type ConditionConfig struct {
	// ID is the identifier for the condition node.
	ID        string
	// Name is the node's name.
	Name      string
	// Condition is the predicate used for branching.
	Condition ConditionFunc
	// TrueNode and FalseNode are the candidate branches;
	// presumably TrueNode is taken when Condition returns true — confirm.
	TrueNode  Node
	FalseNode Node
}

ConditionConfig contains condition configuration

type ConditionFunc

// ConditionFunc is a function that evaluates a condition against the
// execution context and reports the boolean result.
type ConditionFunc func(ctx *ExecutionContext) bool

ConditionFunc is a function that evaluates a condition

type Config

// Config contains workflow configuration and is the argument to New.
type Config struct {
	// ID is the workflow identifier.
	ID     string
	// Name is the workflow name.
	Name   string
	// Steps are the nodes that make up the workflow.
	Steps  []Node
	// Logger is the structured logger for the workflow.
	// NOTE(review): behavior when Logger is nil is not visible here — confirm.
	Logger *slog.Logger

	// EnableHistory enables workflow history tracking.
	EnableHistory bool `json:"enable_history"`

	// HistoryStore is the storage backend for workflow history.
	// It is excluded from JSON serialization.
	HistoryStore WorkflowStorage `json:"-"`

	// NumHistoryRuns is the number of recent runs to include in history context.
	NumHistoryRuns int `json:"num_history_runs"`

	// AddHistoryToSteps automatically adds history context to all steps.
	AddHistoryToSteps bool `json:"add_history_to_steps"`
}

Config contains workflow configuration Config 包含工作流配置

type ExecutionContext

// ExecutionContext holds the execution state and data that is passed between
// workflow nodes (see Node.Execute).
type ExecutionContext struct {
	// Input is the input for the workflow execution.
	Input string `json:"input"`

	// Output is the output from the workflow execution.
	Output string `json:"output"`

	// Data holds temporary execution data.
	Data map[string]interface{} `json:"data"`

	// Metadata holds additional metadata.
	Metadata map[string]interface{} `json:"metadata"`

	// SessionState holds session-level state that persists across steps.
	SessionState *SessionState `json:"session_state,omitempty"`

	// SessionID is the unique session identifier.
	SessionID string `json:"session_id,omitempty"`

	// UserID is the user identifier for multi-tenant scenarios.
	UserID string `json:"user_id,omitempty"`

	// WorkflowHistory contains recent workflow run history.
	WorkflowHistory []HistoryEntry `json:"workflow_history,omitempty"`

	// HistoryContext is the formatted history context string.
	HistoryContext string `json:"history_context,omitempty"`
}

ExecutionContext holds the execution state and data ExecutionContext 保存执行状态和数据

func NewExecutionContext

func NewExecutionContext(input string) *ExecutionContext

NewExecutionContext creates a new execution context NewExecutionContext 创建新的执行上下文

func NewExecutionContextWithSession

func NewExecutionContextWithSession(input, sessionID, userID string) *ExecutionContext

NewExecutionContextWithSession creates a new execution context with session info NewExecutionContextWithSession 创建带会话信息的新执行上下文

func (*ExecutionContext) AddMessage

func (ec *ExecutionContext) AddMessage(msg *types.Message)

AddMessage adds a message to the history AddMessage 添加消息到历史

func (*ExecutionContext) AddMessages

func (ec *ExecutionContext) AddMessages(msgs []*types.Message)

AddMessages adds multiple messages to the history AddMessages 批量添加消息

func (*ExecutionContext) ApplySessionState

func (ec *ExecutionContext) ApplySessionState(snapshot map[string]interface{})

ApplySessionState initializes the session state from a snapshot. ApplySessionState 用快照初始化会话状态。

func (*ExecutionContext) ClearMessages

func (ec *ExecutionContext) ClearMessages()

ClearMessages clears all message history ClearMessages 清空消息历史

func (*ExecutionContext) ExportSessionState

func (ec *ExecutionContext) ExportSessionState() map[string]interface{}

ExportSessionState exports a snapshot of the current session state. ExportSessionState 导出当前会话状态快照。

func (*ExecutionContext) Get

func (ec *ExecutionContext) Get(key string) (interface{}, bool)

Get retrieves a value from the context data Get 从上下文数据检索值

func (*ExecutionContext) GetHistoryContext

func (ec *ExecutionContext) GetHistoryContext() string

GetHistoryContext returns the formatted history context GetHistoryContext 获取格式化的历史上下文

func (*ExecutionContext) GetHistoryCount

func (ec *ExecutionContext) GetHistoryCount() int

GetHistoryCount returns the number of history entries GetHistoryCount 获取历史记录数量

func (*ExecutionContext) GetHistoryInput

func (ec *ExecutionContext) GetHistoryInput(index int) string

GetHistoryInput returns the input at the specified index. Index 0 means the earliest history entry; -1 means the most recent. GetHistoryInput 获取指定索引的历史输入。index 0 表示最早的历史,-1 表示最近的历史。

func (*ExecutionContext) GetHistoryOutput

func (ec *ExecutionContext) GetHistoryOutput(index int) string

GetHistoryOutput returns the output at the specified index GetHistoryOutput 获取指定索引的历史输出

func (*ExecutionContext) GetLastHistoryEntry

func (ec *ExecutionContext) GetLastHistoryEntry() *HistoryEntry

GetLastHistoryEntry returns the last history entry GetLastHistoryEntry 获取最后一个历史条目

func (*ExecutionContext) GetMessages

func (ec *ExecutionContext) GetMessages() []*types.Message

GetMessages retrieves message history from session state GetMessages 获取消息历史

func (*ExecutionContext) GetSessionState

func (ec *ExecutionContext) GetSessionState(key string) (interface{}, bool)

GetSessionState retrieves a value from the session state GetSessionState 从会话状态检索值

func (*ExecutionContext) GetWorkflowHistory

func (ec *ExecutionContext) GetWorkflowHistory() []HistoryEntry

GetWorkflowHistory returns the workflow history GetWorkflowHistory 获取工作流历史

func (*ExecutionContext) HasHistory

func (ec *ExecutionContext) HasHistory() bool

HasHistory checks if there is any history HasHistory 检查是否有历史记录

func (*ExecutionContext) MergeMetadata

func (ec *ExecutionContext) MergeMetadata(metadata map[string]interface{})

MergeMetadata merges metadata into execution context.

func (*ExecutionContext) Set

func (ec *ExecutionContext) Set(key string, value interface{})

Set stores a value in the context data Set 在上下文数据中存储值

func (*ExecutionContext) SetHistoryContext

func (ec *ExecutionContext) SetHistoryContext(context string)

SetHistoryContext sets the formatted history context SetHistoryContext 设置格式化的历史上下文

func (*ExecutionContext) SetRunContextMetadata

func (ec *ExecutionContext) SetRunContextMetadata(runCtx map[string]interface{})

SetRunContextMetadata stores the run context payload into metadata under the "run_context" key so callers can inspect correlation identifiers (run_id, session_id, workflow_id, user_id, etc.) without reaching into context.Context.

func (*ExecutionContext) SetSessionState

func (ec *ExecutionContext) SetSessionState(key string, value interface{})

SetSessionState stores a value in the session state SetSessionState 在会话状态中存储值

func (*ExecutionContext) SetWorkflowHistory

func (ec *ExecutionContext) SetWorkflowHistory(history []HistoryEntry)

SetWorkflowHistory sets the workflow history SetWorkflowHistory 设置工作流历史

type HistoryEntry

// HistoryEntry represents a single entry in the workflow history.
type HistoryEntry struct {
	// Input is the input for this history entry.
	Input string `json:"input"`

	// Output is the output for this history entry.
	Output string `json:"output"`

	// Timestamp is when this entry was created.
	Timestamp time.Time `json:"timestamp"`
}

HistoryEntry represents a single entry in the workflow history HistoryEntry 表示工作流历史中的单个条目

type HistoryFormatOptions

// HistoryFormatOptions defines options for formatting history context.
// Defaults are available from DefaultHistoryFormatOptions.
type HistoryFormatOptions struct {
	// Header is the opening tag for history context.
	Header string

	// Footer is the closing tag for history context.
	Footer string

	// IncludeInput controls whether to include input in history.
	IncludeInput bool

	// IncludeOutput controls whether to include output in history.
	IncludeOutput bool

	// IncludeTimestamp controls whether to include timestamps.
	IncludeTimestamp bool

	// InputLabel is the label for the input field.
	InputLabel string

	// OutputLabel is the label for the output field.
	OutputLabel string
}

HistoryFormatOptions defines options for formatting history context HistoryFormatOptions 定义格式化历史上下文的选项

func DefaultHistoryFormatOptions

func DefaultHistoryFormatOptions() *HistoryFormatOptions

DefaultHistoryFormatOptions returns default formatting options DefaultHistoryFormatOptions 返回默认格式化选项

type Loop

// Loop represents a loop node that repeats execution.
type Loop struct {
	// ID is the loop ID reported by GetID.
	ID           string
	// Name is the node's name.
	Name         string
	// Body is the node that is executed repeatedly.
	Body         Node
	// Condition determines whether to continue looping.
	Condition    LoopConditionFunc
	// MaxIteration is presumably an upper bound on the number of iterations;
	// the meaning of zero is not visible here — confirm.
	MaxIteration int
}

Loop represents a loop node that repeats execution

func NewLoop

func NewLoop(config LoopConfig) (*Loop, error)

NewLoop creates a new loop node

func (*Loop) Execute

func (l *Loop) Execute(ctx context.Context, execCtx *ExecutionContext) (*ExecutionContext, error)

Execute runs the loop

func (*Loop) GetID

func (l *Loop) GetID() string

GetID returns the loop ID

func (*Loop) GetType

func (l *Loop) GetType() NodeType

GetType returns the node type

type LoopConditionFunc

// LoopConditionFunc determines whether to continue looping. It receives the
// execution context and an iteration counter.
// NOTE(review): whether iteration is zero- or one-based is not visible here — confirm.
type LoopConditionFunc func(ctx *ExecutionContext, iteration int) bool

LoopConditionFunc determines whether to continue looping

type LoopConfig

// LoopConfig contains loop configuration and is the argument to NewLoop.
// Its fields have the same names and types as those of Loop.
type LoopConfig struct {
	// ID is the identifier for the loop node.
	ID           string
	// Name is the node's name.
	Name         string
	// Body is the node to execute repeatedly.
	Body         Node
	// Condition determines whether to continue looping.
	Condition    LoopConditionFunc
	// MaxIteration is presumably an upper bound on the number of iterations — confirm.
	MaxIteration int
}

LoopConfig contains loop configuration

type MemoryStorage

// MemoryStorage implements WorkflowStorage using in-memory storage.
// Construct it with NewMemoryStorage.
type MemoryStorage struct {
	// contains filtered or unexported fields
}

MemoryStorage implements WorkflowStorage using in-memory storage MemoryStorage 使用内存存储实现 WorkflowStorage

func NewMemoryStorage

func NewMemoryStorage(maxSize int) *MemoryStorage

NewMemoryStorage creates a new memory-based workflow storage. maxSize specifies the maximum number of sessions to store (0 = unlimited). NewMemoryStorage 创建新的基于内存的工作流存储。maxSize 指定要存储的最大会话数(0 = 无限制)。

func (*MemoryStorage) Clear

func (m *MemoryStorage) Clear(ctx context.Context, olderThan time.Duration) (int, error)

Clear removes all sessions older than the specified duration Clear 删除早于指定持续时间的所有会话

func (*MemoryStorage) Close

func (m *MemoryStorage) Close() error

Close closes the storage and releases any resources Close 关闭存储并释放任何资源

func (*MemoryStorage) CreateSession

func (m *MemoryStorage) CreateSession(ctx context.Context, sessionID, workflowID, userID string) (*WorkflowSession, error)

CreateSession creates a new workflow session CreateSession 创建新的工作流会话

func (*MemoryStorage) DeleteSession

func (m *MemoryStorage) DeleteSession(ctx context.Context, sessionID string) error

DeleteSession deletes a workflow session by ID DeleteSession 通过 ID 删除工作流会话

func (*MemoryStorage) GetSession

func (m *MemoryStorage) GetSession(ctx context.Context, sessionID string) (*WorkflowSession, error)

GetSession retrieves a workflow session by ID GetSession 通过 ID 检索工作流会话

func (*MemoryStorage) GetStats

func (m *MemoryStorage) GetStats(ctx context.Context) (*SessionStats, error)

GetStats returns statistics about the stored sessions GetStats 返回有关存储会话的统计信息

func (*MemoryStorage) GetWorkflowStats

func (m *MemoryStorage) GetWorkflowStats(ctx context.Context, workflowID string) (*SessionStats, error)

GetWorkflowStats returns statistics for a specific workflow GetWorkflowStats 返回特定工作流的统计信息

func (*MemoryStorage) ListSessions

func (m *MemoryStorage) ListSessions(ctx context.Context, workflowID string, limit, offset int) ([]*WorkflowSession, error)

ListSessions lists all sessions for a given workflow ListSessions 列出给定工作流的所有会话

func (*MemoryStorage) ListUserSessions

func (m *MemoryStorage) ListUserSessions(ctx context.Context, userID string, limit, offset int) ([]*WorkflowSession, error)

ListUserSessions lists all sessions for a given user ListUserSessions 列出给定用户的所有会话

func (*MemoryStorage) UpdateSession

func (m *MemoryStorage) UpdateSession(ctx context.Context, session *WorkflowSession) error

UpdateSession updates an existing workflow session UpdateSession 更新现有的工作流会话

type Node

// Node represents a node in the workflow graph.
type Node interface {
	// Execute runs the node with the given execution context and returns the
	// resulting execution context, or an error.
	Execute(ctx context.Context, input *ExecutionContext) (*ExecutionContext, error)
	// GetID returns the node's identifier.
	GetID() string
	// GetType returns the node type.
	GetType() NodeType
}

Node represents a node in the workflow graph

type NodeType

// NodeType represents the type of workflow node. It is a string-backed
// type; the defined values are the NodeType* constants.
type NodeType string

NodeType represents the type of workflow node

// Defined NodeType values, one per node kind declared in this package.
const (
	// NodeTypeStep identifies a step node.
	NodeTypeStep      NodeType = "step"
	// NodeTypeCondition identifies a condition node.
	NodeTypeCondition NodeType = "condition"
	// NodeTypeLoop identifies a loop node.
	NodeTypeLoop      NodeType = "loop"
	// NodeTypeParallel identifies a parallel node.
	NodeTypeParallel  NodeType = "parallel"
	// NodeTypeRouter identifies a router node.
	NodeTypeRouter    NodeType = "router"
)

type Parallel

// Parallel represents a node that executes multiple nodes concurrently.
type Parallel struct {
	// ID is the parallel node ID reported by GetID.
	ID    string
	// Name is the node's name.
	Name  string
	// Nodes are the child nodes that Execute runs in parallel.
	Nodes []Node
}

Parallel represents a node that executes multiple nodes concurrently

func NewParallel

func NewParallel(config ParallelConfig) (*Parallel, error)

NewParallel creates a new parallel node

func (*Parallel) Execute

func (p *Parallel) Execute(ctx context.Context, execCtx *ExecutionContext) (*ExecutionContext, error)

Execute runs all child nodes in parallel Execute 并行运行所有子节点

func (*Parallel) GetID

func (p *Parallel) GetID() string

GetID returns the parallel node ID

func (*Parallel) GetType

func (p *Parallel) GetType() NodeType

GetType returns the node type

type ParallelConfig

// ParallelConfig contains parallel configuration and is the argument to
// NewParallel. Its fields have the same names and types as those of Parallel.
type ParallelConfig struct {
	// ID is the identifier for the parallel node.
	ID    string
	// Name is the node's name.
	Name  string
	// Nodes are the child nodes to run.
	Nodes []Node
}

ParallelConfig contains parallel configuration

type Router

// Router represents a dynamic routing node.
type Router struct {
	// ID is the router ID reported by GetID.
	ID     string
	// Name is the node's name.
	Name   string
	// Router is the function that determines which node to execute.
	Router RouterFunc
	// Routes maps route names to nodes; presumably keyed by the string that
	// Router returns — confirm. Handling of an unknown key is not visible here.
	Routes map[string]Node
}

Router represents a dynamic routing node

func NewRouter

func NewRouter(config RouterConfig) (*Router, error)

NewRouter creates a new router node

func (*Router) Execute

func (r *Router) Execute(ctx context.Context, execCtx *ExecutionContext) (*ExecutionContext, error)

Execute evaluates the router and executes the selected route

func (*Router) GetID

func (r *Router) GetID() string

GetID returns the router ID

func (*Router) GetType

func (r *Router) GetType() NodeType

GetType returns the node type

type RouterConfig

// RouterConfig contains router configuration and is the argument to
// NewRouter. Its fields have the same names and types as those of Router.
type RouterConfig struct {
	// ID is the identifier for the router node.
	ID     string
	// Name is the node's name.
	Name   string
	// Router is the function that determines which node to execute.
	Router RouterFunc
	// Routes maps route names to nodes; presumably keyed by the string that
	// Router returns — confirm.
	Routes map[string]Node
}

RouterConfig contains router configuration

type RouterFunc

// RouterFunc determines which node to execute. It receives the execution
// context and returns a string; presumably a key into Router.Routes — confirm.
type RouterFunc func(ctx *ExecutionContext) string

RouterFunc determines which node to execute

type RunOption

// RunOption configures workflow run behaviour. It is a functional option
// applied to the unexported runOptions; values are produced by the With*
// constructors and passed to Workflow.Run.
type RunOption func(*runOptions)

RunOption configures workflow run behaviour.

func WithMediaPayload

func WithMediaPayload(payload interface{}) RunOption

WithMediaPayload attaches media payload to the execution context.

func WithMetadata

func WithMetadata(metadata map[string]interface{}) RunOption

WithMetadata injects arbitrary metadata into the execution context.

func WithResumeFrom

func WithResumeFrom(stepID string) RunOption

WithResumeFrom instructs the workflow to resume from the specified step ID.

func WithRunContext

func WithRunContext(rc *run.RunContext) RunOption

WithRunContext injects a caller-provided run context, allowing workflows to reuse correlation identifiers from upstream orchestrators.

func WithSessionState

func WithSessionState(state map[string]interface{}) RunOption

WithSessionState injects a session state snapshot for the execution context.

func WithUserID

func WithUserID(userID string) RunOption

WithUserID sets the user ID for the workflow execution context.

type RunStatus

// RunStatus represents the status of a workflow run. It is a string-backed
// type; the defined values are the RunStatus* constants.
type RunStatus string

RunStatus represents the status of a workflow run RunStatus 表示工作流运行的状态

// Defined RunStatus values.
const (
	// RunStatusPending indicates the run is pending.
	RunStatusPending RunStatus = "pending"

	// RunStatusRunning indicates the run is in progress.
	RunStatusRunning RunStatus = "running"

	// RunStatusCompleted indicates the run completed successfully.
	RunStatusCompleted RunStatus = "completed"

	// RunStatusFailed indicates the run failed with an error.
	RunStatusFailed RunStatus = "failed"

	// RunStatusCancelled indicates the run was cancelled.
	RunStatusCancelled RunStatus = "cancelled"
)

type SessionFilter

// SessionFilter defines filter criteria for querying sessions.
// NOTE(review): whether zero-valued fields mean "no filter" is only stated
// for Limit; confirm for the other fields.
type SessionFilter struct {
	// WorkflowID filters sessions by workflow ID.
	WorkflowID string

	// UserID filters sessions by user ID.
	UserID string

	// CreatedAfter filters sessions created after this time.
	CreatedAfter time.Time

	// CreatedBefore filters sessions created before this time.
	CreatedBefore time.Time

	// Limit is the maximum number of sessions to return (0 = no limit).
	Limit int

	// Offset is the number of sessions to skip.
	Offset int
}

SessionFilter defines filter criteria for querying sessions SessionFilter 定义查询会话的过滤条件

type SessionState

// SessionState provides thread-safe session state management for workflows.
// Construct it with NewSessionState or FromMap.
type SessionState struct {
	// contains filtered or unexported fields
}

SessionState provides thread-safe session state management for workflows SessionState 为工作流提供线程安全的会话状态管理

func FromMap

func FromMap(data map[string]interface{}) *SessionState

FromMap creates a session state from a map FromMap 从map创建会话状态

func MergeParallelSessionStates

func MergeParallelSessionStates(original *SessionState, modified []*SessionState) *SessionState

MergeParallelSessionStates merges multiple session states from parallel execution. It collects all changes from the parallel branches and merges them. MergeParallelSessionStates 合并来自并行执行的多个会话状态。此函数收集并行分支的所有变更并合并它们。

func NewSessionState

func NewSessionState() *SessionState

NewSessionState creates a new session state NewSessionState 创建新的会话状态

func (*SessionState) Clear

func (ss *SessionState) Clear()

Clear removes all keys from the session state Clear 清空会话状态中的所有键

func (*SessionState) Clone

func (ss *SessionState) Clone() *SessionState

Clone creates a deep copy of the session state Clone 创建会话状态的深拷贝

func (*SessionState) Delete

func (ss *SessionState) Delete(key string)

Delete removes a key from the session state Delete 从会话状态中删除键

func (*SessionState) Get

func (ss *SessionState) Get(key string) (interface{}, bool)

Get retrieves a value from the session state Get 从会话状态检索值

func (*SessionState) GetAll

func (ss *SessionState) GetAll() map[string]interface{}

GetAll returns a copy of all session state data GetAll 返回所有会话状态数据的副本

func (*SessionState) Merge

func (ss *SessionState) Merge(other *SessionState)

Merge merges another session state into this one. Later values overwrite earlier ones (last-write-wins). Merge 将另一个会话状态合并到当前状态。后面的值覆盖前面的值(最后写入获胜)。

func (*SessionState) Set

func (ss *SessionState) Set(key string, value interface{})

Set stores a value in the session state Set 在会话状态中存储值

func (*SessionState) ToMap

func (ss *SessionState) ToMap() map[string]interface{}

ToMap returns the session state as a regular map (not thread-safe) ToMap 将会话状态作为常规map返回(不是线程安全的)

type SessionStats

// SessionStats provides statistics about workflow sessions.
// NOTE(review): the distinction between CompletedRuns and SuccessfulRuns is
// not defined here; presumably "completed" includes failed/cancelled runs
// (cf. WorkflowRun.IsCompleted) — confirm.
type SessionStats struct {
	// TotalSessions is the total number of sessions.
	TotalSessions int

	// TotalRuns is the total number of workflow runs across all sessions.
	TotalRuns int

	// CompletedRuns is the number of completed runs.
	CompletedRuns int

	// SuccessfulRuns is the number of successful runs.
	SuccessfulRuns int

	// FailedRuns is the number of failed runs.
	FailedRuns int

	// AverageDuration is the average duration of completed runs.
	AverageDuration time.Duration
}

SessionStats provides statistics about workflow sessions SessionStats 提供有关工作流会话的统计信息

type Step

// Step represents a basic workflow step that executes an agent.
type Step struct {
	// ID is the step ID reported by GetID.
	ID          string
	// Name is the step's name.
	Name        string
	// Agent is the agent this step executes.
	Agent       *agent.Agent
	// Description is a free-form description of the step.
	Description string
	// contains filtered or unexported fields
}

Step represents a basic workflow step that executes an agent Step 代表执行 agent 的基本工作流步骤

func NewStep

func NewStep(config StepConfig) (*Step, error)

NewStep creates a new step

func (*Step) Execute

func (s *Step) Execute(ctx context.Context, execCtx *ExecutionContext) (*ExecutionContext, error)

Execute runs the step Execute 执行步骤

func (*Step) GetID

func (s *Step) GetID() string

GetID returns the step ID

func (*Step) GetType

func (s *Step) GetType() NodeType

GetType returns the node type

type StepConfig

// StepConfig contains step configuration and is the argument to NewStep.
type StepConfig struct {
	// ID is the identifier for the step.
	ID          string
	// Name is the step's name.
	Name        string
	// Agent is the agent the step executes.
	Agent       *agent.Agent
	// Description is a free-form description of the step.
	Description string

	// AddHistoryToStep enables/disables history for this step.
	// nil means inherit from the workflow.
	AddHistoryToStep *bool `json:"add_history_to_step,omitempty"`

	// NumHistoryRuns specifies the history count for this step.
	// nil means use the workflow default.
	NumHistoryRuns *int `json:"num_history_runs,omitempty"`
}

StepConfig contains step configuration StepConfig 包含步骤配置

type StorageStats

// StorageStats is an optional interface for storage implementations that
// support statistics.
type StorageStats interface {
	// GetStats returns statistics about the stored sessions.
	GetStats(ctx context.Context) (*SessionStats, error)

	// GetWorkflowStats returns statistics for a specific workflow.
	GetWorkflowStats(ctx context.Context, workflowID string) (*SessionStats, error)
}

StorageStats is an optional interface for storage implementations that support statistics StorageStats 是支持统计信息的存储实现的可选接口

type Workflow

// Workflow represents a multi-step process. Construct it with New.
type Workflow struct {
	// ID is the workflow identifier.
	ID    string
	// Name is the workflow name.
	Name  string
	// Steps are the workflow's nodes; AddStep adds to the workflow.
	Steps []Node
	// contains filtered or unexported fields
}

Workflow represents a multi-step process Workflow 代表多步骤流程

func New

func New(config Config) (*Workflow, error)

New creates a new workflow New 创建新的工作流

func (*Workflow) AddStep

func (w *Workflow) AddStep(step Node)

AddStep adds a step to the workflow

func (*Workflow) Run

func (w *Workflow) Run(ctx context.Context, input string, sessionID string, opts ...RunOption) (*ExecutionContext, error)

Run executes the workflow. The sessionID parameter is optional; if it is empty, one is generated automatically. sessionID 参数可选,为空则自动生成。

type WorkflowHistoryConfig

// WorkflowHistoryConfig contains workflow-level history configuration.
type WorkflowHistoryConfig struct {
	// AddHistoryToSteps enables automatic history injection to all steps.
	AddHistoryToSteps bool `json:"add_history_to_steps"`

	// NumHistoryRuns is the default number of history runs to include.
	NumHistoryRuns int `json:"num_history_runs"`
}

WorkflowHistoryConfig contains workflow-level history configuration WorkflowHistoryConfig 包含工作流级别的历史配置

type WorkflowMetrics

// WorkflowMetrics tracks runtime metrics for a workflow execution. The
// structure is documented as safe for concurrent access, since workflow steps
// may run in parallel and query duration information while the workflow is
// still running. Construct it with NewWorkflowMetrics.
type WorkflowMetrics struct {
	// contains filtered or unexported fields
}

WorkflowMetrics tracks runtime metrics for a workflow execution. The structure is safe for concurrent access as workflow steps may run in parallel and attempt to query duration information while the workflow is still running.

func NewWorkflowMetrics

func NewWorkflowMetrics() *WorkflowMetrics

NewWorkflowMetrics constructs a metrics tracker with zeroed values.

func (*WorkflowMetrics) Duration

func (m *WorkflowMetrics) Duration() time.Duration

Duration returns the current or final duration. When the workflow is still running it reports the elapsed time since Start was called.

func (*WorkflowMetrics) Snapshot

func (m *WorkflowMetrics) Snapshot() map[string]interface{}

Snapshot returns a copy of the metrics in a map that can be safely embedded into metadata payloads.

func (*WorkflowMetrics) Start

func (m *WorkflowMetrics) Start()

Start marks the workflow as started. Subsequent calls are no-ops so callers do not need to guard repeated invocations.

func (*WorkflowMetrics) Stop

func (m *WorkflowMetrics) Stop() time.Duration

Stop marks the workflow as completed and returns the final duration. The method is idempotent; calling Stop multiple times returns the same duration.

type WorkflowRun

// WorkflowRun represents a single execution of a workflow.
// Construct it with NewWorkflowRun.
type WorkflowRun struct {
	// RunID is the unique identifier for this run.
	RunID string `json:"run_id"`

	// SessionID is the session this run belongs to.
	SessionID string `json:"session_id"`

	// WorkflowID is the ID of the workflow that was executed.
	WorkflowID string `json:"workflow_id"`

	// Input is the input provided to the workflow.
	Input string `json:"input"`

	// Output is the final output of the workflow.
	Output string `json:"output"`

	// Messages contains the conversation history for this run.
	Messages []*types.Message `json:"messages,omitempty"`

	// Status indicates the current status of the run.
	Status RunStatus `json:"status"`

	// Error contains the error message if the run failed.
	Error string `json:"error,omitempty"`

	// StartedAt is the timestamp when the run started.
	StartedAt time.Time `json:"started_at"`

	// CompletedAt is the timestamp when the run completed.
	// NOTE(review): encoding/json's omitempty has no effect on struct values
	// such as time.Time, so a zero CompletedAt is still serialized
	// ("0001-01-01T00:00:00Z"). Consider *time.Time or the omitzero option
	// (Go 1.24+) if omission is intended — changing it alters the JSON output.
	CompletedAt time.Time `json:"completed_at,omitempty"`

	// Metadata contains additional metadata for this run.
	Metadata map[string]interface{} `json:"metadata,omitempty"`

	// CancellationReason provides context when run is cancelled.
	CancellationReason string `json:"cancellation_reason,omitempty"`

	// CancellationSnapshot stores the last known snapshot before cancellation.
	CancellationSnapshot map[string]interface{} `json:"cancellation_snapshot,omitempty"`

	// LastStepID indicates the last step identifier processed before cancellation/failure.
	LastStepID string `json:"last_step_id,omitempty"`

	// ResumedFrom records the step the run resumed from, if applicable.
	ResumedFrom string `json:"resumed_from,omitempty"`

	// Events captures structured run output events for observability.
	Events run.Events `json:"events,omitempty"`
}

WorkflowRun represents a single execution of a workflow WorkflowRun 表示工作流的单次执行记录

func NewWorkflowRun

func NewWorkflowRun(runID, sessionID, workflowID, input string) *WorkflowRun

NewWorkflowRun creates a new workflow run with the given parameters NewWorkflowRun 使用给定参数创建新的工作流运行记录

func (*WorkflowRun) AddEvents

func (r *WorkflowRun) AddEvents(events run.Events)

AddEvents appends structured run events to the workflow run metadata.

func (*WorkflowRun) AddMessage

func (r *WorkflowRun) AddMessage(msg *types.Message)

AddMessage adds a message to the run's conversation history AddMessage 将消息添加到运行的对话历史中

func (*WorkflowRun) ApplyCancellation

func (r *WorkflowRun) ApplyCancellation(reason, stepID string, snapshot map[string]interface{})

ApplyCancellation enriches cancellation metadata.

func (*WorkflowRun) Duration

func (r *WorkflowRun) Duration() time.Duration

Duration returns the duration of the run Duration 返回运行的持续时间

func (*WorkflowRun) IsCompleted

func (r *WorkflowRun) IsCompleted() bool

IsCompleted returns true if the run has completed (success, failure, or cancelled) IsCompleted 如果运行已完成(成功、失败或取消)则返回 true

func (*WorkflowRun) IsSuccessful

func (r *WorkflowRun) IsSuccessful() bool

IsSuccessful returns true if the run completed successfully IsSuccessful 如果运行成功完成则返回 true

func (*WorkflowRun) MarkCancelled

func (r *WorkflowRun) MarkCancelled()

MarkCancelled marks the run as cancelled MarkCancelled 将运行标记为已取消

func (*WorkflowRun) MarkCompleted

func (r *WorkflowRun) MarkCompleted(output string)

MarkCompleted marks the run as completed with the given output MarkCompleted 使用给定输出将运行标记为已完成

func (*WorkflowRun) MarkFailed

func (r *WorkflowRun) MarkFailed(err error)

MarkFailed marks the run as failed with the given error MarkFailed 使用给定错误将运行标记为失败

func (*WorkflowRun) MarkStarted

func (r *WorkflowRun) MarkStarted()

MarkStarted marks the run as started MarkStarted 将运行标记为已开始

type WorkflowSession

// WorkflowSession manages multiple workflow runs for a single session.
// Construct it with NewWorkflowSession.
type WorkflowSession struct {

	// SessionID is the unique identifier for this session.
	SessionID string `json:"session_id"`

	// WorkflowID is the ID of the workflow this session belongs to.
	WorkflowID string `json:"workflow_id"`

	// UserID is the user who owns this session.
	UserID string `json:"user_id,omitempty"`

	// Runs contains all workflow runs in this session.
	Runs []*WorkflowRun `json:"runs"`

	// CreatedAt is the timestamp when the session was created.
	CreatedAt time.Time `json:"created_at"`

	// UpdatedAt is the timestamp when the session was last updated.
	UpdatedAt time.Time `json:"updated_at"`

	// Metadata contains additional session metadata.
	Metadata map[string]interface{} `json:"metadata,omitempty"`

	// Cancellations captures cancellation snapshots for later recovery.
	Cancellations []*CancellationRecord `json:"cancellations,omitempty"`
	// contains filtered or unexported fields
}

WorkflowSession manages multiple workflow runs for a single session WorkflowSession 管理单个会话的多个工作流运行

func NewWorkflowSession

func NewWorkflowSession(sessionID, workflowID, userID string) *WorkflowSession

NewWorkflowSession creates a new workflow session NewWorkflowSession 创建新的工作流会话

func (*WorkflowSession) AddCancellation

func (s *WorkflowSession) AddCancellation(record *CancellationRecord)

AddCancellation records a cancellation snapshot.

func (*WorkflowSession) AddRun

func (s *WorkflowSession) AddRun(run *WorkflowRun)

AddRun adds a workflow run to the session AddRun 将工作流运行添加到会话

func (*WorkflowSession) Clear

func (s *WorkflowSession) Clear()

Clear removes all runs from the session. Clear 移除会话中的所有运行。

func (*WorkflowSession) CountCompletedRuns

func (s *WorkflowSession) CountCompletedRuns() int

CountCompletedRuns returns the number of completed runs. CountCompletedRuns 返回已完成运行的数量。

func (*WorkflowSession) CountFailedRuns

func (s *WorkflowSession) CountFailedRuns() int

CountFailedRuns returns the number of failed runs. CountFailedRuns 返回失败运行的数量。

func (*WorkflowSession) CountRuns

func (s *WorkflowSession) CountRuns() int

CountRuns returns the total number of runs in the session. CountRuns 返回会话中运行的总数。

func (*WorkflowSession) CountSuccessfulRuns

func (s *WorkflowSession) CountSuccessfulRuns() int

CountSuccessfulRuns returns the number of successful runs. CountSuccessfulRuns 返回成功运行的数量。

func (*WorkflowSession) GetCancellations

func (s *WorkflowSession) GetCancellations() []*CancellationRecord

GetCancellations returns a copy of cancellation records.

func (*WorkflowSession) GetHistory

func (s *WorkflowSession) GetHistory(numRuns int) []HistoryEntry

GetHistory returns the most recent N completed runs as history entries. GetHistory 返回最近 N 个已完成的运行作为历史条目。

func (*WorkflowSession) GetHistoryContext

func (s *WorkflowSession) GetHistoryContext(numRuns int) string

GetHistoryContext returns formatted workflow history context for agent use. GetHistoryContext 返回格式化的工作流历史上下文供 agent 使用。

func (*WorkflowSession) GetHistoryMessages

func (s *WorkflowSession) GetHistoryMessages(numRuns int) []*types.Message

GetHistoryMessages returns conversation messages from recent runs. GetHistoryMessages 返回最近运行的对话消息。

func (*WorkflowSession) GetLastRun

func (s *WorkflowSession) GetLastRun() *WorkflowRun

GetLastRun returns the most recent run in the session. GetLastRun 返回会话中最近的运行。

func (*WorkflowSession) GetMetadata

func (s *WorkflowSession) GetMetadata(key string) (interface{}, bool)

GetMetadata retrieves a metadata value. GetMetadata 检索元数据值。

func (*WorkflowSession) GetRuns

func (s *WorkflowSession) GetRuns() []*WorkflowRun

GetRuns returns all runs in the session (thread-safe copy). GetRuns 返回会话中的所有运行(线程安全副本)。

func (*WorkflowSession) SetMetadata

func (s *WorkflowSession) SetMetadata(key string, value interface{})

SetMetadata sets a metadata value. SetMetadata 设置元数据值。

type WorkflowStorage

// WorkflowStorage defines the interface for storing and retrieving workflow sessions.
type WorkflowStorage interface {
	// CreateSession creates a new workflow session.
	// Returns ErrSessionExists if a session with the same ID already exists.
	CreateSession(ctx context.Context, sessionID, workflowID, userID string) (*WorkflowSession, error)

	// GetSession retrieves a workflow session by ID.
	// Returns ErrSessionNotFound if the session does not exist.
	GetSession(ctx context.Context, sessionID string) (*WorkflowSession, error)

	// UpdateSession updates an existing workflow session.
	// Returns ErrSessionNotFound if the session does not exist.
	UpdateSession(ctx context.Context, session *WorkflowSession) error

	// DeleteSession deletes a workflow session by ID.
	// Returns ErrSessionNotFound if the session does not exist.
	DeleteSession(ctx context.Context, sessionID string) error

	// ListSessions lists all sessions for a given workflow.
	// Returns empty slice if no sessions found.
	// NOTE(review): limit and offset presumably page through the results; how
	// non-positive values are treated is not specified here — confirm per implementation.
	ListSessions(ctx context.Context, workflowID string, limit, offset int) ([]*WorkflowSession, error)

	// ListUserSessions lists all sessions for a given user.
	// Returns empty slice if no sessions found.
	// NOTE(review): limit and offset presumably page through the results; how
	// non-positive values are treated is not specified here — confirm per implementation.
	ListUserSessions(ctx context.Context, userID string, limit, offset int) ([]*WorkflowSession, error)

	// Clear removes all sessions older than the specified duration.
	// NOTE(review): the int result is presumably the number of sessions removed, and
	// it is not stated whether age is measured from CreatedAt or UpdatedAt — confirm.
	Clear(ctx context.Context, olderThan time.Duration) (int, error)

	// Close closes the storage and releases any resources.
	Close() error
}

WorkflowStorage defines the interface for storing and retrieving workflow sessions. WorkflowStorage 定义用于存储和检索工作流会话的接口。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL