Documentation
¶
Index ¶
- Constants
- Variables
- func ConvertConfigToProto(config *WorkflowConfig) (*loomv1.WorkflowPattern, error)
- func ExtractAgentIDs(pattern *loomv1.WorkflowPattern) []string
- func GetPatternType(pattern *loomv1.WorkflowPattern) string
- func InterpolateVariables(pattern *loomv1.WorkflowPattern, vars map[string]string) *loomv1.WorkflowPattern
- func LoadAgentFromTemplate(templatePath string, vars map[string]string) (*loomv1.AgentConfig, error)
- func LoadWorkflowFromYAML(path string) (*loomv1.WorkflowPattern, error)
- func ValidateOutputStructure(output string) error
- type AgentSpec
- type AgentTemplateConfig
- type ConditionalBuilder
- func (b *ConditionalBuilder) Default(pattern *loomv1.WorkflowPattern) *ConditionalBuilder
- func (b *ConditionalBuilder) Execute(ctx context.Context) (*loomv1.WorkflowResult, error)
- func (b *ConditionalBuilder) When(condition string, pattern *loomv1.WorkflowPattern) *ConditionalBuilder
- func (b *ConditionalBuilder) WithRetry(maxRetries int32) *ConditionalBuilder
- type ConditionalExecutor
- type Config
- type ContextMetadata
- type DebateBuilder
- func (b *DebateBuilder) Execute(ctx context.Context) (*loomv1.WorkflowResult, error)
- func (b *DebateBuilder) WithAgentIDs(agentIDs ...string) *DebateBuilder
- func (b *DebateBuilder) WithAgents(agents ...*agent.Agent) *DebateBuilder
- func (b *DebateBuilder) WithMergeStrategy(strategy loomv1.MergeStrategy) *DebateBuilder
- func (b *DebateBuilder) WithModerator(moderator *agent.Agent) *DebateBuilder
- func (b *DebateBuilder) WithRounds(rounds int) *DebateBuilder
- type Evidence
- type ExecuteFunc
- type FeedbackFunc
- type ForkJoinBuilder
- func (b *ForkJoinBuilder) Execute(ctx context.Context) (*loomv1.WorkflowResult, error)
- func (b *ForkJoinBuilder) Join(strategy loomv1.MergeStrategy) *ForkJoinBuilder
- func (b *ForkJoinBuilder) WithAgentIDs(agentIDs ...string) *ForkJoinBuilder
- func (b *ForkJoinBuilder) WithAgents(agents ...*agent.Agent) *ForkJoinBuilder
- func (b *ForkJoinBuilder) WithTimeout(seconds int) *ForkJoinBuilder
- type ForkJoinExecutor
- type IterativePipelineExecutor
- type Orchestrator
- func (o *Orchestrator) Conditional(classifier *agent.Agent, conditionPrompt string) *ConditionalBuilder
- func (o *Orchestrator) Debate(topic string) *DebateBuilder
- func (o *Orchestrator) ExecutePattern(ctx context.Context, pattern *loomv1.WorkflowPattern) (*loomv1.WorkflowResult, error)
- func (o *Orchestrator) Fork(prompt string) *ForkJoinBuilder
- func (o *Orchestrator) GetAgent(ctx context.Context, id string) (*agent.Agent, error)
- func (o *Orchestrator) GetMergeLLM() agent.LLMProvider
- func (o *Orchestrator) Parallel() *ParallelBuilder
- func (o *Orchestrator) Pipeline(initialPrompt string) *PipelineBuilder
- func (o *Orchestrator) RegisterAgent(id string, ag *agent.Agent)
- func (o *Orchestrator) SetProgressCallback(callback WorkflowProgressCallback)
- type OutputValidator
- type ParallelBuilder
- func (b *ParallelBuilder) Execute(ctx context.Context) (*loomv1.WorkflowResult, error)
- func (b *ParallelBuilder) WithMergeStrategy(strategy loomv1.MergeStrategy) *ParallelBuilder
- func (b *ParallelBuilder) WithTask(ag *agent.Agent, prompt string) *ParallelBuilder
- func (b *ParallelBuilder) WithTaskByID(agentID string, prompt string) *ParallelBuilder
- func (b *ParallelBuilder) WithTaskMetadata(ag *agent.Agent, prompt string, metadata map[string]string) *ParallelBuilder
- func (b *ParallelBuilder) WithTimeout(seconds int) *ParallelBuilder
- type ParallelExecutor
- type PipelineBuilder
- func (b *PipelineBuilder) Execute(ctx context.Context) (*loomv1.WorkflowResult, error)
- func (b *PipelineBuilder) WithFullHistory() *PipelineBuilder
- func (b *PipelineBuilder) WithStage(ag *agent.Agent, promptTemplate string) *PipelineBuilder
- func (b *PipelineBuilder) WithStageByID(agentID string, promptTemplate string) *PipelineBuilder
- func (b *PipelineBuilder) WithStageRetry(ag *agent.Agent, promptTemplate string, validationPrompt string, ...) *PipelineBuilder
- func (b *PipelineBuilder) WithStageSchema(ag *agent.Agent, promptTemplate string, outputSchema string, maxRetries int32) *PipelineBuilder
- func (b *PipelineBuilder) WithStageValidation(ag *agent.Agent, promptTemplate string, validationPrompt string) *PipelineBuilder
- type PipelineExecutor
- type ScheduleYAML
- type StageOutput
- type StructuredContext
- func (ctx *StructuredContext) AddStageOutput(stageKey string, output StageOutput) error
- func (ctx *StructuredContext) FromJSON(jsonStr string) error
- func (ctx *StructuredContext) GetStageOutput(stageKey string) (StageOutput, bool)
- func (ctx *StructuredContext) GetTargetTable(sourceStageKey string) (database, table string, err error)
- func (ctx *StructuredContext) ToJSON() (string, error)
- func (ctx *StructuredContext) ValidateDatabaseList(database string, sourceStageKey string) error
- func (ctx *StructuredContext) ValidateFileCreation(stageKey string, filePathKey string) error
- func (ctx *StructuredContext) ValidateTableReference(currentStageKey string, database string, table string, sourceStageKey string) error
- func (ctx *StructuredContext) ValidateToolExecutions(stageKey string, requiredTools []string) error
- type SwarmExecutor
- type TaskTrackedOrchestrator
- type TemplateMetadata
- type TemplateParameterConfig
- type TemplateRegistry
- func (r *TemplateRegistry) ApplyTemplate(templateName string, vars map[string]string) (*loomv1.AgentConfig, error)
- func (r *TemplateRegistry) GetTemplate(name string) (*AgentTemplateConfig, error)
- func (r *TemplateRegistry) ListTemplates() []string
- func (r *TemplateRegistry) LoadTemplate(path string) error
- func (r *TemplateRegistry) LoadTemplateFromString(yamlContent string) error
- type ToolCall
- type WorkflowConfig
- type WorkflowMetadata
- type WorkflowProgressCallback
- type WorkflowProgressEvent
Constants ¶
const (
	// MaxStageOutputBytes limits stage output size passed to next stage via {{previous}}.
	// This prevents exponential token growth in multi-stage pipelines.
	// 8KB is reasonable for most stage outputs (reports, summaries, query results).
	// Too small (2KB) causes context loss and downstream stage confusion.
	// Too large risks token bloat in long pipelines.
	MaxStageOutputBytes = 8192

	// StageOutputTruncationNoticeTemplate is appended when output is truncated.
	// %s is replaced with the SharedMemory key for fetching full output.
	StageOutputTruncationNoticeTemplate = "" /* 142-byte string literal not displayed */
)
Stage output truncation to prevent token bloat
Variables ¶
var (
	ErrTemplateNotFound  = fmt.Errorf("template not found")
	ErrCircularReference = fmt.Errorf("circular template reference detected")
	ErrMissingParameter  = fmt.Errorf("required template parameter missing")
	ErrInvalidParameter  = fmt.Errorf("invalid parameter value")
)
Template-related errors
var (
	ErrFileNotFound       = fmt.Errorf("workflow file not found")
	ErrInvalidPermissions = fmt.Errorf("insufficient permissions to read workflow file")
	ErrInvalidYAML        = fmt.Errorf("invalid YAML syntax in workflow file")
	ErrInvalidWorkflow    = fmt.Errorf("invalid workflow structure")
	ErrUnsupportedPattern = fmt.Errorf("unsupported workflow pattern type")
)
Custom errors for workflow config loading
Functions ¶
func ConvertConfigToProto ¶
func ConvertConfigToProto(config *WorkflowConfig) (*loomv1.WorkflowPattern, error)
ConvertConfigToProto converts a WorkflowConfig to a WorkflowPattern proto. This is used by the scheduler after loading a workflow YAML file.
func ExtractAgentIDs ¶
func ExtractAgentIDs(pattern *loomv1.WorkflowPattern) []string
ExtractAgentIDs returns all agent IDs referenced in a workflow pattern. Used to determine which agents need to be loaded before execution. This is called by ExecuteWorkflow RPC to load all required agents from the registry.
func GetPatternType ¶
func GetPatternType(pattern *loomv1.WorkflowPattern) string
GetPatternType returns a string representation of the pattern type. This is used for logging, progress messages, and workflow execution tracking.
func InterpolateVariables ¶
func InterpolateVariables(pattern *loomv1.WorkflowPattern, vars map[string]string) *loomv1.WorkflowPattern
InterpolateVariables replaces {{var}} placeholders in pattern prompts with provided values. Returns a new WorkflowPattern with interpolated prompts (does not modify original). This is used by the ExecuteWorkflow RPC to inject user-provided variables into workflow prompts.
func LoadAgentFromTemplate ¶
func LoadAgentFromTemplate(templatePath string, vars map[string]string) (*loomv1.AgentConfig, error)
LoadAgentFromTemplate loads an agent config from a template file with variables
func LoadWorkflowFromYAML ¶
func LoadWorkflowFromYAML(path string) (*loomv1.WorkflowPattern, error)
LoadWorkflowFromYAML loads and parses a workflow definition from a YAML file.
Parameters:
- path: File system path to the YAML workflow definition file
Returns:
- *loomv1.WorkflowPattern: Parsed workflow proto message
- error: Error if file cannot be read or contains invalid YAML/workflow structure
Errors:
- ErrFileNotFound: If the specified path does not exist
- ErrInvalidPermissions: If the file cannot be read
- ErrInvalidYAML: If the YAML syntax is invalid
- ErrInvalidWorkflow: If the workflow structure is invalid
- ErrUnsupportedPattern: If the pattern type is not recognized
func ValidateOutputStructure ¶
func ValidateOutputStructure(output string) error
ValidateOutputStructure performs deterministic validation of stage output structure. Supports both JSON (v3.9) and XML (v3.10) formats. Returns a detailed error if the structure is invalid.
Types ¶
type AgentSpec ¶
type AgentSpec struct {
Name string `yaml:"name,omitempty"`
Description string `yaml:"description,omitempty"`
SystemPrompt string `yaml:"system_prompt,omitempty"`
LLM map[string]interface{} `yaml:"llm,omitempty"`
Tools map[string]interface{} `yaml:"tools,omitempty"`
Memory map[string]interface{} `yaml:"memory,omitempty"`
Behavior map[string]interface{} `yaml:"behavior,omitempty"`
Metadata map[string]string `yaml:"metadata,omitempty"`
}
AgentSpec contains the agent configuration with templatable fields
type AgentTemplateConfig ¶
type AgentTemplateConfig struct {
APIVersion string `yaml:"apiVersion"`
Kind string `yaml:"kind"`
Metadata TemplateMetadata `yaml:"metadata"`
Parameters []TemplateParameterConfig `yaml:"parameters,omitempty"`
Extends string `yaml:"extends,omitempty"`
Spec AgentSpec `yaml:"spec"`
}
AgentTemplateConfig represents the YAML structure for agent templates
type ConditionalBuilder ¶
type ConditionalBuilder struct {
// contains filtered or unexported fields
}
ConditionalBuilder provides a fluent API for building conditional patterns. Conditional patterns route execution based on a classifier agent's decision.
func (*ConditionalBuilder) Default ¶
func (b *ConditionalBuilder) Default(pattern *loomv1.WorkflowPattern) *ConditionalBuilder
Default sets the default branch to execute if no conditions match. This is optional but recommended to handle unexpected classifier outputs.
func (*ConditionalBuilder) Execute ¶
func (b *ConditionalBuilder) Execute(ctx context.Context) (*loomv1.WorkflowResult, error)
Execute runs the conditional pattern and returns the result from the selected branch.
func (*ConditionalBuilder) When ¶
func (b *ConditionalBuilder) When(condition string, pattern *loomv1.WorkflowPattern) *ConditionalBuilder
When adds a conditional branch. The condition string should match the expected output from the classifier agent. Matching is case-insensitive and supports substring matching.
func (*ConditionalBuilder) WithRetry ¶ added in v1.3.0
func (b *ConditionalBuilder) WithRetry(maxRetries int32) *ConditionalBuilder
WithRetry configures retry behavior when the classifier output doesn't match any branch key. On retry, the agent receives a prompt listing valid branch keys and its previous failed output. Each retry uses a fresh session ID.
type ConditionalExecutor ¶
type ConditionalExecutor struct {
// contains filtered or unexported fields
}
ConditionalExecutor executes a conditional pattern.
func NewConditionalExecutor ¶
func NewConditionalExecutor(orchestrator *Orchestrator, pattern *loomv1.ConditionalPattern, workflowID string) *ConditionalExecutor
NewConditionalExecutor creates a new conditional executor.
func (*ConditionalExecutor) Execute ¶
func (e *ConditionalExecutor) Execute(ctx context.Context) (*loomv1.WorkflowResult, error)
Execute runs the conditional pattern and returns the result.
type Config ¶
type Config struct {
// Agent registry for looking up agents
Registry *agent.Registry
// LLM provider for merge operations
LLMProvider agent.LLMProvider
// Tracer for observability
Tracer observability.Tracer
// Logger
Logger *zap.Logger
// MessageBus for agent-to-agent communication (optional, for iterative workflows)
MessageBus *communication.MessageBus
SharedMemory *communication.SharedMemoryStore
// ProgressCallback for reporting workflow execution progress (optional)
ProgressCallback WorkflowProgressCallback
// LLMSemaphore for limiting concurrent LLM calls (optional)
// If nil, no concurrency control is applied
// Use make(chan struct{}, N) where N is the max concurrent LLM calls
LLMSemaphore chan struct{}
// TaskManager for persistent workflow tracking (optional).
// When set, ExecutePattern wraps execution with task-based state tracking,
// providing persistence, resume capability, and progress visibility.
TaskManager *task.Manager
}
Config configures the orchestrator.
type ContextMetadata ¶
type ContextMetadata struct {
WorkflowID string `json:"workflow_id"`
WorkflowType string `json:"workflow_type"`
SchemaVer string `json:"schema_version"`
StartedAt time.Time `json:"started_at"`
}
ContextMetadata contains global workflow runtime information
type DebateBuilder ¶
type DebateBuilder struct {
// contains filtered or unexported fields
}
DebateBuilder provides a fluent API for building multi-agent debates. Agents participate in rounds where each agent sees previous round outputs and can build on or challenge them.
func (*DebateBuilder) Execute ¶
func (b *DebateBuilder) Execute(ctx context.Context) (*loomv1.WorkflowResult, error)
Execute runs the debate and returns the merged result.
func (*DebateBuilder) WithAgentIDs ¶
func (b *DebateBuilder) WithAgentIDs(agentIDs ...string) *DebateBuilder
WithAgentIDs adds agents by their registry IDs. This allows referencing agents already registered with the orchestrator.
func (*DebateBuilder) WithAgents ¶
func (b *DebateBuilder) WithAgents(agents ...*agent.Agent) *DebateBuilder
WithAgents adds agents to the debate. Each agent will participate in all debate rounds.
func (*DebateBuilder) WithMergeStrategy ¶
func (b *DebateBuilder) WithMergeStrategy(strategy loomv1.MergeStrategy) *DebateBuilder
WithMergeStrategy sets how to merge the final outputs. Options: CONSENSUS, VOTING, SUMMARY, CONCATENATE, FIRST, BEST.
func (*DebateBuilder) WithModerator ¶
func (b *DebateBuilder) WithModerator(moderator *agent.Agent) *DebateBuilder
WithModerator adds a moderator agent to guide the debate. The moderator can provide structure and ensure productive discussion.
func (*DebateBuilder) WithRounds ¶
func (b *DebateBuilder) WithRounds(rounds int) *DebateBuilder
WithRounds sets the number of debate rounds. Default is 1 round. More rounds allow agents to refine arguments.
type Evidence ¶
type Evidence struct {
ToolCalls []ToolCall `json:"tool_calls"`
QueriesExecuted []string `json:"queries_executed"`
}
Evidence provides proof of how outputs were derived
type ExecuteFunc ¶ added in v1.3.0
type ExecuteFunc func(ctx context.Context, sessionID string, prompt string) (*loomv1.AgentResult, error)
ExecuteFunc is a function that executes an agent and returns its output. sessionID controls whether this is the same or a fresh session. prompt is the (possibly modified) prompt for this execution.
type FeedbackFunc ¶ added in v1.3.0
type FeedbackFunc func(ctx context.Context, sessionID string, feedback string) (*loomv1.AgentResult, error)
FeedbackFunc appends validation feedback to an existing session. Used in CONTINUE mode to add a user message to the same conversation.
type ForkJoinBuilder ¶
type ForkJoinBuilder struct {
// contains filtered or unexported fields
}
ForkJoinBuilder provides a fluent API for building fork-join patterns. Fork-join executes multiple agents in parallel with the same prompt, then merges their results using a specified strategy.
func (*ForkJoinBuilder) Execute ¶
func (b *ForkJoinBuilder) Execute(ctx context.Context) (*loomv1.WorkflowResult, error)
Execute runs the fork-join pattern and returns the merged result.
func (*ForkJoinBuilder) Join ¶
func (b *ForkJoinBuilder) Join(strategy loomv1.MergeStrategy) *ForkJoinBuilder
Join sets the merge strategy for combining agent outputs. Options: CONSENSUS, VOTING, SUMMARY, CONCATENATE, FIRST, BEST.
func (*ForkJoinBuilder) WithAgentIDs ¶
func (b *ForkJoinBuilder) WithAgentIDs(agentIDs ...string) *ForkJoinBuilder
WithAgentIDs adds agents by their registry IDs. This allows referencing agents already registered with the orchestrator.
func (*ForkJoinBuilder) WithAgents ¶
func (b *ForkJoinBuilder) WithAgents(agents ...*agent.Agent) *ForkJoinBuilder
WithAgents adds agents to execute in parallel. All agents will receive the same prompt.
func (*ForkJoinBuilder) WithTimeout ¶
func (b *ForkJoinBuilder) WithTimeout(seconds int) *ForkJoinBuilder
WithTimeout sets the maximum execution time in seconds. If agents don't complete within this time, the execution will be cancelled.
type ForkJoinExecutor ¶
type ForkJoinExecutor struct {
// contains filtered or unexported fields
}
ForkJoinExecutor executes a fork-join pattern.
func NewForkJoinExecutor ¶
func NewForkJoinExecutor(orchestrator *Orchestrator, pattern *loomv1.ForkJoinPattern, workflowID string) *ForkJoinExecutor
NewForkJoinExecutor creates a new fork-join executor.
func (*ForkJoinExecutor) Execute ¶
func (e *ForkJoinExecutor) Execute(ctx context.Context) (*loomv1.WorkflowResult, error)
Execute runs the fork-join pattern and returns the result.
type IterativePipelineExecutor ¶
type IterativePipelineExecutor struct {
// contains filtered or unexported fields
}
IterativePipelineExecutor executes an iterative workflow pattern with restart coordination. Stages can trigger restarts of earlier stages via pub/sub messaging, enabling autonomous agent negotiation and self-correction.
func NewIterativePipelineExecutor ¶
func NewIterativePipelineExecutor( orchestrator *Orchestrator, pattern *loomv1.IterativeWorkflowPattern, messageBus *communication.MessageBus, workflowID string, ) *IterativePipelineExecutor
NewIterativePipelineExecutor creates a new iterative pipeline executor.
func (*IterativePipelineExecutor) Execute ¶
func (e *IterativePipelineExecutor) Execute(ctx context.Context) (*loomv1.WorkflowResult, error)
Execute runs the iterative pipeline with restart coordination.
type Orchestrator ¶
type Orchestrator struct {
// contains filtered or unexported fields
}
Orchestrator coordinates multiple agents using workflow patterns. It provides a fluent API for building and executing multi-agent workflows like debates, pipelines, and parallel execution.
func NewOrchestrator ¶
func NewOrchestrator(config Config) *Orchestrator
NewOrchestrator creates a new orchestrator instance.
func (*Orchestrator) Conditional ¶
func (o *Orchestrator) Conditional(classifier *agent.Agent, conditionPrompt string) *ConditionalBuilder
Conditional creates a conditional pattern builder for routing logic.
Example:
result := orchestrator.
Conditional(classifier, "Is this a bug or feature?").
When("bug", debugWorkflow).
When("feature", designWorkflow).
Execute(ctx)
func (*Orchestrator) Debate ¶
func (o *Orchestrator) Debate(topic string) *DebateBuilder
Debate creates a debate pattern builder for multi-agent debates.
Example:
result := orchestrator.
Debate("Should we use SQLite or Postgres?").
WithAgents(agent1, agent2).
WithRounds(3).
Execute(ctx)
func (*Orchestrator) ExecutePattern ¶
func (o *Orchestrator) ExecutePattern(ctx context.Context, pattern *loomv1.WorkflowPattern) (*loomv1.WorkflowResult, error)
ExecutePattern executes a workflow pattern and returns the result. This is the low-level execution method used by pattern builders. When a TaskManager is configured, workflow execution is automatically wrapped with persistent task tracking (board creation, stage tasks, progress recording, resume support).
func (*Orchestrator) Fork ¶
func (o *Orchestrator) Fork(prompt string) *ForkJoinBuilder
Fork creates a fork-join pattern builder for parallel execution.
Example:
result := orchestrator.
Fork("Analyze this codebase").
WithAgents(securityExpert, performanceExpert).
Join(loomv1.MergeStrategy_SUMMARY).
Execute(ctx)
func (*Orchestrator) GetAgent ¶
func (o *Orchestrator) GetAgent(ctx context.Context, id string) (*agent.Agent, error)
GetAgent retrieves a registered agent by ID. If the agent is not in the local registry, it attempts to load it from the agent registry.
func (*Orchestrator) GetMergeLLM ¶ added in v1.2.0
func (o *Orchestrator) GetMergeLLM() agent.LLMProvider
GetMergeLLM returns the LLM provider to use for merge/synthesis operations. Resolution order:
- The orchestrator's explicitly configured llmProvider (Config.LLMProvider)
- The orchestrator role LLM from the first registered agent that has one (agent.GetLLMForRole(LLM_ROLE_ORCHESTRATOR) which falls back to the agent's main LLM)
- nil (caller must handle this, typically by returning an error)
Note: When falling back to agent LLMs (step 2), the selection is non-deterministic if multiple agents have orchestrator LLMs, because agents are stored in a map. A warning is logged in this case. Set Config.LLMProvider for deterministic behavior.
func (*Orchestrator) Parallel ¶
func (o *Orchestrator) Parallel() *ParallelBuilder
Parallel creates a parallel pattern builder for independent tasks.
Example:
result := orchestrator.
Parallel().
WithTask(analyzer, "Analyze code quality").
WithTask(scanner, "Check for vulnerabilities").
Execute(ctx)
func (*Orchestrator) Pipeline ¶
func (o *Orchestrator) Pipeline(initialPrompt string) *PipelineBuilder
Pipeline creates a pipeline pattern builder for sequential execution.
Example:
result := orchestrator.
Pipeline("Design a new feature").
WithStage(architect, "Create architecture").
WithStage(implementer, "Implement: {{previous}}").
Execute(ctx)
func (*Orchestrator) RegisterAgent ¶
func (o *Orchestrator) RegisterAgent(id string, ag *agent.Agent)
RegisterAgent registers an agent with a specific ID for this orchestration. This allows the orchestrator to reference agents in workflow patterns.
func (*Orchestrator) SetProgressCallback ¶
func (o *Orchestrator) SetProgressCallback(callback WorkflowProgressCallback)
SetProgressCallback sets or updates the progress callback. This allows updating the callback after orchestrator creation.
type OutputValidator ¶ added in v1.3.0
type OutputValidator struct {
// contains filtered or unexported fields
}
OutputValidator provides universal output validation and retry for any agent execution. It composes structural validation (JSON Schema, instant) with semantic validation (LLM-based acceptance criteria) and supports three retry session modes: CONTINUE (same session), FRESH (new session), and ESCALATE (continue then upgrade LLM).
func NewOutputValidator ¶ added in v1.3.0
func NewOutputValidator(tracer observability.Tracer, logger *zap.Logger) *OutputValidator
NewOutputValidator creates a new output validator.
func (*OutputValidator) ValidateAndRetry ¶ added in v1.3.0
func (v *OutputValidator) ValidateAndRetry( ctx context.Context, policy *loomv1.OutputPolicy, execute ExecuteFunc, feedback FeedbackFunc, originalPrompt string, workflowID string, ) (*loomv1.AgentResult, []string, error)
ValidateAndRetry executes an agent, validates the output against the policy, and retries with feedback if validation fails. Works across all workflow patterns.
Parameters:
- policy: output validation policy (nil = no validation, execute once)
- execute: function to execute the agent (called for FRESH sessions)
- feedback: function to send feedback in the same session (called for CONTINUE mode)
- originalPrompt: the original prompt for fresh retries
- workflowID: base workflow ID for session ID generation
Returns the agent result (possibly from a retry) and any validation warnings.
type ParallelBuilder ¶
type ParallelBuilder struct {
// contains filtered or unexported fields
}
ParallelBuilder provides a fluent API for building parallel patterns. Parallel patterns execute independent tasks concurrently, where each task has its own agent and prompt.
func (*ParallelBuilder) Execute ¶
func (b *ParallelBuilder) Execute(ctx context.Context) (*loomv1.WorkflowResult, error)
Execute runs all tasks in parallel and returns the merged result.
func (*ParallelBuilder) WithMergeStrategy ¶
func (b *ParallelBuilder) WithMergeStrategy(strategy loomv1.MergeStrategy) *ParallelBuilder
WithMergeStrategy sets how to merge the task results. Options: CONSENSUS, VOTING, SUMMARY, CONCATENATE, FIRST, BEST.
func (*ParallelBuilder) WithTask ¶
func (b *ParallelBuilder) WithTask(ag *agent.Agent, prompt string) *ParallelBuilder
WithTask adds an independent task with a specific prompt. Each task runs concurrently with its own agent and prompt.
func (*ParallelBuilder) WithTaskByID ¶
func (b *ParallelBuilder) WithTaskByID(agentID string, prompt string) *ParallelBuilder
WithTaskByID adds a task using an agent ID. This allows referencing agents already registered with the orchestrator.
func (*ParallelBuilder) WithTaskMetadata ¶
func (b *ParallelBuilder) WithTaskMetadata(ag *agent.Agent, prompt string, metadata map[string]string) *ParallelBuilder
WithTaskMetadata adds a task with metadata. Metadata can be used to label or categorize tasks.
func (*ParallelBuilder) WithTimeout ¶
func (b *ParallelBuilder) WithTimeout(seconds int) *ParallelBuilder
WithTimeout sets the maximum execution time in seconds. If tasks don't complete within this time, the execution will be cancelled.
type ParallelExecutor ¶
type ParallelExecutor struct {
// contains filtered or unexported fields
}
ParallelExecutor executes a parallel pattern.
func NewParallelExecutor ¶
func NewParallelExecutor(orchestrator *Orchestrator, pattern *loomv1.ParallelPattern, workflowID string) *ParallelExecutor
NewParallelExecutor creates a new parallel executor.
func (*ParallelExecutor) Execute ¶
func (e *ParallelExecutor) Execute(ctx context.Context) (*loomv1.WorkflowResult, error)
Execute runs the parallel pattern and returns the result.
type PipelineBuilder ¶
type PipelineBuilder struct {
// contains filtered or unexported fields
}
PipelineBuilder provides a fluent API for building pipeline patterns. Pipelines execute agents sequentially, where each agent's output becomes input for the next agent.
func (*PipelineBuilder) Execute ¶
func (b *PipelineBuilder) Execute(ctx context.Context) (*loomv1.WorkflowResult, error)
Execute runs the pipeline and returns the final result.
func (*PipelineBuilder) WithFullHistory ¶
func (b *PipelineBuilder) WithFullHistory() *PipelineBuilder
WithFullHistory enables passing full history to each stage. By default, only the previous stage's output is available.
func (*PipelineBuilder) WithStage ¶
func (b *PipelineBuilder) WithStage(ag *agent.Agent, promptTemplate string) *PipelineBuilder
WithStage adds a stage to the pipeline. The promptTemplate can include placeholders:
- {{previous}}: Replaced with the previous stage's output
- {{history}}: Replaced with all previous outputs
func (*PipelineBuilder) WithStageByID ¶
func (b *PipelineBuilder) WithStageByID(agentID string, promptTemplate string) *PipelineBuilder
WithStageByID adds a stage using an agent ID. This allows referencing agents already registered with the orchestrator.
func (*PipelineBuilder) WithStageRetry ¶ added in v1.3.0
func (b *PipelineBuilder) WithStageRetry(ag *agent.Agent, promptTemplate string, validationPrompt string, maxRetries int32) *PipelineBuilder
WithStageRetry adds a stage with LLM validation and retry on failure. The validationPrompt should check if the output meets requirements. On failure, the stage is retried with feedback including the validation criteria.
func (*PipelineBuilder) WithStageSchema ¶ added in v1.3.0
func (b *PipelineBuilder) WithStageSchema(ag *agent.Agent, promptTemplate string, outputSchema string, maxRetries int32) *PipelineBuilder
WithStageSchema adds a stage with JSON Schema validation and retry on failure. The outputSchema is validated instantly (no LLM call) using gojsonschema. On failure, the retry prompt includes the full schema and specific violations.
func (*PipelineBuilder) WithStageValidation ¶
func (b *PipelineBuilder) WithStageValidation(ag *agent.Agent, promptTemplate string, validationPrompt string) *PipelineBuilder
WithStageValidation adds a stage with output validation. The validationPrompt should check if the output meets requirements. Use {{output}} placeholder to reference the stage output.
type PipelineExecutor ¶
type PipelineExecutor struct {
// contains filtered or unexported fields
}
PipelineExecutor executes a pipeline pattern.
func NewPipelineExecutor ¶
func NewPipelineExecutor(orchestrator *Orchestrator, pattern *loomv1.PipelinePattern, workflowID string) *PipelineExecutor
NewPipelineExecutor creates a new pipeline executor.
func (*PipelineExecutor) Execute ¶
func (e *PipelineExecutor) Execute(ctx context.Context) (*loomv1.WorkflowResult, error)
Execute runs the pipeline and returns the result.
type ScheduleYAML ¶
type ScheduleYAML struct {
Cron string `yaml:"cron"`
Timezone string `yaml:"timezone,omitempty"`
Enabled bool `yaml:"enabled"`
SkipIfRunning bool `yaml:"skip_if_running,omitempty"`
MaxExecutionSeconds int32 `yaml:"max_execution_seconds,omitempty"`
Variables map[string]string `yaml:"variables,omitempty"`
SessionMode string `yaml:"session_mode,omitempty"` // "new" or "resume"
}
ScheduleYAML represents the schedule configuration in workflow YAML files.
type StageOutput ¶
type StageOutput struct {
StageID string `json:"stage_id"`
Status string `json:"status"` // "completed", "failed", "skipped"
StartedAt time.Time `json:"started_at"`
CompletedAt time.Time `json:"completed_at"`
Inputs map[string]interface{} `json:"inputs"`
Outputs map[string]interface{} `json:"outputs"`
Evidence Evidence `json:"evidence"`
}
StageOutput represents the output from a single workflow stage
type StructuredContext ¶
type StructuredContext struct {
WorkflowContext ContextMetadata `json:"workflow_context"`
StageOutputs map[string]StageOutput `json:"stage_outputs"`
}
StructuredContext represents the accumulated context across workflow stages. This prevents agent hallucinations by enforcing structured data passing.
func NewStructuredContext ¶
func NewStructuredContext(workflowID, workflowType string) *StructuredContext
NewStructuredContext creates a new structured context for a workflow
func (*StructuredContext) AddStageOutput ¶
func (ctx *StructuredContext) AddStageOutput(stageKey string, output StageOutput) error
AddStageOutput adds an output from a completed stage
func (*StructuredContext) FromJSON ¶
func (ctx *StructuredContext) FromJSON(jsonStr string) error
FromJSON deserializes context from JSON (for parsing agent outputs)
func (*StructuredContext) GetStageOutput ¶
func (ctx *StructuredContext) GetStageOutput(stageKey string) (StageOutput, bool)
GetStageOutput retrieves output from a specific stage
func (*StructuredContext) GetTargetTable ¶
func (ctx *StructuredContext) GetTargetTable(sourceStageKey string) (database, table string, err error)
GetTargetTable extracts the recommended table from a specific stage (typically stage-2)
func (*StructuredContext) ToJSON ¶
func (ctx *StructuredContext) ToJSON() (string, error)
ToJSON serializes the context to JSON for injection into agent prompts
func (*StructuredContext) ValidateDatabaseList ¶
func (ctx *StructuredContext) ValidateDatabaseList( database string, sourceStageKey string, ) error
ValidateDatabaseList checks if a database exists in the discovery stage
func (*StructuredContext) ValidateFileCreation ¶
func (ctx *StructuredContext) ValidateFileCreation(stageKey string, filePathKey string) error
ValidateFileCreation ensures a file output was actually created on disk
func (*StructuredContext) ValidateTableReference ¶
func (ctx *StructuredContext) ValidateTableReference( currentStageKey string, database string, table string, sourceStageKey string, ) error
ValidateTableReference checks if a table reference exists in a previous stage. This prevents agents from hallucinating non-existent tables.
func (*StructuredContext) ValidateToolExecutions ¶
func (ctx *StructuredContext) ValidateToolExecutions(stageKey string, requiredTools []string) error
ValidateToolExecutions ensures required tools were actually executed (prevents action hallucination)
type SwarmExecutor ¶
type SwarmExecutor struct {
// contains filtered or unexported fields
}
SwarmExecutor executes a swarm pattern (collective voting).
func NewSwarmExecutor ¶
func NewSwarmExecutor(orchestrator *Orchestrator, pattern *loomv1.SwarmPattern, workflowID string) *SwarmExecutor
NewSwarmExecutor creates a new swarm executor.
func (*SwarmExecutor) Execute ¶
func (e *SwarmExecutor) Execute(ctx context.Context) (*loomv1.WorkflowResult, error)
Execute runs the swarm pattern and returns the voting result.
type TaskTrackedOrchestrator ¶ added in v1.3.0
type TaskTrackedOrchestrator struct {
// contains filtered or unexported fields
}
TaskTrackedOrchestrator wraps an Orchestrator to persist workflow execution state via the task system. Each workflow pattern execution creates a board with tasks for each stage/agent, providing:
- Persistent state: survives server restarts (SQLite/PostgreSQL)
- Progress visibility: board shows stage status in real-time
- Audit trail: full history of stage transitions
- Stage output capture: stored in task notes
- Resume capability: find last completed stage on restart
- Graph memory: auto-creates memories for completed stages
func NewTaskTrackedOrchestrator ¶ added in v1.3.0
func NewTaskTrackedOrchestrator(inner *Orchestrator, manager *task.Manager, tracer observability.Tracer, logger *zap.Logger) *TaskTrackedOrchestrator
NewTaskTrackedOrchestrator wraps an orchestrator with task tracking.
func (*TaskTrackedOrchestrator) ExecutePattern ¶ added in v1.3.0
func (t *TaskTrackedOrchestrator) ExecutePattern(ctx context.Context, pattern *loomv1.WorkflowPattern) (*loomv1.WorkflowResult, error)
ExecutePattern wraps the inner orchestrator's ExecutePattern with task tracking. Before execution: creates a board and tasks from the pattern. After execution: closes tasks with stage outputs and records results.
func (*TaskTrackedOrchestrator) Orchestrator ¶ added in v1.3.0
func (t *TaskTrackedOrchestrator) Orchestrator() *Orchestrator
Orchestrator returns the wrapped orchestrator for direct access.
type TemplateMetadata ¶
type TemplateMetadata struct {
Name string `yaml:"name"`
Description string `yaml:"description,omitempty"`
Version string `yaml:"version,omitempty"`
Labels map[string]string `yaml:"labels,omitempty"`
}
TemplateMetadata contains template identification
type TemplateParameterConfig ¶
type TemplateParameterConfig struct {
Name string `yaml:"name"`
Type string `yaml:"type"`
Required bool `yaml:"required,omitempty"`
DefaultValue string `yaml:"default,omitempty"`
Description string `yaml:"description,omitempty"`
}
TemplateParameterConfig defines a template parameter
type TemplateRegistry ¶
type TemplateRegistry struct {
// contains filtered or unexported fields
}
TemplateRegistry manages agent templates
func NewTemplateRegistry ¶
func NewTemplateRegistry() *TemplateRegistry
NewTemplateRegistry creates a new template registry
func (*TemplateRegistry) ApplyTemplate ¶
func (r *TemplateRegistry) ApplyTemplate(templateName string, vars map[string]string) (*loomv1.AgentConfig, error)
ApplyTemplate applies a template with given variables to create an agent config
func (*TemplateRegistry) GetTemplate ¶
func (r *TemplateRegistry) GetTemplate(name string) (*AgentTemplateConfig, error)
GetTemplate returns a registered template
func (*TemplateRegistry) ListTemplates ¶
func (r *TemplateRegistry) ListTemplates() []string
ListTemplates returns all registered template names
func (*TemplateRegistry) LoadTemplate ¶
func (r *TemplateRegistry) LoadTemplate(path string) error
LoadTemplate loads a template from a YAML file
func (*TemplateRegistry) LoadTemplateFromString ¶
func (r *TemplateRegistry) LoadTemplateFromString(yamlContent string) error
LoadTemplateFromString loads a template from YAML string
type ToolCall ¶
type ToolCall struct {
ToolName string `json:"tool_name"`
Parameters map[string]interface{} `json:"parameters"`
ResultSummary string `json:"result_summary"`
}
ToolCall records a single tool execution
type WorkflowConfig ¶
type WorkflowConfig struct {
APIVersion string `yaml:"apiVersion"`
Kind string `yaml:"kind"`
Metadata WorkflowMetadata `yaml:"metadata"`
Spec map[string]interface{} `yaml:"spec"`
Schedule *ScheduleYAML `yaml:"schedule,omitempty"`
}
WorkflowConfig represents the Kubernetes-style YAML structure. The layout (apiVersion, kind, metadata, spec) is based on dogfooding recommendations.
func LoadWorkflowConfigFromYAML ¶
func LoadWorkflowConfigFromYAML(path string) (*WorkflowConfig, error)
LoadWorkflowConfigFromYAML loads a workflow YAML file and returns the parsed config. This is used by the scheduler to access the schedule section.
type WorkflowMetadata ¶
type WorkflowMetadata struct {
Name string `yaml:"name"`
Version string `yaml:"version,omitempty"`
Description string `yaml:"description,omitempty"`
Labels map[string]string `yaml:"labels,omitempty"`
}
WorkflowMetadata contains workflow identification information
type WorkflowProgressCallback ¶
type WorkflowProgressCallback func(event WorkflowProgressEvent)
WorkflowProgressCallback is called during workflow execution to report progress.
type WorkflowProgressEvent ¶
type WorkflowProgressEvent struct {
// Pattern type being executed
PatternType string
// Current stage/step description
Message string
// Progress percentage (0-100)
Progress int32
// Current agent executing (if applicable)
CurrentAgentID string
// Partial results available so far
PartialResults []*loomv1.AgentResult
}
WorkflowProgressEvent represents a progress update during workflow execution.
Source Files
¶
- agent_template.go
- conditional_builder.go
- conditional_executor.go
- debate_builder.go
- fork_join_builder.go
- fork_join_executor.go
- interpolation.go
- iterative_pipeline_executor.go
- merge_context.go
- orchestrator.go
- output_coercion.go
- output_validator.go
- parallel_builder.go
- parallel_executor.go
- pipeline_builder.go
- pipeline_executor.go
- structured_context.go
- swarm_executor.go
- task_tracked_executor.go
- utils.go
- workflow_config.go