parallel

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 16, 2025 | License: Apache-2.0, MIT | Imports: 7 | Imported by: 0

Documentation

Overview

Package parallel provides concurrent processing utilities.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Process

func Process[T, R any](
	wp *WorkerPool,
	items []T,
	worker func(T) R,
) []R

Process executes work items in parallel using a fan-out/fan-in pattern.

func ProcessGeneric

func ProcessGeneric[T, R any](pool *AdvancedWorkerPool, items []T, worker func(T) R) []R

ProcessGeneric executes work items using the advanced worker pool with generic types

func ProcessIndexed

func ProcessIndexed[T, R any](
	wp *WorkerPool,
	items []T,
	worker func(int, T) R,
) []R

ProcessIndexed executes work items in parallel while preserving order

Types

type AdvancedMemoryPool

type AdvancedMemoryPool struct {
	// contains filtered or unexported fields
}

AdvancedMemoryPool provides enhanced memory pool management with adaptive sizing

func NewAdvancedMemoryPool

func NewAdvancedMemoryPool(maxSize int, memoryThreshold int64, adaptiveSize bool) *AdvancedMemoryPool

NewAdvancedMemoryPool creates a new advanced memory pool

func (*AdvancedMemoryPool) Close

func (amp *AdvancedMemoryPool) Close()

Close closes the advanced memory pool

func (*AdvancedMemoryPool) GetAllocator

func (amp *AdvancedMemoryPool) GetAllocator() memory.Allocator

GetAllocator gets an allocator from the pool with memory monitoring

func (*AdvancedMemoryPool) GetStats

func (amp *AdvancedMemoryPool) GetStats() (PoolStats, *MemoryMonitor)

GetStats returns comprehensive pool statistics

func (*AdvancedMemoryPool) PutAllocator

func (amp *AdvancedMemoryPool) PutAllocator(alloc memory.Allocator)

PutAllocator returns an allocator to the pool

type AdvancedWorkerPool

type AdvancedWorkerPool struct {
	// contains filtered or unexported fields
}

AdvancedWorkerPool provides dynamic scaling, work stealing, and resource management

func NewAdvancedWorkerPool

func NewAdvancedWorkerPool(config AdvancedWorkerPoolConfig) *AdvancedWorkerPool

NewAdvancedWorkerPool creates a new advanced worker pool with the specified configuration

func (*AdvancedWorkerPool) Close

func (pool *AdvancedWorkerPool) Close()

Close gracefully shuts down the worker pool

func (*AdvancedWorkerPool) CurrentWorkerCount

func (pool *AdvancedWorkerPool) CurrentWorkerCount() int

CurrentWorkerCount returns the current number of workers

func (*AdvancedWorkerPool) GetMetrics

func (pool *AdvancedWorkerPool) GetMetrics() *PoolMetrics

GetMetrics returns the current pool metrics

func (*AdvancedWorkerPool) Process

func (pool *AdvancedWorkerPool) Process(items []interface{}, worker func(interface{}) interface{}) []interface{}

Process executes work items using the advanced worker pool

func (*AdvancedWorkerPool) ProcessWithPriority

func (pool *AdvancedWorkerPool) ProcessWithPriority(tasks []PriorityTask, worker func(PriorityTask) int) []int

ProcessWithPriority executes priority tasks

type AdvancedWorkerPoolConfig

type AdvancedWorkerPoolConfig struct {
	MinWorkers         int
	MaxWorkers         int
	WorkQueueSize      int
	ScaleThreshold     float64
	EnableWorkStealing bool
	EnableMetrics      bool
	EnablePriority     bool
	MemoryMonitor      *MemoryMonitor
	ResourceLimits     ResourceLimits
	BackpressurePolicy BackpressurePolicy
}

AdvancedWorkerPoolConfig provides configuration for the advanced worker pool

type AllocatorPool

type AllocatorPool struct {
	// contains filtered or unexported fields
}

AllocatorPool manages a pool of memory allocators for safe reuse in parallel processing

func NewAllocatorPool

func NewAllocatorPool(maxSize int) *AllocatorPool

NewAllocatorPool creates a new allocator pool with the specified maximum size

func (*AllocatorPool) ActiveCount

func (p *AllocatorPool) ActiveCount() int64

ActiveCount returns the number of allocators currently in use

func (*AllocatorPool) Close

func (p *AllocatorPool) Close()

Close shuts down the allocator pool

func (*AllocatorPool) Get

func (p *AllocatorPool) Get() memory.Allocator

Get retrieves an allocator from the pool

func (*AllocatorPool) GetStats

func (p *AllocatorPool) GetStats() PoolStats

GetStats returns current pool statistics

func (*AllocatorPool) PeakAllocated

func (p *AllocatorPool) PeakAllocated() int64

PeakAllocated returns the peak memory allocated by the pool

func (*AllocatorPool) Put

func (p *AllocatorPool) Put(alloc memory.Allocator)

Put returns an allocator to the pool for reuse

func (*AllocatorPool) RecordAllocation

func (p *AllocatorPool) RecordAllocation(bytes int64)

RecordAllocation records memory allocation in the pool

func (*AllocatorPool) RecordDeallocation

func (p *AllocatorPool) RecordDeallocation(bytes int64)

RecordDeallocation records memory deallocation in the pool

func (*AllocatorPool) TotalAllocated

func (p *AllocatorPool) TotalAllocated() int64

TotalAllocated returns the total memory allocated by the pool

type BackpressurePolicy

type BackpressurePolicy int
const (
	BackpressureBlock BackpressurePolicy = iota
	BackpressureDrop
	BackpressureSpill
)

type ChunkProcessor

type ChunkProcessor struct {
	// contains filtered or unexported fields
}

ChunkProcessor provides isolated processing context for parallel chunks

func NewChunkProcessor

func NewChunkProcessor(pool *AllocatorPool, chunkID int) *ChunkProcessor

NewChunkProcessor creates a new chunk processor with isolated memory context

func (*ChunkProcessor) ChunkID

func (cp *ChunkProcessor) ChunkID() int

ChunkID returns the unique ID for this chunk

func (*ChunkProcessor) GetAllocator

func (cp *ChunkProcessor) GetAllocator() memory.Allocator

GetAllocator returns the allocator for this chunk processor

func (*ChunkProcessor) Release

func (cp *ChunkProcessor) Release()

Release returns the allocator to the pool and marks this processor as released

type MemoryMonitor

type MemoryMonitor struct {
	// contains filtered or unexported fields
}

MemoryMonitor tracks memory usage and provides adaptive parallelism control

func NewMemoryMonitor

func NewMemoryMonitor(threshold int64, maxParallel int) *MemoryMonitor

NewMemoryMonitor creates a new memory monitor with the specified threshold and max parallelism

func (*MemoryMonitor) AdjustParallelism

func (m *MemoryMonitor) AdjustParallelism() int

AdjustParallelism returns the recommended parallelism level based on current memory pressure

func (*MemoryMonitor) CanAllocate

func (m *MemoryMonitor) CanAllocate(size int64) bool

CanAllocate checks if the requested memory size can be allocated without exceeding threshold

func (*MemoryMonitor) CurrentUsage

func (m *MemoryMonitor) CurrentUsage() int64

CurrentUsage returns the current memory usage

func (*MemoryMonitor) RecordAllocation

func (m *MemoryMonitor) RecordAllocation(size int64)

RecordAllocation records a memory allocation

func (*MemoryMonitor) RecordDeallocation

func (m *MemoryMonitor) RecordDeallocation(size int64)

RecordDeallocation records a memory deallocation

type MonitoredAllocator

type MonitoredAllocator struct {
	// contains filtered or unexported fields
}

MonitoredAllocator wraps a memory allocator with monitoring capabilities

func NewMonitoredAllocator

func NewMonitoredAllocator(underlying memory.Allocator, pool *AllocatorPool) *MonitoredAllocator

NewMonitoredAllocator creates a new monitored allocator

func (*MonitoredAllocator) Allocate

func (ma *MonitoredAllocator) Allocate(size int) []byte

Allocate allocates memory and records the allocation

func (*MonitoredAllocator) AllocatedBytes

func (ma *MonitoredAllocator) AllocatedBytes() int64

AllocatedBytes returns the number of bytes allocated by the underlying allocator

func (*MonitoredAllocator) Free

func (ma *MonitoredAllocator) Free(b []byte)

Free frees memory and records the deallocation

func (*MonitoredAllocator) Reallocate

func (ma *MonitoredAllocator) Reallocate(size int, b []byte) []byte

Reallocate reallocates memory and updates allocation records

type PoolMetrics

type PoolMetrics struct {
	TotalTasksProcessed  int64
	AverageTaskDuration  time.Duration
	MaxWorkerCount       int32
	TotalProcessingTime  time.Duration
	WorkStealingCount    int64
	MemoryPressureEvents int64
}

type PoolStats

type PoolStats struct {
	ActiveAllocators int64
	TotalAllocated   int64
	PeakAllocated    int64
	MaxSize          int
}

PoolStats provides statistics about the allocator pool

type PriorityItem

type PriorityItem struct {
	Priority int
	Index    int
	Task     PriorityTask
	Worker   func(PriorityTask) int
	Result   chan<- advancedIndexedResult
}

type PriorityQueue

type PriorityQueue []*PriorityItem

PriorityQueue implements a priority queue using a binary heap

func NewPriorityQueue

func NewPriorityQueue() *PriorityQueue

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() interface{}

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x interface{})

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)

type PriorityTask

type PriorityTask struct {
	Priority int
	Value    int
}

PriorityTask represents a task with an associated priority level

type ResourceLimits

type ResourceLimits struct {
	MaxCPUUsage    float64
	MaxMemoryUsage int64
}

ResourceLimits defines constraints for worker pool resource usage

type SafeDataFrame

type SafeDataFrame struct {
	// contains filtered or unexported fields
}

SafeDataFrame provides thread-safe access to DataFrame data with copy-on-access semantics

func NewSafeDataFrame

func NewSafeDataFrame(data interface{}) *SafeDataFrame

NewSafeDataFrame creates a new thread-safe DataFrame wrapper

func (*SafeDataFrame) Clone

func (sdf *SafeDataFrame) Clone(allocator memory.Allocator) (interface{}, error)

Clone creates an independent copy for safe parallel access. This will be implemented with actual DataFrame integration.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool manages a pool of goroutines for parallel processing

func NewWorkerPool

func NewWorkerPool(numWorkers int) *WorkerPool

NewWorkerPool creates a new worker pool

func (*WorkerPool) Close

func (wp *WorkerPool) Close()

Close shuts down the worker pool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL