workerpool

package
v0.4.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 21, 2026 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package workerpool provides a reusable, high-performance worker pool for parallel task execution with dynamic scaling and graceful shutdown.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrPoolClosed   = errors.New("workerpool: pool is closed")
	ErrPoolRunning  = errors.New("workerpool: pool is already running")
	ErrInvalidSize  = errors.New("workerpool: invalid pool size")
	ErrTaskPanic    = errors.New("workerpool: task panicked")
	ErrTaskCanceled = errors.New("workerpool: task canceled")
)

Common errors

Functions

func GetRow

func GetRow() domain.Row

GetRow gets a row from the global pool

func PutRow

func PutRow(row domain.Row)

PutRow returns a row to the global pool

Types

type Config

type Config struct {
	// Size is the number of workers in the pool
	Size int
	// QueueSize is the task queue buffer size (0 = unbuffered)
	QueueSize int
	// IdleTimeout is the duration after which idle workers are reduced
	IdleTimeout time.Duration
	// EnableDynamicScaling allows the pool to scale up/down based on load
	EnableDynamicScaling bool
	// MinWorkers is the minimum number of workers when dynamic scaling is enabled
	MinWorkers int
	// MaxWorkers is the maximum number of workers when dynamic scaling is enabled
	MaxWorkers int
}

Config holds worker pool configuration

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a Config with sensible defaults

type MapPool

type MapPool[K comparable, V any] struct {
	// contains filtered or unexported fields
}

MapPool is a pool for maps

func NewMapPool

func NewMapPool[K comparable, V any](initialSize int) *MapPool[K, V]

NewMapPool creates a new map pool

func (*MapPool[K, V]) Get

func (mp *MapPool[K, V]) Get() map[K]V

Get retrieves a map from the pool

func (*MapPool[K, V]) Put

func (mp *MapPool[K, V]) Put(m map[K]V)

Put returns a map to the pool

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool represents a worker pool

func New

func New(config Config) (*Pool, error)

New creates a new worker pool with the given configuration

func NewWithSize

func NewWithSize(size int) (*Pool, error)

NewWithSize creates a new worker pool with a specific size

func (*Pool) Close

func (p *Pool) Close() error

Close gracefully shuts down the worker pool

func (*Pool) CloseWithTimeout

func (p *Pool) CloseWithTimeout(timeout time.Duration) error

CloseWithTimeout shuts down the pool with a timeout

func (*Pool) IsClosed

func (p *Pool) IsClosed() bool

IsClosed returns true if the pool is closed

func (*Pool) IsRunning

func (p *Pool) IsRunning() bool

IsRunning returns true if the pool is running

func (*Pool) QueueLen

func (p *Pool) QueueLen() int

QueueLen returns the current queue length

func (*Pool) Results

func (p *Pool) Results() <-chan Result

Results returns the results channel

func (*Pool) Start

func (p *Pool) Start() error

Start starts the worker pool

func (*Pool) Stats

func (p *Pool) Stats() Stats

Stats returns current pool statistics

func (*Pool) Submit

func (p *Pool) Submit(ctx context.Context, task Task) (<-chan Result, error)

Submit submits a task to the pool and returns a channel for the result

func (*Pool) SubmitBatch

func (p *Pool) SubmitBatch(ctx context.Context, tasks []Task) (<-chan Result, error)

SubmitBatch submits multiple tasks and returns a channel for all results

func (*Pool) SubmitFunc

func (p *Pool) SubmitFunc(ctx context.Context, fn TaskFunc) (<-chan Result, error)

SubmitFunc submits a function that returns a value

func (*Pool) SubmitWait

func (p *Pool) SubmitWait(ctx context.Context, task Task) error

SubmitWait submits a task and waits for the result

func (*Pool) WorkerCount

func (p *Pool) WorkerCount() int

WorkerCount returns the current number of workers

type Result

type Result struct {
	Value interface{}
	Error error
}

Result represents the result of a task execution

type RowPool

type RowPool struct {
	// contains filtered or unexported fields
}

RowPool is a sync.Pool for reusing domain.Row objects (map[string]interface{})

func NewRowPool

func NewRowPool() *RowPool

NewRowPool creates a new row pool

func (*RowPool) Get

func (rp *RowPool) Get() domain.Row

Get retrieves a row from the pool, creating a new one if necessary

func (*RowPool) Put

func (rp *RowPool) Put(row domain.Row)

Put returns a row to the pool for reuse

func (*RowPool) Stats

func (rp *RowPool) Stats() RowPoolStats

Stats returns pool statistics

type RowPoolStats

type RowPoolStats struct {
	Allocations int64
	Reuses      int64
	Returns     int64
	ReuseRate   float64
}

RowPoolStats holds row pool statistics

type RowSlicePool

type RowSlicePool struct {
	// contains filtered or unexported fields
}

RowSlicePool is a pool for slices of rows

func NewRowSlicePool

func NewRowSlicePool(initialSize int) *RowSlicePool

NewRowSlicePool creates a pool for row slices

func (*RowSlicePool) Get

func (rsp *RowSlicePool) Get() *[]domain.Row

Get retrieves a row slice from the pool

func (*RowSlicePool) Put

func (rsp *RowSlicePool) Put(slice *[]domain.Row)

Put returns a row slice to the pool

type ScanFunc

type ScanFunc func(ctx context.Context, task ScanTask) (ScanResult, error)

ScanFunc is the function type for processing a scan task

type ScanPool

type ScanPool struct {
	// contains filtered or unexported fields
}

ScanPool is a specialized pool for parallel scanning operations

func NewScanPool

func NewScanPool(size int, scanFunc ScanFunc) (*ScanPool, error)

NewScanPool creates a new scan pool

func (*ScanPool) Close

func (sp *ScanPool) Close() error

Close closes the scan pool

func (*ScanPool) ExecuteParallel

func (sp *ScanPool) ExecuteParallel(ctx context.Context, tasks []ScanTask) ([]ScanResult, error)

ExecuteParallel executes scan tasks in parallel and collects results

func (*ScanPool) ExecuteParallelWithPool

func (sp *ScanPool) ExecuteParallelWithPool(ctx context.Context, tasks []ScanTask) ([]ScanResult, error)

ExecuteParallelWithPool uses the worker pool to execute tasks

func (*ScanPool) Start

func (sp *ScanPool) Start() error

Start starts the scan pool

func (*ScanPool) Stats

func (sp *ScanPool) Stats() Stats

Stats returns pool statistics

type ScanResult

type ScanResult struct {
	TaskID int
	Items  []interface{}
	Error  error
}

ScanResult represents the result of a scan task

type ScanTask

type ScanTask struct {
	ID         int
	StartIndex int
	EndIndex   int
	Data       interface{}
}

ScanTask represents a data scanning task with partition info

type SlicePool

type SlicePool[T any] struct {
	// contains filtered or unexported fields
}

SlicePool is a generic pool for slices

func NewSlicePool

func NewSlicePool[T any](initialSize int) *SlicePool[T]

NewSlicePool creates a new slice pool

func (*SlicePool[T]) Get

func (sp *SlicePool[T]) Get() *[]T

Get retrieves a slice from the pool

func (*SlicePool[T]) Put

func (sp *SlicePool[T]) Put(slice *[]T)

Put returns a slice to the pool

type Stats

type Stats struct {
	Workers       int
	TasksExecuted int64
	TasksFailed   int64
	QueueSize     int
	MaxQueueSize  int
	IsRunning     bool
	IsClosed      bool
}

Stats holds pool statistics

type Task

type Task func(ctx context.Context) error

Task represents a unit of work to be executed by the pool

type TaskFunc

type TaskFunc func(ctx context.Context) (interface{}, error)

TaskFunc is a function that produces a result

type ValuePool

type ValuePool[T any] struct {
	// contains filtered or unexported fields
}

ValuePool is a generic pool for any value type

func NewValuePool

func NewValuePool[T any](newFn func() T, resetFn func(T)) *ValuePool[T]

NewValuePool creates a new value pool

func (*ValuePool[T]) Get

func (vp *ValuePool[T]) Get() T

Get retrieves a value from the pool

func (*ValuePool[T]) Put

func (vp *ValuePool[T]) Put(v T)

Put returns a value to the pool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL