workerpool

package
v0.1.149
Warning

This package is not in the latest version of its module.

Published: Feb 23, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package workerpool provides a dynamic pool of workers that can be used to process tasks concurrently. The pool automatically scales the number of workers based on load.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type MockWorkerContext

type MockWorkerContext struct {
	ShouldStopFunc  func(int) bool
	SubmittedCount  int
	CompletedCount  int
	FailedCount     int
	SuccessfulCount int
}

MockWorkerContext is a mock implementation of WorkerContext for testing.

func (*MockWorkerContext) AddCompleted

func (m *MockWorkerContext) AddCompleted()

AddCompleted increments the completed counter.

func (*MockWorkerContext) AddFailed

func (m *MockWorkerContext) AddFailed()

AddFailed increments the failed counter.

func (*MockWorkerContext) AddSubmitted

func (m *MockWorkerContext) AddSubmitted()

AddSubmitted increments the submitted counter.

func (*MockWorkerContext) AddSuccessful

func (m *MockWorkerContext) AddSuccessful()

AddSuccessful increments the successful counter.

func (*MockWorkerContext) ShouldIStop

func (m *MockWorkerContext) ShouldIStop(queueLength int) bool

ShouldIStop delegates to ShouldStopFunc, passing queueLength, and returns its result.
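The mock is meant to let a worker function be tested without a running pool. The sketch below shows one way that could look. It is self-contained, so `workerContext` and `mockContext` are local stand-ins that mirror the documented interface and struct rather than imports from this package, and `drain` is a hypothetical worker. Treating a nil `ShouldStopFunc` as "keep going" is an assumption; the documentation does not say what the real mock does in that case.

```go
package main

import "fmt"

// workerContext is a local stand-in mirroring the documented WorkerContext interface.
type workerContext interface {
	ShouldIStop(queueLength int) bool
	AddSubmitted()
	AddCompleted()
	AddFailed()
	AddSuccessful()
}

// mockContext mirrors the documented MockWorkerContext fields.
type mockContext struct {
	ShouldStopFunc  func(int) bool
	SubmittedCount  int
	CompletedCount  int
	FailedCount     int
	SuccessfulCount int
}

func (m *mockContext) AddSubmitted()  { m.SubmittedCount++ }
func (m *mockContext) AddCompleted()  { m.CompletedCount++ }
func (m *mockContext) AddFailed()     { m.FailedCount++ }
func (m *mockContext) AddSuccessful() { m.SuccessfulCount++ }

// ShouldIStop delegates to ShouldStopFunc. Returning false for a nil
// function is an assumption made for this sketch.
func (m *mockContext) ShouldIStop(queueLength int) bool {
	return m.ShouldStopFunc != nil && m.ShouldStopFunc(queueLength)
}

// drain is a hypothetical worker: it handles tasks until the slice is
// empty or the context tells it to stop. Negative tasks count as failures.
func drain(wc workerContext, tasks []int) {
	for len(tasks) > 0 && !wc.ShouldIStop(len(tasks)) {
		task := tasks[0]
		tasks = tasks[1:]
		if task < 0 {
			wc.AddFailed()
		} else {
			wc.AddSuccessful()
		}
		wc.AddCompleted()
	}
}

func main() {
	m := &mockContext{ShouldStopFunc: func(q int) bool { return q == 0 }}
	drain(m, []int{1, -2, 3})
	fmt.Println(m.CompletedCount, m.SuccessfulCount, m.FailedCount) // prints "3 2 1"
}
```

Because the counters are plain exported fields, a test can assert on them directly after the worker returns.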

type Pool

type Pool struct {
	G           *errgroup.Group
	MaxWorkers  int
	MinWorkers  int
	MaxIdleTime time.Duration
	Stats       *PoolStats
	// contains filtered or unexported fields
}

Pool represents a worker pool for processing tasks concurrently.

func NewPool

func NewPool(ctx context.Context, maxWorkers int, minWorkers int, maxIdleTime time.Duration) *Pool

NewPool creates and initializes a new worker pool with the specified maximum and minimum number of workers and maximum idle time. Note that maxWorkers precedes minWorkers in the argument list.

func (*Pool) AddCompleted

func (p *Pool) AddCompleted()

AddCompleted increments the count of completed tasks and updates the last completion time.

func (*Pool) AddFailed

func (p *Pool) AddFailed()

AddFailed increments the count of failed tasks.

func (*Pool) AddSubmitted

func (p *Pool) AddSubmitted()

AddSubmitted increments the count of submitted tasks and updates the last submission time.

func (*Pool) AddSuccessful

func (p *Pool) AddSuccessful()

AddSuccessful increments the count of successful tasks.

func (*Pool) GetStats

func (p *Pool) GetStats() Stats

GetStats returns a snapshot of the current pool statistics.

func (*Pool) MonitorPool

func (p *Pool) MonitorPool(ctx context.Context, eg *errgroup.Group,
	poolFunc PoolFunc,
	dbRoPool, dbRwPool dbconnpool.ConnectionPool, queueLenFunc func() int,
) error

MonitorPool dynamically adjusts the number of workers based on the queue length and pool state. It runs in a separate goroutine and periodically checks whether to scale up the number of workers.
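The documentation does not spell out the scaling rule the monitor applies on each check. As an illustration only, the sketch below shows one plausible scale-up decision: start enough workers to cover the backlog, capped at the configured maximum. The function `workersToStart` and its rule are assumptions for this example, not the package's actual logic.

```go
package main

import "fmt"

// workersToStart is a hypothetical scale-up rule: it returns how many
// extra workers to launch so that running workers approach the queue
// length without exceeding maxWorkers.
func workersToStart(running, maxWorkers, queueLength int) int {
	headroom := maxWorkers - running
	if headroom <= 0 || queueLength <= running {
		return 0 // at capacity, or existing workers already cover the backlog
	}
	want := queueLength - running
	if want > headroom {
		want = headroom
	}
	return want
}

func main() {
	fmt.Println(workersToStart(2, 10, 5))   // prints "3"
	fmt.Println(workersToStart(2, 10, 50))  // prints "8"
	fmt.Println(workersToStart(10, 10, 50)) // prints "0"
	fmt.Println(workersToStart(4, 10, 3))   // prints "0"
}
```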

func (*Pool) ShouldIStop

func (p *Pool) ShouldIStop(queueLength int) bool

ShouldIStop determines if an individual worker should stop based on the pool's current state, such as the number of running workers, queue length, and idle time.
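The exact rule is not documented here, but the three inputs it names (running workers, queue length, idle time) suggest a check along the following lines. This is a minimal sketch under stated assumptions: `shouldStop` is a hypothetical free function, and the ordering of its conditions is a guess rather than the package's implementation.

```go
package main

import (
	"fmt"
	"time"
)

// shouldStop is a hypothetical stop rule for a single worker. It keeps the
// worker alive while the pool is at its minimum size or while tasks are
// queued, and stops it only once the pool has been idle for maxIdle.
func shouldStop(running, minWorkers, queueLength int, idle, maxIdle time.Duration) bool {
	if running <= minWorkers {
		return false // never shrink below the configured floor
	}
	if queueLength > 0 {
		return false // work is still pending
	}
	return idle >= maxIdle
}

func main() {
	fmt.Println(shouldStop(5, 2, 0, 10*time.Second, 5*time.Second)) // prints "true"
	fmt.Println(shouldStop(2, 2, 0, 10*time.Second, 5*time.Second)) // prints "false"
	fmt.Println(shouldStop(5, 2, 3, 10*time.Second, 5*time.Second)) // prints "false"
	fmt.Println(shouldStop(5, 2, 0, 1*time.Second, 5*time.Second))  // prints "false"
}
```

In the real pool the idle duration would presumably come from something like TimeSinceLastCompletion and the limit from MaxIdleTime, though the page does not confirm that wiring.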

func (*Pool) StartWorkerPool

func (p *Pool) StartWorkerPool(
	poolFunc PoolFunc,
	dbRoPool, dbRwPool dbconnpool.ConnectionPool,
	queueLenFunc func() int,
)

StartWorkerPool initializes and starts the worker pool. It launches the initial set of workers and the pool monitor goroutine.

func (*Pool) TimeSinceLastCompletion

func (p *Pool) TimeSinceLastCompletion() time.Duration

TimeSinceLastCompletion returns the duration since the last task was completed.

type PoolFunc

type PoolFunc func(ctx context.Context,
	wc WorkerContext,
	dbRoPool, dbRwPool dbconnpool.ConnectionPool,
	queueLenFunc func() int,
	id int,
) error

PoolFunc is the function signature for worker functions. It takes the dbconnpool.ConnectionPool interface rather than a concrete type.
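To make the shape of a worker function concrete, here is a sketch of a function with the same parameter list. It is self-contained, so `poolFunc`, `workerContext`, and `connectionPool` are local stand-ins: the methods of the real dbconnpool.ConnectionPool are not shown on this page, so the stand-in is an empty interface. The channel-backed queue, `countingContext`, `newWorker`, and `runDemo` are all hypothetical names introduced for the example.

```go
package main

import (
	"context"
	"fmt"
)

// connectionPool stands in for dbconnpool.ConnectionPool (methods unknown here).
type connectionPool interface{}

// workerContext mirrors the documented WorkerContext interface.
type workerContext interface {
	ShouldIStop(queueLength int) bool
	AddSubmitted()
	AddCompleted()
	AddFailed()
	AddSuccessful()
}

// poolFunc mirrors the documented PoolFunc signature using the stand-ins above.
type poolFunc func(ctx context.Context, wc workerContext,
	dbRoPool, dbRwPool connectionPool, queueLenFunc func() int, id int) error

// countingContext is a tiny workerContext that stops when the queue is empty.
type countingContext struct{ completed int }

func (c *countingContext) ShouldIStop(q int) bool { return q == 0 }
func (c *countingContext) AddSubmitted()          {}
func (c *countingContext) AddCompleted()          { c.completed++ }
func (c *countingContext) AddFailed()             {}
func (c *countingContext) AddSuccessful()         {}

// newWorker returns a worker that reads tasks from queue until the
// context is cancelled or the worker context asks it to stop.
func newWorker(queue <-chan string) poolFunc {
	return func(ctx context.Context, wc workerContext,
		_, _ connectionPool, queueLenFunc func() int, id int) error {
		for !wc.ShouldIStop(queueLenFunc()) {
			if err := ctx.Err(); err != nil {
				return err // honour cancellation before taking more work
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case task := <-queue:
				if task == "" {
					wc.AddFailed() // empty tasks count as failures in this sketch
				} else {
					wc.AddSuccessful()
				}
				wc.AddCompleted()
			}
		}
		return nil
	}
}

// runDemo runs one worker over the given tasks and reports how many completed.
func runDemo(tasks []string) (int, error) {
	queue := make(chan string, len(tasks))
	for _, t := range tasks {
		queue <- t
	}
	wc := &countingContext{}
	err := newWorker(queue)(context.Background(), wc, nil, nil,
		func() int { return len(queue) }, 1)
	return wc.completed, err
}

func main() {
	n, err := runDemo([]string{"a", "b", "c"})
	fmt.Println(n, err) // prints "3 <nil>"
}
```

Accepting interfaces for both the worker context and the connection pools is what lets a test substitute a mock for each without touching the worker body.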

type PoolStats

type PoolStats struct {
	RunningWorkers  atomic.Int64  // Number of currently running worker goroutines.
	SubmittedTasks  atomic.Uint64 // Total number of tasks submitted to the pool.
	WaitingTasks    atomic.Uint64 // Number of tasks waiting in the queue.
	SuccessfulTasks atomic.Uint64 // Number of tasks that completed successfully.
	FailedTasks     atomic.Uint64 // Number of tasks that failed.
	CompletedTasks  atomic.Uint64 // Total number of tasks completed (success or failure).
	DroppedTasks    atomic.Uint64 // Number of tasks dropped due to a full queue in non-blocking mode.
	// contains filtered or unexported fields
}

PoolStats holds statistics about the worker pool. The exported counters are atomic types, so they can be read and updated without locks.
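As a sketch of the lock-free pattern described above, the example below uses the standard library's `atomic.Uint64` and `atomic.Int64` (available since Go 1.19) to update counters from several goroutines and then copies them into a plain-value snapshot. The `stats` and `statsSnapshot` types and the `record` helper are simplified, hypothetical stand-ins, not the package's own types.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stats is a reduced stand-in for PoolStats with three atomic counters.
type stats struct {
	submitted atomic.Uint64
	completed atomic.Uint64
	running   atomic.Int64
}

// statsSnapshot holds plain values copied out of stats.
type statsSnapshot struct {
	Submitted uint64
	Completed uint64
	Running   int64
}

// load reads each counter atomically. The fields are read one at a time,
// so the snapshot is not guaranteed to be mutually consistent while
// workers are still running.
func (s *stats) load() statsSnapshot {
	return statsSnapshot{
		Submitted: s.submitted.Load(),
		Completed: s.completed.Load(),
		Running:   s.running.Load(),
	}
}

// record simulates workers that each submit and complete tasksPerWorker tasks.
func record(s *stats, workers, tasksPerWorker int) {
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		s.running.Add(1)
		go func() {
			defer wg.Done()
			defer s.running.Add(-1)
			for i := 0; i < tasksPerWorker; i++ {
				s.submitted.Add(1)
				s.completed.Add(1)
			}
		}()
	}
	wg.Wait()
}

func main() {
	var s stats
	record(&s, 8, 1000)
	fmt.Printf("%+v\n", s.load()) // prints "{Submitted:8000 Completed:8000 Running:0}"
}
```

Returning a struct of plain integers, as GetStats does with Stats, avoids handing callers a copy of the atomic values themselves.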

type PoolStatsSnapshot

type PoolStatsSnapshot struct {
	RunningWorkers  int64
	SubmittedTasks  uint64
	SuccessfulTasks uint64
	FailedTasks     uint64
	CompletedTasks  uint64
}

PoolStatsSnapshot provides read-only access to pool statistics.

type Stats

type Stats struct {
	RunningWorkers  int64
	SubmittedTasks  uint64
	WaitingTasks    uint64
	SuccessfulTasks uint64
	FailedTasks     uint64
	CompletedTasks  uint64
	DroppedTasks    uint64
	MaxWorkers      int
	MinWorkers      int
}

Stats holds a snapshot of pool statistics.

type WorkerContext

type WorkerContext interface {
	// ShouldIStop returns true if this worker should terminate based on
	// current pool state and queue length.
	ShouldIStop(queueLength int) bool

	// AddSubmitted increments the submitted task counter.
	AddSubmitted()

	// AddCompleted increments the completed task counter.
	AddCompleted()

	// AddFailed increments the failed task counter.
	AddFailed()

	// AddSuccessful increments the successful task counter.
	AddSuccessful()
}

WorkerContext provides the interface workers use to interact with the pool. This abstracts the Pool's methods that workers call during execution.
