pool

package
v0.0.67 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 16, 2026 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrClosed = errors.New("queue closed")
var ErrVirtualPoolClosed = errors.New("virtual pool closed for submit")

Functions

func WorkerIndexFromContext

func WorkerIndexFromContext(ctx context.Context) (int, bool)

WorkerIndexFromContext returns the worker index stored in ctx, if present.
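
The package does not show how the index gets into the context, so the following is only a sketch of the usual context-value pattern that a function with this signature typically relies on. The names workerIndexKey, withWorkerIndex and background are hypothetical and are not part of this package.

```go
package main

import (
	"context"
	"fmt"
)

// workerIndexKey is an unexported key type, so values stored under it cannot
// collide with keys from other packages.
type workerIndexKey struct{}

// background is a plain context with no worker index stored in it.
var background = context.Background()

// withWorkerIndex is a hypothetical helper that attaches idx to ctx.
func withWorkerIndex(ctx context.Context, idx int) context.Context {
	return context.WithValue(ctx, workerIndexKey{}, idx)
}

// workerIndexFromContext mirrors the documented signature: it reports
// (idx, true) when an index was stored and (0, false) otherwise.
func workerIndexFromContext(ctx context.Context) (int, bool) {
	idx, ok := ctx.Value(workerIndexKey{}).(int)
	return idx, ok
}

func main() {
	ctx := withWorkerIndex(background, 3)
	if idx, ok := workerIndexFromContext(ctx); ok {
		fmt.Println("running on worker", idx)
	}
	if _, ok := workerIndexFromContext(background); !ok {
		fmt.Println("no worker index stored")
	}
}
```

The boolean result lets a handler distinguish worker 0 from a context that never passed through a worker.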

Types

type ErrorCollector

type ErrorCollector struct {
	// contains filtered or unexported fields
}

ErrorCollector collects errors for a virtual pool. It stores a snapshotable slice of errors.

func (*ErrorCollector) Errors

func (ec *ErrorCollector) Errors() []error

Errors returns a snapshot of collected errors.
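
ErrorCollector's fields are unexported, so the sketch below only illustrates one common way to get "snapshot" behaviour: a mutex-guarded slice that is copied on read. The collector type and its add and snapshot methods are hypothetical stand-ins, not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// collector is a hypothetical stand-in for ErrorCollector.
type collector struct {
	mu   sync.Mutex
	errs []error
}

// add records a non-nil error under the lock.
func (c *collector) add(err error) {
	if err == nil {
		return
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.errs = append(c.errs, err)
}

// snapshot returns a copy, so later additions never alter a slice that a
// caller is already holding.
func (c *collector) snapshot() []error {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := make([]error, len(c.errs))
	copy(out, c.errs)
	return out
}

// collectConcurrently has n goroutines report one error each.
func collectConcurrently(n int) *collector {
	c := &collector{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			c.add(fmt.Errorf("task %d failed", i))
		}(i)
	}
	wg.Wait()
	return c
}

func main() {
	c := collectConcurrently(4)
	snap := c.snapshot()
	c.add(fmt.Errorf("late failure")) // not visible in snap
	fmt.Println(len(snap), len(c.snapshot()))
}
```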

type Pool

type Pool[T any] struct {
	// contains filtered or unexported fields
}

Pool[T] is a worker pool backed by WorkerPoolQueue. It uses an atomic inflight counter and a condition variable to avoid deadlocks between closing the queue and tracking outstanding work.

func NewWorkerPool

func NewWorkerPool[T any](parent context.Context, workerCount int) *Pool[T]

NewWorkerPool creates a new Pool. If workerCount <= 0 it defaults to runtime.GOMAXPROCS(0).

func (*Pool[T]) CloseForSubmit

func (p *Pool[T]) CloseForSubmit()

CloseForSubmit indicates that the caller will not submit any more external (caller-side) tasks. Workers may still call Submit to add child tasks. Once the inflight count reaches zero and the queue is empty, the pool closes the task queue so that workers exit.

func (*Pool[T]) Start

func (p *Pool[T]) Start(handler func(ctx context.Context, item T, submit func(T) error) error)

Start spawns workerCount workers that call handler(ctx, item, submit). Handler should process the item and return an error if it wants to abort the whole pool. Handler may call submit(...) to add child tasks (workers are allowed to submit).

func (*Pool[T]) Submit

func (p *Pool[T]) Submit(item T) error

Submit enqueues a task. It increments the inflight counter BEFORE attempting to enqueue. If the pool's context is already cancelled, Submit returns ctx.Err() and does NOT increment inflight.

func (*Pool[T]) Wait

func (p *Pool[T]) Wait() error

Wait blocks until all workers have exited and returns the first error (if any).
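
The interplay of Submit, CloseForSubmit and the inflight counter can be illustrated with a reduced model. The sketch below is not this package's code: miniPool and countDown are hypothetical, and the counter is guarded by the mutex rather than being atomic, which is a simplification. It shows why a task is counted as inflight before it is enqueued, and why workers can exit safely once the pool is sealed and the count reaches zero.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// miniPool is a hypothetical, simplified stand-in for Pool[int].
type miniPool struct {
	mu       sync.Mutex
	cond     *sync.Cond
	queue    []int
	inflight int  // tasks that are queued or currently running
	sealed   bool // set once no more external submissions will arrive
}

func newMiniPool() *miniPool {
	p := &miniPool{}
	p.cond = sync.NewCond(&p.mu)
	return p
}

// submit counts the task as inflight before workers can see it.
func (p *miniPool) submit(item int) {
	p.mu.Lock()
	p.inflight++
	p.queue = append(p.queue, item)
	p.mu.Unlock()
	p.cond.Signal()
}

// closeForSubmit seals the pool; workers may still submit child tasks.
func (p *miniPool) closeForSubmit() {
	p.mu.Lock()
	p.sealed = true
	p.mu.Unlock()
	p.cond.Broadcast()
}

// next blocks for an item. It reports false once the pool is sealed and no
// task is queued or running, which is the signal for workers to exit.
func (p *miniPool) next() (int, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for len(p.queue) == 0 {
		if p.sealed && p.inflight == 0 {
			return 0, false
		}
		p.cond.Wait()
	}
	item := p.queue[0]
	p.queue = p.queue[1:]
	return item, true
}

// done marks one task finished and wakes idle workers to re-check.
func (p *miniPool) done() {
	p.mu.Lock()
	p.inflight--
	p.mu.Unlock()
	p.cond.Broadcast()
}

// countDown handles every seed n by submitting the child n-1 while n > 0,
// and returns how many tasks were handled in total.
func countDown(workers int, seeds ...int) int {
	p := newMiniPool()
	var handled int64
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n, ok := p.next(); ok; n, ok = p.next() {
				atomic.AddInt64(&handled, 1)
				if n > 0 {
					p.submit(n - 1) // child task submitted by a worker
				}
				p.done() // only after children are counted as inflight
			}
		}()
	}
	for _, s := range seeds {
		p.submit(s)
	}
	p.closeForSubmit()
	wg.Wait()
	return int(atomic.LoadInt64(&handled))
}

func main() {
	fmt.Println(countDown(4, 3, 2)) // seed 3 yields 4 tasks, seed 2 yields 3
}
```

Because a parent calls done only after its children have been counted, the inflight count cannot drop to zero while work is still pending.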

type SharedTaskPool

type SharedTaskPool struct {
	// contains filtered or unexported fields
}

SharedTaskPool manages a shared worker pool (reusing Pool[Task]) and provides creation of VirtualPools that submit into the shared pool.

func NewSharedTaskPool

func NewSharedTaskPool(parent context.Context, workerCount int) *SharedTaskPool

NewSharedTaskPool constructs a shared pool; caller should call Start() to begin workers.

func (*SharedTaskPool) CloseForSubmit

func (s *SharedTaskPool) CloseForSubmit()

CloseForSubmit proxies to the underlying pool. Call it once the caller is done submitting to all virtual pools.

func (*SharedTaskPool) NewVirtualPool

func (s *SharedTaskPool) NewVirtualPool(mode VirtualMode) VirtualPoolI

NewVirtualPool creates and registers a virtual pool on top of the shared pool. mode controls failure semantics.

func (*SharedTaskPool) Wait

func (s *SharedTaskPool) Wait() error

Wait proxies to the underlying pool's Wait. Note: this waits for all tasks on the shared pool, not only those of a single virtual pool.

type Task

type Task interface {
	Run(ctx context.Context, submit func(Task) error) error
}

Task is a unit of work executed by the shared worker pool. submit allows spawning child tasks into the same logical/virtual pool.

type TaskFunc

type TaskFunc func(ctx context.Context, submit func(Task) error) error

TaskFunc is a convenience adapter that lets closures be submitted as Tasks.

func (TaskFunc) Run

func (f TaskFunc) Run(ctx context.Context, submit func(Task) error) error
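
The adapter follows the same idea as http.HandlerFunc in the standard library. In the sketch below, Task and TaskFunc are re-declared with the documented signatures so that the example compiles on its own. That Run simply calls f is an assumption, since Run is undocumented, and runAll and fanOut are hypothetical helpers that stand in for a real pool.

```go
package main

import (
	"context"
	"fmt"
)

// Task and TaskFunc repeat the documented declarations.
type Task interface {
	Run(ctx context.Context, submit func(Task) error) error
}

type TaskFunc func(ctx context.Context, submit func(Task) error) error

// Run calls f itself (assumed behaviour), which is what lets a plain
// function value satisfy Task.
func (f TaskFunc) Run(ctx context.Context, submit func(Task) error) error {
	return f(ctx, submit)
}

// runAll is a hypothetical single-goroutine driver: it runs root and every
// child handed to submit in FIFO order and returns the number of tasks run.
func runAll(root Task) (int, error) {
	queue := []Task{root}
	submit := func(t Task) error {
		queue = append(queue, t)
		return nil
	}
	ran := 0
	for len(queue) > 0 {
		t := queue[0]
		queue = queue[1:]
		if err := t.Run(context.Background(), submit); err != nil {
			return ran, err
		}
		ran++
	}
	return ran, nil
}

// fanOut returns a task that submits n child closures.
func fanOut(n int) Task {
	return TaskFunc(func(ctx context.Context, submit func(Task) error) error {
		for i := 0; i < n; i++ {
			i := i // capture the loop variable for the closure
			child := TaskFunc(func(context.Context, func(Task) error) error {
				fmt.Println("child", i)
				return nil
			})
			if err := submit(child); err != nil {
				return err
			}
		}
		return nil
	})
}

func main() {
	n, err := runAll(fanOut(3))
	fmt.Println(n, err)
}
```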

type VirtualMode

type VirtualMode int

VirtualMode controls virtual pool failure semantics.

const (
	// VirtualFailFast: first error stops executing further tasks for this virtual pool.
	VirtualFailFast VirtualMode = iota
	// VirtualTolerant: errors are collected, tasks continue.
	VirtualTolerant
)
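
The difference between the two modes can be shown with a sequential model. This is a sketch only: mode, runWithMode and sampleTasks are hypothetical. In a real concurrent pool, tasks that are already running when the first error occurs may still finish.

```go
package main

import (
	"errors"
	"fmt"
)

type mode int

const (
	failFast mode = iota // first error stops further tasks
	tolerant             // errors are collected, tasks continue
)

// runWithMode is a hypothetical sequential driver. It returns how many
// tasks were executed and which errors were recorded.
func runWithMode(m mode, tasks []func() error) (executed int, errs []error) {
	for _, task := range tasks {
		if m == failFast && len(errs) > 0 {
			break // an earlier task failed: skip everything that remains
		}
		executed++
		if err := task(); err != nil {
			errs = append(errs, err)
		}
	}
	return executed, errs
}

// sampleTasks returns four tasks; the second and fourth fail.
func sampleTasks() []func() error {
	ok := func() error { return nil }
	bad := func() error { return errors.New("boom") }
	return []func() error{ok, bad, ok, bad}
}

func main() {
	n, errs := runWithMode(failFast, sampleTasks())
	fmt.Println("fail-fast:", n, "run,", len(errs), "error(s)")
	n, errs = runWithMode(tolerant, sampleTasks())
	fmt.Println("tolerant:", n, "run,", len(errs), "error(s)")
}
```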

type VirtualPool

type VirtualPool struct {
	// contains filtered or unexported fields
}

VirtualPool represents a logical pool view that reuses shared workers. It enforces per-virtual behaviour like fail-fast or tolerant error collection.

func (*VirtualPool) CloseAndWait

func (v *VirtualPool) CloseAndWait()

CloseAndWait is a convenience method that closes the virtual pool for submission and waits for all inflight tasks to complete.

func (*VirtualPool) CloseForSubmit

func (v *VirtualPool) CloseForSubmit()

CloseForSubmit marks this virtual pool as no longer accepting top-level submissions. Note: this does not close the shared pool; caller is responsible for closing the shared pool when all virtual pools are done (call SharedTaskPool.CloseForSubmit()).

func (*VirtualPool) Errors

func (v *VirtualPool) Errors() []error

Errors returns a snapshot of collected errors for tolerant virtual pools. For fail-fast virtual pools this returns the first error if any.

func (*VirtualPool) FirstError

func (v *VirtualPool) FirstError() error

FirstError returns the first encountered error for fail-fast virtual pools, or nil.

func (*VirtualPool) Submit

func (v *VirtualPool) Submit(t Task) error

Submit enqueues a Task into this virtual pool. It wraps the Task into a virtualTask that remembers the virtual identity.

func (*VirtualPool) SubmitFunc

func (v *VirtualPool) SubmitFunc(f TaskFunc) error

SubmitFunc is a convenience wrapper that submits a TaskFunc.

func (*VirtualPool) Wait

func (v *VirtualPool) Wait()

Wait blocks until this virtual pool has been closed for submit and all inflight tasks (including queued tasks) have completed. Call this after CloseForSubmit when you want to wait for the virtual's queue to drain.

type VirtualPoolFactory

type VirtualPoolFactory interface {
	NewVirtualPool(mode VirtualMode) VirtualPoolI
}

type VirtualPoolI

type VirtualPoolI interface {
	// Submit enqueues a Task into this virtual pool.
	Submit(Task) error
	// SubmitFunc convenience to submit a TaskFunc.
	SubmitFunc(TaskFunc) error
	// CloseForSubmit marks this virtual pool as no longer accepting top-level submissions.
	CloseForSubmit()
	// Wait blocks until the virtual has been closed for submit and all inflight tasks have completed.
	Wait()
	// CloseAndWait is a convenience method that closes for submission and waits for drain.
	CloseAndWait()
	// FirstError returns the first encountered error for fail-fast virtual pools, or nil.
	FirstError() error
	// Errors returns a snapshot of collected errors for tolerant virtual pools.
	Errors() []error
}

VirtualPoolI defines the public behaviour used by callers of VirtualPool. It intentionally exposes only the stable, public methods used by consumers (submission, lifecycle control and error inspection).

type WorkerPoolQueue

type WorkerPoolQueue[T any] struct {
	// contains filtered or unexported fields
}

WorkerPoolQueue is a simple, single-mutex MPMC queue. This is easier to reason about than a two-lock variant and avoids lost-wakeup races.

func NewWorkerPoolQueue

func NewWorkerPoolQueue[T any]() *WorkerPoolQueue[T]

NewWorkerPoolQueue constructs a new queue.

func (*WorkerPoolQueue[T]) Close

func (q *WorkerPoolQueue[T]) Close()

func (*WorkerPoolQueue[T]) Get

func (q *WorkerPoolQueue[T]) Get() (T, bool)

func (*WorkerPoolQueue[T]) Len

func (q *WorkerPoolQueue[T]) Len() int

func (*WorkerPoolQueue[T]) Put

func (q *WorkerPoolQueue[T]) Put(v T) error

func (*WorkerPoolQueue[T]) TryGet

func (q *WorkerPoolQueue[T]) TryGet() (T, bool)
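
The queue methods above are undocumented, so the following sketch of a single-mutex MPMC queue rests on assumed semantics: put fails after close, get blocks until an item arrives or the queue is closed and drained, and tryGet never blocks. The queue type, errClosed and sumThrough are hypothetical names, not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errClosed plays the role of ErrClosed in this sketch.
var errClosed = errors.New("queue closed")

// queue is a hypothetical stand-in for WorkerPoolQueue[T].
type queue[T any] struct {
	mu     sync.Mutex // the single lock guarding items and closed
	cond   *sync.Cond // signalled on put and close
	items  []T
	closed bool
}

func newQueue[T any]() *queue[T] {
	q := &queue[T]{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// put appends v, or returns errClosed once the queue has been closed.
func (q *queue[T]) put(v T) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return errClosed
	}
	q.items = append(q.items, v)
	q.cond.Signal()
	return nil
}

// get blocks until an item is available; false means closed and drained.
func (q *queue[T]) get() (T, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.closed {
		q.cond.Wait()
	}
	return q.pop()
}

// tryGet never blocks; false means the queue is empty right now.
func (q *queue[T]) tryGet() (T, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.pop()
}

// pop removes the head item; the caller must hold q.mu.
func (q *queue[T]) pop() (v T, ok bool) {
	if len(q.items) == 0 {
		return v, false
	}
	v, q.items = q.items[0], q.items[1:]
	return v, true
}

// close wakes every blocked get so consumers can drain and exit.
func (q *queue[T]) close() {
	q.mu.Lock()
	q.closed = true
	q.mu.Unlock()
	q.cond.Broadcast()
}

// sumThrough sends 1..n to several consumers and returns their total.
func sumThrough(n, consumers int) int {
	q := newQueue[int]()
	results := make(chan int, consumers)
	for c := 0; c < consumers; c++ {
		go func() {
			sum := 0
			for v, ok := q.get(); ok; v, ok = q.get() {
				sum += v
			}
			results <- sum
		}()
	}
	for i := 1; i <= n; i++ {
		_ = q.put(i) // cannot fail: the queue is not closed yet
	}
	q.close()
	total := 0
	for c := 0; c < consumers; c++ {
		total += <-results
	}
	return total
}

func main() { fmt.Println(sumThrough(100, 4)) }
```

Checking the wait condition in a loop while holding the one mutex is what rules out lost wakeups: a consumer either sees the new item before it sleeps or is already registered with the condition variable when put signals.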
