trigger

package
v0.13.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package trigger is the dispatch + queue + dedup layer between trigger sources (cron, channel adapter, webhook, manual, error) and the engine. One Router instance per Engine.

Package trigger implements the workflow router: it receives workflow.Event values from channels / cron / webhooks, finds the workflows subscribed to that event, and enqueues a run per match.

The router itself stays channel-agnostic. Two design choices underline that:

  1. Trigger index — at Register time, each workflow's triggers declare their route key(s) ("channel/slack/message", "cron", "webhook"). Dispatch builds the event's route keys and looks up the index in O(1) instead of iterating every workflow.

  2. No match DSL in the router — workflow YAML's `match:` map is not evaluated here. Workflows filter inside the graph (branch / transform / shell / classify) which keeps the engine free of channel-specific match grammars and lets operators express arbitrary filter logic.

Trigger-level checks the router still does: channel + event name (via route key), webhook path + method, error source workflow + severity. Everything that needs the payload contents happens in the graph.

Index

Constants

This section is empty.

Variables

View Source
var ErrQueueFull = errors.New("workflow queue full (reject policy)")

ErrQueueFull is returned when overflow=reject and the queue is full.

View Source
var StopTimeout = 30 * time.Second

Stop unregisters all and waits for workers to drain. StopTimeout is the deadline given to in-flight workers during Stop. Overridable in tests.

Functions

func MatchTrigger

func MatchTrigger(tr workflow.Trigger, evt workflow.Event) bool

MatchTrigger is kept for backward-compatible test ergonomics — it runs the router-side checks (channel name, event subtype, webhook path/method, error source) against a single trigger.

func PathMatches

func PathMatches(tmpl, got string) bool

PathMatches compares a trigger path template against an actual request path. Supports `{param}` segments.

func VerifyHMAC

func VerifyHMAC(body []byte, secret, want string) bool

VerifyHMAC computes SHA-256 HMAC and constant-time compares.

Types

type CronScheduler

type CronScheduler struct {
	// contains filtered or unexported fields
}

CronScheduler ticks every minute and fires workflow cron triggers whose schedule matches the current minute. Registered workflows are kept in sync with the Router — call Sync(workflows) whenever a workflow is registered, hot-reloaded, or unregistered.

Standalone from internal/jobs/* (which is row-backed admin-toggleable for per-tool jobs). Workflow cron lives in the same in-process trigger plane as channel events + webhooks so the whole subsystem drains through one Router queue.

func NewCronScheduler

func NewCronScheduler(r *Router) *CronScheduler

NewCronScheduler wires a scheduler against a Router.

func (*CronScheduler) Start

func (c *CronScheduler) Start(ctx context.Context)

Start launches the minute-tick goroutine. Cancel ctx to stop.

func (*CronScheduler) Stop

func (c *CronScheduler) Stop()

Stop signals the loop to exit and waits.

func (*CronScheduler) Sync

func (c *CronScheduler) Sync(id string, w workflow.Workflow)

Sync replaces the cron entries for one id. Pass an empty slice to drop all cron schedules for that id. Idempotent.

func (*CronScheduler) Unsync

func (c *CronScheduler) Unsync(id string)

Unsync removes cron entries for id.

type Dedup

type Dedup struct {
	// contains filtered or unexported fields
}

Dedup tracks already-seen (channel, event_id) pairs with LRU + TTL semantics.

func NewDedup

func NewDedup(capacity int, ttl time.Duration) *Dedup

NewDedup constructs a dedup with the given capacity + TTL.

func (*Dedup) Len

func (d *Dedup) Len() int

Len returns current entry count.

func (*Dedup) Seen

func (d *Dedup) Seen(key string) bool

Seen reports whether key was inserted recently and not yet expired. Inserts key on miss, refreshes timestamp on hit.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is a per-workflow FIFO with overflow policy.

func NewQueue

func NewQueue(maxSize int, onOverflow string) *Queue

NewQueue constructs a queue. maxSize=0 means unbounded.

func (*Queue) Close

func (q *Queue) Close()

Close prevents future enqueues and unblocks Dequeue waiters.

func (*Queue) Dequeue

func (q *Queue) Dequeue(ctx context.Context) (WorkItem, bool)

Dequeue blocks until an item is available or ctx cancels.

func (*Queue) Enqueue

func (q *Queue) Enqueue(it WorkItem) error

Enqueue adds an item.

func (*Queue) Len

func (q *Queue) Len() int

Len returns current queue depth.

type Router

type Router struct {
	// contains filtered or unexported fields
}

func NewRouter

func NewRouter(e *engine.Engine, svc service.Service) *Router

NewRouter wires a Router to an Engine + Service.

func (*Router) Dispatch

func (r *Router) Dispatch(ctx context.Context, evt workflow.Event) int

Dispatch routes an event to every subscribed workflow.

Pipeline:

  1. Build the event's route keys (specific + wildcards).
  2. Look each up in the trigger index, union the resulting triggerRefs, dedup by workflow id so a workflow with multiple matching triggers still enqueues once.
  3. For each candidate, run the cheap router-side checks that need the raw trigger (webhook path/method, error source, target), then dedup the event, then enqueue.

Returns the number of workflows that accepted the event.

func (*Router) Register

func (r *Router) Register(ctx context.Context, w workflow.Workflow)

Register adds a workflow to the router and spawns its worker goroutine.

func (*Router) RunNow

func (r *Router) RunNow(ctx context.Context, id string, evt workflow.Event) error

RunNow enqueues a manual run for one explicit id, bypassing Enabled + trigger-match checks. Used by the UI Run-Now button so admins can fire a disabled workflow (e.g. dry-run before enable) without going through the Dispatch matcher.

Returns an error if the workflow isn't registered with the router (caller should HotReload first).

func (*Router) RunNowWith

func (r *Router) RunNowWith(ctx context.Context, id string, w *workflow.Workflow, evt workflow.Event) error

RunNowWith is RunNow with an explicit workflow override. Pass a non-nil `w` to execute that exact definition (typically the draft loaded from disk) instead of the published copy registered in Router.defs. The router still owns the per-id queue + worker machinery — the override only affects which Workflow value the engine receives.

When `w` is nil, behaviour is identical to RunNow.

func (*Router) Stop

func (r *Router) Stop()

func (*Router) Unregister

func (r *Router) Unregister(id string)

Unregister stops the worker for id and frees its queue.

func (*Router) WebhookSecretFor

func (r *Router) WebhookSecretFor(reqPath string) (secretRef string, found bool)

WebhookSecretFor returns the SecretRef for the incoming reqPath. Lookup is O(1) via webhookIndex: exact match first, then wildcard "*". Used by WebhookHandler to verify HMAC before dispatching.

type RunResult

type RunResult struct {
	State workflow.RunState
	Err   error
}

RunResult is delivered back via WorkItem.Done.

type ScheduleAtScheduler

type ScheduleAtScheduler struct {

	// RemoveFn is called (id, triggerID) when a delete_after trigger fires.
	// Optional — pass nil to skip the post-fire cleanup.
	RemoveFn func(id, triggerID string)
	// contains filtered or unexported fields
}

ScheduleAtScheduler fires one-shot triggers whose type is schedule_at. Each trigger has an At time.Time field; the scheduler arms a timer for every trigger that hasn't fired yet (At is in the future at Sync time).

Unlike CronScheduler, timers are one-shot: after firing they are removed. If DeleteAfter is set on the trigger, the scheduler also calls the optional RemoveFn so the workflow service can drop the trigger from YAML.

Wired alongside CronScheduler in Manager.Start / HotReload / Bootstrap.

func NewScheduleAtScheduler

func NewScheduleAtScheduler(r *Router) *ScheduleAtScheduler

NewScheduleAtScheduler wires a scheduler against a Router.

func (*ScheduleAtScheduler) Sync

Sync reconciles all schedule_at triggers for one workflow id. Call on Bootstrap and HotReload. Cancels stale timers for triggers that were removed or already have an At in the past.

func (*ScheduleAtScheduler) Unsync

func (s *ScheduleAtScheduler) Unsync(id string)

Unsync cancels all pending timers for id (called on workflow delete).

type WebhookHandler

type WebhookHandler struct {
	Router       *Router
	SecretLookup func(secretRef string) (string, error)
}

WebhookHandler turns inbound HTTP POSTs into Events and dispatches them via the Router. Mount at `/hooks/` on the wick HTTP server.

func NewWebhookHandler

func NewWebhookHandler(r *Router) *WebhookHandler

NewWebhookHandler builds a handler.

func (*WebhookHandler) ServeHTTP

func (h *WebhookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP parses the request and dispatches.

type WorkItem

type WorkItem struct {
	ID       string
	Event    workflow.Event
	Workflow *workflow.Workflow
	Done     chan RunResult
}

WorkItem is one queued run request.

Workflow is an optional override: when set, the worker runs THIS workflow definition instead of the one registered in Router.defs[id]. The UI's Run Now path uses this to execute a freshly-loaded DRAFT (workflow.draft.yaml) without first publishing it — Router.defs only ever holds the published copy so triggers (cron, channel, webhook) keep firing the live version while the user iterates on the draft.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL