workers

package module
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 5, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

README

workers

import "github.com/go-coldbrew/workers"

Package workers provides a worker lifecycle library for Go, built on thejerf/suture. It manages background goroutines with automatic panic recovery, configurable restart with backoff, tracing, and structured shutdown.

Architecture

Every worker runs inside its own supervisor subtree. This means:

  • Each worker gets panic recovery and restart independently
  • Workers can dynamically spawn child workers via WorkerContext.Add
  • When a parent worker stops, all its children stop (scoped lifecycle)
  • The supervisor tree prevents cascading failures and CPU-burn restart storms
Quick Start

Create workers with NewWorker and run them with Run:

workers.Run(ctx, []*workers.Worker{
    workers.NewWorker("kafka", consume),
    workers.NewWorker("cleanup", cleanup).Every(5 * time.Minute).WithRestart(true),
})
Helpers

Common patterns are provided as helpers:

Dynamic Workers

Manager workers can spawn and remove child workers at runtime using WorkerContext.Add, WorkerContext.Remove, and WorkerContext.Children. Children join the parent's supervisor subtree and get full framework guarantees (tracing, panic recovery, restart). See [Example_dynamicWorkerPool].

Example (Dynamic Worker Pool)

Simulates a config-driven worker pool manager that reconciles desired workers against running workers on each tick. This demonstrates the pattern used by services like route-store where worker configs are loaded from a database periodically.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	// Simulate config that changes over 3 ticks.
	// Tick 1: start worker-a
	// Tick 2: add worker-b
	// Tick 3: remove worker-a
	configs := [][]string{
		{"worker-a"},
		{"worker-a", "worker-b"},
		{"worker-b"},
	}

	tick := 0
	manager := workers.NewWorker("pool-manager", func(ctx workers.WorkerContext) error {
		ticker := time.NewTicker(40 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-ticker.C:
				if tick >= len(configs) {
					continue
				}
				desired := map[string]bool{}
				for _, name := range configs[tick] {
					desired[name] = true
				}
				tick++

				// Remove workers no longer desired.
				for _, name := range ctx.Children() {
					if !desired[name] {
						ctx.Remove(name)
					}
				}
				// Add new workers (Add is a no-op replacement if already running).
				for name := range desired {
					name := name
					ctx.Add(workers.NewWorker(name, func(ctx workers.WorkerContext) error {
						<-ctx.Done()
						return ctx.Err()
					}))
				}
				time.Sleep(10 * time.Millisecond) // let children start
				fmt.Printf("tick %d: children=%v\n", tick, ctx.Children())
			}
		}
	})

	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{manager})
	fmt.Println("pool shut down")
}
Output
tick 1: children=[worker-a]
tick 2: children=[worker-a worker-b]
tick 3: children=[worker-b]
pool shut down

Example (Standalone)

Standalone usage with signal handling — no ColdBrew required.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	// In production you'd use signal.NotifyContext(ctx, os.Interrupt).
	// For the example, use a short timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{
		workers.NewWorker("kafka", func(ctx workers.WorkerContext) error {
			fmt.Println("consuming messages")
			<-ctx.Done()
			return ctx.Err()
		}),
	})
	fmt.Println("shutdown complete")
}
Output
consuming messages
shutdown complete

Index

func BatchChannelWorker

func BatchChannelWorker[T any](ch <-chan T, maxSize int, maxDelay time.Duration, fn func(WorkerContext, []T) error) func(WorkerContext) error

BatchChannelWorker collects items from ch into batches and calls fn when either the batch reaches maxSize or maxDelay elapses since the first item in the current batch — whichever comes first. Flushes any partial batch on context cancellation or channel close before returning.

Example

BatchChannelWorker collects items into batches and flushes on maxSize or maxDelay — whichever comes first.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	ch := make(chan int, 10)
	for i := 1; i <= 6; i++ {
		ch <- i
	}
	close(ch)

	fn := workers.BatchChannelWorker(ch, 3, time.Hour, func(ctx workers.WorkerContext, batch []int) error {
		fmt.Println(batch)
		return nil
	})

	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	w := workers.NewWorker("batcher", fn)
	workers.Run(ctx, []*workers.Worker{w})
}
Output
[1 2 3]
[4 5 6]

func ChannelWorker

func ChannelWorker[T any](ch <-chan T, fn func(WorkerContext, T) error) func(WorkerContext) error

ChannelWorker consumes items from ch one at a time, calling fn for each. Returns when ctx is cancelled or ch is closed.

Example

ChannelWorker consumes items from a channel one at a time.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	ch := make(chan string, 3)
	ch <- "hello"
	ch <- "world"
	ch <- "!"
	close(ch)

	fn := workers.ChannelWorker(ch, func(ctx workers.WorkerContext, item string) error {
		fmt.Println(item)
		return nil
	})

	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	w := workers.NewWorker("consumer", fn)
	workers.Run(ctx, []*workers.Worker{w})
}
Output
hello
world
!

func EveryInterval

func EveryInterval(d time.Duration, fn func(WorkerContext) error) func(WorkerContext) error

EveryInterval wraps fn in a ticker loop that calls fn at the given interval. Returns when ctx is cancelled. If fn returns an error, EveryInterval returns that error (the supervisor decides whether to restart based on WithRestart).

Example

EveryInterval wraps a function in a ticker loop.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	count := 0
	fn := workers.EveryInterval(20*time.Millisecond, func(ctx workers.WorkerContext) error {
		count++
		fmt.Printf("tick %d\n", count)
		return nil
	})

	w := workers.NewWorker("periodic", fn)

	ctx, cancel := context.WithTimeout(context.Background(), 55*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w})
}
Output
tick 1
tick 2

func Run

func Run(ctx context.Context, workers []*Worker) error

Run starts all workers under a suture supervisor and blocks until ctx is cancelled and all workers have exited. Each worker gets its own child supervisor — when a worker stops, its children stop too. A worker exiting early (without restart) does not stop other workers. Returns nil on clean shutdown.

Example

Run multiple workers concurrently. All workers start together and stop when the context is cancelled.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	w1 := workers.NewWorker("api-poller", func(ctx workers.WorkerContext) error {
		fmt.Println("api-poller started")
		<-ctx.Done()
		return ctx.Err()
	})
	w2 := workers.NewWorker("cache-warmer", func(ctx workers.WorkerContext) error {
		fmt.Println("cache-warmer started")
		<-ctx.Done()
		return ctx.Err()
	})

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w1, w2})
	fmt.Println("all workers stopped")
}
Output
api-poller started
cache-warmer started
all workers stopped

func RunWorker

func RunWorker(ctx context.Context, w *Worker)

RunWorker runs a single worker with panic recovery and optional restart. Blocks until ctx is cancelled or the worker exits without RestartOnFail.

Example

RunWorker runs a single worker — useful for dynamic managers that spawn child workers in their own goroutines.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	w := workers.NewWorker("single", func(ctx workers.WorkerContext) error {
		fmt.Println("running")
		<-ctx.Done()
		return ctx.Err()
	})

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	workers.RunWorker(ctx, w)
	fmt.Println("done")
}
Output
running
done

type Worker

Worker represents a background goroutine managed by the framework. Create with NewWorker and configure with builder methods.

type Worker struct {
    // contains filtered or unexported fields
}

func NewWorker
func NewWorker(name string, run func(WorkerContext) error) *Worker

NewWorker creates a Worker with the given name and run function. The run function should block until ctx is cancelled or an error occurs.

Example

A simple worker that runs until cancelled.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	w := workers.NewWorker("greeter", func(ctx workers.WorkerContext) error {
		fmt.Printf("worker %q started (attempt %d)\n", ctx.Name(), ctx.Attempt())
		<-ctx.Done()
		return ctx.Err()
	})

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w})
}
Output
worker "greeter" started (attempt 0)

func (*Worker) Every
func (w *Worker) Every(d time.Duration) *Worker

Every wraps the run function in a ticker loop that calls it at the given interval. The original run function is called once per tick. If it returns an error, the behavior depends on WithRestart: if true, the ticker worker restarts; if false, it exits.

Example

A periodic worker that runs a function on a fixed interval.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	count := 0
	w := workers.NewWorker("ticker", func(ctx workers.WorkerContext) error {
		count++
		fmt.Printf("tick %d\n", count)
		return nil
	}).Every(20 * time.Millisecond)

	ctx, cancel := context.WithTimeout(context.Background(), 55*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w})
}
Output
tick 1
tick 2

func (*Worker) WithBackoffJitter
func (w *Worker) WithBackoffJitter(jitter suture.Jitter) *Worker

WithBackoffJitter adds random jitter to the backoff duration to prevent thundering herd on coordinated restarts.

func (*Worker) WithFailureBackoff
func (w *Worker) WithFailureBackoff(d time.Duration) *Worker

WithFailureBackoff sets the duration to wait between restarts. Suture default is 15 seconds.

func (*Worker) WithFailureDecay
func (w *Worker) WithFailureDecay(decay float64) *Worker

WithFailureDecay sets the rate at which failure count decays over time. A value of 1.0 means failures decay by one per second. Suture default is 1.0.

func (*Worker) WithFailureThreshold
func (w *Worker) WithFailureThreshold(threshold float64) *Worker

WithFailureThreshold sets the number of failures allowed before the supervisor gives up restarting. Suture default is 5.

func (*Worker) WithRestart
func (w *Worker) WithRestart(restart bool) *Worker

WithRestart configures whether the worker should be restarted on failure. When true, the supervisor restarts the worker with backoff on non-context errors.

Example

A worker with automatic restart on failure. The supervisor logs restart events; the worker succeeds on the third attempt.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	attempt := 0
	w := workers.NewWorker("resilient", func(ctx workers.WorkerContext) error {
		attempt++
		if attempt <= 2 {
			return fmt.Errorf("transient error")
		}
		fmt.Printf("succeeded on attempt %d\n", attempt)
		<-ctx.Done()
		return ctx.Err()
	}).WithRestart(true)

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w})
	// This example demonstrates restart behavior. Log output from the
	// supervisor is expected between restarts. The worker prints on success.
}

func (*Worker) WithTimeout
func (w *Worker) WithTimeout(d time.Duration) *Worker

WithTimeout sets the maximum time to wait for the worker to stop during graceful shutdown. Suture default is 10 seconds.

type WorkerContext

WorkerContext extends context.Context with worker metadata and dynamic child worker management. The framework creates these — users never need to implement this interface.

type WorkerContext interface {
    context.Context
    // Name returns the worker's name.
    Name() string
    // Attempt returns the restart attempt number (0 on first run).
    Attempt() int
    // Add adds or replaces a child worker by name under the same supervisor.
    // If a worker with the same name already exists, it is removed first.
    // Children get full framework guarantees (tracing, panic recovery, restart).
    Add(w *Worker)
    // Remove stops a child worker by name.
    Remove(name string)
    // Children returns the names of currently running child workers.
    Children() []string
}
Example (!dd)

A manager worker that dynamically spawns and removes child workers using WorkerContext.Add, Remove, and Children.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	manager := workers.NewWorker("manager", func(ctx workers.WorkerContext) error {
		// Spawn two child workers dynamically.
		ctx.Add(workers.NewWorker("child-a", func(ctx workers.WorkerContext) error {
			fmt.Printf("%s started\n", ctx.Name())
			<-ctx.Done()
			return ctx.Err()
		}))
		ctx.Add(workers.NewWorker("child-b", func(ctx workers.WorkerContext) error {
			fmt.Printf("%s started\n", ctx.Name())
			<-ctx.Done()
			return ctx.Err()
		}))

		// Give children time to start.
		time.Sleep(30 * time.Millisecond)
		fmt.Printf("children: %v\n", ctx.Children())

		// Remove one child.
		ctx.Remove("child-a")
		time.Sleep(30 * time.Millisecond)
		fmt.Printf("after remove: %v\n", ctx.Children())

		<-ctx.Done()
		return ctx.Err()
	})

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{manager})
}
Output
child-a started
child-b started
children: [child-a child-b]
after remove: [child-b]

Example (!dd_replace)

Replace a child worker by adding one with the same name. The old worker is stopped and the new one takes its place.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	manager := workers.NewWorker("manager", func(ctx workers.WorkerContext) error {
		ctx.Add(workers.NewWorker("processor", func(ctx workers.WorkerContext) error {
			fmt.Println("processor v1")
			<-ctx.Done()
			return ctx.Err()
		}))
		time.Sleep(30 * time.Millisecond)

		// Replace with a new version — old one is stopped automatically.
		ctx.Add(workers.NewWorker("processor", func(ctx workers.WorkerContext) error {
			fmt.Println("processor v2")
			<-ctx.Done()
			return ctx.Err()
		}))
		time.Sleep(30 * time.Millisecond)

		<-ctx.Done()
		return ctx.Err()
	})

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{manager})
}
Output
processor v1
processor v2

Generated by gomarkdoc

Documentation

Overview

Package workers provides a worker lifecycle library for Go, built on thejerf/suture. It manages background goroutines with automatic panic recovery, configurable restart with backoff, tracing, and structured shutdown.

Architecture

Every worker runs inside its own supervisor subtree. This means:

  • Each worker gets panic recovery and restart independently
  • Workers can dynamically spawn child workers via [WorkerContext.Add]
  • When a parent worker stops, all its children stop (scoped lifecycle)
  • The supervisor tree prevents cascading failures and CPU-burn restart storms

Quick Start

Create workers with NewWorker and run them with Run:

workers.Run(ctx, []*workers.Worker{
    workers.NewWorker("kafka", consume),
    workers.NewWorker("cleanup", cleanup).Every(5 * time.Minute).WithRestart(true),
})

Helpers

Common patterns are provided as helpers:

Dynamic Workers

Manager workers can spawn and remove child workers at runtime using [WorkerContext.Add], [WorkerContext.Remove], and [WorkerContext.Children]. Children join the parent's supervisor subtree and get full framework guarantees (tracing, panic recovery, restart). See [Example_dynamicWorkerPool].

Example (DynamicWorkerPool)

Simulates a config-driven worker pool manager that reconciles desired workers against running workers on each tick. This demonstrates the pattern used by services like route-store where worker configs are loaded from a database periodically.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	// Simulate config that changes over 3 ticks.
	// Tick 1: start worker-a
	// Tick 2: add worker-b
	// Tick 3: remove worker-a
	configs := [][]string{
		{"worker-a"},
		{"worker-a", "worker-b"},
		{"worker-b"},
	}

	tick := 0
	manager := workers.NewWorker("pool-manager", func(ctx workers.WorkerContext) error {
		ticker := time.NewTicker(40 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-ticker.C:
				if tick >= len(configs) {
					continue
				}
				desired := map[string]bool{}
				for _, name := range configs[tick] {
					desired[name] = true
				}
				tick++

				// Remove workers no longer desired.
				for _, name := range ctx.Children() {
					if !desired[name] {
						ctx.Remove(name)
					}
				}
				// Add new workers (Add is a no-op replacement if already running).
				for name := range desired {
					name := name
					ctx.Add(workers.NewWorker(name, func(ctx workers.WorkerContext) error {
						<-ctx.Done()
						return ctx.Err()
					}))
				}
				time.Sleep(10 * time.Millisecond) // let children start
				fmt.Printf("tick %d: children=%v\n", tick, ctx.Children())
			}
		}
	})

	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{manager})
	fmt.Println("pool shut down")
}
Output:
tick 1: children=[worker-a]
tick 2: children=[worker-a worker-b]
tick 3: children=[worker-b]
pool shut down
Example (Standalone)

Standalone usage with signal handling — no ColdBrew required.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	// In production you'd use signal.NotifyContext(ctx, os.Interrupt).
	// For the example, use a short timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{
		workers.NewWorker("kafka", func(ctx workers.WorkerContext) error {
			fmt.Println("consuming messages")
			<-ctx.Done()
			return ctx.Err()
		}),
	})
	fmt.Println("shutdown complete")
}
Output:
consuming messages
shutdown complete

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func BatchChannelWorker

func BatchChannelWorker[T any](ch <-chan T, maxSize int, maxDelay time.Duration, fn func(WorkerContext, []T) error) func(WorkerContext) error

BatchChannelWorker collects items from ch into batches and calls fn when either the batch reaches maxSize or maxDelay elapses since the first item in the current batch — whichever comes first. Flushes any partial batch on context cancellation or channel close before returning.

Example

BatchChannelWorker collects items into batches and flushes on maxSize or maxDelay — whichever comes first.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	ch := make(chan int, 10)
	for i := 1; i <= 6; i++ {
		ch <- i
	}
	close(ch)

	fn := workers.BatchChannelWorker(ch, 3, time.Hour, func(ctx workers.WorkerContext, batch []int) error {
		fmt.Println(batch)
		return nil
	})

	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	w := workers.NewWorker("batcher", fn)
	workers.Run(ctx, []*workers.Worker{w})
}
Output:
[1 2 3]
[4 5 6]

func ChannelWorker

func ChannelWorker[T any](ch <-chan T, fn func(WorkerContext, T) error) func(WorkerContext) error

ChannelWorker consumes items from ch one at a time, calling fn for each. Returns when ctx is cancelled or ch is closed.

Example

ChannelWorker consumes items from a channel one at a time.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	ch := make(chan string, 3)
	ch <- "hello"
	ch <- "world"
	ch <- "!"
	close(ch)

	fn := workers.ChannelWorker(ch, func(ctx workers.WorkerContext, item string) error {
		fmt.Println(item)
		return nil
	})

	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	w := workers.NewWorker("consumer", fn)
	workers.Run(ctx, []*workers.Worker{w})
}
Output:
hello
world
!

func EveryInterval

func EveryInterval(d time.Duration, fn func(WorkerContext) error) func(WorkerContext) error

EveryInterval wraps fn in a ticker loop that calls fn at the given interval. Returns when ctx is cancelled. If fn returns an error, EveryInterval returns that error (the supervisor decides whether to restart based on WithRestart).

Example

EveryInterval wraps a function in a ticker loop.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	count := 0
	fn := workers.EveryInterval(20*time.Millisecond, func(ctx workers.WorkerContext) error {
		count++
		fmt.Printf("tick %d\n", count)
		return nil
	})

	w := workers.NewWorker("periodic", fn)

	ctx, cancel := context.WithTimeout(context.Background(), 55*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w})
}
Output:
tick 1
tick 2

func Run

func Run(ctx context.Context, workers []*Worker) error

Run starts all workers under a suture supervisor and blocks until ctx is cancelled and all workers have exited. Each worker gets its own child supervisor — when a worker stops, its children stop too. A worker exiting early (without restart) does not stop other workers. Returns nil on clean shutdown.

Example

Run multiple workers concurrently. All workers start together and stop when the context is cancelled.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	w1 := workers.NewWorker("api-poller", func(ctx workers.WorkerContext) error {
		fmt.Println("api-poller started")
		<-ctx.Done()
		return ctx.Err()
	})
	w2 := workers.NewWorker("cache-warmer", func(ctx workers.WorkerContext) error {
		fmt.Println("cache-warmer started")
		<-ctx.Done()
		return ctx.Err()
	})

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w1, w2})
	fmt.Println("all workers stopped")
}
Output:
api-poller started
cache-warmer started
all workers stopped

func RunWorker

func RunWorker(ctx context.Context, w *Worker)

RunWorker runs a single worker with panic recovery and optional restart. Blocks until ctx is cancelled or the worker exits without RestartOnFail.

Example

RunWorker runs a single worker — useful for dynamic managers that spawn child workers in their own goroutines.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	w := workers.NewWorker("single", func(ctx workers.WorkerContext) error {
		fmt.Println("running")
		<-ctx.Done()
		return ctx.Err()
	})

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	workers.RunWorker(ctx, w)
	fmt.Println("done")
}
Output:
running
done

Types

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents a background goroutine managed by the framework. Create with NewWorker and configure with builder methods.

func NewWorker

func NewWorker(name string, run func(WorkerContext) error) *Worker

NewWorker creates a Worker with the given name and run function. The run function should block until ctx is cancelled or an error occurs.

Example

A simple worker that runs until cancelled.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	w := workers.NewWorker("greeter", func(ctx workers.WorkerContext) error {
		fmt.Printf("worker %q started (attempt %d)\n", ctx.Name(), ctx.Attempt())
		<-ctx.Done()
		return ctx.Err()
	})

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w})
}
Output:
worker "greeter" started (attempt 0)

func (*Worker) Every

func (w *Worker) Every(d time.Duration) *Worker

Every wraps the run function in a ticker loop that calls it at the given interval. The original run function is called once per tick. If it returns an error, the behavior depends on WithRestart: if true, the ticker worker restarts; if false, it exits.

Example

A periodic worker that runs a function on a fixed interval.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	count := 0
	w := workers.NewWorker("ticker", func(ctx workers.WorkerContext) error {
		count++
		fmt.Printf("tick %d\n", count)
		return nil
	}).Every(20 * time.Millisecond)

	ctx, cancel := context.WithTimeout(context.Background(), 55*time.Millisecond)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w})
}
Output:
tick 1
tick 2

func (*Worker) WithBackoffJitter

func (w *Worker) WithBackoffJitter(jitter suture.Jitter) *Worker

WithBackoffJitter adds random jitter to the backoff duration to prevent thundering herd on coordinated restarts.

func (*Worker) WithFailureBackoff

func (w *Worker) WithFailureBackoff(d time.Duration) *Worker

WithFailureBackoff sets the duration to wait between restarts. Suture default is 15 seconds.

func (*Worker) WithFailureDecay

func (w *Worker) WithFailureDecay(decay float64) *Worker

WithFailureDecay sets the rate at which failure count decays over time. A value of 1.0 means failures decay by one per second. Suture default is 1.0.

func (*Worker) WithFailureThreshold

func (w *Worker) WithFailureThreshold(threshold float64) *Worker

WithFailureThreshold sets the number of failures allowed before the supervisor gives up restarting. Suture default is 5.

func (*Worker) WithRestart

func (w *Worker) WithRestart(restart bool) *Worker

WithRestart configures whether the worker should be restarted on failure. When true, the supervisor restarts the worker with backoff on non-context errors.

Example

A worker with automatic restart on failure. The supervisor logs restart events; the worker succeeds on the third attempt.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-coldbrew/workers"
)

func main() {
	attempt := 0
	w := workers.NewWorker("resilient", func(ctx workers.WorkerContext) error {
		attempt++
		if attempt <= 2 {
			return fmt.Errorf("transient error")
		}
		fmt.Printf("succeeded on attempt %d\n", attempt)
		<-ctx.Done()
		return ctx.Err()
	}).WithRestart(true)

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	workers.Run(ctx, []*workers.Worker{w})
	// This example demonstrates restart behavior. Log output from the
	// supervisor is expected between restarts. The worker prints on success.
}

func (*Worker) WithTimeout

func (w *Worker) WithTimeout(d time.Duration) *Worker

WithTimeout sets the maximum time to wait for the worker to stop during graceful shutdown. Suture default is 10 seconds.

type WorkerContext

type WorkerContext interface {
	context.Context
	// Name returns the worker's name.
	Name() string
	// Attempt returns the restart attempt number (0 on first run).
	Attempt() int
	// Add adds or replaces a child worker by name under the same supervisor.
	// If a worker with the same name already exists, it is removed first.
	// Children get full framework guarantees (tracing, panic recovery, restart).
	Add(w *Worker)
	// Remove stops a child worker by name.
	Remove(name string)
	// Children returns the names of currently running child workers.
	Children() []string
}

WorkerContext extends context.Context with worker metadata and dynamic child worker management. The framework creates these — users never need to implement this interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL