Documentation
¶
Overview ¶
Package coordinator provides approval checkpoints for task execution.
Package coordinator provides unified approval processing for CLI, Dashboard, and Daemon.
Package coordinator provides task coordination and event formatting.
Package coordinator provides the event handler for streaming executor events.
Package coordinator provides human interaction handling for the feedback loop.
Package coordinator provides git merge operations for approved worktrees.
Package coordinator provides task coordination and execution.
Package coordinator provides task execution and orchestration for the AILANG daemon.
Package coordinator provides task execution and orchestration for the AILANG daemon.
Package coordinator provides resource tracking for task execution.
Index ¶
- Constants
- Variables
- func BuildDirectiveFromConfig(task *TaskRecord, agent *AgentConfig) string
- func BuildStageDirective(task *TaskRecord) string
- func BuildSystemPrompt(taskType TaskType, agentConfig *AgentConfig) string
- func CalculatePriority(analyzed *AnalyzedTask) int
- func CanRetrigger(task *TaskRecord, channels ...string) bool
- func CollectVersionHistory(ctx context.Context, msgStore messaging.MessageStore, taskStore Store, ...) *pkg.VersionHistory
- func CountTurns(events []*TaskEventRecord) int
- func DefaultArtifactPatterns(agentID string) []string
- func DefaultOutputMarkers(agentID string) []string
- func DefaultServerURL() string
- func DeriveWorkspaceFromPath(path string) string
- func ExtractFeedbackFromComments(comments []IssueComment) string
- func FormatEventsAsText(events []*TaskEventRecord, opts *FormatOptions) string
- func FormatWebhookSetupCommand(repo, coordinatorURL, secret string) string
- func GetCurrentPID() int
- func GetDefaultBranch(repoPath string) string
- func GetTurnTimestamp(events []*TaskEventRecord, turnNum int) *time.Time
- func GetWorktreeDiff(ctx context.Context, worktreePath, baseBranch, baseCommit string) (string, error)
- func HasMarkerValue(output, marker, expectedValue string) bool
- func IsBotUser(username string, additionalPatterns ...string) bool
- func IsCloudMode() bool
- func ParseOutputMarkers(output string, markers []string) map[string]string
- func ParsePayloadToEnv(payload string) ([]string, error)
- func PrepareTaskForRetrigger(task *TaskRecord, feedback string)
- func RenderComment(tmpl *template.Template, data *CommentData) (string, error)
- func RenderDesignDocComment(data *CommentData) (string, error)
- func RenderErrorComment(taskID, stage, errMsg string) (string, error)
- func RenderEvaluationComment(data *CommentData) (string, error)
- func RenderImplementCompleteComment(data *CommentData) (string, error)
- func RenderMergeCompleteComment(data *CommentData) (string, error)
- func RenderRevisionComment(taskID, stage string) (string, error)
- func RenderSprintPlanComment(data *CommentData) (string, error)
- func RenderWorkingComment(taskID, agent, stage string) (string, error)
- func SampleAgentConfig() string
- func SanitizeLog[S ~string](s S) string
- func ScheduleCascadeUpdate(index *pkg.RegistryIndex, triggerPkg string) ([]string, error)
- func SplitMarkerValues(value string) []string
- func StoreApprovalEvent(ctx context.Context, store Store, taskID string, approvedBy string) error
- func StoreFeedbackEvent(ctx context.Context, store Store, feedback *HumanFeedback) error
- func StoreIterationStartEvent(ctx context.Context, store Store, taskID string, iteration int) error
- func SummarizeEvents(events []*TaskEventRecord) string
- type APIKeyCache
- type AgentConfig
- func (a *AgentConfig) GetEffectiveApprovalConfig() *ApprovalConfig
- func (a *AgentConfig) GetEffectiveArtifactPatterns() []string
- func (a *AgentConfig) GetEffectiveIdleTimeout() time.Duration
- func (a *AgentConfig) GetEffectiveInvokeConfig() *InvokeConfig
- func (a *AgentConfig) GetEffectiveOutputMarkers() []string
- func (a *AgentConfig) GetEffectiveTimeout() time.Duration
- type AgentRegistry
- func (r *AgentRegistry) Clear()
- func (r *AgentRegistry) Count() int
- func (r *AgentRegistry) GetAgentByID(id string) *AgentConfig
- func (r *AgentRegistry) GetAgentForInbox(inbox string) *AgentConfig
- func (r *AgentRegistry) HasAgent(id string) bool
- func (r *AgentRegistry) HasInbox(inbox string) bool
- func (r *AgentRegistry) ListAgents() []*AgentConfig
- func (r *AgentRegistry) ListInboxes() []string
- func (r *AgentRegistry) Register(agent *AgentConfig) error
- func (r *AgentRegistry) Unregister(id string) error
- func (r *AgentRegistry) Validate() []string
- type AgentResult
- type AnalyzedTask
- type ApprovalCallback
- type ApprovalCheckpoint
- func (ac *ApprovalCheckpoint) Approve(requestID string, resolvedBy string) error
- func (ac *ApprovalCheckpoint) ApproveByTask(taskID string, resolvedBy string) error
- func (ac *ApprovalCheckpoint) Clear()
- func (ac *ApprovalCheckpoint) Count() int
- func (ac *ApprovalCheckpoint) GetPendingRequests() []*ApprovalRequest
- func (ac *ApprovalCheckpoint) GetRequest(requestID string) *ApprovalRequest
- func (ac *ApprovalCheckpoint) GetRequestByTask(taskID string) *ApprovalRequest
- func (ac *ApprovalCheckpoint) HasPendingApproval(taskID string) bool
- func (ac *ApprovalCheckpoint) Reject(requestID string, resolvedBy string) error
- func (ac *ApprovalCheckpoint) RejectByTask(taskID string, resolvedBy string) error
- func (ac *ApprovalCheckpoint) RequestApproval(ctx context.Context, request *ApprovalRequest) (ApprovalStatus, error)
- func (ac *ApprovalCheckpoint) SetCallback(callback ApprovalCallback)
- type ApprovalConfig
- type ApprovalEvent
- type ApprovalEventType
- type ApprovalHandler
- type ApprovalParams
- type ApprovalRequest
- type ApprovalRequestRecord
- type ApprovalResult
- type ApprovalStatus
- type ApprovalStore
- type ApprovalType
- type ApprovalWatcher
- func (w *ApprovalWatcher) GetAgentByLabel(label string) *AgentConfig
- func (w *ApprovalWatcher) GetRegisteredLabels() []string
- func (w *ApprovalWatcher) GetStatus() WatcherStatus
- func (w *ApprovalWatcher) IsRunning() bool
- func (w *ApprovalWatcher) LoadWatchedIssuesFromStore(ctx context.Context) error
- func (w *ApprovalWatcher) RegisterAgentApproval(agent *AgentConfig, handler ApprovalHandler) error
- func (w *ApprovalWatcher) RegisterAgentApprovalHandlers(registry *AgentRegistry, defaultHandler ApprovalHandler) (int, error)
- func (w *ApprovalWatcher) RegisterHandler(eventType ApprovalEventType, handler ApprovalHandler) error
- func (w *ApprovalWatcher) SetAgentRegistry(registry *AgentRegistry)
- func (w *ApprovalWatcher) Start(ctx context.Context) error
- func (w *ApprovalWatcher) Stop()
- func (w *ApprovalWatcher) UnwatchIssue(issueNumber int)
- func (w *ApprovalWatcher) WatchIssue(issueNumber int, taskID string) error
- func (w *ApprovalWatcher) WatchedIssueCount() int
- type ArtifactDiscovery
- func (ad *ArtifactDiscovery) DiscoverArtifacts(maxSize int64) (map[string]string, error)
- func (ad *ArtifactDiscovery) DiscoverChangedFiles() ([]string, error)
- func (ad *ArtifactDiscovery) ReadArtifactContent(relativePath string) (string, error)
- func (ad *ArtifactDiscovery) WithBaseBranch(branch string) *ArtifactDiscovery
- func (ad *ArtifactDiscovery) WithBaseCommit(commit string) *ArtifactDiscovery
- type BudgetsConfig
- type Capability
- type CapabilityDetector
- func (cd *CapabilityDetector) ClassifyImpact(caps []Capability) string
- func (cd *CapabilityDetector) DetectCapabilities(content string) []Capability
- func (cd *CapabilityDetector) EstimateTotalCost(caps []Capability, baseExecutionCost float64) float64
- func (cd *CapabilityDetector) FormatImpact(caps []Capability) string
- type CapabilityType
- type CascadeCircuitBreaker
- type ChangeClass
- type CloudDispatcher
- type CommentData
- type CompletionHandler
- type Config
- type ConfigFile
- type CoordinatorConfig
- type CoordinatorEventHandler
- func (h *CoordinatorEventHandler) EmitStatus(status string)
- func (h *CoordinatorEventHandler) GetEventBuffer() []*websocket.TaskStreamEvent
- func (h *CoordinatorEventHandler) IsThrottled() bool
- func (h *CoordinatorEventHandler) OnError(err error)
- func (h *CoordinatorEventHandler) OnText(text string)
- func (h *CoordinatorEventHandler) OnToolResult(toolName string, output string)
- func (h *CoordinatorEventHandler) OnToolUse(toolName string, input string)
- func (h *CoordinatorEventHandler) OnTurnEnd(turnNum int)
- func (h *CoordinatorEventHandler) OnTurnStart(turnNum int)
- func (h *CoordinatorEventHandler) SetEventStorer(storer EventStorer)
- func (h *CoordinatorEventHandler) SetTaskContext(ctx *TaskEventContext)
- func (h *CoordinatorEventHandler) UpdateMetrics(tokensIn, tokensOut int, cost float64)
- type Daemon
- func (d *Daemon) Close() error
- func (d *Daemon) GetActiveTaskMetrics() []*ResourceMetrics
- func (d *Daemon) GetContext() context.Context
- func (d *Daemon) GetLogger() *log.Logger
- func (d *Daemon) GetResourceRegistry() *ResourceTrackerRegistry
- func (d *Daemon) GetWatcherStatus() WatcherStatus
- func (d *Daemon) HandleApproval(ctx context.Context, taskID, approvedBy string) error
- func (d *Daemon) HandleRejection(ctx context.Context, taskID, rejectedBy, reason string) error
- func (d *Daemon) IncrementTasksRun()
- func (d *Daemon) ProcessStageCompletion(ctx context.Context, task *TaskRecord, execResult *ExecuteResult) error
- func (d *Daemon) Run() error
- func (d *Daemon) SetCloudDispatcher(dispatcher CloudDispatcher)
- func (d *Daemon) SetEventBroadcaster(broadcaster EventBroadcaster)
- func (d *Daemon) SetStores(taskStore Store, msgStore messaging.MessageStore, ...)
- func (d *Daemon) Start() error
- func (d *Daemon) Status() (*Status, error)
- func (d *Daemon) StatusJSON() (string, error)
- func (d *Daemon) Stop() error
- type DesignDocResult
- type DetailedStats
- type DispatchParams
- type EventBroadcaster
- type EventStorer
- type EventsResponse
- type ExecuteOptions
- type ExecuteResult
- type ExecutorProvider
- type FirebaseConfig
- type FormatOptions
- type GeminiAPIProvider
- type GitHubPoster
- func (p *GitHubPoster) AddLabel(issueNum int, label string) error
- func (p *GitHubPoster) AddLabelInRepo(repo string, issueNum int, label string) error
- func (p *GitHubPoster) ClaimIssue(issueNum int) error
- func (p *GitHubPoster) ClaimIssueInRepo(repo string, issueNum int) error
- func (p *GitHubPoster) Client() *messaging.GitHubClient
- func (p *GitHubPoster) CloseIssue(issueNum int, comment string) error
- func (p *GitHubPoster) CloseIssueInRepo(repo string, issueNum int, comment string) error
- func (p *GitHubPoster) EnsureLabel(label string) error
- func (p *GitHubPoster) EnsureLabelInRepo(repo, label string) error
- func (p *GitHubPoster) GetLabels(issueNum int) ([]string, error)
- func (p *GitHubPoster) GetLabelsInRepo(repo string, issueNum int) ([]string, error)
- func (p *GitHubPoster) GetLatestHumanComment(issueNum int, since time.Time) (*IssueComment, error)
- func (p *GitHubPoster) GetRecentHumanComments(issueNum int, since time.Time) ([]IssueComment, error)
- func (p *GitHubPoster) HasLabel(issueNum int, label string) (bool, error)
- func (p *GitHubPoster) HasLabelInRepo(repo string, issueNum int, label string) (bool, error)
- func (p *GitHubPoster) IsIssueClaimed(issueNum int) (bool, error)
- func (p *GitHubPoster) IsIssueClaimedInRepo(repo string, issueNum int) (bool, error)
- func (p *GitHubPoster) PostComment(issueNum int, body string) error
- func (p *GitHubPoster) PostCommentInRepo(repo string, issueNum int, body string) error
- func (p *GitHubPoster) PostFeedback(issueNum int, feedback string, iteration int, channel string) error
- func (p *GitHubPoster) PostFeedbackInRepo(repo string, issueNum int, feedback string, iteration int, channel string) error
- func (p *GitHubPoster) PostWorkingStatus(issueNum int, taskID, agent string) error
- func (p *GitHubPoster) PostWorkingStatusInRepo(repo string, issueNum int, taskID, agent string) error
- func (p *GitHubPoster) ReleaseIssue(issueNum int) error
- func (p *GitHubPoster) ReleaseIssueInRepo(repo string, issueNum int) error
- func (p *GitHubPoster) RemoveLabel(issueNum int, label string) error
- func (p *GitHubPoster) RemoveLabelInRepo(repo string, issueNum int, label string) error
- func (p *GitHubPoster) Repo() string
- type GitHubSyncConfig
- type GlobalBudget
- type HTTPBroadcaster
- type HumanFeedback
- type ImplementResult
- type InboxMessageAdapter
- type InvokeConfig
- type IssueComment
- type KMSEncrypter
- type LabelRouteConfig
- type MergeResult
- type Message
- type MessageStore
- type MessageWatcher
- type MockMessageStore
- type ObservatoryContext
- type ObservatorySync
- func (s *ObservatorySync) CompleteAgentAssignment(ctx context.Context, assignmentID string, success bool) error
- func (s *ObservatorySync) GetWorkspaceID(path string) string
- func (s *ObservatorySync) SyncAgentAssignment(ctx context.Context, taskID, agentID, provider string) (string, error)
- func (s *ObservatorySync) SyncTask(ctx context.Context, task *TaskRecord) error
- type PluginsConfig
- type Provider
- type ProviderLimit
- type PubSubBroadcaster
- type PubSubInboxAdapter
- type RepoSyncConfig
- type ResourceMetrics
- type ResourceTracker
- func (rt *ResourceTracker) GetMetrics() *ResourceMetrics
- func (rt *ResourceTracker) SetCost(cost float64)
- func (rt *ResourceTracker) SetPollInterval(interval time.Duration)
- func (rt *ResourceTracker) SetUpdateCallback(callback func(*ResourceMetrics))
- func (rt *ResourceTracker) Start(ctx context.Context)
- func (rt *ResourceTracker) Stop()
- func (rt *ResourceTracker) UpdateCost(cost float64)
- func (rt *ResourceTracker) UpdateTokens(inputTokens, outputTokens int)
- type ResourceTrackerRegistry
- func (r *ResourceTrackerRegistry) Get(taskID string) *ResourceTracker
- func (r *ResourceTrackerRegistry) GetAllMetrics() []*ResourceMetrics
- func (r *ResourceTrackerRegistry) Register(taskID string, tracker *ResourceTracker)
- func (r *ResourceTrackerRegistry) StopAll()
- func (r *ResourceTrackerRegistry) Unregister(taskID string)
- type SQLiteStore
- func (s *SQLiteStore) Close() error
- func (s *SQLiteStore) CreateApprovalRequest(ctx context.Context, req *ApprovalRequestRecord) error
- func (s *SQLiteStore) CreateTask(ctx context.Context, task *TaskRecord) error
- func (s *SQLiteStore) DeleteOldApprovals(ctx context.Context, olderThan time.Duration) (int, error)
- func (s *SQLiteStore) DeleteOldTaskEvents(ctx context.Context, olderThan time.Duration) (int, error)
- func (s *SQLiteStore) DeleteOldTasks(ctx context.Context, olderThan time.Duration) (int, error)
- func (s *SQLiteStore) DeleteTask(ctx context.Context, id string) error
- func (s *SQLiteStore) DeleteTaskEvents(ctx context.Context, taskID string) error
- func (s *SQLiteStore) FindDuplicateTask(ctx context.Context, fingerprint uint64, threshold float64) (*TaskRecord, error)
- func (s *SQLiteStore) GetApprovalRequest(ctx context.Context, id string) (*ApprovalRequestRecord, error)
- func (s *SQLiteStore) GetApprovalRequestByTask(ctx context.Context, taskID string) (*ApprovalRequestRecord, error)
- func (s *SQLiteStore) GetApprovalRequestByTaskAnyStatus(ctx context.Context, taskID string) (*ApprovalRequestRecord, error)
- func (s *SQLiteStore) GetCostByProvider() (map[string]float64, error)
- func (s *SQLiteStore) GetTask(ctx context.Context, id string) (*TaskRecord, error)
- func (s *SQLiteStore) GetTaskAgentInfo(ctx context.Context, taskID string) (agentID, inbox, title string, err error)
- func (s *SQLiteStore) GetTaskEvents(ctx context.Context, taskID string, limit int) ([]*TaskEventRecord, error)
- func (s *SQLiteStore) GetTaskStats(ctx context.Context) (*TaskStats, error)
- func (s *SQLiteStore) GetTasksByGithubIssue(ctx context.Context, issueNum int) ([]*TaskRecord, error)
- func (s *SQLiteStore) GetTasksByStage(ctx context.Context, stage TaskStage) ([]*TaskRecord, error)
- func (s *SQLiteStore) ListApprovedMergeHandoffsWithoutTrigger(ctx context.Context) ([]*ApprovalRequestRecord, error)
- func (s *SQLiteStore) ListPendingApprovals(ctx context.Context) ([]*ApprovalRequestRecord, error)
- func (s *SQLiteStore) ListResolvedApprovals(ctx context.Context, limit int) ([]*ApprovalRequestRecord, error)
- func (s *SQLiteStore) ListTasks(ctx context.Context, filter *TaskFilter) ([]*TaskRecord, error)
- func (s *SQLiteStore) MarkApprovalHandoffsTriggered(ctx context.Context, taskID string) error
- func (s *SQLiteStore) MarkTaskCancelled(ctx context.Context, id string) error
- func (s *SQLiteStore) MarkTaskCompleted(ctx context.Context, id string, result *ExecuteResult) error
- func (s *SQLiteStore) MarkTaskFailed(ctx context.Context, id string, taskErr error) error
- func (s *SQLiteStore) MarkTaskPendingApproval(ctx context.Context, ...) error
- func (s *SQLiteStore) MarkTaskQueued(ctx context.Context, id string) error
- func (s *SQLiteStore) MarkTaskRejected(ctx context.Context, id string) error
- func (s *SQLiteStore) MarkTaskRunning(ctx context.Context, id, provider, worktreeID string) error
- func (s *SQLiteStore) RecoverStaleTasks(ctx context.Context, staleThreshold time.Duration) (int, error)
- func (s *SQLiteStore) ReopenTask(ctx context.Context, taskID string) error
- func (s *SQLiteStore) RequeueTask(ctx context.Context, id string) error
- func (s *SQLiteStore) ResetTaskToPending(ctx context.Context, id string) error
- func (s *SQLiteStore) ResolveApprovalRequest(ctx context.Context, id string, status string, resolvedBy string) error
- func (s *SQLiteStore) ResolveApprovalRequestByTask(ctx context.Context, taskID string, status string, resolvedBy string) error
- func (s *SQLiteStore) RetryAllFailedTasks(ctx context.Context) (int, error)
- func (s *SQLiteStore) SetTaskDesignDocPath(ctx context.Context, id string, path string) error
- func (s *SQLiteStore) SetTaskFingerprint(ctx context.Context, id string, fingerprint uint64) error
- func (s *SQLiteStore) SetTaskGithubIssue(ctx context.Context, id string, issueNum int) error
- func (s *SQLiteStore) SetTaskSprintPlanPath(ctx context.Context, id string, path string) error
- func (s *SQLiteStore) SetTaskStage(ctx context.Context, id string, stage TaskStage) error
- func (s *SQLiteStore) SetTaskThreadID(ctx context.Context, id string, threadID string) error
- func (s *SQLiteStore) StoreTaskEvent(ctx context.Context, event *TaskEventRecord) error
- func (s *SQLiteStore) UpdateTask(ctx context.Context, task *TaskRecord) error
- func (s *SQLiteStore) UpdateTaskChainInfo(ctx context.Context, id, chainID, stageID string) error
- func (s *SQLiteStore) UpdateTaskMetrics(ctx context.Context, id string, peakCPU, peakMemory float64) error
- type ScriptProvider
- type SprintPlanResult
- type StageExecutionResult
- type StaleTaskDetector
- type Status
- type Store
- type StoreBackedApprovalCheckpoint
- type StoreConfig
- type Task
- type TaskAnalyzer
- type TaskChain
- func (tc *TaskChain) OnAgentApproved(ctx context.Context, event *ApprovalEvent, agentID string) error
- func (tc *TaskChain) OnAgentComplete(ctx context.Context, taskID, agentID string, result *AgentResult, ...) error
- func (tc *TaskChain) OnDesignApproved(ctx context.Context, event *ApprovalEvent) error
- func (tc *TaskChain) OnDesignDocComplete(ctx context.Context, taskID string, result *DesignDocResult) error
- func (tc *TaskChain) OnError(ctx context.Context, taskID string, errMsg string) error
- func (tc *TaskChain) OnImplementationComplete(ctx context.Context, taskID string, result *ImplementResult) error
- func (tc *TaskChain) OnMergeApproved(ctx context.Context, event *ApprovalEvent) error
- func (tc *TaskChain) OnNeedsRevision(ctx context.Context, event *ApprovalEvent) error
- func (tc *TaskChain) OnSprintApproved(ctx context.Context, event *ApprovalEvent) error
- func (tc *TaskChain) OnSprintPlanComplete(ctx context.Context, taskID string, result *SprintPlanResult) error
- func (tc *TaskChain) SetAgentRegistry(registry *AgentRegistry)
- func (tc *TaskChain) SetMessageStore(msgStore messaging.MessageStore)
- func (tc *TaskChain) StartTask(ctx context.Context, taskID string, issueNum int) error
- type TaskEventContext
- type TaskEventRecord
- type TaskExecutor
- func (te *TaskExecutor) Execute(ctx context.Context, task *AnalyzedTask, opts *ExecuteOptions) (*ExecuteResult, error)
- func (te *TaskExecutor) ExecuteWithRetry(ctx context.Context, task *AnalyzedTask, opts *ExecuteOptions, maxRetries int) (*ExecuteResult, error)
- func (te *TaskExecutor) ListProviders() []string
- type TaskFilter
- type TaskRecord
- type TaskStage (deprecated)
- type TaskStats
- type TaskStatus
- type TaskType
- type WatcherStatus
- type WorkspaceMapping
- type WorkspacesConfig
- type Worktree
- type WorktreeChanges
- type WorktreeManager
- func (wm *WorktreeManager) CleanupAll() error
- func (wm *WorktreeManager) CleanupOrphaned() (int, error)
- func (wm *WorktreeManager) Count() int
- func (wm *WorktreeManager) CreateWorktree(taskID, baseBranch string) (*Worktree, error)
- func (wm *WorktreeManager) GetChangeSummary(taskID string) (*WorktreeChanges, error)
- func (wm *WorktreeManager) GetWorktree(taskID string) (*Worktree, bool)
- func (wm *WorktreeManager) HasChanges(taskID string) (bool, error)
- func (wm *WorktreeManager) ListWorktrees() []*Worktree
- func (wm *WorktreeManager) RemoveWorktree(taskID string) error
Constants ¶
const (
	CoordinatorModeLocal = "local" // Default: SQLite polling + HTTP broadcaster
	CoordinatorModeCloud = "cloud" // Pub/Sub subscriptions + Pub/Sub broadcaster
)
CoordinatorMode determines how the coordinator receives messages and broadcasts events.
const LabelInProgress = "coordinator:in-progress"
LabelInProgress is the claim label used to prevent race conditions when multiple coordinator instances try to pick up the same issue.
const LabelNeedsRevision = "needs-revision"
LabelNeedsRevision is the universal label for requesting revisions. This is the only hardcoded label - all others come from AgentConfig.Approval.
const MaxAgentIterations = 3
MaxAgentIterations is the limit for agent-to-agent handoffs to prevent infinite loops. Human rejections (via dashboard or CLI) have no iteration limit.
Variables ¶
var CommentTemplates = struct {
	Working            *template.Template
	DesignDocComplete  *template.Template
	SprintPlanReady    *template.Template
	ImplementComplete  *template.Template
	EvaluationComplete *template.Template
	MergeComplete      *template.Template
	NeedsRevision      *template.Template
	Error              *template.Template
}{
	Working:            template.Must(template.New("working").Parse(workingTemplate)),
	DesignDocComplete:  template.Must(template.New("design").Parse(designDocCompleteTemplate)),
	SprintPlanReady:    template.Must(template.New("sprint").Parse(sprintPlanReadyTemplate)),
	ImplementComplete:  template.Must(template.New("implement").Parse(implementCompleteTemplate)),
	EvaluationComplete: template.Must(template.New("evaluation").Parse(evaluationCompleteTemplate)),
	MergeComplete:      template.Must(template.New("merge").Parse(mergeCompleteTemplate)),
	NeedsRevision:      template.Must(template.New("revision").Parse(needsRevisionTemplate)),
	Error:              template.Must(template.New("error").Parse(errorTemplate)),
}
CommentTemplates holds all the comment templates.
var DefaultBotPatterns = []string{
	"[bot]",
	"github-actions",
	"dependabot",
	"renovate",
	"codecov",
	"stale",
	"ailang-agent",
	"sunholo-voight-kampff",
}
DefaultBotPatterns contains common bot username patterns to filter.
var ErrWorktreeLimitReached = errors.New("max worktrees limit reached")
ErrWorktreeLimitReached is returned when no worktrees are available. Tasks receiving this error should remain in queue for retry, not fail.
Functions ¶
func BuildDirectiveFromConfig ¶
func BuildDirectiveFromConfig(task *TaskRecord, agent *AgentConfig) string
BuildDirectiveFromConfig creates a directive based on agent configuration. This is the config-driven approach that replaces hardcoded skill names.
Supports three invoke types:
- "skill": Invokes a Claude Code skill by name (e.g., "/design-doc-creator")
- "agent": Sends a message to another agent (e.g., "sprint-planner")
- "prompt": Uses a custom template with variable substitution
Template variables available for prompt type:
- {{.TaskID}}: The task ID
- {{.GithubIssue}}: The GitHub issue number
- {{.Content}}: The original task content
- {{.Stage}}: The current task stage
- {{.OutputMarkers}}: Comma-separated list of expected output markers
func BuildStageDirective ¶
func BuildStageDirective(task *TaskRecord) string
BuildStageDirective creates a stage-appropriate directive for GitHub-linked tasks. This is the key integration point - tasks at different stages get different prompts that invoke the appropriate skills.
This function uses the config-driven approach by creating a temporary AgentConfig with the stage-appropriate agent ID, which triggers the effective defaults.
func BuildSystemPrompt ¶
func BuildSystemPrompt(taskType TaskType, agentConfig *AgentConfig) string
BuildSystemPrompt constructs the full system prompt for an agent. It combines: global meta-prompt + task type context + per-agent system prompt (if any).
func CalculatePriority ¶
func CalculatePriority(analyzed *AnalyzedTask) int
CalculatePriority calculates task priority based on keywords and task type.
func CanRetrigger ¶
func CanRetrigger(task *TaskRecord, channels ...string) bool
CanRetrigger checks if a task can be re-triggered. Human channels (dashboard, cli) have no limit; agent-to-agent has a max of MaxAgentIterations.
func CollectVersionHistory ¶
func CollectVersionHistory(
	ctx context.Context,
	msgStore messaging.MessageStore,
	taskStore Store,
	task *TaskRecord,
	pkgName, version, previousVersion string,
) *pkg.VersionHistory
CollectVersionHistory builds a VersionHistory by aggregating existing data sources:
- The triggering inbox message (from messaging store)
- Task events (from task store: turns, tool uses, completions, errors)
- Approval history (from messaging store: created, approved, rejected)
No new data capture needed — everything is already recorded during execution.
func CountTurns ¶
func CountTurns(events []*TaskEventRecord) int
CountTurns returns the number of turns in the event list.
func DefaultArtifactPatterns ¶
DefaultArtifactPatterns returns the default file patterns for artifact discovery. These patterns are used with git diff to deterministically discover created/modified artifacts.
For known AILANG agents, returns specific patterns for their typical outputs. For unknown agents, returns universal pattern "**/*" to discover ALL changed files. This ensures artifact discovery works for any agent without hardcoded configuration.
func DefaultOutputMarkers ¶
DefaultOutputMarkers returns the default output markers for known AILANG agent IDs. Returns nil for unknown agents (no markers expected). Used by GetEffectiveOutputMarkers() when agent has no explicit YAML config. Consider using ArtifactPatterns + git diff instead for deterministic artifact discovery.
func DefaultServerURL ¶
func DefaultServerURL() string
DefaultServerURL returns the default Collaboration Hub server URL.
Note: Use 127.0.0.1 instead of localhost to avoid IPv6 resolution issues (Go's HTTP client tries IPv6 first, but the server may only bind to IPv4).
func DeriveWorkspaceFromPath ¶
DeriveWorkspaceFromPath extracts a workspace ID from a file path. Uses the last two meaningful path segments (parent/basename) as the workspace ID. This is portable across different directory structures.
Examples:
- /Users/mark/dev/sunholo/ailang -> sunholo/ailang
- /home/user/projects/rockwool/ROCKGAP -> rockwool/ROCKGAP
- /path/to/TwilightGame -> to/TwilightGame (or just TwilightGame if only 1 meaningful segment)
- /tmp/foo -> foo
func ExtractFeedbackFromComments ¶
func ExtractFeedbackFromComments(comments []IssueComment) string
ExtractFeedbackFromComments combines all human comments into a single feedback string.
func FormatEventsAsText ¶
func FormatEventsAsText(events []*TaskEventRecord, opts *FormatOptions) string
FormatEventsAsText formats events for human-readable CLI output. Returns a string with turn separators and tool highlighting.
func FormatWebhookSetupCommand ¶
FormatWebhookSetupCommand returns the gh CLI command to configure a webhook for the given repo and coordinator URL.
func GetCurrentPID ¶
func GetCurrentPID() int
GetCurrentPID returns the PID of the current process (useful for self-monitoring).
func GetDefaultBranch ¶
GetDefaultBranch queries git for the remote's default branch. It first tries git symbolic-ref, then falls back to git remote show. This avoids hardcoding branch names like "main" or "dev".
func GetTurnTimestamp ¶
func GetTurnTimestamp(events []*TaskEventRecord, turnNum int) *time.Time
GetTurnTimestamp returns the timestamp of the first event in a turn.
func GetWorktreeDiff ¶
func GetWorktreeDiff(ctx context.Context, worktreePath, baseBranch, baseCommit string) (string, error)
GetWorktreeDiff returns the git diff for a worktree against its base. baseCommit is the commit hash at worktree creation (stable - branch may have moved). baseBranch is the branch name (may have moved since worktree creation). Prefers baseCommit if provided, falls back to baseBranch, then queries git for default.
func HasMarkerValue ¶
HasMarkerValue checks if a specific marker value is present in output. Useful for boolean markers like "IMPLEMENTATION_COMPLETE: true".
func ParseOutputMarkers ¶
ParseOutputMarkers extracts values for configured markers from execution output. This is the config-driven approach that replaces the hardcoded ParseStageOutput.
It searches for each marker in the output and extracts the value that follows. Markers should be in the format "MARKER_NAME:" (with or without trailing colon).
Returns a map of marker name -> extracted value. Returns empty map if no markers are found or markers slice is empty.
For multi-value markers (comma-separated), the full comma-separated string is returned. Use SplitMarkerValues() to split it into individual values if needed.
func ParsePayloadToEnv ¶
ParsePayloadToEnv converts a JSON payload to environment variables. Keys are converted to UPPER_SNAKE_CASE.
- Nested objects are flattened: {"db": {"host": "x"}} → DB_HOST=x
- Arrays become comma-separated: ["a", "b"] → VALUE=a,b
func PrepareTaskForRetrigger ¶
func PrepareTaskForRetrigger(task *TaskRecord, feedback string)
PrepareTaskForRetrigger prepares feedback content for a re-triggered task. NOTE (M-TASK-HIERARCHY): This function no longer modifies task status. The new workflow creates a NEW task via the message system with parent_task_id linking, and the old task is marked as "rejected" separately via store.MarkTaskRejected().
This function now only:
- Increments iteration count for tracking
- Appends feedback to content for context propagation
func RenderComment ¶
func RenderComment(tmpl *template.Template, data *CommentData) (string, error)
RenderComment renders a comment template with the given data.
func RenderDesignDocComment ¶
func RenderDesignDocComment(data *CommentData) (string, error)
RenderDesignDocComment renders the design doc complete comment.
func RenderErrorComment ¶
RenderErrorComment renders the error comment.
func RenderEvaluationComment ¶
func RenderEvaluationComment(data *CommentData) (string, error)
RenderEvaluationComment renders the evaluation complete comment.
func RenderImplementCompleteComment ¶
func RenderImplementCompleteComment(data *CommentData) (string, error)
RenderImplementCompleteComment renders the implementation complete comment.
func RenderMergeCompleteComment ¶
func RenderMergeCompleteComment(data *CommentData) (string, error)
RenderMergeCompleteComment renders the merge complete comment.
func RenderRevisionComment ¶
RenderRevisionComment renders the needs revision comment.
func RenderSprintPlanComment ¶
func RenderSprintPlanComment(data *CommentData) (string, error)
RenderSprintPlanComment renders the sprint plan ready comment.
func RenderWorkingComment ¶
RenderWorkingComment renders the "working" status comment.
func SampleAgentConfig ¶
func SampleAgentConfig() string
SampleAgentConfig returns a sample configuration string for documentation.
func SanitizeLog ¶
SanitizeLog strips CR, LF, and ASCII control characters from a string so it cannot forge log lines when interpolated into a log message. Wrap any user-controlled value (task IDs, labels, repo names, issue titles) at the log call site. The generic constraint allows named string types (e.g. ApprovalEventType) without an explicit cast.
func ScheduleCascadeUpdate ¶
func ScheduleCascadeUpdate(index *pkg.RegistryIndex, triggerPkg string) ([]string, error)
ScheduleCascadeUpdate determines the execution order for packages affected by an update to triggerPkg. Returns package names in topological order (leaf deps first). Returns an error if a cycle is detected.
func SplitMarkerValues ¶
SplitMarkerValues splits a comma-separated marker value into individual values. Useful for markers like "FILES_CREATED: file1.go, file2.go, file3.go". Returns nil if value is empty or "none"/"None".
func StoreApprovalEvent ¶
StoreApprovalEvent stores human approval as a task event.
func StoreFeedbackEvent ¶
func StoreFeedbackEvent(ctx context.Context, store Store, feedback *HumanFeedback) error
StoreFeedbackEvent stores human feedback as a task event for audit trail.
func StoreIterationStartEvent ¶
StoreIterationStartEvent marks the start of a new task iteration.
func SummarizeEvents ¶
func SummarizeEvents(events []*TaskEventRecord) string
SummarizeEvents returns a brief summary of the events.
Types ¶
type APIKeyCache ¶
type APIKeyCache struct {
// contains filtered or unexported fields
}
APIKeyCache is a thread-safe in-memory cache for user-provided API keys. Keys are stored with a TTL and deleted after retrieval (one-time use). M-CLOUD-DUAL-AUTH: API keys NEVER touch Firestore or Secret Manager.
func NewAPIKeyCache ¶
func NewAPIKeyCache(ttl time.Duration) *APIKeyCache
NewAPIKeyCache creates a cache with the given TTL.
func (*APIKeyCache) Retrieve ¶
func (c *APIKeyCache) Retrieve(messageID string) (string, bool)
Retrieve returns the API key for a message ID and deletes it (one-time use). Returns ("", false) if not found or expired.
func (*APIKeyCache) Store ¶
func (c *APIKeyCache) Store(messageID, apiKey string)
Store saves an API key keyed by message ID.
type AgentConfig ¶
type AgentConfig struct {
ID string `yaml:"id" json:"id"`
Label string `yaml:"label" json:"label"`
Inbox string `yaml:"inbox" json:"inbox"` // Message inbox to watch
Workspace string `yaml:"workspace" json:"workspace"` // Base directory for worktrees
Capabilities []string `yaml:"capabilities" json:"capabilities"` // e.g., ["code", "research", "docs"]
TriggerOnComplete []string `yaml:"trigger_on_complete" json:"trigger_on_complete"` // Agent IDs to trigger when this agent completes
AutoApproveHandoffs bool `yaml:"auto_approve_handoffs" json:"auto_approve_handoffs"` // Skip approval for agent-to-agent handoffs
AutoMerge bool `yaml:"auto_merge" json:"auto_merge"` // Automatically merge approved work
SkipApproval bool `yaml:"skip_approval" json:"skip_approval"` // Skip approval workflow entirely (for script agents)
Provider string `yaml:"provider" json:"provider"` // "claude" or "gemini"
MergeBranch string `yaml:"merge_branch" json:"merge_branch"` // Target branch for merges (e.g., "dev", "main")
MaxConcurrentTasks int `yaml:"max_concurrent_tasks" json:"max_concurrent_tasks"` // 0 = unlimited
SessionContinuity bool `yaml:"session_continuity" json:"session_continuity"` // Use --resume for Claude Code / --conversation-id for Gemini
// Generic workflow configuration (v0.6.3+)
Invoke *InvokeConfig `yaml:"invoke" json:"invoke,omitempty"` // How to invoke this agent
OutputMarkers []string `yaml:"output_markers" json:"output_markers,omitempty"` // Markers to extract from output (e.g., "DESIGN_DOC_PATH:")
ArtifactPatterns []string `yaml:"artifact_patterns" json:"artifact_patterns,omitempty"` // File patterns for artifacts (e.g., "*.md", "design_docs/**")
Approval *ApprovalConfig `yaml:"approval" json:"approval,omitempty"` // Approval workflow configuration
// Per-agent system prompt (v0.8.0+)
// Appended to the global meta-prompt for agent-specific instructions.
SystemPrompt string `yaml:"system_prompt" json:"system_prompt,omitempty"`
// Per-agent model override (v0.8.0+)
// Controls which Claude model is used for this agent's tasks.
// Examples: "opus", "sonnet", "haiku", "claude-opus-4-5-20251101"
// If empty, falls back to the executor's default (currently "haiku").
Model string `yaml:"model" json:"model,omitempty"`
// Per-agent execution timeout (v0.8.1+)
// Go duration string (e.g., "15m", "30m", "1h"). Default: 60m (hard ceiling).
Timeout string `yaml:"timeout" json:"timeout,omitempty"`
// Per-agent idle timeout (v0.8.1+)
// Kill if no streaming events for this long. Default: 3m.
// Distinguishes "agent is stuck" from "agent is working but slow".
IdleTimeout string `yaml:"idle_timeout" json:"idle_timeout,omitempty"`
// Per-agent effort level (Claude Code 2.1.47+: "low", "medium", "high")
// Controls how much effort Claude puts into the task. Default: unset (Claude's default).
Effort string `yaml:"effort" json:"effort,omitempty"`
// Per-agent plugin directories (M-CLOUD-PLUGIN-SKILLS, v0.9.1).
// Local paths to Claude Code plugin directories containing .claude-plugin/plugin.json.
// Passed as --plugin-dir flags. Complementary to coordinator-level PluginRepo (for cloud).
PluginDirs []string `yaml:"plugin_dirs" json:"plugin_dirs,omitempty"`
// Per-agent third-party plugin configuration (M-CLOUD-PLUGIN-SKILLS, v0.9.1).
// Installs marketplace and custom plugins before task execution.
Plugins *PluginsConfig `yaml:"plugins" json:"plugins,omitempty"`
// Per-agent auth mode override (M-CLOUD-DUAL-AUTH, v0.9.2).
// "oauth" (default) or "apikey" — selects Cloud Run Job template.
// User-provided API keys from messages override this setting.
AuthMode string `yaml:"auth_mode" json:"auth_mode,omitempty"`
// Per-agent git mode (M-GIT-GUARDRAILS, v0.9.2).
// Controls how the PreToolUse git_guard.sh hook restricts git operations:
// "guardrails" (default) — reads + commits allowed, push only to expected branch
// "strict" — all git write operations blocked (read-only)
// "permissive" — all allowed except force-push and reset --hard
GitMode string `yaml:"git_mode" json:"git_mode,omitempty"`
// Subdirectory within the workspace for monorepo support (M-PKG-AUTONOMOUS-UPDATES, v0.10.0).
// When set, the agent's working directory is scoped to this path within the worktree.
// Used for package agents that operate on a specific package within a monorepo.
// Example: "packages/auth" for a package agent in the ailang-packages monorepo.
Subdirectory string `yaml:"subdirectory" json:"subdirectory,omitempty"`
}
AgentConfig represents a configured agent in the coordinator system. Each agent has an inbox, workspace, and capabilities for task execution.
func AdjustAutonomyForChangeClass ¶
func AdjustAutonomyForChangeClass(agent *AgentConfig, msg *messaging.InboxMessage) *AgentConfig
AdjustAutonomyForChangeClass modifies an agent's approval settings based on the incoming package message's change class. Returns a copy of the config with adjusted autonomy levels. Non-package messages pass through unchanged.
func (*AgentConfig) GetEffectiveApprovalConfig ¶
func (a *AgentConfig) GetEffectiveApprovalConfig() *ApprovalConfig
GetEffectiveApprovalConfig returns the agent's approval config, or defaults for known agents.
func (*AgentConfig) GetEffectiveArtifactPatterns ¶
func (a *AgentConfig) GetEffectiveArtifactPatterns() []string
GetEffectiveArtifactPatterns returns the agent's artifact patterns, or defaults for known agents. These patterns are used with git diff to discover created/modified files.
func (*AgentConfig) GetEffectiveIdleTimeout ¶
func (a *AgentConfig) GetEffectiveIdleTimeout() time.Duration
GetEffectiveIdleTimeout returns the agent's configured idle timeout, or the default (3m). The agent is killed if no streaming events are produced for this duration. Safe to call on nil receiver.
func (*AgentConfig) GetEffectiveInvokeConfig ¶
func (a *AgentConfig) GetEffectiveInvokeConfig() *InvokeConfig
GetEffectiveInvokeConfig returns the agent's invoke config, or defaults for known agents. Returns nil for unknown agents without explicit config.
Note: Deprecation warnings for relying on defaults should be logged by the caller, which has logger access. To detect that defaults were used, check whether the result differs from the explicit config.
func (*AgentConfig) GetEffectiveOutputMarkers ¶
func (a *AgentConfig) GetEffectiveOutputMarkers() []string
GetEffectiveOutputMarkers returns the agent's output markers, or defaults for known agents.
func (*AgentConfig) GetEffectiveTimeout ¶
func (a *AgentConfig) GetEffectiveTimeout() time.Duration
GetEffectiveTimeout returns the agent's configured hard ceiling timeout, or the default (60m). This is the maximum wall-clock time regardless of activity. Safe to call on nil receiver.
type AgentRegistry ¶
type AgentRegistry struct {
// contains filtered or unexported fields
}
AgentRegistry manages the set of configured agents. It provides thread-safe lookup by ID or inbox.
func LoadAgentRegistry ¶
func LoadAgentRegistry() (*AgentRegistry, error)
LoadAgentRegistry loads agents from config and returns a populated registry.
func LoadAgentRegistryFrom ¶
func LoadAgentRegistryFrom(configPath string) (*AgentRegistry, error)
LoadAgentRegistryFrom loads agents from a specific config path.
func NewAgentRegistry ¶
func NewAgentRegistry() *AgentRegistry
NewAgentRegistry creates an empty agent registry.
func (*AgentRegistry) Count ¶
func (r *AgentRegistry) Count() int
Count returns the number of registered agents.
func (*AgentRegistry) GetAgentByID ¶
func (r *AgentRegistry) GetAgentByID(id string) *AgentConfig
GetAgentByID returns an agent by its ID. Returns nil if not found.
func (*AgentRegistry) GetAgentForInbox ¶
func (r *AgentRegistry) GetAgentForInbox(inbox string) *AgentConfig
GetAgentForInbox returns the agent configured to watch the given inbox. Returns nil if no agent watches that inbox.
func (*AgentRegistry) HasAgent ¶
func (r *AgentRegistry) HasAgent(id string) bool
HasAgent returns true if an agent with the given ID is registered.
func (*AgentRegistry) HasInbox ¶
func (r *AgentRegistry) HasInbox(inbox string) bool
HasInbox returns true if an agent is registered for the given inbox.
func (*AgentRegistry) ListAgents ¶
func (r *AgentRegistry) ListAgents() []*AgentConfig
ListAgents returns all registered agents.
func (*AgentRegistry) ListInboxes ¶
func (r *AgentRegistry) ListInboxes() []string
ListInboxes returns all registered inbox names.
func (*AgentRegistry) Register ¶
func (r *AgentRegistry) Register(agent *AgentConfig) error
Register adds an agent to the registry. Returns an error if an agent with the same ID or inbox already exists.
func (*AgentRegistry) Unregister ¶
func (r *AgentRegistry) Unregister(id string) error
Unregister removes an agent by ID. Returns an error if the agent is not found.
func (*AgentRegistry) Validate ¶
func (r *AgentRegistry) Validate() []string
Validate checks that the registry is internally consistent. Returns a list of issues found.
type AgentResult ¶
type AgentResult struct {
// Artifact paths (set whichever is relevant)
ArtifactPath string // Primary artifact (design doc, sprint plan, etc.)
ArtifactContent string // Optional: content for GitHub comment display
AllArtifacts []string // All discovered artifacts from git diff
// Execution metrics
Duration time.Duration
Cost float64
TokensUsed int
InputTokens int
OutputTokens int
// Implementation-specific (for sprint-executor type agents)
BranchName string
WorktreePath string
FilesCreated []string
FilesModified []string
}
AgentResult contains the unified result of any agent completion. This struct supports all agent types (design-doc-creator, sprint-planner, sprint-executor, or any custom agent).
type AnalyzedTask ¶
type AnalyzedTask struct {
Task *Task
Type TaskType
Keywords []string
Fingerprint uint64
DuplicateOf string // ID of duplicate task, if any
// Capability detection (migrated from internal/agent)
Capabilities []Capability `json:"capabilities,omitempty"` // Detected capability requirements
ImpactLevel string `json:"impact_level,omitempty"` // "low", "medium", or "high"
EstimatedCost float64 `json:"estimated_cost,omitempty"` // Pre-execution cost estimate in USD
}
AnalyzedTask is a task with analysis metadata
type ApprovalCallback ¶
type ApprovalCallback func(request *ApprovalRequest)
ApprovalCallback is called when an approval is resolved
type ApprovalCheckpoint ¶
type ApprovalCheckpoint struct {
// contains filtered or unexported fields
}
ApprovalCheckpoint manages approval requests for a coordinator. It provides blocking wait for human decisions and timeout handling.
func NewApprovalCheckpoint ¶
func NewApprovalCheckpoint(defaultTimeout time.Duration) *ApprovalCheckpoint
NewApprovalCheckpoint creates a new approval checkpoint manager
func (*ApprovalCheckpoint) Approve ¶
func (ac *ApprovalCheckpoint) Approve(requestID string, resolvedBy string) error
Approve approves an approval request
func (*ApprovalCheckpoint) ApproveByTask ¶
func (ac *ApprovalCheckpoint) ApproveByTask(taskID string, resolvedBy string) error
ApproveByTask approves the pending request for a task
func (*ApprovalCheckpoint) Clear ¶
func (ac *ApprovalCheckpoint) Clear()
Clear removes all requests (for testing/cleanup)
func (*ApprovalCheckpoint) Count ¶
func (ac *ApprovalCheckpoint) Count() int
Count returns the number of pending approval requests
func (*ApprovalCheckpoint) GetPendingRequests ¶
func (ac *ApprovalCheckpoint) GetPendingRequests() []*ApprovalRequest
GetPendingRequests returns all pending approval requests
func (*ApprovalCheckpoint) GetRequest ¶
func (ac *ApprovalCheckpoint) GetRequest(requestID string) *ApprovalRequest
GetRequest returns an approval request by ID
func (*ApprovalCheckpoint) GetRequestByTask ¶
func (ac *ApprovalCheckpoint) GetRequestByTask(taskID string) *ApprovalRequest
GetRequestByTask returns the approval request for a task
func (*ApprovalCheckpoint) HasPendingApproval ¶
func (ac *ApprovalCheckpoint) HasPendingApproval(taskID string) bool
HasPendingApproval checks if a task has a pending approval request
func (*ApprovalCheckpoint) Reject ¶
func (ac *ApprovalCheckpoint) Reject(requestID string, resolvedBy string) error
Reject rejects an approval request
func (*ApprovalCheckpoint) RejectByTask ¶
func (ac *ApprovalCheckpoint) RejectByTask(taskID string, resolvedBy string) error
RejectByTask rejects the pending request for a task
func (*ApprovalCheckpoint) RequestApproval ¶
func (ac *ApprovalCheckpoint) RequestApproval(ctx context.Context, request *ApprovalRequest) (ApprovalStatus, error)
RequestApproval creates a new approval request and blocks until resolved. Returns the approval status (approved, rejected, or timeout).
func (*ApprovalCheckpoint) SetCallback ¶
func (ac *ApprovalCheckpoint) SetCallback(callback ApprovalCallback)
SetCallback sets the callback for approval resolution events
type ApprovalConfig ¶
type ApprovalConfig struct {
NeedsLabel string `yaml:"needs_label" json:"needs_label"` // Label to add when awaiting approval (e.g., "needs-design-approval")
ApprovedLabel string `yaml:"approved_label" json:"approved_label"` // Label that triggers next stage (e.g., "design-approved")
GithubCommentTemplate string `yaml:"github_comment_template" json:"github_comment_template"` // Template for GitHub comment on completion
}
ApprovalConfig specifies the approval workflow for an agent. Controls GitHub labels and comment templates for human-in-the-loop approval.
func DefaultApprovalConfig ¶
func DefaultApprovalConfig(agentID string) *ApprovalConfig
DefaultApprovalConfig returns the default approval config for known AILANG agent IDs. Returns nil for unknown agents (no approval workflow). Used by GetEffectiveApprovalConfig() when agent has no explicit YAML config.
type ApprovalEvent ¶
type ApprovalEvent struct {
TaskID string
IssueNumber int
Label string
EventType ApprovalEventType
// Feedback contains human comments harvested from GitHub (may be empty)
Feedback string
// FeedbackAuthor is the author of the most recent human comment
FeedbackAuthor string
// Channel indicates where the approval came from (github, dashboard, cli)
Channel string
}
ApprovalEvent represents a detected approval or revision request
type ApprovalEventType ¶
type ApprovalEventType string
ApprovalEventType categorizes the type of approval event
const (
ApprovalEventDesign ApprovalEventType = "design-approved"
ApprovalEventSprint ApprovalEventType = "sprint-approved"
ApprovalEventMerge ApprovalEventType = "merge-approved"
ApprovalEventRevision ApprovalEventType = "needs-revision"
)
type ApprovalHandler ¶
type ApprovalHandler func(ctx context.Context, event *ApprovalEvent) error
ApprovalHandler is called when an approval is detected
type ApprovalParams ¶
type ApprovalParams struct {
TaskID string // Required: task ID or approval ID (apr-xxx converted to task-xxx)
Action string // Required: "approve" or "reject"
ApprovedBy string // Required: who is approving/rejecting (e.g., "cli-user", "dashboard-user")
Channel string // Required: source channel ("cli", "dashboard", "daemon")
Feedback string // Optional: feedback text for rejections
MergeBranch string // Optional: target branch for merge (defaults from config, then channel)
// Behavior options
SkipMerge bool // If true, don't merge worktree on approval
KeepWorktree bool // If true, don't clean up worktree after merge
RetriggerOnReject bool // If true, send feedback to agent for re-attempt (feedback loop)
// Dependencies (injected by caller)
Store Store // Required: coordinator store for task/approval operations
MsgStore messaging.MessageStore // Optional: messaging store for sending feedback to agents
GitHubPoster *GitHubPoster // Optional: for posting feedback to GitHub issues
AgentRegistry *AgentRegistry // Optional: for looking up per-agent merge branch
ObsBackend observatory.Backend // Optional: for updating chain/stage status (M-CHAINS-SIMPLIFY)
}
ApprovalParams contains all parameters for processing an approval or rejection. This is the single interface used by CLI, Dashboard, and Daemon.
type ApprovalRequest ¶
type ApprovalRequest struct {
ID string `json:"id"`
TaskID string `json:"task_id"`
ThreadID string `json:"thread_id,omitempty"`
Type ApprovalType `json:"type"`
Status ApprovalStatus `json:"status"`
Title string `json:"title"`
Description string `json:"description"`
DiffSummary string `json:"diff_summary,omitempty"`
FilesChanged []string `json:"files_changed,omitempty"`
CreatedAt time.Time `json:"created_at"`
ResolvedAt *time.Time `json:"resolved_at,omitempty"`
ResolvedBy string `json:"resolved_by,omitempty"`
Timeout time.Duration `json:"timeout"`
AutoReject bool `json:"auto_reject"` // Reject on timeout instead of approve
// Handoff-specific fields (used when Type == ApprovalTypeHandoff)
SourceAgentID string `json:"source_agent_id,omitempty"` // Agent that completed the task
TargetAgentID string `json:"target_agent_id,omitempty"` // Agent to hand off to
SessionID string `json:"session_id,omitempty"` // Claude Code/Gemini CLI session for continuity
HandoffData string `json:"handoff_data,omitempty"` // Additional context for handoff
}
ApprovalRequest represents a pending approval request
type ApprovalRequestRecord ¶
type ApprovalRequestRecord struct {
ID string `json:"id"`
TaskID string `json:"task_id"`
Type string `json:"type"`
Description string `json:"description"`
ContextJSON string `json:"context_json,omitempty"`
Status string `json:"status"`
ResolvedBy string `json:"resolved_by,omitempty"`
CreatedAt time.Time `json:"created_at"`
ResolvedAt *time.Time `json:"resolved_at,omitempty"`
TimeoutAt *time.Time `json:"timeout_at,omitempty"`
AutoReject bool `json:"auto_reject"`
}
ApprovalRequestRecord is the database record for an approval request
type ApprovalResult ¶
type ApprovalResult struct {
Success bool
Message string // Human-readable result message
MergeCommit string // Commit hash if merged
MergedFiles []string // Files that were merged
ConflictFiles []string // Files with conflicts (if merge failed)
NewTaskID string // ID of new task if re-triggered
Error string // Error message if failed
}
ApprovalResult contains the result of processing an approval or rejection.
func ProcessApprovalRequest ¶
func ProcessApprovalRequest(ctx context.Context, params *ApprovalParams) (*ApprovalResult, error)
ProcessApprovalRequest handles approval/rejection from any channel (CLI, dashboard, daemon). This is the SINGLE source of truth for approval logic.
type ApprovalStatus ¶
type ApprovalStatus string
ApprovalStatus represents the state of an approval request
const (
ApprovalStatusPending ApprovalStatus = "pending"
ApprovalStatusApproved ApprovalStatus = "approved"
ApprovalStatusRejected ApprovalStatus = "rejected"
ApprovalStatusTimeout ApprovalStatus = "timeout"
)
type ApprovalStore ¶
type ApprovalStore interface {
CreateApprovalRequest(ctx context.Context, req *ApprovalRequestRecord) error
GetApprovalRequest(ctx context.Context, id string) (*ApprovalRequestRecord, error)
GetApprovalRequestByTask(ctx context.Context, taskID string) (*ApprovalRequestRecord, error)
ListPendingApprovals(ctx context.Context) ([]*ApprovalRequestRecord, error)
ResolveApprovalRequest(ctx context.Context, id string, status string, resolvedBy string) error
}
ApprovalStore is the persistence interface for approval requests.
type ApprovalType ¶
type ApprovalType string
ApprovalType defines the kind of approval being requested
const (
ApprovalTypeMerge ApprovalType = "merge" // Request to merge worktree changes
ApprovalTypeDestroy ApprovalType = "destroy" // Request to destroy worktree with changes
ApprovalTypeExecute ApprovalType = "execute" // Request to execute a destructive operation
ApprovalTypeCost ApprovalType = "cost" // Cost threshold exceeded
ApprovalTypeHandoff ApprovalType = "handoff" // Request to hand off work to another agent
)
type ApprovalWatcher ¶
type ApprovalWatcher struct {
// contains filtered or unexported fields
}
ApprovalWatcher polls GitHub for approval labels and triggers pipeline stages. It watches issues linked to tasks and detects when humans add approval labels.
The watcher supports two modes:
1. Legacy hardcoded labels (design-approved, sprint-approved, merge-approved)
2. Config-driven labels (from AgentConfig.Approval)
needs-revision is universal and works with both modes.
func NewApprovalWatcher ¶
func NewApprovalWatcher(poster *GitHubPoster, store Store, pollInterval time.Duration) *ApprovalWatcher
NewApprovalWatcher creates a new approval watcher.
func (*ApprovalWatcher) GetAgentByLabel ¶
func (w *ApprovalWatcher) GetAgentByLabel(label string) *AgentConfig
GetAgentByLabel returns the agent config for a given approval label. Returns nil if no agent is registered for that label.
func (*ApprovalWatcher) GetRegisteredLabels ¶
func (w *ApprovalWatcher) GetRegisteredLabels() []string
GetRegisteredLabels returns all registered custom approval labels.
func (*ApprovalWatcher) GetStatus ¶
func (w *ApprovalWatcher) GetStatus() WatcherStatus
GetStatus returns the current status of the watcher.
func (*ApprovalWatcher) IsRunning ¶
func (w *ApprovalWatcher) IsRunning() bool
IsRunning returns whether the watcher is currently running.
func (*ApprovalWatcher) LoadWatchedIssuesFromStore ¶
func (w *ApprovalWatcher) LoadWatchedIssuesFromStore(ctx context.Context) error
LoadWatchedIssuesFromStore loads all tasks with GitHub issues that are in active stages.
func (*ApprovalWatcher) RegisterAgentApproval ¶
func (w *ApprovalWatcher) RegisterAgentApproval(agent *AgentConfig, handler ApprovalHandler) error
RegisterAgentApproval registers an agent's approval configuration. This enables config-driven approval labels for the agent.
When the approved_label is detected on a watched issue, the handler will be called. The handler receives an ApprovalEvent with the label and associated task.
Example:
watcher.RegisterAgentApproval(agent, func(ctx context.Context, event *ApprovalEvent) error {
// Handle custom approval
return nil
})
func (*ApprovalWatcher) RegisterAgentApprovalHandlers ¶
func (w *ApprovalWatcher) RegisterAgentApprovalHandlers(registry *AgentRegistry, defaultHandler ApprovalHandler) (int, error)
RegisterAgentApprovalHandlers registers approval handlers for all agents in the registry that have approval configurations. This is a convenience method that iterates through all registered agents and calls RegisterAgentApproval for each one.
The provided defaultHandler is called for any agent that has an approval config. If you need per-agent handlers, use RegisterAgentApproval directly.
This enables config-driven approval workflows where new agents can be added without code changes - just add to the config file and restart.
Deprecated hardcoded labels (design-approved, sprint-approved, merge-approved) continue to work for backwards compatibility, but new agents should use config-driven labels.
func (*ApprovalWatcher) RegisterHandler ¶
func (w *ApprovalWatcher) RegisterHandler(eventType ApprovalEventType, handler ApprovalHandler) error
RegisterHandler registers a handler for a specific approval event type. Note: handler can be nil, which effectively deregisters the handler for that event type. RegisterHandler always validates that eventType is not empty, to prevent accidental misconfiguration.
func (*ApprovalWatcher) SetAgentRegistry ¶
func (w *ApprovalWatcher) SetAgentRegistry(registry *AgentRegistry)
SetAgentRegistry sets the agent registry for config-driven approval lookup.
func (*ApprovalWatcher) Start ¶
func (w *ApprovalWatcher) Start(ctx context.Context) error
Start begins the polling loop.
func (*ApprovalWatcher) UnwatchIssue ¶
func (w *ApprovalWatcher) UnwatchIssue(issueNumber int)
UnwatchIssue stops watching a GitHub issue.
func (*ApprovalWatcher) WatchIssue ¶
func (w *ApprovalWatcher) WatchIssue(issueNumber int, taskID string) error
WatchIssue starts watching a GitHub issue for approval labels. Returns error if issueNumber is invalid (<=0) or taskID is empty.
func (*ApprovalWatcher) WatchedIssueCount ¶
func (w *ApprovalWatcher) WatchedIssueCount() int
WatchedIssueCount returns the number of issues being watched.
type ArtifactDiscovery ¶
type ArtifactDiscovery struct {
WorktreePath string // Path to git worktree
Patterns []string // Glob patterns to filter files (e.g., "*.md", "design_docs/**")
BaseBranch string // Base branch to compare against (default: auto-detect)
BaseCommit string // Base commit hash (stable - branch may have moved since worktree creation)
}
ArtifactDiscovery provides deterministic artifact discovery using git diff. This is preferred over parsing output markers, as git accurately tracks all file changes.
func NewArtifactDiscovery ¶
func NewArtifactDiscovery(worktreePath string, patterns []string) *ArtifactDiscovery
NewArtifactDiscovery creates a new artifact discovery instance.
func (*ArtifactDiscovery) DiscoverArtifacts ¶
func (ad *ArtifactDiscovery) DiscoverArtifacts(maxSize int64) (map[string]string, error)
DiscoverArtifacts returns a map of file paths to their content. Only includes files matching the patterns that are under maxSize bytes.
func (*ArtifactDiscovery) DiscoverChangedFiles ¶
func (ad *ArtifactDiscovery) DiscoverChangedFiles() ([]string, error)
DiscoverChangedFiles returns files that were created or modified in the worktree. Uses git diff to compare against HEAD (or the base branch). Filters results by the configured patterns.
func (*ArtifactDiscovery) ReadArtifactContent ¶
func (ad *ArtifactDiscovery) ReadArtifactContent(relativePath string) (string, error)
ReadArtifactContent reads the content of an artifact file from the worktree.
func (*ArtifactDiscovery) WithBaseBranch ¶
func (ad *ArtifactDiscovery) WithBaseBranch(branch string) *ArtifactDiscovery
WithBaseBranch sets the base branch for comparison.
func (*ArtifactDiscovery) WithBaseCommit ¶
func (ad *ArtifactDiscovery) WithBaseCommit(commit string) *ArtifactDiscovery
WithBaseCommit sets the base commit hash for comparison (stable reference). This is preferred over BaseBranch as the branch may have moved since worktree creation.
type BudgetsConfig ¶
type BudgetsConfig struct {
Global *GlobalBudget `yaml:"global"`
Providers map[string]*ProviderLimit `yaml:"providers"`
}
BudgetsConfig represents budget limits from config.yaml
func DefaultBudgetsConfig ¶
func DefaultBudgetsConfig() *BudgetsConfig
DefaultBudgetsConfig returns sensible default budget limits
func LoadBudgetsConfig ¶
func LoadBudgetsConfig() (*BudgetsConfig, error)
LoadBudgetsConfig loads budget configuration from ~/.ailang/config.yaml. Respects AILANG_CONFIG env var for Cloud Run deployments.
func LoadBudgetsConfigFrom ¶
func LoadBudgetsConfigFrom(configPath string) (*BudgetsConfig, error)
LoadBudgetsConfigFrom loads budget configuration from a specific path
type Capability ¶
type Capability struct {
Type CapabilityType `json:"type"`
Paths []string `json:"paths,omitempty"` // Affected paths or URLs
BudgetDelta float64 `json:"budget_delta,omitempty"` // Estimated cost if Budget type
}
Capability represents a detected capability requirement
type CapabilityDetector ¶
type CapabilityDetector struct{}
CapabilityDetector analyzes task content and determines required capabilities
func NewCapabilityDetector ¶
func NewCapabilityDetector() *CapabilityDetector
NewCapabilityDetector creates a new capability detector
func (*CapabilityDetector) ClassifyImpact ¶
func (cd *CapabilityDetector) ClassifyImpact(caps []Capability) string
ClassifyImpact returns impact level as "low", "medium", or "high"
func (*CapabilityDetector) DetectCapabilities ¶
func (cd *CapabilityDetector) DetectCapabilities(content string) []Capability
DetectCapabilities analyzes content and returns the required capabilities. Returns nil if no special capabilities are needed (safe execution).
func (*CapabilityDetector) EstimateTotalCost ¶
func (cd *CapabilityDetector) EstimateTotalCost(caps []Capability, baseExecutionCost float64) float64
EstimateTotalCost calculates total estimated cost from capabilities
func (*CapabilityDetector) FormatImpact ¶
func (cd *CapabilityDetector) FormatImpact(caps []Capability) string
FormatImpact formats a human-readable impact description
type CapabilityType ¶
type CapabilityType string
CapabilityType represents the type of capability required
const (
// AILANG effect capabilities (map to language effects)
CapabilityIO CapabilityType = "IO" // Console input/output
CapabilityFS CapabilityType = "FS" // File system access
CapabilityNet CapabilityType = "Net" // Network access
CapabilityClock CapabilityType = "Clock" // Time/sleep operations
CapabilityEnv CapabilityType = "Env" // Environment variables
CapabilityAI CapabilityType = "AI" // AI/LLM operations
CapabilityDebug CapabilityType = "Debug" // Debugging operations
// Coordinator-specific capabilities (not language effects)
CapabilityShell CapabilityType = "Shell" // Shell/bash execution (high risk)
CapabilityBudget CapabilityType = "Budget" // High-cost operations
)
type CascadeCircuitBreaker ¶
type CascadeCircuitBreaker struct {
MaxFailures int
CorrelationID string
// contains filtered or unexported fields
}
CascadeCircuitBreaker tracks failures during a cascade update. If MaxFailures consecutive failures occur, IsBroken returns true.
func (*CascadeCircuitBreaker) FailureCount ¶
func (cb *CascadeCircuitBreaker) FailureCount() int
FailureCount returns the current consecutive failure count.
func (*CascadeCircuitBreaker) IsBroken ¶
func (cb *CascadeCircuitBreaker) IsBroken() bool
IsBroken returns true if consecutive failures have reached the threshold.
func (*CascadeCircuitBreaker) RecordFailure ¶
func (cb *CascadeCircuitBreaker) RecordFailure()
RecordFailure increments the consecutive failure counter.
func (*CascadeCircuitBreaker) RecordSuccess ¶
func (cb *CascadeCircuitBreaker) RecordSuccess()
RecordSuccess resets the consecutive failure counter.
type ChangeClass ¶
type ChangeClass int
ChangeClass represents the severity level of a package update.
const (
	// ChangeClassA is a patch/internal change — fully autonomous.
	ChangeClassA ChangeClass = iota
	// ChangeClassB is an additive/minor change — semi-autonomous.
	ChangeClassB
	// ChangeClassC is a breaking/major change — requires human approval.
	ChangeClassC
)
func ClassifyChange ¶
func ClassifyChange(env *messaging.PackageMessageEnvelope) ChangeClass
ClassifyChange determines the change class from a package message envelope.
type CloudDispatcher ¶
type CloudDispatcher interface {
// Dispatch triggers execution of a task on the remote backend.
// The task is already persisted in the task store — the dispatcher
// only needs to trigger execution with the given parameters.
Dispatch(ctx context.Context, params DispatchParams) error
}
CloudDispatcher triggers remote task execution on a cloud backend. Implementations are backend-specific (Cloud Run Jobs, K8s Jobs, etc.). The coordinator calls Dispatch() without knowing the backend.
type CommentData ¶
type CommentData struct {
// Task information
TaskID string
Title string
Type string
Priority int
Stage string
Status string
Agent string
Workspace string
// GitHub context
IssueNumber int
Repository string
// Timing
StartedAt *time.Time
CompletedAt *time.Time
Duration time.Duration
// Results
Success bool
Output string
Error string
Cost float64
TokensUsed int
InputTokens int
OutputTokens int
// Evaluation results (sprint-evaluator)
EvaluationScore int
EvaluationResult string // "pass" or "fail"
EvaluationRound int
EvaluationReportPath string
FeedbackSummary string
// Artifacts
DesignDocPath string
DesignDocContent string // Actual markdown content of the design doc
SprintPlanPath string
SprintPlanContent string // Actual markdown content of the sprint plan
WorktreePath string
BranchName string
FilesCreated []string
FilesModified []string
// Custom data
Extra map[string]interface{}
}
CommentData contains all data available to comment templates.
type CompletionHandler ¶
type CompletionHandler struct {
// contains filtered or unexported fields
}
CompletionHandler processes task completions and updates task status in the coordinator's store when Cloud Run Jobs finish. Completions arrive either via pull subscription (Start) or push HTTP endpoint (HandleCompletion).
func NewCompletionHandler ¶
func NewCompletionHandler(subscriber *pubsub.Subscriber, taskStore Store, msgStore messaging.MessageStore, agentRegistry *AgentRegistry, logger *log.Logger) *CompletionHandler
NewCompletionHandler creates a handler that processes task completions. msgStore and agentRegistry are optional — if nil, completion notifications and skip_approval handling are disabled (backwards compatible).
func (*CompletionHandler) HandleCompletion ¶
func (h *CompletionHandler) HandleCompletion(data []byte, attrs map[string]string) error
HandleCompletion processes a task completion from either pull subscription or push HTTP endpoint. Returns nil to ack bad data (prevent retry loops).
func (*CompletionHandler) Start ¶
func (h *CompletionHandler) Start(ctx context.Context)
Start begins listening for completions via pull subscription in the background. Not used in push mode — the HTTP handler calls HandleCompletion() directly.
type Config ¶
type Config struct {
PollInterval time.Duration // How often to check for new messages
MaxWorktrees int // Maximum concurrent worktrees
LogFile string // Path to log file
PIDFile string // Path to PID file
StateDir string // Directory for state files
ApprovalPollInterval time.Duration // How often to poll GitHub for approval labels (M-COORD-GITHUB-AUTO-ROUTING)
DevMode bool // Skip stale detector + approval watcher, increase poll interval
}
Config holds daemon configuration
type ConfigFile ¶
type ConfigFile struct {
Coordinator *CoordinatorConfig `yaml:"coordinator"`
Budgets *BudgetsConfig `yaml:"budgets"`
Firebase *FirebaseConfig `yaml:"firebase"`
Workspaces *WorkspacesConfig `yaml:"workspaces"`
}
ConfigFile represents the full ~/.ailang/config.yaml structure.
type CoordinatorConfig ¶
type CoordinatorConfig struct {
Agents []*AgentConfig `yaml:"agents" json:"agents"`
DefaultProvider string `yaml:"default_provider" json:"default_provider"`
ClaudePath string `yaml:"claude_path" json:"claude_path,omitempty"` // Explicit path to Claude CLI binary (empty = auto-detect: native > PATH > NVM)
MergeBranch string `yaml:"merge_branch" json:"merge_branch"` // Target branch for approvals (default: "dev")
GitHubSync *GitHubSyncConfig `yaml:"github_sync" json:"github_sync"`
// PluginRepo is a git URL for a shared skills plugin (M-CLOUD-PLUGIN-SKILLS, v0.9.1).
// In cloud mode, this repo is cloned and passed as --plugin-dir to Claude CLI.
// Example: "https://github.com/sunholo-data/ailang_bootstrap.git"
PluginRepo string `yaml:"plugin_repo" json:"plugin_repo,omitempty"`
// DevMode disables stale task detector and approval watcher to reduce
// Firestore reads during local development. (M-COST1)
DevMode bool `yaml:"dev_mode" json:"dev_mode,omitempty"`
}
CoordinatorConfig is the coordinator section of the global config file.
func DefaultCoordinatorConfig ¶
func DefaultCoordinatorConfig() *CoordinatorConfig
DefaultCoordinatorConfig returns a minimal default configuration.
func LoadCoordinatorConfig ¶
func LoadCoordinatorConfig() (*CoordinatorConfig, error)
LoadCoordinatorConfig loads the coordinator configuration from ~/.ailang/config.yaml. If the file doesn't exist, returns a default configuration. If the file exists but has no coordinator section, returns a default configuration.
func LoadCoordinatorConfigFrom ¶
func LoadCoordinatorConfigFrom(configPath string) (*CoordinatorConfig, error)
LoadCoordinatorConfigFrom loads the coordinator configuration from a specific path.
type CoordinatorEventHandler ¶
type CoordinatorEventHandler struct {
// contains filtered or unexported fields
}
CoordinatorEventHandler implements executor.EventHandler to capture and broadcast executor events via WebSocket.
func NewCoordinatorEventHandler ¶
func NewCoordinatorEventHandler(taskID, threadID string, broadcast EventBroadcaster) *CoordinatorEventHandler
NewCoordinatorEventHandler creates a new event handler for a task.
func (*CoordinatorEventHandler) EmitStatus ¶
func (h *CoordinatorEventHandler) EmitStatus(status string)
EmitStatus emits a status change event
func (*CoordinatorEventHandler) GetEventBuffer ¶
func (h *CoordinatorEventHandler) GetEventBuffer() []*websocket.TaskStreamEvent
GetEventBuffer returns a copy of the event buffer for replay
func (*CoordinatorEventHandler) IsThrottled ¶
func (h *CoordinatorEventHandler) IsThrottled() bool
IsThrottled returns whether events are being throttled
func (*CoordinatorEventHandler) OnError ¶
func (h *CoordinatorEventHandler) OnError(err error)
OnError is called when an error occurs
func (*CoordinatorEventHandler) OnText ¶
func (h *CoordinatorEventHandler) OnText(text string)
OnText is called when text is generated
func (*CoordinatorEventHandler) OnToolResult ¶
func (h *CoordinatorEventHandler) OnToolResult(toolName string, output string)
OnToolResult is called when a tool returns a result
func (*CoordinatorEventHandler) OnToolUse ¶
func (h *CoordinatorEventHandler) OnToolUse(toolName string, input string)
OnToolUse is called when a tool is invoked
func (*CoordinatorEventHandler) OnTurnEnd ¶
func (h *CoordinatorEventHandler) OnTurnEnd(turnNum int)
OnTurnEnd is called when a turn ends
func (*CoordinatorEventHandler) OnTurnStart ¶
func (h *CoordinatorEventHandler) OnTurnStart(turnNum int)
OnTurnStart is called when a new turn starts
func (*CoordinatorEventHandler) SetEventStorer ¶
func (h *CoordinatorEventHandler) SetEventStorer(storer EventStorer)
SetEventStorer sets the database storage function for persisting events.
func (*CoordinatorEventHandler) SetTaskContext ¶
func (h *CoordinatorEventHandler) SetTaskContext(ctx *TaskEventContext)
SetTaskContext sets the task context for event enrichment. Call this after construction to add workspace, directive, and agent info to all events.
func (*CoordinatorEventHandler) UpdateMetrics ¶
func (h *CoordinatorEventHandler) UpdateMetrics(tokensIn, tokensOut int, cost float64)
UpdateMetrics updates the token and cost metrics
type Daemon ¶
type Daemon struct {
// contains filtered or unexported fields
}
Daemon is the coordinator daemon
func (*Daemon) Close ¶
Close releases resources held by the daemon. This must be called when done with the daemon to release file handles. On Windows, this is required before the log file can be deleted.
func (*Daemon) GetActiveTaskMetrics ¶
func (d *Daemon) GetActiveTaskMetrics() []*ResourceMetrics
GetActiveTaskMetrics returns metrics for all currently running tasks
func (*Daemon) GetContext ¶
GetContext returns the daemon's context
func (*Daemon) GetResourceRegistry ¶
func (d *Daemon) GetResourceRegistry() *ResourceTrackerRegistry
GetResourceRegistry returns the resource tracker registry for external access
func (*Daemon) GetWatcherStatus ¶
func (d *Daemon) GetWatcherStatus() WatcherStatus
GetWatcherStatus returns the current status of the ApprovalWatcher. Returns a default (not running) status if the watcher is not initialized.
func (*Daemon) HandleApproval ¶
HandleApproval processes an approved task. For GitHub-linked tasks at design/sprint stages, this triggers the next stage. For merge-ready tasks, this merges worktree changes to main branch.
func (*Daemon) HandleRejection ¶
HandleRejection processes a rejected task: it preserves the worktree and marks the task as rejected. For tasks with linked GitHub issues, feedback is posted to GitHub (M-DASHBOARD-APPROVAL-INTEGRATION).
func (*Daemon) IncrementTasksRun ¶
func (d *Daemon) IncrementTasksRun()
IncrementTasksRun increments the tasks run counter
func (*Daemon) ProcessStageCompletion ¶
func (d *Daemon) ProcessStageCompletion(ctx context.Context, task *TaskRecord, execResult *ExecuteResult) error
ProcessStageCompletion handles the completion of a stage for GitHub-linked tasks. It calls the appropriate TaskChain callback and handles stage transitions.
Artifact discovery strategy (in order of preference):
1. Git diff + artifact patterns (deterministic, reliable)
2. Output markers (fallback for backwards compatibility)
func (*Daemon) SetCloudDispatcher ¶
func (d *Daemon) SetCloudDispatcher(dispatcher CloudDispatcher)
SetCloudDispatcher sets the cloud task dispatcher. Call this after NewDaemon() but before Start() to enable Cloud Run Job dispatch. The dispatcher is created in the CLI entry point to avoid circular imports.
func (*Daemon) SetEventBroadcaster ¶
func (d *Daemon) SetEventBroadcaster(broadcaster EventBroadcaster)
SetEventBroadcaster sets the event broadcaster for real-time task updates. When set, task execution events will be streamed via the broadcaster.
func (*Daemon) SetStores ¶
func (d *Daemon) SetStores(taskStore Store, msgStore messaging.MessageStore, obsBackend observatory.Backend)
SetStores pre-sets the task store, messaging store, and observatory backend. When set, initTaskProcessing() will use these instead of opening local SQLite databases. Call this after NewDaemon() but before Start() to use cloud backends.
func (*Daemon) StatusJSON ¶
StatusJSON returns status as JSON string
type DesignDocResult ¶
type DesignDocResult struct {
Path string // Path to .md file (may be empty if no .md found)
AllArtifacts []string // All discovered artifacts (any file type)
Duration time.Duration
Cost float64
TokensUsed int
InputTokens int
OutputTokens int
}
DesignDocResult contains the result of design document creation.
type DetailedStats ¶
type DetailedStats struct {
Count int `json:"count"`
CostUSD float64 `json:"cost_usd"`
InputTokens int `json:"input_tokens"`
OutputTokens int `json:"output_tokens"`
}
DetailedStats provides a cost/token breakdown for a provider or workspace.
type DispatchParams ¶
type DispatchParams struct {
TaskID string // Coordinator task ID (e.g., "task-29404032")
AgentID string // Target agent (e.g., "sprint-executor")
Workspace string // Workspace path
Provider string // AI provider ("claude" or "gemini")
Directive string // Task prompt (optional — job can fetch from Firestore)
RepoURL string // Git repo URL
Branch string // Base branch (default: "dev")
PushBranch string // If set, push directly to this branch (skip coordinator/ branch creation)
PluginRepo string // Git URL for shared skills plugin (M-CLOUD-PLUGIN-SKILLS, v0.9.1)
Model string // AI model override (e.g., "sonnet", "opus") — from agent config
Timeout string // Executor timeout (e.g., "15m", "60m") — from agent config (M-CLOUD-OAUTH)
AuthMode string // "oauth" (default) or "apikey" — selects Cloud Run Job template (M-CLOUD-DUAL-AUTH)
APIKey string // User-provided Anthropic API key, only when AuthMode == "apikey"
MaxCostUSD float64 // Per-task cost budget (0 = unlimited) — M-CLOUD-PROGRESS-TRACKING
GitMode string // "guardrails", "strict", "permissive" — M-GIT-GUARDRAILS
SiteSlug string // Website site slug for commit message — M-HARNESS-COMMIT-CONTRACT
BriefID string // Brief ID for commit message — M-HARNESS-COMMIT-CONTRACT
Subdirectory string // Monorepo subdirectory for package agents — M-PKG-AUTONOMOUS-UPDATES
}
DispatchParams contains the parameters needed to trigger remote task execution.
type EventBroadcaster ¶
type EventBroadcaster func(*websocket.TaskStreamEvent)
EventBroadcaster is a function that broadcasts task stream events. This allows the event handler to be decoupled from the WebSocket server.
func CreateWebSocketBroadcaster ¶
func CreateWebSocketBroadcaster(wsServer interface {
BroadcastTaskEvent(stream *websocket.TaskStreamEvent)
}) EventBroadcaster
CreateWebSocketBroadcaster creates an EventBroadcaster that broadcasts to a WebSocket server. This is used when the coordinator is connected to the collaboration hub server.
type EventStorer ¶
type EventStorer func(*TaskEventRecord) error
EventStorer is a function that stores task stream events to database. This allows events to be persisted for historical replay.
type EventsResponse ¶
type EventsResponse struct {
TaskID string `json:"task_id"`
TotalEvents int `json:"total_events"`
TotalTurns int `json:"total_turns"`
Iteration int `json:"iteration,omitempty"`
Events []*TaskEventRecord `json:"events"`
}
EventsResponse is the JSON response for the events API endpoint.
func FormatEventsAsJSON ¶
func FormatEventsAsJSON(taskID string, events []*TaskEventRecord, opts *FormatOptions) (*EventsResponse, error)
FormatEventsAsJSON formats events as a JSON response.
type ExecuteOptions ¶
type ExecuteOptions struct {
Timeout time.Duration // Hard ceiling (max wall-clock time)
IdleTimeout time.Duration // Kill if no events for this long (v0.8.1+)
DryRun bool
Workspace string // Working directory for the task
Model string // Model to use (provider-specific)
EventHandler executor.EventHandler // Optional handler for streaming events
// Observatory context for trace linking (M-TASK-HIERARCHY)
ObservatoryContext *ObservatoryContext
// InvokeConfig for script execution (v0.6.4+)
// Set when the agent has invoke.type: "script"
InvokeConfig *InvokeConfig
// AgentConfig for system prompt construction (v0.8.0+)
// Used by providers to build the meta-prompt with agent-specific instructions.
AgentConfig *AgentConfig
// Effort level for Claude Code (low/medium/high). Empty = Claude's default.
Effort string
// PluginDirs for Claude Code (M-CLOUD-PLUGIN-SKILLS, v0.9.1).
// Paths to plugin directories passed as --plugin-dir flags.
PluginDirs []string
// Plugins for per-agent third-party plugin installation (M-CLOUD-PLUGIN-SKILLS, v0.9.1).
Plugins *PluginsConfig
}
ExecuteOptions configures task execution
func DefaultExecuteOptions ¶
func DefaultExecuteOptions() *ExecuteOptions
DefaultExecuteOptions returns sensible defaults
type ExecuteResult ¶
type ExecuteResult struct {
Success bool
Output string
Error string
Provider string
Duration time.Duration
Cost float64
TokensUsed int // Total tokens (InputTokens + OutputTokens)
// Detailed token breakdown
InputTokens int
OutputTokens int
// Execution detail
NumTurns int
ToolCallCount int
// Files affected
FilesCreated []string
FilesModified []string
// Session continuity (for agent-to-agent handoffs)
SessionID string // Claude Code --resume ID or Gemini CLI --conversation-id
}
ExecuteResult contains the result of task execution
type ExecutorProvider ¶
type ExecutorProvider struct {
// contains filtered or unexported fields
}
ExecutorProvider wraps any executor.Executor as a coordinator Provider. This is the unified provider for all CLI-based agentic executors (Claude Code, Gemini CLI, Codex, etc.). Adding a new executor only requires creating the executor package with an init() registration — no coordinator changes needed.
func NewExecutorProvider ¶
func NewExecutorProvider(executorName string) (*ExecutorProvider, error)
NewExecutorProvider creates an ExecutorProvider for the named executor. The executor must be registered in the global executor factory (via init()).
func (*ExecutorProvider) CanHandle ¶
func (p *ExecutorProvider) CanHandle(task *AnalyzedTask) bool
CanHandle returns true — executor-based providers can handle any coding task.
func (*ExecutorProvider) Execute ¶
func (p *ExecutorProvider) Execute(ctx context.Context, task *AnalyzedTask, opts *ExecuteOptions) (*ExecuteResult, error)
Execute runs a task using the underlying executor CLI.
func (*ExecutorProvider) ExecutorName ¶
func (p *ExecutorProvider) ExecutorName() string
ExecutorName returns the underlying executor name (e.g., "claude", "gemini").
func (*ExecutorProvider) Name ¶
func (p *ExecutorProvider) Name() string
Name returns the provider name (e.g., "claude-code", "gemini-cli").
type FirebaseConfig ¶
type FirebaseConfig struct {
ProjectID string `yaml:"project_id"` // GCP/Firebase project ID (e.g., "ailang-dev")
}
FirebaseConfig contains Firebase authentication settings.
func LoadFirebaseConfig ¶
func LoadFirebaseConfig() *FirebaseConfig
LoadFirebaseConfig loads Firebase configuration from ~/.ailang/config.yaml. Returns nil if no Firebase config is set (Firebase auth will be disabled).
type FormatOptions ¶
type FormatOptions struct {
// ShowTimestamps includes timestamps in text output
ShowTimestamps bool
// ShowToolInputs includes full tool input in text output
ShowToolInputs bool
// MaxTextLength truncates long text blocks (0 = no limit)
MaxTextLength int
// TurnFilter only shows specific turn (0 = all turns)
TurnFilter int
// TypeFilter only shows specific event types (empty = all types)
TypeFilter []string
}
FormatOptions controls how events are formatted.
func DefaultFormatOptions ¶
func DefaultFormatOptions() *FormatOptions
DefaultFormatOptions returns sensible defaults for CLI display.
type GeminiAPIProvider ¶
type GeminiAPIProvider struct {
// contains filtered or unexported fields
}
GeminiAPIProvider executes tasks using Gemini API (text generation only). This uses internal/ai/gemini for simple research and documentation tasks. Unlike ExecutorProvider (CLI-based agentic coding), this is API-based and does not support file editing or tool use.
func NewGeminiAPIProvider ¶
func NewGeminiAPIProvider() (*GeminiAPIProvider, error)
NewGeminiAPIProvider creates a new Gemini API provider
func (*GeminiAPIProvider) CanHandle ¶
func (p *GeminiAPIProvider) CanHandle(task *AnalyzedTask) bool
CanHandle returns true for tasks that Gemini API can handle
func (*GeminiAPIProvider) Execute ¶
func (p *GeminiAPIProvider) Execute(ctx context.Context, task *AnalyzedTask, opts *ExecuteOptions) (*ExecuteResult, error)
Execute runs a task using Gemini API
func (*GeminiAPIProvider) Name ¶
func (p *GeminiAPIProvider) Name() string
Name returns the provider name
type GitHubPoster ¶
type GitHubPoster struct {
// contains filtered or unexported fields
}
GitHubPoster provides GitHub integration for the coordinator. It wraps the messaging GitHubClient to post comments, manage labels, and close issues as part of the autonomous workflow.
func NewGitHubPoster ¶
func NewGitHubPoster() (*GitHubPoster, error)
NewGitHubPoster creates a new GitHub poster for coordinator tasks. Enables auto-switch so the coordinator can automatically switch GitHub accounts if there's a mismatch with the expected user.
func (*GitHubPoster) AddLabel ¶
func (p *GitHubPoster) AddLabel(issueNum int, label string) error
AddLabel adds a label to a GitHub issue. Creates the label if it doesn't exist. Deprecated: Use AddLabelInRepo with an explicit repo parameter.
func (*GitHubPoster) AddLabelInRepo ¶
func (p *GitHubPoster) AddLabelInRepo(repo string, issueNum int, label string) error
AddLabelInRepo adds a label to a GitHub issue in a specific repo. Creates the label if it doesn't exist. If repo is empty, falls back to the default repo.
func (*GitHubPoster) ClaimIssue ¶
func (p *GitHubPoster) ClaimIssue(issueNum int) error
ClaimIssue attempts to claim an issue by adding the in-progress label. Returns an error if the issue is already claimed by another coordinator. This prevents race conditions when multiple coordinators poll GitHub. Deprecated: Use ClaimIssueInRepo with an explicit repo parameter.
func (*GitHubPoster) ClaimIssueInRepo ¶
func (p *GitHubPoster) ClaimIssueInRepo(repo string, issueNum int) error
ClaimIssueInRepo attempts to claim an issue in a specific repo. Returns an error if the issue is already claimed by another coordinator. If repo is empty, falls back to the default repo.
func (*GitHubPoster) Client ¶
func (p *GitHubPoster) Client() *messaging.GitHubClient
Client returns the underlying GitHub client.
func (*GitHubPoster) CloseIssue ¶
func (p *GitHubPoster) CloseIssue(issueNum int, comment string) error
CloseIssue closes a GitHub issue with an optional comment. Uses the default repo configured in the poster.
func (*GitHubPoster) CloseIssueInRepo ¶
func (p *GitHubPoster) CloseIssueInRepo(repo string, issueNum int, comment string) error
CloseIssueInRepo closes a GitHub issue in a specific repo with an optional comment. If repo is empty, falls back to the default repo.
func (*GitHubPoster) EnsureLabel ¶
func (p *GitHubPoster) EnsureLabel(label string) error
EnsureLabel creates a label if it doesn't exist. Deprecated: Use EnsureLabelInRepo with an explicit repo parameter.
func (*GitHubPoster) EnsureLabelInRepo ¶
func (p *GitHubPoster) EnsureLabelInRepo(repo, label string) error
EnsureLabelInRepo creates a label in a specific repo if it doesn't exist. If repo is empty, falls back to the default repo.
func (*GitHubPoster) GetLabels ¶
func (p *GitHubPoster) GetLabels(issueNum int) ([]string, error)
GetLabels returns the current labels on an issue. Deprecated: Use GetLabelsInRepo with an explicit repo parameter.
func (*GitHubPoster) GetLabelsInRepo ¶
func (p *GitHubPoster) GetLabelsInRepo(repo string, issueNum int) ([]string, error)
GetLabelsInRepo returns the current labels on an issue in a specific repo. If repo is empty, falls back to the default repo.
func (*GitHubPoster) GetLatestHumanComment ¶
func (p *GitHubPoster) GetLatestHumanComment(issueNum int, since time.Time) (*IssueComment, error)
GetLatestHumanComment returns the most recent human comment, or nil if none.
func (*GitHubPoster) GetRecentHumanComments ¶
func (p *GitHubPoster) GetRecentHumanComments(issueNum int, since time.Time) ([]IssueComment, error)
GetRecentHumanComments fetches comments from a GitHub issue, filtering out bot comments and only returning those after the given time. Returns error if issueNum is invalid (<=0) or repo is not configured.
func (*GitHubPoster) HasLabel ¶
func (p *GitHubPoster) HasLabel(issueNum int, label string) (bool, error)
HasLabel checks if an issue has a specific label. Deprecated: Use HasLabelInRepo with an explicit repo parameter.
func (*GitHubPoster) HasLabelInRepo ¶
HasLabelInRepo checks if an issue in a specific repo has a specific label. If repo is empty, falls back to the default repo.
func (*GitHubPoster) IsIssueClaimed ¶
func (p *GitHubPoster) IsIssueClaimed(issueNum int) (bool, error)
IsIssueClaimed checks if an issue is already claimed by any coordinator. Deprecated: Use IsIssueClaimedInRepo with an explicit repo parameter.
func (*GitHubPoster) IsIssueClaimedInRepo ¶
func (p *GitHubPoster) IsIssueClaimedInRepo(repo string, issueNum int) (bool, error)
IsIssueClaimedInRepo checks if an issue in a specific repo is already claimed. If repo is empty, falls back to the default repo.
func (*GitHubPoster) PostComment ¶
func (p *GitHubPoster) PostComment(issueNum int, body string) error
PostComment posts a comment to a GitHub issue. Deprecated: Use PostCommentInRepo with an explicit repo parameter.
func (*GitHubPoster) PostCommentInRepo ¶
func (p *GitHubPoster) PostCommentInRepo(repo string, issueNum int, body string) error
PostCommentInRepo posts a comment to a specific repo's issue. If repo is empty, falls back to the default repo.
func (*GitHubPoster) PostFeedback ¶
func (p *GitHubPoster) PostFeedback(issueNum int, feedback string, iteration int, channel string) error
PostFeedback posts a feedback comment to a GitHub issue. Includes iteration context for tracking. Returns error if issueNum is invalid, iteration is out of bounds, or channel is empty. Deprecated: Use PostFeedbackInRepo with an explicit repo parameter.
func (*GitHubPoster) PostFeedbackInRepo ¶
func (p *GitHubPoster) PostFeedbackInRepo(repo string, issueNum int, feedback string, iteration int, channel string) error
PostFeedbackInRepo posts a feedback comment to a GitHub issue in a specific repo. If repo is empty, falls back to the default repo. Returns error if issueNum is invalid, iteration is out of bounds, or channel is empty.
func (*GitHubPoster) PostWorkingStatus ¶
func (p *GitHubPoster) PostWorkingStatus(issueNum int, taskID, agent string) error
PostWorkingStatus posts a "working on it" comment to the issue. Deprecated: Use PostWorkingStatusInRepo with an explicit repo parameter.
func (*GitHubPoster) PostWorkingStatusInRepo ¶
func (p *GitHubPoster) PostWorkingStatusInRepo(repo string, issueNum int, taskID, agent string) error
PostWorkingStatusInRepo posts a "working on it" comment to an issue in a specific repo. If repo is empty, falls back to the default repo.
func (*GitHubPoster) ReleaseIssue ¶
func (p *GitHubPoster) ReleaseIssue(issueNum int) error
ReleaseIssue releases a claimed issue by removing the in-progress label. Called when a task completes, fails, or is cancelled. Uses the default repo configured in the poster.
func (*GitHubPoster) ReleaseIssueInRepo ¶
func (p *GitHubPoster) ReleaseIssueInRepo(repo string, issueNum int) error
ReleaseIssueInRepo releases a claimed issue in a specific repo. If repo is empty, falls back to the default repo.
func (*GitHubPoster) RemoveLabel ¶
func (p *GitHubPoster) RemoveLabel(issueNum int, label string) error
RemoveLabel removes a label from a GitHub issue. Deprecated: Use RemoveLabelInRepo with an explicit repo parameter.
func (*GitHubPoster) RemoveLabelInRepo ¶
func (p *GitHubPoster) RemoveLabelInRepo(repo string, issueNum int, label string) error
RemoveLabelInRepo removes a label from a GitHub issue in a specific repo. If repo is empty, falls back to the default repo.
func (*GitHubPoster) Repo ¶
func (p *GitHubPoster) Repo() string
Repo returns the configured repository.
type GitHubSyncConfig ¶
type GitHubSyncConfig struct {
// Legacy single-repo fields (for backwards compatibility)
Enabled bool `yaml:"enabled" json:"enabled"`
IntervalSecs int `yaml:"interval_secs" json:"interval_secs"` // Default: 300 (5 min)
WatchLabels []string `yaml:"watch_labels" json:"watch_labels"` // Filter by labels
TargetInbox string `yaml:"target_inbox" json:"target_inbox"` // Where to send imported issues
ResyncLabels bool `yaml:"resync_labels" json:"resync_labels"` // Re-check labels on imported messages
ResyncIntervalSec int `yaml:"resync_interval_secs" json:"resync_interval_secs"` // Default: 3600 (1 hour)
// Multi-repo configuration (v0.6.6+)
// If Repos is non-empty, uses multi-repo mode (ignores legacy fields above except ResyncLabels)
Repos []RepoSyncConfig `yaml:"repos" json:"repos"`
}
GitHubSyncConfig configures automatic GitHub issue import. Supports both single-repo (legacy) and multi-repo configurations.
func (*GitHubSyncConfig) GetRepos ¶
func (c *GitHubSyncConfig) GetRepos(defaultRepo string) []RepoSyncConfig
GetRepos returns the list of repos to sync, handling backwards compatibility. If Repos is non-empty, returns it directly. Otherwise, constructs a single-repo config from legacy fields.
type GlobalBudget ¶
type GlobalBudget struct {
WorkspaceBudget float64 `yaml:"workspace_budget"`
DailyBudget float64 `yaml:"daily_budget"`
TaskMaxCost float64 `yaml:"task_max_cost"`
WarningThreshold float64 `yaml:"warning_threshold"`
}
GlobalBudget defines default budget limits
type HTTPBroadcaster ¶
type HTTPBroadcaster struct {
// contains filtered or unexported fields
}
HTTPBroadcaster sends task events to the Collaboration Hub server via HTTP. This enables real-time streaming when daemon and server run as separate processes.
func NewHTTPBroadcaster ¶
func NewHTTPBroadcaster(serverURL string, logger *log.Logger) *HTTPBroadcaster
NewHTTPBroadcaster creates a broadcaster that POSTs events to the server.
func (*HTTPBroadcaster) Broadcast ¶
func (h *HTTPBroadcaster) Broadcast(event *websocket.TaskStreamEvent)
Broadcast sends an event to the server. Implements EventBroadcaster.
func (*HTTPBroadcaster) BroadcastFunc ¶
func (h *HTTPBroadcaster) BroadcastFunc() EventBroadcaster
BroadcastFunc returns the EventBroadcaster function.
func (*HTTPBroadcaster) CheckServerAvailable ¶
func (h *HTTPBroadcaster) CheckServerAvailable() bool
CheckServerAvailable checks if the server is reachable
type HumanFeedback ¶
type HumanFeedback struct {
TaskID string // Task being reviewed
Iteration int // Current iteration of the task
Feedback string // Human's feedback text
Action string // "reject" or "approve"
Timestamp time.Time // When feedback was provided
UserID string // Who provided feedback (optional)
}
HumanFeedback represents feedback provided by a human reviewer.
type ImplementResult ¶
type ImplementResult struct {
BranchName string
WorktreePath string
Duration time.Duration
Cost float64
TokensUsed int
InputTokens int
OutputTokens int
FilesCreated []string
FilesModified []string
}
ImplementResult contains the result of implementation.
type InboxMessageAdapter ¶
type InboxMessageAdapter struct {
// contains filtered or unexported fields
}
InboxMessageAdapter adapts messaging.MessageStore (inbox_messages table) to the coordinator's MessageStore interface.
func NewInboxMessageAdapter ¶
func NewInboxMessageAdapter(store messaging.MessageStore, inbox string) *InboxMessageAdapter
NewInboxMessageAdapter creates a new adapter that watches a specific inbox
func OpenDefaultInboxAdapter ¶
func OpenDefaultInboxAdapter(targetInbox string) (*InboxMessageAdapter, messaging.MessageStore, error)
OpenDefaultInboxAdapter opens the default collaboration database and creates an adapter
func (*InboxMessageAdapter) ListUnread ¶
func (a *InboxMessageAdapter) ListUnread() ([]*Message, error)
ListUnread returns unread messages for the configured inbox
func (*InboxMessageAdapter) MarkAsRead ¶
func (a *InboxMessageAdapter) MarkAsRead(id string) error
MarkAsRead marks a message as read
type InvokeConfig ¶
type InvokeConfig struct {
Type string `yaml:"type" json:"type"` // "skill", "agent", "prompt", or "script"
Name string `yaml:"name" json:"name"` // Skill/agent name (for skill/agent types)
Template string `yaml:"template" json:"template"` // Custom template (for prompt type) - inline
TemplateFile string `yaml:"template_file" json:"template_file"` // Path to template file (for prompt type) - v0.6.7+
// Script-specific fields (v0.6.4+)
// Used when Type == "script" for deterministic workflow execution
Command string `yaml:"command" json:"command,omitempty"` // Script path or inline command
Shell string `yaml:"shell" json:"shell,omitempty"` // Shell to use (default: /bin/sh)
EnvFromPayload bool `yaml:"env_from_payload" json:"env_from_payload,omitempty"` // Parse JSON payload as env vars
Timeout string `yaml:"timeout" json:"timeout,omitempty"` // Execution timeout (e.g., "30m", "2h")
WorkingDir string `yaml:"working_dir" json:"working_dir,omitempty"` // Working directory (supports {{.Workspace}})
}
InvokeConfig specifies how an agent should be invoked. Supports four types:
- "skill": Invoke a Claude Code skill (e.g., "/design-doc-creator")
- "agent": Send message to another agent (e.g., "sprint-planner")
- "prompt": Use custom prompt template with variable substitution
- "script": Execute a shell script with JSON payload as environment variables (v0.6.4+)
func DefaultInvokeConfig ¶
func DefaultInvokeConfig(agentID string) *InvokeConfig
DefaultInvokeConfig returns the default invoke config for known AILANG agent IDs. Returns nil for unknown agents (no default behavior). Used by GetEffectiveInvokeConfig() when agent has no explicit YAML config.
func (*InvokeConfig) ResolveTemplate ¶
func (ic *InvokeConfig) ResolveTemplate(workspace string) (string, error)
ResolveTemplate returns the template content, loading it from a file if template_file is set. Priority: template_file > template (inline). Supported path forms:
- Absolute paths: /path/to/template.md
- Home directory: ~/.ailang/templates/design-doc.md
- Relative to workspace: templates/design-doc.md (requires workspace param)
type IssueComment ¶
IssueComment represents a comment on a GitHub issue with bot detection.
type KMSEncrypter ¶
type KMSEncrypter struct {
// contains filtered or unexported fields
}
KMSEncrypter encrypts API keys using Cloud KMS before they are passed as Cloud Run Job env var overrides. This prevents plaintext keys from appearing in Cloud Audit Logs.
M-CLOUD-DUAL-AUTH: Coordinator SA has roles/cloudkms.cryptoKeyEncrypter (encrypt only — cannot read back user keys).
func NewKMSEncrypter ¶
func NewKMSEncrypter() *KMSEncrypter
NewKMSEncrypter creates an encrypter using the AILANG_KMS_KEY env var. Returns nil if the env var is not set (local dev — plaintext passthrough).
type LabelRouteConfig ¶
type LabelRouteConfig struct {
LabelPrefix string `yaml:"label_prefix" json:"label_prefix"` // Match labels starting with this
Target string `yaml:"target" json:"target"` // Route to this inbox
}
LabelRouteConfig maps a label prefix to a target inbox.
type MergeResult ¶
type MergeResult struct {
Success bool `json:"success"`
MergedFiles []string `json:"merged_files,omitempty"`
ConflictFiles []string `json:"conflict_files,omitempty"`
Error string `json:"error,omitempty"`
CommitHash string `json:"commit_hash,omitempty"`
}
MergeResult contains the result of a merge operation.
func MergeWorktree ¶
func MergeWorktree(ctx context.Context, worktreePath, mainBranch string) (*MergeResult, error)
MergeWorktree merges changes from a worktree into the main branch. It performs the following steps:
1. Get the list of changed files.
2. Attempt to merge the worktree branch into main.
3. If conflicts occur, report them without forcing.
4. On success, return the merge commit hash.
type Message ¶
type Message struct {
ID string
From string
Title string
Content string
Inbox string // Target inbox (set by PubSubInboxAdapter from message attributes, M-CLOUD-E2E)
Type string // bug, feature, task, etc. (category)
Kind string // directive, question (message type)
Priority string // high, medium, low
GithubIssue int // Linked GitHub issue number (M-COORD-GITHUB-AUTO-ROUTING)
GithubRepo string // GitHub repo (owner/repo) for issue operations (M-COORD-GITHUB-CLOSE-ON-MERGE)
ParentTaskID string // Parent task ID for hierarchy tracking (M-TASK-HIERARCHY)
Iteration int // Iteration number for feedback loops (M-TASK-HIERARCHY)
ChainID string // ExecutionChain ID for unified hierarchy (M-CHAINS-SIMPLIFY)
CreatedAt time.Time
}
Message represents a message from the messaging system
type MessageStore ¶
MessageStore is the interface for accessing messages
type MessageWatcher ¶
type MessageWatcher struct {
// contains filtered or unexported fields
}
MessageWatcher polls for unread messages and emits tasks
func NewMessageWatcher ¶
func NewMessageWatcher(store MessageStore, pollInterval time.Duration) *MessageWatcher
NewMessageWatcher creates a new message watcher
func (*MessageWatcher) ClearSeen ¶
func (w *MessageWatcher) ClearSeen()
ClearSeen clears the seen messages (useful for testing)
func (*MessageWatcher) SeenCount ¶
func (w *MessageWatcher) SeenCount() int
SeenCount returns the number of seen messages
func (*MessageWatcher) Start ¶
func (w *MessageWatcher) Start(ctx context.Context) error
Start begins watching for messages
func (*MessageWatcher) Tasks ¶
func (w *MessageWatcher) Tasks() <-chan *Task
Tasks returns the channel of discovered tasks
type MockMessageStore ¶
type MockMessageStore struct {
// contains filtered or unexported fields
}
MockMessageStore is a mock implementation for testing
func NewMockMessageStore ¶
func NewMockMessageStore() *MockMessageStore
NewMockMessageStore creates a new mock message store
func (*MockMessageStore) AddMessage ¶
func (m *MockMessageStore) AddMessage(msg *Message)
AddMessage adds a message to the mock store
func (*MockMessageStore) ListUnread ¶
func (m *MockMessageStore) ListUnread() ([]*Message, error)
ListUnread returns unread messages
func (*MockMessageStore) MarkAsRead ¶
func (m *MockMessageStore) MarkAsRead(id string) error
MarkAsRead marks a message as read
type ObservatoryContext ¶
type ObservatoryContext struct {
TaskID string // Coordinator task ID
AgentID string // Agent handling the task (e.g., "design-doc-creator")
AssignmentID string // Observatory agent_assignment ID (aa_xxx)
WorkspaceID string // Observatory workspace ID (ws_xxx)
// Chain context for unified hierarchy tracking (M-CHAINS-SIMPLIFY)
ChainID string // Execution chain ID (UUID)
StageID string // Chain stage ID (UUID)
MessageID string // Source message ID that triggered this chain
}
ObservatoryContext holds context for linking traces to coordinator entities. This enables the unified hierarchy: CHAIN → STAGE → TASK → SPANS in Observatory.
type ObservatorySync ¶
type ObservatorySync struct {
// contains filtered or unexported fields
}
ObservatorySync handles syncing coordinator entities to Observatory. This enables the full entity hierarchy: WORKSPACE → TASK → AGENT → SPANS
func NewObservatorySync ¶
func NewObservatorySync(backend observatory.Backend, logger *log.Logger) *ObservatorySync
NewObservatorySync creates a new ObservatorySync instance.
func (*ObservatorySync) CompleteAgentAssignment ¶
func (s *ObservatorySync) CompleteAgentAssignment(ctx context.Context, assignmentID string, success bool) error
CompleteAgentAssignment marks an agent assignment as completed.
func (*ObservatorySync) GetWorkspaceID ¶
func (s *ObservatorySync) GetWorkspaceID(path string) string
GetWorkspaceID returns the cached workspace ID for a given path. Returns empty string if not cached (workspace not yet synced).
func (*ObservatorySync) SyncAgentAssignment ¶
func (s *ObservatorySync) SyncAgentAssignment(ctx context.Context, taskID, agentID, provider string) (string, error)
SyncAgentAssignment syncs an agent assignment to Observatory. This creates the link between a task and the agent executing it.
func (*ObservatorySync) SyncTask ¶
func (s *ObservatorySync) SyncTask(ctx context.Context, task *TaskRecord) error
SyncTask syncs a coordinator task to Observatory. This should be called when a task is created or updated.
type PluginsConfig ¶
type PluginsConfig struct {
// Marketplaces to register (e.g., "anthropics/claude-code", "anthropics/skills")
Marketplaces []string `yaml:"marketplaces" json:"marketplaces,omitempty"`
// Plugins to install from registered marketplaces (e.g., "frontend-design@anthropics-claude-code")
Install []string `yaml:"install" json:"install,omitempty"`
}
PluginsConfig specifies third-party plugins to install for an agent. Marketplaces are registered first, then plugins are installed from those marketplaces. This runs before task execution and is complementary to PluginDirs (static plugin paths).
type Provider ¶
type Provider interface {
// Name returns the provider identifier (e.g., "claude", "gemini", "gemini-api")
Name() string
// CanHandle returns true if this provider can handle the given task type
CanHandle(task *AnalyzedTask) bool
// Execute runs a task and returns the result
Execute(ctx context.Context, task *AnalyzedTask, opts *ExecuteOptions) (*ExecuteResult, error)
}
Provider executes tasks using a specific AI backend. There are two types of providers:
- Executor-based (Claude Code, Gemini CLI): For agentic coding tasks
- API-based (Gemini API, Claude API): For simple text generation
type ProviderLimit ¶
type ProviderLimit struct {
DailyBudget float64 `yaml:"daily_budget"`
TaskMaxCost float64 `yaml:"task_max_cost"`
HardLimit bool `yaml:"hard_limit"`
WarningThreshold float64 `yaml:"warning_threshold"`
}
ProviderLimit defines per-provider budget overrides
type PubSubBroadcaster ¶
type PubSubBroadcaster struct {
// contains filtered or unexported fields
}
PubSubBroadcaster sends task stream events to the Pub/Sub events topic. This replaces HTTPBroadcaster when running in cloud mode, enabling the dashboard and laptop to receive events via pull subscriptions.
func NewPubSubBroadcaster ¶
func NewPubSubBroadcaster(publisher *pubsub.Publisher, workspace string, logger *log.Logger) *PubSubBroadcaster
NewPubSubBroadcaster creates a broadcaster that publishes events to Pub/Sub.
func (*PubSubBroadcaster) Broadcast ¶
func (b *PubSubBroadcaster) Broadcast(event *websocket.TaskStreamEvent)
Broadcast sends a task event to the Pub/Sub events topic.
func (*PubSubBroadcaster) BroadcastFunc ¶
func (b *PubSubBroadcaster) BroadcastFunc() EventBroadcaster
BroadcastFunc returns the EventBroadcaster function.
type PubSubInboxAdapter ¶
type PubSubInboxAdapter struct {
// contains filtered or unexported fields
}
PubSubInboxAdapter buffers incoming message notifications for the coordinator. Messages arrive either via pull subscription (Start) or push HTTP endpoint (HandleNotification). ListUnread() drains the buffer.
func NewPubSubInboxAdapter ¶
func NewPubSubInboxAdapter(subscriber *pubsub.Subscriber, subName, inbox string, msgStore messaging.MessageStore, logger *log.Logger) *PubSubInboxAdapter
NewPubSubInboxAdapter creates an adapter for receiving message notifications. msgStore is used to fetch full message content from Firestore when a notification arrives. For pull mode, call Start(). For push mode, the HTTP handler calls HandleNotification() directly.
func (*PubSubInboxAdapter) HandleNotification ¶
func (a *PubSubInboxAdapter) HandleNotification(data []byte, attrs map[string]string) error
HandleNotification processes a message notification from either pull subscription or push HTTP endpoint. It decodes the notification, fetches full content from Firestore, and buffers the message for ListUnread().
func (*PubSubInboxAdapter) ListUnread ¶
func (a *PubSubInboxAdapter) ListUnread() ([]*Message, error)
ListUnread returns buffered messages and clears the buffer.
func (*PubSubInboxAdapter) MarkAsRead ¶
func (a *PubSubInboxAdapter) MarkAsRead(_ string) error
MarkAsRead is a no-op for Pub/Sub — messages are acked on receipt.
func (*PubSubInboxAdapter) Start ¶
func (a *PubSubInboxAdapter) Start(ctx context.Context)
Start begins pulling messages from the Pub/Sub subscription in the background. Not used in push mode — the HTTP handler calls HandleNotification() directly.
type RepoSyncConfig ¶
type RepoSyncConfig struct {
Repo string `yaml:"repo" json:"repo"` // GitHub repo (owner/repo)
Enabled bool `yaml:"enabled" json:"enabled"` // Enable sync for this repo
IntervalSecs int `yaml:"interval_secs" json:"interval_secs"` // Override default interval
WatchLabels []string `yaml:"watch_labels" json:"watch_labels"` // Filter by labels
TargetInbox string `yaml:"target_inbox" json:"target_inbox"` // Default inbox for this repo
LabelRouting []LabelRouteConfig `yaml:"label_routing" json:"label_routing"` // Route by label prefix
}
RepoSyncConfig configures GitHub sync for a single repository.
type ResourceMetrics ¶
type ResourceMetrics struct {
TaskID string `json:"task_id"`
ThreadID string `json:"thread_id,omitempty"`
Timestamp time.Time `json:"timestamp"`
// Process metrics
CPUPercent float64 `json:"cpu_percent"`
MemoryMB float64 `json:"memory_mb"`
PID int `json:"pid,omitempty"`
// Token metrics (accumulated)
TokensIn int `json:"tokens_in"`
TokensOut int `json:"tokens_out"`
// Cost metrics
Cost float64 `json:"cost"`
// Duration
DurationSec int `json:"duration_sec"`
// Peak values
PeakCPU float64 `json:"peak_cpu"`
PeakMemory float64 `json:"peak_memory_mb"`
}
ResourceMetrics holds current resource usage for a task
type ResourceTracker ¶
type ResourceTracker struct {
// contains filtered or unexported fields
}
ResourceTracker tracks resource usage for a running task. It polls process metrics periodically and accumulates token usage from events.
func NewResourceTracker ¶
func NewResourceTracker(taskID, threadID string, pid int) *ResourceTracker
NewResourceTracker creates a new resource tracker for a task
func (*ResourceTracker) GetMetrics ¶
func (rt *ResourceTracker) GetMetrics() *ResourceMetrics
GetMetrics returns the current resource metrics
func (*ResourceTracker) SetCost ¶
func (rt *ResourceTracker) SetCost(cost float64)
SetCost sets the cost (replacement, not additive)
func (*ResourceTracker) SetPollInterval ¶
func (rt *ResourceTracker) SetPollInterval(interval time.Duration)
SetPollInterval sets the polling interval for process metrics
func (*ResourceTracker) SetUpdateCallback ¶
func (rt *ResourceTracker) SetUpdateCallback(callback func(*ResourceMetrics))
SetUpdateCallback sets the callback for metric updates
func (*ResourceTracker) Start ¶
func (rt *ResourceTracker) Start(ctx context.Context)
Start begins polling for resource metrics
func (*ResourceTracker) UpdateCost ¶
func (rt *ResourceTracker) UpdateCost(cost float64)
UpdateCost updates the accumulated cost
func (*ResourceTracker) UpdateTokens ¶
func (rt *ResourceTracker) UpdateTokens(inputTokens, outputTokens int)
UpdateTokens adds tokens to the accumulated count
type ResourceTrackerRegistry ¶
type ResourceTrackerRegistry struct {
// contains filtered or unexported fields
}
ResourceTrackerRegistry manages multiple resource trackers for concurrent tasks
func NewResourceTrackerRegistry ¶
func NewResourceTrackerRegistry() *ResourceTrackerRegistry
NewResourceTrackerRegistry creates a new registry
func (*ResourceTrackerRegistry) Get ¶
func (r *ResourceTrackerRegistry) Get(taskID string) *ResourceTracker
Get returns a tracker by task ID
func (*ResourceTrackerRegistry) GetAllMetrics ¶
func (r *ResourceTrackerRegistry) GetAllMetrics() []*ResourceMetrics
GetAllMetrics returns metrics for all active trackers
func (*ResourceTrackerRegistry) Register ¶
func (r *ResourceTrackerRegistry) Register(taskID string, tracker *ResourceTracker)
Register adds a new tracker to the registry
func (*ResourceTrackerRegistry) StopAll ¶
func (r *ResourceTrackerRegistry) StopAll()
StopAll stops all trackers
func (*ResourceTrackerRegistry) Unregister ¶
func (r *ResourceTrackerRegistry) Unregister(taskID string)
Unregister removes a tracker from the registry
type SQLiteStore ¶
type SQLiteStore struct {
// contains filtered or unexported fields
}
SQLiteStore implements Store using SQLite
func NewSQLiteStore ¶
func NewSQLiteStore(dbPath string) (*SQLiteStore, error)
NewSQLiteStore creates a new SQLite store
func (*SQLiteStore) Close ¶
func (s *SQLiteStore) Close() error
Close closes the database connection
func (*SQLiteStore) CreateApprovalRequest ¶
func (s *SQLiteStore) CreateApprovalRequest(ctx context.Context, req *ApprovalRequestRecord) error
CreateApprovalRequest creates a new approval request in the database
func (*SQLiteStore) CreateTask ¶
func (s *SQLiteStore) CreateTask(ctx context.Context, task *TaskRecord) error
CreateTask creates a new task
func (*SQLiteStore) DeleteOldApprovals ¶
DeleteOldApprovals removes approval requests older than the specified duration
func (*SQLiteStore) DeleteOldTaskEvents ¶
func (s *SQLiteStore) DeleteOldTaskEvents(ctx context.Context, olderThan time.Duration) (int, error)
DeleteOldTaskEvents removes events older than the specified duration
func (*SQLiteStore) DeleteOldTasks ¶
DeleteOldTasks removes tasks older than the specified duration
func (*SQLiteStore) DeleteTask ¶
func (s *SQLiteStore) DeleteTask(ctx context.Context, id string) error
DeleteTask deletes a task
func (*SQLiteStore) DeleteTaskEvents ¶
func (s *SQLiteStore) DeleteTaskEvents(ctx context.Context, taskID string) error
DeleteTaskEvents removes all events for a task
func (*SQLiteStore) FindDuplicateTask ¶
func (s *SQLiteStore) FindDuplicateTask(ctx context.Context, fingerprint uint64, threshold float64) (*TaskRecord, error)
FindDuplicateTask finds a similar task by fingerprint
func (*SQLiteStore) GetApprovalRequest ¶
func (s *SQLiteStore) GetApprovalRequest(ctx context.Context, id string) (*ApprovalRequestRecord, error)
GetApprovalRequest retrieves an approval request by ID
func (*SQLiteStore) GetApprovalRequestByTask ¶
func (s *SQLiteStore) GetApprovalRequestByTask(ctx context.Context, taskID string) (*ApprovalRequestRecord, error)
GetApprovalRequestByTask retrieves a pending approval request for a task
func (*SQLiteStore) GetApprovalRequestByTaskAnyStatus ¶
func (s *SQLiteStore) GetApprovalRequestByTaskAnyStatus(ctx context.Context, taskID string) (*ApprovalRequestRecord, error)
GetApprovalRequestByTaskAnyStatus retrieves the approval request for a task regardless of status. Use this when you need to fetch the approval context after the request has been processed (e.g., for handoffs).
func (*SQLiteStore) GetCostByProvider ¶
func (s *SQLiteStore) GetCostByProvider() (map[string]float64, error)
GetCostByProvider returns total cost per provider for budget tracking.
func (*SQLiteStore) GetTask ¶
func (s *SQLiteStore) GetTask(ctx context.Context, id string) (*TaskRecord, error)
GetTask retrieves a task by ID
func (*SQLiteStore) GetTaskAgentInfo ¶
func (s *SQLiteStore) GetTaskAgentInfo(ctx context.Context, taskID string) (agentID, inbox, title string, err error)
GetTaskAgentInfo returns agent info for a task (for cross-db correlation in Control Plane)
func (*SQLiteStore) GetTaskEvents ¶
func (s *SQLiteStore) GetTaskEvents(ctx context.Context, taskID string, limit int) ([]*TaskEventRecord, error)
GetTaskEvents retrieves all events for a task
func (*SQLiteStore) GetTaskStats ¶
func (s *SQLiteStore) GetTaskStats(ctx context.Context) (*TaskStats, error)
GetTaskStats returns aggregate statistics
func (*SQLiteStore) GetTasksByGithubIssue ¶
func (s *SQLiteStore) GetTasksByGithubIssue(ctx context.Context, issueNum int) ([]*TaskRecord, error)
GetTasksByGithubIssue retrieves all tasks linked to a GitHub issue
func (*SQLiteStore) GetTasksByStage ¶
func (s *SQLiteStore) GetTasksByStage(ctx context.Context, stage TaskStage) ([]*TaskRecord, error)
GetTasksByStage retrieves all tasks in a specific pipeline stage
func (*SQLiteStore) ListApprovedMergeHandoffsWithoutTrigger ¶
func (s *SQLiteStore) ListApprovedMergeHandoffsWithoutTrigger(ctx context.Context) ([]*ApprovalRequestRecord, error)
ListApprovedMergeHandoffsWithoutTrigger finds approved merge_handoff requests where handoffs were never sent. These are approvals that were processed before the handoff triggering code was deployed (catch-up mechanism). Only returns approvals from the last 7 days to avoid re-triggering very old approvals.
func (*SQLiteStore) ListPendingApprovals ¶
func (s *SQLiteStore) ListPendingApprovals(ctx context.Context) ([]*ApprovalRequestRecord, error)
ListPendingApprovals retrieves all pending approval requests
func (*SQLiteStore) ListResolvedApprovals ¶
func (s *SQLiteStore) ListResolvedApprovals(ctx context.Context, limit int) ([]*ApprovalRequestRecord, error)
ListResolvedApprovals returns resolved (approved/rejected) approval requests
func (*SQLiteStore) ListTasks ¶
func (s *SQLiteStore) ListTasks(ctx context.Context, filter *TaskFilter) ([]*TaskRecord, error)
ListTasks retrieves tasks matching the filter
func (*SQLiteStore) MarkApprovalHandoffsTriggered ¶
func (s *SQLiteStore) MarkApprovalHandoffsTriggered(ctx context.Context, taskID string) error
MarkApprovalHandoffsTriggered marks that handoffs have been sent for an approval request. This is used to track whether handoffs were triggered, enabling catch-up on daemon startup.
func (*SQLiteStore) MarkTaskCancelled ¶
func (s *SQLiteStore) MarkTaskCancelled(ctx context.Context, id string) error
MarkTaskCancelled marks a task as cancelled
func (*SQLiteStore) MarkTaskCompleted ¶
func (s *SQLiteStore) MarkTaskCompleted(ctx context.Context, id string, result *ExecuteResult) error
MarkTaskCompleted marks a task as completed with results
func (*SQLiteStore) MarkTaskFailed ¶
MarkTaskFailed marks a task as failed
func (*SQLiteStore) MarkTaskPendingApproval ¶
func (s *SQLiteStore) MarkTaskPendingApproval(ctx context.Context, id, worktreePath, worktreeBranch, baseBranch, baseCommit string, result *ExecuteResult) error
MarkTaskPendingApproval marks a task as awaiting human approval
func (*SQLiteStore) MarkTaskQueued ¶
func (s *SQLiteStore) MarkTaskQueued(ctx context.Context, id string) error
MarkTaskQueued marks a task as queued
func (*SQLiteStore) MarkTaskRejected ¶
func (s *SQLiteStore) MarkTaskRejected(ctx context.Context, id string) error
MarkTaskRejected marks a task as rejected by human
func (*SQLiteStore) MarkTaskRunning ¶
func (s *SQLiteStore) MarkTaskRunning(ctx context.Context, id, provider, worktreeID string) error
MarkTaskRunning marks a task as running
func (*SQLiteStore) RecoverStaleTasks ¶
func (s *SQLiteStore) RecoverStaleTasks(ctx context.Context, staleThreshold time.Duration) (int, error)
RecoverStaleTasks marks stale running/queued tasks as cancelled on daemon startup. This handles tasks that were running when the daemon crashed or was killed.
func (*SQLiteStore) ReopenTask ¶
func (s *SQLiteStore) ReopenTask(ctx context.Context, taskID string) error
ReopenTask moves a rejected/cancelled task back to pending_approval status
func (*SQLiteStore) RequeueTask ¶
func (s *SQLiteStore) RequeueTask(ctx context.Context, id string) error
RequeueTask resets a task to pending status for re-execution.
func (*SQLiteStore) ResetTaskToPending ¶
func (s *SQLiteStore) ResetTaskToPending(ctx context.Context, id string) error
ResetTaskToPending resets a running task back to pending state.
func (*SQLiteStore) ResolveApprovalRequest ¶
func (s *SQLiteStore) ResolveApprovalRequest(ctx context.Context, id string, status string, resolvedBy string) error
ResolveApprovalRequest marks an approval request as approved or rejected
func (*SQLiteStore) ResolveApprovalRequestByTask ¶
func (s *SQLiteStore) ResolveApprovalRequestByTask(ctx context.Context, taskID string, status string, resolvedBy string) error
ResolveApprovalRequestByTask marks approval requests for a task as approved or rejected. Resolves both "merge" and "merge_handoff" type approvals - pure "handoff" approvals must be resolved separately via ResolveApprovalRequest.
func (*SQLiteStore) RetryAllFailedTasks ¶
func (s *SQLiteStore) RetryAllFailedTasks(ctx context.Context) (int, error)
RetryAllFailedTasks resets all failed tasks to pending so they will be retried.
func (*SQLiteStore) SetTaskDesignDocPath ¶
SetTaskDesignDocPath stores the design doc path for a task
func (*SQLiteStore) SetTaskFingerprint ¶
SetTaskFingerprint sets the fingerprint for duplicate detection
func (*SQLiteStore) SetTaskGithubIssue ¶
SetTaskGithubIssue links a task to a GitHub issue number
func (*SQLiteStore) SetTaskSprintPlanPath ¶
SetTaskSprintPlanPath stores the sprint plan path for a task
func (*SQLiteStore) SetTaskStage ¶
SetTaskStage sets the pipeline stage for a task
func (*SQLiteStore) SetTaskThreadID ¶
SetTaskThreadID links a task to a thread in collaboration.db for dashboard visibility
func (*SQLiteStore) StoreTaskEvent ¶
func (s *SQLiteStore) StoreTaskEvent(ctx context.Context, event *TaskEventRecord) error
StoreTaskEvent saves a task streaming event to the database
func (*SQLiteStore) UpdateTask ¶
func (s *SQLiteStore) UpdateTask(ctx context.Context, task *TaskRecord) error
UpdateTask updates an existing task
func (*SQLiteStore) UpdateTaskChainInfo ¶
func (s *SQLiteStore) UpdateTaskChainInfo(ctx context.Context, id, chainID, stageID string) error
UpdateTaskChainInfo updates the chain_id and stage_id for a task (M-CHAINS-SIMPLIFY)
func (*SQLiteStore) UpdateTaskMetrics ¶
func (s *SQLiteStore) UpdateTaskMetrics(ctx context.Context, id string, peakCPU, peakMemory float64) error
UpdateTaskMetrics updates peak resource metrics for a task
type ScriptProvider ¶
type ScriptProvider struct {
// contains filtered or unexported fields
}
ScriptProvider executes shell scripts for deterministic workflow tasks. Unlike AI providers, scripts run locally with predictable output.
Usage in agent config:
invoke:
  type: script
  command: ./scripts/run-eval.sh
  env_from_payload: true
  timeout: 30m
func NewScriptProvider ¶
func NewScriptProvider() *ScriptProvider
NewScriptProvider creates a new script execution provider.
func (*ScriptProvider) CanHandle ¶
func (p *ScriptProvider) CanHandle(task *AnalyzedTask) bool
CanHandle returns true for tasks with script invoke type. Note: Script tasks are explicitly routed via InvokeConfig, not auto-detected.
func (*ScriptProvider) Execute ¶
func (p *ScriptProvider) Execute(ctx context.Context, task *AnalyzedTask, opts *ExecuteOptions) (*ExecuteResult, error)
Execute runs a shell script with environment variables from the task payload.
func (*ScriptProvider) Name ¶
func (p *ScriptProvider) Name() string
Name returns the provider identifier.
type SprintPlanResult ¶
type SprintPlanResult struct {
Path string // Path to .md file (may be empty if no .md found)
AllArtifacts []string // All discovered artifacts (any file type)
Duration time.Duration
Cost float64
TokensUsed int
InputTokens int
OutputTokens int
}
SprintPlanResult contains the result of sprint plan creation.
type StageExecutionResult ¶
type StageExecutionResult struct {
// Design doc stage
DesignDocPath string
// Sprint plan stage
SprintPlanPath string
// Implementation stage
BranchName string
FilesCreated []string
FilesModified []string
// Common fields
Duration time.Duration
Cost float64
TokensUsed int
InputTokens int
OutputTokens int
}
StageExecutionResult contains extracted artifacts from stage execution
func ParseStageOutput ¶
func ParseStageOutput(output string, stage TaskStage) *StageExecutionResult
ParseStageOutput extracts structured artifacts from execution output. Handles both plain text markers and markdown-formatted markers:
- DESIGN_DOC_PATH: path.md
- **DESIGN_DOC_PATH**: `path.md`
type StaleTaskDetector ¶
type StaleTaskDetector struct {
// contains filtered or unexported fields
}
StaleTaskDetector periodically checks for tasks stuck in queued/running status and marks them as failed after their timeout expires. This catches cases where:
- Cloud Run Job container fails to start (image pull, OOM, quota)
- Pub/Sub completion publish fails
- Job process crashes before publishing completion
Only runs in cloud mode (COORDINATOR_MODE=cloud). Local mode uses RecoverStaleTasks at startup instead.
func NewStaleTaskDetector ¶
func NewStaleTaskDetector(store Store, agentRegistry *AgentRegistry, msgStore messaging.MessageStore, logger *log.Logger) *StaleTaskDetector
NewStaleTaskDetector creates a detector that checks for stale tasks periodically.
func (*StaleTaskDetector) Run ¶
func (d *StaleTaskDetector) Run(ctx context.Context)
Run starts the periodic stale task check. Blocks until ctx is cancelled.
type Status ¶
type Status struct {
Running bool `json:"running"`
PID int `json:"pid,omitempty"`
StartedAt time.Time `json:"started_at,omitempty"`
Uptime string `json:"uptime,omitempty"`
TasksRun int `json:"tasks_run"`
// Extended stats from database
PendingTasks int `json:"pending_tasks,omitempty"`
RunningTasks int `json:"running_tasks,omitempty"`
PendingApprovals int `json:"pending_approvals,omitempty"` // Tasks awaiting human approval
FailedTasks int `json:"failed_tasks,omitempty"`
TotalCost float64 `json:"total_cost,omitempty"`
TotalTokens int `json:"total_tokens,omitempty"`
}
Status represents the daemon's current state
type Store ¶
type Store interface {
// Task CRUD operations
CreateTask(ctx context.Context, task *TaskRecord) error
GetTask(ctx context.Context, id string) (*TaskRecord, error)
UpdateTask(ctx context.Context, task *TaskRecord) error
DeleteTask(ctx context.Context, id string) error
// Task queries
ListTasks(ctx context.Context, filter *TaskFilter) ([]*TaskRecord, error)
GetTaskStats(ctx context.Context) (*TaskStats, error)
// Task state transitions
MarkTaskQueued(ctx context.Context, id string) error
MarkTaskRunning(ctx context.Context, id, provider, worktreeID string) error
MarkTaskPendingApproval(ctx context.Context, id, worktreePath, worktreeBranch, baseBranch, baseCommit string, result *ExecuteResult) error // Work done, awaiting human review
MarkTaskCompleted(ctx context.Context, id string, result *ExecuteResult) error
MarkTaskFailed(ctx context.Context, id string, err error) error
MarkTaskRejected(ctx context.Context, id string) error // Human rejected the work
MarkTaskCancelled(ctx context.Context, id string) error
RequeueTask(ctx context.Context, id string) error // Reset status to pending for next stage execution
ResetTaskToPending(ctx context.Context, id string) error // Reset running task back to pending (worktree limit recovery)
// Duplicate detection
FindDuplicateTask(ctx context.Context, fingerprint uint64, threshold float64) (*TaskRecord, error)
SetTaskFingerprint(ctx context.Context, id string, fingerprint uint64) error
// Thread linking (for dashboard visibility)
SetTaskThreadID(ctx context.Context, id string, threadID string) error
// Execution chain tracking (M-CHAINS-SIMPLIFY)
UpdateTaskChainInfo(ctx context.Context, id, chainID, stageID string) error
// Cross-database correlation (for Control Plane event classification)
// Returns agent info for a task: agentID (FromAgent), inbox (ToInbox), title
// Note: By convention, agent id == inbox in agent config
GetTaskAgentInfo(ctx context.Context, taskID string) (agentID, inbox, title string, err error)
// GitHub integration (M-COORD-GITHUB-AUTO-ROUTING)
SetTaskGithubIssue(ctx context.Context, id string, issueNum int) error
SetTaskStage(ctx context.Context, id string, stage TaskStage) error
SetTaskDesignDocPath(ctx context.Context, id string, path string) error
SetTaskSprintPlanPath(ctx context.Context, id string, path string) error
GetTasksByGithubIssue(ctx context.Context, issueNum int) ([]*TaskRecord, error)
GetTasksByStage(ctx context.Context, stage TaskStage) ([]*TaskRecord, error)
// Resource metrics
UpdateTaskMetrics(ctx context.Context, id string, peakCPU, peakMemory float64) error
// Budget tracking (per-provider)
GetCostByProvider() (map[string]float64, error)
// Approval requests
CreateApprovalRequest(ctx context.Context, req *ApprovalRequestRecord) error
GetApprovalRequest(ctx context.Context, id string) (*ApprovalRequestRecord, error) // Get by approval ID
GetApprovalRequestByTask(ctx context.Context, taskID string) (*ApprovalRequestRecord, error) // Get pending by task ID
GetApprovalRequestByTaskAnyStatus(ctx context.Context, taskID string) (*ApprovalRequestRecord, error) // Get approval regardless of status (for handoff triggering)
ListPendingApprovals(ctx context.Context) ([]*ApprovalRequestRecord, error)
ListResolvedApprovals(ctx context.Context, limit int) ([]*ApprovalRequestRecord, error) // List resolved (approved/rejected) approvals
ResolveApprovalRequest(ctx context.Context, id, status, resolvedBy string) error
ResolveApprovalRequestByTask(ctx context.Context, taskID, status, resolvedBy string) error
MarkApprovalHandoffsTriggered(ctx context.Context, taskID string) error // Mark that handoffs were sent
ListApprovedMergeHandoffsWithoutTrigger(ctx context.Context) ([]*ApprovalRequestRecord, error) // Find missed handoffs
// Cleanup
DeleteOldTasks(ctx context.Context, olderThan time.Duration) (int, error)
RecoverStaleTasks(ctx context.Context, staleThreshold time.Duration) (int, error) // Cancel stale running/queued tasks on startup
RetryAllFailedTasks(ctx context.Context) (int, error) // Reset all failed tasks to pending
// Event storage (for task logs)
StoreTaskEvent(ctx context.Context, event *TaskEventRecord) error
GetTaskEvents(ctx context.Context, taskID string, limit int) ([]*TaskEventRecord, error)
// Lifecycle
Close() error
}
Store is the neutral interface for task persistence. Implementations can be SQLite (local), PostgreSQL, or cloud services.
type StoreBackedApprovalCheckpoint ¶
type StoreBackedApprovalCheckpoint struct {
*ApprovalCheckpoint
// contains filtered or unexported fields
}
StoreBackedApprovalCheckpoint wraps ApprovalCheckpoint with persistence backed by an ApprovalStore (for example, the SQLite store). This allows CLI commands to approve or reject requests that the daemon is waiting on.
func NewStoreBackedApprovalCheckpoint ¶
func NewStoreBackedApprovalCheckpoint(store ApprovalStore, defaultTimeout time.Duration) *StoreBackedApprovalCheckpoint
NewStoreBackedApprovalCheckpoint creates a store-backed approval checkpoint
func (*StoreBackedApprovalCheckpoint) RequestApproval ¶
func (sac *StoreBackedApprovalCheckpoint) RequestApproval(ctx context.Context, request *ApprovalRequest) (ApprovalStatus, error)
RequestApproval creates an approval request and waits for resolution. It persists the request to the store and polls for status changes.
type Task ¶
type Task struct {
ID string
Title string
Content string
Kind string // "directive" or "question" - affects execution mode
Priority int
MessageID string
ParentTaskID string // Parent task ID for hierarchy tracking (M-TASK-HIERARCHY)
CreatedAt time.Time
// M-TRANSCRIPT: Feedback loop support
Iteration int // Current iteration (1 = first run, 2+ = re-run with feedback)
SessionID string // Session ID for resume capability
}
Task represents a task to be executed
type TaskAnalyzer ¶
type TaskAnalyzer struct {
// contains filtered or unexported fields
}
TaskAnalyzer analyzes and classifies tasks
func NewTaskAnalyzer ¶
func NewTaskAnalyzer(similarityThreshold float64) *TaskAnalyzer
NewTaskAnalyzer creates a new task analyzer
func (*TaskAnalyzer) Analyze ¶
func (a *TaskAnalyzer) Analyze(task *Task) *AnalyzedTask
Analyze processes a task and returns an AnalyzedTask
func (*TaskAnalyzer) ClearFingerprints ¶
func (a *TaskAnalyzer) ClearFingerprints()
ClearFingerprints clears all stored fingerprints (useful for testing)
func (*TaskAnalyzer) FingerprintCount ¶
func (a *TaskAnalyzer) FingerprintCount() int
FingerprintCount returns the number of stored fingerprints
type TaskChain ¶
type TaskChain struct {
// contains filtered or unexported fields
}
TaskChain manages the pipeline: design-doc → sprint-planner → sprint-executor. It handles stage transitions and GitHub notifications.
func NewTaskChain ¶
func NewTaskChain(poster *GitHubPoster, store Store, watcher *ApprovalWatcher) *TaskChain
NewTaskChain creates a new task chain manager.
func (*TaskChain) OnAgentApproved ¶
func (tc *TaskChain) OnAgentApproved(ctx context.Context, event *ApprovalEvent, agentID string) error
OnAgentApproved is called when an agent's work is approved via config-driven workflow.
func (*TaskChain) OnAgentComplete ¶
func (tc *TaskChain) OnAgentComplete(ctx context.Context, taskID, agentID string, result *AgentResult, registry *AgentRegistry) error
OnAgentComplete is the unified handler for any agent completion. It uses the agent's ApprovalConfig to determine the appropriate GitHub labels and comment template, eliminating the need for hardcoded stage handlers.
This handler:
1. Stores the artifact path for later use.
2. Reads the artifact content from the worktree (if applicable).
3. Posts a completion comment to GitHub.
4. Adds the needs-approval label from the agent config.
For agents without ApprovalConfig, this is a no-op (agent handles its own workflow).
func (*TaskChain) OnDesignApproved ¶
func (tc *TaskChain) OnDesignApproved(ctx context.Context, event *ApprovalEvent) error
OnDesignApproved is called when a design document is approved.
func (*TaskChain) OnDesignDocComplete ¶
func (tc *TaskChain) OnDesignDocComplete(ctx context.Context, taskID string, result *DesignDocResult) error
OnDesignDocComplete is called when a design document is created. Posts a summary to GitHub and adds the needs-design-approval label.
func (*TaskChain) OnImplementationComplete ¶
func (tc *TaskChain) OnImplementationComplete(ctx context.Context, taskID string, result *ImplementResult) error
OnImplementationComplete is called when implementation is finished.
func (*TaskChain) OnMergeApproved ¶
func (tc *TaskChain) OnMergeApproved(ctx context.Context, event *ApprovalEvent) error
OnMergeApproved is called when merge is approved.
func (*TaskChain) OnNeedsRevision ¶
func (tc *TaskChain) OnNeedsRevision(ctx context.Context, event *ApprovalEvent) error
OnNeedsRevision is called when revision is requested.
func (*TaskChain) OnSprintApproved ¶
func (tc *TaskChain) OnSprintApproved(ctx context.Context, event *ApprovalEvent) error
OnSprintApproved is called when a sprint plan is approved.
func (*TaskChain) OnSprintPlanComplete ¶
func (tc *TaskChain) OnSprintPlanComplete(ctx context.Context, taskID string, result *SprintPlanResult) error
OnSprintPlanComplete is called when a sprint plan is created.
func (*TaskChain) SetAgentRegistry ¶
func (tc *TaskChain) SetAgentRegistry(registry *AgentRegistry)
SetAgentRegistry sets the agent registry for config-driven workflows.
func (*TaskChain) SetMessageStore ¶
func (tc *TaskChain) SetMessageStore(msgStore messaging.MessageStore)
SetMessageStore sets the message store for handoff messages.
type TaskEventContext ¶
type TaskEventContext struct {
Workspace string // Working directory path
Directive string // Full initial directive/prompt
AgentID string // Agent identifier (e.g., "design-doc-creator")
SourceType string // Event source: coordinator, eval, github, direct
}
TaskEventContext holds context information for task events. This is used to enrich events with workspace, directive, and agent info.
type TaskEventRecord ¶
type TaskEventRecord struct {
ID int64 `json:"id"`
TaskID string `json:"task_id"`
ThreadID string `json:"thread_id,omitempty"`
StreamType string `json:"stream_type"` // "text", "tool_use", "tool_result", "error", "status", "turn_start", "turn_end"
TurnNum int `json:"turn_num,omitempty"`
Text string `json:"text,omitempty"`
ToolName string `json:"tool_name,omitempty"`
ToolInput string `json:"tool_input,omitempty"`
ToolOutput string `json:"tool_output,omitempty"`
ErrorMsg string `json:"error_msg,omitempty"`
Status string `json:"status,omitempty"`
TokensIn int `json:"tokens_in,omitempty"`
TokensOut int `json:"tokens_out,omitempty"`
Cost float64 `json:"cost,omitempty"`
DurationSec int `json:"duration_sec,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
TaskEventRecord represents a stored task streaming event
type TaskExecutor ¶
type TaskExecutor struct {
// contains filtered or unexported fields
}
TaskExecutor orchestrates task execution across multiple providers. It routes tasks to the appropriate provider based on task type and provider capabilities.
func DefaultTaskExecutor ¶
func DefaultTaskExecutor() (*TaskExecutor, error)
DefaultTaskExecutor creates a TaskExecutor with all available providers. It discovers executor-based providers from the global executor factory, so adding a new executor (e.g., codex) only requires registering it via init().
func NewTaskExecutor ¶
func NewTaskExecutor(providers ...Provider) *TaskExecutor
NewTaskExecutor creates a new task executor with the given providers
func (*TaskExecutor) Execute ¶
func (te *TaskExecutor) Execute(ctx context.Context, task *AnalyzedTask, opts *ExecuteOptions) (*ExecuteResult, error)
Execute runs a task using the best available provider
func (*TaskExecutor) ExecuteWithRetry ¶
func (te *TaskExecutor) ExecuteWithRetry(ctx context.Context, task *AnalyzedTask, opts *ExecuteOptions, maxRetries int) (*ExecuteResult, error)
ExecuteWithRetry runs a task with retry logic
func (*TaskExecutor) ListProviders ¶
func (te *TaskExecutor) ListProviders() []string
ListProviders returns the names of all registered providers
type TaskFilter ¶
type TaskFilter struct {
Status []TaskStatus
Type []TaskType
Provider string
Workspace string // Filter by source workspace
Since *time.Time
Until *time.Time
Limit int
Offset int
OrderBy string // "created_at", "priority", "started_at"
OrderDesc bool
}
TaskFilter for querying tasks
type TaskRecord ¶
type TaskRecord struct {
ID string `json:"id"`
MessageID string `json:"message_id,omitempty"`
ThreadID string `json:"thread_id,omitempty"` // Thread in collaboration.db for dashboard visibility
ParentTaskID string `json:"parent_task_id,omitempty"` // Parent task for hierarchy tracking (handoffs)
Title string `json:"title"`
Content string `json:"content"`
Type TaskType `json:"type"`
Kind string `json:"kind,omitempty"` // "directive" or "question" - affects execution mode
Priority int `json:"priority"`
Status TaskStatus `json:"status"`
Provider string `json:"provider,omitempty"`
AgentID string `json:"agent_id,omitempty"` // ID of agent that processed this task
WorktreeID string `json:"worktree_id,omitempty"`
WorktreePath string `json:"worktree_path,omitempty"` // Path to git worktree (preserved until approval)
BaseBranch string `json:"base_branch,omitempty"` // Base branch worktree was created from (for diff comparison)
BaseCommit string `json:"base_commit,omitempty"` // Base commit hash at worktree creation (stable reference for diff)
SessionID string `json:"session_id,omitempty"` // Claude Code/Gemini CLI session for resumption
Iteration int `json:"iteration,omitempty"` // Iteration number (1 = first, 2+ = re-run with feedback)
Workspace string `json:"workspace,omitempty"` // Source workspace from thread (not worktree)
// Execution chain tracking (M-CHAINS-SIMPLIFY)
ChainID string `json:"chain_id,omitempty"` // ExecutionChain ID for unified hierarchy
StageID string `json:"stage_id,omitempty"` // ChainStage ID for this agent's execution
// GitHub integration (M-COORD-GITHUB-AUTO-ROUTING)
GithubIssue int `json:"github_issue,omitempty"` // Linked GitHub issue number
GithubRepo string `json:"github_repo,omitempty"` // GitHub repo (owner/repo) for issue operations
Stage TaskStage `json:"stage,omitempty"` // Pipeline stage (design, sprint, implementation, merge)
DesignDocPath string `json:"design_doc_path,omitempty"` // Path to design doc (for merge comment)
SprintPlanPath string `json:"sprint_plan_path,omitempty"` // Path to sprint plan (for merge comment)
// Timestamps
CreatedAt time.Time `json:"created_at"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
Duration time.Duration `json:"duration,omitempty"`
Error string `json:"error,omitempty"`
Output string `json:"output,omitempty"`
Cost float64 `json:"cost,omitempty"`
TokensUsed int `json:"tokens_used,omitempty"`
// Detailed token breakdown
InputTokens int `json:"input_tokens,omitempty"`
OutputTokens int `json:"output_tokens,omitempty"`
// Resource metrics
PeakCPU float64 `json:"peak_cpu,omitempty"`
PeakMemory float64 `json:"peak_memory_mb,omitempty"`
// Capability detection (M-DEPRECATE-AILANG-AGENT)
Capabilities []Capability `json:"capabilities,omitempty"` // Detected capability requirements
ImpactLevel string `json:"impact_level,omitempty"` // "low", "medium", or "high"
EstimatedCost float64 `json:"estimated_cost,omitempty"` // Pre-execution cost estimate in USD
// M-HARNESS-COMMIT-CONTRACT: Website builder metadata from message payload
SiteSlug string `json:"site_slug,omitempty"` // Site identifier for commit messages
BriefID string `json:"brief_id,omitempty"` // Brief identifier for commit messages
}
TaskRecord represents a task stored in the database
type TaskStage
deprecated
type TaskStage string
TaskStage represents the pipeline stage for GitHub-linked tasks.
Deprecated: Use agent_id tracking with trigger_on_complete configuration instead. Stage is maintained for backwards compatibility with existing tasks. Will be removed in v0.9.0. See M-GENERIC-PIPELINE design doc for migration guidance.
Migration guide:
- Replace TaskStageDesign with agent_id = "design-doc-creator"
- Replace TaskStageSprint with agent_id = "sprint-planner"
- Replace TaskStageImplementation with agent_id = "sprint-executor"
- Configure trigger_on_complete in ~/.ailang/config.yaml for automatic handoffs
const (
// Deprecated: Use agent_id tracking instead
TaskStageNone TaskStage = "" // Not part of a pipeline
// Deprecated: Use agent_id = "design-doc-creator" with approval config instead
TaskStageDesign TaskStage = "design" // Creating design document
// Deprecated: Use agent_id = "sprint-planner" with approval config instead
TaskStageSprint TaskStage = "sprint" // Creating sprint plan
// Deprecated: Use agent_id = "sprint-executor" with approval config instead
TaskStageImplementation TaskStage = "implementation" // Implementing the sprint
// Deprecated: Use agent_id with approval config for merge workflow instead
TaskStageMerge TaskStage = "merge" // Awaiting merge approval
)
type TaskStats ¶
type TaskStats struct {
TotalTasks int `json:"total_tasks"`
PendingTasks int `json:"pending_tasks"`
RunningTasks int `json:"running_tasks"`
PendingApprovals int `json:"pending_approvals"` // Tasks awaiting human approval
CompletedTasks int `json:"completed_tasks"`
FailedTasks int `json:"failed_tasks"`
ByType map[string]int `json:"by_type"`
ByProvider map[string]*DetailedStats `json:"by_provider"`
ByWorkspace map[string]*DetailedStats `json:"by_workspace"` // Per-workspace breakdown
TotalCost float64 `json:"total_cost"`
TotalTokens int `json:"total_tokens"`
AvgDuration time.Duration `json:"avg_duration"`
}
TaskStats provides aggregate statistics
type TaskStatus ¶
type TaskStatus string
TaskStatus represents the lifecycle state of a task
const (
TaskStatusPending TaskStatus = "pending"
TaskStatusQueued TaskStatus = "queued"
TaskStatusRunning TaskStatus = "running"
TaskStatusPendingApproval TaskStatus = "pending_approval" // Work done, awaiting human review
TaskStatusCompleted TaskStatus = "completed" // Approved and merged
TaskStatusFailed TaskStatus = "failed"
TaskStatusRejected TaskStatus = "rejected" // Human rejected the work
TaskStatusCancelled TaskStatus = "cancelled"
TaskStatusDuplicate TaskStatus = "duplicate"
)
type WatcherStatus ¶
type WatcherStatus struct {
Running bool `json:"running"`
LastPoll time.Time `json:"last_poll"`
PollInterval time.Duration `json:"poll_interval"`
WatchedIssues map[int]string `json:"watched_issues"` // issue number -> task ID
}
WatcherStatus represents the current state of the ApprovalWatcher.
type WorkspaceMapping ¶
type WorkspaceMapping struct {
Pattern string `yaml:"pattern" json:"pattern"` // Glob pattern (e.g., "*/dev/sunholo/ailang")
Workspace string `yaml:"workspace" json:"workspace"` // Workspace ID (e.g., "sunholo-data/ailang")
}
WorkspaceMapping defines a path pattern to workspace ID mapping.
type WorkspacesConfig ¶
type WorkspacesConfig struct {
Mappings []WorkspaceMapping `yaml:"mappings" json:"mappings"`
DefaultWorkspace string `yaml:"default_workspace" json:"default_workspace"`
DeriveFromPath bool `yaml:"derive_from_path" json:"derive_from_path"` // If true, derive workspace from path when not matched
}
WorkspacesConfig contains workspace-related configuration for access control.
func DefaultWorkspacesConfig ¶
func DefaultWorkspacesConfig() *WorkspacesConfig
DefaultWorkspacesConfig returns minimal default workspace mappings. Only internal patterns are hardcoded - user projects should be in config. For unmapped paths, DeriveWorkspaceFromPath() extracts workspace ID from path.
func LoadWorkspacesConfig ¶
func LoadWorkspacesConfig() *WorkspacesConfig
LoadWorkspacesConfig loads workspace configuration from ~/.ailang/config.yaml. Returns a default configuration if no config is set.
func (*WorkspacesConfig) BuildWorkspaceMappingSQL ¶
func (c *WorkspacesConfig) BuildWorkspaceMappingSQL(cwdColumn string) string
BuildWorkspaceMappingSQL generates a SQL CASE statement for mapping file paths to workspace IDs. Used by the analytics backend to map process.cwd values to Firestore workspace IDs.
When DeriveFromPath is true, unmapped paths are derived using SQL expressions:
- Paths containing /dev/{org}/{repo} return "org/repo".
- All other paths return "unknown".
func (*WorkspacesConfig) GetPathPatternsForWorkspace ¶
func (c *WorkspacesConfig) GetPathPatternsForWorkspace(workspaceID string) []string
GetPathPatternsForWorkspace returns SQL LIKE patterns that match a workspace ID. Used for filtering spans by workspace when the filter is an org/repo ID. Returns nil if no matching mappings found (caller should use fallback).
Example: For workspace "MarkEdmondson1234/TwilightGame" with mapping {Pattern: "*/TwilightGame*", Workspace: "MarkEdmondson1234/TwilightGame"}, returns ["%/TwilightGame%"] as the pattern to match process.cwd values.
func (*WorkspacesConfig) GetWorkspaceLabel ¶
func (c *WorkspacesConfig) GetWorkspaceLabel(workspaceID string) string
GetWorkspaceLabel returns a human-friendly label for a workspace ID. For internal workspaces, returns predefined labels. For user workspaces (org/repo format), returns a formatted label. For raw paths (from DeriveFromPath fallback), derives and formats the label.
type Worktree ¶
type Worktree struct {
TaskID string
Path string
Branch string
BaseBranch string // Base branch the worktree was created from (for diff comparison)
BaseCommit string // Base commit hash at worktree creation (stable reference - branch may move)
CreatedAt time.Time
}
Worktree represents a git worktree for task isolation
type WorktreeChanges ¶
type WorktreeChanges struct {
TaskID string `json:"task_id"`
Path string `json:"path"`
Branch string `json:"branch"`
FilesChanged []string `json:"files_changed"`
DiffSummary string `json:"diff_summary"`
CommitsAhead string `json:"commits_ahead"`
}
WorktreeChanges describes the changes in a worktree
type WorktreeManager ¶
type WorktreeManager struct {
// contains filtered or unexported fields
}
WorktreeManager manages git worktrees for isolated task execution
func NewWorktreeManager ¶
func NewWorktreeManager(repoDir, baseDir string, maxWorktrees int) (*WorktreeManager, error)
NewWorktreeManager creates a new worktree manager
func (*WorktreeManager) CleanupAll ¶
func (wm *WorktreeManager) CleanupAll() error
CleanupAll removes all worktrees
func (*WorktreeManager) CleanupOrphaned ¶
func (wm *WorktreeManager) CleanupOrphaned() (int, error)
CleanupOrphaned removes worktrees that no longer exist on disk
func (*WorktreeManager) Count ¶
func (wm *WorktreeManager) Count() int
Count returns the number of active worktrees
func (*WorktreeManager) CreateWorktree ¶
func (wm *WorktreeManager) CreateWorktree(taskID, baseBranch string) (*Worktree, error)
CreateWorktree creates a new worktree for a task. baseBranch specifies the branch to create the worktree from (e.g., "dev"). If baseBranch is empty, it queries git for the remote's default branch.
func (*WorktreeManager) GetChangeSummary ¶
func (wm *WorktreeManager) GetChangeSummary(taskID string) (*WorktreeChanges, error)
GetChangeSummary returns a summary of changes in the worktree
func (*WorktreeManager) GetWorktree ¶
func (wm *WorktreeManager) GetWorktree(taskID string) (*Worktree, bool)
GetWorktree returns a worktree by task ID
func (*WorktreeManager) HasChanges ¶
func (wm *WorktreeManager) HasChanges(taskID string) (bool, error)
HasChanges checks if a worktree has uncommitted changes
func (*WorktreeManager) ListWorktrees ¶
func (wm *WorktreeManager) ListWorktrees() []*Worktree
ListWorktrees returns all active worktrees
func (*WorktreeManager) RemoveWorktree ¶
func (wm *WorktreeManager) RemoveWorktree(taskID string) error
RemoveWorktree removes a worktree
Source Files
¶
- agent_config.go
- agent_registry.go
- analyzer.go
- apikey_cache.go
- approval_checkpoint.go
- approval_processor.go
- approval_watcher.go
- artifact_discovery.go
- autonomy_router.go
- capability_detector.go
- cascade_scheduler.go
- cloud_dispatcher.go
- daemon.go
- daemon_approval.go
- daemon_github.go
- daemon_http.go
- daemon_lifecycle.go
- daemon_tasks_budget.go
- daemon_tasks_chain.go
- daemon_tasks_exec.go
- daemon_tasks_init.go
- daemon_tasks_output.go
- daemon_tasks_polling.go
- daemon_tasks_worktrees.go
- event_formatter.go
- event_handler.go
- github_comments.go
- github_poster.go
- github_webhook.go
- history_collector.go
- http_broadcaster.go
- human_interaction.go
- kms.go
- merge.go
- message_adapter.go
- meta_prompt.go
- observatory_sync.go
- provider.go
- provider_executor.go
- provider_gemini.go
- provider_script.go
- pubsub_adapter.go
- pubsub_broadcaster.go
- pubsub_completion_handler.go
- resource_tracker.go
- sanitize.go
- stage_execution.go
- stale_task_detector.go
- store.go
- store_sqlite.go
- store_sqlite_approvals.go
- store_sqlite_events.go
- store_sqlite_queries.go
- task_chain.go
- task_chain_agent.go
- task_executor.go
- templates.go
- watcher.go
- worktree.go