engine

package
v0.1.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 14, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultMaxParallel is the default maximum number of parallel steps.
	DefaultMaxParallel = 10

	// DefaultStepTimeout is the default timeout for individual step execution.
	DefaultStepTimeout = 10 * time.Second

	// DefaultInitialDelay is the default initial delay for retry.
	DefaultInitialDelay = 10 * time.Millisecond

	// DefaultMaxDelay is the default maximum delay for retry.
	DefaultMaxDelay = 100 * time.Millisecond

	// DefaultRetryAttempts is the default number of retry attempts.
	DefaultRetryAttempts = 3

	// DefaultWorkflowTimeout is the default timeout for workflow execution.
	DefaultWorkflowTimeout = 5 * time.Minute

	// DefaultStepWaitDuration is the default duration to wait between step checks.
	DefaultStepWaitDuration = 100 * time.Millisecond

	// DefaultDAGTraversalTimeout is the default timeout for DAG traversal.
	DefaultDAGTraversalTimeout = 1 * time.Minute

	// DefaultMaxWorkflowSize is the default maximum number of steps in a workflow.
	DefaultMaxWorkflowSize = 100

	// DefaultMaxDependencies is the default maximum number of dependencies per step.
	DefaultMaxDependencies = 10
)

Default configuration constants for workflow engine.

Variables

View Source
// Definition errors: package-level sentinel values with the messages
// "field not found" and "duplicate agent definition".
//
// NOTE(review): stderrors is presumably an import alias for the standard
// library errors package — confirm against the declaring file's imports.
var (
	ErrFieldNotFound            = stderrors.New("field not found")
	ErrDuplicateAgentDefinition = stderrors.New("duplicate agent definition")
)

Definition errors.

View Source
// Workflow errors: package-level sentinel values, one per failure condition
// named by the identifier; the message of each is the string passed to New.
//
// NOTE(review): this group calls errors.New while the definition errors call
// stderrors.New. That alias suggests another package named errors may be in
// scope somewhere in this module, so errors here might not be the standard
// library package — confirm against the declaring file's imports.
var (
	ErrInvalidDependency   = errors.New("invalid dependency: step not found")
	ErrCycleDetected       = errors.New("cycle detected in workflow")
	ErrAgentTypeRegistered = errors.New("agent type already registered")
	ErrAgentTypeNotFound   = errors.New("agent type not found")
	ErrAgentResultNil      = errors.New("agent returned nil result")
	ErrWorkflowIncomplete  = errors.New("workflow incomplete")
	ErrInvalidLoader       = errors.New("invalid loader type")
	ErrDuplicateID         = errors.New("duplicate ID")
)

Workflow errors.

Functions

This section is empty.

Types

type AgentDefinition

type AgentDefinition struct {
	Name        string            `json:"name"`
	Type        string            `json:"type"`
	Description string            `json:"description"`
	Prompts     map[string]string `json:"prompts"`
	Tools       []string          `json:"tools"`
	Metadata    map[string]string `json:"metadata"`
}

AgentDefinition represents an agent definition from markdown.

type AgentExecutor

type AgentExecutor struct {
	// contains filtered or unexported fields
}

AgentExecutor executes tasks using registered agents.

func NewAgentExecutor

func NewAgentExecutor(registry *AgentRegistry) *AgentExecutor

NewAgentExecutor creates a new AgentExecutor.

func (*AgentExecutor) Execute

func (e *AgentExecutor) Execute(ctx context.Context, step *Step, input string, taskCtx *models.TaskContext) (string, error)

Execute executes a step using the appropriate agent.

type AgentFactory

type AgentFactory func(ctx context.Context, config interface{}) (base.Agent, error)

AgentFactory creates agent instances.

type AgentRegistry

type AgentRegistry struct {
	// contains filtered or unexported fields
}

AgentRegistry manages agent type registrations.

func NewAgentRegistry

func NewAgentRegistry() *AgentRegistry

NewAgentRegistry creates a new AgentRegistry.

func (*AgentRegistry) CreateAgent

func (r *AgentRegistry) CreateAgent(ctx context.Context, agentType string, config interface{}) (base.Agent, error)

CreateAgent creates an agent instance by type.

func (*AgentRegistry) GetFactory

func (r *AgentRegistry) GetFactory(agentType string) (AgentFactory, bool)

GetFactory returns an agent factory by type.

func (*AgentRegistry) ListTypes

func (r *AgentRegistry) ListTypes() []string

ListTypes returns all registered agent types.

func (*AgentRegistry) Register

func (r *AgentRegistry) Register(agentType string, factory AgentFactory) error

Register registers an agent factory for a type.

func (*AgentRegistry) Unregister

func (r *AgentRegistry) Unregister(agentType string)

Unregister removes an agent factory.

type DAG

// DAG holds the workflow graph in two exported maps keyed by string.
//
// NOTE(review): Nodes is presumably keyed by step ID (DAGNode carries a
// StepID field) and Edges presumably maps a step ID to adjacent step IDs.
// The edge direction (dependency -> dependent, or the reverse) is not visible
// here — confirm against NewDAG before relying on it.
type DAG struct {
	Nodes map[string]*DAGNode
	Edges map[string][]string
}

DAG represents a directed acyclic graph of workflow steps.

func NewDAG

func NewDAG(steps []*Step) (*DAG, error)

NewDAG creates a new DAG from workflow steps.

func (*DAG) GetExecutionOrder

func (d *DAG) GetExecutionOrder() ([]string, error)

GetExecutionOrder returns the topological sort order of steps.

type DAGNode

type DAGNode struct {
	StepID    string
	InDegree  int
	OutDegree int
}

DAGNode represents a node in the workflow DAG.

type Decoder

type Decoder interface {
	Decode(data []byte, v interface{}) error
}

Decoder decodes workflow files.

type DefinitionParser

type DefinitionParser struct {
}

DefinitionParser parses agent definitions from markdown files.

func NewDefinitionParser

func NewDefinitionParser() *DefinitionParser

NewDefinitionParser creates a new DefinitionParser.

func (*DefinitionParser) Parse

Parse parses an agent definition from a reader.

func (*DefinitionParser) ParseBytes

func (p *DefinitionParser) ParseBytes(ctx context.Context, content []byte) (*AgentDefinition, error)

ParseBytes parses an agent definition from bytes.

func (*DefinitionParser) ParseFile

func (p *DefinitionParser) ParseFile(ctx context.Context, path string) (*AgentDefinition, error)

ParseFile parses an agent definition from a file.

type DirectoryLoader

type DirectoryLoader struct {
	// contains filtered or unexported fields
}

DirectoryLoader loads workflows from a directory.

func NewDirectoryLoader

func NewDirectoryLoader(fileLoader *FileLoader) *DirectoryLoader

NewDirectoryLoader creates a new DirectoryLoader.

func (*DirectoryLoader) LoadAll

func (l *DirectoryLoader) LoadAll(ctx context.Context, dir string) (map[string]*Workflow, error)

LoadAll loads all workflows from a directory.

type DirectoryParser

type DirectoryParser struct {
	// contains filtered or unexported fields
}

DirectoryParser parses agent definitions from a directory.

func NewDirectoryParser

func NewDirectoryParser(parser *DefinitionParser) *DirectoryParser

NewDirectoryParser creates a new DirectoryParser.

func (*DirectoryParser) ParseAll

func (p *DirectoryParser) ParseAll(ctx context.Context, dir string) (map[string]*AgentDefinition, error)

ParseAll parses all agent definitions from a directory.

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor executes workflows based on DAG ordering. OutputStore is execution-scoped (created per Execute call) rather than executor-scoped, ensuring thread-safety and preventing data races when multiple workflows execute concurrently.

func NewExecutor

func NewExecutor(registry *AgentRegistry) *Executor

NewExecutor creates a new Executor.

func (*Executor) Execute

func (e *Executor) Execute(ctx context.Context, workflow *Workflow, initialInput string) (*WorkflowResult, error)

Execute executes a workflow.

type FileLoader

type FileLoader struct {
	// contains filtered or unexported fields
}

FileLoader loads workflows from files.

func NewFileLoader

func NewFileLoader(decoder Decoder, opts ...FileLoaderOption) *FileLoader

func NewJSONFileLoader

func NewJSONFileLoader(opts ...FileLoaderOption) *FileLoader

NewJSONFileLoader creates a FileLoader for JSON files.

func NewYAMLFileLoader

func NewYAMLFileLoader(opts ...FileLoaderOption) *FileLoader

NewYAMLFileLoader creates a FileLoader for YAML files.

func (*FileLoader) Load

func (l *FileLoader) Load(ctx context.Context, source string) (*Workflow, error)

Load loads a workflow from a file.

func (*FileLoader) Parse

func (l *FileLoader) Parse(ctx context.Context, data []byte, source string) (*Workflow, error)

Parse parses workflow data.

type FileLoaderOption

type FileLoaderOption func(*FileLoader)

FileLoaderOption is a functional option that configures a FileLoader. Options are passed to NewFileLoader, which creates a new FileLoader.

func WithAllowedDir

func WithAllowedDir(dir string) FileLoaderOption

WithAllowedDir sets the allowed directory for security checks.

type FileWatcher

type FileWatcher struct {
	// contains filtered or unexported fields
}

FileWatcher watches files for changes using fsnotify.

func NewFileWatcher

func NewFileWatcher(loader WorkflowLoader, workflows map[string]*Workflow) *FileWatcher

NewFileWatcher creates a new FileWatcher.

func (*FileWatcher) Close

func (w *FileWatcher) Close()

Close closes the file watcher and releases resources.

func (*FileWatcher) RegisterCallback

func (w *FileWatcher) RegisterCallback(callback ReloadCallback) string

RegisterCallback registers a callback for reload events and returns the callback ID.

func (*FileWatcher) UnregisterCallback

func (w *FileWatcher) UnregisterCallback(callbackID string)

UnregisterCallback removes a callback by ID.

func (*FileWatcher) Watch

func (w *FileWatcher) Watch(ctx context.Context, dir string) error

Watch starts watching workflow files for changes.

type JSONDecoder

type JSONDecoder struct{}

JSONDecoder decodes JSON files.

func (*JSONDecoder) Decode

func (d *JSONDecoder) Decode(data []byte, v interface{}) error

Decode decodes JSON data.

type OutputStore

type OutputStore struct {
	// contains filtered or unexported fields
}

OutputStore stores step outputs.

func NewOutputStore

func NewOutputStore() *OutputStore

NewOutputStore creates a new OutputStore.

func (*OutputStore) Clear

func (s *OutputStore) Clear()

Clear removes all outputs.

func (*OutputStore) Get

func (s *OutputStore) Get(stepID string) (*StepOutput, bool)

Get retrieves output for a step.

func (*OutputStore) GetMultiple

func (s *OutputStore) GetMultiple(stepIDs []string) map[string]*StepOutput

GetMultiple retrieves outputs for multiple steps.

func (*OutputStore) Set

func (s *OutputStore) Set(stepID string, output *StepOutput)

Set stores output for a step.

type ReloadCallback

type ReloadCallback func(workflows map[string]*Workflow)

ReloadCallback is called when workflows are reloaded.

type RetryPolicy

// RetryPolicy groups the retry settings of a step: an attempt count, two
// delays and a backoff multiplier, each exposed under a snake_case JSON key.
//
// NOTE(review): InitialDelay and MaxDelay are time.Duration values. With the
// standard encoding/json package they are written and read as an integer
// number of nanoseconds, not as a string such as "10ms" — confirm no custom
// (un)marshaling exists elsewhere.
type RetryPolicy struct {
	MaxAttempts       int           `json:"max_attempts"`
	InitialDelay      time.Duration `json:"initial_delay"`
	MaxDelay          time.Duration `json:"max_delay"`
	BackoffMultiplier float64       `json:"backoff_multiplier"`
}

RetryPolicy defines retry behavior for a step.

type RetryPolicyFile

// RetryPolicyFile is the on-disk form of a retry policy; it has the same four
// fields as RetryPolicy and carries both json and yaml tags with identical
// key names.
//
// NOTE(review): InitialDelay and MaxDelay are time.Duration values. The
// standard encoding/json package only decodes them from integer nanoseconds,
// whereas YAML decoders may accept duration strings — confirm that JSON and
// YAML workflow files are expected to use the same syntax for these fields.
type RetryPolicyFile struct {
	MaxAttempts       int           `json:"max_attempts" yaml:"max_attempts"`
	InitialDelay      time.Duration `json:"initial_delay" yaml:"initial_delay"`
	MaxDelay          time.Duration `json:"max_delay" yaml:"max_delay"`
	BackoffMultiplier float64       `json:"backoff_multiplier" yaml:"backoff_multiplier"`
}

RetryPolicyFile represents retry policy from a file.

type Step

// Step is a single step in a workflow. It combines definition fields (ID,
// Name, AgentType, Input, DependsOn, Timeout, RetryPolicy, Metadata) with
// runtime fields (Status, Output, Error, StartedAt, FinishedAt) in one
// JSON-tagged struct.
//
// NOTE(review): StartedAt and FinishedAt are time.Time values tagged
// `omitempty`. The standard encoding/json package never considers a struct
// value empty, so a zero time is still written ("0001-01-01T00:00:00Z")
// instead of being omitted. If omission is intended, consider `omitzero`
// (Go 1.24+) or a *time.Time field.
//
// NOTE(review): Timeout is a time.Duration; with the standard encoding/json
// package it is encoded as an integer number of nanoseconds — confirm no
// custom (un)marshaling exists elsewhere.
type Step struct {
	ID          string            `json:"id"`
	Name        string            `json:"name"`
	AgentType   string            `json:"agent_type"`
	Input       string            `json:"input"`
	DependsOn   []string          `json:"depends_on"`
	Timeout     time.Duration     `json:"timeout"`
	RetryPolicy *RetryPolicy      `json:"retry_policy,omitempty"`
	Status      StepStatus        `json:"status"`
	Output      string            `json:"output,omitempty"`
	Error       string            `json:"error,omitempty"`
	StartedAt   time.Time         `json:"started_at,omitempty"`
	FinishedAt  time.Time         `json:"finished_at,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"`
}

Step represents a single step in a workflow.

type StepFile

// StepFile is the on-disk form of a step. It carries only definition fields
// (no status, output or timestamps) and tags every field for both json and
// yaml with identical key names.
//
// NOTE(review): Timeout is a time.Duration. The standard encoding/json
// package only decodes it from integer nanoseconds, whereas YAML decoders may
// accept duration strings — confirm that JSON and YAML workflow files are
// expected to use the same syntax for this field.
type StepFile struct {
	ID          string            `json:"id" yaml:"id"`
	Name        string            `json:"name" yaml:"name"`
	AgentType   string            `json:"agent_type" yaml:"agent_type"`
	Input       string            `json:"input" yaml:"input"`
	DependsOn   []string          `json:"depends_on" yaml:"depends_on"`
	Timeout     time.Duration     `json:"timeout" yaml:"timeout"`
	RetryPolicy *RetryPolicyFile  `json:"retry_policy" yaml:"retry_policy"`
	Metadata    map[string]string `json:"metadata" yaml:"metadata"`
}

StepFile represents a step definition from a file.

type StepOutput

type StepOutput struct {
	StepID    string
	Output    string
	Variables map[string]interface{}
}

StepOutput stores the output of a step for dependency resolution.

type StepResult

// StepResult is the per-step record of an execution: identity (StepID, Name),
// final Status, optional Output/Error/Metadata, and the Duration.
//
// NOTE(review): Duration is a time.Duration; with the standard encoding/json
// package it appears in JSON as an integer number of nanoseconds — confirm
// consumers of this JSON expect that unit.
type StepResult struct {
	StepID   string            `json:"step_id"`
	Name     string            `json:"name"`
	Status   StepStatus        `json:"status"`
	Output   string            `json:"output,omitempty"`
	Error    string            `json:"error,omitempty"`
	Duration time.Duration     `json:"duration"`
	Metadata map[string]string `json:"metadata,omitempty"`
}

StepResult represents the result of a step execution.

type StepState

// StepState is the runtime state of one step: its status, output, error,
// start/finish timestamps and an attempt counter.
//
// NOTE(review): StartedAt and FinishedAt are time.Time values tagged
// `omitempty`. The standard encoding/json package never considers a struct
// value empty, so a zero time is still written ("0001-01-01T00:00:00Z")
// instead of being omitted. If omission is intended, consider `omitzero`
// (Go 1.24+) or a *time.Time field.
type StepState struct {
	StepID     string     `json:"step_id"`
	Status     StepStatus `json:"status"`
	Output     string     `json:"output,omitempty"`
	Error      string     `json:"error,omitempty"`
	StartedAt  time.Time  `json:"started_at,omitempty"`
	FinishedAt time.Time  `json:"finished_at,omitempty"`
	Attempts   int        `json:"attempts"`
}

StepState represents the runtime state of a step.

type StepStatus

type StepStatus string

StepStatus represents the execution status of a workflow step.

// StepStatus values. Each constant is the StepStatus whose underlying string
// is the lowercase word in its name: "pending", "running", "completed",
// "failed" and "skipped". These strings are what appears in serialized
// `status` fields.
//
// NOTE(review): which transitions between these values the executor performs
// is not visible here.
const (
	StepStatusPending   StepStatus = "pending"
	StepStatusRunning   StepStatus = "running"
	StepStatusCompleted StepStatus = "completed"
	StepStatusFailed    StepStatus = "failed"
	StepStatusSkipped   StepStatus = "skipped"
)

type Workflow

type Workflow struct {
	ID          string            `json:"id"`
	Name        string            `json:"name"`
	Version     string            `json:"version"`
	Description string            `json:"description"`
	Steps       []*Step           `json:"steps"`
	Variables   map[string]string `json:"variables,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"`
	CreatedAt   time.Time         `json:"created_at"`
	UpdatedAt   time.Time         `json:"updated_at"`
}

Workflow represents a workflow definition.

type WorkflowExecution

// WorkflowExecution is a running instance of a workflow: its own ID, the ID
// of the workflow it runs, an overall Status, per-step states in a map keyed
// by string, variables, a task context and timestamps.
//
// NOTE(review): StepStates is presumably keyed by step ID (StepState carries
// a StepID field) — confirm against the executor.
//
// NOTE(review): FinishedAt is a time.Time tagged `omitempty`. The standard
// encoding/json package never considers a struct value empty, so a zero time
// is still written ("0001-01-01T00:00:00Z") while the execution is
// unfinished. If omission is intended, consider `omitzero` (Go 1.24+) or a
// *time.Time field.
type WorkflowExecution struct {
	ID         string                 `json:"id"`
	WorkflowID string                 `json:"workflow_id"`
	Status     WorkflowStatus         `json:"status"`
	StepStates map[string]*StepState  `json:"step_states"`
	Variables  map[string]interface{} `json:"variables"`
	Context    *models.TaskContext    `json:"context"`
	StartedAt  time.Time              `json:"started_at"`
	FinishedAt time.Time              `json:"finished_at,omitempty"`
	Error      string                 `json:"error,omitempty"`
}

WorkflowExecution represents a running instance of a workflow.

type WorkflowFile

type WorkflowFile struct {
	ID          string            `json:"id" yaml:"id"`
	Name        string            `json:"name" yaml:"name"`
	Version     string            `json:"version" yaml:"version"`
	Description string            `json:"description" yaml:"description"`
	Steps       []*StepFile       `json:"steps" yaml:"steps"`
	Variables   map[string]string `json:"variables" yaml:"variables"`
	Metadata    map[string]string `json:"metadata" yaml:"metadata"`
}

WorkflowFile represents a workflow definition from a file.

type WorkflowLoader

type WorkflowLoader interface {
	Load(ctx context.Context, source string) (*Workflow, error)
}

WorkflowLoader loads workflow definitions from various sources.

type WorkflowReloader

type WorkflowReloader struct {
	// contains filtered or unexported fields
}

WorkflowReloader manages workflow hot reloading.

func NewWorkflowReloader

func NewWorkflowReloader(loader WorkflowLoader) *WorkflowReloader

NewWorkflowReloader creates a new WorkflowReloader.

func (*WorkflowReloader) GetWorkflow

func (r *WorkflowReloader) GetWorkflow(id string) (*Workflow, bool)

GetWorkflow returns a workflow by ID.

func (*WorkflowReloader) ListWorkflows

func (r *WorkflowReloader) ListWorkflows() []*Workflow

ListWorkflows returns all loaded workflows.

func (*WorkflowReloader) Load

func (r *WorkflowReloader) Load(ctx context.Context, dir string) error

Load loads workflows from a directory.

func (*WorkflowReloader) RegisterCallback

func (r *WorkflowReloader) RegisterCallback(callback ReloadCallback) string

RegisterCallback registers a callback for reload events and returns the callback ID.

func (*WorkflowReloader) StartWatching

func (r *WorkflowReloader) StartWatching(ctx context.Context, dir string) error

StartWatching starts watching for file changes.

func (*WorkflowReloader) StopWatching

func (r *WorkflowReloader) StopWatching()

StopWatching stops watching for file changes.

func (*WorkflowReloader) UnregisterCallback

func (r *WorkflowReloader) UnregisterCallback(callbackID string)

UnregisterCallback removes a callback by ID.

type WorkflowResult

// WorkflowResult is the final result returned for a workflow execution: the
// execution and workflow IDs, overall Status, an Output map, an optional
// Error string, the total Duration and one StepResult per entry in Steps.
//
// NOTE(review): Duration is a time.Duration; with the standard encoding/json
// package it appears in JSON as an integer number of nanoseconds — confirm
// consumers of this JSON expect that unit.
type WorkflowResult struct {
	ExecutionID string                 `json:"execution_id"`
	WorkflowID  string                 `json:"workflow_id"`
	Status      WorkflowStatus         `json:"status"`
	Output      map[string]interface{} `json:"output"`
	Error       string                 `json:"error,omitempty"`
	Duration    time.Duration          `json:"duration"`
	Steps       []*StepResult          `json:"steps"`
}

WorkflowResult represents the final result of a workflow execution.

type WorkflowStatus

type WorkflowStatus string

WorkflowStatus represents the execution status of a workflow.

// WorkflowStatus values. Each constant is the WorkflowStatus whose underlying
// string is the lowercase word in its name: "pending", "running",
// "completed", "failed" and "cancelled". Note the double-l spelling of
// "cancelled"; code matching on the raw string must use it exactly.
//
// NOTE(review): which transitions between these values the executor performs
// is not visible here.
const (
	WorkflowStatusPending   WorkflowStatus = "pending"
	WorkflowStatusRunning   WorkflowStatus = "running"
	WorkflowStatusCompleted WorkflowStatus = "completed"
	WorkflowStatusFailed    WorkflowStatus = "failed"
	WorkflowStatusCancelled WorkflowStatus = "cancelled"
)

type YAMLDecoder

type YAMLDecoder struct{}

YAMLDecoder decodes YAML files.

func (*YAMLDecoder) Decode

func (d *YAMLDecoder) Decode(data []byte, v interface{}) error

Decode decodes YAML data.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL