orchestrator

package
v0.601.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 22, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package orchestrator coordinates agent tasks and dependencies.

Index

Constants

This section is empty.

Variables

View Source
var ErrNoWarmTarget = errors.New("no warm target available; use cold path")

ErrNoWarmTarget signals that a delegated task cannot (or must not) be served by a warm per-project instance — e.g. the project is unregistered, has no config, is served by an external editor instance, auto-start is disabled with nothing running, or the per-instance concurrency cap is reached. The orchestrator falls back to the cold subprocess path when a resolver returns this. Any other error from RunWarm is treated as a genuine warm-run failure (terminal-failed task).

Functions

This section is empty.

Types

type AwaitIntent added in v0.601.2

type AwaitIntent struct {
	// SessionID is the parent agent session that is awaiting.
	SessionID string
	// TaskIDs are the awaited correlated task ids. The await tool resolves an empty
	// caller-supplied list to all currently-outstanding correlated tasks before
	// registering, so this is never empty once stored.
	TaskIDs []string
	// Policy is the join policy ("all" | "any" | "quorum").
	Policy AwaitPolicy
	// Quorum is the number of reported tasks required when Policy is quorum.
	Quorum int
	// Deadline is a safety timeout after which the supervisor flushes whatever has
	// accumulated even if the join condition is not yet satisfied. Zero means none.
	Deadline time.Time
	// CreatedAt is when the intent was registered.
	CreatedAt time.Time
}

AwaitIntent records a parent agent session's intent to wait, non-blocking, for a set of correlated delegated tasks. The await tool registers it and ends the model's turn; the delegation supervisor consults it on each completion and resurrects the idle parent loop once the join condition is satisfied (or the Deadline passes). It is in-memory only and not persisted: it mirrors the supervisor's own in-memory batch state, so neither survives a restart (the task conclusions themselves remain durable on the store and re-fetchable).

type AwaitPolicy added in v0.601.2

type AwaitPolicy string

AwaitPolicy is the join policy a parent agent registers when it awaits one or more delegated tasks before ending its turn.

const (
	// AwaitPolicyAll resolves when every awaited task has reported (reached a
	// terminal state).
	AwaitPolicyAll AwaitPolicy = "all"
	// AwaitPolicyAny resolves as soon as at least one awaited task has reported.
	AwaitPolicyAny AwaitPolicy = "any"
	// AwaitPolicyQuorum resolves when at least Quorum awaited tasks have reported.
	AwaitPolicyQuorum AwaitPolicy = "quorum"
)

type Config

type Config struct {
	StorePath string
	LogDir    string
	// EnginesDir is the directory scanned at startup for *.template.yaml custom
	// engine files. When empty the manager derives it from LogDir.
	EnginesDir  string
	MaxParallel int
	// DefaultMCPConfig is an optional explicit override for the MCP config file
	// passed to subagents. When empty (the default), pando builds a dynamic
	// config at spawn time that includes pando itself as an MCP server plus all
	// configured MCP servers.
	DefaultMCPConfig string
	DefaultEngine    string
	PersonaPath      string
	AppConfig        *mesnadaconfig.Config // Full app config for passing to managers
	// MCPServers lists the MCP servers configured in pando that should be
	// forwarded to subagents. Populated from the pando application config.
	MCPServers []agent.PandoMCPServerEntry
	// GatewayExposeEnabled indicates that MCPGateway re-exports all configured
	// MCP servers through pando's own MCP server. When true, the individual
	// MCPServers entries are not forwarded separately (they are already
	// accessible via the "pando" MCP server entry).
	GatewayExposeEnabled bool
	// ModelResolver converts a model ID (possibly empty or shorthand) into the
	// full "provider.model" string expected by the pando CLI's -m flag.
	// When nil, model IDs are forwarded as-is to the pando CLI spawner.
	ModelResolver func(string) string
	// Delegation carries the conclusion-protocol options. When Delegation.Enabled
	// is false (the default) the orchestrator behaves exactly as before.
	Delegation DelegationConfig
	// ProjectResolver maps a canonical project path to its registry id and display
	// name, used by the conclusion enricher. When nil the enricher derives the
	// display name from filepath.Base(projectPath).
	ProjectResolver conclusion.ProjectResolver
	// WarmTargetResolver routes delegated tasks to warm per-project ACP instances
	// (Phase 7.3). When nil (or Delegation.ReuseWarmInstances is false) every task
	// takes the cold subprocess path, preserving today's behavior.
	WarmTargetResolver WarmTargetResolver
}

Config holds orchestrator configuration.

type DelegationConfig added in v0.601.2

type DelegationConfig struct {
	// Enabled gates the whole conclusion protocol. When false the orchestrator
	// preserves today's behavior byte-for-byte: no brief is appended and the
	// enricher is not run.
	Enabled bool
	// SynthesizeFallback enables deriving a conclusion from output/error when the
	// subagent did not emit a sentinel block.
	SynthesizeFallback bool
	// ReuseWarmInstances routes a delegated task whose project is known to a warm
	// per-project ACP instance (via WarmTargetResolver) instead of cold-spawning a
	// CLI. Master switch for warm reuse (default off); requires Enabled and a wired
	// resolver. The reuse-then-autostart policy and concurrency cap are applied by
	// the resolver adapter, not here.
	ReuseWarmInstances bool
}

DelegationConfig mirrors the conclusion-relevant subset of the application's delegation config. It is a plain struct (not an import of internal/config) so the orchestrator stays free of config import cycles, following the same pattern as ModelResolver.

type Orchestrator

type Orchestrator struct {
	// contains filtered or unexported fields
}

Orchestrator coordinates the execution of CLI agents.

func New

func New(cfg Config) (*Orchestrator, error)

New creates a new Orchestrator.

func (*Orchestrator) ACPSessionControl

func (o *Orchestrator) ACPSessionControl(taskID, action, message, mode string) (interface{}, error)

ACPSessionControl sends a control command to an active ACP session. This delegates to the agent manager's ACP session control.

func (*Orchestrator) AwaitIntentFor added in v0.601.2

func (o *Orchestrator) AwaitIntentFor(sessionID string) (*AwaitIntent, bool)

AwaitIntentFor returns the active await intent for a session, if any.

func (*Orchestrator) Cancel

func (o *Orchestrator) Cancel(taskID string) error

Cancel cancels a running task.

func (*Orchestrator) ClearAwaitIntent added in v0.601.2

func (o *Orchestrator) ClearAwaitIntent(sessionID string)

ClearAwaitIntent removes the await intent for a session (no-op if none).

func (*Orchestrator) Delete

func (o *Orchestrator) Delete(taskID string) error

Delete removes a task from the store. If the task is running, it will attempt to cancel it first. If the process is already dead or doesn't exist, the task will be deleted anyway.

func (*Orchestrator) GetStats

func (o *Orchestrator) GetStats() Stats

GetStats returns orchestrator statistics.

func (*Orchestrator) GetTask

func (o *Orchestrator) GetTask(taskID string) (*models.Task, error)

GetTask retrieves a task by ID.

func (*Orchestrator) ListByParentSession added in v0.601.2

func (o *Orchestrator) ListByParentSession(sessionID string) ([]*models.Task, error)

ListByParentSession returns all tasks correlated to the given parent agent session id. The delegation supervisor uses it to decide whether any sibling delegated tasks are still outstanding before resurrecting an idle parent loop (Case B). Thin pass-through to the underlying store.

func (*Orchestrator) ListCustomEngines added in v0.416.4

func (o *Orchestrator) ListCustomEngines() []string

ListCustomEngines returns the names of all custom engine templates loaded at startup.

func (*Orchestrator) ListPersonas

func (o *Orchestrator) ListPersonas() []string

ListPersonas returns a list of available persona names.

func (*Orchestrator) ListTasks

func (o *Orchestrator) ListTasks(req models.ListRequest) ([]*models.Task, error)

ListTasks lists tasks matching the filter.

func (*Orchestrator) Pause

func (o *Orchestrator) Pause(taskID string) (*models.Task, error)

Pause pauses a running or pending task. Pausing stops the underlying Copilot process (if any) and marks the task as paused.

func (*Orchestrator) Purge

func (o *Orchestrator) Purge(taskID string) error

Purge stops a running task (if needed), deletes its log file (if any), and removes it from the store. This operation is intentionally idempotent: purging a missing task returns nil.

func (*Orchestrator) Relaunch added in v0.294.1

func (o *Orchestrator) Relaunch(ctx context.Context, taskID string, opts RelaunchOptions) (*models.Task, error)

Relaunch resets an existing task (of any status) back to pending and re-runs it. Unlike Retry (which creates a new task), Relaunch reuses the same task ID so that any dependent tasks automatically benefit from the new execution without needing their dependency arrays updated.

Optional fields in opts override the stored task configuration; zero values keep the original values.

func (*Orchestrator) Resume

func (o *Orchestrator) Resume(ctx context.Context, taskID string, opts ResumeOptions) (*models.Task, error)

Resume creates a new task to continue work from a previously paused task.

func (*Orchestrator) Retry

func (o *Orchestrator) Retry(ctx context.Context, taskID string, opts RetryOptions) (*models.Task, error)

Retry relaunches a failed task or reactivates a pending task. For failed tasks, it creates a new task reusing the same log file (append mode) with the original prompt plus a retry notice. For pending tasks, it checks dependencies and starts the task if ready.

func (*Orchestrator) SetAwaitIntent added in v0.601.2

func (o *Orchestrator) SetAwaitIntent(intent *AwaitIntent)

SetAwaitIntent registers (replacing any existing) the await intent for a session. Safe for concurrent use.

func (*Orchestrator) SetProgress

func (o *Orchestrator) SetProgress(taskID string, percentage int, description string) error

SetProgress updates the progress of a running task.

func (*Orchestrator) Shutdown

func (o *Orchestrator) Shutdown() error

Shutdown gracefully shuts down the orchestrator.

func (*Orchestrator) Spawn

Spawn creates and optionally starts a new agent task.

func (*Orchestrator) SubscribeCompletions added in v0.601.2

func (o *Orchestrator) SubscribeCompletions() (<-chan *models.Task, func())

SubscribeCompletions registers a global subscriber that receives every task that reaches a terminal state via onTaskComplete (after its conclusion has been captured). It returns a receive-only channel and an unsubscribe function that removes the channel; the unsubscribe function is idempotent. The channel is buffered; deliveries are non-blocking so a slow consumer drops events rather than stalling the orchestrator. This is the hook the delegation supervisor uses for Case A (inject into a live parent loop) and, later, Case B (resurrection).

func (*Orchestrator) Wait

func (o *Orchestrator) Wait(ctx context.Context, taskID string, timeout time.Duration) (*models.Task, error)

Wait waits for a task to complete.

func (*Orchestrator) WaitMultiple

func (o *Orchestrator) WaitMultiple(ctx context.Context, taskIDs []string, waitAll bool, timeout time.Duration) (map[string]*models.Task, error)

WaitMultiple waits for multiple tasks.

type RelaunchOptions added in v0.294.1

type RelaunchOptions struct {
	// Prompt overrides the original prompt when non-empty.
	Prompt string
	// Engine overrides the engine when non-zero.
	Engine models.Engine
	// Model overrides the model when non-empty.
	Model string
	// Timeout overrides the timeout when non-empty.
	Timeout string
	// Background controls whether the relaunch runs in the background.
	Background bool
}

RelaunchOptions controls how a task is relaunched in-place.

type ResumeOptions

type ResumeOptions struct {
	Prompt     string
	Model      string
	Background bool
	Timeout    string
	Tags       *[]string
}

ResumeOptions controls how a paused task is resumed.

type RetryOptions

type RetryOptions struct {
	Background bool
}

RetryOptions controls how a failed or pending task is retried.

type Stats

type Stats struct {
	Total           int                         `json:"total"`
	Pending         int                         `json:"pending"`
	Running         int                         `json:"running"`
	Paused          int                         `json:"paused"`
	Completed       int                         `json:"completed"`
	Failed          int                         `json:"failed"`
	Cancelled       int                         `json:"cancelled"`
	RunningProgress map[string]TaskProgressInfo `json:"running_progress,omitempty"`
}

Stats holds orchestrator statistics.

type TaskProgressInfo

type TaskProgressInfo struct {
	TaskID      string    `json:"task_id"`
	Percentage  int       `json:"percentage"`
	Description string    `json:"description"`
	UpdatedAt   time.Time `json:"updated_at"`
}

TaskProgressInfo holds progress information for a task.

type WarmRunResult added in v0.601.2

type WarmRunResult struct {
	// ChildSessionID is the ACP session id created in the warm child for this run.
	ChildSessionID string
	// Output is the agent's full streamed message text for the turn.
	Output string
	// StopReason is the ACP stop reason reported by the child (e.g. end_turn).
	StopReason string
}

WarmRunResult is the captured outcome of running a delegated prompt inside a warm per-project ACP instance. It mirrors internal/project.DelegateResult but lives here so the orchestrator stays free of an internal/project import cycle (the adapter that bridges the two lives in internal/app).

type WarmTargetResolver added in v0.601.2

type WarmTargetResolver interface {
	// RunWarm runs promptText inside a warm instance for the given project,
	// applying the reuse-then-autostart policy and the per-instance concurrency
	// cap. projectID may be empty, in which case projectPath is resolved to a
	// registered project. It returns ErrNoWarmTarget when the project cannot be
	// served by a warm instance so the caller takes the cold path.
	RunWarm(ctx context.Context, projectID, projectPath, promptText string) (*WarmRunResult, error)
}

WarmTargetResolver routes a delegated task to a warm per-project ACP instance, capturing its conclusion over the wire instead of cold-spawning a CLI. It is implemented by an adapter over internal/project.Manager and injected into the orchestrator to avoid an import cycle (same pattern as ProjectResolver / ModelResolver).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL