workflow

package
v0.2.11
Published: May 8, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package workflow defines the execution blackboard (Board: Vars + Channels), the Runtime orchestration API (Run, MemorySession, prepare/finish), the Agent, Strategy, and Memory abstractions, and the Request and Result types.

The graph execution Strategy lives in the subpackage workflow/flowgraph, which imports graph. Callers typically construct a Runtime with WithPrepareBoard for platform-specific board setup and, when using flowgraph, with WithDependencies to supply the Factory and Executor.

Deprecated: the workflow package is superseded by the agent + engine + graph runtime introduced in v0.2.x and is scheduled for removal in v0.3.0. The breakdown below documents where each concept moved so callers can migrate incrementally; until v0.3.0 every symbol in this package keeps working unchanged.

Migration map (workflow → new location):

  • workflow.Board / BoardSnapshot / NewBoard / RestoreBoard / GetTyped / Cloneable / MainChannel → engine.Board / engine.BoardSnapshot / engine.NewBoard / engine.RestoreBoard / engine.GetTyped / engine.Cloneable / engine.MainChannel (re-exported by sdk/graph for graph callers).

  • workflow.Runtime / NewRuntime / RuntimeOption / WithMemoryFactory / WithPrepareBoard / WithDependencies → sdk/agent: agent.Agent + agent.Run. The Runtime "prepare board → strategy.Build → run → finish" pipeline is folded into agent.Run; per-platform board prep becomes an agent.Seeder.

  • workflow.Agent / NewAgent / AgentOption / AgentCard / Skill / AgentCapabilities → sdk/agent: agent.Agent (interface), agent.New (constructor), agent.Card (descriptor). Skills are agent.Decider + agent.Tool wiring on the agent value.

  • workflow.Strategy / Runnable / StrategyCapabilities / Dependencies / SetDep / GetDep / NewDependencies → sdk/agent.Decider for the runtime selection logic; graph/runner.Runner replaces the Build/Runnable split for graph strategies. Dependency wiring becomes constructor arguments on the concrete factory (e.g. graph/node/llmnode.Deps).

  • workflow.Request / RequestConfig / NewTextRequest / MessageText → sdk/agent.Request and direct use of model.Message helpers (model.NewTextMessage, msg.Text()) — there is no longer a separate "request text" projection.

  • workflow.Result / TaskStatus / Artifact → sdk/agent.Result + sdk/agent.Disposition (typed status). Artifact slots are now first-class fields on agent.Result.

  • workflow.Memory / MemorySession / MemoryFactory / BaseSession / ContextAssembler / IncrementalSaver → sdk/agent.Observer (lifecycle hooks) + sdk/agent.Seeder (initial board state). History persistence is no longer a runtime concern; persist agent.Result downstream.

  • workflow.RunOption / WithHistory / WithStreamCallback / WithMaxIterations / WithBoard / ApplyRunOpts / RunConfig → executor.RunOption (graph-level) + agent.RunOption (agent-level). Streaming moves to engine.Host.Publisher; subscribers register at the host or via event.Bus directly.

  • workflow.StreamEvent / StreamCallback → event.Envelope + event.Bus. Nodes emit through graph.StreamPublisher (handed via ExecutionContext); aliases remain in graph/deprecated.go for one minor release.

  • workflow.Task / TaskManager → sdk/agent.Run (per-invocation handle). There is no replacement for the global TaskManager; orchestration of multiple agent runs is the host application's concern.

The agent, engine and graph packages are the sanctioned way to build new code. Until v0.3.0, existing workflow callers continue to compile against the legacy API but will see staticcheck SA1019 warnings.

Constants

const (
	VarQuery            = "query"
	VarAnswer           = "answer"
	VarMessages         = "messages"
	VarRunID            = "__run_id"
	VarStartTime        = "__start_time"
	VarInternalUsage    = "__usage"
	VarInterruptedNode  = "__interrupted_node"
	VarOutputSchema     = "__output_schema"
	VarPrevMessageCount = "__prev_message_count"
	VarSummaryIndex     = "__summary_index"
)

Board variable keys for the workflow/runtime layer.

const MainChannel = ""

MainChannel is the default message channel key (empty string).

Variables

This section is empty.

Functions

func GetDep

func GetDep[T any](d *Dependencies, key string) (T, error)

GetDep retrieves a typed dependency. It returns an error if the key is missing or the stored value does not match the requested type.

func GetTyped

func GetTyped[T any](b *Board, key string) (T, bool)

GetTyped retrieves a typed value from the Board's vars.

func MessageText

func MessageText(m model.Message) string

MessageText returns the plain text content of a user message, or "".

func SetDep

func SetDep[T any](d *Dependencies, key string, val T)

SetDep stores a typed dependency value. The type parameter is for documentation only at the call site; retrieval is type-checked by GetDep.
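
The SetDep/GetDep pair described above can be sketched as follows. This is a minimal illustration, not the package's implementation: the Dependencies type here is a local stand-in backed by a plain map (the real type's fields are unexported), and the key "max_retries" is hypothetical.

```go
package main

import "fmt"

// Dependencies is a local stand-in for workflow.Dependencies (assumption:
// a plain map is used because the real fields are unexported).
type Dependencies struct {
	vals map[string]any
}

// NewDependencies returns an empty container.
func NewDependencies() *Dependencies {
	return &Dependencies{vals: make(map[string]any)}
}

// SetDep stores val under key. T only documents intent at the call site.
func SetDep[T any](d *Dependencies, key string, val T) {
	d.vals[key] = val
}

// GetDep returns the value under key as T, or an error if the key is
// missing or the stored value has a different type.
func GetDep[T any](d *Dependencies, key string) (T, error) {
	var zero T
	v, ok := d.vals[key]
	if !ok {
		return zero, fmt.Errorf("dependency %q not found", key)
	}
	typed, ok := v.(T)
	if !ok {
		return zero, fmt.Errorf("dependency %q has type %T, want %T", key, v, zero)
	}
	return typed, nil
}

func main() {
	deps := NewDependencies()
	SetDep[int](deps, "max_retries", 3) // hypothetical key

	n, err := GetDep[int](deps, "max_retries")
	fmt.Println(n, err) // prints: 3 <nil>

	// Asking for the wrong type yields an error instead of a panic.
	_, err = GetDep[string](deps, "max_retries")
	fmt.Println(err != nil) // prints: true
}
```

Because the type check happens at retrieval, a mismatch surfaces as an error where the dependency is consumed rather than where it is stored.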

Types

type Agent

type Agent interface {
	ID() string
	Card() AgentCard
	Strategy() Strategy
	Tools() []string
}

Agent describes identity and execution strategy for one logical agent.

func NewAgent

func NewAgent(id string, strategy Strategy, opts ...AgentOption) Agent

NewAgent constructs a basic Agent backed by the given Strategy.

type AgentCapabilities

type AgentCapabilities struct {
	Streaming        bool `json:"streaming,omitempty"`
	PushNotification bool `json:"push_notification,omitempty"`
	StateTransition  bool `json:"state_transition,omitempty"`
}

AgentCapabilities declares optional runtime features.

type AgentCard

type AgentCard struct {
	Name         string
	Description  string
	Skills       []Skill
	InputModes   []string
	OutputModes  []string
	Capabilities AgentCapabilities
}

AgentCard describes capabilities for discovery (e.g. A2A).

type AgentOption

type AgentOption func(*simpleAgent)

AgentOption configures NewAgent.

func WithAgentDescription

func WithAgentDescription(desc string) AgentOption

WithAgentDescription sets the card description.

func WithAgentTools

func WithAgentTools(tools []string) AgentOption

WithAgentTools sets tool names exposed to the runtime.

type Artifact

type Artifact struct {
	Name  string       `json:"name"`
	Parts []model.Part `json:"parts,omitempty"`
}

Artifact is a named bundle of parts produced during a run.

type BaseSession

type BaseSession struct{}

BaseSession is a no-op MemorySession for tests or disabled memory.

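func (BaseSession) Close

func (BaseSession) Close(context.Context, error) error

func (BaseSession) Load

func (BaseSession) Load(context.Context) ([]model.Message, error)

func (BaseSession) Save

func (BaseSession) Save(context.Context, []model.Message) error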

func (BaseSession) Vars

func (BaseSession) Vars(context.Context) (map[string]any, error)

type Board

type Board struct {
	// contains filtered or unexported fields
}

Board is the graph execution blackboard: typed message channels plus control vars. Card-based kanban coordination lives in kanban.Board, not here.

Thread safety: public methods use a mutex (matches historical graph.Board behavior).

func NewBoard

func NewBoard() *Board

NewBoard creates an empty Board with an initialized main channel.

func RestoreBoard

func RestoreBoard(snap *BoardSnapshot) *Board

RestoreBoard reconstructs a Board from a snapshot.

func (*Board) AppendChannelMessage

func (b *Board) AppendChannelMessage(name string, msg model.Message)

AppendChannelMessage appends a message to a channel.

func (*Board) AppendSliceVar

func (b *Board) AppendSliceVar(key string, value any) error

AppendSliceVar atomically appends a value to a slice stored in a board variable. It returns an error if the existing value is not a []any (instead of silently overwriting).
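
The append-or-error behavior can be sketched like this. The board type below is a local stand-in reduced to a mutex and a vars map, and creating the slice when the key is missing is an assumption of this sketch, not something the documentation states.

```go
package main

import (
	"fmt"
	"sync"
)

// board is a local stand-in for workflow.Board, reduced to its vars map.
type board struct {
	mu   sync.Mutex
	vars map[string]any
}

func newBoard() *board { return &board{vars: make(map[string]any)} }

// setVar stores a value under key.
func (b *board) setVar(key string, value any) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.vars[key] = value
}

// getVar returns the value under key and whether it was present.
func (b *board) getVar(key string) (any, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	v, ok := b.vars[key]
	return v, ok
}

// appendSliceVar appends value to the []any stored under key while holding
// the lock for the whole read-modify-write. A missing key starts a new
// slice (assumption); a value of any other type is reported as an error
// and left untouched.
func (b *board) appendSliceVar(key string, value any) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	existing, ok := b.vars[key]
	if !ok {
		b.vars[key] = []any{value}
		return nil
	}
	items, ok := existing.([]any)
	if !ok {
		return fmt.Errorf("var %q holds %T, not []any", key, existing)
	}
	b.vars[key] = append(items, value)
	return nil
}

func main() {
	b := newBoard()
	_ = b.appendSliceVar("events", "started")
	_ = b.appendSliceVar("events", "finished")
	events, _ := b.getVar("events")
	fmt.Println(events) // prints: [started finished]

	b.setVar("query", "hello")
	fmt.Println(b.appendSliceVar("query", "x") != nil) // prints: true
}
```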

func (*Board) Channel

func (b *Board) Channel(name string) []model.Message

Channel returns a copy of messages for the given channel (empty slice if missing).

func (*Board) ChannelsCopy

func (b *Board) ChannelsCopy() map[string][]model.Message

ChannelsCopy returns a deep copy of all channel message lists (for parallel merge).

func (*Board) GetVar

func (b *Board) GetVar(key string) (any, bool)

GetVar retrieves a board-level variable.

func (*Board) GetVarString

func (b *Board) GetVarString(key string) string

GetVarString retrieves a board variable as a string, returning "" if missing or wrong type.

func (*Board) RestoreFrom

func (b *Board) RestoreFrom(snap *BoardSnapshot)

RestoreFrom overwrites this board from a snapshot (executor retry rollback).
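
A retry rollback built on a snapshot taken before a step might look like the sketch below. The board and boardSnapshot types are local stand-ins (messages are plain strings instead of model.Message), and the attempt helper is hypothetical; it only illustrates the snapshot, run, restore-on-error sequence.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errStepFailed is the error returned by the failing step in this sketch.
var errStepFailed = errors.New("step failed")

// boardSnapshot is a local stand-in for workflow.BoardSnapshot.
type boardSnapshot struct {
	Vars     map[string]any
	Channels map[string][]string
}

// board is a local stand-in for workflow.Board.
type board struct {
	mu       sync.Mutex
	vars     map[string]any
	channels map[string][]string
}

func newBoard() *board {
	return &board{vars: map[string]any{}, channels: map[string][]string{}}
}

// copyState copies both maps and each message slice. Values stored in vars
// are copied shallowly in this sketch.
func copyState(vars map[string]any, chans map[string][]string) (map[string]any, map[string][]string) {
	v := make(map[string]any, len(vars))
	for k, val := range vars {
		v[k] = val
	}
	c := make(map[string][]string, len(chans))
	for k, msgs := range chans {
		c[k] = append([]string(nil), msgs...)
	}
	return v, c
}

func (b *board) setVar(key string, value any) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.vars[key] = value
}

func (b *board) getVar(key string) (any, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	v, ok := b.vars[key]
	return v, ok
}

// Snapshot captures the current vars and channels.
func (b *board) Snapshot() *boardSnapshot {
	b.mu.Lock()
	defer b.mu.Unlock()
	v, c := copyState(b.vars, b.channels)
	return &boardSnapshot{Vars: v, Channels: c}
}

// RestoreFrom overwrites the board with the snapshot's contents.
func (b *board) RestoreFrom(s *boardSnapshot) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.vars, b.channels = copyState(s.Vars, s.Channels)
}

// attempt (hypothetical helper) runs step and rolls the board back to its
// pre-step state if the step fails.
func attempt(b *board, step func(*board) error) error {
	snap := b.Snapshot()
	if err := step(b); err != nil {
		b.RestoreFrom(snap)
		return err
	}
	return nil
}

func main() {
	b := newBoard()
	b.setVar("answer", "draft")
	err := attempt(b, func(b *board) error {
		b.setVar("answer", "half-written") // partial work before the failure
		return errStepFailed
	})
	v, _ := b.getVar("answer")
	fmt.Println(err, v) // prints: step failed draft
}
```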

func (*Board) SetChannel

func (b *Board) SetChannel(name string, msgs []model.Message)

SetChannel replaces the entire message list for a channel.

func (*Board) SetVar

func (b *Board) SetVar(key string, value any)

SetVar sets a board-level variable.

func (*Board) Snapshot

func (b *Board) Snapshot() *BoardSnapshot

Snapshot returns a serializable snapshot (vars + channels only).

func (*Board) UpdateSliceVarItem

func (b *Board) UpdateSliceVarItem(key string, match func(any) bool, update func(any) any)

UpdateSliceVarItem finds and updates the first matching item in a slice variable.

func (*Board) Vars

func (b *Board) Vars() map[string]any

Vars returns a shallow copy of all board-level variables.

type BoardSnapshot

type BoardSnapshot struct {
	Vars     map[string]any             `json:"vars"`
	Channels map[string][]model.Message `json:"channels,omitempty"`
}

BoardSnapshot is a serializable representation of execution state (no kanban cards).

type Cloneable

type Cloneable interface {
	Clone() any
}

Cloneable may be implemented by values stored in Board vars to provide a type-safe deep copy instead of the JSON fallback.
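
The difference between the two copy paths can be sketched as follows. The deepCopy helper and the plan and note types are hypothetical, and the exact shape of the package's JSON fallback is an assumption; the sketch only shows why a Clone method preserves the concrete type while a JSON round trip does not.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Cloneable mirrors the interface documented above.
type Cloneable interface {
	Clone() any
}

// plan is a hypothetical board value that knows how to copy itself.
type plan struct {
	Steps []string
}

// Clone returns an independent copy with its own backing slice.
func (p *plan) Clone() any {
	return &plan{Steps: append([]string(nil), p.Steps...)}
}

// note is a hypothetical board value without a Clone method.
type note struct {
	Text string
}

// deepCopy (hypothetical helper) prefers Clone and otherwise falls back to
// a JSON round trip, which yields generic maps and slices.
func deepCopy(v any) (any, error) {
	if c, ok := v.(Cloneable); ok {
		return c.Clone(), nil
	}
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	var out any
	if err := json.Unmarshal(raw, &out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	a, _ := deepCopy(&plan{Steps: []string{"fetch", "summarize"}})
	b, _ := deepCopy(note{Text: "hi"})
	fmt.Printf("%T\n", a) // prints: *main.plan
	fmt.Printf("%T\n", b) // prints: map[string]interface {}
}
```

A caller that later type-asserts the copied value back to its original type only succeeds on the Clone path, which is the practical reason to implement the interface.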

type ContextAssembler added in v0.1.1

type ContextAssembler interface {
	Assemble(ctx context.Context, req *Request) ([]model.Message, error)
}

ContextAssembler is an optional interface that a MemorySession may implement. When present, Runtime calls Assemble instead of Load to obtain history messages. This allows implementations to perform custom context assembly (e.g. RAG, summarization, sliding window) based on the current request.

The returned messages should NOT include req.Message; the runtime appends it.
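
The optional-interface check described above can be sketched with a type assertion. Everything here is a local stand-in: Message and Request are reduced versions of the real types, Loader covers only the Load method of MemorySession, and buildHistory and windowSession are hypothetical names for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Message and Request are reduced stand-ins for model.Message and Request.
type Message struct{ Role, Text string }
type Request struct{ Message Message }

// Loader is the Load part of MemorySession.
type Loader interface {
	Load(ctx context.Context) ([]Message, error)
}

// ContextAssembler mirrors the optional interface documented above.
type ContextAssembler interface {
	Assemble(ctx context.Context, req *Request) ([]Message, error)
}

// plainSession only supports Load.
type plainSession struct{ all []Message }

func (s *plainSession) Load(context.Context) ([]Message, error) { return s.all, nil }

// windowSession additionally assembles a sliding window of the last keep
// messages. It does not include req.Message in its result.
type windowSession struct {
	plainSession
	keep int
}

func (s *windowSession) Assemble(_ context.Context, _ *Request) ([]Message, error) {
	start := len(s.all) - s.keep
	if start < 0 {
		start = 0
	}
	return append([]Message(nil), s.all[start:]...), nil
}

// buildHistory (hypothetical) uses Assemble when available, Load otherwise,
// and then appends the request message itself.
func buildHistory(ctx context.Context, sess Loader, req *Request) ([]Message, error) {
	var hist []Message
	var err error
	if a, ok := sess.(ContextAssembler); ok {
		hist, err = a.Assemble(ctx, req)
	} else {
		hist, err = sess.Load(ctx)
	}
	if err != nil {
		return nil, err
	}
	out := append([]Message(nil), hist...)
	return append(out, req.Message), nil
}

// exampleTurns runs buildHistory against both session kinds.
func exampleTurns() (windowed, full []Message, err error) {
	ctx := context.Background()
	past := []Message{{"user", "a"}, {"assistant", "b"}, {"user", "c"}}
	req := &Request{Message: Message{"user", "d"}}
	windowed, err = buildHistory(ctx, &windowSession{plainSession{past}, 2}, req)
	if err != nil {
		return nil, nil, err
	}
	full, err = buildHistory(ctx, &plainSession{past}, req)
	return windowed, full, err
}

func main() {
	w, f, err := exampleTurns()
	fmt.Println(len(w), len(f), err) // prints: 3 4 <nil>
}
```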

type Dependencies

type Dependencies struct {
	// contains filtered or unexported fields
}

Dependencies is a type-safe container for resources available to Strategy.Build. Each Strategy defines its own key constants and retrieves values via GetDep.

func NewDependencies

func NewDependencies() *Dependencies

NewDependencies creates an empty Dependencies container.

func (*Dependencies) Set

func (d *Dependencies) Set(key string, val any)

Set stores a dependency value under the given key.

type IncrementalSaver added in v0.1.1

type IncrementalSaver interface {
	Append(ctx context.Context, newMessages []model.Message) error
}

IncrementalSaver is an optional interface that a MemorySession may implement. When present, Runtime calls Append with only the newly produced messages instead of calling Save with the full message history.
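
A sketch of the Append-versus-Save decision follows. The types are local stand-ins with string messages, and the persist helper with its prevCount parameter is an assumption about how newly produced messages could be separated from loaded history; the documentation does not specify the mechanism.

```go
package main

import (
	"context"
	"fmt"
)

// Saver is the Save part of MemorySession (messages are strings here).
type Saver interface {
	Save(ctx context.Context, messages []string) error
}

// IncrementalSaver mirrors the optional interface documented above.
type IncrementalSaver interface {
	Append(ctx context.Context, newMessages []string) error
}

// logSession implements both interfaces and records what it stored.
type logSession struct {
	stored    []string
	fullSaves int
}

func (s *logSession) Save(_ context.Context, messages []string) error {
	s.stored = append([]string(nil), messages...)
	s.fullSaves++
	return nil
}

func (s *logSession) Append(_ context.Context, newMessages []string) error {
	s.stored = append(s.stored, newMessages...)
	return nil
}

// persist (hypothetical) sends only the messages after prevCount to Append
// when the session supports it, and the full history to Save otherwise.
func persist(ctx context.Context, sess Saver, all []string, prevCount int) error {
	if inc, ok := sess.(IncrementalSaver); ok {
		if prevCount < 0 {
			prevCount = 0
		}
		if prevCount > len(all) {
			prevCount = len(all)
		}
		return inc.Append(ctx, all[prevCount:])
	}
	return sess.Save(ctx, all)
}

// exampleAppend persists a four-message history of which two are new.
func exampleAppend() (stored []string, fullSaves int, err error) {
	sess := &logSession{stored: []string{"hi", "hello"}}
	all := []string{"hi", "hello", "how are you?", "fine"}
	err = persist(context.Background(), sess, all, 2)
	return sess.stored, sess.fullSaves, err
}

func main() {
	stored, fullSaves, err := exampleAppend()
	fmt.Println(len(stored), fullSaves, err) // prints: 4 0 <nil>
}
```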

type Memory

type Memory interface {
	Session(ctx context.Context, contextID string) (MemorySession, error)
}

Memory provides per-agent session memory.

type MemoryFactory

type MemoryFactory func(ctx context.Context, agent Agent) (Memory, error)

MemoryFactory creates a Memory for an agent.

type MemorySession

type MemorySession interface {
	Load(ctx context.Context) ([]model.Message, error)
	Vars(ctx context.Context) (map[string]any, error)
	Save(ctx context.Context, messages []model.Message) error
	Close(ctx context.Context, runErr error) error
}

MemorySession is one Run's memory lifecycle (Load → Vars → Save → Close). All methods accept a context.Context to support timeout and cancellation for implementations backed by databases or network services.
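
The Load → Vars → Save → Close order can be sketched as below. The interface is a local copy with string messages, runTurn is a hypothetical driver, and two behaviors are assumptions of this sketch rather than documented facts: Save is skipped when execution fails, and Close always runs and receives the run error.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errExec is the execution error used in this sketch.
var errExec = errors.New("execution failed")

// MemorySession is a local copy of the interface with string messages.
type MemorySession interface {
	Load(ctx context.Context) ([]string, error)
	Vars(ctx context.Context) (map[string]any, error)
	Save(ctx context.Context, messages []string) error
	Close(ctx context.Context, runErr error) error
}

// recorder notes the order in which its methods are called.
type recorder struct{ calls []string }

func (r *recorder) Load(context.Context) ([]string, error) {
	r.calls = append(r.calls, "Load")
	return nil, nil
}

func (r *recorder) Vars(context.Context) (map[string]any, error) {
	r.calls = append(r.calls, "Vars")
	return map[string]any{}, nil
}

func (r *recorder) Save(context.Context, []string) error {
	r.calls = append(r.calls, "Save")
	return nil
}

func (r *recorder) Close(context.Context, error) error {
	r.calls = append(r.calls, "Close")
	return nil
}

// runTurn (hypothetical) drives one session lifecycle around exec.
func runTurn(ctx context.Context, sess MemorySession,
	exec func(history []string, vars map[string]any) ([]string, error)) (err error) {
	// Close always runs and sees the final run error (assumption).
	defer func() {
		if cerr := sess.Close(ctx, err); err == nil {
			err = cerr
		}
	}()
	var history []string
	if history, err = sess.Load(ctx); err != nil {
		return err
	}
	var vars map[string]any
	if vars, err = sess.Vars(ctx); err != nil {
		return err
	}
	var out []string
	if out, err = exec(history, vars); err != nil {
		return err // Save is skipped on failure (assumption)
	}
	return sess.Save(ctx, out)
}

// exampleOrder runs one successful and one failing turn.
func exampleOrder() (okCalls, failCalls []string, failErr error) {
	ctx := context.Background()
	good := &recorder{}
	_ = runTurn(ctx, good, func(h []string, _ map[string]any) ([]string, error) {
		return append(h, "answer"), nil
	})
	bad := &recorder{}
	failErr = runTurn(ctx, bad, func([]string, map[string]any) ([]string, error) {
		return nil, errExec
	})
	return good.calls, bad.calls, failErr
}

func main() {
	okCalls, failCalls, err := exampleOrder()
	fmt.Println(okCalls)        // prints: [Load Vars Save Close]
	fmt.Println(failCalls, err) // prints: [Load Vars Close] execution failed
}
```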

type Request

type Request struct {
	TaskID     string         `json:"task_id,omitempty"`
	ContextID  string         `json:"context_id,omitempty"`
	RuntimeID  string         `json:"runtime_id,omitempty"`
	RunID      string         `json:"run_id,omitempty"`
	Message    model.Message  `json:"message"`
	Inputs     map[string]any `json:"inputs,omitempty"`
	Config     *RequestConfig `json:"config,omitempty"`
	Extensions map[string]any `json:"extensions,omitempty"`
}

Request is one agent turn: user message plus optional inputs and metadata.

func NewTextRequest

func NewTextRequest(text string) *Request

NewTextRequest builds a Request whose Message is a single user text turn.

type RequestConfig

type RequestConfig struct {
	AcceptedOutputModes []string `json:"accepted_output_modes,omitempty"`
}

RequestConfig holds optional request-level settings (A2A alignment, etc.).

type Result

type Result struct {
	TaskID    string           `json:"task_id,omitempty"`
	Status    TaskStatus       `json:"status"`
	Messages  []model.Message  `json:"messages,omitempty"`
	Artifacts []Artifact       `json:"artifacts,omitempty"`
	Usage     model.TokenUsage `json:"usage"`
	State     map[string]any   `json:"state,omitempty"`
	Err       error            `json:"-"`
	LastBoard *Board           `json:"-"`
}

Result is returned by Runtime.Run after execution and finish logic.

func (*Result) Text

func (r *Result) Text() string

Text returns the last assistant text message in Messages, or "".

type RunConfig

type RunConfig struct {
	History        []model.Message
	Board          *Board
	StreamCallback StreamCallback
	MaxIterations  int
}

RunConfig holds resolved run-level settings. Exported so that Strategy implementations (e.g. flowgraph) can read stream callback, max iterations, etc.

func ApplyRunOpts

func ApplyRunOpts(opts []RunOption) RunConfig

ApplyRunOpts resolves a slice of RunOption into a RunConfig.
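
Resolving options into a config follows the functional-options pattern, sketched below with local stand-in types. History uses strings instead of model.Message, and skipping nil options is an assumption of this sketch.

```go
package main

import "fmt"

// RunConfig is a reduced stand-in for the documented struct.
type RunConfig struct {
	History       []string
	MaxIterations int
}

// RunOption mutates a RunConfig, matching the documented function type.
type RunOption func(*RunConfig)

// WithHistory sets the message history.
func WithHistory(msgs []string) RunOption {
	return func(c *RunConfig) { c.History = msgs }
}

// WithMaxIterations caps the number of execution steps.
func WithMaxIterations(n int) RunOption {
	return func(c *RunConfig) { c.MaxIterations = n }
}

// ApplyRunOpts applies each option in order to a zero-valued RunConfig, so
// a later option overrides an earlier one for the same field. Nil options
// are skipped (assumption).
func ApplyRunOpts(opts []RunOption) RunConfig {
	var cfg RunConfig
	for _, opt := range opts {
		if opt != nil {
			opt(&cfg)
		}
	}
	return cfg
}

func main() {
	cfg := ApplyRunOpts([]RunOption{
		WithHistory([]string{"hi"}),
		WithMaxIterations(5),
		WithMaxIterations(8), // applied last, so it wins
	})
	fmt.Println(len(cfg.History), cfg.MaxIterations) // prints: 1 8
}
```

Exporting the resolved struct is what lets a Strategy implementation read individual settings without knowing which options the caller passed.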

type RunOption

type RunOption func(*RunConfig)

RunOption configures a single Run call.

func WithBoard added in v0.1.7

func WithBoard(b *Board) RunOption

WithBoard injects a pre-built Board, skipping the normal prepareBoard phase. Used for resume-from-snapshot flows.

func WithHistory

func WithHistory(msgs []model.Message) RunOption

WithHistory injects message history into the main channel when no Memory session is used. Ignored when a non-nil Memory session is opened (MemoryFactory path wins).

func WithMaxIterations

func WithMaxIterations(n int) RunOption

WithMaxIterations caps the number of graph execution steps.

func WithStreamCallback

func WithStreamCallback(cb StreamCallback) RunOption

WithStreamCallback sets a streaming event callback for the execution.

type Runnable

type Runnable interface {
	Execute(ctx context.Context, board *Board, req *Request, opts ...RunOption) (*Board, error)
}

Runnable is the compiled execution unit produced by Strategy.Build.

type Runtime

type Runtime interface {
	Run(ctx context.Context, agent Agent, req *Request, opts ...RunOption) (*Result, error)
}

Runtime executes one agent Request using Strategy + optional Memory.

func NewRuntime

func NewRuntime(opts ...RuntimeOption) Runtime

NewRuntime constructs a Runtime with optional MemoryFactory and board preparation hook.

type RuntimeOption

type RuntimeOption func(*runtime)

RuntimeOption configures NewRuntime.

func WithDependencies

func WithDependencies(d *Dependencies) RuntimeOption

WithDependencies supplies Strategy.Build with factories and executors (flowgraph uses these).

func WithMemoryFactory

func WithMemoryFactory(f MemoryFactory) RuntimeOption

WithMemoryFactory sets the MemoryFactory used by openSession.

func WithPrepareBoard

func WithPrepareBoard(fn func(ctx context.Context, agent Agent, req *Request, session MemorySession, opts []RunOption) (*Board, error)) RuntimeOption

WithPrepareBoard sets a custom board preparation (platform graph vars, schema, etc.). When nil, prepareBoard uses the generic Request + session + history path only.

type Skill

type Skill struct {
	ID          string `json:"id"`
	Name        string `json:"name"`
	Description string `json:"description,omitempty"`
}

Skill is a lightweight skill declaration on an AgentCard.

type Strategy

type Strategy interface {
	Kind() string
	Build(ctx context.Context, deps *Dependencies) (Runnable, error)
	Capabilities() StrategyCapabilities
}

Strategy describes how to execute one turn (graph, script, remote, …).

type StrategyCapabilities

type StrategyCapabilities struct {
	AnswerKey string
}

StrategyCapabilities describes how Runtime reads outputs from the Board.

func (StrategyCapabilities) AnswerVar

func (c StrategyCapabilities) AnswerVar() string

AnswerVar returns the board var key for the final answer, defaulting to "answer".
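
The defaulting rule can be sketched as follows. The struct is a local copy, treating an empty AnswerKey as "unset" is an assumption, and readAnswer is a hypothetical helper showing how a runtime might use the resolved key against board vars.

```go
package main

import "fmt"

// StrategyCapabilities is a local copy of the documented struct.
type StrategyCapabilities struct {
	AnswerKey string
}

// AnswerVar returns AnswerKey, or "answer" when AnswerKey is empty
// (assumption: empty means unset).
func (c StrategyCapabilities) AnswerVar() string {
	if c.AnswerKey == "" {
		return "answer"
	}
	return c.AnswerKey
}

// readAnswer (hypothetical) looks up the final answer in a vars map and
// returns "" if the key is missing or not a string.
func readAnswer(vars map[string]any, c StrategyCapabilities) string {
	s, _ := vars[c.AnswerVar()].(string)
	return s
}

func main() {
	vars := map[string]any{"answer": "42", "summary": "short"}
	fmt.Println(readAnswer(vars, StrategyCapabilities{}))                     // prints: 42
	fmt.Println(readAnswer(vars, StrategyCapabilities{AnswerKey: "summary"})) // prints: short
}
```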

type StreamCallback

type StreamCallback func(event StreamEvent)

StreamCallback receives streaming events during execution.

type StreamEvent

type StreamEvent struct {
	Type    string `json:"type"`
	NodeID  string `json:"node_id"`
	Payload any    `json:"payload,omitempty"`
}

StreamEvent carries a streaming event emitted by a node during execution.

type Task

type Task struct {
	ID        string     `json:"id"`
	Status    TaskStatus `json:"status"`
	CreatedAt time.Time  `json:"created_at"`
	Result    *Result    `json:"result,omitempty"`
}

Task represents a tracked unit of work managed by a TaskManager.

type TaskManager

type TaskManager interface {
	GetTask(id string) (*Task, error)
	CancelTask(id string) error
	ListTasks() ([]*Task, error)
}

TaskManager is an optional capability for run/task orchestration (list/cancel). Default Runtime implementations may return a no-op or nil adapter.

type TaskStatus

type TaskStatus string

TaskStatus is the terminal or intermediate status of a run.

const (
	StatusCompleted     TaskStatus = "completed"
	StatusWorking       TaskStatus = "working"
	StatusFailed        TaskStatus = "failed"
	StatusInputRequired TaskStatus = "input_required"
	StatusCanceled      TaskStatus = "canceled"
	StatusInterrupted   TaskStatus = "interrupted"
	StatusAborted       TaskStatus = "aborted"
)
