clientstream

package
v1.0.0 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 7, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package clientstream provides an execution pipeline for gRPC client-streaming RPCs.

It manages the lifecycle of client-initiated streams where the client sends multiple messages and the server responds once. The pipeline includes gate-based admission, per-message execution, ordering guarantees, and failure classification.

Architecture

The client stream pipeline consists of three components:

  • Receiver: accepts messages from the gRPC transport and assigns sequence numbers
  • Gate: performs admission control (admit/reject decisions per message)
  • Engine: executes admitted messages with retry supervision
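
As a rough illustration of how the three components might hand a message to one another, here is a minimal, self-contained sketch. The types message, gate, engine, and receiver are hypothetical stand-ins written for this example, not the package's own types. Starting sequence numbers at 0 and assigning them before admission are also assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical stand-in for streaming.Message[T].
type message struct {
	Sequence uint64
	Payload  string
}

var errRejected = errors.New("rejected by gate")

// gate is a stand-in admission check. The real Gate also covers ordering,
// backpressure, and shutdown, which this sketch leaves out.
type gate struct{ closed bool }

// admit returns a release function on success and an error on rejection.
func (g *gate) admit() (func(), error) {
	if g.closed {
		return nil, errRejected
	}
	return func() {}, nil
}

// engine runs the handler for one admitted message.
type engine struct{ handle func(message) error }

func (e *engine) process(m message) error { return e.handle(m) }

// receiver assigns a sequence number, asks the gate for admission,
// and only then hands the message to the engine.
type receiver struct {
	next uint64
	g    *gate
	e    *engine
}

func (r *receiver) receive(payload string) error {
	m := message{Sequence: r.next, Payload: payload}
	r.next++
	release, err := r.g.admit()
	if err != nil {
		return err // rejected messages never reach the engine
	}
	defer release()
	return r.e.process(m)
}

// run feeds payloads through the pipeline and returns the sequence
// numbers the handler observed.
func run(payloads []string) ([]uint64, error) {
	var seen []uint64
	r := &receiver{g: &gate{}, e: &engine{handle: func(m message) error {
		seen = append(seen, m.Sequence)
		return nil
	}}}
	for _, p := range payloads {
		if err := r.receive(p); err != nil {
			return seen, err
		}
	}
	return seen, nil
}

func main() {
	seen, err := run([]string{"a", "b", "c"})
	fmt.Println(seen, err) // [0 1 2] <nil>
}
```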

Features

  • Per-message admission control via Gate
  • FIFO and keyed-FIFO ordering guarantees
  • Automatic panic recovery per message
  • Retry supervision with configurable policies
  • Failure classification with gRPC status code mapping
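
Per-message panic recovery is commonly built with a deferred recover around the handler call. The sketch below shows that general Go technique. The helper name safeCall and the error text are assumptions made for illustration, and the package's internal mechanism may differ.

```go
package main

import "fmt"

// safeCall runs handler and converts a panic into an ordinary error, so a
// single bad message does not take down the goroutine serving the stream.
// safeCall is a hypothetical helper, not part of the documented API.
func safeCall(handler func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("handler panicked: %v", r)
		}
	}()
	return handler()
}

func main() {
	err := safeCall(func() error { panic("boom") })
	fmt.Println(err) // handler panicked: boom

	err = safeCall(func() error { return nil })
	fmt.Println(err) // <nil>
}
```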

Index

Constants

const (
	FailureNone = core.FailureNone

	FailureHandlerError     = core.FailureHandlerError
	FailureContextCancelled = core.FailureContextCancelled
	FailureBackpressure     = core.FailureBackpressure
	FailureOrdering         = core.FailureOrdering
	FailureShutdown         = core.FailureShutdown
	FailureInternal         = core.FailureInternal
)

Backward-compatibility constants, mapped to the corresponding core.FailureKind values.

Variables

This section is empty.

Functions

func BreakOrdering

func BreakOrdering(kind FailureKind) bool

BreakOrdering reports whether a failure invalidates ordering guarantees. It delegates to core.BreakOrdering.

func IsRetryable

func IsRetryable(kind FailureKind) bool

IsRetryable reports whether a failure MAY be retried. It delegates to core.IsRetryable.

func IsTerminal

func IsTerminal(kind FailureKind) bool

IsTerminal reports whether a failure must terminate the stream. It delegates to core.IsTerminal.

Types

type BackpressureOptions

type BackpressureOptions struct {
	// Policy defines how pressure is handled
	// Default: BackpressureQueue
	Policy BackpressurePolicy

	// Capacity is the max concurrent in-flight executions.
	// 0 means unlimited.
	Capacity int

	// QueueCapacity limits the number of waiting messages.
	// Only applies when policy is BackpressureQueue.
	// 0 means unbounded.
	QueueCapacity int
}

BackpressureOptions defines behavior under load.

type BackpressurePolicy

type BackpressurePolicy int
const (
	// Reject immediately when capacity is exhausted.
	BackpressureHardReject BackpressurePolicy = iota

	// Queue until capacity available.
	BackpressureQueue

	// Allow execution even when capacity exhausted.
	BackpressureSoftAllow

	// Allow execution but signal pressure.
	BackpressureSoftAllowWithSignal
)
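
To make the difference between the policies concrete, the following sketch models three of them with a buffered channel used as a counting semaphore. All names here (policy, limiter, acquire, secondAcquire) are hypothetical and chosen for this example. It does not treat a capacity of 0 as unlimited, and it omits the signalling variant, so read it as an illustration of the idea rather than the package's behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type policy int

const (
	hardReject policy = iota // fail immediately when full
	queue                    // wait for a free slot
	softAllow                // run anyway, over capacity
)

var errBackpressure = errors.New("capacity exhausted")

// limiter is a counting semaphore built on a buffered channel.
// capacity must be at least 1 in this sketch.
type limiter struct {
	slots  chan struct{}
	policy policy
}

func newLimiter(capacity int, p policy) *limiter {
	return &limiter{slots: make(chan struct{}, capacity), policy: p}
}

// acquire returns a release function, or an error when the caller is rejected.
func (l *limiter) acquire(ctx context.Context) (func(), error) {
	select {
	case l.slots <- struct{}{}: // fast path: a slot is free
		return func() { <-l.slots }, nil
	default:
	}
	switch l.policy {
	case hardReject:
		return nil, errBackpressure
	case softAllow:
		return func() {}, nil // no slot was taken, so nothing to release
	default: // queue: block until a slot frees up or the context ends
		select {
		case l.slots <- struct{}{}:
			return func() { <-l.slots }, nil
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}

// secondAcquire fills a capacity-1 limiter and reports the error returned
// by a second acquire under the given policy.
func secondAcquire(p policy) error {
	l := newLimiter(1, p)
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // a queued waiter gives up at once instead of blocking the demo
	if _, err := l.acquire(ctx); err != nil {
		return err
	}
	_, err := l.acquire(ctx)
	return err
}

func main() {
	fmt.Println(secondAcquire(hardReject)) // capacity exhausted
	fmt.Println(secondAcquire(queue))      // context canceled
	fmt.Println(secondAcquire(softAllow))  // <nil>
}
```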

type Engine

type Engine[T any] struct {
	// contains filtered or unexported fields
}

func NewEngine

func NewEngine[T any](handler streaming.Handler[T]) *Engine[T]

NewEngine creates a client-stream execution engine.

handler processes individual stream messages. Use streaming.Handler to define one.

func (*Engine[T]) Process

func (e *Engine[T]) Process(
	ctx context.Context,
	msg streaming.Message[T],
) error

Process executes a single admitted message.

Contract:

  • ctx is already admission-approved
  • message.Sequence already assigned
  • message.Meta.Attempt starts at 0

type FailureKind

type FailureKind = core.FailureKind

FailureKind is an alias to core.FailureKind. See core/failure.go for the authoritative definition.

func ClassifyFailure

func ClassifyFailure(err error) FailureKind

ClassifyFailure maps an execution error to a FailureKind. It handles context errors itself and delegates all other errors to core.Classify.
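
A classifier of this shape is usually written with errors.Is so that wrapped context errors are still recognized. The sketch below uses local stand-in kinds. Mapping context.DeadlineExceeded to the cancelled kind and falling back to a handler error are assumptions, since the source only says context errors are handled and the rest is delegated.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// failureKind and its values are local stand-ins for core.FailureKind.
type failureKind int

const (
	failureNone failureKind = iota
	failureHandlerError
	failureContextCancelled
)

// Sample errors used by the demo below.
var (
	wrappedCancel = fmt.Errorf("recv: %w", context.Canceled)
	errPlain      = errors.New("database unavailable")
)

// classify checks context errors first and treats everything else as a
// handler error. The real function delegates that last step to core.Classify.
func classify(err error) failureKind {
	switch {
	case err == nil:
		return failureNone
	case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
		return failureContextCancelled
	default:
		return failureHandlerError
	}
}

func main() {
	fmt.Println(classify(nil), classify(wrappedCancel), classify(errPlain)) // 0 2 1
}
```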

type Gate

type Gate struct {
	// contains filtered or unexported fields
}

func NewGate

func NewGate(
	ordering OrderingResolver,
	backpressure *backpressure.Controller,
	shutdown core.ShutdownCoordinator,
) *Gate

func (*Gate) Admit

func (g *Gate) Admit(
	ctx context.Context,
	orderingKey string,
) (stream.ReleaseFunc, error)

Admit decides whether a message may enter execution.

Contract:

  • If error != nil → message MUST NOT execute
  • If error == nil → caller MUST call returned ReleaseFunc exactly once
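
One common way for a caller to satisfy this contract is to return early on error and defer the release function otherwise. The sketch below uses a hypothetical slotGate in place of Gate, with a simplified admit method that takes no arguments, and it is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

type releaseFunc func()

var errShutdown = errors.New("shutting down")

// slotGate is a hypothetical, single-goroutine stand-in for Gate.
type slotGate struct {
	inFlight int
	closed   bool
}

func (g *slotGate) admit() (releaseFunc, error) {
	if g.closed {
		return nil, errShutdown
	}
	g.inFlight++
	return func() { g.inFlight-- }, nil
}

// handle follows the contract: no execution on error, and exactly one
// release call on every return path after a successful admit.
func handle(g *slotGate, work func() error) error {
	release, err := g.admit()
	if err != nil {
		return err // rejected: work must not run
	}
	defer release()
	return work()
}

func main() {
	g := &slotGate{}
	err := handle(g, func() error { return nil })
	fmt.Println(err, g.inFlight) // <nil> 0

	g.closed = true
	fmt.Println(handle(g, func() error { return nil })) // shutting down
}
```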

type Interceptor

type Interceptor[T any] struct {
	// contains filtered or unexported fields
}

func NewInterceptor

func NewInterceptor[T any](
	receiver *Receiver[T],
	orderingKeyFn OrderingKeyFunc[T],
) *Interceptor[T]

NewInterceptor constructs a client-stream interceptor.

orderingKeyFn extracts ordering key from incoming message. Use a constant key ("") for global FIFO ordering.
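
For example, an ordering key function can either return a constant or derive the key from a message field. In the sketch below, OrderingKeyFunc is redeclared locally to mirror the documented type so the example compiles on its own. The upload type and both functions are hypothetical.

```go
package main

import "fmt"

// OrderingKeyFunc mirrors the documented type so this example is self-contained.
type OrderingKeyFunc[T any] func(T) string

// upload is a hypothetical request message.
type upload struct {
	TenantID string
	Chunk    []byte
}

// globalKey returns the same key for every message, which puts the whole
// stream into a single FIFO queue.
func globalKey[T any](T) string { return "" }

// byTenant orders messages per tenant.
func byTenant(u upload) string { return u.TenantID }

func main() {
	var global OrderingKeyFunc[upload] = globalKey[upload]
	var keyed OrderingKeyFunc[upload] = byTenant

	u := upload{TenantID: "tenant-42"}
	fmt.Printf("%q %q\n", global(u), keyed(u)) // "" "tenant-42"
}
```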

func (*Interceptor[T]) StreamServerInterceptor

func (i *Interceptor[T]) StreamServerInterceptor() grpc.StreamServerInterceptor

StreamServerInterceptor returns a gRPC StreamServerInterceptor for client-side streaming RPCs.

type Options

type Options struct {
	// Ordering controls message processing order.
	Ordering OrderingOptions

	// Backpressure controls admission under load.
	Backpressure BackpressureOptions

	// Retry controls per-message retry behavior.
	Retry RetryOptions

	// Shutdown controls graceful termination.
	Shutdown ShutdownOptions
}

Options configures client-side stream behavior. It is immutable once the stream starts.

func DefaultOptions

func DefaultOptions() Options

DefaultOptions returns safe defaults.

type OrderingKeyFunc

type OrderingKeyFunc[T any] func(T) string

type OrderingMode

type OrderingMode int
const (
	// Global FIFO ordering across entire stream.
	OrderingFIFO OrderingMode = iota

	// FIFO ordering per key
	OrderingKeyedFIFO
)

type OrderingOptions

type OrderingOptions struct {
	// Mode determines ordering strategy.
	// Default: OrderingFIFO
	Mode OrderingMode

	// Capacity limits the number of waiting messages.
	// 0 means unbounded.
	Capacity int
}

OrderingOptions defines how messages are ordered.

type OrderingResolver

type OrderingResolver interface {
	Wait(ctx context.Context, key string) (streaming.ReleaseFunc, error)
	Close()
}

Gate is the single admission point for client-stream messages.

Responsibilities:

  • Enforce ordering guarantees
  • Enforce backpressure
  • Respect shutdown
  • Respect context cancellation

Gate does NOT:

  • Execute handlers
  • Retry messages
  • Interpret failures

OrderingResolver abstracts ordering strategy. It supports both global FIFO and keyed FIFO ordering.

Implementations must return a streaming.ReleaseFunc that the caller MUST invoke exactly once after processing completes.
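
To illustrate what a keyed-FIFO resolver could look like, the sketch below chains waiters per key: each caller waits on the channel of the caller before it and closes its own channel on release. keyedFIFO and releaseFunc are hypothetical names, the Close method is omitted, and map entries are never cleaned up. Treat this as one possible design, not the package's implementation. Using a single constant key yields global FIFO.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type releaseFunc func()

// keyedFIFO serializes callers that share a key, in arrival order.
type keyedFIFO struct {
	mu    sync.Mutex
	tails map[string]chan struct{} // per key: closed when the latest caller is done
}

func newKeyedFIFO() *keyedFIFO {
	return &keyedFIFO{tails: make(map[string]chan struct{})}
}

func (k *keyedFIFO) Wait(ctx context.Context, key string) (releaseFunc, error) {
	k.mu.Lock()
	prev := k.tails[key]
	mine := make(chan struct{})
	k.tails[key] = mine
	k.mu.Unlock()

	if prev != nil {
		select {
		case <-prev: // our predecessor released
		case <-ctx.Done():
			// Keep the chain intact: successors wait on mine, so close it
			// only once the predecessor has finished.
			go func() {
				<-prev
				close(mine)
			}()
			return nil, ctx.Err()
		}
	}
	var once sync.Once
	return func() { once.Do(func() { close(mine) }) }, nil
}

// sameKeyBlocks reports whether a second caller on the same key stays
// blocked until the first caller releases.
func sameKeyBlocks() bool {
	k := newKeyedFIFO()
	ctx := context.Background()
	first, _ := k.Wait(ctx, "a")
	entered := make(chan struct{})
	go func() {
		second, _ := k.Wait(ctx, "a")
		close(entered)
		second()
	}()
	select {
	case <-entered:
		return false // second caller got in while first still held the key
	case <-time.After(20 * time.Millisecond):
	}
	first()
	<-entered
	return true
}

// otherKeyFree reports whether a different key is admitted without waiting.
func otherKeyFree() bool {
	k := newKeyedFIFO()
	ctx := context.Background()
	if _, err := k.Wait(ctx, "a"); err != nil {
		return false
	}
	_, err := k.Wait(ctx, "b")
	return err == nil
}

func main() {
	fmt.Println(sameKeyBlocks(), otherKeyFree()) // true true
}
```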

type Receiver

type Receiver[T any] struct {
	// contains filtered or unexported fields
}

func NewReceiver

func NewReceiver[T any](
	engine *Engine[T],
	gate *Gate,
) *Receiver[T]

func (*Receiver[T]) Close

func (r *Receiver[T]) Close()

Close signals that no further messages will be accepted. Safe to call multiple times.
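
An idempotent Close is typically implemented with sync.Once guarding a channel close. The sketch below shows that pattern with a hypothetical closer type and errClosed error. The real Receiver may signal closure differently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("receiver closed")

// closer is a hypothetical stand-in showing only the close/accept logic.
type closer struct {
	once sync.Once
	done chan struct{}
}

func newCloser() *closer { return &closer{done: make(chan struct{})} }

// Close may be called any number of times; the channel is closed only once.
func (c *closer) Close() {
	c.once.Do(func() { close(c.done) })
}

// Accept rejects work once Close has been called.
func (c *closer) Accept() error {
	select {
	case <-c.done:
		return errClosed
	default:
		return nil
	}
}

func main() {
	c := newCloser()
	fmt.Println(c.Accept()) // <nil>
	c.Close()
	c.Close() // second call is a no-op rather than a panic
	fmt.Println(c.Accept()) // receiver closed
}
```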

func (*Receiver[T]) Receive

func (r *Receiver[T]) Receive(
	ctx context.Context,
	payload T,
	orderingKey string,
) error

Receive accepts a single message from the client stream.

type RetryOptions

type RetryOptions struct {
	// MaxAttempts defines retry budget per message.
	// 0 disables retries.
	MaxAttempts int

	// Backoff defines delay between retries.
	// Zero means no delay.
	Backoff time.Duration
}

RetryOptions controls per-message retries.
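
A retry loop driven by these two settings might look like the sketch below. It assumes MaxAttempts counts retries after the initial attempt (so 0 means exactly one execution) and that attempt numbering starts at 0. The function names are hypothetical, and the sketch retries every error rather than consulting IsRetryable.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errTransient = errors.New("transient failure")

// runWithRetry calls fn once and then up to maxRetries more times,
// sleeping for backoff between attempts and stopping early if ctx ends.
func runWithRetry(ctx context.Context, maxRetries int, backoff time.Duration, fn func(attempt int) error) error {
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err = fn(attempt); err == nil {
			return nil
		}
		if attempt == maxRetries {
			break // budget spent: do not sleep after the final attempt
		}
		select {
		case <-time.After(backoff):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return err
}

// countCalls runs a function that fails its first `failures` attempts and
// reports how many times it was invoked.
func countCalls(maxRetries, failures int) (int, error) {
	calls := 0
	err := runWithRetry(context.Background(), maxRetries, time.Millisecond, func(attempt int) error {
		calls++
		if attempt < failures {
			return errTransient
		}
		return nil
	})
	return calls, err
}

func main() {
	calls, err := countCalls(2, 1)
	fmt.Println(calls, err) // 2 <nil>

	calls, err = countCalls(0, 1)
	fmt.Println(calls, err) // 1 transient failure
}
```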

type ShutdownOptions

type ShutdownOptions struct {
	// GracePeriod defines how long to wait for in-flight
	// messages to complete before force shutdown.
	GracePeriod time.Duration

	// DrainOnClose waits for ordering queues to drain
	// before rejecting new messages.
	DrainOnClose bool
}

ShutdownOptions controls graceful termination.
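
A grace period is often implemented by waiting on a sync.WaitGroup with a timeout. The sketch below shows that pattern. waitWithGrace and drainedWithin are hypothetical helpers, and how the package actually forces shutdown after the period expires is not covered by the source.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitWithGrace waits for in-flight work tracked by wg and reports whether
// it finished before the grace period elapsed.
func waitWithGrace(wg *sync.WaitGroup, grace time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(grace):
		return false // caller would now force shutdown
	}
}

// drainedWithin simulates one in-flight message that takes workMs
// milliseconds and waits for it with a grace period of graceMs milliseconds.
func drainedWithin(workMs, graceMs int) bool {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(workMs) * time.Millisecond)
	}()
	return waitWithGrace(&wg, time.Duration(graceMs)*time.Millisecond)
}

func main() {
	fmt.Println(drainedWithin(1, 500)) // true
	fmt.Println(drainedWithin(500, 1)) // false
}
```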
