workflow

package
v0.9.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 19, 2025 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrSkipStep is used to skip the current step.
	ErrSkipStep = errors.New("skip step")
	// ErrSkipStage is used to skip the current stage.
	ErrSkipStage = errors.New("skip stage")
	// ErrBreakStages is used to break the root loop.
	ErrBreakStages = errors.New("break stages")
	// ErrExitWorkflow is used to exit the workflow.
	ErrExitWorkflow = errors.New("exit")
	// ErrNoConsole is the sentinel error with the message "no console".
	// NOTE(review): presumably related to NoConsoleError — confirm.
	ErrNoConsole = errors.New("no console")
	// ErrNotFound is the sentinel error with the message "not found".
	ErrNotFound = errors.New("not found")
)

Functions

func GetArg

func GetArg[T any](sh Snapshot, stepName, key string) (T, error)

func NoConsoleError

func NoConsoleError(err error) error

func RunWithBackoff

func RunWithBackoff(ctx context.Context, w *Workflow, stage *Stage, step *Step) error

RunWithBackoff runs a step function with a simple backoff retry mechanism.

func RunWithLinear

func RunWithLinear(ctx context.Context, w *Workflow, stage *Stage, step *Step) error

RunWithLinear runs a step function with a simple linear retry mechanism.

Types

type FailData

// FailData carries the names of a failed step and a failed stage.
// Both fields are omitted from the JSON encoding when empty.
type FailData struct {
	FailedStepName  string `json:"failed_step_name,omitempty"`
	FailedStageName string `json:"failed_stage_name,omitempty"`
}

type RetryPolicyFn

type RetryPolicyFn func(context.Context, *Workflow, *Stage, *Step) error

type Snapshot

// Snapshot represents a snapshot of the workflow: the list of step states
// together with the workflow state, both encoded to JSON.
type Snapshot struct {
	StepsStates   []*StepState  `json:"steps_states"`
	WorkflowState WorkflowState `json:"workflow_state"`
}

Snapshot represents a snapshot of the workflow.

type Stage

// Stage is a named group of steps with optional before and after functions.
type Stage struct {
	// Name is the name of the stage.
	Name string
	// Steps is the list of steps for the stage.
	Steps []*Step
	// BeforeFn is the before start function for the stage.
	BeforeFn func(context.Context, *Stage) error
	// AfterFn is the after complete function for the stage.
	AfterFn func(context.Context, *Stage) error
}

Stage represents a unique stage that can contain multiple steps.

func NewStage

func NewStage(name string, opts ...StageOption) *Stage

NewStage returns a new stage.

type StageOption

type StageOption func(*Stage)

StageOption defines the function signature for a stage option.

func WithStageAfterFn

func WithStageAfterFn(fn func(context.Context, *Stage) error) StageOption

WithStageAfterFn sets the after complete function for the stage.

func WithStageBeforeFn

func WithStageBeforeFn(fn func(context.Context, *Stage) error) StageOption

WithStageBeforeFn sets the before start function for the stage.

func WithStageSteps

func WithStageSteps(steps []*Step) StageOption

WithStageSteps sets the steps for the stage.

type Step

// Step represents a unique step within a stage.
type Step struct {
	// Name is the name of the step. Name should be unique within a stage.
	Name string
	// Kind is the kind of the step. Based on the step kind you can implement similar triggers in the workflow.
	Kind string
	// Func is the function to be executed for the step.
	Func StepFunc
	// Args is the arguments for the step.
	Args any
	// Timeout is the duration for the step. By default, it is set to 1 second.
	Timeout time.Duration
	// MaxRetries is the maximum number of retries for the step. By default, it is set to 1.
	MaxRetries int
	// RetryPolicy is the retry policy for the step. By default, it is set by timeout.
	RetryPolicy RetryPolicyFn
	// BeforeFn is the before start function for the step.
	BeforeFn func(context.Context, *Step) error
	// AfterFn is the after complete function for the step.
	AfterFn func(context.Context, *Step) error

	// State is the StepState associated with the step.
	State *StepState

	// FinishWorkflow is a flag to finish the workflow after the step.
	FinishWorkflow bool
}

Step represents a unique step within a stage.

func NewStep

func NewStep(name string, fn StepFunc, opts ...StepOption) *Step

NewStep returns a new step.

func (*Step) SetAfterFn

func (s *Step) SetAfterFn(value func(context.Context, *Step) error) *Step

SetAfterFn sets the after function.

func (*Step) SetBeforeFn

func (s *Step) SetBeforeFn(value func(context.Context, *Step) error) *Step

SetBeforeFn sets the before function.

func (*Step) SetFunc

func (s *Step) SetFunc(value StepFunc) *Step

SetFunc sets the step function.

func (*Step) SetMaxRetries

func (s *Step) SetMaxRetries(value int) *Step

SetMaxRetries sets the max retries.

func (*Step) SetRetryPolicy

func (s *Step) SetRetryPolicy(value RetryPolicyFn) *Step

SetRetryPolicy sets the retry policy.

func (*Step) SetStepFinishWorkflow

func (s *Step) SetStepFinishWorkflow(value bool) *Step

SetStepFinishWorkflow sets the finish workflow flag.

func (*Step) SetTimeout

func (s *Step) SetTimeout(value time.Duration) *Step

SetTimeout sets the timeout.

type StepFunc

type StepFunc func(ctx context.Context, workflow *Workflow, stage *Stage, step *Step) error

StepFunc defines the function signature for a step.

type StepOption

type StepOption func(*Step)

StepOption defines the function signature for a step option.

func WithStepAfterFn

func WithStepAfterFn(fn func(context.Context, *Step) error) StepOption

WithStepAfterFn sets the after complete function for the step.

func WithStepArgs

func WithStepArgs(args any) StepOption

WithStepArgs sets the arguments for the step.

func WithStepBeforeFn

func WithStepBeforeFn(fn func(context.Context, *Step) error) StepOption

WithStepBeforeFn sets the before start function for the step.

func WithStepKind

func WithStepKind(k string) StepOption

WithStepKind sets the kind for the step.

func WithStepMaxRetries

func WithStepMaxRetries(r int) StepOption

WithStepMaxRetries sets the max retries for the step.

func WithStepRetryPolicy

func WithStepRetryPolicy(fn RetryPolicyFn) StepOption

WithStepRetryPolicy sets the retry policy for the step.

func WithStepTimeout

func WithStepTimeout(d time.Duration) StepOption

WithStepTimeout sets the timeout for the step.

type StepState

// StepState is the JSON-serializable state record of a step.
type StepState struct {
	// The Previous*, Current* and Next* fields name stages and steps.
	// The pointer-typed Previous* and Next* fields are omitted from the
	// JSON encoding when nil; the Current* fields are always encoded.
	PreviousStage *string `json:"previous_stage,omitempty"`
	PreviousStep  *string `json:"previous_step,omitempty"`
	CurrentStage  string  `json:"current_stage"`
	CurrentStep   string  `json:"current_step"`
	NextStage     *string `json:"next_stage,omitempty"`
	NextStep      *string `json:"next_step,omitempty"`

	// StartTime and EndTime are the start and end times for the step.
	// NOTE(review): presumably nil until set via SetStartTime/SetEndTime — confirm.
	StartTime *time.Time `json:"start_time"`
	EndTime   *time.Time `json:"end_time"`

	// Status is the status for the step.
	Status StepStatus `json:"status"`

	// Error is the error text for the step; omitted from JSON when empty.
	Error string `json:"error,omitempty"`

	// Args holds the arguments for the step by key; omitted from JSON when empty.
	Args map[string]any `json:"args,omitempty"`
	// contains filtered or unexported fields
}

func NewStepState

func NewStepState() *StepState

func (*StepState) SetArg

func (s *StepState) SetArg(key string, value any) (*StepState, error)

SetArg sets a single argument for the step under the given key.

func (*StepState) SetArgs

func (s *StepState) SetArgs(args map[string]any) (*StepState, error)

SetArgs sets the arguments for the step.

func (*StepState) SetCurrentStage

func (s *StepState) SetCurrentStage(stage string) *StepState

SetCurrentStage sets the current stage for the step.

func (*StepState) SetCurrentStep

func (s *StepState) SetCurrentStep(step string) *StepState

SetCurrentStep sets the current step for the step.

func (*StepState) SetEndTime

func (s *StepState) SetEndTime(t time.Time) *StepState

SetEndTime sets the end time for the step.

func (*StepState) SetError

func (s *StepState) SetError(err error) *StepState

SetError sets the error for the step.

func (*StepState) SetNextStage

func (s *StepState) SetNextStage(stage string) *StepState

SetNextStage sets the next stage for the step.

func (*StepState) SetNextStep

func (s *StepState) SetNextStep(step string) *StepState

SetNextStep sets the next step for the step.

func (*StepState) SetPreviousStage

func (s *StepState) SetPreviousStage(stage string) *StepState

SetPreviousStage sets the previous stage for the step.

func (*StepState) SetPreviousStep

func (s *StepState) SetPreviousStep(step string) *StepState

SetPreviousStep sets the previous step for the step.

func (*StepState) SetStartTime

func (s *StepState) SetStartTime(t time.Time) *StepState

SetStartTime sets the start time for the step.

func (*StepState) SetStatus

func (s *StepState) SetStatus(status StepStatus) *StepState

SetStatus sets the status for the step.

type StepStatus

// StepStatus is the string-typed status of a step.
type StepStatus string
// Declared StepStatus values.
const (
	StepStatusPending    StepStatus = "pending"
	StepStatusProcessing StepStatus = "processing"
	StepStatusCompleted  StepStatus = "completed"
	StepStatusFailed     StepStatus = "failed"
	StepStatusSuspended  StepStatus = "suspended"
	StepStatusSkipped    StepStatus = "skipped"
)

func AllStepStatuses

func AllStepStatuses() []StepStatus

AllStepStatuses returns a slice of the step statuses.

func (StepStatus) String

func (s StepStatus) String() string

String returns the status as a string.

func (StepStatus) Valid

func (s StepStatus) Valid() bool

Valid reports whether the status is a valid step status.

type Workflow

// Workflow represents the entire workflow composed of multiple stages.
type Workflow struct {
	// Name is the name of the workflow.
	Name string
	// State is the state of the workflow.
	State WorkflowState
	// Stages is the list of stages for the workflow.
	Stages []*Stage
	// BeforeFn is the before start function for the workflow.
	BeforeFn func(context.Context, *Workflow) error
	// AfterFn is the after complete function for the workflow.
	AfterFn func(context.Context, *Workflow) error
	// OnFailureFn is the on failure function for the workflow.
	OnFailureFn func(context.Context, *Workflow, error) error

	// BeforeAllStepsFn is the before all steps function for the workflow.
	BeforeAllStepsFn func(context.Context, *Workflow, *Stage, *Step) error
	// AfterAllStepsFn is the after all steps function for the workflow.
	AfterAllStepsFn func(context.Context, *Workflow, *Stage, *Step) error

	// Debug allows to enable debug mode.
	Debug bool
	// contains filtered or unexported fields
}

Workflow represents the entire workflow composed of multiple stages.

func New

func New(opts ...WorkflowOption) *Workflow

func (*Workflow) CurrentStage

func (w *Workflow) CurrentStage() *Stage

CurrentStage returns the current stage for the workflow.

func (*Workflow) CurrentStep

func (w *Workflow) CurrentStep() *Step

CurrentStep returns the current step for the workflow.

func (*Workflow) Debugf

func (w *Workflow) Debugf(format string, args ...any)

Debugf logs the debug message.

func (*Workflow) Errorf

func (w *Workflow) Errorf(format string, args ...any)

Errorf logs the error message.

func (*Workflow) GetJSONSnapshot

func (w *Workflow) GetJSONSnapshot() string

GetJSONSnapshot returns the workflow snapshot as a JSON string.

func (*Workflow) GetSnapshot

func (w *Workflow) GetSnapshot() Snapshot

GetSnapshot returns the snapshot of the workflow.

func (*Workflow) Infof

func (w *Workflow) Infof(format string, args ...any)

Infof logs the info message.

func (*Workflow) Logger

func (w *Workflow) Logger() logger

Logger returns the logger for the workflow.

func (*Workflow) Run

func (w *Workflow) Run(ctx context.Context) (err error)

Run executes the workflow.

func (*Workflow) SetAfterFn

func (w *Workflow) SetAfterFn(fn func(context.Context, *Workflow) error) *Workflow

SetAfterFn sets the after complete function for the workflow.

func (*Workflow) SetBeforeAllStepsFn

func (w *Workflow) SetBeforeAllStepsFn(fn func(context.Context, *Workflow, *Stage, *Step) error) *Workflow

SetBeforeAllStepsFn sets the before all steps function for the workflow.

func (*Workflow) SetBeforeFn

func (w *Workflow) SetBeforeFn(fn func(context.Context, *Workflow) error) *Workflow

SetBeforeFn sets the before start function for the workflow.

func (*Workflow) SetDebug

func (w *Workflow) SetDebug(debug bool) *Workflow

SetDebug sets the debug flag for the workflow.

func (*Workflow) SetJSONSnapshot

func (w *Workflow) SetJSONSnapshot(snapshot string) error

SetJSONSnapshot sets the workflow snapshot from a JSON string.

func (*Workflow) SetLogger

func (w *Workflow) SetLogger(l logger) *Workflow

SetLogger sets the logger for the workflow.

func (*Workflow) SetName

func (w *Workflow) SetName(name string) *Workflow

SetName sets the name for the workflow.

func (*Workflow) SetOnFailureFn

func (w *Workflow) SetOnFailureFn(fn func(context.Context, *Workflow, error) error) *Workflow

SetOnFailureFn sets the on failure function for the workflow.

func (*Workflow) SetSkipError

func (w *Workflow) SetSkipError(skip bool) *Workflow

SetSkipError sets the skip error flag for the workflow.

func (*Workflow) SetSnapshot

func (w *Workflow) SetSnapshot(snapshot Snapshot) error

SetSnapshot sets the snapshot for the workflow.

func (*Workflow) SetStages

func (w *Workflow) SetStages(stages []*Stage) *Workflow

SetStages sets the stages for the workflow.

func (*Workflow) SetState

func (w *Workflow) SetState(state WorkflowState) *Workflow

SetState sets the state for the workflow.

func (*Workflow) StepStates

func (w *Workflow) StepStates() []*StepState

StepStates returns the step states for the workflow.

func (*Workflow) Warnf

func (w *Workflow) Warnf(format string, args ...any)

Warnf logs the warning message.

type WorkflowOption

type WorkflowOption func(*Workflow) //nolint:revive

WorkflowOption defines the function signature for a workflow option.

func WithAfterAllStepsFn

func WithAfterAllStepsFn(fn func(context.Context, *Workflow, *Stage, *Step) error) WorkflowOption

WithAfterAllStepsFn sets the after all steps function for the workflow.

func WithAfterFn

func WithAfterFn(fn func(context.Context, *Workflow) error) WorkflowOption

WithAfterFn sets the after complete function for the workflow.

func WithBeforeAllStepsFn

func WithBeforeAllStepsFn(fn func(context.Context, *Workflow, *Stage, *Step) error) WorkflowOption

WithBeforeAllStepsFn sets the before all steps function for the workflow.

func WithBeforeFn

func WithBeforeFn(fn func(context.Context, *Workflow) error) WorkflowOption

WithBeforeFn sets the before start function for the workflow.

func WithDebug

func WithDebug(d bool) WorkflowOption

WithDebug sets the debug flag for the workflow.

func WithLogger

func WithLogger(l logger) WorkflowOption

WithLogger sets the logger for the workflow.

func WithName

func WithName(n string) WorkflowOption

WithName sets the name for the workflow.

func WithOnFailureFn

func WithOnFailureFn(fn func(context.Context, *Workflow, error) error) WorkflowOption

WithOnFailureFn sets the on failure function for the workflow.

func WithState

func WithState(s WorkflowState) WorkflowOption

WithState sets the state for the workflow.

type WorkflowState

// WorkflowState represents the state of the workflow.
type WorkflowState struct {
	// IsSuspended is the flag to indicate if the workflow is suspended (in pause).
	IsSuspended bool `json:"is_suspended"`
	// IsCompleted is the flag to indicate if the workflow is fully completed.
	IsCompleted bool `json:"is_completed"`
	// IsFailed is the flag to indicate if the workflow is failed.
	IsFailed bool `json:"is_failed"`

	// CustomError is raw JSON with custom information about the failure.
	// It is omitted from the JSON encoding when empty.
	CustomError json.RawMessage `json:"custom_error,omitempty"`

	// Error is the formatted error message; omitted from JSON when empty.
	Error string `json:"error,omitempty"`

	// NextStage is the next stage for the workflow.
	NextStage string `json:"next_stage,omitempty"`
	// NextStep is the next step for the workflow.
	NextStep string `json:"next_step,omitempty"`
}

WorkflowState represents the state of the workflow.

func (*WorkflowState) GetCustomError

func (w *WorkflowState) GetCustomError() json.RawMessage

func (*WorkflowState) SetCompleted

func (w *WorkflowState) SetCompleted(value bool) *WorkflowState

SetCompleted sets the completed flag for the workflow.

func (*WorkflowState) SetCustomError

func (w *WorkflowState) SetCustomError(value []byte) *WorkflowState

func (*WorkflowState) SetError

func (w *WorkflowState) SetError(err error) *WorkflowState

SetError sets the error for the workflow.

func (*WorkflowState) SetErrorMsg

func (w *WorkflowState) SetErrorMsg(msg string) *WorkflowState

func (*WorkflowState) SetFailed

func (w *WorkflowState) SetFailed(value bool) *WorkflowState

SetFailed sets the failed flag for the workflow.

func (*WorkflowState) SetNextStage

func (w *WorkflowState) SetNextStage(stage string) *WorkflowState

SetNextStage sets the next stage for the workflow.

func (*WorkflowState) SetNextStep

func (w *WorkflowState) SetNextStep(step string) *WorkflowState

SetNextStep sets the next step for the workflow.

func (*WorkflowState) SetSuspended

func (w *WorkflowState) SetSuspended(value bool) *WorkflowState

SetSuspended sets the suspended flag for the workflow.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL