Documentation
¶
Index ¶
- Variables
- func WrapWorkflowError(err error, fields map[string]interface{}) error
- type BackpressureStrategy
- type BaseWorkflow
- type BuilderConfig
- type BuilderStage
- type BuilderStep
- type ChainWorkflow
- type CompositeWorkflow
- type ConditionResult
- type ConditionalBuilder
- func (cb *ConditionalBuilder) Else(module core.Module) *ConditionalBuilder
- func (cb *ConditionalBuilder) ElseIf(condition ConditionalFunc, module core.Module) *ConditionalBuilder
- func (cb *ConditionalBuilder) End() *WorkflowBuilder
- func (cb *ConditionalBuilder) If(module core.Module) *ConditionalBuilder
- type ConditionalFunc
- type ConditionalRouterWorkflow
- type Event
- type EventBus
- func (eb *EventBus) AddFilter(filter EventFilter)
- func (eb *EventBus) AddTransformer(transformer EventTransformer)
- func (eb *EventBus) Broadcast(event Event) error
- func (eb *EventBus) Emit(event Event) error
- func (eb *EventBus) Request(event Event, timeout time.Duration) (interface{}, error)
- func (eb *EventBus) Respond(requestID string, response interface{}) error
- func (eb *EventBus) Start(ctx context.Context) error
- func (eb *EventBus) Stop() error
- func (eb *EventBus) Subscribe(eventType string, handler EventHandler) error
- func (eb *EventBus) Unsubscribe(eventType string)
- type EventBusConfig
- type EventFilter
- type EventHandler
- type EventStore
- type EventTransformer
- type IteratorFunc
- type LoopConditionFunc
- type ParallelWorkflow
- type ReactiveWorkflow
- func (rw *ReactiveWorkflow) Emit(event Event) error
- func (rw *ReactiveWorkflow) GetEventBus() *EventBus
- func (rw *ReactiveWorkflow) On(eventType string, workflow Workflow) *ReactiveWorkflow
- func (rw *ReactiveWorkflow) OnModule(eventType string, module core.Module) *ReactiveWorkflow
- func (rw *ReactiveWorkflow) Request(event Event, timeout time.Duration) (interface{}, error)
- func (rw *ReactiveWorkflow) Respond(requestID string, response interface{}) error
- func (rw *ReactiveWorkflow) Start(ctx context.Context) error
- func (rw *ReactiveWorkflow) Stop() error
- func (rw *ReactiveWorkflow) WithConfig(config ReactiveWorkflowConfig) *ReactiveWorkflow
- func (rw *ReactiveWorkflow) WithEventBus(eventBus *EventBus) *ReactiveWorkflow
- func (rw *ReactiveWorkflow) WithFilter(filter EventFilter) *ReactiveWorkflow
- func (rw *ReactiveWorkflow) WithTransformer(transformer EventTransformer) *ReactiveWorkflow
- type ReactiveWorkflowConfig
- type RetryConfig
- type RouterWorkflow
- type StageType
- type Step
- type StepResult
- type TemplateParameterFunc
- type Workflow
- type WorkflowBuilder
- func (wb *WorkflowBuilder) Build() (Workflow, error)
- func (wb *WorkflowBuilder) Conditional(id string, condition ConditionalFunc) *ConditionalBuilder
- func (wb *WorkflowBuilder) ForEach(id string, iteratorFunc IteratorFunc, ...) *WorkflowBuilder
- func (wb *WorkflowBuilder) Parallel(id string, steps ...*BuilderStep) *WorkflowBuilder
- func (wb *WorkflowBuilder) Stage(id string, module core.Module) *WorkflowBuilder
- func (wb *WorkflowBuilder) Template(id string, parameterFunc TemplateParameterFunc, ...) *WorkflowBuilder
- func (wb *WorkflowBuilder) Then(nextStageID string) *WorkflowBuilder
- func (wb *WorkflowBuilder) Until(id string, condition LoopConditionFunc, ...) *WorkflowBuilder
- func (wb *WorkflowBuilder) While(id string, condition LoopConditionFunc, ...) *WorkflowBuilder
- func (wb *WorkflowBuilder) WithConfig(config *BuilderConfig) *WorkflowBuilder
- func (wb *WorkflowBuilder) WithMaxIterations(maxIterations int) *WorkflowBuilder
- func (wb *WorkflowBuilder) WithMetadata(key string, value interface{}) *WorkflowBuilder
- func (wb *WorkflowBuilder) WithRetry(retryConfig *RetryConfig) *WorkflowBuilder
- func (wb *WorkflowBuilder) WithTimeout(timeoutMs int64) *WorkflowBuilder
Constants ¶
This section is empty.
Variables ¶
var ( // ErrStepConditionFailed indicates a step's condition check failed. ErrStepConditionFailed = errors.New(errors.InvalidWorkflowState, "step condition check failed") // ErrStepNotFound indicates a referenced step doesn't exist in workflow. ErrStepNotFound = errors.New(errors.ResourceNotFound, "step not found in workflow") // ErrInvalidInput indicates missing or invalid input parameters. ErrInvalidInput = errors.New(errors.InvalidInput, "invalid input parameters") // ErrDuplicateStepID indicates attempt to add step with existing ID. ErrDuplicateStepID = errors.New(errors.ValidationFailed, "duplicate step ID") // ErrCyclicDependency indicates circular dependencies between steps. ErrCyclicDependency = errors.New(errors.WorkflowExecutionFailed, "cyclic dependency detected in workflow") )
Functions ¶
func WrapWorkflowError ¶
Types ¶
type BackpressureStrategy ¶
type BackpressureStrategy int
BackpressureStrategy defines how to handle event overflow.
const ( BackpressureBlock BackpressureStrategy = iota BackpressureDropOldest BackpressureDropNewest BackpressureDropLowest )
type BaseWorkflow ¶
type BaseWorkflow struct {
// contains filtered or unexported fields
}
BaseWorkflow provides common workflow functionality.
func NewBaseWorkflow ¶
func NewBaseWorkflow(memory agents.Memory) *BaseWorkflow
func (*BaseWorkflow) AddStep ¶
func (w *BaseWorkflow) AddStep(step *Step) error
func (*BaseWorkflow) GetSteps ¶
func (w *BaseWorkflow) GetSteps() []*Step
func (*BaseWorkflow) ValidateWorkflow ¶
func (w *BaseWorkflow) ValidateWorkflow() error
ValidateWorkflow checks if the workflow structure is valid.
type BuilderConfig ¶
type BuilderConfig struct {
EnableValidation bool // Whether to perform validation during build
EnableOptimization bool // Whether to optimize the workflow graph
EnableTracing bool // Whether to add tracing to steps
MaxConcurrency int // Maximum concurrent steps for parallel execution
DefaultRetryAttempts int // Default retry attempts for steps
}
BuilderConfig holds configuration options for the workflow builder.
func DefaultBuilderConfig ¶
func DefaultBuilderConfig() *BuilderConfig
DefaultBuilderConfig returns sensible defaults for workflow builder configuration.
type BuilderStage ¶
type BuilderStage struct {
ID string
Type StageType
Module core.Module
Steps []*BuilderStep
Condition ConditionalFunc
Branches map[string]*BuilderStage // For conditional execution
Next []string // IDs of next stages
RetryConfig *RetryConfig
Metadata map[string]interface{} // Additional metadata
// Advanced pattern fields
LoopCondition LoopConditionFunc // For while/until loops
IteratorFunc IteratorFunc // For forEach loops
TemplateParams TemplateParameterFunc // For templates
MaxIterations int // Safety limit for loops
LoopBody *WorkflowBuilder // Nested workflow for loop body
TemplateWorkflow *WorkflowBuilder // Template workflow definition
TimeoutMs int64 // Timeout in milliseconds
}
BuilderStage represents a stage in the workflow builder that can contain multiple execution patterns (sequential, parallel, conditional, loops, templates).
type BuilderStep ¶
type BuilderStep struct {
ID string
Module core.Module
Description string
Metadata map[string]interface{}
}
BuilderStep represents a single step within a stage.
func NewStep ¶
func NewStep(id string, module core.Module) *BuilderStep
NewStep creates a new BuilderStep for use in parallel stages.
func (*BuilderStep) WithDescription ¶
func (bs *BuilderStep) WithDescription(description string) *BuilderStep
WithDescription adds a description to a BuilderStep.
func (*BuilderStep) WithStepMetadata ¶
func (bs *BuilderStep) WithStepMetadata(key string, value interface{}) *BuilderStep
WithStepMetadata adds metadata to a BuilderStep.
type ChainWorkflow ¶
type ChainWorkflow struct {
*BaseWorkflow
}
ChainWorkflow executes steps in a linear sequence, where each step's output can be used as input for subsequent steps.
func NewChainWorkflow ¶
func NewChainWorkflow(memory agents.Memory) *ChainWorkflow
type CompositeWorkflow ¶
type CompositeWorkflow struct {
*BaseWorkflow
// contains filtered or unexported fields
}
CompositeWorkflow handles complex workflow patterns including loops and templates.
func NewCompositeWorkflow ¶
func NewCompositeWorkflow(memory agents.Memory) *CompositeWorkflow
NewCompositeWorkflow creates a new composite workflow.
func (*CompositeWorkflow) AddBuilderStage ¶
func (cw *CompositeWorkflow) AddBuilderStage(stage *BuilderStage)
AddBuilderStage adds a builder stage to the composite workflow.
type ConditionResult ¶
ConditionResult represents the result of a conditional evaluation.
type ConditionalBuilder ¶
type ConditionalBuilder struct {
// contains filtered or unexported fields
}
ConditionalBuilder provides a fluent interface for building conditional workflow stages.
func (*ConditionalBuilder) Else ¶
func (cb *ConditionalBuilder) Else(module core.Module) *ConditionalBuilder
Else adds an alternative branch that executes when the condition is false.
func (*ConditionalBuilder) ElseIf ¶
func (cb *ConditionalBuilder) ElseIf(condition ConditionalFunc, module core.Module) *ConditionalBuilder
ElseIf adds an additional conditional branch with its own condition.
func (*ConditionalBuilder) End ¶
func (cb *ConditionalBuilder) End() *WorkflowBuilder
End completes the conditional builder and returns to the main workflow builder.
func (*ConditionalBuilder) If ¶
func (cb *ConditionalBuilder) If(module core.Module) *ConditionalBuilder
If adds a conditional branch that executes the given module when the condition is true.
type ConditionalFunc ¶
ConditionalFunc defines the signature for conditional execution logic.
type ConditionalRouterWorkflow ¶
type ConditionalRouterWorkflow struct {
*BaseWorkflow
// contains filtered or unexported fields
}
ConditionalRouterWorkflow handles conditional routing patterns.
func NewConditionalRouterWorkflow ¶
func NewConditionalRouterWorkflow(memory agents.Memory, classifier core.Module) *ConditionalRouterWorkflow
NewConditionalRouterWorkflow creates a new conditional router workflow.
func (*ConditionalRouterWorkflow) AddRoute ¶
func (crw *ConditionalRouterWorkflow) AddRoute(route string, step *Step) error
AddRoute adds a route to the router.
func (*ConditionalRouterWorkflow) Execute ¶
func (crw *ConditionalRouterWorkflow) Execute(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error)
Execute runs the router workflow.
func (*ConditionalRouterWorkflow) SetDefaultRoute ¶
func (crw *ConditionalRouterWorkflow) SetDefaultRoute(step *Step)
SetDefaultRoute sets the default route for unmatched classifications.
type Event ¶
type Event struct {
// ID uniquely identifies this event instance
ID string `json:"id"`
// Type categorizes the event (e.g., "pr_created", "user_message")
Type string `json:"type"`
// Data contains the event payload
Data interface{} `json:"data"`
// Priority affects event processing order (1=highest, 10=lowest)
Priority int `json:"priority"`
// Timestamp when the event was created
Timestamp time.Time `json:"timestamp"`
// Context provides additional metadata
Context map[string]interface{} `json:"context"`
// Source identifies where the event originated
Source string `json:"source"`
// CorrelationID links related events
CorrelationID string `json:"correlation_id"`
}
Event represents a reactive event that can trigger workflow execution.
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus provides centralized event distribution for agent communication.
func NewEventBus ¶
func NewEventBus(config EventBusConfig) *EventBus
NewEventBus creates a new event bus for agent communication.
func (*EventBus) AddFilter ¶
func (eb *EventBus) AddFilter(filter EventFilter)
AddFilter adds an event filter to the bus.
func (*EventBus) AddTransformer ¶
func (eb *EventBus) AddTransformer(transformer EventTransformer)
AddTransformer adds an event transformer to the bus.
func (*EventBus) Subscribe ¶
func (eb *EventBus) Subscribe(eventType string, handler EventHandler) error
Subscribe registers a handler for events of a specific type.
func (*EventBus) Unsubscribe ¶
Unsubscribe removes handlers for an event type.
type EventBusConfig ¶
type EventBusConfig struct {
BufferSize int
BackpressureStrategy BackpressureStrategy
MaxHandlers int
HandlerTimeout time.Duration
EnablePersistence bool
PersistenceStore EventStore
}
EventBusConfig configures the event bus behavior.
func DefaultEventBusConfig ¶
func DefaultEventBusConfig() EventBusConfig
DefaultEventBusConfig returns sensible defaults.
type EventFilter ¶
EventFilter determines if an event should be processed.
type EventHandler ¶
EventHandler defines the function signature for handling events.
type EventStore ¶
type EventStore interface {
Store(event Event) error
Retrieve(id string) (Event, error)
List(filter EventFilter) ([]Event, error)
Delete(id string) error
}
EventStore interface for persisting events.
type EventTransformer ¶
EventTransformer modifies events before processing.
type IteratorFunc ¶
IteratorFunc defines the signature for forEach iteration logic.
type LoopConditionFunc ¶
type LoopConditionFunc func(ctx context.Context, state map[string]interface{}, iteration int) (bool, error)
LoopConditionFunc defines the signature for loop conditions (while/until).
type ParallelWorkflow ¶
type ParallelWorkflow struct {
*BaseWorkflow
// contains filtered or unexported fields
}
ParallelWorkflow executes multiple steps concurrently.
func NewParallelWorkflow ¶
func NewParallelWorkflow(memory agents.Memory, maxConcurrent int) *ParallelWorkflow
type ReactiveWorkflow ¶
type ReactiveWorkflow struct {
// contains filtered or unexported fields
}
ReactiveWorkflow enables event-driven workflow execution.
func NewReactiveWorkflow ¶
func NewReactiveWorkflow(memory agents.Memory) *ReactiveWorkflow
NewReactiveWorkflow creates a new reactive workflow.
func (*ReactiveWorkflow) Emit ¶
func (rw *ReactiveWorkflow) Emit(event Event) error
Emit publishes an event to trigger workflows.
func (*ReactiveWorkflow) GetEventBus ¶
func (rw *ReactiveWorkflow) GetEventBus() *EventBus
GetEventBus returns the underlying event bus for advanced usage.
func (*ReactiveWorkflow) On ¶
func (rw *ReactiveWorkflow) On(eventType string, workflow Workflow) *ReactiveWorkflow
On registers a workflow to handle events of a specific type.
func (*ReactiveWorkflow) OnModule ¶
func (rw *ReactiveWorkflow) OnModule(eventType string, module core.Module) *ReactiveWorkflow
OnModule is a convenience method to register a single module as a workflow.
func (*ReactiveWorkflow) Request ¶
func (rw *ReactiveWorkflow) Request(event Event, timeout time.Duration) (interface{}, error)
Request sends an event and waits for a response.
func (*ReactiveWorkflow) Respond ¶
func (rw *ReactiveWorkflow) Respond(requestID string, response interface{}) error
Respond sends a response to a request event.
func (*ReactiveWorkflow) Start ¶
func (rw *ReactiveWorkflow) Start(ctx context.Context) error
Start begins reactive event processing.
func (*ReactiveWorkflow) Stop ¶
func (rw *ReactiveWorkflow) Stop() error
Stop terminates reactive event processing.
func (*ReactiveWorkflow) WithConfig ¶
func (rw *ReactiveWorkflow) WithConfig(config ReactiveWorkflowConfig) *ReactiveWorkflow
WithConfig sets custom configuration.
func (*ReactiveWorkflow) WithEventBus ¶
func (rw *ReactiveWorkflow) WithEventBus(eventBus *EventBus) *ReactiveWorkflow
WithEventBus allows using a custom event bus.
func (*ReactiveWorkflow) WithFilter ¶
func (rw *ReactiveWorkflow) WithFilter(filter EventFilter) *ReactiveWorkflow
WithFilter adds an event filter.
func (*ReactiveWorkflow) WithTransformer ¶
func (rw *ReactiveWorkflow) WithTransformer(transformer EventTransformer) *ReactiveWorkflow
WithTransformer adds an event transformer.
type ReactiveWorkflowConfig ¶
type ReactiveWorkflowConfig struct {
AutoStart bool
DefaultTimeout time.Duration
MaxConcurrentRuns int
EnableTracing bool
EnableMetrics bool
}
ReactiveWorkflowConfig configures reactive workflow behavior.
func DefaultReactiveWorkflowConfig ¶
func DefaultReactiveWorkflowConfig() ReactiveWorkflowConfig
DefaultReactiveWorkflowConfig returns sensible defaults.
type RetryConfig ¶
type RetryConfig struct {
// MaxAttempts is the maximum number of retry attempts
MaxAttempts int
// BackoffMultiplier determines how long to wait between retries
BackoffMultiplier float64
}
RetryConfig defines how to handle step failures.
type RouterWorkflow ¶
type RouterWorkflow struct {
*BaseWorkflow
// contains filtered or unexported fields
}
RouterWorkflow directs inputs to different processing paths based on a classification step.
func NewRouterWorkflow ¶
func NewRouterWorkflow(memory agents.Memory, classifierStep *Step) *RouterWorkflow
type Step ¶
type Step struct {
// ID uniquely identifies this step within the workflow
ID string
// Module is the underlying DSPy module that performs the actual computation
// This could be a Predict, ChainOfThought, or any other DSPy module type
Module core.Module
// NextSteps contains the IDs of steps that should execute after this one
// This allows us to define branching and conditional execution paths
NextSteps []string
// Condition is an optional function that determines if this step should execute
// It can examine the current workflow state to make this decision
Condition func(state map[string]interface{}) bool
// RetryConfig specifies how to handle failures of this step
RetryConfig *RetryConfig
}
Step represents a single unit of computation in a workflow. Each step wraps a DSPy module and adds workflow-specific metadata and control logic.
type StepResult ¶
type StepResult struct {
// StepID identifies which step produced this result
StepID string
// Outputs contains the data produced by this step
Outputs map[string]interface{}
// Metadata contains additional information about the execution
Metadata map[string]interface{}
// NextSteps indicates which steps should run next (may be modified by step execution)
NextSteps []string
}
StepResult holds the outputs and metadata from executing a step.
type TemplateParameterFunc ¶
type TemplateParameterFunc func(ctx context.Context, params map[string]interface{}) (map[string]interface{}, error)
TemplateParameterFunc defines the signature for template parameter resolution.
type Workflow ¶
type Workflow interface {
// Execute runs the workflow with the provided inputs
Execute(ctx context.Context, inputs map[string]interface{}) (map[string]interface{}, error)
// GetSteps returns all steps in this workflow
GetSteps() []*Step
// AddStep adds a new step to the workflow
AddStep(step *Step) error
}
Workflow represents a sequence of steps that accomplish a task.
type WorkflowBuilder ¶
type WorkflowBuilder struct {
// contains filtered or unexported fields
}
WorkflowBuilder provides a fluent API for constructing workflows declaratively. It maintains backward compatibility with the existing workflow system while providing a more intuitive and powerful interface for workflow composition.
func NewBuilder ¶
func NewBuilder(memory agents.Memory) *WorkflowBuilder
NewBuilder creates a new WorkflowBuilder instance with the provided memory store. If memory is nil, an in-memory store will be used.
func (*WorkflowBuilder) Build ¶
func (wb *WorkflowBuilder) Build() (Workflow, error)
Build constructs the final Workflow from the builder configuration. It performs validation, optimization, and creates the appropriate workflow type based on the stages and their connections.
func (*WorkflowBuilder) Conditional ¶
func (wb *WorkflowBuilder) Conditional(id string, condition ConditionalFunc) *ConditionalBuilder
Conditional creates a conditional execution stage that branches based on a condition function. The condition function receives the current workflow state and returns whether to execute the associated module or branch to alternative execution paths.
func (*WorkflowBuilder) ForEach ¶
func (wb *WorkflowBuilder) ForEach(id string, iteratorFunc IteratorFunc, bodyBuilder func(*WorkflowBuilder) *WorkflowBuilder) *WorkflowBuilder
ForEach creates a loop stage that iterates over a collection. The iteratorFunc returns the collection to iterate over. The body builder defines the workflow to execute for each item.
func (*WorkflowBuilder) Parallel ¶
func (wb *WorkflowBuilder) Parallel(id string, steps ...*BuilderStep) *WorkflowBuilder
Parallel creates a parallel execution stage with multiple steps that run concurrently. Steps can be added using the NewStep helper function.
func (*WorkflowBuilder) Stage ¶
func (wb *WorkflowBuilder) Stage(id string, module core.Module) *WorkflowBuilder
Stage adds a sequential stage to the workflow with the given ID and module. This is the most common workflow pattern where steps execute one after another.
func (*WorkflowBuilder) Template ¶
func (wb *WorkflowBuilder) Template(id string, parameterFunc TemplateParameterFunc, templateBuilder func(*WorkflowBuilder) *WorkflowBuilder) *WorkflowBuilder
Template creates a reusable workflow template that can be instantiated with parameters.
func (*WorkflowBuilder) Then ¶
func (wb *WorkflowBuilder) Then(nextStageID string) *WorkflowBuilder
Then creates a connection between the current stage and the next stage. This is used to explicitly define the workflow execution order.
func (*WorkflowBuilder) Until ¶
func (wb *WorkflowBuilder) Until(id string, condition LoopConditionFunc, bodyBuilder func(*WorkflowBuilder) *WorkflowBuilder) *WorkflowBuilder
Until creates a loop stage that continues until a condition becomes true.
func (*WorkflowBuilder) While ¶
func (wb *WorkflowBuilder) While(id string, condition LoopConditionFunc, bodyBuilder func(*WorkflowBuilder) *WorkflowBuilder) *WorkflowBuilder
While creates a loop stage that continues while a condition is true.
func (*WorkflowBuilder) WithConfig ¶
func (wb *WorkflowBuilder) WithConfig(config *BuilderConfig) *WorkflowBuilder
WithConfig sets custom configuration for the workflow builder.
func (*WorkflowBuilder) WithMaxIterations ¶
func (wb *WorkflowBuilder) WithMaxIterations(maxIterations int) *WorkflowBuilder
WithMaxIterations sets the maximum number of iterations for the most recently added loop stage.
func (*WorkflowBuilder) WithMetadata ¶
func (wb *WorkflowBuilder) WithMetadata(key string, value interface{}) *WorkflowBuilder
WithMetadata adds metadata to the most recently added stage.
func (*WorkflowBuilder) WithRetry ¶
func (wb *WorkflowBuilder) WithRetry(retryConfig *RetryConfig) *WorkflowBuilder
WithRetry adds retry configuration to the most recently added stage.
func (*WorkflowBuilder) WithTimeout ¶
func (wb *WorkflowBuilder) WithTimeout(timeoutMs int64) *WorkflowBuilder
WithTimeout adds a timeout to the most recently added stage.