workerpool

package
v1.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2026 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

// Config defines the configuration for a worker pool.
// Validate applies the documented defaults to zero-valued fields.
type Config struct {
	// MaxWorkers is the maximum number of workers to spawn.
	// Must be greater than 0. Default is 10.
	MaxWorkers int

	// QueueSize is the maximum number of jobs that can be queued.
	// If 0, the queue is unbounded (not recommended for production).
	// Default is 1000.
	QueueSize int

	// WorkerTimeout is the maximum time a worker will wait for a new job
	// before checking if it should shut down. Default is 5 seconds.
	WorkerTimeout time.Duration

	// TaskTimeout is the maximum time allowed for a single task execution.
	// If 0, no timeout is applied. Default is 0 (no timeout).
	TaskTimeout time.Duration

	// EnableMetrics determines whether to collect execution metrics.
	// Default is true.
	EnableMetrics bool
}

Config defines the configuration for a worker pool.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a Config with sensible defaults.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks if the configuration is valid and applies defaults where needed.

type Job

// Job wraps a Task with additional metadata for internal tracking.
type Job struct {
	// Task is the user-supplied unit of work to execute.
	Task       Task
	// NOTE(review): the timestamp and Error fields appear to be populated by
	// the pool over the job's lifecycle (queued -> started -> finished) —
	// confirm against the pool implementation.
	SubmitTime time.Time
	StartTime  time.Time
	EndTime    time.Time
	Error      error
	ResultCh   chan<- JobResult // Optional result channel
}

Job wraps a Task with additional metadata for internal tracking.

type JobResult

// JobResult contains the result of a job execution. It is delivered on the
// channel passed to WorkerPool.SubmitWithResult.
type JobResult struct {
	// JobID identifies the task — presumably Task.ID(); confirm in pool code.
	JobID     string
	// Task is the task that was executed.
	Task      Task
	// Error is the error returned by Execute, or nil on success.
	Error     error
	// Duration is the execution time — presumably EndTime minus StartTime; confirm.
	Duration  time.Duration
	StartTime time.Time
	EndTime   time.Time
}

JobResult contains the result of a job execution.

type Metrics

// Metrics tracks worker pool statistics. WorkerPool.GetMetrics returns a
// copy, so callers observe a point-in-time snapshot.
// NOTE(review): the fixed-size integer fields suggest atomic updates inside
// the pool — confirm against the implementation.
type Metrics struct {
	JobsSubmitted   int64 // jobs accepted for execution
	JobsCompleted   int64 // jobs that finished without error
	JobsFailed      int64 // jobs whose execution returned an error
	WorkersActive   int32 // workers currently busy — confirm semantics
	TotalWorkers    int32 // workers currently spawned — confirm semantics
	AverageExecTime time.Duration
}

Metrics tracks worker pool statistics.

type Task

// Task represents a unit of work that can be executed by a worker.
// Users implement this interface to define their custom operations;
// TaskFunc adapts a plain function to this interface.
type Task interface {
	// Execute performs the actual work. It receives a context for cancellation
	// and should return an error if the operation fails.
	Execute(ctx context.Context) error

	// ID returns a unique identifier for this task (optional, for logging/tracking)
	ID() string
}

Task represents a unit of work that can be executed by a worker. Users implement this interface to define their custom operations.

type TaskFunc

// TaskFunc is a convenience type that allows functions to implement the
// Task interface. Construct one with NewTaskFunc.
type TaskFunc struct {
	// contains filtered or unexported fields
}

TaskFunc is a convenience type that allows functions to implement the Task interface.

func NewTaskFunc

func NewTaskFunc(id string, fn func(ctx context.Context) error) *TaskFunc

NewTaskFunc creates a new TaskFunc with the given ID and function.

func (*TaskFunc) Execute

func (tf *TaskFunc) Execute(ctx context.Context) error

Execute implements the Task interface.

func (*TaskFunc) ID

func (tf *TaskFunc) ID() string

ID implements the Task interface.

type WorkerPool

// WorkerPool manages a pool of workers that execute tasks concurrently.
// Create one with New, call Start before submitting tasks, and Stop to
// shut it down gracefully.
type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages a pool of workers that execute tasks concurrently.

Example

ExampleWorkerPool demonstrates basic usage of the worker pool.

// Create a worker pool configuration
config := DefaultConfig()
config.MaxWorkers = 3
config.QueueSize = 100
config.TaskTimeout = 30 * time.Second

// Create and start the worker pool
pool := New(config)
err := pool.Start()
if err != nil {
	log.Fatalf("Failed to start worker pool: %v", err)
}
defer pool.Stop()

// Submit tasks using TaskFunc for simple operations
for i := 0; i < 5; i++ {
	taskID := fmt.Sprintf("task-%d", i)
	task := NewTaskFunc(taskID, func(ctx context.Context) error {
		fmt.Printf("Executing %s\n", taskID)
		time.Sleep(100 * time.Millisecond) // Simulate work
		fmt.Printf("Completed %s\n", taskID)
		return nil
	})

	err := pool.Submit(task)
	if err != nil {
		log.Printf("Failed to submit %s: %v", taskID, err)
	}
}

// Wait for tasks to complete
time.Sleep(500 * time.Millisecond)

// Print metrics
metrics := pool.GetMetrics()
fmt.Printf("Jobs submitted: %d\n", metrics.JobsSubmitted)
fmt.Printf("Jobs completed: %d\n", metrics.JobsCompleted)
fmt.Printf("Jobs failed: %d\n", metrics.JobsFailed)
fmt.Printf("Average execution time: %v\n", metrics.AverageExecTime)
Example (CustomTasks)

ExampleWorkerPool_customTasks demonstrates using custom task types.

config := DefaultConfig()
config.MaxWorkers = 2
config.QueueSize = 50

pool := New(config)
err := pool.Start()
if err != nil {
	log.Fatalf("Failed to start worker pool: %v", err)
}
defer pool.Stop()

// Submit data processing tasks
for i := 0; i < 3; i++ {
	data := make([]byte, (i+1)*100) // Different data sizes
	task := NewDataProcessingTask(fmt.Sprintf("data-%d", i), data)

	err := pool.Submit(task)
	if err != nil {
		log.Printf("Failed to submit data processing task: %v", err)
	}
}

// Wait for processing to complete
time.Sleep(500 * time.Millisecond)
Example (WithResults)

ExampleWorkerPool_withResults demonstrates using result channels.

config := DefaultConfig()
config.MaxWorkers = 2

pool := New(config)
err := pool.Start()
if err != nil {
	log.Fatalf("Failed to start worker pool: %v", err)
}
defer pool.Stop()

// Create a result channel
results := make(chan JobResult, 5)

// Submit tasks with result reporting
for i := 0; i < 5; i++ {
	taskID := fmt.Sprintf("result-task-%d", i)
	task := NewTaskFunc(taskID, func(ctx context.Context) error {
		time.Sleep(50 * time.Millisecond) // Simulate work
		return nil
	})

	err := pool.SubmitWithResult(task, results)
	if err != nil {
		log.Printf("Failed to submit %s: %v", taskID, err)
	}
}

// Collect results
CollectResults:
for i := 0; i < 5; i++ {
	select {
	case result := <-results:
		if result.Error != nil {
			fmt.Printf("Task %s failed: %v\n", result.JobID, result.Error)
		} else {
			fmt.Printf("Task %s completed in %v\n", result.JobID, result.Duration)
		}
	case <-time.After(2 * time.Second):
		fmt.Printf("Timeout waiting for result %d\n", i)
		break CollectResults
	}
}

func New

func New(config *Config) *WorkerPool

New creates a new WorkerPool with the given configuration.

func (*WorkerPool) GetMetrics

func (wp *WorkerPool) GetMetrics() Metrics

GetMetrics returns a copy of the current metrics.

func (*WorkerPool) IsRunning

func (wp *WorkerPool) IsRunning() bool

IsRunning returns true if the worker pool is currently running.

func (*WorkerPool) Start

func (wp *WorkerPool) Start() error

Start initializes and starts all workers in the pool.

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop() error

Stop gracefully shuts down the worker pool. It waits for all running tasks to complete unless the context is cancelled.

func (*WorkerPool) Submit

func (wp *WorkerPool) Submit(task Task) error

Submit adds a task to the worker pool for execution.

func (*WorkerPool) SubmitWithResult

func (wp *WorkerPool) SubmitWithResult(task Task, resultCh chan<- JobResult) error

SubmitWithResult adds a task to the worker pool and optionally sends the result to the provided channel.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL