Documentation
¶
Overview ¶
Package workerpool provides a dynamic pool of workers that can be used to process tasks concurrently. The pool automatically scales the number of workers based on load.
Index ¶
- type MockWorkerContext
- type Pool
- func (p *Pool) AddCompleted()
- func (p *Pool) AddFailed()
- func (p *Pool) AddSubmitted()
- func (p *Pool) AddSuccessful()
- func (p *Pool) GetStats() Stats
- func (p *Pool) MonitorPool(ctx context.Context, eg *errgroup.Group, poolFunc PoolFunc, ...) error
- func (p *Pool) ShouldIStop(queueLength int) bool
- func (p *Pool) StartWorkerPool(poolFunc PoolFunc, dbRoPool, dbRwPool dbconnpool.ConnectionPool, ...)
- func (p *Pool) TimeSinceLastCompletion() time.Duration
- type PoolFunc
- type PoolStats
- type PoolStatsSnapshot
- type Stats
- type WorkerContext
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type MockWorkerContext ¶
type MockWorkerContext struct {
ShouldStopFunc func(int) bool
SubmittedCount int
CompletedCount int
FailedCount int
SuccessfulCount int
}
MockWorkerContext is a mock implementation of WorkerContext for testing.
func (*MockWorkerContext) AddCompleted ¶
func (m *MockWorkerContext) AddCompleted()
AddCompleted increments the completed counter.
func (*MockWorkerContext) AddFailed ¶
func (m *MockWorkerContext) AddFailed()
AddFailed increments the failed counter.
func (*MockWorkerContext) AddSubmitted ¶
func (m *MockWorkerContext) AddSubmitted()
AddSubmitted increments the submitted counter.
func (*MockWorkerContext) AddSuccessful ¶
func (m *MockWorkerContext) AddSuccessful()
AddSuccessful increments the successful counter.
func (*MockWorkerContext) ShouldIStop ¶
func (m *MockWorkerContext) ShouldIStop(queueLength int) bool
ShouldIStop returns true if the mock function says so.
type Pool ¶
type Pool struct {
G *errgroup.Group
MaxWorkers int
MinWorkers int
MaxIdleTime time.Duration
Stats *PoolStats
// contains filtered or unexported fields
}
Pool represents a worker pool for processing tasks concurrently.
func NewPool ¶
NewPool creates and initializes a new worker pool with the specified minimum and maximum number of workers and maximum idle time.
func (*Pool) AddCompleted ¶
func (p *Pool) AddCompleted()
AddCompleted increments the count of completed tasks and updates the last completion time.
func (*Pool) AddSubmitted ¶
func (p *Pool) AddSubmitted()
AddSubmitted increments the count of submitted tasks and updates the last submission time.
func (*Pool) AddSuccessful ¶
func (p *Pool) AddSuccessful()
AddSuccessful increments the count of successful tasks.
func (*Pool) MonitorPool ¶
func (p *Pool) MonitorPool(ctx context.Context, eg *errgroup.Group, poolFunc PoolFunc, dbRoPool, dbRwPool dbconnpool.ConnectionPool, queueLenFunc func() int, ) error
MonitorPool dynamically adjusts the number of workers based on the queue length and pool state. It runs in a separate goroutine and periodically checks whether to scale up the number of workers.
func (*Pool) ShouldIStop ¶
ShouldIStop determines if an individual worker should stop based on the pool's current state, such as the number of running workers, queue length, and idle time.
func (*Pool) StartWorkerPool ¶
func (p *Pool) StartWorkerPool( poolFunc PoolFunc, dbRoPool, dbRwPool dbconnpool.ConnectionPool, queueLenFunc func() int, )
StartWorkerPool initializes and starts the worker pool. It launches the initial set of workers and the pool monitor goroutine.
func (*Pool) TimeSinceLastCompletion ¶
TimeSinceLastCompletion returns the duration since the last task was completed.
type PoolFunc ¶
type PoolFunc func(ctx context.Context, wc WorkerContext, dbRoPool, dbRwPool dbconnpool.ConnectionPool, queueLenFunc func() int, id int, ) error
PoolFunc is the function signature for worker functions. Using ConnectionPool interface instead of concrete type.
type PoolStats ¶
type PoolStats struct {
RunningWorkers atomic.Int64 // Number of currently running worker goroutines.
SubmittedTasks atomic.Uint64 // Total number of tasks submitted to the pool.
WaitingTasks atomic.Uint64 // Number of tasks waiting in the queue.
SuccessfulTasks atomic.Uint64 // Number of tasks that completed successfully.
FailedTasks atomic.Uint64 // Number of tasks that failed.
CompletedTasks atomic.Uint64 // Total number of tasks completed (success or failure).
DroppedTasks atomic.Uint64 // Number of tasks dropped due to a full queue in non-blocking mode.
// contains filtered or unexported fields
}
PoolStats holds statistics about the worker pool. All fields are protected by atomic operations for lock-free access.
type PoolStatsSnapshot ¶
type PoolStatsSnapshot struct {
RunningWorkers int64
SubmittedTasks uint64
SuccessfulTasks uint64
FailedTasks uint64
CompletedTasks uint64
}
PoolStatsSnapshot provides read-only access to pool statistics.
type Stats ¶
type Stats struct {
RunningWorkers int64
SubmittedTasks uint64
WaitingTasks uint64
SuccessfulTasks uint64
FailedTasks uint64
CompletedTasks uint64
DroppedTasks uint64
MaxWorkers int
MinWorkers int
}
Stats holds a snapshot of pool statistics.
type WorkerContext ¶
type WorkerContext interface {
// ShouldIStop returns true if this worker should terminate based on
// current pool state and queue length.
ShouldIStop(queueLength int) bool
// AddSubmitted increments the submitted task counter.
AddSubmitted()
// AddCompleted increments the completed task counter.
AddCompleted()
// AddFailed increments the failed task counter.
AddFailed()
// AddSuccessful increments the successful task counter.
AddSuccessful()
}
WorkerContext provides the interface workers use to interact with the pool. This abstracts the Pool's methods that workers call during execution.