agentkit

v1.1.0 (Latest)
Published: Mar 17, 2026 License: MIT Imports: 12 Imported by: 0

README

AgentKit

中文文档 (Chinese documentation)

A lightweight, event-stream-driven Agent toolkit built on top of CloudWeGo Eino ADK.

Inspired by pi-agent-core, AgentKit brings event streaming, message queuing, and human-in-the-loop (HITL) capabilities to the Go + Eino ecosystem.

Features

  • Event-stream architecture — Subscribe to fine-grained events (message deltas, tool calls, errors, etc.)
  • Steering & follow-up queues — Inject messages mid-execution to redirect the agent or append follow-up tasks
  • Human-in-the-loop (HITL) — Interrupt agent execution and resume with user-provided data
  • Streaming support — Real-time token-by-token output via Eino ADK streaming
  • Reasoning model support — First-class support for thinking/reasoning models (DeepSeek-R1, o1, etc.) with streaming reasoning output
  • Multimodal input — Send text, images, audio, video, and files via Send() with ergonomic constructors
  • Tool integration — Plug in any Eino-compatible tool with automatic tool-call handling
  • Type aliases — Use agentkit.ChatModel, agentkit.Tool, agentkit.ToolCall, etc. without importing eino packages directly

Installation

go get github.com/wsshow/agentkit

Quick Start

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/cloudwego/eino-ext/components/model/openai"
	"github.com/wsshow/agentkit"
)

func main() {
	ctx := context.Background()

	chatModel, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
		APIKey:  "your-api-key",
		BaseURL: "https://api.openai.com/v1",
		Model:   "gpt-4o",
	})
	if err != nil {
		log.Fatalln(err)
	}

	agent, err := agentkit.New(ctx, &agentkit.Config{
		Name:         "assistant",
		SystemPrompt: "You are a helpful assistant.",
		Model:        chatModel,
	})
	if err != nil {
		log.Fatalln(err)
	}
	defer agent.Close()

	agent.Subscribe(func(e agentkit.Event) {
		switch e.Type {
		case agentkit.EventReasoningDelta:
			fmt.Print(e.Delta) // reasoning/thinking stream (for reasoning models)
		case agentkit.EventMessageDelta:
			fmt.Print(e.Delta)
		case agentkit.EventMessageEnd:
			fmt.Println()
		case agentkit.EventError:
			fmt.Printf("Error: %v\n", e.Error)
		}
	})

	if err := agent.Prompt(ctx, "Hello!"); err != nil {
		log.Fatalln(err)
	}
}

Event Types

Event                Description
EventAgentStart      Agent begins processing
EventTurnStart       New turn starts (one LLM call + tool execution cycle)
EventMessageStart    Message begins (streaming or non-streaming)
EventReasoningDelta  Reasoning/thinking stream delta (Event.Delta), for reasoning models
EventMessageDelta    Incremental streaming text (Event.Delta)
EventMessageEnd      Message complete (Event.Content, Event.ReasoningContent, Event.ResponseMeta)
EventToolStart       Tool call requested (Event.ToolCalls)
EventToolUpdate      Tool execution progress update (Event.Content)
EventToolEnd         Tool call result returned (Event.Content)
EventTurnEnd         Turn complete
EventTransfer        Agent transfer (multi-agent)
EventInterrupted     HITL interrupt (Event.Interrupt)
EventAgentEnd        Agent processing complete
EventError           Error occurred (Event.Error)

Event Struct
type Event struct {
    Type             EventType
    Agent            string           // source agent name
    Content          string           // full text (message_end / tool_end)
    Delta            string           // streaming delta (message_delta / reasoning_delta)
    ReasoningContent string           // full reasoning content (message_end, reasoning models only)
    ResponseMeta     *ResponseMeta    // token usage, finish reason (message_end)
    ToolCalls        []ToolCall       // tool call list (tool_start)
    Interrupt        []InterruptPoint // interrupt points (interrupted)
    Error            error            // error details (error)
}
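Because deltas arrive piecemeal, a subscriber that needs whole messages has to buffer them between message_start and message_end. The sketch below shows one way to do that. It mirrors only the Event fields it uses in a local struct so it runs without the module, and the fallback to buffered deltas when Content is empty is an assumption, not documented behavior; `collector` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
)

// EventType and Event are local stand-ins mirroring a subset of the
// documented agentkit types, so this sketch compiles on its own.
type EventType string

const (
	EventMessageStart EventType = "message_start"
	EventMessageDelta EventType = "message_delta"
	EventMessageEnd   EventType = "message_end"
)

type Event struct {
	Type    EventType
	Delta   string
	Content string
}

// collector (hypothetical) assembles streamed deltas into complete messages.
type collector struct {
	buf      strings.Builder
	messages []string
}

// handle has the shape of an agentkit.Subscriber callback.
func (c *collector) handle(e Event) {
	switch e.Type {
	case EventMessageStart:
		c.buf.Reset()
	case EventMessageDelta:
		c.buf.WriteString(e.Delta)
	case EventMessageEnd:
		// Prefer the full Content carried by message_end; fall back to the
		// buffered deltas if it is empty (an assumption for this sketch).
		text := e.Content
		if text == "" {
			text = c.buf.String()
		}
		c.messages = append(c.messages, text)
		c.buf.Reset()
	}
}

func main() {
	c := &collector{}
	for _, e := range []Event{
		{Type: EventMessageStart},
		{Type: EventMessageDelta, Delta: "Hel"},
		{Type: EventMessageDelta, Delta: "lo!"},
		{Type: EventMessageEnd},
	} {
		c.handle(e)
	}
	fmt.Println(c.messages) // [Hello!]
}
```

With the real package, `c.handle` would be adapted to take `agentkit.Event` and passed to `agent.Subscribe`.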

API Reference

Creating an Agent
agent, err := agentkit.New(ctx, &agentkit.Config{
    Name:             "my-agent",
    Description:      "Agent description",
    SystemPrompt:     "System instructions",
    Model:            chatModel,                        // agentkit.ChatModel
    Tools:            []agentkit.Tool{myTool},          // optional
    Middlewares:      []adk.AgentMiddleware{},          // agent-level hooks (optional)
    ModelMiddlewares: []adk.ChatModelAgentMiddleware{}, // model-level hooks (optional)
    MaxIterations:    20,                               // max LLM call cycles (default: 20)
    CheckPointStore:  store,                            // checkpoint store (optional; defaults to in-memory)
})
if err != nil {
    log.Fatalln(err)
}
defer agent.Close()
Core Methods
// Send user text input and drive agent execution (blocking, thread-safe)
err := agent.Prompt(ctx, "user message")

// Send multimodal input (text + images, audio, video, files)
err = agent.Send(ctx,
    agentkit.Text("What is in this image?"),
    agentkit.ImageURL("https://example.com/cat.jpg"),
)

// Resume from the current state without a new message (e.g. retry after an error)
err = agent.Continue(ctx)

// Resume from a HITL interrupt; keys are InterruptPoint.ID values from Event.Interrupt
err = agent.Resume(ctx, map[string]any{interruptID: data})

// Subscribe to events; returns an unsubscribe function
unsubscribe := agent.Subscribe(func(e agentkit.Event) { /* handle e */ })

// Cancel the current execution and wait for it to finish
agent.Abort()

// Reset agent state (waits for completion, then clears history and queues)
agent.Reset()

// Get the full conversation history as eino schema.Message values (for debugging/persistence)
history := agent.History()

// Get agent state (message records, streaming status)
state := agent.State()

// Close the agent and release resources (implements io.Closer)
err = agent.Close()

Prompt, Send, Continue, and Resume are mutually exclusive: calling any of them while another is running returns an error.

Steering & Follow-Up
// Inject a steering message during execution (checked after each tool result)
agent.Steer("Please focus on topic X instead")

// Append a follow-up message (processed after current task completes)
agent.FollowUp("Also check Y")

// Configure queue processing mode
agent.SetSteeringMode(agentkit.QueueModeAll)        // process all queued messages at once
agent.SetFollowUpMode(agentkit.QueueModeOneAtATime)  // process one at a time (default)

// Clear queues
agent.ClearSteeringQueue()
agent.ClearFollowUpQueue()
agent.ClearAllQueues()
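The queue rules (steering messages take priority over follow-ups; QueueModeOneAtATime takes one message, QueueModeAll drains the queue) can be modeled over plain string slices. This sketch reimplements only those documented rules and ignores timing (for example, that steering is checked after each tool result); `drain` and `next` are hypothetical helpers, not agentkit functions.

```go
package main

import "fmt"

type QueueMode string

const (
	QueueModeOneAtATime QueueMode = "one-at-a-time"
	QueueModeAll        QueueMode = "all"
)

// drain takes messages from q according to mode and returns the rest.
func drain(q []string, mode QueueMode) (taken, rest []string) {
	if len(q) == 0 {
		return nil, q
	}
	if mode == QueueModeAll {
		return q, nil
	}
	return q[:1], q[1:]
}

// next picks the next batch: the follow-up queue is consulted only when
// no steering messages are pending.
func next(steer, followUp []string, sMode, fMode QueueMode) (batch, steerRest, followRest []string) {
	if len(steer) > 0 {
		batch, steerRest = drain(steer, sMode)
		return batch, steerRest, followUp
	}
	batch, followRest = drain(followUp, fMode)
	return batch, steer, followRest
}

func main() {
	steer := []string{"focus on X"}
	follow := []string{"also check Y", "then Z"}
	for len(steer)+len(follow) > 0 {
		var batch []string
		batch, steer, follow = next(steer, follow, QueueModeAll, QueueModeOneAtATime)
		fmt.Println(batch) // [focus on X], then [also check Y], then [then Z]
	}
}
```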
HITL (Human-in-the-Loop)
// In a tool: trigger interrupt
return "", agentkit.Interrupt(ctx, "Need user confirmation")

// With state preservation
return "", agentkit.StatefulInterrupt(ctx, "Confirm?", myState)

// In a resumed tool: check interrupt state
wasInterrupted, hasState, state := agentkit.GetInterruptState[MyState](ctx)

// Get resume data from user
isTarget, hasData, data := agentkit.GetResumeContext[bool](ctx)
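On the caller's side, the interrupt points from an EventInterrupted event are turned into the `map[interruptID]data` that Resume takes. Since the tool above reads its resume data with `GetResumeContext[bool]`, the sketch stores a bool per ID. `InterruptPoint` is a local mirror of the documented struct; `buildTargets` and the ID `"int-1"` are hypothetical.

```go
package main

import "fmt"

// InterruptPoint mirrors the documented agentkit.InterruptPoint fields.
type InterruptPoint struct {
	ID   string
	Info any
}

// buildTargets maps each interrupt ID to the data returned by ask.
func buildTargets(points []InterruptPoint, ask func(InterruptPoint) any) map[string]any {
	targets := make(map[string]any, len(points))
	for _, p := range points {
		targets[p.ID] = ask(p)
	}
	return targets
}

func main() {
	points := []InterruptPoint{{ID: "int-1", Info: "Need user confirmation"}}
	targets := buildTargets(points, func(p InterruptPoint) any {
		fmt.Println("asking user:", p.Info)
		return true // pretend the user confirmed
	})
	fmt.Println(targets) // map[int-1:true]
	// With the real package: err := agent.Resume(ctx, targets)
}
```

Including only some IDs is also possible; per the GetResumeContext docs, tools that are not resume targets are expected to interrupt again.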
Multimodal Input

Send accepts variadic ContentPart values built with constructor functions:

// Text + image
agent.Send(ctx,
    agentkit.Text("What is in this image?"),
    agentkit.ImageURL("https://example.com/cat.jpg"),
)

// Image with quality control
agent.Send(ctx,
    agentkit.Text("Describe in detail"),
    agentkit.ImageURL("https://example.com/photo.jpg", agentkit.ImageDetailHigh),
)

// Base64 encoded image
agent.Send(ctx,
    agentkit.Text("Identify this"),
    agentkit.ImageBase64(base64Data, "image/png"),
)

// Audio / Video / File
agent.Send(ctx, agentkit.Text("Transcribe"), agentkit.AudioURL("https://example.com/speech.mp3"))
agent.Send(ctx, agentkit.Text("Summarize"), agentkit.VideoURL("https://example.com/clip.mp4"))
agent.Send(ctx, agentkit.Text("Analyze"), agentkit.FileURL("https://example.com/report.pdf"))
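The Base64 example above assumes a `base64Data` value already exists. One way to produce the `(data, mime)` pair from raw bytes with the standard library is shown below. It assumes the constructors take a plain Base64 string without a `data:` URL prefix, which the docs above do not state explicitly; `encodeForUpload` is a hypothetical helper.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"net/http"
)

// encodeForUpload returns Base64 data and a sniffed MIME type for raw bytes.
func encodeForUpload(raw []byte) (data, mime string) {
	return base64.StdEncoding.EncodeToString(raw), http.DetectContentType(raw)
}

func main() {
	// The 8-byte PNG signature; in practice, read the whole file with os.ReadFile.
	png := []byte("\x89PNG\r\n\x1a\n")
	data, mime := encodeForUpload(png)
	fmt.Println(mime) // image/png
	fmt.Println(data) // iVBORw0KGgo=
	// With the real package:
	// agent.Send(ctx, agentkit.Text("Identify this"), agentkit.ImageBase64(data, mime))
}
```

`http.DetectContentType` only inspects the first 512 bytes and falls back to `application/octet-stream`, so passing a known MIME type explicitly is safer when you have one.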

Available constructors:

Constructor                          Description
Text(s)                              Text content
ImageURL(url, detail...)             Image from URL (optional quality detail)
ImageBase64(data, mime, detail...)   Image from Base64 (optional quality detail)
AudioURL(url)                        Audio from URL
AudioBase64(data, mime)              Audio from Base64
VideoURL(url)                        Video from URL
VideoBase64(data, mime)              Video from Base64
FileURL(url)                         File from URL
FileBase64(data, mime, name...)      File from Base64 (optional filename)

Tool Progress Updates

Tools can emit progress events during execution:

func myTool(ctx context.Context, input string) (string, error) {
    agentkit.EmitToolUpdate(ctx, "Processing step 1...")
    // ... do work ...
    agentkit.EmitToolUpdate(ctx, "Processing step 2...")
    return "result", nil
}
Type Aliases

AgentKit provides type aliases so consumers don't need to import eino packages directly:

Alias            Eino type
ChatModel        model.BaseChatModel
Tool             tool.BaseTool
ToolCall         schema.ToolCall
ResponseMeta     schema.ResponseMeta
TokenUsage       schema.TokenUsage
ContentPart      schema.MessageInputPart
ImageURLDetail   schema.ImageURLDetail

Examples

See the examples directory:

  • simple — Minimal multi-turn conversation (~60 lines)
  • full — Comprehensive 7-scenario demo (tools, history, reset, follow-up, steer, HITL, state inspection)

License

See LICENSE for details.

Documentation

Functions

func EmitToolUpdate

func EmitToolUpdate(ctx context.Context, content string)

EmitToolUpdate sends a progress-update event while a tool is executing. The tool obtains the Emitter from its context and emits a tool_update event:

func myTool(ctx context.Context, input string) (string, error) {
    agentkit.EmitToolUpdate(ctx, "Processing...")
    return "done", nil
}

func GetInterruptState

func GetInterruptState[T any](ctx context.Context) (wasInterrupted bool, hasState bool, state T)

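GetInterruptState checks, from inside a tool, whether the tool is resuming from an interrupt, and retrieves the state saved earlier.

Return values:

  • wasInterrupted: whether this tool was previously interrupted
  • hasState: whether saved state exists and was successfully converted to type T
  • state: the saved state (the zero value when hasState is false)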

func GetResumeContext

func GetResumeContext[T any](ctx context.Context) (isResumeTarget bool, hasData bool, data T)

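GetResumeContext checks, from inside a tool, whether the tool is an explicit target of Resume, and retrieves the resume data.

Return values:

  • isResumeTarget: whether this tool was explicitly specified as a resume target
  • hasData: whether resume data was provided
  • data: the resume data (the zero value when hasData is false)

This distinguishes between:

  • being invoked as the resume target (the tool should continue executing)
  • being re-executed because a sibling tool was resumed (the tool should interrupt again)

Usage:

func myTool(ctx context.Context, input string) (string, error) {
    wasInterrupted, _, _ := agentkit.GetInterruptState[any](ctx)
    if !wasInterrupted {
        return "", agentkit.Interrupt(ctx, "User input required")
    }
    isTarget, hasData, data := agentkit.GetResumeContext[string](ctx)
    if !isTarget {
        return "", agentkit.Interrupt(ctx, nil) // not the target: interrupt again
    }
    if hasData {
        return "User input: " + data, nil
    }
    return "Confirmed", nil
}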

func Interrupt

func Interrupt(ctx context.Context, info any) error

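Interrupt triggers a HITL interrupt during tool execution. The tool should return immediately after calling it; the Agent then pauses execution and emits an EventInterrupted event. The user can resume execution with Agent.Resume.

info is a user-facing description of the reason for the interrupt; it is passed to subscribers via InterruptPoint.Info.

Usage:

func myTool(ctx context.Context, input string) (string, error) {
    if needsConfirmation(input) {
        return "", agentkit.Interrupt(ctx, "Please confirm whether to continue")
    }
    return doWork(input), nil
}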

func StatefulInterrupt

func StatefulInterrupt(ctx context.Context, info any, state any) error

StatefulInterrupt triggers a HITL interrupt during tool execution and saves state. On resume, the saved state can be retrieved with GetInterruptState.

state must be a type that can be serialized with gob.

Usage:

type MyState struct { Step int }

func myTool(ctx context.Context, input string) (string, error) {
    wasInterrupted, hasState, state := agentkit.GetInterruptState[MyState](ctx)
    if !wasInterrupted {
        return "", agentkit.StatefulInterrupt(ctx, "Processing", MyState{Step: 1})
    }
    if hasState {
        return fmt.Sprintf("Resumed from step %d", state.Step), nil
    }
    return "Resumed", nil
}
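Since StatefulInterrupt requires gob-serializable state, it can help to check a state type with a gob round trip before relying on it. The sketch below does that with `encoding/gob` only; `roundTrip` is a hypothetical helper, and it checks encodability only, not any registration agentkit or Eino may additionally require (gob does require `gob.Register` for concrete types carried inside interface values).

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

type MyState struct{ Step int }

// roundTrip encodes v with gob and decodes it into a fresh value of type T.
func roundTrip[T any](v T) (T, error) {
	var buf bytes.Buffer
	var out T
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return out, err
	}
	err := gob.NewDecoder(&buf).Decode(&out)
	return out, err
}

func main() {
	got, err := roundTrip(MyState{Step: 1})
	fmt.Println(got.Step, err) // 1 <nil>

	// gob cannot encode a struct with no exported fields.
	type hidden struct{ step int }
	_, err = roundTrip(hidden{step: 1})
	fmt.Println(err != nil) // true
}
```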

Types

type Agent

type Agent struct {
	// contains filtered or unexported fields
}

Agent is a standalone, lightweight agent that wraps the Eino ADK and provides event-stream-driven interaction.

func New

func New(ctx context.Context, cfg *Config) (*Agent, error)

New creates an Agent.

func (*Agent) Abort

func (a *Agent) Abort()

Abort cancels the current execution and waits for it to finish.

func (*Agent) ClearAllQueues

func (a *Agent) ClearAllQueues()

ClearAllQueues clears all message queues.

func (*Agent) ClearFollowUpQueue

func (a *Agent) ClearFollowUpQueue()

ClearFollowUpQueue clears the follow-up message queue.

func (*Agent) ClearSteeringQueue

func (a *Agent) ClearSteeringQueue()

ClearSteeringQueue clears the steering message queue.

func (*Agent) Close

func (a *Agent) Close() error

Close closes the Agent and releases its resources. It implements the io.Closer interface.

func (*Agent) Continue

func (a *Agent) Continue(ctx context.Context) error

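Continue resumes execution from the current state without adding a new message; use it to retry after an error. It returns an error if the Agent is already running.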

func (*Agent) FollowUp

func (a *Agent) FollowUp(content string)

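FollowUp appends a follow-up message to be processed after the Agent finishes its current work. Follow-up messages are processed only when no steering messages are pending.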

func (*Agent) History

func (a *Agent) History() []*schema.Message

History returns the full conversation history (including assistant and tool schema.Message entries), for debugging or persistence.

func (*Agent) Name

func (a *Agent) Name() string

Name returns the Agent's name.

func (*Agent) Prompt

func (a *Agent) Prompt(ctx context.Context, input string) error

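Prompt sends user input and drives Agent execution; events are delivered to callbacks registered with Subscribe. It returns an error if the Agent is already running.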

func (*Agent) Reset

func (a *Agent) Reset()

Reset resets the Agent's state (clears the message history and queues). If the Agent is running, it first waits for execution to finish.

func (*Agent) Resume

func (a *Agent) Resume(ctx context.Context, targets map[string]any) error

Resume resumes execution from a HITL interrupt. targets has the form map[interruptID]data, where interruptID comes from Event.Interrupt[].ID. It returns an error if the Agent is already running.

func (*Agent) Send added in v1.1.0

func (a *Agent) Send(ctx context.Context, parts ...ContentPart) error

Send sends multimodal content and drives Agent execution. Create ContentPart values with constructors such as Text, ImageURL, and AudioURL. It returns an error if the Agent is already running.

func (*Agent) SetFollowUpMode

func (a *Agent) SetFollowUpMode(mode QueueMode)

SetFollowUpMode sets the processing mode for follow-up messages.

func (*Agent) SetSteeringMode

func (a *Agent) SetSteeringMode(mode QueueMode)

SetSteeringMode sets the processing mode for steering messages.

func (*Agent) State

func (a *Agent) State() *State

State returns the current state.

func (*Agent) Steer

func (a *Agent) Steer(content string)

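Steer inserts a steering message while the Agent is running. The queue is checked after each tool result returns; if it contains messages, the current execution is interrupted and the new messages are injected.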

func (*Agent) Subscribe

func (a *Agent) Subscribe(fn Subscriber) func()

Subscribe subscribes to the event stream and returns an unsubscribe function.
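A Subscribe method that returns an unsubscribe closure is a common pattern; the sketch below shows one thread-safe way to build it. It is an illustration only: `hub` and `publish` are hypothetical, and agentkit's internal registry may differ. Calling subscribers outside the lock lets a callback unsubscribe itself without deadlocking.

```go
package main

import (
	"fmt"
	"sync"
)

type Event struct{ Type string }

type Subscriber func(Event)

// hub is a hypothetical thread-safe subscriber registry.
type hub struct {
	mu     sync.Mutex
	nextID int
	subs   map[int]Subscriber
}

// Subscribe registers fn and returns an idempotent unsubscribe function.
func (h *hub) Subscribe(fn Subscriber) func() {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.subs == nil {
		h.subs = make(map[int]Subscriber)
	}
	id := h.nextID
	h.nextID++
	h.subs[id] = fn
	var once sync.Once
	return func() {
		once.Do(func() {
			h.mu.Lock()
			delete(h.subs, id)
			h.mu.Unlock()
		})
	}
}

// publish snapshots subscribers under the lock, then calls them without it.
func (h *hub) publish(e Event) {
	h.mu.Lock()
	fns := make([]Subscriber, 0, len(h.subs))
	for _, fn := range h.subs {
		fns = append(fns, fn)
	}
	h.mu.Unlock()
	for _, fn := range fns {
		fn(e)
	}
}

func main() {
	var h hub
	count := 0
	unsubscribe := h.Subscribe(func(e Event) { count++ })
	h.publish(Event{Type: "agent_start"})
	unsubscribe()
	h.publish(Event{Type: "agent_end"})
	fmt.Println(count) // 1
}
```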

type ChatModel

type ChatModel = model.BaseChatModel

ChatModel is the base chat model interface.

type Config

type Config struct {
	Name             string
	Description      string
	SystemPrompt     string
	Model            ChatModel                      // chat model (the agentkit.ChatModel alias can be used directly)
	Tools            []Tool                         // tool list (the agentkit.Tool alias can be used directly)
	Middlewares      []adk.AgentMiddleware          // agent-level middleware: struct-style hooks (BeforeChatModel/AfterChatModel) and tool-call wrapping via WrapToolCall (advanced)
	ModelMiddlewares []adk.ChatModelAgentMiddleware // model-level handlers: interface-style extensions (WrapModel/WrapToolCall) and state rewriting (BeforeModelRewriteState/AfterModelRewriteState); run after Middlewares (advanced)
	MaxIterations    int                            // defaults to 20
	CheckPointStore  compose.CheckPointStore        // custom checkpoint store; defaults to an in-memory store
}

Config is the Agent configuration.

type ContentPart added in v1.1.0

type ContentPart = schema.MessageInputPart

ContentPart represents one piece of user input (text, image, audio, video, or file). Create it with constructors such as Text and ImageURL; it is used for multimodal input via Agent.Send.

func AudioBase64 added in v1.1.0

func AudioBase64(data, mimeType string) ContentPart

AudioBase64 creates a Base64-encoded audio content part.

func AudioURL added in v1.1.0

func AudioURL(url string) ContentPart

AudioURL creates an audio URL content part.

func FileBase64 added in v1.1.0

func FileBase64(data, mimeType string, name ...string) ContentPart

FileBase64 creates a Base64-encoded file content part. name is the optional file name.

func FileURL added in v1.1.0

func FileURL(url string) ContentPart

FileURL creates a file URL content part.

func ImageBase64 added in v1.1.0

func ImageBase64(data, mimeType string, detail ...ImageURLDetail) ContentPart

ImageBase64 creates a Base64-encoded image content part.

func ImageURL added in v1.1.0

func ImageURL(url string, detail ...ImageURLDetail) ContentPart

ImageURL creates an image URL content part. detail is optional and controls image recognition quality (by default, the model decides).

func Text added in v1.1.0

func Text(s string) ContentPart

Text creates a text content part.

func VideoBase64 added in v1.1.0

func VideoBase64(data, mimeType string) ContentPart

VideoBase64 creates a Base64-encoded video content part.

func VideoURL added in v1.1.0

func VideoURL(url string) ContentPart

VideoURL creates a video URL content part.

type Event

type Event struct {
	Type             EventType
	Agent            string           // name of the Agent that produced the event
	Content          string           // text content (message_end / tool_end)
	Delta            string           // streaming delta (message_delta / reasoning_delta)
	ReasoningContent string           // full reasoning content (message_end, reasoning models only)
	ResponseMeta     *ResponseMeta    // response metadata: token usage, finish reason (message_end)
	ToolCalls        []ToolCall       // tool call list (tool_start)
	Interrupt        []InterruptPoint // interrupt points (interrupted)
	Error            error            // error details (error)
}

Event is the unified event type.

type EventType

type EventType string

EventType is the event type.

const (
	EventAgentStart     EventType = "agent_start"     // Agent begins processing
	EventTurnStart      EventType = "turn_start"      // new turn starts (one LLM call + tool execution)
	EventMessageStart   EventType = "message_start"   // message begins (streaming or non-streaming)
	EventReasoningDelta EventType = "reasoning_delta" // reasoning-model thinking delta (e.g. DeepSeek-R1, o1)
	EventMessageDelta   EventType = "message_delta"   // incremental streaming text
	EventMessageEnd     EventType = "message_end"     // message complete
	EventToolStart      EventType = "tool_start"      // tool call requested
	EventToolUpdate     EventType = "tool_update"     // tool execution progress update
	EventToolEnd        EventType = "tool_end"        // tool call result
	EventTurnEnd        EventType = "turn_end"        // turn complete
	EventTransfer       EventType = "transfer"        // agent transfer
	EventInterrupted    EventType = "interrupted"     // HITL interrupt (waiting for user input)
	EventAgentEnd       EventType = "agent_end"       // Agent processing complete
	EventError          EventType = "error"           // error
)

type ImageURLDetail added in v1.1.0

type ImageURLDetail = schema.ImageURLDetail

ImageURLDetail controls image recognition quality.

type InterruptPoint

type InterruptPoint struct {
	ID   string // unique ID of the interrupt point; pass this ID to Resume
	Info any    // interrupt reason / context information
}

InterruptPoint describes a HITL interrupt point.

type Message

type Message struct {
	Role             RoleType
	Agent            string // name of the Agent that produced the message
	Content          string
	ReasoningContent string // reasoning-model thinking content (e.g. DeepSeek-R1, o1); empty for non-reasoning models
}

Message is a message record.

type QueueMode

type QueueMode string

QueueMode is the message queue processing mode.

const (
	QueueModeOneAtATime QueueMode = "one-at-a-time" // process one message at a time
	QueueModeAll        QueueMode = "all"           // process all messages at once
)

type ResponseMeta

type ResponseMeta = schema.ResponseMeta

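ResponseMeta is chat response metadata, including token usage, finish reason, log probabilities, and so on. It is usually attached to EventMessageEnd events; in streaming mode it comes from the last chunk.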
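Since ResponseMeta rides on each message_end event, per-run token totals can be kept by summing Usage across those events. The sketch mirrors the eino types in local structs reduced to the fields used; the field names (FinishReason, Usage, PromptTokens, CompletionTokens, TotalTokens) are assumed to match eino's schema package, and `usageTotals` is hypothetical. The input numbers are illustrative only.

```go
package main

import "fmt"

// Local mirrors of the eino types behind agentkit.TokenUsage and
// agentkit.ResponseMeta (field names assumed; check schema package docs).
type TokenUsage struct {
	PromptTokens     int
	CompletionTokens int
	TotalTokens      int
}

type ResponseMeta struct {
	FinishReason string
	Usage        *TokenUsage
}

// usageTotals sums token usage across message_end events.
type usageTotals struct {
	TokenUsage
	Messages int
}

// add tolerates a nil meta or nil Usage (e.g. a provider that reports no usage).
func (u *usageTotals) add(meta *ResponseMeta) {
	if meta == nil || meta.Usage == nil {
		return
	}
	u.PromptTokens += meta.Usage.PromptTokens
	u.CompletionTokens += meta.Usage.CompletionTokens
	u.TotalTokens += meta.Usage.TotalTokens
	u.Messages++
}

func main() {
	var totals usageTotals
	totals.add(&ResponseMeta{FinishReason: "tool_calls", Usage: &TokenUsage{PromptTokens: 120, CompletionTokens: 30, TotalTokens: 150}})
	totals.add(&ResponseMeta{FinishReason: "stop", Usage: &TokenUsage{PromptTokens: 200, CompletionTokens: 50, TotalTokens: 250}})
	totals.add(nil)
	fmt.Printf("%d messages, %d total tokens\n", totals.Messages, totals.TotalTokens) // 2 messages, 400 total tokens
}
```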

type RoleType

type RoleType string

RoleType is the message role type.

const (
	RoleUser      RoleType = "user"
	RoleAssistant RoleType = "assistant"
	RoleTool      RoleType = "tool"
)

type State

type State struct {
	// contains filtered or unexported fields
}

State manages Agent state (thread-safe).

func (*State) AddMessage

func (s *State) AddMessage(msg Message)

AddMessage adds a message.

func (*State) Clear

func (s *State) Clear()

Clear clears the state.

func (*State) IsStreaming

func (s *State) IsStreaming() bool

IsStreaming reports whether streaming output is in progress.

func (*State) Messages

func (s *State) Messages() []Message

Messages returns a copy of all messages.
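Returning a copy means a caller (for example, a subscriber running while the agent appends messages) can read or modify the result without racing the internal slice. The sketch illustrates that copy-on-read pattern with a read-write mutex; `state` is a hypothetical stand-in, not agentkit's State implementation.

```go
package main

import (
	"fmt"
	"sync"
)

type Message struct {
	Role    string
	Content string
}

// state is a hypothetical mutex-guarded message store.
type state struct {
	mu       sync.RWMutex
	messages []Message
}

func (s *state) AddMessage(m Message) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.messages = append(s.messages, m)
}

// Messages returns a copy so callers never share the internal slice.
func (s *state) Messages() []Message {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make([]Message, len(s.messages))
	copy(out, s.messages)
	return out
}

func main() {
	var s state
	s.AddMessage(Message{Role: "user", Content: "Hello!"})
	snapshot := s.Messages()
	snapshot[0].Content = "edited" // does not affect the stored message
	fmt.Println(s.Messages()[0].Content) // Hello!
}
```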

type Subscriber

type Subscriber func(Event)

Subscriber is an event subscription function.

type TokenUsage

type TokenUsage = schema.TokenUsage

TokenUsage holds token usage statistics for a single chat request.

type Tool

type Tool = tool.BaseTool

Tool is the base tool interface.

type ToolCall

type ToolCall = schema.ToolCall

ToolCall holds tool call information.
