Documentation
¶
Overview ¶
包 hitl 提供 Human-in-the-Loop 工作流中断与恢复能力。
概述 ¶
hitl 用于在 Agent 执行过程中注入人工确认节点。当工作流到达需要 人工介入的关键决策点时,InterruptManager 会暂停执行并等待人类 响应,支持审批、补充输入、代码审查、断点调试和错误处理五种中断 类型,适用于高风险决策和关键业务流程。
核心接口 ¶
- InterruptStore:中断持久化存储接口,定义 Save / Load / List / Update 四个方法,包内提供 InMemoryInterruptStore 默认实现
- InterruptHandler:中断事件回调函数,用于通知外部系统(如 UI、 消息队列)有新的中断等待处理
主要能力 ¶
- InterruptManager:中断管理器核心,负责创建中断、等待响应、 解决/拒绝/取消/超时处理,内部通过 channel 实现阻塞等待。 使用 NewInterruptManager 创建实例
- 五种中断类型(InterruptType):Approval(审批)、Input(补充输入)、 Review(审查)、Breakpoint(断点)、Error(错误处理)
- 五种中断状态(InterruptStatus):Pending(等待中)、Resolved(已解决)、 Rejected(已拒绝)、Timeout(已超时)、Canceled(已取消)
- 超时机制:每个中断可配置独立超时(默认 24 小时),超时后自动 标记状态并释放等待的 goroutine
- 并发安全:所有操作通过 RWMutex 保护,resolveOnce 确保中断 只被解决一次
数据类型 ¶
- Interrupt:中断核心数据结构,包含工作流 ID、节点 ID、中断类型、 状态、可选项列表、输入 JSON Schema、人工响应等字段
- InterruptOptions:中断创建参数,包括 WorkflowID、NodeID、Type、 Title、Description、Timeout、CheckpointID 和自定义 Metadata
- Option:审批中断的可选项,包含 ID、Label、Description 和 IsDefault
- Response:人工对中断的响应,包含 OptionID、Input、Comment、 Approved 标志、UserID 和 Metadata
与其他包协同 ¶
- agent/streaming:流式交互中可在关键节点插入人工中断
- agent/voice:语音 Agent 的高风险指令可触发审批流程
- agent/guardrails:安全校验失败时可升级为人工审查中断
Index ¶
- type InMemoryInterruptStore
- func (s *InMemoryInterruptStore) List(ctx context.Context, workflowID string, status InterruptStatus) ([]*Interrupt, error)
- func (s *InMemoryInterruptStore) Load(ctx context.Context, interruptID string) (*Interrupt, error)
- func (s *InMemoryInterruptStore) Save(ctx context.Context, interrupt *Interrupt) error
- func (s *InMemoryInterruptStore) Update(ctx context.Context, interrupt *Interrupt) error
- type Interrupt
- type InterruptHandler
- type InterruptManager
- func (m *InterruptManager) CancelInterrupt(ctx context.Context, interruptID string) error
- func (m *InterruptManager) CreateInterrupt(ctx context.Context, opts InterruptOptions) (*Response, error)
- func (m *InterruptManager) GetPendingInterrupts(workflowID string) []*Interrupt
- func (m *InterruptManager) RegisterHandler(interruptType InterruptType, handler InterruptHandler)
- func (m *InterruptManager) ResolveInterrupt(ctx context.Context, interruptID string, response *Response) error
- type InterruptOptions
- type InterruptStatus
- type InterruptStore
- type InterruptType
- type Option
- type Response
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type InMemoryInterruptStore ¶
type InMemoryInterruptStore struct {
// contains filtered or unexported fields
}
在MemoryInterruptStore中为中断提供内存.
func NewInMemoryInterruptStore ¶
func NewInMemoryInterruptStore() *InMemoryInterruptStore
New InMemory InterruptStore 创建了新的内存中断商店.
func (*InMemoryInterruptStore) List ¶
func (s *InMemoryInterruptStore) List(ctx context.Context, workflowID string, status InterruptStatus) ([]*Interrupt, error)
type Interrupt ¶
type Interrupt struct {
ID string `json:"id"`
WorkflowID string `json:"workflow_id"`
NodeID string `json:"node_id"`
Type InterruptType `json:"type"`
Status InterruptStatus `json:"status"`
Title string `json:"title"`
Description string `json:"description"`
Data any `json:"data,omitempty"`
Options []Option `json:"options,omitempty"`
InputSchema json.RawMessage `json:"input_schema,omitempty"`
Response *Response `json:"response,omitempty"`
CreatedAt time.Time `json:"created_at"`
ResolvedAt *time.Time `json:"resolved_at,omitempty"`
Timeout time.Duration `json:"timeout"`
CheckpointID string `json:"checkpoint_id,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
}
中断代表工作流程中断点.
type InterruptHandler ¶
中断汉德勒处理中断事件.
type InterruptManager ¶
type InterruptManager struct {
// contains filtered or unexported fields
}
中断管理者管理工作流程中断 。
func NewInterruptManager ¶
func NewInterruptManager(store InterruptStore, logger *zap.Logger) *InterruptManager
新干扰管理器创建了新的中断管理器 。
func (*InterruptManager) CancelInterrupt ¶
func (m *InterruptManager) CancelInterrupt(ctx context.Context, interruptID string) error
取消中断取消待决中断 。
func (*InterruptManager) CreateInterrupt ¶
func (m *InterruptManager) CreateInterrupt(ctx context.Context, opts InterruptOptions) (*Response, error)
创建中断创建并等待中断解决 。
func (*InterruptManager) GetPendingInterrupts ¶
func (m *InterruptManager) GetPendingInterrupts(workflowID string) []*Interrupt
获得待定 中断返回工作流程中所有待处理中断 。
func (*InterruptManager) RegisterHandler ¶
func (m *InterruptManager) RegisterHandler(interruptType InterruptType, handler InterruptHandler)
登记 Handler 为中断类型登记处理器 。
func (*InterruptManager) ResolveInterrupt ¶
func (m *InterruptManager) ResolveInterrupt(ctx context.Context, interruptID string, response *Response) error
解析中断解决待决中断 。
type InterruptOptions ¶
type InterruptOptions struct {
WorkflowID string
NodeID string
Type InterruptType
Title string
Description string
Data any
Options []Option
InputSchema json.RawMessage
Timeout time.Duration
CheckpointID string
Metadata map[string]any
}
中断选项配置中断创建 。
type InterruptStatus ¶
type InterruptStatus string
中断状态代表中断状态.
const ( InterruptStatusPending InterruptStatus = "pending" InterruptStatusResolved InterruptStatus = "resolved" InterruptStatusRejected InterruptStatus = "rejected" InterruptStatusTimeout InterruptStatus = "timeout" InterruptStatusCanceled InterruptStatus = "canceled" )
type InterruptStore ¶
type InterruptStore interface {
Save(ctx context.Context, interrupt *Interrupt) error
Load(ctx context.Context, interruptID string) (*Interrupt, error)
List(ctx context.Context, workflowID string, status InterruptStatus) ([]*Interrupt, error)
Update(ctx context.Context, interrupt *Interrupt) error
}
InterruptStore定义了中断的存储接口.
type InterruptType ¶
type InterruptType string
中断Type定义了工作流程中断的类型.
const ( InterruptTypeApproval InterruptType = "approval" InterruptTypeInput InterruptType = "input" InterruptTypeReview InterruptType = "review" InterruptTypeBreakpoint InterruptType = "breakpoint" InterruptTypeError InterruptType = "error" )
type Option ¶
type Option struct {
ID string `json:"id"`
Label string `json:"label"`
Description string `json:"description,omitempty"`
IsDefault bool `json:"is_default,omitempty"`
}
备选办法是可选择的核准中断的备选办法。
type Response ¶
type Response struct {
OptionID string `json:"option_id,omitempty"`
Input any `json:"input,omitempty"`
Comment string `json:"comment,omitempty"`
Approved bool `json:"approved"`
Timestamp time.Time `json:"timestamp"`
UserID string `json:"user_id,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
}
反应代表了人类对中断的反应。
Click to show internal directories.
Click to hide internal directories.