workflows

package
v0.32.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 23, 2025 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Package-level error values describing workflow failures.
//
// NOTE(review): errors.New is called here with an error code followed by a
// message, so errors is evidently a project package rather than the standard
// library's errors package — confirm the import path before comparing these
// values with the standard errors.Is.
var (
	// ErrStepConditionFailed indicates that a step's condition check failed.
	ErrStepConditionFailed = errors.New(errors.InvalidWorkflowState, "step condition check failed")

	// ErrStepNotFound indicates that a referenced step does not exist in the workflow.
	ErrStepNotFound = errors.New(errors.ResourceNotFound, "step not found in workflow")

	// ErrInvalidInput indicates missing or invalid input parameters.
	ErrInvalidInput = errors.New(errors.InvalidInput, "invalid input parameters")

	// ErrDuplicateStepID indicates an attempt to add a step whose ID already exists.
	ErrDuplicateStepID = errors.New(errors.ValidationFailed, "duplicate step ID")

	// ErrCyclicDependency indicates circular dependencies between steps.
	ErrCyclicDependency = errors.New(errors.WorkflowExecutionFailed, "cyclic dependency detected in workflow")
)

Functions

func WrapWorkflowError added in v0.13.0

func WrapWorkflowError(err error, fields map[string]interface{}) error

Types

type BaseWorkflow

type BaseWorkflow struct {
	// contains filtered or unexported fields
}

BaseWorkflow provides common workflow functionality.

func NewBaseWorkflow

func NewBaseWorkflow(memory agents.Memory) *BaseWorkflow

func (*BaseWorkflow) AddStep

func (w *BaseWorkflow) AddStep(step *Step) error

func (*BaseWorkflow) GetSteps

func (w *BaseWorkflow) GetSteps() []*Step

func (*BaseWorkflow) ValidateWorkflow

func (w *BaseWorkflow) ValidateWorkflow() error

ValidateWorkflow checks if the workflow structure is valid.

type BuilderConfig added in v0.31.0

// BuilderConfig holds configuration options for the workflow builder.
//
// DefaultBuilderConfig returns a *BuilderConfig populated with defaults; the
// default values themselves are not visible in this listing.
type BuilderConfig struct {
	EnableValidation     bool // Whether to perform validation during build
	EnableOptimization   bool // Whether to optimize the workflow graph
	EnableTracing        bool // Whether to add tracing to steps
	MaxConcurrency       int  // Maximum concurrent steps for parallel execution
	DefaultRetryAttempts int  // Default retry attempts for steps
}

BuilderConfig holds configuration options for the workflow builder.

func DefaultBuilderConfig added in v0.31.0

func DefaultBuilderConfig() *BuilderConfig

DefaultBuilderConfig returns sensible defaults for workflow builder configuration.

type BuilderStage added in v0.31.0

// BuilderStage represents a stage in the workflow builder that can contain
// multiple execution patterns (sequential, parallel, conditional, loops,
// templates).
type BuilderStage struct {
	// ID identifies the stage.
	// NOTE(review): presumably the id passed to the WorkflowBuilder method
	// that created the stage — confirm.
	ID          string
	// Type is the execution pattern for this stage (see StageType).
	Type        StageType
	// Module is the core.Module attached to the stage.
	// NOTE(review): presumably the module given to WorkflowBuilder.Stage — confirm.
	Module      core.Module
	// Steps holds the BuilderSteps that belong to this stage.
	Steps       []*BuilderStep
	// Condition is the conditional execution logic for this stage (see ConditionalFunc).
	Condition   ConditionalFunc
	Branches    map[string]*BuilderStage // For conditional execution
	Next        []string                 // IDs of next stages
	// RetryConfig defines how failures are handled (see RetryConfig).
	RetryConfig *RetryConfig
	Metadata    map[string]interface{} // Additional metadata

	// Advanced pattern fields
	LoopCondition    LoopConditionFunc     // For while/until loops
	IteratorFunc     IteratorFunc          // For forEach loops
	TemplateParams   TemplateParameterFunc // For templates
	MaxIterations    int                   // Safety limit for loops
	LoopBody         *WorkflowBuilder      // Nested workflow for loop body
	TemplateWorkflow *WorkflowBuilder      // Template workflow definition
	TimeoutMs        int64                 // Timeout in milliseconds
}

BuilderStage represents a stage in the workflow builder that can contain multiple execution patterns (sequential, parallel, conditional, loops, templates).

type BuilderStep added in v0.31.0

// BuilderStep represents a single step within a stage.
//
// NewStep(id, module) creates a BuilderStep; WithDescription and
// WithStepMetadata add a description and metadata to it.
type BuilderStep struct {
	// ID identifies the step.
	// NOTE(review): presumably the id passed to NewStep — confirm.
	ID          string
	// Module is the core.Module attached to the step.
	// NOTE(review): presumably the module passed to NewStep — confirm.
	Module      core.Module
	// Description is free-form text describing the step.
	// NOTE(review): presumably set by WithDescription — confirm.
	Description string
	// Metadata holds additional key/value data for the step.
	// NOTE(review): presumably populated by WithStepMetadata — confirm.
	Metadata    map[string]interface{}
}

BuilderStep represents a single step within a stage.

func NewStep added in v0.31.0

func NewStep(id string, module core.Module) *BuilderStep

NewStep creates a new BuilderStep for use in parallel stages.

func (*BuilderStep) WithDescription added in v0.31.0

func (bs *BuilderStep) WithDescription(description string) *BuilderStep

WithDescription adds a description to a BuilderStep.

func (*BuilderStep) WithStepMetadata added in v0.31.0

func (bs *BuilderStep) WithStepMetadata(key string, value interface{}) *BuilderStep

WithStepMetadata adds metadata to a BuilderStep.

type ChainWorkflow

// ChainWorkflow executes steps in a linear sequence, where each step's output
// can be used as input for subsequent steps.
type ChainWorkflow struct {
	// Embedding *BaseWorkflow promotes its methods (AddStep, GetSteps,
	// ValidateWorkflow) onto ChainWorkflow.
	*BaseWorkflow
}

ChainWorkflow executes steps in a linear sequence, where each step's output can be used as input for subsequent steps.

func NewChainWorkflow

func NewChainWorkflow(memory agents.Memory) *ChainWorkflow

func (*ChainWorkflow) Execute

func (w *ChainWorkflow) Execute(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error)

Execute runs steps sequentially, passing state from one step to the next.

type CompositeWorkflow added in v0.32.0

type CompositeWorkflow struct {
	*BaseWorkflow
	// contains filtered or unexported fields
}

CompositeWorkflow handles complex workflow patterns including loops and templates.

func NewCompositeWorkflow added in v0.32.0

func NewCompositeWorkflow(memory agents.Memory) *CompositeWorkflow

NewCompositeWorkflow creates a new composite workflow.

func (*CompositeWorkflow) AddBuilderStage added in v0.32.0

func (cw *CompositeWorkflow) AddBuilderStage(stage *BuilderStage)

AddBuilderStage adds a builder stage to the composite workflow.

func (*CompositeWorkflow) Execute added in v0.32.0

func (cw *CompositeWorkflow) Execute(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error)

Execute runs the composite workflow with support for advanced patterns.

type ConditionResult added in v0.31.0

// ConditionResult represents the result of a conditional evaluation.
//
// NOTE(review): nothing in this listing shows where ConditionResult is
// produced or consumed; the field notes below are inferred from the field
// names and should be confirmed against the implementation.
type ConditionResult struct {
	// ShouldExecute presumably reports whether the guarded work should run.
	ShouldExecute bool
	// BranchName presumably names the branch that was selected.
	BranchName    string
	// Error presumably carries an error raised while evaluating the condition.
	Error         error
}

ConditionResult represents the result of a conditional evaluation.

type ConditionalBuilder added in v0.31.0

type ConditionalBuilder struct {
	// contains filtered or unexported fields
}

ConditionalBuilder provides a fluent interface for building conditional workflow stages.

func (*ConditionalBuilder) Else added in v0.31.0

Else adds an alternative branch that executes when the condition is false.

func (*ConditionalBuilder) ElseIf added in v0.31.0

func (cb *ConditionalBuilder) ElseIf(condition ConditionalFunc, module core.Module) *ConditionalBuilder

ElseIf adds an additional conditional branch with its own condition.

func (*ConditionalBuilder) End added in v0.31.0

End completes the conditional builder and returns to the main workflow builder.

func (*ConditionalBuilder) If added in v0.31.0

If adds a conditional branch that executes the given module when the condition is true.

type ConditionalFunc added in v0.31.0

// ConditionalFunc defines the signature for conditional execution logic.
// It receives a context and the current workflow state, and returns the
// outcome of the condition together with any error.
type ConditionalFunc func(ctx context.Context, state map[string]interface{}) (bool, error)

ConditionalFunc defines the signature for conditional execution logic.

type ConditionalRouterWorkflow added in v0.32.0

type ConditionalRouterWorkflow struct {
	*BaseWorkflow
	// contains filtered or unexported fields
}

ConditionalRouterWorkflow handles conditional routing patterns.

func NewConditionalRouterWorkflow added in v0.32.0

func NewConditionalRouterWorkflow(memory agents.Memory, classifier core.Module) *ConditionalRouterWorkflow

NewConditionalRouterWorkflow creates a new conditional router workflow.

func (*ConditionalRouterWorkflow) AddRoute added in v0.32.0

func (crw *ConditionalRouterWorkflow) AddRoute(route string, step *Step) error

AddRoute adds a route to the router.

func (*ConditionalRouterWorkflow) Execute added in v0.32.0

func (crw *ConditionalRouterWorkflow) Execute(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error)

Execute runs the router workflow.

func (*ConditionalRouterWorkflow) SetDefaultRoute added in v0.32.0

func (crw *ConditionalRouterWorkflow) SetDefaultRoute(step *Step)

SetDefaultRoute sets the default route for unmatched classifications.

type IteratorFunc added in v0.32.0

// IteratorFunc defines the signature for forEach iteration logic.
// It receives a context and a state map, and returns the collection to
// iterate over together with any error.
type IteratorFunc func(ctx context.Context, state map[string]interface{}) ([]interface{}, error)

IteratorFunc defines the signature for forEach iteration logic.

type LoopConditionFunc added in v0.32.0

// LoopConditionFunc defines the signature for loop conditions (while/until).
// It receives a context, a state map, and an iteration number, and returns
// the condition's outcome together with any error.
// NOTE(review): whether iteration is 0- or 1-based is not visible here — confirm.
type LoopConditionFunc func(ctx context.Context, state map[string]interface{}, iteration int) (bool, error)

LoopConditionFunc defines the signature for loop conditions (while/until).

type ParallelWorkflow

type ParallelWorkflow struct {
	*BaseWorkflow
	// contains filtered or unexported fields
}

ParallelWorkflow executes multiple steps concurrently.

func NewParallelWorkflow

func NewParallelWorkflow(memory agents.Memory, maxConcurrent int) *ParallelWorkflow

func (*ParallelWorkflow) Execute

func (w *ParallelWorkflow) Execute(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error)

type RetryConfig

// RetryConfig defines how to handle step failures.
type RetryConfig struct {
	// MaxAttempts is the maximum number of retry attempts
	MaxAttempts int

	// BackoffMultiplier determines how long to wait between retries
	// NOTE(review): the base delay and the exact backoff formula are not
	// visible here — confirm before relying on specific timings.
	BackoffMultiplier float64
}

RetryConfig defines how to handle step failures.

type RouterWorkflow

type RouterWorkflow struct {
	*BaseWorkflow
	// contains filtered or unexported fields
}

RouterWorkflow directs inputs to different processing paths based on a classification step.

func NewRouterWorkflow

func NewRouterWorkflow(memory agents.Memory, classifierStep *Step) *RouterWorkflow

func (*RouterWorkflow) AddRoute

func (w *RouterWorkflow) AddRoute(classification string, steps []*Step) error

AddRoute associates a classification value with a sequence of steps.

func (*RouterWorkflow) Execute

func (w *RouterWorkflow) Execute(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error)

type StageType added in v0.31.0

// StageType defines the execution pattern for a stage.
// It is an int-backed enumeration; the defined values are the StageType*
// constants.
type StageType int

StageType defines the execution pattern for a stage.

// StageType values. They are assigned by iota in declaration order, so
// StageTypeSequential is 0 (also the zero value of StageType) and
// StageTypeTemplate is 7.
//
// NOTE(review): the pairing of each constant with a WorkflowBuilder method
// below is inferred from the names, not shown in this listing — confirm.
const (
	// StageTypeSequential = 0; presumably used by WorkflowBuilder.Stage.
	StageTypeSequential StageType = iota
	// StageTypeParallel = 1; presumably used by WorkflowBuilder.Parallel.
	StageTypeParallel
	// StageTypeConditional = 2; presumably used by WorkflowBuilder.Conditional.
	StageTypeConditional
	// StageTypeLoop = 3; no builder method in this listing obviously maps to it.
	StageTypeLoop
	// StageTypeForEach = 4; presumably used by WorkflowBuilder.ForEach.
	StageTypeForEach
	// StageTypeWhile = 5; presumably used by WorkflowBuilder.While.
	StageTypeWhile
	// StageTypeUntil = 6; presumably used by WorkflowBuilder.Until.
	StageTypeUntil
	// StageTypeTemplate = 7; presumably used by WorkflowBuilder.Template.
	StageTypeTemplate
)

type Step

// Step represents a single unit of computation in a workflow. Each step wraps
// a DSPy module and adds workflow-specific metadata and control logic.
type Step struct {
	// ID uniquely identifies this step within the workflow
	ID string

	// Module is the underlying DSPy module that performs the actual computation
	// This could be a Predict, ChainOfThought, or any other DSPy module type
	Module core.Module

	// NextSteps contains the IDs of steps that should execute after this one
	// This allows us to define branching and conditional execution paths
	NextSteps []string

	// Condition is an optional function that determines if this step should execute
	// It can examine the current workflow state to make this decision
	// Note that, unlike ConditionalFunc, it takes no context and returns no error.
	Condition func(state map[string]interface{}) bool

	// RetryConfig specifies how to handle failures of this step
	RetryConfig *RetryConfig
}

Step represents a single unit of computation in a workflow. Each step wraps a DSPy module and adds workflow-specific metadata and control logic.

func (*Step) Execute

func (s *Step) Execute(ctx context.Context, inputs map[string]interface{}) (*StepResult, error)

Execute runs the step's DSPy module with the provided inputs.

type StepResult

// StepResult holds the outputs and metadata from executing a step.
// Step.Execute returns a *StepResult.
type StepResult struct {
	// StepID identifies which step produced this result
	StepID string

	// Outputs contains the data produced by this step
	Outputs map[string]interface{}

	// Metadata contains additional information about the execution
	Metadata map[string]interface{}

	// NextSteps indicates which steps should run next (may be modified by step execution)
	NextSteps []string
}

StepResult holds the outputs and metadata from executing a step.

type TemplateParameterFunc added in v0.32.0

// TemplateParameterFunc defines the signature for template parameter
// resolution. It receives a context and a params map, and returns a map
// together with any error.
// NOTE(review): the returned map is presumably the resolved parameters handed
// to the template workflow — confirm.
type TemplateParameterFunc func(ctx context.Context, params map[string]interface{}) (map[string]interface{}, error)

TemplateParameterFunc defines the signature for template parameter resolution.

type Workflow

// Workflow represents a sequence of steps that accomplish a task.
// WorkflowBuilder.Build returns a value of this interface type.
type Workflow interface {
	// Execute runs the workflow with the provided inputs
	Execute(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error)

	// GetSteps returns all steps in this workflow
	GetSteps() []*Step

	// AddStep adds a new step to the workflow
	AddStep(step *Step) error
}

Workflow represents a sequence of steps that accomplish a task.

type WorkflowBuilder added in v0.31.0

type WorkflowBuilder struct {
	// contains filtered or unexported fields
}

WorkflowBuilder provides a fluent API for constructing workflows declaratively. It maintains backward compatibility with the existing workflow system while providing a more intuitive and powerful interface for workflow composition.

func NewBuilder added in v0.31.0

func NewBuilder(memory agents.Memory) *WorkflowBuilder

NewBuilder creates a new WorkflowBuilder instance with the provided memory store. If memory is nil, an in-memory store will be used.

func (*WorkflowBuilder) Build added in v0.31.0

func (wb *WorkflowBuilder) Build() (Workflow, error)

Build constructs the final Workflow from the builder configuration. It performs validation, optimization, and creates the appropriate workflow type based on the stages and their connections.

func (*WorkflowBuilder) Conditional added in v0.31.0

func (wb *WorkflowBuilder) Conditional(id string, condition ConditionalFunc) *ConditionalBuilder

Conditional creates a conditional execution stage that branches based on a condition function. The condition function receives the current workflow state and returns whether to execute the associated module or branch to alternative execution paths.

func (*WorkflowBuilder) ForEach added in v0.32.0

func (wb *WorkflowBuilder) ForEach(id string, iteratorFunc IteratorFunc, bodyBuilder func(*WorkflowBuilder) *WorkflowBuilder) *WorkflowBuilder

ForEach creates a loop stage that iterates over a collection. The iteratorFunc returns the collection to iterate over. The body builder defines the workflow to execute for each item.

func (*WorkflowBuilder) Parallel added in v0.31.0

func (wb *WorkflowBuilder) Parallel(id string, steps ...*BuilderStep) *WorkflowBuilder

Parallel creates a parallel execution stage with multiple steps that run concurrently. Steps can be added using the NewStep helper function.

func (*WorkflowBuilder) Stage added in v0.31.0

func (wb *WorkflowBuilder) Stage(id string, module core.Module) *WorkflowBuilder

Stage adds a sequential stage to the workflow with the given ID and module. This is the most common workflow pattern where steps execute one after another.

func (*WorkflowBuilder) Template added in v0.32.0

func (wb *WorkflowBuilder) Template(id string, parameterFunc TemplateParameterFunc, templateBuilder func(*WorkflowBuilder) *WorkflowBuilder) *WorkflowBuilder

Template creates a reusable workflow template that can be instantiated with parameters.

func (*WorkflowBuilder) Then added in v0.31.0

func (wb *WorkflowBuilder) Then(nextStageID string) *WorkflowBuilder

Then creates a connection between the current stage and the next stage. This is used to explicitly define the workflow execution order.

func (*WorkflowBuilder) Until added in v0.32.0

func (wb *WorkflowBuilder) Until(id string, condition LoopConditionFunc, bodyBuilder func(*WorkflowBuilder) *WorkflowBuilder) *WorkflowBuilder

Until creates a loop stage that continues until a condition becomes true.

func (*WorkflowBuilder) While added in v0.32.0

func (wb *WorkflowBuilder) While(id string, condition LoopConditionFunc, bodyBuilder func(*WorkflowBuilder) *WorkflowBuilder) *WorkflowBuilder

While creates a loop stage that continues while a condition is true.

func (*WorkflowBuilder) WithConfig added in v0.31.0

func (wb *WorkflowBuilder) WithConfig(config *BuilderConfig) *WorkflowBuilder

WithConfig sets custom configuration for the workflow builder.

func (*WorkflowBuilder) WithMaxIterations added in v0.32.0

func (wb *WorkflowBuilder) WithMaxIterations(maxIterations int) *WorkflowBuilder

WithMaxIterations sets the maximum number of iterations for the most recently added loop stage.

func (*WorkflowBuilder) WithMetadata added in v0.31.0

func (wb *WorkflowBuilder) WithMetadata(key string, value interface{}) *WorkflowBuilder

WithMetadata adds metadata to the most recently added stage.

func (*WorkflowBuilder) WithRetry added in v0.31.0

func (wb *WorkflowBuilder) WithRetry(retryConfig *RetryConfig) *WorkflowBuilder

WithRetry adds retry configuration to the most recently added stage.

func (*WorkflowBuilder) WithTimeout added in v0.32.0

func (wb *WorkflowBuilder) WithTimeout(timeoutMs int64) *WorkflowBuilder

WithTimeout adds a timeout to the most recently added stage.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL