Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// MaxWorkers is the maximum number of workers to spawn.
// Must be greater than 0. Default is 10.
MaxWorkers int
// QueueSize is the maximum number of jobs that can be queued.
// If 0, the queue is unbounded (not recommended for production).
// Default is 1000.
QueueSize int
// WorkerTimeout is the maximum time a worker will wait for a new job
// before checking if it should shut down. Default is 5 seconds.
WorkerTimeout time.Duration
// TaskTimeout is the maximum time allowed for a single task execution.
// If 0, no timeout is applied. Default is 0 (no timeout).
TaskTimeout time.Duration
// EnableMetrics determines whether to collect execution metrics.
// Default is true.
EnableMetrics bool
}
Config defines the configuration for a worker pool.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a Config with sensible defaults.
type Job ¶
type Job struct {
Task Task
SubmitTime time.Time
StartTime time.Time
EndTime time.Time
Error error
ResultCh chan<- JobResult // Optional result channel
}
Job wraps a Task with additional metadata for internal tracking.
type JobResult ¶
type JobResult struct {
JobID string
Task Task
Error error
Duration time.Duration
StartTime time.Time
EndTime time.Time
}
JobResult contains the result of a job execution.
type Metrics ¶
type Metrics struct {
JobsSubmitted int64
JobsCompleted int64
JobsFailed int64
WorkersActive int32
TotalWorkers int32
AverageExecTime time.Duration
}
Metrics tracks worker pool statistics.
type Task ¶
type Task interface {
// Execute performs the actual work. It receives a context for cancellation
// and should return an error if the operation fails.
Execute(ctx context.Context) error
// ID returns a unique identifier for this task (optional, for logging/tracking)
ID() string
}
Task represents a unit of work that can be executed by a worker. Users implement this interface to define their custom operations.
type TaskFunc ¶
type TaskFunc struct {
// contains filtered or unexported fields
}
TaskFunc is a convenience type that allows functions to implement the Task interface.
func NewTaskFunc ¶
NewTaskFunc creates a new TaskFunc with the given ID and function.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool manages a pool of workers that execute tasks concurrently.
Example ¶
ExampleWorkerPool demonstrates basic usage of the worker pool.
// Create a worker pool configuration
config := DefaultConfig()
config.MaxWorkers = 3
config.QueueSize = 100
config.TaskTimeout = 30 * time.Second
// Create and start the worker pool
pool := New(config)
err := pool.Start()
if err != nil {
log.Fatalf("Failed to start worker pool: %v", err)
}
defer pool.Stop()
// Submit tasks using TaskFunc for simple operations
for i := 0; i < 5; i++ {
taskID := fmt.Sprintf("task-%d", i)
task := NewTaskFunc(taskID, func(ctx context.Context) error {
fmt.Printf("Executing %s\n", taskID)
time.Sleep(100 * time.Millisecond) // Simulate work
fmt.Printf("Completed %s\n", taskID)
return nil
})
err := pool.Submit(task)
if err != nil {
log.Printf("Failed to submit %s: %v", taskID, err)
}
}
// Wait for tasks to complete
time.Sleep(500 * time.Millisecond)
// Print metrics
metrics := pool.GetMetrics()
fmt.Printf("Jobs submitted: %d\n", metrics.JobsSubmitted)
fmt.Printf("Jobs completed: %d\n", metrics.JobsCompleted)
fmt.Printf("Jobs failed: %d\n", metrics.JobsFailed)
fmt.Printf("Average execution time: %v\n", metrics.AverageExecTime)
Example (CustomTasks) ¶
ExampleWorkerPool_customTasks demonstrates using custom task types.
config := DefaultConfig()
config.MaxWorkers = 2
config.QueueSize = 50
pool := New(config)
err := pool.Start()
if err != nil {
log.Fatalf("Failed to start worker pool: %v", err)
}
defer pool.Stop()
// Submit data processing tasks
for i := 0; i < 3; i++ {
data := make([]byte, (i+1)*100) // Different data sizes
task := NewDataProcessingTask(fmt.Sprintf("data-%d", i), data)
err := pool.Submit(task)
if err != nil {
log.Printf("Failed to submit data processing task: %v", err)
}
}
// Wait for processing to complete
time.Sleep(500 * time.Millisecond)
Example (WithResults) ¶
ExampleWorkerPool_withResults demonstrates using result channels.
config := DefaultConfig()
config.MaxWorkers = 2
pool := New(config)
err := pool.Start()
if err != nil {
log.Fatalf("Failed to start worker pool: %v", err)
}
defer pool.Stop()
// Create a result channel
results := make(chan JobResult, 5)
// Submit tasks with result reporting
for i := 0; i < 5; i++ {
taskID := fmt.Sprintf("result-task-%d", i)
task := NewTaskFunc(taskID, func(ctx context.Context) error {
time.Sleep(50 * time.Millisecond) // Simulate work
return nil
})
err := pool.SubmitWithResult(task, results)
if err != nil {
log.Printf("Failed to submit %s: %v", taskID, err)
}
}
// Collect results
CollectResults:
for i := 0; i < 5; i++ {
select {
case result := <-results:
if result.Error != nil {
fmt.Printf("Task %s failed: %v\n", result.JobID, result.Error)
} else {
fmt.Printf("Task %s completed in %v\n", result.JobID, result.Duration)
}
case <-time.After(2 * time.Second):
fmt.Printf("Timeout waiting for result %d\n", i)
break CollectResults
}
}
func New ¶
func New(config *Config) *WorkerPool
New creates a new WorkerPool with the given configuration.
func (*WorkerPool) GetMetrics ¶
func (wp *WorkerPool) GetMetrics() Metrics
GetMetrics returns a copy of the current metrics.
func (*WorkerPool) IsRunning ¶
func (wp *WorkerPool) IsRunning() bool
IsRunning returns true if the worker pool is currently running.
func (*WorkerPool) Start ¶
func (wp *WorkerPool) Start() error
Start initializes and starts all workers in the pool.
func (*WorkerPool) Stop ¶
func (wp *WorkerPool) Stop() error
Stop gracefully shuts down the worker pool. It waits for all running tasks to complete unless the context is cancelled.
func (*WorkerPool) Submit ¶
func (wp *WorkerPool) Submit(task Task) error
Submit adds a task to the worker pool for execution.
func (*WorkerPool) SubmitWithResult ¶
func (wp *WorkerPool) SubmitWithResult(task Task, resultCh chan<- JobResult) error
SubmitWithResult adds a task to the worker pool and optionally sends the result to the provided channel.