task

package
v0.3.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 10, 2026 License: AGPL-3.0 Imports: 6 Imported by: 0

Documentation

Overview

Package task provides a lightweight goroutine lifecycle manager modeled on flux-local's TaskService. Active tasks are bounded units of work (a single reconciliation, a dependency wait) whose completion is tracked via BlockTillDone. A single Service should be associated with one orchestrator run.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Coalescer

type Coalescer[K comparable] struct {
	// contains filtered or unexported fields
}

Coalescer keeps at most one task per key in flight; submits that arrive while a key is running collapse into a single re-run after it returns. fn must re-read its inputs each call — a coalesced re-run exists precisely because the previous submit's inputs went stale.

func NewCoalescer

func NewCoalescer[K comparable](svc *Service) *Coalescer[K]

NewCoalescer constructs a Coalescer that schedules work onto svc.

func (*Coalescer[K]) Submit

func (c *Coalescer[K]) Submit(ctx context.Context, name string, key K, fn func(context.Context))

Submit schedules fn for key, starting a new active task if key is idle or marking the running slot pending otherwise.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service tracks active goroutines.

func New

func New() *Service

New constructs a Service with the package-default bounded worker pool (runtime.NumCPU() * 4). This was previously unbounded — every Go call spawned a goroutine that ran immediately — which is a foot-gun for embedders that fan out thousands of submits and rely on the pool to throttle. Callers that genuinely need unbounded concurrency must use NewUnbounded explicitly.

func NewBounded

func NewBounded(workers int) *Service

NewBounded constructs a Service that caps the number of concurrently executing active-task bodies at workers. Submitting more does not block — the surplus goroutines exist but wait on an internal semaphore until a slot opens. workers <= 0 disables bounding (equivalent to NewUnbounded).

Sized for I/O-bound work: helm template / oras pull / git clone all release the worker briefly while blocked on the network. A sensible default is runtime.NumCPU() * 4, but callers know their workload better than the package does.

func NewUnbounded

func NewUnbounded() *Service

NewUnbounded constructs a Service with no concurrency cap — every Go submission runs immediately on a fresh goroutine. Use only when the caller's workload is naturally bounded (single-digit submits, fixed-fanout testing harness) or when the caller manages its own throttling on top of the Service. Most production paths should prefer NewBounded with a workload-sized cap, or New for the package default.

func (*Service) BlockTillDone

func (s *Service) BlockTillDone()

BlockTillDone waits until every active task has finished. Safe to call concurrently with Go.

func (*Service) Failures

func (s *Service) Failures() int64

Failures returns the number of panicked tasks observed.

func (*Service) Go

func (s *Service) Go(ctx context.Context, name string, fn func(context.Context))

Go launches an active task. ctx is propagated to fn. Completion is reported via BlockTillDone. When the Service is bounded (NewBounded), fn waits on the worker semaphore before it executes — but Go itself never blocks.

func (*Service) YieldSlot

func (s *Service) YieldSlot(fn func())

YieldSlot releases the worker-pool slot held by the current goroutine, runs fn, then re-acquires a slot before returning. Use this around blocking waits where fn is still doing productive work (helm template running, network fetch in flight) so queued tasks can make progress while the holder is I/O-bound. Without this, N tasks waiting on each other for slot-gated work deadlock under NewBounded(N).

MUST be called only from inside a body launched by Service.Go — calling from outside corrupts the semaphore accounting.

The re-acquire is deferred so a panic inside fn still restores the slot count; otherwise Service.Go's outer `defer <-s.sem` would drain a phantom slot on unwind, eventually hanging another goroutine that did own a slot legitimately.

On an unbounded Service (New or NewBounded(<=0)), fn runs unchanged.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL