worker

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 22, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package worker unifies the background-execution patterns found across the reference services into one small vocabulary:

  • Runnable: anything the app supervises (HTTP/gRPC servers, consumers, loops).
  • Group: a supervisor that starts Runnables concurrently and shuts them down together.
  • Loop: a recurring tick loop with panic recovery (the shape shared by outbox relay/sweeper/cleaner and push mapper/sender/recoverer).
  • Pool: bounded-concurrency fan-out of one-shot jobs.
  • Processor: a reliable claim -> handle -> ack/retry batch pipeline (run as a Loop) generalizing the outbox relay and pgqueue consumers.
  • Backoff: exponential backoff with jitter and a max-attempts budget.

Index

Constants

This section is empty.

Variables

View Source
var ErrPoolShutdown = errors.New("worker: pool is shutting down")

ErrPoolShutdown is returned by Pool.Submit once Shutdown has been called.

Functions

This section is empty.

Types

type Backoff

type Backoff struct {
	// Base is the delay for the first retry (attempt 1).
	Base time.Duration
	// Max caps the delay (0 = uncapped).
	Max time.Duration
	// Factor is the multiplier per attempt (defaults to 2 when <= 1).
	Factor float64
	// MaxAttempts is the retry budget; Exhausted reports when it is spent.
	MaxAttempts int
	// Jitter in [0,1] randomizes the delay by ±(jitter*delay). The caller
	// supplies the random fraction to NextWithRand to keep Backoff deterministic
	// and testable.
	Jitter float64
}

Backoff computes retry delays with exponential growth and optional jitter, capped at Max, and exposes a max-attempts budget. It is shared by retrying HTTP clients and reliable processors so backoff policy lives in one place.

func (Backoff) Exhausted

func (b Backoff) Exhausted(attempts int) bool

Exhausted reports whether attempts has reached the MaxAttempts budget. A non-positive MaxAttempts means "never exhausted".

func (Backoff) Next

func (b Backoff) Next(attempt int) time.Duration

Next returns the delay before the given attempt (1-based), without jitter.

func (Backoff) NextWithRand

func (b Backoff) NextWithRand(attempt int, randFraction float64) time.Duration

NextWithRand is Next with an explicit random fraction in [0,1) used to apply jitter. Pass rand.Float64() in production; pass a fixed value in tests.

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group supervises a set of Runnables. Run starts them all concurrently and returns when ctx is canceled or any Runnable returns a non-nil, non-canceled error — whichever comes first — after which every Runnable is asked to Stop.

This replaces hand-rolled run loops that juggle separate slices of servers, consumers, and workers: register everything as a Runnable and supervise it here.

func NewGroup

func NewGroup(log *slog.Logger, shutdownTimeout time.Duration) *Group

NewGroup builds a Group. shutdownTimeout bounds how long Run waits for all Runnables to Stop (0 means no bound). log may be nil.

func (*Group) Add

func (g *Group) Add(r ...Runnable)

Add registers one or more Runnables. Not safe to call once Run has started.

func (*Group) Run

func (g *Group) Run(ctx context.Context) error

Run starts all Runnables and blocks until shutdown. It always attempts to Stop every Runnable before returning, and returns the first triggering error (nil on a clean, ctx-driven shutdown).

type Handler

type Handler[T any] interface {
	Handle(ctx context.Context, item T) error
}

Handler processes one claimed item. A nil error means success (the item is passed to Sink.MarkDone); a non-nil error routes the item to Sink.MarkFailed, with terminality decided by the processor's IsTerminal classifier.

type HandlerFunc

type HandlerFunc[T any] func(ctx context.Context, item T) error

HandlerFunc adapts a plain function to a Handler.

func (HandlerFunc[T]) Handle

func (f HandlerFunc[T]) Handle(ctx context.Context, item T) error

Handle implements Handler.

type JobFn

type JobFn func(ctx context.Context)

JobFn is a unit of work submitted to a Pool.

type Loop

type Loop struct {
	// contains filtered or unexported fields
}

Loop is a Runnable that calls a TickFunc on a fixed interval. Each tick is wrapped in panic recovery so a single bad tick cannot crash the process. This is the common shape behind outbox/push background workers.

func NewLoop

func NewLoop(log *slog.Logger, cfg LoopConfig, tick TickFunc) *Loop

NewLoop builds a Loop. log may be nil.

func (*Loop) Name

func (l *Loop) Name() string

Name implements Runnable.

func (*Loop) Start

func (l *Loop) Start(ctx context.Context) error

Start implements Runnable: it ticks until ctx is canceled.

func (*Loop) Stop

func (l *Loop) Stop(context.Context) error

Stop implements Runnable; the loop stops when its context is canceled.

type LoopConfig

type LoopConfig struct {
	// Name identifies the loop in logs and panic metrics.
	Name string
	// Interval between ticks. Required (> 0).
	Interval time.Duration
	// ImmediateFirstTick runs a tick at startup before the first interval.
	ImmediateFirstTick bool
	// TickTimeout bounds each tick (0 = inherit the loop context with no extra bound).
	TickTimeout time.Duration
	// OnPanic is invoked when a tick panics (e.g. to bump a metric); may be nil.
	OnPanic safetick.PanicHandler
}

LoopConfig configures a Loop.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool runs one-shot jobs with a bounded degree of concurrency. Submit blocks until a slot is free (or ctx is canceled / the pool is shutting down), then launches the job on its own goroutine. Shutdown cancels every in-flight job and waits for them to return.

Use Pool for bounded fan-out of discrete tasks (the rd/service foundation/worker shape). For recurring work use Loop; for reliable claim/process/ack pipelines use Processor.

func NewPool

func NewPool(maxConcurrent int) *Pool

NewPool constructs a Pool that runs at most maxConcurrent jobs at once. maxConcurrent is clamped to a minimum of 1.

func (*Pool) Cancel

func (p *Pool) Cancel(key string)

Cancel stops a single in-flight job by its key. Unknown keys (already finished or never issued) are a no-op.

func (*Pool) Running

func (p *Pool) Running() int

Running reports the number of jobs currently executing.

func (*Pool) Shutdown

func (p *Pool) Shutdown(ctx context.Context) error

Shutdown signals the pool to stop accepting work, cancels every in-flight job, and waits for them to return or until ctx is canceled. It is idempotent.

func (*Pool) Submit

func (p *Pool) Submit(ctx context.Context, job JobFn) (string, error)

Submit blocks until a slot is free, then launches job on its own goroutine. The job receives a context that is canceled either when the submission ctx is canceled or when Shutdown is called, whichever comes first. It returns a key that can be passed to Cancel to stop that single job early.

type Processor

type Processor[T any] struct {
	// contains filtered or unexported fields
}

Processor is the reliable batch-processing FSM shared by the outbox relay, push pipelines, and pgqueue consumers: each tick leases a batch from the Source, runs the Handler on each item, then records the outcome via the Sink (done on success, failed/terminal-or-retryable on error).

A Processor is not itself a Runnable; call Tick to get a TickFunc and wrap it in a Loop to run it on a schedule. Run several processors over the same Source for higher throughput — SKIP LOCKED coordinates the claims.

func NewProcessor

func NewProcessor[T any](log *slog.Logger, src Source[T], h Handler[T], sink Sink[T], cfg ProcessorConfig) *Processor[T]

NewProcessor builds a Processor. log may be nil.

func (*Processor[T]) Tick

func (p *Processor[T]) Tick() TickFunc

Tick returns the processor's drain cycle as a TickFunc suitable for NewLoop.

type ProcessorConfig

type ProcessorConfig struct {
	// Name labels the processor in logs (defaults to "processor").
	Name string
	// BatchSize is the max number of items leased per tick (defaults to 100).
	BatchSize int
	// HandleTimeout bounds each Handle call and its follow-up mark (0 = inherit
	// the tick context with no extra bound).
	HandleTimeout time.Duration
	// IsTerminal classifies a Handle error as permanent (true) or retryable
	// (false). When nil, every failure is treated as retryable.
	IsTerminal func(err error) bool
	// Now returns the tick clock; defaults to time.Now().UTC. Override in tests.
	Now func() time.Time
}

ProcessorConfig configures a Processor.

type Runnable

type Runnable interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	Name() string
}

Runnable is a supervised unit of work. Start blocks until ctx is canceled or the unit fails; Stop performs a graceful shutdown. Name identifies it in logs.

type RunnableFunc

type RunnableFunc struct {
	NameValue string
	StartFn   func(ctx context.Context) error
	StopFn    func(ctx context.Context) error
}

RunnableFunc adapts a plain start function (plus name and optional stop) into a Runnable, so callers can supply behavior as functions instead of types.

func (RunnableFunc) Name

func (r RunnableFunc) Name() string

Name implements Runnable.

func (RunnableFunc) Start

func (r RunnableFunc) Start(ctx context.Context) error

Start implements Runnable.

func (RunnableFunc) Stop

func (r RunnableFunc) Stop(ctx context.Context) error

Stop implements Runnable.

type Sink

type Sink[T any] interface {
	MarkDone(ctx context.Context, item T, now time.Time) error
	MarkFailed(ctx context.Context, item T, errMsg string, terminal bool, now time.Time) error
}

Sink records the outcome of processing an item. MarkDone acknowledges success; MarkFailed records a failure, where terminal distinguishes a permanent failure (move to a dead state) from a retryable one (return to pending for a later tick). errMsg has already been passed through errs.Sanitize by the processor.

type Source

type Source[T any] interface {
	Claim(ctx context.Context, now time.Time, limit int) ([]T, error)
}

Source leases a batch of pending items for processing. Implementations must be safe to run from multiple replicas concurrently — the canonical SQL backing is SELECT ... FOR UPDATE SKIP LOCKED, which hands each item to exactly one caller. now is the processor's wall clock for the tick (passed in so it is testable); limit bounds the batch size.

type SourceFunc

type SourceFunc[T any] func(ctx context.Context, now time.Time, limit int) ([]T, error)

SourceFunc adapts a plain function to a Source.

func (SourceFunc[T]) Claim

func (f SourceFunc[T]) Claim(ctx context.Context, now time.Time, limit int) ([]T, error)

Claim implements Source.

type TickFunc

type TickFunc func(ctx context.Context) error

TickFunc performs one unit of periodic work. Returning an error logs it but does not stop the loop; the loop stops only when its context is canceled.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL