taskengine

package
v0.32.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 17, 2026 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Overview

Package taskengine orchestrates an agent: it drives LLM turns, tool calls, and routing in a loop, defined as a JSON chain you version in git. Handlers are chat_completion, execute_tool_calls, tools, route, raise_error, and noop; transitions route control flow (equals, contains, starts_with, ends_with, default, edge_traversed_at_least).

It is an agent control-flow engine, not a dataflow/workflow engine. The unit of execution is the conversation, and the TaskEvent stream is the contract clients consume. It deliberately does not transform structured data across steps: data is shaped into a turn (templates, observable in the conversation) or produced by a tool call (observable as an event) — never mutated invisibly on the forward path where no event would see it.

Index

Constants

View Source
const (
	ContextKeyOutputByteLimit contextKey = "output_byte_limit"
	ContextKeyToolCallID      contextKey = "tool_call_id"
)
View Source
const (
	// TransitionExecuted: a chat_completion turn finished with no tool calls.
	TransitionExecuted = "executed"
	// TransitionToolCall: a chat_completion turn requested one or more tool calls.
	// (Snake_case to match the "tool_call" task-event kind; pre-1.0 this replaced
	// the earlier hyphenated "tool-call".)
	TransitionToolCall = "tool_call"
	// TransitionNoop: the noop handler ran, or execute_tool_calls saw empty history.
	TransitionNoop = "noop"
	// TransitionNoCallsFound: the model's last message carried no tool calls to run.
	TransitionNoCallsFound = "no_calls_found"
	// TransitionToolsExecuted: a tools task ran its tool successfully.
	TransitionToolsExecuted = "tools_executed"
	// TransitionFailed: a tools task failed.
	TransitionFailed = "failed"
)

Transition-eval tokens are the control values a handler emits as its transition "eval"; a TransitionBranch matches them via its When field (with the default Operator, exact string equality). These are part of the DSL contract — branch on these constants, not the model's free text:

  • chat_completion → TransitionToolCall (model requested tools) | TransitionExecuted (finished, no tool calls)
  • execute_tool_calls → TransitionNoop (empty history) | TransitionNoCallsFound (model produced no tool calls) | TransitionToolsExecuted | TransitionFailed
  • tools → TransitionToolsExecuted | TransitionFailed (or, when OutputTemplate is set, its rendered text)
  • noop → TransitionNoop

To branch on the model's actual text, use the `route` handler, whose eval IS the model's chosen label.

View Source
const TaskEventSubjectAll = "taskengine.events"
View Source
const (
	TermEnd = "end"
)

Variables

View Source
var ErrContextLengthExceeded = errors.New("exceeds context length")

ErrContextLengthExceeded is returned when the input or chat history exceeds the allowed context length.

View Source
var ErrToolsNotFound = errors.New("tools not found")

ErrToolsNotFound is returned when a named tools is not registered in any repo.

View Source
var ErrToolsToolsUnavailable = errors.New("tools tools unavailable")

ErrToolsToolsUnavailable is returned when a tools is registered but its tool list cannot be loaded (e.g. MCP server unreachable or list-tools failed). ExecEnv treats this like a missing tools for tool preload: skip tools, continue the chain.

View Source
var ErrUnsupportedTaskType = errors.New("executor does not support the task type")

ErrUnsupportedTaskType indicates unrecognized task type

Functions

func ConvertToType

func ConvertToType(value interface{}, dataType DataType) (interface{}, error)

ConvertToType converts a value to the specified DataType

func EdgeCountsFromContext

func EdgeCountsFromContext(ctx context.Context) map[string]int

EdgeCountsFromContext returns the edge counts attached via WithEdgeCounts, or nil if not set. A nil map is safe to read (lookup returns zero).

func ExportedResolveToolsNames

func ExportedResolveToolsNames(ctx context.Context, allowlist []string, provider ToolsProvider) ([]string, error)

ExportedResolveToolsNames is a test-only export of resolveToolsNames.

func ExtractJSONArray

func ExtractJSONArray(s string) string

ExtractJSONArray scans s for the outermost [...] block and returns it. It first strips code fences, then skips any preamble text the LLM may have placed before the JSON array to be robust to inconsistent model output.

func ExtractJSONObject

func ExtractJSONObject(s string) string

ExtractJSONObject scans s for the outermost {...} block and returns it. It strips code fences first, same spirit as ExtractJSONArray.

func GetPrimaryModel

func GetPrimaryModel(llmCall *LLMExecutionConfig) string

func MergeTemplateVars

func MergeTemplateVars(ctx context.Context, overlay map[string]string) context.Context

MergeTemplateVars overlays keys onto any template vars already in ctx, then attaches the combined map. Use this when a nested step must add request_id / previous_output without dropping caller-supplied vars like model and provider.

func RuntimeToolsAllowlistFromContext

func RuntimeToolsAllowlistFromContext(ctx context.Context) ([]string, bool)

RuntimeToolsAllowlistFromContext returns (allowlist, true) when an allowlist was attached via WithRuntimeToolsAllowlist. The returned slice follows the same grammar as TaskDefinition.Tools. Returns (nil, false) when no runtime allowlist is attached — callers should treat this as "no restriction".

func StateSubject

func StateSubject(reqID string) string

func StripCodeFences

func StripCodeFences(s string) string

func SupportedOperators

func SupportedOperators() []string

func TaskEventRequestSubject

func TaskEventRequestSubject(requestID string) string

func TemplateVarsFromContext

func TemplateVarsFromContext(ctx context.Context) (map[string]string, error)

TemplateVarsFromContext returns the template variables map from the context. Returns nil if not set; a nil map is safe to read (key lookup returns false). MacroEnv will return an error for any {{var:key}} whose key is absent.

func ToolsArgsFromContext

func ToolsArgsFromContext(ctx context.Context, toolsName string) map[string]string

ToolsArgsFromContext returns the args previously stored for toolsName, or nil if none were set. The returned map must not be mutated by the caller.

func ToolsToolsUnavailable

func ToolsToolsUnavailable(toolsName string, cause error) error

ToolsToolsUnavailable wraps cause as ErrToolsToolsUnavailable for toolsName (for errors.Is).

func WithEdgeCounts

func WithEdgeCounts(ctx context.Context, counts map[string]int) context.Context

WithEdgeCounts attaches the in-flight edge traversal counts for the current chain run to ctx. SimpleEnv updates this between each task step so handlers and step-time macros (e.g. {{edge_count:from->to}}) can read counts that reflect the loop iteration they are in, not the chain-start zero.

Layout of the map: keys are "<fromTaskID>-><toTaskID>", values are the number of times that edge has been traversed in this run.

func WithRetryOutcomeSink

func WithRetryOutcomeSink(ctx context.Context, sink *RetryOutcomeSink) context.Context

WithRetryOutcomeSink attaches sink to ctx so chat_completion tasks can append outcomes via [appendRetryOutcome].

func WithRuntimeToolsAllowlist

func WithRuntimeToolsAllowlist(ctx context.Context, allowlist []string) context.Context

WithRuntimeToolsAllowlist attaches a caller-supplied tools allowlist to ctx that is intersected with each task's own allowlist inside resolveToolsNames. A caller can only further restrict — never expand — what a chain JSON permits. Grammar matches TaskDefinition.Tools: nil/[]/["*"]/exact names/["*","!name"].

Use this when a host must enforce per-call policy (such as disabling local_shell for a step) regardless of what the chain JSON declares. Absent key means "no runtime restriction" — behavior matches pre-feature code.

func WithTaskEventScope

func WithTaskEventScope(ctx context.Context, scope TaskEventScope) context.Context

func WithTaskEventSink

func WithTaskEventSink(ctx context.Context, sink TaskEventSink) context.Context

func WithTemplateVars

func WithTemplateVars(ctx context.Context, vars map[string]string) context.Context

WithTemplateVars attaches a map of template variables to the context. MacroEnv expands {{var:name}} from this map. The engine never reads os.Getenv; callers (e.g. Contenox CLI, API) build the map and attach it here.

func WithToolsArgs

func WithToolsArgs(ctx context.Context, toolsName string, args map[string]string) context.Context

WithToolsArgs stores a copy of args for the named tools in ctx.

The map is copied on entry so the stored value is immutable — callers must not rely on mutating the original map being visible to tools implementations. This ensures no data races when the same context is read concurrently (e.g. during tool-list construction in ExecEnv).

Types

type BusInspector

type BusInspector struct {
	// contains filtered or unexported fields
}

func NewBusInspector

func NewBusInspector(inner Inspector, bus libbus.Messenger, tracker libtracker.ActivityTracker) *BusInspector

func (*BusInspector) Start

func (i *BusInspector) Start(ctx context.Context) StackTrace

type BusTaskEventSink

type BusTaskEventSink struct {
	// contains filtered or unexported fields
}

func NewBusTaskEventSink

func NewBusTaskEventSink(bus libbus.Messenger) *BusTaskEventSink

func (*BusTaskEventSink) Enabled

func (s *BusTaskEventSink) Enabled() bool

func (*BusTaskEventSink) PublishTaskEvent

func (s *BusTaskEventSink) PublishTaskEvent(ctx context.Context, event TaskEvent) error

type CapturedPayloadSummary added in v0.29.0

type CapturedPayloadSummary struct {
	Truncated         bool   `json:"truncated"`
	Reason            string `json:"reason"`
	OriginalType      string `json:"originalType,omitempty"`
	OriginalJSONBytes int    `json:"originalJsonBytes,omitempty"`
	SHA256            string `json:"sha256,omitempty"`
	Preview           string `json:"preview,omitempty"`
	PreviewBytes      int    `json:"previewBytes,omitempty"`
}

CapturedPayloadSummary replaces oversized or non-JSON-marshallable captured payloads in persisted/streamed state. In-memory execution history keeps the original values; this type is for observability storage only.

type CapturedStateUnit

type CapturedStateUnit struct {
	TaskID      string        `json:"taskID" example:"validate_input"`
	TaskHandler string        `json:"taskHandler" example:"chat_completion"`
	InputType   DataType      `json:"inputType" example:"string" openapi_include_type:"string"`
	OutputType  DataType      `json:"outputType" example:"string" openapi_include_type:"string"`
	Transition  string        `json:"transition" example:"valid_input"`
	Duration    time.Duration `json:"duration" example:"452000000"`
	Error       ErrorResponse `json:"error" openapi_include_type:"taskengine.ErrorResponse"`
	Input       any           `json:"input,omitempty"`
	Output      any           `json:"output,omitempty"`
	InputVar    string        `json:"inputVar" example:"input"`

	RetryIndex   int         `json:"retryIndex"`
	Cancelled    bool        `json:"cancelled,omitempty"`
	TimedOut     bool        `json:"timedOut,omitempty"`
	ProviderType string      `json:"providerType,omitempty"`
	ModelName    string      `json:"modelName,omitempty"`
	ToolNames    []string    `json:"toolNames,omitempty"`
	TokenUsage   *TokenUsage `json:"tokenUsage,omitempty"`
}

type ChainContext

type ChainContext struct {
	Tools                     map[string]ToolWithResolution
	ClientTools               []Tool
	UnavailableToolsProviders []UnavailableToolsProvider
	Debug                     bool
}

type ChainTerms

type ChainTerms string

type ChatHistory

type ChatHistory struct {
	// Messages is the list of messages in the conversation.
	Messages []Message `json:"messages"`
	// Model is the name of the model to use for the conversation.
	Model string `json:"model" example:"mistral:instruct"`
	// InputTokens will be filled by the engine and will hold the number of tokens used for the input.
	InputTokens int `json:"inputTokens" example:"15"`
	// OutputTokens will be filled by the engine and will hold the number of tokens used for the output.
	OutputTokens int `json:"outputTokens" example:"10"`
}

ChatHistory represents a conversation history with an LLM.

type DataType

type DataType int

DataType represents the type of data passed between tasks.

const (
	DataTypeAny DataType = iota
	DataTypeString
	DataTypeInt
	DataTypeJSON
	DataTypeChatHistory
	DataTypeNil
)

func DataTypeFromString

func DataTypeFromString(s string) (DataType, error)

DataTypeFromString converts a string to DataType.

func InferDataType

func InferDataType(v any) DataType

InferDataType picks the narrowest concrete DataType for a runtime value.

func NormalizeDataType

func NormalizeDataType(v any, dt DataType) (any, DataType, error)

NormalizeDataType upgrades DataTypeAny to a concrete type and coerces the value with ConvertToType.

func NormalizeFinalChainOutput

func NormalizeFinalChainOutput(value any, dt DataType) (any, DataType, error)

func (DataType) MarshalJSON

func (d DataType) MarshalJSON() ([]byte, error)

func (DataType) MarshalYAML

func (d DataType) MarshalYAML() (any, error)

func (*DataType) String

func (d *DataType) String() string

String returns the string representation of the data type.

func (*DataType) UnmarshalJSON

func (dt *DataType) UnmarshalJSON(data []byte) error

func (*DataType) UnmarshalYAML

func (dt *DataType) UnmarshalYAML(value *yaml.Node) error

type EnvExecutor

type EnvExecutor interface {
	ExecEnv(ctx context.Context, chain *TaskChainDefinition, input any, dataType DataType) (any, DataType, []CapturedStateUnit, error)
}

EnvExecutor executes complete task chains with input and environment management.

func NewEnv

func NewEnv(
	ctx context.Context,
	tracker libtracker.ActivityTracker,
	exec TaskExecutor,
	inspector Inspector,
	toolsProvider ToolsRepo,
) (EnvExecutor, error)

NewEnv creates a new SimpleEnv with the given tracker and task executor.

func NewMacroEnv

func NewMacroEnv(inner EnvExecutor, toolsProvider ToolsRepo) (EnvExecutor, error)

NewMacroEnv wraps an existing EnvExecutor with macro expansion.

type ErrorResponse

type ErrorResponse struct {
	ErrorInternal error  `json:"-"`
	Error         string `json:"error" example:"validation failed: input contains prohibited content"`
}

type FunctionCall

type FunctionCall struct {
	Name      string `json:"name" example:"get_current_weather"`
	Arguments string `json:"arguments" example:"{\n  \"location\": \"San Francisco, CA\",\n  \"unit\": \"celsius\"\n}"`
}

FunctionCall specifies the function name and arguments for a tool call.

type FunctionCallObject

type FunctionCallObject struct {
	Name      string `json:"name" example:"get_current_weather"`
	Arguments any    `json:"arguments"`
}

type FunctionTool

type FunctionTool struct {
	Name        string      `json:"name"`
	Description string      `json:"description,omitempty"`
	Parameters  interface{} `json:"parameters,omitempty"` // JSON Schema object
}

FunctionTool defines the schema for a function-type tool.

type Inspector

type Inspector interface {
	Start(ctx context.Context) StackTrace
}

func NewSimpleInspector

func NewSimpleInspector() Inspector

type KVInspector

type KVInspector struct {
	// contains filtered or unexported fields
}

func NewKVInspector

func NewKVInspector(inner Inspector, kv libkv.KVManager, tracker libtracker.ActivityTracker) *KVInspector

func (*KVInspector) GetExecutionStateByRequestID

func (i *KVInspector) GetExecutionStateByRequestID(ctx context.Context, reqID string) ([]CapturedStateUnit, error)

func (*KVInspector) GetStatefulRequests

func (i *KVInspector) GetStatefulRequests(ctx context.Context) ([]string, error)

func (*KVInspector) Start

func (i *KVInspector) Start(ctx context.Context) StackTrace

type LLMExecutionConfig

type LLMExecutionConfig struct {
	// Model is the primary model: it is placed first in the candidate list and is
	// the model used for token counting (see GetPrimaryModel). When both Model and
	// Models are set, Model plus Models form the candidate set (Model first);
	// the resolver then picks a reachable one — so set exactly Model for a single
	// pinned model, or use Models for an explicit candidate pool.
	Model string `yaml:"model" json:"model" example:"mistral:instruct"`
	// Models is an additional candidate pool, considered alongside Model.
	Models []string `yaml:"models,omitempty" json:"models,omitempty" example:"[\"gpt-4\", \"gpt-3.5-turbo\"]"`
	// Provider is the primary provider, placed first in the candidate list;
	// Providers supplies additional candidates.
	Provider  string   `yaml:"provider,omitempty" json:"provider,omitempty" example:"ollama"`
	Providers []string `yaml:"providers,omitempty" json:"providers,omitempty" example:"[\"ollama\", \"openai\"]"`
	// Temperature is the sampling temperature; pointer so "unset" (nil) is
	// distinguishable from an explicit 0.0. When set it is honored everywhere.
	// When unset: chat_completion uses the provider default; the prompt/route
	// handlers use 0.0 (route depends on deterministic single-label output).
	Temperature *float32 `yaml:"temperature,omitempty" json:"temperature,omitempty" example:"0.7"`
	// Tools is the allowlist of registry tool names this task may invoke. (Client-
	// passed tools are governed separately by PassClientsTools.) For
	// execute_tool_calls tasks, an explicitly present tools field is also enforced
	// at execution time; omit it to preserve legacy chain-wide tool resolution.
	//
	// Patterns supported:
	//   - absent/null/[] — NO registry tools exposed. Note: omitempty collapses
	//                      nil and [] to the same wire form, so they are identical.
	//   - ["*"]          — all registered tools
	//   - ["a","b"]      — only the named tools (unknown names are ignored)
	//   - ["*","!name"]  — all tools except the excluded name(s)
	//
	// Exclusions ("!name") are only meaningful combined with "*"; an exclusion-only
	// list resolves to no tools.
	Tools []string `yaml:"tools,omitempty" json:"tools,omitempty" example:"[\"local_shell\", \"nws\"]"`
	// HideTools suppresses specific tools by (namespaced) name from BOTH the
	// registry tools selected via Tools and the client-passed tools.
	HideTools []string `yaml:"hide_tools,omitempty" json:"hide_tools,omitempty" example:"[\"tool1\", \"tools_name1.tool1\"]"`
	// ToolsPolicies carries per-tools policy overrides for this task.
	// Keys are tools names; values are maps of policy key → value pairs.
	// These are injected into the context before GetToolsForToolsByName is called,
	// so tools can produce dynamic tool descriptions and enforce the policy at Exec time.
	//
	// Example (local_shell):
	//   tools_policies:
	//     local_shell:
	//       _allowed_commands: "git,go,ls,cat,grep"
	//       _denied_commands:  "sudo,su,dd,mkfs"
	ToolsPolicies    map[string]map[string]string `yaml:"tools_policies,omitempty" json:"tools_policies,omitempty"`
	PassClientsTools bool                         `yaml:"pass_clients_tools" json:"pass_clients_tools"`
	// Think controls reasoning mode for supported models.
	// Accepts auto, off, minimal, low, medium, high, xhigh, plus boolean-style aliases.
	// Empty = provider default; user-facing built-in chains set this via {{var:think}}.
	Think string `yaml:"think,omitempty" json:"think,omitempty" example:"high"`
	// MaxTokens caps the model's output tokens for this task. When unset, NO
	// explicit output cap is sent and the provider default applies — the engine
	// deliberately does NOT fall back to the chain's TokenLimit (that is the
	// input+output context window, not an output cap, and conflating them trips
	// per-model output limits, e.g. Vertex Gemini 2.5 Pro's 65536 cap).
	MaxTokens *int `yaml:"max_tokens,omitempty" json:"max_tokens,omitempty" example:"8192"`
	// MaxTokensTemplate stores a string max_tokens macro from chain JSON until
	// MacroEnv expands it into MaxTokens. It is not emitted as a separate field.
	MaxTokensTemplate string `yaml:"-" json:"-"`
	// Shift allows the context window to slide on overflow instead of erroring.
	Shift bool `yaml:"shift,omitempty" json:"shift,omitempty"`
	// RetryPolicy wraps the underlying chat/prompt call with classified retry
	// (rate-limit / server-error / timeout) and an optional model fallback.
	// Nil or zero-value disables retry — current default. See [llmretry.Do].
	RetryPolicy *llmretry.RetryPolicy `yaml:"retry_policy,omitempty" json:"retry_policy,omitempty"`
}

LLMExecutionConfig represents configuration for executing tasks using Large Language Models (LLMs).

func (LLMExecutionConfig) MarshalJSON added in v0.29.0

func (c LLMExecutionConfig) MarshalJSON() ([]byte, error)

func (*LLMExecutionConfig) UnmarshalJSON added in v0.29.0

func (c *LLMExecutionConfig) UnmarshalJSON(data []byte) error

func (*LLMExecutionConfig) UnmarshalYAML added in v0.29.0

func (c *LLMExecutionConfig) UnmarshalYAML(value *yaml.Node) error

type MacroEnv

type MacroEnv struct {
	// contains filtered or unexported fields
}

MacroEnv is a transparent decorator around EnvExecutor that expands special macros in task templates before execution. Supported macros:

  • {{toolservice:list}} -> JSON map of tools name -> tool names
  • {{toolservice:tools}} -> JSON array of tools names
  • {{toolservice:tools <tools_name>}} -> JSON array of tool names for that tools
  • {{var:<name>}} -> value from context template vars (set by caller via WithTemplateVars; engine never reads env); errors if key is missing
  • {{var:<name>|<fallback>}} -> value from context template vars, or fallback when the var is missing or empty
  • {{var:<name>|var:<fallback-name>}} -> value from context template vars, or another template var when the first is missing or empty
  • {{date}} or {{date:<layout>}} -> current local date (default 2006-01-02)
  • {{now}} or {{now:<layout>}} -> current time (default RFC3339; layout e.g. 2006-01-02)
  • {{chain:id}} -> chain ID of the chain being executed

The engine does not expand any env:VAR-style macro; var:* is populated only by the caller.

func (*MacroEnv) ExecEnv

func (m *MacroEnv) ExecEnv(
	ctx context.Context,
	chain *TaskChainDefinition,
	input any,
	dataType DataType,
) (any, DataType, []CapturedStateUnit, error)

type Message

type Message struct {
	// ID is the unique identifier for the message.
	// This field is not used by the engine. It can be filled as part of the Request, or left empty.
	// The ID is useful for tracking messages and computing differences of histories before storage.
	ID string `json:"id" example:"msg_123456"`
	// Role is the role of the message sender.
	Role string `json:"role" example:"user"`
	// Content is the content of the message.
	Content string `json:"content,omitempty" example:"What is the capital of France?"`
	// Thinking is the model's internal reasoning trace.
	// Only populated when thinking is enabled; never sent back to the model as history.
	Thinking string `json:"thinking,omitempty"`
	// ToolCallID is the ID of the tool call associated with the message.
	ToolCallID string `json:"tool_call_id,omitempty"`
	// CallTools is the tool call of the message sender.
	CallTools []ToolCall `json:"callTools,omitempty"`
	// Timestamp is the time the message was sent.
	Timestamp time.Time `json:"timestamp" example:"2023-11-15T14:30:45Z"`
}

Message represents a single message in a chat conversation.

func SynthesizeHistory

func SynthesizeHistory(prior []Message, units []CapturedStateUnit, chainErr error) []Message

SynthesizeHistory rebuilds a conversation transcript from a chain run by walking the captured step stream. It exists so that hard-failed turns (errors, timeouts, cancellations, denied/timed-out HITL gates) make it into the persisted ChatHistory — the chain's returned ChatHistory only contains messages from steps that completed successfully.

prior is the session history that was sent into the chain. units is the captured step stream from Inspector.GetExecutionHistory(). chainErr is the error returned by the chain runner, if any.

The result is a candidate []Message ready for chatservice.PersistDiff — PersistDiff handles dedupe against already-stored messages by ID, so the synthesizer is free to emit overlapping prefixes between runs.

type MockTaskExecutor

type MockTaskExecutor struct {
	// Single value responses
	MockOutput          any
	MockTransitionValue string
	MockError           error

	// Sequence responses
	MockOutputSequence          []any
	MockTaskTypeSequence        []DataType
	MockTransitionValueSequence []string
	ErrorSequence               []error

	// Tracking
	CalledWithTask   *TaskDefinition
	CalledWithInput  any
	CalledWithPrompt string
	// contains filtered or unexported fields
}

MockTaskExecutor is a mock implementation of taskengine.TaskExecutor.

func (*MockTaskExecutor) CallCount

func (m *MockTaskExecutor) CallCount() int

CallCount returns how many times TaskExec was called

func (*MockTaskExecutor) Reset

func (m *MockTaskExecutor) Reset()

Reset clears all mock state between tests

func (*MockTaskExecutor) TaskExec

func (m *MockTaskExecutor) TaskExec(ctx context.Context, startingTime time.Time, tokenLimit int, chainContext *ChainContext, currentTask *TaskDefinition, input any, dataType DataType) (any, DataType, string, error)

TaskExec is the mock implementation of the TaskExec method.

type NoopTaskEventSink

type NoopTaskEventSink struct{}

func (NoopTaskEventSink) Enabled

func (NoopTaskEventSink) Enabled() bool

func (NoopTaskEventSink) PublishTaskEvent

func (NoopTaskEventSink) PublishTaskEvent(context.Context, TaskEvent) error

type OperatorTerm

type OperatorTerm string

OperatorTerm represents logical operators used for task transition evaluation

const (
	OpEquals     OperatorTerm = "equals"
	OpContains   OperatorTerm = "contains"
	OpStartsWith OperatorTerm = "starts_with"
	OpEndsWith   OperatorTerm = "ends_with"
	OpDefault    OperatorTerm = "default"
	// OpEdgeTraversedAtLeast fires when the edge specified by TransitionBranch.Edge
	// (formatted "fromTaskID->toTaskID") has been traversed at least the integer
	// in TransitionBranch.When times during the current chain run. Reads engine
	// state, not task output. Use it to bound workflow loops:
	//
	//   { "operator": "edge_traversed_at_least",
	//     "edge": "chat->run_tools", "when": "20", "goto": "summarise_failure" }
	//
	// Place this branch ahead of the normal loop branch so it intercepts before
	// the next loop iteration fires.
	OpEdgeTraversedAtLeast OperatorTerm = "edge_traversed_at_least"
)

func ToOperatorTerm

func ToOperatorTerm(s string) (OperatorTerm, error)

func (OperatorTerm) String

func (t OperatorTerm) String() string

type RetryOutcomeSink

type RetryOutcomeSink struct {
	// contains filtered or unexported fields
}

RetryOutcomeSink collects per-call retry outcomes from chat_completion tasks running inside one chain invocation. It is safe for concurrent appenders.

A host attaches a sink via WithRetryOutcomeSink before running a chain so it can observe whether any chat call retried, used fallback, or hit a non-retryable class (e.g. capacity).

func (*RetryOutcomeSink) Append

func (s *RetryOutcomeSink) Append(o llmretry.Outcome)

Append records one outcome. Safe for concurrent use.

func (*RetryOutcomeSink) LastErrorClass

func (s *RetryOutcomeSink) LastErrorClass() llmretry.ErrorClass

LastErrorClass returns the class of the most recent recorded outcome, or llmretry.ClassNone if no outcomes were recorded.

func (*RetryOutcomeSink) Outcomes

func (s *RetryOutcomeSink) Outcomes() []llmretry.Outcome

Outcomes returns a snapshot of recorded outcomes in append order.

type SimpleEnv

type SimpleEnv struct {
	// contains filtered or unexported fields
}

SimpleEnv is the default implementation of EnvExecutor.

func (SimpleEnv) ExecEnv

func (env SimpleEnv) ExecEnv(ctx context.Context, chain *TaskChainDefinition, input any, dataType DataType) (result any, resultType DataType, history []CapturedStateUnit, retErr error)

ExecEnv executes the given chain with the provided input.

type SimpleExec

type SimpleExec struct {
	// contains filtered or unexported fields
}

SimpleExec is a basic implementation of TaskExecutor. It executes chat completion, tools, route, raise_error, and noop tasks.

func (*SimpleExec) Prompt

func (exe *SimpleExec) Prompt(ctx context.Context, systemInstruction string, llmCall LLMExecutionConfig, prompt string, ctxLength int) (string, error)

Prompt resolves a model client using the resolver policy and sends the prompt to be executed. Returns the trimmed response string or an error.

func (*SimpleExec) TaskExec

func (exe *SimpleExec) TaskExec(taskCtx context.Context, startingTime time.Time, ctxLength int, chainContext *ChainContext, currentTask *TaskDefinition, input any, dataType DataType) (any, DataType, string, error)

type SimpleStackTrace

type SimpleStackTrace struct {
	// contains filtered or unexported fields
}

func (*SimpleStackTrace) GetExecutionHistory

func (s *SimpleStackTrace) GetExecutionHistory() []CapturedStateUnit

func (*SimpleStackTrace) RecordStep

func (s *SimpleStackTrace) RecordStep(step CapturedStateUnit)

type StackTrace

type StackTrace interface {
	RecordStep(step CapturedStateUnit)
	GetExecutionHistory() []CapturedStateUnit
}

type TaskChainDefinition

type TaskChainDefinition struct {
	// ID uniquely identifies the chain.
	ID string `yaml:"id" json:"id"`

	// Enables capturing user input and output.
	Debug bool `yaml:"debug" json:"debug"`

	// Description provides a human-readable summary of the chain's purpose.
	Description string `yaml:"description" json:"description"`

	// Tasks is the list of tasks to execute in sequence.
	Tasks []TaskDefinition `yaml:"tasks" json:"tasks" openapi_include_type:"taskengine.TaskDefinition"`

	// TokenLimit is the token limit for the context window (used during execution).
	TokenLimit int64 `yaml:"token_limit" json:"token_limit"`
}

TaskChainDefinition describes a sequence of tasks to execute in order, along with branching logic, retry policies, and model preferences.

TaskChainDefinition support dynamic routing based on LLM outputs or conditions, and can include tools to perform external actions (e.g., sending emails).

type TaskDefinition

type TaskDefinition struct {
	// ID uniquely identifies the task within the chain.
	ID string `yaml:"id" json:"id" example:"validate_input"`

	// Description is a human-readable summary of what the task does.
	Description string `yaml:"description" json:"description" example:"Validates user input meets quality requirements"`

	// Handler determines how the LLM output (or tools) will be interpreted.
	Handler TaskHandler `yaml:"handler" json:"handler" example:"chat_completion" openapi_include_type:"string"`

	// SystemInstruction provides additional instructions to the LLM, if applicable system level will be used.
	SystemInstruction string `` /* 158-byte string literal not displayed */

	// ExecuteConfig defines the configuration for executing prompt or chat model tasks.
	ExecuteConfig *LLMExecutionConfig `yaml:"execute_config,omitempty" json:"execute_config,omitempty" openapi_include_type:"taskengine.LLMExecutionConfig"`

	// Tools defines an external action to run.
	// Required for Tools tasks, must be nil/omitted for all other types.
	// Example: {type: "send_email", args: {"to": "user@example.com"}}
	Tools *ToolsCall `yaml:"tools,omitempty" json:"tools,omitempty" openapi_include_type:"taskengine.ToolsCall"`

	// Print optionally formats the output for display/logging.
	// Supports template variables from previous task outputs.
	// Optional for all task types except Tools where it's rarely used.
	// Example: "The score is: {{.previous_output}}"
	Print string `yaml:"print,omitempty" json:"print,omitempty" example:"Validation result: {{.validate_input}}"`

	// PromptTemplate is the text prompt sent to the LLM.
	// Optional; when set it overrides the resolved input as the prompt.
	// Supports template variables from previous task outputs.
	// Example: "Rate the quality from 1-10: {{.input}}"
	PromptTemplate string `yaml:"prompt_template,omitempty" json:"prompt_template,omitempty" example:"Is this input valid? {{.input}}"`

	// OutputTemplate is an optional go template to format the output of a tools.
	// If specified, the tools's JSON output will be used as data for the template.
	// The final output of the task will be the rendered string.
	// Example: "The weather is {{.weather}} with a temperature of {{.temperature}}."
	OutputTemplate string `yaml:"output_template,omitempty" json:"output_template,omitempty" example:"Tools result: {{.status}}"`

	// InputVar is the name of the variable to use as input for the task.
	// Example: "input" for the original input.
	// Each task stores its output in a variable named with it's task id.
	InputVar string `yaml:"input_var,omitempty" json:"input_var,omitempty" example:"input"`

	// Transition defines what to do after this task completes.
	Transition TaskTransition `yaml:"transition" json:"transition" openapi_include_type:"taskengine.TaskTransition"`

	// Timeout optionally sets a timeout for task execution.
	// Format: "10s", "2m", "1h" etc.
	// Optional for all task types.
	Timeout string `yaml:"timeout,omitempty" json:"timeout,omitempty" example:"30s"`

	// RetryOnFailure sets how many times to retry this task on failure.
	// Applies to all task types including Tools.
	// Default: 0 (no retries)
	RetryOnFailure int `yaml:"retry_on_failure,omitempty" json:"retry_on_failure,omitempty" example:"2"`
}

type TaskEvent

type TaskEvent struct {
	Kind         TaskEventKind `json:"kind"`
	Timestamp    time.Time     `json:"timestamp"`
	RequestID    string        `json:"request_id,omitempty"`
	ChainID      string        `json:"chain_id,omitempty"`
	TaskID       string        `json:"task_id,omitempty"`
	TaskHandler  string        `json:"task_handler,omitempty"`
	Retry        int           `json:"retry"`
	ModelName    string        `json:"model_name,omitempty"`
	ProviderType string        `json:"provider_type,omitempty"`
	BackendID    string        `json:"backend_id,omitempty"`
	OutputType   string        `json:"output_type,omitempty"`
	Transition   string        `json:"transition,omitempty"`
	Content      string        `json:"content,omitempty"`
	Thinking     string        `json:"thinking,omitempty"`
	Error        string        `json:"error,omitempty"`

	ApprovalID   string         `json:"approval_id,omitempty"`
	HookName     string         `json:"hook_name,omitempty"`
	ToolName     string         `json:"tool_name,omitempty"`
	ApprovalArgs map[string]any `json:"approval_args,omitempty"`
	ApprovalDiff string         `json:"approval_diff,omitempty"`

	HITLAction            string `json:"hitl_action,omitempty"`
	HITLReason            string `json:"hitl_reason,omitempty"`
	HITLPolicyName        string `json:"hitl_policy_name,omitempty"`
	HITLPolicyPath        string `json:"hitl_policy_path,omitempty"`
	HITLArgsSummary       string `json:"hitl_args_summary,omitempty"`
	HITLMatchedRule       *int   `json:"hitl_matched_rule,omitempty"`
	HITLTimeoutS          int    `json:"hitl_timeout_s,omitempty"`
	HITLApprovalRequested *bool  `json:"hitl_approval_requested,omitempty"`

	ToolDiffPath    string `json:"tool_diff_path,omitempty"`
	ToolDiffOldText string `json:"tool_diff_old_text,omitempty"`
	ToolDiffNewText string `json:"tool_diff_new_text,omitempty"`
}

func NewTaskEvent

func NewTaskEvent(ctx context.Context, kind TaskEventKind) TaskEvent

type TaskEventKind

type TaskEventKind string
const (
	TaskEventChainStarted   TaskEventKind = "chain_started"
	TaskEventStepStarted    TaskEventKind = "step_started"
	TaskEventStepChunk      TaskEventKind = "step_chunk"
	TaskEventStepCompleted  TaskEventKind = "step_completed"
	TaskEventStepFailed     TaskEventKind = "step_failed"
	TaskEventChainCompleted TaskEventKind = "chain_completed"
	TaskEventChainFailed    TaskEventKind = "chain_failed"

	TaskEventApprovalRequested TaskEventKind = "approval_requested"
	TaskEventHITLDecision      TaskEventKind = "hitl_decision"
	TaskEventToolCallPending   TaskEventKind = "tool_call_pending"
	TaskEventToolCall          TaskEventKind = "tool_call"
	TaskEventPrint             TaskEventKind = "print"
)

type TaskEventScope

type TaskEventScope struct {
	ChainID     string
	TaskID      string
	TaskHandler string
	Retry       int
}

type TaskEventSink

type TaskEventSink interface {
	PublishTaskEvent(ctx context.Context, event TaskEvent) error
	Enabled() bool
}

type TaskExecutor

type TaskExecutor interface {
	// TaskExec executes a single task with the given input and data type.
	// Returns:
	// - output: The processed task result
	// - outputType: The data type of the output
	// - transitionEval: String used for transition evaluation
	// - error: Any execution error encountered
	//
	// Parameters:
	// - ctx: Context for cancellation and timeouts
	// - startingTime: Chain start time for consistent timing
	// - ctxLength: Token context length limit for LLM operations
	// - chainContext: Immutable context of the chain
	// - currentTask: The task definition to execute
	// - input: Task input data
	// - dataType: Type of the input data
	TaskExec(ctx context.Context, startingTime time.Time, ctxLength int, chainContext *ChainContext, currentTask *TaskDefinition, input any, dataType DataType) (any, DataType, string, error)
}

TaskExecutor executes individual tasks within a workflow. Implementations should handle all task types and return appropriate outputs.

func NewExec

func NewExec(
	ctx context.Context,
	repo llmrepo.ModelRepo,
	toolsProvider ToolsRepo,
	tracker libtracker.ActivityTracker,
) (TaskExecutor, error)

NewExec creates a new SimpleExec instance

type TaskHandler

type TaskHandler string

TaskHandler defines how task outputs are processed and interpreted.

const (
	HandleRaiseError       TaskHandler = "raise_error"
	HandleRoute            TaskHandler = "route"
	HandleChatCompletion   TaskHandler = "chat_completion"
	HandleExecuteToolCalls TaskHandler = "execute_tool_calls"
	HandleNoop             TaskHandler = "noop"
	HandleTools            TaskHandler = "tools"
)

func (TaskHandler) String

func (t TaskHandler) String() string

type TaskTransition

type TaskTransition struct {
	// OnFailure is the task ID to jump to in case of failure.
	OnFailure string `yaml:"on_failure" json:"on_failure" example:"error_handler"`

	// Branches defines conditional branches for successful task completion.
	Branches []TransitionBranch `yaml:"branches" json:"branches" openapi_include_type:"taskengine.TransitionBranch"`
}

TaskTransition defines what happens after a task completes, including which task to go to next and how to handle errors.

type TokenUsage

type TokenUsage struct {
	Prompt     int `json:"prompt"`
	Completion int `json:"completion"`
	Total      int `json:"total"`
}

type Tool

type Tool struct {
	Type     string       `json:"type"`
	Function FunctionTool `json:"function"`
}

Tool represents a tool that can be called by the model.

type ToolCall

type ToolCall struct {
	ID       string       `json:"id" example:"call_abc123"`
	Type     string       `json:"type" example:"function"`
	Function FunctionCall `json:"function" openapi_include_type:"taskengine.FunctionCall"`
	// ProviderMeta carries opaque provider-specific data (e.g. Gemini thought_signature)
	// that must be round-tripped back on the next turn.
	ProviderMeta map[string]string `json:"provider_meta,omitempty" example:"{\"thought_signature\":\"123456\"}"`
}

ToolCall represents a tool call requested by the model.

type ToolWithResolution

type ToolWithResolution struct {
	Tool
	ToolsName string
}

type ToolsCall

type ToolsCall struct {
	// Name is the registered tools-PROVIDER (the service/server, e.g. "slack"),
	// not the tool. Required.
	Name string `yaml:"name" json:"name" example:"slack"`

	// ToolName is the specific TOOL to invoke on that provider
	// (e.g. "send_slack_notification").
	ToolName string `yaml:"tool_name" json:"tool_name" example:"send_slack_notification"`
	// Args are key-value pairs passed to the tool call.
	// Example: {"to": "user@example.com", "subject": "Notification"}
	Args map[string]string `` /* 126-byte string literal not displayed */
}

ToolsCall configures a `tools` task — a direct, deterministic call to one tool of one registered tools-provider (e.g. an MCP server), distinct from the model-driven tool calls of chat_completion/execute_tool_calls.

type ToolsProvider

type ToolsProvider interface {
	ToolsRegistry
	ToolsWithSchema
}

type ToolsRegistry

type ToolsRegistry interface {
	Supports(ctx context.Context) ([]string, error)
}

type ToolsRepo

type ToolsRepo interface {
	Exec(ctx context.Context, startingTime time.Time, input any, debug bool, args *ToolsCall) (any, DataType, error)
	ToolsRegistry
	ToolsWithSchema
}

ToolsRepo defines interface for external system integrations and side effects.

type ToolsWithSchema

type ToolsWithSchema interface {
	GetSchemasForSupportedTools(ctx context.Context) (map[string]*openapi3.T, error)
	GetToolsForToolsByName(ctx context.Context, name string) ([]Tool, error)
}

type TransitionBranch

type TransitionBranch struct {
	// Operator defines how to compare the task's transition eval to When. It is
	// REQUIRED and must be one of SupportedOperators() — an empty or unknown
	// operator is rejected at chain validation (at runtime it would never match,
	// a silent dead branch). The comparison for equals/contains/starts_with/
	// ends_with is byte-exact and CASE-SENSITIVE with no trimming — a trailing
	// newline (common in multiline template literals) will not match. Only the
	// `route` handler normalizes its answer.
	Operator OperatorTerm `yaml:"operator,omitempty" json:"operator,omitempty" example:"equals" openapi_include_type:"string"`

	// When is the value this branch matches against the task's transition eval.
	// What the eval is depends on the handler:
	//   - chat_completion / execute_tool_calls / tools / noop → a control token,
	//     one of the Transition* constants (e.g. "tool_call", "executed",
	//     "tools_executed", "no_calls_found", "noop", "failed"). You CANNOT branch
	//     on the model's free text here — use the `route` handler for that.
	//   - route → the model's chosen label (one of the declared branch targets).
	//   - edge_traversed_at_least → an integer threshold (see that operator).
	When string `yaml:"when" json:"when" example:"tool_call"`

	// Goto specifies the target task ID if this branch is taken.
	// Leave empty or use taskengine.TermEnd to end the chain.
	Goto string `yaml:"goto" json:"goto" example:"positive_response"`

	// Edge identifies a graph edge "fromTaskID->toTaskID" whose traversal
	// count is consulted by edge-state operators (e.g. edge_traversed_at_least).
	// Required when Operator is one of those; ignored otherwise.
	Edge string `yaml:"edge,omitempty" json:"edge,omitempty" example:"chat->run_tools"`
}

TransitionBranch defines a single possible path in the workflow, selected when the task's output matches the specified condition.

type UnavailableToolsProvider

type UnavailableToolsProvider struct {
	Name   string
	Reason string
}

Directories

Path Synopsis
Package llmretry wraps a single LLM call with classified retry, exponential backoff, and an optional model fallback.
Package llmretry wraps a single LLM call with classified retry, exponential backoff, and an optional model fallback.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL