multiagent

package
v1.4.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 3, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ModeReasoning     = "reasoning"
	ModeCollaboration = "collaboration"
	ModeHierarchical  = "hierarchical"
	ModeCrew          = "crew"
	ModeDeliberation  = "deliberation"
	ModeFederation    = "federation"
)

Variables

This section is empty.

Functions

func RegisterDefaultModes

func RegisterDefaultModes(reg *ModeRegistry, logger *zap.Logger) error

RegisterDefaultModes registers built-in mode strategies into a single registry.

Types

type AggregatedResult

type AggregatedResult struct {
	Content      string
	TokensUsed   int
	Cost         float64
	Duration     time.Duration
	SourceCount  int
	FailedCount  int
	Metadata     map[string]any
	FinishReason string
}

AggregatedResult is the final merged output from all sub-agents.

type AggregationStrategy

type AggregationStrategy string

AggregationStrategy defines how sub-agent results are combined.

const (
	StrategyMergeAll      AggregationStrategy = "merge_all"
	StrategyBestOfN       AggregationStrategy = "best_of_n"
	StrategyVoteMajority  AggregationStrategy = "vote_majority"
	StrategyWeightedMerge AggregationStrategy = "weighted_merge"
)

type Aggregator

type Aggregator struct {
	// contains filtered or unexported fields
}

Aggregator merges multiple WorkerResult into a single AggregatedResult.

func NewAggregator

func NewAggregator(strategy AggregationStrategy) *Aggregator

NewAggregator creates an Aggregator with the given strategy.

func (*Aggregator) Aggregate

func (a *Aggregator) Aggregate(results []WorkerResult) (*AggregatedResult, error)

Aggregate applies the configured strategy to the results.

type DedupResultAggregator

type DedupResultAggregator struct{}

DedupResultAggregator merges retrieval results and keeps highest-score doc per key.

func NewDedupResultAggregator

func NewDedupResultAggregator() *DedupResultAggregator

NewDedupResultAggregator creates default dedup aggregator.

func (*DedupResultAggregator) Aggregate

func (a *DedupResultAggregator) Aggregate(_ context.Context, resultsByQuery map[string][]rag.RetrievalResult) ([]rag.RetrievalResult, error)

Aggregate merges and deduplicates by document ID (fallback: content).

type FailurePolicy

type FailurePolicy string

FailurePolicy controls how the pool handles sub-agent failures.

const (
	PolicyFailFast      FailurePolicy = "fail_fast"
	PolicyPartialResult FailurePolicy = "partial_result"
	PolicyRetryFailed   FailurePolicy = "retry_failed"
)

type ModeRegistry

type ModeRegistry struct {
	// contains filtered or unexported fields
}

ModeRegistry is the unified registry for all agent execution modes.

func GlobalModeRegistry

func GlobalModeRegistry() *ModeRegistry

GlobalModeRegistry returns the singleton mode registry.

func NewModeRegistry

func NewModeRegistry() *ModeRegistry

NewModeRegistry creates an empty ModeRegistry.

func (*ModeRegistry) Execute

func (r *ModeRegistry) Execute(ctx context.Context, modeName string, agents []agent.Agent, input *agent.Input) (*agent.Output, error)

Execute looks up the named mode and runs it.

func (*ModeRegistry) Get

func (r *ModeRegistry) Get(name string) (ModeStrategy, error)

Get returns the strategy for the given mode name.

func (*ModeRegistry) List

func (r *ModeRegistry) List() []string

List returns all registered mode names.

func (*ModeRegistry) Register

func (r *ModeRegistry) Register(strategy ModeStrategy)

Register adds a mode strategy. Overwrites if the name already exists.

type ModeStrategy

type ModeStrategy interface {
	// Name returns the unique mode identifier.
	Name() string
	// Execute runs the mode's logic using the provided agents and input.
	Execute(ctx context.Context, agents []agent.Agent, input *agent.Input) (*agent.Output, error)
}

ModeStrategy is the unified interface for all agent execution modes. Each mode (reasoning, collaboration, hierarchical, crew, deliberation, federation) must implement this interface and register via ModeRegistry.

type QueryDecomposer

type QueryDecomposer interface {
	Decompose(ctx context.Context, query string) ([]string, error)
}

QueryDecomposer splits a query into sub-queries for parallel retrieval.

type RetrievalResultAggregator

type RetrievalResultAggregator interface {
	Aggregate(ctx context.Context, resultsByQuery map[string][]rag.RetrievalResult) ([]rag.RetrievalResult, error)
}

RetrievalResultAggregator merges worker retrieval outputs.

type RetrievalSupervisor

type RetrievalSupervisor struct {
	// contains filtered or unexported fields
}

RetrievalSupervisor orchestrates query decomposition -> parallel worker retrieval -> dedup aggregation.

func NewRetrievalSupervisor

func NewRetrievalSupervisor(
	decomposer QueryDecomposer,
	workers []RetrievalWorker,
	aggregator RetrievalResultAggregator,
	logger *zap.Logger,
) *RetrievalSupervisor

NewRetrievalSupervisor creates a retrieval collaboration supervisor.

func (*RetrievalSupervisor) Retrieve

func (s *RetrievalSupervisor) Retrieve(ctx context.Context, query string) ([]rag.RetrievalResult, error)

Retrieve runs the collaboration pipeline.

type RetrievalWorker

type RetrievalWorker interface {
	Retrieve(ctx context.Context, query string) ([]rag.RetrievalResult, error)
}

RetrievalWorker executes retrieval for a sub-query.

type ScopedStores

type ScopedStores struct {
	// contains filtered or unexported fields
}

ScopedStores wraps agent.PersistenceStores and prefixes all operations with a sub-agent scope. This ensures sub-agent conversation/run/prompt data does not pollute the parent agent's stores.

func NewScopedStores

func NewScopedStores(inner *agent.PersistenceStores, agentID string, logger *zap.Logger) *ScopedStores

NewScopedStores creates a ScopedStores for the given sub-agent.

func (*ScopedStores) LoadPrompt

func (s *ScopedStores) LoadPrompt(ctx context.Context, agentType, name, tenantID string) *agent.PromptDocument

LoadPrompt loads prompt from the scoped agent type.

func (*ScopedStores) PersistConversation

func (s *ScopedStores) PersistConversation(ctx context.Context, conversationID, tenantID, userID, inputContent, outputContent string)

PersistConversation saves conversation with scoped conversation ID.

func (*ScopedStores) RecordRun

func (s *ScopedStores) RecordRun(ctx context.Context, tenantID, traceID, input string, startTime time.Time) string

RecordRun records a scoped run.

func (*ScopedStores) RestoreConversation

func (s *ScopedStores) RestoreConversation(ctx context.Context, conversationID string) []interface{}

RestoreConversation restores conversation from scoped ID.

func (*ScopedStores) UpdateRunStatus

func (s *ScopedStores) UpdateRunStatus(ctx context.Context, runID, status string, output *agent.RunOutputDoc, errMsg string) error

UpdateRunStatus updates a scoped run status.

type StaticSplitter

type StaticSplitter struct {
	Agents  []agent.Agent
	Weights map[string]float64 // agent_id -> weight (optional)
}

StaticSplitter is a simple TaskSplitter that dispatches the same input to a fixed set of agents.

func (*StaticSplitter) Split

func (s *StaticSplitter) Split(_ context.Context, input *agent.Input) ([]WorkerTask, error)

Split returns one WorkerTask per agent, all sharing the same input.

type Supervisor

type Supervisor struct {
	// contains filtered or unexported fields
}

Supervisor orchestrates task splitting, parallel worker execution, and result aggregation.

func NewSupervisor

func NewSupervisor(splitter TaskSplitter, cfg SupervisorConfig, logger *zap.Logger) *Supervisor

NewSupervisor creates a Supervisor with the given configuration.

func (*Supervisor) Run

func (s *Supervisor) Run(ctx context.Context, input *agent.Input) (*AggregatedResult, error)

Run executes the full supervisor pipeline: split -> dispatch -> aggregate.

type SupervisorConfig

type SupervisorConfig struct {
	AggregationStrategy AggregationStrategy
	FailurePolicy       FailurePolicy
	MaxRetries          int
	TaskTimeout         time.Duration
}

SupervisorConfig configures the Supervisor.

func DefaultSupervisorConfig

func DefaultSupervisorConfig() SupervisorConfig

DefaultSupervisorConfig returns production defaults.

type TaskSplitter

type TaskSplitter interface {
	Split(ctx context.Context, input *agent.Input) ([]WorkerTask, error)
}

TaskSplitter splits a supervisor input into sub-tasks for workers.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages concurrent sub-agent execution.

func NewWorkerPool

func NewWorkerPool(cfg WorkerPoolConfig, logger *zap.Logger) *WorkerPool

NewWorkerPool creates a WorkerPool.

func (*WorkerPool) Execute

func (p *WorkerPool) Execute(ctx context.Context, tasks []WorkerTask) ([]WorkerResult, error)

Execute dispatches all tasks concurrently and collects results according to FailurePolicy.

type WorkerPoolConfig

type WorkerPoolConfig struct {
	FailurePolicy FailurePolicy
	MaxRetries    int
	TaskTimeout   time.Duration
}

WorkerPoolConfig configures the worker pool.

func DefaultWorkerPoolConfig

func DefaultWorkerPoolConfig() WorkerPoolConfig

DefaultWorkerPoolConfig returns production defaults (PartialResult).

type WorkerResult

type WorkerResult struct {
	AgentID      string
	Content      string
	TokensUsed   int
	Cost         float64
	Duration     time.Duration
	Score        float64 // confidence / quality score (used by BestOfN)
	Weight       float64 // agent weight (used by WeightedMerge)
	Err          error
	FinishReason string
	Metadata     map[string]any
}

WorkerResult holds the output of a single sub-agent execution.

type WorkerTask

type WorkerTask struct {
	AgentID   string
	Agent     agent.Agent
	Input     *agent.Input
	Weight    float64  // used by WeightedMerge aggregation
	ToolScope []string // allowed tool names (empty = all tools)
}

WorkerTask is a unit of work dispatched to a sub-agent.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL