scheduler

package
v0.0.5 Latest
Warning: This package is not in the latest version of its module.
Published: May 29, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package scheduler implements the daemon-owned mechanical task scheduler.

The scheduler is intentionally narrow: it sweeps expired task-run leases through the task service, selects eligible idle sessions for queued work, and emits wake notifications. It never claims task runs directly; sessions remain responsible for calling the task claim API.

Constants

This section is empty.

Variables

var (
	// ErrStopped reports that a stopped scheduler cannot be restarted.
	ErrStopped = errors.New("scheduler: stopped")
	// ErrSpawnUnresolvable reports that no agent covers a starved run's required
	// capabilities, so the Tier2 spawn is skipped without failing the cycle; Tier3/Tier4
	// escalation still proceeds as the wake count climbs.
	ErrSpawnUnresolvable = errors.New("scheduler: starvation spawn has no capable agent")
)

Functions

This section is empty.

Types

type BatchWaker

type BatchWaker interface {
	WakeMany(ctx context.Context, targets []WakeTarget) []error
}

BatchWaker handles every selected wake target in one scheduler cycle.

type CycleResult

type CycleResult struct {
	PendingRuns          int
	ActiveRuns           int
	SessionsScanned      int
	RecoveredLeases      int
	WakeAttempts         int
	WakeSucceeded        int
	WakeFailed           int
	NoMatchRuns          int
	RecentlyNotified     int
	UnclaimableRuns      int
	Paused               bool
	StarvedRuns          int
	SpawnRequested       int
	NeedsAttention       int
	SelectedRunIDs       []string
	NoMatchRunIDs        []string
	RecoveredRunIDs      []string
	StarvedRunIDs        []string
	SpawnRequestedRunIDs []string
	NeedsAttentionRunIDs []string
}

CycleResult reports one mechanical scheduler pass.

type EscalationActor added in v0.0.5

type EscalationActor interface {
	EmitRunStarved(ctx context.Context, work *RunSnapshot, age time.Duration) error
	RequestWorkerSpawn(ctx context.Context, work *RunSnapshot) error
	MarkRunNeedsAttention(ctx context.Context, runID string, diagnostic string) (taskpkg.Run, error)
}

EscalationActor is the single seam through which the scheduler drives convergence escalation. It never claims work: it emits observability events, requests a capability-matched worker spawn (the spawned session claims the run itself), and marks a run as needs_attention. The daemon implements it over the task service.

type Option

type Option func(*Scheduler)

Option customizes scheduler runtime behavior.

func WithActor

func WithActor(actor taskpkg.ActorContext) Option

WithActor overrides the daemon actor used for recovery writes.

func WithClock

func WithClock(clock clockwork.Clock) Option

WithClock overrides the scheduler clock, mainly for deterministic tests.

func WithEscalationActor added in v0.0.5

func WithEscalationActor(actor EscalationActor) Option

WithEscalationActor injects the seam used to emit starvation signals, request worker spawns, and mark runs as needs_attention. The scheduler itself never claims runs.

func WithInterval

func WithInterval(interval time.Duration) Option

WithInterval overrides the background sweep/notify interval.

func WithLogger

func WithLogger(logger *slog.Logger) Option

WithLogger overrides the scheduler logger.

func WithPauseStore

func WithPauseStore(store PauseStore) Option

WithPauseStore lets the scheduler skip wake dispatch while preserving lease sweep.
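
The pause behavior described above can be pictured as a cycle that always sweeps leases but returns before dispatching wakes when the pause flag is set. This is a hedged sketch of that control flow only; runCycle, cycleResult, and the sweep and wake callbacks are hypothetical and do not reflect the package's internals.

```go
package main

import "fmt"

// cycleResult keeps three of the documented CycleResult fields.
type cycleResult struct {
	RecoveredLeases int
	WakeAttempts    int
	Paused          bool
}

// runCycle sweeps expired leases unconditionally, then dispatches wakes only
// when the scheduler is not paused.
func runCycle(paused bool, sweep func() int, wake func() int) cycleResult {
	res := cycleResult{Paused: paused}
	res.RecoveredLeases = sweep() // lease sweep is preserved while paused
	if paused {
		return res
	}
	res.WakeAttempts = wake()
	return res
}

func main() {
	sweep := func() int { return 2 }
	wake := func() int { return 3 }
	fmt.Printf("%+v\n", runCycle(true, sweep, wake))
	fmt.Printf("%+v\n", runCycle(false, sweep, wake))
}
```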

func WithStarvationAge added in v0.0.5

func WithStarvationAge(age time.Duration) Option

WithStarvationAge overrides how long a claimable run may sit queued before the scheduler escalates it. On escalation the scheduler fans the advisory wake out to every eligible session and emits a starvation signal. Zero disables starvation escalation.

func WithStarvationStore added in v0.0.5

func WithStarvationStore(store StarvationStore) Option

WithStarvationStore injects the durable escalation budget the convergence ladder advances. Without it the tier ladder is disabled (the scheduler still fans out starved runs).

func WithStarvationThresholds added in v0.0.5

func WithStarvationThresholds(thresholds StarvationThresholds) Option

WithStarvationThresholds overrides the convergence tier ladder bounds.

func WithSweepLimit

func WithSweepLimit(limit int) Option

WithSweepLimit overrides the maximum expired leases recovered per pass.

func WithSweepReason

func WithSweepReason(reason string) Option

WithSweepReason overrides the task-service recovery reason.

func WithWakeCooldown

func WithWakeCooldown(cooldown time.Duration) Option

WithWakeCooldown overrides how long a run/session wake key is suppressed before the scheduler may notify that same session again.

func WithWakeReason

func WithWakeReason(reason string) Option

WithWakeReason overrides the synthetic wake metadata reason.

type PauseStore

type PauseStore interface {
	GetSchedulerPause(ctx context.Context) (taskpkg.SchedulerPauseState, error)
}

PauseStore supplies the persisted scheduler-wide pause flag.

type RebuildResult

type RebuildResult struct {
	PendingRuns     int
	ActiveRuns      int
	SessionsScanned int
	ClearedWakeKeys int
	RebuiltAt       time.Time
}

RebuildResult reports the durable state discovered while rebuilding scheduler-owned ephemeral state.

type RunSnapshot

type RunSnapshot struct {
	Task taskpkg.Task
	Run  taskpkg.Run
}

RunSnapshot joins a durable run with its owning task.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler owns one mechanical sweep/notify loop.

func New

func New(tasks TaskSource, sessions SessionSource, waker Waker, opts ...Option) (*Scheduler, error)

New constructs a mechanical scheduler over durable task and session sources.

func (*Scheduler) Rebuild

func (s *Scheduler) Rebuild(ctx context.Context) (RebuildResult, error)

Rebuild clears scheduler-owned ephemeral wake state after reading durable task/session state. The returned counts are observability only; durable recovery remains in RunOnce through the task service.

func (*Scheduler) RunOnce

func (s *Scheduler) RunOnce(ctx context.Context) (CycleResult, error)

RunOnce executes one sweep/notify pass.

func (*Scheduler) Shutdown

func (s *Scheduler) Shutdown(ctx context.Context) error

Shutdown cancels the scheduler loop and waits for owned goroutines to exit.

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context) error

Start begins the context-bound background scheduler loop.

func (*Scheduler) Stats

func (s *Scheduler) Stats() Stats

Stats returns a consistent snapshot of scheduler counters.

type SessionSnapshot

type SessionSnapshot struct {
	ID           string
	AgentName    string
	WorkspaceID  string
	Channel      string
	Type         string
	State        string
	Prompting    bool
	Capabilities []string
	CreatedAt    time.Time
}

SessionSnapshot is the scheduler's rebuildable view of one live session.

type SessionSource

type SessionSource interface {
	Sessions(ctx context.Context) ([]SessionSnapshot, error)
}

SessionSource provides live runtime sessions that may be notified.

type StarvationStore added in v0.0.5

type StarvationStore interface {
	LoadRunStarvation(ctx context.Context, runID string) (taskpkg.RunStarvation, bool, error)
	ListRunStarvation(ctx context.Context) ([]taskpkg.RunStarvation, error)
	UpsertRunStarvation(ctx context.Context, mutation taskpkg.RunStarvationMutation) (taskpkg.RunStarvation, error)
	ClearRunStarvation(ctx context.Context, runID string) error
}

StarvationStore persists the durable per-run escalation budget so the convergence tier ladder survives daemon restart (in-memory wake state is wiped on Rebuild; the budget is not).

type StarvationThresholds added in v0.0.5

type StarvationThresholds struct {
	FanOutAfter         int
	SpawnAfter          int
	EventAfter          int
	NeedsAttentionAfter int
	MinQueuedAge        time.Duration
}

StarvationThresholds bounds the convergence escalation ladder. The counts are wake cycles a claimable run must remain queued before each tier fires; they must be monotonic and positive.

func DefaultStarvationThresholds added in v0.0.5

func DefaultStarvationThresholds() StarvationThresholds

DefaultStarvationThresholds returns the built-in convergence ladder.

type Stats

type Stats struct {
	Cycles            int
	Rebuilds          int
	RecoveredLeases   int
	RecoveryErrors    int
	WakeAttempts      int
	WakeSucceeded     int
	WakeFailed        int
	NoMatchRuns       int
	RecentlyNotified  int
	UnclaimableRuns   int
	StarvedRuns       int
	SpawnRequested    int
	NeedsAttention    int
	LastCycleAt       time.Time
	LastRebuildAt     time.Time
	LastRecoveryError string
	LastWakeError     string
}

Stats is a lock-protected snapshot of scheduler observability counters.

type TaskSource

type TaskSource interface {
	PendingRuns(ctx context.Context) ([]RunSnapshot, error)
	ActiveRuns(ctx context.Context) ([]taskpkg.Run, error)
	GetRunStatus(ctx context.Context, runID string) (taskpkg.RunStatus, bool, error)
	RecoverExpiredRunLeases(
		ctx context.Context,
		recovery taskpkg.ExpiredLeaseRecovery,
		actor taskpkg.ActorContext,
	) ([]taskpkg.ExpiredLeaseRecoveryResult, error)
}

TaskSource provides durable task-run snapshots and lease recovery.

type WakeTarget

type WakeTarget struct {
	Work    RunSnapshot
	Session SessionSnapshot
	Reason  string
}

WakeTarget records the exact run/session pair selected for notification.

type Waker

type Waker interface {
	Wake(ctx context.Context, target *WakeTarget) error
}

Waker sends an advisory notification to one selected idle session.
