bidistream

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 7, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package bidistream provides an execution pipeline for gRPC bidirectional-streaming RPCs.

It implements a full session lifecycle with a state machine (Init → Open → ClientClosed → Draining → Closed) and supports half-close semantics, per-message ordering, backpressure, and concurrent send/receive.

Architecture

The bidirectional stream pipeline consists of:

  • Session: manages the overall stream lifecycle and state machine
  • Receiver: handles inbound messages with ordering and admission
  • Sender: handles outbound messages with flow control
  • Engine: executes admitted messages with retry supervision

Features

  • Full state machine with half-close semantics
  • Per-key ordering (sequential within key, parallel across keys)
  • Backpressure with configurable policies
  • Automatic panic recovery per message
  • Graceful shutdown with in-flight message draining
  • Interceptor adapter for gRPC StreamServerInterceptor

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrSessionClosed is returned when any operation is attempted
	// after the session has fully terminated.
	ErrSessionClosed = errors.New("bidistream: session closed")

	// ErrShutdownInProgress indicates shutdown has begun but
	// draining may still be happening.
	ErrShutdownInProgress = errors.New("bidistream: shutdown in progress")
)
View Source
var (
	// ErrIboundCloses means client has half-closed the stream
	// and no further Recv is allowed
	ErrInboundClosed = errors.New("bidistream: inbound flow is closed")

	// ErrIboundAdmissionRejected means ordering or backpressure
	// rejected the inbound message.
	ErrInboundAdmissionRejected = errors.New("bidistream: inbound admission rejected")
)
View Source
var (
	// ErrOutboundClosed means server has closed its send side.
	ErrOutboundClosed = errors.New("bidistream: outbound flow closed")

	// ErrOutboundBackpressure means send-side backpressure
	// rejected the message
	ErrOutboundBackPressure = errors.New("bidistream: outbound backpressure")
)
View Source
var (
	// ErrTerminalFailure indiciates a failure that MUST
	// terminate the entire session
	ErrTerminalFailure = errors.New("bidistream: terminal failure")

	// ErrNonTerminalFailure indicates a failure confined
	// to one direction.
	ErrNonTerminalFailure = errors.New("bidistream: non-terminal failure")
)
View Source
var (
	// ErrOrderingBroken inidicates ordering guarantees
	// can no longer be maintained.
	//
	// This is always terminal
	ErrOrderingBroken = errors.New("bidistream: ordering broken")
)
View Source
var (
	// ErrPanicRecovered indicates a panic was caught inside
	// the execution path.
	//
	// Panics are ALWAYS terminal.
	ErrPanicRecovered = errors.New("bidistream: panic recovered")
)

Functions

func NoopOrdering

func NoopOrdering() streaming.OrderingPolicy

NoopOrdering returns an streaming.OrderingPolicy that always admits without blocking. Use this when constructing a bidistream Engine — ordering is already enforced by Receiver / Sender.

Types

type BackpressureController

type BackpressureController interface {
	// Acquire attempts to admit one unit of work.
	// On success, returns an opaque token that MUST be released.
	Acquire(ctx context.Context) (any, error)

	// Release releases a previously acquired token.
	Release(token any)
}

type BackpressureOptions

type BackpressureOptions struct {
	// Policy defines pressure handling behavior
	Policy BackpressurePolicy

	// Capacity limits concurrent executions.
	// 0 means unlimited.
	Capacity int

	// QueueCapacity limits waiting messages.
	// Only applies when policy = BackpressureQueue
	// 0 means unbounded
	QueueCapacity int
}

type BackpressurePolicy

type BackpressurePolicy int
const (
	// Reject immediately when capacity is exhausted
	BackpressureHardReject BackpressurePolicy = iota

	// Queue until capacity becomes available
	BackpressureQueue

	// Allow execution even when capacity is exhausted
	BackpressureSoftAllow

	// Allow execution and emit pressure signal
	BackpressureSoftAllowWithSignal
)

type Direction

type Direction int
const (
	Inbound Direction = iota
	Outbound
)

type Engine

type Engine[T any] struct {
	// contains filtered or unexported fields
}

Engine executes messages for a bidirectional stream.

func NewEngine

func NewEngine[T any](
	ordering streaming.OrderingPolicy,
	handler streaming.Handler[T],
) *Engine[T]

NewEngine constructs a bidistream engine.

ordering controls message admission. In bidistream this is typically a no-op ordering because admission is already handled by Receiver / Sender. Use NoopOrdering for this.

handler processes individual stream messages. Use streaming.Handler to define one.

func (*Engine[T]) Close

func (e *Engine[T]) Close()

Close prevents further execution.

func (*Engine[T]) Process

func (e *Engine[T]) Process(
	ctx context.Context,
	msg streaming.Message[T],
) error

Process executes a single admitted message.

Contract: - Ordering and backpressure already enforced upstream - Retry semantics handled internally by stream.Engine

type FailureInvariant

type FailureInvariant struct{}

type FlowRole

type FlowRole string
const (
	RoleInterceptor FlowRole = "interceptor"
	RoleReceiver    FlowRole = "receiver"
	RoleSender      FlowRole = "sender"
	RoleEngine      FlowRole = "engine"
	RoleSupervisor  FlowRole = "supervisor"
)

type FlowState

type FlowState int
const (
	FlowOpen FlowState = iota
	FlowClosing
	FlowClosed
)

type InboundOptions

type InboundOptions struct {
	// Ordering controls inbound message ordering.
	Ordering OrderingOptions

	// Backpressure controls inbound admission under Load
	Backpressure BackpressureOptions

	// DrainOnClientClose controls whether in-flight inbound
	// messages are allowed to finish after client half-close
	DrainOnClientClose bool
}

type InboundVariant

type InboundVariant struct{}

type Interceptor

type Interceptor[In any, Out any] struct {
	// contains filtered or unexported fields
}

func NewInterceptor

func NewInterceptor[In any, Out any](
	session *Session,
	receiver *Receiver[In],
	sender *Sender[Out],
	inKey OrderingKeyResolver[In],
	outKey OrderingKeyResolver[Out],
) *Interceptor[In, Out]

func (*Interceptor[In, Out]) Handle

func (i *Interceptor[In, Out]) Handle(
	ctx context.Context,
	stream grpc.ServerStream,
	sendQueue <-chan Out,
) error

Handle attaches bidistream logic to a gRPC server stream.

This method blocks until the stream terminates.

type MetricsOptions

type MetricsOptions struct {
	// Enabled toggles metrics emission.
	Enabled bool

	// IncludePayloadSize determines whether payload sizes
	// are recorded (may be expensive).
	IncludePayloadSize bool
}

type Options

type Options struct {
	Inbound  InboundOptions
	Outbound OutboundOptions
	Retry    RetryOptions
	Shutdown ShutdownOptions
	Metrics  MetricsOptions
}

Options configure the behavior of a bidirectional stream.

func DefaultOptions

func DefaultOptions() Options

DefaultOptions return safe defaults

type OrderingKeyResolver

type OrderingKeyResolver[T any] func(ctx context.Context, msg T) string

type OrderingMode

type OrderingMode int
const (
	// Global FIFO ordering across the entire system.
	OrderingFIFO OrderingMode = iota

	// FIFO ordering per key
	OrderingKeyedFIFO
)

type OrderingOptions

type OrderingOptions struct {
	// Mode defines the ordering strategy
	Mode OrderingMode

	// Capacity limits waiting messages.
	// 0 means unbounded
	Capacity int
}

type OrderingResolver

type OrderingResolver interface {
	// Wait blocks (or gates) until ordering allows execution.
	// The returned [streaming.ReleaseFunc] MUST be called exactly once.
	Wait(ctx context.Context, key string) (streaming.ReleaseFunc, error)

	// Close permanently shuts down the ordering domain.
	// After Close, all Wait calls MUST fail.
	Close()
}

type OutboundInvariant

type OutboundInvariant struct{}

type OutboundOptions

type OutboundOptions struct {
	// Ordering controls outbound response ordering.
	Ordering OrderingOptions

	// Backpressure controls send-side pressure.
	Backpressure BackpressureOptions

	// FailFast determines whether outbound errors
	// immediately terminates the session
	FailFast bool
}

type Receiver

type Receiver[T any] struct {
	// contains filtered or unexported fields
}

Receiver handles inbound messages for a bidirectional stream

func NewReceiver

func NewReceiver[T any](
	session *Session,
	engine *Engine[T],
	ordering OrderingResolver,
	backpressure BackpressureController,
) *Receiver[T]

NewReceiver creates a an inbound receiver

All dependencies MUST be non-nil

func (*Receiver[T]) Close

func (r *Receiver[T]) Close() error

Close signals that the client has closed its send side.

func (*Receiver[T]) Receive

func (r *Receiver[T]) Receive(
	ctx context.Context,
	payload T,
	orderingKey string,
	seq uint64,
) error

Receive admits and processes a single inbound message

Contract: - MUST NOT be called after inbound close - Admission happens before execution - ReleaseFunc MUST be called exactly once

type RetryOptions

type RetryOptions struct {
	// MaxAttempts defines retry budget per message.
	// 0 disables retries.
	MaxAttempts int

	// Backoff defines delay between retries.
	// Zero means no delay.
	Backoff time.Duration
}

type Sender

type Sender[T any] struct {
	// contains filtered or unexported fields
}

Sender handles outbound messages for a bidirectional stream.

func NewSender

func NewSender[T any](
	session *Session,
	engine *Engine[T],
	ordering OrderingResolver,
	backpressure BackpressureController,
) *Sender[T]

NewSender creates an outbound sender.

All dependencies MUST be non-nil.

func (*Sender[T]) Close

func (s *Sender[T]) Close() error

Close signals that the server has closed its send side.

func (*Sender[T]) Send

func (s *Sender[T]) Send(
	ctx context.Context,
	payload T,
	orderingKey string,
	seq uint64,
) error

-----------------------------------------------------------

Outbound Send Path

-----------------------------------------------------------

Send admits and processes a single outbound message // // Contract: // - MUST NOT be called after outbound close // - Admission happens before execution // - ReleaseFunc MUST be called exactly once

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session represents one bidirectional stream instance.

func NewSession

func NewSession(
	parent context.Context,
	opts Options,
) *Session

NewSession creates a new bidirectional stream session.

/ The returned session is NOT yet open Interceptor is responsible for moving it to StateOpen

func (*Session) AllowInbound

func (s *Session) AllowInbound() error

AllowInbound reports whether inbound messages may be accepted

func (*Session) AllowOutbound

func (s *Session) AllowOutbound() error

AllowOutbound reports whether outbound may be sent

func (*Session) ClientClose

func (s *Session) ClientClose() error

ClientClose signals that the client has half-closed the stream

func (*Session) Context

func (s *Session) Context() context.Context

Context returns the session-scoped context This context is cancelled on terminal shutdown

func (*Session) MarkDrained

func (s *Session) MarkDrained()

MarkDrained transitions draining -> closed Safe to call mutiple times

func (*Session) Open

func (s *Session) Open() bool

Open transitions the session into active state. This must be called exactly once

func (*Session) ServerClose

func (s *Session) ServerClose() error

ServerClose signals that the server has decided to stop sending

func (*Session) Shutdown

func (s *Session) Shutdown(reason error, terminal bool)

Shutdown inidicates a session-wide shutdown Rules: - May be called multiple times - First caller wins the shutdown reason - Terminal failures force immediate closure

func (*Session) ShutdownReason

func (s *Session) ShutdownReason() error

ShutdownReason returns the reason for shutdown if any

func (*Session) State

func (s *Session) State() State

State returns the current stream lifecycle state.

type ShutdownInvariant

type ShutdownInvariant struct{}

type ShutdownOptions

type ShutdownOptions struct {
	// GracePeriod defines how long the session waits
	// for in-flight work to complete before force close.
	GracePeriod time.Duration

	// DrainInboundOnShutdown controls whether inbound
	// messages are drained on shutdown.
	DrainInboundOnShutdown bool

	// DrainOutboundOnShutdown controls whether outbound
	// sends are drained on shutdown.
	DrainOutboundOnShutdown bool
}

type State

type State int32

State represents the lifecycle phase of a bidirectional stream.

const (
	// StateInit indicates the stream object has been constructed
	// but has not yet started processing messages.
	StateInit State = iota

	// StateOpen indicates both client and server may send messages.
	StateOpen

	// StateClientClosed indicates the client has sent EOF.
	// The server may still send responses.
	StateClientClosed

	// StateServerClosed indicates the server has decided to stop sending.
	// The client may still be sending messages.
	StateServerClosed

	// StateDraining indicates no new messages are accepted.
	// In-flight work is allowed to complete.
	StateDraining

	// StateClosed indicates the stream is fully terminated.
	// No further operations are permitted.
	StateClosed
)

type StreamState

type StreamState struct {
	// contains filtered or unexported fields
}

StreamState wraps an atomic lifecycle state. It MUST be used by all bidirectional stream components.

func NewStreamState

func NewStreamState() *StreamState

NewStreamState initializes the stream in StateInit.

func (*StreamState) IsClientClosed

func (s *StreamState) IsClientClosed() bool

IsClientClosed reports whether the client has closed its side.

func (*StreamState) IsDraining

func (s *StreamState) IsDraining() bool

IsDraining reports whether the stream is draining in-flight work.

func (*StreamState) IsOpen

func (s *StreamState) IsOpen() bool

IsOpen reports whether the stream is fully open for traffic.

func (*StreamState) IsServerClosed

func (s *StreamState) IsServerClosed() bool

IsServerClosed reports whether the server has closed its side.

func (*StreamState) IsTerminal

func (s *StreamState) IsTerminal() bool

IsTerminal reports whether the stream is fully terminated.

func (*StreamState) Load

func (s *StreamState) Load() State

Load returns the current stream state.

func (*StreamState) Transition

func (s *StreamState) Transition(from, to State) bool

Transition performs an atomic state transition.

Rules: - Current state MUST match `from` - Transition MUST be explicitly allowed - Terminal states are immutable

Returns true if the transition succeeded.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL