parallel

package
v1.3.2 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 12, 2026 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package parallel provides utilities for parallel processing with worker pools.

This package implements a generic worker pool pattern that can be used to parallelize CPU-bound tasks like file parsing and analysis. The pool manages a configurable number of worker goroutines that process work items concurrently.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ProcessAll

func ProcessAll[T any, R any](workers int, items []T, processor func(T) R) []R

ProcessAll is a convenience function that processes all items and returns the results as a slice.

This function:

 1. Starts the pool
 2. Submits all items
 3. Closes the pool
 4. Collects all results

It's useful for simple batch processing where you have all items upfront.

Types

type ProgressReporter

type ProgressReporter struct {
	// contains filtered or unexported fields
}

ProgressReporter is a thread-safe wrapper around status.Reporter.

It provides atomic progress counting and synchronized updates for use in parallel processing scenarios where multiple goroutines need to report progress.

func NewProgressReporter

func NewProgressReporter(inner status.Reporter, prefix string, total int) *ProgressReporter

NewProgressReporter creates a new thread-safe progress reporter.

Parameters:

  • inner: the underlying status reporter to delegate to
  • prefix: the status prefix (e.g., "[PARSE]")
  • total: the total number of items to process (can be updated later)

func (*ProgressReporter) Processed

func (r *ProgressReporter) Processed() int

Processed returns the current count of processed items.

func (*ProgressReporter) RecordProgress

func (r *ProgressReporter) RecordProgress(detail string)

RecordProgress atomically increments the progress counter and updates the status.

This method is safe to call from multiple goroutines concurrently. The detail string provides context about the current item being processed.

func (*ProgressReporter) Reset

func (r *ProgressReporter) Reset()

Reset resets the progress counter to zero.

func (*ProgressReporter) SetTotal

func (r *ProgressReporter) SetTotal(total int)

SetTotal updates the total count of items to process. This is useful when the total is not known at creation time.

func (*ProgressReporter) Update

func (r *ProgressReporter) Update(message string)

Update shows a simple status message through the underlying reporter. This method is synchronized to prevent interleaving with progress updates.

type WorkerPool

type WorkerPool[T any, R any] struct {
	// contains filtered or unexported fields
}

WorkerPool manages concurrent task execution with a fixed number of workers.

Type parameters:

  • T: the type of work items to process
  • R: the type of results produced

The pool uses channels for communication:

  • Work items are submitted via Submit()
  • Results are collected via the Results() channel

Example usage (drain results concurrently to avoid deadlock with large inputs):

pool := NewWorkerPool(4, func(path string) *FileMetrics {
    return parseFile(path)
})
pool.Start()

// Drain results in a separate goroutine
var wg sync.WaitGroup
wg.Add(1)
go func() {
    defer wg.Done()
    for result := range pool.Results() {
        // process result
    }
}()

// Submit work items
for _, path := range paths {
    pool.Submit(path)
}
pool.Close()
wg.Wait()

Alternatively, use ProcessAll() for simpler batch processing.

func NewWorkerPool

func NewWorkerPool[T any, R any](workers int, processor func(T) R) *WorkerPool[T, R]

NewWorkerPool creates a new worker pool.

Parameters:

  • workers: number of worker goroutines (0 = runtime.NumCPU())
  • processor: function that processes a work item and returns a result

The pool is not started until Start() is called.

func (*WorkerPool[T, R]) Close

func (p *WorkerPool[T, R]) Close()

Close signals that no more work items will be submitted.

This method closes the work channel, causing workers to finish processing remaining items and then exit. The result channel will be closed after all workers have finished.

If Close() is called without Start() having been called, the result channel will be closed immediately.

Close should only be called once. It is safe to call from any goroutine.

func (*WorkerPool[T, R]) Results

func (p *WorkerPool[T, R]) Results() <-chan R

Results returns the channel of results from processed work items.

The channel will be closed after Close() is called and all workers have finished processing. Callers should range over this channel to collect all results.

func (*WorkerPool[T, R]) Start

func (p *WorkerPool[T, R]) Start()

Start spawns worker goroutines that begin processing work items.

Workers will continue processing until Close() is called and all work items have been processed. Each worker reads from the work channel, processes items using the processor function, and writes results to the result channel.

Start must be called before Submit() or Results() are used.

func (*WorkerPool[T, R]) Submit

func (p *WorkerPool[T, R]) Submit(item T)

Submit adds a work item to the pool for processing.

This method blocks if the work channel buffer is full. Panics if called after Close().

func (*WorkerPool[T, R]) Workers

func (p *WorkerPool[T, R]) Workers() int

Workers returns the number of worker goroutines.
