engine

package
v0.13.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package engine — multi-subscriber event broker.

Engine.OnEvent was a single-slot callback owned by the SSE handler. As more consumers want the live stream (workflow_watch long-poll, future Loki sink, in-process metrics), the slot becomes a bottleneck — only one consumer wins, the rest miss events.

The broker keeps Engine.OnEvent intact for backward compatibility (SSE setup still writes to it) and adds an internal subscriber list fired in parallel with the legacy hook. Subscribers receive a copy of (id, runID, ev) per published event; the broker never blocks on a slow subscriber — it skips writes that would block more than a few millis so a hung consumer can't stall the engine event loop.

Package engine — optional declarer interfaces that let executors describe their per-node dependencies and templateable fields without the consumer (workflow_describe, validate, simulate) needing to hardcode a switch on NodeType.

Both interfaces are OPTIONAL — executors that don't implement them keep working unchanged. workflow_describe falls back to a generic reflection-based scan that covers the common shapes (Prompt, URL, Body, Expr, Input, Expression, SQL, plus Args map values).

Implement when:

  • your node touches an external surface that wick should surface in dependency listings (e.g. a `google_sheet` node should declare the spreadsheet ID it reads as a dep), OR

  • your node carries templateable strings on fields the generic scan won't reach (custom field names, nested struct fields, fields outside the wf.Node common pool).

The wickdocs.Docs already covers documentation; declarers cover the runtime metadata callers need to walk a workflow without parsing YAML themselves.

Package engine walks a workflow graph and executes nodes via the registered Executors. One Engine instance can run many workflows concurrently — per-workflow FIFO queuing lives in package trigger.

Index

Constants

View Source
const (
	DepKindChannel   = "channel"
	DepKindConnector = "connector"
	DepKindProvider  = "provider"
	DepKindDataset   = "dataset"
	DepKindEnv       = "env"
	DepKindSecret    = "secret"
	DepKindWebhook   = "webhook"
	DepKindSheet     = "sheet"
	DepKindHTTP      = "http"
	DepKindFile      = "file"
)

Canonical dependency kinds. Use these when applicable; emit a custom kind string only when none of these fit.

Variables

This section is empty.

Functions

func NewRunID

func NewRunID() string

NewRunID returns a fresh run id. Plain UUID — chronological ordering for the Runs panel comes from the sharded index file (`runs/index/<date>-<seq>.jsonl`), so the per-run dir name doesn't need to encode time anymore.

func SortedKeys

func SortedKeys[V any](m map[string]V) []string

SortedKeys is a small util for stable test output (kept here so we don't need a util pkg).

Types

type DependencyDeclarer

type DependencyDeclarer interface {
	Dependencies(n workflow.Node) []NodeDependency
}

DependencyDeclarer is implemented by executors that want to surface their per-node dependencies through workflow_describe.

Called once per Node when describing a workflow; the implementation inspects n.<fields> to decide what the node actually touches. Return nil for nodes whose runtime is pure / has no external dep.

type Describer

type Describer interface {
	Descriptor() NodeDescriptor
}

Describer is implemented by executors that want to expose schema + docs to the MCP catalog. Optional — executor that does not implement gets a bare descriptor with no schema.

type Engine

type Engine struct {
	Layout      config.Layout
	Service     service.Service
	StateStore  state.Store
	Executors   map[workflow.NodeType]workflow.Executor
	Descriptors map[workflow.NodeType]NodeDescriptor
	// Triggers carries the per-trigger-type descriptors (schema + docs).
	// MCP `workflow_node_detail` resolves `trigger:<type>` against this
	// registry. Seeded by setup; zero value = no triggers discoverable.
	Triggers *TriggerRegistry
	Now      func() time.Time
	IDGen    func() string
	// OnEvent is the legacy single-slot broadcast hook (SSE owns it).
	// Newer subscribers should use Engine.Subscribe — that registers
	// against a fan-out broker. Both fire per event; legacy first,
	// broker second.
	OnEvent func(id, runID string, ev workflow.RunEvent)
	// contains filtered or unexported fields
}

func New

func New(layout config.Layout, svc service.Service, ss state.Store) *Engine

New builds a bare engine — no executors registered. Caller must Register at least the node types the workflow uses.

The Triggers registry is pre-seeded with DefaultTriggerDescriptors so MCP discovery surfaces the canonical trigger types out of the box. Channel setup code overrides the entry for `trigger:channel` to attach channel-specific Docs (multi-trigger routing quirk etc.).

func (*Engine) PublishForTest

func (e *Engine) PublishForTest(id, runID string, ev workflow.RunEvent)

PublishForTest publishes one event to the broker without going through the engine's emit pipeline. Test-only — wires into the same fan-out code path real events use so subscriber assertions stay realistic.

func (*Engine) Register

func (e *Engine) Register(t workflow.NodeType, ex workflow.Executor)

Register attaches an executor for a node type. If the executor implements Describer, its Descriptor is captured so MCP catalog auto-reflects schema.

func (*Engine) RegisterWithDesc

func (e *Engine) RegisterWithDesc(t workflow.NodeType, ex workflow.Executor, desc NodeDescriptor)

RegisterWithDesc attaches an executor + explicit descriptor. Used when the same executor instance serves multiple node types (e.g. dataset executor handles 7 types).

func (*Engine) Run

Run starts a fresh run. Blocking; returns the final state.

Caller may pre-assign a run ID via evt.Payload["run_id"] so the HTTP handler can return the ID to the browser before Run starts — letting the client SSE-subscribe in time to catch the first events. Falls back to IDGen when the payload doesn't supply one.

func (*Engine) SetEventHook

func (e *Engine) SetEventHook(fn func(id, runID string, ev workflow.RunEvent))

SetEventHook installs the broadcast callback. Fires after each StateStore.AppendEvent so callers see persistent + ephemeral payloads in lockstep.

func (*Engine) Subscribe

func (e *Engine) Subscribe(bufferSize int) *Subscription

Subscribe returns a new live subscription. bufferSize is the channel buffer (recommend 32; high enough for small bursts, low enough that a runaway subscription is loud). Caller MUST defer sub.Cancel() — otherwise the subscriber leaks until Engine shutdown.

type NodeDependency

type NodeDependency struct {
	Kind    string         `json:"kind"`
	Ref     string         `json:"ref"`
	Details map[string]any `json:"details,omitempty"`
}

NodeDependency is one external surface a node touches at runtime. Kind is a short tag the consumer uses to bucket entries; Ref is the human-readable identifier (channel name, connector module.op pair, provider name, sheet ID, …). Optional Details map carries kind-specific extras the consumer may want — keep it small.

type NodeDescriptor

type NodeDescriptor struct {
	Type        workflow.NodeType
	Description string
	WhenToUse   string
	Example     string
	Schema      map[string]any    // reflected from per-node schema struct
	Output      map[string]string // field → description
	wickdocs.Docs
}

Engine walks a workflow graph and dispatches each node to its Executor. Caller wires concrete executors via Register; the engine stays decoupled from individual node impls.

OnEvent is an optional broadcast hook fired after every event hits the StateStore. The UI subscribes via SSE to paint per-node progress without polling state.json. Set via SetEventHook so existing callers stay source-compatible. NodeDescriptor bundles the schema + docs for a node type. Populated via RegisterWithDesc — single source of truth lives in the executor file itself (nodes/<type>.go Descriptor method).

Docs carries the opt-in self-documentation contract (quirks, examples, templateable fields, pair-with, common pitfalls) projected by the MCP `workflow_node_detail` op. Zero-value Docs = current behaviour; populate per executor when worth it. See internal/agents/workflow/docs and internal/docs/workflow/24-describe-contract.md.

type SubEvent

type SubEvent struct {
	WorkflowID string
	RunID      string
	Event      workflow.RunEvent
}

SubEvent is the payload one subscriber receives.

type Subscription

type Subscription struct {
	Ch chan SubEvent
	// contains filtered or unexported fields
}

Subscription is the per-call subscription handle returned by Engine.Subscribe. Call Cancel() to unregister; safe to call multiple times. Receive events via the Ch channel — buffered to avoid losing events on routine producers, but bounded so a stalled subscriber doesn't pile memory.

func (*Subscription) Cancel

func (s *Subscription) Cancel()

Cancel unregisters the subscription. Safe to call multiple times. After Cancel the broker stops sending to Ch; existing buffered events remain readable until the channel is drained, then a subsequent receive returns the zero value.

type TemplateableFieldsDeclarer

type TemplateableFieldsDeclarer interface {
	TemplateableFields(n workflow.Node) map[string]string
}

TemplateableFieldsDeclarer is implemented by executors whose node type carries templateable strings on fields beyond the common pool scanned by default (Prompt, PromptFile, URL, Body, Expr, Input, Expression, SQL).

Return name → value pairs for each templateable string field present on this node so the cross-ref scan in workflow_describe can find {{.Node.X}} references on custom fields. The `name` is the field label surfaced in the issue path (e.g. "code", "sheet_range") — pick labels that match the wick:"key=..." tag for the field when one exists.

type TriggerDescriptor

type TriggerDescriptor struct {
	Type        workflow.TriggerType
	Description string
	Schema      map[string]any
	Example     string
	wickdocs.Docs
}

TriggerDescriptor bundles the schema + docs for a trigger type (cron / channel / webhook / manual / schedule_at / error). One descriptor per workflow.TriggerType — the MCP `workflow_node_detail` op resolves a `trigger:<type>` key to this struct and projects it to the unified detail response.

Schema is a JSON-Schema-ish map of the fields a trigger entry carries on workflow.Trigger; Example is a copy-pasteable YAML block. Docs is the opt-in self-documentation bundle (examples, quirks, templateable fields, pair-with, common pitfalls).

Unlike NodeDescriptor (auto-populated by Engine.Register from the executor's Describer method), TriggerDescriptor is hand-registered at setup time because triggers don't have executors — they're dispatch keys handled by the router. The default catalog lives in engine.DefaultTriggerDescriptors() so callers can choose between registering the canonical set or overriding individual entries.

func DefaultTriggerDescriptors

func DefaultTriggerDescriptors() []TriggerDescriptor

DefaultTriggerDescriptors returns the canonical set of trigger type descriptors wick ships with. Callers seed the registry at boot via `reg.RegisterMany(engine.DefaultTriggerDescriptors()...)`, then optionally override individual entries to attach `Docs` (e.g. the channel package decorates `trigger:channel` with the per-trigger entry-node multi-trigger quirk).

type TriggerRegistry

type TriggerRegistry struct {
	// contains filtered or unexported fields
}

TriggerRegistry holds every TriggerDescriptor wired into one engine. Concurrent-safe — registration happens at boot, reads happen for every MCP discovery call.

func NewTriggerRegistry

func NewTriggerRegistry() *TriggerRegistry

NewTriggerRegistry constructs an empty registry. Callers usually follow with RegisterMany(DefaultTriggerDescriptors()...) to seed the canonical set, then override specific entries to attach Docs.

func (*TriggerRegistry) Get

Get returns the descriptor for a type, or (zero, false) when not registered. Used by `workflow_node_detail` to resolve `trigger:<t>`.

func (*TriggerRegistry) List

func (r *TriggerRegistry) List() []TriggerDescriptor

List returns a snapshot of every registered descriptor, sorted by type so MCP responses are deterministic.

func (*TriggerRegistry) Register

func (r *TriggerRegistry) Register(d TriggerDescriptor)

Register adds (or replaces) one descriptor. Replacing is allowed so per-channel setup code can attach Docs to a default entry without rebuilding the entire catalog.

func (*TriggerRegistry) RegisterMany

func (r *TriggerRegistry) RegisterMany(ds ...TriggerDescriptor)

RegisterMany is a convenience wrapper for seeding the registry from a slice (typical at boot).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL