workflow

package
v0.92.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2026 License: GPL-3.0 Imports: 20 Imported by: 0

Documentation

Overview

Package workflow provides the workflow definition loader and runtime.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DetermineRuntimeType

func DetermineRuntimeType(t *types.Task) string

func LoadFile

func LoadFile(path string) (*types.WorkflowMetadata, error)

func ParseYAML

func ParseYAML(data []byte) (*types.WorkflowMetadata, error)

func ValidateDAG

func ValidateDAG(tasks []types.WorkflowTask) error

func WorkflowTaskToTask

func WorkflowTaskToTask(wt types.WorkflowTask) (*types.Task, error)

Types

type ActionInfo

// ActionInfo is the value returned by ParseAction for an action string.
//
// NOTE(review): the per-field notes below are inferred from the field names
// only; confirm them against the ParseAction implementation.
type ActionInfo struct {
	// Type presumably names the kind of action — TODO confirm.
	Type         string
	// Details presumably carries the remainder of the action string — TODO confirm.
	Details      string
	// IsCapability presumably reports whether the action refers to a capability;
	// CapType and Operation look like they are only meaningful in that case — verify.
	IsCapability bool
	CapType      string
	Operation    string
}

func ParseAction

func ParseAction(action string) ActionInfo

type CheckpointData

// CheckpointData is the intermediate state saved at each workflow step
// boundary. Every field carries a json struct tag that fixes its snake_case
// key when the value is encoded with encoding/json.
//
// NOTE(review): presumably this is the value passed to
// WorkflowRunStore.SaveCheckpoint and read back via GetCheckpoint — confirm
// against the Runner implementation.
type CheckpointData struct {
	// StepIndex is encoded as "step_index"; presumably the position of the
	// step at which the checkpoint was taken — TODO confirm whether it is the
	// last completed step or the next one to run.
	StepIndex      int               `json:"step_index"`
	// CompletedTasks is encoded as "completed_tasks"; presumably keyed by task
	// identifier — verify against the writer.
	CompletedTasks map[string]bool   `json:"completed_tasks"`
	// StepResults is encoded as "step_results"; presumably keyed by step
	// identifier with the step's result as a string — verify against the writer.
	StepResults    map[string]string `json:"step_results"`
	// Input is encoded as "input"; presumably the workflow input handed to
	// Runner.Execute — TODO confirm.
	Input          types.KV          `json:"input"`
	// HeartbeatAt is encoded as "heartbeat_at".
	HeartbeatAt    time.Time         `json:"heartbeat_at"`
}

CheckpointData is the intermediate state saved at each workflow step boundary.

type Runner

// Runner is the type whose methods execute and resume workflows (Execute,
// Run, ResumeWorkflow) and release resources (Close). All of its fields are
// unexported, so obtain a value from NewRunner or NewRunnerWithStore rather
// than constructing one directly.
type Runner struct {
	// contains filtered or unexported fields
}

func NewRunner

func NewRunner() *Runner

NewRunner creates a Runner without persistence. Use NewRunnerWithStore to enable run records.

func NewRunnerWithStore

func NewRunnerWithStore(store WorkflowRunStore, auditor audit.Auditor, wc *metrics.WorkflowCollector, workflowFile, triggerType string) *Runner

NewRunnerWithStore creates a Runner that persists run and step records to the given store. workflowFile and triggerType are recorded in the run for audit and potential resume.

func (*Runner) Close

func (r *Runner) Close() error

Close releases all executor engine resources (Docker clients, SSH connections, capability runtimes).

func (*Runner) Execute

func (r *Runner) Execute(ctx context.Context, wf types.WorkflowMetadata, input types.KV, file string) error

func (*Runner) ResumeWorkflow

func (r *Runner) ResumeWorkflow(ctx context.Context, runID int64) error

ResumeWorkflow resumes a previously failed or incomplete workflow run from its checkpoint.

func (*Runner) Run

func (r *Runner) Run(ctx context.Context, t *types.Task) error

type WorkflowRunStore

// WorkflowRunStore persists workflow runs, step runs, and checkpoint data.
// A value of this interface is what NewRunnerWithStore accepts as its store.
//
// NOTE(review): the per-method notes below are derived from the method names
// and signatures only; the meaning of the int status codes and the exact
// error behavior are defined by the implementation — confirm there.
type WorkflowRunStore interface {
	// CreateRun records a new workflow run and returns the created record.
	CreateRun(ctx context.Context, workflowName, workflowFile, triggerType string, triggerInfo, inputParams map[string]any) (*gen.WorkflowRun, error)
	// UpdateRunStatus sets the status (and an error message) on the run runID.
	UpdateRunStatus(ctx context.Context, runID int64, status int, errMsg string) error
	// CreateStepRun records a step execution attempt under run runID and
	// returns the created record.
	CreateStepRun(ctx context.Context, runID int64, stepID, stepName, action, actionType string, params map[string]any, attempt int) (*gen.WorkflowStepRun, error)
	// UpdateStepRun updates the status, result, error message, and attempt of
	// the step run stepRunID.
	UpdateStepRun(ctx context.Context, stepRunID int64, status int, result map[string]any, errMsg string, attempt int) error
	// SaveCheckpoint stores checkpoint data for run runID.
	// Presumably data is a CheckpointData value — TODO confirm.
	SaveCheckpoint(ctx context.Context, runID int64, data any) error
	// GetIncompleteRuns returns runs that have not finished.
	// NOTE(review): which statuses count as incomplete is not visible here.
	GetIncompleteRuns(ctx context.Context) ([]*gen.WorkflowRun, error)
	// GetCheckpoint loads the checkpoint of run runID into target.
	// Presumably target must be a pointer to the type that was saved — verify.
	GetCheckpoint(ctx context.Context, runID int64, target any) error
	// GetRun returns the run record for runID.
	GetRun(ctx context.Context, runID int64) (*gen.WorkflowRun, error)
	// UpdateRunHeartbeat refreshes the heartbeat of run runID.
	UpdateRunHeartbeat(ctx context.Context, runID int64) error
}

WorkflowRunStore persists workflow runs, step runs, and checkpoint data.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL