Documentation
¶
Index ¶
- Constants
- func RegisterDefaultModes(reg *ModeRegistry, logger *zap.Logger) error
- type AggregatedResult
- type AggregationStrategy
- type Aggregator
- type DedupResultAggregator
- type FailurePolicy
- type ModeRegistry
- type ModeStrategy
- type QueryDecomposer
- type RetrievalResultAggregator
- type RetrievalSupervisor
- type RetrievalWorker
- type ScopedStores
- func (s *ScopedStores) LoadPrompt(ctx context.Context, agentType, name, tenantID string) *agent.PromptDocument
- func (s *ScopedStores) PersistConversation(ctx context.Context, ...)
- func (s *ScopedStores) RecordRun(ctx context.Context, tenantID, traceID, input string, startTime time.Time) string
- func (s *ScopedStores) RestoreConversation(ctx context.Context, conversationID string) []interface{}
- func (s *ScopedStores) UpdateRunStatus(ctx context.Context, runID, status string, output *agent.RunOutputDoc, ...) error
- type StaticSplitter
- type Supervisor
- type SupervisorConfig
- type TaskSplitter
- type WorkerPool
- type WorkerPoolConfig
- type WorkerResult
- type WorkerTask
Constants ¶
const (
	ModeReasoning     = "reasoning"
	ModeCollaboration = "collaboration"
	ModeHierarchical  = "hierarchical"
	ModeCrew          = "crew"
	ModeDeliberation  = "deliberation"
	ModeFederation    = "federation"
)
Variables ¶
This section is empty.
Functions ¶
func RegisterDefaultModes ¶
func RegisterDefaultModes(reg *ModeRegistry, logger *zap.Logger) error
RegisterDefaultModes registers built-in mode strategies into a single registry.
Types ¶
type AggregatedResult ¶
type AggregatedResult struct {
Content string
TokensUsed int
Cost float64
Duration time.Duration
SourceCount int
FailedCount int
Metadata map[string]any
FinishReason string
}
AggregatedResult is the final merged output from all sub-agents.
type AggregationStrategy ¶
type AggregationStrategy string
AggregationStrategy defines how sub-agent results are combined.
const (
	StrategyMergeAll      AggregationStrategy = "merge_all"
	StrategyBestOfN       AggregationStrategy = "best_of_n"
	StrategyVoteMajority  AggregationStrategy = "vote_majority"
	StrategyWeightedMerge AggregationStrategy = "weighted_merge"
)
type Aggregator ¶
type Aggregator struct {
// contains filtered or unexported fields
}
Aggregator merges multiple WorkerResult values into a single AggregatedResult.
func NewAggregator ¶
func NewAggregator(strategy AggregationStrategy) *Aggregator
NewAggregator creates an Aggregator with the given strategy.
func (*Aggregator) Aggregate ¶
func (a *Aggregator) Aggregate(results []WorkerResult) (*AggregatedResult, error)
Aggregate applies the configured strategy to the results.
type DedupResultAggregator ¶
type DedupResultAggregator struct{}
DedupResultAggregator merges retrieval results and keeps the highest-scoring document per key.
func NewDedupResultAggregator ¶
func NewDedupResultAggregator() *DedupResultAggregator
NewDedupResultAggregator creates the default dedup aggregator.
func (*DedupResultAggregator) Aggregate ¶
func (a *DedupResultAggregator) Aggregate(_ context.Context, resultsByQuery map[string][]rag.RetrievalResult) ([]rag.RetrievalResult, error)
Aggregate merges the per-query results and deduplicates them by document ID (fallback: content).
type FailurePolicy ¶
type FailurePolicy string
FailurePolicy controls how the pool handles sub-agent failures.
const (
	PolicyFailFast      FailurePolicy = "fail_fast"
	PolicyPartialResult FailurePolicy = "partial_result"
	PolicyRetryFailed   FailurePolicy = "retry_failed"
)
type ModeRegistry ¶
type ModeRegistry struct {
// contains filtered or unexported fields
}
ModeRegistry is the unified registry for all agent execution modes.
func GlobalModeRegistry ¶
func GlobalModeRegistry() *ModeRegistry
GlobalModeRegistry returns the singleton mode registry.
func NewModeRegistry ¶
func NewModeRegistry() *ModeRegistry
NewModeRegistry creates an empty ModeRegistry.
func (*ModeRegistry) Execute ¶
func (r *ModeRegistry) Execute(ctx context.Context, modeName string, agents []agent.Agent, input *agent.Input) (*agent.Output, error)
Execute looks up the named mode and runs it.
func (*ModeRegistry) Get ¶
func (r *ModeRegistry) Get(name string) (ModeStrategy, error)
Get returns the strategy for the given mode name.
func (*ModeRegistry) List ¶
func (r *ModeRegistry) List() []string
List returns all registered mode names.
func (*ModeRegistry) Register ¶
func (r *ModeRegistry) Register(strategy ModeStrategy)
Register adds a mode strategy. Overwrites if the name already exists.
type ModeStrategy ¶
type ModeStrategy interface {
// Name returns the unique mode identifier.
Name() string
// Execute runs the mode's logic using the provided agents and input.
Execute(ctx context.Context, agents []agent.Agent, input *agent.Input) (*agent.Output, error)
}
ModeStrategy is the unified interface for all agent execution modes. Each mode (reasoning, collaboration, hierarchical, crew, deliberation, federation) must implement this interface and register via ModeRegistry.
type QueryDecomposer ¶
QueryDecomposer splits a query into sub-queries for parallel retrieval.
type RetrievalResultAggregator ¶
type RetrievalResultAggregator interface {
Aggregate(ctx context.Context, resultsByQuery map[string][]rag.RetrievalResult) ([]rag.RetrievalResult, error)
}
RetrievalResultAggregator merges worker retrieval outputs.
type RetrievalSupervisor ¶
type RetrievalSupervisor struct {
// contains filtered or unexported fields
}
RetrievalSupervisor orchestrates query decomposition -> parallel worker retrieval -> dedup aggregation.
func NewRetrievalSupervisor ¶
func NewRetrievalSupervisor(
	decomposer QueryDecomposer,
	workers []RetrievalWorker,
	aggregator RetrievalResultAggregator,
	logger *zap.Logger,
) *RetrievalSupervisor
NewRetrievalSupervisor creates a retrieval collaboration supervisor.
func (*RetrievalSupervisor) Retrieve ¶
func (s *RetrievalSupervisor) Retrieve(ctx context.Context, query string) ([]rag.RetrievalResult, error)
Retrieve runs the collaboration pipeline.
type RetrievalWorker ¶
type RetrievalWorker interface {
Retrieve(ctx context.Context, query string) ([]rag.RetrievalResult, error)
}
RetrievalWorker executes retrieval for a sub-query.
type ScopedStores ¶
type ScopedStores struct {
// contains filtered or unexported fields
}
ScopedStores wraps agent.PersistenceStores and prefixes all operations with a sub-agent scope. This ensures sub-agent conversation/run/prompt data does not pollute the parent agent's stores.
func NewScopedStores ¶
func NewScopedStores(inner *agent.PersistenceStores, agentID string, logger *zap.Logger) *ScopedStores
NewScopedStores creates a ScopedStores for the given sub-agent.
func (*ScopedStores) LoadPrompt ¶
func (s *ScopedStores) LoadPrompt(ctx context.Context, agentType, name, tenantID string) *agent.PromptDocument
LoadPrompt loads a prompt from the scoped agent type.
func (*ScopedStores) PersistConversation ¶
func (s *ScopedStores) PersistConversation(ctx context.Context, conversationID, tenantID, userID, inputContent, outputContent string)
PersistConversation saves the conversation under the scoped conversation ID.
func (*ScopedStores) RecordRun ¶
func (s *ScopedStores) RecordRun(ctx context.Context, tenantID, traceID, input string, startTime time.Time) string
RecordRun records a scoped run.
func (*ScopedStores) RestoreConversation ¶
func (s *ScopedStores) RestoreConversation(ctx context.Context, conversationID string) []interface{}
RestoreConversation restores the conversation from the scoped ID.
func (*ScopedStores) UpdateRunStatus ¶
func (s *ScopedStores) UpdateRunStatus(ctx context.Context, runID, status string, output *agent.RunOutputDoc, errMsg string) error
UpdateRunStatus updates a scoped run status.
type StaticSplitter ¶
type StaticSplitter struct {
Agents []agent.Agent
Weights map[string]float64 // agent_id -> weight (optional)
}
StaticSplitter is a simple TaskSplitter that dispatches the same input to a fixed set of agents.
func (*StaticSplitter) Split ¶
func (s *StaticSplitter) Split(_ context.Context, input *agent.Input) ([]WorkerTask, error)
Split returns one WorkerTask per agent, all sharing the same input.
type Supervisor ¶
type Supervisor struct {
// contains filtered or unexported fields
}
Supervisor orchestrates task splitting, parallel worker execution, and result aggregation.
func NewSupervisor ¶
func NewSupervisor(splitter TaskSplitter, cfg SupervisorConfig, logger *zap.Logger) *Supervisor
NewSupervisor creates a Supervisor with the given configuration.
func (*Supervisor) Run ¶
func (s *Supervisor) Run(ctx context.Context, input *agent.Input) (*AggregatedResult, error)
Run executes the full supervisor pipeline: split -> dispatch -> aggregate.
type SupervisorConfig ¶
type SupervisorConfig struct {
AggregationStrategy AggregationStrategy
FailurePolicy FailurePolicy
MaxRetries int
TaskTimeout time.Duration
}
SupervisorConfig configures the Supervisor.
func DefaultSupervisorConfig ¶
func DefaultSupervisorConfig() SupervisorConfig
DefaultSupervisorConfig returns production defaults.
type TaskSplitter ¶
type TaskSplitter interface {
Split(ctx context.Context, input *agent.Input) ([]WorkerTask, error)
}
TaskSplitter splits a supervisor input into sub-tasks for workers.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool manages concurrent sub-agent execution.
func NewWorkerPool ¶
func NewWorkerPool(cfg WorkerPoolConfig, logger *zap.Logger) *WorkerPool
NewWorkerPool creates a WorkerPool.
func (*WorkerPool) Execute ¶
func (p *WorkerPool) Execute(ctx context.Context, tasks []WorkerTask) ([]WorkerResult, error)
Execute dispatches all tasks concurrently and collects results according to FailurePolicy.
type WorkerPoolConfig ¶
type WorkerPoolConfig struct {
FailurePolicy FailurePolicy
MaxRetries int
TaskTimeout time.Duration
}
WorkerPoolConfig configures the worker pool.
func DefaultWorkerPoolConfig ¶
func DefaultWorkerPoolConfig() WorkerPoolConfig
DefaultWorkerPoolConfig returns production defaults (failure policy: PolicyPartialResult).
type WorkerResult ¶
type WorkerResult struct {
AgentID string
Content string
TokensUsed int
Cost float64
Duration time.Duration
Score float64 // confidence / quality score (used by BestOfN)
Weight float64 // agent weight (used by WeightedMerge)
Err error
FinishReason string
Metadata map[string]any
}
WorkerResult holds the output of a single sub-agent execution.