workflow

package
v0.1.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 11, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package workflow implements workflow DAG validation, topological ordering, and the orchestrator that advances workflow state on task completion.

Index

Constants

View Source
const DefaultMaxConditionIterations = 100

DefaultMaxConditionIterations is the system-wide limit for condition node re-evaluation.

Variables

This section is empty.

Functions

func AllTasksCompleted

func AllTasksCompleted(instance *types.WorkflowInstance) bool

AllTasksCompleted returns true if every task in the workflow has reached a terminal state. Tasks with AllowFailure=true count as "done" even if they failed.

func AnyTaskFailed

func AnyTaskFailed(instance *types.WorkflowInstance) bool

AnyTaskFailed returns true if any task in the workflow has failed (including AllowFailure tasks).

func AnyTaskFatallyFailed added in v0.1.10

func AnyTaskFatallyFailed(instance *types.WorkflowInstance) bool

AnyTaskFatallyFailed returns true if any task has failed and is NOT marked AllowFailure. Used to detect stuck workflows that can never complete.

func DecrementPendingDeps added in v0.1.10

func DecrementPendingDeps(instance *types.WorkflowInstance, completedTask string)

DecrementPendingDeps decrements the pending dep count for all successors of the completed task. For condition nodes, only the selected route's target is decremented (not all successors).

func InitPendingDeps added in v0.1.10

func InitPendingDeps(instance *types.WorkflowInstance)

InitPendingDeps initializes the PendingDeps map for a workflow instance. Call this once at workflow creation to set up reference counting.

func MaxIterationsForTask added in v0.1.10

func MaxIterationsForTask(task *types.WorkflowTask) int

MaxIterationsForTask returns the max iterations for a condition task, using the task's configured value or the system default.

func ReadyTasks

func ReadyTasks(instance *types.WorkflowInstance) []string

ReadyTasks returns the names of tasks in a workflow instance that are ready to be dispatched. Uses reference counting (PendingDeps) when available for O(1) per-task checking, falling back to full scan for backwards compatibility. Results are sorted by priority (highest first).

func RootTasks

func RootTasks(def *types.WorkflowDefinition) []string

RootTasks returns the names of tasks with no dependencies.

func TopologicalOrder

func TopologicalOrder(def *types.WorkflowDefinition) ([]string, error)

TopologicalOrder returns the tasks in a valid execution order.

func ValidateDAG

func ValidateDAG(def *types.WorkflowDefinition) error

ValidateDAG checks that a workflow definition forms a valid graph:

  • All task names are unique
  • All DependsOn references exist
  • No circular dependencies in purely static tasks (Kahn's algorithm)
  • Condition nodes may create back-edges (cycles are allowed through condition nodes and are bounded by MaxIterations at runtime)
  • ConditionRoutes targets exist as task names

Types

type Orchestrator

type Orchestrator struct {
	// contains filtered or unexported fields
}

Orchestrator runs on the leader and advances workflow state when individual tasks complete or fail.

func NewOrchestrator

func NewOrchestrator(cfg OrchestratorConfig) *Orchestrator

NewOrchestrator creates a new workflow orchestrator.

func (*Orchestrator) Start

func (o *Orchestrator) Start(ctx context.Context)

Start begins listening for job completion/failure events.

func (*Orchestrator) Stop

func (o *Orchestrator) Stop()

Stop halts the orchestrator.

type OrchestratorConfig

type OrchestratorConfig struct {
	Store      *store.RedisStore
	Dispatcher *dispatcher.Dispatcher
	Logger     gochainedlog.Logger
}

OrchestratorConfig holds the configuration for the workflow orchestrator.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL