Documentation
¶
Overview ¶
Package parallel provides concurrent processing utilities
Index ¶
- func Process[T, R any](wp *WorkerPool, items []T, worker func(T) R) []R
- func ProcessGeneric[T, R any](pool *AdvancedWorkerPool, items []T, worker func(T) R) []R
- func ProcessIndexed[T, R any](wp *WorkerPool, items []T, worker func(int, T) R) []R
- type AdvancedMemoryPool
- type AdvancedWorkerPool
- func (pool *AdvancedWorkerPool) Close()
- func (pool *AdvancedWorkerPool) CurrentWorkerCount() int
- func (pool *AdvancedWorkerPool) GetMetrics() *PoolMetrics
- func (pool *AdvancedWorkerPool) Process(items []interface{}, worker func(interface{}) interface{}) []interface{}
- func (pool *AdvancedWorkerPool) ProcessWithPriority(tasks []PriorityTask, worker func(PriorityTask) int) []int
- type AdvancedWorkerPoolConfig
- type AllocatorPool
- func (p *AllocatorPool) ActiveCount() int64
- func (p *AllocatorPool) Close()
- func (p *AllocatorPool) Get() memory.Allocator
- func (p *AllocatorPool) GetStats() PoolStats
- func (p *AllocatorPool) PeakAllocated() int64
- func (p *AllocatorPool) Put(alloc memory.Allocator)
- func (p *AllocatorPool) RecordAllocation(bytes int64)
- func (p *AllocatorPool) RecordDeallocation(bytes int64)
- func (p *AllocatorPool) TotalAllocated() int64
- type BackpressurePolicy
- type ChunkProcessor
- type MemoryMonitor
- type MonitoredAllocator
- type PoolMetrics
- type PoolStats
- type PriorityItem
- type PriorityQueue
- type PriorityTask
- type ResourceLimits
- type SafeDataFrame
- type WorkerPool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Process ¶
func Process[T, R any]( wp *WorkerPool, items []T, worker func(T) R, ) []R
Process executes work items in parallel using fan-out/fan-in pattern
func ProcessGeneric ¶
func ProcessGeneric[T, R any](pool *AdvancedWorkerPool, items []T, worker func(T) R) []R
ProcessGeneric executes work items using the advanced worker pool with generic types
func ProcessIndexed ¶
func ProcessIndexed[T, R any]( wp *WorkerPool, items []T, worker func(int, T) R, ) []R
ProcessIndexed executes work items in parallel while preserving order
Types ¶
type AdvancedMemoryPool ¶
type AdvancedMemoryPool struct {
// contains filtered or unexported fields
}
AdvancedMemoryPool provides enhanced memory pool management with adaptive sizing
func NewAdvancedMemoryPool ¶
func NewAdvancedMemoryPool(maxSize int, memoryThreshold int64, adaptiveSize bool) *AdvancedMemoryPool
NewAdvancedMemoryPool creates a new advanced memory pool
func (*AdvancedMemoryPool) Close ¶
func (amp *AdvancedMemoryPool) Close()
Close closes the advanced memory pool
func (*AdvancedMemoryPool) GetAllocator ¶
func (amp *AdvancedMemoryPool) GetAllocator() memory.Allocator
GetAllocator gets an allocator from the pool with memory monitoring
func (*AdvancedMemoryPool) GetStats ¶
func (amp *AdvancedMemoryPool) GetStats() (PoolStats, *MemoryMonitor)
GetStats returns comprehensive pool statistics
func (*AdvancedMemoryPool) PutAllocator ¶
func (amp *AdvancedMemoryPool) PutAllocator(alloc memory.Allocator)
PutAllocator returns an allocator to the pool
type AdvancedWorkerPool ¶
type AdvancedWorkerPool struct {
// contains filtered or unexported fields
}
AdvancedWorkerPool provides dynamic scaling, work stealing, and resource management
func NewAdvancedWorkerPool ¶
func NewAdvancedWorkerPool(config AdvancedWorkerPoolConfig) *AdvancedWorkerPool
NewAdvancedWorkerPool creates a new advanced worker pool with the specified configuration
func (*AdvancedWorkerPool) Close ¶
func (pool *AdvancedWorkerPool) Close()
Close gracefully shuts down the worker pool
func (*AdvancedWorkerPool) CurrentWorkerCount ¶
func (pool *AdvancedWorkerPool) CurrentWorkerCount() int
CurrentWorkerCount returns the current number of workers
func (*AdvancedWorkerPool) GetMetrics ¶
func (pool *AdvancedWorkerPool) GetMetrics() *PoolMetrics
GetMetrics returns the current pool metrics
func (*AdvancedWorkerPool) Process ¶
func (pool *AdvancedWorkerPool) Process(items []interface{}, worker func(interface{}) interface{}) []interface{}
Process executes work items using the advanced worker pool
func (*AdvancedWorkerPool) ProcessWithPriority ¶
func (pool *AdvancedWorkerPool) ProcessWithPriority(tasks []PriorityTask, worker func(PriorityTask) int) []int
ProcessWithPriority executes priority tasks
type AdvancedWorkerPoolConfig ¶
type AdvancedWorkerPoolConfig struct {
MinWorkers int
MaxWorkers int
WorkQueueSize int
ScaleThreshold float64
EnableWorkStealing bool
EnableMetrics bool
EnablePriority bool
MemoryMonitor *MemoryMonitor
ResourceLimits ResourceLimits
BackpressurePolicy BackpressurePolicy
}
AdvancedWorkerPoolConfig provides configuration for the advanced worker pool
type AllocatorPool ¶
type AllocatorPool struct {
// contains filtered or unexported fields
}
AllocatorPool manages a pool of memory allocators for safe reuse in parallel processing
func NewAllocatorPool ¶
func NewAllocatorPool(maxSize int) *AllocatorPool
NewAllocatorPool creates a new allocator pool with the specified maximum size
func (*AllocatorPool) ActiveCount ¶
func (p *AllocatorPool) ActiveCount() int64
ActiveCount returns the number of allocators currently in use
func (*AllocatorPool) Get ¶
func (p *AllocatorPool) Get() memory.Allocator
Get retrieves an allocator from the pool
func (*AllocatorPool) GetStats ¶
func (p *AllocatorPool) GetStats() PoolStats
GetStats returns current pool statistics
func (*AllocatorPool) PeakAllocated ¶
func (p *AllocatorPool) PeakAllocated() int64
PeakAllocated returns the peak memory allocated by the pool
func (*AllocatorPool) Put ¶
func (p *AllocatorPool) Put(alloc memory.Allocator)
Put returns an allocator to the pool for reuse
func (*AllocatorPool) RecordAllocation ¶
func (p *AllocatorPool) RecordAllocation(bytes int64)
RecordAllocation records memory allocation in the pool
func (*AllocatorPool) RecordDeallocation ¶
func (p *AllocatorPool) RecordDeallocation(bytes int64)
RecordDeallocation records memory deallocation in the pool
func (*AllocatorPool) TotalAllocated ¶
func (p *AllocatorPool) TotalAllocated() int64
TotalAllocated returns the total memory allocated by the pool
type BackpressurePolicy ¶
type BackpressurePolicy int
const ( BackpressureBlock BackpressurePolicy = iota BackpressureDrop BackpressureSpill )
type ChunkProcessor ¶
type ChunkProcessor struct {
// contains filtered or unexported fields
}
ChunkProcessor provides isolated processing context for parallel chunks
func NewChunkProcessor ¶
func NewChunkProcessor(pool *AllocatorPool, chunkID int) *ChunkProcessor
NewChunkProcessor creates a new chunk processor with isolated memory context
func (*ChunkProcessor) ChunkID ¶
func (cp *ChunkProcessor) ChunkID() int
ChunkID returns the unique ID for this chunk
func (*ChunkProcessor) GetAllocator ¶
func (cp *ChunkProcessor) GetAllocator() memory.Allocator
GetAllocator returns the allocator for this chunk processor
func (*ChunkProcessor) Release ¶
func (cp *ChunkProcessor) Release()
Release returns the allocator to the pool and marks this processor as released
type MemoryMonitor ¶
type MemoryMonitor struct {
// contains filtered or unexported fields
}
MemoryMonitor tracks memory usage and provides adaptive parallelism control
func NewMemoryMonitor ¶
func NewMemoryMonitor(threshold int64, maxParallel int) *MemoryMonitor
NewMemoryMonitor creates a new memory monitor with the specified threshold and max parallelism
func (*MemoryMonitor) AdjustParallelism ¶
func (m *MemoryMonitor) AdjustParallelism() int
AdjustParallelism returns the recommended parallelism level based on current memory pressure
func (*MemoryMonitor) CanAllocate ¶
func (m *MemoryMonitor) CanAllocate(size int64) bool
CanAllocate checks if the requested memory size can be allocated without exceeding threshold
func (*MemoryMonitor) CurrentUsage ¶
func (m *MemoryMonitor) CurrentUsage() int64
CurrentUsage returns the current memory usage
func (*MemoryMonitor) RecordAllocation ¶
func (m *MemoryMonitor) RecordAllocation(size int64)
RecordAllocation records a memory allocation
func (*MemoryMonitor) RecordDeallocation ¶
func (m *MemoryMonitor) RecordDeallocation(size int64)
RecordDeallocation records a memory deallocation
type MonitoredAllocator ¶
type MonitoredAllocator struct {
// contains filtered or unexported fields
}
MonitoredAllocator wraps a memory allocator with monitoring capabilities
func NewMonitoredAllocator ¶
func NewMonitoredAllocator(underlying memory.Allocator, pool *AllocatorPool) *MonitoredAllocator
NewMonitoredAllocator creates a new monitored allocator
func (*MonitoredAllocator) Allocate ¶
func (ma *MonitoredAllocator) Allocate(size int) []byte
Allocate allocates memory and records the allocation
func (*MonitoredAllocator) AllocatedBytes ¶
func (ma *MonitoredAllocator) AllocatedBytes() int64
AllocatedBytes returns the number of bytes allocated by the underlying allocator
func (*MonitoredAllocator) Free ¶
func (ma *MonitoredAllocator) Free(b []byte)
Free frees memory and records the deallocation
func (*MonitoredAllocator) Reallocate ¶
func (ma *MonitoredAllocator) Reallocate(size int, b []byte) []byte
Reallocate reallocates memory and updates allocation records
type PoolMetrics ¶
type PoolStats ¶
type PoolStats struct {
ActiveAllocators int64
TotalAllocated int64
PeakAllocated int64
MaxSize int
}
PoolStats provides statistics about the allocator pool
type PriorityItem ¶
type PriorityItem struct {
Priority int
Index int
Task PriorityTask
Worker func(PriorityTask) int
Result chan<- advancedIndexedResult
}
type PriorityQueue ¶
type PriorityQueue []*PriorityItem
PriorityQueue implements a priority queue using a binary heap
func NewPriorityQueue ¶
func NewPriorityQueue() *PriorityQueue
func (PriorityQueue) Len ¶
func (pq PriorityQueue) Len() int
func (PriorityQueue) Less ¶
func (pq PriorityQueue) Less(i, j int) bool
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() interface{}
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x interface{})
func (PriorityQueue) Swap ¶
func (pq PriorityQueue) Swap(i, j int)
type PriorityTask ¶
PriorityTask represents a task with an associated priority level
type ResourceLimits ¶
ResourceLimits defines constraints for worker pool resource usage
type SafeDataFrame ¶
type SafeDataFrame struct {
// contains filtered or unexported fields
}
SafeDataFrame provides thread-safe access to DataFrame data with copy-on-access semantics
func NewSafeDataFrame ¶
func NewSafeDataFrame(data interface{}) *SafeDataFrame
NewSafeDataFrame creates a new thread-safe DataFrame wrapper
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool manages a pool of goroutines for parallel processing
func NewWorkerPool ¶
func NewWorkerPool(numWorkers int) *WorkerPool
NewWorkerPool creates a new worker pool