progress

package
v0.4.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 3, 2025 License: Apache-2.0 Imports: 3 Imported by: 0

Documentation

Overview

Package progress defines primitives for reporting and aggregating the progress of long-running tasks executed by the Fluxor runtime. It abstracts away the underlying communication mechanism so that callers can consume progress updates in a uniform way regardless of whether they are delivered via in-memory channels, message queues or external observers.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func UpdateCtx

func UpdateCtx(ctx context.Context, d Delta)

UpdateCtx is a helper that looks up the tracker in ctx (if any) and applies the supplied delta.

Types

type Delta

type Delta struct {
	Total     int
	Completed int
	Skipped   int
	Failed    int
	Running   int
	Pending   int
}

Delta represents an incremental counter change emitted by the allocator, executor or processor. The fields are signed and therefore can be either positive (increment) or negative (decrement).

type Progress

type Progress struct {
	// Identification – informative only, filled when the root workflow starts.
	RootProcessID string
	Workflow      string
	StartedAt     time.Time

	// Counters – modified via Update().
	TotalTasks     int
	CompletedTasks int
	SkippedTasks   int
	FailedTasks    int
	RunningTasks   int
	PendingTasks   int

	sync.Mutex
	// contains filtered or unexported fields
}

Progress keeps aggregated task counters for the root workflow and all its sub-workflows. It is safe for concurrent use.

func FromContext

func FromContext(ctx context.Context) (*Progress, bool)

FromContext extracts the Progress tracker from ctx. It returns (tracker, ok). The second return value is false when the context carries no tracker.

func GetSnapshot

func GetSnapshot(ctx context.Context) (Progress, bool)

GetSnapshot is a convenience wrapper that combines FromContext and Snapshot. The boolean return value is false when the context does not carry a tracker.

func WithNewTracker

func WithNewTracker(ctx context.Context, rootProcessID, workflow string, onChange func(Progress)) (context.Context, *Progress)

WithNewTracker creates a new Progress tracker, embeds it in a derived context and returns both. The caller may optionally pass an onChange callback that will be invoked after every counter update.

func (*Progress) OnChange

func (p *Progress) OnChange(cb func(Progress))

OnChange registers a callback that is invoked after every successful Update. Passing nil disables the callback. Only one callback can be active; subsequent calls overwrite the previous value.

func (*Progress) Snapshot

func (p *Progress) Snapshot() Progress

Snapshot returns a copy of the tracker suitable for read-only inspection.

func (*Progress) Update

func (p *Progress) Update(d Delta)

Update applies the supplied delta to the tracker. It is safe to call from multiple goroutines. If an onChange callback has been registered it will be invoked with a copy of the updated tracker outside the critical section so that the callback can perform slow operations (e.g. JSON encoding, I/O) without blocking engine internals.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL