nodes

package
v0.13.2 Latest
Warning

This package is not in the latest version of its module.

Published: May 20, 2026 License: MIT Imports: 30 Imported by: 0

Documentation

Overview

Package nodes contains the concrete Executor implementations for every Node type. Each constructor (NewXExecutor) returns a workflow.Executor that the engine registers via Engine.Register.

Functions

func DataTableDescriptor added in v0.13.1

func DataTableDescriptor(t workflow.NodeType) engine.NodeDescriptor

DataTableDescriptor returns the descriptor for one datatable_* node type. It is used by RegisterWithDesc in setup/manager.go, since a single executor handles all 7 datatable types.

func DefaultRunSessionID

func DefaultRunSessionID(id, runID string) string

DefaultRunSessionID is the engine fallback when neither a session_init node nor a per-node override is set. Format is "wf_<id>_run_<runID>" — underscores keep the string inside the sessionID charset (no colon; the storage validator at internal/agents/storage/validate.go limits the alphabet to `[A-Za-z0-9._-]`).
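The format and charset rule above can be checked with a short sketch. The lowercase `defaultRunSessionID` below is a hypothetical re-implementation written only from the documented format string, and `sessionIDCharset` is an assumed regular expression built from the quoted alphabet; neither is the package's own code.

```go
package main

import (
	"fmt"
	"regexp"
)

// sessionIDCharset is an assumed validator built from the alphabet
// quoted in the doc comment: [A-Za-z0-9._-].
var sessionIDCharset = regexp.MustCompile(`^[A-Za-z0-9._-]+$`)

// defaultRunSessionID is a hypothetical re-implementation of the
// documented fallback format "wf_<id>_run_<runID>".
func defaultRunSessionID(id, runID string) string {
	return "wf_" + id + "_run_" + runID
}

func main() {
	sid := defaultRunSessionID("invoice-sync", "42")
	// Underscore-separated IDs stay inside the allowed alphabet.
	fmt.Println(sid, sessionIDCharset.MatchString(sid))
	// A colon-separated variant would be rejected by the same check.
	fmt.Println(sessionIDCharset.MatchString("wf:invoice-sync:run:42"))
}
```

The second check illustrates why the format avoids colons as separators.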

Types

type AgentEvent

type AgentEvent struct {
	Type string
	Data string
}

AgentEvent is the minimal event shape the agent executor consumes while waiting for a turn to complete. Defined here (not imported from tools/agents) so the workflow package stays free of a cycle on the UI broadcaster.

Type values mirror agents/event.EventType.String() output: "text_delta", "tool_use", "tool_result", "done", "error", "thinking", "session_start". Anything else is ignored by the executor.

type AgentExecutor

type AgentExecutor struct {
	Providers *provider.Registry
	Pool      *pool.Pool
	Subscribe AgentSubscribeFn
}

AgentExecutor invokes a provider's AgentCall for a `type: agent` node. When Pool and Subscribe are wired and the resolved provider can route via the pool, the executor enqueues through the agent pool (FIFO queue, session reuse, sidebar visibility); otherwise it falls back to the non-pool provider path for codex/gemini.

func NewAgentExecutor

func NewAgentExecutor(reg *provider.Registry, p *pool.Pool, sub AgentSubscribeFn) *AgentExecutor

NewAgentExecutor wires the executor. Pool + Subscribe may be nil for non-claude-only test setups; in that case all agent calls go through provider.AgentCall (the non-pool path).

func (*AgentExecutor) Dependencies

func (e *AgentExecutor) Dependencies(n workflow.Node) []engine.NodeDependency

Dependencies surfaces provider name + each declared skill so workflow_describe shows the impact surface of the agent node.

func (*AgentExecutor) Descriptor

func (e *AgentExecutor) Descriptor() engine.NodeDescriptor

func (*AgentExecutor) Execute

Execute runs the agent node. Routes via pool when configured.

type AgentSubscribeFn

type AgentSubscribeFn func(sessionID string) (<-chan AgentEvent, func())

AgentSubscribeFn returns a receive channel of AgentEvents for one sessionID plus an unsub function. The executor subscribes before dispatching the pool send so no leading event is lost. Setup wires a concrete adapter around tools/agents.Broadcaster.
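The subscribe-before-send ordering can be sketched as follows. The two types are copied from the declarations on this page; `runTurn` and `fakeBroadcaster` are hypothetical helpers, and the handling of each event type (concatenating "text_delta", stopping on "done" or "error") is an assumption about how a consumer might behave, not the executor's actual logic.

```go
package main

import "fmt"

// AgentEvent and AgentSubscribeFn are copied from the declarations above.
type AgentEvent struct {
	Type string
	Data string
}

type AgentSubscribeFn func(sessionID string) (<-chan AgentEvent, func())

// runTurn (hypothetical) subscribes first, then dispatches the send, then
// drains events until "done" or "error". Unhandled types are ignored.
func runTurn(sub AgentSubscribeFn, sessionID string, send func() error) (string, error) {
	events, unsub := sub(sessionID)
	defer unsub()
	if err := send(); err != nil {
		return "", err
	}
	var text string
	for ev := range events {
		switch ev.Type {
		case "text_delta":
			text += ev.Data
		case "done":
			return text, nil
		case "error":
			return text, fmt.Errorf("agent error: %s", ev.Data)
		}
	}
	return text, fmt.Errorf("event stream closed before done")
}

// fakeBroadcaster (hypothetical) is an in-memory stand-in for the real
// broadcaster adapter: one buffered channel shared by all sessions.
func fakeBroadcaster() (AgentSubscribeFn, func(AgentEvent)) {
	ch := make(chan AgentEvent, 8)
	sub := func(string) (<-chan AgentEvent, func()) {
		return ch, func() {}
	}
	publish := func(ev AgentEvent) { ch <- ev }
	return sub, publish
}

func main() {
	sub, publish := fakeBroadcaster()
	send := func() error {
		publish(AgentEvent{Type: "session_start"})
		publish(AgentEvent{Type: "text_delta", Data: "hel"})
		publish(AgentEvent{Type: "text_delta", Data: "lo"})
		publish(AgentEvent{Type: "done"})
		return nil
	}
	out, err := runTurn(sub, "wf_demo_run_1", send)
	fmt.Println(out, err)
}
```

Because the subscription exists before `send` runs, events published during the send are already queued on the channel when the drain loop starts.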

type BranchExecutor

type BranchExecutor struct{}

BranchExecutor evaluates a Go-template expression and exposes the result as Verdict so the engine filters outgoing edges by `case:`.

func NewBranchExecutor

func NewBranchExecutor() *BranchExecutor

NewBranchExecutor constructs the branch executor.

func (*BranchExecutor) Descriptor

func (e *BranchExecutor) Descriptor() engine.NodeDescriptor

func (*BranchExecutor) Execute

Execute renders n.Expr. If the rendered expression contains a binary operator, it is treated as a boolean comparison and the verdict is "true" or "false". Otherwise the rendered string itself is the verdict (string switch).
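A minimal sketch of the two verdict modes, assuming the expression is rendered with the standard text/template package. The `verdict` function is hypothetical, and it recognizes only `==` and `!=`; the operator set the real executor supports is not documented here.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// verdict (hypothetical) renders expr against data, then either compares
// the two sides of a binary operator or returns the rendered string as-is.
// Only == and != are handled; the real operator set is an unknown.
func verdict(expr string, data any) (string, error) {
	tmpl, err := template.New("expr").Parse(expr)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := tmpl.Execute(&sb, data); err != nil {
		return "", err
	}
	rendered := strings.TrimSpace(sb.String())
	for _, op := range []string{"==", "!="} {
		lhs, rhs, found := strings.Cut(rendered, op)
		if !found {
			continue
		}
		equal := strings.TrimSpace(lhs) == strings.TrimSpace(rhs)
		// For "==" the verdict is equal; for "!=" it is the negation.
		return fmt.Sprint(equal == (op == "==")), nil
	}
	// No operator found: the rendered string is the verdict.
	return rendered, nil
}

func main() {
	data := map[string]any{"Status": "ok", "Tier": "gold"}
	v1, _ := verdict(`{{.Status}} == ok`, data)
	v2, _ := verdict(`{{.Tier}}`, data)
	fmt.Println(v1, v2)
}
```

In the first call the engine would follow the edge labelled `case: true`; in the second, the edge labelled `case: gold`.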

type ChannelExecutor

type ChannelExecutor struct {
	Registry *integration.Registry
}

ChannelExecutor dispatches `type: channel` action nodes through the integration registry. Each registered (channel, action) pair describes its own input schema and Execute closure, so this executor is just glue: render args → look up descriptor → call Execute.

To add a new outbound op, drop a file under internal/agents/channels/<name>/workflow/ that registers an ActionDescriptor. No engine change is required.

func NewChannelExecutor

func NewChannelExecutor(reg *integration.Registry) *ChannelExecutor

NewChannelExecutor wires the executor to the integration registry.

func (*ChannelExecutor) Dependencies

func (e *ChannelExecutor) Dependencies(n workflow.Node) []engine.NodeDependency

Dependencies surfaces (channel, action) pairs through workflow_describe so impact analysis sees the exact op the node invokes — not just the channel module name.

func (*ChannelExecutor) Descriptor

func (e *ChannelExecutor) Descriptor() engine.NodeDescriptor

func (*ChannelExecutor) Execute

Execute renders the node's args, resolves the descriptor by "<channel>.<op>", and dispatches.

func (*ChannelExecutor) TemplateableFields

func (e *ChannelExecutor) TemplateableFields(n workflow.Node) map[string]string

TemplateableFields surfaces the per-arg values so describe's template cross-ref scan reaches them. Each Args entry is exposed as args.<key> in issue paths so the warning shows which arg referenced an undeclared node.
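The `args.<key>` path convention can be illustrated with a small sketch. The `templateableFields` function below is hypothetical and takes a plain map instead of a workflow.Node, since the Node type's fields are not shown on this page.

```go
package main

import (
	"fmt"
	"sort"
)

// templateableFields (hypothetical) exposes each arg value under an
// "args.<key>" path so a cross-reference scan can report which arg
// contains a given template reference.
func templateableFields(args map[string]string) map[string]string {
	out := make(map[string]string, len(args))
	for k, v := range args {
		out["args."+k] = v
	}
	return out
}

func main() {
	fields := templateableFields(map[string]string{
		"text":    "Build {{.Node.build.status}}",
		"channel": "#deploys",
	})
	// Sort paths so the printed order is stable.
	paths := make([]string, 0, len(fields))
	for p := range fields {
		paths = append(paths, p)
	}
	sort.Strings(paths)
	for _, p := range paths {
		fmt.Printf("%s = %s\n", p, fields[p])
	}
}
```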

type ClassifyExecutor

type ClassifyExecutor struct {
	Providers *provider.Registry
}

ClassifyExecutor implements the 6-layer reliability stack:

  1. structured_output (defer to provider.StructuredCall)
  2. normalize — lowercase, trim, strip punct/quotes
  3. exact match — verdict ∈ output_cases?
  4. fuzzy_match — Levenshtein/substring against cases
  5. retry_on_mismatch — stricter system prompt, re-ask
  6. confidence_threshold — < threshold → default
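Layers 2 through 4 of the stack can be sketched without a provider. Everything below is a hypothetical illustration: `normalize` strips only leading and trailing punctuation, the fuzzy step uses substring matching alone (no Levenshtein distance), and the cases are assumed to be lowercase already. Layers 1, 5, and 6 need a provider call and are left out.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// normalize (layer 2, hypothetical) lowercases, trims whitespace, and
// strips leading/trailing punctuation such as quotes and periods.
func normalize(s string) string {
	s = strings.ToLower(strings.TrimSpace(s))
	return strings.TrimFunc(s, func(r rune) bool {
		return unicode.IsPunct(r) || unicode.IsSpace(r)
	})
}

// matchCase (hypothetical) tries an exact match (layer 3) and then a
// substring match (a simplified layer 4). It reports false when neither
// hits, which is where a retry or default would take over.
func matchCase(raw string, cases []string) (string, bool) {
	v := normalize(raw)
	for _, c := range cases {
		if v == c {
			return c, true
		}
	}
	for _, c := range cases {
		if strings.Contains(v, c) {
			return c, true
		}
	}
	return "", false
}

func main() {
	cases := []string{"bug", "feature", "question"}
	for _, raw := range []string{` "Bug." `, "This is a feature request", "no idea"} {
		c, ok := matchCase(raw, cases)
		fmt.Printf("%q -> %q (matched=%v)\n", raw, c, ok)
	}
}
```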

func NewClassifyExecutor

func NewClassifyExecutor(reg *provider.Registry) *ClassifyExecutor

NewClassifyExecutor wires the executor.

func (*ClassifyExecutor) Dependencies

func (e *ClassifyExecutor) Dependencies(n workflow.Node) []engine.NodeDependency

Dependencies surfaces the provider name to workflow_describe.

func (*ClassifyExecutor) Descriptor

func (e *ClassifyExecutor) Descriptor() engine.NodeDescriptor

func (*ClassifyExecutor) Execute

Execute runs the classify node.

type ConnectorExecutor

type ConnectorExecutor struct {
	Registry *connector.Registry
}

ConnectorExecutor dispatches workflow connector nodes through the existing connector module ExecuteFunc.

func NewConnectorExecutor

func NewConnectorExecutor(reg *connector.Registry) *ConnectorExecutor

NewConnectorExecutor wires the executor.

func (*ConnectorExecutor) Dependencies

func (e *ConnectorExecutor) Dependencies(n workflow.Node) []engine.NodeDependency

Dependencies surfaces "<module>.<op>" pairs to workflow_describe.

func (*ConnectorExecutor) Descriptor

func (e *ConnectorExecutor) Descriptor() engine.NodeDescriptor

func (*ConnectorExecutor) Execute

Execute invokes the resolved (module, op) Execute func.

func (*ConnectorExecutor) TemplateableFields

func (e *ConnectorExecutor) TemplateableFields(n workflow.Node) map[string]string

TemplateableFields exposes each Args value as args.<key> for the describe scan.

type DBQueryExecutor

type DBQueryExecutor struct{}

DBQueryExecutor runs a parameterized SQL query against a named database. The Database field resolves to an env key whose value is the DSN. Supported schemes: postgres://, sqlite:, file:.

Output fields:

  • rows — []map[string]any, one entry per row
  • row_count — int, same as len(rows)
  • columns — []string
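The Database-to-DSN resolution and scheme check might look like the sketch below. `resolveDriver` is hypothetical, the driver names "postgres" and "sqlite" are assumptions (the drivers the executor actually registers are not listed here), and treating `file:` as a SQLite DSN is also an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// resolveDriver (hypothetical) looks up the DSN stored under the env key
// named by database, then picks a driver from the DSN scheme. The driver
// names returned here are assumptions, not the executor's real choices.
func resolveDriver(database string, getenv func(string) string) (driver, dsn string, err error) {
	dsn = getenv(database)
	if dsn == "" {
		return "", "", fmt.Errorf("env key %q is empty or unset", database)
	}
	switch {
	case strings.HasPrefix(dsn, "postgres://"):
		return "postgres", dsn, nil
	case strings.HasPrefix(dsn, "sqlite:"), strings.HasPrefix(dsn, "file:"):
		return "sqlite", dsn, nil
	default:
		// The DSN is not echoed back because it may contain credentials.
		return "", "", fmt.Errorf("unsupported DSN scheme for env key %q", database)
	}
}

func main() {
	env := map[string]string{
		"MAIN_DB":  "postgres://user@localhost/app",
		"LOCAL_DB": "file:local.db",
	}
	getenv := func(k string) string { return env[k] }
	for _, key := range []string{"MAIN_DB", "LOCAL_DB", "MISSING_DB"} {
		driver, _, err := resolveDriver(key, getenv)
		fmt.Println(key, driver, err)
	}
}
```

Passing the lookup as a function keeps the sketch deterministic; a real caller would more likely pass `os.Getenv`.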

func NewDBQueryExecutor

func NewDBQueryExecutor() *DBQueryExecutor

NewDBQueryExecutor builds the executor.

func (*DBQueryExecutor) Descriptor

func (e *DBQueryExecutor) Descriptor() engine.NodeDescriptor

func (*DBQueryExecutor) Execute

Execute connects, runs the SQL, and returns rows.

type DataTableExecutor added in v0.13.1

type DataTableExecutor struct {
	Service datatable.Service
}

DataTableExecutor handles all 7 datatable_* node types.

func NewDataTableExecutor added in v0.13.1

func NewDataTableExecutor(svc datatable.Service) *DataTableExecutor

NewDataTableExecutor wires the executor.

func (*DataTableExecutor) Execute added in v0.13.1

Execute dispatches per node.Type.

type EndExecutor

type EndExecutor struct{}

EndExecutor is the terminator. Captures n.Result so downstream {{.Run.final_result}}-style reads can pick it up.

func NewEndExecutor

func NewEndExecutor() *EndExecutor

NewEndExecutor builds the end executor.

func (*EndExecutor) Descriptor

func (e *EndExecutor) Descriptor() engine.NodeDescriptor

func (*EndExecutor) Execute

Execute renders n.Result if set.

type GoScriptExecutor

type GoScriptExecutor struct{}

GoScriptExecutor runs a Go program inside the yaegi interpreter and pipes RenderCtx JSON to its stdin / parses JSON back from its stdout. No external toolchain required — yaegi ships with stdlib symbols (go1.21 surface), so the script can import anything the stdlib bundle covers (encoding/json, strings, time, regexp, math, net/url, …).

Contract:

Input  : os.Stdin = JSON-encoded RenderCtx
         (keys: Event, Node, Env, Secret, Workflow, Run, DataTable)
Output : os.Stdout = JSON value of any shape; engine parses it and
         exposes as Node.<id>.result + merges top-level keys when
         the value is a JSON object so {{.Node.<id>.foo}} works.
         os.Stderr = free-form logs; surfaced as
         Node.<id>.stderr without further parsing.

Template pre-rendering: Code itself is rendered through the wick template engine before yaegi sees it. Lets users splice values like `{{.Env.api_url}}` directly into source if they want — but the idiomatic path is to read from os.Stdin.
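A script following the stdin/stdout contract could look like the sketch below. It uses only stdlib packages; the `name` key inside Event and the shape of the output object are hypothetical, and whether every call here behaves identically under yaegi has not been verified. An empty stdin is treated as an empty context so the program also runs standalone.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
)

// process decodes the RenderCtx JSON and builds a JSON object result.
// Only Event and Env are read; the "name" event key is hypothetical.
func process(raw []byte) ([]byte, error) {
	if len(raw) == 0 {
		raw = []byte("{}") // allow running without piped input
	}
	var rc struct {
		Event map[string]any `json:"Event"`
		Env   map[string]any `json:"Env"`
	}
	if err := json.Unmarshal(raw, &rc); err != nil {
		return nil, err
	}
	name, _ := rc.Event["name"].(string)
	return json.Marshal(map[string]any{
		"greeting": "hello " + name,
		"has_env":  len(rc.Env) > 0,
	})
}

func main() {
	raw, err := io.ReadAll(os.Stdin)
	if err != nil {
		fmt.Fprintln(os.Stderr, "read stdin:", err) // free-form log
		os.Exit(1)
	}
	out, err := process(raw)
	if err != nil {
		fmt.Fprintln(os.Stderr, "process:", err)
		os.Exit(1)
	}
	os.Stdout.Write(out) // a JSON object, so top-level keys get merged
}
```

Under the contract above, a node with id `greet` running this script would expose the output as `{{.Node.greet.greeting}}` and `{{.Node.greet.has_env}}`.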

func NewGoScriptExecutor

func NewGoScriptExecutor() *GoScriptExecutor

NewGoScriptExecutor wires the executor.

func (*GoScriptExecutor) Descriptor

func (e *GoScriptExecutor) Descriptor() engine.NodeDescriptor

Descriptor exposes schema + docs for the MCP catalog.

func (*GoScriptExecutor) Execute

Execute renders the code template, runs it under yaegi with stdin/stdout piped, and parses stdout as JSON.

type GoScriptSchema

type GoScriptSchema struct {
	Code    string `` /* 249-byte string literal not displayed */
	Timeout string `wick:"key=timeout_sec;number;desc=Script timeout in seconds (default 10)."`
}

GoScriptSchema reflects the inspector form. The Code field uses the textarea widget; UI swaps it for an Ace editor at hydrate time.

type HTTPExecutor

type HTTPExecutor struct {
	// contains filtered or unexported fields
}

HTTPExecutor performs an HTTP request. The retry policy comes from n.Retry, the method defaults to GET, and parse_response accepts raw, json, or bytes.

func NewHTTPExecutor

func NewHTTPExecutor() *HTTPExecutor

NewHTTPExecutor builds the HTTP executor with a 30s default client.

func (*HTTPExecutor) Dependencies

func (e *HTTPExecutor) Dependencies(n workflow.Node) []engine.NodeDependency

Dependencies surfaces the URL (or its host when easy to extract) as a generic http dependency so workflow_describe groups HTTP outbound under deps.other.http.

func (*HTTPExecutor) Descriptor

func (e *HTTPExecutor) Descriptor() engine.NodeDescriptor

Descriptor exposes the schema + docs for the MCP catalog.

func (*HTTPExecutor) Execute

Execute runs the request described by node n.

func (*HTTPExecutor) TemplateableFields

func (e *HTTPExecutor) TemplateableFields(n workflow.Node) map[string]string

TemplateableFields exposes per-header / per-query values to workflow_describe's cross-ref scan in addition to the generic pool (url + body live in the default set).

type HTTPSchema

type HTTPSchema struct {
	Method        string `wick:"required;key=method;dropdown=GET|POST|PUT|PATCH|DELETE;desc=HTTP method"`
	URL           string `` /* 144-byte string literal not displayed */
	Headers       string `` /* 194-byte string literal not displayed */
	Query         string `wick:"key=query;kvlist=name|value;desc=Query string params. Each value is rendered as a Go template."`
	Body          string `` /* 139-byte string literal not displayed */
	ParseResponse string `` /* 138-byte string literal not displayed */
	TimeoutSec    string `wick:"key=timeout_sec;number;desc=Request timeout in seconds (default 30)"`
}

HTTPSchema is the per-field schema reflected via integration.StructSchema for workflow_node_types. Single source of truth for AI consumers and the editor UI (the editor-side module reflects the same struct via entity.StructToConfigs to render ArgForm).

type SessionInitExecutor

type SessionInitExecutor struct {
	Pool *pool.Pool
}

SessionInitExecutor implements the `session_init` node. It writes the resolved sessionID into rc.DefaultAgentSessionID so downstream agent nodes that don't override session: themselves inherit it, and (when a pool is wired) calls Pool.EnsureSession to materialize the sidebar row immediately.

No subprocess is spawned here — pool.Send is what triggers the actual spawn, and that happens lazily on the first agent node downstream. session_init's only side effect is "establish the sessionID + sidebar entry early so the user sees it before the first send latency hits."
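The inherit-or-override behavior can be sketched with stand-in types. `runContext`, `sessionEnsurer`, `initSession`, and `agentSessionID` are all hypothetical; in particular, the real Pool.EnsureSession signature is not shown on this page, so the one-argument form below is an assumption.

```go
package main

import "fmt"

// runContext is a hypothetical stand-in for the engine's run context.
type runContext struct {
	DefaultAgentSessionID string
}

// sessionEnsurer is an assumed one-method view of the pool.
type sessionEnsurer interface {
	EnsureSession(sessionID string) error
}

// initSession stores the resolved ID on the run context and, when a pool
// is wired, materializes the session record. Nothing is spawned here.
func initSession(rc *runContext, pool sessionEnsurer, sessionID string) error {
	rc.DefaultAgentSessionID = sessionID
	if pool == nil {
		return nil // headless or test setup: skip the pool side effect
	}
	return pool.EnsureSession(sessionID)
}

// agentSessionID shows how a downstream agent node might resolve its
// session: a per-node override wins, otherwise the default is inherited.
func agentSessionID(rc *runContext, override string) string {
	if override != "" {
		return override
	}
	return rc.DefaultAgentSessionID
}

// recordingPool is a test double that records EnsureSession calls.
type recordingPool struct{ seen []string }

func (p *recordingPool) EnsureSession(id string) error {
	p.seen = append(p.seen, id)
	return nil
}

func main() {
	rc := &runContext{}
	pool := &recordingPool{}
	if err := initSession(rc, pool, "wf_demo_run_1"); err != nil {
		fmt.Println("init failed:", err)
		return
	}
	fmt.Println(agentSessionID(rc, ""), agentSessionID(rc, "custom"), pool.seen)
}
```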

func NewSessionInitExecutor

func NewSessionInitExecutor(p *pool.Pool) *SessionInitExecutor

NewSessionInitExecutor builds the executor. Pool may be nil when the workflow runtime is wired without an agent pool (tests, headless MCP); in that case the sessionID is still resolved and stored on RunContext, but the pool side effect is skipped.

func (*SessionInitExecutor) Execute

Execute resolves the sessionID, mutates RunContext, and ensures the pool session record exists.

type ShellExecutor

type ShellExecutor struct{}

ShellExecutor runs a process and captures stdout/stderr/exit_code. parse_output: raw (default) | json | lines.
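The three parse_output modes could be handled along these lines. `parseOutput` is a hypothetical helper; how the real executor treats trailing newlines in `lines` mode, or an unknown mode, is not documented, so both choices below are assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// parseOutput (hypothetical) converts captured stdout according to mode:
// raw returns the string untouched, json decodes it, and lines splits on
// newlines after dropping the trailing line break.
func parseOutput(mode, stdout string) (any, error) {
	switch mode {
	case "", "raw": // raw is the documented default
		return stdout, nil
	case "json":
		var v any
		if err := json.Unmarshal([]byte(stdout), &v); err != nil {
			return nil, fmt.Errorf("parse_output json: %w", err)
		}
		return v, nil
	case "lines":
		trimmed := strings.TrimRight(stdout, "\r\n")
		if trimmed == "" {
			return []string{}, nil
		}
		return strings.Split(trimmed, "\n"), nil
	default:
		return nil, fmt.Errorf("unknown parse_output mode %q", mode)
	}
}

func main() {
	for _, mode := range []string{"raw", "lines", "json"} {
		v, err := parseOutput(mode, "[1, 2]\n")
		fmt.Printf("%s: %#v %v\n", mode, v, err)
	}
}
```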

func NewShellExecutor

func NewShellExecutor() *ShellExecutor

NewShellExecutor constructs the shell executor.

func (*ShellExecutor) Descriptor

func (e *ShellExecutor) Descriptor() engine.NodeDescriptor

func (*ShellExecutor) Execute

Execute runs the shell command described by node n.

type SwitchExecutor

type SwitchExecutor struct{}

SwitchExecutor evaluates each rule's `when` expression in order; the first rule that returns true sets Verdict to its `case` label. Falls back to DefaultCase when nothing matches. Same matching semantics as `branch` (binary ops or truthy string).

func NewSwitchExecutor

func NewSwitchExecutor() *SwitchExecutor

NewSwitchExecutor wires the executor.

func (*SwitchExecutor) Descriptor

func (e *SwitchExecutor) Descriptor() engine.NodeDescriptor

Descriptor exposes schema + docs for the MCP catalog.

func (*SwitchExecutor) Execute

Execute walks the cases in order and returns the first match's Case label as Verdict. When no rule matches and DefaultCase is set, that label is emitted. When no rule matches and DefaultCase is empty, Execute returns an error (fail closed) so the workflow halts instead of silently dropping the run.

type TransformExecutor

type TransformExecutor struct{}

TransformExecutor runs an in-process transform on an input value.

  • gotemplate (default) — Go template render
  • jsonpath — minimal walker (placeholder)
  • jq — not implemented in this build

func NewTransformExecutor

func NewTransformExecutor() *TransformExecutor

NewTransformExecutor builds the transform executor.

func (*TransformExecutor) Descriptor

func (e *TransformExecutor) Descriptor() wfengine.NodeDescriptor

func (*TransformExecutor) Execute

Execute runs the transform described by node n.
