Documentation
¶
Overview ¶
Package pipeline provides the event-driven pipeline execution engine.
Package pipeline provides the event-driven pipeline execution engine.
Index ¶
- Constants
- func StreamName(runID int64) string
- type CheckpointData
- type Clock
- type Definition
- type DefinitionReader
- type DefinitionRecord
- type EditorDefinition
- type Engine
- func (e *Engine) ExecuteWebhook(ctx context.Context, def *Definition, event types.DataEvent) error
- func (e *Engine) Handler() func(ctx context.Context, event types.DataEvent) error
- func (e *Engine) MutexFor(name string) *sync.Mutex
- func (e *Engine) RegisterWebhooks() (map[string]*Definition, error)
- func (e *Engine) ResumePipeline(ctx context.Context, runID int64) error
- func (e *Engine) SetCallback(cb StepCallback)
- func (e *Engine) Stop()
- type FakeClock
- type RealClock
- type RenderContext
- type RunStore
- type Step
- type StepCallback
- type StepProgressEvent
- type StepResult
- type Trigger
- type TriggerEntry
- type WebhookAuthConfig
- type WebhookConfig
Constants ¶
const StreamTTLDrain = 5 * time.Minute
StreamTTLDrain is the TTL after completion for SSE clients to drain.
const StreamTTLFailsafe = 24 * time.Hour
StreamTTLFailsafe is the TTL set on stream creation to prevent leaks on crash.
Variables ¶
This section is empty.
Functions ¶
func StreamName ¶
StreamName returns the Redis Stream name for a given run ID.
Types ¶
type CheckpointData ¶
type CheckpointData struct {
StepIndex int `json:"step_index"`
StepResults map[string]*StepResult `json:"step_results"`
Event types.DataEvent `json:"event"`
HeartbeatAt time.Time `json:"heartbeat_at"`
}
CheckpointData is the intermediate state saved at each pipeline step boundary.
type Definition ¶
type Definition struct {
Name string
Description string
Enabled bool
Resumable bool
Trigger Trigger
Steps []Step
ParentName string
}
func ExpandDefinitions ¶
func ExpandDefinitions(defs []EditorDefinition) []Definition
ExpandDefinitions fans out an editor definition with multiple triggers into engine Definition instances with compound names to avoid key collisions.
func FindByEvent ¶
func FindByEvent(defs []Definition, eventType string) []Definition
func LoadConfig ¶
func LoadConfig(cfg []config.Pipeline) []Definition
func LoadFromDB ¶
func LoadFromDB(ctx context.Context, reader DefinitionReader) ([]Definition, error)
LoadFromDB loads published pipeline definitions from a DefinitionReader.
func (Definition) FindByEvent ¶
func (d Definition) FindByEvent(eventType string) []Definition
type DefinitionReader ¶
type DefinitionReader interface {
ListPublishedDefinitions(ctx context.Context) ([]DefinitionRecord, error)
}
DefinitionReader is the interface for loading published definitions from a store.
type DefinitionRecord ¶
DefinitionRecord holds a published pipeline definition loaded from the database.
type EditorDefinition ¶
type EditorDefinition struct {
Name string `json:"name" yaml:"name"`
Description string `json:"description" yaml:"description"`
Enabled bool `json:"enabled" yaml:"enabled"`
Resumable bool `json:"resumable" yaml:"resumable"`
Triggers []TriggerEntry `json:"triggers" yaml:"triggers"`
Steps []Step `json:"steps" yaml:"steps"`
}
EditorDefinition is the YAML schema used by the pipeline editor UI. It supports multiple triggers (array) unlike the engine's Definition (single Trigger).
func ParseEditorYAML ¶
func ParseEditorYAML(yamlStr string) (*EditorDefinition, error)
ParseEditorYAML parses a YAML string into an EditorDefinition.
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
func NewEngine ¶
func NewEngine(defs []Definition, store RunStore, auditor audit.Auditor, pc *metrics.PipelineCollector, ec *metrics.EventCollector) *Engine
func NewEngineWithClock ¶
func NewEngineWithClock(defs []Definition, store RunStore, auditor audit.Auditor, pc *metrics.PipelineCollector, ec *metrics.EventCollector, clock Clock) *Engine
func (*Engine) ExecuteWebhook ¶
ExecuteWebhook executes a pipeline from a webhook trigger. It uses the per-pipeline mutex for concurrency control and calls executePipeline with a synthetic event.
func (*Engine) MutexFor ¶
MutexFor returns the per-pipeline mutex for the given pipeline name. Exported for testing (BDD specs).
func (*Engine) RegisterWebhooks ¶
func (e *Engine) RegisterWebhooks() (map[string]*Definition, error)
RegisterWebhooks returns a map of webhook path to pipeline Definition for all webhook-enabled pipelines. Duplicate paths return an error.
func (*Engine) ResumePipeline ¶
ResumePipeline attempts to resume a pipeline run from its last checkpoint. It reloads the checkpoint, reconstructs the RenderContext, and continues from the checkpointed step index.
func (*Engine) SetCallback ¶
func (e *Engine) SetCallback(cb StepCallback)
SetCallback sets the progress event callback. Pass nil to disable.
type FakeClock ¶
type FakeClock struct {
// contains filtered or unexported fields
}
FakeClock provides deterministic time for tests. All timer channels fire in order when Advance is called.
func NewFakeClock ¶
NewFakeClock returns a new FakeClock seeded at the given time.
func (*FakeClock) Advance ¶
Advance moves the clock forward by d and fires all timers whose deadlines are at or before the new time, in chronological order.
type RealClock ¶
type RealClock struct{}
RealClock delegates to the system clock.
type RenderContext ¶
type RenderContext struct {
Event types.DataEvent
Steps map[string]map[string]any
Input map[string]any
// contains filtered or unexported fields
}
func NewRenderContext ¶
func NewRenderContext(event types.DataEvent) *RenderContext
func (*RenderContext) RecordStepResult ¶
func (rc *RenderContext) RecordStepResult(stepName string, result map[string]any)
func (*RenderContext) RenderParams ¶
func (*RenderContext) RenderString ¶
func (rc *RenderContext) RenderString(s string) (string, error)
type RunStore ¶
type RunStore interface {
CreateRun(ctx context.Context, pipelineName, eventID, eventType, triggerSource string) (*gen.PipelineRun, error)
UpdateRunStatus(ctx context.Context, runID int64, status int, errMsg string) error
CreateStepRun(ctx context.Context, runID int64, stepName, capability, operation string, params map[string]any, attempt int) (*gen.PipelineStepRun, error)
UpdateStepRun(ctx context.Context, stepRunID int64, status int, result map[string]any, errMsg string, attempt int) error
SaveCheckpoint(ctx context.Context, runID int64, data any) error
GetIncompleteRuns(ctx context.Context) ([]*gen.PipelineRun, error)
GetCheckpoint(ctx context.Context, runID int64, target any) error
GetRun(ctx context.Context, runID int64) (*gen.PipelineRun, error)
UpdateRunHeartbeat(ctx context.Context, runID int64) error
HasConsumed(ctx context.Context, consumerName, eventID string) (bool, error)
RecordConsumption(ctx context.Context, consumerName, eventID string) error
RecordResourceLink(ctx context.Context, link *gen.ResourceLink) error
}
RunStore abstracts persistence for pipeline runs, steps, checkpoints and event consumption.
type StepCallback ¶
type StepCallback interface {
OnRunStart(ctx context.Context, runID int64, pipelineName string,
trigger string, totalSteps int, stepNames []string)
OnStepStart(ctx context.Context, runID int64, pipelineName string,
stepIndex int, stepName string, input map[string]any)
OnStepDone(ctx context.Context, runID int64, pipelineName string,
stepIndex int, stepName string, output map[string]any, elapsedMs int64)
OnStepError(ctx context.Context, runID int64, pipelineName string,
stepIndex int, stepName string, err error, elapsedMs int64)
OnRunComplete(ctx context.Context, runID int64, pipelineName string,
elapsedMs int64, failed bool, errMsg string)
}
StepCallback receives progress events during pipeline execution. All methods are called synchronously from the step execution loop. nil receiver is safe — Engine skips calls when callback is nil.
type StepProgressEvent ¶
type StepProgressEvent struct {
RunID int64 `json:"run_id"`
PipelineName string `json:"pipeline_name"`
StepIndex int `json:"step_index"`
StepName string `json:"step_name"`
Status string `json:"status"`
Input map[string]any `json:"input,omitempty"`
Output map[string]any `json:"output,omitempty"`
ElapsedMs int64 `json:"elapsed_ms,omitempty"`
Error string `json:"error,omitempty"`
TotalSteps int `json:"total_steps,omitempty"`
}
StepProgressEvent is the JSON payload for a single progress update. StepIndex of -1 indicates a run-level event (start/complete/failed).
type StepResult ¶
type StepResult struct {
Name string `json:"name"`
Capability string `json:"capability"`
Operation string `json:"operation"`
Output map[string]any `json:"output"`
CompletedAt time.Time `json:"completed_at"`
}
StepResult captures the output of a completed pipeline step.
type Trigger ¶
type Trigger struct {
Event string
Cron string
CronTimeout time.Duration
Webhook *WebhookConfig
}
type TriggerEntry ¶
type TriggerEntry struct {
Enabled bool `json:"enabled" yaml:"enabled"`
Type string `json:"type" yaml:"type"` // "event", "cron", "webhook"
Event string `json:"event,omitempty" yaml:"event,omitempty"`
Cron string `json:"cron,omitempty" yaml:"cron,omitempty"`
CronTimeout string `json:"cron_timeout,omitempty" yaml:"cron_timeout,omitempty"`
Webhook *WebhookConfig `json:"webhook,omitempty" yaml:"webhook,omitempty"`
}
TriggerEntry represents a single trigger in the editor's triggers array.
type WebhookAuthConfig ¶
type WebhookConfig ¶
type WebhookConfig struct {
Path string
Method string
Auth WebhookAuthConfig
Payload config.WebhookPayloadMode
EventType string
}