Documentation
¶
Index ¶
- Constants
- Variables
- func Background(label string, maxRestarts int, fn func(context.Context) error) string
- func Go(f func() error) <-chan error
- func Logger() *ll.Logger
- func Safe(fn func() error) error
- func SafeCtx(ctx context.Context, fn func() error) error
- func SafeCtxNoStack(ctx context.Context, fn func() error) error
- func SafeNoStack(fn func() error) error
- func Select[T any](ctx context.Context, futures ...*Future[T]) (int, T, error)
- func Spawn(label string, fn func(context.Context) error)
- func VitalsWithRun(ctx context.Context, id string, operation Func, opts ...VitalsOption) error
- func VitalsWithRunCtx(ctx context.Context, id string, operation FuncCtx, opts ...VitalsOption) error
- func WaitAll[T any](futures ...*Future[T]) ([]T, error)
- func WaitAny[T any](futures ...*Future[T]) (int, T, error)
- type AdaptiveLimiter
- type AdaptiveLimiterMetrics
- type AdaptiveOption
- type Breaker
- type BreakerMetrics
- type BreakerOption
- func BreakerWithHalfOpenLimit(n int64) BreakerOption
- func BreakerWithOnStateChange(fn func(name string, from, to BreakerState)) BreakerOption
- func BreakerWithOpenTimeout(d time.Duration) BreakerOption
- func BreakerWithSuccessThreshold(n uint64) BreakerOption
- func BreakerWithThreshold(n uint64) BreakerOption
- type BreakerState
- type Bulkhead
- func (b *Bulkhead) AddPartition(name string, capacity int)
- func (b *Bulkhead) Available(partition string) int
- func (b *Bulkhead) Call(ctx context.Context, partition string, p Priority, ...) error
- func (b *Bulkhead) Close()
- func (b *Bulkhead) Metrics(partition string) *SemaphoreMetrics
- func (b *Bulkhead) Partitions() []string
- func (b *Bulkhead) TryCall(ctx context.Context, partition string, p Priority, ...) error
- type BulkheadMetrics
- type BulkheadOption
- type Callback
- type CallbackCtx
- type Caller
- type CaughtPanic
- type Cycle
- type Debouncer
- type DebouncerOption
- type Doctor
- func (d *Doctor) Add(p *Patient) error
- func (d *Doctor) GetState(id string) (PatientState, bool)
- func (d *Doctor) Metrics() *DoctorMetrics
- func (d *Doctor) Remove(id string) bool
- func (d *Doctor) SetDegraded(id string, degraded bool)
- func (d *Doctor) Stop(id string) bool
- func (d *Doctor) StopAll(timeout time.Duration)
- type DoctorMetrics
- type DoctorOption
- func DoctorWithGlobalTimeout(t time.Duration) DoctorOption
- func DoctorWithLogger(l *ll.Logger) DoctorOption
- func DoctorWithMaxConcurrent(n int) DoctorOption
- func DoctorWithObservable(obs Observable[PatientEvent]) DoctorOption
- func DoctorWithPool(pool *Pool) DoctorOption
- func DoctorWithVerbose(v bool) DoctorOption
- type Event
- type Func
- type FuncCtx
- type Future
- func (f *Future[T]) Await() (T, error)
- func (f *Future[T]) AwaitWithContext(ctx context.Context) (T, error)
- func (f *Future[T]) AwaitWithTimeout(timeout time.Duration) (T, error)
- func (f *Future[T]) Cancel() bool
- func (f *Future[T]) IsCanceled() bool
- func (f *Future[T]) IsDone() bool
- func (f *Future[T]) Recover(ctx context.Context, fn func(error) (T, error)) *Future[T]
- func (f *Future[T]) Then(ctx context.Context, fn func(T) (any, error)) *Future[any]
- type Group
- type HedgeGroup
- type HedgeMetrics
- type HedgeOption
- type Hedger
- type HedgerOf
- type Hook
- type HookCtx
- type Identifiable
- type Lease
- type LeaseMetrics
- type Leaser
- type LeaserOption
- type Lifetime
- func (lm *Lifetime) CancelTimed(id string) bool
- func (lm *Lifetime) ExecuteCtxWithLifetime(ctx context.Context, id string, lifetime *Vitals, operation FuncCtx) error
- func (lm *Lifetime) ExecuteWithLifetime(ctx context.Context, id string, lifetime *Vitals, operation Func) error
- func (lm *Lifetime) GetRemainingDuration(id string) (time.Duration, bool)
- func (lm *Lifetime) HasPending(id string) bool
- func (lm *Lifetime) Metrics() *LifetimeMetrics
- func (lm *Lifetime) PendingCount() int
- func (lm *Lifetime) ResetTimed(id string) bool
- func (lm *Lifetime) ScheduleLifetimeTimed(ctx context.Context, id string, lifetime *Vitals)
- func (lm *Lifetime) ScheduleTimed(ctx context.Context, id string, callback CallbackCtx, wait time.Duration)
- func (lm *Lifetime) Stop(ids ...string)
- func (lm *Lifetime) StopAll()
- type LifetimeMetrics
- type LifetimeOption
- type LoopConfig
- type Looper
- type LooperMetrics
- type LooperOption
- func WithLooperBackoff(enabled bool) LooperOption
- func WithLooperContext(ctx context.Context) LooperOption
- func WithLooperImmediate(immediate bool) LooperOption
- func WithLooperInterval(interval time.Duration) LooperOption
- func WithLooperJitter(jitter float64) LooperOption
- func WithLooperLogger(logger *ll.Logger) LooperOption
- func WithLooperMaxInterval(max time.Duration) LooperOption
- func WithLooperMinInterval(min time.Duration) LooperOption
- func WithLooperName(name string) LooperOption
- type Observable
- type Observer
- type Patient
- type PatientConfig
- type PatientEvent
- type PatientMetrics
- type PatientState
- type Pool
- func (p *Pool) Do(fn func())
- func (p *Pool) DoCtx(ctx context.Context, fn func(ctx context.Context))
- func (p *Pool) IsClosed() bool
- func (p *Pool) Logger(extLogger *ll.Logger) *Pool
- func (p *Pool) Metrics() *PoolMetrics
- func (p *Pool) QueueSize() int
- func (p *Pool) Shutdown(timeout time.Duration) error
- func (p *Pool) Submit(ts ...Task) error
- func (p *Pool) SubmitCtx(ctx context.Context, ts ...TaskCtx) error
- func (p *Pool) Workers() int
- type PoolMetrics
- type Pooling
- type Priority
- type Queue
- type QueueMetrics
- type QueueOption
- type RateLimiter
- func (r *RateLimiter) Acquire(ctx context.Context, p Priority) error
- func (r *RateLimiter) Allow(p Priority) bool
- func (r *RateLimiter) AllowN(p Priority, n int64) bool
- func (r *RateLimiter) Burst() int64
- func (r *RateLimiter) Close()
- func (r *RateLimiter) Metrics() *RateLimiterMetrics
- func (r *RateLimiter) Release()
- func (r *RateLimiter) Replenish(n int64)
- func (r *RateLimiter) Reserve(n int64, opts ...ReserveOption) *Reservation
- func (r *RateLimiter) ReserveMetrics() *ReservationMetrics
- func (r *RateLimiter) Tokens() int64
- type RateLimiterMetrics
- type RateLimiterOption
- type Reaper
- func (r *Reaper) Clear() int
- func (r *Reaper) Count() int
- func (r *Reaper) Deadline() (time.Time, bool)
- func (r *Reaper) Metrics() *ReaperMetrics
- func (r *Reaper) Register(h ReaperHandler)
- func (r *Reaper) Remove(id string) bool
- func (r *Reaper) Start()
- func (r *Reaper) Stop()
- func (r *Reaper) Touch(id string)
- func (r *Reaper) TouchAt(id string, deadline time.Time)
- type ReaperHandler
- type ReaperMetrics
- type ReaperOption
- type ReaperTask
- type Reservation
- type ReservationMetrics
- type ReserveOption
- type Retry
- type RetryMetrics
- type RetryOption
- func RetryWithBaseDelay(d time.Duration) RetryOption
- func RetryWithJitter(enabled bool) RetryOption
- func RetryWithMaxAttempts(n int) RetryOption
- func RetryWithMaxDelay(d time.Duration) RetryOption
- func RetryWithMultiplier(m float64) RetryOption
- func RetryWithOnRetry(fn func(attempt int, err error)) RetryOption
- func RetryWithRetryIf(fn func(error) bool) RetryOption
- type Routine
- type RoutineInfo
- type RoutineOption
- type RoutineState
- type Routines
- func (r *Routines) Active() int
- func (r *Routines) Background(label string, maxRestarts int, fn func(context.Context) error) string
- func (r *Routines) Cancel(id string) bool
- func (r *Routines) Forget(id string) bool
- func (r *Routines) Info(id string) (RoutineInfo, bool)
- func (r *Routines) List() []RoutineInfo
- func (r *Routines) Metrics() *RoutinesMetrics
- func (r *Routines) Spawn(label string, fn func(context.Context) error) string
- func (r *Routines) SpawnCtx(ctx context.Context, label string, fn func(context.Context) error) string
- func (r *Routines) Stop()
- func (r *Routines) StopTimeout(timeout time.Duration) error
- func (r *Routines) Wait()
- type RoutinesMetrics
- type Runner
- type RunnerOption
- type Safely
- type Schedule
- type Scheduler
- func (s *Scheduler) Do(ts ...Task) error
- func (s *Scheduler) DoCtx(taskExecCtx context.Context, ts ...TaskCtx) error
- func (s *Scheduler) Entries() []cron.Entry
- func (s *Scheduler) Name() string
- func (s *Scheduler) NextRun() (time.Time, bool)
- func (s *Scheduler) Running() bool
- func (s *Scheduler) Stop() error
- func (s *Scheduler) Terminate(cancelPool bool) error
- type Scheduling
- type Semaphore
- func (s *Semaphore) Acquire(ctx context.Context, p Priority) error
- func (s *Semaphore) Available() int
- func (s *Semaphore) Capacity() int
- func (s *Semaphore) Close()
- func (s *Semaphore) Metrics() *SemaphoreMetrics
- func (s *Semaphore) Release()
- func (s *Semaphore) TryAcquire(p Priority) bool
- func (s *Semaphore) TryAcquireN(p Priority, n int) bool
- type SemaphoreMetrics
- type SemaphoreOption
- type Shutdown
- func (sm *Shutdown) Done() <-chan struct{}
- func (sm *Shutdown) GetStats() *ShutdownStats
- func (sm *Shutdown) IsShuttingDown() bool
- func (sm *Shutdown) Register(fn any) error
- func (sm *Shutdown) RegisterCall(name string, fn Func) error
- func (sm *Shutdown) RegisterCloser(name string, closer io.Closer) error
- func (sm *Shutdown) RegisterFunc(name string, fn func()) error
- func (sm *Shutdown) RegisterWithContext(name string, fn FuncCtx) error
- func (sm *Shutdown) TriggerShutdown() *ShutdownStats
- func (sm *Shutdown) Wait() *ShutdownStats
- func (sm *Shutdown) WaitChan() <-chan *ShutdownStats
- type ShutdownError
- type ShutdownOption
- type ShutdownStats
- type Task
- type TaskCtx
- type Throttle
- type ThrottleMetrics
- type ThrottleOption
- type Vitals
- type VitalsOption
Constants ¶
const ( // NoLimitCalls is the default for maxCalls, meaning no limit on the number of calls before execution. // Set to math.MaxInt to indicate unbounded call accumulation. NoLimitCalls = math.MaxInt // NoLimitWait is the default for maxWait, meaning no time limit before forced execution. // Set to time.Duration(math.MaxInt64) to disable the maximum wait enforcement. NoLimitWait = time.Duration(math.MaxInt64) )
Variables ¶
var ( ErrFutureCanceled = fmt.Errorf("future was canceled") ErrFutureTimeout = fmt.Errorf("future timed out") )
var ( ErrSemaphoreClosed = errors.New("semaphore closed") ErrRateLimiterClosed = errors.New("rate limiter closed") ErrThrottleClosed = errors.New("adaptive throttle closed") ErrQueueClosed = errors.New("priority queue closed") )
var ( ErrBreakerOpen = errors.New("circuit breaker open") ErrBreakerClosed = errors.New("circuit breaker instance closed") )
var ( ErrBulkheadFull = errors.New("bulkhead full") ErrBulkheadClosed = errors.New("bulkhead closed") ErrBulkheadNotFound = errors.New("bulkhead partition not found") )
var ( ErrLeaseExpired = errors.New("lease expired") ErrLeaseReleased = errors.New("lease already released") ErrLeaseInvalid = errors.New("lease semaphore closed") )
var ( ErrReservationExpired = errors.New("reservation expired") ErrReservationCancelled = errors.New("reservation cancelled") )
var ( // ErrPoolClosed indicates the worker pool has been closed. ErrPoolClosed = errors.New("pool has been closed") // ErrRunnerClosed indicates the runner has been closed. ErrRunnerClosed = errors.New("runner has been closed") // ErrSchedulerClosed indicates the scheduler has been closed. ErrSchedulerClosed = errors.New("scheduler has been closed") // ErrTaskTimeout indicates a task exceeded its execution timeout. ErrTaskTimeout = errors.New("task execution timed out") // ErrTaskPanic indicates a task panicked during execution. ErrTaskPanic = errors.New("task panicked during execution") // ErrQueueFull indicates the task queue is full and cannot accept new tasks. ErrQueueFull = errors.New("task queue is full") // ErrShutdownTimedOut indicates the pool shutdown exceeded its timeout. ErrShutdownTimedOut = errors.New("shutdown timed out") )
Errors returned by the worker pool and task execution.
var ( ErrSchedulerJobAlreadyRunning = errors.New("scheduler: job is already running") ErrSchedulerNotRunning = errors.New("scheduler: job is not running or already stopped") ErrSchedulerPoolNil = errors.New("scheduler: pool cannot be nil") ErrSchedulerNameMissing = errors.New("scheduler: name cannot be empty") )
var ErrHedgeAllFailed = errors.New("all hedged requests failed")
var (
ErrRetryExhausted = errors.New("retry attempts exhausted")
)
Functions ¶
func Background ¶
func Go ¶
Go runs a function in a standalone goroutine with error and panic handling. It returns a buffered channel that receives the function’s error or a CaughtPanic. The channel is closed after the error or completion. Thread-safe via goroutine and channel operations. Example:
errCh := Go(func() error { return nil }) // Runs a function and returns error channel
func Logger ¶
Logger returns the default logger for the jack package. Thread-safe as it returns a pre-initialized logger. Example:
log := jack.Logger() // Retrieves the default logger
func Safe ¶
Safe executes fn with panic recovery. If fn panics, a *CaughtPanic error is returned. Otherwise, the error returned by fn is returned. If fn is nil, Safe will return a *CaughtPanic. Example:
err := Safe(func() error {
// May panic or return an error
return nil
})
if cp, ok := err.(*CaughtPanic); ok {
fmt.Printf("Caught panic: %v\nStack: %s\n", cp.Value, cp.Stack)
}
func SafeCtx ¶
SafeCtx executes fn with context support and panic recovery. The function fn will be interrupted if the context is canceled (e.g., via context.WithTimeout). If fn is nil, SafeCtx will return a *CaughtPanic. If ctx is nil, SafeCtx will panic when ctx.Err() is called. Example:
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
err := SafeCtx(ctx, func() error {
// No need to check ctx
time.Sleep(100 * time.Millisecond)
return nil
})
// err will likely be context.DeadlineExceeded due to timeout
func SafeCtxNoStack ¶
SafeCtxNoStack executes fn with context support, panic recovery, but no stack trace. The function fn will be interrupted if the context is canceled (e.g., via context.WithTimeout). If fn is nil, SafeCtxNoStack will return a *CaughtPanic (without stack). If ctx is nil, SafeCtxNoStack will panic when ctx.Err() is called. Example:
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
err := SafeCtxNoStack(ctx, func() error {
// No need to check ctx
time.Sleep(100 * time.Millisecond)
return nil
})
// err will likely be context.DeadlineExceeded due to timeout
func SafeNoStack ¶
SafeNoStack executes fn with panic recovery but no stack trace. If fn panics, a *CaughtPanic error (without stack) is returned. Otherwise, fn's error. If fn is nil, SafeNoStack will return a *CaughtPanic. Example:
err := SafeNoStack(func() error {
	panic("error")
})
if cp, ok := err.(*CaughtPanic); ok {
fmt.Printf("Caught panic: %v, Stack: %v\n", cp.Value, cp.Stack == nil)
}
func Select ¶
Select blocks until ctx is canceled or the first Future completes. Returns the index of the winning Future along with its value and error.
func VitalsWithRun ¶
VitalsWithRun creates a Vitals instance and executes an operation with it in one call. It accepts functional options to configure hooks before immediately running the operation. This convenience function combines NewVitals and Execute for simplified inline usage.
func VitalsWithRunCtx ¶
func VitalsWithRunCtx(ctx context.Context, id string, operation FuncCtx, opts ...VitalsOption) error
VitalsWithRunCtx creates a Vitals instance and executes a context-aware operation in one call. It accepts functional options to configure hooks before immediately running the operation with context. This convenience function combines NewVitals and ExecuteCtx for simplified inline usage.
Types ¶
type AdaptiveLimiter ¶
type AdaptiveLimiter struct {
// contains filtered or unexported fields
}
AdaptiveLimiter adjusts its concurrency limit dynamically based on observed round-trip latency using a gradient-style AIMD controller.
When RTT ≤ target → additive increase (limit++). When RTT > target → multiplicative decrease (limit × ratio).
Composable with the rest of the library: the AdaptiveLimiter owns a Semaphore internally and exposes the same Call / Close interface as Breaker and Bulkhead.
func NewAdaptiveLimiter ¶
func NewAdaptiveLimiter(opts ...AdaptiveOption) *AdaptiveLimiter
NewAdaptiveLimiter creates an AdaptiveLimiter with sensible defaults.
func (*AdaptiveLimiter) Call ¶
func (a *AdaptiveLimiter) Call(ctx context.Context, p Priority, fn func(context.Context) error) error
Call executes fn if a slot is available within the adaptive limit. It measures RTT and adjusts the limit after each call returns.
func (*AdaptiveLimiter) Close ¶
func (a *AdaptiveLimiter) Close()
Close shuts down the underlying semaphore, unblocking all pending callers.
func (*AdaptiveLimiter) InFlight ¶
func (a *AdaptiveLimiter) InFlight() int
InFlight returns the number of concurrently executing calls.
func (*AdaptiveLimiter) Limit ¶
func (a *AdaptiveLimiter) Limit() int
Limit returns the current concurrency limit.
func (*AdaptiveLimiter) Metrics ¶
func (a *AdaptiveLimiter) Metrics() *AdaptiveLimiterMetrics
Metrics returns the limiter's operational metrics.
type AdaptiveLimiterMetrics ¶
type AdaptiveLimiterMetrics struct {
Acquired atomic.Uint64 // successful acquisitions
Rejected atomic.Uint64 // rejections when at limit
LimitIncr atomic.Uint64 // times the limit was increased
LimitDecr atomic.Uint64 // times the limit was decreased
CurrentLimit atomic.Int64 // live concurrency limit
CurrentInFlight atomic.Int64 // goroutines currently executing
AvgRTTNs atomic.Int64 // EWMA RTT in nanoseconds
}
AdaptiveLimiterMetrics tracks adaptive limiter operational statistics.
type AdaptiveOption ¶
type AdaptiveOption func(*AdaptiveLimiter)
AdaptiveOption configures an AdaptiveLimiter.
func AdaptiveWithInitialLimit ¶
func AdaptiveWithInitialLimit(n int) AdaptiveOption
AdaptiveWithInitialLimit sets the starting concurrency limit (default 10).
func AdaptiveWithMaxLimit ¶
func AdaptiveWithMaxLimit(n int) AdaptiveOption
AdaptiveWithMaxLimit sets the ceiling above which the limit will not grow (default 200).
func AdaptiveWithMinLimit ¶
func AdaptiveWithMinLimit(n int) AdaptiveOption
AdaptiveWithMinLimit sets the floor below which the limit will not drop (default 1).
func AdaptiveWithSmoothingFactor ¶
func AdaptiveWithSmoothingFactor(f float64) AdaptiveOption
AdaptiveWithSmoothingFactor controls EWMA smoothing for RTT samples (default 0.1). Closer to 0 = slower adaptation; closer to 1 = reacts to every sample.
func AdaptiveWithTargetP50 ¶
func AdaptiveWithTargetP50(d time.Duration) AdaptiveOption
AdaptiveWithTargetP50 sets the target median RTT the controller aims to maintain. When measured RTT exceeds this, the limit is reduced (default 100ms).
type Breaker ¶
type Breaker struct {
// contains filtered or unexported fields
}
Breaker implements the circuit-breaker pattern.
States:
Closed → normal; failures accumulate until threshold is reached.
Open → tripped; all calls rejected for openTimeout, then → HalfOpen.
HalfOpen → probe; up to halfOpenLimit calls pass through. Enough successes
→ Closed; any failure → Open.
Call is the only entry point. It wraps the user function and manages all state transitions atomically without holding a mutex during execution.
func NewBreaker ¶
func NewBreaker(name string, opts ...BreakerOption) *Breaker
NewBreaker creates a Breaker with the given name and options.
func (*Breaker) Call ¶
Call executes fn if the circuit allows it, updating state based on the result. Returns ErrBreakerOpen if the circuit is open. The context is passed through to fn; if fn returns context.DeadlineExceeded or context.Canceled the call counts as a timeout (also a failure).
func (*Breaker) Metrics ¶
func (b *Breaker) Metrics() *BreakerMetrics
Metrics returns the breaker's operational metrics.
func (*Breaker) Reset ¶
func (b *Breaker) Reset()
Reset forces the circuit back to Closed and clears all counters. Use for testing or manual operator intervention.
func (*Breaker) State ¶
func (b *Breaker) State() BreakerState
State returns the current circuit state.
type BreakerMetrics ¶
type BreakerMetrics struct {
Requests atomic.Uint64 // total calls attempted
Successes atomic.Uint64 // calls that returned nil
Failures atomic.Uint64 // calls that returned a non-nil error
Rejections atomic.Uint64 // calls rejected while open
Timeouts atomic.Uint64 // calls that exceeded their deadline
StateChanges atomic.Uint64 // total transitions between states
ConsecSuccess atomic.Int64 // consecutive successes in half-open
ConsecFailure atomic.Int64 // consecutive failures in current window
}
BreakerMetrics tracks circuit breaker operational statistics.
type BreakerOption ¶
type BreakerOption func(*Breaker)
BreakerOption configures a Breaker.
func BreakerWithHalfOpenLimit ¶
func BreakerWithHalfOpenLimit(n int64) BreakerOption
BreakerWithHalfOpenLimit sets the maximum number of concurrent probes allowed in half-open state (default 1).
func BreakerWithOnStateChange ¶
func BreakerWithOnStateChange(fn func(name string, from, to BreakerState)) BreakerOption
BreakerWithOnStateChange registers a callback invoked on every state transition.
func BreakerWithOpenTimeout ¶
func BreakerWithOpenTimeout(d time.Duration) BreakerOption
BreakerWithOpenTimeout sets how long the circuit stays open before moving to half-open to probe recovery (default 10s).
func BreakerWithSuccessThreshold ¶
func BreakerWithSuccessThreshold(n uint64) BreakerOption
BreakerWithSuccessThreshold sets how many consecutive successes in half-open state are required before the circuit closes again (default 2).
func BreakerWithThreshold ¶
func BreakerWithThreshold(n uint64) BreakerOption
BreakerWithThreshold sets the number of consecutive failures required to open the circuit (default 5).
type BreakerState ¶
type BreakerState int32
BreakerState represents the state of a circuit breaker.
const ( // BreakerClosed is the normal operating state — calls pass through. BreakerClosed BreakerState = iota // BreakerOpen means the circuit is tripped — calls are rejected immediately. BreakerOpen // BreakerHalfOpen means a limited probe is allowed to test recovery. BreakerHalfOpen )
func (BreakerState) String ¶
func (s BreakerState) String() string
String returns a human-readable label for the circuit state.
type Bulkhead ¶
type Bulkhead struct {
// contains filtered or unexported fields
}
Bulkhead isolates failure domains by giving each named partition its own bounded concurrency budget backed by an independent Semaphore.
When partition A is saturated, partition B is completely unaffected. This is the standard pattern for protecting a shared resource (e.g. a DB connection pool) from being monopolised by one upstream caller.
Usage:
bh := jack.NewBulkhead(jack.BulkheadWithPartition("payments", 20),
jack.BulkheadWithPartition("reports", 5))
err := bh.Call(ctx, "payments", jack.PriorityHigh, func(ctx context.Context) error {
return db.Query(ctx, ...)
})
func NewBulkhead ¶
func NewBulkhead(opts ...BulkheadOption) *Bulkhead
NewBulkhead creates a Bulkhead with the given options.
func (*Bulkhead) AddPartition ¶
AddPartition registers a new named partition at runtime. If a partition with that name already exists it is left unchanged.
func (*Bulkhead) Available ¶
Available returns the number of free concurrency slots for the named partition.
func (*Bulkhead) Call ¶
func (b *Bulkhead) Call(ctx context.Context, partition string, p Priority, fn func(context.Context) error) error
Call executes fn within the named partition's concurrency limit. If the partition doesn't exist and defaultCap > 0, it is auto-created. Returns ErrBulkheadFull if all slots are taken and the context expires, ErrBulkheadNotFound if auto-creation is disabled and the partition is unknown.
func (*Bulkhead) Close ¶
func (b *Bulkhead) Close()
Close shuts down all partitions. Subsequent calls return ErrBulkheadClosed.
func (*Bulkhead) Metrics ¶
func (b *Bulkhead) Metrics(partition string) *SemaphoreMetrics
Metrics returns the semaphore metrics for the named partition, or nil if not found.
func (*Bulkhead) Partitions ¶
Partitions returns a snapshot of all registered partition names.
type BulkheadMetrics ¶
type BulkheadMetrics struct {
Acquired SemaphoreMetrics // reuses semaphore metrics per partition
}
BulkheadMetrics tracks statistics for a single named partition.
type BulkheadOption ¶
type BulkheadOption func(*Bulkhead)
BulkheadOption configures a Bulkhead.
func BulkheadWithDefaultCapacity ¶
func BulkheadWithDefaultCapacity(capacity int) BulkheadOption
BulkheadWithDefaultCapacity sets the capacity used when auto-creating partitions on first use (default 10). Set to 0 to disable auto-creation.
func BulkheadWithPartition ¶
func BulkheadWithPartition(name string, capacity int) BulkheadOption
BulkheadWithPartition pre-registers a named partition with the given capacity.
func BulkheadWithSemaphoreOptions ¶
func BulkheadWithSemaphoreOptions(opts ...SemaphoreOption) BulkheadOption
BulkheadWithSemaphoreOptions passes additional SemaphoreOptions to every partition semaphore (e.g. CoDel settings).
type Callback ¶
type Callback func(id string)
Callback defines a callback function without error return for post-execution logic. It accepts a unique identifier string for the operation that completed. Use this type for notifications, cleanup, or metrics that do not affect flow control.
type CallbackCtx ¶
CallbackCtx defines a context-aware callback function without error return. It accepts a context for cancellation and an identifier for the completed operation. Use this type when callbacks need to coordinate with context deadlines or values.
type Caller ¶
type Caller struct {
// contains filtered or unexported fields
}
Caller defines a group of calls that execute together
type CaughtPanic ¶
type CaughtPanic struct {
Value interface{} // The value passed to panic()
Stack []byte // The stack trace (may be empty if not collected)
}
CaughtPanic represents a panic that was caught during execution. (Package-level note: package jack provides utilities for safe, context-aware function execution with mutex protection — it includes methods to execute functions with panic recovery, context cancellation support, and mutex locking, eliminating the need for verbose boilerplate when handling timeouts or cancellations.)
func (*CaughtPanic) Error ¶
func (c *CaughtPanic) Error() string
Error implements the error interface.
func (*CaughtPanic) String ¶
func (c *CaughtPanic) String() string
String provides a formatted string representation of the panic.
func (*CaughtPanic) Unwrap ¶
func (c *CaughtPanic) Unwrap() error
Unwrap provides compatibility with errors.Is/As.
type Cycle ¶
type Cycle func(*Scheduling)
Cycle is a functional option type for configuring a Scheduler. It uses the functional options pattern to allow flexible and extensible configuration of scheduler settings.
func SchedulingWithObservable ¶
func SchedulingWithObservable(obs Observable[Schedule]) Cycle
SchedulingWithObservable returns a Cycle to set an observable for events.
type Debouncer ¶
type Debouncer struct {
// contains filtered or unexported fields
}
Debouncer groups calls to a function, executing only after a period of inactivity or when certain thresholds (max calls or max wait) are met. It uses a mutex for thread-safety and timers for delay and maxWait enforcement.
func NewDebouncer ¶
func NewDebouncer(options ...DebouncerOption) *Debouncer
NewDebouncer creates a new Debouncer instance configured with the given functional options. The WithDebounceDelay option is required; other options are optional with sensible defaults. Timers are initialized but stopped until first use; limits default to no restrictions.
func (*Debouncer) Cancel ¶
func (d *Debouncer) Cancel()
Cancel prevents a pending debounced function from executing by clearing internal state. It stops all active timers and resets call counters without invoking the pending function. Safe to call multiple times or when no function is pending; has no effect if already closed.
func (*Debouncer) Do ¶
func (d *Debouncer) Do(fn func())
Do schedules the given function to execute after the configured delay period of inactivity. Each call resets the delay timer and updates the pending function to the latest provided. If maxCalls or maxWait thresholds are met, execution happens immediately without waiting.
func (*Debouncer) Flush ¶
func (d *Debouncer) Flush()
Flush executes any pending debounced function immediately without waiting for timers. It stops all active timers, executes the function in a new goroutine, and resets state. If no function is currently pending, this method does nothing and returns safely.
func (*Debouncer) IsPending ¶
IsPending returns true if a debounced function is currently waiting to be executed. This method checks the internal call counter while holding the mutex for thread safety. Useful for monitoring debouncer state without triggering any execution or side effects.
func (*Debouncer) Stop ¶
func (d *Debouncer) Stop()
Stop permanently shuts down the debouncer and prevents any future function executions. It sets the closed flag, clears pending functions, and stops all active timers immediately. After calling Stop, any subsequent Do calls will be ignored and no operations will proceed.
type DebouncerOption ¶
type DebouncerOption func(*Debouncer)
DebouncerOption is a functional option for configuring the Debouncer. These options allow customization of delay, maximum calls, and maximum wait time. Use WithDebounceDelay (required), WithDebounceMaxCalls, and WithDebounceMaxWait to set behaviors.
func WithDebounceDelay ¶
func WithDebounceDelay(d time.Duration) DebouncerOption
WithDebounceDelay sets the delay period after the last call before execution. This is a required option; without it, the debouncer will not function correctly. The delay determines the inactivity period needed to trigger the debounced function.
func WithDebounceMaxCalls ¶
func WithDebounceMaxCalls(count int) DebouncerOption
WithDebounceMaxCalls sets the maximum number of calls before immediate execution. By default, there is no limit on call accumulation using NoLimitCalls constant. Once this threshold is reached, the function executes regardless of remaining delay.
func WithDebounceMaxWait ¶
func WithDebounceMaxWait(limit time.Duration) DebouncerOption
WithDebounceMaxWait sets the maximum wait time before forced function execution. This check happens on each Do call, so total wait could be up to maxWait plus delay. A separate timer ensures execution after maxWait even if no new calls occur.
func WithDebouncerPool ¶
func WithDebouncerPool(pool *Pool) DebouncerOption
WithDebouncerPool sets a custom pool for executing debounced functions asynchronously. If not provided via this option, the default package-level pool will be used instead. This allows control over goroutine scheduling and resource management for executions.
type Doctor ¶
type Doctor struct {
// contains filtered or unexported fields
}
Doctor monitors patients with global and per-patient metrics.
func NewDoctor ¶
func NewDoctor(opts ...DoctorOption) *Doctor
NewDoctor creates and initializes a new Doctor instance with the provided configuration options. It sets up the patient priority queue, starts the scheduler goroutine, and creates a default pool if needed. The returned Doctor is immediately active and ready to accept patient registrations via Add.
func (*Doctor) Add ¶
Add registers a new patient with the Doctor, validating its configuration and scheduling its first check. If a patient with the same ID already exists, it is replaced and the old instance is marked removed. Returns an error if the patient lacks a required Check function or if registration fails.
func (*Doctor) GetState ¶
func (d *Doctor) GetState(id string) (PatientState, bool)
GetState retrieves the current health state of a patient by ID along with an existence indicator. If the patient is not registered, it returns PatientUnknown and false for the existence flag. This method provides thread-safe read access to patient state without modifying any internal data.
func (*Doctor) Metrics ¶
func (d *Doctor) Metrics() *DoctorMetrics
Metrics returns a reference to the Doctor's global metrics struct for monitoring operational statistics. All metric fields use atomic operations, allowing safe concurrent reads without additional locking. Use this to expose health check statistics to dashboards, alerts, or external monitoring systems.
func (*Doctor) Remove ¶
Remove unregisters a patient by ID, marking it as removed and preventing future scheduled checks. The method returns true if the patient was found and successfully removed, false otherwise. Removed patients are cleaned from the scheduler queue on the next scheduling cycle.
func (*Doctor) SetDegraded ¶
SetDegraded manually overrides a patient's degradation state, forcing degraded or healthy status. If the patient ID is not found, the method logs a warning and returns without making changes. Use this for operational control, such as forcing maintenance mode or acknowledging known issues.
func (*Doctor) Stop ¶
Stop halts a specific patient by ID, marking it removed and preventing any further health checks. It returns true if the patient was found and stopped, false if the patient did not exist. This method is safe to call multiple times and ensures clean removal from the scheduler.
func (*Doctor) StopAll ¶
StopAll gracefully shuts down the Doctor, stopping all patients and the scheduler goroutine. It uses an atomic flag to ensure idempotency and waits for in-flight operations with the given timeout. After calling StopAll, the Doctor cannot be restarted and all patient registrations are cleared.
type DoctorMetrics ¶
type DoctorMetrics struct {
PatientsTotal atomic.Int64
ChecksTotal atomic.Uint64
ChecksHealthy atomic.Uint64
ChecksDegraded atomic.Uint64
ChecksFailed atomic.Uint64
StateChanges atomic.Uint64
AcceleratedChecks atomic.Uint64
ManualDegraded atomic.Uint64
Recoveries atomic.Uint64
Timeouts atomic.Uint64
PanicsRecovered atomic.Uint64
PoolSubmits atomic.Uint64
PoolSubmitFails atomic.Uint64
}
DoctorMetrics tracks global operational metrics across all patients.
type DoctorOption ¶
type DoctorOption func(*Doctor)
func DoctorWithGlobalTimeout ¶
func DoctorWithGlobalTimeout(t time.Duration) DoctorOption
DoctorWithGlobalTimeout configures the default timeout for patient checks lacking individual settings. If the provided duration is zero or negative, the option is ignored and the default applies. This ensures checks do not hang indefinitely and helps maintain responsive health monitoring.
func DoctorWithLogger ¶
func DoctorWithLogger(l *ll.Logger) DoctorOption
DoctorWithLogger assigns a namespaced logger instance for structured Doctor operation logging. If the provided logger is nil, the option has no effect and the default logger remains active. Use this to integrate Doctor logs with your application's logging infrastructure and levels.
func DoctorWithMaxConcurrent ¶
func DoctorWithMaxConcurrent(n int) DoctorOption
DoctorWithMaxConcurrent sets the maximum number of concurrent health checks the Doctor can run. If the provided value is zero or negative, the option is ignored and the default applies. This limit controls resource usage and prevents system overload during intensive monitoring.
func DoctorWithObservable ¶
func DoctorWithObservable(obs Observable[PatientEvent]) DoctorOption
DoctorWithObservable attaches an event observer to receive patient state change notifications. The observable receives PatientEvent structs whenever a patient's health status updates. Use this for external monitoring, logging, or triggering downstream actions based on health.
func DoctorWithPool ¶
func DoctorWithPool(pool *Pool) DoctorOption
DoctorWithPool configures the Doctor to use a custom worker pool for check execution. If the provided pool is nil, the option has no effect and default pool creation applies. Use this to control concurrency limits and resource sharing across multiple Doctors.
func DoctorWithVerbose ¶
func DoctorWithVerbose(v bool) DoctorOption
DoctorWithVerbose enables or disables detailed debug logging for Doctor operations and checks. When enabled, additional context, such as durations and errors, is logged for troubleshooting. Use this during development or debugging, and disable in production for reduced log volume.
type Event ¶
type Event struct {
Type string // Type of event: "queued", "run", "done"
TaskID string // Optional task identifier
WorkerID string // Identifier of the worker processing the task
Time time.Time // Time of the event
Duration time.Duration // Duration of task execution (for "done" events)
Err error // Error, if any (for "done" events)
}
Event captures details of task execution for observability. Thread-safe for notifications across goroutines.
type Func ¶
type Func func() error
Func converts a function returning an error into a Task. Example:
pool.Submit(jack.Func(func() error { return nil })) // Submits a simple task
type FuncCtx ¶
FuncCtx converts a context-aware function into a TaskCtx. Example:
pool.SubmitCtx(ctx, jack.FuncCtx(func(ctx context.Context) error { return nil })) // Submits a context-aware task
type Future ¶
type Future[T any] struct {
// contains filtered or unexported fields
}
func AsyncWithContext ¶
AsyncWithContext runs fn in a new goroutine under the given context.
func NewFuture ¶
NewFuture creates a Future that executes fn asynchronously in a new goroutine. The context governs cancellation; fn receives it directly. If fn is nil, the Future is returned in an already-failed state.
func (*Future[T]) Await ¶
Await blocks until the Future completes and returns its result and any error.
func (*Future[T]) AwaitWithContext ¶
AwaitWithContext blocks until the Future completes or ctx is cancelled.
func (*Future[T]) AwaitWithTimeout ¶
AwaitWithTimeout blocks until the Future completes or timeout elapses. Returns ErrFutureTimeout if the deadline is exceeded.
func (*Future[T]) Cancel ¶
Cancel signals the Future to stop. Returns true if this call caused the cancellation.
func (*Future[T]) IsCanceled ¶
IsCanceled reports whether the Future was cancelled before completing.
func (*Future[T]) IsDone ¶
IsDone reports whether the Future has finished, either successfully or with an error.
type Group ¶
type Group struct {
// contains filtered or unexported fields
}
Group coordinates concurrent execution of functions with error and panic handling. It supports context cancellation and worker limits, collecting errors in a channel. Thread-safe via wait group, mutex, and channel operations.
func NewGroup ¶
func NewGroup() *Group
NewGroup creates a new Group for running functions concurrently. It initializes an error channel with a buffer of 1. Thread-safe via initialization. Example:
group := NewGroup() // Creates a new group
func (*Group) Errors ¶
Errors returns a channel that receives the first error from Go or GoCtx goroutines. The channel has a buffer of 1 and is closed by Wait. Thread-safe via channel operations. Example:
for err := range group.Errors() { ... } // Reads errors
func (*Group) Go ¶
Go runs a function in a new goroutine with error and panic handling. The first non-nil error or panic is sent to the Errors channel. If a context is set, it is cancelled on the first error. Thread-safe via wait group and doWork. Example:
group.Go(func() error { return nil }) // Runs a function
func (*Group) GoCtx ¶
GoCtx runs a context-aware function in a new goroutine. It requires a context set via WithContext and passes it to the function. The first error, panic, or context cancellation is sent to the Errors channel, and the context is cancelled. Thread-safe via wait group and doWork. Example:
group.WithContext(ctx).GoCtx(func(ctx context.Context) error { return nil }) // Runs a context-aware function
func (*Group) Wait ¶
func (g *Group) Wait()
Wait blocks until all Go and GoCtx goroutines complete. It closes the Errors channel and cancels the Group’s context, if set. Thread-safe via wait group and once. Example:
group.Wait() // Waits for all goroutines
func (*Group) WithContext ¶
WithContext associates a context with the Group, enabling cancellation. It cancels any previous context and creates a new cancellable context. Thread-safe via context operations. Example:
group.WithContext(ctx) // Sets group context
type HedgeGroup ¶
type HedgeGroup struct {
// contains filtered or unexported fields
}
HedgeGroup runs multiple hedge strategies concurrently and returns the first successful result. Useful when hedging across different endpoints or regions.
func NewHedgeGroup ¶
func NewHedgeGroup(hedgers ...*Hedger) *HedgeGroup
NewHedgeGroup creates a group of independent hedgers.
type HedgeMetrics ¶
type HedgeMetrics struct {
Requests atomic.Uint64 // total primary calls
Hedged atomic.Uint64 // times a second request was fired
PrimaryWon atomic.Uint64 // primary response arrived first
HedgeWon atomic.Uint64 // hedge response arrived first
Errors atomic.Uint64 // all attempts failed
AvgDelayNs atomic.Int64 // EWMA of hedge fire delay in nanoseconds
}
HedgeMetrics tracks hedged request operational statistics.
type HedgeOption ¶
type HedgeOption func(*Hedger)
HedgeOption configures a Hedger.
func HedgeWithDelay ¶
func HedgeWithDelay(d time.Duration) HedgeOption
HedgeWithDelay sets a fixed delay before the hedge fires (default: use latency percentile). When set, the latency window is not consulted.
func HedgeWithMaxConcurrent ¶
func HedgeWithMaxConcurrent(n int) HedgeOption
HedgeWithMaxConcurrent sets the maximum number of in-flight hedged pairs (default 100). When the limit is reached, no hedge is fired and only the primary runs.
func HedgeWithMinSamples ¶
func HedgeWithMinSamples(n int) HedgeOption
HedgeWithMinSamples sets how many RTT samples must be collected before the latency-based hedge delay activates (default 10). Before that, a zero delay (fire immediately) is used.
func HedgeWithPercentile ¶
func HedgeWithPercentile(p int) HedgeOption
HedgeWithPercentile sets which RTT percentile to use as the hedge delay (default 50). Valid range: 1–99.
type Hedger ¶
type Hedger struct {
// contains filtered or unexported fields
}
Hedger implements the hedged requests pattern.
A primary request is sent immediately. If it has not responded within the hedge delay (derived from recent latency or a fixed duration), an identical second request is fired. Whichever responds first is returned; the other is cancelled via context cancellation.
This reduces tail latency at the cost of occasionally sending duplicate requests. It is safe only for idempotent operations (reads, retried writes with idempotency keys).
The hedge delay adapts automatically: it is the Nth percentile of recent call RTTs measured in a lock-free circular sample buffer. No third-party sketch library is used.
func NewHedger ¶
func NewHedger(opts ...HedgeOption) *Hedger
NewHedger creates a Hedger with sensible defaults.
func (*Hedger) Do ¶
Do executes fn with hedging. fn receives a context that is cancelled when the other attempt wins. The caller's ctx governs the outer deadline.
fn must be idempotent — it may be called twice concurrently.
func (*Hedger) Metrics ¶
func (h *Hedger) Metrics() *HedgeMetrics
Metrics returns the hedger's operational metrics.
type HedgerOf ¶
type HedgerOf[T any] struct {
// contains filtered or unexported fields
}
HedgerOf wraps Hedger for typed results, avoiding any cast at the call site.
func NewHedgerOf ¶
func NewHedgerOf[T any](opts ...HedgeOption) *HedgerOf[T]
NewHedgerOf creates a typed hedger.
func (*HedgerOf[T]) Metrics ¶
func (h *HedgerOf[T]) Metrics() *HedgeMetrics
Metrics returns the underlying hedger's metrics.
type Hook ¶
Hook defines a lifecycle hook function that returns an error. It accepts a unique identifier string for the operation being tracked. Use this type for pre-execution validation or setup logic that may fail.
type HookCtx ¶
HookCtx defines a context-aware lifecycle hook function that returns an error. It accepts a context for cancellation and an identifier for the tracked operation. Use this type when hooks need to respect timeouts or external cancellation signals.
type Identifiable ¶
type Identifiable interface {
// ID returns a unique identifier for the task.
ID() string
}
Identifiable is an optional interface for tasks to provide a custom ID for logging, metrics, and tracing.
type Lease ¶
type Lease struct {
// contains filtered or unexported fields
}
Lease represents a time-bounded slot acquired from a Semaphore. If the holder fails to call Release before the deadline, the Reaper automatically reclaims the slot so it is never permanently lost.
Obtain a Lease via Leaser.Acquire, not by constructing one directly.
type LeaseMetrics ¶
type LeaseMetrics struct {
Acquired atomic.Uint64 // leases successfully granted
Released atomic.Uint64 // leases explicitly released
Expired atomic.Uint64 // leases reclaimed by the reaper
Active atomic.Int64 // leases currently held
}
LeaseMetrics tracks lease operational statistics.
type Leaser ¶
type Leaser struct {
// contains filtered or unexported fields
}
Leaser wraps a Semaphore and a Reaper to provide leases: acquired semaphore slots with automatic reclamation on expiry.
This solves the problem where a crashed or slow holder never calls Release, permanently consuming a concurrency slot. The Reaper fires after TTL and calls Release on the holder's behalf.
func NewLeaser ¶
func NewLeaser(sem *Semaphore, opts ...LeaserOption) *Leaser
NewLeaser creates a Leaser backed by the given Semaphore. The caller owns the Semaphore and Reaper lifecycles; call Close on both when done.
func (*Leaser) Acquire ¶
func (lm *Leaser) Acquire(ctx context.Context, id string, p Priority, ttl time.Duration) (*Lease, error)
Acquire waits for a semaphore slot and returns a Lease valid for ttl. If ttl is zero the manager's default TTL is used. The Lease must be Released when the work is done; if not, the Reaper reclaims it automatically after ttl.
func (*Leaser) Close ¶
func (lm *Leaser) Close()
Close stops the internal Reaper. The backing Semaphore is not closed.
func (*Leaser) Metrics ¶
func (lm *Leaser) Metrics() *LeaseMetrics
Metrics returns the manager's operational metrics.
type LeaserOption ¶
type LeaserOption func(*Leaser)
LeaserOption configures a Leaser.
func LeaserWithTTL ¶
func LeaserWithTTL(d time.Duration) LeaserOption
LeaserWithTTL sets the default lease lifetime (default 30s).
type Lifetime ¶
type Lifetime struct {
// contains filtered or unexported fields
}
func NewLifetime ¶
func NewLifetime(opts ...LifetimeOption) *Lifetime
NewLifetime creates and initializes a new Lifetime manager with the provided configuration options. It spawns one pruning goroutine per shard to handle timer expiration with minimal lock contention. The returned Lifetime is immediately active and ready to schedule timed callbacks via ScheduleTimed.
func (*Lifetime) CancelTimed ¶
CancelTimed removes a scheduled timer by ID, preventing its callback from executing. It returns true if the timer was found and successfully cancelled, false if none existed. This method is safe to call multiple times and has no effect if the timer already expired.
func (*Lifetime) ExecuteCtxWithLifetime ¶
func (lm *Lifetime) ExecuteCtxWithLifetime(ctx context.Context, id string, lifetime *Vitals, operation FuncCtx) error
ExecuteCtxWithLifetime runs a context-aware operation with Vitals hooks and schedules timed callback on success. If the operation succeeds and a timed callback is configured, it is scheduled for later execution. This method combines immediate context-aware execution with deferred lifecycle callback management.
func (*Lifetime) ExecuteWithLifetime ¶
func (lm *Lifetime) ExecuteWithLifetime(ctx context.Context, id string, lifetime *Vitals, operation Func) error
ExecuteWithLifetime runs an operation with Vitals hooks and schedules a timed callback on success. If the operation succeeds and a timed callback is configured, it is scheduled for later execution. This method combines immediate operation execution with deferred lifecycle callback management.
func (*Lifetime) GetRemainingDuration ¶
GetRemainingDuration returns the time remaining until a scheduled timer expires for the given ID. It returns the duration and true if the timer exists and has not yet expired, otherwise zero and false. This method is useful for displaying countdowns or making scheduling decisions based on timer state.
func (*Lifetime) HasPending ¶
HasPending checks whether a timer is currently scheduled for the given ID. It returns true if the ID exists in any shard's queue, false otherwise. This method provides thread-safe read access without modifying internal state.
func (*Lifetime) Metrics ¶
func (lm *Lifetime) Metrics() *LifetimeMetrics
Metrics returns a reference to the Lifetime manager's metrics struct for monitoring operational statistics. All metric fields use atomic operations, allowing safe concurrent reads without additional locking. Use this to expose timer scheduling statistics to dashboards, alerts, or external monitoring systems.
func (*Lifetime) PendingCount ¶
PendingCount returns the total number of timers currently scheduled across all shards. It iterates through each shard while holding locks to ensure an accurate snapshot count. Use this for monitoring queue depth or implementing backpressure in high-load scenarios.
func (*Lifetime) ResetTimed ¶
ResetTimed extends the expiration time of an existing timer by its original duration from now. It returns true if the timer was found and successfully reset, false if no timer existed for the ID. This method is useful for implementing keep-alive or activity-based timeout extension patterns.
func (*Lifetime) ScheduleLifetimeTimed ¶
ScheduleLifetimeTimed schedules a timed callback using configuration from a Vitals instance. If the Vitals or its Timed callback is nil, it resets any existing timer for the ID instead. This convenience method integrates Lifetime timer management with Vitals lifecycle configuration.
func (*Lifetime) ScheduleTimed ¶
func (lm *Lifetime) ScheduleTimed(ctx context.Context, id string, callback CallbackCtx, wait time.Duration)
ScheduleTimed registers a callback to execute after the specified wait duration for a given ID. If a timer already exists for the ID, it is replaced with the new callback and expiration time. The method uses sharding for concurrency and signals the prune loop to re-evaluate scheduling.
func (*Lifetime) Stop ¶
Stop cancels timers for the specified IDs, or all timers if no IDs are provided. It groups IDs by shard for efficient batch removal and signals prune loops to re-evaluate. This method provides a convenient way to clean up multiple timers without individual CancelTimed calls.
func (*Lifetime) StopAll ¶
func (lm *Lifetime) StopAll()
StopAll gracefully shuts down the Lifetime manager, cancelling all pending timers and goroutines. It signals the context to stop prune loops, waits for them to exit, then clears all shard state. After calling StopAll, the Lifetime instance cannot be reused and all scheduled callbacks are discarded.
type LifetimeMetrics ¶
type LifetimeMetrics struct {
ScheduleCalls atomic.Uint64
ResetCalls atomic.Uint64
CancelCalls atomic.Uint64
ExecuteCalls atomic.Uint64
ExpiredTotal atomic.Uint64
ExpiredHandled atomic.Uint64
ExpiredMissed atomic.Uint64
ExpiredErrors atomic.Uint64
ActiveTimers atomic.Int64
MaxActiveTimers atomic.Int64
TotalTimersSeen atomic.Uint64
AvgExpirationMs atomic.Int64
MinExpirationMs atomic.Int64
MaxExpirationMs atomic.Int64
LoopIterations atomic.Uint64
SignalsSent atomic.Uint64
StopsReceived atomic.Uint64
}
LifetimeMetrics tracks operational metrics for the lifetime manager.
type LifetimeOption ¶
type LifetimeOption func(*Lifetime)
func LifetimeWithLogger ¶
func LifetimeWithLogger(l *ll.Logger) LifetimeOption
LifetimeWithLogger assigns a namespaced logger instance for structured Lifetime operation logging. If the provided logger is nil, the option has no effect and the default logger remains active. Use this to integrate Lifetime logs with your application's logging infrastructure and levels.
func LifetimeWithMinTick ¶
func LifetimeWithMinTick(tick time.Duration) LifetimeOption
LifetimeWithMinTick sets the minimum polling interval for the expiration check loop. If the provided duration is zero or negative, the option is ignored and the default applies. Use this to balance responsiveness against CPU usage when processing expired timers.
func LifetimeWithShards ¶
func LifetimeWithShards(count uint32) LifetimeOption
LifetimeWithShards configures the number of shards for concurrent timer management. The count must be a power of two for efficient bitwise modulo hashing distribution. Use higher shard counts to reduce lock contention under heavy timer scheduling load.
func LifetimeWithTimerLimit ¶
func LifetimeWithTimerLimit(limit time.Duration) LifetimeOption
LifetimeWithTimerLimit configures the maximum execution time allowed for timer callbacks. If the provided duration is zero or negative, the option is ignored and the default applies. This prevents long-running callbacks from blocking the expiration processing loop.
type LoopConfig ¶
type Looper ¶
type Looper struct {
// contains filtered or unexported fields
}
func NewLooper ¶
func NewLooper(task Func, opts ...LooperOption) *Looper
NewLooper creates a new Looper with full metrics.
func (*Looper) CurrentInterval ¶
CurrentInterval returns the interval currently in use, including any backoff applied.
func (*Looper) FailureCount ¶
FailureCount returns the number of consecutive errors since the last success.
func (*Looper) Metrics ¶
func (l *Looper) Metrics() *LooperMetrics
Metrics returns the looper's metrics.
func (*Looper) ResetInterval ¶
func (l *Looper) ResetInterval()
ResetInterval restores the interval to the value set at construction time.
func (*Looper) SetInterval ¶
SetInterval updates the iteration interval dynamically; takes effect on the next tick.
type LooperMetrics ¶
type LooperMetrics struct {
Executions atomic.Uint64
Failures atomic.Uint64
Successes atomic.Uint64
BackoffEvents atomic.Uint64
IntervalChanges atomic.Uint64
PanicsRecovered atomic.Uint64
ContextCancels atomic.Uint64
Timeouts atomic.Uint64
LastRun atomic.Value
LastError atomic.Value
LastStart atomic.Int64
LastEnd atomic.Int64
TotalDurationNs atomic.Int64
MinDurationNs atomic.Int64
MaxDurationNs atomic.Int64
CurrentIntervalNs atomic.Int64
CurrentBackoffNs atomic.Int64
}
LooperMetrics tracks comprehensive operational metrics.
type LooperOption ¶
type LooperOption func(*LoopConfig)
func WithLooperBackoff ¶
func WithLooperBackoff(enabled bool) LooperOption
WithLooperBackoff enables exponential backoff on consecutive errors.
func WithLooperContext ¶
func WithLooperContext(ctx context.Context) LooperOption
WithLooperContext attaches an external context; cancelling it stops the looper.
func WithLooperImmediate ¶
func WithLooperImmediate(immediate bool) LooperOption
WithLooperImmediate makes the looper execute fn once before the first timer fires.
func WithLooperInterval ¶
func WithLooperInterval(interval time.Duration) LooperOption
WithLooperInterval sets the base interval between loop iterations.
func WithLooperJitter ¶
func WithLooperJitter(jitter float64) LooperOption
WithLooperJitter adds proportional random jitter to each interval to spread load.
func WithLooperLogger ¶
func WithLooperLogger(logger *ll.Logger) LooperOption
WithLooperLogger sets a custom logger for the looper.
func WithLooperMaxInterval ¶
func WithLooperMaxInterval(max time.Duration) LooperOption
WithLooperMaxInterval caps the backoff interval ceiling.
func WithLooperMinInterval ¶
func WithLooperMinInterval(min time.Duration) LooperOption
WithLooperMinInterval sets the floor below which the interval will not drop.
func WithLooperName ¶
func WithLooperName(name string) LooperOption
WithLooperName sets the display name used in logs and metrics.
type Observable ¶
type Observable[T any] interface {
// Add registers one or more observers to receive notifications.
Add(observers ...Observer[T])
// Remove unregisters one or more observers.
Remove(observers ...Observer[T])
// Notify sends one or more events to all registered observers.
Notify(events ...T)
// Shutdown stops the observable and waits for all notifications to complete.
Shutdown()
}
Observable defines the interface for objects that can be observed by multiple Observer instances. It supports adding and removing observers, notifying them of one or more events, and shutting down cleanly. The type parameter T matches the event type for observers. Example:
obs := NewObservable[string]()
logObserver := &LogObserver{}
obs.Add(logObserver)
obs.Notify("Task completed")
obs.Shutdown()
func NewObservable ¶
func NewObservable[T any](numNotifyWorkers ...int) Observable[T]
NewObservable creates and returns a new Observable instance. It optionally accepts the number of worker goroutines for asynchronous notifications (defaults to 5 if not provided or invalid). Workers process notifications in parallel to avoid blocking the notifier, with panic recovery for observer errors. Parameters: numNotifyWorkers: Optional number of worker goroutines (positive integer). If not provided or invalid, defaults to 5.
Returns: Observable[T]: A new Observable instance ready to accept observers and events.
Example:
// Create an observable with 2 workers
obs := NewObservable[string](2)
logObserver := &LogObserver{}
obs.Add(logObserver)
obs.Notify("Event 1") // Sends "Event 1" to logObserver asynchronously
type Observer ¶
type Observer[T any] interface {
OnNotify(value T) // Called when an event of type T is received
}
Observer defines the interface for objects that wish to receive notifications from an Observable. The type parameter T specifies the type of events or values that will be notified. Implementors must provide an OnNotify method to handle incoming events. Example:
type LogObserver struct {}
func (l *LogObserver) OnNotify(event string) {
fmt.Printf("Received event: %s\n", event)
}
type Patient ¶
type Patient struct {
// contains filtered or unexported fields
}
Patient holds runtime state for one monitored entity with its own metrics.
func NewPatient ¶
func NewPatient(cfg PatientConfig) *Patient
NewPatient creates a Patient with its own metrics instance.
func (*Patient) Metrics ¶
func (p *Patient) Metrics() *PatientMetrics
Metrics returns the patient's individual metrics.
type PatientConfig ¶
type PatientConfig struct {
ID string
Interval time.Duration
Jitter float64
Timeout time.Duration
Accelerated time.Duration
MaxFailures uint64
Check FuncCtx
OnStart FuncCtx
OnComplete FuncCtx
OnError FuncCtx
OnTimeout FuncCtx
OnRecover FuncCtx
OnRemove func()
OnStateChange func(PatientEvent)
}
PatientConfig defines monitoring parameters for a single patient.
type PatientEvent ¶
type PatientEvent struct {
ID string
State PatientState
LastCheck time.Time
Duration time.Duration
Error error
Meta map[string]any
}
PatientEvent is emitted for observability when a patient's state changes.
type PatientMetrics ¶
type PatientMetrics struct {
ChecksTotal atomic.Uint64
ChecksHealthy atomic.Uint64
ChecksDegraded atomic.Uint64
ChecksFailed atomic.Uint64
StateChanges atomic.Uint64
Recoveries atomic.Uint64
Timeouts atomic.Uint64
PanicsRecovered atomic.Uint64
ConsecFailures atomic.Int64
ConsecSuccesses atomic.Int64
LastCheckMs atomic.Int64
AvgCheckMs atomic.Int64
LastFailureMs atomic.Int64
LastRecoveryMs atomic.Int64
}
PatientMetrics tracks per-patient operational metrics.
type PatientState ¶
type PatientState int
PatientState represents the health state of a monitored patient.
const (
PatientUnknown PatientState = iota
PatientHealthy
PatientDegraded
PatientFailed
)
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool manages a fixed number of worker goroutines to execute tasks concurrently. It supports task submission with or without context, shutdown with timeout, and observability.
func NewPool ¶
NewPool creates a new pool with the specified number of workers and optional configurations. Workers start immediately. At least one worker is always created.
func (*Pool) Do ¶
func (p *Pool) Do(fn func())
Do submits a void function as a task, discarding any submission error.
func (*Pool) DoCtx ¶
DoCtx submits a context-aware void function as a task, discarding any submission error.
func (*Pool) Metrics ¶
func (p *Pool) Metrics() *PoolMetrics
Metrics returns the pool's operational metrics.
func (*Pool) Shutdown ¶
Shutdown gracefully stops the pool and waits for all workers to finish. Returns ErrShutdownTimedOut if workers do not exit within the timeout.
func (*Pool) Submit ¶
Submit enqueues one or more tasks for execution without context. Returns ErrPoolClosed if the pool is shut down, ErrQueueFull if the queue is at capacity.
type PoolMetrics ¶
type PoolMetrics struct {
TasksSubmitted atomic.Uint64 // total tasks accepted into the queue
TasksCompleted atomic.Uint64 // tasks that finished without error
TasksFailed atomic.Uint64 // tasks that returned an error or panicked
TasksRejected atomic.Uint64 // tasks dropped due to full queue or closed pool
PanicsRecovered atomic.Uint64 // panics caught inside task execution
QueueDepth atomic.Int64 // current number of tasks waiting in the channel
MaxQueueDepth atomic.Int64 // high-water mark of QueueDepth
TotalDurationNs atomic.Int64 // cumulative nanoseconds spent executing tasks
ActiveWorkers atomic.Int64 // workers currently executing a task
}
PoolMetrics tracks operational statistics for a Pool. All fields use atomic operations and are safe for concurrent reads without locking.
type Pooling ¶
type Pooling func(*poolingOpt)
Pooling is a functional option type for configuring the pool during creation.
func PoolingWithIDGenerator ¶
PoolingWithIDGenerator sets a custom task ID generator function.
func PoolingWithNoID ¶
func PoolingWithNoID() Pooling
PoolingWithNoID disables task ID generation entirely. This eliminates all allocations in the Submit hot path when observability (logging, event emission) is not needed. The worker will log an empty task ID. Use when pool submission is on a latency-critical path and task tracing is not required.
func PoolingWithObservable ¶
func PoolingWithObservable(obs Observable[Event]) Pooling
PoolingWithObservable sets an observable for event notifications in the pool. The observable receives "queued", "run", and "done" events for every task.
func PoolingWithQueueSize ¶
PoolingWithQueueSize sets the task queue buffer size. Defaults to 2× the worker count when not provided or negative.
type Priority ¶
type Priority int
Priority defines request importance for backpressure decisions. Lower numeric values indicate higher priority (Critical=0 is served first).
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is a bounded, multi-priority, multi-consumer work queue. Higher-priority items are always dequeued before lower-priority ones. Under saturation it drops the lowest-priority items first (tail-drop per tier). This makes it suitable as the entry point of a load-balancer or transaction processor pipeline: pair it with a Semaphore or RateLimiter downstream.
func NewQueue ¶
NewQueue creates a priority queue that dispatches items to handler concurrently. handler is called once per item with the queue's context; if it returns a non-nil error the item is counted as failed but the queue keeps running.
func (*Queue) Close ¶
func (q *Queue) Close()
Close stops accepting new items and waits for all workers to drain and exit.
func (*Queue) Depth ¶
Depth returns the total number of items currently in the queue across all priorities.
func (*Queue) DepthByPriority ¶
DepthByPriority returns the number of items waiting at each priority level.
func (*Queue) Enqueue ¶
Enqueue adds an item at the given priority. It returns ErrQueueFull if the per-priority bin is at capacity, or ErrQueueClosed if the queue is shut down.
func (*Queue) EnqueueCtx ¶
EnqueueCtx adds an item at the given priority, blocking until space is available, the context is cancelled, or the queue is closed.
func (*Queue) Metrics ¶
func (q *Queue) Metrics() *QueueMetrics
Metrics returns the queue's operational metrics.
type QueueMetrics ¶
type QueueMetrics struct {
Enqueued atomic.Uint64
Dequeued atomic.Uint64
Dropped atomic.Uint64
Timeouts atomic.Uint64
Depth atomic.Int64
MaxDepth atomic.Int64
Saturated atomic.Uint64 // times the queue was full at enqueue time
}
QueueMetrics tracks operational statistics for the priority queue.
type QueueOption ¶
type QueueOption func(*Queue)
QueueOption configures a Queue.
func QueueWithCapacity ¶
func QueueWithCapacity(capacity int) QueueOption
QueueWithCapacity sets the maximum number of items the queue can hold per priority level. Total capacity is capacity × priorityCount. Default is 256 per level.
func QueueWithTimeout ¶
func QueueWithTimeout(d time.Duration) QueueOption
QueueWithTimeout sets the maximum time an item may wait in the queue before being dropped. Zero means no timeout (items wait until a worker is free).
func QueueWithWorkers ¶
func QueueWithWorkers(n int) QueueOption
QueueWithWorkers sets the number of concurrent consumer goroutines (default 1).
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter bounds throughput with a prioritized token bucket. Allow / AllowN are lock-free. Blocking Acquire uses CoDel-style priority queues. Replenish manually returns tokens; it is distinct from time-based refill and is intended for reservation patterns where the caller holds a token between operations.
func NewRateLimiter ¶
func NewRateLimiter(ratePerSec float64, burst int, opts ...RateLimiterOption) *RateLimiter
NewRateLimiter creates a token bucket rate limiter. ratePerSec is the sustained fill rate; burst is the maximum token capacity. Default maxWait (CoDel dropping threshold) is 500ms.
func (*RateLimiter) Acquire ¶
func (r *RateLimiter) Acquire(ctx context.Context, p Priority) error
Acquire waits for a single token, respecting priority and context cancellation. Higher-priority callers are dequeued before lower-priority ones already waiting.
func (*RateLimiter) Allow ¶
func (r *RateLimiter) Allow(p Priority) bool
Allow consumes one token if available. Non-blocking, lock-free.
func (*RateLimiter) AllowN ¶
func (r *RateLimiter) AllowN(p Priority, n int64) bool
AllowN attempts to consume n tokens atomically. Non-blocking, lock-free.
func (*RateLimiter) Burst ¶
func (r *RateLimiter) Burst() int64
Burst returns the maximum token capacity the rate limiter was created with.
func (*RateLimiter) Close ¶
func (r *RateLimiter) Close()
Close shuts down the rate limiter, unblocking all pending waiters with ErrRateLimiterClosed.
func (*RateLimiter) Metrics ¶
func (r *RateLimiter) Metrics() *RateLimiterMetrics
Metrics returns the rate limiter's operational metrics.
func (*RateLimiter) Release ¶
func (r *RateLimiter) Release()
Release replenishes one token and wakes the highest-priority waiter if any are queued. Retained for compatibility with patterns that pair Acquire with Release. Prefer Replenish(1) for new code where the intent is explicit.
func (*RateLimiter) Replenish ¶
func (r *RateLimiter) Replenish(n int64)
Replenish adds n tokens back to the bucket, up to the burst ceiling. Use this when a caller held a reservation and is returning it — for example, when a request is retried locally and the upstream slot does not need to be consumed. This is NOT the same as time-based refill.
func (*RateLimiter) Reserve ¶
func (r *RateLimiter) Reserve(n int64, opts ...ReserveOption) *Reservation
Reserve attempts to reserve n tokens from the rate limiter without blocking. It returns a Reservation whose Delay() tells the caller how long to wait. The caller can Cancel() the reservation if the delay is unacceptable.
Unlike Acquire, Reserve never blocks. It is safe for concurrent use.
func (*RateLimiter) ReserveMetrics ¶
func (r *RateLimiter) ReserveMetrics() *ReservationMetrics
ReserveMetrics returns the metrics for the reservation subsystem. Returns nil if Reserve has never been called.
func (*RateLimiter) Tokens ¶
func (r *RateLimiter) Tokens() int64
Tokens returns the number of tokens currently available.
type RateLimiterMetrics ¶
type RateLimiterMetrics struct {
AllowedFast atomic.Uint64
AllowedSlow atomic.Uint64
Rejected atomic.Uint64
TokensConsumed atomic.Uint64
TokensRefilled atomic.Uint64
QueueDepth atomic.Int64
MaxQueueDepth atomic.Int64
}
RateLimiterMetrics tracks operational statistics for the rate limiter.
type RateLimiterOption ¶
type RateLimiterOption func(*RateLimiter)
RateLimiterOption configures a RateLimiter.
func RateLimiterWithMaxWait ¶
func RateLimiterWithMaxWait(d time.Duration) RateLimiterOption
RateLimiterWithMaxWait sets the hard ceiling on waiter age during CoDel dropping mode. In normal mode waiters are not evicted by age; their own context handles timeout.
type Reaper ¶
type Reaper struct {
// contains filtered or unexported fields
}
func NewReaper ¶
func NewReaper(ttl time.Duration, opts ...ReaperOption) *Reaper
NewReaper creates a TTL-based expiry manager with min-heap scheduling. The handler is called once per expired entry in a background goroutine.
func (*Reaper) Deadline ¶
Deadline returns the earliest expiry time across all shards, or false if empty.
func (*Reaper) Metrics ¶
func (r *Reaper) Metrics() *ReaperMetrics
Metrics returns the reaper's operational statistics.
func (*Reaper) Register ¶
func (r *Reaper) Register(h ReaperHandler)
Register sets the handler for expired tasks (backward compatible).
func (*Reaper) Start ¶
func (r *Reaper) Start()
Start is a no-op for backward compatibility. Reaper starts automatically now.
func (*Reaper) Stop ¶
func (r *Reaper) Stop()
Stop shuts down all reaper goroutines and releases internal resources.
type ReaperHandler ¶
type ReaperMetrics ¶
type ReaperMetrics struct {
TouchCalls atomic.Uint64
TouchAtCalls atomic.Uint64
RemoveCalls atomic.Uint64
ClearCalls atomic.Uint64
ExpiredTotal atomic.Uint64
ExpiredHandled atomic.Uint64
ExpiredMissed atomic.Uint64
ExpiredErrors atomic.Uint64
ActiveTasks atomic.Int64
MaxActiveTasks atomic.Int64
TotalTasksSeen atomic.Uint64
AvgExpirationMs atomic.Int64
MinExpirationMs atomic.Int64
MaxExpirationMs atomic.Int64
LoopIterations atomic.Uint64
SignalsSent atomic.Uint64
StopsReceived atomic.Uint64
}
type ReaperOption ¶
type ReaperOption func(*Reaper)
func ReaperWithHandler ¶
func ReaperWithHandler(h ReaperHandler) ReaperOption
ReaperWithHandler sets the callback invoked when an entry's TTL expires.
func ReaperWithLogger ¶
func ReaperWithLogger(l *ll.Logger) ReaperOption
ReaperWithLogger sets a custom logger for the reaper.
func ReaperWithMinTick ¶
func ReaperWithMinTick(tick time.Duration) ReaperOption
ReaperWithMinTick sets the minimum poll interval for the expiry loop.
func ReaperWithShards ¶
func ReaperWithShards(count uint32) ReaperOption
ReaperWithShards controls the number of internal heap shards for reduced contention.
func ReaperWithTimerLimit ¶
func ReaperWithTimerLimit(limit time.Duration) ReaperOption
ReaperWithTimerLimit caps the duration the reaper will sleep before checking again.
type ReaperTask ¶
type Reservation ¶
type Reservation struct {
// contains filtered or unexported fields
}
Reservation is a future token grant from a RateLimiter. The caller inspects Delay() and decides whether to wait or drop. Once committed, call Wait(ctx) to block for exactly the reservation's delay. A Reservation is single-use and not safe for concurrent access.
func (*Reservation) Cancel ¶
func (r *Reservation) Cancel()
Cancel releases the reservation without consuming tokens. Safe to call multiple times.
func (*Reservation) Delay ¶
func (r *Reservation) Delay() time.Duration
Delay returns how long the caller must wait before the reserved tokens are available. Zero means tokens are available immediately. A negative value means the reservation was cancelled.
func (*Reservation) OK ¶
func (r *Reservation) OK() bool
OK reports whether the reservation is valid and not yet used or cancelled.
type ReservationMetrics ¶
type ReservationMetrics struct {
Reserved atomic.Uint64 // successful Reserve calls
Used atomic.Uint64 // reservations that called Wait successfully
Dropped atomic.Uint64 // reservations cancelled before use
Expired atomic.Uint64 // reservations that timed out during Wait
}
ReservationMetrics tracks reservation operational statistics.
type ReserveOption ¶
type ReserveOption func(*reserveConfig)
ReserveOption configures reservation behaviour.
func ReserveWithMaxDelay ¶
func ReserveWithMaxDelay(d time.Duration) ReserveOption
ReserveWithMaxDelay causes Reserve to return a cancelled Reservation if the computed delay exceeds d. Use to implement admission control at the call site without blocking any goroutine.
type Retry ¶
type Retry struct {
// contains filtered or unexported fields
}
Retry defines how retries are performed. Create one once and reuse it across many Do / DoCtx calls — it is safe for concurrent use.
func NewRetry ¶
func NewRetry(opts ...RetryOption) *Retry
NewRetry constructs a Retry with sensible defaults.
func (*Retry) Do ¶
Do executes fn according to the policy, retrying on retryable errors. The provided context governs the total deadline; each individual attempt also respects context cancellation. Returns the last error wrapped in ErrRetryExhausted if all attempts fail.
func (*Retry) Metrics ¶
func (p *Retry) Metrics() *RetryMetrics
Metrics returns the policy's operational metrics.
type RetryMetrics ¶
type RetryMetrics struct {
Attempts atomic.Uint64 // total individual calls (including first)
Successes atomic.Uint64 // calls that ultimately succeeded
Failures atomic.Uint64 // calls that exhausted all retries
Timeouts atomic.Uint64 // attempts abandoned due to context deadline
}
RetryMetrics tracks retry operational statistics.
type RetryOption ¶
type RetryOption func(*Retry)
RetryOption configures a Retry.
func RetryWithBaseDelay ¶
func RetryWithBaseDelay(d time.Duration) RetryOption
RetryWithBaseDelay sets the initial delay before the first retry (default 100ms).
func RetryWithJitter ¶
func RetryWithJitter(enabled bool) RetryOption
RetryWithJitter enables full jitter on the backoff delay (default true). Full jitter randomises each delay in [0, computed_delay] to spread load.
func RetryWithMaxAttempts ¶
func RetryWithMaxAttempts(n int) RetryOption
RetryWithMaxAttempts sets the total number of attempts (initial + retries). Default is 3.
func RetryWithMaxDelay ¶
func RetryWithMaxDelay(d time.Duration) RetryOption
RetryWithMaxDelay caps the backoff delay (default 30s).
func RetryWithMultiplier ¶
func RetryWithMultiplier(m float64) RetryOption
RetryWithMultiplier sets the exponential backoff multiplier (default 2.0).
func RetryWithOnRetry ¶
func RetryWithOnRetry(fn func(attempt int, err error)) RetryOption
RetryWithOnRetry registers a callback invoked before each retry with the attempt number (1-based) and the error that triggered the retry.
func RetryWithRetryIf ¶
func RetryWithRetryIf(fn func(error) bool) RetryOption
RetryWithRetryIf replaces the default retry predicate. By default all non-nil errors are retried. Supply a custom function to skip retrying on permanent errors (e.g. 404, validation failures).
type Routine ¶
type Routine struct {
Interval time.Duration // Interval between task executions
MaxRuns int // Maximum number of runs (0 for unlimited)
Cron string // Cron expression (e.g., "0 0 * * *" for daily at midnight, "@every 1m")
}
Routine configures recurring task execution with an interval and maximum runs. Thread-safe for use in scheduling configurations.
type RoutineInfo ¶
type RoutineInfo struct {
ID string
Label string
State RoutineState
StartedAt time.Time
EndedAt time.Time
Err error
Stack []byte // non-nil only when State == RoutinePanicked
}
RoutineInfo holds the observable state of a single tracked goroutine.
type RoutineOption ¶
type RoutineOption func(*Routines)
RoutineOption configures a Routines tracker.
func RoutineWithIDGenerator ¶
func RoutineWithIDGenerator(fn func(label string) string) RoutineOption
RoutineWithIDGenerator replaces the default sequential ID generator.
func RoutineWithMaxEntries ¶
func RoutineWithMaxEntries(n int) RoutineOption
RoutineWithMaxEntries caps the number of retained RoutineInfo entries. When exceeded, completed entries are evicted to bound memory growth.
func RoutineWithOnDone ¶
func RoutineWithOnDone(fn func(info RoutineInfo)) RoutineOption
RoutineWithOnDone sets a callback invoked when any goroutine finishes (regardless of success, error, or panic).
func RoutineWithOnPanic ¶
func RoutineWithOnPanic(fn func(info RoutineInfo)) RoutineOption
RoutineWithOnPanic sets a callback invoked whenever a goroutine panics. The callback runs synchronously before the goroutine exits.
type RoutineState ¶
type RoutineState int32
RoutineState describes the lifecycle state of a tracked goroutine.
const (
	RoutineRunning RoutineState = iota
	RoutineDone
	RoutinePanicked
	RoutineCancelled
)
func (RoutineState) String ¶
func (s RoutineState) String() string
String returns a human-readable label for the goroutine state.
type Routines ¶
type Routines struct {
// contains filtered or unexported fields
}
Routines is a goroutine tracker and lifecycle manager. It is the answer to the "rogue goroutine" problem: every goroutine spawned through Routines is registered, tracked by state, and guaranteed to be joined by Stop or Wait.
Singleton usage:
var rt = jack.NewRoutines()
rt.Spawn("fetch", func(ctx context.Context) error { ... })
rt.SpawnCtx("heartbeat", func(ctx context.Context) error { ... })
rt.Stop() // cancels context, waits for all goroutines
All methods are safe for concurrent use.
func NewRoutines ¶
func NewRoutines(opts ...RoutineOption) *Routines
NewRoutines creates a Routines tracker. The returned tracker has its own cancellable context; call Stop() to terminate all goroutines it owns.
func (*Routines) Background ¶
Background spawns a long-running goroutine that is restarted automatically if it returns a non-nil error, up to maxRestarts times (0 = unlimited). Each restart is counted in Metrics.Spawned.
func (*Routines) Cancel ¶
Cancel cancels a single goroutine by ID. Returns false if the ID is unknown or the goroutine has already exited.
func (*Routines) Forget ¶
Forget removes a completed goroutine from tracking, reclaiming the memory held by its RoutineInfo. Returns false if the ID is unknown or the goroutine is still running.
func (*Routines) Info ¶
func (r *Routines) Info(id string) (RoutineInfo, bool)
Info returns a snapshot of the named goroutine's current state. Returns false if no goroutine with that ID exists.
func (*Routines) List ¶
func (r *Routines) List() []RoutineInfo
List returns a snapshot of all tracked goroutines, including completed ones.
func (*Routines) Metrics ¶
func (r *Routines) Metrics() *RoutinesMetrics
Metrics returns aggregate goroutine statistics.
func (*Routines) Spawn ¶
Spawn spawns a goroutine under the tracker's context. The goroutine is cancelled when Stop() is called or the tracker's context is cancelled. label is used for identification; it need not be unique — a unique ID is derived automatically. Returns the assigned ID so the caller can query status via Info(id).
func (*Routines) SpawnCtx ¶
func (r *Routines) SpawnCtx(ctx context.Context, label string, fn func(context.Context) error) string
SpawnCtx spawns a goroutine under a caller-supplied context. The goroutine is cancelled when either the supplied ctx OR the tracker's context is done — whichever fires first.
func (*Routines) Stop ¶
func (r *Routines) Stop()
Stop cancels the tracker's context and waits for all goroutines to exit. After Stop returns, no goroutines tracked by this Routines are running.
func (*Routines) StopTimeout ¶
StopTimeout cancels the tracker's context and waits up to the given duration for all goroutines to exit. Returns an error if the timeout is exceeded.
type RoutinesMetrics ¶
type RoutinesMetrics struct {
Spawned atomic.Uint64 // total goroutines ever started
Active atomic.Int64 // currently running
Completed atomic.Uint64 // finished without error or panic
Failed atomic.Uint64 // returned a non-nil error
Panicked atomic.Uint64 // recovered from a panic
Cancelled atomic.Uint64 // stopped via context cancellation
}
RoutinesMetrics tracks aggregate goroutine statistics.
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
Runner executes tasks asynchronously using a buffered task queue. It supports observability and logging, processing tasks in a single goroutine. Thread-safe via mutex, wait group, and channel operations.
func NewRunner ¶
func NewRunner(opts ...RunnerOption) *Runner
NewRunner creates a new Runner with the specified options. It initializes a task queue and starts a processing goroutine. Thread-safe via initialization and logger namespace. Example:
runner := NewRunner(WithRunnerQueueSize(10), WithRunnerObservable(obs)) // Creates runner with queue size 10
func (*Runner) Do ¶
Do submits a Task for execution in the runner’s queue. It returns an error if the task is nil or the runner is closed. Thread-safe via submit and channel operations. Example:
runner.Do(myTask) // Submits a task
func (*Runner) DoCtx ¶
DoCtx submits a TaskCtx for execution with the given context. It returns an error if the context or task is nil, the context is done, or the runner is closed. Thread-safe via submit and channel operations. Example:
runner.DoCtx(ctx, myTaskCtx) // Submits a context-aware task
func (*Runner) QueueSize ¶
QueueSize returns the current number of tasks in the queue. Thread-safe via channel length access. Example:
size := runner.QueueSize() // Gets current queue size
type RunnerOption ¶
type RunnerOption func(*runnerOptions)
RunnerOption configures a Runner during creation.
func WithRunnerIDGenerator ¶
func WithRunnerIDGenerator(fn func(interface{}) string) RunnerOption
WithRunnerIDGenerator sets the task ID generator function. Example:
runner := NewRunner(WithRunnerIDGenerator(customIDFunc)) // Sets custom ID generator
func WithRunnerObservable ¶
func WithRunnerObservable(obs Observable[Event]) RunnerOption
WithRunnerObservable sets the observable for task events. Example:
runner := NewRunner(WithRunnerObservable(obs)) // Configures observable
func WithRunnerQueueSize ¶
func WithRunnerQueueSize(size int) RunnerOption
WithRunnerQueueSize sets the task queue size. Non-positive values are ignored. Example:
runner := NewRunner(WithRunnerQueueSize(20)) // Sets queue size to 20
type Safely ¶
Safely wraps sync.Mutex with safe execution methods.
func (*Safely) Do ¶
func (m *Safely) Do(fn func())
Do executes fn while holding the mutex lock. The function is guaranteed to run with exclusive access, ensuring thread safety. If fn is nil, Do will panic. Example:
var mu Safely
mu.Do(func() {
// Critical section
fmt.Println("Safe execution")
})
func (*Safely) DoCtx ¶
DoCtx executes fn while holding the mutex lock, with context support. The lock is acquired, then the context is checked. If the context is done, its error is returned and fn is not executed. Otherwise, fn is executed in a goroutine, allowing it to be interrupted if the context is canceled or times out (e.g., via context.WithTimeout). If fn is nil, DoCtx will return a *CaughtPanic. If ctx is nil, DoCtx will panic when ctx.Err() is called. This method does not recover from panics in fn; use SafeCtx for panic recovery. Example:
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
err := mu.DoCtx(ctx, func() error {
// Critical section, no need to check ctx
time.Sleep(100 * time.Millisecond)
return nil
})
// err will likely be context.DeadlineExceeded due to timeout
func (*Safely) Safe ¶
Safe executes fn while holding the mutex lock, with panic recovery. If fn panics, a *CaughtPanic error is returned. Otherwise, the error returned by fn is returned. If fn is nil, Safe will return a *CaughtPanic. Example:
var mu Safely
err := mu.Safe(func() error {
// Critical section
return nil
})
if err != nil {
fmt.Println("Error:", err)
}
func (*Safely) SafeCtx ¶
SafeCtx executes fn with panic recovery and context awareness while holding the mutex lock. The lock is acquired, then fn is executed via the standalone SafeCtx function. This ensures fn can be canceled by the context (e.g., via context.WithTimeout), and any panics are recovered. The lock is held until fn completes or the context is canceled. If fn is nil, SafeCtx will return a *CaughtPanic. If ctx is nil, SafeCtx will panic when ctx.Err() is called. Example:
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
err := mu.SafeCtx(ctx, func() error {
// Critical section, no need to check ctx
time.Sleep(100 * time.Millisecond)
return nil
})
// err will likely be context.DeadlineExceeded due to timeout
type Schedule ¶
type Schedule struct {
Type string // Type of event (e.g., "task_submitted", "task_submission_failed", "stopped")
Name string // Name of the scheduler emitting the event
TaskID string // Unique identifier for the task
TaskType string // Type of the task (e.g., struct name or type description)
Message string // Descriptive message providing context for the event
Error error // Any error associated with the event, if applicable
Routine Routine // Configuration of the scheduling routine (e.g., interval, max runs)
Time time.Time // Timestamp when the event occurred
NextRun time.Time // Scheduled time for the next task execution, if applicable
}
Schedule represents an event emitted by the scheduler for observability.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler manages periodic or limited task submissions to a Pool. It supports both interval-based and cron expression-based scheduling.
func NewScheduler ¶
NewScheduler creates a new Scheduler instance.
func (*Scheduler) Do ¶
Do starts scheduling for the provided non-context-aware tasks. It initializes the scheduler to execute the given tasks according to the configured routine (e.g., interval or max runs). The method ensures thread-safety by locking the scheduler state and checks if the scheduler is already running to prevent duplicate executions. Each task is executed in its own goroutine, and the method emits a "started" event for observability.
func (*Scheduler) DoCtx ¶
DoCtx starts scheduling for context-aware tasks. Similar to Do, but designed for tasks that accept a context for cancellation or deadlines. It ensures thread-safety, checks for existing runs, and associates a task execution context with the tasks. Each task runs in its own goroutine, and a "started" event is emitted for observability.
func (*Scheduler) Entries ¶
Entries returns the current cron entries if using cron-based scheduling.
type Scheduling ¶
type Scheduling struct {
RetryCount int // Number of retry attempts for task submission on failure
RetryBackoff time.Duration // Duration to wait between retry attempts for failed submissions
// contains filtered or unexported fields
}
Scheduling holds configuration for retry behavior and observability.
type Semaphore ¶
type Semaphore struct {
// contains filtered or unexported fields
}
Semaphore bounds concurrent access with priority and CoDel queueing. The fast path (TryAcquire) is lock-free. Blocking acquire uses per-priority queues that switch from FIFO to LIFO under sustained overload.
func NewSemaphore ¶
func NewSemaphore(capacity int, opts ...SemaphoreOption) *Semaphore
NewSemaphore creates a prioritized semaphore with the given capacity. Defaults: target sojourn 5ms, max sojourn 500ms, interval 100ms.
func (*Semaphore) Acquire ¶
Acquire waits for a slot, respecting priority and context cancellation. Higher-priority callers are served before lower-priority ones already queued.
func (*Semaphore) Available ¶
Available returns the number of slots currently available for acquisition.
func (*Semaphore) Capacity ¶
Capacity returns the total slot capacity the semaphore was created with.
func (*Semaphore) Close ¶
func (s *Semaphore) Close()
Close permanently shuts down the semaphore, unblocking all pending waiters with ErrSemaphoreClosed.
func (*Semaphore) Metrics ¶
func (s *Semaphore) Metrics() *SemaphoreMetrics
Metrics returns the semaphore's operational metrics.
func (*Semaphore) Release ¶
func (s *Semaphore) Release()
Release returns a slot and attempts to hand it to the highest-priority waiter. If the dequeued waiter was context-cancelled, Release retries until it finds a live one.
func (*Semaphore) TryAcquire ¶
TryAcquire attempts to take a slot without blocking. Returns false immediately if no slot is available or the semaphore is closed.
type SemaphoreMetrics ¶
type SemaphoreMetrics struct {
AcquiredFast atomic.Uint64
AcquiredSlow atomic.Uint64
Released atomic.Uint64
Rejected atomic.Uint64
Timeouts atomic.Uint64
QueueDepth atomic.Int64
MaxQueueDepth atomic.Int64
}
SemaphoreMetrics tracks operational statistics for the semaphore. All fields are safe for concurrent reads without additional locking.
type SemaphoreOption ¶
type SemaphoreOption func(*Semaphore)
SemaphoreOption configures a Semaphore.
func SemaphoreWithInterval ¶
func SemaphoreWithInterval(d time.Duration) SemaphoreOption
SemaphoreWithInterval sets the CoDel interval for sustained overload detection. Dropping mode engages when sojourn stays above target for this long.
func SemaphoreWithMaxSojourn ¶
func SemaphoreWithMaxSojourn(d time.Duration) SemaphoreOption
SemaphoreWithMaxSojourn sets the hard ceiling on waiter age. Exceeding this immediately engages dropping mode regardless of interval.
func SemaphoreWithTargetSojourn ¶
func SemaphoreWithTargetSojourn(d time.Duration) SemaphoreOption
SemaphoreWithTargetSojourn sets the CoDel target sojourn time. When the oldest waiter exceeds this threshold, the queue enters dropping mode.
type Shutdown ¶
type Shutdown struct {
// contains filtered or unexported fields
}
Shutdown manages the graceful shutdown process.
func NewShutdown ¶
func NewShutdown(opts ...ShutdownOption) *Shutdown
NewShutdown creates a configured Shutdown manager. Defaults: 30s timeout, sequential execution, SIGINT/SIGTERM/SIGQUIT.
func (*Shutdown) Done ¶
func (sm *Shutdown) Done() <-chan struct{}
Done returns a channel closed when shutdown completes.
func (*Shutdown) GetStats ¶
func (sm *Shutdown) GetStats() *ShutdownStats
GetStats returns a deep copy of current shutdown statistics. Safe for concurrent reads; protects against mutation during access. Used by public APIs to return final results.
func (*Shutdown) IsShuttingDown ¶
IsShuttingDown reports whether shutdown has been initiated. Thread-safe via atomic boolean; useful for guarding late registrations. Returns true once the first shutdown trigger occurs.
func (*Shutdown) Register ¶
Register adds a cleanup task. Supported types: func(), func() error, func(context.Context) error, io.Closer.
func (*Shutdown) RegisterCall ¶
RegisterCall registers a simple function returning an error. Convenience wrapper around Register; supports jack.Func signature. Name is auto-generated when empty string is provided.
func (*Shutdown) RegisterCloser ¶
RegisterCloser registers an io.Closer. Convenience wrapper that converts Close() error into shutdown error. Name reflects the concrete type when left empty.
func (*Shutdown) RegisterFunc ¶
RegisterFunc registers a simple void function. Convenience wrapper around Register; name is auto-generated if empty. Useful for quick registration of fire-and-forget cleanup.
func (*Shutdown) RegisterWithContext ¶
RegisterWithContext registers a fully context-aware callback. Allows explicit naming and direct use of jack.FuncCtx functions. Preferred for advanced cleanup needing cancellation/timeout awareness.
func (*Shutdown) TriggerShutdown ¶
func (sm *Shutdown) TriggerShutdown() *ShutdownStats
TriggerShutdown manually initiates the shutdown process and returns final statistics.
func (*Shutdown) Wait ¶
func (sm *Shutdown) Wait() *ShutdownStats
Wait blocks until a signal is received or TriggerShutdown is called, then runs all registered cleanup tasks and returns statistics.
func (*Shutdown) WaitChan ¶
func (sm *Shutdown) WaitChan() <-chan *ShutdownStats
WaitChan returns a channel that receives stats once shutdown is complete. Non-blocking alternative to Wait(), ideal for async integration. Channel is closed after sending the single stats value.
type ShutdownError ¶
ShutdownError provides structured error details for a failed cleanup task.
func (*ShutdownError) Error ¶
func (e *ShutdownError) Error() string
Error implements the error interface, combining the task name and underlying cause.
func (*ShutdownError) Unwrap ¶
func (e *ShutdownError) Unwrap() error
Unwrap returns the underlying error for errors.Is/As chain traversal.
type ShutdownOption ¶
type ShutdownOption func(*Shutdown)
ShutdownOption configures the Shutdown.
func ShutdownConcurrent ¶
func ShutdownConcurrent() ShutdownOption
ShutdownConcurrent enables concurrent execution of cleanup functions. By default, execution is sequential (LIFO).
func ShutdownWithForceQuit ¶
func ShutdownWithForceQuit(d time.Duration) ShutdownOption
ShutdownWithForceQuit enables a force-quit trigger after a specific timeout, cancelling the cleanup context if shutdown takes too long.
func ShutdownWithLogger ¶
func ShutdownWithLogger(l *ll.Logger) ShutdownOption
ShutdownWithLogger sets a custom logger for the manager.
func ShutdownWithSignals ¶
func ShutdownWithSignals(signals ...os.Signal) ShutdownOption
ShutdownWithSignals specifies which OS signals to capture. If not set, defaults to SIGINT, SIGTERM, and SIGQUIT.
func ShutdownWithTimeout ¶
func ShutdownWithTimeout(d time.Duration) ShutdownOption
ShutdownWithTimeout sets the maximum time to wait for shutdown completion.
type ShutdownStats ¶
type ShutdownStats struct {
TotalEvents int
CompletedEvents int
FailedEvents int
StartTime time.Time
EndTime time.Time
Errors []error
}
ShutdownStats contains metrics about the shutdown execution.
type Task ¶
type Task interface {
// Do executes the task and returns any error.
Do() error
}
Task represents a simple task that can be executed without a context.
type TaskCtx ¶
type TaskCtx interface {
// Do executes the task with the given context and returns any error.
Do(ctx context.Context) error
}
TaskCtx represents a context-aware task that can be executed with a context.
type Throttle ¶
type Throttle struct {
// contains filtered or unexported fields
}
Throttle is a client-side self-tuning throttle. It observes upstream acceptance and rejection rates and probabilistically rejects local requests before sending when the upstream is overloaded. Lower-priority traffic is throttled more aggressively than critical traffic.
func NewThrottle ¶
func NewThrottle(priorities int, opts ...ThrottleOption) *Throttle
NewThrottle creates a throttle with the given number of priorities. The standard jack priority count (4) is recommended.
func (*Throttle) Accepted ¶
Accepted records a successful upstream response for the given priority tier, decreasing throttle pressure for that tier.
func (*Throttle) Allow ¶
Allow returns true if the request should proceed. Uses a lock-free LCG random check against the per-priority rejection probability.
func (*Throttle) Close ¶
func (a *Throttle) Close()
Close shuts down the throttle; all subsequent Allow calls return false.
func (*Throttle) Metrics ¶
func (a *Throttle) Metrics() *ThrottleMetrics
Metrics returns the throttle's operational metrics.
func (*Throttle) Probability ¶
Probability returns the current rejection probability for the given priority as a value in [0.0, 1.0]. This is the live per-priority value, not a single aggregate.
type ThrottleMetrics ¶
type ThrottleMetrics struct {
RequestsTotal atomic.Uint64
Accepted atomic.Uint64
RejectedLocal atomic.Uint64
RejectedRemote atomic.Uint64
// ThrottleProbs holds the current rejection probability for each priority tier,
// scaled by throttleProbScale (0 = no throttling, 10000 = always throttled).
// Index matches Priority: 0=Critical, 1=High, 2=Medium, 3=Low.
ThrottleProbs [priorityCount]atomic.Int64
}
ThrottleMetrics tracks throttle behaviour across all priority tiers.
type ThrottleOption ¶
type ThrottleOption func(*Throttle)
ThrottleOption configures a Throttle.
func ThrottleWithRatio ¶
func ThrottleWithRatio(ratio float64) ThrottleOption
ThrottleWithRatio sets the overcommit ratio (default 2.0). A ratio of 2.0 targets a 50% upstream acceptance rate before local shedding begins.
func ThrottleWithWindow ¶
func ThrottleWithWindow(d time.Duration) ThrottleOption
ThrottleWithWindow sets the observation window for accept/reject counters (default 1 minute). Counters reset after windowResetSamples samples to adapt to changing upstream load.
func ThrottleWithWindowResetSamples ¶
func ThrottleWithWindowResetSamples(n uint64) ThrottleOption
ThrottleWithWindowResetSamples sets how many samples to collect before resetting the observation window (default 1000). Smaller values adapt faster; larger values are smoother.
type Vitals ¶
type Vitals struct {
Start HookCtx
End CallbackCtx
Timed CallbackCtx
TimedWait time.Duration
}
func NewVitals ¶
func NewVitals(opts ...VitalsOption) *Vitals
NewVitals creates a new Vitals instance configured with the provided functional options. Each option modifies the Vitals configuration before returning the initialized instance. If no options are provided, returns a Vitals with all fields set to their zero values.
func (*Vitals) Execute ¶
Execute runs an operation with optional start and end hooks while propagating errors. It invokes the Start hook first, then the operation, and finally the End callback on success. If Start or the operation returns an error, execution stops and the error is returned immediately.
func (*Vitals) ExecuteCtx ¶
ExecuteCtx runs a context-aware operation with optional start and end hooks. It invokes the Start hook first, then the operation with context, and finally the End callback. Errors from Start or the operation are propagated immediately, skipping subsequent steps.
type VitalsOption ¶
type VitalsOption func(*Vitals)
func VitalsWithEnd ¶
func VitalsWithEnd(callback CallbackCtx) VitalsOption
VitalsWithEnd configures a callback function to run after successful operation completion. The callback receives the context and operation ID but does not return an error. Use this option for post-operation cleanup, notifications, or result processing.
func VitalsWithStart ¶
func VitalsWithStart(hook HookCtx) VitalsOption
VitalsWithStart configures a hook function to run at the beginning of a vital operation. The hook receives the context and operation ID, and can return an error to abort execution. Use this option to inject pre-operation logic like logging, metrics, or validation.
func VitalsWithTimed ¶
func VitalsWithTimed(callback CallbackCtx, wait time.Duration) VitalsOption
VitalsWithTimed configures a timed callback with a wait duration for delayed execution. The callback receives context and ID after the specified wait period elapses. Use this option for deferred operations like timeout handling or scheduled cleanup.
Source Files
¶
- adaptive.go
- async.go
- backpressure.go
- breaker.go
- bulkhead.go
- debounce.go
- doctor.go
- fn.go
- func.go
- hedge.go
- lease.go
- lifetime.go
- looper.go
- observer.go
- patient.go
- pool.go
- queue.go
- ratelimiter.go
- reaper.go
- reservation.go
- retry.go
- routines.go
- run.go
- runner.go
- safely.go
- scheduler.go
- semaphore.go
- shutdown.go
- task.go
- throttle.go
- types.go
- vitals.go
- worker.go