orchestration

package
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2026 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MaxStageOutputBytes limits stage output size passed to next stage via {{previous}}
	// This prevents exponential token growth in multi-stage pipelines.
	// 8KB is reasonable for most stage outputs (reports, summaries, query results).
	// Too small (2KB) causes context loss and downstream stage confusion.
	// Too large risks token bloat in long pipelines.
	MaxStageOutputBytes = 8192

	// StageOutputTruncationNoticeTemplate is appended when output is truncated.
	// %s is replaced with the SharedMemory key for fetching full output.
	StageOutputTruncationNoticeTemplate = "" /* 142-byte string literal not displayed */
)

Stage output truncation to prevent token bloat

Variables

View Source
var (
	ErrTemplateNotFound  = fmt.Errorf("template not found")
	ErrCircularReference = fmt.Errorf("circular template reference detected")
	ErrMissingParameter  = fmt.Errorf("required template parameter missing")
	ErrInvalidParameter  = fmt.Errorf("invalid parameter value")
)

Template-related errors

View Source
var (
	ErrFileNotFound       = fmt.Errorf("workflow file not found")
	ErrInvalidPermissions = fmt.Errorf("insufficient permissions to read workflow file")
	ErrInvalidYAML        = fmt.Errorf("invalid YAML syntax in workflow file")
	ErrInvalidWorkflow    = fmt.Errorf("invalid workflow structure")
	ErrUnsupportedPattern = fmt.Errorf("unsupported workflow pattern type")
)

Custom errors for workflow config loading

Functions

func ConvertConfigToProto

func ConvertConfigToProto(config *WorkflowConfig) (*loomv1.WorkflowPattern, error)

ConvertConfigToProto converts a WorkflowConfig to a WorkflowPattern proto. This is used by the scheduler after loading a workflow YAML file.

func ExtractAgentIDs

func ExtractAgentIDs(pattern *loomv1.WorkflowPattern) []string

ExtractAgentIDs returns all agent IDs referenced in a workflow pattern. Used to determine which agents need to be loaded before execution. This is called by ExecuteWorkflow RPC to load all required agents from the registry.

func GetPatternType

func GetPatternType(pattern *loomv1.WorkflowPattern) string

GetPatternType returns a string representation of the pattern type. This is used for logging, progress messages, and workflow execution tracking.

func InterpolateVariables

func InterpolateVariables(pattern *loomv1.WorkflowPattern, vars map[string]string) *loomv1.WorkflowPattern

InterpolateVariables replaces {{var}} placeholders in pattern prompts with provided values. Returns a new WorkflowPattern with interpolated prompts (does not modify original). This is used by the ExecuteWorkflow RPC to inject user-provided variables into workflow prompts.

func LoadAgentFromTemplate

func LoadAgentFromTemplate(templatePath string, vars map[string]string) (*loomv1.AgentConfig, error)

LoadAgentFromTemplate loads an agent config from a template file with variables

func LoadWorkflowFromYAML

func LoadWorkflowFromYAML(path string) (*loomv1.WorkflowPattern, error)

LoadWorkflowFromYAML loads and parses a workflow definition from a YAML file.

Parameters:

  • path: File system path to the YAML workflow definition file

Returns:

  • *loomv1.WorkflowPattern: Parsed workflow proto message
  • error: Error if file cannot be read or contains invalid YAML/workflow structure

Errors:

  • ErrFileNotFound: If the specified path does not exist
  • ErrInvalidPermissions: If the file cannot be read
  • ErrInvalidYAML: If the YAML syntax is invalid
  • ErrInvalidWorkflow: If the workflow structure is invalid
  • ErrUnsupportedPattern: If the pattern type is not recognized

func ValidateOutputStructure

func ValidateOutputStructure(output string) error

ValidateOutputStructure performs deterministic validation of stage output structure Supports both JSON (v3.9) and XML (v3.10) formats Returns detailed error if structure is invalid

Types

type AgentSpec

type AgentSpec struct {
	Name         string                 `yaml:"name,omitempty"`
	Description  string                 `yaml:"description,omitempty"`
	SystemPrompt string                 `yaml:"system_prompt,omitempty"`
	LLM          map[string]interface{} `yaml:"llm,omitempty"`
	Tools        map[string]interface{} `yaml:"tools,omitempty"`
	Memory       map[string]interface{} `yaml:"memory,omitempty"`
	Behavior     map[string]interface{} `yaml:"behavior,omitempty"`
	Metadata     map[string]string      `yaml:"metadata,omitempty"`
}

AgentSpec contains the agent configuration with templatable fields

func (*AgentSpec) UnmarshalYAML added in v1.1.0

func (s *AgentSpec) UnmarshalYAML(value *yaml.Node) error

UnmarshalYAML implements custom YAML unmarshaling for AgentSpec Supports both "config" and "behavior" field names for backward compatibility

type AgentTemplateConfig

type AgentTemplateConfig struct {
	APIVersion string                    `yaml:"apiVersion"`
	Kind       string                    `yaml:"kind"`
	Metadata   TemplateMetadata          `yaml:"metadata"`
	Parameters []TemplateParameterConfig `yaml:"parameters,omitempty"`
	Extends    string                    `yaml:"extends,omitempty"`
	Spec       AgentSpec                 `yaml:"spec"`
}

AgentTemplateConfig represents the YAML structure for agent templates

type ConditionalBuilder

type ConditionalBuilder struct {
	// contains filtered or unexported fields
}

ConditionalBuilder provides a fluent API for building conditional patterns. Conditional patterns route execution based on a classifier agent's decision.

func (*ConditionalBuilder) Default

Default sets the default branch to execute if no conditions match. This is optional but recommended to handle unexpected classifier outputs.

func (*ConditionalBuilder) Execute

Execute runs the conditional pattern and returns the result from the selected branch.

func (*ConditionalBuilder) When

func (b *ConditionalBuilder) When(condition string, pattern *loomv1.WorkflowPattern) *ConditionalBuilder

When adds a conditional branch. The condition string should match the expected output from the classifier agent. Matching is case-insensitive and supports substring matching.

func (*ConditionalBuilder) WithRetry added in v1.3.0

func (b *ConditionalBuilder) WithRetry(maxRetries int32) *ConditionalBuilder

WithRetry configures retry behavior when the classifier output doesn't match any branch key. On retry, the agent receives a prompt listing valid branch keys and its previous failed output. Each retry uses a fresh session ID.

type ConditionalExecutor

type ConditionalExecutor struct {
	// contains filtered or unexported fields
}

ConditionalExecutor executes a conditional pattern.

func NewConditionalExecutor

func NewConditionalExecutor(orchestrator *Orchestrator, pattern *loomv1.ConditionalPattern, workflowID string) *ConditionalExecutor

NewConditionalExecutor creates a new conditional executor.

func (*ConditionalExecutor) Execute

Execute runs the conditional pattern and returns the result.

type Config

type Config struct {
	// Agent registry for looking up agents
	Registry *agent.Registry

	// LLM provider for merge operations
	LLMProvider agent.LLMProvider

	// Tracer for observability
	Tracer observability.Tracer

	// Logger
	Logger *zap.Logger

	// MessageBus for agent-to-agent communication (optional, for iterative workflows)
	MessageBus *communication.MessageBus

	// SharedMemory for inter-stage data sharing (optional, for iterative workflows)
	SharedMemory *communication.SharedMemoryStore

	// ProgressCallback for reporting workflow execution progress (optional)
	ProgressCallback WorkflowProgressCallback

	// LLMSemaphore for limiting concurrent LLM calls (optional)
	// If nil, no concurrency control is applied
	// Use make(chan struct{}, N) where N is the max concurrent LLM calls
	LLMSemaphore chan struct{}

	// TaskManager for persistent workflow tracking (optional).
	// When set, ExecutePattern wraps execution with task-based state tracking,
	// providing persistence, resume capability, and progress visibility.
	TaskManager *task.Manager
}

Config configures the orchestrator.

type ContextMetadata

type ContextMetadata struct {
	WorkflowID   string    `json:"workflow_id"`
	WorkflowType string    `json:"workflow_type"`
	SchemaVer    string    `json:"schema_version"`
	StartedAt    time.Time `json:"started_at"`
}

ContextMetadata contains global workflow runtime information

type DebateBuilder

type DebateBuilder struct {
	// contains filtered or unexported fields
}

DebateBuilder provides a fluent API for building multi-agent debates. Agents participate in rounds where each agent sees previous round outputs and can build on or challenge them.

func (*DebateBuilder) Execute

Execute runs the debate and returns the merged result.

func (*DebateBuilder) WithAgentIDs

func (b *DebateBuilder) WithAgentIDs(agentIDs ...string) *DebateBuilder

WithAgentIDs adds agents by their registry IDs. This allows referencing agents already registered with the orchestrator.

func (*DebateBuilder) WithAgents

func (b *DebateBuilder) WithAgents(agents ...*agent.Agent) *DebateBuilder

WithAgents adds agents to the debate. Each agent will participate in all debate rounds.

func (*DebateBuilder) WithMergeStrategy

func (b *DebateBuilder) WithMergeStrategy(strategy loomv1.MergeStrategy) *DebateBuilder

WithMergeStrategy sets how to merge the final outputs. Options: CONSENSUS, VOTING, SUMMARY, CONCATENATE, FIRST, BEST.

func (*DebateBuilder) WithModerator

func (b *DebateBuilder) WithModerator(moderator *agent.Agent) *DebateBuilder

WithModerator adds a moderator agent to guide the debate. The moderator can provide structure and ensure productive discussion.

func (*DebateBuilder) WithRounds

func (b *DebateBuilder) WithRounds(rounds int) *DebateBuilder

WithRounds sets the number of debate rounds. Default is 1 round. More rounds allow agents to refine arguments.

type Evidence

type Evidence struct {
	ToolCalls       []ToolCall `json:"tool_calls"`
	QueriesExecuted []string   `json:"queries_executed"`
}

Evidence provides proof of how outputs were derived

type ExecuteFunc added in v1.3.0

type ExecuteFunc func(ctx context.Context, sessionID string, prompt string) (*loomv1.AgentResult, error)

ExecuteFunc is a function that executes an agent and returns its output. sessionID controls whether this is the same or a fresh session. prompt is the (possibly modified) prompt for this execution.

type FeedbackFunc added in v1.3.0

type FeedbackFunc func(ctx context.Context, sessionID string, feedback string) (*loomv1.AgentResult, error)

FeedbackFunc appends validation feedback to an existing session. Used in CONTINUE mode to add a user message to the same conversation.

type ForkJoinBuilder

type ForkJoinBuilder struct {
	// contains filtered or unexported fields
}

ForkJoinBuilder provides a fluent API for building fork-join patterns. Fork-join executes multiple agents in parallel with the same prompt, then merges their results using a specified strategy.

func (*ForkJoinBuilder) Execute

Execute runs the fork-join pattern and returns the merged result.

func (*ForkJoinBuilder) Join

Join sets the merge strategy for combining agent outputs. Options: CONSENSUS, VOTING, SUMMARY, CONCATENATE, FIRST, BEST.

func (*ForkJoinBuilder) WithAgentIDs

func (b *ForkJoinBuilder) WithAgentIDs(agentIDs ...string) *ForkJoinBuilder

WithAgentIDs adds agents by their registry IDs. This allows referencing agents already registered with the orchestrator.

func (*ForkJoinBuilder) WithAgents

func (b *ForkJoinBuilder) WithAgents(agents ...*agent.Agent) *ForkJoinBuilder

WithAgents adds agents to execute in parallel. All agents will receive the same prompt.

func (*ForkJoinBuilder) WithTimeout

func (b *ForkJoinBuilder) WithTimeout(seconds int) *ForkJoinBuilder

WithTimeout sets the maximum execution time in seconds. If agents don't complete within this time, the execution will be cancelled.

type ForkJoinExecutor

type ForkJoinExecutor struct {
	// contains filtered or unexported fields
}

ForkJoinExecutor executes a fork-join pattern.

func NewForkJoinExecutor

func NewForkJoinExecutor(orchestrator *Orchestrator, pattern *loomv1.ForkJoinPattern, workflowID string) *ForkJoinExecutor

NewForkJoinExecutor creates a new fork-join executor.

func (*ForkJoinExecutor) Execute

Execute runs the fork-join pattern and returns the result.

type IterativePipelineExecutor

type IterativePipelineExecutor struct {
	// contains filtered or unexported fields
}

IterativePipelineExecutor executes an iterative workflow pattern with restart coordination. Stages can trigger restarts of earlier stages via pub/sub messaging, enabling autonomous agent negotiation and self-correction.

func NewIterativePipelineExecutor

func NewIterativePipelineExecutor(
	orchestrator *Orchestrator,
	pattern *loomv1.IterativeWorkflowPattern,
	messageBus *communication.MessageBus,
	workflowID string,
) *IterativePipelineExecutor

NewIterativePipelineExecutor creates a new iterative pipeline executor.

func (*IterativePipelineExecutor) Execute

Execute runs the iterative pipeline with restart coordination.

type Orchestrator

type Orchestrator struct {
	// contains filtered or unexported fields
}

Orchestrator coordinates multiple agents using workflow patterns. It provides a fluent API for building and executing multi-agent workflows like debates, pipelines, and parallel execution.

func NewOrchestrator

func NewOrchestrator(config Config) *Orchestrator

NewOrchestrator creates a new orchestrator instance.

func (*Orchestrator) Conditional

func (o *Orchestrator) Conditional(classifier *agent.Agent, conditionPrompt string) *ConditionalBuilder

Conditional creates a conditional pattern builder for routing logic.

Example:

result := orchestrator.
    Conditional(classifier, "Is this a bug or feature?").
    When("bug", debugWorkflow).
    When("feature", designWorkflow).
    Execute(ctx)

func (*Orchestrator) Debate

func (o *Orchestrator) Debate(topic string) *DebateBuilder

Debate creates a debate pattern builder for multi-agent debates.

Example:

result := orchestrator.
    Debate("Should we use SQLite or Postgres?").
    WithAgents(agent1, agent2).
    WithRounds(3).
    Execute(ctx)

func (*Orchestrator) ExecutePattern

func (o *Orchestrator) ExecutePattern(ctx context.Context, pattern *loomv1.WorkflowPattern) (*loomv1.WorkflowResult, error)

ExecutePattern executes a workflow pattern and returns the result. This is the low-level execution method used by pattern builders. When a TaskManager is configured, workflow execution is automatically wrapped with persistent task tracking (board creation, stage tasks, progress recording, resume support).

func (*Orchestrator) Fork

func (o *Orchestrator) Fork(prompt string) *ForkJoinBuilder

Fork creates a fork-join pattern builder for parallel execution.

Example:

result := orchestrator.
    Fork("Analyze this codebase").
    WithAgents(securityExpert, performanceExpert).
    Join(loomv1.MergeStrategy_SUMMARY).
    Execute(ctx)

func (*Orchestrator) GetAgent

func (o *Orchestrator) GetAgent(ctx context.Context, id string) (*agent.Agent, error)

GetAgent retrieves a registered agent by ID. If the agent is not in the local registry, it attempts to load it from the agent registry.

func (*Orchestrator) GetMergeLLM added in v1.2.0

func (o *Orchestrator) GetMergeLLM() agent.LLMProvider

GetMergeLLM returns the LLM provider to use for merge/synthesis operations. Resolution order:

  1. The orchestrator's explicitly configured llmProvider (Config.LLMProvider)
  2. The orchestrator role LLM from the first registered agent that has one (agent.GetLLMForRole(LLM_ROLE_ORCHESTRATOR) which falls back to the agent's main LLM)
  3. nil (caller must handle this, typically by returning an error)

Note: When falling back to agent LLMs (step 2), the selection is non-deterministic if multiple agents have orchestrator LLMs, because agents are stored in a map. A warning is logged in this case. Set Config.LLMProvider for deterministic behavior.

func (*Orchestrator) Parallel

func (o *Orchestrator) Parallel() *ParallelBuilder

Parallel creates a parallel pattern builder for independent tasks.

Example:

result := orchestrator.
    Parallel().
    WithTask(analyzer, "Analyze code quality").
    WithTask(scanner, "Check for vulnerabilities").
    Execute(ctx)

func (*Orchestrator) Pipeline

func (o *Orchestrator) Pipeline(initialPrompt string) *PipelineBuilder

Pipeline creates a pipeline pattern builder for sequential execution.

Example:

result := orchestrator.
    Pipeline("Design a new feature").
    WithStage(architect, "Create architecture").
    WithStage(implementer, "Implement: {{previous}}").
    Execute(ctx)

func (*Orchestrator) RegisterAgent

func (o *Orchestrator) RegisterAgent(id string, ag *agent.Agent)

RegisterAgent registers an agent with a specific ID for this orchestration. This allows the orchestrator to reference agents in workflow patterns.

func (*Orchestrator) SetProgressCallback

func (o *Orchestrator) SetProgressCallback(callback WorkflowProgressCallback)

SetProgressCallback sets or updates the progress callback. This allows updating the callback after orchestrator creation.

type OutputValidator added in v1.3.0

type OutputValidator struct {
	// contains filtered or unexported fields
}

OutputValidator provides universal output validation and retry for any agent execution. It composes structural validation (JSON Schema, instant) with semantic validation (LLM-based acceptance criteria) and supports three retry session modes: CONTINUE (same session), FRESH (new session), and ESCALATE (continue then upgrade LLM).

func NewOutputValidator added in v1.3.0

func NewOutputValidator(tracer observability.Tracer, logger *zap.Logger) *OutputValidator

NewOutputValidator creates a new output validator.

func (*OutputValidator) ValidateAndRetry added in v1.3.0

func (v *OutputValidator) ValidateAndRetry(
	ctx context.Context,
	policy *loomv1.OutputPolicy,
	execute ExecuteFunc,
	feedback FeedbackFunc,
	originalPrompt string,
	workflowID string,
) (*loomv1.AgentResult, []string, error)

ValidateAndRetry executes an agent, validates the output against the policy, and retries with feedback if validation fails. Works across all workflow patterns.

Parameters:

  • policy: output validation policy (nil = no validation, execute once)
  • execute: function to execute the agent (called for FRESH sessions)
  • feedback: function to send feedback in the same session (called for CONTINUE mode)
  • originalPrompt: the original prompt for fresh retries
  • workflowID: base workflow ID for session ID generation

Returns the agent result (possibly from a retry) and any validation warnings.

type ParallelBuilder

type ParallelBuilder struct {
	// contains filtered or unexported fields
}

ParallelBuilder provides a fluent API for building parallel patterns. Parallel patterns execute independent tasks concurrently, where each task has its own agent and prompt.

func (*ParallelBuilder) Execute

Execute runs all tasks in parallel and returns the merged result.

func (*ParallelBuilder) WithMergeStrategy

func (b *ParallelBuilder) WithMergeStrategy(strategy loomv1.MergeStrategy) *ParallelBuilder

WithMergeStrategy sets how to merge the task results. Options: CONSENSUS, VOTING, SUMMARY, CONCATENATE, FIRST, BEST.

func (*ParallelBuilder) WithTask

func (b *ParallelBuilder) WithTask(ag *agent.Agent, prompt string) *ParallelBuilder

WithTask adds an independent task with a specific prompt. Each task runs concurrently with its own agent and prompt.

func (*ParallelBuilder) WithTaskByID

func (b *ParallelBuilder) WithTaskByID(agentID string, prompt string) *ParallelBuilder

WithTaskByID adds a task using an agent ID. This allows referencing agents already registered with the orchestrator.

func (*ParallelBuilder) WithTaskMetadata

func (b *ParallelBuilder) WithTaskMetadata(ag *agent.Agent, prompt string, metadata map[string]string) *ParallelBuilder

WithTaskMetadata adds a task with metadata. Metadata can be used to label or categorize tasks.

func (*ParallelBuilder) WithTimeout

func (b *ParallelBuilder) WithTimeout(seconds int) *ParallelBuilder

WithTimeout sets the maximum execution time in seconds. If tasks don't complete within this time, the execution will be cancelled.

type ParallelExecutor

type ParallelExecutor struct {
	// contains filtered or unexported fields
}

ParallelExecutor executes a parallel pattern.

func NewParallelExecutor

func NewParallelExecutor(orchestrator *Orchestrator, pattern *loomv1.ParallelPattern, workflowID string) *ParallelExecutor

NewParallelExecutor creates a new parallel executor.

func (*ParallelExecutor) Execute

Execute runs the parallel pattern and returns the result.

type PipelineBuilder

type PipelineBuilder struct {
	// contains filtered or unexported fields
}

PipelineBuilder provides a fluent API for building pipeline patterns. Pipelines execute agents sequentially, where each agent's output becomes input for the next agent.

func (*PipelineBuilder) Execute

Execute runs the pipeline and returns the final result.

func (*PipelineBuilder) WithFullHistory

func (b *PipelineBuilder) WithFullHistory() *PipelineBuilder

WithFullHistory enables passing full history to each stage. By default, only the previous stage's output is available.

func (*PipelineBuilder) WithStage

func (b *PipelineBuilder) WithStage(ag *agent.Agent, promptTemplate string) *PipelineBuilder

WithStage adds a stage to the pipeline. The promptTemplate can include placeholders: - {{previous}}: Replaced with the previous stage's output - {{history}}: Replaced with all previous outputs

func (*PipelineBuilder) WithStageByID

func (b *PipelineBuilder) WithStageByID(agentID string, promptTemplate string) *PipelineBuilder

WithStageByID adds a stage using an agent ID. This allows referencing agents already registered with the orchestrator.

func (*PipelineBuilder) WithStageRetry added in v1.3.0

func (b *PipelineBuilder) WithStageRetry(ag *agent.Agent, promptTemplate string, validationPrompt string, maxRetries int32) *PipelineBuilder

WithStageRetry adds a stage with LLM validation and retry on failure. The validationPrompt should check if the output meets requirements. On failure, the stage is retried with feedback including the validation criteria.

func (*PipelineBuilder) WithStageSchema added in v1.3.0

func (b *PipelineBuilder) WithStageSchema(ag *agent.Agent, promptTemplate string, outputSchema string, maxRetries int32) *PipelineBuilder

WithStageSchema adds a stage with JSON Schema validation and retry on failure. The outputSchema is validated instantly (no LLM call) using gojsonschema. On failure, the retry prompt includes the full schema and specific violations.

func (*PipelineBuilder) WithStageValidation

func (b *PipelineBuilder) WithStageValidation(ag *agent.Agent, promptTemplate string, validationPrompt string) *PipelineBuilder

WithStageValidation adds a stage with output validation. The validationPrompt should check if the output meets requirements. Use {{output}} placeholder to reference the stage output.

type PipelineExecutor

type PipelineExecutor struct {
	// contains filtered or unexported fields
}

PipelineExecutor executes a pipeline pattern.

func NewPipelineExecutor

func NewPipelineExecutor(orchestrator *Orchestrator, pattern *loomv1.PipelinePattern, workflowID string) *PipelineExecutor

NewPipelineExecutor creates a new pipeline executor.

func (*PipelineExecutor) Execute

Execute runs the pipeline and returns the result.

type ScheduleYAML

type ScheduleYAML struct {
	Cron                string            `yaml:"cron"`
	Timezone            string            `yaml:"timezone,omitempty"`
	Enabled             bool              `yaml:"enabled"`
	SkipIfRunning       bool              `yaml:"skip_if_running,omitempty"`
	MaxExecutionSeconds int32             `yaml:"max_execution_seconds,omitempty"`
	Variables           map[string]string `yaml:"variables,omitempty"`
	SessionMode         string            `yaml:"session_mode,omitempty"` // "new" or "resume"
}

ScheduleYAML represents the schedule configuration in workflow YAML files.

type StageOutput

type StageOutput struct {
	StageID     string                 `json:"stage_id"`
	Status      string                 `json:"status"` // "completed", "failed", "skipped"
	StartedAt   time.Time              `json:"started_at"`
	CompletedAt time.Time              `json:"completed_at"`
	Inputs      map[string]interface{} `json:"inputs"`
	Outputs     map[string]interface{} `json:"outputs"`
	Evidence    Evidence               `json:"evidence"`
}

StageOutput represents the output from a single workflow stage

type StructuredContext

type StructuredContext struct {
	WorkflowContext ContextMetadata        `json:"workflow_context"`
	StageOutputs    map[string]StageOutput `json:"stage_outputs"`
}

StructuredContext represents the accumulated context across workflow stages. This prevents agent hallucinations by enforcing structured data passing.

func NewStructuredContext

func NewStructuredContext(workflowID, workflowType string) *StructuredContext

NewStructuredContext creates a new structured context for a workflow

func (*StructuredContext) AddStageOutput

func (ctx *StructuredContext) AddStageOutput(stageKey string, output StageOutput) error

AddStageOutput adds an output from a completed stage

func (*StructuredContext) FromJSON

func (ctx *StructuredContext) FromJSON(jsonStr string) error

FromJSON deserializes context from JSON (for parsing agent outputs)

func (*StructuredContext) GetStageOutput

func (ctx *StructuredContext) GetStageOutput(stageKey string) (StageOutput, bool)

GetStageOutput retrieves output from a specific stage

func (*StructuredContext) GetTargetTable

func (ctx *StructuredContext) GetTargetTable(sourceStageKey string) (database, table string, err error)

GetTargetTable extracts the recommended table from a specific stage (typically stage-2)

func (*StructuredContext) ToJSON

func (ctx *StructuredContext) ToJSON() (string, error)

ToJSON serializes the context to JSON for injection into agent prompts

func (*StructuredContext) ValidateDatabaseList

func (ctx *StructuredContext) ValidateDatabaseList(
	database string,
	sourceStageKey string,
) error

ValidateDatabaseList checks if a database exists in the discovery stage

func (*StructuredContext) ValidateFileCreation

func (ctx *StructuredContext) ValidateFileCreation(stageKey string, filePathKey string) error

ValidateFileCreation ensures a file output was actually created on disk

func (*StructuredContext) ValidateTableReference

func (ctx *StructuredContext) ValidateTableReference(
	currentStageKey string,
	database string,
	table string,
	sourceStageKey string,
) error

ValidateTableReference checks if a table reference exists in a previous stage This prevents agents from hallucinating non-existent tables

func (*StructuredContext) ValidateToolExecutions

func (ctx *StructuredContext) ValidateToolExecutions(stageKey string, requiredTools []string) error

ValidateToolExecutions ensures required tools were actually executed (prevents action hallucination)

type SwarmExecutor

type SwarmExecutor struct {
	// contains filtered or unexported fields
}

SwarmExecutor executes a swarm pattern (collective voting).

func NewSwarmExecutor

func NewSwarmExecutor(orchestrator *Orchestrator, pattern *loomv1.SwarmPattern, workflowID string) *SwarmExecutor

NewSwarmExecutor creates a new swarm executor.

func (*SwarmExecutor) Execute

Execute runs the swarm pattern and returns the voting result.

type TaskTrackedOrchestrator added in v1.3.0

type TaskTrackedOrchestrator struct {
	// contains filtered or unexported fields
}

TaskTrackedOrchestrator wraps an Orchestrator to persist workflow execution state via the task system. Each workflow pattern execution creates a board with tasks for each stage/agent, providing:

  • Persistent state: survives server restarts (SQLite/PostgreSQL)
  • Progress visibility: board shows stage status in real-time
  • Audit trail: full history of stage transitions
  • Stage output capture: stored in task notes
  • Resume capability: find last completed stage on restart
  • Graph memory: auto-creates memories for completed stages

func NewTaskTrackedOrchestrator added in v1.3.0

func NewTaskTrackedOrchestrator(inner *Orchestrator, manager *task.Manager, tracer observability.Tracer, logger *zap.Logger) *TaskTrackedOrchestrator

NewTaskTrackedOrchestrator wraps an orchestrator with task tracking.

func (*TaskTrackedOrchestrator) ExecutePattern added in v1.3.0

ExecutePattern wraps the inner orchestrator's ExecutePattern with task tracking. Before execution: creates a board and tasks from the pattern. After execution: closes tasks with stage outputs and records results.

func (*TaskTrackedOrchestrator) Orchestrator added in v1.3.0

func (t *TaskTrackedOrchestrator) Orchestrator() *Orchestrator

Orchestrator returns the wrapped orchestrator for direct access.

type TemplateMetadata

type TemplateMetadata struct {
	Name        string            `yaml:"name"`
	Description string            `yaml:"description,omitempty"`
	Version     string            `yaml:"version,omitempty"`
	Labels      map[string]string `yaml:"labels,omitempty"`
}

TemplateMetadata contains template identification

type TemplateParameterConfig

type TemplateParameterConfig struct {
	Name         string `yaml:"name"`
	Type         string `yaml:"type"`
	Required     bool   `yaml:"required,omitempty"`
	DefaultValue string `yaml:"default,omitempty"`
	Description  string `yaml:"description,omitempty"`
}

TemplateParameterConfig defines a template parameter

type TemplateRegistry

type TemplateRegistry struct {
	// contains filtered or unexported fields
}

TemplateRegistry manages agent templates

func NewTemplateRegistry

func NewTemplateRegistry() *TemplateRegistry

NewTemplateRegistry creates a new template registry

func (*TemplateRegistry) ApplyTemplate

func (r *TemplateRegistry) ApplyTemplate(templateName string, vars map[string]string) (*loomv1.AgentConfig, error)

ApplyTemplate applies a template with given variables to create an agent config

func (*TemplateRegistry) GetTemplate

func (r *TemplateRegistry) GetTemplate(name string) (*AgentTemplateConfig, error)

GetTemplate returns a registered template

func (*TemplateRegistry) ListTemplates

func (r *TemplateRegistry) ListTemplates() []string

ListTemplates returns all registered template names

func (*TemplateRegistry) LoadTemplate

func (r *TemplateRegistry) LoadTemplate(path string) error

LoadTemplate loads a template from a YAML file

func (*TemplateRegistry) LoadTemplateFromString

func (r *TemplateRegistry) LoadTemplateFromString(yamlContent string) error

LoadTemplateFromString loads a template from YAML string

type ToolCall

type ToolCall struct {
	ToolName      string                 `json:"tool_name"`
	Parameters    map[string]interface{} `json:"parameters"`
	ResultSummary string                 `json:"result_summary"`
}

ToolCall records a single tool execution

type WorkflowConfig

type WorkflowConfig struct {
	APIVersion string                 `yaml:"apiVersion"`
	Kind       string                 `yaml:"kind"`
	Metadata   WorkflowMetadata       `yaml:"metadata"`
	Spec       map[string]interface{} `yaml:"spec"`
	Schedule   *ScheduleYAML          `yaml:"schedule,omitempty"`
}

WorkflowConfig represents the Kubernetes-style YAML structure. Based on dogfooding recommendations: apiVersion, kind, metadata, spec

func LoadWorkflowConfigFromYAML

func LoadWorkflowConfigFromYAML(path string) (*WorkflowConfig, error)

LoadWorkflowConfigFromYAML loads a workflow YAML file and returns the parsed config. This is used by the scheduler to access the schedule section.

type WorkflowMetadata

type WorkflowMetadata struct {
	Name        string            `yaml:"name"`
	Version     string            `yaml:"version,omitempty"`
	Description string            `yaml:"description,omitempty"`
	Labels      map[string]string `yaml:"labels,omitempty"`
}

WorkflowMetadata contains workflow identification information

type WorkflowProgressCallback

type WorkflowProgressCallback func(event WorkflowProgressEvent)

WorkflowProgressCallback is called during workflow execution to report progress.

type WorkflowProgressEvent

type WorkflowProgressEvent struct {
	// Pattern type being executed
	PatternType string

	// Current stage/step description
	Message string

	// Progress percentage (0-100)
	Progress int32

	// Current agent executing (if applicable)
	CurrentAgentID string

	// Partial results available so far
	PartialResults []*loomv1.AgentResult
}

WorkflowProgressEvent represents a progress update during workflow execution.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL