Documentation ¶
Overview ¶
Package orchestrator coordinates agent tasks and dependencies.
Index ¶
- Variables
- type AwaitIntent
- type AwaitPolicy
- type Config
- type DelegationConfig
- type DelegationMetrics
- type DelegationMetricsSnapshot
- type Orchestrator
- func (o *Orchestrator) ACPSessionControl(taskID, action, message, mode string) (interface{}, error)
- func (o *Orchestrator) AwaitIntentFor(sessionID string) (*AwaitIntent, bool)
- func (o *Orchestrator) Cancel(taskID string) error
- func (o *Orchestrator) ClearAwaitIntent(sessionID string)
- func (o *Orchestrator) DelegationMetrics() DelegationMetricsSnapshot
- func (o *Orchestrator) Delete(taskID string) error
- func (o *Orchestrator) GetStats() Stats
- func (o *Orchestrator) GetTask(taskID string) (*models.Task, error)
- func (o *Orchestrator) ListByParentSession(sessionID string) ([]*models.Task, error)
- func (o *Orchestrator) ListCustomEngines() []string
- func (o *Orchestrator) ListPersonas() []string
- func (o *Orchestrator) ListProjectRefs(ctx context.Context) []ProjectRef
- func (o *Orchestrator) ListTasks(req models.ListRequest) ([]*models.Task, error)
- func (o *Orchestrator) Pause(taskID string) (*models.Task, error)
- func (o *Orchestrator) ProjectRefsSupported() bool
- func (o *Orchestrator) Purge(taskID string) error
- func (o *Orchestrator) RecordLiveInjection()
- func (o *Orchestrator) RecordResurrection()
- func (o *Orchestrator) Relaunch(ctx context.Context, taskID string, opts RelaunchOptions) (*models.Task, error)
- func (o *Orchestrator) ResolveProjectRef(ctx context.Context, ref string) (ProjectRef, bool)
- func (o *Orchestrator) Resume(ctx context.Context, taskID string, opts ResumeOptions) (*models.Task, error)
- func (o *Orchestrator) Retry(ctx context.Context, taskID string, opts RetryOptions) (*models.Task, error)
- func (o *Orchestrator) SetAwaitIntent(intent *AwaitIntent)
- func (o *Orchestrator) SetProgress(taskID string, percentage int, description string) error
- func (o *Orchestrator) Shutdown() error
- func (o *Orchestrator) Spawn(ctx context.Context, req models.SpawnRequest) (*models.Task, error)
- func (o *Orchestrator) SubscribeCompletions() (<-chan *models.Task, func())
- func (o *Orchestrator) Wait(ctx context.Context, taskID string, timeout time.Duration) (*models.Task, error)
- func (o *Orchestrator) WaitMultiple(ctx context.Context, taskIDs []string, waitAll bool, timeout time.Duration) (map[string]*models.Task, error)
- type ProjectRef
- type ProjectRefResolver
- type RelaunchOptions
- type ResumeOptions
- type RetryOptions
- type Stats
- type TaskProgressInfo
- type WarmRunResult
- type WarmTargetResolver
Constants ¶
This section is empty.
Variables ¶
var ErrNoWarmTarget = errors.New("no warm target available; use cold path")
ErrNoWarmTarget signals that a delegated task cannot (or must not) be served by a warm per-project instance — e.g. the project is unregistered, has no config, is served by an external editor instance, auto-start is disabled with nothing running, or the per-instance concurrency cap is reached. The orchestrator falls back to the cold subprocess path when a resolver returns this. Any other error from RunWarm is treated as a genuine warm-run failure (terminal-failed task).
var ErrWarmCapReached = fmt.Errorf("warm instance concurrency cap reached: %w", ErrNoWarmTarget)
ErrWarmCapReached is a more specific flavour of ErrNoWarmTarget: the project HAS a warm instance but it is at its concurrency cap (and the warm queue, if any, is full), so this delegation falls back to the cold path. It wraps ErrNoWarmTarget so existing `errors.Is(err, ErrNoWarmTarget)` cold-path checks keep working; the warm resolver adapter returns it (instead of plain ErrNoWarmTarget) only for cap-driven fallbacks so the orchestrator can count them separately for telemetry (item E1).
Functions ¶
This section is empty.
Types ¶
type AwaitIntent ¶ added in v0.601.2
type AwaitIntent struct {
// SessionID is the parent agent session that is awaiting.
SessionID string
// TaskIDs are the awaited correlated task ids. The await tool resolves an empty
// caller-supplied list to all currently-outstanding correlated tasks before
// registering, so this is never empty once stored.
TaskIDs []string
// Policy is the join policy ("all" | "any" | "quorum").
Policy AwaitPolicy
// Quorum is the number of reported tasks required when Policy is quorum.
Quorum int
// Deadline is a safety timeout after which the supervisor flushes whatever has
// accumulated even if the join condition is not yet satisfied. Zero means none.
Deadline time.Time
// CreatedAt is when the intent was registered.
CreatedAt time.Time
}
AwaitIntent records a parent agent session's intent to wait, non-blocking, for a set of correlated delegated tasks. The await tool registers it and ends the model's turn; the delegation supervisor consults it on each completion and resurrects the idle parent loop once the join condition is satisfied (or the Deadline passes). It is in-memory only and not persisted: it mirrors the supervisor's own in-memory batch state, so neither survives a restart (the task conclusions themselves remain durable on the store and re-fetchable).
type AwaitPolicy ¶ added in v0.601.2
type AwaitPolicy string
AwaitPolicy is the join policy a parent agent registers when it awaits one or more delegated tasks before ending its turn.
const (
// AwaitPolicyAll resolves when every awaited task has reported (reached a
// terminal state).
AwaitPolicyAll AwaitPolicy = "all"
// AwaitPolicyAny resolves as soon as at least one awaited task has reported.
AwaitPolicyAny AwaitPolicy = "any"
// AwaitPolicyQuorum resolves when at least Quorum awaited tasks have reported.
AwaitPolicyQuorum AwaitPolicy = "quorum"
)
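To make the three join policies concrete, the sketch below evaluates a join condition from two counts. It is an illustration under stated assumptions, not the supervisor's implementation: `joinSatisfied` is a hypothetical helper, the policy type and constants are redeclared locally, and no validation of the quorum value is attempted.

```go
package main

import "fmt"

// Local mirror of the package's AwaitPolicy type and constants.
type AwaitPolicy string

const (
	AwaitPolicyAll    AwaitPolicy = "all"
	AwaitPolicyAny    AwaitPolicy = "any"
	AwaitPolicyQuorum AwaitPolicy = "quorum"
)

// joinSatisfied is a hypothetical helper. awaited is the number of awaited
// task ids and reported is how many of them have reached a terminal state.
// quorum is only consulted for AwaitPolicyQuorum and is not validated here.
func joinSatisfied(policy AwaitPolicy, quorum, awaited, reported int) bool {
	switch policy {
	case AwaitPolicyAll:
		return reported >= awaited
	case AwaitPolicyAny:
		return reported >= 1
	case AwaitPolicyQuorum:
		return reported >= quorum
	default:
		// Unknown policies never resolve in this sketch.
		return false
	}
}

func main() {
	// Three awaited tasks, two of which have reported so far.
	fmt.Println(joinSatisfied(AwaitPolicyAll, 0, 3, 2))
	fmt.Println(joinSatisfied(AwaitPolicyAny, 0, 3, 2))
	fmt.Println(joinSatisfied(AwaitPolicyQuorum, 2, 3, 2))
}
```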
type Config ¶
type Config struct {
StorePath string
LogDir string
// EnginesDir is the directory scanned at startup for *.template.yaml custom
// engine files. When empty the manager derives it from LogDir.
EnginesDir string
MaxParallel int
// DefaultMCPConfig is an optional explicit override for the MCP config file
// passed to subagents. When empty (the default), pando builds a dynamic
// config at spawn time that includes pando itself as an MCP server plus all
// configured MCP servers.
DefaultMCPConfig string
DefaultEngine string
PersonaPath string
AppConfig *mesnadaconfig.Config // Full app config for passing to managers
// MCPServers lists the MCP servers configured in pando that should be
// forwarded to subagents. Populated from the pando application config.
MCPServers []agent.PandoMCPServerEntry
// GatewayExposeEnabled indicates that MCPGateway re-exports all configured
// MCP servers through pando's own MCP server. When true, the individual
// MCPServers entries are not forwarded separately (they are already
// accessible via the "pando" MCP server entry).
GatewayExposeEnabled bool
// ModelResolver converts a model ID (possibly empty or shorthand) into the
// full "provider.model" string expected by the pando CLI's -m flag.
// When nil, model IDs are forwarded as-is to the pando CLI spawner.
ModelResolver func(string) string
// Delegation carries the conclusion-protocol options. When Delegation.Enabled
// is false (the default) the orchestrator behaves exactly as before.
Delegation DelegationConfig
// ProjectResolver maps a canonical project path to its registry id and display
// name, used by the conclusion enricher. When nil the enricher derives the
// display name from filepath.Base(projectPath).
ProjectResolver conclusion.ProjectResolver
// WarmTargetResolver routes delegated tasks to warm per-project ACP instances
// (Phase 7.3). When nil (or Delegation.ReuseWarmInstances is false) every task
// takes the cold subprocess path, preserving today's behavior.
WarmTargetResolver WarmTargetResolver
// ProjectRefResolver resolves a free-form project reference supplied to the
// spawn tool (item B1) to a registered project id/path. When nil the spawn
// tool's optional "project" argument is rejected as unsupported.
ProjectRefResolver ProjectRefResolver
}
Config holds orchestrator configuration.
type DelegationConfig ¶ added in v0.601.2
type DelegationConfig struct {
// Enabled gates the whole conclusion protocol. When false the orchestrator
// preserves today's behavior byte-for-byte: no brief is appended and the
// enricher is not run.
Enabled bool
// SynthesizeFallback enables deriving a conclusion from output/error when the
// subagent did not emit a sentinel block.
SynthesizeFallback bool
// ReuseWarmInstances routes a delegated task whose project is known to a warm
// per-project ACP instance (via WarmTargetResolver) instead of cold-spawning a
// CLI. Master switch for warm reuse (default off); requires Enabled and a wired
// resolver. The reuse-then-autostart policy and concurrency cap are applied by
// the resolver adapter, not here.
ReuseWarmInstances bool
}
DelegationConfig mirrors the conclusion-relevant subset of the application's delegation config. It is a plain struct (not an import of internal/config) so the orchestrator stays free of config import cycles, following the same pattern as ModelResolver.
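The field comments state that warm reuse needs three things at once: Enabled, ReuseWarmInstances, and a wired resolver. A minimal sketch of that gate follows; `useWarmPath` and the `resolverWired` parameter are hypothetical names, and the struct is redeclared locally so the example stands alone.

```go
package main

import "fmt"

// Local mirror of DelegationConfig, redeclared so the sketch compiles alone.
type DelegationConfig struct {
	Enabled            bool
	SynthesizeFallback bool
	ReuseWarmInstances bool
}

// useWarmPath is a hypothetical helper: it reports whether a delegated task
// may be offered to a warm instance at all. resolverWired stands in for
// "Config.WarmTargetResolver is non-nil".
func useWarmPath(cfg DelegationConfig, resolverWired bool) bool {
	return cfg.Enabled && cfg.ReuseWarmInstances && resolverWired
}

func main() {
	// The zero value keeps every task on the cold subprocess path.
	fmt.Println(useWarmPath(DelegationConfig{}, true))
	// The master switch alone is not enough without Enabled.
	fmt.Println(useWarmPath(DelegationConfig{ReuseWarmInstances: true}, true))
	// All three conditions hold.
	fmt.Println(useWarmPath(DelegationConfig{Enabled: true, ReuseWarmInstances: true}, true))
}
```

Even when this gate passes, the resolver may still decline a task (for example by returning ErrNoWarmTarget), in which case the task takes the cold path.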
type DelegationMetrics ¶ added in v0.603.0
type DelegationMetrics struct {
// contains filtered or unexported fields
}
DelegationMetrics holds process-lifetime counters describing how delegated tasks were routed and how the parent loop re-entered after their completion (item E1). All counters are monotonic and lock-free (atomic), so recording a metric from the hot delegation path never contends with a reader taking a Snapshot. The zero value is ready to use; the orchestrator owns one instance for its whole lifetime.
The counters are intentionally orchestrator-global rather than per-project: they answer "is warm reuse paying off and is resurrection firing as expected" across the whole instance, which is the question the panel telemetry surfaces.
func (*DelegationMetrics) Snapshot ¶ added in v0.603.0
func (m *DelegationMetrics) Snapshot() DelegationMetricsSnapshot
Snapshot returns a consistent-enough point-in-time copy of the counters with the derived hit rate. Counters are read independently (no global lock), so a snapshot taken during concurrent updates may mix values from adjacent instants; this is acceptable for telemetry and never blocks the delegation path.
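The lock-free counter pattern described here can be sketched with `sync/atomic`. The unexported fields of DelegationMetrics are not shown in this documentation, so the `metrics` and `snapshot` types below are hypothetical stand-ins with just two counters; `atomic.Int64` requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// metrics is a hypothetical stand-in for DelegationMetrics. The zero value
// is ready to use.
type metrics struct {
	warmAttempts atomic.Int64
	warmHits     atomic.Int64
}

// snapshot is a hypothetical, reduced form of DelegationMetricsSnapshot.
type snapshot struct {
	WarmAttempts int64
	WarmHits     int64
	WarmHitRate  float64
}

// Snapshot reads each counter independently, without a global lock.
func (m *metrics) Snapshot() snapshot {
	// In this sketch writers bump attempts before hits, so loading hits
	// first keeps the derived ratio at or below 1.
	hits := m.warmHits.Load()
	attempts := m.warmAttempts.Load()
	s := snapshot{WarmAttempts: attempts, WarmHits: hits}
	if attempts > 0 {
		s.WarmHitRate = float64(hits) / float64(attempts)
	}
	return s
}

func main() {
	var m metrics
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(hit bool) {
			defer wg.Done()
			m.warmAttempts.Add(1)
			if hit {
				m.warmHits.Add(1)
			}
		}(i%2 == 0)
	}
	wg.Wait()
	fmt.Printf("%+v\n", m.Snapshot())
}
```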
type DelegationMetricsSnapshot ¶ added in v0.603.0
type DelegationMetricsSnapshot struct {
WarmAttempts int64 `json:"warm_attempts"`
WarmHits int64 `json:"warm_hits"`
WarmFailures int64 `json:"warm_failures"`
ColdFallbacks int64 `json:"cold_fallbacks"`
CapRejections int64 `json:"cap_rejections"`
Resurrections int64 `json:"resurrections"`
LiveInjections int64 `json:"live_injections"`
// WarmHitRate is warmHits / warmAttempts in [0,1]; 0 when there were no
// attempts. It is the headline "is warm reuse paying off" number.
WarmHitRate float64 `json:"warm_hit_rate"`
}
DelegationMetricsSnapshot is an immutable, JSON-friendly view of DelegationMetrics taken at one instant, plus the derived warm-reuse hit rate.
type Orchestrator ¶
type Orchestrator struct {
// contains filtered or unexported fields
}
Orchestrator coordinates the execution of CLI agents.
func (*Orchestrator) ACPSessionControl ¶
func (o *Orchestrator) ACPSessionControl(taskID, action, message, mode string) (interface{}, error)
ACPSessionControl sends a control command to an active ACP session. This delegates to the agent manager's ACP session control.
func (*Orchestrator) AwaitIntentFor ¶ added in v0.601.2
func (o *Orchestrator) AwaitIntentFor(sessionID string) (*AwaitIntent, bool)
AwaitIntentFor returns the active await intent for a session, if any.
func (*Orchestrator) Cancel ¶
func (o *Orchestrator) Cancel(taskID string) error
Cancel cancels a running task.
func (*Orchestrator) ClearAwaitIntent ¶ added in v0.601.2
func (o *Orchestrator) ClearAwaitIntent(sessionID string)
ClearAwaitIntent removes the await intent for a session (no-op if none).
func (*Orchestrator) DelegationMetrics ¶ added in v0.603.0
func (o *Orchestrator) DelegationMetrics() DelegationMetricsSnapshot
DelegationMetrics returns a point-in-time snapshot of the delegation routing / re-entry counters (item E1). Safe to call concurrently; never blocks the delegation path. The returned snapshot includes the derived warm-reuse hit rate.
func (*Orchestrator) Delete ¶
func (o *Orchestrator) Delete(taskID string) error
Delete removes a task from the store. If the task is running, it will attempt to cancel it first. If the process is already dead or doesn't exist, the task will be deleted anyway.
func (*Orchestrator) GetStats ¶
func (o *Orchestrator) GetStats() Stats
GetStats returns orchestrator statistics.
func (*Orchestrator) GetTask ¶
func (o *Orchestrator) GetTask(taskID string) (*models.Task, error)
GetTask retrieves a task by ID.
func (*Orchestrator) ListByParentSession ¶ added in v0.601.2
func (o *Orchestrator) ListByParentSession(sessionID string) ([]*models.Task, error)
ListByParentSession returns all tasks correlated to the given parent agent session id. The delegation supervisor uses it to decide whether any sibling delegated tasks are still outstanding before resurrecting an idle parent loop (Case B). Thin pass-through to the underlying store.
func (*Orchestrator) ListCustomEngines ¶ added in v0.416.4
func (o *Orchestrator) ListCustomEngines() []string
ListCustomEngines returns the names of all custom engine templates loaded at startup.
func (*Orchestrator) ListPersonas ¶
func (o *Orchestrator) ListPersonas() []string
ListPersonas returns a list of available persona names.
func (*Orchestrator) ListProjectRefs ¶ added in v0.603.0
func (o *Orchestrator) ListProjectRefs(ctx context.Context) []ProjectRef
ListProjectRefs returns all registered projects (used to build a helpful error when a project reference does not resolve). It returns nil when no resolver is wired.
func (*Orchestrator) ListTasks ¶
func (o *Orchestrator) ListTasks(req models.ListRequest) ([]*models.Task, error)
ListTasks lists tasks matching the filter.
func (*Orchestrator) Pause ¶
func (o *Orchestrator) Pause(taskID string) (*models.Task, error)
Pause pauses a running or pending task. Pausing stops the underlying Copilot process (if any) and marks the task as paused.
func (*Orchestrator) ProjectRefsSupported ¶ added in v0.603.0
func (o *Orchestrator) ProjectRefsSupported() bool
ProjectRefsSupported reports whether project-targeting is available (a resolver is wired). When false the spawn tool rejects the "project" argument instead of silently ignoring it.
func (*Orchestrator) Purge ¶
func (o *Orchestrator) Purge(taskID string) error
Purge stops a running task (if needed), deletes its log file (if any), and removes it from the store. This operation is intentionally idempotent: purging a missing task returns nil.
func (*Orchestrator) RecordLiveInjection ¶ added in v0.603.0
func (o *Orchestrator) RecordLiveInjection()
RecordLiveInjection is called by the delegation supervisor when a conclusion was successfully injected into a still-running parent loop (Case A), including the resume-race fallback injection.
func (*Orchestrator) RecordResurrection ¶ added in v0.603.0
func (o *Orchestrator) RecordResurrection()
RecordResurrection is called by the delegation supervisor when an idle parent loop was successfully resurrected (Case B). It is exported because the supervisor lives in internal/app and only holds the concrete orchestrator.
func (*Orchestrator) Relaunch ¶ added in v0.294.1
func (o *Orchestrator) Relaunch(ctx context.Context, taskID string, opts RelaunchOptions) (*models.Task, error)
Relaunch resets an existing task (of any status) back to pending and re-runs it. Unlike Retry (which creates a new task), Relaunch reuses the same task ID so that any dependent tasks automatically benefit from the new execution without needing their dependency arrays updated.
Optional fields in opts override the stored task configuration; zero values keep the original values.
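The "zero values keep the original values" rule can be illustrated with a small merge function. This is a sketch only: `taskConfig`, `relaunchOptions`, and `applyOverrides` are hypothetical, and Engine is simplified to a string here (the real field is a models.Engine).

```go
package main

import "fmt"

// taskConfig is a hypothetical stand-in for the stored task configuration.
type taskConfig struct {
	Prompt  string
	Engine  string
	Model   string
	Timeout string
}

// relaunchOptions mirrors the overridable fields of RelaunchOptions, with
// Engine simplified to a string for the purpose of this sketch.
type relaunchOptions struct {
	Prompt  string
	Engine  string
	Model   string
	Timeout string
}

// applyOverrides returns a copy of stored in which every non-zero field of
// opts replaces the stored value; zero-valued fields leave it untouched.
func applyOverrides(stored taskConfig, opts relaunchOptions) taskConfig {
	if opts.Prompt != "" {
		stored.Prompt = opts.Prompt
	}
	if opts.Engine != "" {
		stored.Engine = opts.Engine
	}
	if opts.Model != "" {
		stored.Model = opts.Model
	}
	if opts.Timeout != "" {
		stored.Timeout = opts.Timeout
	}
	return stored
}

func main() {
	stored := taskConfig{Prompt: "run tests", Engine: "e1", Model: "m1", Timeout: "10m"}
	// Only Model and Timeout are overridden; Prompt and Engine are kept.
	fmt.Printf("%+v\n", applyOverrides(stored, relaunchOptions{Model: "m2", Timeout: "30m"}))
}
```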
func (*Orchestrator) ResolveProjectRef ¶ added in v0.603.0
func (o *Orchestrator) ResolveProjectRef(ctx context.Context, ref string) (ProjectRef, bool)
ResolveProjectRef resolves a free-form project reference (id/name/path) to a registered project via the injected resolver. ok is false when no resolver is wired or no project matches.
func (*Orchestrator) Resume ¶
func (o *Orchestrator) Resume(ctx context.Context, taskID string, opts ResumeOptions) (*models.Task, error)
Resume creates a new task to continue work from a previously paused task.
func (*Orchestrator) Retry ¶
func (o *Orchestrator) Retry(ctx context.Context, taskID string, opts RetryOptions) (*models.Task, error)
Retry relaunches a failed task or reactivates a pending task. For failed tasks, it creates a new task reusing the same log file (append mode) with the original prompt plus a retry notice. For pending tasks, it checks dependencies and starts the task if ready.
func (*Orchestrator) SetAwaitIntent ¶ added in v0.601.2
func (o *Orchestrator) SetAwaitIntent(intent *AwaitIntent)
SetAwaitIntent registers (replacing any existing) the await intent for a session. Safe for concurrent use.
func (*Orchestrator) SetProgress ¶
func (o *Orchestrator) SetProgress(taskID string, percentage int, description string) error
SetProgress updates the progress of a running task.
func (*Orchestrator) Shutdown ¶
func (o *Orchestrator) Shutdown() error
Shutdown gracefully shuts down the orchestrator.
func (*Orchestrator) Spawn ¶
func (o *Orchestrator) Spawn(ctx context.Context, req models.SpawnRequest) (*models.Task, error)
Spawn creates and optionally starts a new agent task.
func (*Orchestrator) SubscribeCompletions ¶ added in v0.601.2
func (o *Orchestrator) SubscribeCompletions() (<-chan *models.Task, func())
SubscribeCompletions registers a global subscriber that receives every task that reaches a terminal state via onTaskComplete (after its conclusion has been captured). It returns a receive-only channel and an unsubscribe function that removes the channel; the unsubscribe function is idempotent. The channel is buffered; deliveries are non-blocking so a slow consumer drops events rather than stalling the orchestrator. This is the hook the delegation supervisor uses for Case A (inject into a live parent loop) and, later, Case B (resurrection).
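The delivery semantics described above (buffered channel, non-blocking send, idempotent unsubscribe) can be sketched as a small fan-out hub. Everything here is an assumption made for illustration: the `hub` and `task` types, the buffer-size parameter, and the choice to only remove the channel on unsubscribe without closing it, since this documentation does not say whether the real implementation closes it.

```go
package main

import (
	"fmt"
	"sync"
)

// task is a hypothetical stand-in for *models.Task.
type task struct{ ID string }

// hub is a hypothetical fan-out of completion events to subscribers.
type hub struct {
	mu   sync.Mutex
	subs map[chan *task]struct{}
}

func newHub() *hub { return &hub{subs: make(map[chan *task]struct{})} }

// Subscribe returns a receive-only buffered channel and an unsubscribe
// function that is safe to call more than once.
func (h *hub) Subscribe(buffer int) (<-chan *task, func()) {
	ch := make(chan *task, buffer)
	h.mu.Lock()
	h.subs[ch] = struct{}{}
	h.mu.Unlock()

	var once sync.Once
	unsubscribe := func() {
		once.Do(func() {
			h.mu.Lock()
			delete(h.subs, ch)
			h.mu.Unlock()
		})
	}
	return ch, unsubscribe
}

// publish offers t to every subscriber without blocking and returns how many
// subscribers accepted it; a full buffer means the event is dropped.
func (h *hub) publish(t *task) int {
	h.mu.Lock()
	defer h.mu.Unlock()
	delivered := 0
	for ch := range h.subs {
		select {
		case ch <- t:
			delivered++
		default:
			// Slow consumer: drop rather than stall the publisher.
		}
	}
	return delivered
}

func main() {
	h := newHub()
	ch, unsubscribe := h.Subscribe(1)
	fmt.Println(h.publish(&task{ID: "t1"})) // fits in the buffer
	fmt.Println(h.publish(&task{ID: "t2"})) // buffer full, dropped
	fmt.Println((<-ch).ID)
	unsubscribe()
	unsubscribe() // second call is a no-op
	fmt.Println(h.publish(&task{ID: "t3"}))
}
```

The trade-off is that a consumer which needs every completion must drain the channel promptly or re-fetch task state from the store after a gap.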
type ProjectRef ¶ added in v0.603.0
ProjectRef identifies a registered project, used by the spawn tool to target a specific project by id/name/path (item B1). Path is the project's working directory; Name its display name.
type ProjectRefResolver ¶ added in v0.603.0
type ProjectRefResolver interface {
// ResolveProjectRef returns the registered project matching ref, or ok=false
// when none matches.
ResolveProjectRef(ctx context.Context, ref string) (ProjectRef, bool)
// ListProjectRefs returns all registered projects (for error messages).
ListProjectRefs(ctx context.Context) []ProjectRef
}
ProjectRefResolver resolves a free-form project reference (registry id, display name, or directory path) to a registered project, and lists registered projects for building helpful error messages. It is injected from internal/app over the project registry to avoid an import cycle (same pattern as WarmTargetResolver).
type RelaunchOptions ¶ added in v0.294.1
type RelaunchOptions struct {
// Prompt overrides the original prompt when non-empty.
Prompt string
// Engine overrides the engine when non-zero.
Engine models.Engine
// Model overrides the model when non-empty.
Model string
// Timeout overrides the timeout when non-empty.
Timeout string
// Background controls whether the relaunch runs in the background.
Background bool
}
RelaunchOptions controls how a task is relaunched in-place.
type ResumeOptions ¶
type ResumeOptions struct {
Prompt string
Model string
Background bool
Timeout string
Tags *[]string
}
ResumeOptions controls how a paused task is resumed.
type RetryOptions ¶
type RetryOptions struct {
Background bool
}
RetryOptions controls how a failed or pending task is retried.
type Stats ¶
type Stats struct {
Total int `json:"total"`
Pending int `json:"pending"`
Running int `json:"running"`
Paused int `json:"paused"`
Completed int `json:"completed"`
Failed int `json:"failed"`
Cancelled int `json:"cancelled"`
RunningProgress map[string]TaskProgressInfo `json:"running_progress,omitempty"`
}
Stats holds orchestrator statistics.
type TaskProgressInfo ¶
type TaskProgressInfo struct {
TaskID string `json:"task_id"`
Percentage int `json:"percentage"`
Description string `json:"description"`
UpdatedAt time.Time `json:"updated_at"`
}
TaskProgressInfo holds progress information for a task.
type WarmRunResult ¶ added in v0.601.2
type WarmRunResult struct {
// ChildSessionID is the ACP session id created in the warm child for this run.
ChildSessionID string
// Output is the agent's full streamed message text for the turn.
Output string
// StopReason is the ACP stop reason reported by the child (e.g. end_turn).
StopReason string
}
WarmRunResult is the captured outcome of running a delegated prompt inside a warm per-project ACP instance. It mirrors internal/project.DelegateResult but lives here so the orchestrator stays free of an internal/project import cycle (the adapter that bridges the two lives in internal/app).
type WarmTargetResolver ¶ added in v0.601.2
type WarmTargetResolver interface {
// RunWarm runs promptText inside a warm instance for the given project,
// applying the reuse-then-autostart policy and the per-instance concurrency
// cap. projectID may be empty, in which case projectPath is resolved to a
// registered project. It returns ErrNoWarmTarget when the project cannot be
// served by a warm instance so the caller takes the cold path.
RunWarm(ctx context.Context, projectID, projectPath, promptText string) (*WarmRunResult, error)
}
WarmTargetResolver routes a delegated task to a warm per-project ACP instance, capturing its conclusion over the wire instead of cold-spawning a CLI. It is implemented by an adapter over internal/project.Manager and injected into the orchestrator to avoid an import cycle (same pattern as ProjectResolver / ModelResolver).