Documentation ¶
Overview ¶
Package serverstream provides an execution pipeline for gRPC server-streaming RPCs.
It wraps streaming handlers with admission control, panic recovery, supervision, metrics, and graceful shutdown. The pipeline matches the unary one but is adapted for server-side streaming, where the handler produces multiple responses via a stream.
Quick Start ¶
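// Build an engine that admits at most 100 concurrent executions.
engine := serverstream.New(
	serverstream.WithAdmissionLimit(100),
)

// Wrap the gRPC stream, then run the handler through the pipeline.
stream := serverstream.NewStream(ctx, grpcStream)
err := engine.Run(ctx, stream, core.AdmissionSoftAllow, handler)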
Features ¶
- Admission control with configurable concurrency limits
- Automatic panic recovery per stream
- Supervisor retry policies
- Prometheus and OpenTelemetry metrics
- Graceful shutdown coordination
Index ¶
- func StreamServerInterceptor(engine *Engine, cfg *Config) grpc.StreamServerInterceptor
- type Config
- type Engine
- type Option
- func WithAdmissionLimit(n int) Option
- func WithAdmissionLimiter(l *admission.Limiter) Option
- func WithAdmissionMode(m core.AdmissionMode) Option
- func WithMetricsSink(s metrics.Sink) Option
- func WithName(name string) Option
- func WithShutdown(s core.ShutdownCoordinator) Option
- func WithSupervisorPolicy(p supervisor.SupervisorPolicy) Option
- type Stream
Functions ¶
func StreamServerInterceptor ¶
func StreamServerInterceptor(engine *Engine, cfg *Config) grpc.StreamServerInterceptor
StreamServerInterceptor adapts a gRPC server-stream handler into the Grip server-stream execution pipeline.
It does not run user code directly. It only wires gRPC to Engine.
Example ¶
Example: gRPC StreamServerInterceptor
Most users integrate Grip via the gRPC interceptor. The engine handles admission, lifecycle, and safety.
cfg := buildConfig(
	WithAdmissionLimiter(admission.NewLimiter(100)),
)
engine := NewEngine(cfg)
grpcServer := grpc.NewServer(
	grpc.StreamInterceptor(
		StreamServerInterceptor(engine, cfg),
	),
)
_ = grpcServer
Types ¶
type Config ¶
type Config struct {
AdmissionLimiter *admission.Limiter
AdmissionMode core.AdmissionMode
Shutdown core.ShutdownCoordinator
SupervisorPolicy supervisor.SupervisorPolicy
MetricsSink metrics.Sink
Name string
}
Config controls server-stream execution behavior. It mirrors unary configuration where possible.
func BuildConfig ¶
BuildConfig creates a validated Config from functional options. It starts from sensible defaults and applies options in order.
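The "defaults first, then options in order" behavior described above can be illustrated with a minimal sketch of the functional-options pattern. The types below are local stand-ins rather than the package's own: the field names, the default values, the validation rule, and the `(*config, error)` return shape are all assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// config is a hypothetical stand-in for serverstream.Config; the fields
// and defaults here are assumptions chosen for illustration.
type config struct {
	name  string
	limit int
}

// option mutates a config, mirroring the shape of `type Option func(*Config)`.
type option func(*config)

func withName(n string) option { return func(c *config) { c.name = n } }
func withLimit(n int) option   { return func(c *config) { c.limit = n } }

// buildSketch starts from defaults, applies options in order (so a later
// option overrides an earlier one), then validates the result.
func buildSketch(opts ...option) (*config, error) {
	c := &config{name: "default", limit: -1} // assumed defaults
	for _, o := range opts {
		o(c)
	}
	if c.name == "" { // assumed validation rule
		return nil, errors.New("config: name must not be empty")
	}
	return c, nil
}

func main() {
	c, err := buildSketch(withLimit(10), withLimit(100))
	fmt.Println(c.name, c.limit, err) // default 100 <nil>
}
```

Because options are applied in order, the second `withLimit` wins, and any field not touched by an option keeps its default.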
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine wires admission, execution and supervision for server-side streaming handlers.
It mirrors the unary pipeline, except the handler produces multiple responses via a stream.
Example (AdmissionModes) ¶
Example: Admission modes
Admission mode is chosen per execution. This allows different endpoints to use different semantics.
cfg := buildConfig(
	WithAdmissionLimiter(admission.NewLimiter(0)),
)
engine := NewEngine(cfg)
stream := NewStream(
	context.Background(),
	&fakeServerStream{},
)
handler := func(ctx context.Context, s *Stream) error {
	// Under soft-allow, handler may still execute
	// even if capacity is exhausted.
	return s.Send("hello")
}
_ = engine.Run(
	context.Background(),
	stream,
	core.AdmissionSoftAllow,
	handler,
)
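To make the difference between admission modes concrete, here is a rough, standard-library-only sketch of how a per-execution mode might interact with a limiter. It is not the package's implementation. Only the soft-allow name appears in this documentation, so `modeStrict`, `errRejected`, and the channel-based `limiter` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// admissionMode is a local stand-in for core.AdmissionMode. Only the
// soft-allow name appears in the documentation; modeStrict is hypothetical.
type admissionMode int

const (
	modeStrict admissionMode = iota
	modeSoftAllow
)

var errRejected = errors.New("admission: capacity exhausted")

// limiter hands out up to cap(slots) concurrent permits.
type limiter struct{ slots chan struct{} }

func newLimiter(n int) *limiter { return &limiter{slots: make(chan struct{}, n)} }

// tryAcquire takes a permit without blocking and reports whether it got one.
func (l *limiter) tryAcquire() bool {
	select {
	case l.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

func (l *limiter) release() { <-l.slots }

// run admits the handler according to mode. Under soft-allow a handler
// still executes when no permit is available; under strict it is rejected.
func run(l *limiter, mode admissionMode, handler func() error) error {
	if l.tryAcquire() {
		defer l.release()
		return handler()
	}
	if mode == modeSoftAllow {
		return handler() // over capacity, but admitted anyway
	}
	return errRejected
}

func main() {
	l := newLimiter(0) // zero capacity: every acquire fails
	ok := func() error { return nil }
	fmt.Println(run(l, modeStrict, ok))    // admission: capacity exhausted
	fmt.Println(run(l, modeSoftAllow, ok)) // <nil>
}
```

Passing the mode to `run` rather than storing it on the limiter mirrors the per-execution choice described above: two endpoints can share one limiter and still apply different semantics.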
Example (BasicStream) ¶
Example: Basic server-stream execution
This example shows the simplest usage of the server-stream engine. A handler sends a few messages and exits normally.
cfg := buildConfig(
	WithAdmissionLimiter(admission.NewLimiter(10)),
)
engine := NewEngine(cfg)
stream := NewStream(
	context.Background(),
	&fakeServerStream{},
)
handler := func(ctx context.Context, s *Stream) error {
	for i := 0; i < 3; i++ {
		if err := s.Send(i); err != nil {
			return err
		}
	}
	return nil
}
_ = engine.Run(
	context.Background(),
	stream,
	core.AdmissionSoftAllow,
	handler,
)
Example (GracefulShutdown) ¶
Example: Graceful shutdown
Once shutdown begins:
- New executions observe shutdown
- In-flight handlers receive cancellation
- Streams close safely
shutdown := lifecycle.NewShutdown()
cfg := buildConfig(
	WithShutdown(shutdown),
)
engine := NewEngine(cfg)
stream := NewStream(
	context.Background(),
	&fakeServerStream{},
)
handler := func(ctx context.Context, s *Stream) error {
	for {
		select {
		case <-ctx.Done():
			log.Println("stream canceled")
			return ctx.Err()
		default:
			_ = s.Send("tick")
			time.Sleep(10 * time.Millisecond)
		}
	}
}
go func() {
	time.Sleep(50 * time.Millisecond)
	shutdown.BeginShutdown()
}()
_ = engine.Run(
	context.Background(),
	stream,
	core.AdmissionSoftAllow,
	handler,
)
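The shutdown behavior described in this example (new executions observe shutdown, in-flight handlers receive cancellation) can be approximated with a small coordinator built on `context`. This is a sketch under stated assumptions, not the package's `core.ShutdownCoordinator`: the `coordinator` type, `errShuttingDown`, and the helper `demo` are all hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var errShuttingDown = errors.New("shutdown in progress")

// coordinator is a hypothetical stand-in for core.ShutdownCoordinator.
type coordinator struct {
	once sync.Once
	done chan struct{}
}

func newCoordinator() *coordinator { return &coordinator{done: make(chan struct{})} }

// beginShutdown signals shutdown; safe to call more than once.
func (c *coordinator) beginShutdown() { c.once.Do(func() { close(c.done) }) }

// run rejects new work after shutdown has begun; otherwise it runs handler
// with a context that is cancelled as soon as shutdown begins.
func (c *coordinator) run(parent context.Context, handler func(context.Context) error) error {
	select {
	case <-c.done:
		return errShuttingDown
	default:
	}
	ctx, cancel := context.WithCancel(parent)
	defer cancel()
	stop := make(chan struct{})
	defer close(stop) // lets the watcher goroutine exit when handler returns
	go func() {
		select {
		case <-c.done:
			cancel()
		case <-stop:
		}
	}()
	return handler(ctx)
}

// demo starts a handler that blocks until cancelled, begins shutdown while
// it is in flight, and then tries to start a second execution.
func demo() (inFlight, late error) {
	c := newCoordinator()
	started := make(chan struct{})
	go func() {
		<-started
		c.beginShutdown()
	}()
	inFlight = c.run(context.Background(), func(ctx context.Context) error {
		close(started)
		<-ctx.Done()
		return ctx.Err()
	})
	late = c.run(context.Background(), func(context.Context) error { return nil })
	return inFlight, late
}

func main() {
	inFlight, late := demo()
	fmt.Println(inFlight) // context canceled
	fmt.Println(late)     // shutdown in progress
}
```

The handler only stops because it watches `ctx.Done()`; a handler that ignores its context would keep running after shutdown begins, which is why the example handler above selects on the context in its loop.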
Example (PanicSafety) ¶
Example: Panic safety
Panics inside handlers are always recovered. They never escape the engine.
cfg := buildConfig()
engine := NewEngine(cfg)
stream := NewStream(
	context.Background(),
	&fakeServerStream{},
)
handler := func(ctx context.Context, s *Stream) error {
	panic("unexpected bug")
}
_ = engine.Run(
	context.Background(),
	stream,
	core.AdmissionSoftAllow,
	handler,
)
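For readers unfamiliar with how a panic can be contained, the general Go technique is a deferred `recover` that turns the panic into an ordinary error. The sketch below shows that technique in isolation. The helper name `safeRun` and the error message format are assumptions; this documentation does not state how the engine reports a recovered panic.

```go
package main

import "fmt"

// safeRun invokes handler and converts a panic into an ordinary error so
// that it cannot propagate to the caller.
func safeRun(handler func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("handler panicked: %v", r)
		}
	}()
	return handler()
}

func main() {
	err := safeRun(func() error { panic("unexpected bug") })
	fmt.Println(err) // handler panicked: unexpected bug
}
```

The named return value `err` is what allows the deferred function to replace the result after the panic has unwound the handler.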
func New ¶
New creates a new server-stream Engine from functional options. This is the recommended constructor for external consumers.
// policy is a supervisor.SupervisorPolicy value constructed elsewhere.
engine := serverstream.New(
	serverstream.WithSupervisorPolicy(policy),
)
func (*Engine) Run ¶
func (e *Engine) Run(
	ctx context.Context,
	stream *Stream,
	mode core.AdmissionMode,
	handler func(context.Context, *Stream) error,
) error
Run executes a server-stream handler under supervision.
AdmissionMode is per-execution and supplied by the caller (typically the interceptor).
type Option ¶
type Option func(*Config)
func WithAdmissionLimit ¶
WithAdmissionLimit sets the maximum number of concurrent executions.
This is the recommended way to configure admission control. No internal imports required.
serverstream.New(serverstream.WithAdmissionLimit(100)) // max 100 concurrent
serverstream.New(serverstream.WithAdmissionLimit(-1))  // unlimited
serverstream.New(serverstream.WithAdmissionLimit(0))   // reject all
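The three cases above (a positive cap, -1 for unlimited, 0 for reject-all) can be modeled with a small counter-based limiter. This is only an illustrative sketch of those semantics; the `limiter` type and its methods are hypothetical and are not the package's `admission.Limiter`.

```go
package main

import (
	"fmt"
	"sync"
)

// limiter is a hypothetical counter-based limiter illustrating the
// documented semantics: n > 0 caps concurrency, n < 0 is unlimited,
// and n == 0 rejects everything.
type limiter struct {
	mu       sync.Mutex
	max      int
	inFlight int
}

func newLimiter(n int) *limiter { return &limiter{max: n} }

// tryAcquire reports whether one more execution may start.
func (l *limiter) tryAcquire() bool {
	if l.max < 0 {
		return true // unlimited: nothing to count
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.inFlight >= l.max {
		return false // at capacity (always true when max == 0)
	}
	l.inFlight++
	return true
}

// release returns a permit obtained from a successful tryAcquire.
func (l *limiter) release() {
	if l.max < 0 {
		return
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.inFlight > 0 {
		l.inFlight--
	}
}

func main() {
	two := newLimiter(2)
	fmt.Println(two.tryAcquire(), two.tryAcquire(), two.tryAcquire()) // true true false
	fmt.Println(newLimiter(-1).tryAcquire())                          // true
	fmt.Println(newLimiter(0).tryAcquire())                           // false
}
```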
func WithAdmissionLimiter ¶
WithAdmissionLimiter sets a custom admission limiter.
Prefer WithAdmissionLimit(n) for simple capacity configuration. Use this for advanced cases requiring a pre-built limiter:
import "github.com/abhipray-cpu/grip/admission"

serverstream.WithAdmissionLimiter(admission.NewLimiter(100))
func WithAdmissionMode ¶
func WithAdmissionMode(m core.AdmissionMode) Option
func WithMetricsSink ¶
WithMetricsSink sets a custom metrics sink.
The sink must implement the metrics.Sink interface. Use the public packages for built-in implementations:
import "github.com/abhipray-cpu/grip/metrics"
import "github.com/abhipray-cpu/grip/metrics/prommetrics"

serverstream.WithMetricsSink(metrics.Noop())
serverstream.WithMetricsSink(prommetrics.New(prommetrics.DefaultOptions()))
func WithShutdown ¶
func WithShutdown(s core.ShutdownCoordinator) Option
func WithSupervisorPolicy ¶
func WithSupervisorPolicy(p supervisor.SupervisorPolicy) Option
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream wraps grpc.ServerStream and provides context-aware, shutdown-safe send semantics.
func (*Stream) Close ¶
func (s *Stream) Close()
Close marks the stream as closed. Safe to call multiple times.
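As a rough illustration of "context-aware" sends and a Close that is safe to call repeatedly, the sketch below wraps a minimal transport interface. Everything here is a hypothetical stand-in for the real Stream: the `stream` and `sender` types, `errStreamClosed`, and in particular the choice to return an error from Send after Close are assumptions, since this documentation does not specify that behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
)

var errStreamClosed = errors.New("stream closed")

// sender is the one method this sketch needs from the underlying
// transport; grpc.ServerStream provides SendMsg with a compatible shape.
type sender interface {
	SendMsg(m any) error
}

// stream is a hypothetical stand-in for serverstream.Stream.
type stream struct {
	ctx    context.Context
	inner  sender
	closed atomic.Bool
}

// Send refuses to write once the stream is closed or its context is done.
func (s *stream) Send(m any) error {
	if s.closed.Load() {
		return errStreamClosed
	}
	if err := s.ctx.Err(); err != nil {
		return err
	}
	return s.inner.SendMsg(m)
}

// Close marks the stream closed. Repeated calls are harmless because
// storing true a second time changes nothing.
func (s *stream) Close() { s.closed.Store(true) }

// recorder is a fake transport that counts delivered messages.
type recorder struct{ n int }

func (r *recorder) SendMsg(any) error { r.n++; return nil }

// demo sends once, closes twice, then attempts another send.
func demo() (delivered int, afterClose error) {
	rec := &recorder{}
	s := &stream{ctx: context.Background(), inner: rec}
	_ = s.Send("hello")
	s.Close()
	s.Close() // second call is a no-op
	afterClose = s.Send("late")
	return rec.n, afterClose
}

func main() {
	n, err := demo()
	fmt.Println(n, err) // 1 stream closed
}
```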