Documentation
¶
Index ¶
- Constants
- Variables
- func BreakOrdering(kind FailureKind) bool
- func ErrorToGRPCCode(ctx context.Context, err error) codes.Code
- func ErrorToGRPCStatus(ctx context.Context, err error) *status.Status
- func IsAdmissionRejection(err error) bool
- func IsContextCanceled(err error) bool
- func IsFailure(kind FailureKind) bool
- func IsRetryable(kind FailureKind) bool
- func IsServerShutdown(ctx context.Context) bool
- func IsTerminal(kind FailureKind) bool
- func ValidateStreamConfig(config StreamConfig) error
- type Drainer
- type Engine
- type FIFOOrdering
- type FailureKind
- type Handler
- type KeyedFIFOOrdering
- type KeyedOrderingPolicy
- type Message
- type MetaData
- type OrderingPolicy
- type Pipeline
- type PipelineConfig
- type ReleaseFunc
- type StreamConfig
- type StreamConfigBuilder
- func (b *StreamConfigBuilder) Build() StreamConfig
- func (b *StreamConfigBuilder) WithAdmissionLimiter(limiter *admission.Limiter) *StreamConfigBuilder
- func (b *StreamConfigBuilder) WithAdmissionMode(mode core.AdmissionMode) *StreamConfigBuilder
- func (b *StreamConfigBuilder) WithDefaultDeadline(deadline int64) *StreamConfigBuilder
- func (b *StreamConfigBuilder) WithMetricsSink(sink metrics.Sink) *StreamConfigBuilder
- func (b *StreamConfigBuilder) WithName(name string) *StreamConfigBuilder
- func (b *StreamConfigBuilder) WithShutdown(shutdown core.ShutdownCoordinator) *StreamConfigBuilder
- func (b *StreamConfigBuilder) WithSupervisorPolicy(policy supervisor.SupervisorPolicy) *StreamConfigBuilder
- type Supervisor
Constants ¶
const ( // No failures (successful processing) FailureNone = core.FailureNone // User handler returned an error FailureHandlerError = core.FailureHandlerError // Context cancelled (client cancel, deadline) FailureContextCancelled = core.FailureContextCancelled // Backpressure rejected the message FailureBackpressure = core.FailureBackpressure // Ordering system closed or violated FailureOrdering = core.FailureOrdering // Stream is shutting down FailureShutdown = core.FailureShutdown // Internal invariant violated (BUG) // This is NEVER retryable and ALWAYS terminal FailureInternal = core.FailureInternal )
Backward compatibility constants - mapped to core.FailureKind
Variables ¶
var ( // Returned when stream is no longer accepting message ErrStreamClosed = core.ErrStreamClosed // Returned when ordering is closed or violated ErrBackpressure = core.ErrBackpressure // Panic recovered ErrPanicRecovered = core.ErrPanicRecovered )
Optional error helpers - kept for backward compatibility
var ( // Returned when ordering is closed(shutdown initiated). ErrOrderingClosed = errors.New("stream: ordering closed") // Returned when queue capacity is exceeded. ErrQueueFull = errors.New("stream: ordering queue full") // Invalid key error ErrInvalidKey = errors.New("stream: the ordering key is invalid") )
Ordering controls admission into execution.
Functions ¶
func BreakOrdering ¶
func BreakOrdering(kind FailureKind) bool
BreakOrdering reports whether this failure invalidates ordering guarantees. Delegates to core.BreakOrdering.
func ErrorToGRPCCode ¶
ErrorToGRPCCode converts an execution error to a gRPC status code.
This is the authoritative mapping for all streams. No stream should implement its own error-to-code logic.
Args:
ctx - execution context (to check cancellation source) err - execution error (may be nil)
Returns: gRPC status code (never panics)
func ErrorToGRPCStatus ¶
ErrorToGRPCStatus converts an execution error to a complete gRPC status.
Convenience wrapper that includes the error message.
func IsAdmissionRejection ¶
IsAdmissionRejection returns true if error is admission rejection.
func IsContextCanceled ¶
IsContextCanceled returns true if error is context cancellation.
func IsFailure ¶
func IsFailure(kind FailureKind) bool
IsFailure reports whether kind represents a failure. Delegates to core.IsFailure.
func IsRetryable ¶
func IsRetryable(kind FailureKind) bool
IsRetryable reports whether the message MAY be retried without violating correctness. Delegates to core.IsRetryable.
func IsServerShutdown ¶
IsServerShutdown returns true if cancellation source is server shutdown.
func IsTerminal ¶
func IsTerminal(kind FailureKind) bool
IsTermainl reportes whether the FAILURE must terminate the stream. Delegates to core.IsTerminal.
func ValidateStreamConfig ¶
func ValidateStreamConfig(config StreamConfig) error
ValidateStreamConfig is a helper that validates configuration using the StreamConfig Validate() method.
This can be called by stream implementations that use StreamConfig.
Types ¶
type Engine ¶
type Engine[T any] struct { // contains filtered or unexported fields }
Engine coordinates ordering admission and supervised execution.
type FIFOOrdering ¶
type FIFOOrdering struct {
// contains filtered or unexported fields
}
func NewFIFOOrdering ¶
func NewFIFOOrdering(capacity int) *FIFOOrdering
func (*FIFOOrdering) Close ¶
func (o *FIFOOrdering) Close()
func (*FIFOOrdering) Wait ¶
func (o *FIFOOrdering) Wait(ctx context.Context) (ReleaseFunc, error)
type FailureKind ¶
type FailureKind = core.FailureKind
FailureKind is an alias to core.FailureKind. See core/failure.go for the authoritative definition.
func FailurFromError ¶
func FailurFromError(err error) FailureKind
FailureFromError maps known erros to FailureKind.
type Handler ¶
Handler processes a single message. It MUST: - respect ctx.Done() - be idempotent per attempt if possible - return nil on success, error on failure
type KeyedFIFOOrdering ¶
type KeyedFIFOOrdering struct {
// contains filtered or unexported fields
}
func NewKeyedFIFOOrdering ¶
func NewKeyedFIFOOrdering(capacity int) *KeyedFIFOOrdering
func (*KeyedFIFOOrdering) Close ¶
func (k *KeyedFIFOOrdering) Close()
func (*KeyedFIFOOrdering) Wait ¶
func (k *KeyedFIFOOrdering) Wait( ctx context.Context, key string, ) (ReleaseFunc, error)
type KeyedOrderingPolicy ¶
type KeyedOrderingPolicy interface {
Wait(ctx context.Context, key string) (ReleaseFunc, error)
Close()
}
KeyedOrderingPolicy provides key-scoped ordering.
type Message ¶
type Message[T any] struct { // Payload is the user provided value Payload T // Ctx is the request-scoped context // Used for cancellation, deadlines, tracing Ctx context.Context // Meta contains engine-controlled metadata Meta MetaData }
Message represents a single unit of work flowing through the stream engine.
func (Message[T]) NextAttempt ¶
NextAttempt returns a copy of message incremented to the next retry attempt.
func (Message[T]) WithMaxAttempts ¶
WithMaxAttempts returns a copy of message with retry budget attached.
func (Message[T]) WithOrderingKey ¶
WithOrderingKey returns a copy of message with ordering key attached.
type MetaData ¶
type MetaData struct {
// Attempt is the current processing attempt.
// Starts at 0 for the first delivery.
Attempt int
// MaxAttempts defines retry budget.
// 0 means retries disabled.
MaxAttempts int
// OrderingKey is used for keyed ordering.
// Empty string means unkeyed (global ordering).
OrderingKey string
// Sequence is a monotonically increasing
// stream-wide identifier.
// Used for debugging, tracing and ordering audits.
Sequence uint64
}
MetaData contains all non-payload information required for correct stream behavior.
type OrderingPolicy ¶
type OrderingPolicy interface {
// Wait blocks until execution is allowed or rejected.
Wait(ctx context.Context) (ReleaseFunc, error)
// Close prevents new admission and unblocks waiters.
Close()
}
OrderingPolicy controls admission ordering.
type Pipeline ¶
type Pipeline struct {
Admission *admission.Controller
Executor *execution.Executor
Supervisor *supervisor.Supervisor
}
Pipeline represents the complete admission → execution → supervision pipeline.
func NewPipeline ¶
func NewPipeline(cfg *PipelineConfig) *Pipeline
NewPipeline creates a complete pipeline from configuration.
The pipeline follows this initialization order:
- Admission Controller (controls capacity)
- Executor (wraps admission + shutdown)
- Supervisor (wraps executor + hooks)
This ensures proper layering and single responsibility.
type PipelineConfig ¶
type PipelineConfig struct {
AdmissionLimiter *admission.Limiter
SupervisorPolicy supervisor.SupervisorPolicy
Shutdown core.ShutdownCoordinator
SupervisorHooks supervisor.Hooks
MetricsSink metrics.Sink
}
PipelineConfig holds configuration for pipeline creation.
type ReleaseFunc ¶
type ReleaseFunc func()
ReleaseFunc MUST be called explicitly once to signal that execution has completed.
type StreamConfig ¶
type StreamConfig struct {
// Pipeline configuration
AdmissionLimiter *admission.Limiter
SupervisorPolicy supervisor.SupervisorPolicy
Shutdown core.ShutdownCoordinator
MetricsSink metrics.Sink
// Stream-specific parameters
AdmissionMode core.AdmissionMode // For ServerStream
DefaultDeadline int64 // For Unary (nanoseconds)
Name string // For identification
}
StreamConfig holds unified configuration for all stream types.
This struct consolidates the configuration patterns from: - Unary (Options struct with builder pattern) - ServerStream (Config struct with builder pattern)
It provides a single standard that all streams can use.
func ApplyStreamConfigOptions ¶
func ApplyStreamConfigOptions(opts ...func(*StreamConfig)) StreamConfig
ApplyStreamConfigOptions applies a slice of builder functions to construct a StreamConfig with defaults.
This is a generic helper for streams that use option patterns. Usage:
config := ApplyStreamConfigOptions(
func(sc *StreamConfig) { sc.Name = "my-stream" },
func(sc *StreamConfig) { sc.AdmissionMode = core.AdmissionHardReject },
)
func BuildStreamConfigWithValidation ¶
func BuildStreamConfigWithValidation(opts ...func(*StreamConfig)) (StreamConfig, error)
BuildStreamConfigWithValidation applies options and validates.
This is the typical pattern: apply options, then validate. Returns error if validation fails.
func DefaultStreamConfig ¶
func DefaultStreamConfig() StreamConfig
DefaultStreamConfig creates a StreamConfig with sensible defaults.
func MustBuildStreamConfig ¶
func MustBuildStreamConfig(opts ...func(*StreamConfig)) StreamConfig
MustBuildStreamConfig applies options and validates.
Panics if validation fails. Use this when config errors should be caught during initialization (typical for gRPC servers).
func (*StreamConfig) ToPipelineConfig ¶
func (sc *StreamConfig) ToPipelineConfig(hooks supervisor.Hooks) *PipelineConfig
ToPipelineConfig converts StreamConfig to PipelineConfig.
This is useful for stream implementations that need to create a pipeline using the factory.
func (*StreamConfig) Validate ¶
func (sc *StreamConfig) Validate() error
Validate ensures the configuration is sound.
Validates: - Shutdown coordinator is not nil - AdmissionLimiter is not nil - MetricsSink is not nil
type StreamConfigBuilder ¶
type StreamConfigBuilder struct {
// contains filtered or unexported fields
}
StreamConfigBuilder provides a fluent API for building StreamConfig.
func NewStreamConfigBuilder ¶
func NewStreamConfigBuilder() *StreamConfigBuilder
NewStreamConfigBuilder creates a new builder with defaults.
func (*StreamConfigBuilder) Build ¶
func (b *StreamConfigBuilder) Build() StreamConfig
Build returns the constructed StreamConfig.
func (*StreamConfigBuilder) WithAdmissionLimiter ¶
func (b *StreamConfigBuilder) WithAdmissionLimiter(limiter *admission.Limiter) *StreamConfigBuilder
WithAdmissionLimiter sets the admission limiter.
func (*StreamConfigBuilder) WithAdmissionMode ¶
func (b *StreamConfigBuilder) WithAdmissionMode(mode core.AdmissionMode) *StreamConfigBuilder
WithAdmissionMode sets the admission mode.
func (*StreamConfigBuilder) WithDefaultDeadline ¶
func (b *StreamConfigBuilder) WithDefaultDeadline(deadline int64) *StreamConfigBuilder
WithDefaultDeadline sets the default deadline (in nanoseconds).
func (*StreamConfigBuilder) WithMetricsSink ¶
func (b *StreamConfigBuilder) WithMetricsSink(sink metrics.Sink) *StreamConfigBuilder
WithMetricsSink sets the metrics sink.
func (*StreamConfigBuilder) WithName ¶
func (b *StreamConfigBuilder) WithName(name string) *StreamConfigBuilder
WithName sets the stream name for identification.
func (*StreamConfigBuilder) WithShutdown ¶
func (b *StreamConfigBuilder) WithShutdown(shutdown core.ShutdownCoordinator) *StreamConfigBuilder
WithShutdown sets the shutdown coordinator.
func (*StreamConfigBuilder) WithSupervisorPolicy ¶
func (b *StreamConfigBuilder) WithSupervisorPolicy(policy supervisor.SupervisorPolicy) *StreamConfigBuilder
WithSupervisorPolicy sets the supervisor policy.
type Supervisor ¶
type Supervisor[T any] struct { // contains filtered or unexported fields }
func NewSupervisor ¶
func NewSupervisor[T any](handler Handler[T]) *Supervisor[T]
NewSupervisor constructs a stream supervisor.