coordinator

package
v0.14.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2026 License: Apache-2.0 Imports: 45 Imported by: 0

Documentation

Overview

Package coordinator provides approval checkpoints for task execution.

Package coordinator provides unified approval processing for CLI, Dashboard, and Daemon.

Package coordinator provides task coordination and event formatting.

Package coordinator provides the event handler for streaming executor events.

Package coordinator provides human interaction handling for the feedback loop.

Package coordinator provides git merge operations for approved worktrees.

Package coordinator provides task coordination and execution.

Package coordinator provides task execution and orchestration for the AILANG daemon.

Package coordinator provides task execution and orchestration for the AILANG daemon.

Package coordinator provides resource tracking for task execution.

Index

Constants

View Source
const (
	CoordinatorModeLocal = "local" // Default: SQLite polling + HTTP broadcaster
	CoordinatorModeCloud = "cloud" // Pub/Sub subscriptions + Pub/Sub broadcaster
)

CoordinatorMode determines how the coordinator receives messages and broadcasts events.

View Source
const LabelInProgress = "coordinator:in-progress"

LabelInProgress is the claim label used to prevent race conditions when multiple coordinator instances try to pick up the same issue.

View Source
const LabelNeedsRevision = "needs-revision"

LabelNeedsRevision is the universal label for requesting revisions. This is the only hardcoded label - all others come from AgentConfig.Approval.

View Source
const MaxAgentIterations = 3

MaxAgentIterations is the limit for agent-to-agent handoffs to prevent infinite loops. Human rejections (via dashboard or CLI) have no iteration limit.

Variables

View Source
var CommentTemplates = struct {
	Working            *template.Template
	DesignDocComplete  *template.Template
	SprintPlanReady    *template.Template
	ImplementComplete  *template.Template
	EvaluationComplete *template.Template
	MergeComplete      *template.Template
	NeedsRevision      *template.Template
	Error              *template.Template
}{
	Working:            template.Must(template.New("working").Parse(workingTemplate)),
	DesignDocComplete:  template.Must(template.New("design").Parse(designDocCompleteTemplate)),
	SprintPlanReady:    template.Must(template.New("sprint").Parse(sprintPlanReadyTemplate)),
	ImplementComplete:  template.Must(template.New("implement").Parse(implementCompleteTemplate)),
	EvaluationComplete: template.Must(template.New("evaluation").Parse(evaluationCompleteTemplate)),
	MergeComplete:      template.Must(template.New("merge").Parse(mergeCompleteTemplate)),
	NeedsRevision:      template.Must(template.New("revision").Parse(needsRevisionTemplate)),
	Error:              template.Must(template.New("error").Parse(errorTemplate)),
}

CommentTemplates holds all the comment templates.

View Source
var DefaultBotPatterns = []string{
	"[bot]",
	"github-actions",
	"dependabot",
	"renovate",
	"codecov",
	"stale",
	"ailang-agent",
	"sunholo-voight-kampff",
}

DefaultBotPatterns contains common bot username patterns to filter.

View Source
var ErrWorktreeLimitReached = errors.New("max worktrees limit reached")

ErrWorktreeLimitReached is returned when no worktrees are available. Tasks receiving this error should remain in queue for retry, not fail.

Functions

func BuildDirectiveFromConfig

func BuildDirectiveFromConfig(task *TaskRecord, agent *AgentConfig) string

BuildDirectiveFromConfig creates a directive based on agent configuration. This is the config-driven approach that replaces hardcoded skill names.

Supports three invoke types:

  • "skill": Invokes a Claude Code skill by name (e.g., "/design-doc-creator")
  • "agent": Sends a message to another agent (e.g., "sprint-planner")
  • "prompt": Uses a custom template with variable substitution

Template variables available for prompt type:

  • {{.TaskID}}: The task ID
  • {{.GithubIssue}}: The GitHub issue number
  • {{.Content}}: The original task content
  • {{.Stage}}: The current task stage
  • {{.OutputMarkers}}: Comma-separated list of expected output markers

func BuildStageDirective

func BuildStageDirective(task *TaskRecord) string

BuildStageDirective creates a stage-appropriate directive for GitHub-linked tasks. This is the key integration point - tasks at different stages get different prompts that invoke the appropriate skills.

This function uses the config-driven approach by creating a temporary AgentConfig with the stage-appropriate agent ID, which triggers the effective defaults.

func BuildSystemPrompt

func BuildSystemPrompt(taskType TaskType, agentConfig *AgentConfig) string

BuildSystemPrompt constructs the full system prompt for an agent. It combines: global meta-prompt + task type context + per-agent system prompt (if any).

func CalculatePriority

func CalculatePriority(analyzed *AnalyzedTask) int

CalculatePriority calculates task priority based on keywords and type

func CanRetrigger

func CanRetrigger(task *TaskRecord, channels ...string) bool

CanRetrigger checks if a task can be re-triggered. Human channels (dashboard, cli) have no limit; agent-to-agent has a max of MaxAgentIterations.

func CollectVersionHistory

func CollectVersionHistory(
	ctx context.Context,
	msgStore messaging.MessageStore,
	taskStore Store,
	task *TaskRecord,
	pkgName, version, previousVersion string,
) *pkg.VersionHistory

CollectVersionHistory builds a VersionHistory by aggregating existing data sources: - The triggering inbox message (from messaging store) - Task events (from task store: turns, tool uses, completions, errors) - Approval history (from messaging store: created, approved, rejected)

No new data capture needed — everything is already recorded during execution.

func CountTurns

func CountTurns(events []*TaskEventRecord) int

CountTurns returns the number of turns in the event list.

func DefaultArtifactPatterns

func DefaultArtifactPatterns(agentID string) []string

DefaultArtifactPatterns returns the default file patterns for artifact discovery. These patterns are used with git diff to deterministically discover created/modified artifacts.

For known AILANG agents, returns specific patterns for their typical outputs. For unknown agents, returns universal pattern "**/*" to discover ALL changed files. This ensures artifact discovery works for any agent without hardcoded configuration.

func DefaultOutputMarkers

func DefaultOutputMarkers(agentID string) []string

DefaultOutputMarkers returns the default output markers for known AILANG agent IDs. Returns nil for unknown agents (no markers expected). Used by GetEffectiveOutputMarkers() when agent has no explicit YAML config. Consider using ArtifactPatterns + git diff instead for deterministic artifact discovery.

func DefaultServerURL

func DefaultServerURL() string

DefaultServerURL returns the default Collaboration Hub server URL Note: Use 127.0.0.1 instead of localhost to avoid IPv6 resolution issues (Go's HTTP client tries IPv6 first, but server may only bind to IPv4)

func DeriveWorkspaceFromPath

func DeriveWorkspaceFromPath(path string) string

DeriveWorkspaceFromPath extracts a workspace ID from a file path. Uses the last two meaningful path segments (parent/basename) as the workspace ID. This is portable across different directory structures.

Examples:

  • /Users/mark/dev/sunholo/ailang -> sunholo/ailang
  • /home/user/projects/rockwool/ROCKGAP -> rockwool/ROCKGAP
  • /path/to/TwilightGame -> to/TwilightGame (or just TwilightGame if only 1 meaningful segment)
  • /tmp/foo -> foo

func ExtractFeedbackFromComments

func ExtractFeedbackFromComments(comments []IssueComment) string

ExtractFeedbackFromComments combines all human comments into a single feedback string.

func FormatEventsAsText

func FormatEventsAsText(events []*TaskEventRecord, opts *FormatOptions) string

FormatEventsAsText formats events for human-readable CLI output. Returns a string with turn separators and tool highlighting.

func FormatWebhookSetupCommand

func FormatWebhookSetupCommand(repo, coordinatorURL, secret string) string

FormatWebhookSetupCommand returns the gh CLI command to configure a webhook for the given repo and coordinator URL.

func GetCurrentPID

func GetCurrentPID() int

GetCurrentPID returns the current process PID (useful for self-monitoring)

func GetDefaultBranch

func GetDefaultBranch(repoPath string) string

GetDefaultBranch queries git for the remote's default branch. It first tries git symbolic-ref, then falls back to git remote show. This avoids hardcoding branch names like "main" or "dev".

func GetTurnTimestamp

func GetTurnTimestamp(events []*TaskEventRecord, turnNum int) *time.Time

GetTurnTimestamp returns the timestamp of the first event in a turn.

func GetWorktreeDiff

func GetWorktreeDiff(ctx context.Context, worktreePath, baseBranch, baseCommit string) (string, error)

GetWorktreeDiff returns the git diff for a worktree against its base. baseCommit is the commit hash at worktree creation (stable - branch may have moved). baseBranch is the branch name (may have moved since worktree creation). Prefers baseCommit if provided, falls back to baseBranch, then queries git for default.

func HasMarkerValue

func HasMarkerValue(output, marker, expectedValue string) bool

HasMarkerValue checks if a specific marker value is present in output. Useful for boolean markers like "IMPLEMENTATION_COMPLETE: true".

func IsBotUser

func IsBotUser(username string, additionalPatterns ...string) bool

IsBotUser checks if a username matches known bot patterns.

func IsCloudMode

func IsCloudMode() bool

IsCloudMode returns true if running in cloud mode.

func ParseOutputMarkers

func ParseOutputMarkers(output string, markers []string) map[string]string

ParseOutputMarkers extracts values for configured markers from execution output. This is the config-driven approach that replaces the hardcoded ParseStageOutput.

It searches for each marker in the output and extracts the value that follows. Markers should be in the format "MARKER_NAME:" (with or without trailing colon).

Returns a map of marker name -> extracted value. Returns empty map if no markers are found or markers slice is empty.

For multi-value markers (comma-separated), the full comma-separated string is returned. Use splitMarkerValues() to split into individual values if needed.

func ParsePayloadToEnv

func ParsePayloadToEnv(payload string) ([]string, error)

ParsePayloadToEnv converts a JSON payload to environment variables. Keys are converted to UPPER_SNAKE_CASE. Nested objects are flattened: {"db": {"host": "x"}} → DB_HOST=x Arrays become comma-separated: ["a", "b"] → VALUE=a,b

func PrepareTaskForRetrigger

func PrepareTaskForRetrigger(task *TaskRecord, feedback string)

PrepareTaskForRetrigger prepares feedback content for a re-triggered task. NOTE (M-TASK-HIERARCHY): This function no longer modifies task status. The new workflow creates a NEW task via the message system with parent_task_id linking, and the old task is marked as "rejected" separately via store.MarkTaskRejected().

This function now only: - Increments iteration count for tracking - Appends feedback to content for context propagation

func RenderComment

func RenderComment(tmpl *template.Template, data *CommentData) (string, error)

RenderComment renders a comment template with the given data.

func RenderDesignDocComment

func RenderDesignDocComment(data *CommentData) (string, error)

RenderDesignDocComment renders the design doc complete comment.

func RenderErrorComment

func RenderErrorComment(taskID, stage, errMsg string) (string, error)

RenderErrorComment renders the error comment.

func RenderEvaluationComment

func RenderEvaluationComment(data *CommentData) (string, error)

RenderEvaluationComment renders the evaluation complete comment.

func RenderImplementCompleteComment

func RenderImplementCompleteComment(data *CommentData) (string, error)

RenderImplementCompleteComment renders the implementation complete comment.

func RenderMergeCompleteComment

func RenderMergeCompleteComment(data *CommentData) (string, error)

RenderMergeCompleteComment renders the merge complete comment.

func RenderRevisionComment

func RenderRevisionComment(taskID, stage string) (string, error)

RenderRevisionComment renders the needs revision comment.

func RenderSprintPlanComment

func RenderSprintPlanComment(data *CommentData) (string, error)

RenderSprintPlanComment renders the sprint plan ready comment.

func RenderWorkingComment

func RenderWorkingComment(taskID, agent, stage string) (string, error)

RenderWorkingComment renders the "working" status comment.

func SampleAgentConfig

func SampleAgentConfig() string

SampleAgentConfig returns a sample configuration string for documentation.

func SanitizeLog

func SanitizeLog[S ~string](s S) string

SanitizeLog strips CR, LF, and ASCII control characters from a string so it cannot forge log lines when interpolated into a log message. Wrap any user-controlled value (task IDs, labels, repo names, issue titles) at the log call site. The generic constraint allows named string types (e.g. ApprovalEventType) without an explicit cast.

func ScheduleCascadeUpdate

func ScheduleCascadeUpdate(index *pkg.RegistryIndex, triggerPkg string) ([]string, error)

ScheduleCascadeUpdate determines the execution order for packages affected by an update to triggerPkg. Returns package names in topological order (leaf deps first). Returns an error if a cycle is detected.

func SplitMarkerValues

func SplitMarkerValues(value string) []string

SplitMarkerValues splits a comma-separated marker value into individual values. Useful for markers like "FILES_CREATED: file1.go, file2.go, file3.go". Returns nil if value is empty or "none"/"None".

func StoreApprovalEvent

func StoreApprovalEvent(ctx context.Context, store Store, taskID string, approvedBy string) error

StoreApprovalEvent stores human approval as a task event.

func StoreFeedbackEvent

func StoreFeedbackEvent(ctx context.Context, store Store, feedback *HumanFeedback) error

StoreFeedbackEvent stores human feedback as a task event for audit trail.

func StoreIterationStartEvent

func StoreIterationStartEvent(ctx context.Context, store Store, taskID string, iteration int) error

StoreIterationStartEvent marks the start of a new task iteration.

func SummarizeEvents

func SummarizeEvents(events []*TaskEventRecord) string

SummarizeEvents returns a brief summary of the events.

Types

type APIKeyCache

type APIKeyCache struct {
	// contains filtered or unexported fields
}

APIKeyCache is a thread-safe in-memory cache for user-provided API keys. Keys are stored with a TTL and deleted after retrieval (one-time use). M-CLOUD-DUAL-AUTH: API keys NEVER touch Firestore or Secret Manager.

func NewAPIKeyCache

func NewAPIKeyCache(ttl time.Duration) *APIKeyCache

NewAPIKeyCache creates a cache with the given TTL.

func (*APIKeyCache) Retrieve

func (c *APIKeyCache) Retrieve(messageID string) (string, bool)

Retrieve returns the API key for a message ID and deletes it (one-time use). Returns ("", false) if not found or expired.

func (*APIKeyCache) Store

func (c *APIKeyCache) Store(messageID, apiKey string)

Store saves an API key keyed by message ID.

type AgentConfig

type AgentConfig struct {
	ID                  string   `yaml:"id" json:"id"`
	Label               string   `yaml:"label" json:"label"`
	Inbox               string   `yaml:"inbox" json:"inbox"`                                 // Message inbox to watch
	Workspace           string   `yaml:"workspace" json:"workspace"`                         // Base directory for worktrees
	Capabilities        []string `yaml:"capabilities" json:"capabilities"`                   // e.g., ["code", "research", "docs"]
	TriggerOnComplete   []string `yaml:"trigger_on_complete" json:"trigger_on_complete"`     // Agent IDs to trigger when this agent completes
	AutoApproveHandoffs bool     `yaml:"auto_approve_handoffs" json:"auto_approve_handoffs"` // Skip approval for agent-to-agent handoffs
	AutoMerge           bool     `yaml:"auto_merge" json:"auto_merge"`                       // Automatically merge approved work
	SkipApproval        bool     `yaml:"skip_approval" json:"skip_approval"`                 // Skip approval workflow entirely (for script agents)
	Provider            string   `yaml:"provider" json:"provider"`                           // "claude" or "gemini"
	MergeBranch         string   `yaml:"merge_branch" json:"merge_branch"`                   // Target branch for merges (e.g., "dev", "main")
	MaxConcurrentTasks  int      `yaml:"max_concurrent_tasks" json:"max_concurrent_tasks"`   // 0 = unlimited
	SessionContinuity   bool     `yaml:"session_continuity" json:"session_continuity"`       // Use --resume for Claude Code / --conversation-id for Gemini

	// Generic workflow configuration (v0.6.3+)
	Invoke           *InvokeConfig   `yaml:"invoke" json:"invoke,omitempty"`                       // How to invoke this agent
	OutputMarkers    []string        `yaml:"output_markers" json:"output_markers,omitempty"`       // Markers to extract from output (e.g., "DESIGN_DOC_PATH:")
	ArtifactPatterns []string        `yaml:"artifact_patterns" json:"artifact_patterns,omitempty"` // File patterns for artifacts (e.g., "*.md", "design_docs/**")
	Approval         *ApprovalConfig `yaml:"approval" json:"approval,omitempty"`                   // Approval workflow configuration

	// Per-agent system prompt (v0.8.0+)
	// Appended to the global meta-prompt for agent-specific instructions.
	SystemPrompt string `yaml:"system_prompt" json:"system_prompt,omitempty"`

	// Per-agent model override (v0.8.0+)
	// Controls which Claude model is used for this agent's tasks.
	// Examples: "opus", "sonnet", "haiku", "claude-opus-4-5-20251101"
	// If empty, falls back to the executor's default (currently "haiku").
	Model string `yaml:"model" json:"model,omitempty"`

	// Per-agent execution timeout (v0.8.1+)
	// Go duration string (e.g., "15m", "30m", "1h"). Default: 60m (hard ceiling).
	Timeout string `yaml:"timeout" json:"timeout,omitempty"`

	// Per-agent idle timeout (v0.8.1+)
	// Kill if no streaming events for this long. Default: 3m.
	// Distinguishes "agent is stuck" from "agent is working but slow".
	IdleTimeout string `yaml:"idle_timeout" json:"idle_timeout,omitempty"`

	// Per-agent effort level (Claude Code 2.1.47+: "low", "medium", "high")
	// Controls how much effort Claude puts into the task. Default: unset (Claude's default).
	Effort string `yaml:"effort" json:"effort,omitempty"`

	// Per-agent plugin directories (M-CLOUD-PLUGIN-SKILLS, v0.9.1).
	// Local paths to Claude Code plugin directories containing .claude-plugin/plugin.json.
	// Passed as --plugin-dir flags. Complementary to coordinator-level PluginRepo (for cloud).
	PluginDirs []string `yaml:"plugin_dirs" json:"plugin_dirs,omitempty"`

	// Per-agent third-party plugin configuration (M-CLOUD-PLUGIN-SKILLS, v0.9.1).
	// Installs marketplace and custom plugins before task execution.
	Plugins *PluginsConfig `yaml:"plugins" json:"plugins,omitempty"`

	// Per-agent auth mode override (M-CLOUD-DUAL-AUTH, v0.9.2).
	// "oauth" (default) or "apikey" — selects Cloud Run Job template.
	// User-provided API keys from messages override this setting.
	AuthMode string `yaml:"auth_mode" json:"auth_mode,omitempty"`

	// Per-agent git mode (M-GIT-GUARDRAILS, v0.9.2).
	// Controls how the PreToolUse git_guard.sh hook restricts git operations:
	// "guardrails" (default) — reads + commits allowed, push only to expected branch
	// "strict" — all git write operations blocked (read-only)
	// "permissive" — all allowed except force-push and reset --hard
	GitMode string `yaml:"git_mode" json:"git_mode,omitempty"`

	// Subdirectory within the workspace for monorepo support (M-PKG-AUTONOMOUS-UPDATES, v0.10.0).
	// When set, the agent's working directory is scoped to this path within the worktree.
	// Used for package agents that operate on a specific package within a monorepo.
	// Example: "packages/auth" for a package agent in the ailang-packages monorepo.
	Subdirectory string `yaml:"subdirectory" json:"subdirectory,omitempty"`
}

AgentConfig represents a configured agent in the coordinator system. Each agent has an inbox, workspace, and capabilities for task execution.

func AdjustAutonomyForChangeClass

func AdjustAutonomyForChangeClass(agent *AgentConfig, msg *messaging.InboxMessage) *AgentConfig

AdjustAutonomyForChangeClass modifies an agent's approval settings based on the incoming package message's change class. Returns a copy of the config with adjusted autonomy levels. Non-package messages pass through unchanged.

func (*AgentConfig) GetEffectiveApprovalConfig

func (a *AgentConfig) GetEffectiveApprovalConfig() *ApprovalConfig

GetEffectiveApprovalConfig returns the agent's approval config, or defaults for known agents.

func (*AgentConfig) GetEffectiveArtifactPatterns

func (a *AgentConfig) GetEffectiveArtifactPatterns() []string

GetEffectiveArtifactPatterns returns the agent's artifact patterns, or defaults for known agents. These patterns are used with git diff to discover created/modified files.

func (*AgentConfig) GetEffectiveIdleTimeout

func (a *AgentConfig) GetEffectiveIdleTimeout() time.Duration

GetEffectiveIdleTimeout returns the agent's configured idle timeout, or the default (3m). The agent is killed if no streaming events are produced for this duration. Safe to call on nil receiver.

func (*AgentConfig) GetEffectiveInvokeConfig

func (a *AgentConfig) GetEffectiveInvokeConfig() *InvokeConfig

GetEffectiveInvokeConfig returns the agent's invoke config, or defaults for known agents. Returns nil for unknown agents without explicit config.

Note: Deprecation warnings for using defaults should be logged by the caller, as they have logger access. Check if result differs from explicit config.

func (*AgentConfig) GetEffectiveOutputMarkers

func (a *AgentConfig) GetEffectiveOutputMarkers() []string

GetEffectiveOutputMarkers returns the agent's output markers, or defaults for known agents.

func (*AgentConfig) GetEffectiveTimeout

func (a *AgentConfig) GetEffectiveTimeout() time.Duration

GetEffectiveTimeout returns the agent's configured hard ceiling timeout, or the default (60m). This is the maximum wall-clock time regardless of activity. Safe to call on nil receiver.

type AgentRegistry

type AgentRegistry struct {
	// contains filtered or unexported fields
}

AgentRegistry manages the set of configured agents. It provides thread-safe lookup by ID or inbox.

func LoadAgentRegistry

func LoadAgentRegistry() (*AgentRegistry, error)

LoadAgentRegistry loads agents from config and returns a populated registry.

func LoadAgentRegistryFrom

func LoadAgentRegistryFrom(configPath string) (*AgentRegistry, error)

LoadAgentRegistryFrom loads agents from a specific config path.

func NewAgentRegistry

func NewAgentRegistry() *AgentRegistry

NewAgentRegistry creates an empty agent registry.

func (*AgentRegistry) Clear

func (r *AgentRegistry) Clear()

Clear removes all registered agents.

func (*AgentRegistry) Count

func (r *AgentRegistry) Count() int

Count returns the number of registered agents.

func (*AgentRegistry) GetAgentByID

func (r *AgentRegistry) GetAgentByID(id string) *AgentConfig

GetAgentByID returns an agent by its ID. Returns nil if not found.

func (*AgentRegistry) GetAgentForInbox

func (r *AgentRegistry) GetAgentForInbox(inbox string) *AgentConfig

GetAgentForInbox returns the agent configured to watch the given inbox. Returns nil if no agent watches that inbox.

func (*AgentRegistry) HasAgent

func (r *AgentRegistry) HasAgent(id string) bool

HasAgent returns true if an agent with the given ID is registered.

func (*AgentRegistry) HasInbox

func (r *AgentRegistry) HasInbox(inbox string) bool

HasInbox returns true if an agent is registered for the given inbox.

func (*AgentRegistry) ListAgents

func (r *AgentRegistry) ListAgents() []*AgentConfig

ListAgents returns all registered agents.

func (*AgentRegistry) ListInboxes

func (r *AgentRegistry) ListInboxes() []string

ListInboxes returns all registered inbox names.

func (*AgentRegistry) Register

func (r *AgentRegistry) Register(agent *AgentConfig) error

Register adds an agent to the registry. Returns an error if an agent with the same ID or inbox already exists.

func (*AgentRegistry) Unregister

func (r *AgentRegistry) Unregister(id string) error

Unregister removes an agent by ID. Returns an error if the agent is not found.

func (*AgentRegistry) Validate

func (r *AgentRegistry) Validate() []string

Validate checks that the registry is internally consistent. Returns a list of issues found.

type AgentResult

type AgentResult struct {
	// Artifact paths (set whichever is relevant)
	ArtifactPath    string   // Primary artifact (design doc, sprint plan, etc.)
	ArtifactContent string   // Optional: content for GitHub comment display
	AllArtifacts    []string // All discovered artifacts from git diff

	// Execution metrics
	Duration     time.Duration
	Cost         float64
	TokensUsed   int
	InputTokens  int
	OutputTokens int

	// Implementation-specific (for sprint-executor type agents)
	BranchName    string
	WorktreePath  string
	FilesCreated  []string
	FilesModified []string
}

AgentResult contains the unified result of any agent completion. This struct supports all agent types (design-doc-creator, sprint-planner, sprint-executor, or any custom agent).

type AnalyzedTask

type AnalyzedTask struct {
	Task        *Task
	Type        TaskType
	Keywords    []string
	Fingerprint uint64
	DuplicateOf string // ID of duplicate task, if any

	// Capability detection (migrated from internal/agent)
	Capabilities  []Capability `json:"capabilities,omitempty"`   // Detected capability requirements
	ImpactLevel   string       `json:"impact_level,omitempty"`   // "low", "medium", or "high"
	EstimatedCost float64      `json:"estimated_cost,omitempty"` // Pre-execution cost estimate in USD
}

AnalyzedTask is a task with analysis metadata

type ApprovalCallback

type ApprovalCallback func(request *ApprovalRequest)

ApprovalCallback is called when an approval is resolved

type ApprovalCheckpoint

type ApprovalCheckpoint struct {
	// contains filtered or unexported fields
}

ApprovalCheckpoint manages approval requests for a coordinator. It provides blocking wait for human decisions and timeout handling.

func NewApprovalCheckpoint

func NewApprovalCheckpoint(defaultTimeout time.Duration) *ApprovalCheckpoint

NewApprovalCheckpoint creates a new approval checkpoint manager

func (*ApprovalCheckpoint) Approve

func (ac *ApprovalCheckpoint) Approve(requestID string, resolvedBy string) error

Approve approves an approval request

func (*ApprovalCheckpoint) ApproveByTask

func (ac *ApprovalCheckpoint) ApproveByTask(taskID string, resolvedBy string) error

ApproveByTask approves the pending request for a task

func (*ApprovalCheckpoint) Clear

func (ac *ApprovalCheckpoint) Clear()

Clear removes all requests (for testing/cleanup)

func (*ApprovalCheckpoint) Count

func (ac *ApprovalCheckpoint) Count() int

Count returns the number of pending approval requests

func (*ApprovalCheckpoint) GetPendingRequests

func (ac *ApprovalCheckpoint) GetPendingRequests() []*ApprovalRequest

GetPendingRequests returns all pending approval requests

func (*ApprovalCheckpoint) GetRequest

func (ac *ApprovalCheckpoint) GetRequest(requestID string) *ApprovalRequest

GetRequest returns an approval request by ID

func (*ApprovalCheckpoint) GetRequestByTask

func (ac *ApprovalCheckpoint) GetRequestByTask(taskID string) *ApprovalRequest

GetRequestByTask returns the approval request for a task

func (*ApprovalCheckpoint) HasPendingApproval

func (ac *ApprovalCheckpoint) HasPendingApproval(taskID string) bool

HasPendingApproval checks if a task has a pending approval request

func (*ApprovalCheckpoint) Reject

func (ac *ApprovalCheckpoint) Reject(requestID string, resolvedBy string) error

Reject rejects an approval request

func (*ApprovalCheckpoint) RejectByTask

func (ac *ApprovalCheckpoint) RejectByTask(taskID string, resolvedBy string) error

RejectByTask rejects the pending request for a task

func (*ApprovalCheckpoint) RequestApproval

func (ac *ApprovalCheckpoint) RequestApproval(ctx context.Context, request *ApprovalRequest) (ApprovalStatus, error)

RequestApproval creates a new approval request and blocks until resolved. Returns the approval status (approved, rejected, or timeout).

func (*ApprovalCheckpoint) SetCallback

func (ac *ApprovalCheckpoint) SetCallback(callback ApprovalCallback)

SetCallback sets the callback for approval resolution events

type ApprovalConfig

type ApprovalConfig struct {
	NeedsLabel            string `yaml:"needs_label" json:"needs_label"`                         // Label to add when awaiting approval (e.g., "needs-design-approval")
	ApprovedLabel         string `yaml:"approved_label" json:"approved_label"`                   // Label that triggers next stage (e.g., "design-approved")
	GithubCommentTemplate string `yaml:"github_comment_template" json:"github_comment_template"` // Template for GitHub comment on completion
}

ApprovalConfig specifies the approval workflow for an agent. Controls GitHub labels and comment templates for human-in-the-loop approval.

func DefaultApprovalConfig

func DefaultApprovalConfig(agentID string) *ApprovalConfig

DefaultApprovalConfig returns the default approval config for known AILANG agent IDs. Returns nil for unknown agents (no approval workflow). Used by GetEffectiveApprovalConfig() when agent has no explicit YAML config.

type ApprovalEvent

type ApprovalEvent struct {
	TaskID      string
	IssueNumber int
	Label       string
	EventType   ApprovalEventType
	// Feedback contains human comments harvested from GitHub (may be empty)
	Feedback string
	// FeedbackAuthor is the author of the most recent human comment
	FeedbackAuthor string
	// Channel indicates where the approval came from (github, dashboard, cli)
	Channel string
}

ApprovalEvent represents a detected approval or revision request

type ApprovalEventType

type ApprovalEventType string

ApprovalEventType categorizes the type of approval event

const (
	ApprovalEventDesign   ApprovalEventType = "design-approved"
	ApprovalEventSprint   ApprovalEventType = "sprint-approved"
	ApprovalEventMerge    ApprovalEventType = "merge-approved"
	ApprovalEventRevision ApprovalEventType = "needs-revision"
)

type ApprovalHandler

type ApprovalHandler func(ctx context.Context, event *ApprovalEvent) error

ApprovalHandler is called when an approval is detected

type ApprovalParams

type ApprovalParams struct {
	TaskID      string // Required: task ID or approval ID (apr-xxx converted to task-xxx)
	Action      string // Required: "approve" or "reject"
	ApprovedBy  string // Required: who is approving/rejecting (e.g., "cli-user", "dashboard-user")
	Channel     string // Required: source channel ("cli", "dashboard", "daemon")
	Feedback    string // Optional: feedback text for rejections
	MergeBranch string // Optional: target branch for merge (defaults from config, then channel)

	// Behavior options
	SkipMerge         bool // If true, don't merge worktree on approval
	KeepWorktree      bool // If true, don't clean up worktree after merge
	RetriggerOnReject bool // If true, send feedback to agent for re-attempt (feedback loop)

	// Dependencies (injected by caller)
	Store         Store                  // Required: coordinator store for task/approval operations
	MsgStore      messaging.MessageStore // Optional: messaging store for sending feedback to agents
	GitHubPoster  *GitHubPoster          // Optional: for posting feedback to GitHub issues
	AgentRegistry *AgentRegistry         // Optional: for looking up per-agent merge branch
	ObsBackend    observatory.Backend    // Optional: for updating chain/stage status (M-CHAINS-SIMPLIFY)
}

ApprovalParams contains all parameters for processing an approval or rejection. This is the single interface used by CLI, Dashboard, and Daemon.

type ApprovalRequest

type ApprovalRequest struct {
	ID           string         `json:"id"`
	TaskID       string         `json:"task_id"`
	ThreadID     string         `json:"thread_id,omitempty"`
	Type         ApprovalType   `json:"type"`
	Status       ApprovalStatus `json:"status"`
	Title        string         `json:"title"`
	Description  string         `json:"description"`
	DiffSummary  string         `json:"diff_summary,omitempty"`
	FilesChanged []string       `json:"files_changed,omitempty"`
	CreatedAt    time.Time      `json:"created_at"`
	ResolvedAt   *time.Time     `json:"resolved_at,omitempty"`
	ResolvedBy   string         `json:"resolved_by,omitempty"`
	Timeout      time.Duration  `json:"timeout"`
	AutoReject   bool           `json:"auto_reject"` // Reject on timeout instead of approve

	// Handoff-specific fields (used when Type == ApprovalTypeHandoff)
	SourceAgentID string `json:"source_agent_id,omitempty"` // Agent that completed the task
	TargetAgentID string `json:"target_agent_id,omitempty"` // Agent to hand off to
	SessionID     string `json:"session_id,omitempty"`      // Claude Code/Gemini CLI session for continuity
	HandoffData   string `json:"handoff_data,omitempty"`    // Additional context for handoff
}

ApprovalRequest represents a pending approval request

type ApprovalRequestRecord

type ApprovalRequestRecord struct {
	ID          string     `json:"id"`
	TaskID      string     `json:"task_id"`
	Type        string     `json:"type"`
	Description string     `json:"description"`
	ContextJSON string     `json:"context_json,omitempty"`
	Status      string     `json:"status"`
	ResolvedBy  string     `json:"resolved_by,omitempty"`
	CreatedAt   time.Time  `json:"created_at"`
	ResolvedAt  *time.Time `json:"resolved_at,omitempty"`
	TimeoutAt   *time.Time `json:"timeout_at,omitempty"`
	AutoReject  bool       `json:"auto_reject"`
}

ApprovalRequestRecord is the database record for an approval request

type ApprovalResult

type ApprovalResult struct {
	Success       bool
	Message       string   // Human-readable result message
	MergeCommit   string   // Commit hash if merged
	MergedFiles   []string // Files that were merged
	ConflictFiles []string // Files with conflicts (if merge failed)
	NewTaskID     string   // ID of new task if re-triggered
	Error         string   // Error message if failed
}

ApprovalResult contains the result of processing an approval or rejection.

func ProcessApprovalRequest

func ProcessApprovalRequest(ctx context.Context, params *ApprovalParams) (*ApprovalResult, error)

ProcessApprovalRequest handles approval/rejection from any channel (CLI, dashboard, daemon). This is the SINGLE source of truth for approval logic.

type ApprovalStatus

type ApprovalStatus string

ApprovalStatus represents the state of an approval request

const (
	ApprovalStatusPending  ApprovalStatus = "pending"
	ApprovalStatusApproved ApprovalStatus = "approved"
	ApprovalStatusRejected ApprovalStatus = "rejected"
	ApprovalStatusTimeout  ApprovalStatus = "timeout"
)

type ApprovalStore

type ApprovalStore interface {
	CreateApprovalRequest(ctx context.Context, req *ApprovalRequestRecord) error
	GetApprovalRequest(ctx context.Context, id string) (*ApprovalRequestRecord, error)
	GetApprovalRequestByTask(ctx context.Context, taskID string) (*ApprovalRequestRecord, error)
	ListPendingApprovals(ctx context.Context) ([]*ApprovalRequestRecord, error)
	ResolveApprovalRequest(ctx context.Context, id string, status string, resolvedBy string) error
}

ApprovalStore interface for persistence

type ApprovalType

type ApprovalType string

ApprovalType defines the kind of approval being requested

const (
	ApprovalTypeMerge   ApprovalType = "merge"   // Request to merge worktree changes
	ApprovalTypeDestroy ApprovalType = "destroy" // Request to destroy worktree with changes
	ApprovalTypeExecute ApprovalType = "execute" // Request to execute a destructive operation
	ApprovalTypeCost    ApprovalType = "cost"    // Cost threshold exceeded
	ApprovalTypeHandoff ApprovalType = "handoff" // Request to hand off work to another agent
)

type ApprovalWatcher

type ApprovalWatcher struct {
	// contains filtered or unexported fields
}

ApprovalWatcher polls GitHub for approval labels and triggers pipeline stages. It watches issues linked to tasks and detects when humans add approval labels.

The watcher supports two modes: 1. Legacy hardcoded labels (design-approved, sprint-approved, merge-approved) 2. Config-driven labels (from AgentConfig.Approval)

needs-revision is universal and works with both modes.

func NewApprovalWatcher

func NewApprovalWatcher(poster *GitHubPoster, store Store, pollInterval time.Duration) *ApprovalWatcher

NewApprovalWatcher creates a new approval watcher.

func (*ApprovalWatcher) GetAgentByLabel

func (w *ApprovalWatcher) GetAgentByLabel(label string) *AgentConfig

GetAgentByLabel returns the agent config for a given approval label. Returns nil if no agent is registered for that label.

func (*ApprovalWatcher) GetRegisteredLabels

func (w *ApprovalWatcher) GetRegisteredLabels() []string

GetRegisteredLabels returns all registered custom approval labels.

func (*ApprovalWatcher) GetStatus

func (w *ApprovalWatcher) GetStatus() WatcherStatus

GetStatus returns the current status of the watcher.

func (*ApprovalWatcher) IsRunning

func (w *ApprovalWatcher) IsRunning() bool

IsRunning returns whether the watcher is currently running.

func (*ApprovalWatcher) LoadWatchedIssuesFromStore

func (w *ApprovalWatcher) LoadWatchedIssuesFromStore(ctx context.Context) error

LoadWatchedIssuesFromStore loads all tasks with GitHub issues that are in active stages.

func (*ApprovalWatcher) RegisterAgentApproval

func (w *ApprovalWatcher) RegisterAgentApproval(agent *AgentConfig, handler ApprovalHandler) error

RegisterAgentApproval registers an agent's approval configuration. This enables config-driven approval labels for the agent.

When the approved_label is detected on a watched issue, the handler will be called. The handler receives an ApprovalEvent with the label and associated task.

Example:

watcher.RegisterAgentApproval(agent, func(ctx context.Context, event *ApprovalEvent) error {
    // Handle custom approval
    return nil
})

func (*ApprovalWatcher) RegisterAgentApprovalHandlers

func (w *ApprovalWatcher) RegisterAgentApprovalHandlers(registry *AgentRegistry, defaultHandler ApprovalHandler) (int, error)

RegisterAgentApprovalHandlers registers approval handlers for all agents in the registry that have approval configurations. This is a convenience method that iterates through all registered agents and calls RegisterAgentApproval for each one.

The provided defaultHandler is called for any agent that has an approval config. If you need per-agent handlers, use RegisterAgentApproval directly.

This enables config-driven approval workflows where new agents can be added without code changes - just add to the config file and restart.

Deprecated hardcoded labels (design-approved, sprint-approved, merge-approved) continue to work for backwards compatibility, but new agents should use config-driven labels.

func (*ApprovalWatcher) RegisterHandler

func (w *ApprovalWatcher) RegisterHandler(eventType ApprovalEventType, handler ApprovalHandler) error

RegisterHandler registers a handler for a specific approval event type. Note: handler can be nil, which will effectively deregister the handler. Always validates eventType is not empty to prevent accidental misconfiguration.

func (*ApprovalWatcher) SetAgentRegistry

func (w *ApprovalWatcher) SetAgentRegistry(registry *AgentRegistry)

SetAgentRegistry sets the agent registry for config-driven approval lookup.

func (*ApprovalWatcher) Start

func (w *ApprovalWatcher) Start(ctx context.Context) error

Start begins the polling loop.

func (*ApprovalWatcher) Stop

func (w *ApprovalWatcher) Stop()

Stop halts the polling loop.

func (*ApprovalWatcher) UnwatchIssue

func (w *ApprovalWatcher) UnwatchIssue(issueNumber int)

UnwatchIssue stops watching a GitHub issue.

func (*ApprovalWatcher) WatchIssue

func (w *ApprovalWatcher) WatchIssue(issueNumber int, taskID string) error

WatchIssue starts watching a GitHub issue for approval labels. Returns error if issueNumber is invalid (<=0) or taskID is empty.

func (*ApprovalWatcher) WatchedIssueCount

func (w *ApprovalWatcher) WatchedIssueCount() int

WatchedIssueCount returns the number of issues being watched.

type ArtifactDiscovery

type ArtifactDiscovery struct {
	WorktreePath string   // Path to git worktree
	Patterns     []string // Glob patterns to filter files (e.g., "*.md", "design_docs/**")
	BaseBranch   string   // Base branch to compare against (default: auto-detect)
	BaseCommit   string   // Base commit hash (stable - branch may have moved since worktree creation)
}

ArtifactDiscovery provides deterministic artifact discovery using git diff. This is preferred over parsing output markers, as git accurately tracks all file changes.

func NewArtifactDiscovery

func NewArtifactDiscovery(worktreePath string, patterns []string) *ArtifactDiscovery

NewArtifactDiscovery creates a new artifact discovery instance.

func (*ArtifactDiscovery) DiscoverArtifacts

func (ad *ArtifactDiscovery) DiscoverArtifacts(maxSize int64) (map[string]string, error)

DiscoverArtifacts returns a map of file paths to their content. Only includes files matching the patterns that are under maxSize bytes.

func (*ArtifactDiscovery) DiscoverChangedFiles

func (ad *ArtifactDiscovery) DiscoverChangedFiles() ([]string, error)

DiscoverChangedFiles returns files that were created or modified in the worktree. Uses git diff to compare against HEAD (or the base branch). Filters results by the configured patterns.

func (*ArtifactDiscovery) ReadArtifactContent

func (ad *ArtifactDiscovery) ReadArtifactContent(relativePath string) (string, error)

ReadArtifactContent reads the content of an artifact file from the worktree.

func (*ArtifactDiscovery) WithBaseBranch

func (ad *ArtifactDiscovery) WithBaseBranch(branch string) *ArtifactDiscovery

WithBaseBranch sets the base branch for comparison.

func (*ArtifactDiscovery) WithBaseCommit

func (ad *ArtifactDiscovery) WithBaseCommit(commit string) *ArtifactDiscovery

WithBaseCommit sets the base commit hash for comparison (stable reference). This is preferred over BaseBranch as the branch may have moved since worktree creation.

type BudgetsConfig

type BudgetsConfig struct {
	Global    *GlobalBudget             `yaml:"global"`
	Providers map[string]*ProviderLimit `yaml:"providers"`
}

BudgetsConfig represents budget limits from config.yaml

func DefaultBudgetsConfig

func DefaultBudgetsConfig() *BudgetsConfig

DefaultBudgetsConfig returns sensible default budget limits

func LoadBudgetsConfig

func LoadBudgetsConfig() (*BudgetsConfig, error)

LoadBudgetsConfig loads budget configuration from ~/.ailang/config.yaml. Respects AILANG_CONFIG env var for Cloud Run deployments.

func LoadBudgetsConfigFrom

func LoadBudgetsConfigFrom(configPath string) (*BudgetsConfig, error)

LoadBudgetsConfigFrom loads budget configuration from a specific path

type Capability

type Capability struct {
	Type        CapabilityType `json:"type"`
	Paths       []string       `json:"paths,omitempty"`        // Affected paths or URLs
	BudgetDelta float64        `json:"budget_delta,omitempty"` // Estimated cost if Budget type
}

Capability represents a detected capability requirement

type CapabilityDetector

type CapabilityDetector struct{}

CapabilityDetector analyzes task content and determines required capabilities

func NewCapabilityDetector

func NewCapabilityDetector() *CapabilityDetector

NewCapabilityDetector creates a new capability detector

func (*CapabilityDetector) ClassifyImpact

func (cd *CapabilityDetector) ClassifyImpact(caps []Capability) string

ClassifyImpact returns impact level as "low", "medium", or "high"

func (*CapabilityDetector) DetectCapabilities

func (cd *CapabilityDetector) DetectCapabilities(content string) []Capability

DetectCapabilities analyzes content and returns required capabilities Returns nil if no special capabilities are needed (safe execution)

func (*CapabilityDetector) EstimateTotalCost

func (cd *CapabilityDetector) EstimateTotalCost(caps []Capability, baseExecutionCost float64) float64

EstimateTotalCost calculates total estimated cost from capabilities

func (*CapabilityDetector) FormatImpact

func (cd *CapabilityDetector) FormatImpact(caps []Capability) string

FormatImpact formats a human-readable impact description

type CapabilityType

type CapabilityType string

CapabilityType represents the type of capability required

const (
	// AILANG effect capabilities (map to language effects)
	CapabilityIO    CapabilityType = "IO"    // Console input/output
	CapabilityFS    CapabilityType = "FS"    // File system access
	CapabilityNet   CapabilityType = "Net"   // Network access
	CapabilityClock CapabilityType = "Clock" // Time/sleep operations
	CapabilityEnv   CapabilityType = "Env"   // Environment variables
	CapabilityAI    CapabilityType = "AI"    // AI/LLM operations
	CapabilityDebug CapabilityType = "Debug" // Debugging operations

	// Coordinator-specific capabilities (not language effects)
	CapabilityShell  CapabilityType = "Shell"  // Shell/bash execution (high risk)
	CapabilityBudget CapabilityType = "Budget" // High-cost operations
)

type CascadeCircuitBreaker

type CascadeCircuitBreaker struct {
	MaxFailures   int
	CorrelationID string
	// contains filtered or unexported fields
}

CascadeCircuitBreaker tracks failures during a cascade update. If MaxFailures consecutive failures occur, IsBroken returns true.

func (*CascadeCircuitBreaker) FailureCount

func (cb *CascadeCircuitBreaker) FailureCount() int

FailureCount returns the current consecutive failure count.

func (*CascadeCircuitBreaker) IsBroken

func (cb *CascadeCircuitBreaker) IsBroken() bool

IsBroken returns true if consecutive failures have reached the threshold.

func (*CascadeCircuitBreaker) RecordFailure

func (cb *CascadeCircuitBreaker) RecordFailure()

RecordFailure increments the consecutive failure counter.

func (*CascadeCircuitBreaker) RecordSuccess

func (cb *CascadeCircuitBreaker) RecordSuccess()

RecordSuccess resets the consecutive failure counter.

type ChangeClass

type ChangeClass int

ChangeClass represents the severity level of a package update.

const (
	// ChangeClassA is a patch/internal change — fully autonomous.
	ChangeClassA ChangeClass = iota
	// ChangeClassB is an additive/minor change — semi-autonomous.
	ChangeClassB
	// ChangeClassC is a breaking/major change — requires human approval.
	ChangeClassC
)

func ClassifyChange

func ClassifyChange(env *messaging.PackageMessageEnvelope) ChangeClass

ClassifyChange determines the change class from a package message envelope.

type CloudDispatcher

type CloudDispatcher interface {
	// Dispatch triggers execution of a task on the remote backend.
	// The task is already persisted in the task store — the dispatcher
	// only needs to trigger execution with the given parameters.
	Dispatch(ctx context.Context, params DispatchParams) error
}

CloudDispatcher triggers remote task execution on a cloud backend. Implementations are backend-specific (Cloud Run Jobs, K8s Jobs, etc.) The coordinator calls Dispatch() without knowing the backend.

type CommentData

type CommentData struct {
	// Task information
	TaskID    string
	Title     string
	Type      string
	Priority  int
	Stage     string
	Status    string
	Agent     string
	Workspace string

	// GitHub context
	IssueNumber int
	Repository  string

	// Timing
	StartedAt   *time.Time
	CompletedAt *time.Time
	Duration    time.Duration

	// Results
	Success      bool
	Output       string
	Error        string
	Cost         float64
	TokensUsed   int
	InputTokens  int
	OutputTokens int

	// Evaluation results (sprint-evaluator)
	EvaluationScore      int
	EvaluationResult     string // "pass" or "fail"
	EvaluationRound      int
	EvaluationReportPath string
	FeedbackSummary      string

	// Artifacts
	DesignDocPath     string
	DesignDocContent  string // Actual markdown content of the design doc
	SprintPlanPath    string
	SprintPlanContent string // Actual markdown content of the sprint plan
	WorktreePath      string
	BranchName        string
	FilesCreated      []string
	FilesModified     []string

	// Custom data
	Extra map[string]interface{}
}

CommentData contains all data available to comment templates.

type CompletionHandler

type CompletionHandler struct {
	// contains filtered or unexported fields
}

CompletionHandler processes task completions and updates task status in the coordinator's store when Cloud Run Jobs finish. Completions arrive either via pull subscription (Start) or push HTTP endpoint (HandleCompletion).

func NewCompletionHandler

func NewCompletionHandler(subscriber *pubsub.Subscriber, taskStore Store, msgStore messaging.MessageStore, agentRegistry *AgentRegistry, logger *log.Logger) *CompletionHandler

NewCompletionHandler creates a handler that processes task completions. msgStore and agentRegistry are optional — if nil, completion notifications and skip_approval handling are disabled (backwards compatible).

func (*CompletionHandler) HandleCompletion

func (h *CompletionHandler) HandleCompletion(data []byte, attrs map[string]string) error

HandleCompletion processes a task completion from either pull subscription or push HTTP endpoint. Returns nil to ack bad data (prevent retry loops).

func (*CompletionHandler) Start

func (h *CompletionHandler) Start(ctx context.Context)

Start begins listening for completions via pull subscription in the background. Not used in push mode — the HTTP handler calls HandleCompletion() directly.

type Config

type Config struct {
	PollInterval         time.Duration // How often to check for new messages
	MaxWorktrees         int           // Maximum concurrent worktrees
	LogFile              string        // Path to log file
	PIDFile              string        // Path to PID file
	StateDir             string        // Directory for state files
	ApprovalPollInterval time.Duration // How often to poll GitHub for approval labels (M-COORD-GITHUB-AUTO-ROUTING)
	DevMode              bool          // Skip stale detector + approval watcher, increase poll interval
}

Config holds daemon configuration

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns sensible defaults

type ConfigFile

type ConfigFile struct {
	Coordinator *CoordinatorConfig `yaml:"coordinator"`
	Budgets     *BudgetsConfig     `yaml:"budgets"`
	Firebase    *FirebaseConfig    `yaml:"firebase"`
	Workspaces  *WorkspacesConfig  `yaml:"workspaces"`
}

ConfigFile represents the full ~/.ailang/config.yaml structure.

type CoordinatorConfig

type CoordinatorConfig struct {
	Agents          []*AgentConfig    `yaml:"agents" json:"agents"`
	DefaultProvider string            `yaml:"default_provider" json:"default_provider"`
	ClaudePath      string            `yaml:"claude_path" json:"claude_path,omitempty"` // Explicit path to Claude CLI binary (empty = auto-detect: native > PATH > NVM)
	MergeBranch     string            `yaml:"merge_branch" json:"merge_branch"`         // Target branch for approvals (default: "dev")
	GitHubSync      *GitHubSyncConfig `yaml:"github_sync" json:"github_sync"`

	// PluginRepo is a git URL for a shared skills plugin (M-CLOUD-PLUGIN-SKILLS, v0.9.1).
	// In cloud mode, this repo is cloned and passed as --plugin-dir to Claude CLI.
	// Example: "https://github.com/sunholo-data/ailang_bootstrap.git"
	PluginRepo string `yaml:"plugin_repo" json:"plugin_repo,omitempty"`

	// DevMode disables stale task detector and approval watcher to reduce
	// Firestore reads during local development. (M-COST1)
	DevMode bool `yaml:"dev_mode" json:"dev_mode,omitempty"`
}

CoordinatorConfig is the coordinator section of the global config file.

func DefaultCoordinatorConfig

func DefaultCoordinatorConfig() *CoordinatorConfig

DefaultCoordinatorConfig returns a minimal default configuration.

func LoadCoordinatorConfig

func LoadCoordinatorConfig() (*CoordinatorConfig, error)

LoadCoordinatorConfig loads the coordinator configuration from ~/.ailang/config.yaml. If the file doesn't exist, returns a default configuration. If the file exists but has no coordinator section, returns a default configuration.

func LoadCoordinatorConfigFrom

func LoadCoordinatorConfigFrom(configPath string) (*CoordinatorConfig, error)

LoadCoordinatorConfigFrom loads the coordinator configuration from a specific path.

type CoordinatorEventHandler

type CoordinatorEventHandler struct {
	// contains filtered or unexported fields
}

CoordinatorEventHandler implements executor.EventHandler to capture and broadcast executor events via WebSocket.

func NewCoordinatorEventHandler

func NewCoordinatorEventHandler(taskID, threadID string, broadcast EventBroadcaster) *CoordinatorEventHandler

NewCoordinatorEventHandler creates a new event handler for a task.

func (*CoordinatorEventHandler) EmitStatus

func (h *CoordinatorEventHandler) EmitStatus(status string)

EmitStatus emits a status change event

func (*CoordinatorEventHandler) GetEventBuffer

func (h *CoordinatorEventHandler) GetEventBuffer() []*websocket.TaskStreamEvent

GetEventBuffer returns a copy of the event buffer for replay

func (*CoordinatorEventHandler) IsThrottled

func (h *CoordinatorEventHandler) IsThrottled() bool

IsThrottled returns whether events are being throttled

func (*CoordinatorEventHandler) OnError

func (h *CoordinatorEventHandler) OnError(err error)

OnError is called when an error occurs

func (*CoordinatorEventHandler) OnText

func (h *CoordinatorEventHandler) OnText(text string)

OnText is called when text is generated

func (*CoordinatorEventHandler) OnToolResult

func (h *CoordinatorEventHandler) OnToolResult(toolName string, output string)

OnToolResult is called when a tool returns a result

func (*CoordinatorEventHandler) OnToolUse

func (h *CoordinatorEventHandler) OnToolUse(toolName string, input string)

OnToolUse is called when a tool is invoked

func (*CoordinatorEventHandler) OnTurnEnd

func (h *CoordinatorEventHandler) OnTurnEnd(turnNum int)

OnTurnEnd is called when a turn ends

func (*CoordinatorEventHandler) OnTurnStart

func (h *CoordinatorEventHandler) OnTurnStart(turnNum int)

OnTurnStart is called when a new turn starts

func (*CoordinatorEventHandler) SetEventStorer

func (h *CoordinatorEventHandler) SetEventStorer(storer EventStorer)

SetEventStorer sets the database storage function for persisting events.

func (*CoordinatorEventHandler) SetTaskContext

func (h *CoordinatorEventHandler) SetTaskContext(ctx *TaskEventContext)

SetTaskContext sets the task context for event enrichment. Call this after construction to add workspace, directive, and agent info to all events.

func (*CoordinatorEventHandler) UpdateMetrics

func (h *CoordinatorEventHandler) UpdateMetrics(tokensIn, tokensOut int, cost float64)

UpdateMetrics updates the token and cost metrics

type Daemon

type Daemon struct {
	// contains filtered or unexported fields
}

Daemon is the coordinator daemon

func NewDaemon

func NewDaemon(config *Config) (*Daemon, error)

NewDaemon creates a new daemon instance

func (*Daemon) Close

func (d *Daemon) Close() error

Close releases resources held by the daemon. This must be called when done with the daemon to release file handles. On Windows, this is required before the log file can be deleted.

func (*Daemon) GetActiveTaskMetrics

func (d *Daemon) GetActiveTaskMetrics() []*ResourceMetrics

GetActiveTaskMetrics returns metrics for all currently running tasks

func (*Daemon) GetContext

func (d *Daemon) GetContext() context.Context

GetContext returns the daemon's context

func (*Daemon) GetLogger

func (d *Daemon) GetLogger() *log.Logger

GetLogger returns the daemon's logger

func (*Daemon) GetResourceRegistry

func (d *Daemon) GetResourceRegistry() *ResourceTrackerRegistry

GetResourceRegistry returns the resource tracker registry for external access

func (*Daemon) GetWatcherStatus

func (d *Daemon) GetWatcherStatus() WatcherStatus

GetWatcherStatus returns the current status of the ApprovalWatcher. Returns a default (not running) status if the watcher is not initialized.

func (*Daemon) HandleApproval

func (d *Daemon) HandleApproval(ctx context.Context, taskID, approvedBy string) error

HandleApproval processes an approved task. For GitHub-linked tasks at design/sprint stages, this triggers the next stage. For merge-ready tasks, this merges worktree changes to main branch.

func (*Daemon) HandleRejection

func (d *Daemon) HandleRejection(ctx context.Context, taskID, rejectedBy, reason string) error

HandleRejection processes a rejected task - preserves worktree and marks task rejected. For tasks with linked GitHub issues, feedback is posted to GitHub (M-DASHBOARD-APPROVAL-INTEGRATION).

func (*Daemon) IncrementTasksRun

func (d *Daemon) IncrementTasksRun()

IncrementTasksRun increments the tasks run counter

func (*Daemon) ProcessStageCompletion

func (d *Daemon) ProcessStageCompletion(ctx context.Context, task *TaskRecord, execResult *ExecuteResult) error

ProcessStageCompletion handles the completion of a stage for GitHub-linked tasks. It calls the appropriate TaskChain callback and handles stage transitions.

Artifact discovery strategy (in order of preference): 1. Git diff + artifact patterns (deterministic, reliable) 2. Output markers (fallback for backwards compatibility)

func (*Daemon) Run

func (d *Daemon) Run() error

Run is the main daemon loop

func (*Daemon) SetCloudDispatcher

func (d *Daemon) SetCloudDispatcher(dispatcher CloudDispatcher)

SetCloudDispatcher sets the cloud task dispatcher. Call this after NewDaemon() but before Start() to enable Cloud Run Job dispatch. The dispatcher is created in the CLI entry point to avoid circular imports.

func (*Daemon) SetEventBroadcaster

func (d *Daemon) SetEventBroadcaster(broadcaster EventBroadcaster)

SetEventBroadcaster sets the event broadcaster for real-time task updates. When set, task execution events will be streamed via the broadcaster.

func (*Daemon) SetStores

func (d *Daemon) SetStores(taskStore Store, msgStore messaging.MessageStore, obsBackend observatory.Backend)

SetStores pre-sets the task store, messaging store, and observatory backend. When set, initTaskProcessing() will use these instead of opening local SQLite databases. Call this after NewDaemon() but before Start() to use cloud backends.

func (*Daemon) Start

func (d *Daemon) Start() error

Start starts the daemon

func (*Daemon) Status

func (d *Daemon) Status() (*Status, error)

Status returns the current daemon status

func (*Daemon) StatusJSON

func (d *Daemon) StatusJSON() (string, error)

StatusJSON returns status as JSON string

func (*Daemon) Stop

func (d *Daemon) Stop() error

Stop stops a running daemon

type DesignDocResult

type DesignDocResult struct {
	Path         string   // Path to .md file (may be empty if no .md found)
	AllArtifacts []string // All discovered artifacts (any file type)
	Duration     time.Duration
	Cost         float64
	TokensUsed   int
	InputTokens  int
	OutputTokens int
}

DesignDocResult contains the result of design document creation.

type DetailedStats

type DetailedStats struct {
	Count        int     `json:"count"`
	CostUSD      float64 `json:"cost_usd"`
	InputTokens  int     `json:"input_tokens"`
	OutputTokens int     `json:"output_tokens"`
}

DetailedStats provides cost/token breakdown for a provider or workspace

type DispatchParams

type DispatchParams struct {
	TaskID       string  // Coordinator task ID (e.g., "task-29404032")
	AgentID      string  // Target agent (e.g., "sprint-executor")
	Workspace    string  // Workspace path
	Provider     string  // AI provider ("claude" or "gemini")
	Directive    string  // Task prompt (optional — job can fetch from Firestore)
	RepoURL      string  // Git repo URL
	Branch       string  // Base branch (default: "dev")
	PushBranch   string  // If set, push directly to this branch (skip coordinator/ branch creation)
	PluginRepo   string  // Git URL for shared skills plugin (M-CLOUD-PLUGIN-SKILLS, v0.9.1)
	Model        string  // AI model override (e.g., "sonnet", "opus") — from agent config
	Timeout      string  // Executor timeout (e.g., "15m", "60m") — from agent config (M-CLOUD-OAUTH)
	AuthMode     string  // "oauth" (default) or "apikey" — selects Cloud Run Job template (M-CLOUD-DUAL-AUTH)
	APIKey       string  // User-provided Anthropic API key, only when AuthMode == "apikey"
	MaxCostUSD   float64 // Per-task cost budget (0 = unlimited) — M-CLOUD-PROGRESS-TRACKING
	GitMode      string  // "guardrails", "strict", "permissive" — M-GIT-GUARDRAILS
	SiteSlug     string  // Website site slug for commit message — M-HARNESS-COMMIT-CONTRACT
	BriefID      string  // Brief ID for commit message — M-HARNESS-COMMIT-CONTRACT
	Subdirectory string  // Monorepo subdirectory for package agents — M-PKG-AUTONOMOUS-UPDATES
}

DispatchParams contains the parameters needed to trigger remote task execution.

type EventBroadcaster

type EventBroadcaster func(*websocket.TaskStreamEvent)

EventBroadcaster is a function that broadcasts task stream events. This allows the event handler to be decoupled from the WebSocket server.

func CreateWebSocketBroadcaster

func CreateWebSocketBroadcaster(wsServer interface {
	BroadcastTaskEvent(stream *websocket.TaskStreamEvent)
}) EventBroadcaster

CreateWebSocketBroadcaster creates an EventBroadcaster that broadcasts to a WebSocket server. This is used when the coordinator is connected to the collaboration hub server.

type EventStorer

type EventStorer func(*TaskEventRecord) error

EventStorer is a function that stores task stream events to database. This allows events to be persisted for historical replay.

type EventsResponse

type EventsResponse struct {
	TaskID      string             `json:"task_id"`
	TotalEvents int                `json:"total_events"`
	TotalTurns  int                `json:"total_turns"`
	Iteration   int                `json:"iteration,omitempty"`
	Events      []*TaskEventRecord `json:"events"`
}

EventsResponse is the JSON response for the events API endpoint.

func FormatEventsAsJSON

func FormatEventsAsJSON(taskID string, events []*TaskEventRecord, opts *FormatOptions) (*EventsResponse, error)

FormatEventsAsJSON formats events as a JSON response.

type ExecuteOptions

type ExecuteOptions struct {
	Timeout      time.Duration // Hard ceiling (max wall-clock time)
	IdleTimeout  time.Duration // Kill if no events for this long (v0.8.1+)
	DryRun       bool
	Workspace    string                // Working directory for the task
	Model        string                // Model to use (provider-specific)
	EventHandler executor.EventHandler // Optional handler for streaming events

	// Observatory context for trace linking (M-TASK-HIERARCHY)
	ObservatoryContext *ObservatoryContext

	// InvokeConfig for script execution (v0.6.4+)
	// Set when the agent has invoke.type: "script"
	InvokeConfig *InvokeConfig

	// AgentConfig for system prompt construction (v0.8.0+)
	// Used by providers to build the meta-prompt with agent-specific instructions.
	AgentConfig *AgentConfig

	// Effort level for Claude Code (low/medium/high). Empty = Claude's default.
	Effort string

	// PluginDirs for Claude Code (M-CLOUD-PLUGIN-SKILLS, v0.9.1).
	// Paths to plugin directories passed as --plugin-dir flags.
	PluginDirs []string

	// Plugins for per-agent third-party plugin installation (M-CLOUD-PLUGIN-SKILLS, v0.9.1).
	Plugins *PluginsConfig
}

ExecuteOptions configures task execution

func DefaultExecuteOptions

func DefaultExecuteOptions() *ExecuteOptions

DefaultExecuteOptions returns sensible defaults

type ExecuteResult

type ExecuteResult struct {
	Success    bool
	Output     string
	Error      string
	Provider   string
	Duration   time.Duration
	Cost       float64
	TokensUsed int // Total tokens (InputTokens + OutputTokens)
	// Detailed token breakdown
	InputTokens  int
	OutputTokens int
	// Execution detail
	NumTurns      int
	ToolCallCount int
	// Files affected
	FilesCreated  []string
	FilesModified []string
	// Session continuity (for agent-to-agent handoffs)
	SessionID string // Claude Code --resume ID or Gemini CLI --conversation-id
}

ExecuteResult contains the result of task execution

type ExecutorProvider

type ExecutorProvider struct {
	// contains filtered or unexported fields
}

ExecutorProvider wraps any executor.Executor as a coordinator Provider. This is the unified provider for all CLI-based agentic executors (Claude Code, Gemini CLI, Codex, etc.). Adding a new executor only requires creating the executor package with an init() registration — no coordinator changes needed.

func NewExecutorProvider

func NewExecutorProvider(executorName string) (*ExecutorProvider, error)

NewExecutorProvider creates an ExecutorProvider for the named executor. The executor must be registered in the global executor factory (via init()).

func (*ExecutorProvider) CanHandle

func (p *ExecutorProvider) CanHandle(task *AnalyzedTask) bool

CanHandle returns true — executor-based providers can handle any coding task.

func (*ExecutorProvider) Execute

Execute runs a task using the underlying executor CLI.

func (*ExecutorProvider) ExecutorName

func (p *ExecutorProvider) ExecutorName() string

ExecutorName returns the underlying executor name (e.g., "claude", "gemini").

func (*ExecutorProvider) Name

func (p *ExecutorProvider) Name() string

Name returns the provider name (e.g., "claude-code", "gemini-cli").

type FirebaseConfig

type FirebaseConfig struct {
	ProjectID string `yaml:"project_id"` // GCP/Firebase project ID (e.g., "ailang-dev")
}

FirebaseConfig contains Firebase authentication settings.

func LoadFirebaseConfig

func LoadFirebaseConfig() *FirebaseConfig

LoadFirebaseConfig loads Firebase configuration from ~/.ailang/config.yaml. Returns nil if no Firebase config is set (Firebase auth will be disabled).

type FormatOptions

type FormatOptions struct {
	// ShowTimestamps includes timestamps in text output
	ShowTimestamps bool
	// ShowToolInputs includes full tool input in text output
	ShowToolInputs bool
	// MaxTextLength truncates long text blocks (0 = no limit)
	MaxTextLength int
	// TurnFilter only shows specific turn (0 = all turns)
	TurnFilter int
	// TypeFilter only shows specific event types (empty = all types)
	TypeFilter []string
}

FormatOptions controls how events are formatted.

func DefaultFormatOptions

func DefaultFormatOptions() *FormatOptions

DefaultFormatOptions returns sensible defaults for CLI display.

type GeminiAPIProvider

type GeminiAPIProvider struct {
	// contains filtered or unexported fields
}

GeminiAPIProvider executes tasks using Gemini API (text generation only). This uses internal/ai/gemini for simple research and documentation tasks. Unlike ExecutorProvider (CLI-based agentic coding), this is API-based and does not support file editing or tool use.

func NewGeminiAPIProvider

func NewGeminiAPIProvider() (*GeminiAPIProvider, error)

NewGeminiAPIProvider creates a new Gemini API provider

func (*GeminiAPIProvider) CanHandle

func (p *GeminiAPIProvider) CanHandle(task *AnalyzedTask) bool

CanHandle returns true for tasks that Gemini API can handle

func (*GeminiAPIProvider) Execute

Execute runs a task using Gemini API

func (*GeminiAPIProvider) Name

func (p *GeminiAPIProvider) Name() string

Name returns the provider name

type GitHubPoster

type GitHubPoster struct {
	// contains filtered or unexported fields
}

GitHubPoster provides GitHub integration for the coordinator. It wraps the messaging GitHubClient to post comments, manage labels, and close issues as part of the autonomous workflow.

func NewGitHubPoster

func NewGitHubPoster() (*GitHubPoster, error)

NewGitHubPoster creates a new GitHub poster for coordinator tasks. Enables auto-switch so the coordinator can automatically switch GitHub accounts if there's a mismatch with the expected user.

func (*GitHubPoster) AddLabel

func (p *GitHubPoster) AddLabel(issueNum int, label string) error

AddLabel adds a label to a GitHub issue. Creates the label if it doesn't exist. Deprecated: Use AddLabelInRepo with an explicit repo parameter.

func (*GitHubPoster) AddLabelInRepo

func (p *GitHubPoster) AddLabelInRepo(repo string, issueNum int, label string) error

AddLabelInRepo adds a label to a GitHub issue in a specific repo. Creates the label if it doesn't exist. If repo is empty, falls back to the default repo.

func (*GitHubPoster) ClaimIssue

func (p *GitHubPoster) ClaimIssue(issueNum int) error

ClaimIssue attempts to claim an issue by adding the in-progress label. Returns an error if the issue is already claimed by another coordinator. This prevents race conditions when multiple coordinators poll GitHub. Deprecated: Use ClaimIssueInRepo with an explicit repo parameter.

func (*GitHubPoster) ClaimIssueInRepo

func (p *GitHubPoster) ClaimIssueInRepo(repo string, issueNum int) error

ClaimIssueInRepo attempts to claim an issue in a specific repo. Returns an error if the issue is already claimed by another coordinator. If repo is empty, falls back to the default repo.

func (*GitHubPoster) Client

func (p *GitHubPoster) Client() *messaging.GitHubClient

Client returns the underlying GitHub client.

func (*GitHubPoster) CloseIssue

func (p *GitHubPoster) CloseIssue(issueNum int, comment string) error

CloseIssue closes a GitHub issue with an optional comment. Uses the default repo configured in the poster.

func (*GitHubPoster) CloseIssueInRepo

func (p *GitHubPoster) CloseIssueInRepo(repo string, issueNum int, comment string) error

CloseIssueInRepo closes a GitHub issue in a specific repo with an optional comment. If repo is empty, falls back to the default repo.

func (*GitHubPoster) EnsureLabel

func (p *GitHubPoster) EnsureLabel(label string) error

EnsureLabel creates a label if it doesn't exist. Deprecated: Use EnsureLabelInRepo with an explicit repo parameter.

func (*GitHubPoster) EnsureLabelInRepo

func (p *GitHubPoster) EnsureLabelInRepo(repo, label string) error

EnsureLabelInRepo creates a label in a specific repo if it doesn't exist. If repo is empty, falls back to the default repo.

func (*GitHubPoster) GetLabels

func (p *GitHubPoster) GetLabels(issueNum int) ([]string, error)

GetLabels returns the current labels on an issue. Deprecated: Use GetLabelsInRepo with an explicit repo parameter.

func (*GitHubPoster) GetLabelsInRepo

func (p *GitHubPoster) GetLabelsInRepo(repo string, issueNum int) ([]string, error)

GetLabelsInRepo returns the current labels on an issue in a specific repo. If repo is empty, falls back to the default repo.

func (*GitHubPoster) GetLatestHumanComment

func (p *GitHubPoster) GetLatestHumanComment(issueNum int, since time.Time) (*IssueComment, error)

GetLatestHumanComment returns the most recent human comment, or nil if none.

func (*GitHubPoster) GetRecentHumanComments

func (p *GitHubPoster) GetRecentHumanComments(issueNum int, since time.Time) ([]IssueComment, error)

GetRecentHumanComments fetches comments from a GitHub issue, filtering out bot comments and only returning those after the given time. Returns error if issueNum is invalid (<=0) or repo is not configured.

func (*GitHubPoster) HasLabel

func (p *GitHubPoster) HasLabel(issueNum int, label string) (bool, error)

HasLabel checks if an issue has a specific label. Deprecated: Use HasLabelInRepo with an explicit repo parameter.

func (*GitHubPoster) HasLabelInRepo

func (p *GitHubPoster) HasLabelInRepo(repo string, issueNum int, label string) (bool, error)

HasLabelInRepo checks if an issue in a specific repo has a specific label. If repo is empty, falls back to the default repo.

func (*GitHubPoster) IsIssueClaimed

func (p *GitHubPoster) IsIssueClaimed(issueNum int) (bool, error)

IsIssueClaimed checks if an issue is already claimed by any coordinator. Deprecated: Use IsIssueClaimedInRepo with an explicit repo parameter.

func (*GitHubPoster) IsIssueClaimedInRepo

func (p *GitHubPoster) IsIssueClaimedInRepo(repo string, issueNum int) (bool, error)

IsIssueClaimedInRepo checks if an issue in a specific repo is already claimed. If repo is empty, falls back to the default repo.

func (*GitHubPoster) PostComment

func (p *GitHubPoster) PostComment(issueNum int, body string) error

PostComment posts a comment to a GitHub issue. Deprecated: Use PostCommentInRepo with an explicit repo parameter.

func (*GitHubPoster) PostCommentInRepo

func (p *GitHubPoster) PostCommentInRepo(repo string, issueNum int, body string) error

PostCommentInRepo posts a comment to a specific repo's issue. If repo is empty, falls back to the default repo.

func (*GitHubPoster) PostFeedback

func (p *GitHubPoster) PostFeedback(issueNum int, feedback string, iteration int, channel string) error

PostFeedback posts a feedback comment to a GitHub issue. Includes iteration context for tracking. Returns error if issueNum is invalid, iteration is out of bounds, or channel is empty. Deprecated: Use PostFeedbackInRepo with an explicit repo parameter.

func (*GitHubPoster) PostFeedbackInRepo

func (p *GitHubPoster) PostFeedbackInRepo(repo string, issueNum int, feedback string, iteration int, channel string) error

PostFeedbackInRepo posts a feedback comment to a GitHub issue in a specific repo. If repo is empty, falls back to the default repo. Returns error if issueNum is invalid, iteration is out of bounds, or channel is empty.

func (*GitHubPoster) PostWorkingStatus

func (p *GitHubPoster) PostWorkingStatus(issueNum int, taskID, agent string) error

PostWorkingStatus posts a "working on it" comment to the issue. Deprecated: Use PostWorkingStatusInRepo with an explicit repo parameter.

func (*GitHubPoster) PostWorkingStatusInRepo

func (p *GitHubPoster) PostWorkingStatusInRepo(repo string, issueNum int, taskID, agent string) error

PostWorkingStatusInRepo posts a "working on it" comment to an issue in a specific repo. If repo is empty, falls back to the default repo.

func (*GitHubPoster) ReleaseIssue

func (p *GitHubPoster) ReleaseIssue(issueNum int) error

ReleaseIssue releases a claimed issue by removing the in-progress label. Called when a task completes, fails, or is cancelled. Uses the default repo configured in the poster.

func (*GitHubPoster) ReleaseIssueInRepo

func (p *GitHubPoster) ReleaseIssueInRepo(repo string, issueNum int) error

ReleaseIssueInRepo releases a claimed issue in a specific repo. If repo is empty, falls back to the default repo.

func (*GitHubPoster) RemoveLabel

func (p *GitHubPoster) RemoveLabel(issueNum int, label string) error

RemoveLabel removes a label from a GitHub issue. Deprecated: Use RemoveLabelInRepo with an explicit repo parameter.

func (*GitHubPoster) RemoveLabelInRepo

func (p *GitHubPoster) RemoveLabelInRepo(repo string, issueNum int, label string) error

RemoveLabelInRepo removes a label from a GitHub issue in a specific repo. If repo is empty, falls back to the default repo.

func (*GitHubPoster) Repo

func (p *GitHubPoster) Repo() string

Repo returns the configured repository.

type GitHubSyncConfig

type GitHubSyncConfig struct {
	// Legacy single-repo fields (for backwards compatibility)
	Enabled           bool     `yaml:"enabled" json:"enabled"`
	IntervalSecs      int      `yaml:"interval_secs" json:"interval_secs"`               // Default: 300 (5 min)
	WatchLabels       []string `yaml:"watch_labels" json:"watch_labels"`                 // Filter by labels
	TargetInbox       string   `yaml:"target_inbox" json:"target_inbox"`                 // Where to send imported issues
	ResyncLabels      bool     `yaml:"resync_labels" json:"resync_labels"`               // Re-check labels on imported messages
	ResyncIntervalSec int      `yaml:"resync_interval_secs" json:"resync_interval_secs"` // Default: 3600 (1 hour)

	// Multi-repo configuration (v0.6.6+)
	// If Repos is non-empty, uses multi-repo mode (ignores legacy fields above except ResyncLabels)
	Repos []RepoSyncConfig `yaml:"repos" json:"repos"`
}

GitHubSyncConfig configures automatic GitHub issue import. Supports both single-repo (legacy) and multi-repo configurations.

func (*GitHubSyncConfig) GetRepos

func (c *GitHubSyncConfig) GetRepos(defaultRepo string) []RepoSyncConfig

GetRepos returns the list of repos to sync, handling backwards compatibility. If Repos is non-empty, returns it directly. Otherwise, constructs a single-repo config from legacy fields.

type GlobalBudget

type GlobalBudget struct {
	WorkspaceBudget  float64 `yaml:"workspace_budget"`
	DailyBudget      float64 `yaml:"daily_budget"`
	TaskMaxCost      float64 `yaml:"task_max_cost"`
	WarningThreshold float64 `yaml:"warning_threshold"`
}

GlobalBudget defines default budget limits

type HTTPBroadcaster

type HTTPBroadcaster struct {
	// contains filtered or unexported fields
}

HTTPBroadcaster sends task events to the Collaboration Hub server via HTTP. This enables real-time streaming when daemon and server run as separate processes.

func NewHTTPBroadcaster

func NewHTTPBroadcaster(serverURL string, logger *log.Logger) *HTTPBroadcaster

NewHTTPBroadcaster creates a broadcaster that POSTs events to the server.

func (*HTTPBroadcaster) Broadcast

func (h *HTTPBroadcaster) Broadcast(event *websocket.TaskStreamEvent)

Broadcast sends an event to the server. Implements EventBroadcaster.

func (*HTTPBroadcaster) BroadcastFunc

func (h *HTTPBroadcaster) BroadcastFunc() EventBroadcaster

BroadcastFunc returns the EventBroadcaster function.

func (*HTTPBroadcaster) CheckServerAvailable

func (h *HTTPBroadcaster) CheckServerAvailable() bool

CheckServerAvailable checks if the server is reachable

type HumanFeedback

type HumanFeedback struct {
	TaskID    string    // Task being reviewed
	Iteration int       // Current iteration of the task
	Feedback  string    // Human's feedback text
	Action    string    // "reject" or "approve"
	Timestamp time.Time // When feedback was provided
	UserID    string    // Who provided feedback (optional)
}

HumanFeedback represents feedback provided by a human reviewer.

type ImplementResult

type ImplementResult struct {
	BranchName    string
	WorktreePath  string
	Duration      time.Duration
	Cost          float64
	TokensUsed    int
	InputTokens   int
	OutputTokens  int
	FilesCreated  []string
	FilesModified []string
}

ImplementResult contains the result of implementation.

type InboxMessageAdapter

type InboxMessageAdapter struct {
	// contains filtered or unexported fields
}

InboxMessageAdapter adapts messaging.MessageStore (inbox_messages table) to the coordinator's MessageStore interface

func NewInboxMessageAdapter

func NewInboxMessageAdapter(store messaging.MessageStore, inbox string) *InboxMessageAdapter

NewInboxMessageAdapter creates a new adapter that watches a specific inbox

func OpenDefaultInboxAdapter

func OpenDefaultInboxAdapter(targetInbox string) (*InboxMessageAdapter, messaging.MessageStore, error)

OpenDefaultInboxAdapter opens the default collaboration database and creates an adapter

func (*InboxMessageAdapter) ListUnread

func (a *InboxMessageAdapter) ListUnread() ([]*Message, error)

ListUnread returns unread messages for the configured inbox

func (*InboxMessageAdapter) MarkAsRead

func (a *InboxMessageAdapter) MarkAsRead(id string) error

MarkAsRead marks a message as read

type InvokeConfig

type InvokeConfig struct {
	Type         string `yaml:"type" json:"type"`                   // "skill", "agent", "prompt", or "script"
	Name         string `yaml:"name" json:"name"`                   // Skill/agent name (for skill/agent types)
	Template     string `yaml:"template" json:"template"`           // Custom template (for prompt type) - inline
	TemplateFile string `yaml:"template_file" json:"template_file"` // Path to template file (for prompt type) - v0.6.7+

	// Script-specific fields (v0.6.4+)
	// Used when Type == "script" for deterministic workflow execution
	Command        string `yaml:"command" json:"command,omitempty"`                   // Script path or inline command
	Shell          string `yaml:"shell" json:"shell,omitempty"`                       // Shell to use (default: /bin/sh)
	EnvFromPayload bool   `yaml:"env_from_payload" json:"env_from_payload,omitempty"` // Parse JSON payload as env vars
	Timeout        string `yaml:"timeout" json:"timeout,omitempty"`                   // Execution timeout (e.g., "30m", "2h")
	WorkingDir     string `yaml:"working_dir" json:"working_dir,omitempty"`           // Working directory (supports {{.Workspace}})
}

InvokeConfig specifies how an agent should be invoked. Supports four types:

  • "skill": Invoke a Claude Code skill (e.g., "/design-doc-creator")
  • "agent": Send message to another agent (e.g., "sprint-planner")
  • "prompt": Use custom prompt template with variable substitution
  • "script": Execute a shell script with JSON payload as environment variables (v0.6.4+)

func DefaultInvokeConfig

func DefaultInvokeConfig(agentID string) *InvokeConfig

DefaultInvokeConfig returns the default invoke config for known AILANG agent IDs. Returns nil for unknown agents (no default behavior). Used by GetEffectiveInvokeConfig() when agent has no explicit YAML config.

func (*InvokeConfig) ResolveTemplate

func (ic *InvokeConfig) ResolveTemplate(workspace string) (string, error)

ResolveTemplate returns the template content, loading from file if template_file is set. Priority: template_file > template (inline) Supports:

  • Absolute paths: /path/to/template.md
  • Home directory: ~/.ailang/templates/design-doc.md
  • Relative to workspace: templates/design-doc.md (requires workspace param)

type IssueComment

type IssueComment struct {
	ID        int64
	Body      string
	Author    string
	CreatedAt time.Time
	IsBot     bool
}

IssueComment represents a comment on a GitHub issue with bot detection.

type KMSEncrypter

type KMSEncrypter struct {
	// contains filtered or unexported fields
}

KMSEncrypter encrypts API keys using Cloud KMS before they are passed as Cloud Run Job env var overrides. This prevents plaintext keys from appearing in Cloud Audit Logs.

M-CLOUD-DUAL-AUTH: Coordinator SA has roles/cloudkms.cryptoKeyEncrypter (encrypt only — cannot read back user keys).

func NewKMSEncrypter

func NewKMSEncrypter() *KMSEncrypter

NewKMSEncrypter creates an encrypter using the AILANG_KMS_KEY env var. Returns nil if the env var is not set (local dev — plaintext passthrough).

func (*KMSEncrypter) Encrypt

func (e *KMSEncrypter) Encrypt(ctx context.Context, plaintext string) (string, error)

Encrypt encrypts a plaintext string with Cloud KMS. Returns "ENC:" + base64(ciphertext).

type LabelRouteConfig

type LabelRouteConfig struct {
	LabelPrefix string `yaml:"label_prefix" json:"label_prefix"` // Match labels starting with this
	Target      string `yaml:"target" json:"target"`             // Route to this inbox
}

LabelRouteConfig maps a label prefix to a target inbox.

type MergeResult

type MergeResult struct {
	Success       bool     `json:"success"`
	MergedFiles   []string `json:"merged_files,omitempty"`
	ConflictFiles []string `json:"conflict_files,omitempty"`
	Error         string   `json:"error,omitempty"`
	CommitHash    string   `json:"commit_hash,omitempty"`
}

MergeResult contains the result of a merge operation.

func MergeWorktree

func MergeWorktree(ctx context.Context, worktreePath, mainBranch string) (*MergeResult, error)

MergeWorktree merges changes from a worktree into the main branch. It performs the following steps: 1. Get the list of changed files 2. Attempt to merge the worktree branch into main 3. If conflicts occur, report them without forcing 4. On success, return the merge commit hash

type Message

type Message struct {
	ID           string
	From         string
	Title        string
	Content      string
	Inbox        string // Target inbox (set by PubSubInboxAdapter from message attributes, M-CLOUD-E2E)
	Type         string // bug, feature, task, etc. (category)
	Kind         string // directive, question (message type)
	Priority     string // high, medium, low
	GithubIssue  int    // Linked GitHub issue number (M-COORD-GITHUB-AUTO-ROUTING)
	GithubRepo   string // GitHub repo (owner/repo) for issue operations (M-COORD-GITHUB-CLOSE-ON-MERGE)
	ParentTaskID string // Parent task ID for hierarchy tracking (M-TASK-HIERARCHY)
	Iteration    int    // Iteration number for feedback loops (M-TASK-HIERARCHY)
	ChainID      string // ExecutionChain ID for unified hierarchy (M-CHAINS-SIMPLIFY)
	CreatedAt    time.Time
}

Message represents a message from the messaging system

type MessageStore

type MessageStore interface {
	ListUnread() ([]*Message, error)
	MarkAsRead(id string) error
}

MessageStore is the interface for accessing messages

type MessageWatcher

type MessageWatcher struct {
	// contains filtered or unexported fields
}

MessageWatcher polls for unread messages and emits tasks

func NewMessageWatcher

func NewMessageWatcher(store MessageStore, pollInterval time.Duration) *MessageWatcher

NewMessageWatcher creates a new message watcher

func (*MessageWatcher) ClearSeen

func (w *MessageWatcher) ClearSeen()

ClearSeen clears the seen messages (useful for testing)

func (*MessageWatcher) SeenCount

func (w *MessageWatcher) SeenCount() int

SeenCount returns the number of seen messages

func (*MessageWatcher) Start

func (w *MessageWatcher) Start(ctx context.Context) error

Start begins watching for messages

func (*MessageWatcher) Tasks

func (w *MessageWatcher) Tasks() <-chan *Task

Tasks returns the channel of discovered tasks

type MockMessageStore

type MockMessageStore struct {
	// contains filtered or unexported fields
}

MockMessageStore is a mock implementation for testing

func NewMockMessageStore

func NewMockMessageStore() *MockMessageStore

NewMockMessageStore creates a new mock message store

func (*MockMessageStore) AddMessage

func (m *MockMessageStore) AddMessage(msg *Message)

AddMessage adds a message to the mock store

func (*MockMessageStore) ListUnread

func (m *MockMessageStore) ListUnread() ([]*Message, error)

ListUnread returns unread messages

func (*MockMessageStore) MarkAsRead

func (m *MockMessageStore) MarkAsRead(id string) error

MarkAsRead marks a message as read

type ObservatoryContext

type ObservatoryContext struct {
	TaskID       string // Coordinator task ID
	AgentID      string // Agent handling the task (e.g., "design-doc-creator")
	AssignmentID string // Observatory agent_assignment ID (aa_xxx)
	WorkspaceID  string // Observatory workspace ID (ws_xxx)

	// Chain context for unified hierarchy tracking (M-CHAINS-SIMPLIFY)
	ChainID   string // Execution chain ID (UUID)
	StageID   string // Chain stage ID (UUID)
	MessageID string // Source message ID that triggered this chain
}

ObservatoryContext holds context for linking traces to coordinator entities. This enables the unified hierarchy: CHAIN → STAGE → TASK → SPANS in Observatory.

type ObservatorySync

type ObservatorySync struct {
	// contains filtered or unexported fields
}

ObservatorySync handles syncing coordinator entities to Observatory. This enables the full entity hierarchy: WORKSPACE → TASK → AGENT → SPANS

func NewObservatorySync

func NewObservatorySync(backend observatory.Backend, logger *log.Logger) *ObservatorySync

NewObservatorySync creates a new ObservatorySync instance.

func (*ObservatorySync) CompleteAgentAssignment

func (s *ObservatorySync) CompleteAgentAssignment(ctx context.Context, assignmentID string, success bool) error

CompleteAgentAssignment marks an agent assignment as completed.

func (*ObservatorySync) GetWorkspaceID

func (s *ObservatorySync) GetWorkspaceID(path string) string

GetWorkspaceID returns the cached workspace ID for a given path. Returns empty string if not cached (workspace not yet synced).

func (*ObservatorySync) SyncAgentAssignment

func (s *ObservatorySync) SyncAgentAssignment(ctx context.Context, taskID, agentID, provider string) (string, error)

SyncAgentAssignment syncs an agent assignment to Observatory. This creates the link between a task and the agent executing it.

func (*ObservatorySync) SyncTask

func (s *ObservatorySync) SyncTask(ctx context.Context, task *TaskRecord) error

SyncTask syncs a coordinator task to Observatory. This should be called when a task is created or updated.

type PluginsConfig

type PluginsConfig struct {
	// Marketplaces to register (e.g., "anthropics/claude-code", "anthropics/skills")
	Marketplaces []string `yaml:"marketplaces" json:"marketplaces,omitempty"`
	// Plugins to install from registered marketplaces (e.g., "frontend-design@anthropics-claude-code")
	Install []string `yaml:"install" json:"install,omitempty"`
}

PluginsConfig specifies third-party plugins to install for an agent. Marketplaces are registered first, then plugins are installed from those marketplaces. This runs before task execution and is complementary to PluginDirs (static plugin paths).

type Provider

type Provider interface {
	// Name returns the provider identifier (e.g., "claude", "gemini", "gemini-api")
	Name() string

	// CanHandle returns true if this provider can handle the given task type
	CanHandle(task *AnalyzedTask) bool

	// Execute runs a task and returns the result
	Execute(ctx context.Context, task *AnalyzedTask, opts *ExecuteOptions) (*ExecuteResult, error)
}

Provider executes tasks using a specific AI backend. There are two types of providers:

  • Executor-based (Claude Code, Gemini CLI): For agentic coding tasks
  • API-based (Gemini API, Claude API): For simple text generation

type ProviderLimit

type ProviderLimit struct {
	DailyBudget      float64 `yaml:"daily_budget"`
	TaskMaxCost      float64 `yaml:"task_max_cost"`
	HardLimit        bool    `yaml:"hard_limit"`
	WarningThreshold float64 `yaml:"warning_threshold"`
}

ProviderLimit defines per-provider budget overrides

type PubSubBroadcaster

type PubSubBroadcaster struct {
	// contains filtered or unexported fields
}

PubSubBroadcaster sends task stream events to the Pub/Sub events topic. This replaces HTTPBroadcaster when running in cloud mode, enabling the dashboard and laptop to receive events via pull subscriptions.

func NewPubSubBroadcaster

func NewPubSubBroadcaster(publisher *pubsub.Publisher, workspace string, logger *log.Logger) *PubSubBroadcaster

NewPubSubBroadcaster creates a broadcaster that publishes events to Pub/Sub.

func (*PubSubBroadcaster) Broadcast

func (b *PubSubBroadcaster) Broadcast(event *websocket.TaskStreamEvent)

Broadcast sends a task event to the Pub/Sub events topic.

func (*PubSubBroadcaster) BroadcastFunc

func (b *PubSubBroadcaster) BroadcastFunc() EventBroadcaster

BroadcastFunc returns the EventBroadcaster function.

type PubSubInboxAdapter

type PubSubInboxAdapter struct {
	// contains filtered or unexported fields
}

PubSubInboxAdapter buffers incoming message notifications for the coordinator. Messages arrive either via pull subscription (Start) or push HTTP endpoint (HandleNotification). ListUnread() drains the buffer.

func NewPubSubInboxAdapter

func NewPubSubInboxAdapter(subscriber *pubsub.Subscriber, subName, inbox string, msgStore messaging.MessageStore, logger *log.Logger) *PubSubInboxAdapter

NewPubSubInboxAdapter creates an adapter for receiving message notifications. msgStore is used to fetch full message content from Firestore when a notification arrives. For pull mode, call Start(). For push mode, the HTTP handler calls HandleNotification() directly.

func (*PubSubInboxAdapter) HandleNotification

func (a *PubSubInboxAdapter) HandleNotification(data []byte, attrs map[string]string) error

HandleNotification processes a message notification from either pull subscription or push HTTP endpoint. It decodes the notification, fetches full content from Firestore, and buffers the message for ListUnread().

func (*PubSubInboxAdapter) ListUnread

func (a *PubSubInboxAdapter) ListUnread() ([]*Message, error)

ListUnread returns buffered messages and clears the buffer.

func (*PubSubInboxAdapter) MarkAsRead

func (a *PubSubInboxAdapter) MarkAsRead(_ string) error

MarkAsRead is a no-op for Pub/Sub — messages are acked on receipt.

func (*PubSubInboxAdapter) Start

func (a *PubSubInboxAdapter) Start(ctx context.Context)

Start begins pulling messages from the Pub/Sub subscription in the background. Not used in push mode — the HTTP handler calls HandleNotification() directly.

type RepoSyncConfig

type RepoSyncConfig struct {
	Repo         string             `yaml:"repo" json:"repo"`                   // GitHub repo (owner/repo)
	Enabled      bool               `yaml:"enabled" json:"enabled"`             // Enable sync for this repo
	IntervalSecs int                `yaml:"interval_secs" json:"interval_secs"` // Override default interval
	WatchLabels  []string           `yaml:"watch_labels" json:"watch_labels"`   // Filter by labels
	TargetInbox  string             `yaml:"target_inbox" json:"target_inbox"`   // Default inbox for this repo
	LabelRouting []LabelRouteConfig `yaml:"label_routing" json:"label_routing"` // Route by label prefix
}

RepoSyncConfig configures GitHub sync for a single repository.

type ResourceMetrics

type ResourceMetrics struct {
	TaskID    string    `json:"task_id"`
	ThreadID  string    `json:"thread_id,omitempty"`
	Timestamp time.Time `json:"timestamp"`

	// Process metrics
	CPUPercent float64 `json:"cpu_percent"`
	MemoryMB   float64 `json:"memory_mb"`
	PID        int     `json:"pid,omitempty"`

	// Token metrics (accumulated)
	TokensIn  int `json:"tokens_in"`
	TokensOut int `json:"tokens_out"`

	// Cost metrics
	Cost float64 `json:"cost"`

	// Duration
	DurationSec int `json:"duration_sec"`

	// Peak values
	PeakCPU    float64 `json:"peak_cpu"`
	PeakMemory float64 `json:"peak_memory_mb"`
}

ResourceMetrics holds current resource usage for a task

type ResourceTracker

type ResourceTracker struct {
	// contains filtered or unexported fields
}

ResourceTracker tracks resource usage for a running task. It polls process metrics periodically and accumulates token usage from events.

func NewResourceTracker

func NewResourceTracker(taskID, threadID string, pid int) *ResourceTracker

NewResourceTracker creates a new resource tracker for a task

func (*ResourceTracker) GetMetrics

func (rt *ResourceTracker) GetMetrics() *ResourceMetrics

GetMetrics returns the current resource metrics

func (*ResourceTracker) SetCost

func (rt *ResourceTracker) SetCost(cost float64)

SetCost sets the cost (replacement, not additive)

func (*ResourceTracker) SetPollInterval

func (rt *ResourceTracker) SetPollInterval(interval time.Duration)

SetPollInterval sets the polling interval for process metrics

func (*ResourceTracker) SetUpdateCallback

func (rt *ResourceTracker) SetUpdateCallback(callback func(*ResourceMetrics))

SetUpdateCallback sets the callback for metric updates

func (*ResourceTracker) Start

func (rt *ResourceTracker) Start(ctx context.Context)

Start begins polling for resource metrics

func (*ResourceTracker) Stop

func (rt *ResourceTracker) Stop()

Stop stops polling for metrics

func (*ResourceTracker) UpdateCost

func (rt *ResourceTracker) UpdateCost(cost float64)

UpdateCost updates the accumulated cost

func (*ResourceTracker) UpdateTokens

func (rt *ResourceTracker) UpdateTokens(inputTokens, outputTokens int)

UpdateTokens adds tokens to the accumulated count

type ResourceTrackerRegistry

type ResourceTrackerRegistry struct {
	// contains filtered or unexported fields
}

ResourceTrackerRegistry manages multiple resource trackers for concurrent tasks

func NewResourceTrackerRegistry

func NewResourceTrackerRegistry() *ResourceTrackerRegistry

NewResourceTrackerRegistry creates a new registry

func (*ResourceTrackerRegistry) Get

Get returns a tracker by task ID

func (*ResourceTrackerRegistry) GetAllMetrics

func (r *ResourceTrackerRegistry) GetAllMetrics() []*ResourceMetrics

GetAllMetrics returns metrics for all active trackers

func (*ResourceTrackerRegistry) Register

func (r *ResourceTrackerRegistry) Register(taskID string, tracker *ResourceTracker)

Register adds a new tracker to the registry

func (*ResourceTrackerRegistry) StopAll

func (r *ResourceTrackerRegistry) StopAll()

StopAll stops all trackers

func (*ResourceTrackerRegistry) Unregister

func (r *ResourceTrackerRegistry) Unregister(taskID string)

Unregister removes a tracker from the registry

type SQLiteStore

type SQLiteStore struct {
	// contains filtered or unexported fields
}

SQLiteStore implements Store using SQLite

func NewSQLiteStore

func NewSQLiteStore(dbPath string) (*SQLiteStore, error)

NewSQLiteStore creates a new SQLite store

func (*SQLiteStore) Close

func (s *SQLiteStore) Close() error

Close closes the database connection

func (*SQLiteStore) CreateApprovalRequest

func (s *SQLiteStore) CreateApprovalRequest(ctx context.Context, req *ApprovalRequestRecord) error

CreateApprovalRequest creates a new approval request in the database

func (*SQLiteStore) CreateTask

func (s *SQLiteStore) CreateTask(ctx context.Context, task *TaskRecord) error

CreateTask creates a new task

func (*SQLiteStore) DeleteOldApprovals

func (s *SQLiteStore) DeleteOldApprovals(ctx context.Context, olderThan time.Duration) (int, error)

DeleteOldApprovals removes approval requests older than the specified duration

func (*SQLiteStore) DeleteOldTaskEvents

func (s *SQLiteStore) DeleteOldTaskEvents(ctx context.Context, olderThan time.Duration) (int, error)

DeleteOldTaskEvents removes events older than the specified duration

func (*SQLiteStore) DeleteOldTasks

func (s *SQLiteStore) DeleteOldTasks(ctx context.Context, olderThan time.Duration) (int, error)

DeleteOldTasks removes tasks older than the specified duration

func (*SQLiteStore) DeleteTask

func (s *SQLiteStore) DeleteTask(ctx context.Context, id string) error

DeleteTask deletes a task

func (*SQLiteStore) DeleteTaskEvents

func (s *SQLiteStore) DeleteTaskEvents(ctx context.Context, taskID string) error

DeleteTaskEvents removes all events for a task

func (*SQLiteStore) FindDuplicateTask

func (s *SQLiteStore) FindDuplicateTask(ctx context.Context, fingerprint uint64, threshold float64) (*TaskRecord, error)

FindDuplicateTask finds a similar task by fingerprint

func (*SQLiteStore) GetApprovalRequest

func (s *SQLiteStore) GetApprovalRequest(ctx context.Context, id string) (*ApprovalRequestRecord, error)

GetApprovalRequest retrieves an approval request by ID

func (*SQLiteStore) GetApprovalRequestByTask

func (s *SQLiteStore) GetApprovalRequestByTask(ctx context.Context, taskID string) (*ApprovalRequestRecord, error)

GetApprovalRequestByTask retrieves a pending approval request for a task

func (*SQLiteStore) GetApprovalRequestByTaskAnyStatus

func (s *SQLiteStore) GetApprovalRequestByTaskAnyStatus(ctx context.Context, taskID string) (*ApprovalRequestRecord, error)

GetApprovalRequestByTaskAnyStatus retrieves the approval request for a task regardless of status. Use this when you need to fetch the approval context after the request has been processed (e.g., for handoffs).

func (*SQLiteStore) GetCostByProvider

func (s *SQLiteStore) GetCostByProvider() (map[string]float64, error)

GetCostByProvider returns total cost per provider for budget tracking.

func (*SQLiteStore) GetTask

func (s *SQLiteStore) GetTask(ctx context.Context, id string) (*TaskRecord, error)

GetTask retrieves a task by ID

func (*SQLiteStore) GetTaskAgentInfo

func (s *SQLiteStore) GetTaskAgentInfo(ctx context.Context, taskID string) (agentID, inbox, title string, err error)

GetTaskAgentInfo returns agent info for a task (for cross-db correlation in Control Plane)

func (*SQLiteStore) GetTaskEvents

func (s *SQLiteStore) GetTaskEvents(ctx context.Context, taskID string, limit int) ([]*TaskEventRecord, error)

GetTaskEvents retrieves all events for a task

func (*SQLiteStore) GetTaskStats

func (s *SQLiteStore) GetTaskStats(ctx context.Context) (*TaskStats, error)

GetTaskStats returns aggregate statistics

func (*SQLiteStore) GetTasksByGithubIssue

func (s *SQLiteStore) GetTasksByGithubIssue(ctx context.Context, issueNum int) ([]*TaskRecord, error)

GetTasksByGithubIssue retrieves all tasks linked to a GitHub issue

func (*SQLiteStore) GetTasksByStage

func (s *SQLiteStore) GetTasksByStage(ctx context.Context, stage TaskStage) ([]*TaskRecord, error)

GetTasksByStage retrieves all tasks in a specific pipeline stage

func (*SQLiteStore) ListApprovedMergeHandoffsWithoutTrigger

func (s *SQLiteStore) ListApprovedMergeHandoffsWithoutTrigger(ctx context.Context) ([]*ApprovalRequestRecord, error)

ListApprovedMergeHandoffsWithoutTrigger finds approved merge_handoff requests where handoffs were never sent. These are approvals that were processed before the handoff triggering code was deployed (catch-up mechanism). Only returns approvals from the last 7 days to avoid re-triggering very old approvals.

func (*SQLiteStore) ListPendingApprovals

func (s *SQLiteStore) ListPendingApprovals(ctx context.Context) ([]*ApprovalRequestRecord, error)

ListPendingApprovals retrieves all pending approval requests

func (*SQLiteStore) ListResolvedApprovals

func (s *SQLiteStore) ListResolvedApprovals(ctx context.Context, limit int) ([]*ApprovalRequestRecord, error)

ListResolvedApprovals returns resolved (approved/rejected) approval requests

func (*SQLiteStore) ListTasks

func (s *SQLiteStore) ListTasks(ctx context.Context, filter *TaskFilter) ([]*TaskRecord, error)

ListTasks retrieves tasks matching the filter

func (*SQLiteStore) MarkApprovalHandoffsTriggered

func (s *SQLiteStore) MarkApprovalHandoffsTriggered(ctx context.Context, taskID string) error

MarkApprovalHandoffsTriggered marks that handoffs have been sent for an approval request. This is used to track whether handoffs were triggered, enabling catch-up on daemon startup.

func (*SQLiteStore) MarkTaskCancelled

func (s *SQLiteStore) MarkTaskCancelled(ctx context.Context, id string) error

MarkTaskCancelled marks a task as cancelled

func (*SQLiteStore) MarkTaskCompleted

func (s *SQLiteStore) MarkTaskCompleted(ctx context.Context, id string, result *ExecuteResult) error

MarkTaskCompleted marks a task as completed with results

func (*SQLiteStore) MarkTaskFailed

func (s *SQLiteStore) MarkTaskFailed(ctx context.Context, id string, taskErr error) error

MarkTaskFailed marks a task as failed

func (*SQLiteStore) MarkTaskPendingApproval

func (s *SQLiteStore) MarkTaskPendingApproval(ctx context.Context, id, worktreePath, worktreeBranch, baseBranch, baseCommit string, result *ExecuteResult) error

MarkTaskPendingApproval marks a task as awaiting human approval

func (*SQLiteStore) MarkTaskQueued

func (s *SQLiteStore) MarkTaskQueued(ctx context.Context, id string) error

MarkTaskQueued marks a task as queued

func (*SQLiteStore) MarkTaskRejected

func (s *SQLiteStore) MarkTaskRejected(ctx context.Context, id string) error

MarkTaskRejected marks a task as rejected by human

func (*SQLiteStore) MarkTaskRunning

func (s *SQLiteStore) MarkTaskRunning(ctx context.Context, id, provider, worktreeID string) error

MarkTaskRunning marks a task as running

func (*SQLiteStore) RecoverStaleTasks

func (s *SQLiteStore) RecoverStaleTasks(ctx context.Context, staleThreshold time.Duration) (int, error)

RecoverStaleTasks marks stale running/queued tasks as cancelled on daemon startup. This handles tasks that were running when the daemon crashed or was killed.

func (*SQLiteStore) ReopenTask

func (s *SQLiteStore) ReopenTask(ctx context.Context, taskID string) error

ReopenTask moves a rejected/cancelled task back to pending_approval status

func (*SQLiteStore) RequeueTask

func (s *SQLiteStore) RequeueTask(ctx context.Context, id string) error

RequeueTask resets a task to pending status for re-execution.

func (*SQLiteStore) ResetTaskToPending

func (s *SQLiteStore) ResetTaskToPending(ctx context.Context, id string) error

ResetTaskToPending resets a running task back to pending state.

func (*SQLiteStore) ResolveApprovalRequest

func (s *SQLiteStore) ResolveApprovalRequest(ctx context.Context, id string, status string, resolvedBy string) error

ResolveApprovalRequest marks an approval request as approved or rejected

func (*SQLiteStore) ResolveApprovalRequestByTask

func (s *SQLiteStore) ResolveApprovalRequestByTask(ctx context.Context, taskID string, status string, resolvedBy string) error

ResolveApprovalRequestByTask marks approval requests for a task as approved or rejected. Resolves both "merge" and "merge_handoff" type approvals - pure "handoff" approvals must be resolved separately via ResolveApprovalRequest.

func (*SQLiteStore) RetryAllFailedTasks

func (s *SQLiteStore) RetryAllFailedTasks(ctx context.Context) (int, error)

RetryAllFailedTasks resets all failed tasks to pending so they will be retried.

func (*SQLiteStore) SetTaskDesignDocPath

func (s *SQLiteStore) SetTaskDesignDocPath(ctx context.Context, id string, path string) error

SetTaskDesignDocPath stores the design doc path for a task

func (*SQLiteStore) SetTaskFingerprint

func (s *SQLiteStore) SetTaskFingerprint(ctx context.Context, id string, fingerprint uint64) error

SetTaskFingerprint sets the fingerprint for duplicate detection

func (*SQLiteStore) SetTaskGithubIssue

func (s *SQLiteStore) SetTaskGithubIssue(ctx context.Context, id string, issueNum int) error

SetTaskGithubIssue links a task to a GitHub issue number

func (*SQLiteStore) SetTaskSprintPlanPath

func (s *SQLiteStore) SetTaskSprintPlanPath(ctx context.Context, id string, path string) error

SetTaskSprintPlanPath stores the sprint plan path for a task

func (*SQLiteStore) SetTaskStage

func (s *SQLiteStore) SetTaskStage(ctx context.Context, id string, stage TaskStage) error

SetTaskStage sets the pipeline stage for a task

func (*SQLiteStore) SetTaskThreadID

func (s *SQLiteStore) SetTaskThreadID(ctx context.Context, id string, threadID string) error

SetTaskThreadID links a task to a thread in collaboration.db for dashboard visibility

func (*SQLiteStore) StoreTaskEvent

func (s *SQLiteStore) StoreTaskEvent(ctx context.Context, event *TaskEventRecord) error

StoreTaskEvent saves a task streaming event to the database

func (*SQLiteStore) UpdateTask

func (s *SQLiteStore) UpdateTask(ctx context.Context, task *TaskRecord) error

UpdateTask updates an existing task

func (*SQLiteStore) UpdateTaskChainInfo

func (s *SQLiteStore) UpdateTaskChainInfo(ctx context.Context, id, chainID, stageID string) error

UpdateTaskChainInfo updates the chain_id and stage_id for a task (M-CHAINS-SIMPLIFY)

func (*SQLiteStore) UpdateTaskMetrics

func (s *SQLiteStore) UpdateTaskMetrics(ctx context.Context, id string, peakCPU, peakMemory float64) error

UpdateTaskMetrics updates peak resource metrics for a task

type ScriptProvider

type ScriptProvider struct {
	// contains filtered or unexported fields
}

ScriptProvider executes shell scripts for deterministic workflow tasks. Unlike AI providers, scripts run locally with predictable output.

Usage in agent config:

invoke:
  type: script
  command: ./scripts/run-eval.sh
  env_from_payload: true
  timeout: 30m

func NewScriptProvider

func NewScriptProvider() *ScriptProvider

NewScriptProvider creates a new script execution provider.

func (*ScriptProvider) CanHandle

func (p *ScriptProvider) CanHandle(task *AnalyzedTask) bool

CanHandle returns true for tasks with script invoke type. Note: Script tasks are explicitly routed via InvokeConfig, not auto-detected.

func (*ScriptProvider) Execute

func (p *ScriptProvider) Execute(ctx context.Context, task *AnalyzedTask, opts *ExecuteOptions) (*ExecuteResult, error)

Execute runs a shell script with environment variables from the task payload.

func (*ScriptProvider) Name

func (p *ScriptProvider) Name() string

Name returns the provider identifier.

type SprintPlanResult

type SprintPlanResult struct {
	Path         string   // Path to .md file (may be empty if no .md found)
	AllArtifacts []string // All discovered artifacts (any file type)
	Duration     time.Duration
	Cost         float64
	TokensUsed   int
	InputTokens  int
	OutputTokens int
}

SprintPlanResult contains the result of sprint plan creation.

type StageExecutionResult

type StageExecutionResult struct {
	// Design doc stage
	DesignDocPath string

	// Sprint plan stage
	SprintPlanPath string

	// Implementation stage
	BranchName    string
	FilesCreated  []string
	FilesModified []string

	// Common fields
	Duration     time.Duration
	Cost         float64
	TokensUsed   int
	InputTokens  int
	OutputTokens int
}

StageExecutionResult contains extracted artifacts from stage execution

func ParseStageOutput

func ParseStageOutput(output string, stage TaskStage) *StageExecutionResult

ParseStageOutput extracts structured artifacts from execution output. Handles both plain text markers and markdown-formatted markers:

  • DESIGN_DOC_PATH: path.md
  • **DESIGN_DOC_PATH**: `path.md`

type StaleTaskDetector

type StaleTaskDetector struct {
	// contains filtered or unexported fields
}

StaleTaskDetector periodically checks for tasks stuck in queued/running status and marks them as failed after their timeout expires. This catches cases where:

  • Cloud Run Job container fails to start (image pull, OOM, quota)
  • Pub/Sub completion publish fails
  • Job process crashes before publishing completion

Only runs in cloud mode (COORDINATOR_MODE=cloud). Local mode uses RecoverStaleTasks at startup instead.

func NewStaleTaskDetector

func NewStaleTaskDetector(store Store, agentRegistry *AgentRegistry, msgStore messaging.MessageStore, logger *log.Logger) *StaleTaskDetector

NewStaleTaskDetector creates a detector that checks for stale tasks periodically.

func (*StaleTaskDetector) Run

func (d *StaleTaskDetector) Run(ctx context.Context)

Run starts the periodic stale task check. Blocks until ctx is cancelled.

type Status

type Status struct {
	Running   bool      `json:"running"`
	PID       int       `json:"pid,omitempty"`
	StartedAt time.Time `json:"started_at,omitempty"`
	Uptime    string    `json:"uptime,omitempty"`
	TasksRun  int       `json:"tasks_run"`
	// Extended stats from database
	PendingTasks     int     `json:"pending_tasks,omitempty"`
	RunningTasks     int     `json:"running_tasks,omitempty"`
	PendingApprovals int     `json:"pending_approvals,omitempty"` // Tasks awaiting human approval
	FailedTasks      int     `json:"failed_tasks,omitempty"`
	TotalCost        float64 `json:"total_cost,omitempty"`
	TotalTokens      int     `json:"total_tokens,omitempty"`
}

Status represents the daemon's current state

type Store

type Store interface {
	// Task CRUD operations
	CreateTask(ctx context.Context, task *TaskRecord) error
	GetTask(ctx context.Context, id string) (*TaskRecord, error)
	UpdateTask(ctx context.Context, task *TaskRecord) error
	DeleteTask(ctx context.Context, id string) error

	// Task queries
	ListTasks(ctx context.Context, filter *TaskFilter) ([]*TaskRecord, error)
	GetTaskStats(ctx context.Context) (*TaskStats, error)

	// Task state transitions
	MarkTaskQueued(ctx context.Context, id string) error
	MarkTaskRunning(ctx context.Context, id, provider, worktreeID string) error
	MarkTaskPendingApproval(ctx context.Context, id, worktreePath, worktreeBranch, baseBranch, baseCommit string, result *ExecuteResult) error // Work done, awaiting human review
	MarkTaskCompleted(ctx context.Context, id string, result *ExecuteResult) error
	MarkTaskFailed(ctx context.Context, id string, err error) error
	MarkTaskRejected(ctx context.Context, id string) error // Human rejected the work
	MarkTaskCancelled(ctx context.Context, id string) error
	RequeueTask(ctx context.Context, id string) error        // Reset status to pending for next stage execution
	ResetTaskToPending(ctx context.Context, id string) error // Reset running task back to pending (worktree limit recovery)

	// Duplicate detection
	FindDuplicateTask(ctx context.Context, fingerprint uint64, threshold float64) (*TaskRecord, error)
	SetTaskFingerprint(ctx context.Context, id string, fingerprint uint64) error

	// Thread linking (for dashboard visibility)
	SetTaskThreadID(ctx context.Context, id string, threadID string) error

	// Execution chain tracking (M-CHAINS-SIMPLIFY)
	UpdateTaskChainInfo(ctx context.Context, id, chainID, stageID string) error

	// Cross-database correlation (for Control Plane event classification)
	// Returns agent info for a task: agentID (FromAgent), inbox (ToInbox), title
	// Note: By convention, agent id == inbox in agent config
	GetTaskAgentInfo(ctx context.Context, taskID string) (agentID, inbox, title string, err error)

	// GitHub integration (M-COORD-GITHUB-AUTO-ROUTING)
	SetTaskGithubIssue(ctx context.Context, id string, issueNum int) error
	SetTaskStage(ctx context.Context, id string, stage TaskStage) error
	SetTaskDesignDocPath(ctx context.Context, id string, path string) error
	SetTaskSprintPlanPath(ctx context.Context, id string, path string) error
	GetTasksByGithubIssue(ctx context.Context, issueNum int) ([]*TaskRecord, error)
	GetTasksByStage(ctx context.Context, stage TaskStage) ([]*TaskRecord, error)

	// Resource metrics
	UpdateTaskMetrics(ctx context.Context, id string, peakCPU, peakMemory float64) error

	// Budget tracking (per-provider)
	GetCostByProvider() (map[string]float64, error)

	// Approval requests
	CreateApprovalRequest(ctx context.Context, req *ApprovalRequestRecord) error
	GetApprovalRequest(ctx context.Context, id string) (*ApprovalRequestRecord, error)                    // Get by approval ID
	GetApprovalRequestByTask(ctx context.Context, taskID string) (*ApprovalRequestRecord, error)          // Get pending by task ID
	GetApprovalRequestByTaskAnyStatus(ctx context.Context, taskID string) (*ApprovalRequestRecord, error) // Get approval regardless of status (for handoff triggering)
	ListPendingApprovals(ctx context.Context) ([]*ApprovalRequestRecord, error)
	ListResolvedApprovals(ctx context.Context, limit int) ([]*ApprovalRequestRecord, error) // List resolved (approved/rejected) approvals
	ResolveApprovalRequest(ctx context.Context, id, status, resolvedBy string) error
	ResolveApprovalRequestByTask(ctx context.Context, taskID, status, resolvedBy string) error
	MarkApprovalHandoffsTriggered(ctx context.Context, taskID string) error                        // Mark that handoffs were sent
	ListApprovedMergeHandoffsWithoutTrigger(ctx context.Context) ([]*ApprovalRequestRecord, error) // Find missed handoffs

	// Cleanup
	DeleteOldTasks(ctx context.Context, olderThan time.Duration) (int, error)
	RecoverStaleTasks(ctx context.Context, staleThreshold time.Duration) (int, error) // Cancel stale running/queued tasks on startup
	RetryAllFailedTasks(ctx context.Context) (int, error)                             // Reset all failed tasks to pending

	// Event storage (for task logs)
	StoreTaskEvent(ctx context.Context, event *TaskEventRecord) error
	GetTaskEvents(ctx context.Context, taskID string, limit int) ([]*TaskEventRecord, error)

	// Lifecycle
	Close() error
}

Store is the neutral interface for task persistence. Implementations can be SQLite (local), PostgreSQL, or cloud services.

type StoreBackedApprovalCheckpoint

type StoreBackedApprovalCheckpoint struct {
	*ApprovalCheckpoint
	// contains filtered or unexported fields
}

StoreBackedApprovalCheckpoint wraps ApprovalCheckpoint with SQLite persistence. This allows CLI commands to approve/reject requests that the daemon is waiting on.

func NewStoreBackedApprovalCheckpoint

func NewStoreBackedApprovalCheckpoint(store ApprovalStore, defaultTimeout time.Duration) *StoreBackedApprovalCheckpoint

NewStoreBackedApprovalCheckpoint creates a store-backed approval checkpoint

func (*StoreBackedApprovalCheckpoint) RequestApproval

func (sac *StoreBackedApprovalCheckpoint) RequestApproval(ctx context.Context, request *ApprovalRequest) (ApprovalStatus, error)

RequestApproval creates an approval request and waits for resolution. It persists the request to the store and polls for status changes.

type StoreConfig

type StoreConfig struct {
	DBPath string
}

StoreConfig configures store creation

type Task

type Task struct {
	ID           string
	Title        string
	Content      string
	Kind         string // "directive" or "question" - affects execution mode
	Priority     int
	MessageID    string
	ParentTaskID string // Parent task ID for hierarchy tracking (M-TASK-HIERARCHY)
	CreatedAt    time.Time
	// M-TRANSCRIPT: Feedback loop support
	Iteration int    // Current iteration (1 = first run, 2+ = re-run with feedback)
	SessionID string // Session ID for resume capability
}

Task represents a task to be executed

type TaskAnalyzer

type TaskAnalyzer struct {
	// contains filtered or unexported fields
}

TaskAnalyzer analyzes and classifies tasks

func NewTaskAnalyzer

func NewTaskAnalyzer(similarityThreshold float64) *TaskAnalyzer

NewTaskAnalyzer creates a new task analyzer

func (*TaskAnalyzer) Analyze

func (a *TaskAnalyzer) Analyze(task *Task) *AnalyzedTask

Analyze processes a task and returns an AnalyzedTask

func (*TaskAnalyzer) ClearFingerprints

func (a *TaskAnalyzer) ClearFingerprints()

ClearFingerprints clears all stored fingerprints (useful for testing)

func (*TaskAnalyzer) FingerprintCount

func (a *TaskAnalyzer) FingerprintCount() int

FingerprintCount returns the number of stored fingerprints

type TaskChain

type TaskChain struct {
	// contains filtered or unexported fields
}

TaskChain manages the pipeline: design-doc → sprint-planner → sprint-executor. It handles stage transitions and GitHub notifications.

func NewTaskChain

func NewTaskChain(poster *GitHubPoster, store Store, watcher *ApprovalWatcher) *TaskChain

NewTaskChain creates a new task chain manager.

func (*TaskChain) OnAgentApproved

func (tc *TaskChain) OnAgentApproved(ctx context.Context, event *ApprovalEvent, agentID string) error

OnAgentApproved is called when an agent's work is approved via config-driven workflow.

func (*TaskChain) OnAgentComplete

func (tc *TaskChain) OnAgentComplete(ctx context.Context, taskID, agentID string, result *AgentResult, registry *AgentRegistry) error

OnAgentComplete is the unified handler for any agent completion. It uses the agent's ApprovalConfig to determine the appropriate GitHub labels and comment template, eliminating the need for hardcoded stage handlers.

This handler: 1. Stores the artifact path for later use 2. Reads artifact content from worktree (if applicable) 3. Posts completion comment to GitHub 4. Adds needs-approval label from agent config

For agents without ApprovalConfig, this is a no-op (agent handles its own workflow).

func (*TaskChain) OnDesignApproved

func (tc *TaskChain) OnDesignApproved(ctx context.Context, event *ApprovalEvent) error

OnDesignApproved is called when a design document is approved.

func (*TaskChain) OnDesignDocComplete

func (tc *TaskChain) OnDesignDocComplete(ctx context.Context, taskID string, result *DesignDocResult) error

OnDesignDocComplete is called when a design document is created. Posts a summary to GitHub and adds the needs-design-approval label.

func (*TaskChain) OnError

func (tc *TaskChain) OnError(ctx context.Context, taskID string, errMsg string) error

OnError is called when an error occurs during any stage.

func (*TaskChain) OnImplementationComplete

func (tc *TaskChain) OnImplementationComplete(ctx context.Context, taskID string, result *ImplementResult) error

OnImplementationComplete is called when implementation is finished.

func (*TaskChain) OnMergeApproved

func (tc *TaskChain) OnMergeApproved(ctx context.Context, event *ApprovalEvent) error

OnMergeApproved is called when merge is approved.

func (*TaskChain) OnNeedsRevision

func (tc *TaskChain) OnNeedsRevision(ctx context.Context, event *ApprovalEvent) error

OnNeedsRevision is called when revision is requested.

func (*TaskChain) OnSprintApproved

func (tc *TaskChain) OnSprintApproved(ctx context.Context, event *ApprovalEvent) error

OnSprintApproved is called when a sprint plan is approved.

func (*TaskChain) OnSprintPlanComplete

func (tc *TaskChain) OnSprintPlanComplete(ctx context.Context, taskID string, result *SprintPlanResult) error

OnSprintPlanComplete is called when a sprint plan is created.

func (*TaskChain) SetAgentRegistry

func (tc *TaskChain) SetAgentRegistry(registry *AgentRegistry)

SetAgentRegistry sets the agent registry for config-driven workflows.

func (*TaskChain) SetMessageStore

func (tc *TaskChain) SetMessageStore(msgStore messaging.MessageStore)

SetMessageStore sets the message store for handoff messages.

func (*TaskChain) StartTask

func (tc *TaskChain) StartTask(ctx context.Context, taskID string, issueNum int) error

StartTask initializes a new GitHub-linked task at the design stage. It first claims the issue to prevent race conditions with other coordinator instances.

type TaskEventContext

type TaskEventContext struct {
	Workspace  string // Working directory path
	Directive  string // Full initial directive/prompt
	AgentID    string // Agent identifier (e.g., "design-doc-creator")
	SourceType string // Event source: coordinator, eval, github, direct
}

TaskEventContext holds context information for task events. This is used to enrich events with workspace, directive, and agent info.

type TaskEventRecord

type TaskEventRecord struct {
	ID          int64     `json:"id"`
	TaskID      string    `json:"task_id"`
	ThreadID    string    `json:"thread_id,omitempty"`
	StreamType  string    `json:"stream_type"` // "text", "tool_use", "tool_result", "error", "status", "turn_start", "turn_end"
	TurnNum     int       `json:"turn_num,omitempty"`
	Text        string    `json:"text,omitempty"`
	ToolName    string    `json:"tool_name,omitempty"`
	ToolInput   string    `json:"tool_input,omitempty"`
	ToolOutput  string    `json:"tool_output,omitempty"`
	ErrorMsg    string    `json:"error_msg,omitempty"`
	Status      string    `json:"status,omitempty"`
	TokensIn    int       `json:"tokens_in,omitempty"`
	TokensOut   int       `json:"tokens_out,omitempty"`
	Cost        float64   `json:"cost,omitempty"`
	DurationSec int       `json:"duration_sec,omitempty"`
	CreatedAt   time.Time `json:"created_at"`
}

TaskEventRecord represents a stored task streaming event

type TaskExecutor

type TaskExecutor struct {
	// contains filtered or unexported fields
}

TaskExecutor orchestrates task execution across multiple providers. It routes tasks to the appropriate provider based on task type and provider capabilities.

func DefaultTaskExecutor

func DefaultTaskExecutor() (*TaskExecutor, error)

DefaultTaskExecutor creates a TaskExecutor with all available providers. It discovers executor-based providers from the global executor factory, so adding a new executor (e.g., codex) only requires registering it via init().

func NewTaskExecutor

func NewTaskExecutor(providers ...Provider) *TaskExecutor

NewTaskExecutor creates a new task executor with the given providers

func (*TaskExecutor) Execute

func (te *TaskExecutor) Execute(ctx context.Context, task *AnalyzedTask, opts *ExecuteOptions) (*ExecuteResult, error)

Execute runs a task using the best available provider

func (*TaskExecutor) ExecuteWithRetry

func (te *TaskExecutor) ExecuteWithRetry(ctx context.Context, task *AnalyzedTask, opts *ExecuteOptions, maxRetries int) (*ExecuteResult, error)

ExecuteWithRetry runs a task with retry logic

func (*TaskExecutor) ListProviders

func (te *TaskExecutor) ListProviders() []string

ListProviders returns the names of all registered providers

type TaskFilter

type TaskFilter struct {
	Status    []TaskStatus
	Type      []TaskType
	Provider  string
	Workspace string // Filter by source workspace
	Since     *time.Time
	Until     *time.Time
	Limit     int
	Offset    int
	OrderBy   string // "created_at", "priority", "started_at"
	OrderDesc bool
}

TaskFilter for querying tasks

type TaskRecord

type TaskRecord struct {
	ID           string     `json:"id"`
	MessageID    string     `json:"message_id,omitempty"`
	ThreadID     string     `json:"thread_id,omitempty"`      // Thread in collaboration.db for dashboard visibility
	ParentTaskID string     `json:"parent_task_id,omitempty"` // Parent task for hierarchy tracking (handoffs)
	Title        string     `json:"title"`
	Content      string     `json:"content"`
	Type         TaskType   `json:"type"`
	Kind         string     `json:"kind,omitempty"` // "directive" or "question" - affects execution mode
	Priority     int        `json:"priority"`
	Status       TaskStatus `json:"status"`
	Provider     string     `json:"provider,omitempty"`
	AgentID      string     `json:"agent_id,omitempty"` // ID of agent that processed this task
	WorktreeID   string     `json:"worktree_id,omitempty"`
	WorktreePath string     `json:"worktree_path,omitempty"` // Path to git worktree (preserved until approval)
	BaseBranch   string     `json:"base_branch,omitempty"`   // Base branch worktree was created from (for diff comparison)
	BaseCommit   string     `json:"base_commit,omitempty"`   // Base commit hash at worktree creation (stable reference for diff)
	SessionID    string     `json:"session_id,omitempty"`    // Claude Code/Gemini CLI session for resumption
	Iteration    int        `json:"iteration,omitempty"`     // Iteration number (1 = first, 2+ = re-run with feedback)
	Workspace    string     `json:"workspace,omitempty"`     // Source workspace from thread (not worktree)
	// Execution chain tracking (M-CHAINS-SIMPLIFY)
	ChainID string `json:"chain_id,omitempty"` // ExecutionChain ID for unified hierarchy
	StageID string `json:"stage_id,omitempty"` // ChainStage ID for this agent's execution
	// GitHub integration (M-COORD-GITHUB-AUTO-ROUTING)
	GithubIssue    int       `json:"github_issue,omitempty"`     // Linked GitHub issue number
	GithubRepo     string    `json:"github_repo,omitempty"`      // GitHub repo (owner/repo) for issue operations
	Stage          TaskStage `json:"stage,omitempty"`            // Pipeline stage (design, sprint, implementation, merge)
	DesignDocPath  string    `json:"design_doc_path,omitempty"`  // Path to design doc (for merge comment)
	SprintPlanPath string    `json:"sprint_plan_path,omitempty"` // Path to sprint plan (for merge comment)
	// Timestamps
	CreatedAt   time.Time     `json:"created_at"`
	StartedAt   *time.Time    `json:"started_at,omitempty"`
	CompletedAt *time.Time    `json:"completed_at,omitempty"`
	Duration    time.Duration `json:"duration,omitempty"`
	Error       string        `json:"error,omitempty"`
	Output      string        `json:"output,omitempty"`
	Cost        float64       `json:"cost,omitempty"`
	TokensUsed  int           `json:"tokens_used,omitempty"`
	// Detailed token breakdown
	InputTokens  int `json:"input_tokens,omitempty"`
	OutputTokens int `json:"output_tokens,omitempty"`
	// Resource metrics
	PeakCPU    float64 `json:"peak_cpu,omitempty"`
	PeakMemory float64 `json:"peak_memory_mb,omitempty"`
	// Capability detection (M-DEPRECATE-AILANG-AGENT)
	Capabilities  []Capability `json:"capabilities,omitempty"`   // Detected capability requirements
	ImpactLevel   string       `json:"impact_level,omitempty"`   // "low", "medium", or "high"
	EstimatedCost float64      `json:"estimated_cost,omitempty"` // Pre-execution cost estimate in USD
	// M-HARNESS-COMMIT-CONTRACT: Website builder metadata from message payload
	SiteSlug string `json:"site_slug,omitempty"` // Site identifier for commit messages
	BriefID  string `json:"brief_id,omitempty"`  // Brief identifier for commit messages
}

TaskRecord represents a task stored in the database

type TaskStage deprecated

type TaskStage string

TaskStage represents the pipeline stage for GitHub-linked tasks.

Deprecated: Use agent_id tracking with trigger_on_complete configuration instead. Stage is maintained for backwards compatibility with existing tasks. Will be removed in v0.9.0. See M-GENERIC-PIPELINE design doc for migration guidance.

Migration guide:

  • Replace TaskStageDesign with agent_id = "design-doc-creator"
  • Replace TaskStageSprint with agent_id = "sprint-planner"
  • Replace TaskStageImplementation with agent_id = "sprint-executor"
  • Configure trigger_on_complete in ~/.ailang/config.yaml for automatic handoffs
const (
	// Deprecated: Use agent_id tracking instead
	TaskStageNone TaskStage = "" // Not part of a pipeline
	// Deprecated: Use agent_id = "design-doc-creator" with approval config instead
	TaskStageDesign TaskStage = "design" // Creating design document
	// Deprecated: Use agent_id = "sprint-planner" with approval config instead
	TaskStageSprint TaskStage = "sprint" // Creating sprint plan
	// Deprecated: Use agent_id = "sprint-executor" with approval config instead
	TaskStageImplementation TaskStage = "implementation" // Implementing the sprint
	// Deprecated: Use agent_id with approval config for merge workflow instead
	TaskStageMerge TaskStage = "merge" // Awaiting merge approval
)

type TaskStats

type TaskStats struct {
	TotalTasks       int                       `json:"total_tasks"`
	PendingTasks     int                       `json:"pending_tasks"`
	RunningTasks     int                       `json:"running_tasks"`
	PendingApprovals int                       `json:"pending_approvals"` // Tasks awaiting human approval
	CompletedTasks   int                       `json:"completed_tasks"`
	FailedTasks      int                       `json:"failed_tasks"`
	ByType           map[string]int            `json:"by_type"`
	ByProvider       map[string]*DetailedStats `json:"by_provider"`
	ByWorkspace      map[string]*DetailedStats `json:"by_workspace"` // Per-workspace breakdown
	TotalCost        float64                   `json:"total_cost"`
	TotalTokens      int                       `json:"total_tokens"`
	AvgDuration      time.Duration             `json:"avg_duration"`
}

TaskStats provides aggregate statistics

type TaskStatus

type TaskStatus string

TaskStatus represents the lifecycle state of a task

const (
	TaskStatusPending         TaskStatus = "pending"
	TaskStatusQueued          TaskStatus = "queued"
	TaskStatusRunning         TaskStatus = "running"
	TaskStatusPendingApproval TaskStatus = "pending_approval" // Work done, awaiting human review
	TaskStatusCompleted       TaskStatus = "completed"        // Approved and merged
	TaskStatusFailed          TaskStatus = "failed"
	TaskStatusRejected        TaskStatus = "rejected" // Human rejected the work
	TaskStatusCancelled       TaskStatus = "cancelled"
	TaskStatusDuplicate       TaskStatus = "duplicate"
)

type TaskType

type TaskType string

TaskType represents the category of a task

const (
	TaskTypeBugFix   TaskType = "bug-fix"
	TaskTypeFeature  TaskType = "feature"
	TaskTypeDocs     TaskType = "docs"
	TaskTypeResearch TaskType = "research"
	TaskTypeRefactor TaskType = "refactor"
	TaskTypeTest     TaskType = "test"
	TaskTypeUnknown  TaskType = "unknown"
)

type WatcherStatus

type WatcherStatus struct {
	Running       bool           `json:"running"`
	LastPoll      time.Time      `json:"last_poll"`
	PollInterval  time.Duration  `json:"poll_interval"`
	WatchedIssues map[int]string `json:"watched_issues"` // issue number -> task ID
}

WatcherStatus represents the current state of the ApprovalWatcher.

type WorkspaceMapping

type WorkspaceMapping struct {
	Pattern   string `yaml:"pattern" json:"pattern"`     // Glob pattern (e.g., "*/dev/sunholo/ailang")
	Workspace string `yaml:"workspace" json:"workspace"` // Workspace ID (e.g., "sunholo-data/ailang")
}

WorkspaceMapping defines a path pattern to workspace ID mapping.

type WorkspacesConfig

type WorkspacesConfig struct {
	Mappings         []WorkspaceMapping `yaml:"mappings" json:"mappings"`
	DefaultWorkspace string             `yaml:"default_workspace" json:"default_workspace"`
	DeriveFromPath   bool               `yaml:"derive_from_path" json:"derive_from_path"` // If true, derive workspace from path when not matched
}

WorkspacesConfig contains workspace-related configuration for access control.

func DefaultWorkspacesConfig

func DefaultWorkspacesConfig() *WorkspacesConfig

DefaultWorkspacesConfig returns minimal default workspace mappings. Only internal patterns are hardcoded - user projects should be in config. For unmapped paths, DeriveWorkspaceFromPath() extracts workspace ID from path.

func LoadWorkspacesConfig

func LoadWorkspacesConfig() *WorkspacesConfig

LoadWorkspacesConfig loads workspace configuration from ~/.ailang/config.yaml. Returns a default configuration if no config is set.

func (*WorkspacesConfig) BuildWorkspaceMappingSQL

func (c *WorkspacesConfig) BuildWorkspaceMappingSQL(cwdColumn string) string

BuildWorkspaceMappingSQL generates a SQL CASE statement for mapping file paths to workspace IDs. Used by the analytics backend to map process.cwd values to Firestore workspace IDs.

When DeriveFromPath is true, unmapped paths are derived using SQL expressions: - Paths with /dev/{org}/{repo} return "org/repo" - Other paths return "unknown"

func (*WorkspacesConfig) GetPathPatternsForWorkspace

func (c *WorkspacesConfig) GetPathPatternsForWorkspace(workspaceID string) []string

GetPathPatternsForWorkspace returns SQL LIKE patterns that match a workspace ID. Used for filtering spans by workspace when the filter is an org/repo ID. Returns nil if no matching mappings found (caller should use fallback).

Example: For workspace "MarkEdmondson1234/TwilightGame" with mapping {Pattern: "*/TwilightGame*", Workspace: "MarkEdmondson1234/TwilightGame"}, returns ["%/TwilightGame%"] as the pattern to match process.cwd values.

func (*WorkspacesConfig) GetWorkspaceLabel

func (c *WorkspacesConfig) GetWorkspaceLabel(workspaceID string) string

GetWorkspaceLabel returns a human-friendly label for a workspace ID. For internal workspaces, returns predefined labels. For user workspaces (org/repo format), returns a formatted label. For raw paths (from DeriveFromPath fallback), derives and formats the label.

type Worktree

type Worktree struct {
	TaskID     string
	Path       string
	Branch     string
	BaseBranch string // Base branch the worktree was created from (for diff comparison)
	BaseCommit string // Base commit hash at worktree creation (stable reference - branch may move)
	CreatedAt  time.Time
}

Worktree represents a git worktree for task isolation

type WorktreeChanges

type WorktreeChanges struct {
	TaskID       string   `json:"task_id"`
	Path         string   `json:"path"`
	Branch       string   `json:"branch"`
	FilesChanged []string `json:"files_changed"`
	DiffSummary  string   `json:"diff_summary"`
	CommitsAhead string   `json:"commits_ahead"`
}

WorktreeChanges describes the changes in a worktree

type WorktreeManager

type WorktreeManager struct {
	// contains filtered or unexported fields
}

WorktreeManager manages git worktrees for isolated task execution

func NewWorktreeManager

func NewWorktreeManager(repoDir, baseDir string, maxWorktrees int) (*WorktreeManager, error)

NewWorktreeManager creates a new worktree manager

func (*WorktreeManager) CleanupAll

func (wm *WorktreeManager) CleanupAll() error

CleanupAll removes all worktrees

func (*WorktreeManager) CleanupOrphaned

func (wm *WorktreeManager) CleanupOrphaned() (int, error)

CleanupOrphaned removes worktrees that no longer exist on disk

func (*WorktreeManager) Count

func (wm *WorktreeManager) Count() int

Count returns the number of active worktrees

func (*WorktreeManager) CreateWorktree

func (wm *WorktreeManager) CreateWorktree(taskID, baseBranch string) (*Worktree, error)

CreateWorktree creates a new worktree for a task. baseBranch specifies the branch to create the worktree from (e.g., "dev"). If baseBranch is empty, it queries git for the remote's default branch.

func (*WorktreeManager) GetChangeSummary

func (wm *WorktreeManager) GetChangeSummary(taskID string) (*WorktreeChanges, error)

GetChangeSummary returns a summary of changes in the worktree

func (*WorktreeManager) GetWorktree

func (wm *WorktreeManager) GetWorktree(taskID string) (*Worktree, bool)

GetWorktree returns a worktree by task ID

func (*WorktreeManager) HasChanges

func (wm *WorktreeManager) HasChanges(taskID string) (bool, error)

HasChanges checks if a worktree has uncommitted changes

func (*WorktreeManager) ListWorktrees

func (wm *WorktreeManager) ListWorktrees() []*Worktree

ListWorktrees returns all active worktrees

func (*WorktreeManager) RemoveWorktree

func (wm *WorktreeManager) RemoveWorktree(taskID string) error

RemoveWorktree removes a worktree

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL