Documentation
¶
Overview ¶
Package workflow provides the workflow definition loader and runtime.
Index ¶
- func DetermineRuntimeType(t *types.Task) string
- func LoadFile(path string) (*types.WorkflowMetadata, error)
- func ParseYAML(data []byte) (*types.WorkflowMetadata, error)
- func ValidateDAG(tasks []types.WorkflowTask) error
- func WorkflowTaskToTask(wt types.WorkflowTask) (*types.Task, error)
- type ActionInfo
- type CheckpointData
- type Runner
- type WorkflowRunStore
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DetermineRuntimeType ¶
func ValidateDAG ¶
func ValidateDAG(tasks []types.WorkflowTask) error
func WorkflowTaskToTask ¶
func WorkflowTaskToTask(wt types.WorkflowTask) (*types.Task, error)
Types ¶
type ActionInfo ¶
type ActionInfo struct {
Type string
Details string
IsCapability bool
CapType string
Operation string
}
func ParseAction ¶
func ParseAction(action string) ActionInfo
type CheckpointData ¶
type CheckpointData struct {
StepIndex int `json:"step_index"`
CompletedTasks map[string]bool `json:"completed_tasks"`
StepResults map[string]string `json:"step_results"`
Input types.KV `json:"input"`
HeartbeatAt time.Time `json:"heartbeat_at"`
}
CheckpointData is the intermediate state saved at each workflow step boundary.
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
func NewRunner ¶
func NewRunner() *Runner
NewRunner creates a Runner without persistence. Use NewRunnerWithStore to enable run records.
func NewRunnerWithStore ¶
func NewRunnerWithStore(store WorkflowRunStore, auditor audit.Auditor, wc *metrics.WorkflowCollector, workflowFile, triggerType string) *Runner
NewRunnerWithStore creates a Runner that persists run and step records to the given store. workflowFile and triggerType are recorded in the run for audit and potential resume.
func (*Runner) Close ¶
Close releases all executor engine resources (Docker clients, SSH connections, capability runtimes).
func (*Runner) ResumeWorkflow ¶
ResumeWorkflow resumes a previously failed or incomplete workflow run from its checkpoint.
type WorkflowRunStore ¶
type WorkflowRunStore interface {
CreateRun(ctx context.Context, workflowName, workflowFile, triggerType string, triggerInfo, inputParams map[string]any) (*gen.WorkflowRun, error)
UpdateRunStatus(ctx context.Context, runID int64, status int, errMsg string) error
CreateStepRun(ctx context.Context, runID int64, stepID, stepName, action, actionType string, params map[string]any, attempt int) (*gen.WorkflowStepRun, error)
UpdateStepRun(ctx context.Context, stepRunID int64, status int, result map[string]any, errMsg string, attempt int) error
SaveCheckpoint(ctx context.Context, runID int64, data any) error
GetIncompleteRuns(ctx context.Context) ([]*gen.WorkflowRun, error)
GetCheckpoint(ctx context.Context, runID int64, target any) error
GetRun(ctx context.Context, runID int64) (*gen.WorkflowRun, error)
UpdateRunHeartbeat(ctx context.Context, runID int64) error
}
WorkflowRunStore persists workflow runs, step runs, and checkpoint data.