Documentation
¶
Index ¶
- Variables
- func GetArg[T any](sh Snapshot, stepName, key string) (T, error)
- func NoConsoleError(err error) error
- func RunWithBackoff(ctx context.Context, w *Workflow, stage *Stage, step *Step) error
- func RunWithLinear(ctx context.Context, w *Workflow, stage *Stage, step *Step) error
- type FailData
- type RetryPolicyFn
- type Snapshot
- type Stage
- type StageOption
- type Step
- func (s *Step) SetAfterFn(value func(context.Context, *Step) error) *Step
- func (s *Step) SetBeforeFn(value func(context.Context, *Step) error) *Step
- func (s *Step) SetFunc(value StepFunc) *Step
- func (s *Step) SetMaxRetries(value int) *Step
- func (s *Step) SetRetryPolicy(value RetryPolicyFn) *Step
- func (s *Step) SetStepFinishWorkflow(value bool) *Step
- func (s *Step) SetTimeout(value time.Duration) *Step
- type StepFunc
- type StepOption
- func WithStepAfterFn(fn func(context.Context, *Step) error) StepOption
- func WithStepArgs(args any) StepOption
- func WithStepBeforeFn(fn func(context.Context, *Step) error) StepOption
- func WithStepKind(k string) StepOption
- func WithStepMaxRetries(r int) StepOption
- func WithStepRetryPolicy(fn RetryPolicyFn) StepOption
- func WithStepTimeout(d time.Duration) StepOption
- type StepState
- func (s *StepState) SetArg(key string, value any) (*StepState, error)
- func (s *StepState) SetArgs(args map[string]any) (*StepState, error)
- func (s *StepState) SetCurrentStage(stage string) *StepState
- func (s *StepState) SetCurrentStep(step string) *StepState
- func (s *StepState) SetEndTime(t time.Time) *StepState
- func (s *StepState) SetError(err error) *StepState
- func (s *StepState) SetNextStage(stage string) *StepState
- func (s *StepState) SetNextStep(step string) *StepState
- func (s *StepState) SetPreviousStage(stage string) *StepState
- func (s *StepState) SetPreviousStep(step string) *StepState
- func (s *StepState) SetStartTime(t time.Time) *StepState
- func (s *StepState) SetStatus(status StepStatus) *StepState
- type StepStatus
- type Workflow
- func (w *Workflow) CurrentStage() *Stage
- func (w *Workflow) CurrentStep() *Step
- func (w *Workflow) Debugf(format string, args ...any)
- func (w *Workflow) Errorf(format string, args ...any)
- func (w *Workflow) GetJSONSnapshot() string
- func (w *Workflow) GetSnapshot() Snapshot
- func (w *Workflow) Infof(format string, args ...any)
- func (w *Workflow) Logger() logger
- func (w *Workflow) Run(ctx context.Context) (err error)
- func (w *Workflow) SetAfterFn(fn func(context.Context, *Workflow) error) *Workflow
- func (w *Workflow) SetBeforeAllStepsFn(fn func(context.Context, *Workflow, *Stage, *Step) error) *Workflow
- func (w *Workflow) SetBeforeFn(fn func(context.Context, *Workflow) error) *Workflow
- func (w *Workflow) SetDebug(debug bool) *Workflow
- func (w *Workflow) SetJSONSnapshot(snapshot string) error
- func (w *Workflow) SetLogger(l logger) *Workflow
- func (w *Workflow) SetName(name string) *Workflow
- func (w *Workflow) SetOnFailureFn(fn func(context.Context, *Workflow, error) error) *Workflow
- func (w *Workflow) SetSkipError(skip bool) *Workflow
- func (w *Workflow) SetSnapshot(snapshot Snapshot) error
- func (w *Workflow) SetStages(stages []*Stage) *Workflow
- func (w *Workflow) SetState(state WorkflowState) *Workflow
- func (w *Workflow) StepStates() []*StepState
- func (w *Workflow) Warnf(format string, args ...any)
- type WorkflowOption
- func WithAfterAllStepsFn(fn func(context.Context, *Workflow, *Stage, *Step) error) WorkflowOption
- func WithAfterFn(fn func(context.Context, *Workflow) error) WorkflowOption
- func WithBeforeAllStepsFn(fn func(context.Context, *Workflow, *Stage, *Step) error) WorkflowOption
- func WithBeforeFn(fn func(context.Context, *Workflow) error) WorkflowOption
- func WithDebug(d bool) WorkflowOption
- func WithLogger(l logger) WorkflowOption
- func WithName(n string) WorkflowOption
- func WithOnFailureFn(fn func(context.Context, *Workflow, error) error) WorkflowOption
- func WithState(s WorkflowState) WorkflowOption
- type WorkflowState
- func (w *WorkflowState) GetCustomError() json.RawMessage
- func (w *WorkflowState) SetCompleted(value bool) *WorkflowState
- func (w *WorkflowState) SetCustomError(value []byte) *WorkflowState
- func (w *WorkflowState) SetError(err error) *WorkflowState
- func (w *WorkflowState) SetErrorMsg(msg string) *WorkflowState
- func (w *WorkflowState) SetFailed(value bool) *WorkflowState
- func (w *WorkflowState) SetNextStage(stage string) *WorkflowState
- func (w *WorkflowState) SetNextStep(step string) *WorkflowState
- func (w *WorkflowState) SetSuspended(value bool) *WorkflowState
Constants ¶
This section is empty.
Variables ¶
var ( // ErrSkipStep is used to skip the current step. ErrSkipStep = errors.New("skip step") // ErrSkipStage is used to skip the current step. ErrSkipStage = errors.New("skip stage") // ErrBreakStages is used to break the root loop. ErrBreakStages = errors.New("break stages") // ErrExitWorkflow is used to exit the workflow. ErrExitWorkflow = errors.New("exit") // ErrNoConsole ErrNoConsole = errors.New("no console") // NotFound ErrNotFound = errors.New("not found") )
Functions ¶
func NoConsoleError ¶
func RunWithBackoff ¶
RunWithBackoff runs a step function with a simple backoff retry mechanism.
Types ¶
type Snapshot ¶
type Snapshot struct { StepsStates []*StepState `json:"steps_states"` WorkflowState WorkflowState `json:"workflow_state"` }
Snapshot represents a snapshot of the workflow.
type Stage ¶
type Stage struct { // Name is the name of the stage. Name string // Steps is the list of steps for the stage. Steps []*Step // BeforeFn is the before start function for the stage. BeforeFn func(context.Context, *Stage) error // AfterFn is the after complete function for the stage. AfterFn func(context.Context, *Stage) error }
Stage represents a unique stage that can contain multiple steps.
func NewStage ¶
func NewStage(name string, opts ...StageOption) *Stage
NewStage returns a new stage.
type StageOption ¶
type StageOption func(*Stage)
StageOption defines the function signature for a stage option.
func WithStageAfterFn ¶
func WithStageAfterFn(fn func(context.Context, *Stage) error) StageOption
WithStageAfterFn sets the after complete function for the stage.
func WithStageBeforeFn ¶
func WithStageBeforeFn(fn func(context.Context, *Stage) error) StageOption
WithStageBeforeFn sets the before start function for the stage.
func WithStageSteps ¶
func WithStageSteps(steps []*Step) StageOption
WithStageSteps sets the steps for the stage.
type Step ¶
type Step struct { // Name is the name of the step. Name should be unique within a stage. Name string // Kind is the kind of the step. Based on the step kind you can implement similar triggers in the workflow. Kind string // Func is the function to be executed for the step. Func StepFunc // Args is the arguments for the step. Args any // Timeout is the duration for the step. By default, it is set to 1 second. Timeout time.Duration // MaxRetries is the maximum number of retries for the step. By default, it is set to 1. MaxRetries int // RetryPolicy is the retry policy for the step. By default, it is set by timeout. RetryPolicy RetryPolicyFn // BeforeFn is the before start function for the step. BeforeFn func(context.Context, *Step) error // AfterFn is the after complete function for the step. AfterFn func(context.Context, *Step) error // State State *StepState // FinishWorkflow is a flag to finish the workflow after the step. FinishWorkflow bool }
Step represents a unique step within a stage.
func NewStep ¶
func NewStep(name string, fn StepFunc, opts ...StepOption) *Step
NewStep returns a new step.
func (*Step) SetAfterFn ¶
SetAfterFn sets the after function.
func (*Step) SetBeforeFn ¶
SetBeforeFn sets the before function.
func (*Step) SetMaxRetries ¶
SetMaxRetries sets the max retries.
func (*Step) SetRetryPolicy ¶
func (s *Step) SetRetryPolicy(value RetryPolicyFn) *Step
SetRetryPolicy sets the retry policy.
func (*Step) SetStepFinishWorkflow ¶
SetStepFinishWorkflow sets the finish workflow flag.
type StepOption ¶
type StepOption func(*Step)
StepOption defines the function signature for a step option.
func WithStepAfterFn ¶
func WithStepAfterFn(fn func(context.Context, *Step) error) StepOption
WithStepAfterFn sets the after complete function for the step.
func WithStepArgs ¶
func WithStepArgs(args any) StepOption
WithStepArgs sets the arguments for the step.
func WithStepBeforeFn ¶
func WithStepBeforeFn(fn func(context.Context, *Step) error) StepOption
WithStepBeforeFn sets the before start function for the step.
func WithStepMaxRetries ¶
func WithStepMaxRetries(r int) StepOption
WithStepMaxRetries sets the max retries for the step.
func WithStepRetryPolicy ¶
func WithStepRetryPolicy(fn RetryPolicyFn) StepOption
WithStepRetryPolicy sets the retry policy for the step.
func WithStepTimeout ¶
func WithStepTimeout(d time.Duration) StepOption
WithStepTimeout sets the timeout for the step.
type StepState ¶
type StepState struct { PreviousStage *string `json:"previous_stage,omitempty"` PreviousStep *string `json:"previous_step,omitempty"` CurrentStage string `json:"current_stage"` CurrentStep string `json:"current_step"` NextStage *string `json:"next_stage,omitempty"` NextStep *string `json:"next_step,omitempty"` StartTime *time.Time `json:"start_time"` EndTime *time.Time `json:"end_time"` Status StepStatus `json:"status"` Error string `json:"error,omitempty"` Args map[string]any `json:"args,omitempty"` // contains filtered or unexported fields }
func NewStepState ¶
func NewStepState() *StepState
func (*StepState) SetCurrentStage ¶
SetCurrentStage sets the current stage for the step.
func (*StepState) SetCurrentStep ¶
SetCurrentStep sets the current step for the step.
func (*StepState) SetEndTime ¶
SetEndTime sets the end time for the step.
func (*StepState) SetNextStage ¶
SetNextStage sets the next stage for the step.
func (*StepState) SetNextStep ¶
SetNextStep sets the next step for the step.
func (*StepState) SetPreviousStage ¶
SetPreviousStage sets the previous stage for the step.
func (*StepState) SetPreviousStep ¶
SetPreviousStep sets the previous step for the step.
func (*StepState) SetStartTime ¶
SetStartTime sets the start time for the step.
func (*StepState) SetStatus ¶
func (s *StepState) SetStatus(status StepStatus) *StepState
SetStatus sets the status for the step.
type StepStatus ¶
type StepStatus string
const ( StepStatusPending StepStatus = "pending" StepStatusProcessing StepStatus = "processing" StepStatusCompleted StepStatus = "completed" StepStatusFailed StepStatus = "failed" StepStatusSuspended StepStatus = "suspended" StepStatusSkipped StepStatus = "skipped" )
type Workflow ¶
type Workflow struct { // Name is the name of the workflow. Name string // State is the state of the workflow. State WorkflowState // Stages is the list of stages for the workflow. Stages []*Stage // BeforeFn is the before start function for the workflow. BeforeFn func(context.Context, *Workflow) error // AfterFn is the after complete function for the workflow. AfterFn func(context.Context, *Workflow) error // OnFailureFn is the on failure function for the workflow. OnFailureFn func(context.Context, *Workflow, error) error // BeforeAllStagesFn is the before all steps function for the workflow. BeforeAllStepsFn func(context.Context, *Workflow, *Stage, *Step) error // AfterAllStagesFn is the after all steps function for the workflow. AfterAllStepsFn func(context.Context, *Workflow, *Stage, *Step) error // Debug allows to enable debug mode. Debug bool // contains filtered or unexported fields }
Workflow represents the entire workflow composed of multiple stages.
func New ¶
func New(opts ...WorkflowOption) *Workflow
func (*Workflow) CurrentStage ¶
CurrentStage returns the current stage for the workflow.
func (*Workflow) CurrentStep ¶
CurrentStep returns the current step for the workflow.
func (*Workflow) Logger ¶
func (w *Workflow) Logger() logger
Logger returns the logger for the workflow.
func (*Workflow) SetAfterFn ¶
SetAfterFn sets the after complete function for the workflow.
func (*Workflow) SetBeforeAllStepsFn ¶
func (w *Workflow) SetBeforeAllStepsFn(fn func(context.Context, *Workflow, *Stage, *Step) error) *Workflow
SetBeforeAllStepsFn sets the before start function for the workflow.
func (*Workflow) SetBeforeFn ¶
SetBeforeFn sets the before start function for the workflow.
func (*Workflow) SetJSONSnapshot ¶
SetSnapshot
func (*Workflow) SetOnFailureFn ¶
SetOnFailureFn sets the on failure function for the workflow.
func (*Workflow) SetState ¶
func (w *Workflow) SetState(state WorkflowState) *Workflow
SetState sets the state for the workflow.
type WorkflowOption ¶
type WorkflowOption func(*Workflow) //nolint:revive
WorkflowOption defines the function signature for a workflow option.
func WithAfterAllStepsFn ¶
WithAfterAllStepsFn sets the after all steps function for the workflow.
func WithAfterFn ¶
func WithAfterFn(fn func(context.Context, *Workflow) error) WorkflowOption
WithAfterFn sets the after complete function for the workflow.
func WithBeforeAllStepsFn ¶
WithBeforeAllStepsFn sets the before all steps function for the workflow.
func WithBeforeFn ¶
func WithBeforeFn(fn func(context.Context, *Workflow) error) WorkflowOption
WithBeforeFn sets the before start function for the workflow.
func WithDebug ¶
func WithDebug(d bool) WorkflowOption
WithDebug sets the debug flag for the workflow.
func WithLogger ¶
func WithLogger(l logger) WorkflowOption
WithLogger sets the logger for the workflow.
func WithOnFailureFn ¶
WithOnFailureFn sets the on failure function for the workflow.
func WithState ¶
func WithState(s WorkflowState) WorkflowOption
WithState sets the state for the workflow.
type WorkflowState ¶
type WorkflowState struct { // IsSuspended is the flag to indicate if the workflow is suspended (in pause). IsSuspended bool `json:"is_suspended"` // IsCompleted is the flag to indicate if the workflow is fully completed. IsCompleted bool `json:"is_completed"` // IsFailed is the flag to indicate if the workflow is failed. IsFailed bool `json:"is_failed"` // CustomError information about fail CustomError json.RawMessage `json:"custom_error,omitempty"` // Error formatted error msg Error string `json:"error,omitempty"` // NextStage is the next stage for the workflow. NextStage string `json:"next_stage,omitempty"` // NextStep is the next step for the workflow. NextStep string `json:"next_step,omitempty"` }
WorkflowState represents the state of the workflow.
func (*WorkflowState) GetCustomError ¶
func (w *WorkflowState) GetCustomError() json.RawMessage
func (*WorkflowState) SetCompleted ¶
func (w *WorkflowState) SetCompleted(value bool) *WorkflowState
SetCompleted sets the completed flag for the workflow.
func (*WorkflowState) SetCustomError ¶
func (w *WorkflowState) SetCustomError(value []byte) *WorkflowState
func (*WorkflowState) SetError ¶
func (w *WorkflowState) SetError(err error) *WorkflowState
SetError sets the error for the workflow.
func (*WorkflowState) SetErrorMsg ¶
func (w *WorkflowState) SetErrorMsg(msg string) *WorkflowState
func (*WorkflowState) SetFailed ¶
func (w *WorkflowState) SetFailed(value bool) *WorkflowState
SetFailed sets the failed flag for the workflow.
func (*WorkflowState) SetNextStage ¶
func (w *WorkflowState) SetNextStage(stage string) *WorkflowState
SetNextStage sets the next stage for the workflow.
func (*WorkflowState) SetNextStep ¶
func (w *WorkflowState) SetNextStep(step string) *WorkflowState
SetNextStep sets the next step for the workflow.
func (*WorkflowState) SetSuspended ¶
func (w *WorkflowState) SetSuspended(value bool) *WorkflowState
SetSuspended sets the suspended flag for the workflow.