workers

v1.0.2
Published: Feb 13, 2026 License: MIT Imports: 9 Imported by: 0

README

Workers

A Go library for managing background workers with periodic execution, retry logic, and graceful shutdown.

Features

  • Periodic execution - Run jobs at configurable intervals or specific times
  • Scheduling - Daily, weekly, or every N days at specific times
  • Retry logic - Automatic retries with configurable attempts and delay
  • Backoff strategies - Constant, Linear, and Exponential backoff
  • Timeout support - Set max execution time per job
  • Execution limit - Limit total number of job runs
  • Graceful shutdown - Handles SIGINT/SIGTERM signals properly
  • Structured logging - Built-in slog integration
  • Panic recovery - Workers continue even if one panics

Installation

go get github.com/assaidy/workers

Quick Start

package main

import (
    "context"
    "log/slog"
    "os"
    "time"
    
    "github.com/assaidy/workers"
)

func main() {
    logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
    
    wm := workers.NewWorkerManager(
        workers.WithLogger(logger),
    )
    
    wm.RegisterWorker(workers.NewWorker("cleanup", cleanupJob,
        workers.WithTick(30*time.Minute),
        workers.WithTimeout(5*time.Minute),
        workers.WithNRetries(3),
        workers.WithRetryDelay(5*time.Second),
    ))
    
    wm.Start()
}

func cleanupJob(ctx context.Context, log *slog.Logger) error {
    log.Info("running cleanup job")
    return nil
}

Use Cases

1. Basic Periodic Worker

Run a job at fixed time intervals.

wm.RegisterWorker(workers.NewWorker("metrics", metricsJob,
    workers.WithTick(1*time.Minute),     // Run every minute
    workers.WithTimeout(30*time.Second), // Max execution time
))

When to use: Data collection, health checks, cache warming.

2. Daily Scheduled Tasks

Run jobs at specific times of the day.

wm.RegisterWorker(workers.NewWorker("report", reportJob,
    workers.WithSchedules(
        workers.DailyAt(9, 0),   // Daily at 9:00 AM
        workers.DailyAt(17, 0),  // Daily at 5:00 PM
    ),
    workers.WithTimeout(10*time.Minute),
))

When to use: Daily reports, backups, maintenance windows.

3. Weekly Scheduled Tasks

Run jobs on specific days of the week.

wm.RegisterWorker(workers.NewWorker("weekly-sync", weeklySyncJob,
    workers.WithSchedules(
        workers.WeeklyAt(time.Monday, 8, 0),    // Every Monday at 8:00 AM
        workers.WeeklyAt(time.Friday, 16, 30),  // Every Friday at 4:30 PM
    ),
    workers.WithTimeout(2*time.Hour),
))

When to use: Weekly summaries, end-of-week processing, weekend maintenance.

4. Every N Days

Run jobs on custom intervals.

wm.RegisterWorker(workers.NewWorker("bi-daily", biDailyJob,
    workers.WithSchedules(
        workers.EveryNDays(2, 2, 0),   // Every 2 days at 2:00 AM
    ),
    workers.WithTimeout(1*time.Hour),
))

When to use: Every-other-day processing, custom business cycles.

5. Retry with Backoff Strategies

Handle failures gracefully with automatic retries.

// Constant backoff: 5s, 5s, 5s, 5s...
wm.RegisterWorker(workers.NewWorker("api-sync", apiSyncJob,
    workers.WithTick(5*time.Minute),
    workers.WithNRetries(5),
    workers.WithRetryDelay(5*time.Second),
    workers.WithBackoffStrategy(workers.ConstantBackoff),
))

// Linear backoff: 5s, 10s, 15s, 20s...
wm.RegisterWorker(workers.NewWorker("slow-api", slowApiJob,
    workers.WithTick(10*time.Minute),
    workers.WithNRetries(4),
    workers.WithRetryDelay(5*time.Second),
    workers.WithBackoffStrategy(workers.LinearBackoff),
))

// Exponential backoff (delay * 2^attempt, attempt starts at 1): 10s, 20s, 40s...
wm.RegisterWorker(workers.NewWorker("unstable-api", unstableApiJob,
    workers.WithTick(15*time.Minute),
    workers.WithNRetries(3),
    workers.WithRetryDelay(5*time.Second),
    workers.WithBackoffStrategy(workers.ExponentialBackoff),
))

When to use: API integrations, external service calls, flaky network operations.

6. Limited Execution Count

Run a job a fixed number of times then stop.

wm.RegisterWorker(workers.NewWorker("migration", migrationJob,
    workers.WithTick(1*time.Minute),
    workers.WithNRuns(10),  // Run exactly 10 times
    workers.WithTimeout(2*time.Minute),
))

When to use: Data migrations, one-time setup tasks, batch processing with known size.

7. Run Once Without Timeout

Execute a job once without any time limits.

wm.RegisterWorker(workers.NewWorker("long-task", longRunningJob,
    workers.WithNRuns(1),  // Run once and stop
))

When to use: Long-running initialization, unbounded processing tasks.

8. Custom Logger Per Worker

Override the default logger for specific workers.

Note: The *slog.Logger passed as the second parameter to your job function is the worker's logger (inherited from WorkerManager unless overridden with WithItsOwnLogger).

customLogger := slog.New(slog.NewJSONHandler(os.Stdout, nil))

wm.RegisterWorker(workers.NewWorker("json-logger", jobFunc,
    workers.WithTick(5*time.Minute),
    workers.WithItsOwnLogger(customLogger),
))

When to use: Different log formats per worker, separate log destinations, custom log levels.

9. Timezone Support

Schedule jobs in specific timezones.

est := time.FixedZone("EST", -5*60*60)
utc := time.UTC

wm.RegisterWorker(workers.NewWorker("ny-report", reportJob,
    workers.WithSchedules(
        workers.DailyAt(9, 0).In(est),  // 9:00 AM EST
    ),
))

wm.RegisterWorker(workers.NewWorker("utc-cleanup", cleanupJob,
    workers.WithSchedules(
        workers.DailyAt(0, 0).In(utc),  // Midnight UTC
    ),
))

When to use: Multi-region applications, coordinating with teams in different timezones.

Configuration Options

WorkerManager Options

| Option | Description | Default |
|---|---|---|
| WithLogger(logger) | Set logger for all workers | slog.Default() |

Worker Options

| Option | Description | Default |
|---|---|---|
| WithTick(duration) | Interval between executions | 1 hour |
| WithTimeout(duration) | Max execution time per job | No timeout |
| WithNRetries(n) | Number of retry attempts | 3 |
| WithRetryDelay(duration) | Delay between retries | 5 seconds |
| WithBackoffStrategy(strategy) | Retry backoff pattern | ConstantBackoff |
| WithSchedules(schedules...) | Specific run times | Tick-based |
| WithNRuns(n) | Limit total executions | Unlimited |
| WithItsOwnLogger(logger) | Worker-specific logger | Manager's logger |

Schedule Functions

| Function | Description |
|---|---|
| DailyAt(hour, minute) | Run daily at specified time |
| WeeklyAt(weekday, hour, minute) | Run weekly on specified day |
| EveryNDays(n, hour, minute) | Run every N days |
| schedule.In(location) | Set timezone for schedule |

Backoff Strategies

| Strategy | Pattern | Example (5s base, attempts 1-4) |
|---|---|---|
| ConstantBackoff | Fixed delay | 5s, 5s, 5s, 5s |
| LinearBackoff | Linear increase (delay * attempt) | 5s, 10s, 15s, 20s |
| ExponentialBackoff | Exponential increase (delay * 2^attempt) | 10s, 20s, 40s, 80s |

Custom backoff strategies: You can define your own backoff strategy by implementing the BackoffStrategy type:

type BackoffStrategy func(baseDelay time.Duration, attempt int) time.Duration

// Example: Custom backoff that increases by 3x each attempt
myBackoff := func(baseDelay time.Duration, attempt int) time.Duration {
    return baseDelay * time.Duration(math.Pow(3, float64(attempt)))
}
// With a 5s base and attempt starting at 1: 15s, 45s, 135s, 405s...

wm.RegisterWorker(workers.NewWorker("custom", jobFunc,
    workers.WithBackoffStrategy(myBackoff),
))

Graceful Shutdown

The library handles SIGINT (Ctrl+C) and SIGTERM signals gracefully:

  1. Workers stop starting new job runs
  2. Jobs that are already running complete their current execution
  3. Start returns and the application exits cleanly

Press Ctrl+C twice to force immediate shutdown.

Complete Example

package main

import (
    "context"
    "log/slog"
    "os"
    "time"
    
    "github.com/assaidy/workers"
)

func main() {
    logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
    
    wm := workers.NewWorkerManager(
        workers.WithLogger(logger),
    )
    
    // 1. Periodic worker with retries
    wm.RegisterWorker(workers.NewWorker("cleanup", cleanupJob,
        workers.WithTick(30*time.Minute),
        workers.WithTimeout(5*time.Minute),
        workers.WithNRetries(3),
        workers.WithRetryDelay(5*time.Second),
    ))
    
    // 2. Daily scheduled reports
    wm.RegisterWorker(workers.NewWorker("report", reportJob,
        workers.WithSchedules(
            workers.DailyAt(9, 0),
            workers.WeeklyAt(time.Monday, 14, 30),
        ),
        workers.WithTimeout(10*time.Minute),
    ))
    
    // 3. Limited run migration
    wm.RegisterWorker(workers.NewWorker("migrate", migrateJob,
        workers.WithTick(5*time.Minute),
        workers.WithNRuns(10),
        workers.WithTimeout(2*time.Minute),
    ))
    
    // 4. One-time long task
    wm.RegisterWorker(workers.NewWorker("long-task", longRunningJob,
        workers.WithNRuns(1),
    ))
    
    wm.Start()
}

func cleanupJob(ctx context.Context, log *slog.Logger) error {
    log.Info("running cleanup job")
    return nil
}

func reportJob(ctx context.Context, log *slog.Logger) error {
    log.Info("generating report")
    return nil
}

func migrateJob(ctx context.Context, log *slog.Logger) error {
    log.Info("running migration")
    return nil
}

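func longRunningJob(ctx context.Context, log *slog.Logger) error {
    log.Info("starting long task")
    // Placeholder for real work: wait for cancellation so shutdown can
    // complete. A bare select {} would block forever and ignore ctx.
    <-ctx.Done()
    return nil
}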

Documentation

Overview

Package workers provides background job management with periodic execution, scheduling, retry logic, and graceful shutdown.

Workers is designed for running background tasks like data processing, cleanup jobs, API synchronization, and scheduled reports.

Basic Usage

Create a WorkerManager, register workers with their jobs, and start:

wm := workers.NewWorkerManager()
wm.RegisterWorker(workers.NewWorker("cleanup", cleanupJob))
wm.Start()

Jobs are functions that receive a context and logger:

func cleanupJob(ctx context.Context, log *slog.Logger) error {
    log.Info("running cleanup")
    return nil
}

Configuration

Configure workers using functional options:

workers.NewWorker("sync", syncJob,
    workers.WithTick(5*time.Minute),
    workers.WithTimeout(30*time.Second),
    workers.WithNRetries(3),
)

Scheduling

Run jobs at specific times instead of intervals:

workers.WithSchedules(
    workers.DailyAt(9, 0),
    workers.WeeklyAt(time.Monday, 14, 30),
)

Retry and Backoff

Automatic retries with configurable backoff strategies:

workers.WithNRetries(5),
workers.WithRetryDelay(10*time.Second),
workers.WithBackoffStrategy(workers.ExponentialBackoff)

Available strategies: ConstantBackoff, LinearBackoff, ExponentialBackoff. Custom strategies can be defined as: func(time.Duration, int) time.Duration

Graceful Shutdown

The library handles SIGINT and SIGTERM signals, allowing running jobs to complete before shutdown.

Types

type BackoffStrategy

type BackoffStrategy func(baseDelay time.Duration, attempt int) time.Duration

BackoffStrategy defines how long to wait between retry attempts. The baseDelay is the initial delay configured, and attempt is 1-based (first retry is attempt 1). Return the calculated delay duration.

var (
	// ConstantBackoff returns a fixed delay regardless of attempt number.
	//
	// Example: 5s, 5s, 5s, 5s...
	ConstantBackoff BackoffStrategy = func(baseDelay time.Duration, attempt int) time.Duration {
		return baseDelay
	}

	// LinearBackoff increases delay linearly with each attempt.
	// Formula: delay * attempt
	//
	// Example: 5s, 10s, 15s, 20s...
	LinearBackoff BackoffStrategy = func(baseDelay time.Duration, attempt int) time.Duration {
		return baseDelay * time.Duration(attempt)
	}

	// ExponentialBackoff doubles the delay with each attempt.
	// Formula: delay * 2^attempt
	//
	// Example (5s base, attempt starting at 1): 10s, 20s, 40s, 80s...
	ExponentialBackoff BackoffStrategy = func(baseDelay time.Duration, attempt int) time.Duration {
		return baseDelay * time.Duration(math.Pow(2, float64(attempt)))
	}
)

Predefined backoff strategies provide common retry delay patterns. Each is a value of type BackoffStrategy, so any of them can be passed to WithBackoffStrategy.

type Schedule

type Schedule struct {
	// contains filtered or unexported fields
}

Schedule defines when a worker should execute. Use DailyAt, WeeklyAt, or EveryNDays to create schedules. Modify with the In method to set timezones.

func DailyAt

func DailyAt(hour, minute int) Schedule

DailyAt creates a schedule that runs daily at the specified hour and minute. Hour must be between 0-23 and minute must be between 0-59.

Example:

workers.DailyAt(9, 0)    // Daily at 9:00 AM
workers.DailyAt(14, 30)  // Daily at 2:30 PM

func EveryNDays

func EveryNDays(n, hour, minute int) Schedule

EveryNDays creates a schedule that runs every N days at the specified time. The anchor date is set to today at midnight, so the pattern starts from today. For example, if created on Monday with n=2, it will run on Mon, Wed, Fri, Sun, etc. Hour must be between 0-23 and minute must be between 0-59.

Example:

workers.EveryNDays(2, 9, 0)   // Every 2 days at 9:00 AM
workers.EveryNDays(7, 14, 0)  // Every 7 days at 2:00 PM; same as WeeklyAt(time.Now().Weekday(), 14, 0)

func WeeklyAt

func WeeklyAt(weekday time.Weekday, hour, minute int) Schedule

WeeklyAt creates a schedule that runs weekly on a specific day at the specified time. Weekday specifies which day of the week (time.Sunday through time.Saturday). Hour must be between 0-23 and minute must be between 0-59.

Example:

workers.WeeklyAt(time.Monday, 9, 0)     // Every Monday at 9:00 AM
workers.WeeklyAt(time.Friday, 17, 30)   // Every Friday at 5:30 PM

func (Schedule) In

func (me Schedule) In(location *time.Location) Schedule

In sets the timezone for the schedule and returns the modified schedule. By default, schedules use the local timezone. Use this to specify a different timezone.

Example:

workers.DailyAt(9, 0).In(time.UTC)           // Daily at 9:00 AM UTC
workers.WeeklyAt(time.Monday, 9, 0).In(time.FixedZone("EST", -5*60*60))  // Monday at 9:00 AM EST

func (Schedule) String

func (me Schedule) String() string

String returns a human-readable description of the schedule. Examples: "daily at 09:00", "Monday at 14:30", "every 2 days at 02:00"

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker manages a background job with configurable execution intervals, retry logic, timeouts, and scheduling. Workers can run periodically or at specific times defined by schedules.

func NewWorker

func NewWorker(name string, job WorkerJob, options ...WorkerOption) Worker

NewWorker creates a worker with the specified name and job function. Apply options to configure tick intervals, timeouts, retries, schedules, and other behaviors. Panics if any option is invalid.

type WorkerJob

type WorkerJob func(context.Context, *slog.Logger) error

WorkerJob defines the function signature for job implementations. The function receives a context for cancellation and timeout handling, and a logger for structured logging within the job.

type WorkerManager

type WorkerManager struct {
	// contains filtered or unexported fields
}

WorkerManager coordinates multiple workers, managing their lifecycle from startup to graceful shutdown. It handles OS signals (SIGINT, SIGTERM) and recovers from worker panics to ensure system stability.
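
Panic recovery of this kind is usually built on defer and recover. The following is a generic sketch of the technique, not the manager's actual code; safeRun is a hypothetical helper.

```go
package main

import (
	"fmt"
)

// safeRun calls job and converts a panic into an error, so the goroutine
// that called it keeps running.
func safeRun(job func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("job panicked: %v", r)
		}
	}()
	return job()
}

func main() {
	fmt.Println(safeRun(func() error { panic("boom") })) // job panicked: boom
	fmt.Println(safeRun(func() error { return nil }))    // <nil>
}
```

The named return value is what lets the deferred function replace the result after a panic has unwound the job.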

func NewWorkerManager

func NewWorkerManager(options ...WorkerManagerOption) WorkerManager

NewWorkerManager creates a manager for coordinating workers. Configure with options before registering workers and calling Start.

func (*WorkerManager) RegisterWorker

func (me *WorkerManager) RegisterWorker(worker Worker)

RegisterWorker adds a worker to the manager. The worker inherits the manager's logger if it doesn't have its own. Workers must be registered before Start is called.

func (WorkerManager) Start

func (me WorkerManager) Start()

Start runs all registered workers and blocks until shutdown. Workers execute concurrently. The method blocks until SIGINT or SIGTERM is received, then waits for running jobs to complete before returning.

type WorkerManagerOption

type WorkerManagerOption func(*WorkerManager)

WorkerManagerOption configures a WorkerManager. Use WithLogger to create options.

func WithLogger

func WithLogger(logger *slog.Logger) WorkerManagerOption

WithLogger sets the default logger for the manager and all workers without their own logger.

Default: slog.Default()

type WorkerOption

type WorkerOption func(*Worker)

WorkerOption configures a Worker. Use the With* functions to create options.

func WithBackoffStrategy

func WithBackoffStrategy(strategy BackoffStrategy) WorkerOption

WithBackoffStrategy sets how the retry delay increases with each attempt. Use ConstantBackoff, LinearBackoff, ExponentialBackoff, or a custom function.

Default: ConstantBackoff

func WithItsOwnLogger

func WithItsOwnLogger(logger *slog.Logger) WorkerOption

WithItsOwnLogger sets a worker-specific logger, overriding the WorkerManager's logger. The logger must not be nil.

Default: inherits from WorkerManager

func WithNRetries

func WithNRetries(n int) WorkerOption

WithNRetries sets how many times to retry a failed job. Set to 0 to disable retries.

Default: 3 retries

func WithNRuns

func WithNRuns(n int) WorkerOption

WithNRuns limits the total number of times a worker executes. After reaching the limit, the worker stops automatically. Must be greater than 0.

Default: unlimited runs

func WithRetryDelay

func WithRetryDelay(delay time.Duration) WorkerOption

WithRetryDelay sets the initial delay between retry attempts. The actual delay may increase based on the backoff strategy. Must be greater than 0.

Default: 5 seconds

func WithSchedules

func WithSchedules(schedules ...Schedule) WorkerOption

WithSchedules sets specific times for the worker to run, replacing tick-based execution. When schedules are configured, the worker runs only at the specified times. Multiple schedules can be combined to run at different times.

Example:

workers.WithSchedules(
    workers.DailyAt(9, 0),
    workers.WeeklyAt(time.Friday, 14, 30),
    workers.EveryNDays(2, 2, 0),
)

func WithTick

func WithTick(tick time.Duration) WorkerOption

WithTick sets the interval between job executions. Ignored when schedules are set. The tick must be greater than 0.

Default: 1 hour

func WithTimeout

func WithTimeout(timeout time.Duration) WorkerOption

WithTimeout sets a maximum execution time for each job run. Jobs exceeding this duration are cancelled via context. Must be greater than 0.

Default: no timeout (jobs run indefinitely)
