Documentation
¶
Index ¶
- Variables
- func GetArg[T any](sh Snapshot, stepName, key string) (T, error)
- func NoConsoleError(err error) error
- func RunWithBackoff(ctx context.Context, w *Workflow, stage *Stage, step *Step) error
- func RunWithLinear(ctx context.Context, w *Workflow, stage *Stage, step *Step) error
- type FailData
- type RetryPolicyFn
- type Snapshot
- type Stage
- type StageOption
- type Step
- func (s *Step) SetAfterFn(value func(context.Context, *Step) error) *Step
- func (s *Step) SetBeforeFn(value func(context.Context, *Step) error) *Step
- func (s *Step) SetFunc(value StepFunc) *Step
- func (s *Step) SetMaxRetries(value int) *Step
- func (s *Step) SetRetryPolicy(value RetryPolicyFn) *Step
- func (s *Step) SetStepFinishWorkflow(value bool) *Step
- func (s *Step) SetTimeout(value time.Duration) *Step
- type StepFunc
- type StepOption
- func WithStepAfterFn(fn func(context.Context, *Step) error) StepOption
- func WithStepArgs(args any) StepOption
- func WithStepBeforeFn(fn func(context.Context, *Step) error) StepOption
- func WithStepKind(k string) StepOption
- func WithStepMaxRetries(r int) StepOption
- func WithStepRetryPolicy(fn RetryPolicyFn) StepOption
- func WithStepTimeout(d time.Duration) StepOption
- type StepState
- func (s *StepState) SetArg(key string, value any) (*StepState, error)
- func (s *StepState) SetArgs(args map[string]any) (*StepState, error)
- func (s *StepState) SetCurrentStage(stage string) *StepState
- func (s *StepState) SetCurrentStep(step string) *StepState
- func (s *StepState) SetEndTime(t time.Time) *StepState
- func (s *StepState) SetError(err error) *StepState
- func (s *StepState) SetNextStage(stage string) *StepState
- func (s *StepState) SetNextStep(step string) *StepState
- func (s *StepState) SetPreviousStage(stage string) *StepState
- func (s *StepState) SetPreviousStep(step string) *StepState
- func (s *StepState) SetStartTime(t time.Time) *StepState
- func (s *StepState) SetStatus(status StepStatus) *StepState
- type StepStatus
- type Workflow
- func (w *Workflow) CurrentStage() *Stage
- func (w *Workflow) CurrentStep() *Step
- func (w *Workflow) Debugf(format string, args ...any)
- func (w *Workflow) Errorf(format string, args ...any)
- func (w *Workflow) GetJSONSnapshot() string
- func (w *Workflow) GetSnapshot() Snapshot
- func (w *Workflow) Infof(format string, args ...any)
- func (w *Workflow) Logger() logger
- func (w *Workflow) Run(ctx context.Context) (err error)
- func (w *Workflow) SetAfterFn(fn func(context.Context, *Workflow) error) *Workflow
- func (w *Workflow) SetBeforeAllStepsFn(fn func(context.Context, *Workflow, *Stage, *Step) error) *Workflow
- func (w *Workflow) SetBeforeFn(fn func(context.Context, *Workflow) error) *Workflow
- func (w *Workflow) SetDebug(debug bool) *Workflow
- func (w *Workflow) SetJSONSnapshot(snapshot string) error
- func (w *Workflow) SetLogger(l logger) *Workflow
- func (w *Workflow) SetName(name string) *Workflow
- func (w *Workflow) SetOnFailureFn(fn func(context.Context, *Workflow, error) error) *Workflow
- func (w *Workflow) SetSkipError(skip bool) *Workflow
- func (w *Workflow) SetSnapshot(snapshot Snapshot) error
- func (w *Workflow) SetStages(stages []*Stage) *Workflow
- func (w *Workflow) SetState(state WorkflowState) *Workflow
- func (w *Workflow) StepStates() []*StepState
- func (w *Workflow) Warnf(format string, args ...any)
- type WorkflowOption
- func WithAfterAllStepsFn(fn func(context.Context, *Workflow, *Stage, *Step) error) WorkflowOption
- func WithAfterFn(fn func(context.Context, *Workflow) error) WorkflowOption
- func WithBeforeAllStepsFn(fn func(context.Context, *Workflow, *Stage, *Step) error) WorkflowOption
- func WithBeforeFn(fn func(context.Context, *Workflow) error) WorkflowOption
- func WithDebug(d bool) WorkflowOption
- func WithLogger(l logger) WorkflowOption
- func WithName(n string) WorkflowOption
- func WithOnFailureFn(fn func(context.Context, *Workflow, error) error) WorkflowOption
- func WithState(s WorkflowState) WorkflowOption
- type WorkflowState
- func (w *WorkflowState) GetCustomError() json.RawMessage
- func (w *WorkflowState) SetCompleted(value bool) *WorkflowState
- func (w *WorkflowState) SetCustomError(value []byte) *WorkflowState
- func (w *WorkflowState) SetError(err error) *WorkflowState
- func (w *WorkflowState) SetErrorMsg(msg string) *WorkflowState
- func (w *WorkflowState) SetFailed(value bool) *WorkflowState
- func (w *WorkflowState) SetNextStage(stage string) *WorkflowState
- func (w *WorkflowState) SetNextStep(step string) *WorkflowState
- func (w *WorkflowState) SetSuspended(value bool) *WorkflowState
Constants ¶
This section is empty.
Variables ¶
var (
// ErrSkipStep is used to skip the current step.
ErrSkipStep = errors.New("skip step")
// ErrSkipStage is used to skip the current stage.
ErrSkipStage = errors.New("skip stage")
// ErrBreakStages is used to break the root loop.
ErrBreakStages = errors.New("break stages")
// ErrExitWorkflow is used to exit the workflow.
ErrExitWorkflow = errors.New("exit")
// ErrNoConsole is the sentinel error with the message "no console".
ErrNoConsole = errors.New("no console")
// ErrNotFound is the sentinel error with the message "not found".
ErrNotFound = errors.New("not found")
)
Functions ¶
func NoConsoleError ¶
func RunWithBackoff ¶
RunWithBackoff runs a step function with a simple backoff retry mechanism.
Types ¶
type Snapshot ¶
type Snapshot struct {
StepsStates []*StepState `json:"steps_states"`
WorkflowState WorkflowState `json:"workflow_state"`
}
Snapshot represents a snapshot of the workflow.
type Stage ¶
type Stage struct {
// Name is the name of the stage.
Name string
// Steps is the list of steps for the stage.
Steps []*Step
// BeforeFn is the before start function for the stage.
BeforeFn func(context.Context, *Stage) error
// AfterFn is the after complete function for the stage.
AfterFn func(context.Context, *Stage) error
}
Stage represents a unique stage that can contain multiple steps.
func NewStage ¶
func NewStage(name string, opts ...StageOption) *Stage
NewStage returns a new stage.
type StageOption ¶
type StageOption func(*Stage)
StageOption defines the function signature for a stage option.
func WithStageAfterFn ¶
func WithStageAfterFn(fn func(context.Context, *Stage) error) StageOption
WithStageAfterFn sets the after complete function for the stage.
func WithStageBeforeFn ¶
func WithStageBeforeFn(fn func(context.Context, *Stage) error) StageOption
WithStageBeforeFn sets the before start function for the stage.
func WithStageSteps ¶
func WithStageSteps(steps []*Step) StageOption
WithStageSteps sets the steps for the stage.
type Step ¶
type Step struct {
// Name is the name of the step. Name should be unique within a stage.
Name string
// Kind is the kind of the step. Based on the step kind you can implement similar triggers in the workflow.
Kind string
// Func is the function to be executed for the step.
Func StepFunc
// Args is the arguments for the step.
Args any
// Timeout is the duration for the step. By default, it is set to 1 second.
Timeout time.Duration
// MaxRetries is the maximum number of retries for the step. By default, it is set to 1.
MaxRetries int
// RetryPolicy is the retry policy for the step. By default, it is set by timeout.
RetryPolicy RetryPolicyFn
// BeforeFn is the before start function for the step.
BeforeFn func(context.Context, *Step) error
// AfterFn is the after complete function for the step.
AfterFn func(context.Context, *Step) error
// State is the state of the step.
State *StepState
// FinishWorkflow is a flag to finish the workflow after the step.
FinishWorkflow bool
}
Step represents a unique step within a stage.
func NewStep ¶
func NewStep(name string, fn StepFunc, opts ...StepOption) *Step
NewStep returns a new step.
func (*Step) SetAfterFn ¶
SetAfterFn sets the after function.
func (*Step) SetBeforeFn ¶
SetBeforeFn sets the before function.
func (*Step) SetMaxRetries ¶
SetMaxRetries sets the max retries.
func (*Step) SetRetryPolicy ¶
func (s *Step) SetRetryPolicy(value RetryPolicyFn) *Step
SetRetryPolicy sets the retry policy.
func (*Step) SetStepFinishWorkflow ¶
SetStepFinishWorkflow sets the finish workflow flag.
type StepOption ¶
type StepOption func(*Step)
StepOption defines the function signature for a step option.
func WithStepAfterFn ¶
func WithStepAfterFn(fn func(context.Context, *Step) error) StepOption
WithStepAfterFn sets the after complete function for the step.
func WithStepArgs ¶
func WithStepArgs(args any) StepOption
WithStepArgs sets the arguments for the step.
func WithStepBeforeFn ¶
func WithStepBeforeFn(fn func(context.Context, *Step) error) StepOption
WithStepBeforeFn sets the before start function for the step.
func WithStepMaxRetries ¶
func WithStepMaxRetries(r int) StepOption
WithStepMaxRetries sets the max retries for the step.
func WithStepRetryPolicy ¶
func WithStepRetryPolicy(fn RetryPolicyFn) StepOption
WithStepRetryPolicy sets the retry policy for the step.
func WithStepTimeout ¶
func WithStepTimeout(d time.Duration) StepOption
WithStepTimeout sets the timeout for the step.
type StepState ¶
type StepState struct {
PreviousStage *string `json:"previous_stage,omitempty"`
PreviousStep *string `json:"previous_step,omitempty"`
CurrentStage string `json:"current_stage"`
CurrentStep string `json:"current_step"`
NextStage *string `json:"next_stage,omitempty"`
NextStep *string `json:"next_step,omitempty"`
StartTime *time.Time `json:"start_time"`
EndTime *time.Time `json:"end_time"`
Status StepStatus `json:"status"`
Error string `json:"error,omitempty"`
Args map[string]any `json:"args,omitempty"`
// contains filtered or unexported fields
}
func NewStepState ¶
func NewStepState() *StepState
func (*StepState) SetCurrentStage ¶
SetCurrentStage sets the current stage for the step.
func (*StepState) SetCurrentStep ¶
SetCurrentStep sets the current step for the step.
func (*StepState) SetEndTime ¶
SetEndTime sets the end time for the step.
func (*StepState) SetNextStage ¶
SetNextStage sets the next stage for the step.
func (*StepState) SetNextStep ¶
SetNextStep sets the next step for the step.
func (*StepState) SetPreviousStage ¶
SetPreviousStage sets the previous stage for the step.
func (*StepState) SetPreviousStep ¶
SetPreviousStep sets the previous step for the step.
func (*StepState) SetStartTime ¶
SetStartTime sets the start time for the step.
func (*StepState) SetStatus ¶
func (s *StepState) SetStatus(status StepStatus) *StepState
SetStatus sets the status for the step.
type StepStatus ¶
type StepStatus string
const (
StepStatusPending StepStatus = "pending"
StepStatusProcessing StepStatus = "processing"
StepStatusCompleted StepStatus = "completed"
StepStatusFailed StepStatus = "failed"
StepStatusSuspended StepStatus = "suspended"
StepStatusSkipped StepStatus = "skipped"
)
type Workflow ¶
type Workflow struct {
// Name is the name of the workflow.
Name string
// State is the state of the workflow.
State WorkflowState
// Stages is the list of stages for the workflow.
Stages []*Stage
// BeforeFn is the before start function for the workflow.
BeforeFn func(context.Context, *Workflow) error
// AfterFn is the after complete function for the workflow.
AfterFn func(context.Context, *Workflow) error
// OnFailureFn is the on failure function for the workflow.
OnFailureFn func(context.Context, *Workflow, error) error
// BeforeAllStepsFn is the before all steps function for the workflow.
BeforeAllStepsFn func(context.Context, *Workflow, *Stage, *Step) error
// AfterAllStepsFn is the after all steps function for the workflow.
AfterAllStepsFn func(context.Context, *Workflow, *Stage, *Step) error
// Debug enables debug mode.
Debug bool
// contains filtered or unexported fields
}
Workflow represents the entire workflow composed of multiple stages.
func New ¶
func New(opts ...WorkflowOption) *Workflow
func (*Workflow) CurrentStage ¶
CurrentStage returns the current stage for the workflow.
func (*Workflow) CurrentStep ¶
CurrentStep returns the current step for the workflow.
func (*Workflow) Logger ¶
func (w *Workflow) Logger() logger
Logger returns the logger for the workflow.
func (*Workflow) SetAfterFn ¶
SetAfterFn sets the after complete function for the workflow.
func (*Workflow) SetBeforeAllStepsFn ¶
func (w *Workflow) SetBeforeAllStepsFn(fn func(context.Context, *Workflow, *Stage, *Step) error) *Workflow
SetBeforeAllStepsFn sets the before all steps function for the workflow.
func (*Workflow) SetBeforeFn ¶
SetBeforeFn sets the before start function for the workflow.
func (*Workflow) SetJSONSnapshot ¶
SetJSONSnapshot sets the snapshot for the workflow from its JSON string form.
func (*Workflow) SetOnFailureFn ¶
SetOnFailureFn sets the on failure function for the workflow.
func (*Workflow) SetState ¶
func (w *Workflow) SetState(state WorkflowState) *Workflow
SetState sets the state for the workflow.
type WorkflowOption ¶
type WorkflowOption func(*Workflow) //nolint:revive
WorkflowOption defines the function signature for a workflow option.
func WithAfterAllStepsFn ¶
WithAfterAllStepsFn sets the after all steps function for the workflow.
func WithAfterFn ¶
func WithAfterFn(fn func(context.Context, *Workflow) error) WorkflowOption
WithAfterFn sets the after complete function for the workflow.
func WithBeforeAllStepsFn ¶
WithBeforeAllStepsFn sets the before all steps function for the workflow.
func WithBeforeFn ¶
func WithBeforeFn(fn func(context.Context, *Workflow) error) WorkflowOption
WithBeforeFn sets the before start function for the workflow.
func WithDebug ¶
func WithDebug(d bool) WorkflowOption
WithDebug sets the debug flag for the workflow.
func WithLogger ¶
func WithLogger(l logger) WorkflowOption
WithLogger sets the logger for the workflow.
func WithOnFailureFn ¶
WithOnFailureFn sets the on failure function for the workflow.
func WithState ¶
func WithState(s WorkflowState) WorkflowOption
WithState sets the state for the workflow.
type WorkflowState ¶
type WorkflowState struct {
// IsSuspended is the flag to indicate if the workflow is suspended (paused).
IsSuspended bool `json:"is_suspended"`
// IsCompleted is the flag to indicate if the workflow is fully completed.
IsCompleted bool `json:"is_completed"`
// IsFailed is the flag to indicate if the workflow is failed.
IsFailed bool `json:"is_failed"`
// CustomError holds custom information about the failure.
CustomError json.RawMessage `json:"custom_error,omitempty"`
// Error is the formatted error message.
Error string `json:"error,omitempty"`
// NextStage is the next stage for the workflow.
NextStage string `json:"next_stage,omitempty"`
// NextStep is the next step for the workflow.
NextStep string `json:"next_step,omitempty"`
}
WorkflowState represents the state of the workflow.
func (*WorkflowState) GetCustomError ¶
func (w *WorkflowState) GetCustomError() json.RawMessage
func (*WorkflowState) SetCompleted ¶
func (w *WorkflowState) SetCompleted(value bool) *WorkflowState
SetCompleted sets the completed flag for the workflow.
func (*WorkflowState) SetCustomError ¶
func (w *WorkflowState) SetCustomError(value []byte) *WorkflowState
func (*WorkflowState) SetError ¶
func (w *WorkflowState) SetError(err error) *WorkflowState
SetError sets the error for the workflow.
func (*WorkflowState) SetErrorMsg ¶
func (w *WorkflowState) SetErrorMsg(msg string) *WorkflowState
func (*WorkflowState) SetFailed ¶
func (w *WorkflowState) SetFailed(value bool) *WorkflowState
SetFailed sets the failed flag for the workflow.
func (*WorkflowState) SetNextStage ¶
func (w *WorkflowState) SetNextStage(stage string) *WorkflowState
SetNextStage sets the next stage for the workflow.
func (*WorkflowState) SetNextStep ¶
func (w *WorkflowState) SetNextStep(step string) *WorkflowState
SetNextStep sets the next step for the workflow.
func (*WorkflowState) SetSuspended ¶
func (w *WorkflowState) SetSuspended(value bool) *WorkflowState
SetSuspended sets the suspended flag for the workflow.