flowengine

package module
v0.1.0
Published: Feb 2, 2026 | License: MIT | Imports: 10 | Imported by: 0

README

FlowEngine

Chinese documentation

FlowEngine is an agent-style LLM orchestration framework for building composable, production-ready AI workflows. It provides a zero-external-dependency core with RPC-style action definitions and type-safe response handling.

Features

  • 🎯 Agent-style Step Definitions - Context, Guide, Task, Input layers for structured prompts
  • 🔧 RPC-style Actions - Automatic JSON Schema generation from Go structs
  • 🔄 Session Management - Context chaining with TTL and size limits
  • 🔁 Retry with Validation - Configurable retry with hint injection
  • 📦 Type-safe Parameters - Generic parameter extraction
  • 🧩 Composable Flows - Chain, Decide, Parallel, Iterate primitives
  • 🛠️ Function Calling - Native LLM function call support
  • 💾 Caching Adapter - Built-in response caching decorator
  • 📊 Observability - Logging and metrics middleware

Installation

go get github.com/northseadl/flowengine

For Volcengine Ark (Doubao) adapter:

go get github.com/northseadl/flowengine/adapter/ark

Quick Start

1. Define a Step with Actions
package main

import (
    "context"
    "fmt"
    
    "github.com/northseadl/flowengine"
)

// Define action parameter structs
type SearchParams struct {
    Keywords string `json:"keywords" desc:"search keywords"`
    MaxCount int    `json:"max_count,omitempty" desc:"max results count"`
}

type ChatParams struct {
    Response string `json:"response" desc:"direct response to user"`
}

func main() {
    // Create a step with actions
    step := flowengine.NewStep("intent").
        Context("You are an intent classifier for an e-commerce platform").
        Task("Classify user query intent and extract parameters").
        Input("Query: {{.query}}").
        Guide("- Do not guess", "- Be precise").
        Action("search", SearchParams{}).
        Action("chat", ChatParams{}).
        Build()

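    // Create engine with your adapter (yourAdapter is a placeholder for any flowengine.Adapter)
    engine := flowengine.New(yourAdapter)

    // Run with metadata; the returned *FlowContext is not needed here
    ctx := context.Background()
    _, resp, err := engine.Run(ctx, step, flowengine.Metadata{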
        "query": "I want a red dress under $50",
    })
    if err != nil {
        panic(err)
    }

    // Type-safe parameter extraction
    if params, ok := flowengine.ParamsAs[SearchParams](resp, "search"); ok {
        fmt.Printf("Keywords: %s, MaxCount: %d\n", params.Keywords, params.MaxCount)
    }
}
2. Composable Flows
// Sequential execution
sequential := flowengine.Chain(step1, step2, step3)

// Conditional branching on the previous response's action
branching := flowengine.Decide(
    flowengine.When("search", searchHandler),
    flowengine.When("chat", chatHandler),
    flowengine.Otherwise(defaultHandler),
)

// Parallel execution
parallel := flowengine.Parallel(step1, step2)

// Iterate until the condition returns true
loop := flowengine.Iterate(step, func(r *flowengine.Response) bool {
    return r.Is("done")
})
3. Session Management
// Run within a session (context is preserved across calls)
fc1, resp1, _ := engine.RunWithSession(ctx, "session-id", step1, metadata)
fc2, resp2, _ := engine.RunWithSession(ctx, "session-id", step2, metadata)

// Delete session when done
engine.DeleteSession(ctx, "session-id")

// Configure session manager
manager := flowengine.NewSessionManager(
    flowengine.WithTTL(30 * time.Minute),
    flowengine.WithMaxSessions(1000),
)
4. Using Volcengine Ark Adapter
import (
    "github.com/northseadl/flowengine"
    "github.com/northseadl/flowengine/adapter/ark"
    "github.com/volcengine/volcengine-go-sdk/service/arkruntime"
)

func main() {
    // Create Ark client
    client := arkruntime.NewClientWithApiKey("your-api-key")

    // Create adapter with options
    adapter := ark.New(client,
        ark.WithModel("your-model-endpoint"),
        ark.WithThinking(), // Enable reasoning mode
    )

    // Create engine
    engine := flowengine.New(adapter)

    // Use the engine...
}

Three-Layer Prompt Model

FlowEngine enforces a structured prompt model:

┌─────────────────────────────────────┐
│  Layer 1: Context (System)          │  ← Just-in-time data
│  <context>...</context>             │
├─────────────────────────────────────┤
│  Layer 2: Guide (System)            │  ← How to operate
│  <guide>                            │
│    Available actions: ...           │
│    Constraints: ...                 │
│  </guide>                           │
├─────────────────────────────────────┤
│  Layer 3: Task (User)               │  ← What to do
│  <task>...</task>                   │
│  <input>...</input>                 │
└─────────────────────────────────────┘
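
The layering above can be illustrated with a minimal sketch. It assumes that Context and Guide end up in the system prompt and Task and Input in the user prompt, and that templates such as "Query: {{.query}}" are rendered with Go's text/template. The helper names renderTemplate and buildPrompts are hypothetical and not part of flowengine; the exact text the engine emits may differ.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// renderTemplate fills a Go text/template such as "Query: {{.query}}"
// from a metadata-style map.
func renderTemplate(tmpl string, data map[string]any) (string, error) {
	t, err := template.New("layer").Parse(tmpl)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := t.Execute(&sb, data); err != nil {
		return "", err
	}
	return sb.String(), nil
}

// buildPrompts is a hypothetical helper (not part of flowengine). It puts
// Context and Guide into the system prompt, Task and Input into the user prompt.
func buildPrompts(contextText string, constraints []string, task, input string, data map[string]any) (system, user string, err error) {
	renderedInput, err := renderTemplate(input, data)
	if err != nil {
		return "", "", err
	}
	system = "<context>" + contextText + "</context>\n" +
		"<guide>\n" + strings.Join(constraints, "\n") + "\n</guide>"
	user = "<task>" + task + "</task>\n<input>" + renderedInput + "</input>"
	return system, user, nil
}

func main() {
	system, user, err := buildPrompts(
		"You are an intent classifier",
		[]string{"- Do not guess", "- Be precise"},
		"Classify user query intent",
		"Query: {{.query}}",
		map[string]any{"query": "red dress"},
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(system)
	fmt.Println(user)
}
```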

Adapter Interface

Implement these interfaces to integrate with any LLM provider:

// Basic adapter
type Adapter interface {
    Call(ctx context.Context, system, user string) (string, error)
    Stream(ctx context.Context, system, user string) (<-chan string, error)
}

// Session-aware adapter
type SessionAdapter interface {
    Adapter
    CallWithSession(ctx context.Context, system, user, previousResponseID string) (resp, responseID string, err error)
    DeleteSession(ctx context.Context, responseID string) error
}

// Function Call adapter
type ToolAdapter interface {
    Adapter
    CallWithTools(ctx context.Context, system, user string, tools []ToolDef) (*ToolCall, error)
    CallWithToolsAndSession(ctx context.Context, system, user, previousResponseID string, tools []ToolDef) (*ToolCall, string, error)
}
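
As an illustration of the basic Adapter contract, here is a minimal sketch of a custom adapter. The interface is redeclared locally so the example compiles without the flowengine module, and upperAdapter is a hypothetical stand-in for a real provider client: it only uppercases the user prompt. A real adapter would call the provider's API in Call and forward chunks in Stream.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Adapter mirrors the two-method interface shown above (local copy for the sketch).
type Adapter interface {
	Call(ctx context.Context, system, user string) (string, error)
	Stream(ctx context.Context, system, user string) (<-chan string, error)
}

// upperAdapter is a hypothetical adapter that uppercases the user prompt.
type upperAdapter struct{}

func (upperAdapter) Call(ctx context.Context, system, user string) (string, error) {
	if err := ctx.Err(); err != nil {
		return "", err // respect cancellation before doing any work
	}
	return strings.ToUpper(user), nil
}

func (a upperAdapter) Stream(ctx context.Context, system, user string) (<-chan string, error) {
	full, err := a.Call(ctx, system, user)
	if err != nil {
		return nil, err
	}
	out := make(chan string)
	go func() {
		defer close(out) // closing the channel signals end of stream
		for _, word := range strings.Fields(full) {
			select {
			case out <- word:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out, nil
}

// runDemo exercises both methods and joins the streamed chunks with spaces.
func runDemo() (callOut, streamOut string, err error) {
	ctx := context.Background()
	var a Adapter = upperAdapter{}
	callOut, err = a.Call(ctx, "system prompt", "hello world")
	if err != nil {
		return "", "", err
	}
	ch, err := a.Stream(ctx, "system prompt", "hello world")
	if err != nil {
		return "", "", err
	}
	var parts []string
	for chunk := range ch {
		parts = append(parts, chunk)
	}
	return callOut, strings.Join(parts, " "), nil
}

func main() {
	callOut, streamOut, err := runDemo()
	if err != nil {
		panic(err)
	}
	fmt.Println(callOut)
	fmt.Println(streamOut)
}
```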

Middleware

// Add logging
adapter = flowengine.WithHooks(adapter, flowengine.CallHook{
    OnStart: func(ctx context.Context, system, user string) {
        log.Printf("LLM call starting")
    },
    OnEnd: func(ctx context.Context, system, user, response string, err error) {
        log.Printf("LLM call completed")
    },
})

// Add caching
cache := flowengine.NewMemoryCache()
adapter = flowengine.WithCache(adapter, cache, true)

// Add metrics (WithMetrics returns a Middleware, which is then applied to the adapter)
adapter = flowengine.WithMetrics(yourMetricsCollector)(adapter)

License

MIT License - see LICENSE for details.

Documentation

Overview

Package flowengine provides an agent-style LLM orchestration framework.

FlowEngine is a zero-external-dependency framework for building composable LLM workflows with RPC-style action definitions and type-safe response handling. It is designed for orchestrating relatively stable flows within a single context.

Core Features

  • Agent-style Step definitions with Context, Guide, Task, Input
  • RPC-style action registration with automatic JSON Schema generation
  • Session management with context chaining
  • Configurable retry with validation and hint injection
  • Type-safe parameter extraction using generics
  • Composable flows: Chain, Decide, Parallel, Iterate
  • Function Call output mode for structured responses
  • Caching adapter decorator
  • Zero external dependencies

Three-Layer Prompt Model

The engine enforces a strict "Three-Layer" structure for prompts:

  • Layer 1: Context (System) - Inject "Just-in-Time" data using <context> tag
  • Layer 2: Guide (System) - Define "How to operate" using <guide> tag
  • Layer 3: Task (User) - Define "What to do" using <task> and <input> tags

Quick Start

// 1. Define a step with actions
step := flowengine.NewStep("intent").
	Context("You are an intent classifier").
	Task("Classify user query intent").
	Input("Query: {{.query}}").
	Action("search", SearchParams{}).
	Action("chat", ChatParams{}).
	Build()

// 2. Create engine with adapter
engine := flowengine.New(yourAdapter)

// 3. Run with metadata (the returned *FlowContext is discarded here)
_, resp, err := engine.Run(ctx, step, flowengine.Metadata{"query": "red dress"})
if err != nil {
	// handle the error
}

// 4. Type-safe parameter extraction
if params, ok := flowengine.ParamsAs[SearchParams](resp, "search"); ok {
	// Use params.Keywords, params.MaxCount
}

Session Support

Session management enables conversation context chaining:

// Run within a session (context is preserved across calls)
fc1, resp1, _ := engine.RunWithSession(ctx, "session-id", step1, metadata)
fc2, resp2, _ := engine.RunWithSession(ctx, "session-id", step2, metadata)

// Delete session when done
engine.DeleteSession(ctx, "session-id")

Composable Flows

// Sequential execution
sequential := flowengine.Chain(step1, step2, step3)

// Conditional branching on the previous response's action
branching := flowengine.Decide(
	flowengine.When("search", searchHandler),
	flowengine.When("chat", chatHandler),
	flowengine.Otherwise(defaultHandler),
)

// Parallel execution
parallel := flowengine.Parallel(step1, step2)

// Iterate until the condition returns true
loop := flowengine.Iterate(step, func(r *flowengine.Response) bool {
	return r.Is("done")
})

Adapter Interfaces

Implement the Adapter interface to integrate with any LLM provider:

type Adapter interface {
	Call(ctx context.Context, system, user string) (string, error)
	Stream(ctx context.Context, system, user string) (<-chan string, error)
}

// Session-aware adapter
type SessionAdapter interface {
	Adapter
	CallWithSession(ctx context.Context, system, user, previousResponseID string) (resp, responseID string, err error)
	DeleteSession(ctx context.Context, responseID string) error
}

// Function Call adapter
type ToolAdapter interface {
	Adapter
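	CallWithTools(ctx context.Context, system, user string, tools []ToolDef) (*ToolCall, error)
	CallWithToolsAndSession(ctx context.Context, system, user, previousResponseID string, tools []ToolDef) (*ToolCall, string, error)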
}

Constants

const DefaultMaxSessions = 1000

DefaultMaxSessions is the default maximum session count.

const DefaultSessionTTL = 30 * time.Minute

DefaultSessionTTL is the default session TTL (30 minutes).

Functions

func ParamsAs

func ParamsAs[T any](r *Response, action string) (T, bool)

ParamsAs extracts typed parameters for a specific action.
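
To show what typed extraction of this kind can look like, here is a minimal sketch under stated assumptions. The response type and the paramsAs function are local, hypothetical stand-ins; the sketch assumes parameters arrive as a map[string]any and converts them through a JSON round trip. The real ParamsAs may work differently internally.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// response is a trimmed local stand-in for a structured LLM response.
type response struct {
	Action string
	Params map[string]any
}

type SearchParams struct {
	Keywords string `json:"keywords"`
	MaxCount int    `json:"max_count,omitempty"`
}

// paramsAs is a hypothetical re-implementation of the idea: it returns the
// typed parameters only when the response carries the requested action.
func paramsAs[T any](r *response, action string) (T, bool) {
	var zero T
	if r == nil || r.Action != action {
		return zero, false
	}
	raw, err := json.Marshal(r.Params)
	if err != nil {
		return zero, false
	}
	var out T
	if err := json.Unmarshal(raw, &out); err != nil {
		return zero, false
	}
	return out, true
}

func main() {
	resp := &response{
		Action: "search",
		Params: map[string]any{"keywords": "red dress", "max_count": 5},
	}
	if p, ok := paramsAs[SearchParams](resp, "search"); ok {
		fmt.Printf("Keywords: %s, MaxCount: %d\n", p.Keywords, p.MaxCount)
	}
	if _, ok := paramsAs[SearchParams](resp, "chat"); !ok {
		fmt.Println("response is not a chat action")
	}
}
```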

Types

type Action

type Action struct {
	// Name is the action identifier (function name).
	Name string

	// Description explains what this action does.
	Description string

	// Schema is the JSON Schema for action parameters.
	Schema map[string]any

	// Example shows sample parameters for this action (for debugging).
	Example string
}

Action defines a Function Call action.

type ActionRegistry

type ActionRegistry struct {
	// contains filtered or unexported fields
}

ActionRegistry manages action definitions for a step.

func NewActionRegistry

func NewActionRegistry() *ActionRegistry

NewActionRegistry creates a new action registry.

func (*ActionRegistry) Actions

func (r *ActionRegistry) Actions() []Action

Actions returns all registered actions.

func (*ActionRegistry) IsEmpty

func (r *ActionRegistry) IsEmpty() bool

IsEmpty returns true if no actions are registered.

func (*ActionRegistry) Names

func (r *ActionRegistry) Names() []string

Names returns all action names.

func (*ActionRegistry) Register

func (r *ActionRegistry) Register(name string, prototype any) *ActionRegistry

Register adds an action from a struct prototype. Struct fields become JSON Schema properties. Use `desc` tag to add field descriptions.
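
The following sketch shows one way struct fields and `desc` tags could be turned into a JSON Schema with reflection. schemaOf and jsonType are hypothetical helpers, only flat structs with basic field types are handled, and treating fields without `omitempty` as required is an assumption made for the illustration rather than documented behavior.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

type SearchParams struct {
	Keywords string `json:"keywords" desc:"search keywords"`
	MaxCount int    `json:"max_count,omitempty" desc:"max results count"`
}

// jsonType maps a small set of Go kinds to JSON Schema type names.
func jsonType(k reflect.Kind) string {
	switch k {
	case reflect.String:
		return "string"
	case reflect.Bool:
		return "boolean"
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
		reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		return "integer"
	case reflect.Float32, reflect.Float64:
		return "number"
	case reflect.Slice, reflect.Array:
		return "array"
	default:
		return "object"
	}
}

// schemaOf is a hypothetical helper that builds an object schema from a
// struct prototype (or a pointer to one).
func schemaOf(prototype any) map[string]any {
	props := map[string]any{}
	required := []string{}
	schema := map[string]any{"type": "object", "properties": props}

	t := reflect.TypeOf(prototype)
	if t != nil && t.Kind() == reflect.Pointer {
		t = t.Elem()
	}
	if t == nil || t.Kind() != reflect.Struct {
		schema["required"] = required
		return schema
	}
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if !f.IsExported() {
			continue
		}
		name, opts, _ := strings.Cut(f.Tag.Get("json"), ",")
		if name == "-" {
			continue
		}
		if name == "" {
			name = f.Name
		}
		prop := map[string]any{"type": jsonType(f.Type.Kind())}
		if d := f.Tag.Get("desc"); d != "" {
			prop["description"] = d
		}
		props[name] = prop
		if !strings.Contains(opts, "omitempty") {
			required = append(required, name) // assumption: no omitempty means required
		}
	}
	schema["required"] = required
	return schema
}

func main() {
	fmt.Println(schemaOf(SearchParams{}))
}
```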

func (*ActionRegistry) RegisterWithDesc

func (r *ActionRegistry) RegisterWithDesc(name, description string, prototype any) *ActionRegistry

RegisterWithDesc adds an action with description.

func (*ActionRegistry) ToToolDefs

func (r *ActionRegistry) ToToolDefs() []ToolDef

ToToolDefs converts actions to ToolDef slice for Function Call.

func (*ActionRegistry) ValidateAction

func (r *ActionRegistry) ValidateAction(name string) bool

ValidateAction checks if an action name is registered.

type Adapter

type Adapter interface {
	// Call makes a synchronous LLM call and returns the response.
	Call(ctx context.Context, system, user string) (string, error)

	// Stream makes a streaming LLM call and returns a channel of response chunks.
	Stream(ctx context.Context, system, user string) (<-chan string, error)
}

Adapter is the interface for LLM providers.

func WithHooks

func WithHooks(inner Adapter, hook CallHook) Adapter

WithHooks creates an Adapter with observability hooks.

type Cache

type Cache interface {
	Get(key string) (string, bool)
	Set(key string, value string)
	Delete(key string)
}

Cache is the interface for response caching.

type CachingAdapter

type CachingAdapter struct {
	// contains filtered or unexported fields
}

CachingAdapter adds caching support to an Adapter.

func WithCache

func WithCache(inner Adapter, cache Cache, enabled bool) *CachingAdapter

WithCache creates a caching adapter.
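
A caching decorator of this kind can be sketched as follows. The Cache interface is copied locally; mapCache, cacheKey and cachedCall are hypothetical names, and hashing the system and user prompts into the key is an assumption, since the key format used by CachingAdapter is not documented here.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sync"
)

// Cache mirrors the interface documented above (local copy for the sketch).
type Cache interface {
	Get(key string) (string, bool)
	Set(key string, value string)
	Delete(key string)
}

// mapCache is a mutex-guarded map that satisfies Cache.
type mapCache struct {
	mu    sync.RWMutex
	items map[string]string
}

func newMapCache() *mapCache { return &mapCache{items: map[string]string{}} }

func (c *mapCache) Get(key string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.items[key]
	return v, ok
}

func (c *mapCache) Set(key string, value string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items[key] = value
}

func (c *mapCache) Delete(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.items, key)
}

// cacheKey hashes both prompts; the zero-byte separator keeps
// ("ab", "c") and ("a", "bc") from colliding.
func cacheKey(system, user string) string {
	sum := sha256.Sum256([]byte(system + "\x00" + user))
	return hex.EncodeToString(sum[:])
}

// cachedCall returns a cached response when present, otherwise it invokes
// call and stores the successful result.
func cachedCall(c Cache, system, user string, call func(system, user string) (string, error)) (string, error) {
	key := cacheKey(system, user)
	if v, ok := c.Get(key); ok {
		return v, nil
	}
	v, err := call(system, user)
	if err != nil {
		return "", err // errors are not cached
	}
	c.Set(key, v)
	return v, nil
}

// demo issues the same request twice and reports how often the inner call ran.
func demo() (int, string) {
	calls := 0
	inner := func(system, user string) (string, error) {
		calls++
		return "answer:" + user, nil
	}
	c := newMapCache()
	_, _ = cachedCall(c, "sys", "q", inner)
	out, _ := cachedCall(c, "sys", "q", inner)
	return calls, out
}

func main() {
	n, out := demo()
	fmt.Println(n, out)
}
```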

func (*CachingAdapter) Call

func (c *CachingAdapter) Call(ctx context.Context, system, user string) (string, error)

func (*CachingAdapter) CallWithSession

func (c *CachingAdapter) CallWithSession(ctx context.Context, system, user, previousResponseID string) (string, string, error)

func (*CachingAdapter) CallWithTools

func (c *CachingAdapter) CallWithTools(ctx context.Context, system, user string, tools []ToolDef) (*ToolCall, error)

func (*CachingAdapter) CallWithToolsAndSession

func (c *CachingAdapter) CallWithToolsAndSession(ctx context.Context, system, user, previousResponseID string, tools []ToolDef) (*ToolCall, string, error)

func (*CachingAdapter) DeleteSession

func (c *CachingAdapter) DeleteSession(ctx context.Context, responseID string) error

func (*CachingAdapter) IsEnabled

func (c *CachingAdapter) IsEnabled() bool

IsEnabled returns whether caching is enabled.

func (*CachingAdapter) SetEnabled

func (c *CachingAdapter) SetEnabled(enabled bool)

SetEnabled toggles caching.

func (*CachingAdapter) Stream

func (c *CachingAdapter) Stream(ctx context.Context, system, user string) (<-chan string, error)

type CallHook

type CallHook struct {
	// OnStart is called before the LLM call.
	OnStart func(ctx context.Context, system, user string)

	// OnEnd is called after the LLM call completes.
	OnEnd func(ctx context.Context, system, user, response string, err error)
}

CallHook is called before and after each LLM call.

type ContextBuilder

type ContextBuilder struct {
	// contains filtered or unexported fields
}

ContextBuilder dynamically builds step context.

func NewContextBuilder

func NewContextBuilder() *ContextBuilder

NewContextBuilder creates a new context builder.

func (*ContextBuilder) Add

func (b *ContextBuilder) Add(name, content string) *ContextBuilder

Add adds a named data block.

func (*ContextBuilder) AddIf

func (b *ContextBuilder) AddIf(condition bool, name, content string) *ContextBuilder

AddIf conditionally adds a data block.

func (*ContextBuilder) AddTemplate

func (b *ContextBuilder) AddTemplate(name, template string) *ContextBuilder

AddTemplate adds a template data block.

func (*ContextBuilder) Build

func (b *ContextBuilder) Build() string

Build generates the final context string.

func (*ContextBuilder) String

func (b *ContextBuilder) String() string

String implements Stringer.

type DecideCase

type DecideCase struct {
	// contains filtered or unexported fields
}

DecideCase represents a branch condition for Decide.

func Otherwise

func Otherwise(r Runnable) DecideCase

Otherwise creates the default decision branch.

func When

func When(action string, r Runnable) DecideCase

When creates a decision branch for a specific action.

func WhenFunc

func WhenFunc(condition func(data map[string]any) bool, r Runnable) DecideCase

WhenFunc creates a decision branch with a custom condition function.

func WhenParam

func WhenParam(key string, value any, r Runnable) DecideCase

WhenParam creates a branch based on parameter value matching.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is the flowengine execution engine.

func New

func New(adapter Adapter, opts ...EngineOption) *Engine

New creates a new Engine with the given adapter.

func (*Engine) Adapter

func (e *Engine) Adapter() Adapter

Adapter returns the underlying adapter.

func (*Engine) Close

func (e *Engine) Close()

Close stops background session cleanup.

func (*Engine) DeleteSession

func (e *Engine) DeleteSession(ctx context.Context, sessionID string) error

DeleteSession deletes a session.

func (*Engine) Run

func (e *Engine) Run(ctx context.Context, r Runnable, metadata Metadata) (*FlowContext, *Response, error)

Run executes a Runnable.

func (*Engine) RunWithSession

func (e *Engine) RunWithSession(ctx context.Context, sessionID string, r Runnable, metadata Metadata) (*FlowContext, *Response, error)

RunWithSession executes a Runnable within a session context.

func (*Engine) Sessions

func (e *Engine) Sessions() *SessionManager

Sessions returns the session manager.

func (*Engine) Stream

func (e *Engine) Stream(ctx context.Context, step *Step, metadata Metadata) (<-chan string, error)

Stream executes a step and returns a streaming response channel.

type EngineOption

type EngineOption func(*Engine)

EngineOption configures Engine.

func WithDefaultRetries

func WithDefaultRetries(n int) EngineOption

WithDefaultRetries sets the default retry count.

type FlowContext

type FlowContext struct {
	context.Context
	// contains filtered or unexported fields
}

FlowContext embeds context.Context and manages all state for a single flow execution.

func NewFlowContext

func NewFlowContext(ctx context.Context, engine *Engine, metadata Metadata) *FlowContext

NewFlowContext creates a new FlowContext.

func (*FlowContext) Engine

func (fc *FlowContext) Engine() *Engine

Engine returns the associated engine.

func (*FlowContext) GetNodeData

func (fc *FlowContext) GetNodeData(name string) (NodeData, bool)

GetNodeData returns node data by name.

func (*FlowContext) GetNodes

func (fc *FlowContext) GetNodes() []NodeData

GetNodes returns all node data.

func (*FlowContext) Metadata

func (fc *FlowContext) Metadata() Metadata

Metadata returns the business metadata.

func (*FlowContext) MetadataSnapshot

func (fc *FlowContext) MetadataSnapshot() Metadata

MetadataSnapshot returns a shallow copy of the metadata for concurrent reads.

func (*FlowContext) PrevAction

func (fc *FlowContext) PrevAction() string

PrevAction returns the previous step's action name.

func (*FlowContext) PrevParams

func (fc *FlowContext) PrevParams() map[string]any

PrevParams returns the previous step's parameters.

func (*FlowContext) PrevResponse

func (fc *FlowContext) PrevResponse() *Response

PrevResponse returns the previous step's response.

func (*FlowContext) RecordNode

func (fc *FlowContext) RecordNode(name, action string, params map[string]any)

RecordNode records a node's execution result.

func (*FlowContext) Session

func (fc *FlowContext) Session() *Session

Session returns the current session.

func (*FlowContext) WithSession

func (fc *FlowContext) WithSession(sess *Session) *FlowContext

WithSession attaches a session to the context.

type Guide

type Guide struct {
	Constraints []string
	AutoRPC     bool
}

Guide defines RPC declarations and constraints.

type Logger

type Logger interface {
	Debug(msg string, args ...any)
	Info(msg string, args ...any)
	Error(msg string, args ...any)
}

Logger is the logging interface.
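
The three method signatures match those of *slog.Logger from the standard library (Go 1.21 or later), so a slog logger should satisfy this interface without a wrapper. The sketch below redeclares Logger locally to demonstrate that; newTextLogger and demoLogged are hypothetical helper names.

```go
package main

import (
	"bytes"
	"fmt"
	"log/slog"
	"strings"
)

// Logger mirrors the interface documented above (local copy for the sketch).
type Logger interface {
	Debug(msg string, args ...any)
	Info(msg string, args ...any)
	Error(msg string, args ...any)
}

// newTextLogger returns a *slog.Logger as a Logger; it writes key=value
// lines to buf and lets Debug records through.
func newTextLogger(buf *bytes.Buffer) Logger {
	h := slog.NewTextHandler(buf, &slog.HandlerOptions{Level: slog.LevelDebug})
	return slog.New(h)
}

// demoLogged logs one record and reports whether level and attribute appear.
func demoLogged() bool {
	var buf bytes.Buffer
	logger := newTextLogger(&buf)
	logger.Info("LLM call completed", "step", "intent")
	out := buf.String()
	return strings.Contains(out, "level=INFO") && strings.Contains(out, "step=intent")
}

func main() {
	fmt.Println(demoLogged())
}
```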

type MemoryCache

type MemoryCache struct {
	// contains filtered or unexported fields
}

MemoryCache is a simple in-memory cache.

func NewMemoryCache

func NewMemoryCache() *MemoryCache

NewMemoryCache creates a new MemoryCache.

func (*MemoryCache) Delete

func (c *MemoryCache) Delete(key string)

func (*MemoryCache) Get

func (c *MemoryCache) Get(key string) (string, bool)

func (*MemoryCache) Set

func (c *MemoryCache) Set(key string, value string)

type Message

type Message struct {
	Role    string `json:"role"` // system, user, assistant
	Content string `json:"content"`
}

Message represents a session message.

type Metadata

type Metadata map[string]any

Metadata represents business data for template rendering and data passing.

func (Metadata) Get

func (m Metadata) Get(key string) (any, bool)

Get returns a value from metadata.

func (Metadata) GetInt64

func (m Metadata) GetInt64(key string) int64

GetInt64 returns an int64 value from metadata.

func (Metadata) GetString

func (m Metadata) GetString(key string) string

GetString returns a string value from metadata.

func (Metadata) Merge

func (m Metadata) Merge(others ...Metadata) Metadata

Merge combines multiple Metadata maps.

type MetricsCollector

type MetricsCollector interface {
	IncrementCalls()
	IncrementErrors()
	RecordLatency(milliseconds int64)
}

MetricsCollector is the metrics collection interface.
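
A collector for this interface can be small. The sketch below uses atomic counters (Go 1.19 or later) so it is safe under concurrent calls. atomicCollector and observe are hypothetical names, and observe only illustrates one way a wrapper might feed the collector, not how WithMetrics does it.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// MetricsCollector mirrors the interface documented above (local copy).
type MetricsCollector interface {
	IncrementCalls()
	IncrementErrors()
	RecordLatency(milliseconds int64)
}

// atomicCollector keeps lock-free counters.
type atomicCollector struct {
	calls   atomic.Int64
	errs    atomic.Int64
	totalMs atomic.Int64
}

func (c *atomicCollector) IncrementCalls()         { c.calls.Add(1) }
func (c *atomicCollector) IncrementErrors()        { c.errs.Add(1) }
func (c *atomicCollector) RecordLatency(ms int64)  { c.totalMs.Add(ms) }

// observe is a hypothetical helper that times one call and reports it.
func observe(m MetricsCollector, call func() error) error {
	start := time.Now()
	m.IncrementCalls()
	err := call()
	if err != nil {
		m.IncrementErrors()
	}
	m.RecordLatency(time.Since(start).Milliseconds())
	return err
}

// demo records one successful and one failing call.
func demo() (calls, errs int64) {
	c := &atomicCollector{}
	_ = observe(c, func() error { return nil })
	_ = observe(c, func() error { return errors.New("boom") })
	return c.calls.Load(), c.errs.Load()
}

func main() {
	calls, errs := demo()
	fmt.Println(calls, errs)
}
```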

type Middleware

type Middleware func(Adapter) Adapter

Middleware wraps an Adapter to add behavior.

func ChainMiddleware

func ChainMiddleware(middlewares ...Middleware) Middleware

ChainMiddleware applies multiple middlewares from left to right.
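
The sketch below shows one possible reading of "left to right": each middleware wraps the result of the previous one, so the last middleware listed ends up outermost. This ordering is an assumption; the package may define it differently. The Adapter interface is trimmed to Call for brevity, and callFunc, tag and chain are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Adapter is trimmed to Call only to keep the sketch short.
type Adapter interface {
	Call(ctx context.Context, system, user string) (string, error)
}

// Middleware wraps an Adapter to add behavior.
type Middleware func(Adapter) Adapter

// callFunc lets a plain function act as an Adapter.
type callFunc func(ctx context.Context, system, user string) (string, error)

func (f callFunc) Call(ctx context.Context, system, user string) (string, error) {
	return f(ctx, system, user)
}

// tag returns a middleware that wraps the inner response as name(response).
func tag(name string) Middleware {
	return func(next Adapter) Adapter {
		return callFunc(func(ctx context.Context, system, user string) (string, error) {
			out, err := next.Call(ctx, system, user)
			return name + "(" + out + ")", err
		})
	}
}

// chain applies the middlewares in the order given; the last one listed
// becomes the outermost wrapper.
func chain(mws ...Middleware) Middleware {
	return func(a Adapter) Adapter {
		for _, mw := range mws {
			a = mw(a)
		}
		return a
	}
}

// demo wraps a base adapter that always answers "x" with tags A and B.
func demo() string {
	base := callFunc(func(ctx context.Context, system, user string) (string, error) {
		return "x", nil
	})
	wrapped := chain(tag("A"), tag("B"))(base)
	out, _ := wrapped.Call(context.Background(), "", "")
	return out
}

func main() {
	fmt.Println(demo())
}
```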

func WithLogging

func WithLogging(logger Logger) Middleware

WithLogging creates middleware that logs LLM calls.

func WithMetrics

func WithMetrics(collector MetricsCollector) Middleware

WithMetrics creates middleware that collects LLM call metrics.

type MockAdapter

type MockAdapter struct {
	// contains filtered or unexported fields
}

MockAdapter is a mock adapter for testing.

func NewMockAdapter

func NewMockAdapter(response string) *MockAdapter

NewMockAdapter creates a mock adapter returning a fixed response.

func NewMockAdapterSequence

func NewMockAdapterSequence(responses ...string) *MockAdapter

NewMockAdapterSequence creates a mock adapter returning sequential responses.

func NewMockAdapterWithToolCall

func NewMockAdapterWithToolCall(name string, args map[string]any) *MockAdapter

NewMockAdapterWithToolCall creates a mock adapter returning a tool call.

func (*MockAdapter) Call

func (m *MockAdapter) Call(ctx context.Context, system, user string) (string, error)

func (*MockAdapter) CallCount

func (m *MockAdapter) CallCount() int

CallCount returns the number of times Call was invoked.

func (*MockAdapter) CallWithSession

func (m *MockAdapter) CallWithSession(ctx context.Context, system, user, previousResponseID string) (string, string, error)

func (*MockAdapter) CallWithTools

func (m *MockAdapter) CallWithTools(ctx context.Context, system, user string, tools []ToolDef) (*ToolCall, error)

func (*MockAdapter) CallWithToolsAndSession

func (m *MockAdapter) CallWithToolsAndSession(ctx context.Context, system, user, previousResponseID string, tools []ToolDef) (*ToolCall, string, error)

func (*MockAdapter) DeleteSession

func (m *MockAdapter) DeleteSession(ctx context.Context, responseID string) error

func (*MockAdapter) Reset

func (m *MockAdapter) Reset()

Reset resets the mock state.

func (*MockAdapter) Stream

func (m *MockAdapter) Stream(ctx context.Context, system, user string) (<-chan string, error)

type NamedRunnable

type NamedRunnable interface {
	GetName() string
}

NamedRunnable is an optional interface for getting a runnable's name.

type NodeData

type NodeData struct {
	Name     string         `json:"name"`
	Action   string         `json:"action"`
	Params   map[string]any `json:"params"`
	Metadata Metadata       `json:"metadata"` // Shared reference with Metadata[Name]
}

NodeData records a single node's execution.

type ParallelError

type ParallelError struct {
	Errors    []error
	Responses []*Response
}

ParallelError aggregates errors from parallel execution.

func (*ParallelError) Error

func (e *ParallelError) Error() string

func (*ParallelError) FailedIndexes

func (e *ParallelError) FailedIndexes() []int

FailedIndexes returns the indexes of failed steps.

func (*ParallelError) Unwrap

func (e *ParallelError) Unwrap() []error

Unwrap returns all non-nil errors.
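
Because Unwrap returns a slice of errors, errors.Is and errors.As (Go 1.20 or later) can look inside an aggregate of this shape. The sketch uses a local parallelError type and assumes the Errors slice is index-aligned with the steps, with nil for steps that succeeded; errRateLimited is a hypothetical sentinel error.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errRateLimited is a hypothetical sentinel used for the demonstration.
var errRateLimited = errors.New("rate limited")

// parallelError is a local stand-in; Errors[i] is nil when step i succeeded.
type parallelError struct {
	Errors []error
}

func (e *parallelError) Error() string {
	var msgs []string
	for i, err := range e.Errors {
		if err != nil {
			msgs = append(msgs, fmt.Sprintf("step %d: %v", i, err))
		}
	}
	return strings.Join(msgs, "; ")
}

// Unwrap exposes the non-nil errors so errors.Is and errors.As can search them.
func (e *parallelError) Unwrap() []error {
	var out []error
	for _, err := range e.Errors {
		if err != nil {
			out = append(out, err)
		}
	}
	return out
}

// failedIndexes lists the positions of the steps that returned an error.
func (e *parallelError) failedIndexes() []int {
	var idx []int
	for i, err := range e.Errors {
		if err != nil {
			idx = append(idx, i)
		}
	}
	return idx
}

// demo builds an aggregate where only the second step failed.
func demo() (matched bool, failed []int) {
	pe := &parallelError{Errors: []error{nil, fmt.Errorf("search: %w", errRateLimited), nil}}
	var err error = pe
	return errors.Is(err, errRateLimited), pe.failedIndexes()
}

func main() {
	matched, failed := demo()
	fmt.Println(matched, failed)
}
```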

type Response

type Response struct {
	Type    ResponseType   `json:"type"`
	Action  string         `json:"action,omitempty"`
	Params  map[string]any `json:"params,omitempty"`
	Message string         `json:"message,omitempty"`
	Raw     string         `json:"-"`
}

Response represents a structured LLM output.

func (*Response) Is

func (r *Response) Is(action string) bool

Is checks if the response matches a specific action.

func (*Response) IsMessage

func (r *Response) IsMessage() bool

IsMessage checks if the response type is a message.

func (*Response) IsMethod

func (r *Response) IsMethod() bool

IsMethod checks if the response type is a method call.

func (*Response) ParseParams

func (r *Response) ParseParams(dest any) error

ParseParams parses Params into a typed struct.

type ResponseType

type ResponseType string

ResponseType defines the type of LLM response.

const (
	ResponseMethod  ResponseType = "method"
	ResponseMessage ResponseType = "message"
)

type Runnable

type Runnable interface {
	Run(fc *FlowContext) (*Response, error)
}

Runnable is the interface for an executable unit.

func Chain

func Chain(steps ...Runnable) Runnable

Chain creates a flow that executes steps sequentially.

func Decide

func Decide(cases ...DecideCase) Runnable

Decide creates a branching flow based on the previous response action.

func Guard

func Guard(condition func(*FlowContext) bool, then Runnable) Runnable

Guard creates a runnable that only executes when the condition is true.

func Iterate

func Iterate(step Runnable, until func(*Response) bool) Runnable

Iterate creates a loop that repeats until the condition returns true.

func IterateMax

func IterateMax(step Runnable, until func(*Response) bool, max int) Runnable

IterateMax creates a loop with a custom maximum iteration count.
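
A bounded loop of this kind can be sketched as below. The function iterateMax and the sentinel errMaxIterations are hypothetical, steps are simplified to functions returning strings, and returning an error when the limit is reached is an assumption; the documentation above does not say what the library does in that case.

```go
package main

import (
	"errors"
	"fmt"
)

// errMaxIterations is a hypothetical sentinel for an exhausted loop.
var errMaxIterations = errors.New("max iterations reached")

// iterateMax repeats step until the condition holds or limit runs are used.
// It returns the last output and the number of runs performed.
func iterateMax(step func(i int) (string, error), until func(string) bool, limit int) (string, int, error) {
	var last string
	for i := 1; i <= limit; i++ {
		out, err := step(i)
		if err != nil {
			return last, i, err
		}
		last = out
		if until(out) {
			return out, i, nil
		}
	}
	return last, limit, errMaxIterations
}

// demo runs a step that reports "done" on its third invocation.
func demo(limit int) (string, int, error) {
	step := func(i int) (string, error) {
		if i >= 3 {
			return "done", nil
		}
		return "continue", nil
	}
	return iterateMax(step, func(s string) bool { return s == "done" }, limit)
}

func main() {
	out, runs, err := demo(5)
	fmt.Println(out, runs, err)
	out, runs, err = demo(2)
	fmt.Println(out, runs, err)
}
```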

func Parallel

func Parallel(steps ...Runnable) Runnable

Parallel creates a flow that executes all steps concurrently.

func Transform

func Transform(fn func(*Response) *Response) Runnable

Transform creates a runnable that transforms the previous response.

type RunnableFunc

type RunnableFunc func(fc *FlowContext) (*Response, error)

RunnableFunc adapts a function to the Runnable interface.

func (RunnableFunc) Run

func (f RunnableFunc) Run(fc *FlowContext) (*Response, error)

Run implements Runnable.

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session represents a session with context chaining.

func NewSession

func NewSession(id string) *Session

NewSession creates a new session with the given ID.

func (*Session) AppendMessage

func (s *Session) AppendMessage(role, content string)

AppendMessage adds a message to session history.

func (*Session) Chain

func (s *Session) Chain() []string

Chain returns the execution chain.

func (*Session) Clear

func (s *Session) Clear()

Clear resets the session state.

func (*Session) GetResponseID

func (s *Session) GetResponseID(stepName string) (string, bool)

GetResponseID returns the LLM response ID for a step.

func (*Session) ID

func (s *Session) ID() string

ID returns the session ID.

func (*Session) LastAccessed

func (s *Session) LastAccessed() time.Time

LastAccessed returns the session's last access time.

func (*Session) LastResponseID

func (s *Session) LastResponseID() string

LastResponseID returns the most recent LLM response ID.

func (*Session) Messages

func (s *Session) Messages() []Message

Messages returns all session messages.

func (*Session) RecordNode

func (s *Session) RecordNode(stepName, responseID string)

RecordNode records step execution with its LLM response ID.

func (*Session) RootResponseID

func (s *Session) RootResponseID() string

RootResponseID returns the root LLM response ID.

func (*Session) SetRootResponseID

func (s *Session) SetRootResponseID(id string)

SetRootResponseID sets the root LLM response ID.

type SessionAdapter

type SessionAdapter interface {
	Adapter

	// CallWithSession makes a call with session context.
	// previousResponseID is the ID from the previous call for context chaining.
	CallWithSession(ctx context.Context, system, user, previousResponseID string) (resp string, responseID string, err error)

	// DeleteSession deletes a session and all derived contexts.
	DeleteSession(ctx context.Context, responseID string) error
}

SessionAdapter extends Adapter with session-based context management.

type SessionManager

type SessionManager struct {
	// contains filtered or unexported fields
}

SessionManager manages sessions.

func NewSessionManager

func NewSessionManager(opts ...SessionManagerOption) *SessionManager

NewSessionManager creates a new session manager.

func (*SessionManager) Clear

func (m *SessionManager) Clear()

Clear removes all sessions.

func (*SessionManager) Count

func (m *SessionManager) Count() int

Count returns the number of active sessions.

func (*SessionManager) Delete

func (m *SessionManager) Delete(id string)

Delete removes a session and all its contexts.

func (*SessionManager) Exists

func (m *SessionManager) Exists(id string) bool

Exists checks if a session exists.

func (*SessionManager) Get

func (m *SessionManager) Get(id string) *Session

Get returns or creates a session.

func (*SessionManager) GetIfExists

func (m *SessionManager) GetIfExists(id string) *Session

GetIfExists returns a session if it exists, nil otherwise.

func (*SessionManager) MaxSize

func (m *SessionManager) MaxSize() int

MaxSize returns the current max size setting.

func (*SessionManager) Stop

func (m *SessionManager) Stop()

Stop stops the cleanup goroutine.

func (*SessionManager) TTL

func (m *SessionManager) TTL() time.Duration

TTL returns the current TTL setting.

type SessionManagerOption

type SessionManagerOption func(*SessionManager)

SessionManagerOption configures SessionManager.

func WithMaxSessions

func WithMaxSessions(max int) SessionManagerOption

WithMaxSessions sets the maximum session count.

func WithTTL

func WithTTL(ttl time.Duration) SessionManagerOption

WithTTL sets the session TTL.
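
WithTTL and WithMaxSessions follow Go's functional options pattern. The sketch below reproduces that pattern with local, hypothetical types and uses the documented defaults (30 minutes, 1000 sessions) as starting values; it does not model eviction or cleanup.

```go
package main

import (
	"fmt"
	"time"
)

// manager is a local stand-in holding only the two configurable settings.
type manager struct {
	ttl         time.Duration
	maxSessions int
}

// option mutates a manager during construction.
type option func(*manager)

func withTTL(ttl time.Duration) option {
	return func(m *manager) { m.ttl = ttl }
}

func withMaxSessions(n int) option {
	return func(m *manager) { m.maxSessions = n }
}

// newManager starts from the defaults and applies options in order,
// so a later option overrides an earlier one.
func newManager(opts ...option) *manager {
	m := &manager{ttl: 30 * time.Minute, maxSessions: 1000}
	for _, opt := range opts {
		opt(m)
	}
	return m
}

func main() {
	def := newManager()
	custom := newManager(withTTL(5*time.Minute), withMaxSessions(10))
	fmt.Println(def.ttl, def.maxSessions)
	fmt.Println(custom.ttl, custom.maxSessions)
}
```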

type Step

type Step struct {
	Name       string
	Context    string
	Guide      *Guide
	Task       string
	Input      string
	Actions    *ActionRegistry
	MaxRetries int
	Validator  Validator
}

Step represents a single LLM call in an agent workflow.

func SimpleStep

func SimpleStep(name, context, input string) *Step

SimpleStep creates a simple message output step.

func (*Step) ActionNames

func (s *Step) ActionNames() []string

ActionNames returns registered action names.

func (*Step) BuildSystemPrompt

func (s *Step) BuildSystemPrompt(fc *FlowContext) (string, error)

BuildSystemPrompt builds the system prompt.

func (*Step) BuildUserPrompt

func (s *Step) BuildUserPrompt(fc *FlowContext, hint string) (string, error)

BuildUserPrompt builds the user prompt.

func (*Step) GetName

func (s *Step) GetName() string

GetName returns the step name.

func (*Step) Run

func (s *Step) Run(fc *FlowContext) (*Response, error)

Run executes the step.

func (*Step) ToolDefs

func (s *Step) ToolDefs() []ToolDef

ToolDefs returns tool definitions for Function Call.

func (*Step) ValidateAction

func (s *Step) ValidateAction(name string) bool

ValidateAction checks if an action name is valid.

type StepBuilder

type StepBuilder struct {
	// contains filtered or unexported fields
}

StepBuilder provides a fluent API for building Steps.

func NewStep

func NewStep(name string) *StepBuilder

NewStep creates a new StepBuilder with the given name.

func (*StepBuilder) Action

func (b *StepBuilder) Action(name string, prototype any) *StepBuilder

Action registers an action.

func (*StepBuilder) ActionWithDesc

func (b *StepBuilder) ActionWithDesc(name, description string, prototype any) *StepBuilder

ActionWithDesc registers an action with description.

func (*StepBuilder) Build

func (b *StepBuilder) Build() *Step

Build returns the built Step.

func (*StepBuilder) Context

func (b *StepBuilder) Context(template string) *StepBuilder

Context sets the data context.

func (*StepBuilder) Guide

func (b *StepBuilder) Guide(constraints ...string) *StepBuilder

Guide sets the constraints.

func (*StepBuilder) Input

func (b *StepBuilder) Input(template string) *StepBuilder

Input sets the user input template.

func (*StepBuilder) Role

func (b *StepBuilder) Role(r string) *StepBuilder

Role sets the role context (syntactic sugar).

func (*StepBuilder) Task

func (b *StepBuilder) Task(description string) *StepBuilder

Task sets the task description.

func (*StepBuilder) WithRetries

func (b *StepBuilder) WithRetries(n int) *StepBuilder

WithRetries sets the maximum retry count.

func (*StepBuilder) WithValidator

func (b *StepBuilder) WithValidator(v Validator) *StepBuilder

WithValidator sets the validator.

type ToolAdapter

type ToolAdapter interface {
	Adapter

	// CallWithTools makes a call with tool definitions.
	// Returns a tool call if the LLM chooses to call a function.
	CallWithTools(ctx context.Context, system, user string, tools []ToolDef) (*ToolCall, error)

	// CallWithToolsAndSession combines session and tool support.
	CallWithToolsAndSession(ctx context.Context, system, user, previousResponseID string, tools []ToolDef) (call *ToolCall, responseID string, err error)
}

ToolAdapter extends Adapter with Function Call support.

type ToolCall

type ToolCall struct {
	// Name is the function to call.
	Name string `json:"name"`

	// Arguments contains the function arguments.
	Arguments map[string]any `json:"arguments"`
}

ToolCall represents an LLM's function call.

type ToolDef

type ToolDef struct {
	// Name is the function name.
	Name string `json:"name"`

	// Description describes what this function does.
	Description string `json:"description"`

	// Parameters is the JSON Schema for function parameters.
	Parameters map[string]any `json:"parameters"`
}

ToolDef defines an LLM-callable function/tool.

type ValidationError

type ValidationError struct {
	Err      error
	Response *Response
	Attempt  int
}

ValidationError indicates a response validation failure.

func (*ValidationError) Error

func (e *ValidationError) Error() string

func (*ValidationError) Unwrap

func (e *ValidationError) Unwrap() error

type Validator

type Validator func(*Response) error

Validator validates a response.
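
Retry with validation and hint injection can be pictured with the sketch below. It is an illustration under assumptions, not the engine's implementation: responses are simplified to strings, runWithRetries is a hypothetical helper, and the hint wording is invented. The idea is that a rejected response produces a hint that is passed to the next attempt.

```go
package main

import (
	"errors"
	"fmt"
)

// validator checks a response and returns an error describing the problem.
type validator func(resp string) error

// runWithRetries calls the model up to maxRetries+1 times. After a failed
// validation, the error text is passed to the next attempt as a hint.
func runWithRetries(call func(hint string) (string, error), validate validator, maxRetries int) (string, int, error) {
	hint := ""
	var lastErr error
	for attempt := 1; attempt <= maxRetries+1; attempt++ {
		resp, err := call(hint)
		if err != nil {
			return "", attempt, err // transport errors are not retried in this sketch
		}
		if validate == nil {
			return resp, attempt, nil
		}
		if lastErr = validate(resp); lastErr == nil {
			return resp, attempt, nil
		}
		hint = "Previous answer was rejected: " + lastErr.Error()
	}
	return "", maxRetries + 1, fmt.Errorf("validation failed after %d attempts: %w", maxRetries+1, lastErr)
}

// demo simulates a model that only answers correctly once it receives a hint.
func demo(maxRetries int) (string, int, error) {
	call := func(hint string) (string, error) {
		if hint == "" {
			return "bad", nil
		}
		return "ok", nil
	}
	validate := func(resp string) error {
		if resp != "ok" {
			return errors.New("expected ok")
		}
		return nil
	}
	return runWithRetries(call, validate, maxRetries)
}

func main() {
	resp, attempts, err := demo(2)
	fmt.Println(resp, attempts, err)
}
```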

Directories

Path          Synopsis
adapter/ark   Package ark provides the Volcengine Ark (Doubao) adapter for flowengine.
