workflows

package
v0.26.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2025 | License: MIT | Imports: 8 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Package-level error values for workflow construction and execution.
// errors.New is called here with an error code and a message, so the errors
// package in use is not the standard library one.
var (
	// ErrStepConditionFailed indicates that a step's condition check failed.
	ErrStepConditionFailed = errors.New(errors.InvalidWorkflowState, "step condition check failed")

	// ErrStepNotFound indicates that a referenced step does not exist in the workflow.
	ErrStepNotFound = errors.New(errors.ResourceNotFound, "step not found in workflow")

	// ErrInvalidInput indicates missing or invalid input parameters.
	ErrInvalidInput = errors.New(errors.InvalidInput, "invalid input parameters")

	// ErrDuplicateStepID indicates an attempt to add a step whose ID already exists.
	ErrDuplicateStepID = errors.New(errors.ValidationFailed, "duplicate step ID")

	// ErrCyclicDependency indicates circular dependencies between steps.
	// NOTE(review): this is the only value in this block tagged with
	// WorkflowExecutionFailed; confirm that code is intended rather than a
	// validation-style code.
	ErrCyclicDependency = errors.New(errors.WorkflowExecutionFailed, "cyclic dependency detected in workflow")
)

Functions

func WrapWorkflowError (added in v0.13.0)

func WrapWorkflowError(err error, fields map[string]interface{}) error

Types

type BaseWorkflow

// BaseWorkflow provides common workflow functionality.
type BaseWorkflow struct {
	// contains filtered or unexported fields
}

BaseWorkflow provides common workflow functionality.

func NewBaseWorkflow

func NewBaseWorkflow(memory agents.Memory) *BaseWorkflow

func (*BaseWorkflow) AddStep

func (w *BaseWorkflow) AddStep(step *Step) error

func (*BaseWorkflow) GetSteps

func (w *BaseWorkflow) GetSteps() []*Step

func (*BaseWorkflow) ValidateWorkflow

func (w *BaseWorkflow) ValidateWorkflow() error

ValidateWorkflow checks if the workflow structure is valid.

type ChainWorkflow

// ChainWorkflow executes steps in a linear sequence, where each step's output
// can be used as input for subsequent steps.
type ChainWorkflow struct {
	// BaseWorkflow is embedded, so its exported methods are promoted to ChainWorkflow.
	*BaseWorkflow
}

ChainWorkflow executes steps in a linear sequence, where each step's output can be used as input for subsequent steps.

func NewChainWorkflow

func NewChainWorkflow(memory agents.Memory) *ChainWorkflow

func (*ChainWorkflow) Execute

func (w *ChainWorkflow) Execute(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error)

Execute runs steps sequentially, passing state from one step to the next.

type ParallelWorkflow

// ParallelWorkflow executes multiple steps concurrently.
// NOTE(review): NewParallelWorkflow accepts a maxConcurrent argument, which
// presumably caps how many steps run at once — confirm against the implementation.
type ParallelWorkflow struct {
	// BaseWorkflow is embedded, so its exported methods are promoted to ParallelWorkflow.
	*BaseWorkflow
	// contains filtered or unexported fields
}

ParallelWorkflow executes multiple steps concurrently.

func NewParallelWorkflow

func NewParallelWorkflow(memory agents.Memory, maxConcurrent int) *ParallelWorkflow

func (*ParallelWorkflow) Execute

func (w *ParallelWorkflow) Execute(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error)

type RetryConfig

// RetryConfig defines how to handle step failures.
type RetryConfig struct {
	// MaxAttempts is the maximum number of retry attempts.
	// NOTE(review): not visible here whether this counts the initial attempt
	// or only the retries after it — confirm against Step.Execute.
	MaxAttempts int

	// BackoffMultiplier determines how long to wait between retries.
	// NOTE(review): the base delay and time unit this multiplies are not
	// visible here — confirm before relying on a specific wait duration.
	BackoffMultiplier float64
}

RetryConfig defines how to handle step failures.

type RouterWorkflow

// RouterWorkflow directs inputs to different processing paths based on a
// classification step.
type RouterWorkflow struct {
	// BaseWorkflow is embedded, so its exported methods are promoted to RouterWorkflow.
	*BaseWorkflow
	// contains filtered or unexported fields
}

RouterWorkflow directs inputs to different processing paths based on a classification step.

func NewRouterWorkflow

func NewRouterWorkflow(memory agents.Memory, classifierStep *Step) *RouterWorkflow

func (*RouterWorkflow) AddRoute

func (w *RouterWorkflow) AddRoute(classification string, steps []*Step) error

AddRoute associates a classification value with a sequence of steps.

func (*RouterWorkflow) Execute

func (w *RouterWorkflow) Execute(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error)

type Step

// Step represents a single unit of computation in a workflow. Each step wraps
// a DSPy module and adds workflow-specific metadata and control logic.
type Step struct {
	// ID uniquely identifies this step within the workflow.
	ID string

	// Module is the underlying DSPy module that performs the actual computation.
	// This could be a Predict, ChainOfThought, or any other DSPy module type.
	Module core.Module

	// NextSteps contains the IDs of the steps that should execute after this one.
	// This makes it possible to define branching and conditional execution paths.
	NextSteps []string

	// Condition is an optional function that determines whether this step should
	// execute. It can examine the current workflow state to make this decision.
	// NOTE(review): behavior for a nil Condition is not visible here —
	// presumably the step runs unconditionally; confirm.
	Condition func(state map[string]interface{}) bool

	// RetryConfig specifies how to handle failures of this step.
	// NOTE(review): presumably a nil RetryConfig means no retries — confirm.
	RetryConfig *RetryConfig
}

Step represents a single unit of computation in a workflow. Each step wraps a DSPy module and adds workflow-specific metadata and control logic.

func (*Step) Execute

func (s *Step) Execute(ctx context.Context, inputs map[string]interface{}) (*StepResult, error)

Execute runs the step's DSPy module with the provided inputs.

type StepResult

// StepResult holds the outputs and metadata from executing a step.
type StepResult struct {
	// StepID identifies which step produced this result.
	StepID string

	// Outputs contains the data produced by this step.
	Outputs map[string]interface{}

	// Metadata contains additional information about the execution.
	Metadata map[string]interface{}

	// NextSteps indicates which steps should run next. It may be modified by
	// step execution, so it can differ from the NextSteps declared on the Step.
	NextSteps []string
}

StepResult holds the outputs and metadata from executing a step.

type Workflow

// Workflow represents a sequence of steps that accomplish a task.
type Workflow interface {
	// Execute runs the workflow with the provided inputs and returns the
	// resulting key/value map, or an error.
	Execute(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error)

	// GetSteps returns all steps in this workflow.
	GetSteps() []*Step

	// AddStep adds a new step to the workflow and returns an error if the
	// step cannot be added.
	AddStep(step *Step) error
}

Workflow represents a sequence of steps that accomplish a task.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL