Documentation
¶
Overview ¶
Package coordinator orchestrates dependency-aware parallel agent execution.
Index ¶
- Variables
- func CopyContextForAgent(main *model.AgentContext) *model.AgentContext
- type AgentConfig
- type AgentRegistry
- type AgentStats
- type AgentType
- type Coordinator
- func (c *Coordinator) AggregateResults() model.AWSData
- func (c *Coordinator) Analyze(query string) []*dt.Node
- func (c *Coordinator) SpawnAgents(ctx context.Context, applicable []*dt.Node)
- func (c *Coordinator) Stats() AgentStats
- func (c *Coordinator) WaitForCompletion(ctx context.Context, timeout time.Duration) error
- type Dependency
- type DependencyScheduler
- type OrderGroup
- type ParallelAgent
- type SharedDataBus
Constants ¶
This section is empty.
Variables ¶
var ( AgentTypeLog = AgentType{ Name: "log", Dependencies: Dependency{ ProvidedData: []string{"logs", "error_patterns", "log_metrics"}, ExecutionOrder: 1, WaitTimeout: 5 * time.Second, }, } AgentTypeMetrics = AgentType{ Name: "metrics", Dependencies: Dependency{ ProvidedData: []string{"metrics", "performance_data", "thresholds"}, ExecutionOrder: 1, WaitTimeout: 5 * time.Second, }, } AgentTypeInfrastructure = AgentType{ Name: "infrastructure", Dependencies: Dependency{ ProvidedData: []string{"service_config", "deployment_status", "resource_health"}, ExecutionOrder: 2, WaitTimeout: 8 * time.Second, }, } AgentTypeSecurity = AgentType{ Name: "security", Dependencies: Dependency{ RequiredData: []string{"logs", "service_config"}, ProvidedData: []string{"security_status", "access_patterns", "vulnerabilities"}, ExecutionOrder: 3, WaitTimeout: 6 * time.Second, }, } AgentTypeCost = AgentType{ Name: "cost", Dependencies: Dependency{ RequiredData: []string{"metrics", "resource_health"}, ProvidedData: []string{"cost_analysis", "usage_patterns", "optimization_suggestions"}, ExecutionOrder: 4, WaitTimeout: 8 * time.Second, }, } AgentTypePerformance = AgentType{ Name: "performance", Dependencies: Dependency{ RequiredData: []string{"metrics", "logs", "resource_health"}, ProvidedData: []string{"performance_analysis", "bottlenecks", "scaling_recommendations"}, ExecutionOrder: 5, WaitTimeout: 8 * time.Second, }, } AgentTypeDeployment = AgentType{ Name: "deployment", Dependencies: Dependency{ ProvidedData: []string{"deployment_status", "recent_changes"}, ExecutionOrder: 2, WaitTimeout: 6 * time.Second, }, } AgentTypeDataPipeline = AgentType{ Name: "datapipeline", Dependencies: Dependency{ ProvidedData: []string{"pipeline_status", "etl_health"}, ExecutionOrder: 3, WaitTimeout: 8 * time.Second, }, } AgentTypeQueue = AgentType{ Name: "queue", Dependencies: Dependency{ ProvidedData: []string{"queue_health", "backlog_metrics"}, ExecutionOrder: 3, WaitTimeout: 6 * time.Second, }, } AgentTypeAvailability = AgentType{ Name: "availability", Dependencies: Dependency{ ProvidedData: []string{"availability_status", "region_health"}, ExecutionOrder: 4, WaitTimeout: 6 * time.Second, }, } AgentTypeLLM = AgentType{ Name: "llm", Dependencies: Dependency{ ProvidedData: []string{"llm_metrics", "model_health"}, ExecutionOrder: 2, WaitTimeout: 6 * time.Second, }, } )
Functions ¶
func CopyContextForAgent ¶
func CopyContextForAgent(main *model.AgentContext) *model.AgentContext
CopyContextForAgent clones the main context for an agent run.
Types ¶
type AgentConfig ¶
AgentConfig captures a runnable agent definition emitted by the decision tree.
type AgentRegistry ¶
type AgentRegistry struct {
// contains filtered or unexported fields
}
AgentRegistry tracks agents in a concurrency-safe fashion.
func NewAgentRegistry ¶
func NewAgentRegistry() *AgentRegistry
NewAgentRegistry constructs an empty registry.
func (*AgentRegistry) Agents ¶
func (r *AgentRegistry) Agents() []*ParallelAgent
Agents returns a snapshot of the current agents slice.
func (*AgentRegistry) MarkCompleted ¶
func (r *AgentRegistry) MarkCompleted()
MarkCompleted marks an agent completion event.
func (*AgentRegistry) MarkFailed ¶
func (r *AgentRegistry) MarkFailed()
MarkFailed marks an agent failure event.
func (*AgentRegistry) Register ¶
func (r *AgentRegistry) Register(agent *ParallelAgent)
Register stores an agent and increments totals.
func (*AgentRegistry) Reset ¶
func (r *AgentRegistry) Reset()
Reset clears the registry to support reuse.
func (*AgentRegistry) Stats ¶
func (r *AgentRegistry) Stats() AgentStats
Stats returns a snapshot of the aggregate stats.
type AgentStats ¶
AgentStats tracks counts for coordinator telemetry.
type AgentType ¶
type AgentType struct {
Name string
Dependencies Dependency
}
AgentType represents a specialized worker that can run in parallel.
type Coordinator ¶
type Coordinator struct {
DecisionTree *dt.Tree
MainContext *model.AgentContext
// contains filtered or unexported fields
}
Coordinator drives decision-tree-based parallel execution.
func New ¶
func New(mainContext *model.AgentContext, client *awsclient.Client) *Coordinator
New returns a ready-to-use coordinator.
func (*Coordinator) AggregateResults ¶
func (c *Coordinator) AggregateResults() model.AWSData
AggregateResults merges successful agent outputs.
func (*Coordinator) Analyze ¶
func (c *Coordinator) Analyze(query string) []*dt.Node
Analyze traverses the decision tree for the provided query.
func (*Coordinator) SpawnAgents ¶
func (c *Coordinator) SpawnAgents(ctx context.Context, applicable []*dt.Node)
SpawnAgents starts agents grouped by dependency order.
func (*Coordinator) Stats ¶
func (c *Coordinator) Stats() AgentStats
Stats exposes snapshot counters for callers needing execution metrics.
func (*Coordinator) WaitForCompletion ¶
WaitForCompletion blocks until all agents finish or timeout occurs.
type Dependency ¶
type Dependency struct {
RequiredData []string
ProvidedData []string
ExecutionOrder int
WaitTimeout time.Duration
}
Dependency captures coordination requirements for a parallel agent type.
type DependencyScheduler ¶
type DependencyScheduler struct{}
DependencyScheduler produces execution groups honoring dependency order.
func NewDependencyScheduler ¶
func NewDependencyScheduler() *DependencyScheduler
NewDependencyScheduler constructs a scheduler instance.
func (*DependencyScheduler) Plan ¶
func (s *DependencyScheduler) Plan(agentConfigs map[string]AgentConfig) []OrderGroup
Plan groups agent configs by execution order so the coordinator can fan them out deterministically.
func (*DependencyScheduler) Ready ¶
func (s *DependencyScheduler) Ready(agentType AgentType, bus *SharedDataBus) bool
Ready reports whether an agent's dependencies are satisfied on the shared bus.
type OrderGroup ¶
type OrderGroup struct {
Order int
Agents []AgentConfig
}
OrderGroup represents a batch of agent configs that share the same execution order.
type ParallelAgent ¶
type ParallelAgent struct {
ID string
Type AgentType
Status string
StartTime time.Time
EndTime time.Time
Context *model.AgentContext
Results model.AWSData
Error error
Operations []awsclient.LLMOperation
}
ParallelAgent represents a running worker instance.
type SharedDataBus ¶
type SharedDataBus struct {
// contains filtered or unexported fields
}
SharedDataBus stores dependency data produced by agents.
func NewSharedDataBus ¶
func NewSharedDataBus() *SharedDataBus
NewSharedDataBus returns an initialized bus.
func (*SharedDataBus) HasAll ¶
func (b *SharedDataBus) HasAll(keys []string) bool
HasAll returns true if each key exists in the bus.
func (*SharedDataBus) Load ¶
func (b *SharedDataBus) Load(key string) (any, bool)
Load retrieves a value if it exists.
func (*SharedDataBus) Store ¶
func (b *SharedDataBus) Store(key string, value any)
Store saves the provided value under the supplied key.