worker

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 23, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package worker unifies the background-execution patterns found across the reference services into one small vocabulary.

Instead of every service hand-rolling its own run loops, supervisors, and retry math, worker provides a handful of composable pieces:

  • Runnable: anything the app supervises (HTTP/gRPC servers, consumers, loops). RunnableFunc adapts a plain start function into one.
  • Group: a supervisor that starts Runnables concurrently and shuts them down together (LIFO), tearing the whole group down on the first failure.
  • Loop: a recurring tick loop with panic recovery — fixed-interval (NewLoop) or adaptively paced (NewPacedLoop).
  • Pool: bounded-concurrency fan-out of one-shot jobs.
  • Processor[T]: a reliable claim -> handle -> ack/retry batch pipeline, built from a Source, Handler, and Sink, and run as a Loop.
  • Backoff: exponential backoff with jitter and a max-attempts budget.

Loops and pacing

A Loop runs a tick on a schedule, each tick wrapped in panic recovery (safetick) so one bad tick cannot crash the process. A fixed Loop ticks every Interval. An adaptive Loop (NewPacedLoop) varies its delay from the Pace its tick reports: Busy (a full batch — drain again after BusyInterval, or immediately) versus Idle (no work — wait Interval, optionally backing off geometrically toward MaxIdleInterval).

Processors

A Processor[T] is the reliable batch-processing FSM behind the outbox relay and pgqueue-style consumers. Each tick it leases a batch from a Source (canonically SELECT ... FOR UPDATE SKIP LOCKED, so replicas can run concurrently), runs the Handler on each item, then records the outcome via the Sink (MarkDone on success, MarkFailed on error with terminal/retryable decided by IsTerminal). A Processor is not itself a Runnable: call Tick (or PacedTick) to get a tick function and wrap it in a Loop.

Usage

Wire a Processor into a Loop and supervise it (plus any other Runnables) in a Group:

proc := worker.NewProcessor[Job](log, src, handler, sink, worker.ProcessorConfig{
    Name:       "jobs",
    BatchSize:  200,
    IsTerminal: func(err error) bool { return errors.Is(err, ErrBadInput) },
})
loop := worker.NewPacedLoop(log, worker.LoopConfig{
    Name:            "jobs",
    Interval:        time.Second, // idle wait
    BusyInterval:    0,           // drain backlog immediately
    MaxIdleInterval: 30 * time.Second,
}, proc.PacedTick())

g := worker.NewGroup(log, 10*time.Second)
g.Add(loop)
g.Add(worker.RunnableFunc{
    NameValue: "http",
    StartFn:   func(ctx context.Context) error { return srv.ListenAndServe() },
    StopFn:    func(ctx context.Context) error { return srv.Shutdown(ctx) },
})
if err := g.Run(ctx); err != nil {
    log.Error("group stopped", "err", err)
}

For discrete fan-out work, submit jobs to a Pool:

pool := worker.NewPool(8)
key, err := pool.Submit(ctx, func(ctx context.Context) { do(ctx) })
_ = key // pool.Cancel(key) stops that one job
defer pool.Shutdown(context.Background())

Config

LoopConfig: Name, Interval (required; the idle interval for an adaptive loop), ImmediateFirstTick, TickTimeout, OnPanic, and the adaptive fields BusyInterval and MaxIdleInterval.

ProcessorConfig: Name (default "processor"), BatchSize (default 100), HandleTimeout, IsTerminal (nil = all failures retryable), Now (default time.Now().UTC; override in tests).

Backoff: Base (first-retry delay), Max (cap; 0 = uncapped), Factor (default 2), MaxAttempts (retry budget; checked via Exhausted), Jitter in [0,1] (apply via NextWithRand, which takes the random fraction for determinism).

Group: NewGroup(log, shutdownTimeout) — shutdownTimeout of 0 means no bound on how long Run waits for Runnables to Stop.

Index

Constants

This section is empty.

Variables

View Source
var ErrPoolShutdown = errors.New("worker: pool is shutting down")

ErrPoolShutdown is returned by Pool.Submit once Shutdown has been called.

Functions

This section is empty.

Types

type Backoff

type Backoff struct {
	// Base is the delay for the first retry (attempt 1).
	Base time.Duration
	// Max caps the delay (0 = uncapped).
	Max time.Duration
	// Factor is the multiplier per attempt (defaults to 2 when <= 1).
	Factor float64
	// MaxAttempts is the retry budget; Exhausted reports when it is spent.
	MaxAttempts int
	// Jitter in [0,1] randomizes the delay by ±(jitter*delay). The caller
	// supplies the random fraction to NextWithRand to keep Backoff deterministic
	// and testable.
	Jitter float64
}

Backoff computes retry delays with exponential growth and optional jitter, capped at Max, and exposes a max-attempts budget. It is shared by retrying HTTP clients and reliable processors so backoff policy lives in one place.

func (Backoff) Exhausted

func (b Backoff) Exhausted(attempts int) bool

Exhausted reports whether attempts has reached the MaxAttempts budget. A non-positive MaxAttempts means "never exhausted".

func (Backoff) Next

func (b Backoff) Next(attempt int) time.Duration

Next returns the delay before the given attempt (1-based), without jitter.

func (Backoff) NextWithRand

func (b Backoff) NextWithRand(attempt int, randFraction float64) time.Duration

NextWithRand is Next with an explicit random fraction in [0,1) used to apply jitter. Pass rand.Float64() in production; pass a fixed value in tests.

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group supervises a set of Runnables. Run starts them all concurrently and returns when ctx is canceled or any Runnable returns a non-nil, non-canceled error — whichever comes first — after which every Runnable is asked to Stop.

This replaces hand-rolled run loops that juggle separate slices of servers, consumers, and workers: register everything as a Runnable and supervise it here.

func NewGroup

func NewGroup(log *slog.Logger, shutdownTimeout time.Duration) *Group

NewGroup builds a Group. shutdownTimeout bounds how long Run waits for all Runnables to Stop (0 means no bound). log may be nil.

func (*Group) Add

func (g *Group) Add(r ...Runnable)

Add registers one or more Runnables. Not safe to call once Run has started.

func (*Group) Run

func (g *Group) Run(ctx context.Context) error

Run starts all Runnables and blocks until shutdown. It always attempts to Stop every Runnable before returning, and returns the first triggering error (nil on a clean, ctx-driven shutdown).

type Handler

type Handler[T any] interface {
	Handle(ctx context.Context, item T) error
}

Handler processes one claimed item. A nil error means success (the item is passed to Sink.MarkDone); a non-nil error routes the item to Sink.MarkFailed, with terminality decided by the processor's IsTerminal classifier.

type HandlerFunc

type HandlerFunc[T any] func(ctx context.Context, item T) error

HandlerFunc adapts a plain function to a Handler.

func (HandlerFunc[T]) Handle

func (f HandlerFunc[T]) Handle(ctx context.Context, item T) error

Handle implements Handler.

type JobFn

type JobFn func(ctx context.Context)

JobFn is a unit of work submitted to a Pool.

type Loop

type Loop struct {
	// contains filtered or unexported fields
}

Loop is a Runnable that calls a tick on an interval, each tick wrapped in panic recovery so a single bad tick cannot crash the process. Built with NewLoop it runs on a fixed interval; built with NewPacedLoop it paces adaptively from the tick's reported Pace. This is the common shape behind the outbox/queue background workers.

func NewLoop

func NewLoop(log *slog.Logger, cfg LoopConfig, tick TickFunc) *Loop

NewLoop builds a fixed-interval Loop. log may be nil.

func NewPacedLoop added in v0.5.0

func NewPacedLoop(log *slog.Logger, cfg LoopConfig, tick PacedTickFunc) *Loop

NewPacedLoop builds an adaptive Loop: after each tick it waits BusyInterval (default: immediately) when the tick reported Busy, or Interval — growing toward MaxIdleInterval over consecutive idle ticks — when it reported Idle. This drains a backlog quickly yet idles cheaply. log may be nil.

func (*Loop) Name

func (l *Loop) Name() string

Name implements Runnable.

func (*Loop) Start

func (l *Loop) Start(ctx context.Context) error

Start implements Runnable: it ticks until ctx is canceled.

func (*Loop) Stop

func (l *Loop) Stop(context.Context) error

Stop implements Runnable; the loop stops when its context is canceled.

type LoopConfig

type LoopConfig struct {
	// Name identifies the loop in logs and panic metrics.
	Name string
	// Interval between ticks. Required (> 0). In an adaptive loop (NewPacedLoop)
	// this is the idle interval — the wait after a tick that found no work.
	Interval time.Duration
	// ImmediateFirstTick runs a tick at startup before the first interval.
	ImmediateFirstTick bool
	// TickTimeout bounds each tick (0 = inherit the loop context with no extra bound).
	TickTimeout time.Duration
	// OnPanic is invoked when a tick panics (e.g. to bump a metric); may be nil.
	OnPanic safetick.PanicHandler

	// BusyInterval is the wait after a Busy tick in an adaptive loop (0 = run
	// again immediately to drain the backlog). Ignored by a fixed loop.
	BusyInterval time.Duration
	// MaxIdleInterval, when greater than Interval, makes an adaptive loop back
	// off geometrically (×2 per consecutive idle tick) from Interval up to this
	// cap, so a long-idle loop polls less often. 0 disables backoff (the loop
	// always waits Interval when idle). Ignored by a fixed loop.
	MaxIdleInterval time.Duration
}

LoopConfig configures a Loop.

type Pace added in v0.5.0

type Pace int

Pace is the signal an adaptive tick returns to tell the loop how soon to run again: drain a backlog fast while there is work, idle cheaply when there is none.

const (
	// Idle means the tick found no (or only partial) work; the loop waits the
	// idle interval before the next tick.
	Idle Pace = iota
	// Busy means the tick drained a full batch, so a backlog likely remains; the
	// loop runs again after the (short) busy interval.
	Busy
)

type PacedTickFunc added in v0.5.0

type PacedTickFunc func(ctx context.Context) (Pace, error)

PacedTickFunc is a tick that reports its Pace so an adaptive loop can vary the delay between runs. Like TickFunc, a returned error is logged but does not stop the loop.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool runs one-shot jobs with a bounded degree of concurrency. Submit blocks until a slot is free (or ctx is canceled / the pool is shutting down), then launches the job on its own goroutine. Shutdown cancels every in-flight job and waits for them to return.

Use Pool for bounded fan-out of discrete tasks (the rd/service foundation/worker shape). For recurring work use Loop; for reliable claim/process/ack pipelines use Processor.

func NewPool

func NewPool(maxConcurrent int) *Pool

NewPool constructs a Pool that runs at most maxConcurrent jobs at once. maxConcurrent is clamped to a minimum of 1.

func (*Pool) Cancel

func (p *Pool) Cancel(key string)

Cancel stops a single in-flight job by its key. Unknown keys (already finished or never issued) are a no-op.

func (*Pool) Running

func (p *Pool) Running() int

Running reports the number of jobs currently executing.

func (*Pool) Shutdown

func (p *Pool) Shutdown(ctx context.Context) error

Shutdown signals the pool to stop accepting work, cancels every in-flight job, and waits for them to return or until ctx is canceled. It is idempotent.

func (*Pool) Submit

func (p *Pool) Submit(ctx context.Context, job JobFn) (string, error)

Submit blocks until a slot is free, then launches job on its own goroutine. The job receives a context that is canceled either when the submission ctx is canceled or when Shutdown is called, whichever comes first. It returns a key that can be passed to Cancel to stop that single job early.

type Processor

type Processor[T any] struct {
	// contains filtered or unexported fields
}

Processor is the reliable batch-processing FSM shared by the outbox relay, push pipelines, and pgqueue consumers: each tick leases a batch from the Source, runs the Handler on each item, then records the outcome via the Sink (done on success, failed/terminal-or-retryable on error).

A Processor is not itself a Runnable; call Tick to get a TickFunc and wrap it in a Loop to run it on a schedule. Run several processors over the same Source for higher throughput — SKIP LOCKED coordinates the claims.

func NewProcessor

func NewProcessor[T any](log *slog.Logger, src Source[T], h Handler[T], sink Sink[T], cfg ProcessorConfig) *Processor[T]

NewProcessor builds a Processor. log may be nil.

func (*Processor[T]) PacedTick added in v0.5.0

func (p *Processor[T]) PacedTick() PacedTickFunc

PacedTick returns the drain cycle as a PacedTickFunc for NewPacedLoop: it reports Busy when it leased a full batch (a backlog likely remains, so the adaptive loop drains again promptly) and Idle otherwise.

func (*Processor[T]) Tick

func (p *Processor[T]) Tick() TickFunc

Tick returns the processor's drain cycle as a TickFunc suitable for NewLoop (fixed interval).

type ProcessorConfig

type ProcessorConfig struct {
	// Name labels the processor in logs (defaults to "processor").
	Name string
	// BatchSize is the max number of items leased per tick (defaults to 100).
	BatchSize int
	// HandleTimeout bounds each Handle call and its follow-up mark (0 = inherit
	// the tick context with no extra bound).
	HandleTimeout time.Duration
	// IsTerminal classifies a Handle error as permanent (true) or retryable
	// (false). When nil, every failure is treated as retryable.
	IsTerminal func(err error) bool
	// Now returns the tick clock; defaults to time.Now().UTC. Override in tests.
	Now func() time.Time
}

ProcessorConfig configures a Processor.

type Runnable

type Runnable interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	Name() string
}

Runnable is a supervised unit of work. Start blocks until ctx is canceled or the unit fails; Stop performs a graceful shutdown. Name identifies it in logs.

type RunnableFunc

type RunnableFunc struct {
	NameValue string
	StartFn   func(ctx context.Context) error
	StopFn    func(ctx context.Context) error
}

RunnableFunc adapts a plain start function (plus name and optional stop) into a Runnable, so callers can supply behavior as functions instead of types.

func (RunnableFunc) Name

func (r RunnableFunc) Name() string

Name implements Runnable.

func (RunnableFunc) Start

func (r RunnableFunc) Start(ctx context.Context) error

Start implements Runnable.

func (RunnableFunc) Stop

func (r RunnableFunc) Stop(ctx context.Context) error

Stop implements Runnable.

type Sink

type Sink[T any] interface {
	MarkDone(ctx context.Context, item T, now time.Time) error
	MarkFailed(ctx context.Context, item T, errMsg string, terminal bool, now time.Time) error
}

Sink records the outcome of processing an item. MarkDone acknowledges success; MarkFailed records a failure, where terminal distinguishes a permanent failure (move to a dead state) from a retryable one (return to pending for a later tick). errMsg has already been passed through errs.Sanitize by the processor.

type Source

type Source[T any] interface {
	Claim(ctx context.Context, now time.Time, limit int) ([]T, error)
}

Source leases a batch of pending items for processing. Implementations must be safe to run from multiple replicas concurrently — the canonical SQL backing is SELECT ... FOR UPDATE SKIP LOCKED, which hands each item to exactly one caller. now is the processor's wall clock for the tick (passed in so it is testable); limit bounds the batch size.

type SourceFunc

type SourceFunc[T any] func(ctx context.Context, now time.Time, limit int) ([]T, error)

SourceFunc adapts a plain function to a Source.

func (SourceFunc[T]) Claim

func (f SourceFunc[T]) Claim(ctx context.Context, now time.Time, limit int) ([]T, error)

Claim implements Source.

type TickFunc

type TickFunc func(ctx context.Context) error

TickFunc performs one unit of periodic work. Returning an error logs it but does not stop the loop; the loop stops only when its context is canceled.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL