workerpool

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2026 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package workerpool provides a service for running small parts of code (called jobs) in the background.

Jobs can have contexts, timeouts, and retry strategies.

Example
package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/metalagman/appkit/workerpool"
)

// main runs a single job on a pool built with default options and blocks
// until that job has printed its greeting.
func main() {
	ctx := context.Background()

	var done sync.WaitGroup

	done.Add(1)

	greet := func(_ context.Context) error {
		// Released when the job returns, i.e. after the greeting is printed.
		defer done.Done()

		fmt.Println("hello")

		return nil
	}

	p, _ := workerpool.New()
	p.Start(ctx)
	p.Submit(ctx, greet)

	done.Wait()
	p.Stop(ctx)
}
Output:
hello
Example (AdvancedUsage)
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/Rican7/retry/strategy"
	"github.com/metalagman/appkit/workerpool"
)

// main submits one job decorated with a timeout and a retry strategy and
// waits for it to report that it ran.
func main() {
	ctx := context.Background()

	var done sync.WaitGroup

	done.Add(1)

	// The job is wrapped inside-out: first a 3 second timeout for a job
	// execution, then a retry strategy limited to 5 attempts.
	job := workerpool.AddRetry(
		workerpool.AddTimeout(func(_ context.Context) error {
			defer done.Done()

			fmt.Println("job executed")

			return nil
		}, 3*time.Second),
		strategy.Limit(5),
	)

	p, _ := workerpool.New()
	p.Start(ctx)
	p.Submit(ctx, job)

	done.Wait()
	p.Stop(ctx)
}
Output:
job executed

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Job

type Job func(ctx context.Context) error

Job is a function that receives a context and is run asynchronously by the worker pool.

func AddLogger

func AddLogger(job Job, l logger.Logger) Job

AddLogger replaces the logger in the job context with the specified one.

Example
package main

import (
	"context"
	"os"
	"sync"
	"time"

	"github.com/metalagman/appkit/logger"
	"github.com/metalagman/appkit/workerpool"
	"github.com/rs/zerolog"
)

// main shows how AddLogger gives each job its own logger: two jobs log a
// warning through the logger found in their context, and every line carries
// the "logger" field of the logger attached to that job.
func main() {
	// Pin zerolog's clock so the timestamps in the output are reproducible.
	zerolog.TimestampFunc = func() time.Time {
		return time.Date(2021, time.January, 1, 0, 0, 0, 0, time.UTC)
	}

	zl := zerolog.New(os.Stdout).Level(zerolog.WarnLevel).With().Timestamp().Logger()
	l := logger.NewZerolog(zl)
	l1 := l.With("logger", "logger1")
	l2 := l.With("logger", "logger2")

	pool, _ := workerpool.New(workerpool.WithLogger(l))

	pool.Start(context.Background())

	var wg sync.WaitGroup

	job1 := func(ctx context.Context) error {
		logger.FromContext(ctx).Warnf("hello from job1")

		wg.Done()

		return nil
	}
	job2 := func(ctx context.Context) error {
		logger.FromContext(ctx).Warnf("hello from job2")

		wg.Done()

		return nil
	}
	// attaching a dedicated logger to each job
	job1 = workerpool.AddLogger(job1, l1)
	job2 = workerpool.AddLogger(job2, l2)

	// Run the jobs one after another: the expected output lists job1 before
	// job2, and jobs submitted together could be picked up by different
	// workers and finish in either order.
	for _, job := range []workerpool.Job{job1, job2} {
		wg.Add(1)
		pool.Submit(context.Background(), job)
		wg.Wait()
	}

	pool.Stop(context.Background())
}
Output:
{"level":"warn","logger":"logger1","time":"2021-01-01T00:00:00Z","message":"hello from job1"}
{"level":"warn","logger":"logger2","time":"2021-01-01T00:00:00Z","message":"hello from job2"}

func AddPanicRecovery

func AddPanicRecovery(job Job) Job

AddPanicRecovery wraps a job with panic recovery. If a panic occurs, it converts the panic into an error.

Example
package main

import (
	"context"
	"sync"

	"github.com/metalagman/appkit/logger"
	"github.com/metalagman/appkit/workerpool"
)

// main demonstrates AddPanicRecovery: the job panics, the wrapper turns the
// panic into an error, and the program carries on to stop the pool normally.
func main() {
	pool, _ := workerpool.New(workerpool.WithLogger(logger.NewNop())) // Nop logger to avoid output noise
	pool.Start(context.Background())

	var wg sync.WaitGroup

	wg.Add(1)

	job := func(ctx context.Context) error {
		// Deferred so the wait group is released while the panic unwinds.
		defer wg.Done()

		panic("oops")
	}
	// wrapping the job so that its panic is converted into an error
	job = workerpool.AddPanicRecovery(job)

	pool.Submit(context.Background(), job)
	wg.Wait()
	// Panic is recovered by middleware and logged by the pool worker.

	pool.Stop(context.Background())
}

func AddPostRun

func AddPostRun(job Job, hook func(err error)) Job

AddPostRun wraps a job with a hook that is executed after the job completes or fails. The err parameter in the hook will contain the error returned by the job.

Example
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"github.com/metalagman/appkit/logger"
	"github.com/metalagman/appkit/workerpool"
)

// main submits a job that always fails and uses a post-run hook to print the
// error the job returned.
func main() {
	ctx := context.Background()

	var hookRan sync.WaitGroup

	hookRan.Add(1)

	failing := func(_ context.Context) error {
		return errors.New("unrecoverable error")
	}

	// report prints a non-nil job error and then releases the waiting main
	// goroutine; a nil error is ignored.
	report := func(err error) {
		if err == nil {
			return
		}

		fmt.Println(err.Error())

		hookRan.Done()
	}

	p, _ := workerpool.New(workerpool.WithLogger(logger.NewNop()))
	p.Start(ctx)
	p.Submit(ctx, workerpool.AddPostRun(failing, report))

	hookRan.Wait()
	p.Stop(ctx)
}
Output:
unrecoverable error

func AddRetry

func AddRetry(job Job, strategies ...strategy.Strategy) Job

AddRetry wraps a job with retry strategies.

See https://github.com/Rican7/retry for details.

Example
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"github.com/Rican7/retry/strategy"
	"github.com/metalagman/appkit/logger"
	"github.com/metalagman/appkit/workerpool"
)

// main submits a job that fails on its first four runs and succeeds on the
// fifth, wrapped with a retry strategy limited to 5 attempts, then prints how
// many times the job ran.
func main() {
	ctx := context.Background()

	var succeeded sync.WaitGroup

	succeeded.Add(1)

	attempts := 0

	flaky := func(_ context.Context) error {
		attempts++

		if attempts >= 5 {
			succeeded.Done()

			return nil
		}

		return errors.New("temporary error")
	}

	p, _ := workerpool.New(workerpool.WithLogger(logger.NewNop()))
	p.Start(ctx)
	p.Submit(ctx, workerpool.AddRetry(flaky, strategy.Limit(5)))

	succeeded.Wait()
	p.Stop(ctx)

	fmt.Println(attempts)
}
Output:
5

func AddTimeout

func AddTimeout(job Job, timeout time.Duration) Job

AddTimeout wraps a job with a timeout. The timeout will be set on the job context.

Example
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/metalagman/appkit/logger"
	"github.com/metalagman/appkit/workerpool"
)

// main wraps a 200ms job with a 10ms timeout and prints the error that the
// post-run hook receives.
func main() {
	ctx := context.Background()

	var reported sync.WaitGroup

	reported.Add(1)

	// slow finishes after 200ms unless its context is done first, in which
	// case it returns the context's error.
	slow := func(jobCtx context.Context) error {
		finished := time.After(200 * time.Millisecond)

		select {
		case <-finished:
			return nil
		case <-jobCtx.Done():
			return jobCtx.Err()
		}
	}

	// printErr outputs a non-nil job error and releases the waiting main
	// goroutine.
	printErr := func(err error) {
		if err == nil {
			return
		}

		fmt.Println(err.Error())

		reported.Done()
	}

	// timeout is applied first, then the post-run hook that outputs the
	// timeout error
	job := workerpool.AddPostRun(workerpool.AddTimeout(slow, 10*time.Millisecond), printErr)

	p, _ := workerpool.New(workerpool.WithLogger(logger.NewNop()))
	p.Start(ctx)
	p.Submit(ctx, job)

	reported.Wait()
	p.Stop(ctx)
}
Output:
context deadline exceeded

type Option

type Option func(o *Options)

func WithLogger

func WithLogger(opt logger.Logger) Option

WithLogger sets the logger used by the pool and its workers.

func WithNumWorkers

func WithNumWorkers(opt int) Option

WithNumWorkers sets the number of worker goroutines to start.

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options contains the configuration for the worker pool.

func NewOptions

func NewOptions(
	options ...Option,
) Options

func (*Options) Validate

func (o *Options) Validate() error

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a worker pool that manages a set of worker goroutines to execute jobs.

func New

func New(options ...Option) (*Pool, error)

New creates a new Pool with the provided options.

func (*Pool) Start

func (s *Pool) Start(_ context.Context) error

Start initializes the worker goroutines.

func (*Pool) Stop

func (s *Pool) Stop(ctx context.Context) error

Stop shuts down all workers and waits for them to complete their current jobs. It respects the provided context's deadline for the waiting phase.

func (*Pool) Submit

func (s *Pool) Submit(ctx context.Context, job Job) bool

Submit sends a job to the pool for execution with the given context. It returns false if the pool is not running.

Example
package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/metalagman/appkit/logger"
	"github.com/metalagman/appkit/workerpool"
)

// main shows that the context passed to Submit is the one the job receives:
// a value stored in the submitted context is read back and printed by the job.
func main() {
	// ctxKey is a dedicated key type for the context value. A defined type
	// cannot collide with keys used by other code, unlike an anonymous
	// struct{}{} or a built-in type.
	type ctxKey struct{}

	p, _ := workerpool.New(workerpool.WithLogger(logger.NewNop()))

	p.Start(context.Background())

	var wg sync.WaitGroup

	wg.Add(1)

	job := func(ctx context.Context) error {
		if v, ok := ctx.Value(ctxKey{}).(string); ok {
			fmt.Println(v)
		}

		wg.Done()

		return nil
	}

	// submitting with custom context directly
	ctx := context.WithValue(context.Background(), ctxKey{}, "from context")
	p.Submit(ctx, job)

	wg.Wait()
	p.Stop(context.Background())
}
Output:
from context

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL