serve

package
v1.0.0 Latest
Published: May 18, 2026 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Constants

const DefaultServiceStartupTimeout = 5 * time.Minute

DefaultServiceStartupTimeout is the default timeout for waiting for service startup.

Variables

This section is empty.

Functions

func ClosePreAllocatedListener

func ClosePreAllocatedListener(c *ServerConfig)

ClosePreAllocatedListener closes the pre-allocated listener if it exists.

func DefaultHealthCheckService

func DefaultHealthCheckService() *health.Server

DefaultHealthCheckService returns a health-check service that returns SERVING for all services.

func ListenRetryExecute

func ListenRetryExecute(ctx context.Context, f func() error) error

ListenRetryExecute executes the provided function with retry logic for port binding conflicts. It automatically retries when port conflicts are detected (e.g., "address already in use"), using exponential backoff. Non-port-conflict errors are treated as permanent failures and will not be retried. The retry behavior is controlled by the listenRetry profile.

func NewConcurrencyLimit

func NewConcurrencyLimit(maxConcurrentStreams int) *semaphore.Weighted

NewConcurrencyLimit creates a weighted semaphore for limiting concurrent streams.

func NewRateLimiter

func NewRateLimiter(config *RateLimitConfig) *rate.Limiter

NewRateLimiter creates a rate limiter based on the configuration.

func PreAllocateListener

func PreAllocateListener(tb testing.TB, c *ServerConfig) net.Listener

PreAllocateListener allocates a port and binds ahead of server initialization. It stores the listener object internally for reuse by later listener calls.

func RateLimitInterceptor

func RateLimitInterceptor(limiter *rate.Limiter) grpc.UnaryServerInterceptor

RateLimitInterceptor returns a UnaryServerInterceptor that implements rate limiting using a token bucket algorithm.

func RegisterDynamicTLSUpdater

func RegisterDynamicTLSUpdater(d *TLSProvider, updater *DynamicTLSUpdater)

RegisterDynamicTLSUpdater registers a DynamicTLSUpdater with a TLSProvider. Its signature mirrors gRPC server registration so that all server<->service interfaces share a common shape.

func Serve

func Serve(ctx context.Context, r Registerer, conf *Config) error

Serve creates servers from the config, registers the service, and serves until the context is done. It starts the main gRPC server and an HTTP server if configured.

func StartAndServe

func StartAndServe(ctx context.Context, service Service, serverConfig ...*Config) error

StartAndServe runs a full lifecycle service: starts the service, waits for it to be ready, then creates and serves gRPC and HTTP server(s).

func StreamConcurrencyInterceptor

func StreamConcurrencyInterceptor(sem *semaphore.Weighted) grpc.StreamServerInterceptor

StreamConcurrencyInterceptor returns a gRPC StreamServerInterceptor that limits the number of concurrently active streaming RPCs using a weighted semaphore.

Types

type Config

type Config struct {
	GRPC ServerConfig `mapstructure:"server"`
	HTTP ServerConfig `mapstructure:"monitoring"`
	// ServiceStartupTimeout is the maximum time to wait for a service
	// to become ready before startup fails.
	ServiceStartupTimeout time.Duration `mapstructure:"startup-timeout"`
}

Config holds serving infrastructure configuration. This is parsed separately from service-specific configuration. The GRPC field configures the server for the gRPC service. The HTTP field configures the server for the service's Prometheus metrics and HTTP API.

type DynamicTLSUpdater

type DynamicTLSUpdater struct {
	// contains filtered or unexported fields
}

DynamicTLSUpdater is a TLS CA handler that can be used to update the TLSProvider.

func (*DynamicTLSUpdater) Load

func (d *DynamicTLSUpdater) Load() [][]byte

Load loads the dynamic certificates.

func (*DynamicTLSUpdater) UpdateClientRootCAs

func (d *DynamicTLSUpdater) UpdateClientRootCAs(certs [][]byte)

UpdateClientRootCAs updates the client root CAs with the given certificates.

type RateLimitConfig

type RateLimitConfig struct {
	// RequestsPerSecond is the maximum number of requests per second allowed.
	// Set to 0 or negative to disable rate limiting.
	RequestsPerSecond int `mapstructure:"requests-per-second"`
	// Burst is the maximum number of requests allowed in a single burst.
	// This allows handling sudden spikes of concurrent requests from multiple clients.
	// Must be greater than 0 and less than or equal to RequestsPerSecond when rate limiting is enabled.
	Burst int `mapstructure:"burst"`
}

RateLimitConfig describes the rate limiting configuration for unary gRPC endpoints.

func (*RateLimitConfig) Validate

func (c *RateLimitConfig) Validate() error

Validate checks that the rate limit configuration is valid.
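The field comments on RateLimitConfig imply a simple set of rules, sketched below. This is a reading of those comments, not the package's source: `rateLimitConfig` and `validateRateLimit` are hypothetical local names, and the error messages are invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// rateLimitConfig is a local copy of the two documented fields.
type rateLimitConfig struct {
	RequestsPerSecond int
	Burst             int
}

// validateRateLimit applies the rules stated in the field comments:
// a rate of 0 or less disables limiting, otherwise Burst must satisfy
// 0 < Burst <= RequestsPerSecond.
func validateRateLimit(c rateLimitConfig) error {
	if c.RequestsPerSecond <= 0 {
		return nil // rate limiting disabled, nothing to check
	}
	if c.Burst <= 0 {
		return errors.New("burst must be greater than 0 when rate limiting is enabled")
	}
	if c.Burst > c.RequestsPerSecond {
		return fmt.Errorf("burst (%d) must not exceed requests-per-second (%d)",
			c.Burst, c.RequestsPerSecond)
	}
	return nil
}

func main() {
	for _, c := range []rateLimitConfig{
		{RequestsPerSecond: 0, Burst: 0},
		{RequestsPerSecond: 100, Burst: 20},
		{RequestsPerSecond: 100, Burst: 0},
		{RequestsPerSecond: 100, Burst: 200},
	} {
		fmt.Printf("%+v -> %v\n", c, validateRateLimit(c))
	}
}
```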

type Registerer

type Registerer interface {
	RegisterService(Servers)
}

Registerer is for services that register on gRPC and HTTP servers. It is used to register the service's gRPC, HTTP, and monitoring handlers.

type ServerConfig

type ServerConfig struct {
	Endpoint             connection.Endpoint    `mapstructure:"endpoint"`
	TLS                  connection.TLSConfig   `mapstructure:"tls"`
	KeepAlive            *ServerKeepAliveConfig `mapstructure:"keep-alive"`
	RateLimit            RateLimitConfig        `mapstructure:"rate-limit"`
	MaxConcurrentStreams int                    `mapstructure:"max-concurrent-streams" validate:"gte=0"`
	// contains filtered or unexported fields
}

ServerConfig describes the connection parameters for a server.

func (*ServerConfig) Listener

func (c *ServerConfig) Listener(ctx context.Context) (net.Listener, error)

Listener instantiates a net.Listener and updates the config port with the effective port. If the port is predefined, it retries binding to that port until it succeeds or the context ends.
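The interplay between a pre-allocated listener and the effective port can be sketched with the standard library. Everything here is an assumption made for illustration: `serverConfig` with its `Host`, `Port`, and `preAllocated` fields stands in for ServerConfig (whose Endpoint type and unexported fields are not shown on this page), and `listener` and `preAllocate` are hypothetical counterparts of Listener and PreAllocateListener, without the retry logic.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// serverConfig is a hypothetical stand-in for ServerConfig.
type serverConfig struct {
	Host         string
	Port         int          // 0 asks the OS to pick a free port
	preAllocated net.Listener // set when a listener was bound ahead of time
}

// listener returns the pre-allocated listener if there is one. Otherwise it
// binds a new listener and records the port that was actually assigned.
func (c *serverConfig) listener() (net.Listener, error) {
	if c.preAllocated != nil {
		return c.preAllocated, nil
	}
	l, err := net.Listen("tcp", net.JoinHostPort(c.Host, strconv.Itoa(c.Port)))
	if err != nil {
		return nil, err
	}
	c.Port = l.Addr().(*net.TCPAddr).Port // effective port, useful when Port was 0
	return l, nil
}

// preAllocate binds a listener early and stores it for later listener calls.
func preAllocate(c *serverConfig) (net.Listener, error) {
	l, err := c.listener()
	if err != nil {
		return nil, err
	}
	c.preAllocated = l
	return l, nil
}

func main() {
	conf := &serverConfig{Host: "127.0.0.1"}
	l, err := preAllocate(conf)
	if err != nil {
		fmt.Println("bind failed:", err)
		return
	}
	defer l.Close()
	again, _ := conf.listener()
	fmt.Println("port assigned:", conf.Port != 0, "listener reused:", again == l)
}
```

Binding early is mostly useful in tests, where the port must be known before the server starts and must not be taken by another process in between.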

type ServerKeepAliveConfig

type ServerKeepAliveConfig struct {
	Params            *ServerKeepAliveParamsConfig            `mapstructure:"params"`
	EnforcementPolicy *ServerKeepAliveEnforcementPolicyConfig `mapstructure:"enforcement-policy"`
}

ServerKeepAliveConfig describes the keepalive settings: the server parameters and the enforcement policy.

type ServerKeepAliveEnforcementPolicyConfig

type ServerKeepAliveEnforcementPolicyConfig struct {
	MinTime             time.Duration `mapstructure:"min-time"`
	PermitWithoutStream bool          `mapstructure:"permit-without-stream"`
}

ServerKeepAliveEnforcementPolicyConfig describes the keep alive enforcement policy.

type ServerKeepAliveParamsConfig

type ServerKeepAliveParamsConfig struct {
	MaxConnectionIdle     time.Duration `mapstructure:"max-connection-idle"`
	MaxConnectionAge      time.Duration `mapstructure:"max-connection-age"`
	MaxConnectionAgeGrace time.Duration `mapstructure:"max-connection-age-grace"`
	Time                  time.Duration `mapstructure:"time"`
	Timeout               time.Duration `mapstructure:"timeout"`
}

ServerKeepAliveParamsConfig describes the keepalive parameters.

type Servers

type Servers struct {
	GRPC            *grpc.Server
	HTTP            *http.ServeMux
	GrpcTLSProvider *TLSProvider
	// contains filtered or unexported fields
}

Servers holds the gRPC and HTTP servers along with their listeners. It provides a unified interface for service registration and lifecycle management.

func NewServers

func NewServers(ctx context.Context, conf *Config) (s Servers, err error)

NewServers creates and initializes server instances from the provided configuration. It sets up the gRPC and HTTP servers along with their listeners. It creates the server objects even if they are never deployed, which avoids nil checks in the service's RegisterService() method when an endpoint is empty. IMPORTANT: If an error is returned, the caller is responsible for calling StopFunc() on the returned Servers.

func (*Servers) Serve

func (s *Servers) Serve(ctx context.Context, r Registerer) error

Serve registers the service and starts serving on all configured endpoints. It blocks until the context is canceled or an error occurs.

func (*Servers) Stop

func (s *Servers) Stop()

Stop stops the servers.

type Service

type Service interface {
	Registerer
	// Run executes the service until the context is done.
	Run(ctx context.Context) error
	// WaitForReady waits for the service resources to initialize.
	// If the context ended before the service is ready, returns false.
	WaitForReady(ctx context.Context) bool
}

Service is for full lifecycle services that run, signal readiness, and register on gRPC and HTTP servers.

type TLSProvider

type TLSProvider struct {
	// contains filtered or unexported fields
}

TLSProvider holds a dynamically updatable TLS configuration.

The design separates writers (services) from readers (server), ensuring a linear dependency flow:

Service -> DynamicTLSUpdater <- TLSProvider <- Server

func NewTLSProvider

func NewTLSProvider(tlsConfig connection.TLSConfig) (*TLSProvider, error)

NewTLSProvider loads TLS credentials from a TLSConfig.

func (*TLSProvider) GetConfigForClient

func (d *TLSProvider) GetConfigForClient(*tls.ClientHelloInfo) (*tls.Config, error)

GetConfigForClient returns the current TLS config for a new client connection. This is a single atomic pointer load with no allocations, making it safe and efficient to call on every TLS handshake.

func (*TLSProvider) GetServerTLSCredentials

func (d *TLSProvider) GetServerTLSCredentials() *tls.Config

GetServerTLSCredentials returns the TLS credentials for the server.
