Documentation ¶
Overview ¶
Package workflow implements workflow DAG validation, topological ordering, and the orchestrator that advances workflow state when tasks complete or fail.
Index ¶
- func AllTasksCompleted(instance *types.WorkflowInstance) bool
- func AnyTaskFailed(instance *types.WorkflowInstance) bool
- func ReadyTasks(instance *types.WorkflowInstance) []string
- func RootTasks(def *types.WorkflowDefinition) []string
- func TopologicalOrder(def *types.WorkflowDefinition) ([]string, error)
- func ValidateDAG(def *types.WorkflowDefinition) error
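- type Orchestrator
  - func NewOrchestrator(cfg OrchestratorConfig) *Orchestrator
  - func (o *Orchestrator) Start(ctx context.Context)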
- type OrchestratorConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AllTasksCompleted ¶
func AllTasksCompleted(instance *types.WorkflowInstance) bool
AllTasksCompleted returns true if every task in the workflow has completed.
func AnyTaskFailed ¶
func AnyTaskFailed(instance *types.WorkflowInstance) bool
AnyTaskFailed returns true if any task in the workflow has failed.
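The two status predicates above can be illustrated with a minimal sketch. The fields of types.WorkflowInstance are not shown in this documentation, so the `instance` type, the `TaskStatus` values, and the helper names below are hypothetical stand-ins, not the package's actual definitions.

```go
package main

import "fmt"

// TaskStatus and instance are hypothetical stand-ins for whatever
// types.WorkflowInstance really stores; they exist only for this sketch.
type TaskStatus string

const (
	StatusPending   TaskStatus = "pending"
	StatusCompleted TaskStatus = "completed"
	StatusFailed    TaskStatus = "failed"
)

type instance struct {
	Status map[string]TaskStatus // task name -> current status
}

// allCompleted reports whether every task has status "completed".
func allCompleted(in *instance) bool {
	for _, s := range in.Status {
		if s != StatusCompleted {
			return false
		}
	}
	return true
}

// anyFailed reports whether at least one task has status "failed".
func anyFailed(in *instance) bool {
	for _, s := range in.Status {
		if s == StatusFailed {
			return true
		}
	}
	return false
}

func main() {
	in := &instance{Status: map[string]TaskStatus{
		"extract": StatusCompleted,
		"load":    StatusFailed,
	}}
	fmt.Println(allCompleted(in), anyFailed(in)) // false true
}
```

In this sketch an instance with no tasks counts as fully completed; whether the real AllTasksCompleted behaves the same way is not stated here.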
func ReadyTasks ¶
func ReadyTasks(instance *types.WorkflowInstance) []string
ReadyTasks returns the names of tasks in a workflow instance that are ready to be dispatched: all their dependencies are completed and the task itself is still pending.
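The readiness rule described above (task still pending, every dependency completed) can be sketched as follows. The `task` struct, its string statuses, and the `readyTasks` helper are assumptions made for illustration; the real function operates on *types.WorkflowInstance, whose layout is not documented here.

```go
package main

import (
	"fmt"
	"sort"
)

// task is a hypothetical per-task record; the real instance type may differ.
type task struct {
	DependsOn []string
	Status    string // "pending", "completed", or "failed" in this sketch
}

// readyTasks returns the pending tasks whose dependencies are all completed.
// The result is sorted only to make the sketch's output deterministic.
func readyTasks(tasks map[string]task) []string {
	var ready []string
	for name, t := range tasks {
		if t.Status != "pending" {
			continue
		}
		depsDone := true
		for _, dep := range t.DependsOn {
			if tasks[dep].Status != "completed" {
				depsDone = false
				break
			}
		}
		if depsDone {
			ready = append(ready, name)
		}
	}
	sort.Strings(ready)
	return ready
}

func main() {
	tasks := map[string]task{
		"extract":   {Status: "completed"},
		"transform": {DependsOn: []string{"extract"}, Status: "pending"},
		"load":      {DependsOn: []string{"transform"}, Status: "pending"},
	}
	fmt.Println(readyTasks(tasks)) // [transform]
}
```

A task with no dependencies is ready as soon as it is pending, which is why root tasks can be dispatched first.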
func RootTasks ¶
func RootTasks(def *types.WorkflowDefinition) []string
RootTasks returns the names of tasks with no dependencies.
func TopologicalOrder ¶
func TopologicalOrder(def *types.WorkflowDefinition) ([]string, error)
TopologicalOrder returns the task names in a valid execution order: every task appears after all of the tasks it depends on.
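As an illustration of how such an ordering can be produced, here is a minimal sketch using Kahn's algorithm. It assumes the definition can be reduced to a map from task name to dependency names; that map, the `topoOrder` name, and the error messages are hypothetical, and this documentation does not say which algorithm TopologicalOrder itself uses.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// topoOrder takes a hypothetical map of task name -> names it depends on
// and returns the names so that dependencies always come first.
func topoOrder(deps map[string][]string) ([]string, error) {
	indegree := make(map[string]int, len(deps))
	dependents := make(map[string][]string)
	for name, ds := range deps {
		indegree[name] = len(ds)
		for _, d := range ds {
			if _, ok := deps[d]; !ok {
				return nil, fmt.Errorf("task %q depends on unknown task %q", name, d)
			}
			dependents[d] = append(dependents[d], name)
		}
	}

	// Start with tasks that have no dependencies; sort for stable output.
	var queue []string
	for name, n := range indegree {
		if n == 0 {
			queue = append(queue, name)
		}
	}
	sort.Strings(queue)

	order := make([]string, 0, len(deps))
	for len(queue) > 0 {
		n := queue[0]
		queue = queue[1:]
		order = append(order, n)
		next := dependents[n]
		sort.Strings(next)
		for _, m := range next {
			indegree[m]--
			if indegree[m] == 0 {
				queue = append(queue, m)
			}
		}
	}
	if len(order) != len(deps) {
		return nil, errors.New("cycle detected")
	}
	return order, nil
}

func main() {
	order, err := topoOrder(map[string][]string{
		"extract":   nil,
		"transform": {"extract"},
		"validate":  {"extract"},
		"load":      {"transform", "validate"},
	})
	fmt.Println(order, err) // [extract transform validate load] <nil>
}
```

A DAG usually has several valid orders; the sorting in this sketch just picks one deterministically.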
func ValidateDAG ¶
func ValidateDAG(def *types.WorkflowDefinition) error
ValidateDAG checks that a workflow definition forms a valid DAG:
- All task names are unique
- All DependsOn references exist
- No circular dependencies (Kahn's algorithm)
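The three checks can be sketched in one pass over a simplified definition. The `task` struct (a name plus a DependsOn list), the `validate` helper, and the error strings are assumptions for illustration; only the checks themselves and the use of Kahn's algorithm for cycle detection come from the description above.

```go
package main

import (
	"errors"
	"fmt"
)

// task is a hypothetical, minimal view of one workflow task definition.
type task struct {
	Name      string
	DependsOn []string
}

// validate applies the three checks: unique names, known dependencies,
// and no cycles (Kahn's algorithm: a cycle leaves some tasks unvisited).
func validate(tasks []task) error {
	indegree := make(map[string]int, len(tasks))
	for _, t := range tasks {
		if _, dup := indegree[t.Name]; dup {
			return fmt.Errorf("duplicate task name %q", t.Name)
		}
		indegree[t.Name] = len(t.DependsOn)
	}

	dependents := make(map[string][]string)
	for _, t := range tasks {
		for _, d := range t.DependsOn {
			if _, ok := indegree[d]; !ok {
				return fmt.Errorf("task %q depends on unknown task %q", t.Name, d)
			}
			dependents[d] = append(dependents[d], t.Name)
		}
	}

	var queue []string
	for name, n := range indegree {
		if n == 0 {
			queue = append(queue, name)
		}
	}
	visited := 0
	for len(queue) > 0 {
		n := queue[0]
		queue = queue[1:]
		visited++
		for _, m := range dependents[n] {
			indegree[m]--
			if indegree[m] == 0 {
				queue = append(queue, m)
			}
		}
	}
	if visited != len(tasks) {
		return errors.New("circular dependency detected")
	}
	return nil
}

func main() {
	ok := []task{{Name: "extract"}, {Name: "load", DependsOn: []string{"extract"}}}
	cyclic := []task{{Name: "a", DependsOn: []string{"b"}}, {Name: "b", DependsOn: []string{"a"}}}
	fmt.Println(validate(ok))     // <nil>
	fmt.Println(validate(cyclic)) // circular dependency detected
}
```

Checking names and references before the cycle pass keeps Kahn's algorithm simple, because every edge is then known to point at a real task.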
Types ¶
type Orchestrator ¶
type Orchestrator struct {
	// contains filtered or unexported fields
}
Orchestrator runs on the leader and advances workflow state when individual tasks complete or fail.
func NewOrchestrator ¶
func NewOrchestrator(cfg OrchestratorConfig) *Orchestrator
NewOrchestrator creates a new workflow orchestrator.
func (*Orchestrator) Start ¶
func (o *Orchestrator) Start(ctx context.Context)
Start begins listening for job completion/failure events.
type OrchestratorConfig ¶
type OrchestratorConfig struct {
	Store      *store.RedisStore
	Dispatcher *dispatcher.Dispatcher
	Logger     gochainedlog.Logger
}
OrchestratorConfig holds the configuration for the workflow orchestrator.