Documentation
¶
Overview ¶
Package bidistream provides an execution pipeline for gRPC bidirectional-streaming RPCs.
It implements a full session lifecycle with a state machine (Init → Open → ClientClosed → Draining → Closed) and supports half-close semantics, per-message ordering, backpressure, and concurrent send/receive.
Architecture ¶
The bidirectional stream pipeline consists of:
- Session: manages the overall stream lifecycle and state machine
- Receiver: handles inbound messages with ordering and admission
- Sender: handles outbound messages with flow control
- Engine: executes admitted messages with retry supervision
Features ¶
- Full state machine with half-close semantics
- Per-key ordering (sequential within key, parallel across keys)
- Backpressure with configurable policies
- Automatic panic recovery per message
- Graceful shutdown with in-flight message draining
- Interceptor adapter for gRPC StreamServerInterceptor
Index ¶
- Variables
- func NoopOrdering() streaming.OrderingPolicy
- type BackpressureController
- type BackpressureOptions
- type BackpressurePolicy
- type Direction
- type Engine
- type FailureInvariant
- type FlowRole
- type FlowState
- type InboundOptions
- type InboundVariant
- type Interceptor
- type MetricsOptions
- type Options
- type OrderingKeyResolver
- type OrderingMode
- type OrderingOptions
- type OrderingResolver
- type OutboundInvariant
- type OutboundOptions
- type Receiver
- type RetryOptions
- type Sender
- type Session
- func (s *Session) AllowInbound() error
- func (s *Session) AllowOutbound() error
- func (s *Session) ClientClose() error
- func (s *Session) Context() context.Context
- func (s *Session) MarkDrained()
- func (s *Session) Open() bool
- func (s *Session) ServerClose() error
- func (s *Session) Shutdown(reason error, terminal bool)
- func (s *Session) ShutdownReason() error
- func (s *Session) State() State
- type ShutdownInvariant
- type ShutdownOptions
- type State
- type StreamState
Constants ¶
This section is empty.
Variables ¶
var ( // ErrSessionClosed is returned when any operation is attempted // after the session has fully terminated. ErrSessionClosed = errors.New("bidistream: session closed") // ErrShutdownInProgress indicates shutdown has begun but // draining may still be happening. ErrShutdownInProgress = errors.New("bidistream: shutdown in progress") )
var ( // ErrIboundCloses means client has half-closed the stream // and no further Recv is allowed ErrInboundClosed = errors.New("bidistream: inbound flow is closed") // ErrIboundAdmissionRejected means ordering or backpressure // rejected the inbound message. ErrInboundAdmissionRejected = errors.New("bidistream: inbound admission rejected") )
var ( // ErrOutboundClosed means server has closed its send side. ErrOutboundClosed = errors.New("bidistream: outbound flow closed") // ErrOutboundBackpressure means send-side backpressure // rejected the message ErrOutboundBackPressure = errors.New("bidistream: outbound backpressure") )
var ( // ErrTerminalFailure indiciates a failure that MUST // terminate the entire session ErrTerminalFailure = errors.New("bidistream: terminal failure") // ErrNonTerminalFailure indicates a failure confined // to one direction. ErrNonTerminalFailure = errors.New("bidistream: non-terminal failure") )
var ( // ErrOrderingBroken inidicates ordering guarantees // can no longer be maintained. // // This is always terminal ErrOrderingBroken = errors.New("bidistream: ordering broken") )
var ( // ErrPanicRecovered indicates a panic was caught inside // the execution path. // // Panics are ALWAYS terminal. ErrPanicRecovered = errors.New("bidistream: panic recovered") )
Functions ¶
func NoopOrdering ¶
func NoopOrdering() streaming.OrderingPolicy
NoopOrdering returns an streaming.OrderingPolicy that always admits without blocking. Use this when constructing a bidistream Engine — ordering is already enforced by Receiver / Sender.
Types ¶
type BackpressureController ¶
type BackpressureOptions ¶
type BackpressureOptions struct {
// Policy defines pressure handling behavior
Policy BackpressurePolicy
// Capacity limits concurrent executions.
// 0 means unlimited.
Capacity int
// QueueCapacity limits waiting messages.
// Only applies when policy = BackpressureQueue
// 0 means unbounded
QueueCapacity int
}
type BackpressurePolicy ¶
type BackpressurePolicy int
const ( // Reject immediately when capacity is exhausted BackpressureHardReject BackpressurePolicy = iota // Queue until capacity becomes available BackpressureQueue // Allow execution even when capacity is exhausted BackpressureSoftAllow // Allow execution and emit pressure signal BackpressureSoftAllowWithSignal )
type Engine ¶
type Engine[T any] struct { // contains filtered or unexported fields }
Engine executes messages for a bidirectional stream.
func NewEngine ¶
func NewEngine[T any]( ordering streaming.OrderingPolicy, handler streaming.Handler[T], ) *Engine[T]
NewEngine constructs a bidistream engine.
ordering controls message admission. In bidistream this is typically a no-op ordering because admission is already handled by Receiver / Sender. Use NoopOrdering for this.
handler processes individual stream messages. Use streaming.Handler to define one.
type FailureInvariant ¶
type FailureInvariant struct{}
type InboundOptions ¶
type InboundOptions struct {
// Ordering controls inbound message ordering.
Ordering OrderingOptions
// Backpressure controls inbound admission under Load
Backpressure BackpressureOptions
// DrainOnClientClose controls whether in-flight inbound
// messages are allowed to finish after client half-close
DrainOnClientClose bool
}
type InboundVariant ¶
type InboundVariant struct{}
type Interceptor ¶
func NewInterceptor ¶
func NewInterceptor[In any, Out any]( session *Session, receiver *Receiver[In], sender *Sender[Out], inKey OrderingKeyResolver[In], outKey OrderingKeyResolver[Out], ) *Interceptor[In, Out]
func (*Interceptor[In, Out]) Handle ¶
func (i *Interceptor[In, Out]) Handle( ctx context.Context, stream grpc.ServerStream, sendQueue <-chan Out, ) error
Handle attaches bidistream logic to a gRPC server stream.
This method blocks until the stream terminates.
type MetricsOptions ¶
type Options ¶
type Options struct {
Inbound InboundOptions
Outbound OutboundOptions
Retry RetryOptions
Shutdown ShutdownOptions
Metrics MetricsOptions
}
Options configure the behavior of a bidirectional stream.
type OrderingMode ¶
type OrderingMode int
const ( // Global FIFO ordering across the entire system. OrderingFIFO OrderingMode = iota // FIFO ordering per key OrderingKeyedFIFO )
type OrderingOptions ¶
type OrderingOptions struct {
// Mode defines the ordering strategy
Mode OrderingMode
// Capacity limits waiting messages.
// 0 means unbounded
Capacity int
}
type OrderingResolver ¶
type OrderingResolver interface {
// Wait blocks (or gates) until ordering allows execution.
// The returned [streaming.ReleaseFunc] MUST be called exactly once.
Wait(ctx context.Context, key string) (streaming.ReleaseFunc, error)
// Close permanently shuts down the ordering domain.
// After Close, all Wait calls MUST fail.
Close()
}
type OutboundInvariant ¶
type OutboundInvariant struct{}
type OutboundOptions ¶
type OutboundOptions struct {
// Ordering controls outbound response ordering.
Ordering OrderingOptions
// Backpressure controls send-side pressure.
Backpressure BackpressureOptions
// FailFast determines whether outbound errors
// immediately terminates the session
FailFast bool
}
type Receiver ¶
type Receiver[T any] struct { // contains filtered or unexported fields }
Receiver handles inbound messages for a bidirectional stream
func NewReceiver ¶
func NewReceiver[T any]( session *Session, engine *Engine[T], ordering OrderingResolver, backpressure BackpressureController, ) *Receiver[T]
NewReceiver creates a an inbound receiver
All dependencies MUST be non-nil
func (*Receiver[T]) Receive ¶
func (r *Receiver[T]) Receive( ctx context.Context, payload T, orderingKey string, seq uint64, ) error
Receive admits and processes a single inbound message
Contract: - MUST NOT be called after inbound close - Admission happens before execution - ReleaseFunc MUST be called exactly once
type RetryOptions ¶
type Sender ¶
type Sender[T any] struct { // contains filtered or unexported fields }
Sender handles outbound messages for a bidirectional stream.
func NewSender ¶
func NewSender[T any]( session *Session, engine *Engine[T], ordering OrderingResolver, backpressure BackpressureController, ) *Sender[T]
NewSender creates an outbound sender.
All dependencies MUST be non-nil.
func (*Sender[T]) Send ¶
-----------------------------------------------------------
Outbound Send Path
-----------------------------------------------------------
Send admits and processes a single outbound message // // Contract: // - MUST NOT be called after outbound close // - Admission happens before execution // - ReleaseFunc MUST be called exactly once
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
Session represents one bidirectional stream instance.
func NewSession ¶
NewSession creates a new bidirectional stream session.
/ The returned session is NOT yet open Interceptor is responsible for moving it to StateOpen
func (*Session) AllowInbound ¶
AllowInbound reports whether inbound messages may be accepted
func (*Session) AllowOutbound ¶
AllowOutbound reports whether outbound may be sent
func (*Session) ClientClose ¶
ClientClose signals that the client has half-closed the stream
func (*Session) Context ¶
Context returns the session-scoped context This context is cancelled on terminal shutdown
func (*Session) MarkDrained ¶
func (s *Session) MarkDrained()
MarkDrained transitions draining -> closed Safe to call mutiple times
func (*Session) Open ¶
Open transitions the session into active state. This must be called exactly once
func (*Session) ServerClose ¶
ServerClose signals that the server has decided to stop sending
func (*Session) Shutdown ¶
Shutdown inidicates a session-wide shutdown Rules: - May be called multiple times - First caller wins the shutdown reason - Terminal failures force immediate closure
func (*Session) ShutdownReason ¶
ShutdownReason returns the reason for shutdown if any
type ShutdownInvariant ¶
type ShutdownInvariant struct{}
type ShutdownOptions ¶
type ShutdownOptions struct {
// GracePeriod defines how long the session waits
// for in-flight work to complete before force close.
GracePeriod time.Duration
// DrainInboundOnShutdown controls whether inbound
// messages are drained on shutdown.
DrainInboundOnShutdown bool
// DrainOutboundOnShutdown controls whether outbound
// sends are drained on shutdown.
DrainOutboundOnShutdown bool
}
type State ¶
type State int32
State represents the lifecycle phase of a bidirectional stream.
const ( // StateInit indicates the stream object has been constructed // but has not yet started processing messages. StateInit State = iota // StateOpen indicates both client and server may send messages. StateOpen // StateClientClosed indicates the client has sent EOF. // The server may still send responses. StateClientClosed // StateServerClosed indicates the server has decided to stop sending. // The client may still be sending messages. StateServerClosed // StateDraining indicates no new messages are accepted. // In-flight work is allowed to complete. StateDraining // StateClosed indicates the stream is fully terminated. // No further operations are permitted. StateClosed )
type StreamState ¶
type StreamState struct {
// contains filtered or unexported fields
}
StreamState wraps an atomic lifecycle state. It MUST be used by all bidirectional stream components.
func NewStreamState ¶
func NewStreamState() *StreamState
NewStreamState initializes the stream in StateInit.
func (*StreamState) IsClientClosed ¶
func (s *StreamState) IsClientClosed() bool
IsClientClosed reports whether the client has closed its side.
func (*StreamState) IsDraining ¶
func (s *StreamState) IsDraining() bool
IsDraining reports whether the stream is draining in-flight work.
func (*StreamState) IsOpen ¶
func (s *StreamState) IsOpen() bool
IsOpen reports whether the stream is fully open for traffic.
func (*StreamState) IsServerClosed ¶
func (s *StreamState) IsServerClosed() bool
IsServerClosed reports whether the server has closed its side.
func (*StreamState) IsTerminal ¶
func (s *StreamState) IsTerminal() bool
IsTerminal reports whether the stream is fully terminated.
func (*StreamState) Load ¶
func (s *StreamState) Load() State
Load returns the current stream state.
func (*StreamState) Transition ¶
func (s *StreamState) Transition(from, to State) bool
Transition performs an atomic state transition.
Rules: - Current state MUST match `from` - Transition MUST be explicitly allowed - Terminal states are immutable
Returns true if the transition succeeded.