Documentation ¶
Overview ¶
Package parallel provides parallel processing infrastructure for DataFrame operations.
This package implements worker pools and parallel execution strategies for DataFrame operations that exceed the parallelization threshold. It provides both generic parallel processing and specialized order-preserving variants.
Key features:
- Adaptive worker pool sizing based on CPU count and data size
- Fan-out/fan-in patterns for parallel execution
- Thread-safe operations with independent data copies
- Memory-efficient chunking for large datasets
- Order-preserving parallel operations when needed
The package automatically activates for DataFrames with 1000+ rows and uses runtime.NumCPU() as the default worker count, with dynamic adjustment based on workload characteristics.
Index ¶
- Constants
- func Process[T, R any](wp *WorkerPool, items []T, worker func(T) R) []R
- func ProcessGeneric[T, R any](pool *AdvancedWorkerPool, items []T, worker func(T) R) []R
- func ProcessIndexed[T, R any](wp *WorkerPool, items []T, worker func(int, T) R) []R
- type AdvancedMemoryPool
- type AdvancedWorkerPool
- func (pool *AdvancedWorkerPool) Close()
- func (pool *AdvancedWorkerPool) CurrentWorkerCount() int
- func (pool *AdvancedWorkerPool) GetMetrics() *PoolMetrics
- func (pool *AdvancedWorkerPool) Process(items []interface{}, worker func(interface{}) interface{}) []interface{}
- func (pool *AdvancedWorkerPool) ProcessWithPriority(tasks []PriorityTask, worker func(PriorityTask) int) []int
- type AdvancedWorkerPoolConfig
- type AllocatorPool
- func (p *AllocatorPool) ActiveCount() int64
- func (p *AllocatorPool) Close()
- func (p *AllocatorPool) Get() memory.Allocator
- func (p *AllocatorPool) GetStats() PoolStats
- func (p *AllocatorPool) PeakAllocated() int64
- func (p *AllocatorPool) Put(alloc memory.Allocator)
- func (p *AllocatorPool) RecordAllocation(bytes int64)
- func (p *AllocatorPool) RecordDeallocation(bytes int64)
- func (p *AllocatorPool) TotalAllocated() int64
- type BackpressurePolicy
- type ChunkProcessor
- type MemoryMonitor
- type MonitoredAllocator
- type PoolMetrics
- type PoolStats
- type PriorityItem
- type PriorityQueue
- type PriorityTask
- type ResourceLimits
- type SafeDataFrame
- type WorkerPool
Constants ¶
const (
	// DefaultMemoryThreshold is the default memory threshold for parallel operations (1GB).
	DefaultMemoryThreshold = 1024 * 1024 * 1024
	// DefaultGCPressureThreshold is the default GC pressure threshold.
	DefaultGCPressureThreshold = 0.8
	// ModeratePressureOffset is the offset for moderate pressure threshold.
	ModeratePressureOffset = 0.2
	// VeryHighPressureOffset is the offset for very high pressure threshold.
	VeryHighPressureOffset = 0.1
)
Variables ¶
This section is empty.
Functions ¶
func Process ¶
func Process[T, R any](wp *WorkerPool, items []T, worker func(T) R) []R
Process executes work items in parallel using a fan-out/fan-in pattern.
func ProcessGeneric ¶
func ProcessGeneric[T, R any](pool *AdvancedWorkerPool, items []T, worker func(T) R) []R
ProcessGeneric executes work items using the advanced worker pool with generic types.
func ProcessIndexed ¶
func ProcessIndexed[T, R any](wp *WorkerPool, items []T, worker func(int, T) R) []R
ProcessIndexed executes work items in parallel while preserving order.
Types ¶
type AdvancedMemoryPool ¶
type AdvancedMemoryPool struct {
// contains filtered or unexported fields
}
AdvancedMemoryPool provides enhanced memory pool management with adaptive sizing.
func NewAdvancedMemoryPool ¶
func NewAdvancedMemoryPool(maxSize int, memoryThreshold int64, adaptiveSize bool) *AdvancedMemoryPool
NewAdvancedMemoryPool creates a new advanced memory pool.
func (*AdvancedMemoryPool) Close ¶
func (amp *AdvancedMemoryPool) Close()
Close closes the advanced memory pool.
func (*AdvancedMemoryPool) GetAllocator ¶
func (amp *AdvancedMemoryPool) GetAllocator() memory.Allocator
GetAllocator gets an allocator from the pool with memory monitoring.
func (*AdvancedMemoryPool) GetStats ¶
func (amp *AdvancedMemoryPool) GetStats() (PoolStats, *MemoryMonitor)
GetStats returns comprehensive pool statistics.
func (*AdvancedMemoryPool) PutAllocator ¶
func (amp *AdvancedMemoryPool) PutAllocator(alloc memory.Allocator)
PutAllocator returns an allocator to the pool.
type AdvancedWorkerPool ¶
type AdvancedWorkerPool struct {
// contains filtered or unexported fields
}
AdvancedWorkerPool provides dynamic scaling, work stealing, and resource management.
func NewAdvancedWorkerPool ¶
func NewAdvancedWorkerPool(config AdvancedWorkerPoolConfig) *AdvancedWorkerPool
NewAdvancedWorkerPool creates a new advanced worker pool with the specified configuration.
func (*AdvancedWorkerPool) Close ¶
func (pool *AdvancedWorkerPool) Close()
Close gracefully shuts down the worker pool.
func (*AdvancedWorkerPool) CurrentWorkerCount ¶
func (pool *AdvancedWorkerPool) CurrentWorkerCount() int
CurrentWorkerCount returns the current number of workers.
func (*AdvancedWorkerPool) GetMetrics ¶
func (pool *AdvancedWorkerPool) GetMetrics() *PoolMetrics
GetMetrics returns the current pool metrics.
func (*AdvancedWorkerPool) Process ¶
func (pool *AdvancedWorkerPool) Process(items []interface{}, worker func(interface{}) interface{}) []interface{}
Process executes work items using the advanced worker pool.
func (*AdvancedWorkerPool) ProcessWithPriority ¶
func (pool *AdvancedWorkerPool) ProcessWithPriority(tasks []PriorityTask, worker func(PriorityTask) int) []int
ProcessWithPriority executes priority tasks.
type AdvancedWorkerPoolConfig ¶
type AdvancedWorkerPoolConfig struct {
MinWorkers int
MaxWorkers int
WorkQueueSize int
ScaleThreshold float64
EnableWorkStealing bool
EnableMetrics bool
EnablePriority bool
MemoryMonitor *MemoryMonitor
ResourceLimits ResourceLimits
BackpressurePolicy BackpressurePolicy
}
AdvancedWorkerPoolConfig provides configuration for the advanced worker pool.
type AllocatorPool ¶
type AllocatorPool struct {
// contains filtered or unexported fields
}
AllocatorPool manages a pool of memory allocators for safe reuse in parallel processing.
func NewAllocatorPool ¶
func NewAllocatorPool(maxSize int) *AllocatorPool
NewAllocatorPool creates a new allocator pool with the specified maximum size.
func (*AllocatorPool) ActiveCount ¶
func (p *AllocatorPool) ActiveCount() int64
ActiveCount returns the number of allocators currently in use.
func (*AllocatorPool) Get ¶
func (p *AllocatorPool) Get() memory.Allocator
Get retrieves an allocator from the pool.
func (*AllocatorPool) GetStats ¶
func (p *AllocatorPool) GetStats() PoolStats
GetStats returns current pool statistics.
func (*AllocatorPool) PeakAllocated ¶
func (p *AllocatorPool) PeakAllocated() int64
PeakAllocated returns the peak memory allocated by the pool.
func (*AllocatorPool) Put ¶
func (p *AllocatorPool) Put(alloc memory.Allocator)
Put returns an allocator to the pool for reuse.
func (*AllocatorPool) RecordAllocation ¶
func (p *AllocatorPool) RecordAllocation(bytes int64)
RecordAllocation records memory allocation in the pool.
func (*AllocatorPool) RecordDeallocation ¶
func (p *AllocatorPool) RecordDeallocation(bytes int64)
RecordDeallocation records memory deallocation in the pool.
func (*AllocatorPool) TotalAllocated ¶
func (p *AllocatorPool) TotalAllocated() int64
TotalAllocated returns the total memory allocated by the pool.
type BackpressurePolicy ¶
type BackpressurePolicy int
const (
	BackpressureBlock BackpressurePolicy = iota
	BackpressureDrop
	BackpressureSpill
)
type ChunkProcessor ¶
type ChunkProcessor struct {
// contains filtered or unexported fields
}
ChunkProcessor provides isolated processing context for parallel chunks.
func NewChunkProcessor ¶
func NewChunkProcessor(pool *AllocatorPool, chunkID int) *ChunkProcessor
NewChunkProcessor creates a new chunk processor with isolated memory context.
func (*ChunkProcessor) ChunkID ¶
func (cp *ChunkProcessor) ChunkID() int
ChunkID returns the unique ID for this chunk.
func (*ChunkProcessor) GetAllocator ¶
func (cp *ChunkProcessor) GetAllocator() memory.Allocator
GetAllocator returns the allocator for this chunk processor.
func (*ChunkProcessor) Release ¶
func (cp *ChunkProcessor) Release()
Release returns the allocator to the pool and marks this processor as released.
type MemoryMonitor ¶
type MemoryMonitor struct {
// contains filtered or unexported fields
}
MemoryMonitor tracks memory usage and provides adaptive parallelism control.
func NewMemoryMonitor ¶
func NewMemoryMonitor(threshold int64, maxParallel int) *MemoryMonitor
NewMemoryMonitor creates a new memory monitor with the specified threshold and max parallelism.
func NewMemoryMonitorFromConfig ¶ added in v0.3.1
func NewMemoryMonitorFromConfig(cfg config.Config) *MemoryMonitor
NewMemoryMonitorFromConfig creates a new memory monitor using configuration values.
func (*MemoryMonitor) AdjustParallelism ¶
func (m *MemoryMonitor) AdjustParallelism() int
AdjustParallelism returns the recommended parallelism level based on current memory pressure.
func (*MemoryMonitor) CanAllocate ¶
func (m *MemoryMonitor) CanAllocate(size int64) bool
CanAllocate checks if the requested memory size can be allocated without exceeding the threshold.
func (*MemoryMonitor) CurrentUsage ¶
func (m *MemoryMonitor) CurrentUsage() int64
CurrentUsage returns the current memory usage.
func (*MemoryMonitor) RecordAllocation ¶
func (m *MemoryMonitor) RecordAllocation(size int64)
RecordAllocation records a memory allocation.
func (*MemoryMonitor) RecordDeallocation ¶
func (m *MemoryMonitor) RecordDeallocation(size int64)
RecordDeallocation records a memory deallocation.
type MonitoredAllocator ¶
type MonitoredAllocator struct {
// contains filtered or unexported fields
}
MonitoredAllocator wraps a memory allocator with monitoring capabilities.
func NewMonitoredAllocator ¶
func NewMonitoredAllocator(underlying memory.Allocator, pool *AllocatorPool) *MonitoredAllocator
NewMonitoredAllocator creates a new monitored allocator.
func (*MonitoredAllocator) Allocate ¶
func (ma *MonitoredAllocator) Allocate(size int) []byte
Allocate allocates memory and records the allocation.
func (*MonitoredAllocator) AllocatedBytes ¶
func (ma *MonitoredAllocator) AllocatedBytes() int64
AllocatedBytes returns the number of bytes allocated by the underlying allocator.
func (*MonitoredAllocator) Free ¶
func (ma *MonitoredAllocator) Free(b []byte)
Free frees memory and records the deallocation.
func (*MonitoredAllocator) Reallocate ¶
func (ma *MonitoredAllocator) Reallocate(size int, b []byte) []byte
Reallocate reallocates memory and updates allocation records.
type PoolMetrics ¶
type PoolStats ¶
type PoolStats struct {
ActiveAllocators int64
TotalAllocated int64
PeakAllocated int64
MaxSize int
}
PoolStats provides statistics about the allocator pool.
type PriorityItem ¶
type PriorityItem struct {
Priority int
Index int
Task PriorityTask
Worker func(PriorityTask) int
Result chan<- advancedIndexedResult
}
type PriorityQueue ¶
type PriorityQueue []*PriorityItem
PriorityQueue implements a priority queue using a binary heap.
func NewPriorityQueue ¶
func NewPriorityQueue() *PriorityQueue
func (PriorityQueue) Len ¶
func (pq PriorityQueue) Len() int
func (PriorityQueue) Less ¶
func (pq PriorityQueue) Less(i, j int) bool
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() interface{}
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x interface{})
func (PriorityQueue) Swap ¶
func (pq PriorityQueue) Swap(i, j int)
type PriorityTask ¶
PriorityTask represents a task with an associated priority level.
type ResourceLimits ¶
ResourceLimits defines constraints for worker pool resource usage.
type SafeDataFrame ¶
type SafeDataFrame struct {
// contains filtered or unexported fields
}
SafeDataFrame provides thread-safe access to DataFrame data with copy-on-access semantics.
func NewSafeDataFrame ¶
func NewSafeDataFrame(data interface{}) *SafeDataFrame
NewSafeDataFrame creates a new thread-safe DataFrame wrapper.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool manages a pool of goroutines for parallel processing.
func NewWorkerPool ¶
func NewWorkerPool(numWorkers int) *WorkerPool
NewWorkerPool creates a new worker pool.