coordinator

package
v1.7.0
Published: Feb 2, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package coordinator implements active session coordination for multi-agent workflows. It turns the NTM session coordinator from a passive identity holder into an active coordinator that monitors agents, detects conflicts, and assigns work.

Constants

const MinDigestInterval = 10 * time.Second

MinDigestInterval is the minimum allowed digest interval.

const MinPollInterval = 100 * time.Millisecond

MinPollInterval is the minimum allowed poll interval. Enforcing it prevents ticker panics, because time.NewTicker panics when given a non-positive duration.
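A minimal sketch of how such a floor might be applied before a ticker is created. The constant mirrors the documented value; clampPollInterval is a hypothetical helper and is not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// minPollInterval mirrors the documented MinPollInterval value.
const minPollInterval = 100 * time.Millisecond

// clampPollInterval is a hypothetical helper: it raises any interval below
// the floor (including zero and negative values) to minPollInterval.
func clampPollInterval(d time.Duration) time.Duration {
	if d < minPollInterval {
		return minPollInterval
	}
	return d
}

func main() {
	// Passing 0 straight to time.NewTicker would panic; clamping avoids that.
	ticker := time.NewTicker(clampPollInterval(0))
	defer ticker.Stop()
	<-ticker.C
	fmt.Println("ticked with interval", clampPollInterval(0))
}
```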

Functions

func ExtractMentionedFiles

func ExtractMentionedFiles(title, description string) []string

ExtractMentionedFiles extracts file paths mentioned in task text.
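The extraction rules are not documented here. As a rough illustration only, the sketch below assumes a regular-expression heuristic (a run of path characters ending in a dot and an extension) and removes duplicates. extractMentionedFiles and pathPattern are hypothetical names, and the heuristic will also match dotted tokens that are not files, such as version numbers.

```go
package main

import (
	"fmt"
	"regexp"
)

// pathPattern is an assumed heuristic, not the package's actual rule:
// path-like characters followed by a dot and an alphanumeric extension.
var pathPattern = regexp.MustCompile(`[A-Za-z0-9_./-]+\.[A-Za-z0-9]+`)

// extractMentionedFiles is a hypothetical stand-in that scans both fields
// and returns each matched path once, in order of first appearance.
func extractMentionedFiles(title, description string) []string {
	seen := make(map[string]bool)
	var files []string
	for _, m := range pathPattern.FindAllString(title+"\n"+description, -1) {
		if !seen[m] {
			seen[m] = true
			files = append(files, m)
		}
	}
	return files
}

func main() {
	files := extractMentionedFiles(
		"Fix panic in internal/coordinator/assign.go",
		"Also update README.md and internal/coordinator/assign.go tests",
	)
	fmt.Println(files)
}
```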

func ExtractTaskTags

func ExtractTaskTags(title, description string) []string

ExtractTaskTags extracts relevant profile tags from task title and description.

Types

type AgentDigestStatus

type AgentDigestStatus struct {
	PaneIndex    int     `json:"pane_index"`
	AgentType    string  `json:"agent_type"`
	Status       string  `json:"status"`
	ContextUsage float64 `json:"context_usage"`
	IdleFor      string  `json:"idle_for,omitempty"`
	Task         string  `json:"task,omitempty"`
}

AgentDigestStatus summarizes a single agent's status.

type AgentMonitor

type AgentMonitor struct {
	// contains filtered or unexported fields
}

AgentMonitor tracks agent status using the status detector.

func NewAgentMonitor

func NewAgentMonitor(session string, mailClient *agentmail.Client, projectKey string) *AgentMonitor

NewAgentMonitor creates a new agent monitor.

func (*AgentMonitor) CheckAgentHealth

func (m *AgentMonitor) CheckAgentHealth(paneID, agentType string) HealthCheck

CheckAgentHealth returns a health summary for an agent.

func (*AgentMonitor) GetAgentStatus

func (m *AgentMonitor) GetAgentStatus(paneID, agentType string) AgentStatusResult

GetAgentStatus returns the current status of an agent pane.

func (*AgentMonitor) GetAllAgentStatuses

func (m *AgentMonitor) GetAllAgentStatuses() (map[string]AgentStatusResult, error)

GetAllAgentStatuses returns status for all agent panes in the session.

func (*AgentMonitor) GetReservationsForAgent

func (m *AgentMonitor) GetReservationsForAgent(ctx context.Context, agentMailName string) ([]string, error)

GetReservationsForAgent returns file reservations held by an agent.

type AgentQualityMetrics

type AgentQualityMetrics struct {
	PaneID          string      `json:"pane_id"`
	AgentType       string      `json:"agent_type"`
	TestsPassed     int         `json:"tests_passed"`
	TestsFailed     int         `json:"tests_failed"`
	TestsTotal      int         `json:"tests_total"`
	ErrorCount      int         `json:"error_count"`
	RecoveryCount   int         `json:"recovery_count"`
	BugsIntroduced  int         `json:"bugs_introduced"`
	BugsFixed       int         `json:"bugs_fixed"`
	AvgContextUsage float64     `json:"avg_context_usage"`
	PeakContext     float64     `json:"peak_context_usage"`
	ContextSamples  int         `json:"context_samples"`
	LastActivity    time.Time   `json:"last_activity"`
	LastError       time.Time   `json:"last_error,omitempty"`
	ErrorHistory    []time.Time `json:"error_history,omitempty"` // Timestamps of errors for trend analysis
}

AgentQualityMetrics tracks quality metrics specific to an agent.

type AgentState

type AgentState struct {
	PaneID        string           `json:"pane_id"`
	PaneIndex     int              `json:"pane_index"`
	AgentType     string           `json:"agent_type"` // cc, cod, gmi
	AgentMailName string           `json:"agent_mail_name,omitempty"`
	Status        robot.AgentState `json:"status"`
	ContextUsage  float64          `json:"context_usage"`
	LastActivity  time.Time        `json:"last_activity"`
	CurrentTask   string           `json:"current_task,omitempty"`
	Reservations  []string         `json:"reservations,omitempty"`
	Healthy       bool             `json:"healthy"`
	Profile       *persona.Persona `json:"profile,omitempty"` // Agent's assigned profile for routing

	// Assignment tracking (from bd-1g5t8)
	// Assignments is the count of active assignments for this agent.
	// A value of -1 indicates tracking data is unavailable (fallback mode).
	Assignments    int       `json:"assignments"`
	LastAssignedAt time.Time `json:"last_assigned_at,omitempty"` // When this agent was last assigned work
}

AgentState tracks the current state of an agent pane.
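The -1 sentinel on Assignments means callers cannot compare counts blindly. A minimal sketch of one way to handle it, using a trimmed, hypothetical stand-in for AgentState (agentLoad) and a hypothetical helper (leastLoaded) that skips agents without tracking data:

```go
package main

import "fmt"

// agentLoad is a trimmed, hypothetical stand-in for AgentState.
type agentLoad struct {
	PaneID      string
	Assignments int // -1 means tracking data is unavailable
}

// leastLoaded returns the agent with the fewest active assignments and
// skips agents whose count is unknown (-1). The bool is false when no
// agent has usable tracking data.
func leastLoaded(agents []agentLoad) (agentLoad, bool) {
	var best agentLoad
	found := false
	for _, a := range agents {
		if a.Assignments < 0 {
			continue // fallback mode: no count to compare
		}
		if !found || a.Assignments < best.Assignments {
			best, found = a, true
		}
	}
	return best, found
}

func main() {
	agents := []agentLoad{{"%1", 2}, {"%2", -1}, {"%3", 0}}
	if a, ok := leastLoaded(agents); ok {
		fmt.Println("least loaded:", a.PaneID)
	}
}
```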

type AgentStatusResult

type AgentStatusResult struct {
	Status       robot.AgentState `json:"status"`
	ContextUsage float64          `json:"context_usage"`
	LastActivity time.Time        `json:"last_activity"`
	Velocity     float64          `json:"velocity"`
	Healthy      bool             `json:"healthy"`
	ErrorMessage string           `json:"error_message,omitempty"`
}

AgentStatusResult holds the result of checking an agent's status.

type Assignment

type Assignment struct {
	Bead       *bv.TriageRecommendation `json:"bead"`
	Agent      *AgentState              `json:"agent"`
	Score      float64                  `json:"score"`
	Reason     string                   `json:"reason"`
	Confidence float64                  `json:"confidence"` // 0-1 confidence in this assignment
	Breakdown  AssignmentScoreBreakdown `json:"breakdown"`
}

Assignment represents an agent-task pairing with reasoning.

func AssignTasks

func AssignTasks(
	beads []*bv.TriageRecommendation,
	agents []*AgentState,
	strategy AssignmentStrategy,
	reservations map[string][]string,
) []Assignment

AssignTasks matches beads to agents using capability scores and availability. It returns optimal assignments based on the specified strategy.

The strategy parameter controls how tasks are distributed:

  • "balanced": spread work evenly across agents
  • "speed": assign tasks to any available agent quickly
  • "quality": assign tasks to the highest-scoring agent
  • "dependency": prioritize blockers and critical path items

The function handles:

  • More beads than agents (some beads unassigned)
  • More agents than beads (some agents idle)
  • Agent availability filtering (idle, sufficient context)
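The three cases above can be sketched with local stand-in types. This is an illustration under stated assumptions, not the package's algorithm: the agent struct, availableAgents, pairOneToOne, the 80% context ceiling, and the pane and bead IDs are all hypothetical.

```go
package main

import "fmt"

// agent is a trimmed, hypothetical stand-in for AgentState.
type agent struct {
	PaneID       string
	Idle         bool
	ContextUsage float64 // percentage, 0-100
}

// availableAgents keeps agents that are idle and below the context ceiling.
func availableAgents(agents []agent, maxContext float64) []agent {
	var out []agent
	for _, a := range agents {
		if a.Idle && a.ContextUsage < maxContext {
			out = append(out, a)
		}
	}
	return out
}

// pairOneToOne gives each available agent at most one bead, in order.
// Extra beads stay unassigned; extra agents stay idle.
func pairOneToOne(beads []string, agents []agent) map[string]string {
	pairs := make(map[string]string)
	for i := 0; i < len(beads) && i < len(agents); i++ {
		pairs[beads[i]] = agents[i].PaneID
	}
	return pairs
}

func main() {
	agents := []agent{{"%1", true, 35}, {"%2", false, 10}, {"%3", true, 92}, {"%4", true, 50}}
	ready := availableAgents(agents, 80)
	fmt.Println(pairOneToOne([]string{"bd-1", "bd-2", "bd-3"}, ready))
}
```

Here only two of the four agents pass the filter, so the third bead is left unassigned.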

type AssignmentResult

type AssignmentResult struct {
	Success      bool            `json:"success"`
	Assignment   *WorkAssignment `json:"assignment,omitempty"`
	Error        string          `json:"error,omitempty"`
	Reservations []string        `json:"reservations,omitempty"`
	MessageSent  bool            `json:"message_sent"`
}

AssignmentResult contains the result of an assignment attempt.

type AssignmentScoreBreakdown

type AssignmentScoreBreakdown struct {
	BaseScore          float64 `json:"base_score"`           // From bv triage score
	AgentTypeBonus     float64 `json:"agent_type_bonus"`     // Bonus for agent-task match
	CriticalPathBonus  float64 `json:"critical_path_bonus"`  // Bonus for critical path items
	FileOverlapPenalty float64 `json:"file_overlap_penalty"` // Penalty for file conflicts
	ContextPenalty     float64 `json:"context_penalty"`      // Penalty for high context usage
	ProfileTagBonus    float64 `json:"profile_tag_bonus"`    // Bonus for profile tag matches
	FocusPatternBonus  float64 `json:"focus_pattern_bonus"`  // Bonus for focus pattern matches
}

AssignmentScoreBreakdown shows how the score was computed.
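How the components combine into a final score is not stated here. One plausible reading, sketched below, is that bonuses are added to the base score and penalties are subtracted. This assumes penalties are stored as positive magnitudes; scoreBreakdown and total are hypothetical names.

```go
package main

import "fmt"

// scoreBreakdown is a local stand-in with the same fields as the
// documented AssignmentScoreBreakdown.
type scoreBreakdown struct {
	BaseScore, AgentTypeBonus, CriticalPathBonus float64
	FileOverlapPenalty, ContextPenalty           float64
	ProfileTagBonus, FocusPatternBonus           float64
}

// total assumes penalties are positive magnitudes that reduce the score.
func (b scoreBreakdown) total() float64 {
	bonuses := b.AgentTypeBonus + b.CriticalPathBonus + b.ProfileTagBonus + b.FocusPatternBonus
	penalties := b.FileOverlapPenalty + b.ContextPenalty
	return b.BaseScore + bonuses - penalties
}

func main() {
	b := scoreBreakdown{
		BaseScore:          0.5,
		AgentTypeBonus:     0.25,
		ProfileTagBonus:    0.125,
		FileOverlapPenalty: 0.25,
	}
	fmt.Println("total score:", b.total())
}
```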

type AssignmentStrategy

type AssignmentStrategy string

AssignmentStrategy controls how tasks are distributed to agents.

const (
	// StrategyBalanced spreads work evenly across agents.
	StrategyBalanced AssignmentStrategy = "balanced"
	// StrategySpeed assigns tasks to any available agent as fast as possible.
	StrategySpeed AssignmentStrategy = "speed"
	// StrategyQuality assigns tasks to the highest-scoring agent for quality.
	StrategyQuality AssignmentStrategy = "quality"
	// StrategyDependency prioritizes blockers and critical path items.
	StrategyDependency AssignmentStrategy = "dependency"
	// StrategyRoundRobin distributes tasks evenly in deterministic order.
	// All assignments get score 1.0. First agents get +1 if counts are uneven.
	StrategyRoundRobin AssignmentStrategy = "round-robin"
)
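The round-robin comment above implies that task i goes to agent i modulo the agent count, so the earliest agents receive one extra task when the counts do not divide evenly. A minimal sketch of that distribution, with roundRobin as a hypothetical helper operating on plain strings:

```go
package main

import "fmt"

// roundRobin assigns tasks to agents in a fixed, repeating order.
// With no agents it returns an empty map.
func roundRobin(tasks, agents []string) map[string][]string {
	out := make(map[string][]string, len(agents))
	if len(agents) == 0 {
		return out
	}
	for i, t := range tasks {
		a := agents[i%len(agents)]
		out[a] = append(out[a], t)
	}
	return out
}

func main() {
	tasks := []string{"t1", "t2", "t3", "t4", "t5"}
	agents := []string{"a", "b", "c"}
	for _, a := range agents {
		fmt.Println(a, roundRobin(tasks, agents)[a])
	}
}
```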

func ParseStrategy

func ParseStrategy(s string) AssignmentStrategy

ParseStrategy converts a string to an AssignmentStrategy.
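The behavior for unrecognized input is not documented. The sketch below assumes case-insensitive matching and a fallback to "balanced"; both are assumptions, and strategy and parseStrategy are local stand-ins rather than the package's own identifiers.

```go
package main

import (
	"fmt"
	"strings"
)

type strategy string

const (
	balanced   strategy = "balanced"
	speed      strategy = "speed"
	quality    strategy = "quality"
	dependency strategy = "dependency"
	roundRobin strategy = "round-robin"
)

// parseStrategy normalizes the input and returns the matching strategy.
// Falling back to balanced for unknown input is an assumption.
func parseStrategy(s string) strategy {
	switch st := strategy(strings.ToLower(strings.TrimSpace(s))); st {
	case balanced, speed, quality, dependency, roundRobin:
		return st
	default:
		return balanced
	}
}

func main() {
	fmt.Println(parseStrategy(" Quality "), parseStrategy("unknown"))
}
```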

type Conflict

type Conflict struct {
	ID         string     `json:"id"`
	FilePath   string     `json:"file_path"`
	Pattern    string     `json:"pattern"`
	Holders    []Holder   `json:"holders"`
	DetectedAt time.Time  `json:"detected_at"`
	ResolvedAt *time.Time `json:"resolved_at,omitempty"`
	Resolution string     `json:"resolution,omitempty"`
}

Conflict represents a file reservation conflict between agents.

type ConflictDetector

type ConflictDetector struct {
	// contains filtered or unexported fields
}

ConflictDetector detects and tracks file reservation conflicts.

func NewConflictDetector

func NewConflictDetector(mailClient *agentmail.Client, projectKey string) *ConflictDetector

NewConflictDetector creates a new conflict detector.

func (*ConflictDetector) CheckPathConflict

func (d *ConflictDetector) CheckPathConflict(ctx context.Context, path, excludeAgent string) (*Conflict, error)

CheckPathConflict checks if a specific path would conflict with existing reservations.
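A rough sketch of a path check against existing reservations, using only the standard library. It assumes reservations are glob patterns in path.Match syntax, keyed by agent name; that format, the findConflict helper, and the agent names are hypothetical.

```go
package main

import (
	"fmt"
	"path"
	"sort"
)

// findConflict reports the first agent (in name order, skipping
// excludeAgent) holding a pattern that matches candidate.
// Malformed patterns are ignored.
func findConflict(reservations map[string][]string, candidate, excludeAgent string) (agent, pattern string, found bool) {
	names := make([]string, 0, len(reservations))
	for name := range reservations {
		if name != excludeAgent {
			names = append(names, name)
		}
	}
	sort.Strings(names) // deterministic result despite map iteration order
	for _, name := range names {
		for _, p := range reservations[name] {
			if ok, err := path.Match(p, candidate); err == nil && ok {
				return name, p, true
			}
		}
	}
	return "", "", false
}

func main() {
	res := map[string][]string{
		"BlueLake":  {"internal/coordinator/*.go"},
		"GreenHill": {"docs/*.md"},
	}
	fmt.Println(findConflict(res, "internal/coordinator/assign.go", "GreenHill"))
}
```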

func (*ConflictDetector) DetectConflicts

func (d *ConflictDetector) DetectConflicts(ctx context.Context) ([]Conflict, error)

DetectConflicts checks for file reservation conflicts.
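At its simplest, a reservation conflict is a path held by more than one agent. The sketch below illustrates that idea with exact path matching only. The real detector may compare patterns instead, and detectConflicts is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sort"
)

// detectConflicts maps each path reserved by two or more distinct agents
// to the sorted list of agents holding it.
func detectConflicts(reservations map[string][]string) map[string][]string {
	holders := make(map[string]map[string]bool)
	for agent, paths := range reservations {
		for _, p := range paths {
			if holders[p] == nil {
				holders[p] = make(map[string]bool)
			}
			holders[p][agent] = true // a set, so repeats by one agent count once
		}
	}
	conflicts := make(map[string][]string)
	for p, set := range holders {
		if len(set) < 2 {
			continue
		}
		names := make([]string, 0, len(set))
		for a := range set {
			names = append(names, a)
		}
		sort.Strings(names)
		conflicts[p] = names
	}
	return conflicts
}

func main() {
	fmt.Println(detectConflicts(map[string][]string{
		"BlueLake":  {"a.go", "b.go"},
		"GreenHill": {"b.go", "c.go"},
	}))
}
```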

type ContextSnapshot

type ContextSnapshot struct {
	Timestamp time.Time `json:"timestamp"`
	Usage     float64   `json:"usage"`
}

ContextSnapshot captures context usage at a point in time.

type CoordinatorConfig

type CoordinatorConfig struct {
	// Monitoring
	PollInterval   time.Duration `toml:"poll_interval"`   // How often to poll agent status (default: 5s)
	DigestInterval time.Duration `toml:"digest_interval"` // How often to send digests (default: 5m)

	// Work assignment
	AutoAssign     bool    `toml:"auto_assign"`      // Automatically assign work to idle agents
	IdleThreshold  float64 `toml:"idle_threshold"`   // Seconds of inactivity before considering idle
	AssignOnlyIdle bool    `toml:"assign_only_idle"` // Only assign to truly idle agents

	// Conflict handling
	ConflictNotify    bool `toml:"conflict_notify"`    // Notify when conflicts detected
	ConflictNegotiate bool `toml:"conflict_negotiate"` // Attempt automatic conflict resolution

	// Agent Mail
	SendDigests bool   `toml:"send_digests"` // Send periodic digests to human
	HumanAgent  string `toml:"human_agent"`  // Agent name to send digests to (default: "Human")
}

CoordinatorConfig holds configuration for the coordinator.

func DefaultCoordinatorConfig

func DefaultCoordinatorConfig() CoordinatorConfig

DefaultCoordinatorConfig returns sensible defaults.
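The field comments on CoordinatorConfig document three defaults (5s poll, 5m digest, "Human"), and the package constants document two minimums. The sketch below shows how a config might be normalized against those values. The config struct and normalize are hypothetical, and the remaining defaults are not documented, so they are omitted.

```go
package main

import (
	"fmt"
	"time"
)

const (
	minPollInterval       = 100 * time.Millisecond // documented MinPollInterval
	minDigestInterval     = 10 * time.Second       // documented MinDigestInterval
	defaultPollInterval   = 5 * time.Second        // from the PollInterval field comment
	defaultDigestInterval = 5 * time.Minute        // from the DigestInterval field comment
	defaultHumanAgent     = "Human"                // from the HumanAgent field comment
)

// config is a trimmed, hypothetical stand-in for CoordinatorConfig.
type config struct {
	PollInterval   time.Duration
	DigestInterval time.Duration
	HumanAgent     string
}

// normalize fills unset fields with the documented defaults, then raises
// intervals that fall below the documented minimums.
func normalize(c config) config {
	if c.PollInterval == 0 {
		c.PollInterval = defaultPollInterval
	}
	if c.DigestInterval == 0 {
		c.DigestInterval = defaultDigestInterval
	}
	if c.HumanAgent == "" {
		c.HumanAgent = defaultHumanAgent
	}
	if c.PollInterval < minPollInterval {
		c.PollInterval = minPollInterval
	}
	if c.DigestInterval < minDigestInterval {
		c.DigestInterval = minDigestInterval
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", normalize(config{PollInterval: time.Millisecond}))
}
```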

type CoordinatorEvent

type CoordinatorEvent struct {
	Type      CoordinatorEventType `json:"type"`
	Timestamp time.Time            `json:"timestamp"`
	AgentID   string               `json:"agent_id,omitempty"`
	Details   map[string]any       `json:"details,omitempty"`
}

CoordinatorEvent represents an event from the coordinator.

type CoordinatorEventType

type CoordinatorEventType string

CoordinatorEventType represents types of coordinator events.

const (
	EventAgentIdle        CoordinatorEventType = "agent_idle"
	EventAgentBusy        CoordinatorEventType = "agent_busy"
	EventAgentError       CoordinatorEventType = "agent_error"
	EventAgentRecovered   CoordinatorEventType = "agent_recovered"
	EventConflictDetected CoordinatorEventType = "conflict_detected"
	EventConflictResolved CoordinatorEventType = "conflict_resolved"
	EventWorkAssigned     CoordinatorEventType = "work_assigned"
	EventDigestSent       CoordinatorEventType = "digest_sent"
	EventDigestFailed     CoordinatorEventType = "digest_failed"
)

type DigestSummary

type DigestSummary struct {
	Session       string              `json:"session"`
	GeneratedAt   time.Time           `json:"generated_at"`
	AgentCount    int                 `json:"agent_count"`
	ActiveCount   int                 `json:"active_count"`
	IdleCount     int                 `json:"idle_count"`
	ErrorCount    int                 `json:"error_count"`
	AgentStatuses []AgentDigestStatus `json:"agent_statuses"`
	Alerts        []string            `json:"alerts,omitempty"`
	WorkSummary   WorkDigestSummary   `json:"work_summary"`
}

DigestSummary contains a summary of session activity.

type HealthCheck

type HealthCheck struct {
	PaneID    string    `json:"pane_id"`
	AgentType string    `json:"agent_type"`
	Healthy   bool      `json:"healthy"`
	Issues    []string  `json:"issues,omitempty"`
	Timestamp time.Time `json:"timestamp"`
}

HealthCheck represents the result of a health check.

type Holder

type Holder struct {
	AgentName  string    `json:"agent_name"`
	PaneID     string    `json:"pane_id,omitempty"`
	ReservedAt time.Time `json:"reserved_at"`
	ExpiresAt  time.Time `json:"expires_at"`
	Reason     string    `json:"reason,omitempty"`
	Priority   int       `json:"priority"` // Lower = higher priority
}

Holder represents an agent holding a reservation.

type QualityMonitor

type QualityMonitor struct {
	// contains filtered or unexported fields
}

QualityMonitor tracks code quality metrics and agent performance. It monitors UBS scan results, test pass/fail rates, error rates per agent, and context usage patterns. This data is used for adjusting agent assignments, triggering alerts, and feeding into session digests.

func NewQualityMonitor

func NewQualityMonitor(projectDir string) *QualityMonitor

NewQualityMonitor creates a new quality monitor for a project.

func (*QualityMonitor) GetAgentMetrics

func (qm *QualityMonitor) GetAgentMetrics(paneID string) *AgentQualityMetrics

GetAgentMetrics returns quality metrics for a specific agent.

func (*QualityMonitor) GetAgentQualityScore

func (qm *QualityMonitor) GetAgentQualityScore(paneID string) float64

GetAgentQualityScore returns a quality score for a specific agent (0-100).

func (*QualityMonitor) GetAllAgentMetrics

func (qm *QualityMonitor) GetAllAgentMetrics() map[string]*AgentQualityMetrics

GetAllAgentMetrics returns quality metrics for all agents.

func (*QualityMonitor) GetLastScanResult

func (qm *QualityMonitor) GetLastScanResult() *scanner.ScanResult

GetLastScanResult returns the most recent scan result.

func (*QualityMonitor) GetQualityScore

func (qm *QualityMonitor) GetQualityScore() float64

GetQualityScore returns an overall quality score (0-100) for agent assignment consideration. Higher is better.

func (*QualityMonitor) GetSummary

func (qm *QualityMonitor) GetSummary() QualitySummary

GetSummary generates a quality summary for use in digests and alerts.

func (*QualityMonitor) IsUBSAvailable

func (qm *QualityMonitor) IsUBSAvailable() bool

IsUBSAvailable returns true if UBS scanning is available.

func (*QualityMonitor) RecordAgentError

func (qm *QualityMonitor) RecordAgentError(paneID, agentType string)

RecordAgentError records an error occurrence for an agent.

func (*QualityMonitor) RecordAgentRecovery

func (qm *QualityMonitor) RecordAgentRecovery(paneID string)

RecordAgentRecovery records when an agent recovers from an error state.

func (*QualityMonitor) RecordBugFixed

func (qm *QualityMonitor) RecordBugFixed(paneID string)

RecordBugFixed records that an agent fixed a bug.

func (*QualityMonitor) RecordBugIntroduced

func (qm *QualityMonitor) RecordBugIntroduced(paneID string)

RecordBugIntroduced records that an agent introduced a bug (detected by UBS).

func (*QualityMonitor) RecordContextUsage

func (qm *QualityMonitor) RecordContextUsage(paneID, agentType string, usage float64)

RecordContextUsage records context window usage for an agent.

func (*QualityMonitor) RecordTestRun

func (qm *QualityMonitor) RecordTestRun(agentPaneID string, passed, failed, skipped int, duration time.Duration, pkg string)

RecordTestRun records the results of a test execution.
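Recorded runs feed the TestPassRate field (0-100) of QualitySummary. The exact formula is not documented. The sketch below assumes skipped tests are excluded from the denominator and that the rate is 0 when nothing ran; testRun and passRate are hypothetical names.

```go
package main

import "fmt"

// testRun is a trimmed, hypothetical stand-in for TestRunMetrics.
type testRun struct {
	Passed, Failed, Skipped int
}

// passRate returns the percentage of executed tests that passed across
// all runs. Skipped tests are ignored (an assumption).
func passRate(runs []testRun) float64 {
	passed, executed := 0, 0
	for _, r := range runs {
		passed += r.Passed
		executed += r.Passed + r.Failed
	}
	if executed == 0 {
		return 0
	}
	return 100 * float64(passed) / float64(executed)
}

func main() {
	runs := []testRun{{Passed: 4, Failed: 1, Skipped: 2}, {Passed: 2, Failed: 1}}
	fmt.Printf("pass rate: %.1f%%\n", passRate(runs))
}
```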

func (*QualityMonitor) RunScan

func (qm *QualityMonitor) RunScan(ctx context.Context) (*scanner.ScanResult, error)

RunScan executes a UBS scan on the project directory.

type QualitySummary

type QualitySummary struct {
	Timestamp time.Time `json:"timestamp"`

	// Code quality from UBS
	UBSAvailable     bool `json:"ubs_available"`
	CriticalBugs     int  `json:"critical_bugs"`
	Warnings         int  `json:"warnings"`
	TotalFindings    int  `json:"total_findings"`
	LastScanAge      int  `json:"last_scan_age_minutes"` // Minutes since last scan
	ScanHealthy      bool `json:"scan_healthy"`
	ConsecutiveError int  `json:"consecutive_scan_errors,omitempty"`

	// Test metrics
	TestPassRate   float64 `json:"test_pass_rate"`   // 0-100
	RecentTestRuns int     `json:"recent_test_runs"` // In last hour
	TotalTestRuns  int     `json:"total_test_runs"`
	TestsPassedAll int     `json:"tests_passed_all"`
	TestsFailedAll int     `json:"tests_failed_all"`

	// Agent error metrics
	TotalAgentErrors int     `json:"total_agent_errors"`
	ErrorRate        float64 `json:"error_rate"` // Errors per hour

	// Context usage
	AvgContextUsage  float64 `json:"avg_context_usage"`
	PeakContextUsage float64 `json:"peak_context_usage"`
	HighContextCount int     `json:"high_context_count"` // Agents > 80%

	// Trends
	Trend QualityTrend `json:"trend"`

	// Alerts generated from quality metrics
	Alerts []string `json:"alerts,omitempty"`
}

QualitySummary provides a snapshot of current quality metrics for use in digests and alerts.

type QualityTrend

type QualityTrend struct {
	BugTrend     TrendDirection `json:"bug_trend"`     // Are bugs increasing/decreasing?
	TestTrend    TrendDirection `json:"test_trend"`    // Is test pass rate improving?
	ErrorTrend   TrendDirection `json:"error_trend"`   // Are agent errors increasing?
	ContextTrend TrendDirection `json:"context_trend"` // Is context usage increasing?
	LastUpdated  time.Time      `json:"last_updated"`
}

QualityTrend summarizes the direction of quality metrics.

type ScanMetrics

type ScanMetrics struct {
	Timestamp time.Time `json:"timestamp"`
	Duration  int64     `json:"duration_ms"`
	Critical  int       `json:"critical"`
	Warning   int       `json:"warning"`
	Info      int       `json:"info"`
	Files     int       `json:"files"`
	ExitCode  int       `json:"exit_code"`
}

ScanMetrics summarizes a single UBS scan for historical tracking.

type ScoreConfig

type ScoreConfig struct {
	PreferCriticalPath      bool    // Weight critical path items higher
	PenalizeFileOverlap     bool    // Avoid assigning overlapping files
	UseAgentProfiles        bool    // Match work to agent capabilities
	BudgetAware             bool    // Consider token budgets
	ContextThreshold        float64 // Max context usage before penalizing (percentage 0-100, default 80)
	ProfileTagBoostWeight   float64 // Weight for profile tag matches (default 0.15)
	FocusPatternBoostWeight float64 // Weight for focus pattern matches (default 0.10)
}

ScoreConfig controls how work assignments are scored.

func DefaultScoreConfig

func DefaultScoreConfig() ScoreConfig

DefaultScoreConfig returns a reasonable default configuration.

type ScoredAssignment

type ScoredAssignment struct {
	Assignment     *WorkAssignment
	Recommendation *bv.TriageRecommendation
	Agent          *AgentState
	TotalScore     float64
	ScoreBreakdown AssignmentScoreBreakdown
}

ScoredAssignment pairs an assignment with its computed score breakdown.

func ScoreAndSelectAssignments

func ScoreAndSelectAssignments(
	idleAgents []*AgentState,
	triage *bv.TriageResponse,
	config ScoreConfig,
	existingReservations map[string][]string,
) []ScoredAssignment

ScoreAndSelectAssignments computes optimal agent-task pairings using multi-factor scoring. It returns a list of scored assignments sorted by total score (highest first).
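The selection step can be sketched as a sort followed by a greedy pass that uses each agent and each task at most once. This is an assumed approach for illustration: greedy selection is not guaranteed to be globally optimal, and the package may select differently. candidate and selectAssignments are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
)

// candidate is a hypothetical agent-task pairing with a precomputed score.
type candidate struct {
	Agent, Task string
	Score       float64
}

// selectAssignments sorts candidates by score (highest first) and keeps a
// pairing only if neither its agent nor its task has been used already.
func selectAssignments(cands []candidate) []candidate {
	sorted := append([]candidate(nil), cands...) // leave the input untouched
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].Score > sorted[j].Score
	})
	usedAgent := make(map[string]bool)
	usedTask := make(map[string]bool)
	var picked []candidate
	for _, c := range sorted {
		if usedAgent[c.Agent] || usedTask[c.Task] {
			continue
		}
		usedAgent[c.Agent], usedTask[c.Task] = true, true
		picked = append(picked, c)
	}
	return picked
}

func main() {
	fmt.Println(selectAssignments([]candidate{
		{"%2", "bd-2", 0.6}, {"%1", "bd-1", 0.9}, {"%2", "bd-1", 0.8}, {"%1", "bd-2", 0.7},
	}))
}
```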

type SessionCoordinator

type SessionCoordinator struct {
	// contains filtered or unexported fields
}

SessionCoordinator manages agent coordination for a tmux session.

func New

func New(session, projectKey string, mailClient *agentmail.Client, agentName string) *SessionCoordinator

New creates a new SessionCoordinator.

func (*SessionCoordinator) AssignWork

func (c *SessionCoordinator) AssignWork(ctx context.Context) ([]AssignmentResult, error)

AssignWork assigns work to idle agents based on bv triage.

func (*SessionCoordinator) Events

func (c *SessionCoordinator) Events() <-chan CoordinatorEvent

Events returns the event channel for external listeners.
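A listener would typically range over the returned channel and branch on the event type. The sketch below shows that consumption pattern with local stand-in types. Whether the real channel is ever closed is not documented, so a production listener would likely also select on a context. event, eventType, and countByType are hypothetical.

```go
package main

import "fmt"

type eventType string

const (
	eventAgentIdle    eventType = "agent_idle"    // mirrors EventAgentIdle
	eventWorkAssigned eventType = "work_assigned" // mirrors EventWorkAssigned
)

// event is a trimmed, hypothetical stand-in for CoordinatorEvent.
type event struct {
	Type    eventType
	AgentID string
}

// countByType drains ch until it is closed and tallies events per type.
func countByType(ch <-chan event) map[eventType]int {
	counts := make(map[eventType]int)
	for ev := range ch {
		counts[ev.Type]++
	}
	return counts
}

func main() {
	ch := make(chan event)
	go func() {
		defer close(ch) // closing ends the consumer's range loop
		ch <- event{eventAgentIdle, "%1"}
		ch <- event{eventWorkAssigned, "%1"}
		ch <- event{eventAgentIdle, "%2"}
	}()
	fmt.Println(countByType(ch))
}
```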

func (*SessionCoordinator) GenerateDigest

func (c *SessionCoordinator) GenerateDigest() DigestSummary

GenerateDigest creates a summary of the current session state.

func (*SessionCoordinator) GetAgentByPaneID

func (c *SessionCoordinator) GetAgentByPaneID(paneID string) *AgentState

GetAgentByPaneID returns the state of a specific agent.

func (*SessionCoordinator) GetAgents

func (c *SessionCoordinator) GetAgents() map[string]*AgentState

GetAgents returns the current state of all tracked agents.

func (*SessionCoordinator) GetAssignableWork

func (c *SessionCoordinator) GetAssignableWork(ctx context.Context) ([]bv.TriageRecommendation, error)

GetAssignableWork returns work items that could be assigned to idle agents.

func (*SessionCoordinator) GetIdleAgents

func (c *SessionCoordinator) GetIdleAgents() []*AgentState

GetIdleAgents returns agents that are idle and available for work.

func (*SessionCoordinator) NegotiateConflict

func (c *SessionCoordinator) NegotiateConflict(ctx context.Context, conflict *Conflict, requester string) error

NegotiateConflict attempts to resolve a conflict by requesting release from lower-priority holders.
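Since a lower Holder.Priority value means higher priority, negotiation can be pictured as keeping the holder with the smallest value and asking the rest to release. The sketch below illustrates only that ordering. The holder struct, splitByPriority, and the tie-breaking rule (input order) are assumptions, and no messages are sent.

```go
package main

import (
	"fmt"
	"sort"
)

// holder is a trimmed, hypothetical stand-in for Holder.
type holder struct {
	AgentName string
	Priority  int // lower = higher priority
}

// splitByPriority returns the holder that keeps the reservation and the
// holders that would be asked to release, ordered by priority.
// ok is false when there are no holders.
func splitByPriority(holders []holder) (keeper holder, toRelease []holder, ok bool) {
	if len(holders) == 0 {
		return holder{}, nil, false
	}
	sorted := append([]holder(nil), holders...)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].Priority < sorted[j].Priority
	})
	return sorted[0], sorted[1:], true
}

func main() {
	keeper, rest, _ := splitByPriority([]holder{{"A", 3}, {"B", 1}, {"C", 2}})
	fmt.Println("keeps:", keeper.AgentName, "release requested from:", rest)
}
```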

func (*SessionCoordinator) NotifyConflict

func (c *SessionCoordinator) NotifyConflict(ctx context.Context, conflict *Conflict) error

NotifyConflict sends a notification about a conflict without requesting resolution.

func (*SessionCoordinator) SendDigest

func (c *SessionCoordinator) SendDigest(ctx context.Context) error

SendDigest sends a digest summary to the configured human agent.

func (*SessionCoordinator) Start

func (c *SessionCoordinator) Start(ctx context.Context) error

Start begins coordinator operations.

func (*SessionCoordinator) Stop

func (c *SessionCoordinator) Stop()

Stop halts coordinator operations.

func (*SessionCoordinator) SuggestAssignment

func (c *SessionCoordinator) SuggestAssignment(ctx context.Context, paneID string) (*WorkAssignment, error)

SuggestAssignment suggests the best work for a specific agent without assigning.

func (*SessionCoordinator) WithConfig

WithConfig sets the coordinator configuration.

type TestRunMetrics

type TestRunMetrics struct {
	Timestamp   time.Time `json:"timestamp"`
	AgentPaneID string    `json:"agent_pane_id,omitempty"`
	Passed      int       `json:"passed"`
	Failed      int       `json:"failed"`
	Skipped     int       `json:"skipped"`
	Duration    int64     `json:"duration_ms"`
	Package     string    `json:"package,omitempty"`
}

TestRunMetrics captures results from a test execution.

type TrendDirection

type TrendDirection string

TrendDirection indicates whether a metric is improving, declining, or stable.

const (
	TrendImproving TrendDirection = "improving"
	TrendStable    TrendDirection = "stable"
	TrendDeclining TrendDirection = "declining"
	TrendUnknown   TrendDirection = "unknown"
)
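How the package derives a direction from its metric history is not documented. One simple approach, sketched below under that caveat, compares the mean of the older half of a sample window with the mean of the newer half and treats small changes as stable. trendOf, mean, and the tolerance parameter are hypothetical.

```go
package main

import "fmt"

type trendDirection string

const (
	trendImproving trendDirection = "improving"
	trendStable    trendDirection = "stable"
	trendDeclining trendDirection = "declining"
	trendUnknown   trendDirection = "unknown"
)

// mean returns the arithmetic mean of a non-empty slice.
func mean(xs []float64) float64 {
	sum := 0.0
	for _, x := range xs {
		sum += x
	}
	return sum / float64(len(xs))
}

// trendOf compares the newer half of samples with the older half.
// higherIsBetter flips the sign for metrics such as error counts.
func trendOf(samples []float64, tolerance float64, higherIsBetter bool) trendDirection {
	if len(samples) < 2 {
		return trendUnknown // not enough history to compare
	}
	mid := len(samples) / 2
	delta := mean(samples[mid:]) - mean(samples[:mid])
	if !higherIsBetter {
		delta = -delta
	}
	switch {
	case delta > tolerance:
		return trendImproving
	case delta < -tolerance:
		return trendDeclining
	default:
		return trendStable
	}
}

func main() {
	fmt.Println(trendOf([]float64{70, 72, 90, 94}, 5, true))
	fmt.Println(trendOf([]float64{2, 2, 8, 8}, 5, false))
}
```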

type WorkAssignment

type WorkAssignment struct {
	BeadID         string    `json:"bead_id"`
	BeadTitle      string    `json:"bead_title"`
	AgentPaneID    string    `json:"agent_pane_id"`
	AgentMailName  string    `json:"agent_mail_name,omitempty"`
	AgentType      string    `json:"agent_type"`
	AssignedAt     time.Time `json:"assigned_at"`
	Priority       int       `json:"priority"`
	Score          float64   `json:"score"`
	FilesToReserve []string  `json:"files_to_reserve,omitempty"`
}

WorkAssignment represents a work assignment to an agent.

type WorkDigestSummary

type WorkDigestSummary struct {
	PendingTasks    int      `json:"pending_tasks"`
	InProgressTasks int      `json:"in_progress_tasks"`
	CompletedToday  int      `json:"completed_today"`
	BlockedTasks    int      `json:"blocked_tasks"`
	TopReady        []string `json:"top_ready,omitempty"`
}

WorkDigestSummary summarizes work status.
