parallel

package
v0.3.1 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 1, 2025 | License: Apache-2.0, MIT | Imports: 9 | Imported by: 0

Documentation

Overview

Package parallel provides parallel processing infrastructure for DataFrame operations.

This package implements worker pools and parallel execution strategies for DataFrame operations that exceed the parallelization threshold. It provides both generic parallel processing and specialized order-preserving variants.

Key features:

  • Adaptive worker pool sizing based on CPU count and data size
  • Fan-out/fan-in patterns for parallel execution
  • Thread-safe operations with independent data copies
  • Memory-efficient chunking for large datasets
  • Order-preserving parallel operations when needed

Parallel execution activates automatically for DataFrames with 1000 or more rows. The default worker count is runtime.NumCPU(), adjusted dynamically based on workload characteristics.
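A minimal sketch of how a worker count could be chosen from the row count and CPU count described above. The 1000-row threshold and the use of the CPU count come from the overview; `minRowsPerWorker` and the capping rule are assumptions made for illustration, not the package's actual heuristic.

```go
package main

import (
	"fmt"
	"runtime"
)

const (
	parallelThreshold = 1000 // from the overview: below this, run sequentially
	minRowsPerWorker  = 500  // ASSUMPTION: avoid workers with very small slices
)

// workerCount picks a worker count for a DataFrame with the given number of
// rows. numCPU is a parameter (instead of calling runtime.NumCPU inside) so
// the rule is easy to test.
func workerCount(rows, numCPU int) int {
	if rows < parallelThreshold || numCPU < 2 {
		return 1 // sequential path
	}
	workers := numCPU
	// Cap the count so each worker still gets at least minRowsPerWorker rows.
	if byData := rows / minRowsPerWorker; byData < workers {
		workers = byData
	}
	return workers
}

func main() {
	fmt.Println(workerCount(50_000, runtime.NumCPU()))
}
```

Passing the CPU count in explicitly keeps the decision deterministic; a caller would normally supply `runtime.NumCPU()`.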

Index

Constants

const (
	// DefaultMemoryThreshold is the default memory threshold for parallel operations (1 GiB).
	DefaultMemoryThreshold = 1024 * 1024 * 1024
	// DefaultGCPressureThreshold is the default GC pressure threshold.
	DefaultGCPressureThreshold = 0.8
	// ModeratePressureOffset is the offset for the moderate pressure threshold.
	ModeratePressureOffset = 0.2
	// VeryHighPressureOffset is the offset for the very high pressure threshold.
	VeryHighPressureOffset = 0.1
)
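The documentation does not say how the offsets combine with DefaultGCPressureThreshold. The sketch below assumes one plausible reading: the moderate band starts at the threshold minus ModeratePressureOffset and the very-high band at the threshold plus VeryHighPressureOffset. `pressureLevel` and the band names are hypothetical.

```go
package main

import "fmt"

// Values copied from the package constants above.
const (
	DefaultMemoryThreshold     = 1024 * 1024 * 1024 // 1 GiB
	DefaultGCPressureThreshold = 0.8
	ModeratePressureOffset     = 0.2
	VeryHighPressureOffset     = 0.1
)

// ASSUMPTION: offsets are applied around DefaultGCPressureThreshold.
// Untyped constant arithmetic is exact, so these are exactly 0.6 and 0.9.
const (
	moderateThreshold = DefaultGCPressureThreshold - ModeratePressureOffset
	veryHighThreshold = DefaultGCPressureThreshold + VeryHighPressureOffset
)

// pressureLevel classifies a pressure ratio in [0, 1] into a named band.
func pressureLevel(ratio float64) string {
	switch {
	case ratio >= veryHighThreshold:
		return "very high"
	case ratio >= DefaultGCPressureThreshold:
		return "high"
	case ratio >= moderateThreshold:
		return "moderate"
	default:
		return "low"
	}
}

func main() {
	fmt.Println(DefaultMemoryThreshold == 1<<30) // true
	fmt.Println(pressureLevel(0.85))             // high
}
```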

Variables

This section is empty.

Functions

func Process

func Process[T, R any](
	wp *WorkerPool,
	items []T,
	worker func(T) R,
) []R

Process executes work items in parallel using a fan-out/fan-in pattern.
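The fan-out/fan-in pattern can be sketched with only the standard library: several goroutines pull from one jobs channel (fan-out) and push into one results channel that is then drained (fan-in). `fanOutFanIn` is a hypothetical helper, not the package's Process; in this sketch results come back in completion order, which is why an order-preserving variant exists separately.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOutFanIn runs worker over items on numWorkers goroutines.
// Results are returned in completion order, not input order.
func fanOutFanIn[T, R any](numWorkers int, items []T, worker func(T) R) []R {
	if numWorkers < 1 {
		numWorkers = 1
	}
	jobs := make(chan T)
	results := make(chan R, len(items)) // buffered so workers never block on send

	// Fan out: each goroutine pulls items from the shared jobs channel.
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range jobs {
				results <- worker(item)
			}
		}()
	}

	// Feed the jobs, then signal that no more work is coming.
	for _, item := range items {
		jobs <- item
	}
	close(jobs)

	// Close results once every worker has exited.
	wg.Wait()
	close(results)

	// Fan in: gather everything from the single results channel.
	out := make([]R, 0, len(items))
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	squares := fanOutFanIn(4, []int{1, 2, 3, 4}, func(x int) int { return x * x })
	fmt.Println(len(squares)) // 4 (element order may vary)
}
```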

func ProcessGeneric

func ProcessGeneric[T, R any](pool *AdvancedWorkerPool, items []T, worker func(T) R) []R

ProcessGeneric executes work items using the advanced worker pool with generic types.

func ProcessIndexed

func ProcessIndexed[T, R any](
	wp *WorkerPool,
	items []T,
	worker func(int, T) R,
) []R

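ProcessIndexed executes work items in parallel while preserving order: the worker receives each item's index along with the item, and results are returned in input order.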
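One common way to preserve order, sketched below under the assumption that it resembles what ProcessIndexed does: preallocate the output slice and have each goroutine write to `out[i]`. `processIndexed` is a hypothetical helper; because no two goroutines share an index, the writes need no lock.

```go
package main

import (
	"fmt"
	"sync"
)

// processIndexed runs worker(i, items[i]) in parallel and stores the result
// at out[i], so output order matches input order regardless of timing.
func processIndexed[T, R any](numWorkers int, items []T, worker func(int, T) R) []R {
	if numWorkers < 1 {
		numWorkers = 1
	}
	out := make([]R, len(items))
	indices := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range indices {
				// Each index is handled by exactly one goroutine: no data race.
				out[i] = worker(i, items[i])
			}
		}()
	}
	for i := range items {
		indices <- i
	}
	close(indices)
	wg.Wait()
	return out
}

func main() {
	labels := processIndexed(3, []string{"a", "b", "c"}, func(i int, s string) string {
		return fmt.Sprintf("%d:%s", i, s)
	})
	fmt.Println(labels) // [0:a 1:b 2:c]
}
```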

Types

type AdvancedMemoryPool

type AdvancedMemoryPool struct {
	// contains filtered or unexported fields
}

AdvancedMemoryPool provides enhanced memory pool management with adaptive sizing.

func NewAdvancedMemoryPool

func NewAdvancedMemoryPool(maxSize int, memoryThreshold int64, adaptiveSize bool) *AdvancedMemoryPool

NewAdvancedMemoryPool creates a new advanced memory pool.

func (*AdvancedMemoryPool) Close

func (amp *AdvancedMemoryPool) Close()

Close closes the advanced memory pool.

func (*AdvancedMemoryPool) GetAllocator

func (amp *AdvancedMemoryPool) GetAllocator() memory.Allocator

GetAllocator gets an allocator from the pool with memory monitoring.

func (*AdvancedMemoryPool) GetStats

func (amp *AdvancedMemoryPool) GetStats() (PoolStats, *MemoryMonitor)

GetStats returns comprehensive pool statistics.

func (*AdvancedMemoryPool) PutAllocator

func (amp *AdvancedMemoryPool) PutAllocator(alloc memory.Allocator)

PutAllocator returns an allocator to the pool.

type AdvancedWorkerPool

type AdvancedWorkerPool struct {
	// contains filtered or unexported fields
}

AdvancedWorkerPool provides dynamic scaling, work stealing, and resource management.

func NewAdvancedWorkerPool

func NewAdvancedWorkerPool(config AdvancedWorkerPoolConfig) *AdvancedWorkerPool

NewAdvancedWorkerPool creates a new advanced worker pool with the specified configuration.

func (*AdvancedWorkerPool) Close

func (pool *AdvancedWorkerPool) Close()

Close gracefully shuts down the worker pool.

func (*AdvancedWorkerPool) CurrentWorkerCount

func (pool *AdvancedWorkerPool) CurrentWorkerCount() int

CurrentWorkerCount returns the current number of workers.

func (*AdvancedWorkerPool) GetMetrics

func (pool *AdvancedWorkerPool) GetMetrics() *PoolMetrics

GetMetrics returns the current pool metrics.

func (*AdvancedWorkerPool) Process

func (pool *AdvancedWorkerPool) Process(items []interface{}, worker func(interface{}) interface{}) []interface{}

Process executes work items using the advanced worker pool.

func (*AdvancedWorkerPool) ProcessWithPriority

func (pool *AdvancedWorkerPool) ProcessWithPriority(tasks []PriorityTask, worker func(PriorityTask) int) []int

ProcessWithPriority executes priority tasks with the given worker function and returns their integer results.

type AdvancedWorkerPoolConfig

type AdvancedWorkerPoolConfig struct {
	MinWorkers         int
	MaxWorkers         int
	WorkQueueSize      int
	ScaleThreshold     float64
	EnableWorkStealing bool
	EnableMetrics      bool
	EnablePriority     bool
	MemoryMonitor      *MemoryMonitor
	ResourceLimits     ResourceLimits
	BackpressurePolicy BackpressurePolicy
}

AdvancedWorkerPoolConfig provides configuration for the advanced worker pool.
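The configuration exposes MinWorkers, MaxWorkers and ScaleThreshold but does not document the scaling rule. The sketch below assumes a simple one: grow by one worker when queue utilization reaches ScaleThreshold, and shrink by one when the queue is empty, always staying within the min/max bounds. `nextWorkerCount` is hypothetical.

```go
package main

import "fmt"

// nextWorkerCount returns the worker count for the next scaling step.
// ASSUMPTION: this rule is illustrative; the package's policy is undocumented.
func nextWorkerCount(current, minWorkers, maxWorkers, queued, queueCap int, scaleThreshold float64) int {
	if queueCap <= 0 {
		return current
	}
	utilization := float64(queued) / float64(queueCap)
	switch {
	case utilization >= scaleThreshold && current < maxWorkers:
		return current + 1 // queue is filling up: add a worker
	case queued == 0 && current > minWorkers:
		return current - 1 // queue is idle: retire a worker
	default:
		return current
	}
}

func main() {
	// 9 of 10 queue slots used with ScaleThreshold 0.8, so the pool grows.
	fmt.Println(nextWorkerCount(2, 1, 8, 9, 10, 0.8)) // 3
}
```

Scaling by one step at a time avoids oscillation when utilization hovers near the threshold.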

type AllocatorPool

type AllocatorPool struct {
	// contains filtered or unexported fields
}

AllocatorPool manages a pool of memory allocators for safe reuse in parallel processing.

func NewAllocatorPool

func NewAllocatorPool(maxSize int) *AllocatorPool

NewAllocatorPool creates a new allocator pool with the specified maximum size.

func (*AllocatorPool) ActiveCount

func (p *AllocatorPool) ActiveCount() int64

ActiveCount returns the number of allocators currently in use.

func (*AllocatorPool) Close

func (p *AllocatorPool) Close()

Close shuts down the allocator pool.

func (*AllocatorPool) Get

func (p *AllocatorPool) Get() memory.Allocator

Get retrieves an allocator from the pool.

func (*AllocatorPool) GetStats

func (p *AllocatorPool) GetStats() PoolStats

GetStats returns current pool statistics.

func (*AllocatorPool) PeakAllocated

func (p *AllocatorPool) PeakAllocated() int64

PeakAllocated returns the peak memory allocated by the pool.

func (*AllocatorPool) Put

func (p *AllocatorPool) Put(alloc memory.Allocator)

Put returns an allocator to the pool for reuse.

func (*AllocatorPool) RecordAllocation

func (p *AllocatorPool) RecordAllocation(bytes int64)

RecordAllocation records a memory allocation of the given number of bytes in the pool.

func (*AllocatorPool) RecordDeallocation

func (p *AllocatorPool) RecordDeallocation(bytes int64)

RecordDeallocation records a memory deallocation of the given number of bytes in the pool.

func (*AllocatorPool) TotalAllocated

func (p *AllocatorPool) TotalAllocated() int64

TotalAllocated returns the total memory allocated by the pool.
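A minimal sketch of a bounded allocator pool with the accounting described by ActiveCount, TotalAllocated and PeakAllocated. It assumes `maxSize` caps how many idle allocators are retained; `allocator`, `allocatorPool` and `poolStats` are hypothetical stand-ins (the real pool hands out memory.Allocator values).

```go
package main

import (
	"fmt"
	"sync"
)

// allocator is a placeholder for memory.Allocator in this self-contained sketch.
type allocator struct{ id int }

// poolStats mirrors the fields of the documented PoolStats type.
type poolStats struct {
	ActiveAllocators int64
	TotalAllocated   int64
	PeakAllocated    int64
	MaxSize          int
}

type allocatorPool struct {
	mu      sync.Mutex
	free    []*allocator // idle allocators available for reuse
	maxSize int          // ASSUMPTION: most idle allocators kept; extras are dropped
	created int
	active  int64 // allocators handed out and not yet returned
	total   int64 // bytes currently recorded as allocated
	peak    int64 // highest value total has reached
}

func newAllocatorPool(maxSize int) *allocatorPool {
	return &allocatorPool{maxSize: maxSize}
}

// Get reuses an idle allocator when one exists, otherwise creates a new one.
func (p *allocatorPool) Get() *allocator {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.active++
	if n := len(p.free); n > 0 {
		a := p.free[n-1]
		p.free = p.free[:n-1]
		return a
	}
	p.created++
	return &allocator{id: p.created}
}

// Put returns an allocator, keeping it only if the free list has room.
func (p *allocatorPool) Put(a *allocator) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.active--
	if len(p.free) < p.maxSize {
		p.free = append(p.free, a)
	}
}

// RecordAllocation adds to the running total and updates the peak.
func (p *allocatorPool) RecordAllocation(bytes int64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.total += bytes
	if p.total > p.peak {
		p.peak = p.total
	}
}

// RecordDeallocation subtracts from the running total; the peak is unchanged.
func (p *allocatorPool) RecordDeallocation(bytes int64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.total -= bytes
}

func (p *allocatorPool) Stats() poolStats {
	p.mu.Lock()
	defer p.mu.Unlock()
	return poolStats{p.active, p.total, p.peak, p.maxSize}
}

func main() {
	p := newAllocatorPool(2)
	a := p.Get()
	p.RecordAllocation(100)
	p.RecordAllocation(50)
	p.RecordDeallocation(120)
	p.Put(a)
	fmt.Printf("%+v\n", p.Stats()) // {ActiveAllocators:0 TotalAllocated:30 PeakAllocated:150 MaxSize:2}
}
```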

type BackpressurePolicy

type BackpressurePolicy int

const (
	BackpressureBlock BackpressurePolicy = iota
	BackpressureDrop
	BackpressureSpill
)
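The three policies are not documented beyond their names. The sketch below shows one plausible interpretation on a bounded channel: Block waits for space, Drop discards the item when the queue is full, and Spill sets it aside in an overflow buffer (an in-memory stand-in for spilling to disk). `boundedQueue` and `submit` are hypothetical.

```go
package main

import "fmt"

// Local copy of the policy enum so the sketch compiles on its own.
type BackpressurePolicy int

const (
	BackpressureBlock BackpressurePolicy = iota
	BackpressureDrop
	BackpressureSpill
)

type boundedQueue struct {
	ch       chan int
	overflow []int // ASSUMPTION: stand-in for spilling when ch is full
}

// submit enqueues v under the given policy and reports whether v was kept.
func (q *boundedQueue) submit(v int, p BackpressurePolicy) bool {
	switch p {
	case BackpressureDrop:
		select {
		case q.ch <- v:
			return true
		default:
			return false // queue full: discard the item
		}
	case BackpressureSpill:
		select {
		case q.ch <- v:
		default:
			q.overflow = append(q.overflow, v) // queue full: set the item aside
		}
		return true
	default: // BackpressureBlock
		q.ch <- v // waits until a consumer frees a slot
		return true
	}
}

func main() {
	q := &boundedQueue{ch: make(chan int, 1)}
	fmt.Println(q.submit(1, BackpressureDrop))  // true
	fmt.Println(q.submit(2, BackpressureDrop))  // false
	fmt.Println(q.submit(3, BackpressureSpill)) // true
	fmt.Println(q.overflow)                     // [3]
}
```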

type ChunkProcessor

type ChunkProcessor struct {
	// contains filtered or unexported fields
}

ChunkProcessor provides isolated processing context for parallel chunks.

func NewChunkProcessor

func NewChunkProcessor(pool *AllocatorPool, chunkID int) *ChunkProcessor

NewChunkProcessor creates a new chunk processor with isolated memory context.

func (*ChunkProcessor) ChunkID

func (cp *ChunkProcessor) ChunkID() int

ChunkID returns the unique ID for this chunk.

func (*ChunkProcessor) GetAllocator

func (cp *ChunkProcessor) GetAllocator() memory.Allocator

GetAllocator returns the allocator for this chunk processor.

func (*ChunkProcessor) Release

func (cp *ChunkProcessor) Release()

Release returns the allocator to the pool and marks this processor as released.
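Chunked processing with per-chunk isolation can be sketched as follows: split the rows into half-open ranges, give each range its own goroutine and its own output slot (the chunk index doubles as a chunk ID), then combine. `chunkBounds` and `sumByChunk` are hypothetical helpers that mirror the isolation idea behind ChunkProcessor without using its types.

```go
package main

import (
	"fmt"
	"sync"
)

// chunkBounds splits n rows into [start, end) ranges of at most chunkSize rows.
func chunkBounds(n, chunkSize int) [][2]int {
	if chunkSize < 1 {
		chunkSize = 1
	}
	var bounds [][2]int
	for start := 0; start < n; start += chunkSize {
		end := start + chunkSize
		if end > n {
			end = n
		}
		bounds = append(bounds, [2]int{start, end})
	}
	return bounds
}

// sumByChunk sums each chunk in its own goroutine. Each goroutine writes only
// to partial[id], so chunks never share mutable state.
func sumByChunk(data []float64, chunkSize int) float64 {
	bounds := chunkBounds(len(data), chunkSize)
	partial := make([]float64, len(bounds))
	var wg sync.WaitGroup
	for id, b := range bounds {
		wg.Add(1)
		go func(id, start, end int) {
			defer wg.Done() // analogous to releasing per-chunk resources
			for _, v := range data[start:end] {
				partial[id] += v
			}
		}(id, b[0], b[1])
	}
	wg.Wait()
	total := 0.0
	for _, p := range partial {
		total += p
	}
	return total
}

func main() {
	fmt.Println(chunkBounds(10, 4))                      // [[0 4] [4 8] [8 10]]
	fmt.Println(sumByChunk([]float64{1, 2, 3, 4, 5}, 2)) // 15
}
```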

type MemoryMonitor

type MemoryMonitor struct {
	// contains filtered or unexported fields
}

MemoryMonitor tracks memory usage and provides adaptive parallelism control.

func NewMemoryMonitor

func NewMemoryMonitor(threshold int64, maxParallel int) *MemoryMonitor

NewMemoryMonitor creates a new memory monitor with the specified threshold and max parallelism.

func NewMemoryMonitorFromConfig added in v0.3.1

func NewMemoryMonitorFromConfig(cfg config.Config) *MemoryMonitor

NewMemoryMonitorFromConfig creates a new memory monitor using configuration values.

func (*MemoryMonitor) AdjustParallelism

func (m *MemoryMonitor) AdjustParallelism() int

AdjustParallelism returns the recommended parallelism level based on current memory pressure.

func (*MemoryMonitor) CanAllocate

func (m *MemoryMonitor) CanAllocate(size int64) bool

CanAllocate reports whether the requested memory size can be allocated without exceeding the threshold.

func (*MemoryMonitor) CurrentUsage

func (m *MemoryMonitor) CurrentUsage() int64

CurrentUsage returns the current memory usage.

func (*MemoryMonitor) RecordAllocation

func (m *MemoryMonitor) RecordAllocation(size int64)

RecordAllocation records a memory allocation.

func (*MemoryMonitor) RecordDeallocation

func (m *MemoryMonitor) RecordDeallocation(size int64)

RecordDeallocation records a memory deallocation.
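A minimal sketch of a memory monitor with the same method names as MemoryMonitor, assuming usage is tracked with atomic counters and that parallelism shrinks linearly with the remaining headroom below the threshold. The linear rule and the `memoryMonitor` type are assumptions; the package only says the recommendation depends on memory pressure.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type memoryMonitor struct {
	usage       int64 // first field keeps 64-bit atomics aligned on 32-bit platforms
	threshold   int64
	maxParallel int
}

func (m *memoryMonitor) RecordAllocation(size int64)   { atomic.AddInt64(&m.usage, size) }
func (m *memoryMonitor) RecordDeallocation(size int64) { atomic.AddInt64(&m.usage, -size) }
func (m *memoryMonitor) CurrentUsage() int64           { return atomic.LoadInt64(&m.usage) }

// CanAllocate reports whether size more bytes fit under the threshold.
func (m *memoryMonitor) CanAllocate(size int64) bool {
	return m.CurrentUsage()+size <= m.threshold
}

// AdjustParallelism scales maxParallel by the fraction of headroom left,
// never returning less than 1. ASSUMPTION: the linear rule is illustrative.
func (m *memoryMonitor) AdjustParallelism() int {
	headroom := m.threshold - m.CurrentUsage()
	if headroom <= 0 {
		return 1
	}
	p := int(int64(m.maxParallel) * headroom / m.threshold)
	if p < 1 {
		return 1
	}
	return p
}

func main() {
	m := &memoryMonitor{threshold: 1 << 30, maxParallel: 8}
	m.RecordAllocation(3 << 28)        // 768 MiB of a 1 GiB budget in use
	fmt.Println(m.AdjustParallelism()) // 2
}
```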

type MonitoredAllocator

type MonitoredAllocator struct {
	// contains filtered or unexported fields
}

MonitoredAllocator wraps a memory allocator with monitoring capabilities.

func NewMonitoredAllocator

func NewMonitoredAllocator(underlying memory.Allocator, pool *AllocatorPool) *MonitoredAllocator

NewMonitoredAllocator creates a new monitored allocator.

func (*MonitoredAllocator) Allocate

func (ma *MonitoredAllocator) Allocate(size int) []byte

Allocate allocates memory and records the allocation.

func (*MonitoredAllocator) AllocatedBytes

func (ma *MonitoredAllocator) AllocatedBytes() int64

AllocatedBytes returns the number of bytes allocated by the underlying allocator.

func (*MonitoredAllocator) Free

func (ma *MonitoredAllocator) Free(b []byte)

Free frees memory and records the deallocation.

func (*MonitoredAllocator) Reallocate

func (ma *MonitoredAllocator) Reallocate(size int, b []byte) []byte

Reallocate reallocates memory and updates allocation records.
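MonitoredAllocator follows the decorator pattern: it forwards each call to an underlying allocator and updates a byte counter on the way. The sketch declares a local `Allocator` interface with the same three methods documented above so it needs no external library; `heapAllocator` and `monitoredAllocator` are hypothetical, and the counter assumes `len(b)` equals the size originally requested.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Allocator has the same method set as the MonitoredAllocator methods above.
type Allocator interface {
	Allocate(size int) []byte
	Reallocate(size int, b []byte) []byte
	Free(b []byte)
}

// heapAllocator is a trivial Allocator backed by the Go heap.
type heapAllocator struct{}

func (heapAllocator) Allocate(size int) []byte { return make([]byte, size) }
func (heapAllocator) Reallocate(size int, b []byte) []byte {
	nb := make([]byte, size)
	copy(nb, b)
	return nb
}
func (heapAllocator) Free([]byte) {} // the garbage collector reclaims memory

// monitoredAllocator wraps another Allocator and tracks outstanding bytes.
type monitoredAllocator struct {
	allocated  int64
	underlying Allocator
}

func (m *monitoredAllocator) Allocate(size int) []byte {
	atomic.AddInt64(&m.allocated, int64(size))
	return m.underlying.Allocate(size)
}

func (m *monitoredAllocator) Reallocate(size int, b []byte) []byte {
	// Adjust the count by the difference between the new and old sizes.
	atomic.AddInt64(&m.allocated, int64(size-len(b)))
	return m.underlying.Reallocate(size, b)
}

func (m *monitoredAllocator) Free(b []byte) {
	atomic.AddInt64(&m.allocated, -int64(len(b)))
	m.underlying.Free(b)
}

func (m *monitoredAllocator) AllocatedBytes() int64 { return atomic.LoadInt64(&m.allocated) }

func main() {
	var a Allocator = &monitoredAllocator{underlying: heapAllocator{}}
	buf := a.Allocate(64)
	buf = a.Reallocate(128, buf)
	fmt.Println(a.(*monitoredAllocator).AllocatedBytes()) // 128
	a.Free(buf)
}
```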

type PoolMetrics

type PoolMetrics struct {
	TotalTasksProcessed  int64
	AverageTaskDuration  time.Duration
	MaxWorkerCount       int32
	TotalProcessingTime  time.Duration
	WorkStealingCount    int64
	MemoryPressureEvents int64
}

type PoolStats

type PoolStats struct {
	ActiveAllocators int64
	TotalAllocated   int64
	PeakAllocated    int64
	MaxSize          int
}

PoolStats provides statistics about the allocator pool.

type PriorityItem

type PriorityItem struct {
	Priority int
	Index    int
	Task     PriorityTask
	Worker   func(PriorityTask) int
	Result   chan<- advancedIndexedResult
}

type PriorityQueue

type PriorityQueue []*PriorityItem

PriorityQueue implements a priority queue using a binary heap.

func NewPriorityQueue

func NewPriorityQueue() *PriorityQueue

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)
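PriorityQueue's methods (Len, Less, Swap, Push, Pop) are exactly the standard library's `heap.Interface`, so it is driven through `container/heap`. The sketch uses a simplified `item` in place of PriorityItem and assumes a larger Priority is served first; the documentation does not state the direction.

```go
package main

import (
	"container/heap"
	"fmt"
)

// item is a simplified stand-in for PriorityItem (no worker or result channel).
type item struct {
	Priority int
	Index    int
}

// priorityQueue implements heap.Interface.
// ASSUMPTION: higher Priority values are popped first.
type priorityQueue []*item

func (pq priorityQueue) Len() int            { return len(pq) }
func (pq priorityQueue) Less(i, j int) bool  { return pq[i].Priority > pq[j].Priority }
func (pq priorityQueue) Swap(i, j int)       { pq[i], pq[j] = pq[j], pq[i] }
func (pq *priorityQueue) Push(x interface{}) { *pq = append(*pq, x.(*item)) }
func (pq *priorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	it := old[n-1]
	old[n-1] = nil // drop the reference so it can be garbage-collected
	*pq = old[:n-1]
	return it
}

// drainByPriority returns input indices in the order they would be dispatched.
func drainByPriority(priorities []int) []int {
	pq := &priorityQueue{}
	heap.Init(pq)
	for i, p := range priorities {
		heap.Push(pq, &item{Priority: p, Index: i})
	}
	order := make([]int, 0, len(priorities))
	for pq.Len() > 0 {
		order = append(order, heap.Pop(pq).(*item).Index)
	}
	return order
}

func main() {
	fmt.Println(drainByPriority([]int{1, 5, 3})) // [1 2 0]
}
```

Callers must use `heap.Push` and `heap.Pop` rather than the methods directly; the methods only manage the slice, while the package functions maintain the heap invariant.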

type PriorityTask

type PriorityTask struct {
	Priority int
	Value    int
}

PriorityTask represents a task with an associated priority level.

type ResourceLimits

type ResourceLimits struct {
	MaxCPUUsage    float64
	MaxMemoryUsage int64
}

ResourceLimits defines constraints for worker pool resource usage.

type SafeDataFrame

type SafeDataFrame struct {
	// contains filtered or unexported fields
}

SafeDataFrame provides thread-safe access to DataFrame data with copy-on-access semantics.

func NewSafeDataFrame

func NewSafeDataFrame(data interface{}) *SafeDataFrame

NewSafeDataFrame creates a new thread-safe DataFrame wrapper.

func (*SafeDataFrame) Clone

func (sdf *SafeDataFrame) Clone(_ memory.Allocator) (interface{}, error)

Clone creates an independent copy for safe parallel access. This will be implemented with actual DataFrame integration.
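Since Clone is not yet implemented, the sketch below only illustrates copy-on-access semantics in general: readers take a read lock just long enough to deep-copy the data, then work on their private copy. It assumes the DataFrame is a map of float64 columns; `safeColumns` is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// safeColumns guards column data with an RWMutex and hands out deep copies.
// ASSUMPTION: column data is modeled as map[string][]float64.
type safeColumns struct {
	mu   sync.RWMutex
	cols map[string][]float64
}

// Clone returns a deep copy, so callers never share backing arrays with s.
func (s *safeColumns) Clone() map[string][]float64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make(map[string][]float64, len(s.cols))
	for name, col := range s.cols {
		out[name] = append([]float64(nil), col...)
	}
	return out
}

func main() {
	s := &safeColumns{cols: map[string][]float64{"x": {1, 2, 3}}}
	c := s.Clone()
	c["x"][0] = 100             // mutate the copy only
	fmt.Println(s.cols["x"][0]) // 1
}
```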

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages a pool of goroutines for parallel processing.

func NewWorkerPool

func NewWorkerPool(numWorkers int) *WorkerPool

NewWorkerPool creates a new worker pool.

func (*WorkerPool) Close

func (wp *WorkerPool) Close()

Close shuts down the worker pool.
