Documentation ¶
Overview ¶
Package clientstream provides an execution pipeline for gRPC client-streaming RPCs.
It manages the lifecycle of client-initiated streams where the client sends multiple messages and the server responds once. The pipeline includes gate-based admission, per-message execution, ordering guarantees, and failure classification.
Architecture ¶
The client stream pipeline consists of three components:
- Receiver: accepts messages from the gRPC transport and assigns sequence numbers
- Gate: performs admission control (admit/reject decisions per message)
- Engine: executes admitted messages with retry supervision
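The three roles above can be pictured as a simple chain. The following is a minimal sketch only: it does not use the package's real types, and `message`, `receive`, `admit`, `execute`, and `runPipeline` are hypothetical names introduced for illustration. The admission rule (reject empty payloads) is likewise a placeholder.

```go
package main

import "fmt"

// message is a hypothetical envelope; the real package's types are not shown here.
type message struct {
	seq     uint64
	payload string
}

// receive plays the Receiver role: it assigns increasing sequence numbers.
func receive(payloads []string) []message {
	out := make([]message, 0, len(payloads))
	for i, p := range payloads {
		out = append(out, message{seq: uint64(i + 1), payload: p})
	}
	return out
}

// admit plays the Gate role. Rejecting empty payloads is a placeholder rule.
func admit(m message) bool { return m.payload != "" }

// execute plays the Engine role: it runs the handler for one admitted message.
func execute(m message, handler func(string) error) error {
	return handler(m.payload)
}

// runPipeline wires the stages together and returns the sequence numbers
// of the messages that were admitted and handled without error.
func runPipeline(payloads []string, handler func(string) error) []uint64 {
	var done []uint64
	for _, m := range receive(payloads) {
		if !admit(m) {
			continue
		}
		if err := execute(m, handler); err == nil {
			done = append(done, m.seq)
		}
	}
	return done
}

func main() {
	done := runPipeline([]string{"a", "", "c"}, func(string) error { return nil })
	fmt.Println(done) // prints [1 3]
}
```

The second message is rejected at the gate, so only sequence numbers 1 and 3 reach the handler.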
Features ¶
- Per-message admission control via Gate
- FIFO and keyed-FIFO ordering guarantees
- Automatic panic recovery per message
- Retry supervision with configurable policies
- Failure classification with gRPC status code mapping
Index ¶
- Constants
- func BreakOrdering(kind FailureKind) bool
- func IsRetryable(kind FailureKind) bool
- func IsTerminal(kind FailureKind) bool
- type BackpressureOptions
- type BackpressurePolicy
- type Engine
- type FailureKind
- type Gate
- type Interceptor
- type Options
- type OrderingKeyFunc
- type OrderingMode
- type OrderingOptions
- type OrderingResolver
- type Receiver
- type RetryOptions
- type ShutdownOptions
Constants ¶
const (
FailureNone             = core.FailureNone
FailureHandlerError     = core.FailureHandlerError
FailureContextCancelled = core.FailureContextCancelled
FailureBackpressure     = core.FailureBackpressure
FailureOrdering         = core.FailureOrdering
FailureShutdown         = core.FailureShutdown
FailureInternal         = core.FailureInternal
)
Backward-compatibility constants, mapped to the corresponding core.FailureKind values.
Variables ¶
This section is empty.
Functions ¶
func BreakOrdering ¶
func BreakOrdering(kind FailureKind) bool
BreakOrdering reports whether a failure of the given kind invalidates ordering guarantees. Delegates to core.BreakOrdering.
func IsRetryable ¶
func IsRetryable(kind FailureKind) bool
IsRetryable reports whether a failure of the given kind MAY be retried. Delegates to core.IsRetryable.
func IsTerminal ¶
func IsTerminal(kind FailureKind) bool
IsTerminal reports whether a failure of the given kind must terminate the stream. Delegates to core.IsTerminal.
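One way a caller might combine the three predicates is sketched below. The predicate bodies and the iota values are placeholders with assumed semantics; the authoritative behavior lives in the core package and is not documented here. `decide` is a hypothetical helper.

```go
package main

import "fmt"

// FailureKind is a local stand-in; the numeric values are an assumption.
type FailureKind int

const (
	FailureNone FailureKind = iota
	FailureHandlerError
	FailureContextCancelled
	FailureBackpressure
	FailureOrdering
	FailureShutdown
	FailureInternal
)

// Placeholder predicates. Which kinds count as retryable, terminal, or
// ordering-breaking is assumed here, NOT taken from the core package.
func IsRetryable(k FailureKind) bool   { return k == FailureHandlerError || k == FailureBackpressure }
func IsTerminal(k FailureKind) bool    { return k == FailureShutdown || k == FailureInternal }
func BreakOrdering(k FailureKind) bool { return k == FailureOrdering }

// decide shows one possible order of checks: terminal failures win,
// then retries while budget remains, then ordering breaks.
func decide(k FailureKind, attemptsLeft int) string {
	switch {
	case k == FailureNone:
		return "ok"
	case IsTerminal(k):
		return "terminate-stream"
	case IsRetryable(k) && attemptsLeft > 0:
		return "retry"
	case BreakOrdering(k):
		return "drop-and-reset-ordering"
	default:
		return "drop"
	}
}

func main() {
	fmt.Println(decide(FailureHandlerError, 1)) // prints retry
	fmt.Println(decide(FailureHandlerError, 0)) // prints drop
	fmt.Println(decide(FailureShutdown, 3))     // prints terminate-stream
}
```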
Types ¶
type BackpressureOptions ¶
type BackpressureOptions struct {
// Policy defines how pressure is handled.
// Default: BackpressureQueue
Policy BackpressurePolicy
// Capacity is the max concurrent in-flight executions.
// 0 means unlimited.
Capacity int
// QueueCapacity limits the number of waiting messages.
// Only applies when Policy is BackpressureQueue.
// 0 means unbounded.
QueueCapacity int
}
BackpressureOptions defines behavior under load.
type BackpressurePolicy ¶
type BackpressurePolicy int
const (
// Reject immediately when capacity is exhausted.
BackpressureHardReject BackpressurePolicy = iota
// Queue until capacity is available.
BackpressureQueue
// Allow execution even when capacity is exhausted.
BackpressureSoftAllow
// Allow execution but signal pressure.
BackpressureSoftAllowWithSignal
)
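The four policies differ only in what happens once Capacity is exhausted. A minimal sketch using a buffered channel as a counting semaphore follows. `limiter`, `newLimiter`, and `acquire` are hypothetical names; the real backpressure.Controller API is not documented here, and QueueCapacity is not modelled.

```go
package main

import "fmt"

type BackpressurePolicy int

const (
	BackpressureHardReject BackpressurePolicy = iota
	BackpressureQueue
	BackpressureSoftAllow
	BackpressureSoftAllowWithSignal
)

// limiter is a hypothetical controller. slots is nil when capacity is 0,
// which this sketch treats as unlimited.
type limiter struct {
	policy BackpressurePolicy
	slots  chan struct{}
}

func newLimiter(policy BackpressurePolicy, capacity int) *limiter {
	l := &limiter{policy: policy}
	if capacity > 0 {
		l.slots = make(chan struct{}, capacity)
	}
	return l
}

// acquire returns a release func, whether pressure was signalled,
// and whether the caller may execute.
func (l *limiter) acquire() (release func(), pressure, ok bool) {
	noop := func() {}
	if l.slots == nil {
		return noop, false, true
	}
	free := func() { <-l.slots }
	select {
	case l.slots <- struct{}{}: // a slot was free
		return free, false, true
	default: // capacity exhausted: fall through to the policy
	}
	switch l.policy {
	case BackpressureHardReject:
		return noop, true, false
	case BackpressureQueue:
		l.slots <- struct{}{} // block until a holder releases
		return free, false, true
	case BackpressureSoftAllowWithSignal:
		return noop, true, true
	default: // BackpressureSoftAllow
		return noop, false, true
	}
}

func main() {
	hard := newLimiter(BackpressureHardReject, 1)
	release, _, _ := hard.acquire()
	_, pressure, ok := hard.acquire()
	fmt.Println(ok, pressure) // prints false true
	release()
	_, _, ok = hard.acquire()
	fmt.Println(ok) // prints true
}
```

Soft-allowed work holds no slot in this sketch, so its release func is a no-op; a real controller may account for it differently.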
type Engine ¶
type Engine[T any] struct {
// contains filtered or unexported fields
}
func NewEngine ¶
NewEngine creates a client-stream execution engine.
handler processes individual stream messages. Use streaming.Handler to define one.
type FailureKind ¶
type FailureKind = core.FailureKind
FailureKind is an alias to core.FailureKind. See core/failure.go for the authoritative definition.
func ClassifyFailure ¶
func ClassifyFailure(err error) FailureKind
ClassifyFailure maps an execution error to a FailureKind. It handles context errors itself and delegates to core.Classify for all others.
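The split between context errors and everything else might look roughly like the sketch below. This is an illustration under assumptions: `classify` is a hypothetical stand-in, treating context.DeadlineExceeded the same as context.Canceled is assumed, and the fallback to FailureHandlerError only stands in for the delegation to core.Classify.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// FailureKind is a local stand-in; the numeric values are an assumption.
type FailureKind int

const (
	FailureNone FailureKind = iota
	FailureHandlerError
	FailureContextCancelled
)

// Sample errors used by the demonstration below.
var (
	errWrappedCancel = fmt.Errorf("handler: %w", context.Canceled)
	errPlain         = errors.New("boom")
)

// classify checks context errors first (errors.Is sees through wrapping),
// then falls back to a placeholder for core.Classify.
func classify(err error) FailureKind {
	switch {
	case err == nil:
		return FailureNone
	case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
		return FailureContextCancelled
	default:
		return FailureHandlerError // assumed fallback, not the real core.Classify
	}
}

func main() {
	fmt.Println(classify(nil), classify(errWrappedCancel), classify(errPlain)) // prints 0 2 1
}
```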
type Gate ¶
type Gate struct {
// contains filtered or unexported fields
}
func NewGate ¶
func NewGate(
ordering OrderingResolver,
backpressure *backpressure.Controller,
shutdown core.ShutdownCoordinator,
) *Gate
type Interceptor ¶
type Interceptor[T any] struct {
// contains filtered or unexported fields
}
func NewInterceptor ¶
func NewInterceptor[T any](
receiver *Receiver[T],
orderingKeyFn OrderingKeyFunc[T],
) *Interceptor[T]
NewInterceptor constructs a client-stream interceptor.
orderingKeyFn extracts the ordering key from an incoming message. Use a constant key ("") for global FIFO ordering.
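A key function for each ordering style could be written as in the sketch below. The declaration of OrderingKeyFunc is not shown in this documentation, so the `func(T) string` shape used here is an assumption, and `orderEvent`, `byCustomer`, and `globalKey` are hypothetical names.

```go
package main

import "fmt"

// OrderingKeyFunc mirrors the documented name only; this signature is assumed.
type OrderingKeyFunc[T any] func(msg T) string

// orderEvent is a hypothetical message type.
type orderEvent struct {
	CustomerID string
	Amount     int
}

// byCustomer yields per-customer ordering: messages sharing a CustomerID
// share an ordering key.
func byCustomer(e orderEvent) string { return e.CustomerID }

// globalKey returns a key function that always yields "", which places
// every message under the same key (global FIFO).
func globalKey[T any]() OrderingKeyFunc[T] {
	return func(T) string { return "" }
}

func main() {
	var keyed OrderingKeyFunc[orderEvent] = byCustomer
	global := globalKey[orderEvent]()
	e := orderEvent{CustomerID: "c-42", Amount: 10}
	fmt.Printf("%q %q\n", keyed(e), global(e)) // prints "c-42" ""
}
```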
func (*Interceptor[T]) StreamServerInterceptor ¶
func (i *Interceptor[T]) StreamServerInterceptor() grpc.StreamServerInterceptor
StreamServerInterceptor returns a gRPC StreamServerInterceptor for client-side streaming RPCs.
type Options ¶
type Options struct {
// Ordering controls message processing order.
Ordering OrderingOptions
// Backpressure controls admission under load.
Backpressure BackpressureOptions
// Retry controls per-message retry behavior.
Retry RetryOptions
// Shutdown controls graceful termination.
Shutdown ShutdownOptions
}
Options configures client-stream behavior. It is immutable once the stream starts.
type OrderingKeyFunc ¶
type OrderingMode ¶
type OrderingMode int
const (
// Global FIFO ordering across the entire stream.
OrderingFIFO OrderingMode = iota
// FIFO ordering per key.
OrderingKeyedFIFO
)
type OrderingOptions ¶
type OrderingOptions struct {
// Mode determines ordering strategy.
// Default: OrderingFIFO
Mode OrderingMode
// Capacity limits the number of waiting messages.
// 0 means unbounded.
Capacity int
}
OrderingOptions defines how messages are ordered.
type OrderingResolver ¶
type OrderingResolver interface {
Wait(ctx context.Context, key string) (streaming.ReleaseFunc, error)
Close()
}
Gate is the single admission point for client-stream messages.
Responsibilities:
- Enforce ordering guarantees
- Enforce backpressure
- Respect shutdown
- Respect context cancellation
Gate does NOT:
- Execute handlers
- Retry messages
- Interpret failures
OrderingResolver abstracts ordering strategy. It supports both global FIFO and keyed FIFO ordering.
Implementations must return a streaming.ReleaseFunc that the caller MUST invoke exactly once after processing completes.
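A keyed FIFO resolver honoring the Wait contract could be sketched as follows. This is not the package's implementation: `keyedFIFO` and `demo` are hypothetical, `ReleaseFunc` is assumed to be a plain `func()`, and Close is omitted. Each caller waits on its predecessor's channel for the same key, so turns are granted in the order Wait was entered.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// ReleaseFunc stands in for streaming.ReleaseFunc; the func() shape is assumed.
type ReleaseFunc func()

// keyedFIFO tracks, per key, a channel closed when the latest holder releases.
type keyedFIFO struct {
	mu    sync.Mutex
	tails map[string]chan struct{}
}

func newKeyedFIFO() *keyedFIFO {
	return &keyedFIFO{tails: make(map[string]chan struct{})}
}

// Wait blocks until every earlier caller for key has released, or ctx ends.
func (r *keyedFIFO) Wait(ctx context.Context, key string) (ReleaseFunc, error) {
	mine := make(chan struct{})
	r.mu.Lock()
	prev := r.tails[key]
	r.tails[key] = mine
	r.mu.Unlock()

	var once sync.Once
	release := func() {
		once.Do(func() { // tolerate double release in this sketch
			r.mu.Lock()
			if r.tails[key] == mine {
				delete(r.tails, key) // nobody queued behind us
			}
			r.mu.Unlock()
			close(mine)
		})
	}
	if prev == nil {
		return release, nil
	}
	select {
	case <-prev:
		return release, nil
	case <-ctx.Done():
		// Give up our turn, but keep the chain intact for successors.
		go func() { <-prev; release() }()
		return nil, ctx.Err()
	}
}

// demo holds key "a", shows a second waiter on "a" timing out while key "b"
// proceeds, then shows "a" becoming available after release.
func demo() (blockedErr error, otherKeyOK, afterReleaseOK bool) {
	r := newKeyedFIFO()
	first, _ := r.Wait(context.Background(), "a")

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	_, blockedErr = r.Wait(ctx, "a")

	relB, errB := r.Wait(context.Background(), "b")
	if errB == nil {
		relB()
	}
	first()
	relA, errA := r.Wait(context.Background(), "a")
	if errA == nil {
		relA()
	}
	return blockedErr, errB == nil, errA == nil
}

func main() {
	fmt.Println(demo()) // prints context deadline exceeded true true
}
```

Using the empty string as the only key reduces this to global FIFO, which matches the constant-key convention described for NewInterceptor.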
type Receiver ¶
type Receiver[T any] struct {
// contains filtered or unexported fields
}
type RetryOptions ¶
type RetryOptions struct {
// MaxAttempts defines retry budget per message.
// 0 disables retries.
MaxAttempts int
// Backoff defines delay between retries.
// Zero means no delay.
Backoff time.Duration
}
RetryOptions controls per-message retries.
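The two fields suggest a loop like the sketch below. It assumes MaxAttempts counts retries after the first call (consistent with "0 disables retries"), and it retries every error, whereas the real engine presumably consults IsRetryable. `runWithRetry` and `demoRetry` are hypothetical helpers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// RetryOptions mirrors the documented struct.
type RetryOptions struct {
	MaxAttempts int
	Backoff     time.Duration
}

// runWithRetry calls fn once, then retries up to opts.MaxAttempts more times,
// sleeping opts.Backoff between calls. It returns the number of calls made.
func runWithRetry(ctx context.Context, opts RetryOptions, fn func() error) (calls int, err error) {
	for {
		calls++
		if err = fn(); err == nil || calls > opts.MaxAttempts {
			return calls, err
		}
		if opts.Backoff <= 0 {
			if ctx.Err() != nil {
				return calls, ctx.Err()
			}
			continue
		}
		t := time.NewTimer(opts.Backoff)
		select {
		case <-t.C:
		case <-ctx.Done():
			t.Stop()
			return calls, ctx.Err()
		}
	}
}

// demoRetry runs a function that fails `failures` times before succeeding.
func demoRetry(maxAttempts, failures int) (int, error) {
	remaining := failures
	fn := func() error {
		if remaining > 0 {
			remaining--
			return errors.New("transient")
		}
		return nil
	}
	opts := RetryOptions{MaxAttempts: maxAttempts, Backoff: time.Millisecond}
	return runWithRetry(context.Background(), opts, fn)
}

func main() {
	fmt.Println(demoRetry(2, 2)) // prints 3 <nil>
	fmt.Println(demoRetry(0, 1)) // prints 1 transient
}
```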
type ShutdownOptions ¶
type ShutdownOptions struct {
// GracePeriod defines how long to wait for in-flight
// messages to complete before force shutdown.
GracePeriod time.Duration
// DrainOnClose waits for ordering queues to drain
// before rejecting new messages.
DrainOnClose bool
}
ShutdownOptions controls graceful termination.