grpc

package module
v0.0.0-...-25d3f34 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

README

gRPC Extension for Forge

Production-ready gRPC server with TLS, observability, and streaming support.

Features

  • Full gRPC Support: Unary, client streaming, server streaming, bidirectional streaming
  • TLS/mTLS: Secure communication with client authentication
  • Health Checks: Built-in gRPC health checking protocol
  • Reflection: Service discovery support
  • Observability: Automatic metrics, logging, and tracing
  • Interceptors: Custom middleware support
  • Configuration: Keepalive, message sizes, concurrency limits
  • Server Statistics: Track RPCs, streams, and performance

Quick Start

package main

import (
    "context"
    "github.com/xraph/forge"
    "github.com/xraph/forge/extensions/grpc"
    pb "your/proto/package"
)

func main() {
    app := forge.NewApp(forge.AppConfig{
        Name: "grpc-service",
    })
    
    // Register gRPC extension
    app.RegisterExtension(grpc.NewExtension(
        grpc.WithAddress(":50051"),
        grpc.WithReflection(true),
        grpc.WithHealthCheck(true),
    ))
    
    // Start app
    app.Start(context.Background())
    
    // Get gRPC server and register service
    grpcServer, _ := forge.Resolve[grpc.GRPC](app.Container(), "grpc")
    pb.RegisterYourServiceServer(grpcServer.GetServer(), &yourServiceImpl{})
    
    app.Run(context.Background(), ":8080")
}

Configuration

YAML Configuration
extensions:
  grpc:
    address: ":50051"
    max_recv_msg_size: 4194304  # 4MB
    max_send_msg_size: 4194304  # 4MB
    max_concurrent_streams: 100
    connection_timeout: 120s
    
    # TLS/mTLS
    enable_tls: true
    tls_cert_file: "server.crt"
    tls_key_file: "server.key"
    tls_ca_file: "ca.crt"
    client_auth: true  # Require client certificates
    
    # Features
    enable_health_check: true
    enable_reflection: true
    enable_metrics: true
    enable_tracing: true
    enable_logging: true
    
    # Keepalive
    keepalive:
      time: 2h
      timeout: 20s
      enforcement_policy: true
      min_time: 5m
      permit_without_stream: false
Programmatic Configuration
grpc.NewExtension(
    grpc.WithAddress(":50051"),
    grpc.WithMaxMessageSize(8 * 1024 * 1024), // 8MB
    grpc.WithMaxConcurrentStreams(100),
    grpc.WithTLS("server.crt", "server.key", "ca.crt"),
    grpc.WithClientAuth(true),
    grpc.WithHealthCheck(true),
    grpc.WithReflection(true),
    grpc.WithMetrics(true),
)

TLS/mTLS Setup

Generate Certificates
# Generate CA
openssl genrsa -out ca.key 4096
openssl req -new -x509 -days 365 -key ca.key -out ca.crt

# Generate server certificate
openssl genrsa -out server.key 4096
openssl req -new -key server.key -out server.csr
openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out server.crt

# Generate client certificate (for mTLS)
openssl genrsa -out client.key 4096
openssl req -new -key client.key -out client.csr
openssl x509 -req -days 365 -in client.csr -CA ca.crt -CAkey ca.key -set_serial 02 -out client.crt
Enable TLS
grpc.NewExtension(
    grpc.WithTLS("server.crt", "server.key", ""),
)
Enable mTLS (Mutual TLS)
grpc.NewExtension(
    grpc.WithTLS("server.crt", "server.key", "ca.crt"),
    grpc.WithClientAuth(true), // Verify client certificates
)

Custom Interceptors

Add Custom Middleware
// Create custom interceptor
func authInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    // Check authentication
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return nil, status.Error(codes.Unauthenticated, "missing metadata")
    }
    
    token := md.Get("authorization")
    if len(token) == 0 {
        return nil, status.Error(codes.Unauthenticated, "missing token")
    }
    
    // Validate token...
    
    return handler(ctx, req)
}

// Register interceptor before starting the server
grpcServer, _ := forge.Resolve[grpc.GRPC](app.Container(), "grpc")
grpcServer.AddUnaryInterceptor(authInterceptor)

Health Checks

When health checks are enabled (enable_health_check: true or WithHealthCheck(true)), the extension registers the standard gRPC health checking protocol.

Check Health
grpcurl -plaintext localhost:50051 grpc.health.v1.Health/Check
Register Custom Health Checkers
type myHealthChecker struct{}

func (h *myHealthChecker) Check(ctx context.Context) error {
    // Check dependencies (database, cache, etc.)
    return nil
}

grpcServer.RegisterHealthChecker("my-service", &myHealthChecker{})

Metrics

The following metrics are collected automatically when enable_metrics is true:

  • grpc_unary_started_total - Total unary RPCs started
  • grpc_unary_succeeded_total - Successful unary RPCs
  • grpc_unary_failed_total - Failed unary RPCs (with code label)
  • grpc_unary_duration_seconds - Unary RPC duration histogram
  • grpc_stream_active - Current active streams
  • grpc_stream_duration_seconds - Stream duration histogram
  • grpc_stream_succeeded_total - Successful streams
  • grpc_stream_failed_total - Failed streams (with code label)

Server Statistics

Get runtime statistics:

stats := grpcServer.GetStats()
fmt.Printf("Start Time: %d\n", stats.StartTime)
fmt.Printf("RPCs Started: %d\n", stats.RPCsStarted)
fmt.Printf("RPCs Succeeded: %d\n", stats.RPCsSucceeded)
fmt.Printf("RPCs Failed: %d\n", stats.RPCsFailed)
fmt.Printf("Active Streams: %d\n", stats.ActiveStreams)

Service Discovery

List registered services:

services := grpcServer.GetServices()
for _, service := range services {
    fmt.Printf("Service: %s\n", service.Name)
    for _, method := range service.Methods {
        fmt.Printf("  Method: %s (client=%v, server=%v)\n", 
            method.Name, method.IsClientStream, method.IsServerStream)
    }
}

Testing

func TestGRPCService(t *testing.T) {
    app := forge.NewApp(forge.AppConfig{Name: "test"})
    app.RegisterExtension(grpc.NewExtension(
        grpc.WithAddress("127.0.0.1:0"), // Random port
    ))
    
    app.Start(context.Background())
    defer app.Stop(context.Background())
    
    // Get server and register service
    grpcServer, _ := forge.Resolve[grpc.GRPC](app.Container(), "grpc")
    pb.RegisterTestServiceServer(grpcServer.GetServer(), &testImpl{})
    
    // Create client and test...
}

Best Practices

  1. Always enable TLS in production - Use WithTLS() to secure your gRPC traffic
  2. Use mTLS for service-to-service communication - Enable client authentication with WithClientAuth(true)
  3. Set appropriate message size limits - Use WithMaxMessageSize() (or the max_recv_msg_size and max_send_msg_size config fields) to prevent memory exhaustion
  4. Configure keepalive to detect dead connections - Adjust keepalive settings based on your network environment
  5. Enable health checks for load balancers - Use WithHealthCheck(true) and register custom health checkers
  6. Use reflection in development only - Disable in production for security
  7. Monitor metrics and set up alerts - Use the automatic metrics to track RPC performance
  8. Implement custom health checkers for dependencies - Check database, cache, and other service health
  9. Add custom interceptors for cross-cutting concerns - Authentication, authorization, rate limiting, etc.
  10. Use context for cancellation and deadlines - Always pass context through your RPC calls

Architecture

The gRPC extension follows Forge's extension pattern:

  1. Configuration: Loads config from ConfigManager with dual-key support
  2. DI Registration: Registers the gRPC server as "grpc" in the container
  3. Lifecycle: Implements Register(), Start(), Stop(), Health() methods
  4. Observability: Integrates with Forge's logging and metrics systems
  5. Interceptors: Chains observability and custom interceptors
  6. TLS: Loads certificates and configures mTLS if enabled
  7. Health Checking: Implements standard gRPC health protocol

Performance

The gRPC extension is designed for high-performance production use:

  • Zero-copy streaming where possible
  • Connection pooling with configurable limits
  • Concurrent stream handling with MaxConcurrentStreams
  • Message size limits to prevent memory exhaustion
  • Keepalive configuration to detect and close dead connections
  • Efficient observability with minimal overhead

Troubleshooting

Connection Refused

Ensure the server is started before registering services:

app.Start(ctx)  // Start first
grpcServer, _ := forge.Resolve[grpc.GRPC](app.Container(), "grpc")
pb.RegisterYourServiceServer(grpcServer.GetServer(), &impl{})  // Register after
TLS Certificate Errors

Verify certificate paths and permissions:

ls -la server.crt server.key ca.crt
openssl verify -CAfile ca.crt server.crt
Health Check Failures

Check if health check is enabled and custom health checkers are registered:

grpc.WithHealthCheck(true)
grpcServer.RegisterHealthChecker("my-service", &checker{})
High Memory Usage

Reduce message size limits and concurrent streams:

grpc.WithMaxMessageSize(1 * 1024 * 1024)  // 1MB
grpc.WithMaxConcurrentStreams(50)

Examples

See the v2/examples/ directory for complete examples:

  • grpc-basic/ - Simple gRPC service
  • grpc-advanced/ - TLS, interceptors, health checks

License

Apache-2.0

Documentation

Index

Constants

View Source
const (
	// ServiceKey is the DI key for the gRPC service.
	ServiceKey = "grpc"
)

DI container keys for gRPC extension services.

Variables

View Source
var (
	ErrNotStarted        = errors.New("grpc: server not started")
	ErrAlreadyStarted    = errors.New("grpc: server already started")
	ErrServiceNotFound   = errors.New("grpc: service not found")
	ErrInvalidConfig     = errors.New("grpc: invalid configuration")
	ErrStartFailed       = errors.New("grpc: failed to start server")
	ErrStopFailed        = errors.New("grpc: failed to stop server")
	ErrHealthCheckFailed = errors.New("grpc: health check failed")
)

Common gRPC errors

Functions

func NewExtension

func NewExtension(opts ...ConfigOption) forge.Extension

NewExtension creates a new gRPC extension

func NewExtensionWithConfig

func NewExtensionWithConfig(config Config) forge.Extension

NewExtensionWithConfig creates a new gRPC extension with a complete config

Types

type Config

type Config struct {
	// Server settings
	Address              string          `json:"address" yaml:"address" mapstructure:"address"`
	MaxRecvMsgSize       int             `json:"max_recv_msg_size" yaml:"max_recv_msg_size" mapstructure:"max_recv_msg_size"`
	MaxSendMsgSize       int             `json:"max_send_msg_size" yaml:"max_send_msg_size" mapstructure:"max_send_msg_size"`
	MaxConcurrentStreams uint32          `json:"max_concurrent_streams" yaml:"max_concurrent_streams" mapstructure:"max_concurrent_streams"`
	ConnectionTimeout    time.Duration   `json:"connection_timeout" yaml:"connection_timeout" mapstructure:"connection_timeout"`
	Keepalive            KeepaliveConfig `json:"keepalive" yaml:"keepalive" mapstructure:"keepalive"`

	// TLS/mTLS
	EnableTLS   bool   `json:"enable_tls" yaml:"enable_tls" mapstructure:"enable_tls"`
	TLSCertFile string `json:"tls_cert_file,omitempty" yaml:"tls_cert_file,omitempty" mapstructure:"tls_cert_file"`
	TLSKeyFile  string `json:"tls_key_file,omitempty" yaml:"tls_key_file,omitempty" mapstructure:"tls_key_file"`
	TLSCAFile   string `json:"tls_ca_file,omitempty" yaml:"tls_ca_file,omitempty" mapstructure:"tls_ca_file"`
	ClientAuth  bool   `json:"client_auth" yaml:"client_auth" mapstructure:"client_auth"` // Require client cert

	// Health checking
	EnableHealthCheck bool `json:"enable_health_check" yaml:"enable_health_check" mapstructure:"enable_health_check"`

	// Reflection
	EnableReflection bool `json:"enable_reflection" yaml:"enable_reflection" mapstructure:"enable_reflection"`

	// Observability
	EnableMetrics bool `json:"enable_metrics" yaml:"enable_metrics" mapstructure:"enable_metrics"`
	EnableTracing bool `json:"enable_tracing" yaml:"enable_tracing" mapstructure:"enable_tracing"`
	EnableLogging bool `json:"enable_logging" yaml:"enable_logging" mapstructure:"enable_logging"`

	// Config loading flags
	RequireConfig bool `json:"-" yaml:"-" mapstructure:"-"`
}

Config contains configuration for the gRPC extension

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns default gRPC configuration

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration

type ConfigOption

type ConfigOption func(*Config)

ConfigOption is a functional option for Config

func WithAddress

func WithAddress(addr string) ConfigOption

func WithClientAuth

func WithClientAuth(enable bool) ConfigOption

func WithConfig

func WithConfig(config Config) ConfigOption

func WithHealthCheck

func WithHealthCheck(enable bool) ConfigOption

func WithMaxConcurrentStreams

func WithMaxConcurrentStreams(max uint32) ConfigOption

func WithMaxMessageSize

func WithMaxMessageSize(size int) ConfigOption

func WithMetrics

func WithMetrics(enable bool) ConfigOption

func WithReflection

func WithReflection(enable bool) ConfigOption

func WithRequireConfig

func WithRequireConfig(require bool) ConfigOption

func WithTLS

func WithTLS(certFile, keyFile, caFile string) ConfigOption

func WithTracing

func WithTracing(enable bool) ConfigOption

type Extension

type Extension struct {
	*forge.BaseExtension
	// contains filtered or unexported fields
}

Extension implements forge.Extension for gRPC functionality. The extension is now a lightweight facade that loads config and registers services.

func (*Extension) Health

func (e *Extension) Health(ctx context.Context) error

Health checks the extension health. Service health is managed by Vessel through GRPCService.Health().

func (*Extension) Register

func (e *Extension) Register(app forge.App) error

Register registers the gRPC extension with the app. This method now only loads configuration and registers service constructors.

func (*Extension) Start

func (e *Extension) Start(ctx context.Context) error

Start marks the extension as started. The actual server is started by Vessel calling GRPCService.Start().

func (*Extension) Stop

func (e *Extension) Stop(ctx context.Context) error

Stop marks the extension as stopped. The actual server is stopped by Vessel calling GRPCService.Stop().

type GRPC

type GRPC interface {
	// Service registration
	RegisterService(desc *grpc.ServiceDesc, impl interface{}) error

	// Server management
	Start(ctx context.Context, addr string) error
	Stop(ctx context.Context) error
	GracefulStop(ctx context.Context) error

	// Interceptors
	AddUnaryInterceptor(interceptor grpc.UnaryServerInterceptor)
	AddStreamInterceptor(interceptor grpc.StreamServerInterceptor)

	// Health checking
	RegisterHealthChecker(service string, checker HealthChecker)

	// Server info
	GetServer() *grpc.Server
	IsRunning() bool
	GetStats() ServerStats
	GetServices() []ServiceInfo

	// Health
	Ping(ctx context.Context) error
}

GRPC represents a unified gRPC server interface

func NewGRPCServer

func NewGRPCServer(config Config, logger forge.Logger, metrics forge.Metrics) GRPC

NewGRPCServer creates a new gRPC server

type GRPCService

type GRPCService struct {
	// contains filtered or unexported fields
}

GRPCService wraps a GRPC server implementation and provides lifecycle management. It implements vessel's di.Service interface so Vessel can manage its lifecycle.

func NewGRPCService

func NewGRPCService(config Config, logger forge.Logger, metrics forge.Metrics) (*GRPCService, error)

NewGRPCService creates a new gRPC service with the given configuration. This is the constructor that will be registered with the DI container.

func (*GRPCService) AddStreamInterceptor

func (s *GRPCService) AddStreamInterceptor(interceptor grpc.StreamServerInterceptor)

func (*GRPCService) AddUnaryInterceptor

func (s *GRPCService) AddUnaryInterceptor(interceptor grpc.UnaryServerInterceptor)

func (*GRPCService) GetServer

func (s *GRPCService) GetServer() *grpc.Server

func (*GRPCService) GetServices

func (s *GRPCService) GetServices() []ServiceInfo

func (*GRPCService) GetStats

func (s *GRPCService) GetStats() ServerStats

func (*GRPCService) Health

func (s *GRPCService) Health(ctx context.Context) error

Health checks if the gRPC service is healthy.

func (*GRPCService) IsRunning

func (s *GRPCService) IsRunning() bool

func (*GRPCService) Name

func (s *GRPCService) Name() string

Name returns the service name for Vessel's lifecycle management.

func (*GRPCService) Ping

func (s *GRPCService) Ping(ctx context.Context) error

func (*GRPCService) RegisterHealthChecker

func (s *GRPCService) RegisterHealthChecker(service string, checker HealthChecker)

func (*GRPCService) RegisterService

func (s *GRPCService) RegisterService(desc *grpc.ServiceDesc, impl interface{}) error

func (*GRPCService) Server

func (s *GRPCService) Server() GRPC

Server returns the underlying gRPC server implementation.

func (*GRPCService) Start

func (s *GRPCService) Start(ctx context.Context) error

Start starts the gRPC service. This is called automatically by Vessel during container.Start().

func (*GRPCService) Stop

func (s *GRPCService) Stop(ctx context.Context) error

Stop stops the gRPC service gracefully. This is called automatically by Vessel during container.Stop().

type HealthChecker

type HealthChecker interface {
	Check(ctx context.Context) error
}

HealthChecker checks service health

type KeepaliveConfig

type KeepaliveConfig struct {
	Time                time.Duration `json:"time" yaml:"time" mapstructure:"time"`
	Timeout             time.Duration `json:"timeout" yaml:"timeout" mapstructure:"timeout"`
	EnforcementPolicy   bool          `json:"enforcement_policy" yaml:"enforcement_policy" mapstructure:"enforcement_policy"`
	MinTime             time.Duration `json:"min_time" yaml:"min_time" mapstructure:"min_time"`
	PermitWithoutStream bool          `json:"permit_without_stream" yaml:"permit_without_stream" mapstructure:"permit_without_stream"`
}

KeepaliveConfig contains keepalive settings

type MethodInfo

type MethodInfo struct {
	Name           string
	IsClientStream bool
	IsServerStream bool
	InputType      string
	OutputType     string
}

MethodInfo contains method metadata

type ServerStats

type ServerStats struct {
	StartTime        int64
	TotalConnections int64
	ActiveStreams    int64
	RPCsStarted      int64
	RPCsSucceeded    int64
	RPCsFailed       int64
}

ServerStats contains server statistics

type ServiceInfo

type ServiceInfo struct {
	Name        string
	Methods     []MethodInfo
	Description string
}

ServiceInfo contains service metadata

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL