Documentation ¶
Overview ¶
Package nodes contains the concrete Executor implementations for every Node type. Each constructor (NewXExecutor) returns a workflow.Executor that the engine registers via Engine.Register.
Index ¶
- func DataTableDescriptor(t workflow.NodeType) engine.NodeDescriptor
- func DefaultRunSessionID(id, runID string) string
- type AgentEvent
- type AgentExecutor
- type AgentSubscribeFn
- type BranchExecutor
- type ChannelExecutor
- func (e *ChannelExecutor) Dependencies(n workflow.Node) []engine.NodeDependency
- func (e *ChannelExecutor) Descriptor() engine.NodeDescriptor
- func (e *ChannelExecutor) Execute(ctx context.Context, n workflow.Node, rc *workflow.RunContext) (workflow.NodeOutput, error)
- func (e *ChannelExecutor) TemplateableFields(n workflow.Node) map[string]string
- type ClassifyExecutor
- type ConnectorExecutor
- func (e *ConnectorExecutor) Dependencies(n workflow.Node) []engine.NodeDependency
- func (e *ConnectorExecutor) Descriptor() engine.NodeDescriptor
- func (e *ConnectorExecutor) Execute(ctx context.Context, n workflow.Node, rc *workflow.RunContext) (workflow.NodeOutput, error)
- func (e *ConnectorExecutor) TemplateableFields(n workflow.Node) map[string]string
- type DBQueryExecutor
- type DataTableExecutor
- type EndExecutor
- type GoScriptExecutor
- type GoScriptSchema
- type HTTPExecutor
- func (e *HTTPExecutor) Dependencies(n workflow.Node) []engine.NodeDependency
- func (e *HTTPExecutor) Descriptor() engine.NodeDescriptor
- func (e *HTTPExecutor) Execute(ctx context.Context, n workflow.Node, rc *workflow.RunContext) (workflow.NodeOutput, error)
- func (e *HTTPExecutor) TemplateableFields(n workflow.Node) map[string]string
- type HTTPSchema
- type SessionInitExecutor
- type ShellExecutor
- type SwitchExecutor
- type TransformExecutor
Functions ¶
func DataTableDescriptor ¶ added in v0.13.1
func DataTableDescriptor(t workflow.NodeType) engine.NodeDescriptor
DataTableDescriptor returns the descriptor for one datatable_* node type. It is used by RegisterWithDesc in setup/manager.go, because one executor handles all 7 datatable types.
func DefaultRunSessionID ¶
func DefaultRunSessionID(id, runID string) string
DefaultRunSessionID is the engine fallback when neither a session_init node nor a per-node override sets a session ID. The format is "wf_<id>_run_<runID>". Underscores (rather than colons) keep the string inside the sessionID charset: the storage validator at internal/agents/storage/validate.go limits the alphabet to `[A-Za-z0-9._-]`.
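The format and charset constraint described above can be illustrated with a small sketch. The function name sketchRunSessionID and the regexp are assumptions made for this example; they are not the package's own code, and the real validator may enforce more than the alphabet.

```go
package main

import (
	"fmt"
	"regexp"
)

// sessionIDCharset mirrors the alphabet quoted in the docs: [A-Za-z0-9._-].
// The real validator lives in internal/agents/storage/validate.go; this
// regexp only illustrates that one constraint.
var sessionIDCharset = regexp.MustCompile(`^[A-Za-z0-9._-]+$`)

// sketchRunSessionID is a hypothetical stand-in for DefaultRunSessionID.
func sketchRunSessionID(id, runID string) string {
	return "wf_" + id + "_run_" + runID
}

func main() {
	sid := sketchRunSessionID("invoice-sync", "42")
	fmt.Println(sid, sessionIDCharset.MatchString(sid))

	// A colon-separated variant falls outside the allowed alphabet.
	fmt.Println(sessionIDCharset.MatchString("wf:invoice-sync:run:42"))
}
```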
Types ¶
type AgentEvent ¶
AgentEvent is the minimal event shape the agent executor consumes while waiting for a turn to complete. Defined here (not imported from tools/agents) so the workflow package stays free of a cycle on the UI broadcaster.
Type values mirror agents/event.EventType.String() output: "text_delta", "tool_use", "tool_result", "done", "error", "thinking", "session_start". Anything else is ignored by the executor.
type AgentExecutor ¶
type AgentExecutor struct {
Providers *provider.Registry
Pool *pool.Pool
Subscribe AgentSubscribeFn
}
AgentExecutor invokes a provider's AgentCall for a `type: agent` node. When Pool and Subscribe are both wired and the resolved provider can route via the pool, the executor enqueues through the agent pool (FIFO queue, session reuse, sidebar visibility); otherwise it falls back to the non-pool provider path for codex/gemini.
func NewAgentExecutor ¶
func NewAgentExecutor(reg *provider.Registry, p *pool.Pool, sub AgentSubscribeFn) *AgentExecutor
NewAgentExecutor wires the executor. Pool + Subscribe may be nil for non-claude-only test setups; in that case all agent calls go through provider.AgentCall (the non-pool path).
func (*AgentExecutor) Dependencies ¶
func (e *AgentExecutor) Dependencies(n workflow.Node) []engine.NodeDependency
Dependencies surfaces provider name + each declared skill so workflow_describe shows the impact surface of the agent node.
func (*AgentExecutor) Descriptor ¶
func (e *AgentExecutor) Descriptor() engine.NodeDescriptor
func (*AgentExecutor) Execute ¶
func (e *AgentExecutor) Execute(ctx context.Context, n workflow.Node, rc *workflow.RunContext) (workflow.NodeOutput, error)
Execute runs the agent node. Routes via pool when configured.
type AgentSubscribeFn ¶
type AgentSubscribeFn func(sessionID string) (<-chan AgentEvent, func())
AgentSubscribeFn returns a receive channel of AgentEvents for one sessionID plus an unsub function. The executor subscribes before dispatching the pool send so no leading event is lost. Setup wires a concrete adapter around tools/agents.Broadcaster.
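The subscribe-before-send ordering can be sketched as follows. Everything here is illustrative: the event type is a local stand-in for AgentEvent (only its Type values are documented above, the Text field is an assumption), and fakeBroadcaster is a hypothetical in-memory source rather than the tools/agents.Broadcaster adapter.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// event is a hypothetical stand-in for AgentEvent. Text is an assumed field.
type event struct {
	Type string
	Text string
}

// subscribeFn has the same shape as AgentSubscribeFn.
type subscribeFn func(sessionID string) (<-chan event, func())

// runTurn subscribes first, dispatches second, then drains events until a
// terminal one arrives, so no leading event can be missed.
func runTurn(sub subscribeFn, sessionID string, send func() error) (string, error) {
	events, unsub := sub(sessionID)
	defer unsub()

	if err := send(); err != nil {
		return "", err
	}

	var out strings.Builder
	for ev := range events {
		switch ev.Type {
		case "text_delta":
			out.WriteString(ev.Text)
		case "done":
			return out.String(), nil
		case "error":
			return out.String(), errors.New(ev.Text)
		}
		// Other event types are not needed by this sketch and are skipped.
	}
	return out.String(), errors.New("event stream closed before done")
}

// fakeBroadcaster returns a subscribeFn and a send func sharing one buffered
// channel; send emits the given events in order.
func fakeBroadcaster(evs ...event) (subscribeFn, func() error) {
	ch := make(chan event, len(evs))
	sub := func(string) (<-chan event, func()) {
		return ch, func() {}
	}
	send := func() error {
		for _, ev := range evs {
			ch <- ev
		}
		return nil
	}
	return sub, send
}

func main() {
	sub, send := fakeBroadcaster(
		event{Type: "session_start"},
		event{Type: "text_delta", Text: "hel"},
		event{Type: "text_delta", Text: "lo"},
		event{Type: "done"},
	)
	text, err := runTurn(sub, "wf_demo_run_1", send)
	fmt.Println(text, err)
}
```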
type BranchExecutor ¶
type BranchExecutor struct{}
BranchExecutor evaluates a Go-template expression and exposes the result as Verdict so the engine filters outgoing edges by `case:`.
func NewBranchExecutor ¶
func NewBranchExecutor() *BranchExecutor
NewBranchExecutor constructs the branch executor.
func (*BranchExecutor) Descriptor ¶
func (e *BranchExecutor) Descriptor() engine.NodeDescriptor
func (*BranchExecutor) Execute ¶
func (e *BranchExecutor) Execute(ctx context.Context, n workflow.Node, rc *workflow.RunContext) (workflow.NodeOutput, error)
Execute renders n.Expr. If the expression contains a binary operator, it is treated as a boolean comparison and the verdict is "true" or "false". Otherwise the rendered string itself is the verdict (string switch).
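A rough sketch of that rule, applied to an already-rendered expression. The operator set is not documented here, so the sketch assumes only "==" and "!=" and compares operands as trimmed strings; the real executor may support more operators or different comparison rules.

```go
package main

import (
	"fmt"
	"strings"
)

// verdict sketches the branch rule on a rendered expression.
// Assumption: only "==" and "!=" count as binary operators here.
func verdict(rendered string) string {
	for _, op := range []string{"==", "!="} {
		left, right, found := strings.Cut(rendered, op)
		if !found {
			continue
		}
		result := strings.TrimSpace(left) == strings.TrimSpace(right)
		if op == "!=" {
			result = !result
		}
		return fmt.Sprint(result) // "true" or "false"
	}
	// No operator found: the rendered string itself is the verdict.
	return strings.TrimSpace(rendered)
}

func main() {
	fmt.Println(verdict("200 == 200"))
	fmt.Println(verdict("200 != 200"))
	fmt.Println(verdict("approved"))
}
```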
type ChannelExecutor ¶
type ChannelExecutor struct {
Registry *integration.Registry
}
ChannelExecutor dispatches `type: channel` action nodes through the integration registry. Each registered (channel, action) pair describes its own input schema and Execute closure, so this executor is just glue: render args → look up descriptor → call Execute.
To add a new outbound op, drop a file under internal/agents/channels/<name>/workflow/ that registers an ActionDescriptor. No engine change is required.
func NewChannelExecutor ¶
func NewChannelExecutor(reg *integration.Registry) *ChannelExecutor
NewChannelExecutor wires the executor to the integration registry.
func (*ChannelExecutor) Dependencies ¶
func (e *ChannelExecutor) Dependencies(n workflow.Node) []engine.NodeDependency
Dependencies surfaces (channel, action) pairs through workflow_describe so impact analysis sees the exact op the node invokes — not just the channel module name.
func (*ChannelExecutor) Descriptor ¶
func (e *ChannelExecutor) Descriptor() engine.NodeDescriptor
func (*ChannelExecutor) Execute ¶
func (e *ChannelExecutor) Execute(ctx context.Context, n workflow.Node, rc *workflow.RunContext) (workflow.NodeOutput, error)
Execute renders the node's args, resolves the descriptor by "<channel>.<op>", and dispatches.
func (*ChannelExecutor) TemplateableFields ¶
func (e *ChannelExecutor) TemplateableFields(n workflow.Node) map[string]string
TemplateableFields surfaces the per-arg values so describe's template cross-ref scan reaches them. Each Args entry is exposed as args.<key> in issue paths so the warning shows which arg referenced an undeclared node.
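To show why the args.<key> path matters, here is a hedged sketch of a cross-reference scan over those fields. The regexp for `.Node.<id>` references and the undeclaredRefs helper are assumptions for illustration; the actual describe scan is not shown in this documentation.

```go
package main

import (
	"fmt"
	"regexp"
	"sort"
)

// nodeRef is an assumed pattern for {{.Node.<id>...}} references.
var nodeRef = regexp.MustCompile(`\.Node\.([A-Za-z0-9_]+)`)

// templateableArgs exposes each arg under the path "args.<key>".
func templateableArgs(args map[string]string) map[string]string {
	out := make(map[string]string, len(args))
	for key, value := range args {
		out["args."+key] = value
	}
	return out
}

// undeclaredRefs reports, per field path, references to nodes that are not
// in the declared set. Results are sorted for stable output.
func undeclaredRefs(fields map[string]string, declared map[string]bool) []string {
	var issues []string
	for path, tmpl := range fields {
		for _, m := range nodeRef.FindAllStringSubmatch(tmpl, -1) {
			if !declared[m[1]] {
				issues = append(issues, path+": undeclared node "+m[1])
			}
		}
	}
	sort.Strings(issues)
	return issues
}

func main() {
	fields := templateableArgs(map[string]string{
		"text": "Fetched {{.Node.fetch.body}}",
		"to":   "{{.Node.lookup.email}}",
	})
	for _, issue := range undeclaredRefs(fields, map[string]bool{"fetch": true}) {
		fmt.Println(issue)
	}
}
```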
type ClassifyExecutor ¶
ClassifyExecutor implements the 6-layer reliability stack:
- structured_output (defer to provider.StructuredCall)
- normalize — lowercase, trim, strip punct/quotes
- exact match — verdict ∈ output_cases?
- fuzzy_match — Levenshtein/substring against cases
- retry_on_mismatch — stricter system prompt, re-ask
- confidence_threshold — < threshold → default
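The local layers of that stack (normalize, exact match, fuzzy match, confidence threshold) can be sketched as below. This is a simplified illustration, not the executor's code: the provider-backed layers (structured_output, retry_on_mismatch) are omitted, fuzzy matching uses substring checks only (no Levenshtein), and the cases are assumed to be lowercase already.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// normalize lowercases and strips surrounding whitespace, punctuation, and quotes.
func normalize(s string) string {
	return strings.TrimFunc(strings.ToLower(s), func(r rune) bool {
		return unicode.IsSpace(r) || unicode.IsPunct(r)
	})
}

// resolve maps a raw model answer onto one of cases, or def when nothing
// matches or the confidence is below threshold.
func resolve(raw string, confidence, threshold float64, cases []string, def string) string {
	v := normalize(raw)
	matched := ""
	// Exact match first.
	for _, c := range cases {
		if v == c {
			matched = c
			break
		}
	}
	// Fuzzy fallback: substring only in this sketch.
	if matched == "" {
		for _, c := range cases {
			if strings.Contains(v, c) {
				matched = c
				break
			}
		}
	}
	if matched == "" || confidence < threshold {
		return def
	}
	return matched
}

func main() {
	cases := []string{"spam", "ham"}
	fmt.Println(resolve(` "Spam." `, 0.9, 0.5, cases, "unknown"))
	fmt.Println(resolve("I think this is ham", 0.9, 0.5, cases, "unknown"))
	fmt.Println(resolve("spam", 0.2, 0.5, cases, "unknown"))
}
```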
func NewClassifyExecutor ¶
func NewClassifyExecutor(reg *provider.Registry) *ClassifyExecutor
NewClassifyExecutor wires the executor.
func (*ClassifyExecutor) Dependencies ¶
func (e *ClassifyExecutor) Dependencies(n workflow.Node) []engine.NodeDependency
Dependencies surfaces the provider name to workflow_describe.
func (*ClassifyExecutor) Descriptor ¶
func (e *ClassifyExecutor) Descriptor() engine.NodeDescriptor
func (*ClassifyExecutor) Execute ¶
func (e *ClassifyExecutor) Execute(ctx context.Context, n workflow.Node, rc *workflow.RunContext) (workflow.NodeOutput, error)
Execute runs the classify node.
type ConnectorExecutor ¶
ConnectorExecutor dispatches workflow connector nodes through the existing connector module ExecuteFunc.
func NewConnectorExecutor ¶
func NewConnectorExecutor(reg *connector.Registry) *ConnectorExecutor
NewConnectorExecutor wires the executor.
func (*ConnectorExecutor) Dependencies ¶
func (e *ConnectorExecutor) Dependencies(n workflow.Node) []engine.NodeDependency
Dependencies surfaces "<module>.<op>" pairs to workflow_describe.
func (*ConnectorExecutor) Descriptor ¶
func (e *ConnectorExecutor) Descriptor() engine.NodeDescriptor
func (*ConnectorExecutor) Execute ¶
func (e *ConnectorExecutor) Execute(ctx context.Context, n workflow.Node, rc *workflow.RunContext) (workflow.NodeOutput, error)
Execute invokes the resolved (module, op) Execute func.
func (*ConnectorExecutor) TemplateableFields ¶
func (e *ConnectorExecutor) TemplateableFields(n workflow.Node) map[string]string
TemplateableFields exposes each Args value as args.<key> for the describe scan.
type DBQueryExecutor ¶
type DBQueryExecutor struct{}
DBQueryExecutor runs a parameterized SQL query against a named database. The Database field resolves to an env key whose value is the DSN. Supported schemes: postgres://, sqlite:, file:.
Output fields:
- rows — []map[string]any, one entry per row
- row_count — int, same as len(rows)
- columns — []string
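The Database-to-DSN resolution described above could be sketched like this. The resolveDSN helper and the "postgres"/"sqlite" kind labels are assumptions for illustration (in particular, treating file: DSNs as SQLite is a guess); the lookup parameter has the same shape as os.LookupEnv so that function could be passed directly.

```go
package main

import (
	"fmt"
	"strings"
)

// resolveDSN looks up the env key named by database and classifies the DSN
// by scheme. The returned kind labels are hypothetical.
func resolveDSN(database string, lookup func(string) (string, bool)) (string, string, error) {
	dsn, ok := lookup(database)
	if !ok || dsn == "" {
		return "", "", fmt.Errorf("env key %q is not set", database)
	}
	switch {
	case strings.HasPrefix(dsn, "postgres://"):
		return dsn, "postgres", nil
	case strings.HasPrefix(dsn, "sqlite:"), strings.HasPrefix(dsn, "file:"):
		return dsn, "sqlite", nil
	}
	return "", "", fmt.Errorf("unsupported DSN scheme in env key %q", database)
}

func main() {
	// A map-backed lookup keeps the demo independent of the real environment.
	env := map[string]string{"ANALYTICS_DB": "sqlite:demo.db"}
	lookup := func(key string) (string, bool) {
		v, ok := env[key]
		return v, ok
	}
	dsn, kind, err := resolveDSN("ANALYTICS_DB", lookup)
	fmt.Println(dsn, kind, err)
}
```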
func NewDBQueryExecutor ¶
func NewDBQueryExecutor() *DBQueryExecutor
NewDBQueryExecutor builds the executor.
func (*DBQueryExecutor) Descriptor ¶
func (e *DBQueryExecutor) Descriptor() engine.NodeDescriptor
func (*DBQueryExecutor) Execute ¶
func (e *DBQueryExecutor) Execute(ctx context.Context, n workflow.Node, rc *workflow.RunContext) (workflow.NodeOutput, error)
Execute connects, runs the SQL, and returns rows.
type DataTableExecutor ¶ added in v0.13.1
DataTableExecutor handles all 7 datatable_* node types.
func NewDataTableExecutor ¶ added in v0.13.1
func NewDataTableExecutor(svc datatable.Service) *DataTableExecutor
NewDataTableExecutor wires the executor.
func (*DataTableExecutor) Execute ¶ added in v0.13.1
func (e *DataTableExecutor) Execute(ctx context.Context, n workflow.Node, rc *workflow.RunContext) (workflow.NodeOutput, error)
Execute dispatches per node.Type.
type EndExecutor ¶
type EndExecutor struct{}
EndExecutor is the terminator. It captures n.Result so downstream {{.Run.final_result}}-style reads can pick it up.
func (*EndExecutor) Descriptor ¶
func (e *EndExecutor) Descriptor() engine.NodeDescriptor
func (*EndExecutor) Execute ¶
func (e *EndExecutor) Execute(ctx context.Context, n workflow.Node, rc *workflow.RunContext) (workflow.NodeOutput, error)
Execute renders n.Result if set.
type GoScriptExecutor ¶
type GoScriptExecutor struct{}
GoScriptExecutor runs a Go program inside the yaegi interpreter and pipes RenderCtx JSON to its stdin / parses JSON back from its stdout. No external toolchain required — yaegi ships with stdlib symbols (go1.21 surface), so the script can import anything the stdlib bundle covers (encoding/json, strings, time, regexp, math, net/url, …).
Contract:
    Input  : os.Stdin  = JSON-encoded RenderCtx
                         (keys: Event, Node, Env, Secret, Workflow, Run, DataTable)
    Output : os.Stdout = JSON value of any shape; engine parses it and
                         exposes as Node.<id>.result + merges top-level keys when
                         the value is a JSON object so {{.Node.<id>.foo}} works.
             os.Stderr = free-form logs; surfaced as
                         Node.<id>.stderr without further parsing.
Template pre-rendering: Code itself is rendered through the wick template engine before yaegi sees it. This lets users splice values like `{{.Env.api_url}}` directly into the source if they want, but the idiomatic path is to read from os.Stdin.
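A script that follows this contract might look like the sketch below. It uses only stdlib packages. The shape of Event (a "name" key) and the output keys are invented for the example, and runString is a convenience helper for trying the logic without a pipe; none of these come from the package itself.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"strings"
)

// renderCtx decodes only the RenderCtx keys this script needs.
type renderCtx struct {
	Event map[string]any `json:"Event"`
	Env   map[string]any `json:"Env"`
}

// run reads the JSON context from stdin, logs to stderr, and writes one JSON
// object to stdout. Empty stdin is treated as an empty context.
func run(stdin io.Reader, stdout, stderr io.Writer) error {
	var rc renderCtx
	if err := json.NewDecoder(stdin).Decode(&rc); err != nil && !errors.Is(err, io.EOF) {
		return fmt.Errorf("decode RenderCtx: %w", err)
	}
	name, _ := rc.Event["name"].(string) // "name" is an assumed event field
	if name == "" {
		name = "world"
	}
	fmt.Fprintln(stderr, "greeting", name)
	return json.NewEncoder(stdout).Encode(map[string]any{
		"greeting": "hello " + name,
		"env_keys": len(rc.Env),
	})
}

// runString feeds input as stdin and returns what the script wrote to stdout.
func runString(input string) (string, error) {
	var out bytes.Buffer
	err := run(strings.NewReader(input), &out, io.Discard)
	return out.String(), err
}

func main() {
	if err := run(os.Stdin, os.Stdout, os.Stderr); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```

Because the output is a JSON object, its top-level keys would be reachable as {{.Node.<id>.greeting}} and {{.Node.<id>.env_keys}} under the merge rule in the contract.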
func NewGoScriptExecutor ¶
func NewGoScriptExecutor() *GoScriptExecutor
NewGoScriptExecutor wires the executor.
func (*GoScriptExecutor) Descriptor ¶
func (e *GoScriptExecutor) Descriptor() engine.NodeDescriptor
Descriptor exposes schema + docs for the MCP catalog.
func (*GoScriptExecutor) Execute ¶
func (e *GoScriptExecutor) Execute(ctx context.Context, n workflow.Node, rc *workflow.RunContext) (workflow.NodeOutput, error)
Execute renders the code template, runs it under yaegi with stdin/stdout piped, and parses stdout as JSON.
type GoScriptSchema ¶
type GoScriptSchema struct {
Code string `` /* 249-byte string literal not displayed */
Timeout string `wick:"key=timeout_sec;number;desc=Script timeout in seconds (default 10)."`
}
GoScriptSchema reflects the inspector form. The Code field uses the textarea widget; UI swaps it for an Ace editor at hydrate time.
type HTTPExecutor ¶
type HTTPExecutor struct {
// contains filtered or unexported fields
}
HTTPExecutor performs an HTTP request. The retry policy comes from n.Retry, the method defaults to GET, and parse_response accepts raw, json, or bytes.
func NewHTTPExecutor ¶
func NewHTTPExecutor() *HTTPExecutor
NewHTTPExecutor builds the HTTP executor with a 30s default client.
func (*HTTPExecutor) Dependencies ¶
func (e *HTTPExecutor) Dependencies(n workflow.Node) []engine.NodeDependency
Dependencies surfaces the URL (or its host when easy to extract) as a generic http dependency so workflow_describe groups HTTP outbound under deps.other.http.
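One plausible reading of "its host when easy to extract" is sketched below with net/url: return the host when the URL parses cleanly, otherwise fall back to the raw string (for example when the URL starts with a template expression). The httpDependency helper is hypothetical and the real extraction logic may differ.

```go
package main

import (
	"fmt"
	"net/url"
)

// httpDependency returns the host of rawURL when it can be parsed, and the
// raw string otherwise.
func httpDependency(rawURL string) string {
	u, err := url.Parse(rawURL)
	if err != nil || u.Host == "" {
		return rawURL
	}
	return u.Host
}

func main() {
	fmt.Println(httpDependency("https://api.example.com/v1/items?limit=10"))
	fmt.Println(httpDependency("{{.Env.api_url}}/v1/items"))
}
```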
func (*HTTPExecutor) Descriptor ¶
func (e *HTTPExecutor) Descriptor() engine.NodeDescriptor
Descriptor exposes the schema + docs for the MCP catalog.
func (*HTTPExecutor) Execute ¶
func (e *HTTPExecutor) Execute(ctx context.Context, n workflow.Node, rc *workflow.RunContext) (workflow.NodeOutput, error)
Execute runs the request described by node n.
func (*HTTPExecutor) TemplateableFields ¶
func (e *HTTPExecutor) TemplateableFields(n workflow.Node) map[string]string
TemplateableFields exposes per-header / per-query values to workflow_describe's cross-ref scan in addition to the generic pool (url + body live in the default set).
type HTTPSchema ¶
type HTTPSchema struct {
Method string `wick:"required;key=method;dropdown=GET|POST|PUT|PATCH|DELETE;desc=HTTP method"`
URL string `` /* 144-byte string literal not displayed */
Headers string `` /* 194-byte string literal not displayed */
Query string `wick:"key=query;kvlist=name|value;desc=Query string params. Each value is rendered as a Go template."`
Body string `` /* 139-byte string literal not displayed */
ParseResponse string `` /* 138-byte string literal not displayed */
TimeoutSec string `wick:"key=timeout_sec;number;desc=Request timeout in seconds (default 30)"`
}
HTTPSchema is the per-field schema reflected via integration.StructSchema for workflow_node_types. Single source of truth for AI consumers and the editor UI (the editor-side module reflects the same struct via entity.StructToConfigs to render ArgForm).
type SessionInitExecutor ¶
SessionInitExecutor implements the `session_init` node. It writes the resolved sessionID into rc.DefaultAgentSessionID so downstream agent nodes that don't override session: themselves inherit it, and (when a pool is wired) calls Pool.EnsureSession to materialize the sidebar row immediately.
No subprocess is spawned here — pool.Send is what triggers the actual spawn, and that happens lazily on the first agent node downstream. session_init's only side effect is "establish the sessionID + sidebar entry early so the user sees it before the first send latency hits."
func NewSessionInitExecutor ¶
func NewSessionInitExecutor(p *pool.Pool) *SessionInitExecutor
NewSessionInitExecutor builds the executor. Pool may be nil when the workflow runtime is wired without an agent pool (tests, headless MCP); in that case the sessionID is still resolved and stored on RunContext, and the pool side effect is skipped.
func (*SessionInitExecutor) Execute ¶
func (e *SessionInitExecutor) Execute(ctx context.Context, n workflow.Node, rc *workflow.RunContext) (workflow.NodeOutput, error)
Execute resolves the sessionID, mutates RunContext, and ensures the pool session record exists.
type ShellExecutor ¶
type ShellExecutor struct{}
ShellExecutor runs a process and captures stdout/stderr/exit_code. parse_output: raw (default) | json | lines.
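The three parse_output modes could behave roughly as in this sketch. The parseOutput helper is hypothetical, and details such as trimming the trailing newline before splitting lines are assumptions rather than documented behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// parseOutput interprets captured stdout according to mode.
func parseOutput(mode, stdout string) (any, error) {
	switch mode {
	case "", "raw":
		// raw is the default: hand the text back untouched.
		return stdout, nil
	case "json":
		var v any
		if err := json.Unmarshal([]byte(stdout), &v); err != nil {
			return nil, fmt.Errorf("parse_output json: %w", err)
		}
		return v, nil
	case "lines":
		// Assumption: trailing newlines are dropped before splitting.
		trimmed := strings.TrimRight(stdout, "\r\n")
		if trimmed == "" {
			return []string{}, nil
		}
		return strings.Split(trimmed, "\n"), nil
	default:
		return nil, fmt.Errorf("unknown parse_output %q", mode)
	}
}

func main() {
	lines, _ := parseOutput("lines", "alpha\nbeta\n")
	fmt.Println(lines)

	obj, _ := parseOutput("json", `{"ok": true}`)
	fmt.Println(obj)
}
```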
func NewShellExecutor ¶
func NewShellExecutor() *ShellExecutor
NewShellExecutor constructs the shell executor.
func (*ShellExecutor) Descriptor ¶
func (e *ShellExecutor) Descriptor() engine.NodeDescriptor
func (*ShellExecutor) Execute ¶
func (e *ShellExecutor) Execute(ctx context.Context, n workflow.Node, rc *workflow.RunContext) (workflow.NodeOutput, error)
Execute runs the shell command described by node n.
type SwitchExecutor ¶
type SwitchExecutor struct{}
SwitchExecutor evaluates each rule's `when` expression in order; the first rule that returns true sets Verdict to its `case` label. Falls back to DefaultCase when nothing matches. Same matching semantics as `branch` (binary ops or truthy string).
func NewSwitchExecutor ¶
func NewSwitchExecutor() *SwitchExecutor
NewSwitchExecutor wires the executor.
func (*SwitchExecutor) Descriptor ¶
func (e *SwitchExecutor) Descriptor() engine.NodeDescriptor
Descriptor exposes schema + docs for the MCP catalog.
func (*SwitchExecutor) Execute ¶
func (e *SwitchExecutor) Execute(ctx context.Context, n workflow.Node, rc *workflow.RunContext) (workflow.NodeOutput, error)
Execute walks cases in order and returns the first match's Case label as Verdict. When no rule wins and DefaultCase is set, that label is emitted. An empty DefaultCase with no match is an error (fail closed), so the workflow halts instead of silently dropping the run.
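The first-match, default, and fail-closed behavior can be sketched as follows. The rule type and pick function are hypothetical; When stands in for the evaluated `when` expression, modeled as a closure so the in-order, stop-at-first-match evaluation is visible.

```go
package main

import (
	"errors"
	"fmt"
)

// rule is a hypothetical stand-in for one switch rule.
type rule struct {
	When func() bool // result of evaluating the rule's `when` expression
	Case string
}

// pick returns the Case of the first rule whose When is true, then
// defaultCase, and finally an error when neither applies (fail closed).
func pick(rules []rule, defaultCase string) (string, error) {
	for _, r := range rules {
		if r.When() {
			return r.Case, nil
		}
	}
	if defaultCase != "" {
		return defaultCase, nil
	}
	return "", errors.New("switch: no rule matched and no default case is set")
}

func main() {
	status := 404
	rules := []rule{
		{When: func() bool { return status < 300 }, Case: "ok"},
		{When: func() bool { return status < 500 }, Case: "client_error"},
		{When: func() bool { return true }, Case: "never_reached"},
	}
	fmt.Println(pick(rules, "fallback"))
	fmt.Println(pick(rules[:1], ""))
}
```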
type TransformExecutor ¶
type TransformExecutor struct{}
TransformExecutor runs an in-process transform on an input value.
- gotemplate (default) — Go template render
- jsonpath — minimal walker (placeholder)
- jq — not implemented in this build
func NewTransformExecutor ¶
func NewTransformExecutor() *TransformExecutor
NewTransformExecutor builds the transform executor.
func (*TransformExecutor) Descriptor ¶
func (e *TransformExecutor) Descriptor() wfengine.NodeDescriptor
func (*TransformExecutor) Execute ¶
func (e *TransformExecutor) Execute(ctx context.Context, n workflow.Node, rc *workflow.RunContext) (workflow.NodeOutput, error)
Execute runs the transform described by node n.