Documentation ¶
Overview ¶
Package scheduler implements the daemon-owned mechanical task scheduler.
The scheduler is intentionally narrow: it sweeps expired task-run leases through the task service, selects eligible idle sessions for queued work, and emits wake notifications. It never claims task runs directly; sessions remain responsible for calling the task claim API.
Index ¶
- Variables
- type BatchWaker
- type CycleResult
- type EscalationActor
- type Option
- func WithActor(actor taskpkg.ActorContext) Option
- func WithClock(clock clockwork.Clock) Option
- func WithEscalationActor(actor EscalationActor) Option
- func WithInterval(interval time.Duration) Option
- func WithLogger(logger *slog.Logger) Option
- func WithPauseStore(store PauseStore) Option
- func WithStarvationAge(age time.Duration) Option
- func WithStarvationStore(store StarvationStore) Option
- func WithStarvationThresholds(thresholds StarvationThresholds) Option
- func WithSweepLimit(limit int) Option
- func WithSweepReason(reason string) Option
- func WithWakeCooldown(cooldown time.Duration) Option
- func WithWakeReason(reason string) Option
- type PauseStore
- type RebuildResult
- type RunSnapshot
- type Scheduler
- type SessionSnapshot
- type SessionSource
- type StarvationStore
- type StarvationThresholds
- type Stats
- type TaskSource
- type WakeTarget
- type Waker
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrStopped reports that a stopped scheduler cannot be restarted.
	ErrStopped = errors.New("scheduler: stopped")

	// ErrSpawnUnresolvable reports that no agent covers a starved run's required
	// capabilities, so Tier2 spawn is skipped without failing the cycle; Tier3/Tier4
	// escalation still proceed as the wake count climbs.
	ErrSpawnUnresolvable = errors.New("scheduler: starvation spawn has no capable agent")
)
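Because ErrSpawnUnresolvable is a sentinel created with errors.New, a caller can presumably detect it with errors.Is even when it arrives wrapped. The following is a minimal sketch under that assumption: the sentinel is re-declared locally so the example compiles on its own, and requestSpawn and spawnSkipped are hypothetical helpers, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrSpawnUnresolvable mirrors the package sentinel so this sketch is self-contained.
var ErrSpawnUnresolvable = errors.New("scheduler: starvation spawn has no capable agent")

// requestSpawn is a hypothetical stand-in for a Tier2 spawn request.
// It wraps the sentinel when no agent is capable of taking the run.
func requestSpawn(capable bool) error {
	if !capable {
		return fmt.Errorf("run %q: %w", "run-1", ErrSpawnUnresolvable)
	}
	return nil
}

// spawnSkipped reports whether err means "skip the spawn, keep the cycle going".
func spawnSkipped(err error) bool {
	return errors.Is(err, ErrSpawnUnresolvable)
}

func main() {
	err := requestSpawn(false)
	if spawnSkipped(err) {
		fmt.Println("spawn skipped, cycle continues:", err)
	}
}
```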
Functions ¶
This section is empty.
Types ¶
type BatchWaker ¶
type BatchWaker interface {
WakeMany(ctx context.Context, targets []WakeTarget) []error
}
BatchWaker handles every selected wake target in one scheduler cycle.
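A BatchWaker implementation might look like the sketch below. It uses trimmed local stand-ins for WakeTarget and SessionSnapshot, and it assumes that WakeMany returns one error slot per target with nil meaning success; the documentation does not state that contract. recordingWaker and demo are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Trimmed stand-ins for the package types; only the fields used here.
type SessionSnapshot struct{ ID string }

type WakeTarget struct {
	Session SessionSnapshot
	Reason  string
}

type BatchWaker interface {
	WakeMany(ctx context.Context, targets []WakeTarget) []error
}

// recordingWaker is a hypothetical waker that records which sessions it notified.
type recordingWaker struct{ woken []string }

// WakeMany returns one error per target (assumed contract); nil marks success.
func (w *recordingWaker) WakeMany(ctx context.Context, targets []WakeTarget) []error {
	errs := make([]error, len(targets))
	for i, t := range targets {
		if err := ctx.Err(); err != nil {
			errs[i] = err
			continue
		}
		if t.Session.ID == "" {
			errs[i] = errors.New("wake target has no session ID")
			continue
		}
		w.woken = append(w.woken, t.Session.ID)
	}
	return errs
}

// Compile-time check that recordingWaker satisfies the interface.
var _ BatchWaker = (*recordingWaker)(nil)

// demo wakes one target per session ID and returns the per-target errors.
func demo(ids ...string) []error {
	targets := make([]WakeTarget, len(ids))
	for i, id := range ids {
		targets[i] = WakeTarget{Session: SessionSnapshot{ID: id}, Reason: "queued work"}
	}
	return (&recordingWaker{}).WakeMany(context.Background(), targets)
}

func main() {
	for i, err := range demo("s1", "") {
		fmt.Println(i, err)
	}
}
```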
type CycleResult ¶
type CycleResult struct {
PendingRuns int
ActiveRuns int
SessionsScanned int
RecoveredLeases int
WakeAttempts int
WakeSucceeded int
WakeFailed int
NoMatchRuns int
RecentlyNotified int
UnclaimableRuns int
Paused bool
StarvedRuns int
SpawnRequested int
NeedsAttention int
SelectedRunIDs []string
NoMatchRunIDs []string
RecoveredRunIDs []string
StarvedRunIDs []string
SpawnRequestedRunIDs []string
NeedsAttentionRunIDs []string
}
CycleResult reports one mechanical scheduler pass.
type EscalationActor ¶ added in v0.0.5
type EscalationActor interface {
EmitRunStarved(ctx context.Context, work *RunSnapshot, age time.Duration) error
RequestWorkerSpawn(ctx context.Context, work *RunSnapshot) error
MarkRunNeedsAttention(ctx context.Context, runID string, diagnostic string) (taskpkg.Run, error)
}
EscalationActor is the single seam through which the scheduler drives convergence escalation. It never claims work: it emits observability events, requests a capability-matched worker spawn (the spawned session self-claims), and marks a run needs_attention. Implemented by the daemon over the task service.
type Option ¶
type Option func(*Scheduler)
Option customizes scheduler runtime behavior.
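Option follows the functional options pattern. The sketch below shows how such options are typically applied by a constructor. The Scheduler fields, the default values, and newScheduler are assumptions made for illustration, since the real struct's fields are unexported.

```go
package main

import (
	"fmt"
	"time"
)

// Scheduler is a stand-in; both fields are hypothetical.
type Scheduler struct {
	interval   time.Duration
	sweepLimit int
}

// Option matches the documented shape: a function that mutates the scheduler.
type Option func(*Scheduler)

func WithInterval(interval time.Duration) Option {
	return func(s *Scheduler) { s.interval = interval }
}

func WithSweepLimit(limit int) Option {
	return func(s *Scheduler) { s.sweepLimit = limit }
}

// newScheduler applies options in order over illustrative defaults.
func newScheduler(opts ...Option) *Scheduler {
	s := &Scheduler{interval: 5 * time.Second, sweepLimit: 100}
	for _, opt := range opts {
		if opt != nil {
			opt(s)
		}
	}
	return s
}

func main() {
	s := newScheduler(WithInterval(time.Second))
	fmt.Println(s.interval, s.sweepLimit)
}
```

Options that are not supplied leave the corresponding default untouched, which is the main reason the pattern is used for constructors with many optional settings.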
func WithActor ¶
func WithActor(actor taskpkg.ActorContext) Option
WithActor overrides the daemon actor used for recovery writes.
func WithEscalationActor ¶ added in v0.0.5
func WithEscalationActor(actor EscalationActor) Option
WithEscalationActor injects the seam used to emit starvation signals, request worker spawns, and mark runs needs_attention. The scheduler never claims.
func WithInterval ¶
func WithInterval(interval time.Duration) Option
WithInterval overrides the background sweep/notify interval.
func WithLogger ¶
func WithLogger(logger *slog.Logger) Option
WithLogger overrides the scheduler logger.
func WithPauseStore ¶
func WithPauseStore(store PauseStore) Option
WithPauseStore lets the scheduler skip wake dispatch while preserving lease sweep.
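The "skip wake dispatch while preserving lease sweep" behavior can be sketched as follows. This is an illustration only: SchedulerPauseState stands in for taskpkg.SchedulerPauseState and its Paused field is an assumption, CycleResult is trimmed to three fields, and staticPause, runOnce, and demo are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// SchedulerPauseState is a stand-in; the Paused field is assumed.
type SchedulerPauseState struct{ Paused bool }

type PauseStore interface {
	GetSchedulerPause(ctx context.Context) (SchedulerPauseState, error)
}

// staticPause is a hypothetical store that always reports the same flag.
type staticPause bool

func (p staticPause) GetSchedulerPause(context.Context) (SchedulerPauseState, error) {
	return SchedulerPauseState{Paused: bool(p)}, nil
}

// CycleResult is trimmed to the fields this sketch touches.
type CycleResult struct {
	RecoveredLeases int
	WakeAttempts    int
	Paused          bool
}

// runOnce always counts the lease sweep, then skips wakes when paused.
func runOnce(ctx context.Context, store PauseStore, expired, queued int) (CycleResult, error) {
	res := CycleResult{RecoveredLeases: expired} // the sweep is not gated by the pause flag
	state, err := store.GetSchedulerPause(ctx)
	if err != nil {
		return res, err
	}
	if state.Paused {
		res.Paused = true
		return res, nil // no wake dispatch this cycle
	}
	res.WakeAttempts = queued
	return res, nil
}

// demo runs one cycle with 2 expired leases and 3 queued runs.
func demo(paused bool) CycleResult {
	res, _ := runOnce(context.Background(), staticPause(paused), 2, 3)
	return res
}

func main() {
	fmt.Printf("%+v\n", demo(true))
	fmt.Printf("%+v\n", demo(false))
}
```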
func WithStarvationAge ¶ added in v0.0.5
func WithStarvationAge(age time.Duration) Option
WithStarvationAge overrides how long a claimable run may sit queued before the scheduler escalates it, that is, fans the advisory wake out to every eligible session and emits a starvation signal. Zero disables starvation escalation.
func WithStarvationStore ¶ added in v0.0.5
func WithStarvationStore(store StarvationStore) Option
WithStarvationStore injects the durable escalation budget the convergence ladder advances. Without it the tier ladder is disabled (the scheduler still fans out starved runs).
func WithStarvationThresholds ¶ added in v0.0.5
func WithStarvationThresholds(thresholds StarvationThresholds) Option
WithStarvationThresholds overrides the convergence tier ladder bounds.
func WithSweepLimit ¶
func WithSweepLimit(limit int) Option
WithSweepLimit overrides the maximum number of expired leases recovered per pass.
func WithSweepReason ¶
func WithSweepReason(reason string) Option
WithSweepReason overrides the task-service recovery reason.
func WithWakeCooldown ¶
func WithWakeCooldown(cooldown time.Duration) Option
WithWakeCooldown overrides how long a run/session wake key is suppressed before the scheduler may notify that same session again.
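One way to picture the cooldown is a map keyed by the run/session pair that remembers the last wake time. The sketch below is an assumption about the mechanism, not the package's implementation; wakeKey, cooldowns, allow, and demo are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// wakeKey identifies one run/session pair.
type wakeKey struct{ runID, sessionID string }

// cooldowns remembers when each pair was last notified.
type cooldowns struct {
	window time.Duration
	last   map[wakeKey]time.Time
}

func newCooldowns(window time.Duration) *cooldowns {
	return &cooldowns{window: window, last: make(map[wakeKey]time.Time)}
}

// allow reports whether the pair may be notified at now, recording the wake if so.
func (c *cooldowns) allow(runID, sessionID string, now time.Time) bool {
	k := wakeKey{runID, sessionID}
	if prev, ok := c.last[k]; ok && now.Sub(prev) < c.window {
		return false // still inside the cooldown window
	}
	c.last[k] = now
	return true
}

// demo replays four wake attempts against a 30-second window.
func demo() []bool {
	c := newCooldowns(30 * time.Second)
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	return []bool{
		c.allow("r1", "s1", t0),                     // first wake for the pair
		c.allow("r1", "s1", t0.Add(10*time.Second)), // same pair, inside the window
		c.allow("r1", "s2", t0.Add(10*time.Second)), // different session, separate key
		c.allow("r1", "s1", t0.Add(30*time.Second)), // window has elapsed
	}
}

func main() {
	fmt.Println(demo())
}
```

Keying on the pair rather than on the run alone means a suppressed session does not prevent a different eligible session from being notified about the same run.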
func WithWakeReason ¶
func WithWakeReason(reason string) Option
WithWakeReason overrides the synthetic wake metadata reason.
type PauseStore ¶
type PauseStore interface {
GetSchedulerPause(ctx context.Context) (taskpkg.SchedulerPauseState, error)
}
PauseStore supplies the persisted scheduler-wide pause flag.
type RebuildResult ¶
type RebuildResult struct {
PendingRuns int
ActiveRuns int
SessionsScanned int
ClearedWakeKeys int
RebuiltAt time.Time
}
RebuildResult reports the durable state discovered while rebuilding scheduler-owned ephemeral state.
type RunSnapshot ¶
RunSnapshot joins a durable run with its owning task.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler owns one mechanical sweep/notify loop.
func New ¶
func New(tasks TaskSource, sessions SessionSource, waker Waker, opts ...Option) (*Scheduler, error)
New constructs a mechanical scheduler over durable task and session sources.
func (*Scheduler) Rebuild ¶
func (s *Scheduler) Rebuild(ctx context.Context) (RebuildResult, error)
Rebuild clears scheduler-owned ephemeral wake state after reading durable task/session state. The returned counts are observability only; durable recovery remains in RunOnce through the task service.
func (*Scheduler) RunOnce ¶
func (s *Scheduler) RunOnce(ctx context.Context) (CycleResult, error)
RunOnce executes one sweep/notify pass.
func (*Scheduler) Shutdown ¶
Shutdown cancels the scheduler loop and waits for owned goroutines to exit.
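The cancel-then-wait shape described here is a common Go pattern. The sketch below illustrates it with a context and a sync.WaitGroup; it does not reproduce the real Shutdown signature, which is not shown in this documentation, and loop, start, and shutdown are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// loop owns one background goroutine and the means to stop it.
type loop struct {
	cancel context.CancelFunc
	wg     sync.WaitGroup
	ticks  int // written only by the goroutine; read after shutdown returns
}

// start launches a ticker-driven goroutine that runs until cancelled.
func start(interval time.Duration) *loop {
	ctx, cancel := context.WithCancel(context.Background())
	l := &loop{cancel: cancel}
	l.wg.Add(1)
	go func() {
		defer l.wg.Done()
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				l.ticks++
			}
		}
	}()
	return l
}

// shutdown cancels the loop and blocks until the goroutine has exited.
// Calling it again is harmless: cancel is idempotent and Wait returns at once.
func (l *loop) shutdown() {
	l.cancel()
	l.wg.Wait()
}

func main() {
	l := start(10 * time.Millisecond)
	time.Sleep(35 * time.Millisecond)
	l.shutdown()
	fmt.Println("stopped after", l.ticks, "ticks")
}
```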
type SessionSnapshot ¶
type SessionSnapshot struct {
ID string
AgentName string
WorkspaceID string
Channel string
Type string
State string
Prompting bool
Capabilities []string
CreatedAt time.Time
}
SessionSnapshot is the scheduler's rebuildable view of one live session.
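The overview says the scheduler selects eligible idle sessions for queued work. A rough sketch of such a filter over SessionSnapshot values follows. The eligibility rule is an assumption: the "idle" state string, the use of Prompting as a disqualifier, and the capability-superset check are not confirmed by this documentation, and covers and eligible are hypothetical helpers.

```go
package main

import "fmt"

// SessionSnapshot is trimmed to the fields this sketch inspects.
type SessionSnapshot struct {
	ID           string
	State        string
	Prompting    bool
	Capabilities []string
}

// covers reports whether have contains every capability in need.
func covers(have, need []string) bool {
	set := make(map[string]struct{}, len(have))
	for _, c := range have {
		set[c] = struct{}{}
	}
	for _, c := range need {
		if _, ok := set[c]; !ok {
			return false
		}
	}
	return true
}

// eligible returns the IDs of sessions that are idle (assumed state value),
// not currently prompting, and cover the required capabilities.
func eligible(sessions []SessionSnapshot, required []string) []string {
	var ids []string
	for _, s := range sessions {
		if s.State != "idle" || s.Prompting {
			continue
		}
		if covers(s.Capabilities, required) {
			ids = append(ids, s.ID)
		}
	}
	return ids
}

func main() {
	sessions := []SessionSnapshot{
		{ID: "a", State: "idle", Capabilities: []string{"go", "sql"}},
		{ID: "b", State: "idle", Prompting: true, Capabilities: []string{"go", "sql"}},
		{ID: "c", State: "busy", Capabilities: []string{"go", "sql"}},
		{ID: "d", State: "idle", Capabilities: []string{"go"}},
	}
	fmt.Println(eligible(sessions, []string{"go", "sql"}))
}
```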
type SessionSource ¶
type SessionSource interface {
Sessions(ctx context.Context) ([]SessionSnapshot, error)
}
SessionSource provides live runtime sessions that may be notified.
type StarvationStore ¶ added in v0.0.5
type StarvationStore interface {
LoadRunStarvation(ctx context.Context, runID string) (taskpkg.RunStarvation, bool, error)
ListRunStarvation(ctx context.Context) ([]taskpkg.RunStarvation, error)
UpsertRunStarvation(ctx context.Context, mutation taskpkg.RunStarvationMutation) (taskpkg.RunStarvation, error)
ClearRunStarvation(ctx context.Context, runID string) error
}
StarvationStore persists the durable per-run escalation budget so the convergence tier ladder survives daemon restart (in-memory wake state is wiped on Rebuild; the budget is not).
type StarvationThresholds ¶ added in v0.0.5
type StarvationThresholds struct {
FanOutAfter int
SpawnAfter int
EventAfter int
NeedsAttentionAfter int
MinQueuedAge time.Duration
}
StarvationThresholds bounds the convergence escalation ladder. The counts are wake cycles a claimable run must remain queued before each tier fires; they must be monotonic and positive.
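The "monotonic and positive" constraint and the tier ladder can be sketched as below. Several points are assumptions: that the ladder order is FanOutAfter, SpawnAfter, EventAfter, NeedsAttentionAfter (tiers 1 through 4), that "monotonic" means non-decreasing, and that a tier fires once the cycle count reaches its threshold. The threshold values used are illustrative and are not the package defaults; validate and tier are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type StarvationThresholds struct {
	FanOutAfter         int
	SpawnAfter          int
	EventAfter          int
	NeedsAttentionAfter int
	MinQueuedAge        time.Duration
}

// counts lists the tier thresholds in assumed ladder order.
func counts(t StarvationThresholds) []int {
	return []int{t.FanOutAfter, t.SpawnAfter, t.EventAfter, t.NeedsAttentionAfter}
}

// validate enforces one reading of "monotonic and positive":
// every count is > 0 and no count is lower than the one before it.
func validate(t StarvationThresholds) error {
	prev := 0
	for _, c := range counts(t) {
		if c <= 0 {
			return errors.New("thresholds must be positive")
		}
		if c < prev {
			return errors.New("thresholds must be non-decreasing")
		}
		prev = c
	}
	return nil
}

// tier returns the highest tier (0 to 4) reached after the given wake cycles.
func tier(t StarvationThresholds, cycles int) int {
	n := 0
	for _, c := range counts(t) {
		if cycles >= c {
			n++
		}
	}
	return n
}

func main() {
	th := StarvationThresholds{FanOutAfter: 2, SpawnAfter: 4, EventAfter: 6, NeedsAttentionAfter: 8}
	fmt.Println(validate(th))
	for _, cycles := range []int{1, 2, 5, 8} {
		fmt.Println(cycles, "cycles -> tier", tier(th, cycles))
	}
}
```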
func DefaultStarvationThresholds ¶ added in v0.0.5
func DefaultStarvationThresholds() StarvationThresholds
DefaultStarvationThresholds returns the built-in convergence ladder.
type Stats ¶
type Stats struct {
Cycles int
Rebuilds int
RecoveredLeases int
RecoveryErrors int
WakeAttempts int
WakeSucceeded int
WakeFailed int
NoMatchRuns int
RecentlyNotified int
UnclaimableRuns int
StarvedRuns int
SpawnRequested int
NeedsAttention int
LastCycleAt time.Time
LastRebuildAt time.Time
LastRecoveryError string
LastWakeError string
}
Stats is a lock-protected snapshot of scheduler observability counters.
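A lock-protected snapshot usually means the counters live behind a mutex and readers receive a copy. The sketch below shows that pattern with a trimmed Stats; the counters type and its methods are hypothetical, and the real accessor is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// Stats is trimmed to two of the documented counters.
type Stats struct {
	Cycles       int
	WakeAttempts int
}

// counters guards the live Stats value with a mutex.
type counters struct {
	mu    sync.Mutex
	stats Stats
}

// recordCycle updates the counters under the lock.
func (c *counters) recordCycle(wakes int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.stats.Cycles++
	c.stats.WakeAttempts += wakes
}

// snapshot returns a copy, so callers cannot mutate the live counters.
func (c *counters) snapshot() Stats {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.stats
}

func main() {
	var c counters
	c.recordCycle(3)
	c.recordCycle(1)
	fmt.Printf("%+v\n", c.snapshot())
}
```

Returning the struct by value works here because the trimmed Stats holds only plain values; a struct containing slices or maps would need a deeper copy.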
type TaskSource ¶
type TaskSource interface {
PendingRuns(ctx context.Context) ([]RunSnapshot, error)
ActiveRuns(ctx context.Context) ([]taskpkg.Run, error)
GetRunStatus(ctx context.Context, runID string) (taskpkg.RunStatus, bool, error)
RecoverExpiredRunLeases(
ctx context.Context,
recovery taskpkg.ExpiredLeaseRecovery,
actor taskpkg.ActorContext,
) ([]taskpkg.ExpiredLeaseRecoveryResult, error)
}
TaskSource provides durable task-run snapshots and lease recovery.
type WakeTarget ¶
type WakeTarget struct {
Work RunSnapshot
Session SessionSnapshot
Reason string
}
WakeTarget records the exact run/session pair selected for notification.