streaming

package
v1.0.0
Published: Mar 7, 2026 | License: MIT | Imports: 2 | Imported by: 0

Documentation

Overview

Package streaming provides public types for stream processing.

This package re-exports streaming types (Handler, Message, OrderingPolicy) from internal packages, making them available for users who need to customize stream behavior.

OrderingPolicy options:

  • OrderingNone: Maximum throughput, no ordering
  • OrderingFIFO: Global sequential ordering
  • OrderingKeyedFIFO: Per-key sequential, cross-key parallel (recommended)
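The "per-key sequential, cross-key parallel" behavior can be pictured with a small standalone sketch. This is not the package's implementation: the event type and processKeyed function are hypothetical names invented for illustration, and the sketch uses one worker goroutine per key.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical message carrying an ordering key.
type event struct {
	Key string
	Seq int
}

// processKeyed starts one worker goroutine per key. Events that share a key
// are handled in arrival order; events with different keys may interleave.
// It returns the sequence numbers observed for each key.
func processKeyed(events []event) map[string][]int {
	queues := make(map[string]chan event)
	seen := make(map[string][]int)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for _, ev := range events {
		q, ok := queues[ev.Key]
		if !ok {
			// Buffer sized so that sends in this sketch never block.
			q = make(chan event, len(events))
			queues[ev.Key] = q
			wg.Add(1)
			go func(q chan event) {
				defer wg.Done()
				for e := range q {
					mu.Lock()
					seen[e.Key] = append(seen[e.Key], e.Seq)
					mu.Unlock()
				}
			}(q)
		}
		q <- ev
	}
	for _, q := range queues {
		close(q)
	}
	wg.Wait()
	return seen
}

func main() {
	events := []event{{"a", 1}, {"b", 1}, {"a", 2}, {"b", 2}, {"a", 3}}
	seen := processKeyed(events)
	fmt.Println(seen["a"], seen["b"]) // prints "[1 2 3] [1 2]"
}
```

Within a key the order is fixed by the channel; across keys nothing is guaranteed, which is where the parallelism comes from.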

The types belong to GRIP's streaming RPC engine and are used by the [clientstream] and [bidistream] packages.

Import this package if you need to provide a custom Handler or OrderingPolicy, or to work with Message directly.

Quick start:

handler := func(ctx context.Context, msg streaming.Message[*pb.Request]) error {
    // process msg.Payload
    return nil
}

See also: [clientstream], [bidistream].

Constants

This section is empty.

Variables

var (
	// ErrOrderingClosed is returned when ordering is closed (shutdown initiated).
	ErrOrderingClosed = stream.ErrOrderingClosed

	// ErrQueueFull is returned when the ordering queue capacity is exceeded.
	ErrQueueFull = stream.ErrQueueFull

	// ErrInvalidKey is returned when the ordering key is invalid.
	ErrInvalidKey = stream.ErrInvalidKey
)

Errors re-exported for external consumers.

Functions

This section is empty.

Types

type Handler

type Handler[T any] = stream.Handler[T]

Handler processes a single stream message.

Implementations MUST:

  • Respect ctx.Done() for cancellation
  • Be idempotent per attempt when possible
  • Return nil on success, error on failure

type KeyedOrderingPolicy

type KeyedOrderingPolicy = stream.KeyedOrderingPolicy

KeyedOrderingPolicy provides per-key ordering for stream messages.

func NewKeyedFIFOOrdering

func NewKeyedFIFOOrdering(capacity int) KeyedOrderingPolicy

NewKeyedFIFOOrdering creates a per-key FIFO admission ordering policy.

Capacity limits per-key queue depth. 0 means unbounded.

type Message

type Message[T any] = stream.Message[T]

Message represents a single unit of work flowing through the stream engine.

func NewMessage

func NewMessage[T any](ctx context.Context, payload T, seq uint64) Message[T]

NewMessage creates a new stream Message with the given payload and sequence number.

type MetaData

type MetaData = stream.MetaData

MetaData contains engine-controlled metadata for a stream message.

type OrderingPolicy

type OrderingPolicy = stream.OrderingPolicy

OrderingPolicy controls admission ordering for stream messages.

func NewFIFOOrdering

func NewFIFOOrdering(capacity int) OrderingPolicy

NewFIFOOrdering creates a FIFO admission ordering policy.

Capacity limits the number of messages waiting for admission. 0 means unbounded.

type ReleaseFunc

type ReleaseFunc = stream.ReleaseFunc

ReleaseFunc MUST be called after an ordering slot has been consumed.
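One way to make sure the release always happens is to defer it right after acquiring the slot. The sketch below assumes ReleaseFunc is a plain func(); the real signature is defined in the internal stream package and is not shown on this page. The slots type and its methods are hypothetical.

```go
package main

import "fmt"

// ReleaseFunc is a local stand-in, assumed here to be a plain func().
type ReleaseFunc func()

// slots is a hypothetical admission gate with a fixed number of slots.
type slots struct{ sem chan struct{} }

func newSlots(n int) *slots { return &slots{sem: make(chan struct{}, n)} }

// acquire takes a slot and returns the ReleaseFunc that frees it.
func (s *slots) acquire() ReleaseFunc {
	s.sem <- struct{}{}
	return func() { <-s.sem }
}

// inUse reports how many slots are currently held.
func (s *slots) inUse() int { return len(s.sem) }

// process holds a slot for the duration of work and releases it on return.
func process(s *slots, work func()) {
	release := s.acquire()
	defer release() // runs on every return path
	work()
}

func main() {
	s := newSlots(1)
	process(s, func() { fmt.Println("in use during work:", s.inUse()) })
	fmt.Println("in use after work:", s.inUse())
}
```

Forgetting the release in a gate like this one would leave the slot held and block later acquisitions.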
