Documentation
¶
Overview ¶
Package workflow implements workflow DAG validation, topological ordering, and the orchestrator that advances workflow state on task completion.
Index ¶
- Constants
- func AllTasksCompleted(instance *types.WorkflowInstance) bool
- func AnyTaskFailed(instance *types.WorkflowInstance) bool
- func AnyTaskFatallyFailed(instance *types.WorkflowInstance) bool
- func DecrementPendingDeps(instance *types.WorkflowInstance, completedTask string)
- func InitPendingDeps(instance *types.WorkflowInstance)
- func MaxIterationsForTask(task *types.WorkflowTask) int
- func ReadyTasks(instance *types.WorkflowInstance) []string
- func RootTasks(def *types.WorkflowDefinition) []string
- func TopologicalOrder(def *types.WorkflowDefinition) ([]string, error)
- func ValidateDAG(def *types.WorkflowDefinition) error
- type Orchestrator
- type OrchestratorConfig
Constants ¶
const DefaultMaxConditionIterations = 100
DefaultMaxConditionIterations is the system-wide limit for condition node re-evaluation.
Variables ¶
This section is empty.
Functions ¶
func AllTasksCompleted ¶
func AllTasksCompleted(instance *types.WorkflowInstance) bool
AllTasksCompleted returns true if every task in the workflow has reached a terminal state. Tasks with AllowFailure=true count as "done" even if they failed.
func AnyTaskFailed ¶
func AnyTaskFailed(instance *types.WorkflowInstance) bool
AnyTaskFailed returns true if any task in the workflow has failed (including AllowFailure tasks).
func AnyTaskFatallyFailed ¶ added in v0.1.10
func AnyTaskFatallyFailed(instance *types.WorkflowInstance) bool
AnyTaskFatallyFailed returns true if any task has failed and is NOT marked AllowFailure. Used to detect stuck workflows that can never complete.
func DecrementPendingDeps ¶ added in v0.1.10
func DecrementPendingDeps(instance *types.WorkflowInstance, completedTask string)
DecrementPendingDeps decrements the pending dep count for all successors of the completed task. For condition nodes, only the selected route's target is decremented (not all successors).
func InitPendingDeps ¶ added in v0.1.10
func InitPendingDeps(instance *types.WorkflowInstance)
InitPendingDeps initializes the PendingDeps map for a workflow instance. Call this once at workflow creation to set up reference counting.
func MaxIterationsForTask ¶ added in v0.1.10
func MaxIterationsForTask(task *types.WorkflowTask) int
MaxIterationsForTask returns the max iterations for a condition task, using the task's configured value or the system default.
func ReadyTasks ¶
func ReadyTasks(instance *types.WorkflowInstance) []string
ReadyTasks returns the names of tasks in a workflow instance that are ready to be dispatched. Uses reference counting (PendingDeps) when available for O(1) per-task checking, falling back to full scan for backwards compatibility. Results are sorted by priority (highest first).
func RootTasks ¶
func RootTasks(def *types.WorkflowDefinition) []string
RootTasks returns the names of tasks with no dependencies.
func TopologicalOrder ¶
func TopologicalOrder(def *types.WorkflowDefinition) ([]string, error)
TopologicalOrder returns the tasks in a valid execution order.
func ValidateDAG ¶
func ValidateDAG(def *types.WorkflowDefinition) error
ValidateDAG checks that a workflow definition forms a valid graph:
- All task names are unique
- All DependsOn references exist
- No circular dependencies in purely static tasks (Kahn's algorithm)
- Condition nodes may create back-edges (cycles are allowed through condition nodes and are bounded by MaxIterations at runtime)
- ConditionRoutes targets exist as task names
Types ¶
type Orchestrator ¶
type Orchestrator struct {
// contains filtered or unexported fields
}
Orchestrator runs on the leader and advances workflow state when individual tasks complete or fail.
func NewOrchestrator ¶
func NewOrchestrator(cfg OrchestratorConfig) *Orchestrator
NewOrchestrator creates a new workflow orchestrator.
func (*Orchestrator) Start ¶
func (o *Orchestrator) Start(ctx context.Context)
Start begins listening for job completion/failure events.
type OrchestratorConfig ¶
type OrchestratorConfig struct {
Store *store.RedisStore
Dispatcher *dispatcher.Dispatcher
Logger gochainedlog.Logger
}
OrchestratorConfig holds the configuration for the workflow orchestrator.