Documentation
¶
Index ¶
- Constants
- Variables
- type AgentDefinition
- type AgentExecutor
- type AgentFactory
- type AgentRegistry
- func (r *AgentRegistry) CreateAgent(ctx context.Context, agentType string, config interface{}) (base.Agent, error)
- func (r *AgentRegistry) GetFactory(agentType string) (AgentFactory, bool)
- func (r *AgentRegistry) ListTypes() []string
- func (r *AgentRegistry) Register(agentType string, factory AgentFactory) error
- func (r *AgentRegistry) Unregister(agentType string)
- type DAG
- type DAGNode
- type Decoder
- type DefinitionParser
- type DirectoryLoader
- type DirectoryParser
- type Executor
- type FileLoader
- type FileLoaderOption
- type FileWatcher
- type JSONDecoder
- type OutputStore
- type ReloadCallback
- type RetryPolicy
- type RetryPolicyFile
- type Step
- type StepFile
- type StepOutput
- type StepResult
- type StepState
- type StepStatus
- type Workflow
- type WorkflowExecution
- type WorkflowFile
- type WorkflowLoader
- type WorkflowReloader
- func (r *WorkflowReloader) GetWorkflow(id string) (*Workflow, bool)
- func (r *WorkflowReloader) ListWorkflows() []*Workflow
- func (r *WorkflowReloader) Load(ctx context.Context, dir string) error
- func (r *WorkflowReloader) RegisterCallback(callback ReloadCallback) string
- func (r *WorkflowReloader) StartWatching(ctx context.Context, dir string) error
- func (r *WorkflowReloader) StopWatching()
- func (r *WorkflowReloader) UnregisterCallback(callbackID string)
- type WorkflowResult
- type WorkflowStatus
- type YAMLDecoder
Constants ¶
const (
	// DefaultMaxParallel is the default maximum number of parallel steps.
	DefaultMaxParallel = 10
	// DefaultStepTimeout is the default timeout for individual step execution.
	DefaultStepTimeout = 10 * time.Second
	// DefaultInitialDelay is the default initial delay for retry.
	DefaultInitialDelay = 10 * time.Millisecond
	// DefaultMaxDelay is the default maximum delay for retry.
	DefaultMaxDelay = 100 * time.Millisecond
	// DefaultRetryAttempts is the default number of retry attempts.
	DefaultRetryAttempts = 3
	// DefaultWorkflowTimeout is the default timeout for workflow execution.
	DefaultWorkflowTimeout = 5 * time.Minute
	// DefaultStepWaitDuration is the default duration to wait between step checks.
	DefaultStepWaitDuration = 100 * time.Millisecond
	// DefaultDAGTraversalTimeout is the default timeout for DAG traversal.
	DefaultDAGTraversalTimeout = 1 * time.Minute
	// DefaultMaxWorkflowSize is the default maximum number of steps in a workflow.
	DefaultMaxWorkflowSize = 100
	// DefaultMaxDependencies is the default maximum number of dependencies per step.
	DefaultMaxDependencies = 10
)
Default configuration constants for workflow engine.
Variables ¶
var (
	ErrFieldNotFound            = stderrors.New("field not found")
	ErrDuplicateAgentDefinition = stderrors.New("duplicate agent definition")
)
Definition errors.
var (
	ErrInvalidDependency   = errors.New("invalid dependency: step not found")
	ErrCycleDetected       = errors.New("cycle detected in workflow")
	ErrAgentTypeRegistered = errors.New("agent type already registered")
	ErrAgentTypeNotFound   = errors.New("agent type not found")
	ErrAgentResultNil      = errors.New("agent returned nil result")
	ErrWorkflowIncomplete  = errors.New("workflow incomplete")
	ErrInvalidLoader       = errors.New("invalid loader type")
	ErrDuplicateID         = errors.New("duplicate ID")
)
Workflow errors.
Functions ¶
This section is empty.
Types ¶
type AgentDefinition ¶
type AgentDefinition struct {
Name string `json:"name"`
Type string `json:"type"`
Description string `json:"description"`
Prompts map[string]string `json:"prompts"`
Tools []string `json:"tools"`
Metadata map[string]string `json:"metadata"`
}
AgentDefinition represents an agent definition from markdown.
type AgentExecutor ¶
type AgentExecutor struct {
// contains filtered or unexported fields
}
AgentExecutor executes tasks using registered agents.
func NewAgentExecutor ¶
func NewAgentExecutor(registry *AgentRegistry) *AgentExecutor
NewAgentExecutor creates a new AgentExecutor.
type AgentFactory ¶
AgentFactory creates agent instances.
type AgentRegistry ¶
type AgentRegistry struct {
// contains filtered or unexported fields
}
AgentRegistry manages agent type registrations.
func NewAgentRegistry ¶
func NewAgentRegistry() *AgentRegistry
NewAgentRegistry creates a new AgentRegistry.
func (*AgentRegistry) CreateAgent ¶
func (r *AgentRegistry) CreateAgent(ctx context.Context, agentType string, config interface{}) (base.Agent, error)
CreateAgent creates an agent instance by type.
func (*AgentRegistry) GetFactory ¶
func (r *AgentRegistry) GetFactory(agentType string) (AgentFactory, bool)
GetFactory returns an agent factory by type.
func (*AgentRegistry) ListTypes ¶
func (r *AgentRegistry) ListTypes() []string
ListTypes returns all registered agent types.
func (*AgentRegistry) Register ¶
func (r *AgentRegistry) Register(agentType string, factory AgentFactory) error
Register registers an agent factory for a type.
func (*AgentRegistry) Unregister ¶
func (r *AgentRegistry) Unregister(agentType string)
Unregister removes an agent factory.
type DAG ¶
DAG represents a directed acyclic graph of workflow steps.
func (*DAG) GetExecutionOrder ¶
GetExecutionOrder returns the topological sort order of steps.
type DefinitionParser ¶
type DefinitionParser struct {
}
DefinitionParser parses agent definitions from markdown files.
func NewDefinitionParser ¶
func NewDefinitionParser() *DefinitionParser
NewDefinitionParser creates a new DefinitionParser.
func (*DefinitionParser) Parse ¶
func (p *DefinitionParser) Parse(ctx context.Context, r io.Reader) (*AgentDefinition, error)
Parse parses an agent definition from a reader.
func (*DefinitionParser) ParseBytes ¶
func (p *DefinitionParser) ParseBytes(ctx context.Context, content []byte) (*AgentDefinition, error)
ParseBytes parses an agent definition from bytes.
func (*DefinitionParser) ParseFile ¶
func (p *DefinitionParser) ParseFile(ctx context.Context, path string) (*AgentDefinition, error)
ParseFile parses an agent definition from a file.
type DirectoryLoader ¶
type DirectoryLoader struct {
// contains filtered or unexported fields
}
DirectoryLoader loads workflows from a directory.
func NewDirectoryLoader ¶
func NewDirectoryLoader(fileLoader *FileLoader) *DirectoryLoader
NewDirectoryLoader creates a new DirectoryLoader.
type DirectoryParser ¶
type DirectoryParser struct {
// contains filtered or unexported fields
}
DirectoryParser parses agent definitions from a directory.
func NewDirectoryParser ¶
func NewDirectoryParser(parser *DefinitionParser) *DirectoryParser
NewDirectoryParser creates a new DirectoryParser.
func (*DirectoryParser) ParseAll ¶
func (p *DirectoryParser) ParseAll(ctx context.Context, dir string) (map[string]*AgentDefinition, error)
ParseAll parses all agent definitions from a directory.
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
Executor executes workflows based on DAG ordering. OutputStore is execution-scoped (created per Execute call) rather than executor-scoped, ensuring thread-safety and preventing data races when multiple workflows execute concurrently.
func NewExecutor ¶
func NewExecutor(registry *AgentRegistry) *Executor
NewExecutor creates a new Executor.
type FileLoader ¶
type FileLoader struct {
// contains filtered or unexported fields
}
FileLoader loads workflows from files.
func NewFileLoader ¶
func NewFileLoader(decoder Decoder, opts ...FileLoaderOption) *FileLoader
func NewJSONFileLoader ¶
func NewJSONFileLoader(opts ...FileLoaderOption) *FileLoader
NewJSONFileLoader creates a FileLoader for JSON files.
func NewYAMLFileLoader ¶
func NewYAMLFileLoader(opts ...FileLoaderOption) *FileLoader
NewYAMLFileLoader creates a FileLoader for YAML files.
type FileLoaderOption ¶
type FileLoaderOption func(*FileLoader)
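FileLoaderOption is a functional option applied to a FileLoader (see WithAllowedDir). NewFileLoader creates a new FileLoader from the given decoder and options.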
func WithAllowedDir ¶
func WithAllowedDir(dir string) FileLoaderOption
WithAllowedDir sets the allowed directory for security checks.
type FileWatcher ¶
type FileWatcher struct {
// contains filtered or unexported fields
}
FileWatcher watches files for changes using fsnotify.
func NewFileWatcher ¶
func NewFileWatcher(loader WorkflowLoader, workflows map[string]*Workflow) *FileWatcher
NewFileWatcher creates a new FileWatcher.
func (*FileWatcher) Close ¶
func (w *FileWatcher) Close()
Close closes the file watcher and releases resources.
func (*FileWatcher) RegisterCallback ¶
func (w *FileWatcher) RegisterCallback(callback ReloadCallback) string
RegisterCallback registers a callback for reload events and returns the callback ID.
func (*FileWatcher) UnregisterCallback ¶
func (w *FileWatcher) UnregisterCallback(callbackID string)
UnregisterCallback removes a callback by ID.
type JSONDecoder ¶
type JSONDecoder struct{}
JSONDecoder decodes JSON files.
func (*JSONDecoder) Decode ¶
func (d *JSONDecoder) Decode(data []byte, v interface{}) error
Decode decodes JSON data.
type OutputStore ¶
type OutputStore struct {
// contains filtered or unexported fields
}
OutputStore stores step outputs.
func (*OutputStore) Get ¶
func (s *OutputStore) Get(stepID string) (*StepOutput, bool)
Get retrieves output for a step.
func (*OutputStore) GetMultiple ¶
func (s *OutputStore) GetMultiple(stepIDs []string) map[string]*StepOutput
GetMultiple retrieves outputs for multiple steps.
func (*OutputStore) Set ¶
func (s *OutputStore) Set(stepID string, output *StepOutput)
Set stores output for a step.
type ReloadCallback ¶
ReloadCallback is called when workflows are reloaded.
type RetryPolicy ¶
type RetryPolicy struct {
MaxAttempts int `json:"max_attempts"`
InitialDelay time.Duration `json:"initial_delay"`
MaxDelay time.Duration `json:"max_delay"`
BackoffMultiplier float64 `json:"backoff_multiplier"`
}
RetryPolicy defines retry behavior for a step.
type RetryPolicyFile ¶
type RetryPolicyFile struct {
MaxAttempts int `json:"max_attempts" yaml:"max_attempts"`
InitialDelay time.Duration `json:"initial_delay" yaml:"initial_delay"`
MaxDelay time.Duration `json:"max_delay" yaml:"max_delay"`
BackoffMultiplier float64 `json:"backoff_multiplier" yaml:"backoff_multiplier"`
}
RetryPolicyFile represents retry policy from a file.
type Step ¶
type Step struct {
ID string `json:"id"`
Name string `json:"name"`
AgentType string `json:"agent_type"`
Input string `json:"input"`
DependsOn []string `json:"depends_on"`
Timeout time.Duration `json:"timeout"`
RetryPolicy *RetryPolicy `json:"retry_policy,omitempty"`
Status StepStatus `json:"status"`
Output string `json:"output,omitempty"`
Error string `json:"error,omitempty"`
StartedAt time.Time `json:"started_at,omitempty"`
FinishedAt time.Time `json:"finished_at,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}
Step represents a single step in a workflow.
type StepFile ¶
type StepFile struct {
ID string `json:"id" yaml:"id"`
Name string `json:"name" yaml:"name"`
AgentType string `json:"agent_type" yaml:"agent_type"`
Input string `json:"input" yaml:"input"`
DependsOn []string `json:"depends_on" yaml:"depends_on"`
Timeout time.Duration `json:"timeout" yaml:"timeout"`
RetryPolicy *RetryPolicyFile `json:"retry_policy" yaml:"retry_policy"`
Metadata map[string]string `json:"metadata" yaml:"metadata"`
}
StepFile represents a step definition from a file.
type StepOutput ¶
StepOutput stores the output of a step for dependency resolution.
type StepResult ¶
type StepResult struct {
StepID string `json:"step_id"`
Name string `json:"name"`
Status StepStatus `json:"status"`
Output string `json:"output,omitempty"`
Error string `json:"error,omitempty"`
Duration time.Duration `json:"duration"`
Metadata map[string]string `json:"metadata,omitempty"`
}
StepResult represents the result of a step execution.
type StepState ¶
type StepState struct {
StepID string `json:"step_id"`
Status StepStatus `json:"status"`
Output string `json:"output,omitempty"`
Error string `json:"error,omitempty"`
StartedAt time.Time `json:"started_at,omitempty"`
FinishedAt time.Time `json:"finished_at,omitempty"`
Attempts int `json:"attempts"`
}
StepState represents the runtime state of a step.
type StepStatus ¶
type StepStatus string
StepStatus represents the execution status of a workflow step.
const (
	StepStatusPending   StepStatus = "pending"
	StepStatusRunning   StepStatus = "running"
	StepStatusCompleted StepStatus = "completed"
	StepStatusFailed    StepStatus = "failed"
	StepStatusSkipped   StepStatus = "skipped"
)
type Workflow ¶
type Workflow struct {
ID string `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
Description string `json:"description"`
Steps []*Step `json:"steps"`
Variables map[string]string `json:"variables,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
Workflow represents a workflow definition.
type WorkflowExecution ¶
type WorkflowExecution struct {
ID string `json:"id"`
WorkflowID string `json:"workflow_id"`
Status WorkflowStatus `json:"status"`
StepStates map[string]*StepState `json:"step_states"`
Variables map[string]interface{} `json:"variables"`
Context *models.TaskContext `json:"context"`
StartedAt time.Time `json:"started_at"`
FinishedAt time.Time `json:"finished_at,omitempty"`
Error string `json:"error,omitempty"`
}
WorkflowExecution represents a running instance of a workflow.
type WorkflowFile ¶
type WorkflowFile struct {
ID string `json:"id" yaml:"id"`
Name string `json:"name" yaml:"name"`
Version string `json:"version" yaml:"version"`
Description string `json:"description" yaml:"description"`
Steps []*StepFile `json:"steps" yaml:"steps"`
Variables map[string]string `json:"variables" yaml:"variables"`
Metadata map[string]string `json:"metadata" yaml:"metadata"`
}
WorkflowFile represents a workflow definition from a file.
type WorkflowLoader ¶
WorkflowLoader loads workflow definitions from various sources.
type WorkflowReloader ¶
type WorkflowReloader struct {
// contains filtered or unexported fields
}
WorkflowReloader manages workflow hot reloading.
func NewWorkflowReloader ¶
func NewWorkflowReloader(loader WorkflowLoader) *WorkflowReloader
NewWorkflowReloader creates a new WorkflowReloader.
func (*WorkflowReloader) GetWorkflow ¶
func (r *WorkflowReloader) GetWorkflow(id string) (*Workflow, bool)
GetWorkflow returns a workflow by ID.
func (*WorkflowReloader) ListWorkflows ¶
func (r *WorkflowReloader) ListWorkflows() []*Workflow
ListWorkflows returns all loaded workflows.
func (*WorkflowReloader) Load ¶
func (r *WorkflowReloader) Load(ctx context.Context, dir string) error
Load loads workflows from a directory.
func (*WorkflowReloader) RegisterCallback ¶
func (r *WorkflowReloader) RegisterCallback(callback ReloadCallback) string
RegisterCallback registers a callback for reload events and returns the callback ID.
func (*WorkflowReloader) StartWatching ¶
func (r *WorkflowReloader) StartWatching(ctx context.Context, dir string) error
StartWatching starts watching for file changes.
func (*WorkflowReloader) StopWatching ¶
func (r *WorkflowReloader) StopWatching()
StopWatching stops watching for file changes.
func (*WorkflowReloader) UnregisterCallback ¶
func (r *WorkflowReloader) UnregisterCallback(callbackID string)
UnregisterCallback removes a callback by ID.
type WorkflowResult ¶
type WorkflowResult struct {
ExecutionID string `json:"execution_id"`
WorkflowID string `json:"workflow_id"`
Status WorkflowStatus `json:"status"`
Output map[string]interface{} `json:"output"`
Error string `json:"error,omitempty"`
Duration time.Duration `json:"duration"`
Steps []*StepResult `json:"steps"`
}
WorkflowResult represents the final result of a workflow execution.
type WorkflowStatus ¶
type WorkflowStatus string
WorkflowStatus represents the execution status of a workflow.
const (
	WorkflowStatusPending   WorkflowStatus = "pending"
	WorkflowStatusRunning   WorkflowStatus = "running"
	WorkflowStatusCompleted WorkflowStatus = "completed"
	WorkflowStatusFailed    WorkflowStatus = "failed"
	WorkflowStatusCancelled WorkflowStatus = "cancelled"
)
type YAMLDecoder ¶
type YAMLDecoder struct{}
YAMLDecoder decodes YAML files.
func (*YAMLDecoder) Decode ¶
func (d *YAMLDecoder) Decode(data []byte, v interface{}) error
Decode decodes YAML data.