Documentation
¶
Overview ¶
Package task provides a lightweight goroutine lifecycle manager modeled on flux-local's TaskService. Active tasks are bounded units of work (a single reconciliation, a dependency wait) whose completion is tracked via BlockTillDone. A single Service should be associated with one orchestrator run.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Coalescer ¶
type Coalescer[K comparable] struct { // contains filtered or unexported fields }
Coalescer keeps at most one task per key in flight; submits that arrive while a key is running collapse into a single re-run after it returns. fn must re-read its inputs each call — a coalesced re-run exists precisely because the previous submit's inputs went stale.
func NewCoalescer ¶
func NewCoalescer[K comparable](svc *Service) *Coalescer[K]
NewCoalescer constructs a Coalescer that schedules work onto svc.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service tracks active goroutines.
func New ¶
func New() *Service
New constructs a Service with the package-default bounded worker pool (runtime.NumCPU() * 4). This was previously unbounded — every Go call spawned a goroutine that ran immediately — which is a foot-gun for embedders that fan out thousands of submits and rely on the pool to throttle. Callers that genuinely need unbounded concurrency must use NewUnbounded explicitly.
func NewBounded ¶
NewBounded constructs a Service that caps the number of concurrently executing active-task bodies at workers. Submitting more does not block — the surplus goroutines exist but wait on an internal semaphore until a slot opens. workers <= 0 disables bounding (equivalent to NewUnbounded).
Sized for I/O-bound work: helm template / oras pull / git clone all release the worker briefly while blocked on the network. A sensible default is runtime.NumCPU() * 4, but callers know their workload better than the package does.
func NewUnbounded ¶
func NewUnbounded() *Service
NewUnbounded constructs a Service with no concurrency cap — every Go submission runs immediately on a fresh goroutine. Use only when the caller's workload is naturally bounded (single-digit submits, fixed-fanout testing harness) or when the caller manages its own throttling on top of the Service. Most production paths should prefer NewBounded with a workload-sized cap, or New for the package default.
func (*Service) BlockTillDone ¶
func (s *Service) BlockTillDone()
BlockTillDone waits until every active task has finished. Safe to call concurrently with Go.
func (*Service) Go ¶
Go launches an active task. ctx is propagated to fn. Completion is reported via BlockTillDone. When the Service is bounded (NewBounded), fn waits on the worker semaphore before it executes — but Go itself never blocks.
func (*Service) YieldSlot ¶
func (s *Service) YieldSlot(fn func())
YieldSlot releases the worker-pool slot held by the current goroutine, runs fn, then re-acquires a slot before returning. Use this around blocking waits where fn is still doing productive work (helm template running, network fetch in flight) so queued tasks can make progress while the holder is I/O-bound. Without this, N tasks waiting on each other for slot-gated work deadlock under NewBounded(N).
MUST be called only from inside a body launched by Service.Go — calling from outside corrupts the semaphore accounting.
The re-acquire is deferred so a panic inside fn still restores the slot count; otherwise Service.Go's outer `defer <-s.sem` would drain a phantom slot on unwind, eventually hanging another goroutine that did own a slot legitimately.
On an unbounded Service (New or NewBounded(<=0)), fn runs unchanged.