Versions in this module Expand all Collapse all v0 v0.1.0 Mar 25, 2026 Changes in this version + var ErrClosed = errors.New("toc: stage closed") + var ErrStopping = errors.New("toc: stage is stopping") + var ErrWeightExceedsLimit = errors.New("toc: item weight exceeds MaxWIPWeight") + func Adapt(name string, prev, curr Stats, elapsed time.Duration) core.StageObservation + func BufferCapacity(throughput float64, protectionTime time.Duration) int + func FromChan[T any](ch <-chan T) <-chan rslt.Result[T] + func PublishOnSnapshot(ctx context.Context, pub ObservationPublisher, onError PublishErrorHandler) func(PipelineSnapshot) + type Batcher struct + func NewBatcher[T any](ctx context.Context, src <-chan rslt.Result[T], n int) *Batcher[T] + func (b *Batcher[T]) Out() <-chan rslt.Result[[]T] + func (b *Batcher[T]) Stats() BatcherStats + func (b *Batcher[T]) Wait() error + type BatcherStats struct + BatchCount int64 + BufferedDepth int64 + Dropped int64 + Emitted int64 + Forwarded int64 + OutputBlockedTime time.Duration + Received int64 + func (s BatcherStats) ToStats() Stats + type BudgetAllocatorHandle struct + func BudgetAllocator(stages []StageAllocation, drumName string, budgetFraction float64, ...) *BudgetAllocatorHandle + func (h *BudgetAllocatorHandle) Callback() func(context.Context, memctl.MemInfo) + type BufferZone int + const BufferGreen + const BufferRed + const BufferYellow + func (z BufferZone) String() string + type DiagnosisHandler func(ctx context.Context, msg DiagnosisMessage) error + type DiagnosisMessage struct + Diagnosis core.Diagnosis + PipelineID string + TimestampUnixNano int64 + type DiagnosisPublisher interface + PublishDiagnosis func(ctx context.Context, msg DiagnosisMessage) error + type DiagnosisSubscriber interface + SubscribeAllDiagnosis func(ctx context.Context, fn DiagnosisHandler) (Subscription, error) + SubscribeDiagnosis func(ctx context.Context, pipelineID string, fn DiagnosisHandler) (Subscription, error) + type FocusingStep int + const StepElevate + const StepExploit + const StepIdentify + const StepPreventInertia + const StepSubordinate + func ClassifyStep(prevConstraint string, currConstraint string, ropeActive bool, ...) FocusingStep + func (s FocusingStep) String() string + type IncomingEdge struct + From string + Ratio int + type IntervalStats struct + ApproxUtilization float64 + ArrivalRate float64 + CurrActiveWorkers int + CurrBufferPenetration float64 + CurrBufferedDepth int64 + CurrQueueCapacity int + CurrTargetWorkers int + Duration time.Duration + ErrorRate float64 + Goodput float64 + IdleTimeDelta time.Duration + ItemsCanceled int64 + ItemsCompleted int64 + ItemsFailed int64 + ItemsSubmitted int64 + MeanServiceTime time.Duration + OutputBlockedDelta time.Duration + QueueGrowthRate float64 + ResetDetected bool + ServiceTimeDelta time.Duration + Throughput float64 + func Delta(prev, curr Stats, elapsed time.Duration) IntervalStats + func (s IntervalStats) BufferZone(yellowAt, redAt float64) BufferZone + type Join struct + func NewJoin[A, B, R any](ctx context.Context, srcA <-chan rslt.Result[A], srcB <-chan rslt.Result[B], ...) *Join[R] + func (j *Join[R]) Out() <-chan rslt.Result[R] + func (j *Join[R]) Stats() JoinStats + func (j *Join[R]) Wait() error + type JoinStats struct + Combined int64 + DiscardedA int64 + DiscardedB int64 + Errors int64 + ExtraA int64 + ExtraB int64 + OutputBlockedTime time.Duration + ReceivedA int64 + ReceivedB int64 + type LimitHandle struct + func (h *LimitHandle) Close() + func (h *LimitHandle) ProposeCount(limit int) + func (h *LimitHandle) ProposeWeight(limit int64) + type LimitManager struct + func NewLimitManager(setCount func(int) int, setWeight func(int64) int64, defaultCount int, ...) *LimitManager + func (m *LimitManager) Effective() LimitSnapshot + func (m *LimitManager) ProposeCount(source string, limit int) + func (m *LimitManager) ProposeWeight(source string, limit int64) + func (m *LimitManager) Register(source LimitSource) *LimitHandle + func (m *LimitManager) WithdrawCount(source string) + func (m *LimitManager) WithdrawWeight(source string) + type LimitSnapshot struct + AppliedCount int + AppliedWeight int64 + CountProposals map[string]int + CountSource string + CountSources int + EffectiveCount int + EffectiveWeight int64 + WeightProposals map[string]int64 + WeightSource string + WeightSources int + type LimitSource = string + const LimitSourceBudgetAllocator + const LimitSourceMemoryRope + const LimitSourceProcessingRope + const LimitSourceWeightRope + type MemoryFeverZone int + const MemoryGreen + const MemoryRed + const MemoryUnknown + const MemoryYellow + func MemoryFever(headroom, limit uint64, yellowAt, redAt float64) MemoryFeverZone + func (z MemoryFeverZone) String() string + type MemoryRopeHandle struct + func MemoryRope(pipeline *Pipeline, drum string, limits *LimitManager, budgetFraction float64, ...) *MemoryRopeHandle + func (h *MemoryRopeHandle) Callback() func(context.Context, memctl.MemInfo) + func (h *MemoryRopeHandle) Stats() MemoryRopeStats + type MemoryRopeStats struct + Adjustments int64 + Applied int64 + Budget int64 + Headroom int64 + Weight int64 + type Merge struct + func NewMerge[T any](ctx context.Context, sources ...<-chan rslt.Result[T]) *Merge[T] + func (m *Merge[T]) Out() <-chan rslt.Result[T] + func (m *Merge[T]) Stats() MergeStats + func (m *Merge[T]) Wait() error + type MergeStats struct + Dropped int64 + Forwarded int64 + Received int64 + SourceDropped []int64 + SourceForwarded []int64 + SourceReceived []int64 + type MissingResultError struct + Source string + func (e *MissingResultError) Error() string + type ObservationBatch struct + Observations []core.StageObservation + PipelineID string + TimestampUnixNano int64 + WindowDurationNano int64 + type ObservationHandler func(ctx context.Context, batch ObservationBatch) error + type ObservationPublisher interface + PublishObservations func(ctx context.Context, batch ObservationBatch) error + type ObservationSubscriber interface + SubscribeAllObservations func(ctx context.Context, fn ObservationHandler) (Subscription, error) + SubscribeObservations func(ctx context.Context, pipelineID string, fn ObservationHandler) (Subscription, error) + type Observer struct + func NewObserver(pipelineID string) *Observer + func (o *Observer) AddStage(s ObserverStage) + func (o *Observer) Run(ctx context.Context, interval time.Duration, fn func(PipelineSnapshot)) + func (o *Observer) RunWithTicker(ctx context.Context, ticks <-chan time.Time, fn func(PipelineSnapshot)) + func (o *Observer) SetDiagnosis(fn func() *core.Diagnosis) + func (o *Observer) Snapshot() PipelineSnapshot + type ObserverStage struct + Name string + Stats func() Stats + UnitLabel string + type Options struct + Capacity int + ContinueOnError bool + MaxWIP int + MaxWIPWeight int64 + OversizePolicy OversizePolicy + TrackAllocations bool + TrackServiceTimeDist bool + Weight func(T) int64 + Workers int + type OversizePolicy int + const OversizeReject + const OversizeWait + type Pipeline struct + func NewPipeline() *Pipeline + func (p *Pipeline) AddEdge(from, to string) + func (p *Pipeline) AddEdgeWithRatio(from, to string, ratio int) + func (p *Pipeline) AddStage(name string, stats func() Stats) + func (p *Pipeline) AncestorsOf(target string) []string + func (p *Pipeline) DirectPredecessors(name string) []string + func (p *Pipeline) EdgeRatio(from, to string) int + func (p *Pipeline) Freeze() + func (p *Pipeline) HasPath(from, to string) bool + func (p *Pipeline) Heads() []string + func (p *Pipeline) HeadsTo(target string) []string + func (p *Pipeline) Incoming(name string) []IncomingEdge + func (p *Pipeline) StageStats(name string) func() Stats + func (p *Pipeline) Stages() []string + type PipelineSnapshot struct + At time.Time + Diagnosis *core.Diagnosis + GoHeap uint64 + PipelineID string + RSS uint64 + RSSOK bool + Stages []StageSnapshotEntry + type PublishErrorHandler func(err error, batch ObservationBatch) + type Publisher interface + type Reporter struct + func NewReporter(interval time.Duration, opts ...ReporterOption) *Reporter + func (r *Reporter) AddStage(name string, fn func() Stats) + func (r *Reporter) Run(ctx context.Context) + type ReporterOption func(*Reporter) + func WithLogger(l *log.Logger) ReporterOption + type RopeController struct + func NewRopeController(pipeline *Pipeline, drum string, limits *LimitManager, ...) *RopeController + func NewWeightRopeController(pipeline *Pipeline, drum string, limits *LimitManager, ...) *RopeController + func (rc *RopeController) Run(ctx context.Context) + func (rc *RopeController) RunWithTicker(ctx context.Context, ticks <-chan time.Time) + func (rc *RopeController) Stats() RopeStats + type RopeOption func(*RopeController) + func WithInitialRopeLength(n int) RopeOption + func WithRopeLogger(l *log.Logger) RopeOption + func WithRopeSafetyFactor(factor float64) RopeOption + type RopeStats struct + AdjustmentCount int64 + DrumErrorRate float64 + DrumGoodput float64 + HeadAppliedWIP int + RopeLength int + RopeUtilization float64 + RopeWIP int + type ScalingHistory struct + func NewScalingHistory() *ScalingHistory + func (h *ScalingHistory) DiminishingReturns(workers int, threshold float64) bool + func (h *ScalingHistory) Len() int + func (h *ScalingHistory) Record(workers int, throughput float64) + func (h *ScalingHistory) Reset() + func (h *ScalingHistory) ScalingGain(workers int) (float64, bool) + type ServiceTimeSummary struct + Count int64 + Max time.Duration + Mean time.Duration + Min time.Duration + Overflow int64 + P50 time.Duration + P95 time.Duration + P99 time.Duration + StdDev time.Duration + Underflow int64 + type Stage struct + func Pipe[T, R any](ctx context.Context, src <-chan rslt.Result[T], ...) *Stage[T, R] + func Start[T, R any](ctx context.Context, fn func(context.Context, T) (R, error), opts Options[T]) *Stage[T, R] + func (s *Stage[T, R]) ActiveWorkers() int + func (s *Stage[T, R]) Cause() error + func (s *Stage[T, R]) CloseInput() + func (s *Stage[T, R]) DisableMaxWIPWeight() + func (s *Stage[T, R]) DiscardAndCause() error + func (s *Stage[T, R]) DiscardAndWait() error + func (s *Stage[T, R]) Limits() *LimitManager + func (s *Stage[T, R]) MaxWIP() int + func (s *Stage[T, R]) MaxWIPWeight() int64 + func (s *Stage[T, R]) Out() <-chan rslt.Result[R] + func (s *Stage[T, R]) PauseAdmission() + func (s *Stage[T, R]) Paused() bool + func (s *Stage[T, R]) ResumeAdmission() + func (s *Stage[T, R]) SetMaxWIP(n int) int + func (s *Stage[T, R]) SetMaxWIPWeight(n int64) int64 + func (s *Stage[T, R]) SetWorkers(n int) (int, error) + func (s *Stage[T, R]) Stats() Stats + func (s *Stage[T, R]) Submit(ctx context.Context, item T) error + func (s *Stage[T, R]) TargetWorkers() int + func (s *Stage[T, R]) Wait() error + type StageAllocation struct + Limits *LimitManager + Name string + Share float64 + type StageSnapshotEntry struct + Name string + Order int + QueueCapacity int + QueueDepth int64 + State core.StageState + Stats Stats + UnitLabel string + Workers int + type Stats struct + ActiveWorkers int + Admitted int64 + AdmittedWeight int64 + AllocTrackingActive bool + BufferedDepth int64 + Canceled int64 + Completed int64 + Dropped int64 + Failed int64 + Forwarded int64 + IdleTime time.Duration + InFlightWeight int64 + MaxWIP int + MaxWIPWeight int64 + MaxWIPWeightEnabled bool + MaxWaiterCount int + ObservedAllocBytes uint64 + ObservedAllocObjects uint64 + OutputBlockedTime time.Duration + Panicked int64 + Paused bool + QueueCapacity int + Received int64 + ServiceTime time.Duration + ServiceTimeDist ServiceTimeSummary + Submitted int64 + TargetWorkers int + WIPWaitCount int64 + WIPWaitNs int64 + WaiterCount int + type Subscriber interface + type Subscription interface + Close func() error + type Tee struct + func NewTee[T any](ctx context.Context, src <-chan rslt.Result[T], n int) *Tee[T] + func (t *Tee[T]) Branch(i int) <-chan rslt.Result[T] + func (t *Tee[T]) Stats() TeeStats + func (t *Tee[T]) Wait() error + type TeeStats struct + BranchBlockedTime []time.Duration + BranchDelivered []int64 + FullyDelivered int64 + PartiallyDelivered int64 + Received int64 + Undelivered int64 + type WeightedBatcher struct + func NewWeightedBatcher[T any](ctx context.Context, src <-chan rslt.Result[T], threshold int, ...) *WeightedBatcher[T] + func (b *WeightedBatcher[T]) Out() <-chan rslt.Result[[]T] + func (b *WeightedBatcher[T]) Stats() WeightedBatcherStats + func (b *WeightedBatcher[T]) Wait() error + type WeightedBatcherStats struct + BatchCount int64 + BufferedDepth int64 + BufferedWeight int64 + Dropped int64 + Emitted int64 + Forwarded int64 + OutputBlockedTime time.Duration + Received int64 + func (s WeightedBatcherStats) ToStats() Stats