workflows

package
v0.0.13 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2025 | License: MIT | Imports: 11 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Package-level error values for the workflows package. Each one is built
// with a two-argument errors.New(code, message), so the errors identifier here
// is a project-local package, not the standard library's errors package.
var (
	// ErrStepConditionFailed indicates that a step's condition check failed.
	ErrStepConditionFailed = errors.New(errors.InvalidWorkflowState, "step condition check failed")

	// ErrStepNotFound indicates that a referenced step does not exist in the workflow.
	ErrStepNotFound = errors.New(errors.ResourceNotFound, "step not found in workflow")

	// ErrInvalidInput indicates missing or invalid input parameters.
	ErrInvalidInput = errors.New(errors.InvalidInput, "invalid input parameters")

	// ErrDuplicateStepID indicates an attempt to add a step whose ID already exists.
	ErrDuplicateStepID = errors.New(errors.ValidationFailed, "duplicate step ID")

	// ErrCyclicDependency indicates circular dependencies between steps.
	// NOTE(review): this is tagged WorkflowExecutionFailed while the other
	// structural error (ErrDuplicateStepID) uses ValidationFailed — confirm
	// the code is intentional.
	ErrCyclicDependency = errors.New(errors.WorkflowExecutionFailed, "cyclic dependency detected in workflow")
)

Functions

func WrapWorkflowError

func WrapWorkflowError(err error, fields map[string]interface{}) error

Types

type BackpressureStrategy

type BackpressureStrategy int

BackpressureStrategy defines how to handle event overflow.

// BackpressureStrategy values, numbered consecutively from 0 via iota.
const (
	// BackpressureBlock is the zero value of BackpressureStrategy.
	// NOTE(review): presumably the emitter blocks until buffer space frees up — confirm.
	BackpressureBlock BackpressureStrategy = iota
	// NOTE(review): presumably discards the oldest buffered event on overflow — confirm.
	BackpressureDropOldest
	// NOTE(review): presumably discards the incoming event on overflow — confirm.
	BackpressureDropNewest
	// NOTE(review): presumably discards the lowest-priority event on overflow
	// (Event.Priority: 1=highest, 10=lowest) — confirm.
	BackpressureDropLowest
)

type BaseWorkflow

type BaseWorkflow struct {
	// contains filtered or unexported fields
}

BaseWorkflow provides common workflow functionality.

func NewBaseWorkflow

func NewBaseWorkflow(memory agents.Memory) *BaseWorkflow

func (*BaseWorkflow) AddStep

func (w *BaseWorkflow) AddStep(step *Step) error

func (*BaseWorkflow) GetSteps

func (w *BaseWorkflow) GetSteps() []*Step

func (*BaseWorkflow) ValidateWorkflow

func (w *BaseWorkflow) ValidateWorkflow() error

ValidateWorkflow checks if the workflow structure is valid.

type BuilderConfig

// BuilderConfig holds configuration options for the workflow builder.
// A populated default is available from DefaultBuilderConfig, and a custom
// value is applied with WorkflowBuilder.WithConfig.
type BuilderConfig struct {
	EnableValidation     bool // Whether to perform validation during build
	EnableOptimization   bool // Whether to optimize the workflow graph
	EnableTracing        bool // Whether to add tracing to steps
	MaxConcurrency       int  // Maximum concurrent steps for parallel execution
	DefaultRetryAttempts int  // Default retry attempts for steps
}

BuilderConfig holds configuration options for the workflow builder.

func DefaultBuilderConfig

func DefaultBuilderConfig() *BuilderConfig

DefaultBuilderConfig returns sensible defaults for workflow builder configuration.

type BuilderStage

// BuilderStage represents a stage in the workflow builder that can contain
// multiple execution patterns (sequential, parallel, conditional, loops,
// templates). Which fields are meaningful depends on Type.
type BuilderStage struct {
	// ID is the stage identifier; Next refers to other stages by this ID.
	ID          string
	// Type is the execution pattern for this stage.
	Type        StageType
	// NOTE(review): presumably the module run by a single-module stage — confirm.
	Module      core.Module
	// NOTE(review): presumably the steps of a parallel stage — confirm.
	Steps       []*BuilderStep
	// NOTE(review): presumably the predicate of a conditional stage — confirm.
	Condition   ConditionalFunc
	Branches    map[string]*BuilderStage // For conditional execution
	Next        []string                 // IDs of next stages
	// RetryConfig defines how to handle failures; may be nil (pointer field).
	RetryConfig *RetryConfig
	Metadata    map[string]interface{} // Additional metadata

	// Advanced pattern fields
	LoopCondition    LoopConditionFunc     // For while/until loops
	IteratorFunc     IteratorFunc          // For forEach loops
	TemplateParams   TemplateParameterFunc // For templates
	MaxIterations    int                   // Safety limit for loops
	LoopBody         *WorkflowBuilder      // Nested workflow for loop body
	TemplateWorkflow *WorkflowBuilder      // Template workflow definition
	TimeoutMs        int64                 // Timeout in milliseconds
}

BuilderStage represents a stage in the workflow builder that can contain multiple execution patterns (sequential, parallel, conditional, loops, templates).

type BuilderStep

// BuilderStep represents a single step within a stage. Values are created
// with NewStep and refined with the WithDescription and WithStepMetadata
// chaining methods.
type BuilderStep struct {
	// ID and Module correspond to the arguments of NewStep(id, module).
	ID          string
	Module      core.Module
	// Description is the text added by WithDescription.
	Description string
	// Metadata holds the key/value pairs added by WithStepMetadata.
	Metadata    map[string]interface{}
}

BuilderStep represents a single step within a stage.

func NewStep

func NewStep(id string, module core.Module) *BuilderStep

NewStep creates a new BuilderStep for use in parallel stages.

func (*BuilderStep) WithDescription

func (bs *BuilderStep) WithDescription(description string) *BuilderStep

WithDescription adds a description to a BuilderStep.

func (*BuilderStep) WithStepMetadata

func (bs *BuilderStep) WithStepMetadata(key string, value interface{}) *BuilderStep

WithStepMetadata adds metadata to a BuilderStep.

type ChainWorkflow

type ChainWorkflow struct {
	*BaseWorkflow
}

ChainWorkflow executes steps in a linear sequence, where each step's output can be used as input for subsequent steps.

func NewChainWorkflow

func NewChainWorkflow(memory agents.Memory) *ChainWorkflow

func (*ChainWorkflow) Execute

func (w *ChainWorkflow) Execute(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error)

Execute runs steps sequentially, passing state from one step to the next.

type CompositeWorkflow

type CompositeWorkflow struct {
	*BaseWorkflow
	// contains filtered or unexported fields
}

CompositeWorkflow handles complex workflow patterns including loops and templates.

func NewCompositeWorkflow

func NewCompositeWorkflow(memory agents.Memory) *CompositeWorkflow

NewCompositeWorkflow creates a new composite workflow.

func (*CompositeWorkflow) AddBuilderStage

func (cw *CompositeWorkflow) AddBuilderStage(stage *BuilderStage)

AddBuilderStage adds a builder stage to the composite workflow.

func (*CompositeWorkflow) Execute

func (cw *CompositeWorkflow) Execute(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error)

Execute runs the composite workflow with support for advanced patterns.

type ConditionResult

// ConditionResult represents the result of a conditional evaluation.
type ConditionResult struct {
	// NOTE(review): presumably true when the guarded branch should run — confirm.
	ShouldExecute bool
	// NOTE(review): presumably the name of the selected branch — confirm.
	BranchName    string
	// NOTE(review): presumably the error raised while evaluating the condition, if any — confirm.
	Error         error
}

ConditionResult represents the result of a conditional evaluation.

type ConditionalBuilder

type ConditionalBuilder struct {
	// contains filtered or unexported fields
}

ConditionalBuilder provides a fluent interface for building conditional workflow stages.

func (*ConditionalBuilder) Else

Else adds an alternative branch that executes when the condition is false.

func (*ConditionalBuilder) ElseIf

func (cb *ConditionalBuilder) ElseIf(condition ConditionalFunc, module core.Module) *ConditionalBuilder

ElseIf adds an additional conditional branch with its own condition.

func (*ConditionalBuilder) End

End completes the conditional builder and returns to the main workflow builder.

func (*ConditionalBuilder) If

If adds a conditional branch that executes the given module when the condition is true.

type ConditionalFunc

type ConditionalFunc func(ctx context.Context, state map[string]interface{}) (bool, error)

ConditionalFunc defines the signature for conditional execution logic.

type ConditionalRouterWorkflow

type ConditionalRouterWorkflow struct {
	*BaseWorkflow
	// contains filtered or unexported fields
}

ConditionalRouterWorkflow handles conditional routing patterns.

func NewConditionalRouterWorkflow

func NewConditionalRouterWorkflow(memory agents.Memory, classifier core.Module) *ConditionalRouterWorkflow

NewConditionalRouterWorkflow creates a new conditional router workflow.

func (*ConditionalRouterWorkflow) AddRoute

func (crw *ConditionalRouterWorkflow) AddRoute(route string, step *Step) error

AddRoute adds a route to the router.

func (*ConditionalRouterWorkflow) Execute

func (crw *ConditionalRouterWorkflow) Execute(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error)

Execute runs the router workflow.

func (*ConditionalRouterWorkflow) SetDefaultRoute

func (crw *ConditionalRouterWorkflow) SetDefaultRoute(step *Step)

SetDefaultRoute sets the default route for unmatched classifications.

type Event

// Event represents a reactive event that can trigger workflow execution.
// Every field carries a JSON tag, so an Event can be marshalled with
// encoding/json using the snake_case key names shown in the tags.
type Event struct {
	// ID uniquely identifies this event instance
	ID string `json:"id"`

	// Type categorizes the event (e.g., "pr_created", "user_message")
	Type string `json:"type"`

	// Data contains the event payload
	Data interface{} `json:"data"`

	// Priority affects event processing order (1=highest, 10=lowest)
	Priority int `json:"priority"`

	// Timestamp when the event was created
	Timestamp time.Time `json:"timestamp"`

	// Context provides additional metadata
	Context map[string]interface{} `json:"context"`

	// Source identifies where the event originated
	Source string `json:"source"`

	// CorrelationID links related events
	CorrelationID string `json:"correlation_id"`
}

Event represents a reactive event that can trigger workflow execution.

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus provides centralized event distribution for agent communication.

func NewEventBus

func NewEventBus(config EventBusConfig) *EventBus

NewEventBus creates a new event bus for agent communication.

func (*EventBus) AddFilter

func (eb *EventBus) AddFilter(filter EventFilter)

AddFilter adds an event filter to the bus.

func (*EventBus) AddTransformer

func (eb *EventBus) AddTransformer(transformer EventTransformer)

AddTransformer adds an event transformer to the bus.

func (*EventBus) Broadcast

func (eb *EventBus) Broadcast(event Event) error

Broadcast sends an event to all subscribers regardless of type.

func (*EventBus) Emit

func (eb *EventBus) Emit(event Event) error

Emit publishes an event to the bus.

func (*EventBus) Request

func (eb *EventBus) Request(event Event, timeout time.Duration) (interface{}, error)

Request sends an event and waits for a response.

func (*EventBus) Respond

func (eb *EventBus) Respond(requestID string, response interface{}) error

Respond sends a response to a request event.

func (*EventBus) Start

func (eb *EventBus) Start(ctx context.Context) error

Start begins event processing on the bus.

func (*EventBus) Stop

func (eb *EventBus) Stop() error

Stop terminates event processing.

func (*EventBus) Subscribe

func (eb *EventBus) Subscribe(eventType string, handler EventHandler) error

Subscribe registers a handler for events of a specific type.

func (*EventBus) Unsubscribe

func (eb *EventBus) Unsubscribe(eventType string)

Unsubscribe removes handlers for an event type.

type EventBusConfig

// EventBusConfig configures the event bus behavior. It is passed by value to
// NewEventBus; DefaultEventBusConfig returns a populated default.
type EventBusConfig struct {
	// NOTE(review): presumably the capacity of the internal event buffer — confirm.
	BufferSize           int
	// BackpressureStrategy selects how event overflow is handled.
	BackpressureStrategy BackpressureStrategy
	// NOTE(review): presumably an upper bound on registered handlers — confirm scope (per type or global).
	MaxHandlers          int
	// NOTE(review): presumably the time limit for a single handler invocation — confirm.
	HandlerTimeout       time.Duration
	// NOTE(review): presumably enables storing events in PersistenceStore — confirm.
	EnablePersistence    bool
	// PersistenceStore is the EventStore implementation used for persisting events.
	PersistenceStore     EventStore
}

EventBusConfig configures the event bus behavior.

func DefaultEventBusConfig

func DefaultEventBusConfig() EventBusConfig

DefaultEventBusConfig returns sensible defaults.

type EventFilter

type EventFilter func(event Event) bool

EventFilter determines if an event should be processed.

type EventHandler

type EventHandler func(ctx context.Context, event Event) error

EventHandler defines the function signature for handling events.

type EventStore

// EventStore is the interface for persisting events. An implementation is
// supplied through EventBusConfig.PersistenceStore.
type EventStore interface {
	// Store persists the given event.
	Store(event Event) error
	// Retrieve looks up an event by id.
	// NOTE(review): the error returned for an unknown id is not specified here — confirm.
	Retrieve(id string) (Event, error)
	// List returns stored events.
	// NOTE(review): presumably only events for which filter returns true; nil-filter behavior unknown — confirm.
	List(filter EventFilter) ([]Event, error)
	// Delete removes the event with the given id.
	Delete(id string) error
}

EventStore is the interface for persisting events.

type EventTransformer

type EventTransformer func(event Event) Event

EventTransformer modifies events before processing.

type IteratorFunc

type IteratorFunc func(ctx context.Context, state map[string]interface{}) ([]interface{}, error)

IteratorFunc defines the signature for forEach iteration logic.

type LoopConditionFunc

type LoopConditionFunc func(ctx context.Context, state map[string]interface{}, iteration int) (bool, error)

LoopConditionFunc defines the signature for loop conditions (while/until).

type ParallelWorkflow

type ParallelWorkflow struct {
	*BaseWorkflow
	// contains filtered or unexported fields
}

ParallelWorkflow executes multiple steps concurrently.

func NewParallelWorkflow

func NewParallelWorkflow(memory agents.Memory, maxConcurrent int) *ParallelWorkflow

func (*ParallelWorkflow) Execute

func (w *ParallelWorkflow) Execute(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error)

type ReactiveWorkflow

type ReactiveWorkflow struct {
	// contains filtered or unexported fields
}

ReactiveWorkflow enables event-driven workflow execution.

func NewReactiveWorkflow

func NewReactiveWorkflow(memory agents.Memory) *ReactiveWorkflow

NewReactiveWorkflow creates a new reactive workflow.

func (*ReactiveWorkflow) Emit

func (rw *ReactiveWorkflow) Emit(event Event) error

Emit publishes an event to trigger workflows.

func (*ReactiveWorkflow) GetEventBus

func (rw *ReactiveWorkflow) GetEventBus() *EventBus

GetEventBus returns the underlying event bus for advanced usage.

func (*ReactiveWorkflow) On

func (rw *ReactiveWorkflow) On(eventType string, workflow Workflow) *ReactiveWorkflow

On registers a workflow to handle events of a specific type.

func (*ReactiveWorkflow) OnModule

func (rw *ReactiveWorkflow) OnModule(eventType string, module core.Module) *ReactiveWorkflow

OnModule is a convenience method to register a single module as a workflow.

func (*ReactiveWorkflow) Request

func (rw *ReactiveWorkflow) Request(event Event, timeout time.Duration) (interface{}, error)

Request sends an event and waits for a response.

func (*ReactiveWorkflow) Respond

func (rw *ReactiveWorkflow) Respond(requestID string, response interface{}) error

Respond sends a response to a request event.

func (*ReactiveWorkflow) Start

func (rw *ReactiveWorkflow) Start(ctx context.Context) error

Start begins reactive event processing.

func (*ReactiveWorkflow) Stop

func (rw *ReactiveWorkflow) Stop() error

Stop terminates reactive event processing.

func (*ReactiveWorkflow) WithConfig

WithConfig sets custom configuration.

func (*ReactiveWorkflow) WithEventBus

func (rw *ReactiveWorkflow) WithEventBus(eventBus *EventBus) *ReactiveWorkflow

WithEventBus allows using a custom event bus.

func (*ReactiveWorkflow) WithFilter

func (rw *ReactiveWorkflow) WithFilter(filter EventFilter) *ReactiveWorkflow

WithFilter adds an event filter.

func (*ReactiveWorkflow) WithTransformer

func (rw *ReactiveWorkflow) WithTransformer(transformer EventTransformer) *ReactiveWorkflow

WithTransformer adds an event transformer.

type ReactiveWorkflowConfig

// ReactiveWorkflowConfig configures reactive workflow behavior.
// DefaultReactiveWorkflowConfig returns a populated default.
type ReactiveWorkflowConfig struct {
	// NOTE(review): presumably starts event processing without an explicit Start call — confirm.
	AutoStart         bool
	// NOTE(review): presumably the timeout applied when none is given explicitly — confirm where it is used.
	DefaultTimeout    time.Duration
	// NOTE(review): presumably caps simultaneous workflow executions — confirm.
	MaxConcurrentRuns int
	// NOTE(review): presumably toggles tracing of event-triggered runs — confirm.
	EnableTracing     bool
	// NOTE(review): presumably toggles metrics collection — confirm.
	EnableMetrics     bool
}

ReactiveWorkflowConfig configures reactive workflow behavior.

func DefaultReactiveWorkflowConfig

func DefaultReactiveWorkflowConfig() ReactiveWorkflowConfig

DefaultReactiveWorkflowConfig returns sensible defaults.

type RetryConfig

// RetryConfig defines how to handle step failures. It is referenced by
// pointer from Step.RetryConfig and BuilderStage.RetryConfig, and set on a
// builder stage through WorkflowBuilder.WithRetry.
type RetryConfig struct {
	// MaxAttempts is the maximum number of retry attempts
	MaxAttempts int

	// BackoffMultiplier determines how long to wait between retries
	// NOTE(review): the base delay this multiplies is not visible here — confirm.
	BackoffMultiplier float64
}

RetryConfig defines how to handle step failures.

type RouterWorkflow

type RouterWorkflow struct {
	*BaseWorkflow
	// contains filtered or unexported fields
}

RouterWorkflow directs inputs to different processing paths based on a classification step.

func NewRouterWorkflow

func NewRouterWorkflow(memory agents.Memory, classifierStep *Step) *RouterWorkflow

func (*RouterWorkflow) AddRoute

func (w *RouterWorkflow) AddRoute(classification string, steps []*Step) error

AddRoute associates a classification value with a sequence of steps.

func (*RouterWorkflow) Execute

func (w *RouterWorkflow) Execute(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error)

type StageType

type StageType int

StageType defines the execution pattern for a stage.

// StageType values, numbered consecutively from 0 via iota. The names mirror
// the WorkflowBuilder methods Stage, Parallel, Conditional, ForEach, While,
// Until and Template.
// NOTE(review): the exact mapping from builder method to StageType is not shown here — confirm.
const (
	// StageTypeSequential is the zero value of StageType.
	StageTypeSequential StageType = iota
	StageTypeParallel
	StageTypeConditional
	// NOTE(review): no builder method named Loop is listed; confirm how this value is produced.
	StageTypeLoop
	StageTypeForEach
	StageTypeWhile
	StageTypeUntil
	StageTypeTemplate
)

type Step

// Step represents a single unit of computation in a workflow. Each step wraps
// a DSPy module and adds workflow-specific metadata and control logic.
type Step struct {
	// ID uniquely identifies this step within the workflow
	ID string

	// Module is the underlying DSPy module that performs the actual computation
	// This could be a Predict, ChainOfThought, or any other DSPy module type
	Module core.Module

	// NextSteps contains the IDs of steps that should execute after this one
	// This allows us to define branching and conditional execution paths
	NextSteps []string

	// Condition is an optional function that determines if this step should execute
	// It can examine the current workflow state to make this decision
	// Unlike ConditionalFunc, it takes no context and returns no error.
	Condition func(state map[string]interface{}) bool

	// RetryConfig specifies how to handle failures of this step
	RetryConfig *RetryConfig
}

Step represents a single unit of computation in a workflow. Each step wraps a DSPy module and adds workflow-specific metadata and control logic.

func (*Step) Execute

func (s *Step) Execute(ctx context.Context, inputs map[string]interface{}) (*StepResult, error)

Execute runs the step's DSPy module with the provided inputs.

type StepResult

// StepResult holds the outputs and metadata from executing a step. It is the
// value returned (by pointer) from Step.Execute.
type StepResult struct {
	// StepID identifies which step produced this result
	StepID string

	// Outputs contains the data produced by this step
	Outputs map[string]interface{}

	// Metadata contains additional information about the execution
	Metadata map[string]interface{}

	// NextSteps indicates which steps should run next (may be modified by step execution)
	NextSteps []string
}

StepResult holds the outputs and metadata from executing a step.

type TemplateParameterFunc

type TemplateParameterFunc func(ctx context.Context, params map[string]interface{}) (map[string]interface{}, error)

TemplateParameterFunc defines the signature for template parameter resolution.

type Workflow

// Workflow represents a sequence of steps that accomplish a task. It is the
// type returned by WorkflowBuilder.Build and accepted by ReactiveWorkflow.On.
type Workflow interface {
	// Execute runs the workflow with the provided inputs
	Execute(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error)

	// GetSteps returns all steps in this workflow
	GetSteps() []*Step

	// AddStep adds a new step to the workflow
	AddStep(step *Step) error
}

Workflow represents a sequence of steps that accomplish a task.

type WorkflowBuilder

type WorkflowBuilder struct {
	// contains filtered or unexported fields
}

WorkflowBuilder provides a fluent API for constructing workflows declaratively. It maintains backward compatibility with the existing workflow system while providing a more intuitive and powerful interface for workflow composition.

func NewBuilder

func NewBuilder(memory agents.Memory) *WorkflowBuilder

NewBuilder creates a new WorkflowBuilder instance with the provided memory store. If memory is nil, an in-memory store will be used.

func (*WorkflowBuilder) Build

func (wb *WorkflowBuilder) Build() (Workflow, error)

Build constructs the final Workflow from the builder configuration. It performs validation, optimization, and creates the appropriate workflow type based on the stages and their connections.

func (*WorkflowBuilder) Conditional

func (wb *WorkflowBuilder) Conditional(id string, condition ConditionalFunc) *ConditionalBuilder

Conditional creates a conditional execution stage that branches based on a condition function. The condition function receives the current workflow state and returns whether to execute the associated module or branch to alternative execution paths.

func (*WorkflowBuilder) ForEach

func (wb *WorkflowBuilder) ForEach(id string, iteratorFunc IteratorFunc, bodyBuilder func(*WorkflowBuilder) *WorkflowBuilder) *WorkflowBuilder

ForEach creates a loop stage that iterates over a collection. The iteratorFunc returns the collection to iterate over. The body builder defines the workflow to execute for each item.

func (*WorkflowBuilder) Parallel

func (wb *WorkflowBuilder) Parallel(id string, steps ...*BuilderStep) *WorkflowBuilder

Parallel creates a parallel execution stage with multiple steps that run concurrently. Steps can be added using the NewStep helper function.

func (*WorkflowBuilder) Stage

func (wb *WorkflowBuilder) Stage(id string, module core.Module) *WorkflowBuilder

Stage adds a sequential stage to the workflow with the given ID and module. This is the most common workflow pattern where steps execute one after another.

func (*WorkflowBuilder) Template

func (wb *WorkflowBuilder) Template(id string, parameterFunc TemplateParameterFunc, templateBuilder func(*WorkflowBuilder) *WorkflowBuilder) *WorkflowBuilder

Template creates a reusable workflow template that can be instantiated with parameters.

func (*WorkflowBuilder) Then

func (wb *WorkflowBuilder) Then(nextStageID string) *WorkflowBuilder

Then creates a connection between the current stage and the next stage. This is used to explicitly define the workflow execution order.

func (*WorkflowBuilder) Until

func (wb *WorkflowBuilder) Until(id string, condition LoopConditionFunc, bodyBuilder func(*WorkflowBuilder) *WorkflowBuilder) *WorkflowBuilder

Until creates a loop stage that continues until a condition becomes true.

func (*WorkflowBuilder) While

func (wb *WorkflowBuilder) While(id string, condition LoopConditionFunc, bodyBuilder func(*WorkflowBuilder) *WorkflowBuilder) *WorkflowBuilder

While creates a loop stage that continues while a condition is true.

func (*WorkflowBuilder) WithConfig

func (wb *WorkflowBuilder) WithConfig(config *BuilderConfig) *WorkflowBuilder

WithConfig sets custom configuration for the workflow builder.

func (*WorkflowBuilder) WithMaxIterations

func (wb *WorkflowBuilder) WithMaxIterations(maxIterations int) *WorkflowBuilder

WithMaxIterations sets the maximum number of iterations for the most recently added loop stage.

func (*WorkflowBuilder) WithMetadata

func (wb *WorkflowBuilder) WithMetadata(key string, value interface{}) *WorkflowBuilder

WithMetadata adds metadata to the most recently added stage.

func (*WorkflowBuilder) WithRetry

func (wb *WorkflowBuilder) WithRetry(retryConfig *RetryConfig) *WorkflowBuilder

WithRetry adds retry configuration to the most recently added stage.

func (*WorkflowBuilder) WithTimeout

func (wb *WorkflowBuilder) WithTimeout(timeoutMs int64) *WorkflowBuilder

WithTimeout adds a timeout to the most recently added stage.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL