scheduler

package
v0.7.13 Latest
Warning

This package is not in the latest version of its module.

Published: May 2, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package scheduler implements the tick loop that turns due schedules into run rows and dispatches them via the JobDispatcher.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Loop

func Loop(ctx context.Context, cadence time.Duration, deps Deps) error

Loop runs Tick on a fixed cadence until ctx is canceled. Individual Tick errors are logged and the loop continues — only ctx cancellation or a panic stops the loop.

The first tick runs immediately at Loop start, so operators (and tests) don't wait a full cadence before seeing any scheduler activity.

Returns ctx.Err() once ctx is canceled or its deadline passes.
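
The behavior described for Loop (immediate first tick, logged tick errors, exit on cancellation) can be sketched as follows. This is a minimal illustration, not the package's source: runLoop, runDemo, and the tick callback are hypothetical names, and the callback stands in for Tick and its Deps.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"time"
)

// runLoop is a hypothetical stand-in for Loop. It calls tick once
// immediately, then once per cadence, until ctx is done.
func runLoop(ctx context.Context, cadence time.Duration, tick func(context.Context) error) error {
	ticker := time.NewTicker(cadence)
	defer ticker.Stop()
	for {
		// Check cancellation first so a canceled ctx never starts a new tick.
		if err := ctx.Err(); err != nil {
			return err
		}
		// Tick errors are logged, not returned, so one bad pass
		// does not stop the loop.
		if err := tick(ctx); err != nil {
			log.Printf("tick failed: %v", err)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// runDemo cancels the context during the third tick and reports how many
// ticks ran, along with the loop's return value.
func runDemo() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	n := 0
	err := runLoop(ctx, 5*time.Millisecond, func(context.Context) error {
		n++
		if n == 3 {
			cancel()
		}
		return errors.New("simulated tick failure") // logged and ignored
	})
	return n, err
}

func main() {
	n, err := runDemo()
	fmt.Println(n, err) // prints "3 context canceled"
}
```

Every tick in the demo fails, yet the loop keeps running until the context is canceled, which mirrors the documented contract.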

func NextFire

func NextFire(expr, tz string, base time.Time) (time.Time, error)

NextFire returns the first time > base at which the given cron expression will fire in the given IANA timezone. The returned time is in UTC.
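
The timezone contract (evaluate in the schedule's zone, compare strictly against base, return UTC) can be illustrated with a deliberately simplified sketch. It does not parse cron expressions: nextDaily is a hypothetical helper that takes a fixed hour and minute in place of expr, and it only assumes the standard library's time package.

```go
package main

import (
	"fmt"
	"time"
	_ "time/tzdata" // embed the IANA database so LoadLocation works anywhere
)

// nextDaily is a hypothetical, simplified stand-in for NextFire. It returns
// the first hour:minute wall-clock time strictly after base in the given
// IANA zone, converted to UTC.
func nextDaily(hour, minute int, tz string, base time.Time) (time.Time, error) {
	loc, err := time.LoadLocation(tz)
	if err != nil {
		return time.Time{}, fmt.Errorf("load timezone %q: %w", tz, err)
	}
	local := base.In(loc)
	next := time.Date(local.Year(), local.Month(), local.Day(), hour, minute, 0, 0, loc)
	if !next.After(base) {
		// Today's slot has passed or equals base, so move to tomorrow.
		// Rebuilding the date keeps the wall-clock time across DST shifts.
		next = time.Date(local.Year(), local.Month(), local.Day()+1, hour, minute, 0, 0, loc)
	}
	return next.UTC(), nil
}

// demoNextDaily asks for the next 09:00 New York time after 14:30 UTC
// (09:30 local) on 2024-01-15.
func demoNextDaily() (string, error) {
	base := time.Date(2024, time.January, 15, 14, 30, 0, 0, time.UTC)
	next, err := nextDaily(9, 0, "America/New_York", base)
	if err != nil {
		return "", err
	}
	return next.Format(time.RFC3339), nil
}

func main() {
	s, err := demoNextDaily()
	fmt.Println(s, err) // prints "2024-01-16T14:00:00Z <nil>"
}
```

Because 09:00 local has already passed at base, the result is the next day's slot, expressed in UTC.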

func SweepOrphans

func SweepOrphans(ctx context.Context, deps Deps) (int64, error)

SweepOrphans marks any non-terminal runs older than `timeout_sec + 300s` as failed with error_kind='shutdown'. Returns the number of rows updated.

Called at serve startup (to recover runs stuck when the previous process died mid-execution) and periodically from the scheduler Loop.
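
The cutoff rule can be sketched as an in-memory predicate. The real function updates database rows; isOrphan below is a hypothetical helper, and it assumes a run's age is measured from its start time, which the description above does not specify.

```go
package main

import (
	"fmt"
	"time"
)

// orphanGrace mirrors the 300s slack added to timeout_sec in the description.
const orphanGrace = 300 * time.Second

// isOrphan is a hypothetical in-memory version of the sweep predicate.
// Assumption: age is measured from startedAt.
func isOrphan(startedAt time.Time, timeoutSec int, terminal bool, now time.Time) bool {
	if terminal {
		return false // finished runs are never swept
	}
	deadline := startedAt.Add(time.Duration(timeoutSec)*time.Second + orphanGrace)
	return now.After(deadline)
}

// demoOrphans evaluates three runs with timeout_sec = 600 (cutoff at 900s).
func demoOrphans() (stale, fresh, finished bool) {
	now := time.Date(2024, time.January, 1, 12, 0, 0, 0, time.UTC)
	stale = isOrphan(now.Add(-901*time.Second), 600, false, now)
	fresh = isOrphan(now.Add(-899*time.Second), 600, false, now)
	finished = isOrphan(now.Add(-2000*time.Second), 600, true, now)
	return stale, fresh, finished
}

func main() {
	fmt.Println(demoOrphans()) // prints "true false false"
}
```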

Types

type Decision

type Decision int

Decision tells the scheduler what to do with a freshly-inserted pending run.

const (
	DecisionDispatch Decision = iota // proceed to dispatch
	DecisionSkip                     // delete the pending row; don't dispatch
	DecisionQueue                    // leave the pending row for a later tick
)

func Decide

func Decide(policy Policy, activeCount int) Decision

Decide applies the overlap policy given the count of non-terminal runs for the same schedule (not including the row we just inserted).

  "skip" (default) → dispatch only if no active runs; otherwise skip
  "queue"          → dispatch only if no active runs; otherwise queue for next tick
  "concurrent"     → always dispatch

Empty or unknown policies fail closed to the skip semantics.

Queued runs are left pending by processOne and picked up by tick.dispatchPending on a subsequent tick once prior runs finish. The split is deliberate: Tick's primary loop only walks schedules whose next_fire_at is due, so a run that was queued (and has no further scheduled fire until its cron boundary) would otherwise never get picked up. The trade-off is a one-tick latency between prior-run-finish and queued-run-dispatch; a dedicated queue-drain loop would close that gap but is not worth the complexity at MVP scale.
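
The overlap rules documented for Decide can be sketched as a small switch. This is an illustration written from the description, not the package's source; the types are redeclared locally so the example is self-contained, and decide is a hypothetical lowercase copy.

```go
package main

import "fmt"

type Policy string

const (
	PolicySkip       Policy = "skip"
	PolicyQueue      Policy = "queue"
	PolicyConcurrent Policy = "concurrent"
)

type Decision int

const (
	DecisionDispatch Decision = iota // proceed to dispatch
	DecisionSkip                     // delete the pending row; don't dispatch
	DecisionQueue                    // leave the pending row for a later tick
)

// decide applies the documented overlap rules. activeCount excludes the
// pending row that was just inserted.
func decide(policy Policy, activeCount int) Decision {
	switch policy {
	case PolicyConcurrent:
		return DecisionDispatch
	case PolicyQueue:
		if activeCount == 0 {
			return DecisionDispatch
		}
		return DecisionQueue
	default: // "skip", empty, or unknown: fail closed to skip semantics
		if activeCount == 0 {
			return DecisionDispatch
		}
		return DecisionSkip
	}
}

func main() {
	fmt.Println(decide(PolicySkip, 1) == DecisionSkip)           // true
	fmt.Println(decide(PolicyQueue, 1) == DecisionQueue)         // true
	fmt.Println(decide(PolicyConcurrent, 5) == DecisionDispatch) // true
	fmt.Println(decide("typo", 1) == DecisionSkip)               // true
}
```

Routing unknown values through the default branch is what makes a misspelled policy behave like "skip" rather than dispatching overlapping runs.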

type Deps

type Deps struct {
	Pool          *pgxpool.Pool
	Signer        *token.Signer
	Dispatcher    cloud.JobDispatcher
	Installations InstallationTokenProvider // nil = no GitHub token injection
	APIBaseURL    string                    // e.g. "http://127.0.0.1:8080"
	RunnerAPIURL  string                    // external URL the runner uses to reach serve; falls back to APIBaseURL
	RunnerBinary  string                    // absolute path; typically os.Executable()
}

Deps bundles the scheduler's collaborators.

type InstallationTokenProvider

type InstallationTokenProvider interface {
	Token(ctx context.Context, installID int64) (string, error)
}

InstallationTokenProvider mints short-lived GitHub installation tokens.
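
For tests, the interface is small enough to satisfy with a map-backed fake. The sketch below redeclares the interface locally so it compiles on its own; staticTokens and demoTokens are hypothetical names and the token value is a placeholder.

```go
package main

import (
	"context"
	"fmt"
)

// InstallationTokenProvider is redeclared here to keep the example
// self-contained; it matches the interface shown above.
type InstallationTokenProvider interface {
	Token(ctx context.Context, installID int64) (string, error)
}

// staticTokens is a hypothetical test double that serves fixed tokens
// keyed by installation ID.
type staticTokens map[int64]string

func (s staticTokens) Token(ctx context.Context, installID int64) (string, error) {
	if err := ctx.Err(); err != nil {
		return "", err // honor cancellation like a real network call would
	}
	tok, ok := s[installID]
	if !ok {
		return "", fmt.Errorf("no token for installation %d", installID)
	}
	return tok, nil
}

// Compile-time check that staticTokens satisfies the interface.
var _ InstallationTokenProvider = staticTokens(nil)

// demoTokens looks up one known and one unknown installation.
func demoTokens() (token string, unknownFailed bool) {
	var p InstallationTokenProvider = staticTokens{42: "fake-token"}
	token, _ = p.Token(context.Background(), 42)
	_, err := p.Token(context.Background(), 7)
	return token, err != nil
}

func main() {
	fmt.Println(demoTokens()) // prints "fake-token true"
}
```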

type Policy

type Policy string

Policy is the overlap policy declared on a schedule.

const (
	PolicySkip       Policy = "skip"
	PolicyQueue      Policy = "queue"
	PolicyConcurrent Policy = "concurrent"
)

type Stats

type Stats struct {
	Dispatched int
	Skipped    int
	Queued     int
	Errored    int
}

Stats summarizes one Tick's effects.

func Tick

func Tick(ctx context.Context, deps Deps) (Stats, error)

Tick runs a single pass of the scheduler:

  1. List due schedules.
  2. For each: compute next_fire_at, idempotently insert a pending run, update schedule.next_fire_at.
  3. Apply overlap policy using the count of existing active runs.
  4. Dispatch via cloud.JobDispatcher when decided.
  5. Dispatch any already-pending rows not tied to a due schedule — manual run-now triggers and queued scheduled runs whose prior run finished.
  6. Run SweepOrphans to reclaim stalled runs.

Stats fields are informational for logging; errors from individual schedules are logged but don't abort the whole tick.
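
The error-handling contract (log per-schedule failures, keep going, report counts) can be sketched as a tally loop. This is a simplified illustration, not the package's Tick: tickOnce, demoTick, and the process callback are hypothetical, and database and dispatch work is replaced by the callback.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

type Decision int

const (
	DecisionDispatch Decision = iota
	DecisionSkip
	DecisionQueue
)

// Stats mirrors the struct shown above.
type Stats struct {
	Dispatched int
	Skipped    int
	Queued     int
	Errored    int
}

// tickOnce is a hypothetical sketch of one pass. A failing schedule is
// logged and counted, and the remaining schedules are still processed.
func tickOnce(scheduleIDs []string, process func(id string) (Decision, error)) Stats {
	var st Stats
	for _, id := range scheduleIDs {
		d, err := process(id)
		if err != nil {
			log.Printf("schedule %s: %v", id, err)
			st.Errored++
			continue
		}
		switch d {
		case DecisionDispatch:
			st.Dispatched++
		case DecisionSkip:
			st.Skipped++
		case DecisionQueue:
			st.Queued++
		}
	}
	return st
}

// demoTick runs four schedules, one of which fails.
func demoTick() Stats {
	outcomes := map[string]Decision{"a": DecisionDispatch, "c": DecisionSkip, "d": DecisionQueue}
	return tickOnce([]string{"a", "b", "c", "d"}, func(id string) (Decision, error) {
		d, ok := outcomes[id]
		if !ok {
			return 0, errors.New("simulated failure")
		}
		return d, nil
	})
}

func main() {
	fmt.Printf("%+v\n", demoTick()) // prints "{Dispatched:1 Skipped:1 Queued:1 Errored:1}"
}
```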
