grpc

Version: v1.8.1
Published: Jun 23, 2026 License: MIT Imports: 15 Imported by: 0

README

grpc

Opinionated gRPC server wrapper that implements lifecycle.Runner. Mirrors core/http2's API shape (functional options, fresh-context Shutdown, single registration with app.App).

When to use

Any gRPC-facing endpoint of a service: ConnectRPC handlers, etcd v3 emulation, internal service-to-service RPCs.

Quickstart

package main

import (
    "log"

    coregrpc "github.com/sergeyslonimsky/core/grpc"
    "github.com/sergeyslonimsky/core/app"
    "google.golang.org/grpc"

    pb "myapp/proto"
)

func main() {
    srv := coregrpc.NewServer(coregrpc.Config{Port: "50051"},
        coregrpc.WithRecovery(),
        coregrpc.WithOtel(),
    )
    srv.Mount(func(s *grpc.Server) {
        pb.RegisterMyServiceServer(s, &myImpl{})
    })

    a := app.New()
    a.Add(srv)
    log.Fatal(a.Run())
}

Configuration

type Config struct {
    Port            string        // Default "50051"
    ShutdownTimeout time.Duration // Default 10s
}

Options

Lifecycle / wiring
  • WithListener(net.Listener) — override the default TCP listener (tests, unix sockets, bufconn).
  • WithLogger(*slog.Logger) — used by WithRecovery to log panics. Default: slog.Default().
Server-level escape hatch
  • WithServerOptions(...grpc.ServerOption) — pass any underlying grpc.ServerOption (keepalive, TLS credentials, max message size).
Interceptors and observability
  • WithUnaryInterceptor(...grpc.UnaryServerInterceptor) — append unary interceptors. Multiple calls accumulate; chained in registration order.
  • WithStreamInterceptor(...grpc.StreamServerInterceptor) — same, for streaming RPCs.
  • WithStatsHandler(stats.Handler) — append a stats handler. Multiple calls fan out via internal composite (no override).
  • WithOtel() — convenience: WithStatsHandler(otelgrpc.NewServerHandler()).
  • WithRecovery() — install panic-recovery interceptors for unary + stream that log and return codes.Internal.
  • WithHealthService(lifecycle.Healthchecker) — registers a grpc.health.v1.Health service backed by the given Healthchecker (typically an *app.App). Answers SERVING / NOT_SERVING via Check. Watch is not implemented. Works with Envoy and k8s gRPC health probes.

Observability (WithOtel)

Calls otelgrpc.NewServerHandler() from go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc and registers it via WithStatsHandler. Uses the global tracer/meter providers.

Call otel.Setup first and register the provider with app.App before constructing the gRPC server; otherwise the instrumentation is a no-op.

WithOtel and WithStatsHandler compose freely — register otelgrpc plus your own custom stats handler (e.g., a ClientRegistry bridge):

srv := coregrpc.NewServer(cfg,
    coregrpc.WithOtel(),
    coregrpc.WithStatsHandler(myClientRegistryStatsHandler),
)

Both handlers receive every event.

Health checks

a := app.New()
a.Add(otelProvider, db, redisClient)

grpcSrv := coregrpc.NewServer(cfg,
    coregrpc.WithRecovery(),
    coregrpc.WithOtel(),
    coregrpc.WithHealthService(a),   // gRPC health.v1 answers SERVING/NOT_SERVING via app.App.Healthcheck
)
a.Add(grpcSrv)

app.App.Healthcheck aggregates every registered component that implements lifecycle.Healthchecker, so this picks up sql/redis/kafka health automatically.
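The aggregation described above can be sketched with the standard library alone. This is an illustration, not the package's implementation: the Healthchecker method signature, the aggregate type, and the servingStatus helper are all assumptions made for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Healthchecker is a stand-in for lifecycle.Healthchecker.
// The method signature is an assumption made for this sketch.
type Healthchecker interface {
	Healthcheck(ctx context.Context) error
}

// checkFunc adapts a plain function to Healthchecker.
type checkFunc func(ctx context.Context) error

func (f checkFunc) Healthcheck(ctx context.Context) error { return f(ctx) }

// aggregate (hypothetical) reports healthy only if every component does.
type aggregate []Healthchecker

func (a aggregate) Healthcheck(ctx context.Context) error {
	var errs []error
	for _, c := range a {
		if err := c.Healthcheck(ctx); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...) // nil when no component failed
}

// servingStatus maps a check result to the two statuses named above.
// A real Check handler would pass the RPC's context instead.
func servingStatus(h Healthchecker) string {
	if err := h.Healthcheck(context.Background()); err != nil {
		return "NOT_SERVING"
	}
	return "SERVING"
}

var (
	healthy = checkFunc(func(context.Context) error { return nil })
	failing = checkFunc(func(context.Context) error {
		return errors.New("redis: connection refused")
	})
)

func main() {
	fmt.Println(servingStatus(aggregate{healthy, healthy}))
	fmt.Println(servingStatus(aggregate{healthy, failing}))
}
```

Under this model a single failing dependency flips the whole service to NOT_SERVING, which is the behaviour a load balancer or kubelet probe would act on.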

Lifecycle

Implements lifecycle.Runner. Register with app.App.Add(srv). The app handles startup, signal-driven shutdown, and ordering — never call srv.Run/srv.Shutdown from your code outside of tests.

Run is single-use: a second call on the same *Server returns ErrServerAlreadyStarted, because the underlying grpc.Server cannot be reused after GracefulStop.

Shutdown semantics

Run blocks until ctx is cancelled or the server fails. On cancellation it calls Shutdown with a fresh context built from Config.ShutdownTimeout, not the already-cancelled ctx. Shutdown runs grpc.Server.GracefulStop in a goroutine and races it against that context: if the context expires before the graceful drain completes, Shutdown calls grpc.Server.Stop to force-close the remaining connections.
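The graceful-versus-forced race can be sketched as follows. The stopper interface covers the two real *grpc.Server methods involved; the shutdown function, fakeServer, and demo helper are hypothetical and exist only to make the example runnable without a gRPC dependency.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// stopper is the subset of *grpc.Server this sketch relies on.
type stopper interface {
	GracefulStop()
	Stop()
}

// shutdown (hypothetical) races a graceful drain against ctx and reports
// whether it had to fall back to a forced stop.
func shutdown(ctx context.Context, s stopper) (forced bool) {
	done := make(chan struct{})
	go func() {
		defer close(done)
		s.GracefulStop()
	}()
	select {
	case <-done:
		return false
	case <-ctx.Done():
		s.Stop() // on a real grpc.Server this also unblocks GracefulStop
		<-done   // sketch choice: wait so the goroutine does not leak
		return true
	}
}

// fakeServer simulates a server whose in-flight RPCs need `drain` to finish.
type fakeServer struct {
	drain   time.Duration
	stopped chan struct{}
}

func (f *fakeServer) GracefulStop() {
	t := time.NewTimer(f.drain)
	defer t.Stop()
	select {
	case <-t.C: // all RPCs finished on their own
	case <-f.stopped: // Stop was called first
	}
}

func (f *fakeServer) Stop() { close(f.stopped) }

// demo runs shutdown with the given drain time and shutdown budget.
func demo(drain, budget time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), budget)
	defer cancel()
	return shutdown(ctx, &fakeServer{drain: drain, stopped: make(chan struct{})})
}

func main() {
	fmt.Println("forced:", demo(10*time.Millisecond, time.Second))
	fmt.Println("forced:", demo(time.Hour, 20*time.Millisecond))
}
```

The context passed to shutdown must be fresh: if the already-cancelled Run context were reused, the select would take the forced branch immediately and no RPC would get a chance to drain.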

Mounting services

srv.Mount(func(s *grpc.Server) {
    pb.RegisterFooServer(s, fooImpl)
    pb.RegisterBarServer(s, barImpl)
})

The callback receives the underlying *grpc.Server so generated RegisterXxxServer functions can attach implementations. Call once during setup before Run.

Extending

There is no Unwrap accessor by design — server-level customisation goes through WithServerOptions. The Mount callback already exposes the underlying *grpc.Server for service registration, which covers every common extension point.

Testing

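l, err := net.Listen("tcp", "127.0.0.1:0") // port 0 lets the OS pick a free port
if err != nil {
    t.Fatal(err) // t is the enclosing *testing.T
}
srv := coregrpc.NewServer(coregrpc.Config{}, coregrpc.WithListener(l))
go srv.Run(ctx) // ctx: a cancellable context owned by the test
// ... dial l.Addr().String()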

Documentation

Overview

Package grpc provides an opinionated gRPC server wrapper that implements lifecycle.Runner. It standardises functional options, panic recovery, OpenTelemetry stats handler wiring, and a context-bounded graceful shutdown.

The package mirrors the API shape of core/http2 — same option-style, same Run/Shutdown contract, same registration pattern with app.App — so a service that uses both feels uniform.

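Typical usage (this package is imported as coregrpc so it does not clash with google.golang.org/grpc):

srv := coregrpc.NewServer(coregrpc.Config{Port: "50051"},
    coregrpc.WithOtel(),
    coregrpc.WithRecovery(),
)
srv.Mount(func(s *grpc.Server) { // grpc here is google.golang.org/grpc
    pb.RegisterMyServiceServer(s, myImpl)
})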

a := app.New()
a.Add(srv)
log.Fatal(a.Run())

Variables

var ErrServerAlreadyStarted = errors.New("grpc: server already started")

ErrServerAlreadyStarted is returned by Server.Run if the server has already been started. A Server instance is single-use: the underlying grpc.Server cannot be reused after GracefulStop.

Types

type Config

type Config struct {
	// Port is the TCP port the listener binds to. Default: "50051".
	Port string

	// ShutdownTimeout caps the graceful shutdown phase. After it elapses,
	// the server is force-stopped via grpc.Server.Stop. Default: 10s.
	ShutdownTimeout time.Duration
}

Config holds the server's network and timeout settings. Both fields are optional — zero values are replaced with defaults in NewServer.
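A minimal sketch of the zero-value defaulting described above. The Config fields and default values come from this page; the withDefaults helper and the constant names are hypothetical and not part of the package's API.

```go
package main

import (
	"fmt"
	"time"
)

// Config mirrors the documented struct.
type Config struct {
	Port            string
	ShutdownTimeout time.Duration
}

// Documented defaults; the constant names are made up for this sketch.
const (
	defaultPort            = "50051"
	defaultShutdownTimeout = 10 * time.Second
)

// withDefaults (hypothetical) replaces zero values with the defaults and
// leaves explicitly set fields untouched.
func withDefaults(cfg Config) Config {
	if cfg.Port == "" {
		cfg.Port = defaultPort
	}
	if cfg.ShutdownTimeout == 0 {
		cfg.ShutdownTimeout = defaultShutdownTimeout
	}
	return cfg
}

func main() {
	fmt.Printf("%+v\n", withDefaults(Config{}))
	fmt.Printf("%+v\n", withDefaults(Config{Port: "9090"}))
}
```

One consequence of zero-value defaulting is that a ShutdownTimeout of exactly 0 cannot be requested; under this scheme it always means "use the default".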

type Option added in v1.3.0

type Option func(*options)

Option configures a new Server. Mirrors the functional-options pattern used by http2.Server for symmetric setup.

func WithHealthService added in v1.3.0

func WithHealthService(checker lifecycle.Healthchecker) Option

WithHealthService registers a grpc.health.v1 service on the server that reports SERVING/NOT_SERVING based on the given Healthchecker (typically an *app.App).

Works with Envoy / k8s gRPC health probes. For HTTP-style probes use http2.WithHealthcheckFrom on the HTTP server instead.

func WithListener added in v1.3.0

func WithListener(l net.Listener) Option

WithListener overrides the default TCP listener. Useful for tests (net.Listen on :0, bufconn) or for exposing the server on a unix socket.

Promoted from a method to an option in v2 for consistency with http2.

func WithLogger added in v1.3.0

func WithLogger(l *slog.Logger) Option

WithLogger attaches a *slog.Logger to the server. Used by the recovery interceptor and lifecycle events. Defaults to slog.Default() when omitted.

func WithOtel added in v1.3.0

func WithOtel() Option

WithOtel enables OpenTelemetry instrumentation by registering otelgrpc.NewServerHandler as a stats.Handler. Uses the global tracer and meter providers, so otel.Setup must have run and its Provider must be registered with app.App before NewServer is called.

Equivalent to: WithStatsHandler(otelgrpc.NewServerHandler()).

func WithRecovery added in v1.3.0

func WithRecovery() Option

WithRecovery installs panic-recovery interceptors for both unary and stream RPCs. A panic is logged with stack trace via the configured slog logger and returned to the client as codes.Internal.
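The recover-log-convert pattern can be sketched with the standard library. The handler type is a simplified stand-in for grpc.UnaryHandler, errInternal stands in for a codes.Internal status error, and withRecovery and invoke are hypothetical names; none of this is the package's actual code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"runtime/debug"
)

// handler is a simplified stand-in for grpc.UnaryHandler.
type handler func(ctx context.Context, req any) (any, error)

// errInternal stands in for status.Error(codes.Internal, ...).
var errInternal = errors.New("internal error")

// withRecovery (hypothetical) wraps next so that a panic is logged with its
// stack trace and turned into an error instead of crashing the process.
func withRecovery(log *slog.Logger, next handler) handler {
	return func(ctx context.Context, req any) (resp any, err error) {
		defer func() {
			if r := recover(); r != nil {
				log.ErrorContext(ctx, "grpc: panic recovered",
					"panic", r, "stack", string(debug.Stack()))
				resp, err = nil, errInternal // named results carry the error out
			}
		}()
		return next(ctx, req)
	}
}

var (
	panicking handler = func(context.Context, any) (any, error) { panic("boom") }
	healthy   handler = func(context.Context, any) (any, error) { return "ok", nil }
)

// invoke runs h behind the recovery wrapper and returns only the error.
func invoke(h handler) error {
	_, err := withRecovery(slog.Default(), h)(context.Background(), "req")
	return err
}

func main() {
	fmt.Println(invoke(healthy))
	fmt.Println(invoke(panicking))
}
```

The named return values matter: the deferred function can only replace the result after a panic because resp and err are in scope when it runs.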

Recommended for every production server.

func WithServerOptions added in v1.3.0

func WithServerOptions(opts ...grpc.ServerOption) Option

WithServerOptions passes arbitrary grpc.ServerOption values through. Escape hatch for keepalive, TLS credentials, message size limits, etc.

Composes with the typed With* options — both are applied to the same underlying *grpc.Server.

func WithStatsHandler added in v1.3.0

func WithStatsHandler(h stats.Handler) Option

WithStatsHandler attaches a stats.Handler. Multiple calls accumulate and every handler receives every event, fanned out via an internal composite (see stats.go). This is preferable to passing multiple grpc.StatsHandler options directly, where ordering and override semantics are ambiguous.
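A fan-out composite of this kind might look like the sketch below. The Handler interface is deliberately reduced to one method (the real stats.Handler has four: TagRPC, HandleRPC, TagConn, HandleConn), and the composite, recorder, and fanOut names are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// Handler is a one-method simplification of grpc's stats.Handler.
type Handler interface {
	HandleRPC(ctx context.Context, event string)
}

// recorder remembers every event it receives.
type recorder struct{ events []string }

func (r *recorder) HandleRPC(_ context.Context, event string) {
	r.events = append(r.events, event)
}

// composite (hypothetical) forwards each event to every wrapped handler,
// in registration order.
type composite []Handler

func (c composite) HandleRPC(ctx context.Context, event string) {
	for _, h := range c {
		h.HandleRPC(ctx, event)
	}
}

// fanOut delivers events through a composite of two recorders.
func fanOut(events ...string) (*recorder, *recorder) {
	a, b := &recorder{}, &recorder{}
	c := composite{a, b}
	for _, e := range events {
		c.HandleRPC(context.Background(), e)
	}
	return a, b
}

func main() {
	a, b := fanOut("begin", "end")
	fmt.Println(a.events, b.events)
}
```

Registering a single composite keeps delivery order explicit, which is the property the paragraph above contrasts with passing several handler options directly.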

func WithStreamInterceptor added in v1.3.0

func WithStreamInterceptor(interceptors ...grpc.StreamServerInterceptor) Option

WithStreamInterceptor appends stream interceptors. Chained in registration order via grpc.ChainStreamInterceptor. See WithUnaryInterceptor for ordering rules.

func WithUnaryInterceptor added in v1.3.0

func WithUnaryInterceptor(interceptors ...grpc.UnaryServerInterceptor) Option

WithUnaryInterceptor appends unary interceptors. Multiple calls accumulate and are chained in registration order via grpc.ChainUnaryInterceptor. This is the gRPC analogue of http2.Server.WithMiddleware.

Built-in interceptors (WithRecovery) are appended AFTER user interceptors, so they sit innermost, closest to the handler, and recover panics raised by the handler.
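Registration-order chaining can be illustrated without gRPC. The chain function below follows the documented behaviour of grpc.ChainUnaryInterceptor (first interceptor outermost, last innermost); the handler and interceptor types are simplified stand-ins, and the interceptor names are invented for the trace.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Simplified stand-ins for grpc.UnaryHandler and grpc.UnaryServerInterceptor.
type handler func(ctx context.Context, req any) (any, error)
type interceptor func(ctx context.Context, req any, next handler) (any, error)

// chain wraps h so that ics[0] runs outermost and the last one innermost.
func chain(h handler, ics ...interceptor) handler {
	for i := len(ics) - 1; i >= 0; i-- {
		ic, next := ics[i], h
		h = func(ctx context.Context, req any) (any, error) {
			return ic(ctx, req, next)
		}
	}
	return h
}

// tracer returns an interceptor that records when it is entered and left.
func tracer(name string, log *[]string) interceptor {
	return func(ctx context.Context, req any, next handler) (any, error) {
		*log = append(*log, name+":in")
		resp, err := next(ctx, req)
		*log = append(*log, name+":out")
		return resp, err
	}
}

// order runs one call through two "user" interceptors followed by a
// built-in one and returns the resulting trace.
func order() string {
	var log []string
	h := chain(
		func(context.Context, any) (any, error) {
			log = append(log, "handler")
			return nil, nil
		},
		tracer("auth", &log),     // user interceptor, registered first
		tracer("logging", &log),  // user interceptor, registered second
		tracer("recovery", &log), // built-in, appended last
	)
	h(context.Background(), nil)
	return strings.Join(log, " ")
}

func main() {
	fmt.Println(order())
}
```

The trace shows the interceptor appended last entering immediately before the handler and leaving immediately after it, which is what "innermost" means here.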

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server wraps a *grpc.Server with lifecycle methods. Implements lifecycle.Runner so it can be registered with app.App.

func NewServer

func NewServer(config Config, opts ...Option) *Server

NewServer constructs a Server with the given Config and options. Zero values in config are replaced with defaults. The returned *Server implements lifecycle.Runner; register it with app.App.

func (*Server) Address

func (s *Server) Address() string

Address returns the server's listen address in ":port" form.

func (*Server) Mount

func (s *Server) Mount(mountFunc func(server *grpc.Server))

Mount registers services on the underlying *grpc.Server. The provided callback receives the *grpc.Server so generated RegisterXxxServer functions can attach implementations.

Mount must be called before Run; calling concurrently with Run is undefined behavior.

func (*Server) Run

func (s *Server) Run(ctx context.Context) error

Run blocks until ctx is cancelled or the server fails. On ctx cancellation, Shutdown is called with a fresh ShutdownTimeout context (NOT the already-cancelled ctx).

Implements lifecycle.Runner.

Run must be called at most once per Server instance — the underlying grpc.Server cannot be reused after GracefulStop. A second call returns ErrServerAlreadyStarted.
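One way to enforce a single-use Run is an atomic flag, sketched below. The error value matches the one documented on this page; the Server internals shown here (the started field, the blocking on ctx) are assumptions for illustration only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
)

// ErrServerAlreadyStarted matches the documented error.
var ErrServerAlreadyStarted = errors.New("grpc: server already started")

// Server is a stripped-down stand-in; the started field is hypothetical.
type Server struct {
	started atomic.Bool
}

// Run lets only the first caller through, even under concurrent calls.
func (s *Server) Run(ctx context.Context) error {
	if !s.started.CompareAndSwap(false, true) {
		return ErrServerAlreadyStarted
	}
	<-ctx.Done() // stand-in for serving until cancellation
	return nil
}

// runTwice calls Run twice on one Server with an already-cancelled context.
func runTwice() (first, second error) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	s := &Server{}
	return s.Run(ctx), s.Run(ctx)
}

func main() {
	first, second := runTwice()
	fmt.Println(first, second)
}
```

CompareAndSwap makes the check and the state change one atomic step, so two goroutines calling Run at the same moment cannot both pass the guard.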

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

Shutdown gracefully stops the server, falling back to a hard Stop when ctx expires. Implements lifecycle.Resource.

Idempotent: calls made after the first one completes return nil.
