workflow

package
v0.6.4 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package workflow provides declarative workflow execution with step outputs and resumable state.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrWorkflowNameRequired = &ValidationError{Field: "name", Message: "workflow name is required"}
	ErrNoStepsDefined       = &ValidationError{Field: "steps", Message: "at least one step is required"}
)

Common validation errors.

Functions

func ExpandTilde

func ExpandTilde(path string) (string, error)

ExpandTilde expands ~ to the home directory in paths.

func GenerateRunID

func GenerateRunID() string

GenerateRunID creates a unique run ID.

func ParseAndExtractJSON

func ParseAndExtractJSON(stdout string, fields []string) (map[string]interface{}, error)

ParseAndExtractJSON attempts to parse stdout as JSON and extract the specified fields. This is used to capture outputs from gpd commands that output JSON.

func ValidateCommandSyntax

func ValidateCommandSyntax(command string) error

ValidateCommandSyntax performs basic validation of a command string.

Types

type DefaultLogger

type DefaultLogger struct{}

func (*DefaultLogger) Debug

func (l *DefaultLogger) Debug(msg string, args ...interface{})

func (*DefaultLogger) Error

func (l *DefaultLogger) Error(msg string, args ...interface{})

func (*DefaultLogger) Info

func (l *DefaultLogger) Info(msg string, args ...interface{})

type Interpolator

type Interpolator struct {
	// contains filtered or unexported fields
}

Interpolator handles variable substitution in workflow commands.

func NewInterpolator

func NewInterpolator(stepOutputs map[string]StepOutput, env map[string]string) *Interpolator

NewInterpolator creates a new interpolator with the given context.

func (*Interpolator) Interpolate

func (i *Interpolator) Interpolate(input string) (string, error)

Interpolate replaces all variable references in a string. Supported formats:

  • ${steps.<name>.<field>} - Step output field
  • ${env.<name>} - Environment variable
  • ${<name>} - Environment variable (shorthand)

func (*Interpolator) InterpolateMap

func (i *Interpolator) InterpolateMap(inputs map[string]string) (map[string]string, error)

InterpolateMap applies interpolation to all values in a map.

func (*Interpolator) InterpolateSlice

func (i *Interpolator) InterpolateSlice(inputs []string) ([]string, error)

InterpolateSlice applies interpolation to all strings in a slice.

type Logger

type Logger interface {
	Info(msg string, args ...interface{})
	Debug(msg string, args ...interface{})
	Error(msg string, args ...interface{})
}

type Parser

type Parser struct {
}

Parser handles workflow definition parsing and validation.

func NewParser

func NewParser() *Parser

NewParser creates a new workflow parser.

func (*Parser) Parse

func (p *Parser) Parse(data []byte) (*Workflow, error)

Parse parses a workflow definition from JSON bytes.

func (*Parser) ParseFile

func (p *Parser) ParseFile(path string) (*Workflow, error)

ParseFile reads and parses a workflow from a JSON file.

func (*Parser) TopologicalSort

func (p *Parser) TopologicalSort(workflow *Workflow) ([]Step, error)

TopologicalSort returns steps in dependency order.

type ResumeInfo

type ResumeInfo struct {
	CanResume      bool
	RunID          string
	Status         RunStatus
	FailedStep     string
	CompletedCount int
	TotalSteps     int
}

ResumeInfo contains information needed to resume a workflow.

type RunOptions

type RunOptions struct {
	DryRun      bool
	ResumeRunID string
	Force       bool
	Verbose     bool
	GlobalEnv   map[string]string
	WorkingDir  string
	Watch       bool
	WatchFormat WatchFormat
}

RunOptions configures workflow execution.

type RunState

type RunState struct {
	RunID       string                `json:"runId"`
	Workflow    Workflow              `json:"workflow"`
	Status      RunStatus             `json:"status"`
	CurrentStep string                `json:"currentStep,omitempty"`
	StepResults []StepResult          `json:"stepResults"`
	StepOutputs map[string]StepOutput `json:"stepOutputs"`
	StartedAt   time.Time             `json:"startedAt"`
	FinishedAt  *time.Time            `json:"finishedAt,omitempty"`
	Error       string                `json:"error,omitempty"`
	Env         map[string]string     `json:"env,omitempty"`
}

RunState represents the persisted state of a workflow run.

func NewRunState

func NewRunState(workflow Workflow) *RunState

NewRunState creates a new run state for a workflow.

func (*RunState) AddStepResult

func (r *RunState) AddStepResult(result StepResult)

AddStepResult adds a step result to the run state.

func (*RunState) GetNextPendingStep

func (r *RunState) GetNextPendingStep() *Step

GetNextPendingStep returns the first step that hasn't been completed yet.

func (*RunState) GetStepOutput

func (r *RunState) GetStepOutput(stepName string) (StepOutput, bool)

GetStepOutput retrieves the output from a completed step.

func (*RunState) IsStepCompleted

func (r *RunState) IsStepCompleted(stepName string) bool

IsStepCompleted checks if a step has been completed in this run.

type RunStatus

type RunStatus string

RunStatus represents the current status of a workflow run.

const (
	// RunStatusPending indicates the run has not started.
	RunStatusPending RunStatus = "pending"
	// RunStatusRunning indicates the run is in progress.
	RunStatusRunning RunStatus = "running"
	// RunStatusCompleted indicates the run completed successfully.
	RunStatusCompleted RunStatus = "completed"
	// RunStatusFailed indicates the run failed.
	RunStatusFailed RunStatus = "failed"
	// RunStatusCancelled indicates the run was cancelled.
	RunStatusCancelled RunStatus = "cancelled"
)

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

func NewRunner

func NewRunner(stateManager *StateManager, options RunOptions) *Runner

func (*Runner) GetStateManager

func (r *Runner) GetStateManager() *StateManager

func (*Runner) Run

func (r *Runner) Run(ctx context.Context, workflowPath string) (*RunState, error)

func (*Runner) RunWorkflow

func (r *Runner) RunWorkflow(ctx context.Context, workflow *Workflow) (*RunState, error)

type StateManager

type StateManager struct {
	// contains filtered or unexported fields
}

StateManager handles persistence of workflow run states.

func NewStateManager

func NewStateManager(baseDir string) *StateManager

NewStateManager creates a new state manager.

func (*StateManager) Delete

func (sm *StateManager) Delete(runID string) error

Delete removes a run state from disk.

func (*StateManager) EnsureDefaultDirs

func (sm *StateManager) EnsureDefaultDirs() error

EnsureDefaultDirs creates the default directory structure if it doesn't exist.

func (*StateManager) GetResumeInfo

func (sm *StateManager) GetResumeInfo(runID string) (*ResumeInfo, error)

GetResumeInfo returns information about whether a run can be resumed.

func (*StateManager) List

func (sm *StateManager) List() ([]RunState, error)

List returns all saved run states.

func (*StateManager) Load

func (sm *StateManager) Load(runID string) (*RunState, error)

Load retrieves a run state from disk.

func (*StateManager) Save

func (sm *StateManager) Save(state *RunState) error

Save persists a run state to disk.

type Step

type Step struct {
	Name            string            `json:"name"`
	Type            StepType          `json:"type,omitempty"`
	Command         string            `json:"command"`
	WorkingDir      string            `json:"workingDir,omitempty"`
	Env             map[string]string `json:"env,omitempty"`
	DependsOn       []string          `json:"dependsOn,omitempty"`
	CaptureOutputs  []string          `json:"captureOutputs,omitempty"`
	Condition       string            `json:"condition,omitempty"`
	ContinueOnError bool              `json:"continueOnError,omitempty"`
	Timeout         time.Duration     `json:"timeout,omitempty"`
	RetryCount      int               `json:"retryCount,omitempty"`
	RetryDelay      time.Duration     `json:"retryDelay,omitempty"`
	RetryBackoff    string            `json:"retryBackoff,omitempty"`
	Parallel        bool              `json:"parallel,omitempty"`
}

Step represents a single workflow step.

type StepOutput

type StepOutput struct {
	StepName   string                 `json:"stepName"`
	ExitCode   int                    `json:"exitCode"`
	Stdout     string                 `json:"stdout,omitempty"`
	Stderr     string                 `json:"stderr,omitempty"`
	Data       map[string]interface{} `json:"data,omitempty"`
	StartedAt  time.Time              `json:"startedAt"`
	FinishedAt time.Time              `json:"finishedAt"`
	Duration   time.Duration          `json:"duration"`
	Error      string                 `json:"error,omitempty"`
	RetryCount int                    `json:"retryCount,omitempty"`
	Retries    int                    `json:"retries,omitempty"`
}

StepOutput represents the captured output from a step execution.

type StepResult

type StepResult struct {
	Step   Step       `json:"step"`
	Output StepOutput `json:"output"`
}

StepResult represents the result of executing a single step.

type StepType

type StepType string

StepType represents the type of a workflow step.

const (
	// StepTypeShell executes a shell command.
	StepTypeShell StepType = "shell"
	// StepTypeGPD executes a gpd CLI command.
	StepTypeGPD StepType = "gpd"
)

type ValidationError

type ValidationError struct {
	Field   string
	Message string
}

ValidationError represents a workflow validation error.

func (*ValidationError) Error

func (e *ValidationError) Error() string

type VerboseLogger

type VerboseLogger struct{}

func (*VerboseLogger) Debug

func (l *VerboseLogger) Debug(msg string, args ...interface{})

func (*VerboseLogger) Error

func (l *VerboseLogger) Error(msg string, args ...interface{})

func (*VerboseLogger) Info

func (l *VerboseLogger) Info(msg string, args ...interface{})

type WatchFormat

type WatchFormat string

WatchFormat defines the output format for watch mode.

const (
	// WatchFormatText displays simple text progress updates.
	WatchFormatText WatchFormat = "text"
	// WatchFormatJSON streams progress events as JSON lines.
	WatchFormatJSON WatchFormat = "json"
	// WatchFormatTUI displays a TUI-style progress bar.
	WatchFormatTUI WatchFormat = "tui"
)

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

Watcher monitors workflow execution and emits progress events.

func NewWatcher

func NewWatcher(opts WatcherOptions) *Watcher

NewWatcher creates a new workflow watcher with the given options.

func (*Watcher) EmitStepCompleted

func (w *Watcher) EmitStepCompleted(stepName string, stepNum, totalSteps int, duration time.Duration)

EmitStepCompleted emits an event when a step completes successfully.

func (*Watcher) EmitStepFailed

func (w *Watcher) EmitStepFailed(stepName string, stepNum, totalSteps int, err error, exitCode int)

EmitStepFailed emits an event when a step fails.

func (*Watcher) EmitStepSkipped

func (w *Watcher) EmitStepSkipped(stepName string, stepNum, totalSteps int, reason string)

EmitStepSkipped emits an event when a step is skipped.

func (*Watcher) EmitStepStarted

func (w *Watcher) EmitStepStarted(stepName string, stepNum, totalSteps int)

EmitStepStarted emits an event when a step begins execution.

func (*Watcher) EmitWorkflowCompleted

func (w *Watcher) EmitWorkflowCompleted(duration time.Duration)

EmitWorkflowCompleted emits an event when the workflow finishes successfully.

func (*Watcher) EmitWorkflowFailed

func (w *Watcher) EmitWorkflowFailed(err error, duration time.Duration)

EmitWorkflowFailed emits an event when the workflow fails.

func (*Watcher) EmitWorkflowStarted

func (w *Watcher) EmitWorkflowStarted(workflow, runID string, totalSteps int)

EmitWorkflowStarted emits an event when workflow execution begins.

func (*Watcher) IsRunning

func (w *Watcher) IsRunning() bool

IsRunning returns true if the watcher is currently running.

func (*Watcher) Start

func (w *Watcher) Start()

Start begins watching for workflow events.

func (*Watcher) Stop

func (w *Watcher) Stop()

Stop stops the watcher and waits for pending events to be processed.

type WatcherOptions

type WatcherOptions struct {
	// Format specifies the output format (text, json, tui).
	Format WatchFormat
	// Output is the writer to write events to (defaults to stdout).
	Output io.Writer
	// UpdateInterval controls how often progress is updated in TUI mode.
	UpdateInterval time.Duration
	// ShowTimestamps includes timestamps in text output.
	ShowTimestamps bool
}

WatcherOptions configures the watcher behavior.

func DefaultWatcherOptions

func DefaultWatcherOptions() WatcherOptions

DefaultWatcherOptions returns default watcher options.

type Workflow

type Workflow struct {
	Name        string            `json:"name"`
	Description string            `json:"description,omitempty"`
	Env         map[string]string `json:"env,omitempty"`
	MaxParallel int               `json:"maxParallel,omitempty"`
	Steps       []Step            `json:"steps"`
}

Workflow represents a declarative workflow definition.

func MergeWorkflows

func MergeWorkflows(base, override *Workflow) *Workflow

MergeWorkflows merges two workflows, with the second (override) taking precedence over the first (base).

func (*Workflow) GetStep

func (w *Workflow) GetStep(name string) (*Step, bool)

GetStep returns a step by name.

func (*Workflow) Validate

func (w *Workflow) Validate() error

Validate performs basic validation on the workflow.

type WorkflowEvent

type WorkflowEvent struct {
	Type       WorkflowEventType `json:"type"`
	Timestamp  time.Time         `json:"timestamp"`
	Workflow   string            `json:"workflow"`
	RunID      string            `json:"runId"`
	StepName   string            `json:"stepName,omitempty"`
	StepNum    int               `json:"stepNum,omitempty"`
	TotalSteps int               `json:"totalSteps,omitempty"`
	Duration   time.Duration     `json:"duration,omitempty"`
	Error      string            `json:"error,omitempty"`
	ExitCode   int               `json:"exitCode,omitempty"`
	Message    string            `json:"message,omitempty"`
}

WorkflowEvent represents an event during workflow execution.

type WorkflowEventType

type WorkflowEventType string

WorkflowEventType represents the type of workflow event.

const (
	// EventWorkflowStarted is emitted when workflow execution begins.
	EventWorkflowStarted WorkflowEventType = "workflow_started"
	// EventWorkflowCompleted is emitted when workflow finishes successfully.
	EventWorkflowCompleted WorkflowEventType = "workflow_completed"
	// EventWorkflowFailed is emitted when workflow fails.
	EventWorkflowFailed WorkflowEventType = "workflow_failed"
	// EventStepStarted is emitted when a step begins execution.
	EventStepStarted WorkflowEventType = "step_started"
	// EventStepCompleted is emitted when a step completes successfully.
	EventStepCompleted WorkflowEventType = "step_completed"
	// EventStepFailed is emitted when a step fails.
	EventStepFailed WorkflowEventType = "step_failed"
	// EventStepSkipped is emitted when a step is skipped (condition not met or already completed).
	EventStepSkipped WorkflowEventType = "step_skipped"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL