reactloops

package
v1.4.4-alpha1031 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 31, 2025 License: AGPL-3.0 Imports: 16 Imported by: 0

README

ReActLoop 模块使用说明

模块概述

reactloops 是 Yak AI 框架中的核心模块,实现了 ReAct (Reasoning and Acting) 循环执行逻辑。该模块负责:

  1. 循环执行: 通过迭代调用 AI 模型,执行多步骤任务
  2. 动作管理: 注册和执行各种动作(Actions)
  3. 状态控制: 管理任务状态转换(Pending, Processing, Completed, Aborted)
  4. 异步支持: 支持同步和异步模式的动作执行
  5. 流处理: 处理 AI 输出流,支持标签提取和镜像

核心组件

1. ReActLoop

主要的循环执行器,负责协调整个执行流程。

type ReActLoop struct {
    loopName    string
    config      AIInvokeRuntime
    actions     *utils.OrderedMap[string, *LoopAction]
    maxIterations int
    // ... 其他字段
}
2. LoopAction

定义可执行的动作。

type LoopAction struct {
    ActionType      string
    Description     string
    AsyncMode       bool
    ActionVerifier  ActionVerifyHandler    // 验证动作参数
    ActionHandler   ActionHandler          // 执行动作逻辑
}
3. LoopActionHandlerOperator

动作处理器中的操作符,用于控制循环流程。

type LoopActionHandlerOperator struct {
    // 提供以下方法:
    // - Continue()              // 继续下一次迭代
    // - Fail(reason)           // 失败并终止
    // - Feedback(message)      // 记录反馈
    // - DisallowNextLoopExit() // 禁止下一次迭代退出
}

使用方法

基本使用流程
// 1. 创建 ReActLoop
loop, err := NewReActLoop("my-loop", runtime, 
    WithMaxIterations(10),
    WithOnTaskCreated(func(task AIStatefulTask) {
        // 任务创建回调
    }),
)

// 2. 注册动作
loop.actions.Set("custom_action", &LoopAction{
    ActionType:  "custom_action",
    Description: "执行自定义操作",
    AsyncMode:   false,
    ActionVerifier: func(loop *ReActLoop, action *Action) error {
        // 验证动作参数
        return nil
    },
    ActionHandler: func(loop *ReActLoop, action *Action, operator *LoopActionHandlerOperator) {
        // 执行动作逻辑
        operator.Continue() // 或 operator.Fail()
    },
})

// 3. 执行循环
err = loop.Execute("task-id", context.Background(), "用户输入")
内置动作

模块提供了两个内置动作:

  1. 直接回答 (directly_answer): AI 直接回答用户问题
  2. 结束 (finish): 完成任务并退出循环
核心文件说明
exec.go - 核心执行逻辑

主要函数:

  • Execute(): 创建任务并执行循环
  • ExecuteWithExistedTask(): 使用已有任务执行循环
  • createMirrors(): 创建流镜像处理器

关键流程:

  1. 状态管理:taskStartProcessing()complete() / abort()
  2. Prompt 生成:调用 generateLoopPrompt()
  3. AI 调用:通过 CallAITransaction() 调用 AI
  4. 动作提取:从流中提取 next_action
  5. 动作验证:ActionVerifier
  6. 动作执行:ActionHandler
prompt.go - Prompt 生成

主要函数:

  • generateLoopPrompt(): 生成循环提示词
  • generateSchemaString(): 生成动作 Schema

Prompt 组成:

  • 背景信息(Background)
  • 持久化指令(PersistentContext)
  • 输出示例(OutputExample)
  • 响应式数据(ReactiveData)- 包含反馈
  • 用户查询(UserQuery)
  • 动作 Schema(Schema)

状态转换

任务状态转换流程:

Created → Processing → Completed/Aborted
  • Created: 任务创建
  • Processing: 正在处理
  • Completed: 成功完成
  • Aborted: 失败中止

同步 vs 异步模式

同步模式 (AsyncMode = false)
  • 动作在主循环中执行完成
  • 状态由循环控制
  • 执行完立即进入下一次迭代
异步模式 (AsyncMode = true)
  • 动作触发后立即返回
  • 状态由异步回调控制
  • 通过 WithOnAsyncTaskTrigger() 注册异步回调
  • 主循环不会自动进入下一次迭代

反馈机制

通过 operator.Feedback() 记录反馈,反馈会在下一次迭代时通过 ReactiveData 传递给 AI:

ActionHandler: func(loop *ReActLoop, action *Action, operator *LoopActionHandlerOperator) {
    operator.Feedback("步骤1完成,发现3个问题")
    operator.Continue()
}

Stream 处理和 Mirror

AI Tag 字段

通过注册 AI Tag 字段,可以从 AI 输出中提取特定标签内容:

loop.aiTagFields.Set("yaklang-code", &LoopAITagField{
    TagName:      "yaklang-code",
    VariableName: "generated_code",
})

当 AI 输出包含 <yaklang-code>...</yaklang-code> 时,内容会被提取并存储到 generated_code 变量中。

Stream 字段

注册流字段可以实时处理 JSON 中的特定字段:

loop.streamFields.Set("thought", &LoopStreamField{
    FieldName: "thought",
    Prefix:    "思考",
})

测试说明

测试策略

本模块的测试采用 mock AI response 驱动的方式:

  1. Mock Runtime: 模拟 AIInvokeRuntime
  2. Mock Response: 模拟 AI 返回的 JSON 格式动作
  3. 验证状态: 检查任务状态转换
  4. 验证行为: 检查动作处理器的调用
测试覆盖

主要测试场景:

  1. ✅ 基本执行流程
  2. ✅ 最大迭代次数限制
  3. ✅ 异步/同步模式
  4. ✅ ActionVerifier 和 ActionHandler
  5. ✅ 状态转换(Processing, Completed, Aborted)
  6. ✅ 错误处理和 Panic 恢复
  7. ✅ 反馈机制
  8. ✅ Prompt 生成
  9. ✅ Stream 处理和 Mirror
  10. ✅ 禁止退出循环
运行测试
# 运行所有测试
go test ./common/ai/aid/aireact/reactloops -v

# 运行特定测试
go test ./common/ai/aid/aireact/reactloops -v -run TestExecute_BasicFlow

# 查看覆盖率
go test ./common/ai/aid/aireact/reactloops -coverprofile=coverage.out
go tool cover -html=coverage.out
Mock AI Response 示例
runtime.callAIFunc = func(prompt string) (*AIResponse, error) {
    resp := aicommon.NewUnboundAIResponse()
    resp.SetTaskIndex("test-1")
    resp.EmitOutputStream(strings.NewReader(`{
        "thought": "我需要执行这个操作",
        "next_action": {
            "type": "custom_action",
            "params": {"key": "value"}
        }
    }`))
    resp.Close()
    return resp, nil
}

注意事项

  1. 迭代限制: 默认最大迭代次数为 100,可通过 WithMaxIterations() 自定义
  2. Emitter 必需: 必须提供有效的 Emitter,否则执行会失败
  3. 动作注册: 所有使用的动作必须提前注册
  4. Panic 恢复: 循环内部有 panic 恢复机制,会将任务标记为 Aborted
  5. 异步模式: 异步模式下,主循环不会自动完成任务,需要在异步回调中手动设置状态

最佳实践

  1. 错误处理: ActionVerifier 中进行参数验证,ActionHandler 中进行业务逻辑处理
  2. 反馈信息: 使用 operator.Feedback() 提供详细的执行信息,帮助 AI 做出更好的决策
  3. 状态管理: 正确使用 Continue()Fail() 控制循环流程
  4. 调试日志: 使用 common/log 包输出调试信息
  5. 测试覆盖: 为自定义动作编写单元测试,确保逻辑正确

常见问题

Q: 循环无限执行怎么办?

A: 设置合理的 maxIterations,并确保动作正确调用 Continue()Fail()

Q: 如何调试 Prompt?

A: 可以在 generateLoopPrompt() 中打印或记录生成的 prompt

Q: 异步模式下如何完成任务?

A: 在 WithOnAsyncTaskTrigger() 回调中手动调用 task.SetStatus(AITaskState_Completed)

Q: 如何处理超时?

A: 通过 context.WithTimeout() 创建带超时的 context 传递给 Execute()

相关文档

  • AI Tag 解析:common/ai/aid/aicommon/aitag/
  • 动作提取:common/ai/aid/aicommon/action_extractor.go
  • Timeline 管理:common/ai/aid/aicommon/timeline.go

维护者: Yaklang Team
最后更新: 2024年

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConvertReActLoopFactoryToActionFactory

func ConvertReActLoopFactoryToActionFactory(
	name string,
	factory LoopFactory,
) func(r aicommon.AIInvokeRuntime) (*LoopAction, error)

func RegisterAction

func RegisterAction(action *LoopAction)

func RegisterLoopFactory

func RegisterLoopFactory(
	name string,
	creator LoopFactory,
) error

Types

type ActionReflection

type ActionReflection struct {
	// 基本信息
	ActionType    string                 `json:"action_type"`
	ActionParams  map[string]interface{} `json:"action_params"`
	ExecutionTime time.Duration          `json:"execution_time"`
	IterationNum  int                    `json:"iteration_num"`
	Success       bool                   `json:"success"`
	ErrorMessage  string                 `json:"error_message,omitempty"`

	// 环境影响
	EnvironmentalImpact *EnvironmentalImpact `json:"environmental_impact,omitempty"`

	// 学习内容
	LearningInsights    []string  `json:"learning_insights,omitempty"`
	FutureSuggestions   []string  `json:"future_suggestions,omitempty"`
	ImpactAssessment    string    `json:"impact_assessment,omitempty"`
	EffectivenessRating string    `json:"effectiveness_rating,omitempty"`
	ReflectionLevel     string    `json:"reflection_level"`
	ReflectionTimestamp time.Time `json:"reflection_timestamp"`
}

ActionReflection 存储单次行动的反思结果

func (*ActionReflection) Dump

func (r *ActionReflection) Dump(nonce string) string

Dump 生成适合放入 Prompt 的格式化字符串(使用 nonce 保护)

func (*ActionReflection) ToMemoryContent

func (r *ActionReflection) ToMemoryContent() string

ToMemoryContent 转换为适合保存到 aimem 的内容格式

type ContextProviderFunc

type ContextProviderFunc func(loop *ReActLoop, nonce string) (string, error)

type EnvironmentalImpact

type EnvironmentalImpact struct {
	StateChanges    []string               `json:"state_changes"`
	ResourceUsage   map[string]interface{} `json:"resource_usage"`
	SideEffects     []string               `json:"side_effects"`
	PositiveEffects []string               `json:"positive_effects"`
	NegativeEffects []string               `json:"negative_effects"`
}

EnvironmentalImpact 环境影响分析

type FeedbackProviderFunc

type FeedbackProviderFunc func(loop *ReActLoop, feedback *bytes.Buffer, nonce string) (string, error)

type LoopAITagField

type LoopAITagField struct {
	TagName      string
	VariableName string
	AINodeId     string
}

type LoopAction

type LoopAction struct {
	// plan 与 forge executor 会允许支持异步执行,异步情况下仍然允许对话和其他功能
	AsyncMode         bool
	ActionType        string `json:"type"`
	Description       string `json:"description"`
	Options           []aitool.ToolOption
	ActionVerifier    LoopActionVerifierFunc
	ActionHandler     LoopActionHandlerFunc
	StreamFields      []*LoopStreamField
	AITagStreamFields []*LoopAITagField
}

func GetLoopAction

func GetLoopAction(name string) (*LoopAction, bool)

type LoopActionFactory

type LoopActionFactory func(r aicommon.AIInvokeRuntime) (*LoopAction, error)

type LoopActionHandlerFunc

type LoopActionHandlerFunc func(loop *ReActLoop, action *aicommon.Action, operator *LoopActionHandlerOperator)

type LoopActionHandlerOperator

type LoopActionHandlerOperator struct {
	// contains filtered or unexported fields
}

func (*LoopActionHandlerOperator) Continue

func (l *LoopActionHandlerOperator) Continue()

func (*LoopActionHandlerOperator) DisallowNextLoopExit

func (l *LoopActionHandlerOperator) DisallowNextLoopExit()

func (*LoopActionHandlerOperator) Exit

func (l *LoopActionHandlerOperator) Exit()

func (*LoopActionHandlerOperator) Fail

func (l *LoopActionHandlerOperator) Fail(i any)

func (*LoopActionHandlerOperator) Feedback

func (l *LoopActionHandlerOperator) Feedback(i any)

func (*LoopActionHandlerOperator) GetContext

func (r *LoopActionHandlerOperator) GetContext() context.Context

func (*LoopActionHandlerOperator) GetDisallowLoopExit

func (l *LoopActionHandlerOperator) GetDisallowLoopExit() bool

func (*LoopActionHandlerOperator) GetFeedback

func (l *LoopActionHandlerOperator) GetFeedback() *bytes.Buffer

func (*LoopActionHandlerOperator) GetReflectionData

func (l *LoopActionHandlerOperator) GetReflectionData() map[string]interface{}

GetReflectionData 获取自定义反思数据

func (*LoopActionHandlerOperator) GetReflectionLevel

func (l *LoopActionHandlerOperator) GetReflectionLevel() ReflectionLevel

GetReflectionLevel 获取当前设置的反思级别

func (*LoopActionHandlerOperator) GetTask

func (*LoopActionHandlerOperator) IsContinued

func (l *LoopActionHandlerOperator) IsContinued() bool

func (*LoopActionHandlerOperator) IsTerminated

func (l *LoopActionHandlerOperator) IsTerminated() (bool, error)

func (*LoopActionHandlerOperator) MarkSilence

func (l *LoopActionHandlerOperator) MarkSilence(i ...bool)

func (*LoopActionHandlerOperator) SetReflectionData

func (l *LoopActionHandlerOperator) SetReflectionData(key string, value interface{})

SetReflectionData 设置自定义反思数据 action 可以添加额外的上下文信息用于反思分析

func (*LoopActionHandlerOperator) SetReflectionLevel

func (l *LoopActionHandlerOperator) SetReflectionLevel(level ReflectionLevel)

SetReflectionLevel 设置该 action 执行后的反思级别 action 可以在执行过程中根据实际情况动态设置反思级别

type LoopActionVerifierFunc

type LoopActionVerifierFunc func(loop *ReActLoop, action *aicommon.Action) error

type LoopFactory

type LoopFactory func(r aicommon.AIInvokeRuntime, opts ...ReActLoopOption) (*ReActLoop, error)

func GetLoopFactory

func GetLoopFactory(name string) (LoopFactory, bool)

type LoopStreamField

type LoopStreamField struct {
	FieldName string
	AINodeId  string
	Prefix    string
}

type ReActLoop

type ReActLoop struct {
	// contains filtered or unexported fields
}

func CreateLoopByName

func CreateLoopByName(name string, invoker aicommon.AIInvokeRuntime, opts ...ReActLoopOption) (*ReActLoop, error)

func NewReActLoop

func NewReActLoop(name string, invoker aicommon.AIInvokeRuntime, options ...ReActLoopOption) (*ReActLoop, error)

func (*ReActLoop) DisallowAskForClarification

func (r *ReActLoop) DisallowAskForClarification()

func (*ReActLoop) Execute

func (r *ReActLoop) Execute(taskId string, ctx context.Context, userInput string) error

func (*ReActLoop) ExecuteWithExistedTask

func (r *ReActLoop) ExecuteWithExistedTask(task aicommon.AIStatefulTask) error

func (*ReActLoop) FinishAsyncTask

func (r *ReActLoop) FinishAsyncTask(t aicommon.AIStatefulTask, err error)

func (*ReActLoop) Get

func (r *ReActLoop) Get(i string) string

func (*ReActLoop) GetActionHandler

func (r *ReActLoop) GetActionHandler(actionName string) (*LoopAction, error)

func (*ReActLoop) GetAllActionNames

func (r *ReActLoop) GetAllActionNames() []string

func (*ReActLoop) GetAllActions

func (r *ReActLoop) GetAllActions() []*LoopAction

func (*ReActLoop) GetConfig

func (r *ReActLoop) GetConfig() aicommon.AICallerConfigIf

func (*ReActLoop) GetCurrentMemoriesContent

func (r *ReActLoop) GetCurrentMemoriesContent() string

func (*ReActLoop) GetCurrentTask

func (r *ReActLoop) GetCurrentTask() aicommon.AIStatefulTask

func (*ReActLoop) GetEmitter

func (r *ReActLoop) GetEmitter() *aicommon.Emitter

func (*ReActLoop) GetEnableSelfReflection

func (r *ReActLoop) GetEnableSelfReflection() bool

func (*ReActLoop) GetInt

func (r *ReActLoop) GetInt(k string) int

func (*ReActLoop) GetInvoker

func (r *ReActLoop) GetInvoker() aicommon.AIInvokeRuntime

func (*ReActLoop) GetMemoryTriage

func (r *ReActLoop) GetMemoryTriage() aicommon.MemoryTriage

func (*ReActLoop) GetReflectionHistory

func (r *ReActLoop) GetReflectionHistory() []*ActionReflection

GetReflectionHistory 获取历史反思记录

func (*ReActLoop) GetStringSlice

func (r *ReActLoop) GetStringSlice(i string) []string

func (*ReActLoop) GetVariable

func (r *ReActLoop) GetVariable(i string) any

func (*ReActLoop) NoActions

func (r *ReActLoop) NoActions() bool

func (*ReActLoop) OnAsyncTaskFinished

func (r *ReActLoop) OnAsyncTaskFinished(f func(task aicommon.AIStatefulTask))

func (*ReActLoop) OnAsyncTaskTrigger

func (r *ReActLoop) OnAsyncTaskTrigger(f func(ins *LoopAction, task aicommon.AIStatefulTask))

func (*ReActLoop) OnTaskCreated

func (r *ReActLoop) OnTaskCreated(f func(task aicommon.AIStatefulTask))

func (*ReActLoop) PushMemory

func (r *ReActLoop) PushMemory(result *memory_type.SearchMemoryResult)

func (*ReActLoop) RemoveAction

func (r *ReActLoop) RemoveAction(actionType string)

func (*ReActLoop) Set

func (r *ReActLoop) Set(i string, result any)

func (*ReActLoop) SetCurrentTask

func (r *ReActLoop) SetCurrentTask(t aicommon.AIStatefulTask)

type ReActLoopCoreGenerateCode

type ReActLoopCoreGenerateCode func(
	userInput string,
	contextResult string,
	contextFeedback string,
) (string, error)

type ReActLoopOption

type ReActLoopOption func(r *ReActLoop)

func WithAITagField

func WithAITagField(tagName, variableName string) ReActLoopOption

WithAITagField 行为变化!!!:现在VariableName 不仅仅是在loop中get数据的key,也是tag set到action的field的key

func WithAITagFieldWithAINodeId

func WithAITagFieldWithAINodeId(tagName, variableName, nodeId string) ReActLoopOption

func WithActionFactoryFromLoop

func WithActionFactoryFromLoop(name string) ReActLoopOption

func WithAllowAIForge

func WithAllowAIForge(b ...bool) ReActLoopOption

func WithAllowAIForgeGetter

func WithAllowAIForgeGetter(allowAIForge func() bool) ReActLoopOption

func WithAllowPlanAndExec

func WithAllowPlanAndExec(b ...bool) ReActLoopOption

func WithAllowPlanAndExecGetter

func WithAllowPlanAndExecGetter(allowPlanAndExec func() bool) ReActLoopOption

func WithAllowRAG

func WithAllowRAG(b ...bool) ReActLoopOption

func WithAllowRAGGetter

func WithAllowRAGGetter(allowRAG func() bool) ReActLoopOption

func WithAllowToolCall

func WithAllowToolCall(b ...bool) ReActLoopOption

func WithAllowToolCallGetter

func WithAllowToolCallGetter(allowToolCall func() bool) ReActLoopOption

func WithAllowUserInteract

func WithAllowUserInteract(b ...bool) ReActLoopOption

func WithEnableSelfReflection

func WithEnableSelfReflection(enable ...bool) ReActLoopOption

WithEnableSelfReflection 启用自我反思功能 启用后,每次 action 执行后会根据策略进行自我反思分析 action 可以通过 operator.SetReflectionLevel() 自定义反思级别

func WithInitTask

func WithInitTask(initHandler func(loop *ReActLoop, task aicommon.AIStatefulTask) error) ReActLoopOption

func WithLoopPromptGenerator

func WithLoopPromptGenerator(generator ReActLoopCoreGenerateCode) ReActLoopOption

func WithMaxIterations

func WithMaxIterations(maxIterations int) ReActLoopOption

func WithMemorySizeLimit

func WithMemorySizeLimit(sizeLimit int) ReActLoopOption

func WithMemoryTriage

func WithMemoryTriage(triage aicommon.MemoryTriage) ReActLoopOption

func WithOnAsyncTaskFinished

func WithOnAsyncTaskFinished(fn func(task aicommon.AIStatefulTask)) ReActLoopOption

func WithOnAsyncTaskTrigger

func WithOnAsyncTaskTrigger(fn func(i *LoopAction, task aicommon.AIStatefulTask)) ReActLoopOption

func WithOnPostIteraction

func WithOnPostIteraction(fn func(loop *ReActLoop, iteration int, task aicommon.AIStatefulTask, isDone bool, reason any)) ReActLoopOption

WithOnPostIteraction sets a callback function that is called after each iteration of the ReAct loop.

func WithOnTaskCreated

func WithOnTaskCreated(fn func(task aicommon.AIStatefulTask)) ReActLoopOption

func WithPersistentContextProvider

func WithPersistentContextProvider(provider ContextProviderFunc) ReActLoopOption

func WithPersistentInstruction

func WithPersistentInstruction(instruction string) ReActLoopOption

func WithReactiveDataBuilder

func WithReactiveDataBuilder(provider FeedbackProviderFunc) ReActLoopOption

func WithReflectionOutputExample

func WithReflectionOutputExample(example string) ReActLoopOption

func WithReflectionOutputExampleContextProvider

func WithReflectionOutputExampleContextProvider(provider ContextProviderFunc) ReActLoopOption

func WithRegisterLoopAction

func WithRegisterLoopAction(actionName string, desc string, opts []aitool.ToolOption, verifier LoopActionVerifierFunc, handler LoopActionHandlerFunc) ReActLoopOption

func WithRegisterLoopActionWithStreamField

func WithRegisterLoopActionWithStreamField(actionName string, desc string, opts []aitool.ToolOption, fields []*LoopStreamField, verifier LoopActionVerifierFunc, handler LoopActionHandlerFunc) ReActLoopOption

func WithToolsGetter

func WithToolsGetter(getter func() []*aitool.Tool) ReActLoopOption

func WithUserInteractGetter

func WithUserInteractGetter(allowUserInteract func() bool) ReActLoopOption

type ReflectionLevel

type ReflectionLevel int

ReflectionLevel 定义反思的深度级别

const (
	// ReflectionLevel_None 不进行反思
	ReflectionLevel_None ReflectionLevel = iota
	// ReflectionLevel_Minimal 最小反思:仅记录执行结果
	ReflectionLevel_Minimal
	// ReflectionLevel_Standard 标准反思:评估基本影响
	ReflectionLevel_Standard
	// ReflectionLevel_Deep 深度反思:详细分析环境变化和影响
	ReflectionLevel_Deep
	// ReflectionLevel_Critical 关键反思:失败场景的深度根因分析
	ReflectionLevel_Critical
)

func (ReflectionLevel) String

func (r ReflectionLevel) String() string

String returns the string representation of ReflectionLevel

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL