stream

package
v1.0.0
Warning

This package is not in the latest version of its module.
Published: Mar 7, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Constants

const (
	// No failures (successful processing)
	FailureNone = core.FailureNone

	// User handler returned an error
	FailureHandlerError = core.FailureHandlerError

	// Context cancelled (client cancel, deadline)
	FailureContextCancelled = core.FailureContextCancelled

	// Backpressure rejected the message
	FailureBackpressure = core.FailureBackpressure

	// Ordering system closed or violated
	FailureOrdering = core.FailureOrdering

	// Stream is shutting down
	FailureShutdown = core.FailureShutdown

	// Internal invariant violated (BUG)
	// This is NEVER retryable and ALWAYS terminal
	FailureInternal = core.FailureInternal
)

Backward compatibility constants - mapped to core.FailureKind

Variables

var (
	// Returned when the stream is no longer accepting messages
	ErrStreamClosed = core.ErrStreamClosed

	// Returned when backpressure rejects the message
	ErrBackpressure = core.ErrBackpressure

	// Panic recovered
	ErrPanicRecovered = core.ErrPanicRecovered
)

Optional error helpers - kept for backward compatibility

var (
	// Returned when ordering is closed (shutdown initiated).
	ErrOrderingClosed = errors.New("stream: ordering closed")

	// Returned when queue capacity is exceeded.
	ErrQueueFull = errors.New("stream: ordering queue full")

	// Invalid key error
	ErrInvalidKey = errors.New("stream: the ordering key is invalid")
)

Ordering controls admission into execution.

Functions

func BreakOrdering

func BreakOrdering(kind FailureKind) bool

BreakOrdering reports whether this failure invalidates ordering guarantees. Delegates to core.BreakOrdering.

func ErrorToGRPCCode

func ErrorToGRPCCode(ctx context.Context, err error) codes.Code

ErrorToGRPCCode converts an execution error to a gRPC status code.

This is the authoritative mapping for all streams. No stream should implement its own error-to-code logic.

Args:

ctx  - execution context (to check cancellation source)
err  - execution error (may be nil)

Returns: gRPC status code (never panics)

func ErrorToGRPCStatus

func ErrorToGRPCStatus(ctx context.Context, err error) *status.Status

ErrorToGRPCStatus converts an execution error to a complete gRPC status.

Convenience wrapper that includes the error message.

func IsAdmissionRejection

func IsAdmissionRejection(err error) bool

IsAdmissionRejection reports whether the error is an admission rejection.

func IsContextCanceled

func IsContextCanceled(err error) bool

IsContextCanceled reports whether the error is a context cancellation.

func IsFailure

func IsFailure(kind FailureKind) bool

IsFailure reports whether kind represents a failure. Delegates to core.IsFailure.

func IsRetryable

func IsRetryable(kind FailureKind) bool

IsRetryable reports whether the message MAY be retried without violating correctness. Delegates to core.IsRetryable.

func IsServerShutdown

func IsServerShutdown(ctx context.Context) bool

IsServerShutdown reports whether the cancellation source is a server shutdown.

func IsTerminal

func IsTerminal(kind FailureKind) bool

IsTerminal reports whether the failure must terminate the stream. Delegates to core.IsTerminal.
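A caller would typically combine the failure predicates above to pick its next action. The following is a minimal sketch with local stand-in types. The only rule taken from this documentation is that FailureInternal is never retryable and always terminal; treating backpressure as the sole retryable kind is an assumption, and `decide` is a hypothetical helper.

```go
package main

import "fmt"

// FailureKind is a local stand-in for the package's FailureKind alias.
type FailureKind int

const (
	FailureNone FailureKind = iota
	FailureHandlerError
	FailureBackpressure
	FailureInternal
)

// isRetryable is an assumed rule: only backpressure rejections are retried.
func isRetryable(k FailureKind) bool { return k == FailureBackpressure }

// isTerminal follows the documented rule that internal failures are terminal.
func isTerminal(k FailureKind) bool { return k == FailureInternal }

// decide turns a failure kind into the caller's next action.
func decide(k FailureKind) string {
	switch {
	case k == FailureNone:
		return "done"
	case isTerminal(k):
		return "terminate stream"
	case isRetryable(k):
		return "retry"
	default:
		return "fail message"
	}
}

func main() {
	for _, k := range []FailureKind{FailureNone, FailureHandlerError, FailureBackpressure, FailureInternal} {
		fmt.Println(int(k), decide(k))
	}
}
```

Checking the terminal case before the retryable case keeps a kind that is somehow both from ever being retried.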

func ValidateStreamConfig

func ValidateStreamConfig(config StreamConfig) error

ValidateStreamConfig is a helper that validates configuration using the StreamConfig Validate() method.

This can be called by stream implementations that use StreamConfig.

Types

type Drainer

type Drainer interface {
	Drain(ctx context.Context) error
}

Drainer allows graceful shutdown waiting.

type Engine

type Engine[T any] struct {
	// contains filtered or unexported fields
}

Engine coordinates ordering admission and supervised execution.

func NewEngine

func NewEngine[T any](
	ordering OrderingPolicy,
	handler Handler[T],
) *Engine[T]

func (*Engine[T]) Close

func (e *Engine[T]) Close()

Close prevents further admissions.

func (*Engine[T]) Process

func (e *Engine[T]) Process(
	ctx context.Context,
	message Message[T],
) error
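How admission and execution might fit together can be sketched as follows. This is not the Engine's actual implementation: `process`, `countingOrdering`, and `errClosed` are hypothetical, and only the ReleaseFunc and OrderingPolicy shapes are copied from the signatures in this documentation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ReleaseFunc and OrderingPolicy mirror the documented signatures.
type ReleaseFunc func()

type OrderingPolicy interface {
	Wait(ctx context.Context) (ReleaseFunc, error)
	Close()
}

var errClosed = errors.New("ordering closed")

// countingOrdering is a single-goroutine test double, not a real policy.
type countingOrdering struct {
	admitted, released int
	closed             bool
}

func (c *countingOrdering) Wait(ctx context.Context) (ReleaseFunc, error) {
	if c.closed {
		return nil, errClosed
	}
	c.admitted++
	return func() { c.released++ }, nil
}

func (c *countingOrdering) Close() { c.closed = true }

// process waits for admission, runs the handler, then releases the slot.
func process(ctx context.Context, o OrderingPolicy, handle func(context.Context, string) error, payload string) error {
	release, err := o.Wait(ctx)
	if err != nil {
		return err // rejected: the handler never runs
	}
	defer release() // released exactly once, even if the handler fails
	return handle(ctx, payload)
}

// demo processes one message, closes the policy, then tries a second one.
func demo() (admitted, released int, afterClose error) {
	o := &countingOrdering{}
	handle := func(ctx context.Context, p string) error { return nil }
	_ = process(context.Background(), o, handle, "first")
	o.Close()
	afterClose = process(context.Background(), o, handle, "second")
	return o.admitted, o.released, afterClose
}

func main() {
	admitted, released, err := demo()
	fmt.Println(admitted, released, err)
}
```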

type FIFOOrdering

type FIFOOrdering struct {
	// contains filtered or unexported fields
}

func NewFIFOOrdering

func NewFIFOOrdering(capacity int) *FIFOOrdering

func (*FIFOOrdering) Close

func (o *FIFOOrdering) Close()

func (*FIFOOrdering) Wait

func (o *FIFOOrdering) Wait(ctx context.Context) (ReleaseFunc, error)

type FailureKind

type FailureKind = core.FailureKind

FailureKind is an alias to core.FailureKind. See core/failure.go for the authoritative definition.

func FailurFromError

func FailurFromError(err error) FailureKind

FailurFromError maps known errors to FailureKind.

type Handler

type Handler[T any] func(ctx context.Context, message Message[T]) error

Handler processes a single message. It MUST:

  - respect ctx.Done()
  - be idempotent per attempt if possible
  - return nil on success, error on failure
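A handler that honors the cancellation requirement can be sketched like this. The Message type here is a trimmed local stand-in, and `sleepHandler` is a hypothetical handler that simulates work with a timer.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Message is a trimmed local stand-in for the package's Message type.
type Message[T any] struct{ Payload T }

// Handler mirrors the documented signature.
type Handler[T any] func(ctx context.Context, message Message[T]) error

// sleepHandler simulates work lasting d and gives up early if ctx is done.
func sleepHandler(d time.Duration) Handler[string] {
	return func(ctx context.Context, m Message[string]) error {
		timer := time.NewTimer(d)
		defer timer.Stop()
		select {
		case <-timer.C:
			return nil // work finished
		case <-ctx.Done():
			return ctx.Err() // cancelled or deadline exceeded
		}
	}
}

// demo runs a fast handler without a deadline and a slow one with a short deadline.
func demo() (fastErr, slowErr error) {
	msg := Message[string]{Payload: "hello"}
	fastErr = sleepHandler(time.Millisecond)(context.Background(), msg)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	slowErr = sleepHandler(time.Minute)(ctx, msg)
	return fastErr, slowErr
}

func main() {
	fastErr, slowErr := demo()
	fmt.Println(fastErr, errors.Is(slowErr, context.DeadlineExceeded))
}
```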

type KeyedFIFOOrdering

type KeyedFIFOOrdering struct {
	// contains filtered or unexported fields
}

func NewKeyedFIFOOrdering

func NewKeyedFIFOOrdering(capacity int) *KeyedFIFOOrdering

func (*KeyedFIFOOrdering) Close

func (k *KeyedFIFOOrdering) Close()

func (*KeyedFIFOOrdering) Wait

func (k *KeyedFIFOOrdering) Wait(
	ctx context.Context,
	key string,
) (ReleaseFunc, error)

type KeyedOrderingPolicy

type KeyedOrderingPolicy interface {
	Wait(ctx context.Context, key string) (ReleaseFunc, error)
	Close()
}

KeyedOrderingPolicy provides key-scoped ordering.
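Key-scoped admission can be illustrated with one single-slot semaphore per key. The sketch below is hypothetical (`keyedSerial` is not the package's KeyedFIFOOrdering): it gives per-key mutual exclusion only, does not promise strict FIFO among waiters, and omits Close for brevity.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type ReleaseFunc func()

// keyedSerial allows one in-flight execution per key (hypothetical sketch).
type keyedSerial struct {
	mu    sync.Mutex
	slots map[string]chan struct{}
}

func newKeyedSerial() *keyedSerial {
	return &keyedSerial{slots: make(map[string]chan struct{})}
}

// slot returns the one-element semaphore for key, creating it on first use.
func (k *keyedSerial) slot(key string) chan struct{} {
	k.mu.Lock()
	defer k.mu.Unlock()
	s, ok := k.slots[key]
	if !ok {
		s = make(chan struct{}, 1)
		k.slots[key] = s
	}
	return s
}

// Wait blocks until the key's slot is free or ctx is done.
func (k *keyedSerial) Wait(ctx context.Context, key string) (ReleaseFunc, error) {
	s := k.slot(key)
	select {
	case s <- struct{}{}:
		return func() { <-s }, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// demo holds "user-1", then shows a second "user-1" waiter timing out
// while "user-2" is admitted immediately.
func demo() (sameKeyErr, otherKeyErr error) {
	k := newKeyedSerial()
	release, err := k.Wait(context.Background(), "user-1")
	if err != nil {
		return err, err
	}
	defer release()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	_, sameKeyErr = k.Wait(ctx, "user-1")

	var releaseOther ReleaseFunc
	releaseOther, otherKeyErr = k.Wait(context.Background(), "user-2")
	if otherKeyErr == nil {
		releaseOther()
	}
	return sameKeyErr, otherKeyErr
}

func main() {
	sameKeyErr, otherKeyErr := demo()
	fmt.Println(sameKeyErr, otherKeyErr)
}
```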

type Message

type Message[T any] struct {
	// Payload is the user provided value
	Payload T

	// Ctx is the request-scoped context
	// Used for cancellation, deadlines, tracing
	Ctx context.Context

	// Meta contains engine-controlled metadata
	Meta MetaData
}

Message represents a single unit of work flowing through the stream engine.

func NewMessage

func NewMessage[T any](
	ctx context.Context,
	payload T,
	seq uint64,
) Message[T]

func (Message[T]) CanRetry

func (m Message[T]) CanRetry() bool

CanRetry reports whether the message is allowed another attempt.

func (Message[T]) NextAttempt

func (m Message[T]) NextAttempt() Message[T]

NextAttempt returns a copy of the message advanced to the next retry attempt.

func (Message[T]) WithMaxAttempts

func (m Message[T]) WithMaxAttempts(n int) Message[T]

WithMaxAttempts returns a copy of the message with a retry budget attached.

func (Message[T]) WithOrderingKey

func (m Message[T]) WithOrderingKey(key string) Message[T]

WithOrderingKey returns a copy of the message with an ordering key attached.

type MetaData

type MetaData struct {
	// Attempt is the current processing attempt.
	// Starts at 0 for the first delivery.
	Attempt int

	// MaxAttempts defines retry budget.
	// 0 means retries disabled.
	MaxAttempts int

	// OrderingKey is used for keyed ordering.
	// Empty string means unkeyed (global ordering).
	OrderingKey string

	// Sequence is a monotonically increasing
	// stream-wide identifier.
	// Used for debugging, tracing and ordering audits.
	Sequence uint64
}

MetaData contains all non-payload information required for correct stream behavior.
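The copy-returning methods and the retry budget can be sketched with trimmed stand-in types. The exact CanRetry rule is not stated in this documentation; the sketch assumes `Attempt < MaxAttempts`, which is consistent with "0 means retries disabled". `deliveries` is a hypothetical helper.

```go
package main

import "fmt"

// MetaData and Message are trimmed local stand-ins for the package types.
type MetaData struct {
	Attempt     int
	MaxAttempts int
}

type Message[T any] struct {
	Payload T
	Meta    MetaData
}

// WithMaxAttempts returns a copy; the receiver is a value, so the original is untouched.
func (m Message[T]) WithMaxAttempts(n int) Message[T] {
	m.Meta.MaxAttempts = n
	return m
}

// NextAttempt returns a copy with the attempt counter incremented.
func (m Message[T]) NextAttempt() Message[T] {
	m.Meta.Attempt++
	return m
}

// CanRetry uses an assumed rule: retry while Attempt < MaxAttempts.
func (m Message[T]) CanRetry() bool {
	return m.Meta.Attempt < m.Meta.MaxAttempts
}

// deliveries counts how often an always-failing message would be delivered.
func deliveries(maxAttempts int) int {
	m := Message[string]{Payload: "x"}.WithMaxAttempts(maxAttempts)
	n := 1 // the first delivery is attempt 0
	for m.CanRetry() {
		m = m.NextAttempt()
		n++
	}
	return n
}

func main() {
	fmt.Println(deliveries(0), deliveries(2))
}
```

Under this assumed rule a budget of 2 yields three deliveries in total: the first attempt plus two retries.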

type OrderingPolicy

type OrderingPolicy interface {
	// Wait blocks until execution is allowed or rejected.
	Wait(ctx context.Context) (ReleaseFunc, error)

	// Close prevents new admission and unblocks waiters.
	Close()
}

OrderingPolicy controls admission ordering.

type Pipeline

type Pipeline struct {
	Admission  *admission.Controller
	Executor   *execution.Executor
	Supervisor *supervisor.Supervisor
}

Pipeline represents the complete admission → execution → supervision pipeline.

func NewPipeline

func NewPipeline(cfg *PipelineConfig) *Pipeline

NewPipeline creates a complete pipeline from configuration.

The pipeline follows this initialization order:

  1. Admission Controller (controls capacity)
  2. Executor (wraps admission + shutdown)
  3. Supervisor (wraps executor + hooks)

This ensures proper layering and single responsibility.
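The layering above can be sketched with plain function wrapping. Everything here is hypothetical and single-goroutine: `admission`, `executor`, and `supervisor` are toy stand-ins for the real components, and the capacity rule, shutdown flag, and hook are assumptions chosen to keep the example small.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errRejected = errors.New("admission rejected")
	errShutdown = errors.New("shutting down")
)

// run is the shape every layer shares in this sketch.
type run func(task func() error) error

// admission admits at most limit tasks in total (a toy capacity rule).
func admission(limit int) run {
	admitted := 0
	return func(task func() error) error {
		if admitted >= limit {
			return errRejected
		}
		admitted++
		return task()
	}
}

// executor wraps admission and refuses work once shutdown is flagged.
func executor(admit run, shuttingDown *bool) run {
	return func(task func() error) error {
		if *shuttingDown {
			return errShutdown
		}
		return admit(task)
	}
}

// supervisor wraps the executor, converts panics to errors, and calls a hook.
func supervisor(exec run, onDone func(error)) run {
	return func(task func() error) (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("panic recovered: %v", r)
			}
			onDone(err)
		}()
		return exec(task)
	}
}

// demo runs a successful task, a panicking task, and one over capacity.
func demo() (errs []error, hookCalls int) {
	shuttingDown := false
	p := supervisor(executor(admission(2), &shuttingDown), func(error) { hookCalls++ })
	errs = append(errs, p(func() error { return nil }))
	errs = append(errs, p(func() error { panic("boom") }))
	errs = append(errs, p(func() error { return nil }))
	return errs, hookCalls
}

func main() {
	errs, hookCalls := demo()
	fmt.Println(errs, hookCalls)
}
```

Building from the inside out means each outer layer only needs to know the `run` shape of the layer it wraps.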

func (*Pipeline) Validate

func (p *Pipeline) Validate() error

Validate ensures pipeline configuration is sound.

Validates:

  - All components initialized successfully
  - No nil references
  - Configuration consistency

type PipelineConfig

type PipelineConfig struct {
	AdmissionLimiter *admission.Limiter
	SupervisorPolicy supervisor.SupervisorPolicy
	Shutdown         core.ShutdownCoordinator
	SupervisorHooks  supervisor.Hooks
	MetricsSink      metrics.Sink
}

PipelineConfig holds configuration for pipeline creation.

type ReleaseFunc

type ReleaseFunc func()

ReleaseFunc MUST be called explicitly once to signal that execution has completed.
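Callers that cannot easily guarantee a single call on every code path might wrap the release in sync.Once. The sketch below shows that guard; `releaseOnce` is a hypothetical helper, not part of this package.

```go
package main

import (
	"fmt"
	"sync"
)

// ReleaseFunc mirrors the documented type.
type ReleaseFunc func()

// releaseOnce wraps release so that repeated calls run it only once.
func releaseOnce(release ReleaseFunc) ReleaseFunc {
	var once sync.Once
	return func() { once.Do(release) }
}

// demo calls the wrapped release twice and reports how often it really ran.
func demo() int {
	freed := 0
	release := releaseOnce(func() { freed++ })
	release()
	release() // ignored by sync.Once
	return freed
}

func main() {
	fmt.Println(demo())
}
```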

type StreamConfig

type StreamConfig struct {
	// Pipeline configuration
	AdmissionLimiter *admission.Limiter
	SupervisorPolicy supervisor.SupervisorPolicy
	Shutdown         core.ShutdownCoordinator
	MetricsSink      metrics.Sink

	// Stream-specific parameters
	AdmissionMode   core.AdmissionMode // For ServerStream
	DefaultDeadline int64              // For Unary (nanoseconds)
	Name            string             // For identification
}

StreamConfig holds unified configuration for all stream types.

This struct consolidates the configuration patterns from:

  - Unary (Options struct with builder pattern)
  - ServerStream (Config struct with builder pattern)

It provides a single standard that all streams can use.

func ApplyStreamConfigOptions

func ApplyStreamConfigOptions(opts ...func(*StreamConfig)) StreamConfig

ApplyStreamConfigOptions applies a slice of builder functions to construct a StreamConfig with defaults.

This is a generic helper for streams that use option patterns. Usage:

config := ApplyStreamConfigOptions(
	func(sc *StreamConfig) { sc.Name = "my-stream" },
	func(sc *StreamConfig) { sc.AdmissionMode = core.AdmissionHardReject },
)
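The mechanics behind this kind of option helper can be sketched as follows. It is not the package's implementation: the trimmed StreamConfig, `defaultConfig`, `applyOptions`, and the default values are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// StreamConfig is a trimmed local stand-in with two of the documented fields.
type StreamConfig struct {
	Name            string
	DefaultDeadline int64 // nanoseconds
}

// defaultConfig holds assumed defaults; the package's real defaults are not documented here.
func defaultConfig() StreamConfig {
	return StreamConfig{
		Name:            "stream",
		DefaultDeadline: (30 * time.Second).Nanoseconds(),
	}
}

// applyOptions starts from the defaults and applies each option in order.
func applyOptions(opts ...func(*StreamConfig)) StreamConfig {
	cfg := defaultConfig()
	for _, opt := range opts {
		if opt != nil { // tolerate nil options instead of panicking
			opt(&cfg)
		}
	}
	return cfg
}

func main() {
	cfg := applyOptions(
		func(sc *StreamConfig) { sc.Name = "my-stream" },
	)
	// Fields that no option touched keep their default value.
	fmt.Println(cfg.Name, time.Duration(cfg.DefaultDeadline))
}
```

Because options run in order, a later option overrides an earlier one that sets the same field.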

func BuildStreamConfigWithValidation

func BuildStreamConfigWithValidation(opts ...func(*StreamConfig)) (StreamConfig, error)

BuildStreamConfigWithValidation applies options and validates.

This is the typical pattern: apply options, then validate. Returns error if validation fails.

func DefaultStreamConfig

func DefaultStreamConfig() StreamConfig

DefaultStreamConfig creates a StreamConfig with sensible defaults.

func MustBuildStreamConfig

func MustBuildStreamConfig(opts ...func(*StreamConfig)) StreamConfig

MustBuildStreamConfig applies options and validates.

Panics if validation fails. Use this when config errors should be caught during initialization (typical for gRPC servers).
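The difference between the error-returning and the panicking variant can be sketched with local stand-ins. `Sink`, `nopSink`, `build`, `mustBuild`, and `panics` are hypothetical, and the only validation rule shown (a non-nil metrics sink) is one of the checks listed for StreamConfig.Validate in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// Sink is a hypothetical stand-in for metrics.Sink.
type Sink interface{ Inc(name string) }

type nopSink struct{}

func (nopSink) Inc(string) {}

// StreamConfig is a trimmed local stand-in.
type StreamConfig struct {
	Name        string
	MetricsSink Sink
}

// Validate checks one of the documented rules: the sink must not be nil.
func (sc *StreamConfig) Validate() error {
	if sc.MetricsSink == nil {
		return errors.New("stream config: MetricsSink is nil")
	}
	return nil
}

// build applies options, then validates, returning the error to the caller.
func build(opts ...func(*StreamConfig)) (StreamConfig, error) {
	var cfg StreamConfig
	for _, opt := range opts {
		opt(&cfg)
	}
	if err := cfg.Validate(); err != nil {
		return StreamConfig{}, err
	}
	return cfg, nil
}

// mustBuild is the panicking variant, intended for program start-up.
func mustBuild(opts ...func(*StreamConfig)) StreamConfig {
	cfg, err := build(opts...)
	if err != nil {
		panic(err)
	}
	return cfg
}

// panics reports whether f panicked.
func panics(f func()) (p bool) {
	defer func() { p = recover() != nil }()
	f()
	return false
}

func main() {
	withSink := func(sc *StreamConfig) { sc.MetricsSink = nopSink{} }
	_, err := build()
	fmt.Println(err != nil, panics(func() { mustBuild() }), panics(func() { mustBuild(withSink) }))
}
```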

func (*StreamConfig) ToPipelineConfig

func (sc *StreamConfig) ToPipelineConfig(hooks supervisor.Hooks) *PipelineConfig

ToPipelineConfig converts StreamConfig to PipelineConfig.

This is useful for stream implementations that need to create a pipeline using the factory.

func (*StreamConfig) Validate

func (sc *StreamConfig) Validate() error

Validate ensures the configuration is sound.

Validates:

  - Shutdown coordinator is not nil
  - AdmissionLimiter is not nil
  - MetricsSink is not nil

type StreamConfigBuilder

type StreamConfigBuilder struct {
	// contains filtered or unexported fields
}

StreamConfigBuilder provides a fluent API for building StreamConfig.

func NewStreamConfigBuilder

func NewStreamConfigBuilder() *StreamConfigBuilder

NewStreamConfigBuilder creates a new builder with defaults.

func (*StreamConfigBuilder) Build

func (b *StreamConfigBuilder) Build() StreamConfig

Build returns the constructed StreamConfig.

func (*StreamConfigBuilder) WithAdmissionLimiter

func (b *StreamConfigBuilder) WithAdmissionLimiter(limiter *admission.Limiter) *StreamConfigBuilder

WithAdmissionLimiter sets the admission limiter.

func (*StreamConfigBuilder) WithAdmissionMode

func (b *StreamConfigBuilder) WithAdmissionMode(mode core.AdmissionMode) *StreamConfigBuilder

WithAdmissionMode sets the admission mode.

func (*StreamConfigBuilder) WithDefaultDeadline

func (b *StreamConfigBuilder) WithDefaultDeadline(deadline int64) *StreamConfigBuilder

WithDefaultDeadline sets the default deadline (in nanoseconds).

func (*StreamConfigBuilder) WithMetricsSink

func (b *StreamConfigBuilder) WithMetricsSink(sink metrics.Sink) *StreamConfigBuilder

WithMetricsSink sets the metrics sink.

func (*StreamConfigBuilder) WithName

func (b *StreamConfigBuilder) WithName(name string) *StreamConfigBuilder

WithName sets the stream name for identification.

func (*StreamConfigBuilder) WithShutdown

WithShutdown sets the shutdown coordinator.

func (*StreamConfigBuilder) WithSupervisorPolicy

func (b *StreamConfigBuilder) WithSupervisorPolicy(policy supervisor.SupervisorPolicy) *StreamConfigBuilder

WithSupervisorPolicy sets the supervisor policy.
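The fluent style of the builder methods can be sketched as follows. The `builder` type, its default name, and the trimmed StreamConfig are assumptions for illustration; only the method names and the nanosecond convention come from this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// StreamConfig is a trimmed local stand-in.
type StreamConfig struct {
	Name            string
	DefaultDeadline int64 // nanoseconds
}

// builder is a hypothetical stand-in for StreamConfigBuilder.
type builder struct{ cfg StreamConfig }

// newBuilder starts from an assumed default name.
func newBuilder() *builder {
	return &builder{cfg: StreamConfig{Name: "stream"}}
}

// Each With method mutates the builder and returns it so calls can be chained.
func (b *builder) WithName(name string) *builder {
	b.cfg.Name = name
	return b
}

func (b *builder) WithDefaultDeadline(ns int64) *builder {
	b.cfg.DefaultDeadline = ns
	return b
}

// Build returns a copy, so later builder calls do not affect it.
func (b *builder) Build() StreamConfig { return b.cfg }

func main() {
	cfg := newBuilder().
		WithName("unary").
		WithDefaultDeadline((2 * time.Second).Nanoseconds()).
		Build()
	fmt.Println(cfg.Name, time.Duration(cfg.DefaultDeadline))
}
```

Converting from time.Duration with Nanoseconds() avoids hand-written nanosecond literals for the int64 deadline.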

type Supervisor

type Supervisor[T any] struct {
	// contains filtered or unexported fields
}

func NewSupervisor

func NewSupervisor[T any](handler Handler[T]) *Supervisor[T]

NewSupervisor constructs a stream supervisor.

func (*Supervisor[T]) Execute

func (s *Supervisor[T]) Execute(
	ctx context.Context,
	message Message[T],
) (err error)
