backpressure

package
v1.0.0 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 7, 2026 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package backpressure provides queue-based backpressure control for GRIP streaming pipelines.

It coordinates three components to manage load:

  • Queue: bounded message buffer with configurable capacity
  • Limiter: concurrency limiter for in-flight processing
  • Policy: strategy for handling capacity exhaustion (hard reject, queue, soft allow, soft allow with signal)

The Controller ties these together and exposes Acquire/Release semantics for stream engines to throttle message processing.

Usage

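q := backpressure.NewQueue()
l := backpressure.NewLimiter(100)
ctrl := backpressure.NewController(backpressure.PolicyHardReject, q, l)

acquired, err := ctrl.Acquire(ctx)
if err != nil {
    // Acquire failed; do not call Release
    return err
}
if !acquired {
    // not admitted (backpressure triggered); do not call Release
    return nil
}
// acquired == true: Release(true) must be called exactly once
defer ctrl.Release(true)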

Constants

This section is empty.

Variables

var ErrQueueClosed = capacity.ErrQueueClosed

ErrQueueClosed is exported for backward compatibility.

Functions

This section is empty.

Types

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

Controller coordinates backpressure decisions using policy, limiter, and queue.

func NewController

func NewController(
	policy Policy,
	queue *Queue,
	limiter *Limiter,
) *Controller

func (*Controller) Acquire

func (c *Controller) Acquire(ctx context.Context) (bool, error)

Acquire decides whether execution may proceed.

Contract:

  • acquired == true → caller MUST call Release(true)
  • acquired == false → caller MUST NOT call Release

func (*Controller) Close

func (c *Controller) Close()

Close shuts down the queue and unblocks all waiters.

func (*Controller) Release

func (c *Controller) Release(acquired bool)

Release releases capacity and wakes one queued waiter.
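
The Acquire/Release contract can be wrapped in a small helper so that callers cannot forget or misplace the Release call. The sketch below is illustrative only: the gate interface, runGuarded, errNotAdmitted, and fakeGate are hypothetical names that are not part of this package. The interface simply mirrors the two documented Controller method signatures, so a *Controller should satisfy it.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// gate mirrors the documented Acquire and Release signatures of Controller.
// The interface itself is an assumption made for this sketch.
type gate interface {
	Acquire(ctx context.Context) (bool, error)
	Release(acquired bool)
}

// errNotAdmitted is a hypothetical sentinel; the package does not export it.
var errNotAdmitted = errors.New("not admitted")

// runGuarded runs work only if the gate admits the caller, and applies the
// documented contract: Release(true) after a successful acquire, and no
// Release at all otherwise.
func runGuarded(ctx context.Context, g gate, work func() error) error {
	acquired, err := g.Acquire(ctx)
	if err != nil {
		return err
	}
	if !acquired {
		return errNotAdmitted
	}
	defer g.Release(true)
	return work()
}

// fakeGate is a test double that admits callers while free slots remain.
type fakeGate struct {
	free     int
	released int
}

func (f *fakeGate) Acquire(_ context.Context) (bool, error) {
	if f.free == 0 {
		return false, nil
	}
	f.free--
	return true, nil
}

func (f *fakeGate) Release(acquired bool) {
	if acquired {
		f.free++
		f.released++
	}
}

func main() {
	g := &fakeGate{free: 1}
	ctx := context.Background()
	err := runGuarded(ctx, g, func() error {
		// The only slot is held here, so a nested call is refused.
		return runGuarded(ctx, g, func() error { return nil })
	})
	fmt.Println(err, g.released) // not admitted 1
}
```

The nested call is refused without a Release, while the outer call releases exactly once, which matches the two branches of the contract.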

type Decision

type Decision int

Decision represents what to do with an incoming request.

const (
	// Reject immediately
	DecisionReject Decision = iota

	// Allow immediately (consume capacity)
	DecisionAllow

	// Queue for later execution
	DecisionQueue

	// Allow but emit a signal (degraded mode)
	DecisionAllowWithSignal
)

func ResolveDecision

func ResolveDecision(policy Policy, state State) Decision

ResolveDecision decides what to do given policy and state.

Invariants:

  • DecisionReject MUST NOT execute or enqueue
  • DecisionAllow MUST consume capacity
  • DecisionQueue MUST NOT consume capacity
  • DecisionAllowWithSignal MUST consume capacity
  • Unknown policy MUST fail closed
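
The documentation does not spell out which Decision each Policy and State combination produces. The following is one plausible mapping, written as a standalone sketch with local copies of the documented types. The function resolveSketch is hypothetical, and the choices it makes (for example, that PolicyQueue rejects when the queue is full) are assumptions rather than the package's actual behavior. The one rule taken directly from the invariants is that an unknown policy fails closed.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented types so the sketch compiles alone.
type Policy int

const (
	PolicyHardReject Policy = iota
	PolicyQueue
	PolicySoftAllow
	PolicySoftAllowWithSignal
)

type Decision int

const (
	DecisionReject Decision = iota
	DecisionAllow
	DecisionQueue
	DecisionAllowWithSignal
)

type State struct {
	HasCapacity    bool
	QueueAvailable bool
}

// resolveSketch is a hypothetical mapping, not the package's implementation.
func resolveSketch(policy Policy, state State) Decision {
	switch policy {
	case PolicyHardReject:
		if state.HasCapacity {
			return DecisionAllow
		}
		return DecisionReject
	case PolicyQueue:
		if state.HasCapacity {
			return DecisionAllow
		}
		if state.QueueAvailable {
			return DecisionQueue
		}
		return DecisionReject // assumption: a full queue means reject
	case PolicySoftAllow:
		return DecisionAllow
	case PolicySoftAllowWithSignal:
		if state.HasCapacity {
			return DecisionAllow
		}
		return DecisionAllowWithSignal
	default:
		return DecisionReject // unknown policy fails closed
	}
}

func main() {
	exhausted := State{HasCapacity: false, QueueAvailable: true}
	fmt.Println(resolveSketch(PolicyQueue, exhausted) == DecisionQueue) // true
	fmt.Println(resolveSketch(Policy(99), exhausted) == DecisionReject) // true
}
```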

type Limiter

type Limiter = capacity.Limiter

Limiter is a type alias for capacity.Limiter. It is kept for backward compatibility within the backpressure package.

func NewLimiter

func NewLimiter(max int) *Limiter

NewLimiter creates a new backpressure limiter.
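
The methods of capacity.Limiter are not listed on this page. As a rough illustration of what a concurrency limiter with a maximum of max in-flight operations can look like, here is a minimal sketch built on a buffered channel. The type slotLimiter and its methods tryAcquire and release are hypothetical names and do not describe the real Limiter API.

```go
package main

import "fmt"

// slotLimiter is a hypothetical stand-in for a concurrency limiter.
// Each value in the buffered channel represents one in-flight operation.
type slotLimiter struct {
	slots chan struct{}
}

func newSlotLimiter(max int) *slotLimiter {
	return &slotLimiter{slots: make(chan struct{}, max)}
}

// tryAcquire takes a slot without blocking and reports whether one was free.
func (l *slotLimiter) tryAcquire() bool {
	select {
	case l.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// release returns a slot; it is a no-op if no slot is currently held.
func (l *slotLimiter) release() {
	select {
	case <-l.slots:
	default:
	}
}

func main() {
	l := newSlotLimiter(2)
	fmt.Println(l.tryAcquire(), l.tryAcquire(), l.tryAcquire()) // true true false
	l.release()
	fmt.Println(l.tryAcquire()) // true
}
```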

type Policy

type Policy int

Policy defines how backpressure behaves under pressure.

const (
	// Reject immediately when capacity is exhausted
	PolicyHardReject Policy = iota

	// Queue when capacity is exhausted.
	PolicyQueue

	// Allow execution even when capacity is exhausted.
	PolicySoftAllow

	// Allow execution but signal pressure to caller.
	PolicySoftAllowWithSignal
)

type Queue

type Queue = capacity.Queue

Queue is a type alias for capacity.Queue. It is kept for backward compatibility within the backpressure package.

func NewQueue

func NewQueue() *Queue

NewQueue creates a new FIFO backpressure queue.
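
The methods of capacity.Queue are not shown here either. To make the FIFO behavior concrete (Release wakes one queued waiter, Close unblocks all of them), the sketch below models a queue of waiters in which each waiter receives either nil when woken or an error when the queue is closed. The names waiterQueue, enqueue, wakeOne, close, and errClosed are all hypothetical; errClosed merely plays the role that ErrQueueClosed plays in this package.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errClosed is a hypothetical stand-in for ErrQueueClosed.
var errClosed = errors.New("queue closed")

// waiterQueue is a hypothetical FIFO of blocked callers.
type waiterQueue struct {
	mu      sync.Mutex
	waiters []chan error
	closed  bool
}

// enqueue registers a waiter and returns the channel it should receive from.
func (q *waiterQueue) enqueue() (<-chan error, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return nil, errClosed
	}
	ch := make(chan error, 1) // buffered so wakers never block
	q.waiters = append(q.waiters, ch)
	return ch, nil
}

// wakeOne wakes the oldest waiter and reports whether anyone was waiting.
func (q *waiterQueue) wakeOne() bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.waiters) == 0 {
		return false
	}
	q.waiters[0] <- nil
	q.waiters = q.waiters[1:]
	return true
}

// close rejects future waiters and unblocks all current ones with errClosed.
func (q *waiterQueue) close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	for _, ch := range q.waiters {
		ch <- errClosed
	}
	q.waiters = nil
}

func main() {
	q := &waiterQueue{}
	first, _ := q.enqueue()
	second, _ := q.enqueue()

	q.wakeOne()
	fmt.Println(<-first) // <nil>

	q.close()
	fmt.Println(<-second) // queue closed

	_, err := q.enqueue()
	fmt.Println(err) // queue closed
}
```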

type State

type State struct {
	// Whether capacity is currently available
	HasCapacity bool

	// Whether queue has space
	QueueAvailable bool
}

State represents the current pressure state.
