pipeline

package
v0.92.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 6, 2026 License: GPL-3.0 Imports: 24 Imported by: 0

Documentation

Overview

Package pipeline provides the event-driven pipeline execution engine.



Constants

const StreamTTLDrain = 5 * time.Minute

StreamTTLDrain is the TTL after completion for SSE clients to drain.

const StreamTTLFailsafe = 24 * time.Hour

StreamTTLFailsafe is the TTL set on stream creation to prevent leaks on crash.

Variables

This section is empty.

Functions

func StreamName

func StreamName(runID int64) string

StreamName returns the Redis Stream name for a given run ID.

Types

type CheckpointData

type CheckpointData struct {
	StepIndex   int                    `json:"step_index"`
	StepResults map[string]*StepResult `json:"step_results"`
	Event       types.DataEvent        `json:"event"`
	HeartbeatAt time.Time              `json:"heartbeat_at"`
}

CheckpointData is the intermediate state saved at each pipeline step boundary.

type Clock

type Clock interface {
	Now() time.Time
	After(d time.Duration) <-chan time.Time
}

Clock abstracts time operations for testable scheduling.

type Definition

type Definition struct {
	Name        string
	Description string
	Enabled     bool
	Resumable   bool
	Trigger     Trigger
	Steps       []Step
	ParentName  string
}

func ExpandDefinitions

func ExpandDefinitions(defs []EditorDefinition) []Definition

ExpandDefinitions fans out each editor definition that has multiple triggers into separate engine Definition instances, giving each a compound name to avoid key collisions.
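
The fan-out can be sketched as follows. This is not the package's implementation: the types are trimmed local mirrors, the function expand is a hypothetical stand-in, and the "name#type-index" naming scheme, the way Enabled is combined, and the use of ParentName are all assumptions made for illustration.

```go
package main

import "fmt"

// Trimmed local mirrors of the documented types.
type TriggerEntry struct {
	Enabled bool
	Type    string // "event", "cron", "webhook"
	Event   string
	Cron    string
}

type EditorDefinition struct {
	Name     string
	Enabled  bool
	Triggers []TriggerEntry
}

type Trigger struct {
	Event string
	Cron  string
}

type Definition struct {
	Name       string
	Enabled    bool
	Trigger    Trigger
	ParentName string
}

// expand is a hypothetical stand-in for ExpandDefinitions. It emits one
// Definition per trigger. The compound name format is an assumption; the
// only requirement taken from the docs is that names must not collide.
func expand(defs []EditorDefinition) []Definition {
	var out []Definition
	for _, ed := range defs {
		for i, t := range ed.Triggers {
			out = append(out, Definition{
				Name:       fmt.Sprintf("%s#%s-%d", ed.Name, t.Type, i),
				Enabled:    ed.Enabled && t.Enabled, // assumption
				Trigger:    Trigger{Event: t.Event, Cron: t.Cron},
				ParentName: ed.Name, // assumption
			})
		}
	}
	return out
}

func main() {
	defs := expand([]EditorDefinition{{
		Name:    "sync",
		Enabled: true,
		Triggers: []TriggerEntry{
			{Enabled: true, Type: "event", Event: "repo.push"},
			{Enabled: true, Type: "cron", Cron: "0 2 * * *"},
		},
	}})
	for _, d := range defs {
		fmt.Println(d.Name, "parent:", d.ParentName)
	}
}
```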

func FindByEvent

func FindByEvent(defs []Definition, eventType string) []Definition

func LoadConfig

func LoadConfig(cfg []config.Pipeline) []Definition

func LoadFromDB

func LoadFromDB(ctx context.Context, reader DefinitionReader) ([]Definition, error)

LoadFromDB loads published pipeline definitions from a DefinitionReader.

func (Definition) FindByEvent

func (d Definition) FindByEvent(eventType string) []Definition

type DefinitionReader

type DefinitionReader interface {
	ListPublishedDefinitions(ctx context.Context) ([]DefinitionRecord, error)
}

DefinitionReader is the interface for loading published definitions from a store.

type DefinitionRecord

type DefinitionRecord struct {
	Name        string
	Description string
	YAML        string
	UpdatedAt   time.Time
}

DefinitionRecord holds a published pipeline definition loaded from the database.

type EditorDefinition

type EditorDefinition struct {
	Name        string         `json:"name" yaml:"name"`
	Description string         `json:"description" yaml:"description"`
	Enabled     bool           `json:"enabled" yaml:"enabled"`
	Resumable   bool           `json:"resumable" yaml:"resumable"`
	Triggers    []TriggerEntry `json:"triggers" yaml:"triggers"`
	Steps       []Step         `json:"steps" yaml:"steps"`
}

EditorDefinition is the YAML schema used by the pipeline editor UI. It supports multiple triggers (an array), unlike the engine's Definition, which has a single Trigger.
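
The struct also carries json tags, so the same schema can be decoded with the standard library alone. The sketch below does that with trimmed local mirrors (Steps and Webhook are omitted). The pipeline name, event name, and the "10m" timeout format are made-up sample values; how the package interprets cron_timeout is not stated on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TriggerEntry mirrors the documented struct, minus the Webhook field.
type TriggerEntry struct {
	Enabled     bool   `json:"enabled"`
	Type        string `json:"type"` // "event", "cron", "webhook"
	Event       string `json:"event,omitempty"`
	Cron        string `json:"cron,omitempty"`
	CronTimeout string `json:"cron_timeout,omitempty"`
}

// EditorDefinition mirrors the documented struct, minus the Steps field.
type EditorDefinition struct {
	Name        string         `json:"name"`
	Description string         `json:"description"`
	Enabled     bool           `json:"enabled"`
	Resumable   bool           `json:"resumable"`
	Triggers    []TriggerEntry `json:"triggers"`
}

// decode is a hypothetical JSON counterpart to ParseEditorYAML.
func decode(data []byte) (*EditorDefinition, error) {
	var def EditorDefinition
	if err := json.Unmarshal(data, &def); err != nil {
		return nil, err
	}
	return &def, nil
}

func main() {
	doc := []byte(`{
	  "name": "nightly-sync",
	  "description": "sample definition",
	  "enabled": true,
	  "resumable": true,
	  "triggers": [
	    {"enabled": true, "type": "cron", "cron": "0 2 * * *", "cron_timeout": "10m"},
	    {"enabled": true, "type": "event", "event": "repo.push"}
	  ]
	}`)
	def, err := decode(doc)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	for _, t := range def.Triggers {
		fmt.Println(def.Name, t.Type)
	}
}
```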

func ParseEditorYAML

func ParseEditorYAML(yamlStr string) (*EditorDefinition, error)

ParseEditorYAML parses a YAML string into an EditorDefinition.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

func NewEngine

func NewEngine(defs []Definition, store RunStore, auditor audit.Auditor, pc *metrics.PipelineCollector, ec *metrics.EventCollector) *Engine

func NewEngineWithClock

func NewEngineWithClock(defs []Definition, store RunStore, auditor audit.Auditor, pc *metrics.PipelineCollector, ec *metrics.EventCollector, clock Clock) *Engine

func (*Engine) ExecuteWebhook

func (e *Engine) ExecuteWebhook(ctx context.Context, def *Definition, event types.DataEvent) error

ExecuteWebhook executes a pipeline from a webhook trigger. It uses the per-pipeline mutex for concurrency control and calls executePipeline with a synthetic event.

func (*Engine) Handler

func (e *Engine) Handler() func(ctx context.Context, event types.DataEvent) error

func (*Engine) MutexFor

func (e *Engine) MutexFor(name string) *sync.Mutex

MutexFor returns the per-pipeline mutex for the given pipeline name. Exported for testing (BDD specs).
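
A per-pipeline mutex table is commonly built as a map guarded by its own lock. The sketch below shows that pattern under the assumption that the Engine does something similar; mutexRegistry and its methods are hypothetical names, and the Engine's actual fields are unexported and not documented here.

```go
package main

import (
	"fmt"
	"sync"
)

// mutexRegistry is a hypothetical per-name mutex table.
type mutexRegistry struct {
	mu    sync.Mutex // guards locks
	locks map[string]*sync.Mutex
}

func newMutexRegistry() *mutexRegistry {
	return &mutexRegistry{locks: make(map[string]*sync.Mutex)}
}

// mutexFor returns the mutex for name, creating it on first use. The same
// pointer is returned for every later call with the same name.
func (r *mutexRegistry) mutexFor(name string) *sync.Mutex {
	r.mu.Lock()
	defer r.mu.Unlock()
	m, ok := r.locks[name]
	if !ok {
		m = &sync.Mutex{}
		r.locks[name] = m
	}
	return m
}

func main() {
	reg := newMutexRegistry()
	var wg sync.WaitGroup
	counter := 0
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m := reg.mutexFor("nightly-sync")
			m.Lock()
			defer m.Unlock()
			counter++ // serialized by the per-pipeline mutex
		}()
	}
	wg.Wait()
	fmt.Println(counter)
}
```

In a spec, holding the mutex returned for a pipeline name is one way to simulate a run in progress and check that a second trigger for the same pipeline waits.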

func (*Engine) RegisterWebhooks

func (e *Engine) RegisterWebhooks() (map[string]*Definition, error)

RegisterWebhooks returns a map of webhook path to pipeline Definition for all webhook-enabled pipelines. Duplicate paths return an error.
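
The path-to-definition mapping with duplicate detection can be sketched as below. The types are trimmed local mirrors, registerWebhooks is a hypothetical free function, and treating "webhook-enabled" as "Trigger.Webhook is non-nil" is an assumption.

```go
package main

import "fmt"

// Trimmed local mirrors of the documented types.
type WebhookConfig struct {
	Path string
}

type Trigger struct {
	Webhook *WebhookConfig
}

type Definition struct {
	Name    string
	Trigger Trigger
}

// registerWebhooks is a hypothetical stand-in for Engine.RegisterWebhooks.
// It maps each webhook path to its definition and fails on a repeated path.
func registerWebhooks(defs []Definition) (map[string]*Definition, error) {
	routes := make(map[string]*Definition)
	for i := range defs {
		def := &defs[i] // point at the slice element, not a loop copy
		if def.Trigger.Webhook == nil {
			continue // assumption: no webhook config means not webhook-enabled
		}
		path := def.Trigger.Webhook.Path
		if prev, ok := routes[path]; ok {
			return nil, fmt.Errorf("duplicate webhook path %q: %s and %s",
				path, prev.Name, def.Name)
		}
		routes[path] = def
	}
	return routes, nil
}

func main() {
	defs := []Definition{
		{Name: "deploy", Trigger: Trigger{Webhook: &WebhookConfig{Path: "/hooks/deploy"}}},
		{Name: "nightly"}, // no webhook trigger
	}
	routes, err := registerWebhooks(defs)
	if err != nil {
		fmt.Println("registration failed:", err)
		return
	}
	fmt.Println(len(routes), routes["/hooks/deploy"].Name)
}
```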

func (*Engine) ResumePipeline

func (e *Engine) ResumePipeline(ctx context.Context, runID int64) error

ResumePipeline attempts to resume a pipeline run from its last checkpoint. It reloads the checkpoint, reconstructs the RenderContext, and continues from the checkpointed step index.
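
The resume flow described above can be sketched as: restore earlier step results, then run the remaining steps. The example is a simplified model, not the Engine's code. The types checkpoint and step and the function resume are hypothetical, and it assumes the checkpointed index is the next step to run, which this page does not specify.

```go
package main

import "fmt"

// checkpoint is a simplified, hypothetical model of CheckpointData.
type checkpoint struct {
	StepIndex int // assumption: index of the next step to execute
	Results   map[string]map[string]any
}

// step is a hypothetical runnable step that can read earlier results.
type step struct {
	Name string
	Run  func(prev map[string]map[string]any) (map[string]any, error)
}

// resume restores results from the checkpoint and executes the remaining
// steps in order, returning the combined results.
func resume(steps []step, cp checkpoint) (map[string]map[string]any, error) {
	if cp.StepIndex < 0 || cp.StepIndex > len(steps) {
		return nil, fmt.Errorf("checkpoint step index %d out of range", cp.StepIndex)
	}
	results := make(map[string]map[string]any, len(steps))
	for name, out := range cp.Results {
		results[name] = out
	}
	for i := cp.StepIndex; i < len(steps); i++ {
		out, err := steps[i].Run(results)
		if err != nil {
			return nil, fmt.Errorf("step %q: %w", steps[i].Name, err)
		}
		results[steps[i].Name] = out
	}
	return results, nil
}

func main() {
	steps := []step{
		{Name: "fetch", Run: func(map[string]map[string]any) (map[string]any, error) {
			return map[string]any{"n": 1}, nil
		}},
		{Name: "store", Run: func(prev map[string]map[string]any) (map[string]any, error) {
			return map[string]any{"stored": prev["fetch"]["n"]}, nil
		}},
	}
	cp := checkpoint{StepIndex: 1, Results: map[string]map[string]any{"fetch": {"n": 42}}}
	results, err := resume(steps, cp)
	if err != nil {
		fmt.Println("resume failed:", err)
		return
	}
	fmt.Println(results["store"]["stored"])
}
```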

func (*Engine) SetCallback

func (e *Engine) SetCallback(cb StepCallback)

SetCallback sets the progress event callback. Pass nil to disable.

func (*Engine) Stop

func (e *Engine) Stop()

Stop shuts down the cron scheduler. It waits up to 30 seconds for in-flight jobs to complete, then force-cancels and logs a warning.

type FakeClock

type FakeClock struct {
	// contains filtered or unexported fields
}

FakeClock provides deterministic time for tests. All timer channels fire in order when Advance is called.

func NewFakeClock

func NewFakeClock(seed time.Time) *FakeClock

NewFakeClock returns a new FakeClock seeded at the given time.

func (*FakeClock) Advance

func (c *FakeClock) Advance(d time.Duration)

Advance moves the clock forward by d and fires all timers whose deadlines are at or before the new time, in chronological order.

func (*FakeClock) After

func (c *FakeClock) After(d time.Duration) <-chan time.Time

After schedules a timer that fires when Advance is called past its deadline.

func (*FakeClock) Now

func (c *FakeClock) Now() time.Time

Now returns the current fake time.

type RealClock

type RealClock struct{}

RealClock delegates to the system clock.

func NewRealClock

func NewRealClock() *RealClock

NewRealClock returns a new RealClock.

func (*RealClock) After

func (*RealClock) After(d time.Duration) <-chan time.Time

After returns a channel that fires after duration d.

func (*RealClock) Now

func (*RealClock) Now() time.Time

Now returns the current system time.

type RenderContext

type RenderContext struct {
	Event types.DataEvent
	Steps map[string]map[string]any
	Input map[string]any
	// contains filtered or unexported fields
}

func NewRenderContext

func NewRenderContext(event types.DataEvent) *RenderContext

func (*RenderContext) RecordStepResult

func (rc *RenderContext) RecordStepResult(stepName string, result map[string]any)

func (*RenderContext) RenderParams

func (rc *RenderContext) RenderParams(params map[string]any) (map[string]any, error)

func (*RenderContext) RenderString

func (rc *RenderContext) RenderString(s string) (string, error)

type RunStore

type RunStore interface {
	CreateRun(ctx context.Context, pipelineName, eventID, eventType, triggerSource string) (*gen.PipelineRun, error)
	UpdateRunStatus(ctx context.Context, runID int64, status int, errMsg string) error
	CreateStepRun(ctx context.Context, runID int64, stepName, capability, operation string, params map[string]any, attempt int) (*gen.PipelineStepRun, error)
	UpdateStepRun(ctx context.Context, stepRunID int64, status int, result map[string]any, errMsg string, attempt int) error
	SaveCheckpoint(ctx context.Context, runID int64, data any) error
	GetIncompleteRuns(ctx context.Context) ([]*gen.PipelineRun, error)
	GetCheckpoint(ctx context.Context, runID int64, target any) error
	GetRun(ctx context.Context, runID int64) (*gen.PipelineRun, error)
	UpdateRunHeartbeat(ctx context.Context, runID int64) error
	HasConsumed(ctx context.Context, consumerName, eventID string) (bool, error)
	RecordConsumption(ctx context.Context, consumerName, eventID string) error
	RecordResourceLink(ctx context.Context, link *gen.ResourceLink) error
}

RunStore abstracts persistence for pipeline runs, steps, checkpoints and event consumption.

type Step

type Step struct {
	Name       string
	Capability hub.CapabilityType
	Operation  string
	Params     map[string]any
	Retry      *backoff.Config
}

type StepCallback

type StepCallback interface {
	OnRunStart(ctx context.Context, runID int64, pipelineName string,
		trigger string, totalSteps int, stepNames []string)
	OnStepStart(ctx context.Context, runID int64, pipelineName string,
		stepIndex int, stepName string, input map[string]any)
	OnStepDone(ctx context.Context, runID int64, pipelineName string,
		stepIndex int, stepName string, output map[string]any, elapsedMs int64)
	OnStepError(ctx context.Context, runID int64, pipelineName string,
		stepIndex int, stepName string, err error, elapsedMs int64)
	OnRunComplete(ctx context.Context, runID int64, pipelineName string,
		elapsedMs int64, failed bool, errMsg string)
}

StepCallback receives progress events during pipeline execution. All methods are called synchronously from the step execution loop. A nil callback is safe: Engine skips the calls when the callback is nil.

type StepProgressEvent

type StepProgressEvent struct {
	RunID        int64          `json:"run_id"`
	PipelineName string         `json:"pipeline_name"`
	StepIndex    int            `json:"step_index"`
	StepName     string         `json:"step_name"`
	Status       string         `json:"status"`
	Input        map[string]any `json:"input,omitempty"`
	Output       map[string]any `json:"output,omitempty"`
	ElapsedMs    int64          `json:"elapsed_ms,omitempty"`
	Error        string         `json:"error,omitempty"`
	TotalSteps   int            `json:"total_steps,omitempty"`
}

StepProgressEvent is the JSON payload for a single progress update. StepIndex of -1 indicates a run-level event (start/complete/failed).
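
To show how the omitempty tags and the -1 convention shape the payload, the sketch below marshals a run-level event with a local mirror of the struct. The Status value "started" and the pipeline name are made-up; the set of status strings the package emits is not listed on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StepProgressEvent mirrors the documented struct.
type StepProgressEvent struct {
	RunID        int64          `json:"run_id"`
	PipelineName string         `json:"pipeline_name"`
	StepIndex    int            `json:"step_index"`
	StepName     string         `json:"step_name"`
	Status       string         `json:"status"`
	Input        map[string]any `json:"input,omitempty"`
	Output       map[string]any `json:"output,omitempty"`
	ElapsedMs    int64          `json:"elapsed_ms,omitempty"`
	Error        string         `json:"error,omitempty"`
	TotalSteps   int            `json:"total_steps,omitempty"`
}

// encode is a hypothetical helper that renders an event as a JSON string.
func encode(ev StepProgressEvent) (string, error) {
	raw, err := json.Marshal(ev)
	return string(raw), err
}

func main() {
	// A run-level event: StepIndex is -1 and the step-specific fields are
	// left at their zero values, so the omitempty fields are dropped.
	runStart := StepProgressEvent{
		RunID:        7,
		PipelineName: "nightly-sync",
		StepIndex:    -1,
		Status:       "started", // hypothetical status value
		TotalSteps:   3,
	}
	payload, err := encode(runStart)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(payload)
}
```

Note that step_name has no omitempty tag, so run-level events still carry an empty step_name field.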

type StepResult

type StepResult struct {
	Name        string         `json:"name"`
	Capability  string         `json:"capability"`
	Operation   string         `json:"operation"`
	Output      map[string]any `json:"output"`
	CompletedAt time.Time      `json:"completed_at"`
}

StepResult captures the output of a completed pipeline step.

type Trigger

type Trigger struct {
	Event       string
	Cron        string
	CronTimeout time.Duration
	Webhook     *WebhookConfig
}

type TriggerEntry

type TriggerEntry struct {
	Enabled     bool           `json:"enabled" yaml:"enabled"`
	Type        string         `json:"type" yaml:"type"` // "event", "cron", "webhook"
	Event       string         `json:"event,omitempty" yaml:"event,omitempty"`
	Cron        string         `json:"cron,omitempty" yaml:"cron,omitempty"`
	CronTimeout string         `json:"cron_timeout,omitempty" yaml:"cron_timeout,omitempty"`
	Webhook     *WebhookConfig `json:"webhook,omitempty" yaml:"webhook,omitempty"`
}

TriggerEntry represents a single trigger in the editor's triggers array.

type WebhookAuthConfig

type WebhookAuthConfig struct {
	Token       string
	HMACSecret  string
	HMACHeader  string
	TokenHeader string
}

type WebhookConfig

type WebhookConfig struct {
	Path      string
	Method    string
	Auth      WebhookAuthConfig
	Payload   config.WebhookPayloadMode
	EventType string
}

Directories

Path Synopsis
Package template provides the pipeline template rendering engine.
