task

package
v0.3.2 Latest
Published: Jun 7, 2026 License: AGPL-3.0 Imports: 6 Imported by: 0

Documentation

Overview

Package task provides a lightweight goroutine lifecycle manager modeled on flux-local's TaskService. Active tasks are bounded units of work (a single reconciliation, a dependency wait) whose completion is tracked via BlockTillDone. A single Service should be associated with one orchestrator run.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Coalescer

type Coalescer[K comparable] struct {
	// contains filtered or unexported fields
}

Coalescer keeps at most one task per key in flight; submits that arrive while a key is running collapse into a single re-run after it returns. fn must re-read its inputs each call — a coalesced re-run exists precisely because the previous submit's inputs went stale.

func NewCoalescer

func NewCoalescer[K comparable](svc *Service) *Coalescer[K]

NewCoalescer constructs a Coalescer that schedules work onto svc.

func (*Coalescer[K]) Submit

func (c *Coalescer[K]) Submit(ctx context.Context, name string, key K, fn func(context.Context))

Submit schedules fn for key, starting a new active task if key is idle or marking the running slot pending otherwise.
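
The coalescing rule described above can be sketched with the standard library alone. This is not the package's implementation: the `coalescer` type, its `running` and `pending` maps, and the `runDemo` helper are hypothetical names chosen for illustration, and the sketch ignores ctx, task names, and the underlying Service.

```go
package main

import (
	"fmt"
	"sync"
)

// coalescer is a hypothetical stand-in for the documented Coalescer: at most
// one run per key is in flight, and submits that arrive during a run collapse
// into a single re-run.
type coalescer[K comparable] struct {
	mu      sync.Mutex
	running map[K]bool // key has a run in flight
	pending map[K]bool // a submit arrived while the key was running
	wg      sync.WaitGroup
}

func newCoalescer[K comparable]() *coalescer[K] {
	return &coalescer[K]{running: map[K]bool{}, pending: map[K]bool{}}
}

// submit starts fn if key is idle; otherwise it only marks the key pending.
func (c *coalescer[K]) submit(key K, fn func()) {
	c.mu.Lock()
	if c.running[key] {
		c.pending[key] = true
		c.mu.Unlock()
		return
	}
	c.running[key] = true
	c.wg.Add(1)
	c.mu.Unlock()

	go func() {
		defer c.wg.Done()
		for {
			fn()
			c.mu.Lock()
			if !c.pending[key] {
				delete(c.running, key)
				c.mu.Unlock()
				return
			}
			delete(c.pending, key) // consume the pending mark and re-run once
			c.mu.Unlock()
		}
	}()
}

// runDemo submits once, then n more times while the first run is blocked,
// and returns how many times fn executed.
func runDemo(n int) int {
	c := newCoalescer[string]()
	started := make(chan struct{})
	release := make(chan struct{})
	runs := 0 // only touched by the single in-flight run, read after Wait
	fn := func() {
		runs++
		if runs == 1 {
			close(started)
			<-release // hold the first run open so later submits coalesce
		}
	}
	c.submit("ks/app", fn)
	<-started
	for i := 0; i < n; i++ {
		c.submit("ks/app", fn)
	}
	close(release)
	c.wg.Wait()
	return runs
}

func main() {
	fmt.Println(runDemo(5)) // prints 2: the first run plus one coalesced re-run
}
```

Five submits during the first run produce exactly one re-run, which is why fn has to re-read its inputs rather than capture them at submit time.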

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service tracks active goroutines.

func New

func New() *Service

New constructs a Service with the package-default bounded worker pool (runtime.NumCPU() * 4). This was previously unbounded — every Go call spawned a goroutine that ran immediately — which is a foot-gun for embedders that fan out thousands of submits and rely on the pool to throttle. Callers that genuinely need unbounded concurrency must use NewUnbounded explicitly.

func NewBounded

func NewBounded(workers int) *Service

NewBounded constructs a Service that caps the number of concurrently executing active-task bodies at workers. Submitting more does not block — the surplus goroutines exist but wait on an internal semaphore until a slot opens. workers <= 0 disables bounding (equivalent to NewUnbounded).

Sized for I/O-bound work: helm template / oras pull / git clone all release the worker briefly while blocked on the network. A sensible default is runtime.NumCPU() * 4, but callers know their workload better than the package does.
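
The "submitting does not block, executing is capped" behaviour can be illustrated with a buffered channel used as a semaphore. This is a minimal sketch under the assumption that the Service works along these lines; `peakConcurrency` is a hypothetical helper, and the sketch requires workers > 0 (it does not model the documented workers <= 0 case).

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// peakConcurrency launches tasks goroutines at once, gates their bodies on a
// buffered-channel semaphore of size workers, and reports the highest number
// of bodies observed running at the same time. workers must be > 0 here.
func peakConcurrency(workers, tasks int) int64 {
	sem := make(chan struct{}, workers)
	var wg sync.WaitGroup
	var running, peak atomic.Int64

	for i := 0; i < tasks; i++ {
		wg.Add(1)
		// The goroutine is created immediately, so the submitter never blocks.
		go func() {
			defer wg.Done()
			sem <- struct{}{} // the body parks here until a slot opens
			defer func() { <-sem }()

			n := running.Add(1)
			for { // record a new peak if this body raised it
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			time.Sleep(time.Millisecond) // stand-in for I/O-bound work
			running.Add(-1)
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	// 100 goroutines exist right away, but at most 4 bodies run concurrently.
	fmt.Println(peakConcurrency(4, 100) <= 4)
}
```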

func NewUnbounded

func NewUnbounded() *Service

NewUnbounded constructs a Service with no concurrency cap: every Go submission runs immediately on a fresh goroutine. Use it only when the caller's workload is naturally bounded (single-digit submits, a fixed-fanout test harness) or when the caller manages its own throttling on top of the Service. Most production paths should prefer NewBounded with a workload-sized cap, or New for the package default.

func (*Service) ActiveCount

func (s *Service) ActiveCount() int64

ActiveCount returns the number of in-flight tasks. On a bounded Service (New or NewBounded), this includes tasks that have been Go'd but are still parked on the worker semaphore. Useful as a quiescence signal: when every reconcile has finished, ActiveCount returns 0.

Callers asking from inside a Go'd body should remember that their own goroutine is counted: other work is in flight only when ActiveCount() > 1, so a "no other work" check is ActiveCount() <= 1, not ActiveCount() == 0.

func (*Service) BlockTillDone

func (s *Service) BlockTillDone()

BlockTillDone waits until every active task has finished. Safe to call concurrently with Go.

func (*Service) Failures

func (s *Service) Failures() int64

Failures returns the number of panicked tasks observed.
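
One way a panic counter like this could be maintained is a deferred recover around each task body. The documentation does not say how the Service observes panics, so the sketch below is an assumption; `runAll` is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runAll runs every fn on its own goroutine, recovers any panic so that one
// failing task cannot take the process down, and returns how many panicked.
func runAll(fns ...func()) int64 {
	var wg sync.WaitGroup
	var failed atomic.Int64
	for _, fn := range fns {
		wg.Add(1)
		go func(fn func()) {
			defer wg.Done()
			defer func() {
				if r := recover(); r != nil {
					failed.Add(1) // count the panic instead of propagating it
				}
			}()
			fn()
		}(fn)
	}
	wg.Wait()
	return failed.Load()
}

func main() {
	ok := func() {}
	boom := func() { panic("reconcile failed") }
	fmt.Println(runAll(ok, boom, ok, boom)) // prints 2
}
```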

func (*Service) Go

func (s *Service) Go(ctx context.Context, name string, fn func(context.Context))

Go launches an active task. ctx is propagated to fn. Completion is reported via BlockTillDone. When the Service is bounded (New, or NewBounded with workers > 0), fn waits on the worker semaphore before it executes, but Go itself never blocks.

func (*Service) QuiescenceCh

func (s *Service) QuiescenceCh(threshold int64) <-chan struct{}

QuiescenceCh returns a channel closed when ActiveCount drops to <= threshold. The channel is fresh per call; callers waiting on distinct thresholds work independently. When ActiveCount is already <= threshold at call time, the channel is returned closed.

Used by depwait's render-emission wait to fire the moment no other reconcile is in flight, instead of polling every 100ms. The orchestrator drains exactly once per run, so a successful quiescence signal is a one-shot event.
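
The contract above (a fresh channel per call, independent thresholds, already-closed when the condition holds at call time) can be sketched as a counter that keeps a list of threshold waiters. The `tracker` and `waiter` types are hypothetical and stand in for whatever the Service does internally.

```go
package main

import (
	"fmt"
	"sync"
)

// waiter pairs a threshold with the channel to close once it is reached.
type waiter struct {
	threshold int64
	ch        chan struct{}
}

// tracker is a hypothetical active-task counter with threshold waiters.
type tracker struct {
	mu      sync.Mutex
	active  int64
	waiters []waiter
}

// add adjusts the active count and closes the channel of every waiter whose
// threshold is now satisfied; unsatisfied waiters stay registered.
func (t *tracker) add(delta int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.active += delta
	kept := t.waiters[:0]
	for _, w := range t.waiters {
		if t.active <= w.threshold {
			close(w.ch)
		} else {
			kept = append(kept, w)
		}
	}
	t.waiters = kept
}

// quiescenceCh returns a fresh channel that is closed once active <= threshold.
// If that already holds, the channel is returned closed.
func (t *tracker) quiescenceCh(threshold int64) <-chan struct{} {
	t.mu.Lock()
	defer t.mu.Unlock()
	ch := make(chan struct{})
	if t.active <= threshold {
		close(ch)
		return ch
	}
	t.waiters = append(t.waiters, waiter{threshold: threshold, ch: ch})
	return ch
}

// isClosed reports whether ch has been closed, without blocking.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	tr := &tracker{}
	tr.add(3) // three tasks in flight
	low := tr.quiescenceCh(1)
	tr.add(-2) // two tasks finish, one remains
	fmt.Println(isClosed(low)) // prints true
}
```

Because each waiter owns its channel, a caller waiting on threshold 1 fires independently of one waiting on threshold 0.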

func (*Service) YieldQuiescent

func (s *Service) YieldQuiescent(fn func())

YieldQuiescent releases the worker slot AND decrements the active count for the duration of fn. Use it when fn is a wait on external state that cannot itself produce store mutations, typically depwait blocking on a dep that other tasks must produce. The decrement lets QuiescenceCh fire on the caller's behalf: a task blocked in depwait does no productive work of its own, so it should not keep the orchestrator from declaring quiescence.

Without this hop, two reconciles both blocked in depwait (e.g. a parent KS waiting on a typo'd dependsOn and its child HR waiting on the parent's status) hold ActiveCount at 2 indefinitely — QuiescenceCh(1) never fires and both waiters ride the full RenderProducingTimeout cap.

MUST be called only from inside a body launched by Service.Go — calling from outside corrupts the active counter.

Both the active increment and the slot re-acquire are deferred so a panic inside fn still restores both counters; without that, Service.Go's outer `defer s.taskDone()` would over-decrement on unwind.
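
The deferred-restore pattern described above can be sketched as follows. The `service` struct with a channel semaphore and an atomic counter is an assumption about the internals, and `yieldQuiescent` and `observe` are hypothetical names; only the ordering (release, run fn, restore on every exit path) is the point.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// service is a hypothetical stand-in holding only the two pieces of state the
// documentation mentions: a worker semaphore and an active counter.
type service struct {
	sem    chan struct{}
	active atomic.Int64
}

// yieldQuiescent releases the caller's slot and stops counting the caller as
// active while fn runs. Both restorations are deferred, so they also run when
// fn panics.
func (s *service) yieldQuiescent(fn func()) {
	<-s.sem
	defer func() { s.sem <- struct{}{} }()
	s.active.Add(-1)
	defer s.active.Add(1)
	fn()
}

// observe simulates one task body that holds a slot and is counted once, and
// returns the active count seen during and after the yield.
func observe() (during, after int64) {
	s := &service{sem: make(chan struct{}, 1)}
	s.sem <- struct{}{} // the task body holds the only slot
	s.active.Add(1)     // and is counted as active
	s.yieldQuiescent(func() { during = s.active.Load() })
	return during, s.active.Load()
}

func main() {
	during, after := observe()
	fmt.Println(during, after) // prints "0 1"
}
```

While fn runs, the caller is invisible to a quiescence check; once it returns (or panics), both the count and the slot are back where the enclosing task body expects them.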

func (*Service) YieldSlot

func (s *Service) YieldSlot(fn func())

YieldSlot releases the worker-pool slot held by the current goroutine, runs fn, then re-acquires a slot before returning. Use this around blocking waits where fn is still doing productive work (helm template running, network fetch in flight) so queued tasks can make progress while the holder is I/O-bound. Without this, N tasks waiting on each other for slot-gated work deadlock under NewBounded(N).

Compare YieldQuiescent: that variant additionally decrements the active count for callers waiting on OTHER tasks' work (depwait). YieldSlot keeps the active count incremented because the caller IS producing — quiescence-aware consumers must NOT see the caller as idle.

MUST be called only from inside a body launched by Service.Go — calling from outside corrupts the semaphore accounting.

The re-acquire is deferred so a panic inside fn still restores the slot count; otherwise Service.Go's outer `defer <-s.sem` would drain a phantom slot on unwind, eventually hanging another goroutine that did own a slot legitimately.

On an unbounded Service (NewUnbounded or NewBounded(<=0)), fn runs unchanged.
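
The deadlock that YieldSlot avoids can be reproduced in miniature with a one-slot semaphore: task A holds the slot and waits for a value that only task B, queued behind it, can produce. The sketch below assumes a buffered-channel semaphore; `yieldSlot` and `handoff` are hypothetical helpers, not the package's code.

```go
package main

import (
	"fmt"
	"sync"
)

// yieldSlot gives up the caller's semaphore slot while fn runs and takes one
// back before returning. The re-acquire is deferred so it also happens when
// fn panics.
func yieldSlot(sem chan struct{}, fn func()) {
	<-sem
	defer func() { sem <- struct{}{} }()
	fn()
}

// handoff runs two tasks on a one-slot pool. Task A waits for a value that
// only task B produces; it completes because A yields its slot while waiting.
func handoff() int {
	sem := make(chan struct{}, 1)
	result := make(chan int, 1)
	aHasSlot := make(chan struct{})
	var wg sync.WaitGroup
	got := 0

	wg.Add(2)
	go func() { // task A: takes the only slot, then waits on B
		defer wg.Done()
		sem <- struct{}{}
		defer func() { <-sem }()
		close(aHasSlot)
		yieldSlot(sem, func() { got = <-result })
	}()
	go func() { // task B: queued behind A until A yields
		defer wg.Done()
		<-aHasSlot
		sem <- struct{}{}
		defer func() { <-sem }()
		result <- 42
	}()
	wg.Wait()
	return got
}

func main() {
	fmt.Println(handoff()) // prints 42
}
```

If task A read from result directly instead of going through yieldSlot, B could never acquire the slot and both goroutines would block forever.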
