Documentation ¶
Overview ¶
Package parallel provides utilities for parallel processing with worker pools.
This package implements a generic worker pool pattern that can be used to parallelize CPU-bound tasks like file parsing and analysis. The pool manages a configurable number of worker goroutines that process work items concurrently.
Functions ¶
func ProcessAll ¶
ProcessAll is a convenience function that processes all items and returns the results as a slice.
It performs four steps:
1. Starts the pool
2. Submits all items
3. Closes the pool
4. Collects all results
It's useful for simple batch processing where you have all items upfront.
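The four steps above can be sketched with plain channels. This is only an illustration: the signature of ProcessAll is not shown here, so processAllSketch and its parameters are hypothetical names, and the package's real implementation may differ. The sketch assumes workers is at least 1.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// processAllSketch is a hypothetical stand-in that mirrors the four steps
// described for ProcessAll. It is not the package's implementation.
func processAllSketch[T any, R any](workers int, items []T, process func(T) R) []R {
	work := make(chan T)
	results := make(chan R)

	// 1. Start the workers.
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range work {
				results <- process(item)
			}
		}()
	}

	// 2. Submit all items, then 3. close the work channel.
	// Submission runs in its own goroutine so that step 4 can drain
	// results while items are still being sent.
	go func() {
		for _, item := range items {
			work <- item
		}
		close(work)
	}()

	// Close the results channel once every worker has exited.
	go func() {
		wg.Wait()
		close(results)
	}()

	// 4. Collect all results.
	out := make([]R, 0, len(items))
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	squares := processAllSketch(4, []int{1, 2, 3, 4}, func(n int) int { return n * n })
	sort.Ints(squares) // results arrive in completion order, so sort before printing
	fmt.Println(squares) // [1 4 9 16]
}
```

In this sketch, results come back in completion order rather than input order. Whether ProcessAll preserves input order is not stated in this documentation.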
Types ¶
type ProgressReporter ¶
type ProgressReporter struct {
// contains filtered or unexported fields
}
ProgressReporter is a thread-safe wrapper around status.Reporter.
It provides atomic progress counting and synchronized updates for use in parallel processing scenarios where multiple goroutines need to report progress.
func NewProgressReporter ¶
func NewProgressReporter(inner status.Reporter, prefix string, total int) *ProgressReporter
NewProgressReporter creates a new thread-safe progress reporter.
Parameters:
- inner: the underlying status reporter to delegate to
- prefix: the status prefix (e.g., "[PARSE]")
- total: the total number of items to process (can be updated later)
func (*ProgressReporter) Processed ¶
func (r *ProgressReporter) Processed() int
Processed returns the current count of processed items.
func (*ProgressReporter) RecordProgress ¶
func (r *ProgressReporter) RecordProgress(detail string)
RecordProgress atomically increments the progress counter and updates the status.
This method is safe to call from multiple goroutines concurrently. The detail string provides context about the current item being processed.
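One way to combine an atomic counter with synchronized updates is sketched below. The method set of status.Reporter is not shown in this documentation, so the updater interface, the progressSketch type, and the message format are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// updater is a hypothetical stand-in for status.Reporter, whose real
// method set is not shown in this documentation.
type updater interface {
	Update(message string)
}

// countingUpdater records how many updates it received.
type countingUpdater struct{ calls int }

func (c *countingUpdater) Update(string) { c.calls++ }

// progressSketch pairs an atomic counter with a mutex-guarded delegate.
type progressSketch struct {
	mu        sync.Mutex
	inner     updater
	prefix    string
	total     int64
	processed int64 // accessed only through sync/atomic
}

// RecordProgress increments the counter atomically, then serializes the
// call into the delegate so messages cannot interleave.
func (p *progressSketch) RecordProgress(detail string) {
	n := atomic.AddInt64(&p.processed, 1)
	p.mu.Lock()
	defer p.mu.Unlock()
	p.inner.Update(fmt.Sprintf("%s %d/%d %s", p.prefix, n, p.total, detail))
}

// Processed returns the current count.
func (p *progressSketch) Processed() int {
	return int(atomic.LoadInt64(&p.processed))
}

// runSketch reports progress once from each of n goroutines and returns
// the final counter value and the number of delegate updates.
func runSketch(n int) (processed, updates int) {
	sink := &countingUpdater{}
	p := &progressSketch{inner: sink, prefix: "[PARSE]", total: int64(n)}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			p.RecordProgress(fmt.Sprintf("file%d.go", i))
		}(i)
	}
	wg.Wait()
	return p.Processed(), sink.calls
}

func main() {
	processed, updates := runSketch(50)
	fmt.Println(processed, updates) // 50 50
}
```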
func (*ProgressReporter) Reset ¶
func (r *ProgressReporter) Reset()
Reset resets the progress counter to zero.
func (*ProgressReporter) SetTotal ¶
func (r *ProgressReporter) SetTotal(total int)
SetTotal updates the total count of items to process. This is useful when the total is not known at creation time.
func (*ProgressReporter) Update ¶
func (r *ProgressReporter) Update(message string)
Update shows a simple status message through the underlying reporter. This method is synchronized to prevent interleaving with progress updates.
type WorkerPool ¶
WorkerPool manages concurrent task execution with a fixed number of workers.
Type parameters:
- T: the type of work items to process
- R: the type of results produced
The pool uses channels for communication:
- Work items are submitted via Submit()
- Results are collected via the Results() channel
Example usage (drain results concurrently to avoid deadlock with large inputs):
    pool := NewWorkerPool(4, func(path string) *FileMetrics {
        return parseFile(path)
    })
    pool.Start()

    // Drain results in a separate goroutine
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        for result := range pool.Results() {
            _ = result // process result
        }
    }()

    // Submit work items
    for _, path := range paths {
        pool.Submit(path)
    }
    pool.Close()
    wg.Wait()
Alternatively, use ProcessAll() for simpler batch processing.
func NewWorkerPool ¶
func NewWorkerPool[T any, R any](workers int, processor func(T) R) *WorkerPool[T, R]
NewWorkerPool creates a new worker pool.
Parameters:
- workers: number of worker goroutines (0 = runtime.NumCPU())
- processor: function that processes a work item and returns a result
The pool is not started until Start() is called.
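The default for workers might be resolved along these lines. resolveWorkers is a hypothetical helper, and treating negative values like zero is an assumption; the documentation only specifies the behavior for 0.

```go
package main

import (
	"fmt"
	"runtime"
)

// resolveWorkers is a hypothetical helper. A count of 0 falls back to
// runtime.NumCPU(); handling negative counts the same way is an assumption.
func resolveWorkers(workers int) int {
	if workers <= 0 {
		return runtime.NumCPU()
	}
	return workers
}

func main() {
	fmt.Println(resolveWorkers(4))                     // 4
	fmt.Println(resolveWorkers(0) == runtime.NumCPU()) // true
}
```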
func (*WorkerPool[T, R]) Close ¶
func (p *WorkerPool[T, R]) Close()
Close signals that no more work items will be submitted.
This method closes the work channel, causing workers to finish processing remaining items and then exit. The result channel will be closed after all workers have finished.
If Close() is called without Start() having been called, the result channel will be closed immediately.
Close should only be called once. It is safe to call from any goroutine.
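When several goroutines might reach the shutdown path, a caller-side guard built on sync.Once is one way to honor the call-once rule. The closer interface, onceCloser wrapper, and fakePool type below are hypothetical and exist only so the sketch runs without the real package.

```go
package main

import (
	"fmt"
	"sync"
)

// closer is a hypothetical interface satisfied by anything with a Close
// method, such as *WorkerPool[T, R].
type closer interface{ Close() }

// onceCloser forwards at most one Close call to the wrapped value.
type onceCloser struct {
	once  sync.Once
	inner closer
}

func (o *onceCloser) Close() { o.once.Do(o.inner.Close) }

// fakePool counts Close calls so the sketch runs on its own.
type fakePool struct{ closed int }

func (f *fakePool) Close() { f.closed++ }

// closeFromMany calls Close through the guard from n goroutines and
// returns how many calls reached the underlying pool.
func closeFromMany(n int) int {
	pool := &fakePool{}
	guard := &onceCloser{inner: pool}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			guard.Close()
		}()
	}
	wg.Wait()
	return pool.closed
}

func main() {
	fmt.Println(closeFromMany(8)) // 1
}
```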
func (*WorkerPool[T, R]) Results ¶
func (p *WorkerPool[T, R]) Results() <-chan R
Results returns the channel of results from processed work items.
The channel will be closed after Close() is called and all workers have finished processing. Callers should range over this channel to collect all results.
func (*WorkerPool[T, R]) Start ¶
func (p *WorkerPool[T, R]) Start()
Start spawns worker goroutines that begin processing work items.
Workers will continue processing until Close() is called and all work items have been processed. Each worker reads from the work channel, processes items using the processor function, and writes results to the result channel.
Start must be called before Submit() or Results() is used.
func (*WorkerPool[T, R]) Submit ¶
func (p *WorkerPool[T, R]) Submit(item T)
Submit adds a work item to the pool for processing.
This method blocks if the work channel buffer is full. It panics if called after Close().
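The panic is consistent with how Go channels behave: sending on a closed channel always panics. The sketch below demonstrates this with a bare channel, assuming that Submit sends on an internal channel that Close closes. sendPanics is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// sendPanics reports whether sending v on ch panics.
func sendPanics(ch chan int, v int) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	ch <- v
	return false
}

func main() {
	work := make(chan int, 1)
	fmt.Println(sendPanics(work, 1)) // false: the buffer has room
	close(work)
	fmt.Println(sendPanics(work, 2)) // true: send on closed channel
}
```

Callers can avoid the panic by making sure every Submit call has returned before Close is called, for example by waiting on a sync.WaitGroup that tracks the submitting goroutines.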
func (*WorkerPool[T, R]) Workers ¶
func (p *WorkerPool[T, R]) Workers() int
Workers returns the number of worker goroutines.