Documentation ¶
Overview ¶
Package workers provides a worker lifecycle library for Go, built on the thejerf/suture supervision library. It manages background goroutines with automatic panic recovery, configurable restart with backoff, and structured shutdown.
Architecture ¶
Every worker runs inside its own supervisor subtree. This means:
- Each worker gets panic recovery and restart independently
- Workers can dynamically spawn child workers via WorkerInfo
- When a parent worker stops, all its children stop (scoped lifecycle)
- The supervisor tree prevents cascading failures and CPU-burn restart storms
Quick Start ¶
Create workers with NewWorker and run them with Run:
workers.Run(ctx, []*workers.Worker{
    workers.NewWorker("kafka").HandlerFunc(consume),
    workers.NewWorker("cleanup").HandlerFunc(cleanup).Every(5 * time.Minute),
})
Handler Contract ¶
For long-running workers (no Worker.Every): the handler should block until ctx is cancelled, then return ctx.Err().
For periodic workers (with Worker.Every): the handler runs once per tick and should return quickly. Returning nil means success (the next tick will fire). Returning an error triggers restart (if enabled) or stops the worker.
Returning nil from a non-periodic handler stops the worker permanently, even with restart enabled. Use ErrDoNotRestart for explicit permanent completion from periodic handlers.
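A minimal sketch of both contracts (the handler bodies are placeholders):

// Long-running: block until ctx is cancelled, then surrender.
consume := func(ctx context.Context, _ *workers.WorkerInfo) error {
    <-ctx.Done() // a real consumer would loop here, honouring ctx
    return ctx.Err()
}

// Periodic: one unit of work per tick, return quickly.
sweep := func(ctx context.Context, _ *workers.WorkerInfo) error {
    // delete expired rows, etc.
    return nil // success; the next tick will fire
}

workers.Run(ctx, []*workers.Worker{
    workers.NewWorker("consumer").HandlerFunc(consume),
    workers.NewWorker("sweeper").HandlerFunc(sweep).Every(5 * time.Minute),
})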
Error Semantics for Periodic Handlers ¶
The return value from a periodic handler determines what happens next:
| Return value    | Timer loop | Restart? | Use case                 |
|-----------------|------------|----------|--------------------------|
| nil             | continues  | n/a      | Success, next tick fires |
| ErrSkipTick     | continues  | n/a      | Transient failure, skip  |
| ErrDoNotRestart | exits      | no       | Permanent completion     |
| other error     | exits      | yes*     | Failure, needs restart   |
| ctx.Err()       | exits      | no       | Graceful shutdown        |
*Only if Worker.WithRestart(true) (the default).
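A sketch of a periodic handler exercising these outcomes; drainOnce and closed are hypothetical helpers:

w := workers.NewWorker("drain").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
    n, err := drainOnce(ctx) // hypothetical: process one batch
    switch {
    case ctx.Err() != nil:
        return ctx.Err() // graceful shutdown, no restart
    case err != nil:
        return workers.ErrSkipTick // transient: timer keeps running
    case n == 0 && closed(): // hypothetical: source exhausted
        return workers.ErrDoNotRestart // permanent completion
    default:
        return nil // success, next tick fires
    }
}).Every(time.Minute)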
Middleware ¶
Cross-cutting concerns like tracing, logging, and panic recovery are implemented as Middleware. The middleware chain follows the gRPC interceptor convention: a flat function that calls next to continue:
func myMiddleware(ctx context.Context, info *workers.WorkerInfo, next workers.CycleFunc) error {
    // before
    err := next(ctx, info)
    // after
    return err
}
Attach middleware per-worker via Worker.Interceptors or per-run via WithInterceptors. Built-in middleware is available in the middleware/ sub-package.
Run-level interceptors (WithInterceptors) wrap all workers and are best for cross-cutting defaults (tracing, logging, panic recovery). Worker-level interceptors (Worker.Interceptors) are best for worker-specific concerns (distributed locks with per-worker TTL, rate limiting). Children inherit run-level interceptors but not the parent's worker-level interceptors.
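A sketch of both attachment points; tracingMW and lockMW stand in for real Middleware values:

// Run-level: wraps every worker, including dynamically added children.
workers.Run(ctx, ws, workers.WithInterceptors(tracingMW))

// Worker-level: wraps only this worker; its children do not inherit it.
w := workers.NewWorker("locked-job").
    HandlerFunc(job).
    Interceptors(lockMW)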
Helpers ¶
Common patterns are provided as helpers:
- EveryInterval — periodic execution on a fixed interval
- ChannelWorker — consume items from a channel one at a time
- BatchChannelWorker — collect items into batches, flush on size or timer
Dynamic Workers ¶
Manager workers can spawn and remove child workers at runtime using the Add, Remove, and GetChildren methods on WorkerInfo. Children join the parent's supervisor subtree and get full framework guarantees (panic recovery, restart). See [Example_dynamicWorkerPool].
Example (DynamicWorkerPool) ¶
Simulates a config-driven worker pool manager that reconciles desired workers against running workers on each tick.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    // Simulate config that changes over 3 ticks.
    // Tick 1: start worker-a
    // Tick 2: add worker-b
    // Tick 3: remove worker-a
    configs := [][]string{
        {"worker-a"},
        {"worker-a", "worker-b"},
        {"worker-b"},
    }
    tick := 0
    manager := workers.NewWorker("pool-manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
        ticker := time.NewTicker(40 * time.Millisecond)
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-ticker.C:
                if tick >= len(configs) {
                    continue
                }
                desired := map[string]bool{}
                for _, name := range configs[tick] {
                    desired[name] = true
                }
                tick++
                running := map[string]bool{}
                for _, name := range info.GetChildren() {
                    running[name] = true
                }
                // Remove workers no longer desired.
                for name := range running {
                    if !desired[name] {
                        info.Remove(name)
                    }
                }
                // Add workers that aren't already running.
                for name := range desired {
                    if !running[name] {
                        info.Add(workers.NewWorker(name).HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
                            <-ctx.Done()
                            return ctx.Err()
                        }))
                    }
                }
                time.Sleep(10 * time.Millisecond) // let children start
                fmt.Printf("tick %d: children=%v\n", tick, info.GetChildren())
            }
        }
    })
    ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
    defer cancel()
    workers.Run(ctx, []*workers.Worker{manager})
    fmt.Println("pool shut down")
}

Output:

tick 1: children=[worker-a]
tick 2: children=[worker-a worker-b]
tick 3: children=[worker-b]
pool shut down
Example (ReconcilerWithChangeDetection) ¶
Demonstrates config-driven reconciliation with change detection using the handler-as-metadata pattern. The handler struct carries a config version that the reconciler inspects via GetChild().GetHandler() type assertion, eliminating the need for a parallel tracking map.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

// solverHandler is used in Example_reconcilerWithChangeDetection to
// demonstrate the handler-as-metadata pattern.
type solverHandler struct {
    version int
}

func (h *solverHandler) RunCycle(ctx context.Context, _ *workers.WorkerInfo) error {
    <-ctx.Done()
    return ctx.Err()
}

func (h *solverHandler) Close() error { return nil }

func main() {
    type solverConfig struct {
        version int
    }
    // Simulate config that changes over 3 ticks.
    configs := []map[string]solverConfig{
        {"a": {version: 1}},
        {"a": {version: 1}, "b": {version: 1}},
        {"a": {version: 2}, "b": {version: 1}}, // a gets new version
    }
    tick := 0
    manager := workers.NewWorker("reconciler").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
        ticker := time.NewTicker(40 * time.Millisecond)
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-ticker.C:
                if tick >= len(configs) {
                    continue
                }
                desired := configs[tick]
                tick++
                // Remove workers no longer desired.
                for _, name := range info.GetChildren() {
                    if _, ok := desired[name]; !ok {
                        info.Remove(name)
                    }
                }
                // Add new or replace changed workers.
                for key, cfg := range desired {
                    child, exists := info.GetChild(key)
                    if exists {
                        // Check if config changed via handler type assertion.
                        if h, ok := child.GetHandler().(*solverHandler); ok && h.version == cfg.version {
                            continue // unchanged, skip
                        }
                        info.Remove(key)                  // config changed, replace
                        time.Sleep(10 * time.Millisecond) // let old worker stop
                    }
                    info.Add(workers.NewWorker(key).Handler(&solverHandler{version: cfg.version}))
                }
                time.Sleep(10 * time.Millisecond)
                fmt.Printf("tick %d: children=%v count=%d\n", tick, info.GetChildren(), len(info.GetChildren()))
            }
        }
    })
    ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
    defer cancel()
    workers.Run(ctx, []*workers.Worker{manager})
}

Output:

tick 1: children=[a] count=1
tick 2: children=[a b] count=2
tick 3: children=[a b] count=2
Example (Standalone) ¶
Standalone usage with signal handling — no ColdBrew required.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    // In production you'd use signal.NotifyContext(ctx, os.Interrupt).
    // For the example, use a short timeout.
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()
    workers.Run(ctx, []*workers.Worker{
        workers.NewWorker("kafka").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
            fmt.Println("consuming messages")
            <-ctx.Done()
            return ctx.Err()
        }),
    })
    fmt.Println("shutdown complete")
}

Output:

consuming messages
shutdown complete
Index ¶
- Variables
- func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error
- func RunWorker(ctx context.Context, w *Worker, opts ...RunOption)
- type BaseMetrics
- func (BaseMetrics) ObserveRunDuration(string, time.Duration)
- func (BaseMetrics) SetActiveWorkers(int)
- func (BaseMetrics) WorkerFailed(string, error)
- func (BaseMetrics) WorkerPanicked(string)
- func (BaseMetrics) WorkerRestarted(string, int)
- func (BaseMetrics) WorkerStarted(string)
- func (BaseMetrics) WorkerStopped(string)
- type CycleFunc
- type CycleHandler
- type Metrics
- type Middleware
- type RunOption
- type Worker
- func (w *Worker) AddInterceptors(mw ...Middleware) *Worker
- func (w *Worker) Every(d time.Duration) *Worker
- func (w *Worker) GetHandler() CycleHandler
- func (w *Worker) GetInitialDelay() time.Duration
- func (w *Worker) GetInterval() time.Duration
- func (w *Worker) GetJitterPercent() int
- func (w *Worker) GetName() string
- func (w *Worker) GetRestartOnFail() bool
- func (w *Worker) Handler(h CycleHandler) *Worker
- func (w *Worker) HandlerFunc(fn CycleFunc) *Worker
- func (w *Worker) Interceptors(mw ...Middleware) *Worker
- func (w *Worker) WithBackoffJitter(jitter func(time.Duration) time.Duration) *Worker
- func (w *Worker) WithFailureBackoff(d time.Duration) *Worker
- func (w *Worker) WithFailureDecay(decay float64) *Worker
- func (w *Worker) WithFailureThreshold(threshold float64) *Worker
- func (w *Worker) WithInitialDelay(d time.Duration) *Worker
- func (w *Worker) WithJitter(percent int) *Worker
- func (w *Worker) WithMetrics(m Metrics) *Worker
- func (w *Worker) WithRestart(restart bool) *Worker
- func (w *Worker) WithTimeout(d time.Duration) *Worker
- type WorkerInfo
- func (info *WorkerInfo) Add(w *Worker) bool
- func (info *WorkerInfo) GetAttempt() int
- func (info *WorkerInfo) GetChild(name string) (Worker, bool)
- func (info *WorkerInfo) GetChildCount() int
- func (info *WorkerInfo) GetChildren() []string
- func (info *WorkerInfo) GetHandler() CycleHandler
- func (info *WorkerInfo) GetName() string
- func (info *WorkerInfo) Remove(name string)
- type WorkerInfoOption
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrDoNotRestart = suture.ErrDoNotRestart
ErrDoNotRestart can be returned from a handler to signal that the worker should not be restarted, even when restart is enabled. Use this for permanent completion (e.g., channel closed, work exhausted).
var ErrSkipTick = errors.New("workers: skip tick")
ErrSkipTick can be returned from a periodic handler to skip the current tick without triggering restart. The timer continues and the next tick fires normally. Only meaningful for periodic workers (with Worker.Every).
Functions ¶
func Run ¶
func Run(ctx context.Context, workers []*Worker, opts ...RunOption) error
Run starts all workers under a suture supervisor and blocks until ctx is cancelled and all workers have exited. Each worker gets its own child supervisor — when a worker stops, its children stop too. A worker exiting early (without restart) does not stop other workers. Returns nil on clean shutdown.
Example ¶
Run multiple workers concurrently. All workers start together and stop when the context is cancelled.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    w1 := workers.NewWorker("api-poller").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
        fmt.Println("api-poller started")
        <-ctx.Done()
        return ctx.Err()
    })
    w2 := workers.NewWorker("cache-warmer").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
        fmt.Println("cache-warmer started")
        <-ctx.Done()
        return ctx.Err()
    })
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()
    workers.Run(ctx, []*workers.Worker{w1, w2})
    fmt.Println("all workers stopped")
}

Output:

api-poller started
cache-warmer started
all workers stopped
func RunWorker ¶
func RunWorker(ctx context.Context, w *Worker, opts ...RunOption)
RunWorker runs a single worker with panic recovery and optional restart. Blocks until ctx is cancelled or the worker exits without restart. Unlike Run, RunWorker discards the error. Use Run if you need the error.
Example ¶
RunWorker runs a single worker — useful for dynamic managers that spawn child workers in their own goroutines.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    w := workers.NewWorker("single").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
        fmt.Println("running")
        <-ctx.Done()
        return ctx.Err()
    })
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()
    workers.RunWorker(ctx, w)
    fmt.Println("done")
}

Output:

running
done
Types ¶
type BaseMetrics ¶ added in v0.0.4
type BaseMetrics struct{}
BaseMetrics provides no-op implementations of all Metrics methods. Embed it in custom Metrics implementations so that new methods added to the Metrics interface in future versions get safe no-op defaults instead of breaking your build:
type myMetrics struct {
    workers.BaseMetrics // forward-compatible
    client *statsd.Client
}

func (m *myMetrics) WorkerStarted(name string) {
    m.client.Incr("worker.started", []string{"worker:" + name}, 1)
}
func (BaseMetrics) ObserveRunDuration ¶ added in v0.0.4
func (BaseMetrics) ObserveRunDuration(string, time.Duration)
func (BaseMetrics) SetActiveWorkers ¶ added in v0.0.4
func (BaseMetrics) SetActiveWorkers(int)
func (BaseMetrics) WorkerFailed ¶ added in v0.0.4
func (BaseMetrics) WorkerFailed(string, error)
func (BaseMetrics) WorkerPanicked ¶ added in v0.0.4
func (BaseMetrics) WorkerPanicked(string)
func (BaseMetrics) WorkerRestarted ¶ added in v0.0.4
func (BaseMetrics) WorkerRestarted(string, int)
func (BaseMetrics) WorkerStarted ¶ added in v0.0.4
func (BaseMetrics) WorkerStarted(string)
func (BaseMetrics) WorkerStopped ¶ added in v0.0.4
func (BaseMetrics) WorkerStopped(string)
type CycleFunc ¶ added in v0.1.0
type CycleFunc func(ctx context.Context, info *WorkerInfo) error
CycleFunc adapts a plain function into a CycleHandler. Close is a no-op — use this for simple, stateless handlers.
func BatchChannelWorker ¶
func BatchChannelWorker[T any](ch <-chan T, maxSize int, maxDelay time.Duration, fn func(ctx context.Context, info *WorkerInfo, batch []T) error) CycleFunc
BatchChannelWorker collects items from ch into batches and calls fn when either the batch reaches maxSize or maxDelay elapses since the first item in the current batch — whichever comes first. Flushes any partial batch on context cancellation or channel close before returning.
Example ¶
BatchChannelWorker collects items into batches and flushes on maxSize or maxDelay — whichever comes first.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    ch := make(chan int, 10)
    for i := 1; i <= 6; i++ {
        ch <- i
    }
    close(ch)
    fn := workers.BatchChannelWorker(ch, 3, time.Hour, func(_ context.Context, _ *workers.WorkerInfo, batch []int) error {
        fmt.Println(batch)
        return nil
    })
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()
    w := workers.NewWorker("batcher").HandlerFunc(fn)
    workers.Run(ctx, []*workers.Worker{w})
}

Output:

[1 2 3]
[4 5 6]
func ChannelWorker ¶
func ChannelWorker[T any](ch <-chan T, fn func(ctx context.Context, info *WorkerInfo, item T) error) CycleFunc
ChannelWorker consumes items from ch one at a time, calling fn for each. Returns when ctx is cancelled or ch is closed.
Example ¶
ChannelWorker consumes items from a channel one at a time.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    ch := make(chan string, 3)
    ch <- "hello"
    ch <- "world"
    ch <- "!"
    close(ch)
    fn := workers.ChannelWorker(ch, func(_ context.Context, _ *workers.WorkerInfo, item string) error {
        fmt.Println(item)
        return nil
    })
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()
    w := workers.NewWorker("consumer").HandlerFunc(fn)
    workers.Run(ctx, []*workers.Worker{w})
}

Output:

hello
world
!
func EveryInterval ¶
EveryInterval wraps fn in a timer loop that calls fn at the given interval. Returns when ctx is cancelled. If fn returns an error, EveryInterval returns that error (the supervisor decides whether to restart based on Worker.WithRestart).
Example ¶
EveryInterval wraps a function in a timer loop.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    count := 0
    fn := workers.EveryInterval(20*time.Millisecond, func(_ context.Context, _ *workers.WorkerInfo) error {
        count++
        fmt.Printf("tick %d\n", count)
        return nil
    })
    w := workers.NewWorker("periodic").HandlerFunc(fn)
    ctx, cancel := context.WithTimeout(context.Background(), 55*time.Millisecond)
    defer cancel()
    workers.Run(ctx, []*workers.Worker{w})
}

Output:

tick 1
tick 2
type CycleHandler ¶ added in v0.1.0
type CycleHandler interface {
    RunCycle(ctx context.Context, info *WorkerInfo) error
    Close() error
}
CycleHandler handles worker execution cycles. For periodic workers, RunCycle is called once per tick. Close is called once when the worker stops, allowing cleanup of resources.
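A sketch of a stateful handler that cleans up in Close; the database field is illustrative:

type pollHandler struct {
    db *sql.DB // resource that outlives individual cycles
}

func (h *pollHandler) RunCycle(ctx context.Context, _ *workers.WorkerInfo) error {
    // query h.db once per tick ...
    return nil
}

// Close runs once when the worker stops.
func (h *pollHandler) Close() error { return h.db.Close() }

Attach it with Worker.Handler (not HandlerFunc) so Close is invoked on shutdown:

workers.NewWorker("poller").Handler(&pollHandler{db: db}).Every(time.Minute)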
type Metrics ¶ added in v0.0.3
type Metrics interface {
    WorkerStarted(name string)
    WorkerStopped(name string)
    WorkerPanicked(name string)
    WorkerFailed(name string, err error)
    WorkerRestarted(name string, attempt int)
    // ObserveRunDuration records the attempt lifetime — the duration from
    // when the worker started to when it stopped or failed. For per-cycle
    // timing, use middleware.Duration instead.
    ObserveRunDuration(name string, duration time.Duration)
    SetActiveWorkers(count int)
}
Metrics collects worker lifecycle metrics. Implement this interface to provide custom metrics (e.g., Datadog, StatsD). Use BaseMetrics{} to disable metrics, or NewPrometheusMetrics for the built-in Prometheus implementation.
func NewPrometheusMetrics ¶ added in v0.0.3
NewPrometheusMetrics creates a Metrics implementation backed by Prometheus. The namespace is prepended to all metric names (e.g., "myapp" → "myapp_worker_started_total"). Metrics are auto-registered with the default Prometheus registry. Safe to call multiple times with the same namespace — returns the cached instance. The cache is process-global; use a small number of static namespaces (not per-request/tenant values).
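Typical wiring, as a sketch assuming the constructor takes only the namespace string described above:

m := workers.NewPrometheusMetrics("myapp") // metrics appear as myapp_worker_*
workers.Run(ctx, ws, workers.WithMetrics(m))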
type Middleware ¶ added in v0.1.0
type Middleware func(ctx context.Context, info *WorkerInfo, next CycleFunc) error
Middleware intercepts each execution cycle. Call next to continue the chain. Matches gRPC interceptor convention.
type RunOption ¶ added in v0.0.3
type RunOption func(*runConfig)
RunOption configures the behavior of Run.
func AddInterceptors ¶ added in v0.1.0
func AddInterceptors(mw ...Middleware) RunOption
AddInterceptors appends to the run-level interceptor list.
func WithDefaultJitter ¶ added in v0.1.0
WithDefaultJitter sets a run-level default jitter percentage for all periodic workers. Worker-level Worker.WithJitter takes precedence. Setting Worker.WithJitter(0) disables jitter for a specific worker even when a run-level default is set.
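A sketch, assuming the option takes the percentage as an int:

workers.Run(ctx, []*workers.Worker{
    workers.NewWorker("sync").HandlerFunc(syncFn).Every(time.Minute),                 // inherits the 10% default
    workers.NewWorker("exact").HandlerFunc(exactFn).Every(time.Minute).WithJitter(0), // opts out explicitly
}, workers.WithDefaultJitter(10))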
func WithInterceptors ¶ added in v0.1.0
func WithInterceptors(mw ...Middleware) RunOption
WithInterceptors replaces the run-level interceptor list. Run-level interceptors wrap outside worker-level interceptors.
func WithMetrics ¶ added in v0.0.3
WithMetrics sets the metrics implementation for all workers started by Run. Workers inherit this unless they override via Worker.WithMetrics. If not set, BaseMetrics is used.
type Worker ¶
type Worker struct {
    // contains filtered or unexported fields
}
Worker represents a background goroutine managed by the framework. Create with NewWorker and configure with builder methods.
func NewWorker ¶
NewWorker creates a Worker with the given name. Set the handler via Worker.Handler or Worker.HandlerFunc.
Example ¶
A simple worker that runs until cancelled.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    w := workers.NewWorker("greeter").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
        fmt.Printf("worker %q started (attempt %d)\n", info.GetName(), info.GetAttempt())
        <-ctx.Done()
        return ctx.Err()
    })
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()
    workers.Run(ctx, []*workers.Worker{w})
}

Output:

worker "greeter" started (attempt 0)
func (*Worker) AddInterceptors ¶ added in v0.1.0
func (w *Worker) AddInterceptors(mw ...Middleware) *Worker
AddInterceptors appends to the worker-level interceptor list.
func (*Worker) Every ¶
func (w *Worker) Every(d time.Duration) *Worker
Every configures the worker to run periodically at the given interval. The interval is stored as data — wrapping is deferred to startup where jitter configuration is resolved.
Example ¶
A periodic worker that runs a function on a fixed interval.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    count := 0
    w := workers.NewWorker("ticker").HandlerFunc(func(_ context.Context, _ *workers.WorkerInfo) error {
        count++
        fmt.Printf("tick %d\n", count)
        return nil
    }).Every(20 * time.Millisecond)
    ctx, cancel := context.WithTimeout(context.Background(), 55*time.Millisecond)
    defer cancel()
    workers.Run(ctx, []*workers.Worker{w})
}

Output:

tick 1
tick 2
func (*Worker) GetHandler ¶ added in v0.1.0
func (w *Worker) GetHandler() CycleHandler
GetHandler returns the worker's CycleHandler, or nil if not set.
func (*Worker) GetInitialDelay ¶ added in v0.2.0
func (w *Worker) GetInitialDelay() time.Duration
GetInitialDelay returns the initial delay before the first tick, or 0 if not set.
func (*Worker) GetInterval ¶ added in v0.2.0
func (w *Worker) GetInterval() time.Duration
GetInterval returns the periodic interval, or 0 if this is not a periodic worker.
func (*Worker) GetJitterPercent ¶ added in v0.2.0
func (w *Worker) GetJitterPercent() int
GetJitterPercent returns the jitter percentage. -1 means inherit run-level default, 0 means no jitter.
func (*Worker) GetRestartOnFail ¶ added in v0.2.0
func (w *Worker) GetRestartOnFail() bool
GetRestartOnFail returns whether the worker restarts on failure.
func (*Worker) Handler ¶ added in v0.1.0
func (w *Worker) Handler(h CycleHandler) *Worker
Handler sets the worker's CycleHandler. Use this for handlers that need cleanup via Close (e.g., database connections, leases).
func (*Worker) HandlerFunc ¶ added in v0.1.0
func (w *Worker) HandlerFunc(fn CycleFunc) *Worker
HandlerFunc sets the worker's handler from a plain function. This is the common case for simple, stateless workers.
func (*Worker) Interceptors ¶ added in v0.1.0
func (w *Worker) Interceptors(mw ...Middleware) *Worker
Interceptors replaces the worker-level interceptor list.
Example ¶
Per-worker middleware using the interceptor pattern.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    loggingMW := func(ctx context.Context, info *workers.WorkerInfo, next workers.CycleFunc) error {
        fmt.Printf("[%s] cycle start\n", info.GetName())
        err := next(ctx, info)
        fmt.Printf("[%s] cycle end\n", info.GetName())
        return err
    }
    w := workers.NewWorker("with-logging").
        HandlerFunc(func(_ context.Context, info *workers.WorkerInfo) error {
            fmt.Printf("[%s] doing work\n", info.GetName())
            return nil
        }).
        Every(20 * time.Millisecond).
        Interceptors(loggingMW)
    ctx, cancel := context.WithTimeout(context.Background(), 35*time.Millisecond)
    defer cancel()
    workers.Run(ctx, []*workers.Worker{w})
}

Output:

[with-logging] cycle start
[with-logging] doing work
[with-logging] cycle end
func (*Worker) WithBackoffJitter ¶
func (w *Worker) WithBackoffJitter(jitter func(time.Duration) time.Duration) *Worker
WithBackoffJitter adds random jitter to the backoff duration to prevent thundering herd on coordinated restarts. The function receives the base backoff duration and returns a jittered duration.
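For example, a half-to-full jitter strategy over the base backoff (a sketch using math/rand/v2):

w := workers.NewWorker("flaky").
    HandlerFunc(job).
    WithFailureBackoff(5 * time.Second).
    WithBackoffJitter(func(base time.Duration) time.Duration {
        return base/2 + rand.N(base/2) // wait between 50% and 100% of base
    })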
func (*Worker) WithFailureBackoff ¶
func (w *Worker) WithFailureBackoff(d time.Duration) *Worker
WithFailureBackoff sets the duration to wait between restarts. Suture default is 15 seconds.
func (*Worker) WithFailureDecay ¶
func (w *Worker) WithFailureDecay(decay float64) *Worker
WithFailureDecay sets the rate at which failure count decays over time. A value of 1.0 means failures decay by one per second. Suture default is 1.0.
func (*Worker) WithFailureThreshold ¶
func (w *Worker) WithFailureThreshold(threshold float64) *Worker
WithFailureThreshold sets the number of failures allowed before the supervisor gives up restarting. Suture default is 5.
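The backoff, decay, and threshold settings compose into one restart policy; a sketch of a policy more tolerant than the suture defaults:

w := workers.NewWorker("ingest").
    HandlerFunc(job).
    WithFailureBackoff(2 * time.Second). // wait 2s between restarts (default 15s)
    WithFailureDecay(1.0).               // failure count decays by one per second
    WithFailureThreshold(10)             // allow 10 failures before giving up (default 5)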
func (*Worker) WithInitialDelay ¶ added in v0.1.0
func (w *Worker) WithInitialDelay(d time.Duration) *Worker
WithInitialDelay delays the first tick to stagger startup. Requires Worker.Every.
func (*Worker) WithJitter ¶ added in v0.1.0
func (w *Worker) WithJitter(percent int) *Worker
WithJitter sets per-worker jitter as a percentage of the base interval. Each tick is randomized within ±percent of the base. Requires Worker.Every. Setting WithJitter(0) explicitly disables jitter even when a run-level default is set via WithDefaultJitter.
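Combined with Worker.WithInitialDelay, jitter staggers a fleet of periodic workers; for example:

w := workers.NewWorker("refresh").
    HandlerFunc(refresh).
    Every(time.Minute).
    WithInitialDelay(10 * time.Second). // avoid a thundering first tick at startup
    WithJitter(20)                      // each tick lands within ±20% of 1m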
func (*Worker) WithMetrics ¶ added in v0.0.3
func (w *Worker) WithMetrics(m Metrics) *Worker
WithMetrics sets a per-worker metrics implementation, overriding the metrics inherited from the parent WorkerInfo or Run options.
func (*Worker) WithRestart ¶
func (w *Worker) WithRestart(restart bool) *Worker
WithRestart configures whether the worker should be restarted on failure. Default is true. Set to false for one-shot workers that should exit after completion or failure. Note: a handler returning nil always stops the worker permanently, regardless of this setting.
Periodic workers (with Worker.Every) should generally keep the default (restart enabled). Use ErrSkipTick for transient failures and ErrDoNotRestart for permanent completion instead of disabling restart.
Example ¶
A worker with automatic restart on failure. The supervisor logs restart events; the worker succeeds on the third attempt.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    attempt := 0
    w := workers.NewWorker("resilient").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
        attempt++
        if attempt <= 2 {
            return fmt.Errorf("transient error")
        }
        fmt.Printf("succeeded on attempt %d\n", attempt)
        <-ctx.Done()
        return ctx.Err()
    }).WithRestart(true)
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    workers.Run(ctx, []*workers.Worker{w})
    // This example demonstrates restart behavior. Log output from the
    // supervisor is expected between restarts. The worker prints on success.
}
Output:
type WorkerInfo ¶ added in v0.1.0
type WorkerInfo struct {
    // contains filtered or unexported fields
}
WorkerInfo carries worker metadata and child management. The framework always creates it — it is never nil. context.Context handles cancellation/deadlines/values; WorkerInfo handles everything worker-specific.
func NewWorkerInfo ¶ added in v0.1.0
func NewWorkerInfo(name string, attempt int, opts ...WorkerInfoOption) *WorkerInfo
NewWorkerInfo creates a WorkerInfo with the given name and attempt. This is useful for testing middleware and handlers — the framework creates fully populated instances internally.
Use WithTestChildren to enable Add/Remove/GetChildren in tests.
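A sketch of a middleware unit test; loggingMW is the middleware under test:

func TestLoggingMW(t *testing.T) {
    info := workers.NewWorkerInfo("test-worker", 0)
    called := false
    next := func(ctx context.Context, _ *workers.WorkerInfo) error {
        called = true
        return nil
    }
    if err := loggingMW(context.Background(), info, next); err != nil {
        t.Fatalf("unexpected error: %v", err)
    }
    if !called {
        t.Fatal("middleware did not call next")
    }
}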
func (*WorkerInfo) Add ¶ added in v0.1.0
func (info *WorkerInfo) Add(w *Worker) bool
Add starts a child worker under this worker's supervisor subtree. Returns true if the worker was added, false if a worker with the same name is already running (no-op). To replace a running worker, call WorkerInfo.Remove first then Add.
Note: Remove + Add is not atomic — there is a brief window where the worker is not running. For most reconciliation patterns this is fine.
Children inherit run-level interceptors, metrics (unless overridden via Worker.WithMetrics), and scoped lifecycle — when this worker stops, all its children stop too.
When a child permanently stops, it is automatically removed from the children map on the next call to WorkerInfo.Add, WorkerInfo.GetChildren, WorkerInfo.GetChild, or WorkerInfo.GetChildCount.
Example ¶
A manager worker that dynamically spawns and removes child workers using WorkerInfo.Add, Remove, and GetChildren.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    manager := workers.NewWorker("manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
        // Spawn two child workers dynamically.
        info.Add(workers.NewWorker("child-a").HandlerFunc(func(ctx context.Context, childInfo *workers.WorkerInfo) error {
            fmt.Printf("%s started\n", childInfo.GetName())
            <-ctx.Done()
            return ctx.Err()
        }))
        info.Add(workers.NewWorker("child-b").HandlerFunc(func(ctx context.Context, childInfo *workers.WorkerInfo) error {
            fmt.Printf("%s started\n", childInfo.GetName())
            <-ctx.Done()
            return ctx.Err()
        }))
        // Give children time to start.
        time.Sleep(30 * time.Millisecond)
        fmt.Printf("children: %v\n", info.GetChildren())
        // Remove one child.
        info.Remove("child-a")
        time.Sleep(30 * time.Millisecond)
        fmt.Printf("after remove: %v\n", info.GetChildren())
        <-ctx.Done()
        return ctx.Err()
    })
    ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
    defer cancel()
    workers.Run(ctx, []*workers.Worker{manager})
}

Output:

child-a started
child-b started
children: [child-a child-b]
after remove: [child-b]
Example (Replace) ¶
Replace a child worker using Remove + Add. Add is a no-op if the name exists, so Remove first.
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-coldbrew/workers"
)

func main() {
    manager := workers.NewWorker("manager").HandlerFunc(func(ctx context.Context, info *workers.WorkerInfo) error {
        info.Add(workers.NewWorker("processor").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
            fmt.Println("processor v1")
            <-ctx.Done()
            return ctx.Err()
        }))
        time.Sleep(30 * time.Millisecond)
        // Replace: Remove the old worker, then Add the new one.
        info.Remove("processor")
        time.Sleep(10 * time.Millisecond) // brief gap while old stops
        info.Add(workers.NewWorker("processor").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
            fmt.Println("processor v2")
            <-ctx.Done()
            return ctx.Err()
        }))
        time.Sleep(30 * time.Millisecond)
        <-ctx.Done()
        return ctx.Err()
    })
    ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
    defer cancel()
    workers.Run(ctx, []*workers.Worker{manager})
}

Output:

processor v1
processor v2
func (*WorkerInfo) GetAttempt ¶ added in v0.1.0
func (info *WorkerInfo) GetAttempt() int
GetAttempt returns the restart attempt number (0 on first run).
func (*WorkerInfo) GetChild ¶ added in v0.1.0
func (info *WorkerInfo) GetChild(name string) (Worker, bool)
GetChild returns a copy of a running child worker and true, or the zero value and false if not found. The returned value is a snapshot — mutations to the Worker fields have no effect on the running worker.
The CycleHandler (accessible via Worker.GetHandler) is shared with the running worker, not copied. Use type assertion to inspect handler state (e.g., config versions for reconciliation). See [Example_reconcilerWithChangeDetection].
func (*WorkerInfo) GetChildCount ¶ added in v0.2.0
func (info *WorkerInfo) GetChildCount() int
GetChildCount returns the number of currently running child workers. This is more efficient than calling WorkerInfo.GetChildren and taking len, as it avoids allocating a sorted slice. Stopped children are lazily pruned.
func (*WorkerInfo) GetChildren ¶ added in v0.1.0
func (info *WorkerInfo) GetChildren() []string
GetChildren returns the names of currently running child workers. Stopped children are lazily pruned before building the list.
func (*WorkerInfo) GetHandler ¶ added in v0.2.0
func (info *WorkerInfo) GetHandler() CycleHandler
GetHandler returns the worker's CycleHandler, or nil if not set. Use type assertion to access handler-specific state or interfaces:
if h, ok := info.GetHandler().(MyHandler); ok {
    // access h.Config, h.Version, etc.
}
func (*WorkerInfo) GetName ¶ added in v0.1.0
func (info *WorkerInfo) GetName() string
GetName returns the worker's name as passed to NewWorker.
func (*WorkerInfo) Remove ¶ added in v0.1.0
func (info *WorkerInfo) Remove(name string)
Remove stops a child worker by name.
type WorkerInfoOption ¶ added in v0.1.0
type WorkerInfoOption func(*WorkerInfo)
WorkerInfoOption configures a WorkerInfo created by NewWorkerInfo.
func WithTestChildren ¶ added in v0.1.0
func WithTestChildren(ctx context.Context) WorkerInfoOption
WithTestChildren creates an internal supervisor so that Add, Remove, and GetChildren work in tests without calling Run. The caller must cancel the provided context when the test is done to stop the supervisor.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
info := workers.NewWorkerInfo("test", 0, workers.WithTestChildren(ctx))
func WithTestHandler ¶ added in v0.2.0
func WithTestHandler(h CycleHandler) WorkerInfoOption
WithTestHandler sets the handler on a test WorkerInfo so that WorkerInfo.GetHandler works in unit tests.
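A sketch combining both test options so a handler's child management and metadata can be exercised without Run (solverHandler is the example handler type shown earlier; any CycleHandler works):

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

h := &solverHandler{version: 1}
info := workers.NewWorkerInfo("manager", 0,
    workers.WithTestChildren(ctx),
    workers.WithTestHandler(h),
)

info.Add(workers.NewWorker("child").HandlerFunc(func(ctx context.Context, _ *workers.WorkerInfo) error {
    <-ctx.Done()
    return ctx.Err()
}))
// info.GetChildren() now reports [child]; info.GetHandler() returns h
// for type-assertion tests.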
Directories ¶

| Path | Synopsis |
|---|---|
| middleware | Package middleware provides optional interceptors for go-coldbrew/workers. |