Documentation
¶
Overview ¶
Example ¶
Example demonstrates basic usage of the worker pool.
package main
import (
"context"
"fmt"
"time"
"github.com/grhili/cd-operator/pkg/workerpool"
)
func main() {
// Define work processing function.
workerFunc := func(ctx context.Context, work workerpool.Work) error {
item := work.(string)
fmt.Printf("Processing: %s\n", item)
return nil
}
// Create pool with 3 workers.
pool := workerpool.New(3, workerFunc)
ctx := context.Background()
// Start the pool.
if err := pool.Start(ctx); err != nil {
panic(err)
}
// Submit work items.
items := []string{"task1", "task2", "task3"}
for _, item := range items {
if err := pool.Submit(ctx, item); err != nil {
panic(err)
}
}
// Graceful shutdown with timeout.
shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := pool.Shutdown(shutdownCtx); err != nil {
panic(err)
}
fmt.Println("All work completed")
}
Output:
Example (Backpressure) ¶
Example_backpressure demonstrates handling backpressure when the queue is full.
package main
import (
"context"
"fmt"
"time"
"github.com/grhili/cd-operator/pkg/workerpool"
)
func main() {
// Slow worker to create backpressure.
workerFunc := func(ctx context.Context, work workerpool.Work) error {
time.Sleep(200 * time.Millisecond)
return nil
}
// Small pool with limited buffer.
pool := workerpool.New(1, workerFunc)
ctx := context.Background()
if err := pool.Start(ctx); err != nil {
panic(err)
}
// Fill buffer (size=1, buffer=2*size=2, so total capacity=3).
// First submit starts processing, next 2 go in buffer, 4th will block.
for i := 0; i < 4; i++ {
submitCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
err := pool.Submit(submitCtx, i)
cancel()
if err != nil {
fmt.Printf("Submit %d: blocked\n", i)
} else {
fmt.Printf("Submit %d: queued\n", i)
}
}
shutdownCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
if err := pool.Shutdown(shutdownCtx); err != nil {
panic(err)
}
}
Output: Submit 0: queued Submit 1: queued Submit 2: queued Submit 3: blocked
Example (ContextCancellation) ¶
Example_contextCancellation demonstrates context cancellation.
package main
import (
"context"
"fmt"
"time"
"github.com/grhili/cd-operator/pkg/workerpool"
)
func main() {
workerFunc := func(ctx context.Context, work workerpool.Work) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(100 * time.Millisecond):
fmt.Printf("Processed: %v\n", work)
return nil
}
}
pool := workerpool.New(2, workerFunc)
// Create cancellable context.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if err := pool.Start(ctx); err != nil {
panic(err)
}
// Submit some work.
_ = pool.Submit(ctx, "item1")
// Cancel context to stop workers.
cancel()
// Wait for graceful shutdown.
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), time.Second)
defer shutdownCancel()
_ = pool.Shutdown(shutdownCtx)
fmt.Println("Pool stopped")
}
Output:
Example (GracefulShutdown) ¶
Example_gracefulShutdown demonstrates proper shutdown handling.
package main
import (
"context"
"fmt"
"time"
"github.com/grhili/cd-operator/pkg/workerpool"
)
func main() {
completed := make(chan int, 5)
workerFunc := func(ctx context.Context, work workerpool.Work) error {
id := work.(int)
time.Sleep(50 * time.Millisecond)
completed <- id
return nil
}
pool := workerpool.New(2, workerFunc)
ctx := context.Background()
if err := pool.Start(ctx); err != nil {
panic(err)
}
// Submit work.
for i := 0; i < 5; i++ {
if err := pool.Submit(ctx, i); err != nil {
panic(err)
}
}
// Shutdown waits for all in-flight work to complete.
shutdownCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
if err := pool.Shutdown(shutdownCtx); err != nil {
panic(err)
}
close(completed)
// All work completes before shutdown returns.
count := 0
for range completed {
count++
}
fmt.Printf("Completed %d tasks before shutdown\n", count)
}
Output: Completed 5 tasks before shutdown
Example (RealWorld) ¶
Example_realWorld demonstrates a realistic use case processing tasks with error handling and metrics.
package main
import (
"context"
"fmt"
"sync/atomic"
"time"
"github.com/grhili/cd-operator/pkg/workerpool"
)
// Task represents a unit of work with metadata.
type Task struct {
ID int
Payload string
Priority int
}
func main() {
var (
processed atomic.Int64
failed atomic.Int64
)
// Worker function with error handling and metrics.
workerFunc := func(ctx context.Context, work workerpool.Work) error {
task := work.(Task)
// Process task.
if task.ID%10 == 9 {
// Simulate occasional failure.
failed.Add(1)
return fmt.Errorf("task %d failed", task.ID)
}
processed.Add(1)
return nil
}
// Create pool with optimal worker count.
pool := workerpool.New(3, workerFunc)
ctx := context.Background()
// Start pool.
if err := pool.Start(ctx); err != nil {
panic(err)
}
// Submit tasks.
for i := 0; i < 5; i++ {
task := Task{
ID: i,
Payload: fmt.Sprintf("data-%d", i),
Priority: i % 3,
}
if err := pool.Submit(ctx, task); err != nil {
fmt.Printf("Failed to submit task %d: %v\n", i, err)
}
}
// Graceful shutdown with generous timeout.
shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := pool.Shutdown(shutdownCtx); err != nil {
fmt.Printf("Shutdown error: %v\n", err)
}
// Report metrics.
fmt.Printf("Processed: %d, Failed: %d\n", processed.Load(), failed.Load())
}
Output: Processed: 5, Failed: 0
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrPoolNotStarted is returned when Submit is called before Start. ErrPoolNotStarted = errors.New("pool not started") // ErrPoolShutdown is returned when Submit is called after Shutdown. ErrPoolShutdown = errors.New("pool is shut down") )
Functions ¶
This section is empty.
Types ¶
type Pool ¶
type Pool interface {
// Start spawns the worker goroutines and begins processing work.
// It must be called before Submit. The pool will run until the context
// is cancelled or Shutdown is called.
Start(ctx context.Context) error
// Submit adds work to the pool's queue for processing.
// It blocks if the queue is full until space becomes available or the context is cancelled.
// Returns an error if the context is cancelled or the pool is shut down.
Submit(ctx context.Context, work Work) error
// Shutdown initiates graceful shutdown of the pool.
// It stops accepting new work, waits for in-flight work to complete,
// and returns when all workers have finished or the context times out.
Shutdown(ctx context.Context) error
}
Pool manages a fixed number of worker goroutines that process work items concurrently. It provides bounded parallelism and graceful shutdown capabilities.
func New ¶
func New(size int, workerFunc WorkerFunc) Pool
New creates a new worker pool with the specified number of workers. The size parameter determines the number of concurrent workers. The workerFunc is called by each worker to process work items. The work channel is buffered to size * 2 to allow some queueing.
type Work ¶
type Work interface{}
Work represents a unit of work to be processed by the worker pool. Implementations should be lightweight and contain only the data needed to perform the work. The actual processing logic is provided via WorkerFunc.
type WorkerFunc ¶
WorkerFunc defines the function signature for processing work items. It receives a context for cancellation and the work item to process. The context will be cancelled if the pool is shutting down or if the parent context is cancelled.