Documentation
¶
Overview ¶
Package trigger is the dispatch + queue + dedup layer between trigger sources (cron, channel adapter, webhook, manual, error) and the engine. One Router instance per Engine.
Package trigger implements the workflow router: it receives workflow.Event values from channels / cron / webhooks, finds the workflows subscribed to that event, and enqueues a run per match.
The router itself stays channel-agnostic. Two design choices underline that:
Trigger index — at Register time, each workflow's triggers declare their route key(s) ("channel/slack/message", "cron", "webhook"). Dispatch builds the event's route keys and looks up the index in O(1) instead of iterating every workflow.
No match DSL in the router — workflow YAML's `match:` map is not evaluated here. Workflows filter inside the graph (branch / transform / shell / classify) which keeps the engine free of channel-specific match grammars and lets operators express arbitrary filter logic.
Trigger-level checks the router still does: channel + event name (via route key), webhook path + method, error source workflow + severity. Everything that needs the payload contents happens in the graph.
Index ¶
- Variables
- func MatchTrigger(tr workflow.Trigger, evt workflow.Event) bool
- func PathMatches(tmpl, got string) bool
- func VerifyHMAC(body []byte, secret, want string) bool
- type CronScheduler
- type Dedup
- type Queue
- type Router
- func (r *Router) Dispatch(ctx context.Context, evt workflow.Event) int
- func (r *Router) Register(ctx context.Context, w workflow.Workflow)
- func (r *Router) RunNow(ctx context.Context, id string, evt workflow.Event) error
- func (r *Router) RunNowWith(ctx context.Context, id string, w *workflow.Workflow, evt workflow.Event) error
- func (r *Router) Stop()
- func (r *Router) Unregister(id string)
- func (r *Router) WebhookSecretFor(reqPath string) (secretRef string, found bool)
- type RunResult
- type ScheduleAtScheduler
- type WebhookHandler
- type WorkItem
Constants ¶
This section is empty.
Variables ¶
var ErrQueueFull = errors.New("workflow queue full (reject policy)")
ErrQueueFull is returned when overflow=reject and the queue is full.
var StopTimeout = 30 * time.Second
Stop unregisters all and waits for workers to drain. StopTimeout is the deadline given to in-flight workers during Stop. Overridable in tests.
Functions ¶
func MatchTrigger ¶
MatchTrigger is kept for backward-compatible test ergonomics — it runs the router-side checks (channel name, event subtype, webhook path/method, error source) against a single trigger.
func PathMatches ¶
PathMatches compares a trigger path template against an actual request path. Supports `{param}` segments.
func VerifyHMAC ¶
VerifyHMAC computes SHA-256 HMAC and constant-time compares.
Types ¶
type CronScheduler ¶
type CronScheduler struct {
// contains filtered or unexported fields
}
CronScheduler ticks every minute and fires workflow cron triggers whose schedule matches the current minute. Registered workflows are kept in sync with the Router — call Sync(workflows) whenever a workflow is registered, hot-reloaded, or unregistered.
Standalone from internal/jobs/* (which is row-backed admin-toggleable for per-tool jobs). Workflow cron lives in the same in-process trigger plane as channel events + webhooks so the whole subsystem drains through one Router queue.
func NewCronScheduler ¶
func NewCronScheduler(r *Router) *CronScheduler
NewCronScheduler wires a scheduler against a Router.
func (*CronScheduler) Start ¶
func (c *CronScheduler) Start(ctx context.Context)
Start launches the minute-tick goroutine. Cancel ctx to stop.
func (*CronScheduler) Stop ¶
func (c *CronScheduler) Stop()
Stop signals the loop to exit and waits.
func (*CronScheduler) Sync ¶
func (c *CronScheduler) Sync(id string, w workflow.Workflow)
Sync replaces the cron entries for one id. Pass an empty slice to drop all cron schedules for that id. Idempotent.
func (*CronScheduler) Unsync ¶
func (c *CronScheduler) Unsync(id string)
Unsync removes cron entries for id.
type Dedup ¶
type Dedup struct {
// contains filtered or unexported fields
}
Dedup tracks already-seen (channel, event_id) pairs with LRU + TTL semantics.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is a per-workflow FIFO with overflow policy.
func (*Queue) Close ¶
func (q *Queue) Close()
Close prevents future enqueues and unblocks Dequeue waiters.
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
func (*Router) Dispatch ¶
Dispatch routes an event to every subscribed workflow.
Pipeline:
- Build the event's route keys (specific + wildcards).
- Look each up in the trigger index, union the resulting triggerRefs, dedup by workflow id so a workflow with multiple matching triggers still enqueues once.
- For each candidate, run the cheap router-side checks that need the raw trigger (webhook path/method, error source, target), then dedup the event, then enqueue.
Returns the number of workflows that accepted the event.
func (*Router) RunNow ¶
RunNow enqueues a manual run for one explicit id, bypassing Enabled + trigger-match checks. Used by the UI Run-Now button so admins can fire a disabled workflow (e.g. dry-run before enable) without going through the Dispatch matcher.
Returns an error if the workflow isn't registered with the router (caller should HotReload first).
func (*Router) RunNowWith ¶
func (r *Router) RunNowWith(ctx context.Context, id string, w *workflow.Workflow, evt workflow.Event) error
RunNowWith is RunNow with an explicit workflow override. Pass a non-nil `w` to execute that exact definition (typically the draft loaded from disk) instead of the published copy registered in Router.defs. The router still owns the per-id queue + worker machinery — the override only affects which Workflow value the engine receives.
When `w` is nil, behaviour is identical to RunNow.
func (*Router) Unregister ¶
Unregister stops the worker for id and frees its queue.
func (*Router) WebhookSecretFor ¶
WebhookSecretFor returns the SecretRef for the incoming reqPath. Lookup is O(1) via webhookIndex: exact match first, then wildcard "*". Used by WebhookHandler to verify HMAC before dispatching.
type ScheduleAtScheduler ¶
type ScheduleAtScheduler struct {
// RemoveFn is called (id, triggerID) when a delete_after trigger fires.
// Optional — pass nil to skip the post-fire cleanup.
RemoveFn func(id, triggerID string)
// contains filtered or unexported fields
}
ScheduleAtScheduler fires one-shot triggers whose type is schedule_at. Each trigger has an At time.Time field; the scheduler arms a timer for every trigger that hasn't fired yet (At is in the future at Sync time).
Unlike CronScheduler, timers are one-shot: after firing they are removed. If DeleteAfter is set on the trigger, the scheduler also calls the optional RemoveFn so the workflow service can drop the trigger from YAML.
Wired alongside CronScheduler in Manager.Start / HotReload / Bootstrap.
func NewScheduleAtScheduler ¶
func NewScheduleAtScheduler(r *Router) *ScheduleAtScheduler
NewScheduleAtScheduler wires a scheduler against a Router.
func (*ScheduleAtScheduler) Sync ¶
func (s *ScheduleAtScheduler) Sync(id string, w workflow.Workflow)
Sync reconciles all schedule_at triggers for one workflow id. Call on Bootstrap and HotReload. Cancels stale timers for triggers that were removed or already have an At in the past.
func (*ScheduleAtScheduler) Unsync ¶
func (s *ScheduleAtScheduler) Unsync(id string)
Unsync cancels all pending timers for id (called on workflow delete).
type WebhookHandler ¶
WebhookHandler turns inbound HTTP POSTs into Events and dispatches them via the Router. Mount at `/hooks/` on the wick HTTP server.
func NewWebhookHandler ¶
func NewWebhookHandler(r *Router) *WebhookHandler
NewWebhookHandler builds a handler.
func (*WebhookHandler) ServeHTTP ¶
func (h *WebhookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP parses the request and dispatches.
type WorkItem ¶
type WorkItem struct {
ID string
Event workflow.Event
Workflow *workflow.Workflow
Done chan RunResult
}
WorkItem is one queued run request.
Workflow is an optional override: when set, the worker runs THIS workflow definition instead of the one registered in Router.defs[id]. The UI's Run Now path uses this to execute a freshly-loaded DRAFT (workflow.draft.yaml) without first publishing it — Router.defs only ever holds the published copy so triggers (cron, channel, webhook) keep firing the live version while the user iterates on the draft.