jack

package module
Published: May 6, 2026 License: MIT Imports: 22 Imported by: 5


Jack

Production-grade concurrency toolkit for Go

Jack provides the missing pieces for building robust, observable concurrent systems in Go. No magic, no reflection hacks — just solid patterns you'd otherwise write yourself, with full metrics on every component.

Why This Exists

Go's concurrency primitives are excellent, but production systems need more:

  • Panic recovery that doesn't crash your entire process
  • Backpressure when queues fill up, with priority ordering
  • Circuit breaking so a slow upstream doesn't take down your service
  • Goroutine lifecycle tracking so nothing leaks silently
  • Graceful shutdown that finishes in-flight work
  • Health checks that degrade and accelerate automatically
  • Observability into what every component is actually doing

Jack fills these gaps without getting in your way. Every component exposes a Metrics() method with atomic counters safe for concurrent reads.


What's Inside

Pool

Fixed-size worker pool with backpressure. Tasks queue when workers are busy. Submissions fail fast when the queue is full. Full metrics: submitted, completed, failed, rejected, panics recovered, active workers, queue depth.

pool := jack.NewPool(5, jack.PoolingWithQueueSize(100))
defer pool.Shutdown(30 * time.Second)

if err := pool.Submit(jack.Func(func() error {
    return nil
})); err != nil {
    // queue full: shed load or retry
}

m := pool.Metrics()
fmt.Println(m.TasksCompleted.Load(), m.ActiveWorkers.Load())
Semaphore

Priority-aware semaphore with CoDel queue management. The fast path (TryAcquire, TryAcquireN) is lock-free. Blocking acquire uses per-priority queues that switch from FIFO to LIFO under sustained overload. Exposes Available(), Capacity(), and accurate live QueueDepth.

sem := jack.NewSemaphore(10,
    jack.SemaphoreWithTargetSojourn(5*time.Millisecond),
    jack.SemaphoreWithMaxSojourn(500*time.Millisecond),
)
defer sem.Close()

// Atomic bulk acquire — either all n slots or none.
if sem.TryAcquireN(jack.PriorityHigh, 3) {
    defer func() {
        sem.Release()
        sem.Release()
        sem.Release()
    }()
}

// Blocking acquire respects priority: Critical served before High, etc.
if err := sem.Acquire(ctx, jack.PriorityCritical); err != nil {
    return err
}
defer sem.Release()
RateLimiter

Token bucket with priority queueing. Allow/AllowN are lock-free. Acquire blocks with context. Reserve returns a non-blocking Reservation the caller can inspect and cancel without blocking any goroutine — ideal for admission control.

rl := jack.NewRateLimiter(1000, 100) // 1000 req/s, burst 100
defer rl.Close()

// Non-blocking fast path.
if rl.Allow(jack.PriorityHigh) {
    // proceed
}

// Non-blocking reservation — inspect delay, then decide.
res := rl.Reserve(1, jack.ReserveWithMaxDelay(50*time.Millisecond))
if !res.OK() {
    return ErrTooManyRequests
}
if err := res.Wait(ctx); err != nil {
    res.Cancel()
    return err
}
// tokens consumed — proceed

// Blocking acquire with priority.
if err := rl.Acquire(ctx, jack.PriorityHigh); err != nil {
    return err
}
Throttle

Client-side self-tuning throttle. Observes upstream acceptance and rejection rates, probabilistically dropping local requests before sending when the upstream is overloaded. Each priority tier has an independent probability — Critical is shed last, Low is shed first. Per-tier live probability via Probability(p) and Metrics().ThrottleProbs.

throttle := jack.NewThrottle(4, // number of priority tiers
    jack.ThrottleWithRatio(2.0),
    jack.ThrottleWithWindowResetSamples(500),
)
defer throttle.Close()

// Record upstream outcomes.
if upstreamErr != nil {
    throttle.Rejected(jack.PriorityHigh)
} else {
    throttle.Accepted(jack.PriorityHigh)
}

// Check before sending.
if !throttle.Allow(jack.PriorityHigh) {
    return ErrThrottled
}

fmt.Printf("critical rejection prob: %.2f%%\n",
    throttle.Probability(jack.PriorityCritical)*100)
Circuit Breaker

Three-state machine (Closed → Open → HalfOpen). All state transitions are lock-free atomic CAS. Call is the only entry point — it wraps your function and manages the state machine around it. onStateChange callback for alerting and metrics.

breaker := jack.NewBreaker("payments-api",
    jack.BreakerWithThreshold(5),
    jack.BreakerWithOpenTimeout(10*time.Second),
    jack.BreakerWithSuccessThreshold(2),
    jack.BreakerWithOnStateChange(func(name string, from, to jack.BreakerState) {
        metrics.RecordStateChange(name, from.String(), to.String())
    }),
)

err := breaker.Call(ctx, func(ctx context.Context) error {
    return paymentsClient.Charge(ctx, req)
})
if errors.Is(err, jack.ErrBreakerOpen) {
    return ErrServiceUnavailable
}
Bulkhead

Isolates failure domains by giving each named partition its own bounded concurrency budget backed by an independent Semaphore. When one partition saturates, others are completely unaffected. Partitions auto-create on first use.

bh := jack.NewBulkhead(
    jack.BulkheadWithPartition("payments", 20),
    jack.BulkheadWithPartition("reports", 5),
    jack.BulkheadWithDefaultCapacity(10), // auto-creates unknown partitions
)
defer bh.Close()

err := bh.Call(ctx, "payments", jack.PriorityHigh, func(ctx context.Context) error {
    return db.Query(ctx, ...)
})
if errors.Is(err, jack.ErrBulkheadFull) {
    return ErrServiceBusy
}

fmt.Println(bh.Available("payments"), bh.Metrics("payments").AcquiredFast.Load())
Adaptive Concurrency Limiter

AIMD gradient controller that adjusts its concurrency limit dynamically based on observed RTT. Below target RTT → additive increase. Above target → multiplicative decrease. Bounds enforced at minLimit/maxLimit. Mirrors Netflix/concurrency-limits and Envoy's adaptive concurrency filter.

limiter := jack.NewAdaptiveLimiter(
    jack.AdaptiveWithInitialLimit(20),
    jack.AdaptiveWithTargetP50(50*time.Millisecond),
    jack.AdaptiveWithMinLimit(5),
    jack.AdaptiveWithMaxLimit(100),
)
defer limiter.Close()

err := limiter.Call(ctx, jack.PriorityHigh, func(ctx context.Context) error {
    return upstream.Call(ctx, req)
})

m := limiter.Metrics()
fmt.Println(m.CurrentLimit.Load(), m.AvgRTTNs.Load())
Retry

Exponential backoff with full jitter, configurable predicate, and per-call metrics. Retry is reusable and safe for concurrent use. Supports permanent-error detection to skip retry on non-retryable failures.

policy := jack.NewRetry(
    jack.RetryWithMaxAttempts(5),
    jack.RetryWithBaseDelay(100*time.Millisecond),
    jack.RetryWithMaxDelay(30*time.Second),
    jack.RetryWithJitter(true),
    jack.RetryWithRetryIf(func(err error) bool {
        return !errors.Is(err, ErrPermanent)
    }),
    jack.RetryWithOnRetry(func(attempt int, err error) {
        log.Printf("retry %d: %v", attempt, err)
    }),
)

err := policy.Do(ctx, func(ctx context.Context) error {
    return upstream.Call(ctx, req)
})
if errors.Is(err, jack.ErrRetryExhausted) {
    return ErrMaxRetriesExceeded
}
Hedged Requests

Fire a duplicate request after a configurable delay. Whichever responds first is returned; the other is cancelled. The hedge delay adapts automatically from a lock-free circular RTT sample buffer — no external dependencies. Safe only for idempotent operations. Typed HedgerOf[T] wrapper avoids casts.

// Typed — no casts at the call site.
hedger := jack.NewHedgerOf[*UserResponse](
    jack.HedgeWithPercentile(95),   // fire hedge at p95 latency
    jack.HedgeWithMinSamples(20),   // warm-up period before adaptive delay
    jack.HedgeWithMaxConcurrent(50),
)

user, err := hedger.Do(ctx, func(ctx context.Context) (*UserResponse, error) {
    return userClient.Get(ctx, id)
})

m := hedger.Metrics()
fmt.Println(m.Hedged.Load(), m.HedgeWon.Load(), m.PrimaryWon.Load())
Lease

A time-bounded semaphore slot with automatic reclamation. If the holder crashes or forgets to call Release, the internal Reaper reclaims the slot automatically after TTL — slots are never permanently lost.

sem := jack.NewSemaphore(20)
lm := jack.NewLeaser(sem, jack.LeaserWithTTL(30*time.Second))
defer lm.Close()

lease, err := lm.Acquire(ctx, requestID, jack.PriorityHigh, 0)
if err != nil {
    return err
}
defer lease.Release()

// If this process crashes, the slot is returned after 30s automatically.
Queue

Bounded, multi-priority, multi-consumer work queue. Per-priority bins with tail-drop under saturation. Item timeout eviction. EnqueueCtx blocks until space is available. The natural entry point for a load-balancer pipeline before applying Semaphore or RateLimiter downstream.

q := jack.NewQueue(func(ctx context.Context, item any) error {
    return process(ctx, item.(*Request))
},
    jack.QueueWithCapacity(1000),
    jack.QueueWithWorkers(8),
    jack.QueueWithTimeout(5*time.Second),
)
defer q.Close()

if err := q.Enqueue(jack.PriorityCritical, req); errors.Is(err, jack.ErrQueueFull) {
    return ErrBackpressure
}

depths := q.DepthByPriority()
fmt.Printf("critical=%d high=%d medium=%d low=%d\n",
    depths[0], depths[1], depths[2], depths[3])
Routines

Goroutine tracker and lifecycle manager. Every goroutine started through it is registered with an ID, tracked through its states (Running/Done/Panicked/Cancelled), has its panic stack captured, and is guaranteed to be joined by Stop or Wait. The DefaultRoutines singleton lets you use it without passing the tracker around.

rt := jack.NewRoutines(
    jack.RoutineWithOnPanic(func(info jack.RoutineInfo) {
        log.Printf("panic in %s: %v\n%s", info.ID, info.Err, info.Stack)
    }),
)
defer rt.Stop() // cancels all goroutines and waits

rt.Spawn("fetch-users", func(ctx context.Context) error {
    return fetchUsers(ctx)
})

rt.Background("heartbeat", 0, func(ctx context.Context) error {
    return sendHeartbeat(ctx) // restarts on error, unlimited times
})

// Inspect state of any goroutine.
if info, ok := rt.Info("fetch-users#1"); ok {
    fmt.Println(info.State, info.StartedAt)
}

// Or use the package-level singleton.
jack.Spawn("background-job", func(ctx context.Context) error {
    return runJob(ctx)
})
Future/Promise

Type-safe async computation with composition. Wait for results, chain transformations, recover from errors.

f := jack.Async(func() (string, error) {
    return fetchUser()
})

f.Then(ctx, func(user string) (any, error) {
    return fetchProfile(user)
}).Await()
Doctor

Health check scheduler that degrades and accelerates. Tracks consecutive failures, applies jitter, notifies observers.

doctor := jack.NewDoctor(jack.DoctorWithMaxConcurrent(10))
doctor.Add(jack.NewPatient(jack.PatientConfig{
    ID:          "database",
    Interval:    10 * time.Second,
    MaxFailures: 3,
    Check:       checkDB,
    OnStateChange: func(e jack.PatientEvent) {
        if e.State == jack.PatientFailed {
            triggerAlert(e.ID)
        }
    },
}))
Debouncer

Rate-limit rapid calls. Execute only after a quiet period or when thresholds are hit.

db := jack.NewDebouncer(
    jack.WithDebounceDelay(500*time.Millisecond),
    jack.WithDebounceMaxCalls(10),
)
db.Do(expensiveOperation)
Looper

Background task with exponential backoff and jitter. Perfect for reconciliation loops.

looper := jack.NewLooper(reconcile,
    jack.WithLooperInterval(5*time.Second),
    jack.WithLooperBackoff(true),
    jack.WithLooperMaxInterval(time.Minute),
)
looper.Start()
Shutdown

Graceful termination with signal handling. Register cleanup in LIFO order. Named tasks appear correctly in stats and logs. Supports concurrent execution of cleanup handlers.

sd := jack.NewShutdown(
    jack.ShutdownWithTimeout(30*time.Second),
    jack.ShutdownConcurrent(),
)
sd.RegisterCloser("db", db)
sd.RegisterFunc("cache", cache.Flush)
sd.RegisterWithContext("grpc", grpcServer.GracefulStop)
sd.Wait() // blocks until SIGTERM/SIGINT
Reaper

TTL expiration with min-heap and sharding.

reaper := jack.NewReaper(5*time.Minute,
    jack.ReaperWithHandler(func(ctx context.Context, id string) {
        cleanup(id)
    }),
)
reaper.Touch("session-123")
Lifetime

Scheduled callbacks with keep-alive resets.

lm := jack.NewLifetime()
lm.ScheduleTimed(ctx, "heartbeat", func(ctx context.Context, id string) {
    markDead(id)
}, 30*time.Second)
lm.ResetTimed("heartbeat")
Runner, Scheduler, Group

Single-worker queue, cron-style scheduling, and coordinated goroutine groups with error collection.
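A minimal usage sketch for Group. The NewGroup/Go/Wait names here are assumptions modelled on errgroup, not confirmed API; see the type documentation below for the authoritative signatures.

```go
// Hypothetical sketch: method names NewGroup/Go/Wait are assumed.
g := jack.NewGroup()
g.Go(func(ctx context.Context) error { return fetchUsers(ctx) })
g.Go(func(ctx context.Context) error { return fetchOrders(ctx) })
if err := g.Wait(); err != nil {
    return err // collected error(s) from the group
}
```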

Safely

Context-aware mutex with panic recovery.

var mu jack.Safely
err := mu.SafeCtx(ctx, func() error {
    return nil
})

Priority System

All backpressure components share a four-level priority system. Lower numeric value = higher priority = served first.

Constant          Value  Intended use
PriorityCritical  0      Admin ops, user-facing critical paths
PriorityHigh      1      Standard user requests
PriorityMedium    2      Async work, type-ahead
PriorityLow       3      Backfill, batch, probes

Observability

Every component exposes a Metrics() method with atomic counters safe for concurrent reads without locks.

obs := jack.NewObservable[jack.Event](10)
obs.Add(myObserver)
pool := jack.NewPool(5, jack.PoolingWithObservable(obs))

// Scrape metrics from any component:
sem.Metrics().QueueDepth.Load()
rl.Metrics().TokensConsumed.Load()
breaker.Metrics().StateChanges.Load()
rt.Metrics().Active.Load()
limiter.Metrics().CurrentLimit.Load()

Error Handling

Panics become *jack.CaughtPanic with stack traces. No silent failures.

err := jack.Safe(func() error {
    panic("boom")
})
if cp, ok := err.(*jack.CaughtPanic); ok {
    log.Printf("panic: %v\n%s", cp.Value, cp.Stack)
}

When To Use What

Problem                                          Use
Process many independent tasks concurrently      Pool
Need a result from an async operation            Future
Run periodic health checks with degradation      Doctor
Rate-limit bursty calls (blocking)               RateLimiter.Acquire
Rate-limit bursty calls (non-blocking inspect)   RateLimiter.Reserve
Client-side adaptive load shedding               Throttle
Stop calling a failing upstream                  Breaker
Isolate concurrency budgets per upstream         Bulkhead
Auto-tune concurrency limit to RTT               AdaptiveLimiter
Retry transient errors with backoff              Retry
Reduce tail latency with duplicate requests      Hedger / HedgerOf[T]
Bound concurrent slots with auto-reclaim         Leaser
Prioritised async work queue                     Queue
Track and terminate all goroutines               Routines
Bound concurrent access with priorities          Semaphore
Coalesce rapid repeated calls                    Debouncer
Background loop with backoff                     Looper
Graceful shutdown with cleanup ordering          Shutdown
Expire items after TTL                           Reaper
Schedule callbacks with keep-alive               Lifetime
Coordinate multiple goroutines, collect errors   Group
Sequential async processing                      Runner
Cron-style recurring tasks                       Scheduler
Safe locking with timeouts                       Safely

Composing Components

The components are designed to stack. A typical high-volume service endpoint:

Request
  → Queue (absorb burst, priority ordering)
  → Breaker (stop calling dead upstream)
  → Bulkhead (cap per-upstream concurrency)
  → AdaptiveLimiter (auto-tune to RTT)
  → Throttle (client-side load shedding)
  → RateLimiter (token bucket)
  → Hedger (cut tail latency)
  → Retry (handle transient errors)
  → Upstream

Each layer is independent. Use only what your service needs.
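Using only the call signatures shown above, one such stack can be sketched as nested wrappers: the breaker outermost, then the bulkhead, the adaptive limiter, and the retry policy around the upstream call. The layer order is a design choice, not a requirement of the library.

```go
// breaker, bh, limiter, and policy are constructed as in the
// sections above; each layer wraps the next via its Call/Do hook.
err := breaker.Call(ctx, func(ctx context.Context) error {
    return bh.Call(ctx, "payments", jack.PriorityHigh, func(ctx context.Context) error {
        return limiter.Call(ctx, jack.PriorityHigh, func(ctx context.Context) error {
            return policy.Do(ctx, func(ctx context.Context) error {
                return upstream.Call(ctx, req)
            })
        })
    })
})
```

Because each layer accepts and returns a plain func(context.Context) error, any subset of the stack composes the same way.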


Testing

go test -v -race ./...
go test -bench=. -benchmem -cpu=8 -run='^$' ./...

Race detector is your friend. Jack is race-free by design. Every public API has benchmarks with ReportAllocs() — zero allocations on all fast paths.


License

MIT

Documentation


Constants

const (
	// NoLimitCalls is the default for maxCalls, meaning no limit on the number of calls before execution.
	// Set to math.MaxInt to indicate unbounded call accumulation.
	NoLimitCalls = math.MaxInt
	// NoLimitWait is the default for maxWait, meaning no time limit before forced execution.
	// Set to time.Duration(math.MaxInt64) to disable the maximum wait enforcement.
	NoLimitWait = time.Duration(math.MaxInt64)
)

Variables

var (
	ErrFutureCanceled = fmt.Errorf("future was canceled")
	ErrFutureTimeout  = fmt.Errorf("future timed out")
)
var (
	ErrSemaphoreClosed   = errors.New("semaphore closed")
	ErrRateLimiterClosed = errors.New("rate limiter closed")
	ErrThrottleClosed    = errors.New("adaptive throttle closed")
	ErrQueueClosed       = errors.New("priority queue closed")
)
var (
	ErrBreakerOpen   = errors.New("circuit breaker open")
	ErrBreakerClosed = errors.New("circuit breaker instance closed")
)
var (
	ErrBulkheadFull     = errors.New("bulkhead full")
	ErrBulkheadClosed   = errors.New("bulkhead closed")
	ErrBulkheadNotFound = errors.New("bulkhead partition not found")
)
var (
	ErrLeaseExpired  = errors.New("lease expired")
	ErrLeaseReleased = errors.New("lease already released")
	ErrLeaseInvalid  = errors.New("lease semaphore closed")
)
var (
	ErrReservationExpired   = errors.New("reservation expired")
	ErrReservationCancelled = errors.New("reservation cancelled")
)
var (
	// ErrPoolClosed indicates the worker pool has been closed.
	ErrPoolClosed = errors.New("pool has been closed")
	// ErrRunnerClosed indicates the runner has been closed.
	ErrRunnerClosed = errors.New("runner has been closed")
	// ErrSchedulerClosed indicates the scheduler has been closed.
	ErrSchedulerClosed = errors.New("scheduler has been closed")
	// ErrTaskTimeout indicates a task exceeded its execution timeout.
	ErrTaskTimeout = errors.New("task execution timed out")
	// ErrTaskPanic indicates a task panicked during execution.
	ErrTaskPanic = errors.New("task panicked during execution")
	// ErrQueueFull indicates the task queue is full and cannot accept new tasks.
	ErrQueueFull = errors.New("task queue is full")
	// ErrShutdownTimedOut indicates the pool shutdown exceeded its timeout.
	ErrShutdownTimedOut = errors.New("shutdown timed out")
)

Errors returned by the worker pool and task execution.

var (
	ErrSchedulerJobAlreadyRunning = errors.New("scheduler: job is already running")
	ErrSchedulerNotRunning        = errors.New("scheduler: job is not running or already stopped")
	ErrSchedulerPoolNil           = errors.New("scheduler: pool cannot be nil")
	ErrSchedulerNameMissing       = errors.New("scheduler: name cannot be empty")
)
var ErrHedgeAllFailed = errors.New("all hedged requests failed")
var (
	ErrRetryExhausted = errors.New("retry attempts exhausted")
)

Functions

func Background

func Background(label string, maxRestarts int, fn func(context.Context) error) string

func Go

func Go(f func() error) <-chan error

Go runs a function in a standalone goroutine with error and panic handling. It returns a buffered channel that receives the function’s error or a CaughtPanic. The channel is closed after the error or completion. Thread-safe via goroutine and channel operations. Example:

errCh := Go(func() error { return nil }) // Runs a function and returns error channel

func Logger

func Logger() *ll.Logger

Logger returns the default logger for the jack package. Thread-safe as it returns a pre-initialized logger. Example:

log := jack.Logger() // Retrieves the default logger

func Safe

func Safe(fn func() error) error

Safe executes fn with panic recovery. If fn panics, a *CaughtPanic error is returned. Otherwise, the error returned by fn is returned. If fn is nil, Safe will return a *CaughtPanic. Example:

err := Safe(func() error {
	// May panic or return an error
	return nil
})
if cp, ok := err.(*CaughtPanic); ok {
	fmt.Printf("Caught panic: %v\nStack: %s\n", cp.Value, cp.Stack)
}

func SafeCtx

func SafeCtx(ctx context.Context, fn func() error) error

SafeCtx executes fn with context support and panic recovery. The function fn will be interrupted if the context is canceled (e.g., via context.WithTimeout). If fn is nil, SafeCtx will return a *CaughtPanic. If ctx is nil, SafeCtx will panic when ctx.Err() is called. Example:

ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
err := SafeCtx(ctx, func() error {
	// No need to check ctx
	time.Sleep(100 * time.Millisecond)
	return nil
})
// err will likely be context.DeadlineExceeded due to timeout

func SafeCtxNoStack

func SafeCtxNoStack(ctx context.Context, fn func() error) error

SafeCtxNoStack executes fn with context support, panic recovery, but no stack trace. The function fn will be interrupted if the context is canceled (e.g., via context.WithTimeout). If fn is nil, SafeCtxNoStack will return a *CaughtPanic (without stack). If ctx is nil, SafeCtxNoStack will panic when ctx.Err() is called. Example:

ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
err := SafeCtxNoStack(ctx, func() error {
	// No need to check ctx
	time.Sleep(100 * time.Millisecond)
	return nil
})
// err will likely be context.DeadlineExceeded due to timeout

func SafeNoStack

func SafeNoStack(fn func() error) error

SafeNoStack executes fn with panic recovery but no stack trace. If fn panics, a *CaughtPanic error (without stack) is returned. Otherwise, fn's error. If fn is nil, SafeNoStack will return a *CaughtPanic. Example:

err := SafeNoStack(func() error {
	panic("error")
	return nil
})
if cp, ok := err.(*CaughtPanic); ok {
	fmt.Printf("Caught panic: %v, Stack: %v\n", cp.Value, cp.Stack == nil)
}

func Select

func Select[T any](ctx context.Context, futures ...*Future[T]) (int, T, error)

Select blocks until ctx is cancelled or the first Future completes. Returns the index and value of the winning Future.

func Spawn

func Spawn(label string, fn func(context.Context) error)

func VitalsWithRun

func VitalsWithRun(ctx context.Context, id string, operation Func, opts ...VitalsOption) error

VitalsWithRun creates a Vitals instance and executes an operation with it in one call. It accepts functional options to configure hooks before immediately running the operation. This convenience function combines NewVitals and Execute for simplified inline usage.

func VitalsWithRunCtx

func VitalsWithRunCtx(ctx context.Context, id string, operation FuncCtx, opts ...VitalsOption) error

VitalsWithRunCtx creates a Vitals instance and executes a context-aware operation in one call. It accepts functional options to configure hooks before immediately running the operation with context. This convenience function combines NewVitals and ExecuteCtx for simplified inline usage.

func WaitAll

func WaitAll[T any](futures ...*Future[T]) ([]T, error)

WaitAll blocks until all futures complete and returns their results in order. Returns an error immediately if any Future fails.

func WaitAny

func WaitAny[T any](futures ...*Future[T]) (int, T, error)

WaitAny blocks until the first Future completes and returns its index and value. Remaining futures continue running; only the first result is returned.
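Combined with Async (shown in the README above), a brief sketch of both helpers:

```go
a := jack.Async(func() (int, error) { return 1, nil })
b := jack.Async(func() (int, error) { return 2, nil })

// WaitAll returns results in argument order.
results, err := jack.WaitAll(a, b)
if err != nil {
    return err
}
fmt.Println(results[0], results[1])

// WaitAny returns the index and value of whichever completed first.
idx, first, err := jack.WaitAny(a, b)
if err == nil {
    fmt.Println("future", idx, "finished first with", first)
}
```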

Types

type AdaptiveLimiter

type AdaptiveLimiter struct {
	// contains filtered or unexported fields
}

AdaptiveLimiter adjusts its concurrency limit dynamically based on observed round-trip latency using a gradient-style AIMD controller.

When RTT ≤ target → additive increase (limit++). When RTT > target → multiplicative decrease (limit × ratio).

Composable with the rest of the library: the AdaptiveLimiter owns a Semaphore internally and exposes the same Call / Close interface as Breaker and Bulkhead.

func NewAdaptiveLimiter

func NewAdaptiveLimiter(opts ...AdaptiveOption) *AdaptiveLimiter

NewAdaptiveLimiter creates an AdaptiveLimiter with sensible defaults.

func (*AdaptiveLimiter) Call

func (a *AdaptiveLimiter) Call(ctx context.Context, p Priority, fn func(context.Context) error) error

Call executes fn if a slot is available within the adaptive limit. It measures RTT and adjusts the limit after each call returns.

func (*AdaptiveLimiter) Close

func (a *AdaptiveLimiter) Close()

Close shuts down the underlying semaphore, unblocking all pending callers.

func (*AdaptiveLimiter) InFlight

func (a *AdaptiveLimiter) InFlight() int

InFlight returns the number of concurrently executing calls.

func (*AdaptiveLimiter) Limit

func (a *AdaptiveLimiter) Limit() int

Limit returns the current concurrency limit.

func (*AdaptiveLimiter) Metrics

func (a *AdaptiveLimiter) Metrics() *AdaptiveLimiterMetrics

Metrics returns the limiter's operational metrics.

type AdaptiveLimiterMetrics

type AdaptiveLimiterMetrics struct {
	Acquired        atomic.Uint64 // successful acquisitions
	Rejected        atomic.Uint64 // rejections when at limit
	LimitIncr       atomic.Uint64 // times the limit was increased
	LimitDecr       atomic.Uint64 // times the limit was decreased
	CurrentLimit    atomic.Int64  // live concurrency limit
	CurrentInFlight atomic.Int64  // goroutines currently executing
	AvgRTTNs        atomic.Int64  // EWMA RTT in nanoseconds
}

AdaptiveLimiterMetrics tracks adaptive limiter operational statistics.

type AdaptiveOption

type AdaptiveOption func(*AdaptiveLimiter)

AdaptiveOption configures an AdaptiveLimiter.

func AdaptiveWithInitialLimit

func AdaptiveWithInitialLimit(n int) AdaptiveOption

AdaptiveWithInitialLimit sets the starting concurrency limit (default 10).

func AdaptiveWithMaxLimit

func AdaptiveWithMaxLimit(n int) AdaptiveOption

AdaptiveWithMaxLimit sets the ceiling above which the limit will not grow (default 200).

func AdaptiveWithMinLimit

func AdaptiveWithMinLimit(n int) AdaptiveOption

AdaptiveWithMinLimit sets the floor below which the limit will not drop (default 1).

func AdaptiveWithSmoothingFactor

func AdaptiveWithSmoothingFactor(f float64) AdaptiveOption

AdaptiveWithSmoothingFactor controls EWMA smoothing for RTT samples (default 0.1). Closer to 0 = slower adaptation; closer to 1 = reacts to every sample.

func AdaptiveWithTargetP50

func AdaptiveWithTargetP50(d time.Duration) AdaptiveOption

AdaptiveWithTargetP50 sets the target median RTT the controller aims to maintain. When measured RTT exceeds this, the limit is reduced (default 100ms).

type Breaker

type Breaker struct {
	// contains filtered or unexported fields
}

Breaker implements the circuit-breaker pattern.

States:

Closed     → normal; failures accumulate until threshold is reached.
Open       → tripped; all calls rejected for openTimeout, then → HalfOpen.
HalfOpen   → probe; up to halfOpenLimit calls pass through. Enough successes
             → Closed; any failure → Open.

Call is the only entry point. It wraps the user function and manages all state transitions atomically without holding a mutex during execution.

func NewBreaker

func NewBreaker(name string, opts ...BreakerOption) *Breaker

NewBreaker creates a Breaker with the given name and options.

func (*Breaker) Call

func (b *Breaker) Call(ctx context.Context, fn func(context.Context) error) error

Call executes fn if the circuit allows it, updating state based on the result. Returns ErrBreakerOpen if the circuit is open. The context is passed through to fn; if fn returns context.DeadlineExceeded or context.Canceled the call counts as a timeout (also a failure).

func (*Breaker) Close

func (b *Breaker) Close()

Close permanently disables this Breaker instance.

func (*Breaker) Metrics

func (b *Breaker) Metrics() *BreakerMetrics

Metrics returns the breaker's operational metrics.

func (*Breaker) Name

func (b *Breaker) Name() string

Name returns the breaker's identifier.

func (*Breaker) Reset

func (b *Breaker) Reset()

Reset forces the circuit back to Closed and clears all counters. Use for testing or manual operator intervention.

func (*Breaker) State

func (b *Breaker) State() BreakerState

State returns the current circuit state.

type BreakerMetrics

type BreakerMetrics struct {
	Requests      atomic.Uint64 // total calls attempted
	Successes     atomic.Uint64 // calls that returned nil
	Failures      atomic.Uint64 // calls that returned a non-nil error
	Rejections    atomic.Uint64 // calls rejected while open
	Timeouts      atomic.Uint64 // calls that exceeded their deadline
	StateChanges  atomic.Uint64 // total transitions between states
	ConsecSuccess atomic.Int64  // consecutive successes in half-open
	ConsecFailure atomic.Int64  // consecutive failures in current window
}

BreakerMetrics tracks circuit breaker operational statistics.

type BreakerOption

type BreakerOption func(*Breaker)

BreakerOption configures a Breaker.

func BreakerWithHalfOpenLimit

func BreakerWithHalfOpenLimit(n int64) BreakerOption

BreakerWithHalfOpenLimit sets the maximum number of concurrent probes allowed in half-open state (default 1).

func BreakerWithOnStateChange

func BreakerWithOnStateChange(fn func(name string, from, to BreakerState)) BreakerOption

BreakerWithOnStateChange registers a callback invoked on every state transition.

func BreakerWithOpenTimeout

func BreakerWithOpenTimeout(d time.Duration) BreakerOption

BreakerWithOpenTimeout sets how long the circuit stays open before moving to half-open to probe recovery (default 10s).

func BreakerWithSuccessThreshold

func BreakerWithSuccessThreshold(n uint64) BreakerOption

BreakerWithSuccessThreshold sets how many consecutive successes in half-open state are required before the circuit closes again (default 2).

func BreakerWithThreshold

func BreakerWithThreshold(n uint64) BreakerOption

BreakerWithThreshold sets the number of consecutive failures required to open the circuit (default 5).

type BreakerState

type BreakerState int32

BreakerState represents the state of a circuit breaker.

const (
	// BreakerClosed is the normal operating state — calls pass through.
	BreakerClosed BreakerState = iota
	// BreakerOpen means the circuit is tripped — calls are rejected immediately.
	BreakerOpen
	// BreakerHalfOpen means a limited probe is allowed to test recovery.
	BreakerHalfOpen
)

func (BreakerState) String

func (s BreakerState) String() string

String returns a human-readable label for the circuit state.

type Bulkhead

type Bulkhead struct {
	// contains filtered or unexported fields
}

Bulkhead isolates failure domains by giving each named partition its own bounded concurrency budget backed by an independent Semaphore.

When partition A is saturated, partition B is completely unaffected. This is the standard pattern for protecting a shared resource (e.g. a DB connection pool) from being monopolised by one upstream caller.

Usage:

bh := jack.NewBulkhead(jack.BulkheadWithPartition("payments", 20),
                       jack.BulkheadWithPartition("reports",  5))

err := bh.Call(ctx, "payments", jack.PriorityHigh, func(ctx context.Context) error {
    return db.Query(ctx, ...)
})

func NewBulkhead

func NewBulkhead(opts ...BulkheadOption) *Bulkhead

NewBulkhead creates a Bulkhead with the given options.

func (*Bulkhead) AddPartition

func (b *Bulkhead) AddPartition(name string, capacity int)

AddPartition registers a new named partition at runtime. If a partition with that name already exists it is left unchanged.

func (*Bulkhead) Available

func (b *Bulkhead) Available(partition string) int

Available returns the number of free concurrency slots for the named partition.

func (*Bulkhead) Call

func (b *Bulkhead) Call(ctx context.Context, partition string, p Priority, fn func(context.Context) error) error

Call executes fn within the named partition's concurrency limit. If the partition doesn't exist and defaultCap > 0, it is auto-created. Returns ErrBulkheadFull if all slots are taken and the context expires, ErrBulkheadNotFound if auto-creation is disabled and the partition is unknown.

func (*Bulkhead) Close

func (b *Bulkhead) Close()

Close shuts down all partitions. Subsequent calls return ErrBulkheadClosed.

func (*Bulkhead) Metrics

func (b *Bulkhead) Metrics(partition string) *SemaphoreMetrics

Metrics returns the semaphore metrics for the named partition, or nil if not found.

func (*Bulkhead) Partitions

func (b *Bulkhead) Partitions() []string

Partitions returns a snapshot of all registered partition names.

func (*Bulkhead) TryCall

func (b *Bulkhead) TryCall(ctx context.Context, partition string, p Priority, fn func(context.Context) error) error

TryCall executes fn immediately if a slot is available, returning ErrBulkheadFull otherwise. Never blocks.

type BulkheadMetrics

type BulkheadMetrics struct {
	Acquired SemaphoreMetrics // reuses semaphore metrics per partition
}

BulkheadMetrics tracks statistics for a single named partition.

type BulkheadOption

type BulkheadOption func(*Bulkhead)

BulkheadOption configures a Bulkhead.

func BulkheadWithDefaultCapacity

func BulkheadWithDefaultCapacity(capacity int) BulkheadOption

BulkheadWithDefaultCapacity sets the capacity used when auto-creating partitions on first use (default 10). Set to 0 to disable auto-creation.

func BulkheadWithPartition

func BulkheadWithPartition(name string, capacity int) BulkheadOption

BulkheadWithPartition pre-registers a named partition with the given capacity.

func BulkheadWithSemaphoreOptions

func BulkheadWithSemaphoreOptions(opts ...SemaphoreOption) BulkheadOption

BulkheadWithSemaphoreOptions passes additional SemaphoreOptions to every partition semaphore (e.g. CoDel settings).

type Callback

type Callback func(id string)

Callback defines a callback function without error return for post-execution logic. It accepts a unique identifier string for the operation that completed. Use this type for notifications, cleanup, or metrics that do not affect flow control.

type CallbackCtx

type CallbackCtx func(ctx context.Context, id string)

CallbackCtx defines a context-aware callback function without error return. It accepts a context for cancellation and an identifier for the completed operation. Use this type when callbacks need to coordinate with context deadlines or values.

type Caller

type Caller struct {
	// contains filtered or unexported fields
}

Caller defines a group of calls that execute together.

func NewCaller

func NewCaller(calls ...Func) *Caller

NewCaller creates a new Caller from the given calls.

func (*Caller) Append

func (cg *Caller) Append(calls ...Func) *Caller

Append adds more calls to the group.

func (*Caller) Execute

func (cg *Caller) Execute() Func

Execute returns a Func that runs all calls, the same as Run but ignoring any errors.

func (*Caller) Func

func (cg *Caller) Func() Func

Func converts the Caller to a Func.

func (*Caller) Run

func (cg *Caller) Run() error

Run runs all calls in the group.

type CaughtPanic

type CaughtPanic struct {
	Value interface{} // The value passed to panic()
	Stack []byte      // The stack trace (may be empty if not collected)
}

CaughtPanic represents a panic that was caught during execution.

func (*CaughtPanic) Error

func (c *CaughtPanic) Error() string

Error implements the error interface.

func (*CaughtPanic) String

func (c *CaughtPanic) String() string

String provides a formatted string representation of the panic.

func (*CaughtPanic) Unwrap

func (c *CaughtPanic) Unwrap() error

Unwrap provides compatibility with errors.Is/As.

type Cycle

type Cycle func(*Scheduling)

Cycle is a functional option type for configuring a Scheduler. It uses the functional options pattern to allow flexible and extensible configuration of scheduler settings.

func SchedulingWithObservable

func SchedulingWithObservable(obs Observable[Schedule]) Cycle

SchedulingWithObservable returns a Cycle to set an observable for events.

func SchedulingWithRetry

func SchedulingWithRetry(count int, backoff time.Duration) Cycle

SchedulingWithRetry returns a Cycle to configure retry attempts on queue full errors.

type Debouncer

type Debouncer struct {
	// contains filtered or unexported fields
}

Debouncer groups calls to a function, executing only after a period of inactivity or when certain thresholds (max calls or max wait) are met. It uses a mutex for thread-safety and timers for delay and maxWait enforcement.

func NewDebouncer

func NewDebouncer(options ...DebouncerOption) *Debouncer

NewDebouncer creates a new Debouncer instance configured with the given functional options. The WithDebounceDelay option is required; other options are optional with sensible defaults. Timers are initialized but stopped until first use; limits default to no restrictions.

func (*Debouncer) Cancel

func (d *Debouncer) Cancel()

Cancel prevents a pending debounced function from executing by clearing internal state. It stops all active timers and resets call counters without invoking the pending function. Safe to call multiple times or when no function is pending; has no effect if already closed.

func (*Debouncer) Do

func (d *Debouncer) Do(fn func())

Do schedules the given function to execute after the configured delay period of inactivity. Each call resets the delay timer and updates the pending function to the latest provided. If maxCalls or maxWait thresholds are met, execution happens immediately without waiting.

func (*Debouncer) Flush

func (d *Debouncer) Flush()

Flush executes any pending debounced function immediately without waiting for timers. It stops all active timers, executes the function in a new goroutine, and resets state. If no function is currently pending, this method does nothing and returns safely.

func (*Debouncer) IsPending

func (d *Debouncer) IsPending() bool

IsPending returns true if a debounced function is currently waiting to be executed. This method checks the internal call counter while holding the mutex for thread safety. Useful for monitoring debouncer state without triggering any execution or side effects.

func (*Debouncer) Stop

func (d *Debouncer) Stop()

Stop permanently shuts down the debouncer and prevents any future function executions. It sets the closed flag, clears pending functions, and stops all active timers immediately. After calling Stop, any subsequent Do calls will be ignored and no operations will proceed.

type DebouncerOption

type DebouncerOption func(*Debouncer)

DebouncerOption is a functional option for configuring the Debouncer. These options allow customization of delay, maximum calls, and maximum wait time. Use WithDebounceDelay (required), WithDebounceMaxCalls, and WithDebounceMaxWait to set behaviors.

func WithDebounceDelay

func WithDebounceDelay(d time.Duration) DebouncerOption

WithDebounceDelay sets the delay period after the last call before execution. This is a required option; without it, the debouncer will not function correctly. The delay determines the inactivity period needed to trigger the debounced function.

func WithDebounceMaxCalls

func WithDebounceMaxCalls(count int) DebouncerOption

WithDebounceMaxCalls sets the maximum number of calls before immediate execution. By default, there is no limit on call accumulation using NoLimitCalls constant. Once this threshold is reached, the function executes regardless of remaining delay.

func WithDebounceMaxWait

func WithDebounceMaxWait(limit time.Duration) DebouncerOption

WithDebounceMaxWait sets the maximum wait time before forced function execution. This check happens on each Do call, so total wait could be up to maxWait plus delay. A separate timer ensures execution after maxWait even if no new calls occur.

func WithDebouncerPool

func WithDebouncerPool(pool *Pool) DebouncerOption

WithDebouncerPool sets a custom pool for executing debounced functions asynchronously. If not provided via this option, the default package-level pool will be used instead. This allows control over goroutine scheduling and resource management for executions.

type Doctor

type Doctor struct {
	// contains filtered or unexported fields
}

Doctor monitors patients with global and per-patient metrics.

func NewDoctor

func NewDoctor(opts ...DoctorOption) *Doctor

NewDoctor creates and initializes a new Doctor instance with the provided configuration options. It sets up the patient priority queue, starts the scheduler goroutine, and creates a default pool if needed. The returned Doctor is immediately active and ready to accept patient registrations via Add.

func (*Doctor) Add

func (d *Doctor) Add(p *Patient) error

Add registers a new patient with the Doctor, validating its configuration and scheduling its first check. If a patient with the same ID already exists, it is replaced and the old instance is marked removed. Returns an error if the patient lacks a required Check function or if registration fails.

func (*Doctor) GetState

func (d *Doctor) GetState(id string) (PatientState, bool)

GetState retrieves the current health state of a patient by ID along with an existence indicator. If the patient is not registered, it returns PatientUnknown and false for the existence flag. This method provides thread-safe read access to patient state without modifying any internal data.

func (*Doctor) Metrics

func (d *Doctor) Metrics() *DoctorMetrics

Metrics returns a reference to the Doctor's global metrics struct for monitoring operational statistics. All metric fields use atomic operations, allowing safe concurrent reads without additional locking. Use this to expose health check statistics to dashboards, alerts, or external monitoring systems.

func (*Doctor) Remove

func (d *Doctor) Remove(id string) bool

Remove unregisters a patient by ID, marking it as removed and preventing future scheduled checks. The method returns true if the patient was found and successfully removed, false otherwise. Removed patients are cleaned from the scheduler queue on the next scheduling cycle.

func (*Doctor) SetDegraded

func (d *Doctor) SetDegraded(id string, degraded bool)

SetDegraded manually overrides a patient's degradation state, forcing degraded or healthy status. If the patient ID is not found, the method logs a warning and returns without making changes. Use this for operational control, such as forcing maintenance mode or acknowledging known issues.

func (*Doctor) Stop

func (d *Doctor) Stop(id string) bool

Stop halts a specific patient by ID, marking it removed and preventing any further health checks. It returns true if the patient was found and stopped, false if the patient did not exist. This method is safe to call multiple times and ensures clean removal from the scheduler.

func (*Doctor) StopAll

func (d *Doctor) StopAll(timeout time.Duration)

StopAll gracefully shuts down the Doctor, stopping all patients and the scheduler goroutine. It uses an atomic flag to ensure idempotency and waits for in-flight operations with the given timeout. After calling StopAll, the Doctor cannot be restarted and all patient registrations are cleared.

type DoctorMetrics

type DoctorMetrics struct {
	PatientsTotal     atomic.Int64
	ChecksTotal       atomic.Uint64
	ChecksHealthy     atomic.Uint64
	ChecksDegraded    atomic.Uint64
	ChecksFailed      atomic.Uint64
	StateChanges      atomic.Uint64
	AcceleratedChecks atomic.Uint64
	ManualDegraded    atomic.Uint64
	Recoveries        atomic.Uint64
	Timeouts          atomic.Uint64
	PanicsRecovered   atomic.Uint64
	PoolSubmits       atomic.Uint64
	PoolSubmitFails   atomic.Uint64
}

DoctorMetrics tracks global operational metrics across all patients.

type DoctorOption

type DoctorOption func(*Doctor)

func DoctorWithGlobalTimeout

func DoctorWithGlobalTimeout(t time.Duration) DoctorOption

DoctorWithGlobalTimeout configures the default timeout for patient checks lacking individual settings. If the provided duration is zero or negative, the option is ignored and the default applies. This ensures checks do not hang indefinitely and helps maintain responsive health monitoring.

func DoctorWithLogger

func DoctorWithLogger(l *ll.Logger) DoctorOption

DoctorWithLogger assigns a namespaced logger instance for structured Doctor operation logging. If the provided logger is nil, the option has no effect and the default logger remains active. Use this to integrate Doctor logs with your application's logging infrastructure and levels.

func DoctorWithMaxConcurrent

func DoctorWithMaxConcurrent(n int) DoctorOption

DoctorWithMaxConcurrent sets the maximum number of concurrent health checks the Doctor can run. If the provided value is zero or negative, the option is ignored and the default applies. This limit controls resource usage and prevents system overload during intensive monitoring.

func DoctorWithObservable

func DoctorWithObservable(obs Observable[PatientEvent]) DoctorOption

DoctorWithObservable attaches an event observer to receive patient state change notifications. The observable receives PatientEvent structs whenever a patient's health status updates. Use this for external monitoring, logging, or triggering downstream actions based on health.

func DoctorWithPool

func DoctorWithPool(pool *Pool) DoctorOption

DoctorWithPool configures the Doctor to use a custom worker pool for check execution. If the provided pool is nil, the option has no effect and default pool creation applies. Use this to control concurrency limits and resource sharing across multiple Doctors.

func DoctorWithVerbose

func DoctorWithVerbose(v bool) DoctorOption

DoctorWithVerbose enables or disables detailed debug logging for Doctor operations and checks. When enabled, additional context like durations and errors are logged for troubleshooting. Use this during development or debugging, and disable in production for reduced log volume.

type Event

type Event struct {
	Type     string        // Type of event: "queued", "run", "done"
	TaskID   string        // Optional task identifier
	WorkerID string        // Identifier of the worker processing the task
	Time     time.Time     // Time of the event
	Duration time.Duration // Duration of task execution (for "done" events)
	Err      error         // Error, if any (for "done" events)
}

Event captures details of task execution for observability. Thread-safe for use in notification across goroutines.

type Func

type Func func() error

Func converts a function returning an error into a Task. Example:

pool.Submit(jack.Func(func() error { return nil })) // Submits a simple task

func (Func) Do

func (f Func) Do() error

Do executes the function to satisfy the Task interface.

type FuncCtx

type FuncCtx func(ctx context.Context) error

FuncCtx converts a context-aware function into a TaskCtx. Example:

pool.SubmitCtx(ctx, jack.FuncCtx(func(ctx context.Context) error { return nil })) // Submits a context-aware task

func (FuncCtx) Do

func (f FuncCtx) Do(ctx context.Context) error

Do executes the function with the given context to satisfy the TaskCtx interface.

type Future

type Future[T any] struct {
	// contains filtered or unexported fields
}

func Async

func Async[T any](fn func() (T, error)) *Future[T]

Async runs fn in a new goroutine and returns a Future for its result.

func AsyncWithContext

func AsyncWithContext[T any](ctx context.Context, fn func(context.Context) (T, error)) *Future[T]

AsyncWithContext runs fn in a new goroutine under the given context.

func NewFuture

func NewFuture[T any](ctx context.Context, fn func(context.Context) (T, error)) *Future[T]

NewFuture creates a Future that executes fn asynchronously in a new goroutine. The context governs cancellation; fn receives it directly. If fn is nil, the Future is returned in an already-failed state.

func (*Future[T]) Await

func (f *Future[T]) Await() (T, error)

Await blocks until the Future completes and returns its result and any error.

func (*Future[T]) AwaitWithContext

func (f *Future[T]) AwaitWithContext(ctx context.Context) (T, error)

AwaitWithContext blocks until the Future completes or ctx is cancelled.

func (*Future[T]) AwaitWithTimeout

func (f *Future[T]) AwaitWithTimeout(timeout time.Duration) (T, error)

AwaitWithTimeout blocks until the Future completes or timeout elapses. Returns ErrFutureTimeout if the deadline is exceeded.

func (*Future[T]) Cancel

func (f *Future[T]) Cancel() bool

Cancel signals the Future to stop. Returns true if this call caused the cancellation.

func (*Future[T]) IsCanceled

func (f *Future[T]) IsCanceled() bool

IsCanceled reports whether the Future was cancelled before completing.

func (*Future[T]) IsDone

func (f *Future[T]) IsDone() bool

IsDone reports whether the Future has finished, either successfully or with an error.

func (*Future[T]) Recover

func (f *Future[T]) Recover(ctx context.Context, fn func(error) (T, error)) *Future[T]

Recover chains an error handler that can substitute a value when the Future fails. The handler fn is called only if the original Future returns a non-nil error.

func (*Future[T]) Then

func (f *Future[T]) Then(ctx context.Context, fn func(T) (any, error)) *Future[any]

Then chains a transformation on a successful result, returning a new Future. The chained fn is called only if the original Future completes without error.

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group coordinates concurrent execution of functions with error and panic handling. It supports context cancellation and worker limits, collecting errors in a channel. Thread-safe via wait group, mutex, and channel operations.

func NewGroup

func NewGroup() *Group

NewGroup creates a new Group for running functions concurrently. It initializes an error channel with a buffer of 1. Thread-safe via initialization. Example:

group := NewGroup() // Creates a new group

func (*Group) Errors

func (g *Group) Errors() <-chan error

Errors returns a channel that receives the first error from Go or GoCtx goroutines. The channel has a buffer of 1 and is closed by Wait. Thread-safe via channel operations. Example:

for err := range group.Errors() { ... } // Reads errors

func (*Group) Go

func (g *Group) Go(f func() error)

Go runs a function in a new goroutine with error and panic handling. The first non-nil error or panic is sent to the Errors channel. If a context is set, it is cancelled on the first error. Thread-safe via wait group and doWork. Example:

group.Go(func() error { return nil }) // Runs a function

func (*Group) GoCtx

func (g *Group) GoCtx(f func(context.Context) error)

GoCtx runs a context-aware function in a new goroutine. It requires a context set via WithContext and passes it to the function. The first error, panic, or context cancellation is sent to the Errors channel, and the context is cancelled. Thread-safe via wait group and doWork. Example:

group.WithContext(ctx).GoCtx(func(ctx context.Context) error { return nil }) // Runs a context-aware function

func (*Group) Wait

func (g *Group) Wait()

Wait blocks until all Go and GoCtx goroutines complete. It closes the Errors channel and cancels the Group’s context, if set. Thread-safe via wait group and once. Example:

group.Wait() // Waits for all goroutines

func (*Group) WithContext

func (g *Group) WithContext(ctx context.Context) *Group

WithContext associates a context with the Group, enabling cancellation. It cancels any previous context and creates a new cancellable context. Thread-safe via context operations. Example:

group.WithContext(ctx) // Sets group context

func (*Group) WithLimit

func (g *Group) WithLimit(n int) *Group

WithLimit sets the maximum number of concurrent workers in the Group. Non-positive values remove the limit. Thread-safe via semaphore initialization. Example:

group.WithLimit(5) // Limits to 5 concurrent workers

type HedgeGroup

type HedgeGroup struct {
	// contains filtered or unexported fields
}

HedgeGroup runs multiple hedge strategies concurrently and returns the first successful result. Useful when hedging across different endpoints or regions.

func NewHedgeGroup

func NewHedgeGroup(hedgers ...*Hedger) *HedgeGroup

NewHedgeGroup creates a group of independent hedgers.

func (*HedgeGroup) Do

func (g *HedgeGroup) Do(ctx context.Context, fn func(context.Context) (any, error)) (any, error)

Do fires each hedger concurrently and returns the first success.

type HedgeMetrics

type HedgeMetrics struct {
	Requests   atomic.Uint64 // total primary calls
	Hedged     atomic.Uint64 // times a second request was fired
	PrimaryWon atomic.Uint64 // primary response arrived first
	HedgeWon   atomic.Uint64 // hedge response arrived first
	Errors     atomic.Uint64 // all attempts failed
	AvgDelayNs atomic.Int64  // EWMA of hedge fire delay in nanoseconds
}

HedgeMetrics tracks hedged request operational statistics.

type HedgeOption

type HedgeOption func(*Hedger)

HedgeOption configures a Hedger.

func HedgeWithDelay

func HedgeWithDelay(d time.Duration) HedgeOption

HedgeWithDelay sets a fixed delay before the hedge fires (default: use latency percentile). When set, the latency window is not consulted.

func HedgeWithMaxConcurrent

func HedgeWithMaxConcurrent(n int) HedgeOption

HedgeWithMaxConcurrent sets the maximum number of in-flight hedged pairs (default 100). When the limit is reached, no hedge is fired and only the primary runs.

func HedgeWithMinSamples

func HedgeWithMinSamples(n int) HedgeOption

HedgeWithMinSamples sets how many RTT samples must be collected before the latency-based hedge delay activates (default 10). Before that, a zero delay (fire immediately) is used.

func HedgeWithPercentile

func HedgeWithPercentile(p int) HedgeOption

HedgeWithPercentile sets which RTT percentile to use as the hedge delay (default 50). Valid range: 1–99.

type Hedger

type Hedger struct {
	// contains filtered or unexported fields
}

Hedger implements the hedged requests pattern.

A primary request is sent immediately. If it has not responded within the hedge delay (derived from recent latency or a fixed duration), an identical second request is fired. Whichever responds first is returned; the other is cancelled via context cancellation.

This reduces tail latency at the cost of occasionally sending duplicate requests. It is safe only for idempotent operations (reads, retried writes with idempotency keys).

The hedge delay adapts automatically: it is the Nth percentile of recent call RTTs measured in a lock-free circular sample buffer. No third-party sketch library is used.

func NewHedger

func NewHedger(opts ...HedgeOption) *Hedger

NewHedger creates a Hedger with sensible defaults.

func (*Hedger) Do

func (h *Hedger) Do(ctx context.Context, fn func(context.Context) (any, error)) (any, error)

Do executes fn with hedging. fn receives a context that is cancelled when the other attempt wins. The caller's ctx governs the outer deadline.

fn must be idempotent — it may be called twice concurrently.

func (*Hedger) Metrics

func (h *Hedger) Metrics() *HedgeMetrics

Metrics returns the hedger's operational metrics.

type HedgerOf

type HedgerOf[T any] struct {
	// contains filtered or unexported fields
}

HedgerOf wraps Hedger for typed results, avoiding any cast at the call site.

func NewHedgerOf

func NewHedgerOf[T any](opts ...HedgeOption) *HedgerOf[T]

NewHedgerOf creates a typed hedger.

func (*HedgerOf[T]) Do

func (h *HedgerOf[T]) Do(ctx context.Context, fn func(context.Context) (T, error)) (T, error)

Do executes fn with hedging and returns a typed result.

func (*HedgerOf[T]) Metrics

func (h *HedgerOf[T]) Metrics() *HedgeMetrics

Metrics returns the underlying hedger's metrics.

type Hook

type Hook func(id string) error

Hook defines a lifecycle hook function that returns an error. It accepts a unique identifier string for the operation being tracked. Use this type for pre-execution validation or setup logic that may fail.

type HookCtx

type HookCtx func(ctx context.Context, id string) error

HookCtx defines a context-aware lifecycle hook function that returns an error. It accepts a context for cancellation and an identifier for the tracked operation. Use this type when hooks need to respect timeouts or external cancellation signals.

type Identifiable

type Identifiable interface {
	// ID returns a unique identifier for the task.
	ID() string
}

Identifiable is an optional interface for tasks to provide a custom ID for logging, metrics, and tracing.

type Lease

type Lease struct {
	// contains filtered or unexported fields
}

Lease represents a time-bounded slot acquired from a Semaphore. If the holder fails to call Release before the deadline, the Reaper automatically reclaims the slot so it is never permanently lost.

Obtain a Lease via Leaser.Acquire, not by constructing directly.

func (*Lease) Deadline

func (l *Lease) Deadline() time.Time

Deadline returns when this lease will be automatically reclaimed.

func (*Lease) ID

func (l *Lease) ID() string

ID returns the lease's unique identifier.

func (*Lease) Release

func (l *Lease) Release() error

Release explicitly returns the semaphore slot and cancels the reaper timer. Calling Release more than once is safe — subsequent calls are no-ops.

type LeaseMetrics

type LeaseMetrics struct {
	Acquired atomic.Uint64 // leases successfully granted
	Released atomic.Uint64 // leases explicitly released
	Expired  atomic.Uint64 // leases reclaimed by the reaper
	Active   atomic.Int64  // leases currently held
}

LeaseMetrics tracks lease operational statistics.

type Leaser

type Leaser struct {
	// contains filtered or unexported fields
}

Leaser wraps a Semaphore and a Reaper to provide leases: acquired semaphore slots with automatic reclamation on expiry.

This solves the problem where a crashed or slow holder never calls Release, permanently consuming a concurrency slot. The Reaper fires after TTL and calls Release on the holder's behalf.

func NewLeaser

func NewLeaser(sem *Semaphore, opts ...LeaserOption) *Leaser

NewLeaser creates a Leaser backed by the given Semaphore. The caller owns the Semaphore and Reaper lifecycles; call Close on both when done.

func (*Leaser) Acquire

func (lm *Leaser) Acquire(ctx context.Context, id string, p Priority, ttl time.Duration) (*Lease, error)

Acquire waits for a semaphore slot and returns a Lease valid for ttl. If ttl is zero the manager's default TTL is used. The Lease must be Released when the work is done; if not, the Reaper reclaims it automatically after ttl.

func (*Leaser) Close

func (lm *Leaser) Close()

Close stops the internal Reaper. The backing Semaphore is not closed.

func (*Leaser) Metrics

func (lm *Leaser) Metrics() *LeaseMetrics

Metrics returns the manager's operational metrics.

type LeaserOption

type LeaserOption func(*Leaser)

LeaserOption configures a Leaser.

func LeaserWithTTL

func LeaserWithTTL(d time.Duration) LeaserOption

LeaserWithTTL sets the default lease lifetime (default 30s).

type Lifetime

type Lifetime struct {
	// contains filtered or unexported fields
}

func NewLifetime

func NewLifetime(opts ...LifetimeOption) *Lifetime

NewLifetime creates and initializes a new Lifetime manager with the provided configuration options. It spawns one pruning goroutine per shard to handle timer expiration with minimal lock contention. The returned Lifetime is immediately active and ready to schedule timed callbacks via ScheduleTimed.

func (*Lifetime) CancelTimed

func (lm *Lifetime) CancelTimed(id string) bool

CancelTimed removes a scheduled timer by ID, preventing its callback from executing. It returns true if the timer was found and successfully cancelled, false if none existed. This method is safe to call multiple times and has no effect if the timer already expired.

func (*Lifetime) ExecuteCtxWithLifetime

func (lm *Lifetime) ExecuteCtxWithLifetime(ctx context.Context, id string, lifetime *Vitals, operation FuncCtx) error

ExecuteCtxWithLifetime runs a context-aware operation with Vitals hooks and schedules timed callback on success. If the operation succeeds and a timed callback is configured, it is scheduled for later execution. This method combines immediate context-aware execution with deferred lifecycle callback management.

func (*Lifetime) ExecuteWithLifetime

func (lm *Lifetime) ExecuteWithLifetime(ctx context.Context, id string, lifetime *Vitals, operation Func) error

ExecuteWithLifetime runs an operation with Vitals hooks and schedules a timed callback on success. If the operation succeeds and a timed callback is configured, it is scheduled for later execution. This method combines immediate operation execution with deferred lifecycle callback management.

func (*Lifetime) GetRemainingDuration

func (lm *Lifetime) GetRemainingDuration(id string) (time.Duration, bool)

GetRemainingDuration returns the time remaining until a scheduled timer expires for the given ID. It returns the duration and true if the timer exists and has not yet expired, otherwise zero and false. This method is useful for displaying countdowns or making scheduling decisions based on timer state.

func (*Lifetime) HasPending

func (lm *Lifetime) HasPending(id string) bool

HasPending checks whether a timer is currently scheduled for the given ID. It returns true if the ID exists in any shard's queue, false otherwise. This method provides thread-safe read access without modifying internal state.

func (*Lifetime) Metrics

func (lm *Lifetime) Metrics() *LifetimeMetrics

Metrics returns a reference to the Lifetime manager's metrics struct for monitoring operational statistics. All metric fields use atomic operations, allowing safe concurrent reads without additional locking. Use this to expose timer scheduling statistics to dashboards, alerts, or external monitoring systems.

func (*Lifetime) PendingCount

func (lm *Lifetime) PendingCount() int

PendingCount returns the total number of timers currently scheduled across all shards. It iterates through each shard while holding locks to ensure an accurate snapshot count. Use this for monitoring queue depth or implementing backpressure in high-load scenarios.

func (*Lifetime) ResetTimed

func (lm *Lifetime) ResetTimed(id string) bool

ResetTimed extends the expiration time of an existing timer by its original duration from now. It returns true if the timer was found and successfully reset, false if no timer existed for the ID. This method is useful for implementing keep-alive or activity-based timeout extension patterns.

func (*Lifetime) ScheduleLifetimeTimed

func (lm *Lifetime) ScheduleLifetimeTimed(ctx context.Context, id string, lifetime *Vitals)

ScheduleLifetimeTimed schedules a timed callback using configuration from a Vitals instance. If the Vitals or its Timed callback is nil, it resets any existing timer for the ID instead. This convenience method integrates Lifetime timer management with Vitals lifecycle configuration.

func (*Lifetime) ScheduleTimed

func (lm *Lifetime) ScheduleTimed(ctx context.Context, id string, callback CallbackCtx, wait time.Duration)

ScheduleTimed registers a callback to execute after the specified wait duration for a given ID. If a timer already exists for the ID, it is replaced with the new callback and expiration time. The method uses sharding for concurrency and signals the prune loop to re-evaluate scheduling.

func (*Lifetime) Stop

func (lm *Lifetime) Stop(ids ...string)

Stop cancels timers for the specified IDs, or all timers if no IDs are provided. It groups IDs by shard for efficient batch removal and signals prune loops to re-evaluate. This method provides a convenient way to clean up multiple timers without individual CancelTimed calls.

func (*Lifetime) StopAll

func (lm *Lifetime) StopAll()

StopAll gracefully shuts down the Lifetime manager, cancelling all pending timers and goroutines. It signals the context to stop prune loops, waits for them to exit, then clears all shard state. After calling StopAll, the Lifetime instance cannot be reused and all scheduled callbacks are discarded.

type LifetimeMetrics

type LifetimeMetrics struct {
	ScheduleCalls   atomic.Uint64
	ResetCalls      atomic.Uint64
	CancelCalls     atomic.Uint64
	ExecuteCalls    atomic.Uint64
	ExpiredTotal    atomic.Uint64
	ExpiredHandled  atomic.Uint64
	ExpiredMissed   atomic.Uint64
	ExpiredErrors   atomic.Uint64
	ActiveTimers    atomic.Int64
	MaxActiveTimers atomic.Int64
	TotalTimersSeen atomic.Uint64
	AvgExpirationMs atomic.Int64
	MinExpirationMs atomic.Int64
	MaxExpirationMs atomic.Int64
	LoopIterations  atomic.Uint64
	SignalsSent     atomic.Uint64
	StopsReceived   atomic.Uint64
}

LifetimeMetrics tracks operational metrics for the lifetime manager.

type LifetimeOption

type LifetimeOption func(*Lifetime)

func LifetimeWithLogger

func LifetimeWithLogger(l *ll.Logger) LifetimeOption

LifetimeWithLogger assigns a namespaced logger instance for structured Lifetime operation logging. If the provided logger is nil, the option has no effect and the default logger remains active. Use this to integrate Lifetime logs with your application's logging infrastructure and levels.

func LifetimeWithMinTick

func LifetimeWithMinTick(tick time.Duration) LifetimeOption

LifetimeWithMinTick sets the minimum polling interval for the expiration check loop. If the provided duration is zero or negative, the option is ignored and the default applies. Use this to balance responsiveness against CPU usage when processing expired timers.

func LifetimeWithShards

func LifetimeWithShards(count uint32) LifetimeOption

LifetimeWithShards configures the number of shards for concurrent timer management. The count must be a power of two so shard selection can use a fast bitwise mask instead of a modulo operation. Use higher shard counts to reduce lock contention under heavy timer scheduling load.

func LifetimeWithTimerLimit

func LifetimeWithTimerLimit(limit time.Duration) LifetimeOption

LifetimeWithTimerLimit configures the maximum execution time allowed for timer callbacks. If the provided duration is zero or negative, the option is ignored and the default applies. This prevents long-running callbacks from blocking the expiration processing loop.

type LoopConfig

type LoopConfig struct {
	Name        string
	Interval    time.Duration
	Jitter      float64
	Backoff     bool
	MaxInterval time.Duration
	MinInterval time.Duration
	Immediate   bool
	Context     context.Context
	Logger      *ll.Logger
}

type Looper

type Looper struct {
	// contains filtered or unexported fields
}

func NewLooper

func NewLooper(task Func, opts ...LooperOption) *Looper

NewLooper creates a new Looper with full metrics.

func (*Looper) CurrentInterval

func (l *Looper) CurrentInterval() time.Duration

CurrentInterval returns the interval currently in use, including any backoff applied.

func (*Looper) FailureCount

func (l *Looper) FailureCount() uint64

FailureCount returns the number of consecutive errors since the last success.

func (*Looper) IsRunning

func (l *Looper) IsRunning() bool

IsRunning reports whether the looper goroutine is active.

func (*Looper) Metrics

func (l *Looper) Metrics() *LooperMetrics

Metrics returns the looper's metrics.

func (*Looper) ResetInterval

func (l *Looper) ResetInterval()

ResetInterval restores the interval to the value set at construction time.

func (*Looper) SetInterval

func (l *Looper) SetInterval(d time.Duration)

SetInterval updates the iteration interval dynamically; takes effect on the next tick.

func (*Looper) Start

func (l *Looper) Start()

Start launches the looper goroutine. It must be called at most once.

func (*Looper) Stop

func (l *Looper) Stop()

Stop signals the looper to exit and blocks until it does.

type LooperMetrics

type LooperMetrics struct {
	Executions      atomic.Uint64
	Failures        atomic.Uint64
	Successes       atomic.Uint64
	BackoffEvents   atomic.Uint64
	IntervalChanges atomic.Uint64
	PanicsRecovered atomic.Uint64
	ContextCancels  atomic.Uint64
	Timeouts        atomic.Uint64

	LastRun   atomic.Value
	LastError atomic.Value
	LastStart atomic.Int64
	LastEnd   atomic.Int64

	TotalDurationNs atomic.Int64
	MinDurationNs   atomic.Int64
	MaxDurationNs   atomic.Int64

	CurrentIntervalNs atomic.Int64
	CurrentBackoffNs  atomic.Int64
}

LooperMetrics tracks comprehensive operational metrics.

type LooperOption

type LooperOption func(*LoopConfig)

func WithLooperBackoff

func WithLooperBackoff(enabled bool) LooperOption

WithLooperBackoff enables exponential backoff on consecutive errors.

func WithLooperContext

func WithLooperContext(ctx context.Context) LooperOption

WithLooperContext attaches an external context; cancelling it stops the looper.

func WithLooperImmediate

func WithLooperImmediate(immediate bool) LooperOption

WithLooperImmediate makes the looper execute the task once before the first timer fires.

func WithLooperInterval

func WithLooperInterval(interval time.Duration) LooperOption

WithLooperInterval sets the base interval between loop iterations.

func WithLooperJitter

func WithLooperJitter(jitter float64) LooperOption

WithLooperJitter adds proportional random jitter to each interval to spread load.

func WithLooperLogger

func WithLooperLogger(logger *ll.Logger) LooperOption

WithLooperLogger sets a custom logger for the looper.

func WithLooperMaxInterval

func WithLooperMaxInterval(max time.Duration) LooperOption

WithLooperMaxInterval caps the backoff interval ceiling.

func WithLooperMinInterval

func WithLooperMinInterval(min time.Duration) LooperOption

WithLooperMinInterval sets the floor below which the interval will not drop.

func WithLooperName

func WithLooperName(name string) LooperOption

WithLooperName sets the display name used in logs and metrics.

type Observable

type Observable[T any] interface {
	// Add registers one or more observers to receive notifications.
	Add(observers ...Observer[T])
	// Remove unregisters one or more observers.
	Remove(observers ...Observer[T])
	// Notify sends one or more events to all registered observers.
	Notify(events ...T)
	// Shutdown stops the observable and waits for all notifications to complete.
	Shutdown()
}

Observable defines the interface for objects that can be observed by multiple Observer instances. It supports adding and removing observers, notifying them of one or more events, and shutting down cleanly. The type parameter T matches the event type for observers. Example:

obs := NewObservable[string]()
logObserver := &LogObserver{}
obs.Add(logObserver)
obs.Notify("Task completed")
obs.Shutdown()

func NewObservable

func NewObservable[T any](numNotifyWorkers ...int) Observable[T]

NewObservable creates and returns a new Observable instance. It optionally accepts the number of worker goroutines used for asynchronous notifications; if the value is omitted or is not a positive integer, it defaults to 5. Workers process notifications in parallel to avoid blocking the notifier, with panic recovery for observer errors.

Returns: Observable[T]: A new Observable instance ready to accept observers and events.

Example:

// Create an observable with 2 workers
obs := NewObservable[string](2)
logObserver := &LogObserver{}
obs.Add(logObserver)
obs.Notify("Event 1") // Sends "Event 1" to logObserver asynchronously

type Observer

type Observer[T any] interface {
	OnNotify(value T) // Called when an event of type T is received
}

Observer defines the interface for objects that wish to receive notifications from an Observable. The type parameter T specifies the type of events or values that will be notified. Implementors must provide an OnNotify method to handle incoming events. Example:

type LogObserver struct {}
func (l *LogObserver) OnNotify(event string) {
	fmt.Printf("Received event: %s\n", event)
}

type Patient

type Patient struct {
	// contains filtered or unexported fields
}

Patient holds runtime state for one monitored entity with its own metrics.

func NewPatient

func NewPatient(cfg PatientConfig) *Patient

NewPatient creates a Patient with its own metrics instance.

func (*Patient) Metrics

func (p *Patient) Metrics() *PatientMetrics

Metrics returns the patient's individual metrics.

func (*Patient) Remove

func (p *Patient) Remove()

Remove signals that this patient has been removed from monitoring. It invokes the OnRemove callback if configured.

type PatientConfig

type PatientConfig struct {
	ID            string
	Interval      time.Duration
	Jitter        float64
	Timeout       time.Duration
	Accelerated   time.Duration
	MaxFailures   uint64
	Check         FuncCtx
	OnStart       FuncCtx
	OnComplete    FuncCtx
	OnError       FuncCtx
	OnTimeout     FuncCtx
	OnRecover     FuncCtx
	OnRemove      func()
	OnStateChange func(PatientEvent)
}

PatientConfig defines monitoring parameters for a single patient.

type PatientEvent

type PatientEvent struct {
	ID        string
	State     PatientState
	LastCheck time.Time
	Duration  time.Duration
	Error     error
	Meta      map[string]any
}

PatientEvent is emitted for observability when a patient's state changes.

type PatientMetrics

type PatientMetrics struct {
	ChecksTotal     atomic.Uint64
	ChecksHealthy   atomic.Uint64
	ChecksDegraded  atomic.Uint64
	ChecksFailed    atomic.Uint64
	StateChanges    atomic.Uint64
	Recoveries      atomic.Uint64
	Timeouts        atomic.Uint64
	PanicsRecovered atomic.Uint64
	ConsecFailures  atomic.Int64
	ConsecSuccesses atomic.Int64
	LastCheckMs     atomic.Int64
	AvgCheckMs      atomic.Int64
	LastFailureMs   atomic.Int64
	LastRecoveryMs  atomic.Int64
}

PatientMetrics tracks per-patient operational metrics.

type PatientState

type PatientState int

PatientState represents the health state of a monitored patient.

const (
	PatientUnknown PatientState = iota
	PatientHealthy
	PatientDegraded
	PatientFailed
)

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool manages a fixed number of worker goroutines to execute tasks concurrently. It supports task submission with or without context, shutdown with timeout, and observability.

func NewPool

func NewPool(numWorkers int, opts ...Pooling) *Pool

NewPool creates a new pool with the specified number of workers and optional configurations. Workers start immediately. At least one worker is always created.

func (*Pool) Do

func (p *Pool) Do(fn func())

Do submits a void function as a task, discarding any submission error.

func (*Pool) DoCtx

func (p *Pool) DoCtx(ctx context.Context, fn func(ctx context.Context))

DoCtx submits a context-aware void function as a task, discarding any submission error.

func (*Pool) IsClosed

func (p *Pool) IsClosed() bool

IsClosed returns true if the pool has been shut down.

func (*Pool) Logger

func (p *Pool) Logger(extLogger *ll.Logger) *Pool

Logger sets a custom logger for the pool, namespacing it as "pool".

func (*Pool) Metrics

func (p *Pool) Metrics() *PoolMetrics

Metrics returns the pool's operational metrics.

func (*Pool) QueueSize

func (p *Pool) QueueSize() int

QueueSize returns the current number of pending tasks in the buffer.

func (*Pool) Shutdown

func (p *Pool) Shutdown(timeout time.Duration) error

Shutdown gracefully stops the pool and waits for all workers to finish. Returns ErrShutdownTimedOut if workers do not exit within the timeout.

func (*Pool) Submit

func (p *Pool) Submit(ts ...Task) error

Submit enqueues one or more tasks for execution without context. Returns ErrPoolClosed if the pool is shut down, ErrQueueFull if the queue is at capacity.

func (*Pool) SubmitCtx

func (p *Pool) SubmitCtx(ctx context.Context, ts ...TaskCtx) error

SubmitCtx enqueues one or more context-aware tasks for execution. Blocks until the task is queued, the context is cancelled, or the pool is closed.

func (*Pool) Workers

func (p *Pool) Workers() int

Workers returns the number of worker goroutines in the pool.

type PoolMetrics

type PoolMetrics struct {
	TasksSubmitted  atomic.Uint64 // total tasks accepted into the queue
	TasksCompleted  atomic.Uint64 // tasks that finished without error
	TasksFailed     atomic.Uint64 // tasks that returned an error or panicked
	TasksRejected   atomic.Uint64 // tasks dropped due to full queue or closed pool
	PanicsRecovered atomic.Uint64 // panics caught inside task execution
	QueueDepth      atomic.Int64  // current number of tasks waiting in the channel
	MaxQueueDepth   atomic.Int64  // high-water mark of QueueDepth
	TotalDurationNs atomic.Int64  // cumulative nanoseconds spent executing tasks
	ActiveWorkers   atomic.Int64  // workers currently executing a task
}

PoolMetrics tracks operational statistics for a Pool. All fields use atomic operations and are safe for concurrent reads without locking.

type Pooling

type Pooling func(*poolingOpt)

Pooling is a functional option type for configuring the pool during creation.

func PoolingWithIDGenerator

func PoolingWithIDGenerator(fn func(interface{}) string) Pooling

PoolingWithIDGenerator sets a custom task ID generator function.

func PoolingWithNoID

func PoolingWithNoID() Pooling

PoolingWithNoID disables task ID generation entirely. This eliminates all allocations in the Submit hot path when observability (logging, event emission) is not needed. The worker will log an empty task ID. Use when pool submission is on a latency-critical path and task tracing is not required.

func PoolingWithObservable

func PoolingWithObservable(obs Observable[Event]) Pooling

PoolingWithObservable sets an observable for event notifications in the pool. The observable receives "queued", "run", and "done" events for every task.

func PoolingWithQueueSize

func PoolingWithQueueSize(size int) Pooling

PoolingWithQueueSize sets the task queue buffer size. Defaults to 2× the worker count when not provided or negative.

type Priority

type Priority int

Priority defines request importance for backpressure decisions. Lower numeric values indicate higher priority (Critical=0 is served first).

const (
	PriorityCritical Priority = iota // Admin, user-facing critical paths
	PriorityHigh                     // Standard user requests
	PriorityMedium                   // Async work, type-ahead
	PriorityLow                      // Backfill, batch, probes
)

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is a bounded, multi-priority, multi-consumer work queue. Higher-priority items are always dequeued before lower-priority ones. Under saturation it drops the lowest-priority items first (tail-drop per tier). This makes it suitable as the entry point of a load-balancer or transaction processor pipeline: pair it with a Semaphore or RateLimiter downstream.

func NewQueue

func NewQueue(handler func(context.Context, any) error, opts ...QueueOption) *Queue

NewQueue creates a priority queue that dispatches items to handler concurrently. handler is called once per item with the queue's context; if it returns a non-nil error the item is counted as failed but the queue keeps running.

func (*Queue) Close

func (q *Queue) Close()

Close stops accepting new items and waits for all workers to drain and exit.

func (*Queue) Depth

func (q *Queue) Depth() int

Depth returns the total number of items currently in the queue across all priorities.

func (*Queue) DepthByPriority

func (q *Queue) DepthByPriority() [priorityCount]int

DepthByPriority returns the number of items waiting at each priority level.

func (*Queue) Enqueue

func (q *Queue) Enqueue(p Priority, item any) error

Enqueue adds an item at the given priority. It returns ErrQueueFull if the per-priority bin is at capacity, or ErrQueueClosed if the queue is shut down.

func (*Queue) EnqueueCtx

func (q *Queue) EnqueueCtx(ctx context.Context, p Priority, item any) error

EnqueueCtx adds an item at the given priority, blocking until space is available, the context is cancelled, or the queue is closed.

func (*Queue) Metrics

func (q *Queue) Metrics() *QueueMetrics

Metrics returns the queue's operational metrics.

type QueueMetrics

type QueueMetrics struct {
	Enqueued  atomic.Uint64
	Dequeued  atomic.Uint64
	Dropped   atomic.Uint64
	Timeouts  atomic.Uint64
	Depth     atomic.Int64
	MaxDepth  atomic.Int64
	Saturated atomic.Uint64 // times the queue was full at enqueue time
}

QueueMetrics tracks operational statistics for the priority queue.

type QueueOption

type QueueOption func(*Queue)

QueueOption configures a Queue.

func QueueWithCapacity

func QueueWithCapacity(capacity int) QueueOption

QueueWithCapacity sets the maximum number of items the queue can hold per priority level. Total capacity is capacity × priorityCount. Default is 256 per level.

func QueueWithTimeout

func QueueWithTimeout(d time.Duration) QueueOption

QueueWithTimeout sets the maximum time an item may wait in the queue before being dropped. Zero means no timeout (items wait until a worker is free).

func QueueWithWorkers

func QueueWithWorkers(n int) QueueOption

QueueWithWorkers sets the number of concurrent consumer goroutines (default 1).

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter bounds throughput with a prioritized token bucket. Allow / AllowN are lock-free. Blocking Acquire uses CoDel-style priority queues. Replenish manually returns tokens; it is distinct from time-based refill and is intended for reservation patterns where the caller holds a token between operations.

func NewRateLimiter

func NewRateLimiter(ratePerSec float64, burst int, opts ...RateLimiterOption) *RateLimiter

NewRateLimiter creates a token bucket rate limiter. ratePerSec is the sustained fill rate; burst is the maximum token capacity. Default maxWait (CoDel dropping threshold) is 500ms.

func (*RateLimiter) Acquire

func (r *RateLimiter) Acquire(ctx context.Context, p Priority) error

Acquire waits for a single token, respecting priority and context cancellation. Higher-priority callers are dequeued before lower-priority ones already waiting.

func (*RateLimiter) Allow

func (r *RateLimiter) Allow(p Priority) bool

Allow consumes one token if available. Non-blocking, lock-free.

func (*RateLimiter) AllowN

func (r *RateLimiter) AllowN(p Priority, n int64) bool

AllowN attempts to consume n tokens atomically. Non-blocking, lock-free.

func (*RateLimiter) Burst

func (r *RateLimiter) Burst() int64

Burst returns the maximum token capacity the rate limiter was created with.

func (*RateLimiter) Close

func (r *RateLimiter) Close()

Close shuts down the rate limiter, unblocking all pending waiters with ErrRateLimiterClosed.

func (*RateLimiter) Metrics

func (r *RateLimiter) Metrics() *RateLimiterMetrics

Metrics returns the rate limiter's operational metrics.

func (*RateLimiter) Release

func (r *RateLimiter) Release()

Release replenishes one token and wakes the highest-priority waiter if any are queued. Retained for compatibility with patterns that pair Acquire with Release. Prefer Replenish(1) for new code where the intent is explicit.

func (*RateLimiter) Replenish

func (r *RateLimiter) Replenish(n int64)

Replenish adds n tokens back to the bucket, up to the burst ceiling. Use this when a caller held a reservation and is returning it — for example, when a request is retried locally and the upstream slot does not need to be consumed. This is NOT the same as time-based refill.

func (*RateLimiter) Reserve

func (r *RateLimiter) Reserve(n int64, opts ...ReserveOption) *Reservation

Reserve attempts to reserve n tokens from the rate limiter without blocking. It returns a Reservation whose Delay() tells the caller how long to wait. The caller can Cancel() the reservation if the delay is unacceptable.

Unlike Acquire, Reserve never blocks. It is safe for concurrent use.

func (*RateLimiter) ReserveMetrics

func (r *RateLimiter) ReserveMetrics() *ReservationMetrics

ReserveMetrics returns the metrics for the reservation subsystem. Returns nil if Reserve has never been called.

func (*RateLimiter) Tokens

func (r *RateLimiter) Tokens() int64

Tokens returns the number of tokens currently available.

type RateLimiterMetrics

type RateLimiterMetrics struct {
	AllowedFast    atomic.Uint64
	AllowedSlow    atomic.Uint64
	Rejected       atomic.Uint64
	TokensConsumed atomic.Uint64
	TokensRefilled atomic.Uint64
	QueueDepth     atomic.Int64
	MaxQueueDepth  atomic.Int64
}

RateLimiterMetrics tracks operational statistics for the rate limiter.

type RateLimiterOption

type RateLimiterOption func(*RateLimiter)

RateLimiterOption configures a RateLimiter.

func RateLimiterWithMaxWait

func RateLimiterWithMaxWait(d time.Duration) RateLimiterOption

RateLimiterWithMaxWait sets the hard ceiling on waiter age during CoDel dropping mode. In normal mode waiters are not evicted by age; their own context handles timeout.

type Reaper

type Reaper struct {
	// contains filtered or unexported fields
}

func NewReaper

func NewReaper(ttl time.Duration, opts ...ReaperOption) *Reaper

NewReaper creates a TTL-based expiry manager with min-heap scheduling. The handler is called once per expired entry in a background goroutine.

func (*Reaper) Clear

func (r *Reaper) Clear() int

Clear removes all tracked entries and returns how many were removed.

func (*Reaper) Count

func (r *Reaper) Count() int

Count returns the number of entries currently tracked across all shards.

func (*Reaper) Deadline

func (r *Reaper) Deadline() (time.Time, bool)

Deadline returns the earliest expiry time across all shards, or false if empty.

func (*Reaper) Metrics

func (r *Reaper) Metrics() *ReaperMetrics

Metrics returns the reaper's operational statistics.

func (*Reaper) Register

func (r *Reaper) Register(h ReaperHandler)

Register sets the handler for expired tasks (backward compatible).

func (*Reaper) Remove

func (r *Reaper) Remove(id string) bool

Remove cancels the TTL for the given ID. Returns true if the entry existed.

func (*Reaper) Start

func (r *Reaper) Start()

Start is a no-op for backward compatibility. Reaper starts automatically now.

func (*Reaper) Stop

func (r *Reaper) Stop()

Stop shuts down all reaper goroutines and releases internal resources.

func (*Reaper) Touch

func (r *Reaper) Touch(id string)

Touch resets the TTL for the given ID, creating the entry if it does not exist.

func (*Reaper) TouchAt

func (r *Reaper) TouchAt(id string, deadline time.Time)

TouchAt registers or updates an entry with an explicit expiry deadline.

type ReaperHandler

type ReaperHandler func(context.Context, string)

type ReaperMetrics

type ReaperMetrics struct {
	TouchCalls      atomic.Uint64
	TouchAtCalls    atomic.Uint64
	RemoveCalls     atomic.Uint64
	ClearCalls      atomic.Uint64
	ExpiredTotal    atomic.Uint64
	ExpiredHandled  atomic.Uint64
	ExpiredMissed   atomic.Uint64
	ExpiredErrors   atomic.Uint64
	ActiveTasks     atomic.Int64
	MaxActiveTasks  atomic.Int64
	TotalTasksSeen  atomic.Uint64
	AvgExpirationMs atomic.Int64
	MinExpirationMs atomic.Int64
	MaxExpirationMs atomic.Int64
	LoopIterations  atomic.Uint64
	SignalsSent     atomic.Uint64
	StopsReceived   atomic.Uint64
}

type ReaperOption

type ReaperOption func(*Reaper)

func ReaperWithHandler

func ReaperWithHandler(h ReaperHandler) ReaperOption

ReaperWithHandler sets the callback invoked when an entry's TTL expires.

func ReaperWithLogger

func ReaperWithLogger(l *ll.Logger) ReaperOption

ReaperWithLogger sets a custom logger for the reaper.

func ReaperWithMinTick

func ReaperWithMinTick(tick time.Duration) ReaperOption

ReaperWithMinTick sets the minimum poll interval for the expiry loop.

func ReaperWithShards

func ReaperWithShards(count uint32) ReaperOption

ReaperWithShards controls the number of internal heap shards for reduced contention.

func ReaperWithTimerLimit

func ReaperWithTimerLimit(limit time.Duration) ReaperOption

ReaperWithTimerLimit caps the duration the reaper will sleep before checking again.

type ReaperTask

type ReaperTask struct {
	ID       string
	Deadline time.Time
	// contains filtered or unexported fields
}

type Reservation

type Reservation struct {
	// contains filtered or unexported fields
}

Reservation is a future token grant from a RateLimiter. The caller inspects Delay() and decides whether to wait or drop. Once committed, call Wait(ctx) to block for exactly the reservation's delay. A Reservation is single-use and not safe for concurrent access.

func (*Reservation) Cancel

func (r *Reservation) Cancel()

Cancel releases the reservation without consuming tokens. Safe to call multiple times.

func (*Reservation) Delay

func (r *Reservation) Delay() time.Duration

Delay returns how long the caller must wait before the reserved tokens are available. Zero means tokens are available immediately. A negative value means the reservation was cancelled.

func (*Reservation) OK

func (r *Reservation) OK() bool

OK reports whether the reservation is valid and not yet used or cancelled.

func (*Reservation) Wait

func (r *Reservation) Wait(ctx context.Context) error

Wait blocks until the reservation's delay elapses or the context is cancelled. Returns nil when the tokens are ready to use.

type ReservationMetrics

type ReservationMetrics struct {
	Reserved atomic.Uint64 // successful Reserve calls
	Used     atomic.Uint64 // reservations that called Wait successfully
	Dropped  atomic.Uint64 // reservations cancelled before use
	Expired  atomic.Uint64 // reservations that timed out during Wait
}

ReservationMetrics tracks reservation operational statistics.

type ReserveOption

type ReserveOption func(*reserveConfig)

ReserveOption configures reservation behaviour.

func ReserveWithMaxDelay

func ReserveWithMaxDelay(d time.Duration) ReserveOption

ReserveWithMaxDelay causes Reserve to return a cancelled Reservation if the computed delay exceeds d. Use to implement admission control at the call site without blocking any goroutine.

type Retry

type Retry struct {
	// contains filtered or unexported fields
}

Retry defines how retries are performed. Create one once and reuse it across many Do / DoCtx calls — it is safe for concurrent use.

func NewRetry

func NewRetry(opts ...RetryOption) *Retry

NewRetry constructs a Retry with sensible defaults.

func (*Retry) Do

func (p *Retry) Do(ctx context.Context, fn func(context.Context) error) error

Do executes fn according to the policy, retrying on retryable errors. The provided context governs the total deadline; each individual attempt also respects context cancellation. Returns the last error wrapped in ErrRetryExhausted if all attempts fail.

func (*Retry) Metrics

func (p *Retry) Metrics() *RetryMetrics

Metrics returns the policy's operational metrics.

type RetryMetrics

type RetryMetrics struct {
	Attempts  atomic.Uint64 // total individual calls (including first)
	Successes atomic.Uint64 // calls that ultimately succeeded
	Failures  atomic.Uint64 // calls that exhausted all retries
	Timeouts  atomic.Uint64 // attempts abandoned due to context deadline
}

RetryMetrics tracks retry operational statistics.

type RetryOption

type RetryOption func(*Retry)

RetryOption configures a Retry.

func RetryWithBaseDelay

func RetryWithBaseDelay(d time.Duration) RetryOption

RetryWithBaseDelay sets the initial delay before the first retry (default 100ms).

func RetryWithJitter

func RetryWithJitter(enabled bool) RetryOption

RetryWithJitter enables full jitter on the backoff delay (default true). Full jitter randomises each delay in [0, computed_delay] to spread load.

func RetryWithMaxAttempts

func RetryWithMaxAttempts(n int) RetryOption

RetryWithMaxAttempts sets the total number of attempts (initial + retries). Default is 3.

func RetryWithMaxDelay

func RetryWithMaxDelay(d time.Duration) RetryOption

RetryWithMaxDelay caps the backoff delay (default 30s).

func RetryWithMultiplier

func RetryWithMultiplier(m float64) RetryOption

RetryWithMultiplier sets the exponential backoff multiplier (default 2.0).

func RetryWithOnRetry

func RetryWithOnRetry(fn func(attempt int, err error)) RetryOption

RetryWithOnRetry registers a callback invoked before each retry with the attempt number (1-based) and the error that triggered the retry.

func RetryWithRetryIf

func RetryWithRetryIf(fn func(error) bool) RetryOption

RetryWithRetryIf replaces the default retry predicate. By default all non-nil errors are retried. Supply a custom function to skip retrying on permanent errors (e.g. 404, validation failures).

type Routine

type Routine struct {
	Interval time.Duration // Interval between task executions
	MaxRuns  int           // Maximum number of runs (0 for unlimited)
	Cron     string        // Cron expression (e.g., "0 0 * * *" for daily at midnight, "@every 1m")
}

Routine configures recurring task execution via an interval or cron expression, with an optional maximum number of runs. Thread-safe for use in scheduling configurations.

type RoutineInfo

type RoutineInfo struct {
	ID        string
	Label     string
	State     RoutineState
	StartedAt time.Time
	EndedAt   time.Time
	Err       error
	Stack     []byte // non-nil only when State == RoutinePanicked
}

RoutineInfo holds the observable state of a single tracked goroutine.

type RoutineOption

type RoutineOption func(*Routines)

RoutineOption configures a Routines tracker.

func RoutineWithIDGenerator

func RoutineWithIDGenerator(fn func(label string) string) RoutineOption

RoutineWithIDGenerator replaces the default sequential ID generator.

func RoutineWithMaxEntries

func RoutineWithMaxEntries(n int) RoutineOption

RoutineWithMaxEntries caps the number of retained RoutineInfo entries. When exceeded, completed entries are evicted to bound memory growth.

func RoutineWithOnDone

func RoutineWithOnDone(fn func(info RoutineInfo)) RoutineOption

RoutineWithOnDone sets a callback invoked when any goroutine finishes (regardless of success, error, or panic).

func RoutineWithOnPanic

func RoutineWithOnPanic(fn func(info RoutineInfo)) RoutineOption

RoutineWithOnPanic sets a callback invoked whenever a goroutine panics. The callback runs synchronously before the goroutine exits.

type RoutineState

type RoutineState int32

RoutineState describes the lifecycle state of a tracked goroutine.

const (
	RoutineRunning RoutineState = iota
	RoutineDone
	RoutinePanicked
	RoutineCancelled
)

func (RoutineState) String

func (s RoutineState) String() string

String returns a human-readable label for the goroutine state.

type Routines

type Routines struct {
	// contains filtered or unexported fields
}

Routines is a goroutine tracker and lifecycle manager. It is the answer to the "rogue goroutine" problem: every goroutine spawned through Routines is registered, tracked by state, and guaranteed to be joined by Stop or Wait.

Singleton usage:

var rt = jack.NewRoutines()

rt.Spawn("fetch", func(ctx context.Context) error { ... })
rt.SpawnCtx("heartbeat", func(ctx context.Context) error { ... })

rt.Stop()   // cancels context, waits for all goroutines

All methods are safe for concurrent use.

func NewRoutines

func NewRoutines(opts ...RoutineOption) *Routines

NewRoutines creates a Routines tracker. The returned tracker has its own cancellable context; call Stop() to terminate all goroutines it owns.

func (*Routines) Active

func (r *Routines) Active() int

Active returns how many goroutines are currently in RoutineRunning state.

func (*Routines) Background

func (r *Routines) Background(label string, maxRestarts int, fn func(context.Context) error) string

Background spawns a long-running goroutine that is restarted automatically if it returns a non-nil error, up to maxRestarts times (0 = unlimited). Each restart is counted in Metrics.Spawned.

func (*Routines) Cancel

func (r *Routines) Cancel(id string) bool

Cancel cancels a single goroutine by ID. Returns false if the ID is unknown or the goroutine has already exited.

func (*Routines) Forget

func (r *Routines) Forget(id string) bool

Forget removes a completed goroutine from tracking, reclaiming the memory held by its RoutineInfo. Returns false if the ID is unknown or the goroutine is still running.

func (*Routines) Info

func (r *Routines) Info(id string) (RoutineInfo, bool)

Info returns a snapshot of the named goroutine's current state. Returns false if no goroutine with that ID exists.

func (*Routines) List

func (r *Routines) List() []RoutineInfo

List returns a snapshot of all tracked goroutines, including completed ones.

func (*Routines) Metrics

func (r *Routines) Metrics() *RoutinesMetrics

Metrics returns aggregate goroutine statistics.

func (*Routines) Spawn

func (r *Routines) Spawn(label string, fn func(context.Context) error) string

Spawn spawns a goroutine under the tracker's context. The goroutine is cancelled when Stop() is called or the tracker's context is cancelled. label is used for identification; it need not be unique — a unique ID is derived automatically. Returns the assigned ID so the caller can query status via Info(id).

func (*Routines) SpawnCtx

func (r *Routines) SpawnCtx(ctx context.Context, label string, fn func(context.Context) error) string

SpawnCtx spawns a goroutine under a caller-supplied context. The goroutine is cancelled when either the supplied ctx OR the tracker's context is done — whichever fires first.

func (*Routines) Stop

func (r *Routines) Stop()

Stop cancels the tracker's context and waits for all goroutines to exit. After Stop returns, no goroutines tracked by this Routines are running.

func (*Routines) StopTimeout

func (r *Routines) StopTimeout(timeout time.Duration) error

StopTimeout cancels the tracker's context and waits up to the given duration for all goroutines to exit. Returns an error if the timeout is exceeded.

func (*Routines) Wait

func (r *Routines) Wait()

Wait blocks until all currently tracked goroutines have exited. Unlike Stop, it does not cancel the context.

type RoutinesMetrics

type RoutinesMetrics struct {
	Spawned   atomic.Uint64 // total goroutines ever started
	Active    atomic.Int64  // currently running
	Completed atomic.Uint64 // finished without error or panic
	Failed    atomic.Uint64 // returned a non-nil error
	Panicked  atomic.Uint64 // recovered from a panic
	Cancelled atomic.Uint64 // stopped via context cancellation
}

RoutinesMetrics tracks aggregate goroutine statistics.

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner executes tasks asynchronously from a buffered task queue, processing them in order on a single goroutine. It supports observability and logging. Thread-safe via mutex, wait group, and channel operations.
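
The underlying design — a buffered channel drained by one goroutine — can be sketched in a few lines of plain Go (illustrative only; jack's Runner adds observability and logging on top):

```go
package main

import (
	"fmt"
	"sync"
)

// miniRunner sketches Runner's shape: submissions land in a buffered
// channel and a single goroutine executes them in order.
type miniRunner struct {
	queue chan func()
	wg    sync.WaitGroup
}

func newMiniRunner(size int) *miniRunner {
	r := &miniRunner{queue: make(chan func(), size)}
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		for task := range r.queue { // drains until the queue is closed
			task()
		}
	}()
	return r
}

func (r *miniRunner) do(task func()) { r.queue <- task }
func (r *miniRunner) shutdown()      { close(r.queue); r.wg.Wait() }

func main() {
	r := newMiniRunner(10)
	sum := 0
	for i := 1; i <= 3; i++ {
		i := i
		r.do(func() { sum += i }) // safe: one goroutine runs all tasks
	}
	r.shutdown() // waits for in-flight tasks to finish
	fmt.Println(sum) // 6
}
```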

func NewRunner

func NewRunner(opts ...RunnerOption) *Runner

NewRunner creates a new Runner with the specified options. It initializes the task queue and starts the processing goroutine. Example:

runner := NewRunner(WithRunnerQueueSize(10), WithRunnerObservable(obs)) // Creates runner with queue size 10

func (*Runner) Do

func (r *Runner) Do(t Task) error

Do submits a Task for execution in the runner’s queue. It returns an error if the task is nil or the runner is closed. Thread-safe via submit and channel operations. Example:

runner.Do(myTask) // Submits a task

func (*Runner) DoCtx

func (r *Runner) DoCtx(ctx context.Context, t TaskCtx) error

DoCtx submits a TaskCtx for execution with the given context. It returns an error if the context or task is nil, the context is done, or the runner is closed. Thread-safe via submit and channel operations. Example:

runner.DoCtx(ctx, myTaskCtx) // Submits a context-aware task

func (*Runner) QueueSize

func (r *Runner) QueueSize() int

QueueSize returns the current number of tasks in the queue. Thread-safe via channel length access. Example:

size := runner.QueueSize() // Gets current queue size

func (*Runner) Shutdown

func (r *Runner) Shutdown(timeout time.Duration) error

Shutdown closes the task queue and waits for the processing goroutine to finish. It returns an error if the shutdown times out. Thread-safe via mutex, once, and wait group. Example:

runner.Shutdown(5 * time.Second) // Shuts down runner with 5-second timeout

type RunnerOption

type RunnerOption func(*runnerOptions)

RunnerOption configures a Runner during creation.

func WithRunnerIDGenerator

func WithRunnerIDGenerator(fn func(interface{}) string) RunnerOption

WithRunnerIDGenerator sets the task ID generator function. Example:

runner := NewRunner(WithRunnerIDGenerator(customIDFunc)) // Sets custom ID generator

func WithRunnerObservable

func WithRunnerObservable(obs Observable[Event]) RunnerOption

WithRunnerObservable sets the observable for task events. Example:

runner := NewRunner(WithRunnerObservable(obs)) // Configures observable

func WithRunnerQueueSize

func WithRunnerQueueSize(size int) RunnerOption

WithRunnerQueueSize sets the task queue size. Non-positive values are ignored. Example:

runner := NewRunner(WithRunnerQueueSize(20)) // Sets queue size to 20

type Safely

type Safely struct {
	sync.Mutex
}

Safely wraps sync.Mutex with safe execution methods.

func (*Safely) Do

func (m *Safely) Do(fn func())

Do executes fn while holding the mutex lock. The function is guaranteed to run with exclusive access, ensuring thread safety. If fn is nil, Do will panic. Example:

var mu Safely
mu.Do(func() {
	// Critical section
	fmt.Println("Safe execution")
})

func (*Safely) DoCtx

func (m *Safely) DoCtx(ctx context.Context, fn func() error) error

DoCtx executes fn while holding the mutex lock, with context support. The lock is acquired, then the context is checked. If the context is done, its error is returned and fn is not executed. Otherwise, fn is executed in a goroutine, allowing it to be interrupted if the context is canceled or times out (e.g., via context.WithTimeout). If fn is nil, DoCtx will return a *CaughtPanic. If ctx is nil, DoCtx will panic when ctx.Err() is called. This method does not recover from panics in fn; use SafeCtx for panic recovery. Example:

ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
err := mu.DoCtx(ctx, func() error {
	// Critical section, no need to check ctx
	time.Sleep(100 * time.Millisecond)
	return nil
})
// err will likely be context.DeadlineExceeded due to timeout

func (*Safely) Safe

func (m *Safely) Safe(fn func() error) error

Safe executes fn while holding the mutex lock, with panic recovery. If fn panics, a *CaughtPanic error is returned. Otherwise, the error returned by fn is returned. If fn is nil, Safe will return a *CaughtPanic. Example:

var mu Safely
err := mu.Safe(func() error {
	// Critical section
	return nil
})
if err != nil {
	fmt.Println("Error:", err)
}

func (*Safely) SafeCtx

func (m *Safely) SafeCtx(ctx context.Context, fn func() error) error

SafeCtx executes fn with panic recovery and context awareness while holding the mutex lock. The lock is acquired, then fn is executed via the standalone SafeCtx function. This ensures fn can be canceled by the context (e.g., via context.WithTimeout), and any panics are recovered. The lock is held until fn completes or the context is canceled. If fn is nil, SafeCtx will return a *CaughtPanic. If ctx is nil, SafeCtx will panic when ctx.Err() is called. Example:

ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
err := mu.SafeCtx(ctx, func() error {
	// Critical section, no need to check ctx
	time.Sleep(100 * time.Millisecond)
	return nil
})
// err will likely be context.DeadlineExceeded due to timeout

type Schedule

type Schedule struct {
	Type     string    // Type of event (e.g., "task_submitted", "task_submission_failed", "stopped")
	Name     string    // Name of the scheduler emitting the event
	TaskID   string    // Unique identifier for the task
	TaskType string    // Type of the task (e.g., struct name or type description)
	Message  string    // Descriptive message providing context for the event
	Error    error     // Any error associated with the event, if applicable
	Routine  Routine   // Configuration of the scheduling routine (e.g., interval, max runs)
	Time     time.Time // Timestamp when the event occurred
	NextRun  time.Time // Scheduled time for the next task execution, if applicable
}

Schedule represents an event emitted by the scheduler for observability.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler manages periodic or limited task submissions to a Pool. It supports both interval-based and cron expression-based scheduling.

func NewScheduler

func NewScheduler(name string, pool *Pool, schedule Routine, opts ...Cycle) (*Scheduler, error)

NewScheduler creates a new Scheduler instance.

func (*Scheduler) Do

func (s *Scheduler) Do(ts ...Task) error

Do starts scheduling for the provided non-context-aware tasks. It initializes the scheduler to execute the given tasks according to the configured routine (e.g., interval or max runs). The method ensures thread-safety by locking the scheduler state and checks if the scheduler is already running to prevent duplicate executions. Each task is executed in its own goroutine, and the method emits a "started" event for observability.

func (*Scheduler) DoCtx

func (s *Scheduler) DoCtx(taskExecCtx context.Context, ts ...TaskCtx) error

DoCtx starts scheduling for context-aware tasks. Similar to Do, but designed for tasks that accept a context for cancellation or deadlines. It ensures thread-safety, checks for existing runs, and associates a task execution context with the tasks. Each task runs in its own goroutine, and a "started" event is emitted for observability.

func (*Scheduler) Entries

func (s *Scheduler) Entries() []cron.Entry

Entries returns the current cron entries if using cron-based scheduling.

func (*Scheduler) Name

func (s *Scheduler) Name() string

Name returns the scheduler's identifying name.

func (*Scheduler) NextRun

func (s *Scheduler) NextRun() (time.Time, bool)

NextRun returns the next scheduled run time for the first task.

func (*Scheduler) Running

func (s *Scheduler) Running() bool

Running checks if the scheduler is currently active.

func (*Scheduler) Stop

func (s *Scheduler) Stop() error

Stop terminates the scheduler without shutting down the pool.

func (*Scheduler) Terminate

func (s *Scheduler) Terminate(cancelPool bool) error

Terminate gracefully stops all running scheduler loops; cancelPool controls whether the underlying Pool is shut down as well.

type Scheduling

type Scheduling struct {
	RetryCount   int           // Number of retry attempts for task submission on failure
	RetryBackoff time.Duration // Duration to wait between retry attempts for failed submissions
	// contains filtered or unexported fields
}

Scheduling holds configuration for retry behavior and observability.

type Semaphore

type Semaphore struct {
	// contains filtered or unexported fields
}

Semaphore bounds concurrent access with priority and CoDel queueing. The fast path (TryAcquire) is lock-free. Blocking acquire uses per-priority queues that switch from FIFO to LIFO under sustained overload.

func NewSemaphore

func NewSemaphore(capacity int, opts ...SemaphoreOption) *Semaphore

NewSemaphore creates a prioritized semaphore with the given capacity. Defaults: target sojourn 5ms, max sojourn 500ms, interval 100ms.

func (*Semaphore) Acquire

func (s *Semaphore) Acquire(ctx context.Context, p Priority) error

Acquire waits for a slot, respecting priority and context cancellation. Higher-priority callers are served before lower-priority ones already queued.

func (*Semaphore) Available

func (s *Semaphore) Available() int

Available returns the number of slots currently available for acquisition.

func (*Semaphore) Capacity

func (s *Semaphore) Capacity() int

Capacity returns the total slot capacity the semaphore was created with.

func (*Semaphore) Close

func (s *Semaphore) Close()

Close permanently shuts down the semaphore, unblocking all pending waiters with ErrSemaphoreClosed.

func (*Semaphore) Metrics

func (s *Semaphore) Metrics() *SemaphoreMetrics

Metrics returns the semaphore's operational metrics.

func (*Semaphore) Release

func (s *Semaphore) Release()

Release returns a slot and attempts to hand it to the highest-priority waiter. If the dequeued waiter was context-cancelled, Release retries until it finds a live one.

func (*Semaphore) TryAcquire

func (s *Semaphore) TryAcquire(p Priority) bool

TryAcquire attempts to take a slot without blocking. Returns false immediately if no slot is available or the semaphore is closed.

func (*Semaphore) TryAcquireN

func (s *Semaphore) TryAcquireN(p Priority, n int) bool

TryAcquireN attempts to take n slots atomically without blocking. Returns false immediately if fewer than n slots are available or the semaphore is closed.

type SemaphoreMetrics

type SemaphoreMetrics struct {
	AcquiredFast  atomic.Uint64
	AcquiredSlow  atomic.Uint64
	Released      atomic.Uint64
	Rejected      atomic.Uint64
	Timeouts      atomic.Uint64
	QueueDepth    atomic.Int64
	MaxQueueDepth atomic.Int64
}

SemaphoreMetrics tracks operational statistics for the semaphore. All fields are safe for concurrent reads without additional locking.

type SemaphoreOption

type SemaphoreOption func(*Semaphore)

SemaphoreOption configures a Semaphore.

func SemaphoreWithInterval

func SemaphoreWithInterval(d time.Duration) SemaphoreOption

SemaphoreWithInterval sets the CoDel interval for sustained overload detection. Dropping mode engages when sojourn stays above target for this long.

func SemaphoreWithMaxSojourn

func SemaphoreWithMaxSojourn(d time.Duration) SemaphoreOption

SemaphoreWithMaxSojourn sets the hard ceiling on waiter age. Exceeding this immediately engages dropping mode regardless of interval.

func SemaphoreWithTargetSojourn

func SemaphoreWithTargetSojourn(d time.Duration) SemaphoreOption

SemaphoreWithTargetSojourn sets the CoDel target sojourn time. When the oldest waiter exceeds this threshold, the queue enters dropping mode.

type Shutdown

type Shutdown struct {
	// contains filtered or unexported fields
}

Shutdown manages the graceful shutdown process.

func NewShutdown

func NewShutdown(opts ...ShutdownOption) *Shutdown

NewShutdown creates a configured Shutdown manager. Defaults: 30s timeout, sequential execution, SIGINT/SIGTERM/SIGQUIT.

func (*Shutdown) Done

func (sm *Shutdown) Done() <-chan struct{}

Done returns a channel closed when shutdown completes.

func (*Shutdown) GetStats

func (sm *Shutdown) GetStats() *ShutdownStats

GetStats returns a deep copy of the current shutdown statistics. Safe for concurrent reads; the copy protects against mutation during access. Used by the public APIs to return final results.

func (*Shutdown) IsShuttingDown

func (sm *Shutdown) IsShuttingDown() bool

IsShuttingDown reports whether shutdown has been initiated. Thread-safe via atomic boolean; useful for guarding late registrations. Returns true once the first shutdown trigger occurs.

func (*Shutdown) Register

func (sm *Shutdown) Register(fn any) error

Register adds a cleanup task. Supported types: func(), func() error, func(context.Context) error, io.Closer.

func (*Shutdown) RegisterCall

func (sm *Shutdown) RegisterCall(name string, fn Func) error

RegisterCall registers a simple function returning an error. Convenience wrapper around Register; accepts the jack.Func signature. The name is auto-generated when an empty string is provided.

func (*Shutdown) RegisterCloser

func (sm *Shutdown) RegisterCloser(name string, closer io.Closer) error

RegisterCloser registers an io.Closer. Convenience wrapper that converts Close() error into shutdown error. Name reflects the concrete type when left empty.

func (*Shutdown) RegisterFunc

func (sm *Shutdown) RegisterFunc(name string, fn func()) error

RegisterFunc registers a simple void function. Convenience wrapper around Register; name is auto-generated if empty. Useful for quick registration of fire-and-forget cleanup.

func (*Shutdown) RegisterWithContext

func (sm *Shutdown) RegisterWithContext(name string, fn FuncCtx) error

RegisterWithContext registers a fully context-aware callback. Allows explicit naming and direct use of jack.FuncCtx functions. Preferred for advanced cleanup needing cancellation/timeout awareness.

func (*Shutdown) TriggerShutdown

func (sm *Shutdown) TriggerShutdown() *ShutdownStats

TriggerShutdown manually initiates the shutdown process and returns final statistics.

func (*Shutdown) Wait

func (sm *Shutdown) Wait() *ShutdownStats

Wait blocks until a signal is received or TriggerShutdown is called, then runs all registered cleanup tasks and returns statistics.

func (*Shutdown) WaitChan

func (sm *Shutdown) WaitChan() <-chan *ShutdownStats

WaitChan returns a channel that receives stats once shutdown is complete. Non-blocking alternative to Wait(), ideal for async integration. Channel is closed after sending the single stats value.

type ShutdownError

type ShutdownError struct {
	Name      string
	Err       error
	Timestamp time.Time
}

ShutdownError provides structured error details for a failed cleanup task.

func (*ShutdownError) Error

func (e *ShutdownError) Error() string

Error implements the error interface, combining the task name and underlying cause.

func (*ShutdownError) Unwrap

func (e *ShutdownError) Unwrap() error

Unwrap returns the underlying error for errors.Is/As chain traversal.

type ShutdownOption

type ShutdownOption func(*Shutdown)

ShutdownOption configures the Shutdown.

func ShutdownConcurrent

func ShutdownConcurrent() ShutdownOption

ShutdownConcurrent enables concurrent execution of cleanup functions. By default, execution is sequential (LIFO).
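
Sequential LIFO means the most recently registered cleanup runs first, mirroring how defer unwinds. A sketch of that ordering (assumed semantics based on the description above):

```go
package main

import "fmt"

// runLIFO executes cleanup functions in reverse registration order,
// mirroring defer semantics.
func runLIFO(tasks []func()) {
	for i := len(tasks) - 1; i >= 0; i-- {
		tasks[i]()
	}
}

func main() {
	var order []string
	tasks := []func(){
		func() { order = append(order, "db") },     // registered first...
		func() { order = append(order, "server") }, // ...registered last
	}
	runLIFO(tasks)
	fmt.Println(order) // [server db]
}
```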

func ShutdownWithForceQuit

func ShutdownWithForceQuit(d time.Duration) ShutdownOption

ShutdownWithForceQuit enables a force-quit trigger after a specific timeout, cancelling the cleanup context if shutdown takes too long.

func ShutdownWithLogger

func ShutdownWithLogger(l *ll.Logger) ShutdownOption

ShutdownWithLogger sets a custom logger for the manager.

func ShutdownWithSignals

func ShutdownWithSignals(signals ...os.Signal) ShutdownOption

ShutdownWithSignals specifies which OS signals to capture. If not set, defaults to SIGINT, SIGTERM, and SIGQUIT.

func ShutdownWithTimeout

func ShutdownWithTimeout(d time.Duration) ShutdownOption

ShutdownWithTimeout sets the maximum time to wait for shutdown completion.

type ShutdownStats

type ShutdownStats struct {
	TotalEvents     int
	CompletedEvents int
	FailedEvents    int
	StartTime       time.Time
	EndTime         time.Time
	Errors          []error
}

ShutdownStats contains metrics about the shutdown execution.

type Task

type Task interface {
	// Do executes the task and returns any error.
	Do() error
}

Task represents a simple task that can be executed without a context.

func Do

func Do(fn func()) Task

Do wraps a function with no return value into a Task. Useful for fire-and-forget or simple operations that don’t produce errors.

Example:

pool.Submit(jack.Do(func() {
	fmt.Println("Hello from task")
}))

type TaskCtx

type TaskCtx interface {
	// Do executes the task with the given context and returns any error.
	Do(ctx context.Context) error
}

TaskCtx represents a context-aware task that can be executed with a context.

func DoCtx

func DoCtx(fn func(ctx context.Context)) TaskCtx

DoCtx wraps a context-aware function with no return value into a TaskCtx. Useful when you only need context propagation without returning an error.

Example:

pool.SubmitCtx(ctx, jack.DoCtx(func(ctx context.Context) {
	fmt.Println("Running with context:", ctx)
}))

type Throttle

type Throttle struct {
	// contains filtered or unexported fields
}

Throttle is a client-side self-tuning throttle. It observes upstream acceptance and rejection rates and probabilistically rejects local requests before sending when the upstream is overloaded. Lower-priority traffic is throttled more aggressively than critical traffic.

func NewThrottle

func NewThrottle(priorities int, opts ...ThrottleOption) *Throttle

NewThrottle creates a throttle with the given number of priorities. The standard jack priority count (4) is recommended.

func (*Throttle) Accepted

func (a *Throttle) Accepted(p Priority)

Accepted records a successful upstream response for the given priority tier, decreasing throttle pressure for that tier.

func (*Throttle) Allow

func (a *Throttle) Allow(p Priority) bool

Allow returns true if the request should proceed. Uses a lock-free LCG random check against the per-priority rejection probability.
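
Stripped of the lock-free bookkeeping, the decision is a comparison of a pseudo-random draw against the tier's rejection probability. A sketch of the idea (the LCG constants here are illustrative, from Numerical Recipes, not jack's):

```go
package main

import "fmt"

// lcg is a minimal linear congruential generator; fast paths like Allow
// favour such generators because a single multiply-add suffices.
type lcg struct{ state uint64 }

func (g *lcg) next() uint64 {
	g.state = g.state*6364136223846793005 + 1442695040888963407
	return g.state
}

// allow rejects with probability scaledProb (0..10000, matching the
// ThrottleProbs scaling documented on ThrottleMetrics).
func allow(g *lcg, scaledProb int64) bool {
	draw := int64(g.next() % 10000)
	return draw >= scaledProb
}

func main() {
	g := &lcg{state: 42}
	rejected := 0
	const trials = 10000
	for i := 0; i < trials; i++ {
		if !allow(g, 2500) { // 25% rejection probability
			rejected++
		}
	}
	fmt.Printf("rejected %.0f%% of %d trials\n", 100*float64(rejected)/trials, trials)
}
```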

func (*Throttle) Close

func (a *Throttle) Close()

Close shuts down the throttle; all subsequent Allow calls return false.

func (*Throttle) Metrics

func (a *Throttle) Metrics() *ThrottleMetrics

Metrics returns the throttle's operational metrics.

func (*Throttle) Probability

func (a *Throttle) Probability(p Priority) float64

Probability returns the current rejection probability for the given priority as a value in [0.0, 1.0]. This is the live per-priority value, not a single aggregate.

func (*Throttle) Rejected

func (a *Throttle) Rejected(p Priority)

Rejected records an upstream rejection or timeout for the given priority tier, increasing throttle pressure for that tier.

type ThrottleMetrics

type ThrottleMetrics struct {
	RequestsTotal  atomic.Uint64
	Accepted       atomic.Uint64
	RejectedLocal  atomic.Uint64
	RejectedRemote atomic.Uint64
	// ThrottleProbs holds the current rejection probability for each priority tier,
	// scaled by throttleProbScale (0 = no throttling, 10000 = always throttled).
	// Index matches Priority: 0=Critical, 1=High, 2=Medium, 3=Low.
	ThrottleProbs [priorityCount]atomic.Int64
}

ThrottleMetrics tracks throttle behaviour across all priority tiers.

type ThrottleOption

type ThrottleOption func(*Throttle)

ThrottleOption configures a Throttle.

func ThrottleWithRatio

func ThrottleWithRatio(ratio float64) ThrottleOption

ThrottleWithRatio sets the overcommit ratio (default 2.0). A ratio of 2.0 targets a 50% upstream acceptance rate before local shedding begins.
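
The 50% figure follows from the classic client-side adaptive throttling formula (popularized by the Google SRE book), where rejection probability grows as requests outpace ratio × accepts. A sketch under the assumption that jack follows the same rule:

```go
package main

import "fmt"

// rejectionProbability implements the standard adaptive-throttling rule:
// p = max(0, (requests - ratio*accepts) / (requests + 1)).
// With ratio = 2.0, p stays at 0 while at least half of requests are
// accepted upstream — which is where the "50% acceptance" target comes from.
func rejectionProbability(requests, accepts, ratio float64) float64 {
	p := (requests - ratio*accepts) / (requests + 1)
	if p < 0 {
		return 0
	}
	return p
}

func main() {
	// Healthy upstream: 100 requests, 90 accepted -> no local shedding.
	fmt.Println(rejectionProbability(100, 90, 2.0)) // 0

	// Overloaded upstream: 100 requests, only 20 accepted.
	fmt.Printf("%.2f\n", rejectionProbability(100, 20, 2.0)) // 0.59
}
```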

func ThrottleWithWindow

func ThrottleWithWindow(d time.Duration) ThrottleOption

ThrottleWithWindow sets the observation window for accept/reject counters (default 1 minute). Counters reset after windowResetSamples samples to adapt to changing upstream load.

func ThrottleWithWindowResetSamples

func ThrottleWithWindowResetSamples(n uint64) ThrottleOption

ThrottleWithWindowResetSamples sets how many samples to collect before resetting the observation window (default 1000). Smaller values adapt faster; larger values are smoother.

type Vitals

type Vitals struct {
	Start     HookCtx
	End       CallbackCtx
	Timed     CallbackCtx
	TimedWait time.Duration
}

func NewVitals

func NewVitals(opts ...VitalsOption) *Vitals

NewVitals creates a new Vitals instance configured with the provided functional options. Each option modifies the Vitals configuration before returning the initialized instance. If no options are provided, returns a Vitals with all fields set to their zero values.

func (*Vitals) Execute

func (l *Vitals) Execute(ctx context.Context, id string, operation Func) error

Execute runs an operation with optional start and end hooks while propagating errors. It invokes the Start hook first, then the operation, and finally the End callback on success. If Start or the operation returns an error, execution stops and the error is returned immediately.

func (*Vitals) ExecuteCtx

func (l *Vitals) ExecuteCtx(ctx context.Context, id string, operation FuncCtx) error

ExecuteCtx runs a context-aware operation with optional start and end hooks. It invokes the Start hook first, then the operation with context, and finally the End callback. Errors from Start or the operation are propagated immediately, skipping subsequent steps.

type VitalsOption

type VitalsOption func(*Vitals)

func VitalsWithEnd

func VitalsWithEnd(callback CallbackCtx) VitalsOption

VitalsWithEnd configures a callback function to run after successful operation completion. The callback receives the context and operation ID but does not return an error. Use this option for post-operation cleanup, notifications, or result processing.

func VitalsWithStart

func VitalsWithStart(hook HookCtx) VitalsOption

VitalsWithStart configures a hook function to run at the beginning of a vital operation. The hook receives the context and operation ID, and can return an error to abort execution. Use this option to inject pre-operation logic like logging, metrics, or validation.

func VitalsWithTimed

func VitalsWithTimed(callback CallbackCtx, wait time.Duration) VitalsOption

VitalsWithTimed configures a timed callback with a wait duration for delayed execution. The callback receives context and ID after the specified wait period elapses. Use this option for deferred operations like timeout handling or scheduled cleanup.
