workflow

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors describing a malformed workflow definition.
// NOTE(review): presumably returned by Validate — confirm against the source.
var (
	// ErrWorkflowNameEmpty indicates a workflow whose name is empty.
	ErrWorkflowNameEmpty = errors.New("workflow name is empty")
	// ErrNoWorkflowSteps indicates a workflow that declares no steps.
	ErrNoWorkflowSteps   = errors.New("workflow has no steps")
	// ErrStepIDEmpty indicates a step whose ID is empty.
	ErrStepIDEmpty       = errors.New("step ID is empty")
)

Functions

func RenderPrompt

func RenderPrompt(tmpl string, results map[string]string) (string, error)

RenderPrompt replaces each {{stepID.result}} placeholder in a prompt template with the result of the previous step whose ID is stepID, as found in results. It returns an error if a referenced step has no result available.

func Validate

func Validate(w *Workflow) error

Validate checks that a Workflow is well-formed.

Types

type AgentRunner

// AgentRunner executes agent prompts. It is declared as an interface in this
// package to avoid import cycles with orchestration.
type AgentRunner interface {
	// Run takes a context, a session key, and a prompt, and returns a string
	// and an error.
	// NOTE(review): presumably the returned string is the agent's response to
	// the prompt — confirm against the implementations.
	Run(ctx context.Context, sessionKey string, prompt string) (string, error)
}

AgentRunner executes agent prompts (avoids import cycles with orchestration).

type ChannelSender

// ChannelSender sends results to communication channels.
type ChannelSender interface {
	// SendMessage sends message to the named channel and reports any failure
	// as an error.
	// NOTE(review): the format of channel identifiers is not shown here —
	// confirm against the implementations.
	SendMessage(ctx context.Context, channel string, message string) error
}

ChannelSender sends results to communication channels.

type DAG

// DAG represents a directed acyclic graph of workflow steps.
// Build one with NewDAG, which rejects circular dependencies.
type DAG struct {
	// contains filtered or unexported fields
}

DAG represents a directed acyclic graph of workflow steps.

func NewDAG

func NewDAG(steps []Step) (*DAG, error)

NewDAG builds a DAG from a slice of workflow steps. It returns an error if a circular dependency is detected.

func (*DAG) Ready

func (d *DAG) Ready(completed map[string]bool) []string

Ready returns step IDs whose dependencies are all in the completed set.

func (*DAG) Roots

func (d *DAG) Roots() []string

Roots returns step IDs that have no dependencies.

func (*DAG) TopologicalSort

func (d *DAG) TopologicalSort() ([][]string, error)

TopologicalSort returns layers of step IDs that can be executed in parallel. Layer 0 contains steps with no dependencies, layer 1 contains steps whose dependencies are all in layer 0, and so on.

type Engine

// Engine orchestrates DAG-based workflow execution.
// Create one with NewEngine.
type Engine struct {
	// contains filtered or unexported fields
}

Engine orchestrates DAG-based workflow execution.

func NewEngine

func NewEngine(
	runner AgentRunner,
	state *StateStore,
	sender ChannelSender,
	maxConcurrent int,
	defaultTimeout time.Duration,
	logger *zap.SugaredLogger,
) *Engine

NewEngine creates a new workflow execution engine.

func (*Engine) Cancel

func (e *Engine) Cancel(runID string) error

Cancel requests cancellation of a running workflow.

func (*Engine) ListRuns

func (e *Engine) ListRuns(ctx context.Context, limit int) ([]RunStatus, error)

ListRuns returns the most recent workflow runs.

func (*Engine) Resume

func (e *Engine) Resume(ctx context.Context, runID string) (*RunResult, error)

Resume restarts execution of the workflow run identified by runID, continuing from where it left off.

func (*Engine) Run

func (e *Engine) Run(ctx context.Context, w *Workflow) (*RunResult, error)

Run executes a workflow from start to finish synchronously. The context is detached from the parent to prevent cancellation when the originating request completes.

func (*Engine) RunAsync

func (e *Engine) RunAsync(ctx context.Context, w *Workflow) (string, error)

RunAsync validates, creates the run record and step records, then executes the DAG in a background goroutine. It returns the runID immediately so the caller can poll via Status().

func (*Engine) Shutdown

func (e *Engine) Shutdown()

Shutdown cancels all running workflows.

func (*Engine) Status

func (e *Engine) Status(ctx context.Context, runID string) (*RunStatus, error)

Status returns the current status of a workflow execution.

type RunResult

// RunResult holds the final result of a workflow execution.
type RunResult struct {
	// RunID identifies the workflow run.
	RunID        string
	// WorkflowName is the name of the workflow that was run.
	WorkflowName string
	// Status is the final status of the run.
	// NOTE(review): the set of valid status values is not shown here.
	Status       string
	// StepResults maps each step ID to that step's result.
	StepResults  map[string]string
	// Error is an error message for the run.
	// NOTE(review): presumably empty when the run succeeded — confirm.
	Error        string
	// StartedAt is when the run started.
	StartedAt    time.Time
	// CompletedAt is when the run completed.
	CompletedAt  time.Time
}

RunResult holds the final result of a workflow execution.

type RunStatus

// RunStatus holds the current status of a workflow execution.
type RunStatus struct {
	// RunID identifies the workflow run.
	RunID          string
	// WorkflowName is the name of the workflow being run.
	WorkflowName   string
	// Status is the current status of the run.
	// NOTE(review): the set of valid status values is not shown here.
	Status         string
	// TotalSteps is the number of steps in the run.
	TotalSteps     int
	// CompletedSteps is the number of steps that have completed.
	// NOTE(review): unclear from here whether failed steps are counted — confirm.
	CompletedSteps int
	// StartedAt is when the run started.
	StartedAt      time.Time
	// StepStatuses holds the per-step statuses of the run.
	StepStatuses   []StepStatus
}

RunStatus holds the current status of a workflow execution.

type StateStore

// StateStore persists workflow execution state for resume capability.
// Create one with NewStateStore.
type StateStore struct {
	// contains filtered or unexported fields
}

StateStore persists workflow execution state for resume capability.

func NewStateStore

func NewStateStore(client *ent.Client, logger *zap.SugaredLogger) *StateStore

NewStateStore creates a new StateStore backed by the given ent client.

func (*StateStore) CompleteRun

func (s *StateStore) CompleteRun(ctx context.Context, runID string, status string, errMsg string) error

CompleteRun marks a workflow run as finished with a final status and optional error message.

func (*StateStore) CreateRun

func (s *StateStore) CreateRun(ctx context.Context, w *Workflow) (string, error)

CreateRun creates a new workflow run record and returns its ID.

func (*StateStore) CreateStepRun

func (s *StateStore) CreateStepRun(ctx context.Context, runID string, step Step, renderedPrompt string) error

CreateStepRun creates a new step run record for a workflow run.

func (*StateStore) GetRunStatus

func (s *StateStore) GetRunStatus(ctx context.Context, runID string) (*RunStatus, error)

GetRunStatus returns the current status of a workflow run including all step statuses.

func (*StateStore) GetStepResults

func (s *StateStore) GetStepResults(ctx context.Context, runID string) (map[string]string, error)

GetStepResults returns a map of stepID -> result for all completed steps.

func (*StateStore) ListRuns

func (s *StateStore) ListRuns(ctx context.Context, limit int) ([]RunStatus, error)

ListRuns returns the most recent workflow runs, ordered by start time descending.

func (*StateStore) UpdateRunStatus

func (s *StateStore) UpdateRunStatus(ctx context.Context, runID string, status string) error

UpdateRunStatus updates the status of a workflow run.

func (*StateStore) UpdateStepStatus

func (s *StateStore) UpdateStepStatus(ctx context.Context, runID string, stepID string, status string, result string, errMsg string) error

UpdateStepStatus updates the status, result, and error message of a step run.

type Step

// Step represents a single unit of work in a workflow.
// Fields are populated from YAML using the keys given in the struct tags.
type Step struct {
	// ID identifies the step; other steps and prompt placeholders refer to it.
	ID        string        `yaml:"id"`
	// Agent names the agent that runs the step:
	// executor | researcher | planner | memory-manager.
	Agent     string        `yaml:"agent"`
	// Prompt is the prompt template for the step. It may reference the result
	// of an earlier step with a {{step-id.result}} placeholder.
	// NOTE(review): the original comment called this a "Go template", but the
	// placeholder form is not text/template syntax; presumably it is
	// substituted by RenderPrompt — confirm.
	Prompt    string        `yaml:"prompt"`
	// DependsOn lists the IDs of the steps this step depends on.
	DependsOn []string      `yaml:"depends_on"`
	// DeliverTo lists per-step delivery targets for this step's result.
	DeliverTo []string      `yaml:"deliver_to"`
	// Timeout is the time limit for the step.
	// NOTE(review): presumably the engine's default timeout applies when this
	// is zero, and the accepted YAML form (e.g. "30s") depends on the YAML
	// decoder — confirm both.
	Timeout   time.Duration `yaml:"timeout"`
}

Step represents a single unit of work in a workflow.

type StepStatus

// StepStatus holds the current status of a single step.
type StepStatus struct {
	// StepID identifies the step.
	StepID string
	// Agent is the agent assigned to the step.
	Agent  string
	// Status is the current status of the step.
	// NOTE(review): the set of valid status values is not shown here.
	Status string
	// Error is an error message for the step.
	// NOTE(review): presumably empty unless the step failed — confirm.
	Error  string
}

StepStatus holds the current status of a single step.

type Workflow

// Workflow represents a declared multi-step workflow.
// Fields are populated from YAML using the keys given in the struct tags.
type Workflow struct {
	// Name is the workflow's name.
	Name        string   `yaml:"name"`
	// Description is a free-form description of the workflow.
	Description string   `yaml:"description"`
	// Schedule is an optional cron expression.
	Schedule    string   `yaml:"schedule"`
	// DeliverTo lists optional result delivery targets.
	DeliverTo   []string `yaml:"deliver_to"`
	// Steps are the steps that make up the workflow.
	Steps       []Step   `yaml:"steps"`
}

Workflow represents a declared multi-step workflow.

func Parse

func Parse(data []byte) (*Workflow, error)

Parse parses YAML data into a Workflow.

func ParseFile

func ParseFile(path string) (*Workflow, error)

ParseFile reads a YAML file and parses it into a Workflow.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL