Documentation
¶
Index ¶
- Constants
- func RemoveJSONSchema(input string) string
- func TestMain(m *testing.M)
- type ActionState
- type ActionStates
- type CLIProgressTracker
- type ExecutionResult
- type Executor
- type ExecutorConfig
- type ExecutorFunc
- type InputValidationError
- type InputValidationResult
- type OutputParser
- type Runner
- func (r *Runner) RunWorkflow(ctx execcontext.RunContext, workflowFile string, inputs map[string]interface{}, ...) (*ExecutionResult, error)
- func (r *Runner) RunWorkflowRaw(execCtx *execcontext.ExecutionContext, workflow *ast.Workflow, ...) (*ExecutionResult, error)
- func (r *Runner) SetProgressListener(listener pkgEvents.Listener)
- type RunnerOption
- type StepExecutionResult
- type StepProgressState
- type StepResult
- type TokenUsage
- type TokenUsageSummary
- type WorkflowExecutor
Constants ¶
const (
JSON_OUTPUT_SCHEMA_PREFIX = "IMPORTANT: Respond in JSON using the following schema:"
)
Variables ¶
This section is empty.
Functions ¶
func RemoveJSONSchema ¶
RemoveJSONSchema strips JSON schema instructions from AI model prompts by removing the "IMPORTANT:" directive and associated JSON schema blocks. This is used to clean prompts for display purposes while preserving the original prompt content.
Types ¶
type ActionState ¶
type ActionState struct {
// contains filtered or unexported fields
}
ActionState represents a single action within a workflow step, tracking its identifier and display text.
type ActionStates ¶
type ActionStates []ActionState
ActionStates is a collection of action states within a workflow step.
func (*ActionStates) Add ¶
func (as *ActionStates) Add(action ActionState)
Add appends a new action state and marks all previous actions as completed if they haven't already been marked with success or error icons.
func (ActionStates) String ¶
func (as ActionStates) String() string
String formats all action states with proper indentation and line wrapping.
type CLIProgressTracker ¶
type CLIProgressTracker struct {
// contains filtered or unexported fields
}
CLIProgressTracker manages visual progress display for workflow execution, coordinating step-level spinners and action states.
func NewProgressTracker ¶
func NewProgressTracker(writer io.Writer, prefix string, totalSteps int) *CLIProgressTracker
NewProgressTracker creates a progress tracker for displaying workflow execution status.
func (*CLIProgressTracker) HasCompleted ¶
func (pt *CLIProgressTracker) HasCompleted() bool
HasCompleted checks if the progress tracker has completed.
func (*CLIProgressTracker) StartListening ¶
func (pt *CLIProgressTracker) StartListening(progressChan <-chan pkgEvents.ExecutionEvent)
StartListening processes execution events and updates the visual progress display.
func (*CLIProgressTracker) StopListening ¶
func (pt *CLIProgressTracker) StopListening()
StopListening halts progress tracking and stops all active spinners.
type ExecutionResult ¶
type ExecutionResult struct {
WorkflowFile string `json:"workflow_file" yaml:"workflow_file"`
RunID string `json:"run_id" yaml:"run_id"`
Status string `json:"status" yaml:"status"`
StartTime time.Time `json:"start_time" yaml:"start_time"`
EndTime time.Time `json:"end_time,omitempty" yaml:"end_time,omitempty"`
Duration time.Duration `json:"duration" yaml:"duration"`
StepsTotal int `json:"steps_total" yaml:"steps_total"`
StepResults []StepExecutionResult `json:"step_results,omitempty" yaml:"step_results,omitempty"`
Inputs map[string]interface{} `json:"inputs" yaml:"inputs"`
Outputs map[string]interface{} `json:"outputs,omitempty" yaml:"outputs,omitempty"`
FinalState map[string]interface{} `json:"final_state,omitempty" yaml:"final_state,omitempty"`
Error string `json:"error,omitempty" yaml:"error,omitempty"`
TokenUsage *TokenUsageSummary `json:"token_usage,omitempty" yaml:"token_usage,omitempty"`
}
ExecutionResult contains the complete outcome of workflow execution including timing, status, step results, and resource usage metrics.
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
Executor orchestrates the execution of workflow steps, managing AI providers, tools, templating, and progress reporting. It handles concurrent step execution, retry logic, and maintains execution context throughout the workflow lifecycle.
func (*Executor) ExecuteWorkflow ¶
func (e *Executor) ExecuteWorkflow(execCtx *execcontext.ExecutionContext, progressChan chan<- pkgEvents.ExecutionEvent) error
ExecuteWorkflow runs the complete workflow, executing steps sequentially while respecting dependencies and conditional logic. Progress events are sent to the provided channel for real-time monitoring. Returns an error if any step fails or if workflow output collection encounters issues.
type ExecutorConfig ¶
type ExecutorConfig struct {
MaxConcurrentSteps int `yaml:"max_concurrent_steps"`
DefaultTimeout time.Duration `yaml:"default_timeout"`
EnableRetries bool `yaml:"enable_retries"`
MaxRetries int `yaml:"max_retries"`
RetryDelay time.Duration `yaml:"retry_delay"`
EnableMetrics bool `yaml:"enable_metrics"`
}
ExecutorConfig defines the runtime behavior and limits for workflow execution. It controls concurrency, timeouts, retry policies, and observability features.
func DefaultExecutorConfig ¶
func DefaultExecutorConfig() *ExecutorConfig
DefaultExecutorConfig returns production-ready configuration values with moderate concurrency limits and retry policies enabled.
type ExecutorFunc ¶
type ExecutorFunc func(ctx execcontext.RunContext, config *ExecutorConfig, workflow *ast.Workflow, registry *provider.Registry, runner *Runner) (WorkflowExecutor, error)
ExecutorFunc is a function that creates a new Executor instance.
type InputValidationError ¶
type InputValidationError struct {
Field string `json:"field"`
Message string `json:"message"`
Value any `json:"value,omitempty"`
}
InputValidationError represents a validation error for a specific input field
func (*InputValidationError) Error ¶
func (e *InputValidationError) Error() string
Error implements the error interface
type InputValidationResult ¶
type InputValidationResult struct {
Valid bool `json:"valid"`
Errors []*InputValidationError `json:"errors,omitempty"`
ProcessedInputs map[string]any `json:"processed_inputs,omitempty"`
}
InputValidationResult holds the results of input validation
func ValidateWorkflowInputs ¶
func ValidateWorkflowInputs(workflow *ast.Workflow, providedInputs map[string]any) *InputValidationResult
ValidateWorkflowInputs validates provided inputs against workflow input definitions
func (*InputValidationResult) AddError ¶
func (r *InputValidationResult) AddError(field, message string, value any)
AddError adds a validation error
func (*InputValidationResult) Error ¶
func (r *InputValidationResult) Error() string
type OutputParser ¶
type OutputParser struct {
// contains filtered or unexported fields
}
OutputParser handles parsing and extraction of structured outputs from agent responses
func NewOutputParser ¶
func NewOutputParser() *OutputParser
NewOutputParser creates a new output parser instance
func (*OutputParser) ParseStepOutput ¶
func (p *OutputParser) ParseStepOutput(step *ast.Step, response string) interface{}
ParseStepOutput parses the agent response according to the step's output definitions
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
Runner orchestrates workflow execution with progress tracking capabilities.
func NewRunner ¶
func NewRunner(progressListener pkgEvents.Listener, options ...RunnerOption) *Runner
NewRunner creates a workflow runner with the specified progress listener.
func (*Runner) RunWorkflow ¶
func (r *Runner) RunWorkflow(ctx execcontext.RunContext, workflowFile string, inputs map[string]interface{}, prefix ...string) (*ExecutionResult, error)
RunWorkflow parses and executes a workflow file with the given inputs. Handles input validation, default value assignment, and progress tracking.
func (*Runner) RunWorkflowRaw ¶
func (r *Runner) RunWorkflowRaw(execCtx *execcontext.ExecutionContext, workflow *ast.Workflow, startTime time.Time, prefix ...string) (*ExecutionResult, error)
RunWorkflowRaw executes a parsed workflow with the provided execution context. Returns detailed execution results including step outcomes and resource usage.
func (*Runner) SetProgressListener ¶
SetProgressListener updates the progress listener for workflow execution events.
type RunnerOption ¶
type RunnerOption func(*Runner)
RunnerOption is a function that can be used to configure a Runner.
func WithExecutorFunc ¶
func WithExecutorFunc(newExecutor ExecutorFunc) RunnerOption
WithExecutorFunc sets the function that creates a new Executor instance. This allows for custom Executor implementations to be used. In general this is only used for testing.
type StepExecutionResult ¶
type StepExecutionResult struct {
StepID string `json:"step_id" yaml:"step_id"`
Status string `json:"status" yaml:"status"`
StartTime time.Time `json:"start_time" yaml:"start_time"`
EndTime time.Time `json:"end_time,omitempty" yaml:"end_time,omitempty"`
Duration time.Duration `json:"duration" yaml:"duration"`
Output map[string]interface{} `json:"output,omitempty" yaml:"output,omitempty"`
Response string `json:"response,omitempty" yaml:"response,omitempty"`
Error string `json:"error,omitempty" yaml:"error,omitempty"`
Retries int `json:"retries" yaml:"retries"`
TokenUsage *TokenUsage `json:"token_usage,omitempty" yaml:"token_usage,omitempty"`
}
StepExecutionResult contains the execution outcome for an individual workflow step including its output, timing, retry information, and token usage.
type StepProgressState ¶
type StepProgressState struct {
// contains filtered or unexported fields
}
StepProgressState manages the visual display state for a workflow step, including its spinner animation and nested action states.
func (*StepProgressState) String ¶
func (s *StepProgressState) String() string
String returns a formatted representation of the step progress state including its title and action states.
type StepResult ¶
StepResult contains the execution result of a workflow step, including structured output data and the raw response from the execution.
func NewChildStepResult ¶
func NewChildStepResult(subExecCtx *execcontext.ExecutionContext, step *ast.Step) *StepResult
NewChildStepResult creates a StepResult for composite steps (like while loops) that contain nested steps. It aggregates all sub-step outputs and includes metadata about the number of iterations executed.
func NewStepResult ¶
func NewStepResult(output interface{}, raw ...string) *StepResult
NewStepResult creates a StepResult from execution output, automatically formatting structured data and optionally accepting a raw response string. The output is normalized to include both "output" and "outputs" fields.
type TokenUsage ¶
type TokenUsage struct {
PromptTokens int `json:"prompt_tokens" yaml:"prompt_tokens"`
CompletionTokens int `json:"completion_tokens" yaml:"completion_tokens"`
TotalTokens int `json:"total_tokens" yaml:"total_tokens"`
EstimatedCost float64 `json:"estimated_cost" yaml:"estimated_cost"`
}
TokenUsage tracks token consumption and estimated cost for a single step execution.
type TokenUsageSummary ¶
type TokenUsageSummary struct {
TotalTokens int `json:"total_tokens" yaml:"total_tokens"`
PromptTokens int `json:"prompt_tokens" yaml:"prompt_tokens"`
CompletionTokens int `json:"completion_tokens" yaml:"completion_tokens"`
}
TokenUsageSummary aggregates token consumption metrics across all workflow steps.
type WorkflowExecutor ¶
type WorkflowExecutor interface {
ExecuteWorkflow(execCtx *execcontext.ExecutionContext, progressChan chan<- pkgEvents.ExecutionEvent) error
}
WorkflowExecutor is an interface that defines the methods that an executor must implement. This is used to allow for custom executor implementations to be used. In general this is only used for testing.
func NewExecutor ¶
func NewExecutor(ctx execcontext.RunContext, config *ExecutorConfig, workflow *ast.Workflow, registry *provider.Registry, runner *Runner) (WorkflowExecutor, error)
NewExecutor creates a workflow executor instance with lazy initialization of AI providers, tool registries, and runtime dependencies. Only providers and tools referenced in the workflow are initialized to minimize resource usage. Returns an error if provider initialization or dependency resolution fails.