Documentation
¶
Overview ¶
Package workerpool provides a reusable, high-performance worker pool for parallel task execution with dynamic scaling and graceful shutdown.
Index ¶
- Variables
- func GetRow() domain.Row
- func PutRow(row domain.Row)
- type Config
- type MapPool
- type Pool
- func (p *Pool) Close() error
- func (p *Pool) CloseWithTimeout(timeout time.Duration) error
- func (p *Pool) IsClosed() bool
- func (p *Pool) IsRunning() bool
- func (p *Pool) QueueLen() int
- func (p *Pool) Results() <-chan Result
- func (p *Pool) Start() error
- func (p *Pool) Stats() Stats
- func (p *Pool) Submit(ctx context.Context, task Task) (<-chan Result, error)
- func (p *Pool) SubmitBatch(ctx context.Context, tasks []Task) (<-chan Result, error)
- func (p *Pool) SubmitFunc(ctx context.Context, fn TaskFunc) (<-chan Result, error)
- func (p *Pool) SubmitWait(ctx context.Context, task Task) error
- func (p *Pool) WorkerCount() int
- type Result
- type RowPool
- type RowPoolStats
- type RowSlicePool
- type ScanFunc
- type ScanPool
- func (sp *ScanPool) Close() error
- func (sp *ScanPool) ExecuteParallel(ctx context.Context, tasks []ScanTask) ([]ScanResult, error)
- func (sp *ScanPool) ExecuteParallelWithPool(ctx context.Context, tasks []ScanTask) ([]ScanResult, error)
- func (sp *ScanPool) Start() error
- func (sp *ScanPool) Stats() Stats
- type ScanResult
- type ScanTask
- type SlicePool
- type Stats
- type Task
- type TaskFunc
- type ValuePool
Constants ¶
This section is empty.
Variables ¶
var (
	ErrPoolClosed   = errors.New("workerpool: pool is closed")
	ErrPoolRunning  = errors.New("workerpool: pool is already running")
	ErrInvalidSize  = errors.New("workerpool: invalid pool size")
	ErrTaskPanic    = errors.New("workerpool: task panicked")
	ErrTaskCanceled = errors.New("workerpool: task canceled")
)
Common errors
Functions ¶
Types ¶
type Config ¶
type Config struct {
// Size is the number of workers in the pool
Size int
// QueueSize is the task queue buffer size (0 = unbuffered)
QueueSize int
// IdleTimeout is the duration after which idle workers are reduced
IdleTimeout time.Duration
// EnableDynamicScaling allows the pool to scale up/down based on load
EnableDynamicScaling bool
// MinWorkers is the minimum number of workers when dynamic scaling is enabled
MinWorkers int
// MaxWorkers is the maximum number of workers when dynamic scaling is enabled
MaxWorkers int
}
Config holds worker pool configuration
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a Config with sensible defaults
type MapPool ¶
type MapPool[K comparable, V any] struct {
	// contains filtered or unexported fields
}
MapPool is a pool for maps
func NewMapPool ¶
func NewMapPool[K comparable, V any](initialSize int) *MapPool[K, V]
NewMapPool creates a new map pool
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool represents a worker pool
func NewWithSize ¶
NewWithSize creates a new worker pool with a specific size
func (*Pool) CloseWithTimeout ¶
CloseWithTimeout shuts down the pool with a timeout
func (*Pool) SubmitBatch ¶
SubmitBatch submits multiple tasks and returns a channel for all results
func (*Pool) SubmitFunc ¶
SubmitFunc submits a function that returns a value
func (*Pool) SubmitWait ¶
SubmitWait submits a task and waits for the result
func (*Pool) WorkerCount ¶
WorkerCount returns the current number of workers
type Result ¶
type Result struct {
Value interface{}
Error error
}
Result represents the result of a task execution
type RowPool ¶
type RowPool struct {
// contains filtered or unexported fields
}
RowPool is a sync.Pool for reusing domain.Row objects (map[string]interface{})
type RowPoolStats ¶
RowPoolStats holds row pool statistics
type RowSlicePool ¶
type RowSlicePool struct {
// contains filtered or unexported fields
}
RowSlicePool is a pool for slices of rows
func NewRowSlicePool ¶
func NewRowSlicePool(initialSize int) *RowSlicePool
NewRowSlicePool creates a pool for row slices
func (*RowSlicePool) Get ¶
func (rsp *RowSlicePool) Get() *[]domain.Row
Get retrieves a row slice from the pool
func (*RowSlicePool) Put ¶
func (rsp *RowSlicePool) Put(slice *[]domain.Row)
Put returns a row slice to the pool
type ScanFunc ¶
type ScanFunc func(ctx context.Context, task ScanTask) (ScanResult, error)
ScanFunc is the function type for processing a scan task
type ScanPool ¶
type ScanPool struct {
// contains filtered or unexported fields
}
ScanPool is a specialized pool for parallel scanning operations
func NewScanPool ¶
NewScanPool creates a new scan pool
func (*ScanPool) ExecuteParallel ¶
ExecuteParallel executes scan tasks in parallel and collects results
func (*ScanPool) ExecuteParallelWithPool ¶
func (sp *ScanPool) ExecuteParallelWithPool(ctx context.Context, tasks []ScanTask) ([]ScanResult, error)
ExecuteParallelWithPool uses the worker pool to execute tasks
type ScanResult ¶
ScanResult represents the result of a scan task
type SlicePool ¶
type SlicePool[T any] struct {
	// contains filtered or unexported fields
}
SlicePool is a generic pool for slices
func NewSlicePool ¶
NewSlicePool creates a new slice pool
type Stats ¶
type Stats struct {
Workers int
TasksExecuted int64
TasksFailed int64
QueueSize int
MaxQueueSize int
IsRunning bool
IsClosed bool
}
Stats holds pool statistics
type ValuePool ¶
type ValuePool[T any] struct {
	// contains filtered or unexported fields
}
ValuePool is a generic pool for any value type
func NewValuePool ¶
NewValuePool creates a new value pool