interceptor

package
v1.20.0
Published: Jul 22, 2025 License: MIT Imports: 17 Imported by: 0

README

grpcserver/interceptor

A collection of gRPC interceptors for logging, metrics collection, panic recovery, and request ID handling. These interceptors enhance gRPC services with observability and reliability features.

Available Interceptors

Logging Interceptor

Provides comprehensive logging for gRPC calls with configurable options.

Features:

  • Call start and finish logging
  • Slow call detection
  • Custom header logging
  • Method exclusion
  • Custom logger providers
  • Request ID and trace ID integration

Usage:

import "github.com/acronis/go-appkit/grpcserver/interceptor"

// Basic usage
unaryInterceptor := interceptor.LoggingUnaryInterceptor(logger)
streamInterceptor := interceptor.LoggingStreamInterceptor(logger)

// With options (reassigns the variable declared above)
unaryInterceptor = interceptor.LoggingUnaryInterceptor(logger,
    interceptor.WithLoggingCallStart(true),
    interceptor.WithLoggingSlowCallThreshold(2*time.Second),
    interceptor.WithLoggingExcludedMethods("/health/check", "/metrics"),
    interceptor.WithLoggingCallHeaders(map[string]string{
        "x-tenant-id": "tenant_id",
        "x-user-id": "user_id",
    }),
)

Configuration Options:

  • WithLoggingCallStart(bool): Enable/disable call start logging
  • WithLoggingSlowCallThreshold(time.Duration): Set slow call threshold
  • WithLoggingExcludedMethods(...string): Exclude specific methods from logging
  • WithLoggingCallHeaders(map[string]string): Log custom headers
  • WithLoggingUnaryCustomLoggerProvider(func): Custom logger provider for unary calls
  • WithLoggingStreamCustomLoggerProvider(func): Custom logger provider for stream calls
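
To make the interaction between the slow call threshold and method exclusion concrete, here is a minimal stdlib-only sketch. It is not the package's implementation: the callLogger type, its classify method, and the "skip"/"slow"/"normal" results are hypothetical, and the real interceptor may apply different rules to excluded methods.

```go
package main

import (
	"fmt"
	"time"
)

// callLogger is a hypothetical stand-in for the logging interceptor's settings.
type callLogger struct {
	slowThreshold time.Duration
	excluded      map[string]bool
}

// classify reports how a finished call would be treated in this sketch:
// "skip" for excluded methods, "slow" for calls at or above the threshold,
// and "normal" otherwise.
func (l callLogger) classify(fullMethod string, elapsed time.Duration) string {
	if l.excluded[fullMethod] {
		return "skip"
	}
	if l.slowThreshold > 0 && elapsed >= l.slowThreshold {
		return "slow"
	}
	return "normal"
}

func main() {
	l := callLogger{
		slowThreshold: 2 * time.Second,
		excluded:      map[string]bool{"/health/check": true},
	}
	fmt.Println(l.classify("/health/check", 3*time.Second))         // skip
	fmt.Println(l.classify("/orders.Orders/Get", 3*time.Second))    // slow
	fmt.Println(l.classify("/orders.Orders/Get", 10*time.Millisecond)) // normal
}
```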

Metrics Interceptor

Collects Prometheus metrics for gRPC calls.

Features:

  • Call duration histograms
  • In-flight call gauges
  • Method and service labels
  • User agent type classification
  • Custom label support

Usage:

// Create metrics collector
metrics := interceptor.NewPrometheusMetrics(
    interceptor.WithPrometheusNamespace("myapp"),
    interceptor.WithPrometheusDurationBuckets([]float64{0.1, 0.5, 1.0, 5.0}),
    interceptor.WithPrometheusConstLabels(prometheus.Labels{"version": "1.0"}),
)

// Register metrics
metrics.MustRegister()

// Create interceptors
unaryInterceptor := interceptor.MetricsUnaryInterceptor(metrics)
streamInterceptor := interceptor.MetricsStreamInterceptor(metrics)

// With user agent type provider (reassigns the variable declared above)
unaryInterceptor = interceptor.MetricsUnaryInterceptor(metrics,
    interceptor.WithMetricsUnaryUserAgentTypeProvider(func(ctx context.Context, info *grpc.UnaryServerInfo) string {
        // Return user agent type based on context
        return "c2c_agent" // or "mobile", "api", etc.
    }),
)

Metrics Collected:

  • grpc_call_duration_seconds: Histogram of call durations
  • grpc_calls_in_flight: Gauge of currently active calls

Labels:

  • grpc_service: Service name
  • grpc_method: Method name
  • grpc_method_type: "unary" or "stream"
  • grpc_code: gRPC status code
  • user_agent_type: Classification of user agent (if provided)
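
The service and method labels can be derived from the gRPC full method name, which has the form "/package.Service/Method". The sketch below shows one way to split it using only the standard library. The helper name splitFullMethod is hypothetical, and the package's own parsing may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// splitFullMethod splits a gRPC full method name of the form
// "/package.Service/Method" into its service and method parts.
// It returns ok=false when the input does not match that shape.
func splitFullMethod(fullMethod string) (service, method string, ok bool) {
	trimmed := strings.TrimPrefix(fullMethod, "/")
	i := strings.LastIndex(trimmed, "/")
	if i <= 0 || i == len(trimmed)-1 {
		return "", "", false
	}
	return trimmed[:i], trimmed[i+1:], true
}

func main() {
	service, method, ok := splitFullMethod("/orders.v1.Orders/Get")
	fmt.Println(service, method, ok) // orders.v1.Orders Get true
}
```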

Recovery Interceptor

Recovers from panics and returns proper gRPC errors.

Features:

  • Panic recovery with stack trace logging
  • Configurable stack size
  • Proper gRPC error responses

Usage:

// Basic usage
unaryInterceptor := interceptor.RecoveryUnaryInterceptor()
streamInterceptor := interceptor.RecoveryStreamInterceptor()

// With custom stack size (reassigns the variable declared above)
unaryInterceptor = interceptor.RecoveryUnaryInterceptor(
    interceptor.WithRecoveryStackSize(16384),
)

Configuration Options:

  • WithRecoveryStackSize(int): Set stack trace size for logging (default: 8192)
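
The general recover-and-truncate technique behind such an interceptor can be sketched with the standard library alone. This is an illustration, not the package's code: safeCall and errInternal are hypothetical names, and treating the stack size as a byte limit on the captured trace is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"runtime"
)

// errInternal is a stand-in for the package's InternalError.
var errInternal = errors.New("internal error")

// safeCall runs handler and converts a panic into errInternal.
// At most stackSize bytes of the current goroutine's stack trace are captured.
func safeCall(stackSize int, handler func() error) (stack []byte, err error) {
	defer func() {
		if p := recover(); p != nil {
			buf := make([]byte, stackSize)
			// runtime.Stack truncates the trace to the buffer length.
			stack = buf[:runtime.Stack(buf, false)]
			err = errInternal
		}
	}()
	return nil, handler()
}

func main() {
	stack, err := safeCall(8192, func() error { panic("boom") })
	fmt.Println("error:", err)
	fmt.Println("captured bytes:", len(stack))
}
```

A real interceptor would log the captured trace rather than return it. Returning it here only keeps the sketch easy to inspect.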

Request ID Interceptor

Manages request IDs for call tracing and correlation.

Features:

  • Extracts request ID from incoming headers
  • Generates new request IDs if missing
  • Generates internal request IDs
  • Sets response headers
  • Context integration

Usage:

// Basic usage
unaryInterceptor := interceptor.RequestIDUnaryInterceptor()
streamInterceptor := interceptor.RequestIDStreamInterceptor()

// With custom ID generators (reassigns the variable declared above)
unaryInterceptor = interceptor.RequestIDUnaryInterceptor(
    interceptor.WithRequestIDGenerator(func() string {
        return uuid.New().String()
    }),
    interceptor.WithInternalRequestIDGenerator(func() string {
        return fmt.Sprintf("internal-%d", time.Now().UnixNano())
    }),
)

Headers:

  • x-request-id: External request ID (extracted or generated)
  • x-int-request-id: Internal request ID (always generated)
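
The extract-or-generate behaviour described above can be sketched as follows. The code uses a plain map as a stand-in for incoming gRPC metadata, and the names resolveRequestIDs and newID are hypothetical. The ID format used by the package's default generators is not shown here.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newID returns a random 16-byte identifier encoded as hex.
func newID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

// resolveRequestIDs reuses the incoming x-request-id when present and
// generates one otherwise. The internal ID is always generated.
// md is a stand-in for incoming gRPC metadata (lower-case keys).
func resolveRequestIDs(md map[string][]string, gen func() string) (requestID, internalID string) {
	if vals := md["x-request-id"]; len(vals) > 0 && vals[0] != "" {
		requestID = vals[0]
	} else {
		requestID = gen()
	}
	return requestID, gen()
}

func main() {
	reqID, intID := resolveRequestIDs(map[string][]string{"x-request-id": {"req-123"}}, newID)
	fmt.Println(reqID, intID)

	reqID, intID = resolveRequestIDs(nil, newID)
	fmt.Println(reqID, intID)
}
```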

Context Utilities

The package provides utilities for working with context values:

Call Start Time

// Get call start time
startTime := interceptor.GetCallStartTimeFromContext(ctx)

// Set call start time
ctx = interceptor.NewContextWithCallStartTime(ctx, time.Now())

Request IDs

// Get request ID
requestID := interceptor.GetRequestIDFromContext(ctx)

// Get internal request ID
internalRequestID := interceptor.GetInternalRequestIDFromContext(ctx)

// Set request IDs
ctx = interceptor.NewContextWithRequestID(ctx, "req-123")
ctx = interceptor.NewContextWithInternalRequestID(ctx, "int-456")

Trace ID

// Get trace ID (from OpenTelemetry or similar)
traceID := interceptor.GetTraceIDFromContext(ctx)

// Set trace ID
ctx = interceptor.NewContextWithTraceID(ctx, "trace-789")

Logger

// Get logger from context
logger := interceptor.GetLoggerFromContext(ctx)

// Set logger in context
ctx = interceptor.NewContextWithLogger(ctx, logger)

Logging Parameters

// Get logging parameters
params := interceptor.GetLoggingParamsFromContext(ctx)

// Add custom log fields
params.ExtendFields(log.String("custom", "value"))

// Set logging parameters
ctx = interceptor.NewContextWithLoggingParams(ctx, params)

Stream Utilities

WrappedServerStream

A utility for wrapping gRPC server streams with custom context:

wrappedStream := &interceptor.WrappedServerStream{
    ServerStream: originalStream,
    Ctx:          customContext,
}

Integration Example

Here's how to use multiple interceptors together:

func createGRPCServer(logger log.FieldLogger) *grpc.Server {
    // Create metrics collector
    metrics := interceptor.NewPrometheusMetrics(
        interceptor.WithPrometheusNamespace("myapp"),
    )
    metrics.MustRegister()
    
    // Create interceptor chain
    unaryInterceptors := []grpc.UnaryServerInterceptor{
        // Set call start time first
        func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
            ctx = interceptor.NewContextWithCallStartTime(ctx, time.Now())
            return handler(ctx, req)
        },
        // Request ID handling
        interceptor.RequestIDUnaryInterceptor(),
        // Logging
        interceptor.LoggingUnaryInterceptor(logger,
            interceptor.WithLoggingCallStart(true),
            interceptor.WithLoggingSlowCallThreshold(time.Second),
        ),
        // Recovery
        interceptor.RecoveryUnaryInterceptor(),
        // Metrics
        interceptor.MetricsUnaryInterceptor(metrics),
    }
    
    streamInterceptors := []grpc.StreamServerInterceptor{
        // Similar chain for stream interceptors...
    }
    
    return grpc.NewServer(
        grpc.ChainUnaryInterceptor(unaryInterceptors...),
        grpc.ChainStreamInterceptor(streamInterceptors...),
    )
}

Best Practices

  1. Interceptor Order: Place interceptors in the following order:

    • Call start time (first)
    • Request ID
    • Logging
    • Recovery
    • Metrics
    • Custom interceptors (last)

  2. Performance: Use method exclusion for high-frequency health check and metrics endpoints.

  3. Security: Be careful with custom header logging to avoid logging sensitive information.

  4. Metrics: Choose duration buckets that match the latency range of your service.

  5. Error Handling: Always use the recovery interceptor to prevent panics from crashing the server.

  6. Context: Use the provided context utilities for consistent data access across interceptors.
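
Order matters because chained interceptors nest: with grpc.ChainUnaryInterceptor the first interceptor is the outermost wrapper and the last one sits closest to the handler. The stdlib-only sketch below models that nesting with simplified, hypothetical handler and interceptor types. It does not use the gRPC types themselves.

```go
package main

import "fmt"

type handler func(trace *[]string)
type interceptor func(trace *[]string, next handler)

// chain composes interceptors so that the first one is outermost.
func chain(final handler, ics ...interceptor) handler {
	h := final
	for i := len(ics) - 1; i >= 0; i-- {
		ic, next := ics[i], h
		h = func(trace *[]string) { ic(trace, next) }
	}
	return h
}

// named returns an interceptor that records when it is entered and left.
func named(name string) interceptor {
	return func(trace *[]string, next handler) {
		*trace = append(*trace, name+":in")
		next(trace)
		*trace = append(*trace, name+":out")
	}
}

func main() {
	var trace []string
	h := chain(
		func(t *[]string) { *t = append(*t, "handler") },
		named("request_id"), named("logging"), named("recovery"),
	)
	h(&trace)
	fmt.Println(trace)
	// [request_id:in logging:in recovery:in handler recovery:out logging:out request_id:out]
}
```

In this model, anything an earlier interceptor puts into the call (such as a request ID) is visible to every later one, and an earlier interceptor observes the result only after all later ones have returned.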

Documentation

Overview

Package interceptor provides gRPC interceptors for logging, metrics collection, panic recovery, and request ID handling. These interceptors can be used to enhance gRPC services with observability and reliability features.

Constants

const (
	// RecoveryDefaultStackSize defines the default size of stack part which will be logged.
	RecoveryDefaultStackSize = 8192
)

Variables

var DefaultPrometheusDurationBuckets = []float64{0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60, 150, 300, 600}

DefaultPrometheusDurationBuckets is the default set of buckets into which the durations of served gRPC calls are counted.
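
Each value is an inclusive upper bound in seconds. The sketch below finds the first bucket that a given duration falls into, using a local copy of the slice and a hypothetical helper named bucketFor. Prometheus itself counts an observation in every bucket whose bound is greater than or equal to the value, plus the implicit +Inf bucket.

```go
package main

import (
	"fmt"
	"sort"
)

// buckets is a local copy of DefaultPrometheusDurationBuckets (seconds).
var buckets = []float64{0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30, 60, 150, 300, 600}

// bucketFor returns the smallest upper bound that is >= seconds,
// or ok=false when the observation exceeds every bound (the +Inf bucket).
func bucketFor(seconds float64) (upper float64, ok bool) {
	i := sort.SearchFloat64s(buckets, seconds)
	if i == len(buckets) {
		return 0, false
	}
	return buckets[i], true
}

func main() {
	fmt.Println(bucketFor(0.3)) // 0.5 true
	fmt.Println(bucketFor(0.5)) // 0.5 true
	fmt.Println(bucketFor(700)) // 0 false
}
```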

var InternalError = status.Error(codes.Internal, "Internal error")

InternalError is the default error returned when a panic is recovered.

Functions

func GetCallStartTimeFromContext

func GetCallStartTimeFromContext(ctx context.Context) time.Time

GetCallStartTimeFromContext extracts request start time from the context.

func GetInternalRequestIDFromContext

func GetInternalRequestIDFromContext(ctx context.Context) string

GetInternalRequestIDFromContext extracts internal request id from the context.

func GetLoggerFromContext

func GetLoggerFromContext(ctx context.Context) log.FieldLogger

GetLoggerFromContext extracts logger from the context.

func GetRequestIDFromContext

func GetRequestIDFromContext(ctx context.Context) string

GetRequestIDFromContext extracts external request id from the context.

func GetTraceIDFromContext

func GetTraceIDFromContext(ctx context.Context) string

GetTraceIDFromContext extracts trace id from the context.

func LoggingStreamInterceptor

func LoggingStreamInterceptor(logger log.FieldLogger, options ...LoggingOption) func(
	srv interface{},
	ss grpc.ServerStream,
	info *grpc.StreamServerInfo,
	handler grpc.StreamHandler,
) error

LoggingStreamInterceptor is a gRPC stream interceptor that logs the start and end of each RPC call.

func LoggingUnaryInterceptor

func LoggingUnaryInterceptor(logger log.FieldLogger, options ...LoggingOption) func(
	ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (interface{}, error)

LoggingUnaryInterceptor is a gRPC unary interceptor that logs the start and end of each RPC call.

func MetricsStreamInterceptor

func MetricsStreamInterceptor(
	collector MetricsCollector,
	opts ...MetricsOption,
) grpc.StreamServerInterceptor

MetricsStreamInterceptor is an interceptor that collects metrics for incoming gRPC stream calls.

func MetricsUnaryInterceptor

func MetricsUnaryInterceptor(
	collector MetricsCollector,
	opts ...MetricsOption,
) grpc.UnaryServerInterceptor

MetricsUnaryInterceptor is an interceptor that collects metrics for incoming gRPC unary calls.

func NewContextWithCallStartTime

func NewContextWithCallStartTime(ctx context.Context, startTime time.Time) context.Context

NewContextWithCallStartTime creates a new context with request start time.

func NewContextWithInternalRequestID

func NewContextWithInternalRequestID(ctx context.Context, internalRequestID string) context.Context

NewContextWithInternalRequestID creates a new context with internal request id.

func NewContextWithLogger

func NewContextWithLogger(ctx context.Context, logger log.FieldLogger) context.Context

NewContextWithLogger creates a new context with logger.

func NewContextWithLoggingParams

func NewContextWithLoggingParams(ctx context.Context, loggingParams *LoggingParams) context.Context

NewContextWithLoggingParams creates a new context with logging params.

func NewContextWithRequestID

func NewContextWithRequestID(ctx context.Context, requestID string) context.Context

NewContextWithRequestID creates a new context with external request id.

func NewContextWithTraceID

func NewContextWithTraceID(ctx context.Context, traceID string) context.Context

NewContextWithTraceID creates a new context with trace id.

func RecoveryStreamInterceptor

func RecoveryStreamInterceptor(options ...RecoveryOption) func(
	srv interface{},
	ss grpc.ServerStream,
	_ *grpc.StreamServerInfo,
	handler grpc.StreamHandler,
) error

RecoveryStreamInterceptor is a gRPC stream interceptor that recovers from panics and returns Internal error.

func RecoveryUnaryInterceptor

func RecoveryUnaryInterceptor(options ...RecoveryOption) func(
	ctx context.Context,
	req interface{},
	_ *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (interface{}, error)

RecoveryUnaryInterceptor is a gRPC unary interceptor that recovers from panics and returns Internal error.

func RequestIDStreamInterceptor

func RequestIDStreamInterceptor(options ...RequestIDOption) func(
	srv interface{},
	ss grpc.ServerStream,
	_ *grpc.StreamServerInfo,
	handler grpc.StreamHandler,
) error

RequestIDStreamInterceptor is a gRPC stream interceptor that extracts the request ID from the incoming context metadata and attaches it to the context. If the request ID is missing, a new one is generated.

func RequestIDUnaryInterceptor

func RequestIDUnaryInterceptor(options ...RequestIDOption) func(
	ctx context.Context,
	req interface{},
	_ *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (interface{}, error)

RequestIDUnaryInterceptor is a gRPC unary interceptor that extracts the request ID from the incoming context metadata and attaches it to the context. If the request ID is missing, a new one is generated.

Types

type CallInfoMetrics

type CallInfoMetrics struct {
	Service       string
	Method        string
	UserAgentType string
}

CallInfoMetrics represents a call info for collecting metrics.

type CallMethodType

type CallMethodType string

CallMethodType represents the type of gRPC method call.

const (
	// CallMethodTypeUnary represents a unary gRPC method call.
	CallMethodTypeUnary CallMethodType = "unary"
	// CallMethodTypeStream represents a streaming gRPC method call.
	CallMethodTypeStream CallMethodType = "stream"
)

type LoggingOption

type LoggingOption func(*loggingOptions)

LoggingOption represents a configuration option for the logging interceptor.
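
LoggingOption follows the functional options pattern: each With... function returns a closure that mutates an unexported options struct. The sketch below shows the pattern in isolation. The options struct, its fields, the option names, and the default value are hypothetical and chosen for this example only.

```go
package main

import (
	"fmt"
	"time"
)

// options is a hypothetical counterpart of the unexported loggingOptions.
type options struct {
	logCallStart  bool
	slowThreshold time.Duration
}

// Option mutates options, mirroring the shape of LoggingOption.
type Option func(*options)

// WithCallStart toggles call start logging.
func WithCallStart(v bool) Option {
	return func(o *options) { o.logCallStart = v }
}

// WithSlowThreshold sets the slow call threshold.
func WithSlowThreshold(d time.Duration) Option {
	return func(o *options) { o.slowThreshold = d }
}

// newOptions applies opts in order on top of the defaults.
func newOptions(opts ...Option) options {
	o := options{slowThreshold: time.Second} // default chosen for this sketch only
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := newOptions(WithCallStart(true), WithSlowThreshold(2*time.Second))
	fmt.Println(o.logCallStart, o.slowThreshold)
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.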

func WithLoggingAddCallInfoToLogger

func WithLoggingAddCallInfoToLogger(addCallInfo bool) LoggingOption

WithLoggingAddCallInfoToLogger adds call information to the logger context.

func WithLoggingCallHeaders

func WithLoggingCallHeaders(headers map[string]string) LoggingOption

WithLoggingCallHeaders specifies custom headers to log from gRPC metadata.

func WithLoggingCallStart

func WithLoggingCallStart(logCallStart bool) LoggingOption

WithLoggingCallStart enables logging of call start events.

func WithLoggingExcludedMethods

func WithLoggingExcludedMethods(methods ...string) LoggingOption

WithLoggingExcludedMethods specifies gRPC methods to exclude from logging.

func WithLoggingSlowCallThreshold

func WithLoggingSlowCallThreshold(threshold time.Duration) LoggingOption

WithLoggingSlowCallThreshold sets the threshold for slow call detection.

func WithLoggingStreamCustomLoggerProvider

func WithLoggingStreamCustomLoggerProvider(provider StreamCustomLoggerProvider) LoggingOption

WithLoggingStreamCustomLoggerProvider sets a custom logger provider function for stream interceptors.

func WithLoggingUnaryCustomLoggerProvider

func WithLoggingUnaryCustomLoggerProvider(provider UnaryCustomLoggerProvider) LoggingOption

WithLoggingUnaryCustomLoggerProvider sets a custom logger provider function for unary interceptors.

type LoggingParams

type LoggingParams struct {
	// contains filtered or unexported fields
}

LoggingParams stores parameters for the gRPC logging interceptor that may be modified dynamically by the other underlying interceptors/handlers.

func GetLoggingParamsFromContext

func GetLoggingParamsFromContext(ctx context.Context) *LoggingParams

GetLoggingParamsFromContext extracts logging params from the context.

func (*LoggingParams) AddTimeSlotDurationInMs

func (lp *LoggingParams) AddTimeSlotDurationInMs(name string, dur time.Duration)

AddTimeSlotDurationInMs sets (if new) or adds a duration value, in milliseconds, to the named element of the time_slots map.

func (*LoggingParams) AddTimeSlotInt

func (lp *LoggingParams) AddTimeSlotInt(name string, dur int64)

AddTimeSlotInt sets (if new) or adds a duration value to the named element of the time_slots map.
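
The "sets if new, otherwise adds" semantics of both methods can be modelled with a plain map. The timeSlots type and its methods below are hypothetical stand-ins. The real LoggingParams keeps its fields unexported, so its internal representation is not shown here.

```go
package main

import (
	"fmt"
	"time"
)

// timeSlots is a hypothetical stand-in for the time_slots map kept by LoggingParams.
type timeSlots map[string]int64

// addInt sets the slot if it is new, otherwise adds to the stored value.
// A missing key reads as zero, so a single += covers both cases.
func (ts timeSlots) addInt(name string, v int64) { ts[name] += v }

// addDurationInMs converts d to whole milliseconds before accumulating.
func (ts timeSlots) addDurationInMs(name string, d time.Duration) {
	ts.addInt(name, d.Milliseconds())
}

func main() {
	ts := timeSlots{}
	ts.addDurationInMs("db", 120*time.Millisecond)
	ts.addDurationInMs("db", 30*time.Millisecond)
	ts.addInt("cache", 5)
	fmt.Println(ts["db"], ts["cache"]) // 150 5
}
```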

func (*LoggingParams) ExtendFields

func (lp *LoggingParams) ExtendFields(fields ...log.Field)

ExtendFields extends list of fields that will be logged by the logging interceptor.

type MetricsCollector

type MetricsCollector interface {
	// IncInFlightCalls increments the counter of in-flight calls.
	IncInFlightCalls(callInfo CallInfoMetrics, methodType CallMethodType)

	// DecInFlightCalls decrements the counter of in-flight calls.
	DecInFlightCalls(callInfo CallInfoMetrics, methodType CallMethodType)

	// ObserveCallFinish observes the duration of the call and the status code.
	ObserveCallFinish(callInfo CallInfoMetrics, methodType CallMethodType, code codes.Code, startTime time.Time)
}

MetricsCollector is an interface for collecting metrics for incoming gRPC calls.

type MetricsOption

type MetricsOption func(*metricsOptions)

MetricsOption is a function type for configuring the metrics interceptor.

func WithMetricsExcludedMethods

func WithMetricsExcludedMethods(methods ...string) MetricsOption

WithMetricsExcludedMethods returns an option that excludes the specified methods from metrics collection.

func WithMetricsStreamUserAgentTypeProvider

func WithMetricsStreamUserAgentTypeProvider(provider StreamUserAgentTypeProvider) MetricsOption

WithMetricsStreamUserAgentTypeProvider sets a user agent type provider for stream interceptors.

func WithMetricsUnaryUserAgentTypeProvider

func WithMetricsUnaryUserAgentTypeProvider(provider UnaryUserAgentTypeProvider) MetricsOption

WithMetricsUnaryUserAgentTypeProvider sets a user agent type provider for unary interceptors.

type PrometheusMetrics

type PrometheusMetrics struct {
	Durations *prometheus.HistogramVec
	InFlight  *prometheus.GaugeVec
}

PrometheusMetrics is a collector of metrics for incoming gRPC calls.

func NewPrometheusMetrics

func NewPrometheusMetrics(opts ...PrometheusOption) *PrometheusMetrics

NewPrometheusMetrics creates a new instance of PrometheusMetrics with the provided options.

func (*PrometheusMetrics) DecInFlightCalls

func (pm *PrometheusMetrics) DecInFlightCalls(callInfo CallInfoMetrics, methodType CallMethodType)

DecInFlightCalls decrements the counter of in-flight calls.

func (*PrometheusMetrics) IncInFlightCalls

func (pm *PrometheusMetrics) IncInFlightCalls(callInfo CallInfoMetrics, methodType CallMethodType)

IncInFlightCalls increments the counter of in-flight calls.

func (*PrometheusMetrics) MustCurryWith

func (pm *PrometheusMetrics) MustCurryWith(labels prometheus.Labels) *PrometheusMetrics

MustCurryWith curries the metrics collector with the provided labels.

func (*PrometheusMetrics) MustRegister

func (pm *PrometheusMetrics) MustRegister()

MustRegister registers the metrics collector with Prometheus and panics if any error occurs.

func (*PrometheusMetrics) ObserveCallFinish

func (pm *PrometheusMetrics) ObserveCallFinish(
	callInfo CallInfoMetrics, methodType CallMethodType, code codes.Code, startTime time.Time,
)

ObserveCallFinish observes the duration of the call and the status code.

func (*PrometheusMetrics) Unregister

func (pm *PrometheusMetrics) Unregister()

Unregister cancels the registration of the metrics collector in Prometheus.

type PrometheusOption

type PrometheusOption func(*prometheusOptions)

PrometheusOption is a function type for configuring the metrics collector.

func WithPrometheusConstLabels

func WithPrometheusConstLabels(labels prometheus.Labels) PrometheusOption

WithPrometheusConstLabels sets constant labels that will be applied to all metrics.

func WithPrometheusCurriedLabelNames

func WithPrometheusCurriedLabelNames(labelNames []string) PrometheusOption

WithPrometheusCurriedLabelNames sets label names that will be curried.

func WithPrometheusDurationBuckets

func WithPrometheusDurationBuckets(buckets []float64) PrometheusOption

WithPrometheusDurationBuckets sets the duration buckets for histogram metrics.

func WithPrometheusNamespace

func WithPrometheusNamespace(namespace string) PrometheusOption

WithPrometheusNamespace sets the namespace for metrics.

type RecoveryOption

type RecoveryOption func(*recoveryOptions)

RecoveryOption is a function type for configuring recoveryOptions.

func WithRecoveryStackSize

func WithRecoveryStackSize(size int) RecoveryOption

WithRecoveryStackSize sets the stack size for logging stack traces.

type RequestIDOption

type RequestIDOption func(*requestIDOptions)

RequestIDOption is a function type for configuring requestIDOptions.

func WithInternalRequestIDGenerator

func WithInternalRequestIDGenerator(generator func() string) RequestIDOption

WithInternalRequestIDGenerator sets the function for generating internal request IDs.

func WithRequestIDGenerator

func WithRequestIDGenerator(generator func() string) RequestIDOption

WithRequestIDGenerator sets the function for generating request IDs.

type StreamCustomLoggerProvider

type StreamCustomLoggerProvider func(ctx context.Context, info *grpc.StreamServerInfo) log.FieldLogger

StreamCustomLoggerProvider returns a custom logger or nil based on the gRPC context and stream method info.

type StreamUserAgentTypeProvider

type StreamUserAgentTypeProvider func(ctx context.Context, info *grpc.StreamServerInfo) string

StreamUserAgentTypeProvider returns a user agent type or empty string based on the gRPC context and stream method info.

type UnaryCustomLoggerProvider

type UnaryCustomLoggerProvider func(ctx context.Context, info *grpc.UnaryServerInfo) log.FieldLogger

UnaryCustomLoggerProvider returns a custom logger or nil based on the gRPC context and method info.

type UnaryUserAgentTypeProvider

type UnaryUserAgentTypeProvider func(ctx context.Context, info *grpc.UnaryServerInfo) string

UnaryUserAgentTypeProvider returns a user agent type or empty string based on the gRPC context and method info.
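
A provider of this kind usually reduces a raw user-agent string to a small, fixed set of label values so that metric cardinality stays bounded. The sketch below shows only that classification step. The function name, the categories, and the matching rules are purely illustrative, and reading the user-agent value from the incoming metadata is left out.

```go
package main

import (
	"fmt"
	"strings"
)

// classifyUserAgent maps a raw user-agent string to a coarse type.
// The categories and matching rules are hypothetical examples.
func classifyUserAgent(ua string) string {
	ua = strings.ToLower(ua)
	switch {
	case ua == "":
		return "" // no user agent, no classification
	case strings.HasPrefix(ua, "grpc-go/"):
		return "go_client"
	case strings.Contains(ua, "android") || strings.Contains(ua, "okhttp"):
		return "mobile"
	default:
		return "other"
	}
}

func main() {
	fmt.Println(classifyUserAgent("grpc-go/1.60.0"))       // go_client
	fmt.Println(classifyUserAgent("grpc-java-okhttp/1.58")) // mobile
	fmt.Println(classifyUserAgent("curl/8.0"))             // other
}
```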

type WrappedServerStream

type WrappedServerStream struct {
	grpc.ServerStream
	Ctx context.Context
}

WrappedServerStream wraps grpc.ServerStream to provide a custom context for the stream.

func (*WrappedServerStream) Context

func (ss *WrappedServerStream) Context() context.Context

Context returns the custom context for the wrapped server stream.
