Documentation
¶
Overview ¶
Package parallel provides parallel processing infrastructure for DataFrame operations.
This package implements worker pools and parallel execution strategies for DataFrame operations that exceed the parallelization threshold. It provides both generic parallel processing and specialized order-preserving variants.
Key features:
- Adaptive worker pool sizing based on CPU count and data size
- Fan-out/fan-in patterns for parallel execution
- Thread-safe operations with independent data copies
- Memory-efficient chunking for large datasets
- Order-preserving parallel operations when needed
The package automatically activates for DataFrames with 1000+ rows and uses runtime.NumCPU() as the default worker count, with dynamic adjustment based on workload characteristics.
Index ¶
- func Process[T, R any](wp *WorkerPool, items []T, worker func(T) R) []R
- func ProcessGeneric[T, R any](pool *AdvancedWorkerPool, items []T, worker func(T) R) []R
- func ProcessIndexed[T, R any](wp *WorkerPool, items []T, worker func(int, T) R) []R
- type AdvancedMemoryPool
- type AdvancedWorkerPool
- func (pool *AdvancedWorkerPool) Close()
- func (pool *AdvancedWorkerPool) CurrentWorkerCount() int
- func (pool *AdvancedWorkerPool) GetMetrics() *PoolMetrics
- func (pool *AdvancedWorkerPool) Process(items []interface{}, worker func(interface{}) interface{}) []interface{}
- func (pool *AdvancedWorkerPool) ProcessWithPriority(tasks []PriorityTask, worker func(PriorityTask) int) []int
- type AdvancedWorkerPoolConfig
- type AllocatorPool
- func (p *AllocatorPool) ActiveCount() int64
- func (p *AllocatorPool) Close()
- func (p *AllocatorPool) Get() memory.Allocator
- func (p *AllocatorPool) GetStats() PoolStats
- func (p *AllocatorPool) PeakAllocated() int64
- func (p *AllocatorPool) Put(alloc memory.Allocator)
- func (p *AllocatorPool) RecordAllocation(bytes int64)
- func (p *AllocatorPool) RecordDeallocation(bytes int64)
- func (p *AllocatorPool) TotalAllocated() int64
- type BackpressurePolicy
- type ChunkProcessor
- type MemoryMonitor
- type MonitoredAllocator
- type PoolMetrics
- type PoolStats
- type PriorityItem
- type PriorityQueue
- type PriorityTask
- type ResourceLimits
- type SafeDataFrame
- type WorkerPool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Process ¶
func Process[T, R any](wp *WorkerPool, items []T, worker func(T) R) []R
Process executes work items in parallel using a fan-out/fan-in pattern.
func ProcessGeneric ¶
func ProcessGeneric[T, R any](pool *AdvancedWorkerPool, items []T, worker func(T) R) []R
ProcessGeneric executes work items using the advanced worker pool with generic types
func ProcessIndexed ¶
func ProcessIndexed[T, R any](wp *WorkerPool, items []T, worker func(int, T) R) []R
ProcessIndexed executes work items in parallel while preserving order
Types ¶
type AdvancedMemoryPool ¶
type AdvancedMemoryPool struct {
// contains filtered or unexported fields
}
AdvancedMemoryPool provides enhanced memory pool management with adaptive sizing
func NewAdvancedMemoryPool ¶
func NewAdvancedMemoryPool(maxSize int, memoryThreshold int64, adaptiveSize bool) *AdvancedMemoryPool
NewAdvancedMemoryPool creates a new advanced memory pool
func (*AdvancedMemoryPool) Close ¶
func (amp *AdvancedMemoryPool) Close()
Close closes the advanced memory pool
func (*AdvancedMemoryPool) GetAllocator ¶
func (amp *AdvancedMemoryPool) GetAllocator() memory.Allocator
GetAllocator gets an allocator from the pool with memory monitoring
func (*AdvancedMemoryPool) GetStats ¶
func (amp *AdvancedMemoryPool) GetStats() (PoolStats, *MemoryMonitor)
GetStats returns comprehensive pool statistics
func (*AdvancedMemoryPool) PutAllocator ¶
func (amp *AdvancedMemoryPool) PutAllocator(alloc memory.Allocator)
PutAllocator returns an allocator to the pool
type AdvancedWorkerPool ¶
type AdvancedWorkerPool struct {
// contains filtered or unexported fields
}
AdvancedWorkerPool provides dynamic scaling, work stealing, and resource management
func NewAdvancedWorkerPool ¶
func NewAdvancedWorkerPool(config AdvancedWorkerPoolConfig) *AdvancedWorkerPool
NewAdvancedWorkerPool creates a new advanced worker pool with the specified configuration
func (*AdvancedWorkerPool) Close ¶
func (pool *AdvancedWorkerPool) Close()
Close gracefully shuts down the worker pool
func (*AdvancedWorkerPool) CurrentWorkerCount ¶
func (pool *AdvancedWorkerPool) CurrentWorkerCount() int
CurrentWorkerCount returns the current number of workers
func (*AdvancedWorkerPool) GetMetrics ¶
func (pool *AdvancedWorkerPool) GetMetrics() *PoolMetrics
GetMetrics returns the current pool metrics
func (*AdvancedWorkerPool) Process ¶
func (pool *AdvancedWorkerPool) Process(items []interface{}, worker func(interface{}) interface{}) []interface{}
Process executes work items using the advanced worker pool
func (*AdvancedWorkerPool) ProcessWithPriority ¶
func (pool *AdvancedWorkerPool) ProcessWithPriority(tasks []PriorityTask, worker func(PriorityTask) int) []int
ProcessWithPriority executes priority tasks
type AdvancedWorkerPoolConfig ¶
type AdvancedWorkerPoolConfig struct {
MinWorkers int
MaxWorkers int
WorkQueueSize int
ScaleThreshold float64
EnableWorkStealing bool
EnableMetrics bool
EnablePriority bool
MemoryMonitor *MemoryMonitor
ResourceLimits ResourceLimits
BackpressurePolicy BackpressurePolicy
}
AdvancedWorkerPoolConfig provides configuration for the advanced worker pool
type AllocatorPool ¶
type AllocatorPool struct {
// contains filtered or unexported fields
}
AllocatorPool manages a pool of memory allocators for safe reuse in parallel processing
func NewAllocatorPool ¶
func NewAllocatorPool(maxSize int) *AllocatorPool
NewAllocatorPool creates a new allocator pool with the specified maximum size
func (*AllocatorPool) ActiveCount ¶
func (p *AllocatorPool) ActiveCount() int64
ActiveCount returns the number of allocators currently in use
func (*AllocatorPool) Get ¶
func (p *AllocatorPool) Get() memory.Allocator
Get retrieves an allocator from the pool
func (*AllocatorPool) GetStats ¶
func (p *AllocatorPool) GetStats() PoolStats
GetStats returns current pool statistics
func (*AllocatorPool) PeakAllocated ¶
func (p *AllocatorPool) PeakAllocated() int64
PeakAllocated returns the peak memory allocated by the pool
func (*AllocatorPool) Put ¶
func (p *AllocatorPool) Put(alloc memory.Allocator)
Put returns an allocator to the pool for reuse
func (*AllocatorPool) RecordAllocation ¶
func (p *AllocatorPool) RecordAllocation(bytes int64)
RecordAllocation records memory allocation in the pool
func (*AllocatorPool) RecordDeallocation ¶
func (p *AllocatorPool) RecordDeallocation(bytes int64)
RecordDeallocation records memory deallocation in the pool
func (*AllocatorPool) TotalAllocated ¶
func (p *AllocatorPool) TotalAllocated() int64
TotalAllocated returns the total memory allocated by the pool
type BackpressurePolicy ¶
type BackpressurePolicy int
const (
BackpressureBlock BackpressurePolicy = iota
BackpressureDrop
BackpressureSpill
)
type ChunkProcessor ¶
type ChunkProcessor struct {
// contains filtered or unexported fields
}
ChunkProcessor provides isolated processing context for parallel chunks
func NewChunkProcessor ¶
func NewChunkProcessor(pool *AllocatorPool, chunkID int) *ChunkProcessor
NewChunkProcessor creates a new chunk processor with isolated memory context
func (*ChunkProcessor) ChunkID ¶
func (cp *ChunkProcessor) ChunkID() int
ChunkID returns the unique ID for this chunk
func (*ChunkProcessor) GetAllocator ¶
func (cp *ChunkProcessor) GetAllocator() memory.Allocator
GetAllocator returns the allocator for this chunk processor
func (*ChunkProcessor) Release ¶
func (cp *ChunkProcessor) Release()
Release returns the allocator to the pool and marks this processor as released
type MemoryMonitor ¶
type MemoryMonitor struct {
// contains filtered or unexported fields
}
MemoryMonitor tracks memory usage and provides adaptive parallelism control
func NewMemoryMonitor ¶
func NewMemoryMonitor(threshold int64, maxParallel int) *MemoryMonitor
NewMemoryMonitor creates a new memory monitor with the specified threshold and max parallelism
func (*MemoryMonitor) AdjustParallelism ¶
func (m *MemoryMonitor) AdjustParallelism() int
AdjustParallelism returns the recommended parallelism level based on current memory pressure
func (*MemoryMonitor) CanAllocate ¶
func (m *MemoryMonitor) CanAllocate(size int64) bool
CanAllocate checks if the requested memory size can be allocated without exceeding the threshold.
func (*MemoryMonitor) CurrentUsage ¶
func (m *MemoryMonitor) CurrentUsage() int64
CurrentUsage returns the current memory usage
func (*MemoryMonitor) RecordAllocation ¶
func (m *MemoryMonitor) RecordAllocation(size int64)
RecordAllocation records a memory allocation
func (*MemoryMonitor) RecordDeallocation ¶
func (m *MemoryMonitor) RecordDeallocation(size int64)
RecordDeallocation records a memory deallocation
type MonitoredAllocator ¶
type MonitoredAllocator struct {
// contains filtered or unexported fields
}
MonitoredAllocator wraps a memory allocator with monitoring capabilities
func NewMonitoredAllocator ¶
func NewMonitoredAllocator(underlying memory.Allocator, pool *AllocatorPool) *MonitoredAllocator
NewMonitoredAllocator creates a new monitored allocator
func (*MonitoredAllocator) Allocate ¶
func (ma *MonitoredAllocator) Allocate(size int) []byte
Allocate allocates memory and records the allocation
func (*MonitoredAllocator) AllocatedBytes ¶
func (ma *MonitoredAllocator) AllocatedBytes() int64
AllocatedBytes returns the number of bytes allocated by the underlying allocator
func (*MonitoredAllocator) Free ¶
func (ma *MonitoredAllocator) Free(b []byte)
Free frees memory and records the deallocation
func (*MonitoredAllocator) Reallocate ¶
func (ma *MonitoredAllocator) Reallocate(size int, b []byte) []byte
Reallocate reallocates memory and updates allocation records
type PoolMetrics ¶
type PoolStats ¶
type PoolStats struct {
ActiveAllocators int64
TotalAllocated int64
PeakAllocated int64
MaxSize int
}
PoolStats provides statistics about the allocator pool
type PriorityItem ¶
type PriorityItem struct {
Priority int
Index int
Task PriorityTask
Worker func(PriorityTask) int
Result chan<- advancedIndexedResult
}
type PriorityQueue ¶
type PriorityQueue []*PriorityItem
PriorityQueue implements a priority queue using a binary heap
func NewPriorityQueue ¶
func NewPriorityQueue() *PriorityQueue
func (PriorityQueue) Len ¶
func (pq PriorityQueue) Len() int
func (PriorityQueue) Less ¶
func (pq PriorityQueue) Less(i, j int) bool
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() interface{}
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x interface{})
func (PriorityQueue) Swap ¶
func (pq PriorityQueue) Swap(i, j int)
type PriorityTask ¶
PriorityTask represents a task with an associated priority level
type ResourceLimits ¶
ResourceLimits defines constraints for worker pool resource usage
type SafeDataFrame ¶
type SafeDataFrame struct {
// contains filtered or unexported fields
}
SafeDataFrame provides thread-safe access to DataFrame data with copy-on-access semantics
func NewSafeDataFrame ¶
func NewSafeDataFrame(data interface{}) *SafeDataFrame
NewSafeDataFrame creates a new thread-safe DataFrame wrapper
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool manages a pool of goroutines for parallel processing
func NewWorkerPool ¶
func NewWorkerPool(numWorkers int) *WorkerPool
NewWorkerPool creates a new worker pool