coordinator

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 1, 2026 License: MIT Imports: 10 Imported by: 0

README

Coordinator Package

This package runs the parallel investigation workflow that follows decision tree outputs. The code is split into a few focused files so the main Coordinator type in coordinator.go stays small and testable.

Files

File Purpose
coordinator.go Entry point. Turns decision tree nodes into AgentConfigs, runs the dependency scheduler, launches agents, and aggregates results.
agent_types.go Declares each agent type (log, infrastructure, metrics, etc.) plus their dependency metadata (required data, provided data, execution order).
scheduler.go Groups agents by execution order and checks whether dependencies are satisfied via the shared data bus before launch.
state.go Shared concurrency primitives: SharedDataBus (dependency payload store), AgentRegistry (thread-safe list + counters), and CopyContextForAgent.
operations.go Maps agent types to the AWS commands/LLM operations they should run. Keeps the switchboard out of core logic.
playbooks.go AWS helpers (lightweight service discovery, log sampling, keyword helpers) plus factory helpers (newParallelAgent, persistProvidedData, lookupAgentType).

Flow Overview

  1. Decision tree produces []*dt.Node with agent names and parameters.
  2. Coordinator.SpawnAgents builds a map of AgentConfigs, keeping the highest-priority entry per agent name.
  3. DependencyScheduler.Plan sorts configs into []OrderGroup by execution order.
  4. Each order group launches agents whose dependencies are satisfied on the SharedDataBus. Every agent run is recorded in the AgentRegistry.
  5. runParallelAgent executes the precomputed operations for that agent type. When it succeeds, persistProvidedData pushes any promised data (e.g., logs, service_config) onto the bus for downstream agents.
  6. AggregateResults folds all completed agent outputs into a single model.AWSData blob and adds metadata (counts, decision path, timestamp).

Extending

  • New agent type: add it to agent_types.go, mention it in the decision tree, and register its LLM operations inside operations.go.
  • New dependency: include the provided/required data string in the relevant AgentType. Downstream agents reference those string keys when checking readiness.
  • New AWS playbook: keep helpers in playbooks.go (or a subpackage) instead of coordinator.go; call them from the operation generator.

This layout keeps orchestration (scheduling, lifecycle, aggregation) isolated from service-specific details and makes it easier to test each layer independently.

Documentation

Overview

Package coordinator orchestrates dependency-aware parallel agent execution.

Index

Constants

This section is empty.

Variables

View Source
var (
	AgentTypeLog = AgentType{
		Name: "log",
		Dependencies: Dependency{
			ProvidedData:   []string{"logs", "error_patterns", "log_metrics"},
			ExecutionOrder: 1,
			WaitTimeout:    5 * time.Second,
		},
	}
	AgentTypeMetrics = AgentType{
		Name: "metrics",
		Dependencies: Dependency{
			ProvidedData:   []string{"metrics", "performance_data", "thresholds"},
			ExecutionOrder: 1,
			WaitTimeout:    5 * time.Second,
		},
	}
	AgentTypeInfrastructure = AgentType{
		Name: "infrastructure",
		Dependencies: Dependency{
			ProvidedData:   []string{"service_config", "deployment_status", "resource_health"},
			ExecutionOrder: 2,
			WaitTimeout:    8 * time.Second,
		},
	}
	AgentTypeSecurity = AgentType{
		Name: "security",
		Dependencies: Dependency{
			RequiredData:   []string{"logs", "service_config"},
			ProvidedData:   []string{"security_status", "access_patterns", "vulnerabilities"},
			ExecutionOrder: 3,
			WaitTimeout:    6 * time.Second,
		},
	}
	AgentTypeCost = AgentType{
		Name: "cost",
		Dependencies: Dependency{
			RequiredData:   []string{"metrics", "resource_health"},
			ProvidedData:   []string{"cost_analysis", "usage_patterns", "optimization_suggestions"},
			ExecutionOrder: 4,
			WaitTimeout:    8 * time.Second,
		},
	}
	AgentTypePerformance = AgentType{
		Name: "performance",
		Dependencies: Dependency{
			RequiredData:   []string{"metrics", "logs", "resource_health"},
			ProvidedData:   []string{"performance_analysis", "bottlenecks", "scaling_recommendations"},
			ExecutionOrder: 5,
			WaitTimeout:    8 * time.Second,
		},
	}
	AgentTypeDeployment = AgentType{
		Name: "deployment",
		Dependencies: Dependency{
			ProvidedData:   []string{"deployment_status", "recent_changes"},
			ExecutionOrder: 2,
			WaitTimeout:    6 * time.Second,
		},
	}
	AgentTypeDataPipeline = AgentType{
		Name: "datapipeline",
		Dependencies: Dependency{
			ProvidedData:   []string{"pipeline_status", "etl_health"},
			ExecutionOrder: 3,
			WaitTimeout:    8 * time.Second,
		},
	}
	AgentTypeQueue = AgentType{
		Name: "queue",
		Dependencies: Dependency{
			ProvidedData:   []string{"queue_health", "backlog_metrics"},
			ExecutionOrder: 3,
			WaitTimeout:    6 * time.Second,
		},
	}
	AgentTypeAvailability = AgentType{
		Name: "availability",
		Dependencies: Dependency{
			ProvidedData:   []string{"availability_status", "region_health"},
			ExecutionOrder: 4,
			WaitTimeout:    6 * time.Second,
		},
	}
	AgentTypeLLM = AgentType{
		Name: "llm",
		Dependencies: Dependency{
			ProvidedData:   []string{"llm_metrics", "model_health"},
			ExecutionOrder: 2,
			WaitTimeout:    6 * time.Second,
		},
	}
)

Functions

func CopyContextForAgent

func CopyContextForAgent(main *model.AgentContext) *model.AgentContext

CopyContextForAgent clones the main context for an agent run.

Types

type AgentConfig

type AgentConfig struct {
	Priority   int
	Parameters model.AWSData
	AgentType  AgentType
}

AgentConfig captures a runnable agent definition emitted by the decision tree.

type AgentRegistry

type AgentRegistry struct {
	// contains filtered or unexported fields
}

AgentRegistry tracks agents in a concurrency-safe fashion.

func NewAgentRegistry

func NewAgentRegistry() *AgentRegistry

NewAgentRegistry constructs an empty registry.

func (*AgentRegistry) Agents

func (r *AgentRegistry) Agents() []*ParallelAgent

Agents returns a snapshot of the current agents slice.

func (*AgentRegistry) MarkCompleted

func (r *AgentRegistry) MarkCompleted()

MarkCompleted marks an agent completion event.

func (*AgentRegistry) MarkFailed

func (r *AgentRegistry) MarkFailed()

MarkFailed marks an agent failure event.

func (*AgentRegistry) Register

func (r *AgentRegistry) Register(agent *ParallelAgent)

Register stores an agent and increments totals.

func (*AgentRegistry) Reset

func (r *AgentRegistry) Reset()

Reset clears the registry to support reuse.

func (*AgentRegistry) Stats

func (r *AgentRegistry) Stats() AgentStats

Stats returns a snapshot of the aggregate stats.

type AgentStats

type AgentStats struct {
	Total     int
	Completed int
	Failed    int
}

AgentStats tracks counts for coordinator telemetry.

type AgentType

type AgentType struct {
	Name         string
	Dependencies Dependency
}

AgentType represents a specialized worker that can run in parallel.

type Coordinator

type Coordinator struct {
	DecisionTree *dt.Tree
	MainContext  *model.AgentContext
	// contains filtered or unexported fields
}

Coordinator drives decision-tree-based parallel execution.

func New

func New(mainContext *model.AgentContext, client *awsclient.Client) *Coordinator

New returns a ready-to-use coordinator.

func (*Coordinator) AggregateResults

func (c *Coordinator) AggregateResults() model.AWSData

AggregateResults merges successful agent outputs.

func (*Coordinator) Analyze

func (c *Coordinator) Analyze(query string) []*dt.Node

Analyze traverses the decision tree for the provided query.

func (*Coordinator) SpawnAgents

func (c *Coordinator) SpawnAgents(ctx context.Context, applicable []*dt.Node)

SpawnAgents starts agents grouped by dependency order.

func (*Coordinator) Stats

func (c *Coordinator) Stats() AgentStats

Stats exposes snapshot counters for callers needing execution metrics.

func (*Coordinator) WaitForCompletion

func (c *Coordinator) WaitForCompletion(ctx context.Context, timeout time.Duration) error

WaitForCompletion blocks until all agents finish or timeout occurs.

type Dependency

type Dependency struct {
	RequiredData   []string
	ProvidedData   []string
	ExecutionOrder int
	WaitTimeout    time.Duration
}

Dependency captures coordination requirements for a parallel agent type.

type DependencyScheduler

type DependencyScheduler struct{}

DependencyScheduler produces execution groups honoring dependency order.

func NewDependencyScheduler

func NewDependencyScheduler() *DependencyScheduler

NewDependencyScheduler constructs a scheduler instance.

func (*DependencyScheduler) Plan

func (s *DependencyScheduler) Plan(agentConfigs map[string]AgentConfig) []OrderGroup

Plan groups agent configs by execution order so the coordinator can fan them out deterministically.

func (*DependencyScheduler) Ready

func (s *DependencyScheduler) Ready(agentType AgentType, bus *SharedDataBus) bool

Ready reports whether an agent's dependencies are satisfied on the shared bus.

type OrderGroup

type OrderGroup struct {
	Order  int
	Agents []AgentConfig
}

OrderGroup represents a batch of agent configs that share the same execution order.

type ParallelAgent

type ParallelAgent struct {
	ID         string
	Type       AgentType
	Status     string
	StartTime  time.Time
	EndTime    time.Time
	Context    *model.AgentContext
	Results    model.AWSData
	Error      error
	Operations []awsclient.LLMOperation
}

ParallelAgent represents a running worker instance.

type SharedDataBus

type SharedDataBus struct {
	// contains filtered or unexported fields
}

SharedDataBus stores dependency data produced by agents.

func NewSharedDataBus

func NewSharedDataBus() *SharedDataBus

NewSharedDataBus returns an initialized bus.

func (*SharedDataBus) HasAll

func (b *SharedDataBus) HasAll(keys []string) bool

HasAll returns true if each key exists in the bus.

func (*SharedDataBus) Load

func (b *SharedDataBus) Load(key string) (any, bool)

Load retrieves a value if it exists.

func (*SharedDataBus) Store

func (b *SharedDataBus) Store(key string, value any)

Store saves the provided value under the supplied key.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL