Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrClosed = errors.New("queue closed")
var ErrVirtualPoolClosed = errors.New("virtual pool closed for submit")
Functions ¶
Types ¶
type ErrorCollector ¶
type ErrorCollector struct {
// contains filtered or unexported fields
}
ErrorCollector collects errors for a virtual pool. It stores a snapshotable slice of errors.
func (*ErrorCollector) Errors ¶
func (ec *ErrorCollector) Errors() []error
Errors returns a snapshot of collected errors.
type Pool ¶
type Pool[T any] struct { // contains filtered or unexported fields }
Pool[T] is a worker pool backed by WorkerPoolQueue. It uses an atomic inflight counter + cond to avoid deadlocks between closing the queue and tracking outstanding work.
func NewWorkerPool ¶
NewWorkerPool creates a new Pool. If workerCount <= 0 it defaults to runtime.GOMAXPROCS(0).
func (*Pool[T]) CloseForSubmit ¶
func (p *Pool[T]) CloseForSubmit()
CloseForSubmit indicates the caller will not submit more external (caller-side) tasks. Workers may still call Submit to add child tasks. When inflight reaches zero and queue is empty, the pool will close tasks so workers exit.
func (*Pool[T]) Start ¶
Start spawns workerCount workers that call handler(ctx, item, submit). Handler should process the item and return an error if it wants to abort the whole pool. Handler may call submit(...) to add child tasks (workers are allowed to submit).
type SharedTaskPool ¶
type SharedTaskPool struct {
// contains filtered or unexported fields
}
SharedTaskPool manages a shared worker pool (reusing Pool[Task]) and provides creation of VirtualPools that submit into the shared pool.
func NewSharedTaskPool ¶
func NewSharedTaskPool(parent context.Context, workerCount int) *SharedTaskPool
NewSharedTaskPool constructs a shared pool; caller should call Start() to begin workers.
func (*SharedTaskPool) CloseForSubmit ¶
func (s *SharedTaskPool) CloseForSubmit()
CloseForSubmit proxies to underlying pool when caller is done submitting to all virtuals.
func (*SharedTaskPool) NewVirtualPool ¶
func (s *SharedTaskPool) NewVirtualPool(mode VirtualMode) VirtualPoolI
NewVirtualPool creates and registers a virtual pool on top of the shared pool. mode controls failure semantics.
func (*SharedTaskPool) Wait ¶
func (s *SharedTaskPool) Wait() error
Wait proxies to underlying pool wait. Note: this waits for all tasks on the shared pool.
type Task ¶
Task is a unit of work executed by the shared worker pool. submit allows spawning child tasks into the same logical/virtual pool.
type VirtualMode ¶
type VirtualMode int
VirtualMode controls virtual pool failure semantics.
const ( // VirtualFailFast: first error stops executing further tasks for this virtual pool. VirtualFailFast VirtualMode = iota // VirtualTolerant: errors are collected, tasks continue. VirtualTolerant )
type VirtualPool ¶
type VirtualPool struct {
// contains filtered or unexported fields
}
VirtualPool represents a logical pool view that reuses shared workers. It enforces per-virtual behaviour like fail-fast or tolerant error collection.
func (*VirtualPool) CloseAndWait ¶
func (v *VirtualPool) CloseAndWait()
CloseAndWait is a convenience method that closes the virtual pool for submission and waits for all inflight tasks to complete.
func (*VirtualPool) CloseForSubmit ¶
func (v *VirtualPool) CloseForSubmit()
CloseForSubmit marks this virtual pool as no longer accepting top-level submissions. Note: this does not close the shared pool; caller is responsible for closing the shared pool when all virtual pools are done (call SharedTaskPool.CloseForSubmit()).
func (*VirtualPool) Errors ¶
func (v *VirtualPool) Errors() []error
Errors returns a snapshot of collected errors for tolerant virtual pools. For fail-fast virtual pools this returns the first error if any.
func (*VirtualPool) FirstError ¶
func (v *VirtualPool) FirstError() error
FirstError returns the first encountered error for fail-fast virtual pools, or nil.
func (*VirtualPool) Submit ¶
func (v *VirtualPool) Submit(t Task) error
Submit enqueues a Task into this virtual pool. It wraps the Task into a virtualTask that remembers the virtual identity.
func (*VirtualPool) SubmitFunc ¶
func (v *VirtualPool) SubmitFunc(f TaskFunc) error
SubmitFunc convenience to submit a TaskFunc.
func (*VirtualPool) Wait ¶
func (v *VirtualPool) Wait()
Wait blocks until this virtual pool has been closed for submit and all inflight tasks (including queued tasks) have completed. Call this after CloseForSubmit when you want to wait for the virtual's queue to drain.
type VirtualPoolFactory ¶
type VirtualPoolFactory interface {
NewVirtualPool(mode VirtualMode) VirtualPoolI
}
type VirtualPoolI ¶
type VirtualPoolI interface {
// Submit enqueues a Task into this virtual pool.
Submit(Task) error
// SubmitFunc convenience to submit a TaskFunc.
SubmitFunc(TaskFunc) error
// CloseForSubmit marks this virtual pool as no longer accepting top-level submissions.
CloseForSubmit()
// Wait blocks until the virtual has been closed for submit and all inflight tasks have completed.
Wait()
// CloseAndWait is a convenience method that closes for submission and waits for drain.
CloseAndWait()
// FirstError returns the first encountered error for fail-fast virtual pools, or nil.
FirstError() error
// Errors returns a snapshot of collected errors for tolerant virtual pools.
Errors() []error
}
VirtualPoolI defines the public behaviour used by callers of VirtualPool. It intentionally exposes only the stable, public methods used by consumers (submission, lifecycle control and error inspection).
type WorkerPoolQueue ¶
type WorkerPoolQueue[T any] struct { // contains filtered or unexported fields }
WorkerPoolQueue is a simple, single-mutex MPMC queue. This is easier to reason about than a two-lock variant and avoids lost-wakeup races.
func NewWorkerPoolQueue ¶
func NewWorkerPoolQueue[T any]() *WorkerPoolQueue[T]
NewWorkerPoolQueue constructs a new queue.
func (*WorkerPoolQueue[T]) Close ¶
func (q *WorkerPoolQueue[T]) Close()
func (*WorkerPoolQueue[T]) Get ¶
func (q *WorkerPoolQueue[T]) Get() (T, bool)
func (*WorkerPoolQueue[T]) Len ¶
func (q *WorkerPoolQueue[T]) Len() int
func (*WorkerPoolQueue[T]) Put ¶
func (q *WorkerPoolQueue[T]) Put(v T) error
func (*WorkerPoolQueue[T]) TryGet ¶
func (q *WorkerPoolQueue[T]) TryGet() (T, bool)