agentflow

package module
v0.2.2 Latest
Published: May 23, 2026 License: Apache-2.0 Imports: 101 Imported by: 0

README

agentflow-go

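agentflow-go is an embeddable Agent runtime library for Go backend engineers: you compose Agents, Tools, Skills, the LLM Gateway, Memory, Run State, and Human-in-the-loop in Go code (pkg/builder → core.Scenario), wire them explicitly inside your own service, and then call Framework.Run.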

Quick start

go get github.com/aijustin/agentflow-go
go run ./examples/go/minimal/main.go
go run ./examples/go/builder/main.go
make validate-builder
make test

Product direction: docs/product-direction.md · Builder reference: docs/builder-reference.md

Before publishing a release, run GOTOOLCHAIN=auto make release-check. See docs/release-checklist.md and docs/api-stability.md.

Integration guide: docs/library-integration.md · HTML manual: docs/manual.html · Comparison with LangGraph: docs/competitive-analysis-langgraph.md

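Integration paths

| Goal | Entry point |
| --- | --- |
| Preferred: build scenarios with the Go DSL | docs/builder-reference.md · examples/go/builder/main.go |
| Embed in an existing Go service | docs/library-integration.md |
| Minimal in-process run | examples/go/minimal/main.go |
| Postgres / file persistence | examples/go/postgres/main.go |
| HTTP + async Worker | examples/go/http-worker/main.go |
| HITL pause and resume | examples/go/hitl-resume/main.go |
| Event triggers | examples/go/event-trigger/main.go |
| Test and example wiring | pkg/testutil |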

Library API: ValidateWiring, New, Framework.Run, NewProductionHTTPHandler, NewFrameworkJobHandler, NewPrometheusRecorder, NewOpenTelemetryTracer, ScenarioJSONSchema, Version; for the Builder stack entry points, see builder.go (for example MinimalAutonomous).

Example path reference

Runnable Go examples (examples/go/)

| Directory | Description | Run command |
| --- | --- | --- |
| builder | Build a scenario with the Go DSL and Run it in-process (recommended starting point) | go run ./examples/go/builder/main.go |
| minimal | Minimal embedding: builder, testutil.WiringOptions, New, Run | go run ./examples/go/minimal/main.go |
| postgres | Postgres / file RunState persistence | go run ./examples/go/postgres/main.go |
| http-worker | Mounts NewProductionHTTPHandler + async Worker | go run ./examples/go/http-worker/main.go |
| hitl-resume | HITL pause and ResumeAndContinue | go run ./examples/go/hitl-resume/main.go |
| event-trigger | Event-driven Run via scenario.triggers | go run ./examples/go/event-trigger/main.go |
| tier-memory | Minimal in-process tier memory example | go run ./examples/go/tier-memory/main.go |
| tier-worker | Postgres warm/cold tier + memory.reconcile async Worker | examples/deploy/ |
| validate | Validate the builder catalog or legacy YAML | go run ./examples/go/validate -kind builder all |

In production, use WithLLMGateway / WithToolExecutor instead of testutil.WiringOptions; for test wiring, see pkg/testutil.

Builder catalog reference

For the full mapping between Catalog IDs and builder.* functions, see docs/builder-reference.md. The shared stack implementations live in examples/go/scenario/scenario.go.

Validate every catalog stack:

go run ./examples/go/validate -kind builder all
make validate-builder

Requirements

  • Go 1.25.10+
  • macOS/Linux shell

Using it as a framework in other Go projects

Add the dependency:

go get github.com/aijustin/agentflow-go

Import the root facade package:

package main

import (
    "context"
    "fmt"
    "log"

    agentflow "github.com/aijustin/agentflow-go"
    "github.com/aijustin/agentflow-go/pkg/builder"
)

func main() {
    scenario := builder.MinimalAutonomous("assistant")
    fw, err := agentflow.New(scenario, agentflow.WithLLMGateway(myLLMGateway))
    if err != nil {
        log.Fatal(err)
    }

    result, err := fw.Run(context.Background(), agentflow.RunRequest{
        RunID:  "run-1",
        Agent:  "assistant",
        Prompt: "hello",
    })
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println(result.Output)
}

To plug in a custom LLM, Memory, RunState, EventSink, or HumanGate, use the Option API:

scenario := builder.MinimalAutonomous("assistant")
fw, err := agentflow.New(
    scenario,
    agentflow.WithLLMGateway(myLLMGateway),
    agentflow.WithToolExecutor("repo_search", myToolExecutor),
    agentflow.WithMemoryRepository("session", myMemoryRepo),
    agentflow.WithRunStateRepository(myRunStateRepo),
    agentflow.WithEventSink(myEventSink),
)

Constructors for common LLM providers are exposed from the root package:

gateway := agentflow.NewOpenAICompatibleGateway([]llm.Profile{{
  Name:      "default",
  Provider:  "openai-compatible",
  Model:     "qwen/qwen3.6-35b-a3b",
  Endpoint:  "http://127.0.0.1:1234/v1",
  APIKeyEnv: "AGENT_REALMODEL_API_KEY",
}}, nil)

scenario := builder.MinimalAutonomous("assistant")
fw, err := agentflow.New(scenario, agentflow.WithLLMGateway(gateway))

To wire both OpenAI-compatible chat and embedding, use NewOpenAICompatibleProvider and declare each profile's capabilities explicitly:

provider := agentflow.NewOpenAICompatibleProvider([]llm.Profile{
  {Name: "chat", Provider: "openai-compatible", Model: "qwen/qwen3.6-35b-a3b", Endpoint: "http://127.0.0.1:1234/v1"},
  {Name: "embed", Provider: "openai-compatible", Model: "text-embedding-3-small", Endpoint: "http://127.0.0.1:1234/v1", Capabilities: []llm.Capability{llm.CapEmbed}},
}, nil)

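For mixed-provider setups, NewLLMProviderRouter routes chat/tool/structured/stream and embedding calls by profile. Capabilities are checked explicitly: a capability the provider does not support fails with a clear error instead of being silently emulated.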

openaiProvider := agentflow.NewOpenAICompatibleProvider(openaiProfiles, nil)
anthropicGateway := agentflow.NewAnthropicGateway(anthropicProfiles, nil)

provider := agentflow.NewLLMProviderRouter(map[string]llm.Gateway{
  "chat":  anthropicGateway,
  "embed": openaiProvider,
})

Structured output: configure a JSON Schema in agents.<name>.output_schema and call RunStructured. The LLM Gateway must implement llm.StructuredOutputter.

result, err := fw.RunStructured(ctx, agentflow.RunRequest{
    RunID:  "run-json",
    Agent:  "assistant",
    Prompt: "return JSON",
})
fmt.Println(string(result.StructuredOutput))

Streaming output: use a Gateway that implements llm.Streamer:

chunks, err := fw.Stream(ctx, agentflow.RunRequest{
    RunID:  "run-stream",
    Agent:  "assistant",
    Prompt: "stream the answer",
})
if err != nil {
    log.Fatal(err)
}
for chunk := range chunks {
    if chunk.Error != "" {
        log.Fatal(chunk.Error)
    }
    fmt.Print(chunk.Content)
}

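When an Agent has tools configured and the LLM Gateway supports CapToolCall, the Runtime runs an autonomous tool-calling loop. It sends the tool specs to the LLM, checks that each returned tool call is on the Agent's allowlist, applies the approval policy and the per-run rate_cap, retries classified transient LLM/tool errors with exponential backoff according to retry_limit/max_retries, runs the registered ToolExecutor, and feeds the size-limited tool result back to the LLM, until the LLM returns a final answer or max_steps is reached. Stream also supports Agents with tools: it runs the same governed tool loop and emits the final answer as streamed chunks.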

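With orchestration.planning.enabled: true, the Runtime runs a planning pass before the autonomous tool loop. Planning uses the currently executing Agent by default; orchestration.planning.agent can designate a dedicated planning Agent. The short JSON plan it produces is injected into the subsequent execution context. Setting orchestration.planning.execute: true tracks plan step completion inside the tool loop (see builder.MultiExpertResearch()).

Fixed workflows support tool, agent, skill, human_gate, transform, parallel_group, and loop nodes. A condition can use exists(...), missing(...), eq(...), and ne(...) to read steps.<node_id> paths, and a transform node can use set/copy to build structured output from earlier steps.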
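To make the four condition predicates concrete, here is a minimal sketch of how such checks could be evaluated against prior step outputs. It is an illustration under assumptions, not the library's evaluator: the nested-map state layout, the dotted path format, and the choice that ne is true for a missing path are all invented for this example, and the real expression syntax is not shown in this README.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// lookup walks a dotted path such as "steps.inspect.status" through nested maps.
func lookup(root map[string]any, path string) (any, bool) {
	var cur any = root
	for _, key := range strings.Split(path, ".") {
		m, ok := cur.(map[string]any)
		if !ok {
			return nil, false
		}
		if cur, ok = m[key]; !ok {
			return nil, false
		}
	}
	return cur, true
}

// exists reports whether the path resolves to a value.
func exists(root map[string]any, path string) bool {
	_, ok := lookup(root, path)
	return ok
}

// missing is the negation of exists.
func missing(root map[string]any, path string) bool {
	return !exists(root, path)
}

// eq reports whether the path resolves and its value equals want.
func eq(root map[string]any, path string, want any) bool {
	got, ok := lookup(root, path)
	return ok && reflect.DeepEqual(got, want)
}

// ne is the negation of eq (assumption: a missing path counts as "not equal").
func ne(root map[string]any, path string, want any) bool {
	return !eq(root, path, want)
}

func main() {
	state := map[string]any{
		"steps": map[string]any{
			"inspect": map[string]any{"status": "needs_review"},
		},
	}
	fmt.Println(exists(state, "steps.inspect.status"))
	fmt.Println(missing(state, "steps.review"))
	fmt.Println(eq(state, "steps.inspect.status", "needs_review"))
	fmt.Println(ne(state, "steps.inspect.status", "ok"))
}
```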

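When an Agent is bound to memory, the Runtime reads conversation/session memory before context preparation and injects it into the LLM context; after execution it appends the user input, the assistant reply, and tool observations. The root facade automatically creates an in-memory repository for the in_memory type unless the caller passes a custom repository explicitly.

Enable the HITL Gate with the built-in HMAC token: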

scenario := builder.MinimalHumanInLoop("assistant")
fw, err := agentflow.New(scenario,
    agentflow.WithHITLTokenSecret([]byte("strong-secret"), nil),
)
if err != nil {
    log.Fatal(err)
}

result, err := fw.Run(ctx, agentflow.RunRequest{RunID: "run-1", Prompt: "needs approval"})
if err != nil {
    log.Fatal(err)
}

if result.Token != "" {
    err = fw.Resume(ctx, result.Token, core.DecisionApprove, nil)
}

To resume runs after a process restart, use the file persistence adapters:

runs, _ := agentflow.NewFileRunStateRepository("./data/runs")
blobs, _ := agentflow.NewFileBlobStore("./data/blobs")
memoryRepo, _ := agentflow.NewFileMemoryRepository("./data/memory")

scenario := builder.MinimalAutonomous("assistant")
fw, err := agentflow.New(scenario,
    agentflow.WithRunStateRepository(runs),
    agentflow.WithBlobStore(blobs),
    agentflow.WithMemoryRepository("session", memoryRepo),
)

For a PostgreSQL RunState in production, register a database/sql driver in the application and pass the initialized connection pool to the root facade constructor:

db, err := sql.Open("pgx", os.Getenv("AGENTFLOW_POSTGRES_DSN"))
if err != nil {
  log.Fatal(err)
}
runs, err := agentflow.NewPostgresRunStateRepository(db)
if err != nil {
  log.Fatal(err)
}

scenario := builder.MinimalAutonomous("assistant")
fw, err := agentflow.New(scenario, agentflow.WithRunStateRepository(runs))

For the table schema contract and operational notes, see docs/persistence/postgres-runstate.md.

To store RunState in Redis with low-latency CAS, use the Redis RunState adapter:

runs, err := agentflow.NewRedisRunStateRepository(agentflow.RedisRunStateRepositoryConfig{
  Addr:      os.Getenv("AGENTFLOW_REDIS_ADDR"),
  Password:  os.Getenv("AGENTFLOW_REDIS_PASSWORD"),
  KeyPrefix: "agentflow:runstate:",
})
if err != nil {
  log.Fatal(err)
}

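For storage semantics and operational notes, see docs/persistence/redis-runstate.md.

For asynchronous execution in production, use a queue and Workers. The PostgreSQL queue adapter builds on database/sql and is not tied to a specific driver: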

queue, err := agentflow.NewPostgresJobQueue(db)
if err != nil {
  log.Fatal(err)
}

runHandler, err := agentflow.NewFrameworkJobHandler(agentflow.FrameworkRunJobHandlerConfig{Framework: fw})
if err != nil {
  log.Fatal(err)
}

worker, err := async.NewWorker(queue, runHandler, async.WorkerConfig{
  WorkerID:      "worker-1",
  Concurrency:   4,
  LeaseTTL:      time.Minute,
  RenewInterval: 30 * time.Second,
  JobTimeout:    5 * time.Minute,
})

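agentflow.NewProductionHTTPHandler mounts /healthz, /readyz, and the async run/event/resume job API; when a Framework is configured it also mounts the synchronous /v1/events and /v1/hitl/resume routes. For details, see docs/async-runtime.md and docs/persistence/postgres-queue.md.

An MCP Server can be turned into an ordinary governed tool through an adapter, without changing the runtime core: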

mcpClient, err := agentflow.NewMCPHTTPClient("http://127.0.0.1:3333/mcp", nil)
if err != nil {
  log.Fatal(err)
}
searchTool, err := agentflow.NewMCPToolExecutor(mcpClient, "search")
if err != nil {
  log.Fatal(err)
}
fw, err := agentflow.New(builder.MinimalMCPTool("assistant"),
  agentflow.WithToolExecutor("docs.search", searchTool),
)

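For the adapter model and security notes, see docs/mcp-tools.md.

Heavy or tenant-isolated tools do not all have to be constructed at framework startup. Declare the manifest in scenario.tools first, then use WithToolResolver to resolve the real executor on demand at run time, after the allowlist, approval, RBAC, governance policy, and rate cap checks have passed: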

resolver := agentflow.ToolResolverFunc(func(ctx context.Context, tool core.Tool) (core.ToolExecutor, error) {
  switch tool.Type {
  case "builtin.sql":
    return newTenantSQLTool(ctx, tool.Metadata)
  case "mcp.tool":
    return newTenantMCPTool(ctx, tool.Metadata)
  default:
    return nil, fmt.Errorf("unsupported tool type %q", tool.Type)
  }
})

scenario := builder.MinimalAutonomous("assistant")
fw, err := agentflow.New(scenario, agentflow.WithToolResolver(resolver))

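WithToolExecutor remains a good fit for lightweight or long-lived tools and takes precedence over the resolver. An executor returned by the resolver is cached by scenario tool name for the lifetime of the framework. Skills do not initialize tools; during scenario construction they only expand prompt fragments, policy overrides, and workflow fragments, and the resolver binds the real executor at call time.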
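The precedence and caching rules can be sketched with a small lookup table. This is a simplified model rather than the framework's implementation: executor, registry, and executorFor are hypothetical names, and the executor type is reduced to a plain function.

```go
package main

import (
	"fmt"
	"sync"
)

// executor is a stand-in for a tool executor; the real interface is not shown here.
type executor func(input string) string

// registry holds explicitly registered executors and a cache of resolved ones.
type registry struct {
	mu       sync.Mutex
	explicit map[string]executor
	cache    map[string]executor
	resolve  func(name string) (executor, error)
}

func newRegistry(explicit map[string]executor, resolve func(string) (executor, error)) *registry {
	return &registry{explicit: explicit, cache: map[string]executor{}, resolve: resolve}
}

// executorFor prefers explicit registrations, then the cache, then the resolver.
func (r *registry) executorFor(name string) (executor, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if ex, ok := r.explicit[name]; ok {
		return ex, nil
	}
	if ex, ok := r.cache[name]; ok {
		return ex, nil
	}
	if r.resolve == nil {
		return nil, fmt.Errorf("no executor registered for tool %q", name)
	}
	ex, err := r.resolve(name)
	if err != nil {
		return nil, err
	}
	r.cache[name] = ex // cached by tool name for the registry's lifetime
	return ex, nil
}

func main() {
	resolverCalls := 0
	reg := newRegistry(
		map[string]executor{"echo": func(in string) string { return in }},
		func(name string) (executor, error) {
			resolverCalls++
			return func(in string) string { return name + ":" + in }, nil
		},
	)
	for i := 0; i < 2; i++ {
		ex, err := reg.executorFor("sql.query")
		if err != nil {
			panic(err)
		}
		fmt.Println(ex("ping"))
	}
	fmt.Println("resolver calls:", resolverCalls)
}
```

Because the cache is keyed by tool name, an expensive constructor runs at most once per tool in this model, while explicitly registered executors never reach the resolver at all.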

To read internal APIs, register the restricted HTTP Tool Executor:

httpTool, err := agentflow.NewHTTPToolExecutor(agentflow.HTTPToolConfig{
  AllowedHosts: []string{"https://status.example.internal"},
})
if err != nil {
  log.Fatal(err)
}
fw, err := agentflow.New(builder.MinimalHTTPTool("assistant"),
  agentflow.WithToolExecutor("http.status", httpTool),
)

This executor requires a host allowlist and permits only GET/HEAD by default. See docs/tools-http.md.
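As a rough picture of what a host allowlist plus a GET/HEAD restriction involves, the following sketch validates a request before it would be sent. It is an assumption-laden illustration, not the executor's code: checkRequest is a hypothetical helper, and it compares bare hostnames, which may differ from how the library interprets AllowedHosts entries.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// checkRequest rejects non-read methods and hosts outside the allowlist.
func checkRequest(method, rawURL string, allowedHosts map[string]bool) error {
	if method != http.MethodGet && method != http.MethodHead {
		return fmt.Errorf("method %s is not allowed", method)
	}
	u, err := url.Parse(rawURL)
	if err != nil {
		return fmt.Errorf("parse url: %w", err)
	}
	if !allowedHosts[u.Hostname()] {
		return fmt.Errorf("host %q is not on the allowlist", u.Hostname())
	}
	return nil
}

func main() {
	allowed := map[string]bool{"status.example.internal": true}
	fmt.Println(checkRequest("GET", "https://status.example.internal/health", allowed))
	fmt.Println(checkRequest("POST", "https://status.example.internal/health", allowed))
	fmt.Println(checkRequest("GET", "https://evil.example.com/", allowed))
}
```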

To read local runbooks or checked-out documentation, register the restricted filesystem read Tool Executor:

filesystemTool, err := agentflow.NewFilesystemToolExecutor(agentflow.FilesystemToolConfig{
  AllowedRoots: []string{"/srv/agentflow/runbooks"},
})
if err != nil {
  log.Fatal(err)
}
fw, err := agentflow.New(builder.MinimalFilesystemTool("assistant"),
  agentflow.WithToolExecutor("fs.read", filesystemTool),
)

This executor requires a root allowlist, rejects path escapes and symlink escapes, and limits file size. See docs/tools-filesystem.md.
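The path-escape part of that check can be approximated lexically. The sketch below is not the executor's implementation: withinRoot is a hypothetical helper, and it only handles ".." traversal. Catching symlink escapes would additionally require resolving both paths (for example with filepath.EvalSymlinks) before comparing them.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// withinRoot reports whether target, after lexical cleaning, stays inside root.
// It does not resolve symlinks.
func withinRoot(root, target string) bool {
	rel, err := filepath.Rel(filepath.Clean(root), filepath.Clean(target))
	if err != nil {
		return false
	}
	return rel != ".." && !strings.HasPrefix(rel, ".."+string(filepath.Separator))
}

func main() {
	root := "/srv/agentflow/runbooks"
	fmt.Println(withinRoot(root, "/srv/agentflow/runbooks/db/failover.md"))
	fmt.Println(withinRoot(root, "/srv/agentflow/runbooks/../secrets/key.pem"))
}
```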

To read business, ticketing, or reporting databases, register the restricted SQL query Tool Executor and use named allowlisted queries:

sqlTool, err := agentflow.NewSQLToolExecutor(agentflow.SQLToolConfig{
  DB: db,
  AllowedQueries: map[string]string{
    "tickets.open": "SELECT id, title, status FROM tickets WHERE status = $1",
  },
  MaxRows: 20,
})
if err != nil {
  log.Fatal(err)
}
fw, err := agentflow.New(builder.MinimalSQLTool("assistant"),
  agentflow.WithToolExecutor("sql.query", sqlTool),
)

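By default this executor runs only named SELECT queries, rejects multi-statement SQL, applies a timeout, and limits the number of returned rows. See docs/tools-sql.md.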
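The idea of a named-query allowlist is that the model only ever supplies a query name and parameters, never SQL text. A deliberately naive sketch of the lookup step follows. namedQuery is a hypothetical helper, not part of the library, and its semicolon check would wrongly reject a semicolon inside a string literal; a real implementation would need proper SQL parsing.

```go
package main

import (
	"fmt"
	"strings"
)

// namedQuery returns the SQL text for an allowlisted name, rejecting
// anything that is not a single SELECT statement.
func namedQuery(allowed map[string]string, name string) (string, error) {
	query, ok := allowed[name]
	if !ok {
		return "", fmt.Errorf("query %q is not on the allowlist", name)
	}
	query = strings.TrimSuffix(strings.TrimSpace(query), ";")
	words := strings.Fields(query)
	if len(words) == 0 || !strings.EqualFold(words[0], "SELECT") {
		return "", fmt.Errorf("query %q is not a SELECT statement", name)
	}
	if strings.Contains(query, ";") {
		return "", fmt.Errorf("query %q contains multiple statements", name)
	}
	return query, nil
}

func main() {
	allowed := map[string]string{
		"tickets.open": "SELECT id, title, status FROM tickets WHERE status = $1",
	}
	q, err := namedQuery(allowed, "tickets.open")
	fmt.Println(q, err)
	_, err = namedQuery(allowed, "tickets.delete")
	fmt.Println(err)
}
```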

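The SQL tool works with any database/sql driver, including PostgreSQL, MySQL, and ClickHouse. The host application imports the concrete driver and passes in an opened *sql.DB; agentflow-go does not force a database driver dependency.

A code review pipeline can register the read-only Git tool: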

gitTool, err := agentflow.NewGitToolExecutor(agentflow.GitToolConfig{
  AllowedRoots: []string{"/workspace/repos"},
})
fw, err := agentflow.New(builder.CodeReviewPipeline(),
  agentflow.WithToolExecutor("git", gitTool),
)

See docs/tools-git.md. The executor must be registered explicitly through WithToolExecutor (or WithToolResolver).

A customer-support ticket scenario can register the ticket tool and inject a store:

store := agentflow.NewMemoryTicketStore(map[string]agentflow.Ticket{
  "T-9": {ID: "T-9", Title: "Login issue", Status: "open"},
})
ticketTool, err := agentflow.NewTicketToolExecutor(agentflow.TicketToolConfig{Store: store})
fw, err := agentflow.New(builder.MinimalTicketHandling("support"),
  agentflow.WithToolExecutor("ticket", ticketTool),
)

See docs/tools-ticket.md.

A RAG scenario can combine an Embedder, a VectorStore, and the Retriever Tool:

store, err := agentflow.NewPostgresVectorStore(agentflow.PostgresVectorStoreConfig{DB: db})
if err != nil {
  log.Fatal(err)
}
retriever, err := agentflow.NewRetrieverTool(agentflow.RetrieverToolConfig{
  Embedder:     provider,
  Store:        store,
  Profile:      "embed",
  Namespace:    "tenant-a/docs",
  DefaultLimit: 5,
})
if err != nil {
  log.Fatal(err)
}
fw, err := agentflow.New(builder.MinimalRAG("assistant"),
  agentflow.WithLLMGateway(provider),
  agentflow.WithToolExecutor("knowledge.retrieve", retriever),
)

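For the public contract and the pgvector table schema, see docs/knowledge-rag.md and docs/persistence/pgvector.md.

Use the SQL in migrations/postgres to create the tables with the host application's own migration tool before wiring in the Postgres adapters. See docs/persistence/postgres-runstate.md and docs/persistence/postgres-queue.md.

To send large outputs to S3-compatible object storage, configure a BlobStore separately: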

blobs, err := agentflow.NewS3BlobStore(agentflow.S3BlobStoreConfig{
  Endpoint:        os.Getenv("AGENTFLOW_S3_ENDPOINT"),
  Bucket:          os.Getenv("AGENTFLOW_S3_BUCKET"),
  Region:          os.Getenv("AGENTFLOW_S3_REGION"),
  Prefix:          "agentflow/outputs",
  AccessKeyID:     os.Getenv("AGENTFLOW_S3_ACCESS_KEY_ID"),
  SecretAccessKey: os.Getenv("AGENTFLOW_S3_SECRET_ACCESS_KEY"),
})
if err != nil {
  log.Fatal(err)
}

scenario := builder.MinimalAutonomous("assistant")
fw, err := agentflow.New(scenario, agentflow.WithBlobStore(blobs))

For object paths and security notes, see docs/persistence/s3-blobstore.md.

Enterprise observability and governance features stay optional and low-dependency:

scenario := builder.MinimalAutonomous("assistant")
fw, err := agentflow.New(scenario,
  agentflow.WithEventSink(agentflow.NewSlogEventSink(logger)),
  agentflow.WithAuditSink(agentflow.NewSlogAuditSink(logger)),
  agentflow.WithToolGovernancePolicy(governance.ChainToolPolicies(
    governance.NewToolBudgetPolicy(8),
    governance.NewMaxSideEffectPolicy(core.SideEffectRead),
  )),
  agentflow.WithOutputRedactor(governance.NewJSONFieldRedactor("secret", "token")),
)

Governance policies take effect before tool execution, and output redaction runs before the runtime persists step output.
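To illustrate what redacting named JSON fields before persistence could look like, here is a standalone sketch. It does not reproduce governance.NewJSONFieldRedactor: redactJSON, the "[REDACTED]" placeholder, and the rule that field names match at any nesting depth are assumptions made for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// redact replaces the values of the named fields anywhere in a decoded JSON value.
func redact(v any, fields map[string]bool) any {
	switch t := v.(type) {
	case map[string]any:
		for k, val := range t {
			if fields[k] {
				t[k] = "[REDACTED]"
			} else {
				t[k] = redact(val, fields)
			}
		}
		return t
	case []any:
		for i, val := range t {
			t[i] = redact(val, fields)
		}
		return t
	default:
		return v
	}
}

// redactJSON decodes raw, masks the named fields, and re-encodes the document.
func redactJSON(raw []byte, names ...string) ([]byte, error) {
	fields := make(map[string]bool, len(names))
	for _, n := range names {
		fields[n] = true
	}
	var doc any
	if err := json.Unmarshal(raw, &doc); err != nil {
		return nil, err
	}
	return json.Marshal(redact(doc, fields))
}

func main() {
	out, err := redactJSON([]byte(`{"user":"a","token":"t","nested":{"secret":"s"}}`), "secret", "token")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```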

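AgentFlow also ships a built-in runtime observability dashboard for viewing live sessions, orchestration timelines, and event details. The PostgreSQL event store creates its tables and indexes automatically by default, so enabling the dashboard only requires wiring in the event sink and mounting the HTTP handler: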

eventStore, err := agentflow.NewPostgresEventStore(ctx, agentflow.PostgresEventStoreConfig{DB: db})
if err != nil {
  log.Fatal(err)
}
eventHub := agentflow.NewEventHub()

scenario := builder.MinimalAutonomous("assistant")
fw, err := agentflow.New(scenario,
  agentflow.WithEventSink(agentflow.NewEventFanoutSink(
    agentflow.NewEventStoreSink(eventStore, eventHub),
    agentflow.NewSlogEventSink(logger),
  )),
)

dashboard, err := agentflow.NewObservabilityHTTPHandler(agentflow.ObservabilityHTTPHandlerConfig{
  Store: eventStore,
  Hub:   eventHub,
})
mux.Handle("/observability/", http.StripPrefix("/observability", dashboard))

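For database configuration, automatic table creation, the endpoint list, and security recommendations, see docs/observability-dashboard.md.

The low-level extension interfaces live in: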

  • github.com/aijustin/agentflow-go/pkg/core
  • github.com/aijustin/agentflow-go/pkg/llm
  • github.com/aijustin/agentflow-go/pkg/contextwindow
  • github.com/aijustin/agentflow-go/pkg/async
  • github.com/aijustin/agentflow-go/pkg/audit
  • github.com/aijustin/agentflow-go/pkg/governance
  • github.com/aijustin/agentflow-go/pkg/identity
  • github.com/aijustin/agentflow-go/pkg/knowledge
  • github.com/aijustin/agentflow-go/pkg/mcp
  • github.com/aijustin/agentflow-go/pkg/memory
  • github.com/aijustin/agentflow-go/pkg/runstate
  • github.com/aijustin/agentflow-go/pkg/security

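For the built-in tool adapters, see docs/tools-http.md, docs/tools-filesystem.md, docs/tools-sql.md, docs/tools-git.md, docs/tools-ticket.md, docs/mcp-tools.md, and docs/knowledge-rag.md.

Install dependencies

go mod download

Validate the example scenarios

go run ./examples/go/validate -kind builder all
make validate-builder

Runnable examples

| Example | Description |
| --- | --- |
| examples/go/minimal | In-process Run + test wiring |
| examples/go/postgres | File or Postgres RunState |
| examples/go/http-worker | Production HTTP Handler + async Worker |
| examples/go/hitl-resume | HITL pause and ResumeAndContinue |
| examples/go/event-trigger | HandleEvent and triggers |

Replace testutil.WiringOptions in the examples with explicit WithLLMGateway / WithToolExecutor options to use them in production.

For troubleshooting, see docs/troubleshooting.md.

HTTP integration

Mount the library-provided Handler in your own service, for example: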

go run ./examples/go/http-worker/main.go

It listens on 127.0.0.1:7060 by default (override with AGENT_HTTP_ADDR); the Studio dashboard is at http://127.0.0.1:7060/observability/

In production, HITL resumption uses POST /v1/hitl/resume, served by NewProductionHTTPHandler or NewHumanHTTPHandler. Setting "continue": true calls ResumeAndContinue.

curl -X POST http://localhost:7060/v1/hitl/resume \
  -H 'Content-Type: application/json' \
  -d '{
    "token": "'"$TOKEN"'",
    "decision": "approve",
    "continue": true
  }'

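Webhook events use POST /v1/events when a Framework is configured. See docs/async-runtime.md.

Tokens passed over the network are HMAC-signed. In production you must set a strong secret and use a persistent RunState repository.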
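For readers unfamiliar with HMAC-signed tokens, the sketch below shows the general technique using only the Go standard library. It is not the library's token format: the payload.signature layout, base64url encoding, SHA-256, and the sign/verify helper names are assumptions chosen for illustration.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/base64"
	"errors"
	"fmt"
	"strings"
)

// sign returns base64url(payload) + "." + base64url(HMAC-SHA256(secret, payload)).
func sign(secret []byte, payload string) string {
	mac := hmac.New(sha256.New, secret)
	mac.Write([]byte(payload))
	enc := base64.RawURLEncoding
	return enc.EncodeToString([]byte(payload)) + "." + enc.EncodeToString(mac.Sum(nil))
}

// verify recomputes the MAC and compares it in constant time.
func verify(secret []byte, token string) (string, error) {
	body, sig, ok := strings.Cut(token, ".")
	if !ok {
		return "", errors.New("malformed token")
	}
	enc := base64.RawURLEncoding
	payload, err := enc.DecodeString(body)
	if err != nil {
		return "", fmt.Errorf("decode payload: %w", err)
	}
	got, err := enc.DecodeString(sig)
	if err != nil {
		return "", fmt.Errorf("decode signature: %w", err)
	}
	mac := hmac.New(sha256.New, secret)
	mac.Write(payload)
	if !hmac.Equal(got, mac.Sum(nil)) {
		return "", errors.New("signature mismatch")
	}
	return string(payload), nil
}

func main() {
	secret := []byte("strong-secret")
	token := sign(secret, `{"run_id":"run-1","version":1}`)
	payload, err := verify(secret, token)
	fmt.Println(payload, err)
	_, err = verify([]byte("other-secret"), token)
	fmt.Println(err)
}
```

A weak or leaked secret lets anyone mint tokens that verify, which is why the secret strength matters more than the encoding details.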

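YAML scenario format (Studio interop)

Define new scenarios in Go with pkg/builder. YAML is used only for Studio import/export and field mapping; public loading APIs such as LoadScenarioFile / NewFromFile are no longer provided.

Example stacks (Go builders, not YAML files):

| Builder | Description |
| --- | --- |
| builder.MinimalAutonomous("assistant") | Baseline autonomous tool loop |
| builder.MinimalFixedWorkflowReview("reviewer") | Graph workflow + conditions + HITL |
| builder.CodeReviewPipeline() | Git tool + parallel_group |
| builder.MultiExpertResearch() | Hybrid + planning |

Full catalog (19 entries): make validate-builder

Library API

Most applications only need to import the root facade: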

import agentflow "github.com/aijustin/agentflow-go"

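Public packages:

| Package | Purpose |
| --- | --- |
| root package | Framework facade: validation, run, resume, event handling, Studio interop, and extension injection. |
| pkg/async | Job Queue, Lease, Handler, and Worker contracts for asynchronous execution. |
| pkg/eventrouter | External event types and routing from scenario.triggers to RunRequest. |
| pkg/audit | Audit Event model and Sink contract for compliance records. |
| pkg/coordination | Distributed lease contract for Worker and workflow coordination. |
| pkg/core | Agent, Tool, Skill, Scenario, Workflow, HumanGate, and Event types. |
| pkg/llm | Provider-agnostic LLM capability interfaces and request/response types. |
| pkg/contextwindow | Context window policy management, token estimation, trimming, and compaction statistics. |
| pkg/identity | Principal, roles, tenant/workspace/project scopes, and context helpers. |
| pkg/memory | Memory Namespace and Repository contracts. |
| pkg/runstate | RunSnapshot, CAS Repository port, Blob references, and Token signing. |
| pkg/security | API Key authenticator, authorization action/resource, and RBAC policy contracts. |

Create and save a run snapshot: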

repo := runstateinmem.NewRepository()
snapshot := runstate.RunSnapshot{
    RunID:        "run-1",
    ScenarioName: "demo",
    Status:       runstate.RunStatusRunning,
}
if err := repo.Save(context.Background(), &snapshot, 0); err != nil {
    log.Fatal(err)
}

Sign and verify a HITL Token:

signer, err := runstate.NewTokenSigner([]byte("secret"))
if err != nil {
    log.Fatal(err)
}
token, err := signer.Sign(runstate.TokenPayload{RunID: "run-1", Version: 1})
if err != nil {
    log.Fatal(err)
}
payload, err := signer.Verify(token)
if err != nil {
    log.Fatal(err)
}
fmt.Println(payload.RunID)

Acquire a Redis distributed lease for Worker coordination:

locker, err := agentflow.NewRedisLocker(agentflow.RedisLockerConfig{
  Addr:      os.Getenv("AGENTFLOW_REDIS_ADDR"),
  Password:  os.Getenv("AGENTFLOW_REDIS_PASSWORD"),
  KeyPrefix: "agentflow:",
})
if err != nil {
  log.Fatal(err)
}
lease, acquired, err := locker.Acquire(ctx, "run:123", "worker:alpha", 30*time.Second)
if err != nil {
  log.Fatal(err)
}
if acquired {
  defer func() { _ = locker.Release(ctx, lease) }()
}

For lease semantics and operational notes, see docs/persistence/redis-locker.md.

Run asynchronous jobs on the async worker foundation:

queue := agentflow.NewInMemoryJobQueue()
worker, err := async.NewWorker(
  queue,
  async.HandlerFunc(func(ctx context.Context, job async.Job) error {
    return nil
  }),
  async.WorkerConfig{WorkerID: "worker-1", Concurrency: 4},
)
if err != nil {
  log.Fatal(err)
}

For queue states, Worker behavior, and upcoming production slices, see docs/async-runtime.md.

Expose the async run/event/resume job endpoints:

queue := agentflow.NewInMemoryJobQueue()
handler, err := agentflow.NewAsyncRunHTTPHandler(agentflow.AsyncRunHTTPHandlerConfig{
  Queue:  queue,
  Policy: security.NewDefaultRolePolicy(),
  Audit:  auditSink,
})
if err != nil {
  log.Fatal(err)
}
http.Handle("/v1/", middleware(handler))

The production Handler can also mount the optional synchronous event/HITL routes:

api, err := agentflow.NewProductionHTTPHandler(agentflow.ProductionHTTPHandlerConfig{
  Queue:     queue,
  Framework: fw,
  Policy:    security.NewDefaultRolePolicy(),
  Audit:     auditSink,
  Version:   agentflow.Version,
})

For the full route matrix, see docs/async-runtime.md (/v1/runs, /v1/jobs/events, /v1/jobs/hitl/resume, /v1/events, /v1/hitl/resume).

Protect HTTP handlers with API Keys and inject the enterprise Principal into the request context:

auth, err := agentflow.NewStaticAPIKeyAuthenticator(map[string]identity.Principal{
  os.Getenv("AGENTFLOW_SERVICE_API_KEY"): {
    ID:    "svc-agent-runner",
    Type:  identity.PrincipalService,
    Scope: identity.Scope{TenantID: "tenant-1"},
    Roles: []identity.Role{identity.RoleService},
  },
})
if err != nil {
  log.Fatal(err)
}
middleware, err := agentflow.NewAPIKeyMiddleware(agentflow.APIKeyMiddlewareConfig{Authenticator: auth})
if err != nil {
  log.Fatal(err)
}
handler := middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  principal, _ := identity.RequirePrincipal(r.Context())
  _ = principal
}))

A production OIDC/OAuth2 gateway can validate JWTs using OIDC Discovery with automatic JWKS refresh:

auth, err := agentflow.NewOIDCJWTAuthenticator(agentflow.OIDCJWTAuthenticatorConfig{
  Issuer:          "https://issuer.example.com",
  Audience:        "agentflow-api",
  DiscoveryURL:    "https://issuer.example.com/.well-known/openid-configuration",
  RefreshInterval: 5 * time.Minute,
})
if err != nil {
  log.Fatal(err)
}
middleware, err := agentflow.NewJWTMiddleware(agentflow.JWTMiddlewareConfig{Authenticator: auth})

Add an authorization check to an HTTP handler:

authz, err := agentflow.NewAuthorizationMiddleware(agentflow.AuthorizationMiddlewareConfig{
  Policy:   security.NewDefaultRolePolicy(),
  Action:   security.ActionRunSubmit,
  Resource: security.Resource{Type: "run"},
  Audit:    auditSink,
})
if err != nil {
  log.Fatal(err)
}
handler = middleware(authz(handler))

Run the framework with runtime tool authorization and audit records:

fw, err := agentflow.New(
  scenario,
  agentflow.WithSecurityPolicy(security.NewDefaultRolePolicy()),
  agentflow.WithAuditSink(auditSink),
)
ctx := identity.WithPrincipal(context.Background(), identity.Principal{
  ID:    "svc-agent-runner",
  Type:  identity.PrincipalService,
  Scope: identity.Scope{TenantID: "tenant-1"},
  Roles: []identity.Role{identity.RoleService},
})
result, err := fw.Run(ctx, agentflow.RunRequest{RunID: "run-1", Agent: "assistant", Prompt: "hello"})

Write audit events to an append-only JSONL file:

auditSink, err := agentflow.NewFileAuditSink("./data/audit/events.jsonl")
if err != nil {
  log.Fatal(err)
}
err = auditSink.Record(ctx, audit.Event{
  Type:    audit.EventRunSubmitted,
  RunID:   "run-1",
  Outcome: "accepted",
})

Architecture

The project uses DDD-style layering and Hexagonal Ports/Adapters:

examples/
  go/          # copyable integration mains (minimal, validate, builder, http-worker, etc.)
  deploy/      # reference Compose stack (Postgres, Redis, MinIO)
pkg/
  core/
  builder/     # Go DSL that builds core.Scenario
  catalog/     # Tool/Skill manifest loading and validation
  llm/
  contextwindow/
  memory/
  runstate/
internal/
  application/
    runtime/
    orchestration/
    scenario/
  adapter/
    config/yaml/
    human/cli/
    human/http/
    llm/openai/
    llm/anthropic/
    llm/local/
    llm/mock/
    memory/inmem/
    runstate/inmem/
    blob/inmem/

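Design boundaries:

  • Skill = prompt fragments + tool whitelist/policy + an inlineable workflow subgraph
  • Tool = an execution unit with a Schema
  • Agent = an entity that owns LLM and Memory bindings
  • RunStateRepository is separate from Memory and deals solely with resumable run snapshots.
  • Context governance applies per LLM Profile: different Agents/Tools can be routed to LLM Profiles with different window, output, thinking, and compaction policies.
  • Autonomous execution supports an optional planning pass, the LLM tool-calling loop, tool allowlists, approval rejection, a per-run rate cap, classified retries, size-limited tool result feedback, and lifecycle events.
  • Structured output uses the Agent-level output_schema and the provider's StructuredOutputter; plain streaming uses Streamer, and streaming for Agents with tools reuses the governed tool loop and persists the accumulated final answer when it finishes.
  • Memory bindings are wired into Runtime reads and writes for conversation/session history.
  • Fixed workflows execute along graph dependencies and edges, with bounded parallelism, parallel_group/loop nodes, conditional skips, retries, transform/agent/human-gate nodes, and CAS-safe output saves.
  • Workflow human-gate nodes persist CurrentNodeID/PendingGate, so the downstream graph can continue after approval; ResumeAndContinue also supports continuing from autonomous, workflow, and tool-approval pause paths.
  • External events are mapped through scenario.triggers to Framework.HandleEvent, Webhook HTTP (NewWebhookHTTPHandler), synchronous /v1/events, and async event jobs.
  • sub_agents are exposed to the supervisor Agent as virtual delegation tools during autonomous execution.
  • Skill prompt fragments, Agent policy, Tool policy, and workflow segments are expanded into namespaced workflow nodes during scenario construction.
  • The declaration side and the execution side of a Tool are separate: scenario.tools exposes the manifest to the LLM and the validator, WithToolExecutor registers lightweight executors up front, and WithToolResolver binds heavy or tenant-isolated executors on demand, when an allowed call actually reaches the execution stage.
  • File-based RunState, BlobStore, and Memory adapters are available through the root facade; PostgreSQL RunState and Redis RunState can be used for production persistence; the S3-compatible BlobStore can hold large outputs and supports MinIO/AWS S3-style endpoints as well as the verified S3-compatible interfaces of Tencent Cloud COS and Alibaba Cloud OSS; Redis distributed leases can be used for Worker coordination; the async queue and Worker contracts support run, event, and resume.continue jobs (NewFrameworkJobHandler), with HTTP routes provided by NewAsyncRunHTTPHandler and NewProductionHTTPHandler; outputs larger than step_output_threshold are offloaded to the BlobStore.
  • Enterprise identity context, API Key middleware, static and OIDC/JWKS JWT middleware, authorization middleware, RBAC policy contracts, and runtime tool authorization are available through pkg/identity, pkg/security, NewStaticAPIKeyAuthenticator, NewOIDCJWTAuthenticator, NewAPIKeyMiddleware, NewJWTMiddleware, NewAuthorizationMiddleware, and WithSecurityPolicy.
  • The Audit event contract and the noop/in-memory/file sinks are available through pkg/audit, NewNoopAuditSink, NewInMemoryAuditSink, NewFileAuditSink, and WithAuditSink.
  • The runtime observability dashboard, event store, live EventHub, and PostgreSQL automatic table creation are available through NewPostgresEventStore, NewInMemoryEventStore, NewEventStoreSink, NewEventHub, and NewObservabilityHTTPHandler.
  • For the enterprise auth/tenancy and observability/governance designs, see docs/security-auth-tenancy.md, docs/observability-governance.md, and docs/observability-dashboard.md.
  • The in-memory adapters are concurrency-safe and isolated by run/session namespace.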

Testing

Default unit tests:

make test

Integration tests:

make test-integration

Real local model flow tests:

export AGENT_REALMODEL_BASE_URL="http://127.0.0.1:1234/v1"
export AGENT_REALMODEL_MODEL="qwen/qwen3.6-35b-a3b"
export AGENT_REALMODEL_API_KEY="..."
make test-realmodel

Race tests for the concurrent in-memory adapters:

make test-race

Static checks and vulnerability scanning:

make vet
make lint
make security

Run directly:

CGO_ENABLED=0 go test -ldflags="-w" ./...
CGO_ENABLED=0 go test -ldflags="-w" -tags=integration ./...
CGO_ENABLED=0 go test -ldflags="-w" -tags=realmodel -run TestRealModel -v .
go test -race ./internal/adapter/memory/inmem ./internal/adapter/runstate/inmem ./internal/adapter/blob/inmem

With an older local Darwin toolchain and CGO_ENABLED=0, -ldflags="-w" works around a local dyld issue with test binaries.

Current status

Implemented:

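  • pkg/builder Go DSL (19 catalog stacks), ValidateScenario, and Studio YAML interop (import/export)
  • Autonomous runtime engine, including the optional planning pass before autonomous execution
  • Fixed-workflow runner wired into the root facade
  • In-memory Memory, RunStateRepository, and BlobStore
  • LLM abstraction, plus root-package constructors for the OpenAI-compatible, Anthropic, local, router, and mock test paths
  • Autonomous tool-calling loop for registered tools, OpenAI-compatible function calling, and Anthropic Messages tool use
  • Lazy binding of heavy or tenant-isolated tool executors through WithToolResolver, after runtime policy checks
  • Runtime memory integration: injects history and persists user/assistant/tool observations
  • Fixed workflow graph scheduling: dependencies, parallelism, parallel_group/loop nodes, retries, conditions, transform/agent/human-gate nodes, CAS-safe output saves
  • Workflow-level HITL pause/resume, plus the ResumeAndContinue continuation path
  • Event triggers (scenario.triggers), HandleEvent, Webhook HTTP, and async event jobs
  • Built-in Git / ticket tool executors for code review and customer-support ticket scenarios
  • Planning pass execution tracking in autonomous runs
  • Multi-Agent delegation baseline through virtual sub-agent tools
  • Skill prompt/workflow expansion, compatible-agent validation, Agent policy overlay, and Tool policy overlay
  • File-based RunState, Blob, and Memory persistence adapters, plus PostgreSQL RunState, Redis RunState, and S3-compatible BlobStore persistence adapters
  • Redis distributed lease adapter for Worker and workflow coordination
  • Async Job Queue and Worker contracts, in-memory/PostgreSQL queue adapters, lease renewal, a framework job handler supporting run/event/resume.continue, and a production HTTP handler with optional synchronous event/HITL routes
  • Enterprise identity context, API Key middleware, static/JWKS Discovery JWT middleware, authorization middleware, RBAC policy contracts, and runtime tool authorization
  • Audit event model, plus noop, in-memory, and JSONL file sinks, and framework audit wiring
  • ResumeAndContinue and HandleEvent event-driven runs
  • Runtime hardening: global/Agent/Profile timeouts, classified LLM/Tool retry + exponential backoff, Tool rate cap, tool result feedback limit, failure state persistence, large-output Blob offloading
  • Structured and streaming output Runtime paths, including streaming runs for Agents with tools
  • Context governance: sliding window, heuristic summary compaction, rich LLM Profile configuration, ContextPrepared event
  • HTTP HITL and Webhook routes (NewHumanHTTPHandler, NewWebhookHTTPHandler)
  • GitHub Actions CI, golangci-lint, govulncheck/CodeQL, Dependabot, and module release checks
  • Unit tests and integration tests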

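Production roadmap:

  • Add concrete Prometheus/OpenTelemetry exporters on top of the existing recorder/tracer ports
  • Add Helm chart packaging beyond the current Compose and Kustomize base
  • Improve Tool/Skill catalog manifest validation, the packaging flow, and the integration test matrix for hosted services

Contributing

See CONTRIBUTING.md.

License

This project is licensed under the Apache License 2.0.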

Documentation

Overview

Package agentflow provides a small public facade for embedding the scenario-driven agent runtime in other Go projects.

Applications that need low-level extension points can import pkg/core, pkg/llm, pkg/memory, and pkg/runstate directly. Applications that only need to load a YAML scenario and run it should use this package.

Index

Examples

Constants

const SchemaVersion = "2020-12"

SchemaVersion is the JSON Schema draft used by ScenarioJSONSchema.

const Version = "0.2.2"

Version is the library release version exposed to embedders.

Variables

AdaptiveRAG builds the adaptive-rag workflow example stack.

var AdaptiveRAGWorkflow = scenariobuilder.AdaptiveRAGWorkflow

AdaptiveRAGWorkflow builds the adaptive RAG workflow graph.

CodeReviewPipeline builds the code review workflow example stack.

var CodeReviewPipelineWorkflow = scenariobuilder.CodeReviewPipelineWorkflow

CodeReviewPipelineWorkflow builds the code review workflow graph.

ContextGovernance builds the context governance example stack.

CorrectiveRAG builds the corrective-rag workflow example stack.

var CorrectiveRAGWorkflow = scenariobuilder.CorrectiveRAGWorkflow

CorrectiveRAGWorkflow builds the corrective RAG workflow graph.

var DeclarativeInterruptWorkflow = scenariobuilder.DeclarativeInterruptWorkflow

DeclarativeInterruptWorkflow builds the prepare → interrupt → continue graph.

var FixedWorkflowReviewWorkflow = scenariobuilder.FixedWorkflowReviewWorkflow

FixedWorkflowReviewWorkflow builds the inspect → review workflow graph.

HybridResearch builds the hybrid research example stack.

var HybridResearchWorkflow = scenariobuilder.HybridResearchWorkflow

HybridResearchWorkflow builds the hybrid research workflow graph.

MapItemField sets the per-item field name for map branches.

MapNodeInput builds map node input JSON from a list field and branches.

MapOnError sets the map node error policy branch.

MinimalAutonomous builds the common mock/session/tool autonomous stack.

var MinimalDeclarativeInterrupt = scenariobuilder.MinimalDeclarativeInterrupt

MinimalDeclarativeInterrupt builds the declarative interrupt demo stack.

var MinimalFilesystemTool = scenariobuilder.MinimalFilesystemTool

MinimalFilesystemTool builds the filesystem tool example stack.

var MinimalFixedWorkflowReview = scenariobuilder.MinimalFixedWorkflowReview

MinimalFixedWorkflowReview builds the fixed workflow review example stack.

MinimalHTTPTool builds the http tool example stack.

MinimalHumanInLoop builds the human-in-loop demo stack.

MinimalMCPTool builds the MCP tool example stack.

MinimalRAG builds the rag-knowledge example stack.

MinimalSQLTool builds the sql tool example stack.

var MinimalTicketHandling = scenariobuilder.MinimalTicketHandling

MinimalTicketHandling builds the ticket-handling example stack.

var MultiExpertResearch = scenariobuilder.MultiExpertResearch

MultiExpertResearch builds the multi-expert hybrid example stack.

var MultiExpertResearchWorkflow = scenariobuilder.MultiExpertResearchWorkflow

MultiExpertResearchWorkflow builds the parallel expert workflow graph.

NewMinimal starts a scenario with the standard mock/session stack. Register tools, then call MinimalAgent or configure agents manually.

var NewScenarioBuilder = scenariobuilder.New

NewScenarioBuilder creates a scenario builder. Prefer pkg/builder when importing only the builder package without the root facade.

var NewWorkflowBuilder = scenariobuilder.NewWorkflow

NewWorkflowBuilder creates a workflow builder.

SelfRAG builds the self-rag workflow example stack.

SelfRAGWorkflow builds the self RAG workflow graph.

var TierMemoryAutonomous = scenariobuilder.TierMemoryAutonomous

TierMemoryAutonomous builds the tier-memory example stack.

var WorkflowEnhancements = scenariobuilder.WorkflowEnhancements

WorkflowEnhancements builds the workflow enhancements example stack.

var WorkflowEnhancementsWorkflow = scenariobuilder.WorkflowEnhancementsWorkflow

WorkflowEnhancementsWorkflow builds the workflow enhancements graph.

Functions

func LoadSkillManifest added in v0.1.8

func LoadSkillManifest(data []byte) (core.Skill, error)

LoadSkillManifest loads and validates a standalone skill catalog manifest document.

func LoadSkillManifestFile added in v0.1.8

func LoadSkillManifestFile(path string) (core.Skill, error)

LoadSkillManifestFile reads the file at path, then loads and validates it as a standalone skill catalog manifest.

func LoadToolManifest added in v0.1.8

func LoadToolManifest(data []byte) (core.Tool, error)

LoadToolManifest loads and validates a standalone tool catalog manifest document.

func LoadToolManifestFile added in v0.1.8

func LoadToolManifestFile(path string) (core.Tool, error)

LoadToolManifestFile reads the file at path, then loads and validates it as a standalone tool catalog manifest.

func NewAPIKeyMiddleware

func NewAPIKeyMiddleware(config APIKeyMiddlewareConfig) (func(http.Handler) http.Handler, error)

func NewAnthropicGateway

func NewAnthropicGateway(profiles []llm.Profile, client *http.Client) llm.Gateway

NewAnthropicGateway creates a gateway for Anthropic Messages APIs.

func NewAsyncRunHTTPHandler

func NewAsyncRunHTTPHandler(config AsyncRunHTTPHandlerConfig) (http.Handler, error)

func NewAuthorizationMiddleware

func NewAuthorizationMiddleware(config AuthorizationMiddlewareConfig) (func(http.Handler) http.Handler, error)

func NewBlobTierColdStore added in v0.2.0

func NewBlobTierColdStore(config BlobTierColdStoreConfig) (tier.Store, error)

NewBlobTierColdStore stores gzip JSON cold-tier records in a BlobAdmin backend.

func NewCheckpointHTTPHandler added in v0.2.0

func NewCheckpointHTTPHandler(config CheckpointHTTPHandlerConfig) (http.Handler, error)

NewCheckpointHTTPHandler serves production checkpoint routes:

  • GET /v1/runs/{run_id}/steps
  • POST /v1/runs/{run_id}/resume-from-step
  • GET /v1/runs/{run_id}/checkpoints
  • GET /v1/runs/{run_id}/checkpoints/{version}
  • POST /v1/runs/{run_id}/resume-from-checkpoint
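
A client calling the routes above has to substitute run_id and version into the path. The sketch below shows one way to build those paths safely with the standard library. The helper names runPath and checkpointPath are hypothetical, and treating version as an integer is an assumption based on the int64 version parameter of GetRunCheckpoint.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// runPath joins the escaped run ID with one of the documented route suffixes.
func runPath(runID, suffix string) string {
	return "/v1/runs/" + url.PathEscape(runID) + "/" + suffix
}

// checkpointPath addresses a single checkpoint version of a run.
func checkpointPath(runID string, version int64) string {
	return runPath(runID, "checkpoints/"+strconv.FormatInt(version, 10))
}

func main() {
	fmt.Println(runPath("run-42", "steps"))          // /v1/runs/run-42/steps
	fmt.Println(checkpointPath("run-42", 3))         // /v1/runs/run-42/checkpoints/3
	fmt.Println(runPath("team/a run", "checkpoints")) // /v1/runs/team%2Fa%20run/checkpoints
}
```

Escaping the run ID keeps an ID that contains a slash or space from being read as extra path segments.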

func NewCognitiveTierMemory added in v0.1.9

func NewCognitiveTierMemory(manager tier.Manager, weights tier.RecallWeights) memory.CognitiveMemory

NewCognitiveTierMemory exposes a tier Manager through the CognitiveMemory port.

func NewCompositeTierStore added in v0.1.9

func NewCompositeTierStore(config CompositeTierStoreConfig) tier.Store

NewCompositeTierStore routes records across tier backends.

func NewEventFanoutSink

func NewEventFanoutSink(sinks ...core.EventSink) core.EventSink

func NewEventHub

func NewEventHub() *observability.EventHub

func NewEventStoreSink

func NewEventStoreSink(store observability.EventStore, publishers ...observability.EventPublisher) core.EventSink

func NewFileAuditSink

func NewFileAuditSink(path string) (audit.Sink, error)

func NewFileBlobStore

func NewFileBlobStore(dir string) (runstate.BlobStore, error)

NewFileBlobStore creates a file-backed blob store.

func NewFileKnowledgeLoader

func NewFileKnowledgeLoader(config FileKnowledgeLoaderConfig) (knowledge.Loader, error)

NewFileKnowledgeLoader creates a filesystem document loader for knowledge ingestion.

func NewFileMemoryRepository

func NewFileMemoryRepository(dir string) (memory.Repository, error)

NewFileMemoryRepository creates a JSON-file-backed memory repository.

func NewFileRunStateRepository

func NewFileRunStateRepository(dir string) (runstate.Repository, error)

NewFileRunStateRepository creates a JSON-file-backed run-state repository.

func NewFileTierColdStore added in v0.1.9

func NewFileTierColdStore(dir string) (tier.Store, error)

NewFileTierColdStore creates a gzip JSON cold-tier store on the local filesystem.
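
To make "gzip JSON" concrete, here is a small round-trip sketch using only the standard library. The record type and its fields are hypothetical; the store's actual record layout and file naming are not described in this reference.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
)

// record is a hypothetical cold-tier record used only for illustration.
type record struct {
	ID      string `json:"id"`
	Content string `json:"content"`
}

// encode serializes r as JSON and compresses it with gzip.
func encode(r record) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if err := json.NewEncoder(zw).Encode(r); err != nil {
		return nil, err
	}
	// Close flushes the remaining compressed data and writes the gzip footer.
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decode reverses encode: gunzip, then unmarshal the JSON payload.
func decode(data []byte) (record, error) {
	var r record
	zr, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return r, err
	}
	defer zr.Close()
	raw, err := io.ReadAll(zr)
	if err != nil {
		return r, err
	}
	err = json.Unmarshal(raw, &r)
	return r, err
}

func main() {
	blob, err := encode(record{ID: "m1", Content: "archived note"})
	if err != nil {
		panic(err)
	}
	back, err := decode(blob)
	if err != nil {
		panic(err)
	}
	fmt.Println(back.ID, back.Content)
}
```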

func NewFilesystemToolExecutor

func NewFilesystemToolExecutor(config FilesystemToolConfig) (core.ToolExecutor, error)

NewFilesystemToolExecutor creates a governed filesystem read tool executor.

func NewFrameworkJobHandler added in v0.1.2

func NewFrameworkJobHandler(config FrameworkRunJobHandlerConfig) (asyncpkg.Handler, error)

NewFrameworkJobHandler executes framework run, event, resume.continue, and memory.reconcile jobs.

func NewGitToolExecutor added in v0.1.2

func NewGitToolExecutor(config GitToolConfig) (core.ToolExecutor, error)

NewGitToolExecutor creates a read-only git tool executor.

func NewHTTPKnowledgeLoader

func NewHTTPKnowledgeLoader(config HTTPKnowledgeLoaderConfig) (knowledge.Loader, error)

NewHTTPKnowledgeLoader creates an HTTP document loader for knowledge ingestion.

func NewHTTPToolExecutor

func NewHTTPToolExecutor(config HTTPToolConfig) (core.ToolExecutor, error)

NewHTTPToolExecutor creates a governed HTTP client tool executor.

func NewHumanHTTPHandler added in v0.1.2

func NewHumanHTTPHandler(config HumanHTTPHandlerConfig) http.Handler

NewHumanHTTPHandler serves human gate resume requests. When the request sets continue=true, the handler calls ResumeAndContinue instead of Resume.

func NewInMemoryAuditSink

func NewInMemoryAuditSink(limit int) audit.Sink

func NewInMemoryBlobStore

func NewInMemoryBlobStore() runstate.BlobStore

NewInMemoryBlobStore creates the default in-memory blob store used by New.

func NewInMemoryCheckpointHistory added in v0.2.0

func NewInMemoryCheckpointHistory() runstate.CheckpointHistory

NewInMemoryCheckpointHistory creates an append-only in-memory checkpoint history store.

func NewInMemoryEventStore

func NewInMemoryEventStore() observability.EventStore

func NewInMemoryJobQueue

func NewInMemoryJobQueue() asyncpkg.Queue

func NewInMemoryRunStateRepository

func NewInMemoryRunStateRepository() runstate.Repository

NewInMemoryRunStateRepository creates the default in-memory run-state repository used by New.

func NewInMemoryTierHotStore added in v0.1.9

func NewInMemoryTierHotStore() tier.Store

NewInMemoryTierHotStore creates an in-process hot-tier store.

func NewJWTMiddleware

func NewJWTMiddleware(config JWTMiddlewareConfig) (func(http.Handler) http.Handler, error)

func NewKnowledgeIndexer

func NewKnowledgeIndexer(config KnowledgeIndexerConfig) (*knowledge.Indexer, error)

NewKnowledgeIndexer creates a document chunking, embedding, and vector upsert pipeline.

func NewLLMReranker added in v0.1.5

func NewLLMReranker(gateway llm.Gateway, profile string) knowledge.Reranker

NewLLMReranker creates an LLM reranker for retrieval tools.

func NewLLMRouter

func NewLLMRouter(routes map[string]llm.Gateway) llm.Gateway

NewLLMRouter routes profile names to provider-specific gateways.
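
Routing by profile name amounts to a map lookup followed by delegation. The sketch below illustrates that idea with a deliberately simplified, hypothetical gateway interface; the real llm.Gateway method set is not shown in this reference, and the error returned for an unknown profile is an assumption.

```go
package main

import "fmt"

// gateway is a hypothetical, simplified stand-in for a provider gateway.
type gateway interface {
	Chat(profile, prompt string) (string, error)
}

// router dispatches each call to the gateway registered for the profile name.
type router map[string]gateway

// Chat looks up the route for profile and delegates to it.
func (r router) Chat(profile, prompt string) (string, error) {
	g, ok := r[profile]
	if !ok {
		return "", fmt.Errorf("no route for profile %q", profile)
	}
	return g.Chat(profile, prompt)
}

// stub is a fake provider that prefixes replies with its own name.
type stub string

func (s stub) Chat(_, prompt string) (string, error) {
	return string(s) + ": " + prompt, nil
}

func main() {
	r := router{
		"fast":  stub("local"),
		"smart": stub("hosted"),
	}
	reply, err := r.Chat("fast", "hello")
	fmt.Println(reply, err)
	_, err = r.Chat("missing", "hello")
	fmt.Println(err)
}
```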

func NewLLMTierSummarizer added in v0.2.0

func NewLLMTierSummarizer(gateway llm.Chatter, profile string) tier.ContentSummarizer

NewLLMTierSummarizer creates an LLM-backed cold-tier content summarizer.

func NewLocalGateway

func NewLocalGateway(profiles []llm.Profile, client *http.Client) llm.Gateway

NewLocalGateway creates a gateway for local OpenAI-compatible model servers.

func NewMCPHTTPClient

func NewMCPHTTPClient(endpoint string, client *http.Client) (mcp.Client, error)

NewMCPHTTPClient creates an MCP JSON-RPC client over HTTP.

func NewMCPToolExecutor

func NewMCPToolExecutor(client mcp.Client, tool string) (core.ToolExecutor, error)

NewMCPToolExecutor adapts one MCP server tool into an AgentFlow tool executor.

func NewMockLLMGateway added in v0.1.4

func NewMockLLMGateway(scenario core.Scenario) llm.Gateway

NewMockLLMGateway creates a fallback mock gateway for tests and examples.

func NewNoopAuditSink

func NewNoopAuditSink() audit.Sink

func NewOIDCJWTAuthenticator

func NewOIDCJWTAuthenticator(config OIDCJWTAuthenticatorConfig) (security.BearerAuthenticator, error)

NewOIDCJWTAuthenticator creates a bearer authenticator that discovers and refreshes RSA verification keys from OIDC Discovery/JWKS endpoints.

func NewObservabilityEventSink

func NewObservabilityEventSink(recorder observability.Recorder, tracer observability.Tracer, next core.EventSink) core.EventSink

func NewObservabilityHTTPHandler

func NewObservabilityHTTPHandler(config ObservabilityHTTPHandlerConfig) (http.Handler, error)

func NewOpenAICompatibleEmbedder

func NewOpenAICompatibleEmbedder(profiles []llm.Profile, client *http.Client) llm.Embedder

NewOpenAICompatibleEmbedder creates an embedder for OpenAI-compatible embedding APIs.

func NewOpenAICompatibleGateway

func NewOpenAICompatibleGateway(profiles []llm.Profile, client *http.Client) llm.Gateway

NewOpenAICompatibleGateway creates a gateway for OpenAI-compatible chat APIs.

func NewOpenTelemetryStdoutTracerProvider added in v0.1.8

func NewOpenTelemetryStdoutTracerProvider(ctx context.Context, config OpenTelemetryTracerProviderConfig) (*sdktrace.TracerProvider, error)

NewOpenTelemetryStdoutTracerProvider creates a TracerProvider that exports spans to stdout.

func NewOpenTelemetryTracer added in v0.1.8

func NewOpenTelemetryTracer(tracer oteltrace.Tracer) observability.Tracer

NewOpenTelemetryTracer wraps a host-configured OpenTelemetry tracer.

func NewPostgresCheckpointHistory added in v0.2.0

func NewPostgresCheckpointHistory(db *sql.DB, tableName ...string) (runstate.CheckpointHistory, error)

NewPostgresCheckpointHistory creates a PostgreSQL append-only checkpoint history store.

func NewPostgresJobQueue

func NewPostgresJobQueue(db *sql.DB, tableName ...string) (asyncpkg.Queue, error)

func NewPostgresRunStateRepository

func NewPostgresRunStateRepository(db *sql.DB, tableName ...string) (runstate.Repository, error)

NewPostgresRunStateRepository creates a PostgreSQL-compatible run-state repository using a caller-provided *sql.DB. Applications must import and register their preferred PostgreSQL database/sql driver.

func NewPostgresTierWarmStore added in v0.1.9

func NewPostgresTierWarmStore(config PostgresTierWarmStoreConfig) (tier.Store, error)

NewPostgresTierWarmStore creates a warm-tier store backed by Postgres JSONB rows.

func NewPostgresVectorStore

func NewPostgresVectorStore(config PostgresVectorStoreConfig) (knowledge.VectorStore, error)

NewPostgresVectorStore creates a pgvector-compatible knowledge vector store.

func NewProductionHTTPHandler

func NewProductionHTTPHandler(config ProductionHTTPHandlerConfig) (http.Handler, error)

func NewRedisLocker

func NewRedisLocker(config RedisLockerConfig) (coordination.Locker, error)

NewRedisLocker creates a Redis-backed lease manager for distributed worker and workflow coordination.

func NewRedisRunStateRepository

func NewRedisRunStateRepository(config RedisRunStateRepositoryConfig) (runstate.Repository, error)

NewRedisRunStateRepository creates a Redis-backed run-state repository with compare-and-swap version checks for distributed workers.

func NewRetentionHTTPHandler added in v0.2.0

func NewRetentionHTTPHandler(config RetentionHTTPHandlerConfig) (http.Handler, error)

NewRetentionHTTPHandler serves admin retention routes:

  • POST /v1/admin/retention/purge-runs
  • POST /v1/admin/retention/purge-expired
  • POST /v1/admin/retention/purge-policy
  • POST /v1/admin/retention/purge-blobs

func NewRetrieverTool

func NewRetrieverTool(config RetrieverToolConfig) (core.ToolExecutor, error)

NewRetrieverTool creates a semantic retrieval tool backed by an embedder and vector store.

func NewS3BlobStore

func NewS3BlobStore(config S3BlobStoreConfig) (runstate.BlobStore, error)

NewS3BlobStore creates an S3-compatible blob store for large runtime and workflow outputs. It uses path-style object URLs, AWS Signature Version 4, and supports providers whose S3-compatible PUT/GET behavior has been tested.

func NewSQLToolExecutor

func NewSQLToolExecutor(config SQLToolConfig) (core.ToolExecutor, error)

NewSQLToolExecutor creates a governed read-only SQL query tool executor.

func NewScoreReranker added in v0.1.5

func NewScoreReranker() knowledge.Reranker

NewScoreReranker creates a lexical score reranker for retrieval tools.

func NewSlogAuditSink

func NewSlogAuditSink(logger *stdslog.Logger) audit.Sink

func NewSlogEventSink

func NewSlogEventSink(logger *stdslog.Logger) core.EventSink

func NewStaticAPIKeyAuthenticator

func NewStaticAPIKeyAuthenticator(keys map[string]identity.Principal) (security.APIKeyAuthenticator, error)

func NewStudioHTTPHandler added in v0.2.0

func NewStudioHTTPHandler(config StudioHTTPHandlerConfig) (http.Handler, error)

NewStudioHTTPHandler serves production Studio routes:

  • POST /v1/studio/validate
  • POST /v1/studio/codegen
  • POST /v1/studio/yaml
  • POST /v1/studio/import-yaml
  • POST /v1/studio/run
  • POST /v1/studio/save (when StudioSavePath is set)

func NewTicketToolExecutor added in v0.1.2

func NewTicketToolExecutor(config TicketToolConfig) (core.ToolExecutor, error)

NewTicketToolExecutor creates a ticket store backed tool executor.

func NewTierColdSummaryIndexer added in v0.2.0

func NewTierColdSummaryIndexer(config TierColdSummaryIndexerConfig) (tier.ColdSummaryIndexer, error)

NewTierColdSummaryIndexer indexes cold-tier summaries for semantic recall.

func NewVerboseSlogEventSink added in v0.1.6

func NewVerboseSlogEventSink(logger *stdslog.Logger) core.EventSink

NewVerboseSlogEventSink logs runtime events with redaction-safe payload details to stderr-friendly sinks.

func NewWebhookHTTPHandler added in v0.1.2

func NewWebhookHTTPHandler(config WebhookHTTPHandlerConfig) (http.Handler, error)

NewWebhookHTTPHandler serves POST / requests that accept IncomingEvent JSON payloads.

func OpenTelemetryTracerFromProvider added in v0.1.8

func OpenTelemetryTracerFromProvider(provider *sdktrace.TracerProvider, instrumentationName string) observability.Tracer

OpenTelemetryTracerFromProvider returns a tracer backed by a TracerProvider.

func PrometheusMetricsHandler added in v0.1.4

func PrometheusMetricsHandler(recorder *PrometheusRecorder) http.Handler

PrometheusMetricsHandler returns an http.Handler that serves recorder metrics.

func ScenarioJSONSchema added in v0.1.4

func ScenarioJSONSchema() []byte

ScenarioJSONSchema returns a copy of the AgentFlow scenario JSON Schema.

func ValidateScenario

func ValidateScenario(scenario core.Scenario) error

ValidateScenario validates a scenario built programmatically.

func ValidateSkillManifest added in v0.1.8

func ValidateSkillManifest(skill core.Skill) error

ValidateSkillManifest validates a skill manifest for catalog registration.

func ValidateToolManifest added in v0.1.8

func ValidateToolManifest(tool core.Tool) error

ValidateToolManifest validates a tool manifest for catalog registration.

func ValidateWiring added in v0.1.4

func ValidateWiring(scenario core.Scenario, opts ...Option) error

ValidateWiring checks that a scenario's declared dependencies are covered by the provided options before constructing a Framework.

func ValidateWiringWithOptions added in v0.1.4

func ValidateWiringWithOptions(scenario core.Scenario, wiring WiringOptions, opts ...Option) error

ValidateWiringWithOptions validates wiring using explicit wiring rules.

Types

type APIKeyMiddlewareConfig

type APIKeyMiddlewareConfig struct {
	Authenticator security.APIKeyAuthenticator
	HeaderName    string
}

type AsyncRunHTTPHandlerConfig

type AsyncRunHTTPHandlerConfig struct {
	Queue        asyncpkg.Queue
	Policy       security.Policy
	Audit        audit.Sink
	IDGenerator  func() string
	Now          func() time.Time
	MaxBodyBytes int64
}

type AuthorizationMiddlewareConfig

type AuthorizationMiddlewareConfig struct {
	Policy       security.Policy
	Action       security.Action
	Resource     security.Resource
	ResourceFunc func(*http.Request) security.Resource
	Audit        audit.Sink
}

type BlobTierColdStoreConfig added in v0.2.0

type BlobTierColdStoreConfig struct {
	Blobs    runstate.BlobStore
	IndexDir string
}

BlobTierColdStoreConfig configures a blob-backed cold tier with a local index directory.

type CheckpointHTTPHandlerConfig added in v0.2.0

type CheckpointHTTPHandlerConfig struct {
	Framework    *Framework
	MaxBodyBytes int64
}

type CodegenResult added in v0.2.0

type CodegenResult struct {
	Language string `json:"language"`
	Code     string `json:"code"`
}

CodegenResult contains generated builder code for a Studio graph.

type CompositeTierStoreConfig added in v0.1.9

type CompositeTierStoreConfig struct {
	Hot  tier.Store
	Warm tier.Store
	Cold tier.Store
}

CompositeTierStoreConfig wires hot, warm, and cold tier backends.

type EventRouter added in v0.1.2

type EventRouter = eventrouter.Router

EventRouter maps external events to run requests for a scenario.

func NewEventRouter added in v0.1.2

func NewEventRouter(scenario core.Scenario) *EventRouter

NewEventRouter creates a router from scenario trigger definitions.

type FileKnowledgeLoaderConfig

type FileKnowledgeLoaderConfig struct {
	Paths     []string
	Namespace string
	Metadata  map[string]string
	MaxBytes  int64
}

type FilesystemToolConfig

type FilesystemToolConfig struct {
	AllowedRoots []string
	MaxBytes     int64
}

type ForkRunResult added in v0.2.0

type ForkRunResult struct {
	RunID           string `json:"run_id"`
	ParentRunID     string `json:"parent_run_id"`
	ThreadID        string `json:"thread_id"`
	ForkFromVersion int64  `json:"fork_from_version,omitempty"`
}
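
The omitempty tag on ForkFromVersion means the field is left out of the JSON when it is zero. The sketch below copies the struct definition shown above into a standalone program to demonstrate the encoded shape; the field values are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ForkRunResult mirrors the struct definition from this reference.
type ForkRunResult struct {
	RunID           string `json:"run_id"`
	ParentRunID     string `json:"parent_run_id"`
	ThreadID        string `json:"thread_id"`
	ForkFromVersion int64  `json:"fork_from_version,omitempty"`
}

// encode marshals a result to its JSON string form.
func encode(r ForkRunResult) (string, error) {
	out, err := json.Marshal(r)
	return string(out), err
}

func main() {
	latest, _ := encode(ForkRunResult{RunID: "r2", ParentRunID: "r1", ThreadID: "t1"})
	fmt.Println(latest) // {"run_id":"r2","parent_run_id":"r1","thread_id":"t1"}

	pinned, _ := encode(ForkRunResult{RunID: "r3", ParentRunID: "r1", ThreadID: "t1", ForkFromVersion: 4})
	fmt.Println(pinned) // {"run_id":"r3","parent_run_id":"r1","thread_id":"t1","fork_from_version":4}
}
```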

type Framework

type Framework struct {
	// contains filtered or unexported fields
}

Framework is an embeddable runtime wrapper for one scenario.

func New

func New(scenario core.Scenario, opts ...Option) (*Framework, error)

New creates a Framework for a validated scenario. By default it wires in-memory run-state and blob stores and a no-op event sink. Production applications should provide persistent repositories through options.

Example
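package main

import (
	"context"
	"encoding/json"
	"fmt"

	agentflow "github.com/aijustin/agentflow-go"
	"github.com/aijustin/agentflow-go/pkg/builder"
	"github.com/aijustin/agentflow-go/pkg/core"
)

// testAutonomousScenario builds the mock/session/tool autonomous stack
// for a single agent named "assistant".
func testAutonomousScenario() core.Scenario {
	return builder.MinimalAutonomous("assistant", builder.MinimalScenarioName("autonomous-echo"))
}

// noopTool is a tool executor stub that returns an empty result for every call.
type noopTool struct{}

func (noopTool) Execute(context.Context, core.ToolCall) (core.ToolResult, error) {
	return core.ToolResult{}, nil
}

func main() {
	// Wire the stub as the executor for the "echo" tool; all other
	// dependencies fall back to the in-memory defaults used by New.
	fw, err := agentflow.New(testAutonomousScenario(), agentflow.WithToolExecutor("echo", noopTool{}))
	if err != nil {
		panic(err)
	}
	result, err := fw.Run(context.Background(), agentflow.RunRequest{
		RunID:  "example-run",
		Prompt: "hello",
	})
	if err != nil {
		panic(err)
	}
	// Print the final run status as JSON.
	out, err := json.Marshal(result.Status)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}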

func (*Framework) BlobStore

func (f *Framework) BlobStore() runstate.BlobStore

BlobStore returns the blob store backing large step outputs.

func (*Framework) Catalog added in v0.1.4

func (f *Framework) Catalog() catalog.Catalog

func (*Framework) Close added in v0.1.4

func (f *Framework) Close(ctx context.Context) error

Close releases resources registered through WithCloser or WithDatabase.

func (*Framework) CompareRuns added in v0.2.0

func (f *Framework) CompareRuns(ctx context.Context, runA, runB string) (studio.RunCompareResult, error)

CompareRuns diffs step outputs between two persisted runs.

func (*Framework) ExportScenarioGraph added in v0.2.0

func (f *Framework) ExportScenarioGraph() ScenarioGraph

ExportScenarioGraph exports the framework scenario as a nested graph.

func (*Framework) ForkRun added in v0.2.0

func (f *Framework) ForkRun(ctx context.Context, parentRunID string, version int64) (ForkRunResult, error)

ForkRun copies a run snapshot into a new run ID without modifying the parent run.

func (*Framework) GenerateStudioBuilderCode added in v0.2.0

func (f *Framework) GenerateStudioBuilderCode(_ context.Context, edited graph.ScenarioGraph) (CodegenResult, error)

GenerateStudioBuilderCode renders builder Go code for an edited Studio graph.

func (*Framework) GenerateStudioScenarioYAML added in v0.2.0

func (f *Framework) GenerateStudioScenarioYAML(_ context.Context, edited graph.ScenarioGraph) (CodegenResult, error)

GenerateStudioScenarioYAML renders legacy scenario YAML for an edited Studio graph.

func (*Framework) GetRunCheckpoint added in v0.2.0

func (f *Framework) GetRunCheckpoint(ctx context.Context, runID string, version int64) (runstate.RunSnapshot, error)

GetRunCheckpoint loads one historical snapshot revision for a run.

func (*Framework) HandleEvent added in v0.1.2

func (f *Framework) HandleEvent(ctx context.Context, event IncomingEvent) (RunResult, error)

HandleEvent resolves an incoming event and executes the scenario.

func (*Framework) ImportStudioScenarioYAML added in v0.2.0

func (f *Framework) ImportStudioScenarioYAML(_ context.Context, yamlData []byte, layout graph.ScenarioGraph) (ImportStudioResult, error)

ImportStudioScenarioYAML parses legacy scenario YAML and returns an editable graph. When layout is non-empty, node positions from layout are merged onto the imported graph.

func (*Framework) ListRunCheckpoints added in v0.2.0

func (f *Framework) ListRunCheckpoints(ctx context.Context, runID string, limit int) (ListRunCheckpointsResult, error)

ListRunCheckpoints returns append-only snapshot revisions recorded for a run.

func (*Framework) ListRunSteps added in v0.2.0

func (f *Framework) ListRunSteps(ctx context.Context, runID string) (ListRunStepsResult, error)

ListRunSteps returns persisted step outputs and the current snapshot version.

func (*Framework) ListRunThread added in v0.2.0

func (f *Framework) ListRunThread(ctx context.Context, runID string) ([]ThreadRunSummary, error)

ListRunThread returns runs in the same fork/thread group as the given run.

func (*Framework) PurgeExpired added in v0.1.4

func (f *Framework) PurgeExpired(ctx context.Context, maxAge time.Duration) (int, error)

PurgeExpired deletes terminal run snapshots whose UpdatedAt is before now-maxAge. Snapshots without UpdatedAt are skipped.

func (*Framework) PurgeOrphanBlobs added in v0.1.4

func (f *Framework) PurgeOrphanBlobs(ctx context.Context) (int, error)

PurgeOrphanBlobs deletes blob objects that are no longer referenced by any run snapshot for the current scenario (and tenant, when a principal is present).

func (*Framework) PurgeRuns added in v0.1.4

func (f *Framework) PurgeRuns(ctx context.Context, filter runstate.ListFilter) (int, error)

PurgeRuns deletes run snapshots matching the filter.

func (*Framework) PurgeWithPolicy added in v0.1.4

func (f *Framework) PurgeWithPolicy(ctx context.Context, policy RetentionPolicy) (int, error)

PurgeWithPolicy deletes run snapshots using a retention policy.

func (*Framework) ResolveEvent added in v0.1.2

func (f *Framework) ResolveEvent(event IncomingEvent) (RunRequest, error)

ResolveEvent resolves an incoming event without executing it.

func (*Framework) Resume

func (f *Framework) Resume(ctx context.Context, token string, decision core.Decision, amendment json.RawMessage) error

Resume resumes a paused run through the configured human gate.

func (*Framework) ResumeAndContinue added in v0.1.2

func (f *Framework) ResumeAndContinue(ctx context.Context, token string, decision core.Decision, amendment json.RawMessage) (RunResult, error)

ResumeAndContinue resumes a paused run and continues execution until the next pause point or completion.

func (*Framework) ResumeFromCheckpoint added in v0.2.0

func (f *Framework) ResumeFromCheckpoint(ctx context.Context, runID string, version int64) (RunResult, error)

ResumeFromCheckpoint restores a historical snapshot revision and reruns the workflow forward from that restored state.

func (*Framework) ResumeFromStep added in v0.2.0

func (f *Framework) ResumeFromStep(ctx context.Context, runID, nodeID string) (RunResult, error)

ResumeFromStep rewinds a workflow run to the given node, truncating that node and all downstream step outputs, then reruns from that point forward.
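
Truncating a node and everything downstream of it is a graph reachability problem. The sketch below shows the idea on a plain adjacency map; the edges and outputs representations and the truncateFrom helper are hypothetical and say nothing about how the library stores workflow state.

```go
package main

import (
	"fmt"
	"sort"
)

// truncateFrom deletes the output of nodeID and of every node reachable from it.
func truncateFrom(edges map[string][]string, outputs map[string]string, nodeID string) {
	queue := []string{nodeID}
	seen := map[string]bool{nodeID: true}
	for len(queue) > 0 {
		n := queue[0]
		queue = queue[1:]
		delete(outputs, n)
		for _, next := range edges[n] {
			if !seen[next] {
				seen[next] = true
				queue = append(queue, next)
			}
		}
	}
}

// remaining lists the node IDs that still have outputs, in sorted order.
func remaining(outputs map[string]string) []string {
	ids := make([]string, 0, len(outputs))
	for id := range outputs {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids
}

func main() {
	// fetch -> draft -> review, and fetch -> audit as a separate branch.
	edges := map[string][]string{
		"fetch": {"draft", "audit"},
		"draft": {"review"},
	}
	outputs := map[string]string{"fetch": "ok", "draft": "v1", "review": "lgtm", "audit": "ok"}
	truncateFrom(edges, outputs, "draft")
	fmt.Println(remaining(outputs)) // [audit fetch]
}
```

Nodes on sibling branches, such as audit here, keep their outputs because they are not reachable from the rewound node.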

func (*Framework) ResumeRunByID added in v0.2.0

func (f *Framework) ResumeRunByID(ctx context.Context, runID string, decision core.Decision, amendment json.RawMessage, continueExecution bool) (RunResult, error)

ResumeRunByID resumes a paused run by signing a HITL token from the current snapshot. When continueExecution is true, execution continues until completion or the next pause.

func (*Framework) Run

func (f *Framework) Run(ctx context.Context, req RunRequest) (RunResult, error)

Run executes the framework scenario.

func (*Framework) RunStateRepository

func (f *Framework) RunStateRepository() runstate.Repository

RunStateRepository returns the repository backing run-state snapshots.

func (*Framework) RunStructured

func (f *Framework) RunStructured(ctx context.Context, req RunRequest) (RunResult, error)

RunStructured executes an agent using its configured output_schema and a gateway that implements llm.StructuredOutputter.

func (*Framework) RunStudioGraph added in v0.2.0

func (f *Framework) RunStudioGraph(ctx context.Context, edited graph.ScenarioGraph, req RunRequest) (RunResult, error)

RunStudioGraph validates an edited graph and executes it as a new run.

func (*Framework) SaveStudioGraph added in v0.2.0

func (f *Framework) SaveStudioGraph(ctx context.Context, edited graph.ScenarioGraph, path string) (SaveStudioResult, error)

SaveStudioGraph validates an edited graph, writes legacy YAML to path, and updates the framework scenario.

func (*Framework) Scenario

func (f *Framework) Scenario() core.Scenario

Scenario returns the scenario used by this framework.

func (*Framework) Stream

func (f *Framework) Stream(ctx context.Context, req RunRequest) (<-chan llm.ChatChunk, error)

Stream executes an agent using a gateway that implements llm.Streamer.

func (*Framework) ValidateStudioGraph added in v0.2.0

func (f *Framework) ValidateStudioGraph(_ context.Context, edited graph.ScenarioGraph) (ValidateStudioResult, error)

ValidateStudioGraph validates an edited Studio graph against the framework scenario.

type FrameworkRunJobHandlerConfig

type FrameworkRunJobHandlerConfig struct {
	Framework *Framework
}

type GitToolConfig added in v0.1.2

type GitToolConfig struct {
	AllowedRoots []string
}

type GraphEdge added in v0.2.0

type GraphEdge = graph.GraphEdge

GraphEdge connects two GraphNodes.

type GraphNode added in v0.2.0

type GraphNode = graph.GraphNode

GraphNode is a workflow node in a GraphView.

type GraphView added in v0.2.0

type GraphView = graph.GraphView

GraphView describes one workflow DAG for visualization.

type HTTPKnowledgeLoaderConfig

type HTTPKnowledgeLoaderConfig struct {
	URLs      []string
	Namespace string
	Metadata  map[string]string
	MaxBytes  int64
	Client    *http.Client
}

type HTTPToolConfig

type HTTPToolConfig struct {
	AllowedHosts     []string
	AllowedMethods   []string
	DefaultHeaders   map[string]string
	MaxResponseBytes int64
	Client           *http.Client
}

type HumanHTTPHandlerConfig added in v0.1.2

type HumanHTTPHandlerConfig struct {
	Framework    *Framework
	MaxBodyBytes int64
}

type ImportStudioResult added in v0.2.0

type ImportStudioResult struct {
	ScenarioName string              `json:"scenario_name"`
	Graph        graph.ScenarioGraph `json:"graph"`
}

ImportStudioResult describes a YAML import into an editable Studio graph.

type IncomingEvent added in v0.1.2

type IncomingEvent = eventrouter.Event

IncomingEvent is an external trigger delivered through webhooks or the CLI.

type JWTAlgorithm

type JWTAlgorithm string
const (
	JWTAlgorithmHS256 JWTAlgorithm = "HS256"
	JWTAlgorithmRS256 JWTAlgorithm = "RS256"
)

type JWTAuthenticatorConfig

type JWTAuthenticatorConfig struct {
	Issuer         string
	Audience       string
	Keys           []JWTKey
	Now            func() time.Time
	Leeway         time.Duration
	PrincipalType  identity.PrincipalType
	TenantClaim    string
	WorkspaceClaim string
	ProjectClaim   string
	RolesClaim     string
}

type JWTKey

type JWTKey struct {
	ID              string
	Algorithm       JWTAlgorithm
	HMACSecret      []byte
	RSAPublicKeyPEM []byte
}

type JWTMiddlewareConfig

type JWTMiddlewareConfig struct {
	Authenticator security.BearerAuthenticator
}

type KnowledgeIndexerConfig

type KnowledgeIndexerConfig struct {
	Embedder  llm.Embedder
	Store     knowledge.VectorStore
	Profile   string
	Namespace string
	BatchSize int
	Chunker   knowledge.Chunker
}

type KnowledgeRegistry added in v0.1.5

type KnowledgeRegistry struct {
	Embedder llm.Embedder
	Store    knowledge.VectorStore
	Reranker knowledge.Reranker
}

KnowledgeRegistry wires scenario knowledge collections to retriever executors.

type LLMProviderRouter

type LLMProviderRouter interface {
	llm.Gateway
	llm.Embedder
}

func NewLLMProviderRouter

func NewLLMProviderRouter(routes map[string]llm.Gateway) LLMProviderRouter

NewLLMProviderRouter routes chat/tool/structured/streaming and embedding calls by profile name when the selected route supports the requested capability.

type ListRunCheckpointsResult added in v0.2.0

type ListRunCheckpointsResult struct {
	RunID       string                       `json:"run_id"`
	Checkpoints []runstate.CheckpointSummary `json:"checkpoints"`
}

ListRunCheckpointsResult summarizes append-only snapshot revisions for a run.

type ListRunStepsResult added in v0.2.0

type ListRunStepsResult struct {
	RunID         string             `json:"run_id"`
	Version       int64              `json:"version"`
	Status        runstate.RunStatus `json:"status"`
	CurrentNodeID string             `json:"current_node_id,omitempty"`
	PendingHITL   *PendingHITLInfo   `json:"pending_hitl,omitempty"`
	Steps         []RunStep          `json:"steps"`
}

ListRunStepsResult summarizes checkpointed workflow steps for a run.

type MCPRegistry added in v0.1.5

type MCPRegistry struct {
	Clients    map[string]mcp.Client
	HTTPClient *http.Client
}

MCPRegistry supplies MCP clients for scenario server declarations.

type MCPStdioClient

type MCPStdioClient interface {
	mcp.Client
	Close() error
}

func NewMCPStdioClient

func NewMCPStdioClient(ctx context.Context, config MCPStdioClientConfig) (MCPStdioClient, error)

NewMCPStdioClient creates an MCP JSON-RPC client over a child process stdio transport.

type MCPStdioClientConfig

type MCPStdioClientConfig struct {
	Command string
	Args    []string
	Env     []string
	Dir     string
}

type MapBranch added in v0.2.0

type MapBranch = scenariobuilder.MapBranch

MapBranch configures a map node fan-out branch.

type OIDCJWTAuthenticatorConfig

type OIDCJWTAuthenticatorConfig struct {
	Issuer          string
	Audience        string
	DiscoveryURL    string
	JWKSURL         string
	HTTPClient      *http.Client
	RefreshInterval time.Duration
	Now             func() time.Time
	Leeway          time.Duration
	PrincipalType   identity.PrincipalType
	TenantClaim     string
	WorkspaceClaim  string
	ProjectClaim    string
	RolesClaim      string
}

type ObservabilityHTTPHandlerConfig

type ObservabilityHTTPHandlerConfig struct {
	Store          observability.EventStore
	Hub            *observability.EventHub
	AuthMiddleware func(http.Handler) http.Handler
	// Framework enables Studio graph export, step listing, and resume-from-step.
	Framework *Framework
	// StudioSavePath enables POST /observability/api/studio/save for the configured scenario file.
	StudioSavePath string
	// TraceExploreURL is an optional trace UI link template, e.g. https://jaeger.example.com/trace/{trace_id}.
	TraceExploreURL string
}
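
The TraceExploreURL comment describes a link template containing a {trace_id} placeholder. A minimal sketch of expanding such a template follows; expandTraceURL is a hypothetical helper, and how the handler itself performs the substitution is an assumption.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// expandTraceURL substitutes the escaped trace ID for every {trace_id} placeholder.
func expandTraceURL(tmpl, traceID string) string {
	return strings.ReplaceAll(tmpl, "{trace_id}", url.PathEscape(traceID))
}

func main() {
	tmpl := "https://jaeger.example.com/trace/{trace_id}"
	fmt.Println(expandTraceURL(tmpl, "4bf92f3577b34da6"))
	// https://jaeger.example.com/trace/4bf92f3577b34da6
}
```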

type OpenAICompatibleProvider

type OpenAICompatibleProvider interface {
	llm.Gateway
	llm.Embedder
}

func NewOpenAICompatibleProvider

func NewOpenAICompatibleProvider(profiles []llm.Profile, client *http.Client) OpenAICompatibleProvider

NewOpenAICompatibleProvider creates a gateway/embedder for OpenAI-compatible APIs.

type OpenTelemetryTracer added in v0.1.8

type OpenTelemetryTracer = oteladapter.Tracer

OpenTelemetryTracer adapts go.opentelemetry.io/otel/trace.Tracer to observability.Tracer.

type OpenTelemetryTracerProviderConfig added in v0.1.8

type OpenTelemetryTracerProviderConfig = oteladapter.TracerProviderConfig

OpenTelemetryTracerProviderConfig configures a stdout-exporting TracerProvider for local development.

type Option

type Option func(*options) error

Option customizes Framework construction.
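Option has the shape of the functional-options pattern: each option is a function that mutates an unexported settings struct and may return an error. The sketch below reproduces that shape with local stand-in types; options, withName, withRequireLLM, and apply are all hypothetical names, and how New actually applies its options is not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// options is a stand-in for the unexported settings struct an Option mutates.
type options struct {
	name       string
	requireLLM bool
}

// Option mirrors the documented signature: func(*options) error.
type Option func(*options) error

// withName is a hypothetical option that validates its argument.
func withName(name string) Option {
	return func(o *options) error {
		if name == "" {
			return errors.New("name must not be empty")
		}
		o.name = name
		return nil
	}
}

// withRequireLLM is a hypothetical flag-style option.
func withRequireLLM() Option {
	return func(o *options) error {
		o.requireLLM = true
		return nil
	}
}

// apply runs the options in order and stops at the first error
// (an assumption about ordering and error handling).
func apply(opts ...Option) (options, error) {
	var o options
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		if err := opt(&o); err != nil {
			return options{}, fmt.Errorf("apply option: %w", err)
		}
	}
	return o, nil
}

func main() {
	o, err := apply(withName("demo"), withRequireLLM())
	fmt.Println(o.name, o.requireLLM, err) // prints "demo true <nil>"
}
```

Returning an error from the option itself lets invalid wiring surface at construction time rather than on first use.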

func KnowledgeWiringOptions added in v0.1.5

func KnowledgeWiringOptions(scenario core.Scenario, registry KnowledgeRegistry) ([]Option, error)

KnowledgeWiringOptions returns Framework options that bind scenario knowledge collections.

func MCPWiringOptions added in v0.1.5

func MCPWiringOptions(ctx context.Context, scenario core.Scenario, registry MCPRegistry) ([]Option, error)

MCPWiringOptions returns Framework options that wire mcp.tool declarations to MCP servers.

func WireMCPTools added in v0.1.5

func WireMCPTools(ctx context.Context, scenario core.Scenario, registry MCPRegistry) ([]Option, error)

WireMCPTools binds scenario MCP servers to mcp.tool executors.

func WithAuditSink

func WithAuditSink(sink audit.Sink) Option

WithAuditSink wires an audit sink used for compliance-oriented events.

func WithBlobStore

func WithBlobStore(store runstate.BlobStore) Option

WithBlobStore wires storage for large step outputs.

func WithCheckpointHistory added in v0.2.0

func WithCheckpointHistory(history runstate.CheckpointHistory) Option

WithCheckpointHistory wires append-only run snapshot history for time-travel.

func WithCloser added in v0.1.4

func WithCloser(fn func(context.Context) error) Option

WithCloser registers a function invoked by Framework.Close in LIFO order.

func WithCognitiveMemory added in v0.1.9

func WithCognitiveMemory(name string, repo memory.CognitiveMemory) Option

WithCognitiveMemory wires a cognitive memory backend by scenario memory name.

func WithDatabase added in v0.1.4

func WithDatabase(db *sql.DB) Option

WithDatabase registers a database handle for automatic close on Framework.Close.

func WithEventSink

func WithEventSink(sink core.EventSink) Option

WithEventSink wires observability event output.

func WithHITLTokenSecret

func WithHITLTokenSecret(secret []byte, tokenWriter io.Writer) Option

WithHITLTokenSecret wires the built-in HMAC-token human gate using the same RunStateRepository as the framework. tokenWriter can be nil.

func WithHITLTokenTTL

func WithHITLTokenTTL(ttl time.Duration) Option

WithHITLTokenTTL sets the lifetime for tokens emitted by WithHITLTokenSecret.

func WithHumanGate

func WithHumanGate(gate core.HumanGate) Option

WithHumanGate wires a custom human-in-the-loop gate.

func WithJobQueue added in v0.1.9

func WithJobQueue(queue async.Queue) Option

WithJobQueue wires an async queue used to enqueue memory.reconcile jobs after tier writes.

func WithLLMGateway

func WithLLMGateway(gateway llm.Gateway) Option

WithLLMGateway wires a provider-neutral LLM gateway.

func WithLogger added in v0.1.1

func WithLogger(logger log.Logger) Option

WithLogger wires a structured logger that receives warning and error messages from the runtime. If not provided, messages are silently discarded.

func WithMemoryRepository

func WithMemoryRepository(name string, repo memory.Repository) Option

WithMemoryRepository wires a memory backend by scenario memory name.

func WithOutputRedactor

func WithOutputRedactor(redactor governance.OutputRedactor) Option

WithOutputRedactor wires an output redactor that scrubs sensitive fields from step outputs before they are persisted or returned to callers.
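The idea of scrubbing sensitive fields before persistence can be sketched as a recursive walk over decoded output. The redact function, the key list, and the "[REDACTED]" marker below are hypothetical; the real governance.OutputRedactor interface is not shown in this section, so this is not an implementation of it.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// redact returns a copy of v with values under sensitive keys replaced.
// Keys are matched case-insensitively; the input is not modified.
func redact(v any, sensitive map[string]bool) any {
	switch x := v.(type) {
	case map[string]any:
		out := make(map[string]any, len(x))
		for k, val := range x {
			if sensitive[strings.ToLower(k)] {
				out[k] = "[REDACTED]"
				continue
			}
			out[k] = redact(val, sensitive)
		}
		return out
	case []any:
		out := make([]any, len(x))
		for i, val := range x {
			out[i] = redact(val, sensitive)
		}
		return out
	default:
		return v
	}
}

func main() {
	output := map[string]any{
		"user":     "ada",
		"password": "hunter2",
		"nested":   map[string]any{"api_key": "abc", "n": 1},
	}
	clean := redact(output, map[string]bool{"password": true, "api_key": true})
	b, _ := json.Marshal(clean)
	fmt.Println(string(b))
	// prints {"nested":{"api_key":"[REDACTED]","n":1},"password":"[REDACTED]","user":"ada"}
}
```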

func WithRecorder added in v0.1.1

func WithRecorder(recorder observability.Recorder) Option

WithRecorder wires a metrics recorder. If not provided, metrics are discarded via observability.NoopRecorder.

func WithRequireLLM added in v0.1.4

func WithRequireLLM() Option

WithRequireLLM makes New fail when no LLM gateway is wired.

func WithRunStateRepository

func WithRunStateRepository(repo runstate.Repository) Option

WithRunStateRepository wires run-state persistence used for pause/resume.

func WithSecurityPolicy

func WithSecurityPolicy(policy security.Policy) Option

WithSecurityPolicy wires an authorization policy used by runtime execution.

func WithTierColdSummarizer added in v0.2.0

func WithTierColdSummarizer(name string, summarizer tier.ContentSummarizer) Option

WithTierColdSummarizer wires an LLM summarizer for cold-tier archive on a memory name.

func WithTierColdSummaryIndexer added in v0.2.0

func WithTierColdSummaryIndexer(name string, indexer tier.ColdSummaryIndexer) Option

WithTierColdSummaryIndexer wires a vector indexer for cold-tier summary recall on a memory name.

func WithTierMemory added in v0.1.9

func WithTierMemory(name string, manager tier.Manager) Option

WithTierMemory wires a tier manager by scenario memory name.

func WithTierStore added in v0.1.9

func WithTierStore(name string, store tier.Store, policy tier.Policy) Option

WithTierStore wires a tier store and builds a default manager from policy.

func WithToolExecutor

func WithToolExecutor(name string, executor core.ToolExecutor) Option

WithToolExecutor registers an executable tool implementation by scenario tool name. Agent tool policies still come from the scenario YAML.

func WithToolGovernancePolicy

func WithToolGovernancePolicy(policy governance.ToolPolicy) Option

WithToolGovernancePolicy wires a per-invocation tool governance policy. The policy is evaluated before every tool execution and can deny calls based on side-effect level, call budget, or custom logic.

func WithToolResolver

func WithToolResolver(resolver core.ToolResolver) Option

WithToolResolver wires a resolver that creates or retrieves tool executors only when a declared tool is invoked. Explicit WithToolExecutor registrations take precedence over the resolver.

func WithTracer added in v0.1.1

func WithTracer(tracer observability.Tracer) Option

WithTracer wires a distributed-tracing provider. If not provided, tracing is a no-op via observability.NoopTracer.

type PendingHITLInfo added in v0.2.0

type PendingHITLInfo struct {
	NodeID    string `json:"node_id,omitempty"`
	Interrupt bool   `json:"interrupt,omitempty"`
}

PendingHITLInfo describes a paused run awaiting HITL approval.
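Both fields carry omitempty, so zero values are dropped from the JSON encoding. The sketch below copies the struct locally to show the wire shape with encoding/json; encode is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// PendingHITLInfo is a local copy of the documented struct, tags included.
type PendingHITLInfo struct {
	NodeID    string `json:"node_id,omitempty"`
	Interrupt bool   `json:"interrupt,omitempty"`
}

// encode marshals info and returns "" on failure.
func encode(info PendingHITLInfo) string {
	b, err := json.Marshal(info)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	fmt.Println(encode(PendingHITLInfo{NodeID: "approve", Interrupt: true}))
	// prints {"node_id":"approve","interrupt":true}
	fmt.Println(encode(PendingHITLInfo{NodeID: "approve"}))
	// prints {"node_id":"approve"}
	fmt.Println(encode(PendingHITLInfo{}))
	// prints {}
}
```

A consumer therefore cannot distinguish "interrupt": false from an absent field; both decode to false.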

type Plan

type Plan struct {
	Scenario core.Scenario
	LLMs     map[string]llm.Profile
	Memory   map[string]memory.Namespace
}

Plan is a resolved scenario plan that library users can inspect before creating a Framework.

func BuildPlan

func BuildPlan(scenario core.Scenario) (Plan, error)

BuildPlan validates and resolves public LLM and memory metadata from a scenario. It does not create provider clients or start execution.

type PostgresEventStoreConfig

type PostgresEventStoreConfig struct {
	DB              *sql.DB
	TableName       string
	SkipSchemaSetup bool
}

type PostgresTierWarmStoreConfig added in v0.1.9

type PostgresTierWarmStoreConfig struct {
	DB        *sql.DB
	TableName string
}

PostgresTierWarmStoreConfig configures a Postgres-backed warm tier store.

type PostgresVectorStoreConfig

type PostgresVectorStoreConfig struct {
	DB        *sql.DB
	TableName string
}

type ProductionHTTPHandlerConfig

type ProductionHTTPHandlerConfig struct {
	Queue          asyncpkg.Queue
	Policy         security.Policy
	Audit          audit.Sink
	AuthMiddleware func(http.Handler) http.Handler
	MetricsHandler http.Handler
	IDGenerator    func() string
	Now            func() time.Time
	MaxBodyBytes   int64
	Version        string
	// Framework enables sync /v1/events and /v1/hitl/resume when set.
	Framework *Framework
	// StudioSavePath enables POST /v1/studio/save for the configured scenario file.
	StudioSavePath string
}

type PrometheusRecorder added in v0.1.4

type PrometheusRecorder = promrecorder.Recorder

PrometheusRecorder exposes in-process Prometheus text metrics for agentflow runtime signals.

func NewPrometheusRecorder added in v0.1.4

func NewPrometheusRecorder() *PrometheusRecorder

NewPrometheusRecorder creates a Prometheus-compatible observability recorder.

type RedisLockerConfig

type RedisLockerConfig struct {
	Addr         string
	Password     string
	DB           int
	KeyPrefix    string
	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration
}

type RedisRunStateRepositoryConfig

type RedisRunStateRepositoryConfig struct {
	Addr         string
	Password     string
	DB           int
	KeyPrefix    string
	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration
}

type RetentionHTTPHandlerConfig added in v0.2.0

type RetentionHTTPHandlerConfig struct {
	Framework    *Framework
	Policy       security.Policy
	Audit        audit.Sink
	MaxBodyBytes int64
}

type RetentionPolicy added in v0.1.4

type RetentionPolicy struct {
	MaxAge       time.Duration
	Status       runstate.RunStatus
	ScenarioName string
	Limit        int
}

RetentionPolicy controls run-state cleanup.

type RetrieverToolConfig

type RetrieverToolConfig struct {
	Embedder            llm.Embedder
	Store               knowledge.VectorStore
	Profile             string
	Namespace           string
	DefaultLimit        int
	SearchMode          knowledge.SearchMode
	CandidateMultiplier int
	Reranker            knowledge.Reranker
	VectorWeight        float64
	TextWeight          float64
}

type RunRequest

type RunRequest = appexec.RunRequest

RunRequest is the input passed to Framework.Run.

type RunResult

type RunResult = appexec.RunResult

RunResult is the result returned from Framework.Run.

type RunStep added in v0.2.0

type RunStep struct {
	NodeID string                 `json:"node_id"`
	Output runstate.StepOutputRef `json:"output"`
}

RunStep describes one persisted workflow step output.

type S3BlobStoreConfig

type S3BlobStoreConfig struct {
	Endpoint        string
	Bucket          string
	Region          string
	Prefix          string
	AccessKeyID     string
	SecretAccessKey string
	SessionToken    string
	HTTPClient      *http.Client
}

type SQLToolConfig

type SQLToolConfig struct {
	DB              *sql.DB
	AllowedQueries  map[string]string
	AllowAdHocQuery bool
	MaxRows         int
	Timeout         time.Duration
}

type SaveStudioResult added in v0.2.0

type SaveStudioResult struct {
	Path         string              `json:"path"`
	ScenarioName string              `json:"scenario_name"`
	Graph        graph.ScenarioGraph `json:"graph,omitempty"`
}

SaveStudioResult describes a persisted Studio graph write.

type ScenarioBuilder added in v0.1.10

type ScenarioBuilder = scenariobuilder.ScenarioBuilder

ScenarioBuilder constructs scenarios with a fluent Go API. See pkg/builder for the full surface.

type ScenarioGraph added in v0.2.0

type ScenarioGraph = graph.ScenarioGraph

ScenarioGraph is a Studio-friendly orchestration topology view.

type StudioHTTPHandlerConfig added in v0.2.0

type StudioHTTPHandlerConfig struct {
	Framework      *Framework
	StudioSavePath string
	MaxBodyBytes   int64
}

type ThreadRunSummary added in v0.2.0

type ThreadRunSummary struct {
	RunID           string             `json:"run_id"`
	ParentRunID     string             `json:"parent_run_id,omitempty"`
	ForkFromVersion int64              `json:"fork_from_version,omitempty"`
	ThreadID        string             `json:"thread_id"`
	Status          runstate.RunStatus `json:"status"`
	ScenarioName    string             `json:"scenario_name,omitempty"`
}

ThreadRunSummary describes one run in a fork/thread group.

type Ticket added in v0.1.2

type Ticket = toolticket.Ticket

Ticket is a support ticket record manipulated by the ticket tool.

type TicketStore added in v0.1.2

type TicketStore = toolticket.Store

TicketStore persists ticket records for the ticket tool executor.

func NewMemoryTicketStore added in v0.1.2

func NewMemoryTicketStore(seed map[string]Ticket) TicketStore

NewMemoryTicketStore creates an in-memory ticket store for tests and demos.

type TicketToolConfig added in v0.1.2

type TicketToolConfig struct {
	Store toolticket.Store
}

type TierColdSummaryIndexerConfig added in v0.2.0

type TierColdSummaryIndexerConfig struct {
	Embedder   llm.Embedder
	Store      knowledge.VectorStore
	Profile    string
	MemoryName string
}

TierColdSummaryIndexerConfig configures vector indexing for cold-tier summaries.

type ToolResolver

type ToolResolver = core.ToolResolver

type ToolResolverFunc

type ToolResolverFunc = core.ToolResolverFunc

type ValidateStudioResult added in v0.2.0

type ValidateStudioResult struct {
	Valid     bool   `json:"valid"`
	Error     string `json:"error,omitempty"`
	ErrorCode string `json:"error_code,omitempty"`
	Scenario  string `json:"scenario_name"`
}

ValidateStudioResult reports graph/scenario validation output for Studio.

type WebhookHTTPHandlerConfig added in v0.1.2

type WebhookHTTPHandlerConfig struct {
	Framework    *Framework
	MaxBodyBytes int64
}

type WiringOptions added in v0.1.4

type WiringOptions struct {
	RequireLLM                 bool
	AllowMockProviderWithoutGW bool
}

WiringOptions controls ValidateWiring and optional New-time checks.

type WorkflowBuilder added in v0.1.10

type WorkflowBuilder = scenariobuilder.WorkflowBuilder

WorkflowBuilder constructs workflow graphs with a fluent Go API.

Directories

Path Synopsis
examples
go/builder command
go/hitl-resume command
go/http-worker command
go/minimal command
go/postgres command
go/scenario
Package scenario provides builder stacks shared by examples/go programs.
go/tier-memory command
go/tier-worker command
tier-worker runs the tier-memory builder stack with Postgres warm tier, file or blob cold tier, and async memory.reconcile jobs via a shared job queue.
go/validate command
internal
pkg
builder
Package builder is the **primary** way to construct core.Scenario values for agentflow-go.
llm
log
mcp
testutil
Package testutil provides helpers for testing applications built on agentflow.
Package schemas embeds machine-readable configuration schemas for AgentFlow.
