workflow

package
v0.7.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 13, 2026 | License: MIT | Imports: 21 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors describing a malformed workflow definition.
// NOTE(review): presumably returned by Validate; the implementation is not
// shown here — confirm before relying on it. Compare with errors.Is.
var (
	// ErrWorkflowNameEmpty reports that the workflow name is empty.
	ErrWorkflowNameEmpty = errors.New("workflow name is empty")
	// ErrNoWorkflowSteps reports that the workflow has no steps.
	ErrNoWorkflowSteps   = errors.New("workflow has no steps")
	// ErrStepIDEmpty reports that a step ID is empty.
	ErrStepIDEmpty       = errors.New("step ID is empty")
)

Functions

func BuildTools added in v0.7.0

func BuildTools(engine *Engine, stateDir string, defaultDeliverTo []string) []*agent.Tool

BuildTools creates tools for executing and managing workflows.

func RenderPrompt

func RenderPrompt(tmpl string, results map[string]string) (string, error)

RenderPrompt substitutes {{stepID.result}} placeholders in a prompt template with actual results from previous steps. It returns an error if a referenced step has no result available.

func Validate

func Validate(w *Workflow) error

Validate checks that a Workflow is well-formed.

Types

type AgentRunner

// AgentRunner executes agent prompts. It is declared as an interface in this
// package to avoid import cycles with orchestration.
type AgentRunner interface {
	// Run executes prompt for the given sessionKey and returns a string
	// result or an error.
	// NOTE(review): presumably the returned string is the agent's response
	// text that becomes the step result — confirm against the engine.
	Run(ctx context.Context, sessionKey string, prompt string) (string, error)
}

AgentRunner executes agent prompts (avoids import cycles with orchestration).

type ChannelSender

// ChannelSender sends results to communication channels.
type ChannelSender interface {
	// SendMessage sends message to the named channel and returns an error
	// if it could not be sent.
	// NOTE(review): the format of channel identifiers is not shown here;
	// presumably they match the deliver_to entries of Workflow and Step — confirm.
	SendMessage(ctx context.Context, channel string, message string) error
}

ChannelSender sends results to communication channels.

type DAG

// DAG represents a directed acyclic graph of workflow steps.
// It is built with NewDAG from a slice of steps and queried through Roots,
// Ready and TopologicalSort.
type DAG struct {
	// contains filtered or unexported fields
}

DAG represents a directed acyclic graph of workflow steps.

func NewDAG

func NewDAG(steps []Step) (*DAG, error)

NewDAG builds a DAG from a slice of workflow steps. It returns an error if a circular dependency is detected.

func (*DAG) Ready

func (d *DAG) Ready(completed map[string]bool) []string

Ready returns step IDs whose dependencies are all in the completed set.

func (*DAG) Roots

func (d *DAG) Roots() []string

Roots returns step IDs that have no dependencies.

func (*DAG) TopologicalSort

func (d *DAG) TopologicalSort() ([][]string, error)

TopologicalSort returns layers of step IDs that can be executed in parallel. Layer 0 contains steps with no dependencies, layer 1 contains steps whose dependencies are all in layer 0, and so on.

type Engine

// Engine orchestrates DAG-based workflow execution.
// It is created with NewEngine. Run executes a workflow synchronously and
// RunAsync executes it in a background goroutine, returning the run ID for
// polling via Status. Cancel, Resume, ListRuns and Shutdown manage runs.
type Engine struct {
	// contains filtered or unexported fields
}

Engine orchestrates DAG-based workflow execution.

func NewEngine

func NewEngine(
	runner AgentRunner,
	state RunStore,
	sender ChannelSender,
	maxConcurrent int,
	defaultTimeout time.Duration,
	logger *zap.SugaredLogger,
) *Engine

NewEngine creates a new workflow execution engine.

func (*Engine) Cancel

func (e *Engine) Cancel(runID string) error

Cancel requests cancellation of a running workflow.

func (*Engine) ListRuns

func (e *Engine) ListRuns(ctx context.Context, limit int) ([]RunStatus, error)

ListRuns returns the most recent workflow runs.

func (*Engine) Resume

func (e *Engine) Resume(ctx context.Context, runID string) (*RunResult, error)

Resume re-executes a workflow from where it left off.

func (*Engine) Run

func (e *Engine) Run(ctx context.Context, w *Workflow) (*RunResult, error)

Run executes a workflow from start to finish synchronously. The context is detached from the parent to prevent cancellation when the originating request completes.

func (*Engine) RunAsync

func (e *Engine) RunAsync(ctx context.Context, w *Workflow) (string, error)

RunAsync validates, creates the run record and step records, then executes the DAG in a background goroutine. It returns the runID immediately so the caller can poll via Status().

func (*Engine) Shutdown

func (e *Engine) Shutdown(ctx context.Context) error

Shutdown cancels all running workflows and waits for goroutines to finish.

func (*Engine) Status

func (e *Engine) Status(ctx context.Context, runID string) (*RunStatus, error)

Status returns the current status of a workflow execution.

type RunResult

// RunResult holds the final result of a workflow execution.
// It is the value returned by Engine.Run and Engine.Resume.
type RunResult struct {
	// RunID identifies the workflow run.
	RunID        string
	// WorkflowName is the name of the workflow that was executed.
	WorkflowName string
	// Status is the status of the run.
	// NOTE(review): the set of possible status strings is not shown here.
	Status       string
	// StepResults maps each step ID to that step's result.
	StepResults  map[string]string // stepID -> result
	// Error is an error message for the run.
	// NOTE(review): presumably empty when the run succeeded — confirm.
	Error        string
	// StartedAt and CompletedAt are the run's start and completion times.
	StartedAt    time.Time
	CompletedAt  time.Time
}

RunResult holds the final result of a workflow execution.

type RunStatus

// RunStatus holds the current status of a workflow execution.
// It is returned by Engine.Status and Engine.ListRuns and by the
// corresponding RunStore methods.
type RunStatus struct {
	// RunID identifies the workflow run.
	RunID          string
	// WorkflowName is the name of the workflow being executed.
	WorkflowName   string
	// Status is the run's current status.
	// NOTE(review): the set of possible status strings is not shown here.
	Status         string
	// TotalSteps and CompletedSteps are step counts for the run.
	// NOTE(review): presumably CompletedSteps counts only steps that have
	// finished successfully — confirm against the store implementation.
	TotalSteps     int
	CompletedSteps int
	// StartedAt is the time the run started.
	StartedAt      time.Time
	// StepStatuses holds the status of the run's individual steps.
	StepStatuses   []StepStatus
}

RunStatus holds the current status of a workflow execution.

type RunStore added in v0.7.0

// RunStore is the persistence contract used by the workflow engine.
// The per-method notes restate what this page documents for the StateStore
// methods with the same names and signatures.
type RunStore interface {
	// CreateRun creates a new workflow run record and returns its ID.
	CreateRun(ctx context.Context, w *Workflow) (string, error)
	// UpdateRunStatus updates the status of a workflow run.
	UpdateRunStatus(ctx context.Context, runID string, status string) error
	// CompleteRun marks a workflow run as finished with a final status and
	// an optional error message.
	CompleteRun(ctx context.Context, runID string, status string, errMsg string) error
	// CreateStepRun creates a new step run record for a workflow run.
	CreateStepRun(ctx context.Context, runID string, step Step, renderedPrompt string) error
	// UpdateStepStatus updates the status, result, and error message of a
	// step run.
	UpdateStepStatus(ctx context.Context, runID string, stepID string, status string, result string, errMsg string) error
	// GetRunStatus returns the current status of a workflow run, including
	// all step statuses.
	GetRunStatus(ctx context.Context, runID string) (*RunStatus, error)
	// GetStepResults returns a map of stepID -> result for all completed
	// steps.
	GetStepResults(ctx context.Context, runID string) (map[string]string, error)
	// ListRuns returns the most recent workflow runs.
	// NOTE(review): StateStore orders by start time descending; whether
	// every implementation must do so is not stated — confirm.
	ListRuns(ctx context.Context, limit int) ([]RunStatus, error)
}

RunStore is the persistence contract used by the workflow engine.

type StateStore

// StateStore persists workflow execution state for resume capability.
// It is created with NewStateStore and backed by an ent client. Its methods
// listed on this page match every method of RunStore; it additionally
// provides CreateRunWithID for caller-provided run IDs.
type StateStore struct {
	// contains filtered or unexported fields
}

StateStore persists workflow execution state for resume capability.

func NewStateStore

func NewStateStore(client *ent.Client, logger *zap.SugaredLogger) *StateStore

NewStateStore creates a new StateStore backed by the given ent client.

func (*StateStore) CompleteRun

func (s *StateStore) CompleteRun(ctx context.Context, runID string, status string, errMsg string) error

CompleteRun marks a workflow run as finished with a final status and optional error message.

func (*StateStore) CreateRun

func (s *StateStore) CreateRun(ctx context.Context, w *Workflow) (string, error)

CreateRun creates a new workflow run record and returns its ID.

func (*StateStore) CreateRunWithID added in v0.7.0

func (s *StateStore) CreateRunWithID(ctx context.Context, runID string, w *Workflow) error

CreateRunWithID creates a new workflow run using a caller-provided canonical ID.

func (*StateStore) CreateStepRun

func (s *StateStore) CreateStepRun(ctx context.Context, runID string, step Step, renderedPrompt string) error

CreateStepRun creates a new step run record for a workflow run.

func (*StateStore) GetRunStatus

func (s *StateStore) GetRunStatus(ctx context.Context, runID string) (*RunStatus, error)

GetRunStatus returns the current status of a workflow run including all step statuses.

func (*StateStore) GetStepResults

func (s *StateStore) GetStepResults(ctx context.Context, runID string) (map[string]string, error)

GetStepResults returns a map of stepID -> result for all completed steps.

func (*StateStore) ListRuns

func (s *StateStore) ListRuns(ctx context.Context, limit int) ([]RunStatus, error)

ListRuns returns the most recent workflow runs, ordered by start time descending.

func (*StateStore) UpdateRunStatus

func (s *StateStore) UpdateRunStatus(ctx context.Context, runID string, status string) error

UpdateRunStatus updates the status of a workflow run.

func (*StateStore) UpdateStepStatus

func (s *StateStore) UpdateStepStatus(ctx context.Context, runID string, stepID string, status string, result string, errMsg string) error

UpdateStepStatus updates the status, result, and error message of a step run.

type Step

// Step represents a single unit of work in a workflow.
// Fields are populated from YAML using the keys in the struct tags.
type Step struct {
	// ID identifies the step; other steps refer to it in DependsOn and in
	// prompt placeholders.
	ID        string        `yaml:"id"`
	// Agent names the agent that runs the step.
	Agent     string        `yaml:"agent"`  // executor | researcher | planner | memory-manager
	// Prompt is the prompt template. RenderPrompt substitutes
	// {{stepID.result}} placeholders with results of previous steps.
	Prompt    string        `yaml:"prompt"` // Go template with {{step-id.result}}
	// DependsOn lists the IDs of steps this step depends on.
	DependsOn []string      `yaml:"depends_on"`
	// DeliverTo lists delivery targets for this step's result.
	DeliverTo []string      `yaml:"deliver_to"` // per-step delivery
	// Timeout is the step's time limit.
	// NOTE(review): presumably the engine's defaultTimeout applies when
	// this is zero — confirm against the engine.
	Timeout   time.Duration `yaml:"timeout"`
}

Step represents a single unit of work in a workflow.

type StepStatus

// StepStatus holds the current status of a single step.
// Values of this type appear in RunStatus.StepStatuses.
type StepStatus struct {
	// StepID identifies the step.
	StepID string
	// Agent is the agent associated with the step.
	Agent  string
	// Status is the step's current status.
	// NOTE(review): the set of possible status strings is not shown here.
	Status string
	// Error is an error message for the step.
	// NOTE(review): presumably empty unless the step failed — confirm.
	Error  string
}

StepStatus holds the current status of a single step.

type Workflow

// Workflow represents a declared multi-step workflow.
// It is produced from YAML by Parse and ParseFile and checked by Validate.
type Workflow struct {
	// Name is the workflow's name. ErrWorkflowNameEmpty exists for the
	// empty-name case.
	Name        string   `yaml:"name"`
	// Description is free-form text describing the workflow.
	Description string   `yaml:"description"`
	// Schedule is an optional cron expression.
	Schedule    string   `yaml:"schedule"`   // optional cron expression
	// DeliverTo lists optional targets for delivering the workflow result.
	DeliverTo   []string `yaml:"deliver_to"` // optional result delivery targets
	// Steps are the workflow's steps. ErrNoWorkflowSteps exists for the
	// no-steps case.
	Steps       []Step   `yaml:"steps"`
}

Workflow represents a declared multi-step workflow.

func Parse

func Parse(data []byte) (*Workflow, error)

Parse parses YAML data into a Workflow.

func ParseFile

func ParseFile(path string) (*Workflow, error)

ParseFile reads a YAML file and parses it into a Workflow.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL