Documentation
¶
Overview ¶
Package workerpool provides a service for running small pieces of code (called jobs) in the background.
Jobs can have contexts, timeouts, and retry strategies.
Example ¶
package main
import (
"context"
"fmt"
"sync"
"github.com/metalagman/appkit/workerpool"
)
// main runs a single job on a pool created with default options and
// waits for the job to finish before stopping the pool.
func main() {
	ctx := context.Background()

	p, _ := workerpool.New()
	p.Start(ctx)

	// done lets main block until the job has actually run.
	var done sync.WaitGroup
	done.Add(1)

	hello := func(ctx context.Context) error {
		defer done.Done()
		fmt.Println("hello")
		return nil
	}

	p.Submit(ctx, hello)
	done.Wait()
	p.Stop(ctx)
}
Output: hello
Example (AdvancedUsage) ¶
package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/Rican7/retry/strategy"
"github.com/metalagman/appkit/workerpool"
)
// main submits a job wrapped with a timeout and a retry strategy, then
// waits for it to run before stopping the pool.
func main() {
	ctx := context.Background()

	p, _ := workerpool.New()
	p.Start(ctx)

	// done lets main block until the job has actually run.
	var done sync.WaitGroup
	done.Add(1)

	task := func(ctx context.Context) error {
		defer done.Done()
		fmt.Println("job executed")
		return nil
	}

	// Add a 3-second timeout for a job execution.
	task = workerpool.AddTimeout(task, 3*time.Second)
	// Retry the job, limited to 5 attempts.
	task = workerpool.AddRetry(task, strategy.Limit(5))

	p.Submit(ctx, task)
	done.Wait()
	p.Stop(ctx)
}
Output: job executed
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Job ¶
Job is a function that receives a context and is run asynchronously by the worker pool.
func AddLogger ¶
AddLogger replaces the logger in the job context with the specified one.
Example ¶
package main
import (
"context"
"os"
"sync"
"time"
"github.com/metalagman/appkit/logger"
"github.com/metalagman/appkit/workerpool"
"github.com/rs/zerolog"
)
// main attaches a separate child logger to each of two jobs and lets
// every job log a warning through the logger found in its context.
func main() {
	// Freeze the timestamp so the example output is reproducible.
	fixed := time.Date(2021, time.January, 1, 0, 0, 0, 0, time.UTC)
	zerolog.TimestampFunc = func() time.Time { return fixed }

	base := logger.NewZerolog(
		zerolog.New(os.Stdout).Level(zerolog.WarnLevel).With().Timestamp().Logger(),
	)
	first := base.With("logger", "logger1")
	second := base.With("logger", "logger2")

	ctx := context.Background()
	p, _ := workerpool.New(workerpool.WithLogger(base))
	p.Start(ctx)

	// done lets main block until both jobs have run.
	var done sync.WaitGroup
	done.Add(2)

	warn1 := func(ctx context.Context) error {
		defer done.Done()
		logger.FromContext(ctx).Warnf("hello from job1")
		return nil
	}
	warn2 := func(ctx context.Context) error {
		defer done.Done()
		logger.FromContext(ctx).Warnf("hello from job2")
		return nil
	}

	// Attach a dedicated logger to each job.
	warn1 = workerpool.AddLogger(warn1, first)
	warn2 = workerpool.AddLogger(warn2, second)

	p.Submit(ctx, warn1)
	p.Submit(ctx, warn2)
	done.Wait()
	p.Stop(ctx)
}
Output:
{"level":"warn","logger":"logger1","time":"2021-01-01T00:00:00Z","message":"hello from job1"}
{"level":"warn","logger":"logger2","time":"2021-01-01T00:00:00Z","message":"hello from job2"}
func AddPanicRecovery ¶
AddPanicRecovery wraps a job with panic recovery. If a panic occurs, it converts the panic into an error.
Example ¶
package main
import (
"context"
"sync"
"github.com/metalagman/appkit/logger"
"github.com/metalagman/appkit/workerpool"
)
// main submits a job that panics and then stops the pool normally.
func main() {
	ctx := context.Background()

	// Nop logger to avoid output noise.
	p, _ := workerpool.New(workerpool.WithLogger(logger.NewNop()))
	p.Start(ctx)

	// done lets main block until the job has run.
	var done sync.WaitGroup
	done.Add(1)

	boom := func(ctx context.Context) error {
		// Deferred so the wait group is released even though the job panics.
		defer done.Done()
		panic("oops")
	}

	p.Submit(ctx, boom)
	done.Wait()

	// Panic is recovered by middleware and logged by the pool worker.
	p.Stop(ctx)
}
Output:
func AddPostRun ¶
AddPostRun wraps a job with a hook that is executed after the job completes or fails. The err parameter in the hook will contain the error returned by the job.
Example ¶
package main
import (
"context"
"errors"
"fmt"
"sync"
"github.com/metalagman/appkit/logger"
"github.com/metalagman/appkit/workerpool"
)
// main runs a job that always fails and prints the job's error from a
// post-run hook.
func main() {
	ctx := context.Background()

	p, _ := workerpool.New(workerpool.WithLogger(logger.NewNop()))
	p.Start(ctx)

	// done is released by the hook once it has seen an error.
	var done sync.WaitGroup
	done.Add(1)

	failing := func(ctx context.Context) error {
		return errors.New("unrecoverable error")
	}

	// report prints a non-nil job error and unblocks main.
	report := func(err error) {
		if err == nil {
			return
		}
		fmt.Println(err.Error())
		done.Done()
	}
	failing = workerpool.AddPostRun(failing, report)

	p.Submit(ctx, failing)
	done.Wait()
	p.Stop(ctx)
}
Output: unrecoverable error
func AddRetry ¶
AddRetry wraps a job with retry strategies.
See https://github.com/Rican7/retry for details.
Example ¶
package main
import (
"context"
"errors"
"fmt"
"sync"
"github.com/Rican7/retry/strategy"
"github.com/metalagman/appkit/logger"
"github.com/metalagman/appkit/workerpool"
)
// main runs a job that fails on its first four calls and succeeds on the
// fifth, wrapped with a retry limit of 5, then prints how many times the
// job was called.
func main() {
	ctx := context.Background()

	p, _ := workerpool.New(workerpool.WithLogger(logger.NewNop()))
	p.Start(ctx)

	// done is released only by the successful call.
	var done sync.WaitGroup
	done.Add(1)

	calls := 0
	flaky := func(ctx context.Context) error {
		calls++
		if calls >= 5 {
			done.Done()
			return nil
		}
		return errors.New("temporary error")
	}
	flaky = workerpool.AddRetry(flaky, strategy.Limit(5))

	p.Submit(ctx, flaky)
	done.Wait()
	p.Stop(ctx)

	fmt.Println(calls)
}
Output: 5
func AddTimeout ¶
AddTimeout wraps a job with a timeout. The timeout will be set on the job context.
Example ¶
package main
import (
"context"
"fmt"
"sync"
"time"
"github.com/metalagman/appkit/logger"
"github.com/metalagman/appkit/workerpool"
)
// main runs a 200ms job under a 10ms timeout and prints the resulting
// error from a post-run hook.
func main() {
	bg := context.Background()

	p, _ := workerpool.New(workerpool.WithLogger(logger.NewNop()))
	p.Start(bg)

	// done is released by the hook once it has seen an error.
	var done sync.WaitGroup
	done.Add(1)

	// slow finishes after 200ms unless its context is done first.
	slow := func(ctx context.Context) error {
		select {
		case <-time.After(200 * time.Millisecond):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	// Add a timeout to the job.
	slow = workerpool.AddTimeout(slow, 10*time.Millisecond)

	// Add a post-run hook to the job to output the timeout error.
	report := func(err error) {
		if err == nil {
			return
		}
		fmt.Println(err.Error())
		done.Done()
	}
	slow = workerpool.AddPostRun(slow, report)

	p.Submit(bg, slow)
	done.Wait()
	p.Stop(bg)
}
Output: context deadline exceeded
type Option ¶
type Option func(o *Options)
func WithLogger ¶
WithLogger sets the logger used by the pool and its workers.
func WithNumWorkers ¶
WithNumWorkers sets the number of worker goroutines to start.
type Options ¶
type Options struct {
// contains filtered or unexported fields
}
Options contains the configuration for the worker pool.
func NewOptions ¶
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool is a worker pool that manages a set of worker goroutines to execute jobs.
func (*Pool) Stop ¶
Stop shuts down all workers and waits for them to complete their current jobs. It respects the provided context's deadline for the waiting phase.
func (*Pool) Submit ¶
Submit sends a job to the pool for execution with the given context. It returns false if the pool is not running.
Example ¶
package main
import (
"context"
"fmt"
"sync"
"github.com/metalagman/appkit/logger"
"github.com/metalagman/appkit/workerpool"
)
// main submits a job together with a context that carries a value and
// has the job print that value.
func main() {
	// ctxKey is a dedicated key type for the context value. A value of the
	// anonymous type struct{} would compare equal to every other struct{}{}
	// key, so any other code using the same idiom could collide with it.
	type ctxKey struct{}

	p, _ := workerpool.New(workerpool.WithLogger(logger.NewNop()))
	p.Start(context.Background())

	// wg lets main block until the job has run.
	var wg sync.WaitGroup
	wg.Add(1)

	job := func(ctx context.Context) error {
		// Print the value only if it is present and is a string.
		if v, ok := ctx.Value(ctxKey{}).(string); ok {
			fmt.Println(v)
		}
		wg.Done()
		return nil
	}

	// Submit the job with a custom context directly.
	ctx := context.WithValue(context.Background(), ctxKey{}, "from context")
	p.Submit(ctx, job)
	wg.Wait()
	p.Stop(context.Background())
}
Output: from context