Documentation ¶
Overview ¶
Package task provides a comprehensive concurrent task execution system with progress tracking, status management, and visual rendering capabilities.
Quick Start ¶
The task system provides a simple interface for running concurrent operations with visual feedback:
import "github.com/flanksource/clicky/task"
// Start a simple task
task := task.StartTask("Download File", func(ctx flanksourceContext.Context, t *task.Task) error {
t.Infof("Starting download...")
// Perform work
time.Sleep(2 * time.Second)
t.Infof("Download complete")
return nil
})
// Wait for completion
result := task.WaitFor()
if result.Error != nil {
fmt.Printf("Task failed: %v\n", result.Error)
}
Core Concepts ¶
Task Lifecycle ¶
Tasks progress through well-defined states with visual indicators:
- StatusPending (⏳): Task is queued but not yet started
- StatusRunning (⟳): Task is currently executing
- StatusSuccess (✓): Task completed successfully
- StatusFailed (✗): Task failed with an error
- StatusWarning (⚠): Task completed with warnings
- StatusCancelled (⊘): Task was canceled
Manager ¶
The Manager coordinates task execution and provides visual rendering:
manager := task.NewManager(
task.WithMaxConcurrency(5), // Limit concurrent tasks
task.WithVerbose(true), // Enable verbose logging
task.WithNoProgress(false), // Show progress bars
)
// Tasks automatically use the global manager if none specified
Task Groups ¶
Groups organize related tasks and provide concurrency control:
group := task.NewGroup("Database Migration",
task.WithConcurrency(2), // Max 2 concurrent tasks in group
)
// Add tasks to the group
task1 := group.Add("Migrate Users", func(ctx, t) error { ... })
task2 := group.Add("Migrate Orders", func(ctx, t) error { ... })
// Wait for all tasks in the group
result := group.WaitFor()
TypedTask for Type Safety ¶
TypedTask provides compile-time type safety for task results:
// Define a typed task that returns a string
task := task.StartTask("Fetch Data", func(ctx flanksourceContext.Context, t *task.Task) (string, error) {
// Perform work and return typed result
return "Hello, World!", nil
})
// Get typed result
result, err := task.GetResult()
// result is of type string
// Or wait and get result in one call
wait := task.WaitFor()
if wait.Error != nil {
// Handle error
}
TypedGroup ¶
Groups can also be typed for consistent result handling:
group := task.NewTypedGroup[UserData]("Load Users")
user1 := group.Add("Load User 1", func(ctx, t) (UserData, error) {
return loadUser(1)
})
user2 := group.Add("Load User 2", func(ctx, t) (UserData, error) {
return loadUser(2)
})
// Get all results as map
results, err := group.GetResults()
if err != nil {
return err
}
for task, userData := range results {
fmt.Printf("Task %s loaded: %+v\n", task.Name(), userData)
}
Status and Health System ¶
The task system includes a rich status and health reporting system:
Status Types ¶
Tasks support multiple status paradigms:
// Standard task statuses
task.SetStatus(task.StatusSuccess)
task.SetStatus(task.StatusFailed)
task.SetStatus(task.StatusWarning)
// Test-style statuses
task.SetStatus(task.StatusPASS)
task.SetStatus(task.StatusFAIL)
task.SetStatus(task.StatusSKIP)
Health Mixin ¶
Results can implement HealthMixin for automatic status determination:
type DatabaseResult struct {
Connected bool
Error error
}
func (r DatabaseResult) Health() task.Health {
if r.Error != nil {
return task.HealthError
}
if !r.Connected {
return task.HealthWarning
}
return task.HealthOK
}
// Task status will automatically reflect the health
task.SetResult(DatabaseResult{Connected: true})
// Task status becomes StatusSuccess automatically
Visual Styling ¶
Status information includes visual styling with Tailwind CSS classes:
status := task.StatusSuccess
fmt.Println(status.Icon()) // ✓
fmt.Println(status.Style()) // "text-green-600"
// Apply status styling to text
text := api.Text{Content: "Operation Complete"}
styledText := status.Apply(text)
Concurrency Control ¶
Manager-Level Concurrency ¶
Control global task concurrency:
manager := task.NewManager(task.WithMaxConcurrency(10))
// Maximum 10 tasks running simultaneously
Group-Level Concurrency ¶
Fine-grained control within groups:
group := task.NewGroup("API Calls", task.WithConcurrency(3))
// Maximum 3 concurrent tasks within this group
Semaphore-Based Control ¶
Groups use semaphores for precise concurrency management:
// Group automatically handles semaphore acquisition/release
group.Add("Task 1", taskFunc1) // Acquires semaphore
group.Add("Task 2", taskFunc2) // Waits if limit reached
// Semaphore released when task completes
Error Handling and Retry ¶
Basic Error Handling ¶
Tasks provide multiple ways to handle errors:
task := task.StartTask("Risky Operation", func(ctx, t) error {
if someCondition {
return errors.New("operation failed")
}
return nil
})
result := task.WaitFor()
if result.Error != nil {
fmt.Printf("Task failed: %v\n", result.Error)
fmt.Printf("Status: %s\n", result.Status)
}
Retry Configuration ¶
Tasks support automatic retry with exponential backoff:
retryConfig := task.RetryConfig{
RetryableErrors: []string{"timeout", "connection", "rate limit"},
BaseDelay: 1 * time.Second,
MaxDelay: 30 * time.Second,
BackoffFactor: 2.0,
JitterFactor: 0.1,
MaxRetries: 3,
}
task := task.StartTaskWithOptions("Flaky Operation", taskFunc,
task.WithRetryConfig(retryConfig),
)
Fatal Errors ¶
For unrecoverable errors that should stop execution:
task.Fatal(errors.New("critical system failure"))
// Immediately stops execution and exits program
Logging and Progress ¶
Built-in Logging ¶
Tasks include comprehensive logging capabilities:
task := task.StartTask("Process Data", func(ctx, t) error {
t.Infof("Starting data processing...")
t.Debugf("Processing batch %d", batchNum)
if err := processData(); err != nil {
t.Errorf("Failed to process batch: %v", err)
return err
}
t.Infof("Processing complete")
return nil
})
Log levels:
- t.Tracef(): Detailed tracing (lowest level)
- t.Debugf(): Debug information
- t.Infof(): General information
- t.Warnf(): Warnings
- t.Errorf(): Errors
- t.Fatalf(): Fatal errors
Progress Tracking ¶
Tasks support progress indication:
task := task.StartTask("Upload Files", func(ctx, t) error {
files := getFilesToUpload()
total := len(files)
for i, file := range files {
t.SetProgress(i, total)
err := uploadFile(file)
if err != nil {
return err
}
}
t.SetProgress(total, total) // 100% complete
return nil
})
Integration Examples ¶
CLI Tool Integration ¶
func runCommand(args []string) error {
manager := task.NewManager(
task.WithVerbose(verbose),
task.WithNoProgress(noProgress),
)
// Start background tasks
task1 := task.StartTask("Validate Input", validateInput)
task2 := task.StartTask("Load Configuration", loadConfig)
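// Wait for prerequisites and stop if either one failed
if err := task1.WaitFor().Error; err != nil {
return err
}
if err := task2.WaitFor().Error; err != nil {
return err
}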
// Main processing
mainTask := task.StartTask("Process Data", processData)
return mainTask.WaitFor().Error
}
HTTP Server Integration ¶
func handleRequest(w http.ResponseWriter, r *http.Request) {
requestID := generateRequestID()
task := task.StartTask(fmt.Sprintf("Handle Request %s", requestID),
func(ctx, t) error {
t.Infof("Processing request from %s", r.RemoteAddr)
// Process request
result, err := processRequest(r)
if err != nil {
t.Errorf("Request failed: %v", err)
return err
}
t.Infof("Request completed successfully")
return writeResponse(w, result)
})
// Wait for completion
if result := task.WaitFor(); result.Error != nil {
http.Error(w, result.Error.Error(), 500)
}
}
Batch Processing ¶
func processBatch(items []Item) error {
group := task.NewTypedGroup[ProcessedItem]("Batch Processing",
task.WithConcurrency(5),
)
// Process items concurrently
for i, item := range items {
group.Add(fmt.Sprintf("Process Item %d", i+1),
func(ctx, t) (ProcessedItem, error) {
return processItem(item)
})
}
// Wait for all processing to complete
results, err := group.GetResults()
if err != nil {
return fmt.Errorf("batch processing failed: %w", err)
}
// Handle results
for task, result := range results {
fmt.Printf("Task %s: %+v\n", task.Name(), result)
}
return nil
}
Testing Integration ¶
The task system automatically detects test environments and adjusts behavior:
func TestDataProcessing(t *testing.T) {
// Progress bars and colors automatically disabled in tests
// The local names op and tk avoid shadowing the task package,
// which is still needed below for task.StatusSuccess
op := task.StartTask("Test Operation", func(ctx, tk) error {
// Task logging available in tests
tk.Infof("Running test operation")
return performTestOperation()
})
result := op.WaitFor()
assert.NoError(t, result.Error)
assert.Equal(t, task.StatusSuccess, result.Status)
}
Advanced Features ¶
Task Dependencies ¶
Tasks can depend on other tasks:
prerequisite := task.StartTask("Load Configuration", loadConfig)
mainTask := task.StartTaskWithOptions("Main Process", mainProcess,
task.WithDependencies(prerequisite),
)
// mainTask won't start until prerequisite completes successfully
Task Identity and Deduplication ¶
Prevent duplicate tasks with identity tracking:
task1 := task.StartTaskWithOptions("Download File", downloadFunc,
task.WithIdentity("download-file-xyz"),
)
// This will return the existing task instead of creating a new one
task2 := task.StartTaskWithOptions("Download File", downloadFunc,
task.WithIdentity("download-file-xyz"),
)
// task1 and task2 reference the same underlying task
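Identity-based deduplication amounts to a lookup keyed by the identity string before a new task is created. The sketch below shows that idea with a mutex-guarded map; registry, entry and startOnce are hypothetical names, and how the package handles identities of tasks that have already finished is not covered here.

```go
package main

import (
	"fmt"
	"sync"
)

// entry is a hypothetical stand-in for a task.
type entry struct{ name string }

// registry remembers entries by their identity string.
type registry struct {
	mu      sync.Mutex
	byIdent map[string]*entry
}

func newRegistry() *registry {
	return &registry{byIdent: map[string]*entry{}}
}

// startOnce returns the entry already registered under identity, or
// registers and returns a new one. The bool reports whether it was created.
func (r *registry) startOnce(identity, name string) (*entry, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if e, ok := r.byIdent[identity]; ok {
		return e, false
	}
	e := &entry{name: name}
	r.byIdent[identity] = e
	return e, true
}

func main() {
	reg := newRegistry()
	first, created1 := reg.startOnce("download-file-xyz", "Download File")
	second, created2 := reg.startOnce("download-file-xyz", "Download File")
	fmt.Println(created1, created2, first == second)
}
```

Holding the lock across both the lookup and the insert is what prevents two concurrent callers from each creating their own entry.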
Custom Styling and Themes ¶
Tasks support custom visual themes:
customTheme := api.Theme{
Success: "text-blue-600",
Error: "text-purple-600",
Warning: "text-orange-500",
}
manager := task.NewManager(task.WithTheme(customTheme))
Graceful Shutdown ¶
Handle interrupts gracefully:
manager := task.NewManager(
task.WithGracefulTimeout(30 * time.Second),
task.WithInterruptHandler(func() {
fmt.Println("Shutting down gracefully...")
}),
)
// Manager will handle SIGINT/SIGTERM and allow running tasks to complete
// Terminal Safety
//
// The task system saves and restores terminal state automatically:
//
// - Normal exit: restored via shutdown hooks
// - SIGINT: restored during graceful shutdown
// - Double SIGINT: restored before forced os.Exit(1)
// - Panic in main: restored if main defers shutdown.RecoverAndShutdown()
// - Panic in task func: caught by worker, task marked as failed, process continues
//
// For panic protection in main, use:
//
// func main() {
// defer shutdown.RecoverAndShutdown()
// // ... start tasks, wait, etc.
// }
//
// RecoverAndShutdown runs all shutdown hooks, restores the terminal, then
// re-panics so the stack trace is preserved.
The task package provides a complete solution for concurrent task execution with rich visual feedback, comprehensive error handling, and flexible configuration options suitable for CLI tools, servers, and batch processing applications.
Index ¶
- Variables
- func AcquirePromptTerminal() (func(), bool)
- func BindManagerFlags(flags *flag.FlagSet, options *ManagerOptions)
- func BindManagerPFlags(flags *pflag.FlagSet, options *ManagerOptions)
- func CancelAll()
- func ClearTasks()
- func Debug() string
- func GCRuns()
- func GatedStderr() io.Writer
- func IsForceInteractive() bool
- func IsInteractiveRenderActive() bool
- func IsNoRender() bool
- func JSONHandler(taskIDs ...string) http.Handler
- func RegisterHandlers(mux *http.ServeMux, prefix string)
- func RunHandler() http.Handler
- func RunsHandler() http.Handler
- func RunsSSEHandler(supplement func(RunFilter) []RunMeta) http.Handler
- func SSEHandler(taskIDs ...string) http.Handler
- func SetForceInteractive(force bool)
- func SetGracefulTimeout(timeout time.Duration)
- func SetInterruptHandler(fn func())
- func SetLiveRenderer(r LiveRenderer)
- func SetMaxConcurrent(max int)
- func SetNoColor(noColor bool)
- func SetNoProgress(noProgress bool)
- func SetNoRender(noRender bool)
- func SetRetryConfig(config RetryConfig)
- func SetVerbose(verbose bool)
- func StartCapturingOutput()
- func StopCapturingOutput()
- func StopTask(id string) bool
- func Wait() int
- func WaitForAllTasks()
- func WaitSilent() int
- type Batch
- type BatchResult
- type Group
- func (g *Group) Cancel()
- func (g *Group) FinishedAt() time.Time
- func (g *Group) GetTasks() []Taskable
- func (g *Group) ID() string
- func (g *Group) IsGroup() bool
- func (g *Group) Metadata() GroupMetadata
- func (g *Group) Name() string
- func (g *Group) StartedAt() time.Time
- func (g *Group) Status() Status
- type GroupMetadata
- type Health
- type HealthMixin
- type LiveRenderer
- type LogEntry
- type Manager
- func (tm *Manager) PlainRender()
- func (tm *Manager) Pretty() api.Text
- func (tm *Manager) Run() error
- func (tm *Manager) Start(name string, opts ...Option) *Task
- func (tm *Manager) StartCapturingOutput()
- func (tm *Manager) StartWithResult(name string, ...) *Task
- func (tm *Manager) StopCapturingOutput()
- type ManagerOptions
- type Option
- func WithDependencies(deps ...*Task) Option
- func WithFunc(fn func(flanksourceContext.Context, *Task) error) Option
- func WithIdentity(identity string) Option
- func WithModel(modelName string) Option
- func WithPriority(priority int) Option
- func WithPrompt(prompt string) Option
- func WithRetryConfig(config RetryConfig) Option
- func WithTaskTimeout(d time.Duration) Option
- func WithTimeout(d time.Duration) Option
- type OutputEntry
- type RetryConfig
- type RunFilter
- type RunMeta
- type Status
- type Task
- func (t *Task) Cancel()
- func (t *Task) ClearLogs()
- func (t *Task) Context() context.Context
- func (t *Task) Debugf(format string, args ...interface{})
- func (t *Task) Description() string
- func (t *Task) Duration() time.Duration
- func (t *Task) Error() error
- func (t *Task) Errorf(format string, args ...interface{})
- func (t *Task) Failed() *Task
- func (t *Task) FailedWithError(err error) (*Task, error)
- func (t *Task) Fatal(err error)
- func (t *Task) Fatalf(format string, args ...interface{})
- func (t *Task) FlanksourceContext() flanksourceContext.Context
- func (t *Task) GetLevel() logger.LogLevel
- func (t *Task) GetResult() (interface{}, error)
- func (t *Task) GetSlogLogger() *slog.Logger
- func (t *Task) GetTask() *Task
- func (t *Task) GetTypedResult(target interface{}) error
- func (t *Task) ID() string
- func (t *Task) Identity() string
- func (t *Task) Infof(format string, args ...interface{})
- func (t *Task) IsDebugEnabled() bool
- func (t *Task) IsGroup() bool
- func (t *Task) IsLevelEnabled(level logger.LogLevel) bool
- func (t *Task) IsOk() bool
- func (t *Task) IsTraceEnabled() bool
- func (t *Task) Name() string
- func (t *Task) Named(name string) logger.Logger
- func (t *Task) PopDirty() bool
- func (t *Task) Pretty() api.Text
- func (t *Task) Progress() (value, maximum int)
- func (t *Task) SetDescription(description string)
- func (t *Task) SetLogLevel(level any)
- func (t *Task) SetMinLogLevel(level any)
- func (t *Task) SetName(name string)
- func (t *Task) SetProgress(value, maximum int)
- func (t *Task) SetResult(result interface{})
- func (t *Task) SetStatus(status Status)
- func (t *Task) StartTime() time.Time
- func (t *Task) Status() Status
- func (t *Task) Success() *Task
- func (t *Task) Tracef(format string, args ...interface{})
- func (t *Task) V(level any) logger.Verbose
- func (t *Task) WaitFor() *WaitResult
- func (t *Task) WaitTime() time.Duration
- func (t *Task) Warnf(format string, args ...interface{})
- func (t *Task) Warning() *Task
- func (t *Task) WithSkipReportLevel(i int) logger.Logger
- func (t *Task) WithV(level any) logger.Logger
- func (t *Task) WithValues(keysAndValues ...interface{}) logger.Logger
- func (t *Task) WithoutName() logger.Logger
- type TaskFunc
- type TaskGroupOption
- type TaskResult
- type TaskSnapshot
- type Taskable
- type TypedGroup
- func (g TypedGroup[T]) Add(name string, taskFunc func(flanksourceContext.Context, *Task) (T, error), ...) TypedTask[T]
- func (g *TypedGroup[T]) Duration() time.Duration
- func (g TypedGroup[T]) GetResults() (map[TypedTask[T]]T, error)
- func (g TypedGroup[T]) IsGroup() bool
- func (g *TypedGroup[T]) WaitFor() *WaitResult
- type TypedTask
- type WaitResult
- type Waitable
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
// ErrBatchTimeout is returned when the entire batch exceeds its timeout
ErrBatchTimeout = errors.New("batch execution exceeded timeout")
// ErrItemTimeout is returned when an individual item exceeds its timeout
ErrItemTimeout = errors.New("item execution exceeded timeout")
)
var OnBeforeGC func(groupID string, snapshots []TaskSnapshot)
OnBeforeGC, if non-nil, is called with each group's full snapshot just before GCRuns removes it from the in-memory manager. The callback receives the group's stable ID and the full snapshot slice (group + child tasks). It is called while global.mu is held, so it must not call back into the task package.
Functions ¶
func AcquirePromptTerminal ¶
func AcquirePromptTerminal() (func(), bool)
AcquirePromptTerminal waits for exclusive ANSI terminal ownership for prompt rendering, stopping the active task renderer first when necessary. The second return value reports whether the prompt displaced an active task renderer.
func BindManagerFlags ¶
func BindManagerFlags(flags *flag.FlagSet, options *ManagerOptions)
BindManagerFlags adds TaskManager flags to the standard flag set
func BindManagerPFlags ¶
func BindManagerPFlags(flags *pflag.FlagSet, options *ManagerOptions)
BindManagerPFlags adds TaskManager flags to a pflag set (for Cobra)
func ClearTasks ¶
func ClearTasks()
ClearTasks removes all completed tasks (and groups) from the registry, keeping only those still pending or running. Groups must be pruned too: SnapshotAll iterates global.groups, so a leftover completed group would keep surfacing its (now-removed) tasks — e.g. a stale "Running tests" group whose task IDs no longer resolve, causing StopTask lookups to fail.
func GCRuns ¶
func GCRuns()
GCRuns removes finished runs older than runRetention from the global manager. A run is "finished" once it has a non-zero FinishedAt (recorded the first time it is observed terminal). Live runs are retained regardless of age. If OnBeforeGC is set, each evicted group's full snapshot is passed to it before removal.
func GatedStderr ¶
func GatedStderr() io.Writer
GatedStderr returns a writer that wraps os.Stderr but drops writes while the interactive task renderer owns the TTY. The writer is stateless; each Write rechecks ownership, so a writer captured before the renderer started still gates correctly.
func IsForceInteractive ¶
func IsForceInteractive() bool
IsForceInteractive reports whether the interactive renderer has been forced on via SetForceInteractive or the CLICKY_FORCE_INTERACTIVE env var.
func IsInteractiveRenderActive ¶
func IsInteractiveRenderActive() bool
IsInteractiveRenderActive reports whether the global task manager's interactive render loop currently owns the terminal. Callers that write to os.Stderr can consult this to drop writes that would corrupt the live frame. The check is cheap (atomic load + RLock).
Returns false when the manager is non-interactive (PlainRender mode), before the renderer has acquired the TTY, or after stopRender has released it.
func IsNoRender ¶
func IsNoRender() bool
IsNoRender reports whether task rendering is currently disabled.
func JSONHandler ¶
func JSONHandler(taskIDs ...string) http.Handler
JSONHandler returns an http.Handler that serves the full task state as JSON. If taskIDs are provided, only groups matching those IDs are included. This is used for initial page load, reconnection after SSE drop, or polling fallback.
func RegisterHandlers ¶
func RegisterHandlers(mux *http.ServeMux, prefix string)
RegisterHandlers wires the generic task-manager API under prefix:
GET {prefix}/tasks run listing (RunMeta[], ?kind=&status=&label=k=v)
GET {prefix}/tasks/stream SSE stream of TaskSnapshots (?tasks=<id>&kind=)
GET {prefix}/tasks/{id} id-scoped snapshot (group + tasks)
The {id} route reuses Go 1.22 net/http path-value routing; the stream route is registered before the {id} route so "stream" is not treated as an id.
func RunHandler ¶
func RunHandler() http.Handler
RunHandler serves the id-scoped snapshot (group + its tasks) for a single run. The id is read from the {id} path value of the registered route.
func RunsHandler ¶
func RunsHandler() http.Handler
RunsHandler serves the run listing (RunMeta per group) for the generic task-manager view, filtered by the ?kind=, ?status=, and repeated ?label=k=v query params.
func RunsSSEHandler ¶
func RunsSSEHandler(supplement func(RunFilter) []RunMeta) http.Handler
RunsSSEHandler streams the run listing (RunMeta) as SSE. Unlike SSEHandler it never sends a terminal event: a manager view stays subscribed to observe new and changing runs. supplement (may be nil) merges extra runs — e.g. archived or persisted runs the in-memory registry no longer holds; live runs win on id. It emits a single "event: runs" frame carrying the full listing, and only re-emits when the listing changes.
func SSEHandler ¶
func SSEHandler(taskIDs ...string) http.Handler
SSEHandler returns an http.Handler that streams task events via Server-Sent Events. If taskIDs are provided, only groups matching those IDs are streamed. The handler polls for dirty tasks every 200ms and emits JSON snapshots. It sends an "event: done" when all tracked groups have completed.
func SetForceInteractive ¶
func SetForceInteractive(force bool)
SetForceInteractive forces the interactive renderer on even when the process is running under `go test` or CI (GO_TEST, CI, GITHUB_ACTIONS, …). It also marks the manager as interactive when the underlying FD is not a TTY, so that a PTY-wrapping caller sees the full cursor/clear/redraw ANSI stream. Intended for harnesses that need to audit interactive output — normal applications should rely on the automatic TTY detection.
func SetGracefulTimeout ¶
func SetGracefulTimeout(timeout time.Duration)
SetGracefulTimeout sets the timeout for graceful shutdown
func SetInterruptHandler ¶
func SetInterruptHandler(fn func())
SetInterruptHandler sets a custom callback to be called on interrupt
func SetLiveRenderer ¶
func SetLiveRenderer(r LiveRenderer)
SetLiveRenderer installs a custom renderer on the global manager, or removes it when r is nil. It is process-global like SetNoRender; install it for the duration of one command and restore (SetLiveRenderer(nil)) afterwards.
func SetMaxConcurrent ¶
func SetMaxConcurrent(max int)
SetMaxConcurrent sets the maximum number of concurrent tasks
func SetNoProgress ¶
func SetNoProgress(noProgress bool)
SetNoProgress enables or disables progress display
func SetNoRender ¶
func SetNoRender(noRender bool)
SetNoRender enables or disables all task rendering, including final summaries.
func SetRetryConfig ¶
func SetRetryConfig(config RetryConfig)
SetRetryConfig sets the default retry configuration for new tasks
func StartCapturingOutput ¶
func StartCapturingOutput()
StartCapturingOutput replaces os.Stdout and os.Stderr on the global task manager with pipes that buffer everything written until StopCapturingOutput is called. The live task renderer is unaffected because it captured the original file descriptors at manager init time (see Manager.renderer in manager.go). Loggers that captured os.Stderr before this call will also keep writing to the real terminal — only bare fmt.Print / os.Stderr writes after this call get buffered.
func StopCapturingOutput ¶
func StopCapturingOutput()
StopCapturingOutput restores the real os.Stdout and os.Stderr on the global task manager and flushes every buffered line to the restored streams in the order it was written, tagged by stream of origin. Safe to call when capture wasn't started (no-op).
func Wait ¶
func Wait() int
Wait waits for all tasks to complete and returns the appropriate exit code
func WaitForAllTasks ¶
func WaitForAllTasks()
WaitForAllTasks waits for all global tasks to complete and forces a final render
func WaitSilent ¶
func WaitSilent() int
WaitSilent waits for all tasks to complete without displaying results
Types ¶
type Batch ¶
type Batch[T any] struct {
Name string
Items []func(logger logger.Logger) (T, error)
MaxWorkers int
Results []T
// Timeout is the maximum duration for the entire batch to complete.
// Zero value means no timeout (infinite wait until completion or context cancellation).
Timeout time.Duration
// ItemTimeout is the maximum duration for each individual item to complete.
// Zero value means no per-item timeout.
ItemTimeout time.Duration
}
Example (ItemTimeout) ¶
ExampleBatch_itemTimeout demonstrates using per-item timeouts
package main
import (
"errors"
"fmt"
"time"
"github.com/flanksource/commons/logger"
"github.com/flanksource/clicky/task"
)
func main() {
batch := &task.Batch[int]{
Name: "example-item-timeout",
ItemTimeout: 100 * time.Millisecond, // Each item must complete within 100ms
MaxWorkers: 3,
}
// Add 6 items: 3 fast (50ms), 3 slow (150ms)
for i := 0; i < 6; i++ {
i := i
batch.Items = append(batch.Items, func(log logger.Logger) (int, error) {
if i%2 == 0 {
time.Sleep(50 * time.Millisecond)
} else {
time.Sleep(150 * time.Millisecond)
}
return i, nil
})
}
completed := []int{}
itemTimeouts := 0
for result := range batch.Run() {
if result.Error != nil {
if errors.Is(result.Error, task.ErrItemTimeout) {
itemTimeouts++
}
} else {
completed = append(completed, result.Value)
}
}
fmt.Printf("Completed: %d, Timed out: %d\n", len(completed), itemTimeouts)
}
Output: Completed: 3, Timed out: 3
Example (Timeout) ¶
ExampleBatch_timeout demonstrates using a batch timeout to limit total execution time
package main
import (
"errors"
"fmt"
"time"
"github.com/flanksource/commons/logger"
"github.com/flanksource/clicky/task"
)
func main() {
batch := &task.Batch[string]{
Name: "example-batch-timeout",
Timeout: 500 * time.Millisecond, // Batch must complete within 500ms
MaxWorkers: 2,
}
// Add 10 items that each take 200ms - only ~5 will complete before timeout
for i := 0; i < 10; i++ {
i := i
batch.Items = append(batch.Items, func(log logger.Logger) (string, error) {
time.Sleep(200 * time.Millisecond)
return fmt.Sprintf("item-%d", i), nil
})
}
completed := []string{}
timedOut := false
for result := range batch.Run() {
if result.Error != nil {
if errors.Is(result.Error, task.ErrBatchTimeout) {
timedOut = true
fmt.Println("Batch timeout occurred")
}
} else {
completed = append(completed, result.Value)
}
}
fmt.Printf("Completed %d items before timeout: %v\n", len(completed), timedOut)
}
Output:
Batch timeout occurred
Completed 6 items before timeout: true
Example (ZeroTimeout) ¶
ExampleBatch_zeroTimeout demonstrates zero-value backward compatibility
package main
import (
"fmt"
"time"
"github.com/flanksource/commons/logger"
"github.com/flanksource/clicky/task"
)
func main() {
batch := &task.Batch[int]{
Name: "example-zero-timeout",
Timeout: 0, // Zero means no timeout (backward compatible)
MaxWorkers: 2,
}
// Add items that would take a while
for i := 0; i < 5; i++ {
i := i
batch.Items = append(batch.Items, func(log logger.Logger) (int, error) {
time.Sleep(50 * time.Millisecond)
return i, nil
})
}
count := 0
for result := range batch.Run() {
if result.Error == nil {
count++
}
}
fmt.Printf("All %d items completed (no timeout)\n", count)
}
Output: All 5 items completed (no timeout)
func (*Batch[T]) Run ¶
func (b *Batch[T]) Run() chan BatchResult[T]
func (*Batch[T]) WithItemTimeout ¶
WithItemTimeout sets the maximum duration for each individual item to complete. Returns the batch for method chaining.
func (*Batch[T]) WithTimeout ¶
WithTimeout sets the maximum duration for the entire batch to complete. Returns the batch for method chaining.
Example ¶
ExampleBatch_WithTimeout demonstrates method chaining
package main
import (
"fmt"
"time"
"github.com/flanksource/clicky/task"
)
func main() {
batch := (&task.Batch[string]{
Name: "example-chaining",
MaxWorkers: 3,
}).WithTimeout(1 * time.Second).WithItemTimeout(200 * time.Millisecond)
fmt.Printf("Batch timeout: %v, Item timeout: %v\n", batch.Timeout, batch.ItemTimeout)
}
Output: Batch timeout: 1s, Item timeout: 200ms
type Group ¶
type Group struct {
Items []Taskable // Can contain Tasks or nested Groups
// contains filtered or unexported fields
}
Group represents a group of tasks that can be managed collectively
func (*Group) FinishedAt ¶
func (g *Group) FinishedAt() time.Time
FinishedAt returns the time the group first became terminal, or the zero time if it is still running/pending. It is recorded lazily by observeTerminal (called from snapshotting) so it does not depend on a WaitFor caller.
func (*Group) Metadata ¶
func (g *Group) Metadata() GroupMetadata
Metadata returns a copy of the group's metadata.
type GroupMetadata ¶
type GroupMetadata struct {
Kind string `json:"kind,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
Owner string `json:"owner,omitempty"`
}
GroupMetadata is the queryable metadata attached to a run (task group). Kind classifies the run (e.g. "sql-fix", "test-run"), Labels carry arbitrary key/value facets for filtering, and Owner identifies who started it. All fields are optional.
type HealthMixin ¶
type HealthMixin interface {
Health() Health
}
type LiveRenderer ¶
type LiveRenderer interface {
// RenderLive returns the content for one live frame — a 250ms interactive
// tick or a single non-interactive (PlainRender) flush.
RenderLive(tasks []*Task) api.Text
// RenderFinal returns the content drawn once after all tasks complete.
RenderFinal(tasks []*Task) api.Text
}
LiveRenderer replaces the default task-tree rendering with caller-supplied content while keeping clicky's terminal ownership: the render loop still owns the TTY, ClearLines line accounting, and the logger serializer that prevents concurrent log lines from corrupting the in-place frame. Only the rendered content (an api.Text) is swapped — a caller that wants its own block (a status table, a custom dashboard) renders it through clicky instead of hand-rolling ANSI redraws that collide with logger output.
Install with SetLiveRenderer; remove by passing nil. Both methods receive a snapshot of the current tasks and must not mutate them.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages and displays multiple tasks with progress bars
func (*Manager) PlainRender ¶
func (tm *Manager) PlainRender()
PlainRender outputs the current task statuses in plain text without any interactive / ANSI / console features
func (*Manager) StartCapturingOutput ¶
func (tm *Manager) StartCapturingOutput()
StartCapturingOutput redirects stdout/stderr to internal buffer
func (*Manager) StartWithResult ¶
func (tm *Manager) StartWithResult(name string, taskFunc func(flanksourceContext.Context, *Task) (interface{}, error), opts ...Option) *Task
StartWithResult creates and starts tracking a new task with typed result handling
func (*Manager) StopCapturingOutput ¶
func (tm *Manager) StopCapturingOutput()
StopCapturingOutput restores stdout/stderr and prints buffered output
type ManagerOptions ¶
type ManagerOptions struct {
NoProgress bool // Disable progress display
NoRender bool // Disable all task rendering
MaxConcurrent int // Maximum concurrent tasks (0 = unlimited)
GracefulTimeout time.Duration // Timeout for graceful shutdown
// Retry configuration
MaxRetries int // Maximum retry attempts
RetryDelay time.Duration // Base delay between retries
}
ManagerOptions contains configuration options for TaskManager
func DefaultManagerOptions ¶
func DefaultManagerOptions() *ManagerOptions
DefaultManagerOptions returns sensible defaults
func (*ManagerOptions) Apply ¶
func (opts *ManagerOptions) Apply()
Apply configures a TaskManager with these options
type Option ¶
type Option func(*Task)
Option configures task creation
func WithDependencies ¶
func WithDependencies(deps ...*Task) Option
WithDependencies sets tasks that must complete before this task can start
func WithFunc ¶
func WithFunc(fn func(flanksourceContext.Context, *Task) error) Option
WithFunc sets the function to run for the task
func WithIdentity ¶
func WithIdentity(identity string) Option
WithIdentity sets a unique identifier for task deduplication
func WithPriority ¶
func WithPriority(priority int) Option
WithPriority sets the priority for task scheduling (lower = higher priority)
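The "lower = higher priority" convention means a pending queue is ordered ascending by priority value. A minimal sketch of that ordering follows; queued and schedule are hypothetical names, and keeping submission order for equal priorities is an assumption rather than documented behavior.

```go
package main

import (
	"fmt"
	"sort"
)

// queued is a hypothetical stand-in for a pending task.
type queued struct {
	name     string
	priority int
}

// schedule returns a copy of q ordered so that lower priority values come
// first. The stable sort keeps submission order among equal priorities.
func schedule(q []queued) []queued {
	out := append([]queued(nil), q...)
	sort.SliceStable(out, func(i, j int) bool {
		return out[i].priority < out[j].priority
	})
	return out
}

func main() {
	pending := []queued{
		{"cleanup", 10},
		{"migrate", 1},
		{"report", 10},
		{"backup", 5},
	}
	for _, t := range schedule(pending) {
		fmt.Println(t.priority, t.name)
	}
}
```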
func WithRetryConfig ¶
func WithRetryConfig(config RetryConfig) Option
WithRetryConfig sets custom retry configuration for the task
func WithTaskTimeout ¶
func WithTaskTimeout(d time.Duration) Option
WithTaskTimeout sets an individual task timeout applied at execution time
type OutputEntry ¶
OutputEntry represents a captured stdout/stderr line with metadata
type RetryConfig ¶
type RetryConfig struct {
RetryableErrors []string // Error message patterns that should trigger retries
BaseDelay time.Duration
MaxDelay time.Duration
BackoffFactor float64
JitterFactor float64
MaxRetries int
}
RetryConfig holds configuration for task retry behavior
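The field names suggest exponential backoff with a cap. The sketch below shows one plausible reading: the delay is `BaseDelay * BackoffFactor^attempt`, limited to `MaxDelay`, and an error is retried when its message contains one of the `RetryableErrors` patterns. Both the formula and the substring matching are assumptions; `JitterFactor` is left unused here, and `retryConfig`, `backoff`, and `retryable` are hypothetical names.

```go
package main

import (
	"fmt"
	"math"
	"strings"
	"time"
)

// retryConfig mirrors the exported fields of RetryConfig listed above.
type retryConfig struct {
	RetryableErrors []string
	BaseDelay       time.Duration
	MaxDelay        time.Duration
	BackoffFactor   float64
	JitterFactor    float64 // not used in this sketch
	MaxRetries      int
}

// backoff returns the delay before retry number attempt (0-based), without
// jitter. The formula is an assumption, not the package's documented one.
func backoff(c retryConfig, attempt int) time.Duration {
	d := float64(c.BaseDelay) * math.Pow(c.BackoffFactor, float64(attempt))
	if limit := float64(c.MaxDelay); c.MaxDelay > 0 && d > limit {
		d = limit
	}
	return time.Duration(d)
}

// retryable reports whether msg contains any configured pattern
// (plain substring matching is assumed).
func retryable(c retryConfig, msg string) bool {
	for _, p := range c.RetryableErrors {
		if strings.Contains(msg, p) {
			return true
		}
	}
	return false
}

func main() {
	c := retryConfig{
		RetryableErrors: []string{"timeout"},
		BaseDelay:       100 * time.Millisecond,
		MaxDelay:        time.Second,
		BackoffFactor:   2,
		MaxRetries:      5,
	}
	for i := 0; i < c.MaxRetries; i++ {
		fmt.Println(i, backoff(c, i))
	}
	fmt.Println(retryable(c, "dial tcp: i/o timeout"))
}
```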
func DefaultRetryConfig ¶
func DefaultRetryConfig() RetryConfig
DefaultRetryConfig returns sensible default retry configuration
type RunFilter ¶
type RunFilter struct {
Kind string
Status string
Labels map[string]string // every entry must match
}
RunFilter narrows the runs returned by Runs. Empty fields match everything.
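The matching rules stated above (empty fields match everything, every label entry must match) can be sketched as a predicate. `runFilter`, `runMeta`, and `matches` are local stand-ins, and exact string equality is an assumption.

```go
package main

import "fmt"

// runFilter mirrors the RunFilter fields.
type runFilter struct {
	Kind   string
	Status string
	Labels map[string]string
}

// runMeta mirrors the subset of RunMeta fields used for matching.
type runMeta struct {
	Name   string
	Kind   string
	Status string
	Labels map[string]string
}

// matches applies the documented rules: empty fields match everything and
// every filter label must be present on the run with an equal value.
func matches(f runFilter, m runMeta) bool {
	if f.Kind != "" && f.Kind != m.Kind {
		return false
	}
	if f.Status != "" && f.Status != m.Status {
		return false
	}
	for k, v := range f.Labels {
		if got, ok := m.Labels[k]; !ok || got != v {
			return false
		}
	}
	return true
}

func main() {
	run := runMeta{Name: "nightly", Kind: "backup", Status: "running", Labels: map[string]string{"env": "prod"}}
	fmt.Println(matches(runFilter{}, run))
	fmt.Println(matches(runFilter{Kind: "backup", Labels: map[string]string{"env": "prod"}}, run))
	fmt.Println(matches(runFilter{Labels: map[string]string{"env": "dev"}}, run))
}
```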
type RunMeta ¶
type RunMeta struct {
ID string `json:"id"`
Name string `json:"name"`
Kind string `json:"kind,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
Owner string `json:"owner,omitempty"`
Status string `json:"status"`
StartedAt string `json:"startedAt,omitempty"` // RFC3339
FinishedAt string `json:"finishedAt,omitempty"` // RFC3339
Total int `json:"total"`
Completed int `json:"completed"`
Failed int `json:"failed"`
Running int `json:"running"`
}
RunMeta is the listing summary for one run (task group): identity, metadata, status, timing, and child-task counts. It is what the generic task-manager list view renders; drill-down uses SnapshotByID.
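The JSON tags determine what a list-view client receives: fields tagged `omitempty` disappear when empty, while the counters are always present. The sketch below copies a few of the fields into a local `runMeta` type to show the encoded form; `encode` and the sample values are illustrative only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// runMeta copies a subset of RunMeta fields and their JSON tags.
type runMeta struct {
	ID        string `json:"id"`
	Name      string `json:"name"`
	Kind      string `json:"kind,omitempty"`
	Status    string `json:"status"`
	StartedAt string `json:"startedAt,omitempty"` // RFC3339
	Total     int    `json:"total"`
	Completed int    `json:"completed"`
}

// encode marshals m to a JSON string, returning "" on error.
func encode(m runMeta) string {
	b, err := json.Marshal(m)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	// Kind and StartedAt are empty, so they are omitted from the output.
	fmt.Println(encode(runMeta{ID: "run-1", Name: "Database Migration", Status: "running", Total: 2}))
}
```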
func RunMetaFromSnapshot ¶
func RunMetaFromSnapshot(snap TaskSnapshot) RunMeta
RunMetaFromSnapshot lifts the group-level fields of a group snapshot into a RunMeta. The snapshot's ID is the group name; GroupID carries the stable id.
type Status ¶
type Status string
Status represents the status of a task
const (
// StatusPending indicates the task is waiting to start
StatusPending Status = "pending"
// StatusRunning indicates the task is currently running
StatusRunning Status = "running"
// StatusSuccess indicates the task completed successfully
StatusSuccess Status = "success"
// StatusFailed indicates the task failed
StatusFailed Status = "failed"
// StatusWarning indicates the task completed with warnings
StatusWarning Status = "warning"
// StatusCancelled indicates the task was canceled
StatusCancelled Status = "canceled"
// StatusPASS indicates a test passed
StatusPASS Status = "PASS"
// StatusFAIL indicates a test failed
StatusFAIL Status = "FAIL"
// StatusERR indicates a test had an error
StatusERR Status = "ERR"
// StatusSKIP indicates a test was skipped
StatusSKIP Status = "SKIP"
)
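Code that compares serialized status strings should note that StatusCancelled has the value "canceled" (one "l"). The sketch below redeclares the task-lifecycle constants locally and adds a hypothetical `isTerminal` helper; which statuses count as terminal is an assumption of this sketch.

```go
package main

import "fmt"

// status mirrors `type Status string`.
type status string

const (
	statusPending   status = "pending"
	statusRunning   status = "running"
	statusSuccess   status = "success"
	statusFailed    status = "failed"
	statusWarning   status = "warning"
	statusCancelled status = "canceled" // note the single "l" in the value
)

// isTerminal reports whether s is a finished state. Treating success,
// failed, warning, and canceled as terminal is an assumption.
func isTerminal(s status) bool {
	switch s {
	case statusSuccess, statusFailed, statusWarning, statusCancelled:
		return true
	default:
		return false
	}
}

func main() {
	for _, s := range []status{statusPending, statusRunning, statusSuccess, statusCancelled} {
		fmt.Printf("%-9s terminal=%v\n", s, isTerminal(s))
	}
}
```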
func (Status) Apply ¶
Apply applies the status icon and style to the given text, preserving any style classes (such as width/truncation directives) the caller has already set.
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
Task represents a single task being tracked by the TaskManager
func (*Task) ClearLogs ¶
func (t *Task) ClearLogs()
ClearLogs clears all buffered logs for this task
func (*Task) Description ¶
Description returns the task description
func (*Task) FailedWithError ¶
FailedWithError marks the task as failed with an error
func (*Task) FlanksourceContext ¶
func (t *Task) FlanksourceContext() flanksourceContext.Context
FlanksourceContext returns the task's flanksource context for logging
func (*Task) GetSlogLogger ¶
GetSlogLogger returns the slog logger (implements Logger interface - unsupported)
func (*Task) GetTypedResult ¶
GetTypedResult retrieves the result with type assertion
func (*Task) IsDebugEnabled ¶
IsDebugEnabled checks if debug level is enabled (implements Logger interface)
func (*Task) IsLevelEnabled ¶
IsLevelEnabled checks if a specific level is enabled (implements Logger interface)
func (*Task) IsTraceEnabled ¶
IsTraceEnabled checks if trace level is enabled (implements Logger interface)
func (*Task) Progress ¶
Progress returns the task's current progress value and maximum. A maximum of 0 means the task has no bounded progress.
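A caller rendering progress needs to handle the unbounded case where the maximum is 0. The sketch below formats a value and maximum pair accordingly; `describeProgress` is a hypothetical helper and the output format is illustrative.

```go
package main

import "fmt"

// describeProgress formats a (value, maxValue) pair. A maxValue of 0 means
// the task has no bounded progress, so no ratio or percentage is shown.
func describeProgress(value, maxValue int) string {
	if maxValue <= 0 {
		return fmt.Sprintf("%d done", value)
	}
	return fmt.Sprintf("%d/%d (%d%%)", value, maxValue, value*100/maxValue)
}

func main() {
	fmt.Println(describeProgress(3, 12))
	fmt.Println(describeProgress(7, 0))
}
```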
func (*Task) SetDescription ¶
SetDescription sets the task description
func (*Task) SetLogLevel ¶
SetLogLevel sets the log level (implements Logger interface)
func (*Task) SetMinLogLevel ¶
SetMinLogLevel sets the minimum log level (implements Logger interface)
func (*Task) SetProgress ¶
SetProgress updates the task's progress
func (*Task) SetResult ¶
func (t *Task) SetResult(result interface{})
SetResult stores a result in the task
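Because SetResult accepts an `interface{}`, reading the value back requires a type assertion. The sketch below shows a generic assertion helper of the kind GetTypedResult's description implies; `typedResult` is hypothetical and its signature is not taken from this package.

```go
package main

import "fmt"

// typedResult asserts that v holds a value of type T and returns an error
// instead of panicking when it does not. Hypothetical helper.
func typedResult[T any](v interface{}) (T, error) {
	out, ok := v.(T)
	if !ok {
		var zero T
		return zero, fmt.Errorf("result has type %T, want %T", v, zero)
	}
	return out, nil
}

func main() {
	var stored interface{} = "report.csv" // what a SetResult-style call might hold

	name, err := typedResult[string](stored)
	fmt.Println(name, err)

	_, err = typedResult[int](stored)
	fmt.Println(err)
}
```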
func (*Task) WaitFor ¶
func (t *Task) WaitFor() *WaitResult
WaitFor waits for this specific task to complete and returns the result
func (*Task) WithSkipReportLevel ¶
WithSkipReportLevel returns a logger with skip report level (implements Logger interface - noop)
func (*Task) WithValues ¶
WithValues returns a logger with additional key-value pairs (implements Logger interface)
func (*Task) WithoutName ¶
WithoutName returns a logger without name (implements Logger interface - noop)
type TaskFunc ¶
type TaskFunc[T any] func(flanksourceContext.Context, *Task) (T, error)
TaskFunc is a generic task function that returns a typed result
type TaskGroupOption ¶
type TaskGroupOption func(group *Group)
func WithConcurrency ¶
func WithConcurrency(concurrency int) TaskGroupOption
WithConcurrency sets the maximum number of tasks in the group that may run concurrently.
func WithGroupID ¶
func WithGroupID(id string) TaskGroupOption
WithGroupID sets a caller-supplied stable id for the group. When unset, StartGroup assigns a uuid.
func WithKind ¶
func WithKind(kind string) TaskGroupOption
WithKind classifies the run for the task-manager listing/filtering.
func WithLabels ¶
func WithLabels(labels map[string]string) TaskGroupOption
WithLabels attaches filterable key/value facets to the run.
func WithOwner ¶
func WithOwner(owner string) TaskGroupOption
WithOwner records who started the run.
type TaskResult ¶
TaskResult holds a typed result and error
type TaskSnapshot ¶
type TaskSnapshot struct {
ID string `json:"id"`
Name string `json:"name"`
Type string `json:"type"` // "task" or "group"
Group string `json:"group,omitempty"` // parent group name
Status string `json:"status"`
Duration string `json:"duration,omitempty"`
Error string `json:"error,omitempty"`
Message string `json:"message,omitempty"` // latest log line
Logs []LogEntry `json:"logs,omitempty"` // all log entries
Total int `json:"total,omitempty"` // group: total child tasks
Completed int `json:"completed,omitempty"` // group: completed tasks
Failed int `json:"failed,omitempty"` // group: failed tasks
Running int `json:"running,omitempty"` // group: running tasks
// Per-task fields (type == "task"). Description is the live stage label set
// via Task.SetDescription; Progress/MaxValue mirror Task.SetProgress so the UI
// can render an x/y count and percent for a running task. MaxValue 0 means the
// task has no bounded progress.
Description string `json:"description,omitempty"`
Progress int `json:"progress,omitempty"`
MaxValue int `json:"maxValue,omitempty"`
// Registry metadata (additive). For a group these describe the run itself;
// for a task GroupID links it to its parent run so the SSE/JSON clients can
// key on a stable id rather than the human-facing name.
GroupID string `json:"groupId,omitempty"`
Kind string `json:"kind,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
Owner string `json:"owner,omitempty"`
StartedAt string `json:"startedAt,omitempty"` // RFC3339
FinishedAt string `json:"finishedAt,omitempty"` // RFC3339
}
TaskSnapshot is a JSON-serializable snapshot of a task or group's current state.
func SnapshotAll ¶
func SnapshotAll(taskIDs ...string) []TaskSnapshot
SnapshotAll returns snapshots for all groups and their tasks. If taskIDs is non-empty, only groups whose name OR stable id matches are included (matching by id lets the registry/SSE drill into one run; matching by name preserves the legacy name-keyed callers).
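The name-or-id matching rule can be sketched as a simple selection over groups. `group` and `selectGroups` are hypothetical stand-ins; the real function returns snapshots for the matching groups and their tasks.

```go
package main

import "fmt"

// group is a hypothetical stand-in carrying a display name and a stable id.
type group struct {
	Name string
	ID   string
}

// selectGroups returns every group when keys is empty; otherwise it keeps
// groups whose name or stable id equals one of the keys.
func selectGroups(groups []group, keys ...string) []group {
	if len(keys) == 0 {
		return groups
	}
	want := make(map[string]bool, len(keys))
	for _, k := range keys {
		want[k] = true
	}
	var out []group
	for _, g := range groups {
		if want[g.Name] || want[g.ID] {
			out = append(out, g)
		}
	}
	return out
}

func main() {
	groups := []group{{"Database Migration", "id-1"}, {"Nightly Backup", "id-2"}}
	fmt.Println(len(selectGroups(groups)))
	fmt.Println(selectGroups(groups, "id-2"))
	fmt.Println(selectGroups(groups, "Database Migration"))
}
```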
func SnapshotByID ¶
func SnapshotByID(id string) []TaskSnapshot
SnapshotByID returns the group + task snapshots for the run with the given stable id (not name). Returns nil when no such run exists.
func SnapshotGroup ¶
func SnapshotGroup(g *Group) TaskSnapshot
SnapshotGroup creates a TaskSnapshot from a Group with aggregate child stats. The snapshot ID stays the group NAME for backward compatibility with the name-keyed Preact UI and JSONHandler; the stable id is carried separately in GroupID. Observing a terminal status here records finishedAt lazily.
func SnapshotTask ¶
func SnapshotTask(t *Task, group *Group) TaskSnapshot
SnapshotTask creates a TaskSnapshot from a Task. group is the parent group, or nil for an ungrouped task; its name and id are recorded on the snapshot.
type Taskable ¶
type Taskable interface {
GetTask() *Task
}
Taskable represents objects that can return a Task
type TypedGroup ¶
func StartGroup ¶
func StartGroup[T any](name string, opts ...TaskGroupOption) TypedGroup[T]
StartGroup creates and starts tracking a new task group
func (TypedGroup[T]) Add ¶
func (g TypedGroup[T]) Add(name string, taskFunc func(flanksourceContext.Context, *Task) (T, error), opts ...Option) TypedTask[T]
Add creates a task named name that runs taskFunc, adds it to this group, and returns the resulting TypedTask[T]
func (*TypedGroup[T]) Duration ¶
func (g *TypedGroup[T]) Duration() time.Duration
Duration returns the total duration from first start to last completion
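"From first start to last completion" means overlapping tasks are not double counted: the result is the span between the earliest start and the latest finish. A minimal sketch under that reading, with hypothetical `span`, `totalDuration`, and `demo` names:

```go
package main

import (
	"fmt"
	"time"
)

// span is a hypothetical record of one task's start and end time.
type span struct{ start, end time.Time }

// totalDuration returns the time between the earliest start and the latest
// end across all spans, or 0 when there are none.
func totalDuration(spans []span) time.Duration {
	if len(spans) == 0 {
		return 0
	}
	first, last := spans[0].start, spans[0].end
	for _, s := range spans[1:] {
		if s.start.Before(first) {
			first = s.start
		}
		if s.end.After(last) {
			last = s.end
		}
	}
	return last.Sub(first)
}

// demo builds three overlapping spans at fixed second offsets.
func demo() time.Duration {
	base := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	at := func(sec int) time.Time { return base.Add(time.Duration(sec) * time.Second) }
	return totalDuration([]span{{at(1), at(5)}, {at(0), at(2)}, {at(3), at(4)}})
}

func main() {
	fmt.Println(demo())
}
```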
func (TypedGroup[T]) GetResults ¶
func (g TypedGroup[T]) GetResults() (map[TypedTask[T]]T, error)
GetResults waits for all tasks in the group and returns typed results
func (*TypedGroup[T]) WaitFor ¶
func (g *TypedGroup[T]) WaitFor() *WaitResult
WaitFor waits for all child items to complete and returns aggregate results. It also handles tasks added dynamically while waiting by repeatedly checking for new tasks.
type WaitResult ¶
type WaitResult struct {
Error error
Status Status
Duration time.Duration
TaskCount int // Number of individual tasks (1 for Task, N for TaskGroup)
SuccessCount int // Number of successful tasks
FailureCount int // Number of failed tasks
WarningCount int // Number of tasks with warnings
}
WaitResult contains unified result information
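For a group, the counters summarize the child tasks. The sketch below folds per-task statuses into a local `waitResult`; the precedence used for the overall status (failed over warning over success) is an assumption, and `aggregate` is a hypothetical helper.

```go
package main

import "fmt"

type status string

const (
	statusSuccess status = "success"
	statusFailed  status = "failed"
	statusWarning status = "warning"
)

// waitResult mirrors the count and status fields of WaitResult.
type waitResult struct {
	Status       status
	TaskCount    int
	SuccessCount int
	FailureCount int
	WarningCount int
}

// aggregate counts child statuses and derives an overall status.
// The precedence failed > warning > success is assumed.
func aggregate(statuses []status) waitResult {
	r := waitResult{Status: statusSuccess, TaskCount: len(statuses)}
	for _, s := range statuses {
		switch s {
		case statusSuccess:
			r.SuccessCount++
		case statusWarning:
			r.WarningCount++
		case statusFailed:
			r.FailureCount++
		}
	}
	switch {
	case r.FailureCount > 0:
		r.Status = statusFailed
	case r.WarningCount > 0:
		r.Status = statusWarning
	}
	return r
}

func main() {
	r := aggregate([]status{statusSuccess, statusWarning, statusFailed, statusSuccess})
	fmt.Printf("%+v\n", r)
}
```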