Documentation ¶
Index ¶
- Constants
- func ClosePreAllocatedListener(c *ServerConfig)
- func DefaultHealthCheckService() *health.Server
- func ListenRetryExecute(ctx context.Context, f func() error) error
- func NewConcurrencyLimit(maxConcurrentStreams int) *semaphore.Weighted
- func NewRateLimiter(config *RateLimitConfig) *rate.Limiter
- func PreAllocateListener(tb testing.TB, c *ServerConfig) net.Listener
- func RateLimitInterceptor(limiter *rate.Limiter) grpc.UnaryServerInterceptor
- func RegisterDynamicTLSUpdater(d *TLSProvider, updater *DynamicTLSUpdater)
- func Serve(ctx context.Context, r Registerer, conf *Config) error
- func StartAndServe(ctx context.Context, service Service, serverConfig ...*Config) error
- func StreamConcurrencyInterceptor(sem *semaphore.Weighted) grpc.StreamServerInterceptor
- type Config
- type DynamicTLSUpdater
- type RateLimitConfig
- type Registerer
- type ServerConfig
- type ServerKeepAliveConfig
- type ServerKeepAliveEnforcementPolicyConfig
- type ServerKeepAliveParamsConfig
- type Servers
- type Service
- type TLSProvider
Constants ¶
const DefaultServiceStartupTimeout = 5 * time.Minute
DefaultServiceStartupTimeout is the default timeout for waiting for service startup.
Variables ¶
This section is empty.
Functions ¶
func ClosePreAllocatedListener ¶
func ClosePreAllocatedListener(c *ServerConfig)
ClosePreAllocatedListener closes the pre-allocated listener if it exists.
func DefaultHealthCheckService ¶
func DefaultHealthCheckService() *health.Server
DefaultHealthCheckService returns a health-check service that returns SERVING for all services.
func ListenRetryExecute ¶
func ListenRetryExecute(ctx context.Context, f func() error) error
ListenRetryExecute executes the provided function with retry logic for port binding conflicts. It automatically retries when port conflicts are detected (e.g., "address already in use"), using exponential backoff. Non-port-conflict errors are treated as permanent failures and will not be retried. The retry behavior is controlled by the listenRetry profile.
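The retry behavior described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: retryOnPortConflict, isPortConflict, and the initial and maximum delays are hypothetical names and values, and the real function takes its settings from the listenRetry profile. The sketch also assumes that a port conflict surfaces as syscall.EADDRINUSE, which may differ by platform.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"syscall"
	"time"
)

// isPortConflict reports whether err wraps "address already in use".
// Assumption: the conflict surfaces as syscall.EADDRINUSE.
func isPortConflict(err error) bool {
	return errors.Is(err, syscall.EADDRINUSE)
}

// retryOnPortConflict is a hypothetical stand-in for ListenRetryExecute.
// It retries f with exponential backoff while f reports a port conflict,
// and returns immediately on success or on any other error.
func retryOnPortConflict(ctx context.Context, initial, maxDelay time.Duration, f func() error) error {
	delay := initial
	for {
		err := f()
		if err == nil || !isPortConflict(err) {
			return err // success, or a permanent failure
		}
		select {
		case <-ctx.Done():
			return errors.Join(ctx.Err(), err)
		case <-time.After(delay):
		}
		// Double the delay, capped at maxDelay.
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
}

// demo simulates two port conflicts followed by a successful bind.
func demo() (attempts int, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	err = retryOnPortConflict(ctx, time.Millisecond, 10*time.Millisecond, func() error {
		attempts++
		if attempts < 3 {
			return fmt.Errorf("listen tcp :8080: %w", syscall.EADDRINUSE)
		}
		return nil
	})
	return attempts, err
}

func main() {
	attempts, err := demo()
	fmt.Println(attempts, err) // prints "3 <nil>"
}
```

Capping the delay keeps the worst-case wait between attempts bounded, while the context still decides when to give up entirely.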
func NewConcurrencyLimit ¶
func NewConcurrencyLimit(maxConcurrentStreams int) *semaphore.Weighted
NewConcurrencyLimit creates a weighted semaphore for limiting concurrent streams.
func NewRateLimiter ¶
func NewRateLimiter(config *RateLimitConfig) *rate.Limiter
NewRateLimiter creates a rate limiter based on the configuration.
func PreAllocateListener ¶
func PreAllocateListener(tb testing.TB, c *ServerConfig) net.Listener
PreAllocateListener allocates a port and binds ahead of server initialization. It stores the listener object internally for reuse by later listener calls.
func RateLimitInterceptor ¶
func RateLimitInterceptor(limiter *rate.Limiter) grpc.UnaryServerInterceptor
RateLimitInterceptor returns a UnaryServerInterceptor that implements rate limiting using a token bucket algorithm.
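To illustrate the token bucket idea behind this interceptor, here is a small standard-library sketch. It is only a stand-in: the package itself takes a *rate.Limiter, and tokenBucket, newTokenBucket, and allowAt are hypothetical names introduced for this example. The clock is passed in explicitly so the behavior is deterministic.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// tokenBucket is a hypothetical, simplified token bucket.
// Tokens refill at `rate` per second up to `burst`; each request costs one.
type tokenBucket struct {
	mu     sync.Mutex
	rate   float64
	burst  float64
	tokens float64
	last   time.Time
}

// newTokenBucket starts with a full bucket at time now.
func newTokenBucket(requestsPerSecond, burst int, now time.Time) *tokenBucket {
	return &tokenBucket{
		rate:   float64(requestsPerSecond),
		burst:  float64(burst),
		tokens: float64(burst),
		last:   now,
	}
}

// allowAt reports whether a request arriving at time now may proceed.
func (b *tokenBucket) allowAt(now time.Time) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if elapsed := now.Sub(b.last).Seconds(); elapsed > 0 {
		b.tokens += elapsed * b.rate
		if b.tokens > b.burst {
			b.tokens = b.burst
		}
		b.last = now
	}
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// demo sends 5 requests at once against a burst of 3, then one more
// request 200ms later, after the bucket has had time to refill.
func demo() (allowedInBurst int, allowedAfterRefill bool) {
	start := time.Unix(0, 0)
	b := newTokenBucket(10, 3, start)
	for i := 0; i < 5; i++ {
		if b.allowAt(start) {
			allowedInBurst++
		}
	}
	return allowedInBurst, b.allowAt(start.Add(200 * time.Millisecond))
}

func main() {
	fmt.Println(demo()) // prints "3 true"
}
```

In an interceptor, a rejected request would typically be answered with an error status instead of reaching the handler; which status the package returns is not stated here.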
func RegisterDynamicTLSUpdater ¶
func RegisterDynamicTLSUpdater(d *TLSProvider, updater *DynamicTLSUpdater)
RegisterDynamicTLSUpdater registers a DynamicTLSUpdater with a TLSProvider. Its signature mirrors gRPC server registration, so that all server<->service interfaces share a common form.
func Serve ¶
func Serve(ctx context.Context, r Registerer, conf *Config) error
Serve creates servers from the config, registers the service, and serves until the context is done. It starts the main gRPC server and an HTTP server if configured.
func StartAndServe ¶
func StartAndServe(ctx context.Context, service Service, serverConfig ...*Config) error
StartAndServe runs a full lifecycle service: starts the service, waits for it to be ready, then creates and serves gRPC and HTTP server(s).
func StreamConcurrencyInterceptor ¶
func StreamConcurrencyInterceptor(sem *semaphore.Weighted) grpc.StreamServerInterceptor
StreamConcurrencyInterceptor returns a gRPC StreamServerInterceptor that limits the number of concurrently active streaming RPCs using a weighted semaphore.
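The limiting pattern can be sketched with a buffered channel as a stand-in for the weighted semaphore. This is an illustration under assumptions: streamLimiter, withSlot, and errTooManyStreams are hypothetical names, and the sketch rejects a stream when no slot is free, whereas the actual interceptor may wait for a slot instead.

```go
package main

import (
	"errors"
	"fmt"
)

// errTooManyStreams is a hypothetical error for a rejected stream.
var errTooManyStreams = errors.New("too many concurrent streams")

// streamLimiter is a counting semaphore built on a buffered channel.
// Each element in the channel represents one active stream.
type streamLimiter chan struct{}

func newStreamLimiter(maxConcurrentStreams int) streamLimiter {
	return make(streamLimiter, maxConcurrentStreams)
}

// tryAcquire takes a slot without blocking and reports whether it succeeded.
func (l streamLimiter) tryAcquire() bool {
	select {
	case l <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees a slot previously taken with tryAcquire.
func (l streamLimiter) release() { <-l }

// withSlot runs handler while holding a slot, the way an interceptor
// would wrap a stream handler. The slot is held for the whole stream.
func (l streamLimiter) withSlot(handler func() error) error {
	if !l.tryAcquire() {
		return errTooManyStreams
	}
	defer l.release()
	return handler()
}

func main() {
	lim := newStreamLimiter(1)
	err := lim.withSlot(func() error {
		// While this stream is active, a second one finds no free slot.
		return lim.withSlot(func() error { return nil })
	})
	fmt.Println(err) // prints "too many concurrent streams"
}
```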
Types ¶
type Config ¶
type Config struct {
	GRPC ServerConfig `mapstructure:"server"`
	HTTP ServerConfig `mapstructure:"monitoring"`
	// ServiceStartupTimeout is the maximum time to wait for a service
	// to become ready before startup fails.
	ServiceStartupTimeout time.Duration `mapstructure:"startup-timeout"`
}
Config holds serving infrastructure configuration. This is parsed separately from service-specific configuration. The GRPC field configures the server for the gRPC service. The HTTP field configures the server for the service's Prometheus metrics and HTTP API.
type DynamicTLSUpdater ¶
type DynamicTLSUpdater struct {
	// contains filtered or unexported fields
}
DynamicTLSUpdater is a TLS CA handler that can be used to update the TLSProvider.
func (*DynamicTLSUpdater) Load ¶
func (d *DynamicTLSUpdater) Load() [][]byte
Load loads the dynamic certificates.
func (*DynamicTLSUpdater) UpdateClientRootCAs ¶
func (d *DynamicTLSUpdater) UpdateClientRootCAs(certs [][]byte)
UpdateClientRootCAs updates the client root CAs with the given certificates.
type RateLimitConfig ¶
type RateLimitConfig struct {
	// RequestsPerSecond is the maximum number of requests per second allowed.
	// Set to 0 or negative to disable rate limiting.
	RequestsPerSecond int `mapstructure:"requests-per-second"`
	// Burst is the maximum number of requests allowed in a single burst.
	// This allows handling sudden spikes of concurrent requests from multiple clients.
	// Must be greater than 0 and less than or equal to RequestsPerSecond when rate limiting is enabled.
	Burst int `mapstructure:"burst"`
}
RateLimitConfig describes the rate limiting configuration for unary gRPC endpoints.
func (*RateLimitConfig) Validate ¶
func (c *RateLimitConfig) Validate() error
Validate checks that the rate limit configuration is valid.
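The constraints stated in the RateLimitConfig field comments could be checked roughly as follows. This is a sketch, not the package's Validate method: rateLimitConfig and validate are local, hypothetical stand-ins, the error messages are invented, and treating a nil config as "disabled" is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// rateLimitConfig mirrors the two documented fields of RateLimitConfig.
type rateLimitConfig struct {
	RequestsPerSecond int
	Burst             int
}

// validate applies the documented rules:
//   - RequestsPerSecond <= 0 disables rate limiting, so nothing else is checked;
//   - otherwise Burst must satisfy 0 < Burst <= RequestsPerSecond.
func (c *rateLimitConfig) validate() error {
	if c == nil || c.RequestsPerSecond <= 0 {
		return nil // rate limiting disabled (nil handling is an assumption)
	}
	if c.Burst <= 0 {
		return errors.New("burst must be greater than 0 when rate limiting is enabled")
	}
	if c.Burst > c.RequestsPerSecond {
		return fmt.Errorf("burst (%d) must not exceed requests-per-second (%d)",
			c.Burst, c.RequestsPerSecond)
	}
	return nil
}

func main() {
	configs := []rateLimitConfig{
		{RequestsPerSecond: 0, Burst: 0},     // disabled
		{RequestsPerSecond: 100, Burst: 20},  // valid
		{RequestsPerSecond: 100, Burst: 0},   // invalid: no burst
		{RequestsPerSecond: 100, Burst: 200}, // invalid: burst too large
	}
	for _, c := range configs {
		fmt.Printf("%+v -> %v\n", c, c.validate())
	}
}
```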
type Registerer ¶
type Registerer interface {
	RegisterService(Servers)
}
Registerer is for services that register on gRPC and HTTP servers. It is used to register the service's gRPC, HTTP, and monitoring services.
type ServerConfig ¶
type ServerConfig struct {
	Endpoint connection.Endpoint `mapstructure:"endpoint"`
	TLS connection.TLSConfig `mapstructure:"tls"`
	KeepAlive *ServerKeepAliveConfig `mapstructure:"keep-alive"`
	RateLimit RateLimitConfig `mapstructure:"rate-limit"`
	MaxConcurrentStreams int `mapstructure:"max-concurrent-streams" validate:"gte=0"`
	// contains filtered or unexported fields
}
ServerConfig describes the connection parameters for a server.
func (*ServerConfig) Listener ¶
Listener instantiates a net.Listener and updates the config port with the effective port. If the port is predefined, it retries binding to the port until it succeeds or the context ends.
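The "effective port" idea is easiest to see with port 0, where the operating system picks a free port at bind time. The following standard-library sketch shows how the bound port can be read back from the listener. listenEffective is a hypothetical helper, not part of this package, and it omits the retry behavior.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// listenEffective binds a TCP listener on host:port and returns the
// listener together with the port that was actually bound. With port 0,
// the operating system chooses a free port.
func listenEffective(host string, port int) (net.Listener, int, error) {
	l, err := net.Listen("tcp", net.JoinHostPort(host, strconv.Itoa(port)))
	if err != nil {
		return nil, 0, err
	}
	tcpAddr, ok := l.Addr().(*net.TCPAddr)
	if !ok {
		_ = l.Close()
		return nil, 0, fmt.Errorf("unexpected address type %T", l.Addr())
	}
	return l, tcpAddr.Port, nil
}

func main() {
	l, port, err := listenEffective("127.0.0.1", 0)
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer l.Close()
	// The effective port can now be written back into the configuration.
	fmt.Println("effective port:", port)
}
```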
type ServerKeepAliveConfig ¶
type ServerKeepAliveConfig struct {
	Params *ServerKeepAliveParamsConfig `mapstructure:"params"`
	EnforcementPolicy *ServerKeepAliveEnforcementPolicyConfig `mapstructure:"enforcement-policy"`
}
ServerKeepAliveConfig describes the keep alive parameters.
type ServerKeepAliveEnforcementPolicyConfig ¶
type ServerKeepAliveEnforcementPolicyConfig struct {
	MinTime time.Duration `mapstructure:"min-time"`
	PermitWithoutStream bool `mapstructure:"permit-without-stream"`
}
ServerKeepAliveEnforcementPolicyConfig describes the keep alive enforcement policy.
type ServerKeepAliveParamsConfig ¶
type ServerKeepAliveParamsConfig struct {
	MaxConnectionIdle time.Duration `mapstructure:"max-connection-idle"`
	MaxConnectionAge time.Duration `mapstructure:"max-connection-age"`
	MaxConnectionAgeGrace time.Duration `mapstructure:"max-connection-age-grace"`
	Time time.Duration `mapstructure:"time"`
	Timeout time.Duration `mapstructure:"timeout"`
}
ServerKeepAliveParamsConfig describes the keep alive policy.
type Servers ¶
type Servers struct {
	GRPC *grpc.Server
	HTTP *http.ServeMux
	GrpcTLSProvider *TLSProvider
	// contains filtered or unexported fields
}
Servers holds the gRPC and HTTP servers along with their listeners. It provides a unified interface for service registration and lifecycle management.
func NewServers ¶
NewServers creates and initializes server instances from the provided configuration. It sets up the gRPC and HTTP servers along with their listeners. It creates the server objects even if they are not eventually deployed, which avoids nil checks in the service's RegisterService() method when an endpoint is empty. IMPORTANT: If an error is returned, the caller is responsible for calling StopFunc() on the returned Servers.
type Service ¶
type Service interface {
	Registerer
	// Run executes the service until the context is done.
	Run(ctx context.Context) error
	// WaitForReady waits for the service resources to initialize.
	// If the context ended before the service is ready, returns false.
	WaitForReady(ctx context.Context) bool
}
Service is for full lifecycle services that run, signal readiness, and register on gRPC and HTTP servers.
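One common way to satisfy the Run and WaitForReady contract is a channel that is closed once initialization finishes. The sketch below uses only the standard library and leaves out RegisterService, since that depends on the Servers type. demoService and startAndWait are hypothetical names, and this is an assumed pattern, not something the package prescribes.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// demoService signals readiness by closing the ready channel.
type demoService struct {
	ready chan struct{}
}

func newDemoService() *demoService {
	return &demoService{ready: make(chan struct{})}
}

// Run initializes the service, signals readiness, and then blocks
// until the context is done.
func (s *demoService) Run(ctx context.Context) error {
	// ... initialize resources here ...
	close(s.ready)
	<-ctx.Done()
	return nil
}

// WaitForReady returns true once the service is ready, or false if
// the context ends first.
func (s *demoService) WaitForReady(ctx context.Context) bool {
	select {
	case <-s.ready:
		return true
	case <-ctx.Done():
		return false
	}
}

// startAndWait optionally starts the service, then waits up to
// timeoutMs milliseconds for it to become ready.
func startAndWait(timeoutMs int, start bool) bool {
	s := newDemoService()
	runCtx, stop := context.WithCancel(context.Background())
	defer stop()
	if start {
		go s.Run(runCtx)
	}
	waitCtx, cancel := context.WithTimeout(context.Background(),
		time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	return s.WaitForReady(waitCtx)
}

func main() {
	fmt.Println(startAndWait(1000, true)) // service started: becomes ready
	fmt.Println(startAndWait(20, false))  // never started: wait times out
}
```

A bounded wait of this kind is what a startup timeout such as ServiceStartupTimeout would govern.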
type TLSProvider ¶
type TLSProvider struct {
	// contains filtered or unexported fields
}
TLSProvider holds a dynamically updatable TLS configuration.
The design separates writers (services) from readers (server), ensuring a linear dependency flow:
Service -> DynamicTLSUpdater <- TLSProvider <- Server
func NewTLSProvider ¶
func NewTLSProvider(tlsConfig connection.TLSConfig) (*TLSProvider, error)
NewTLSProvider loads TLS credentials from a TLSConfig.
func (*TLSProvider) GetConfigForClient ¶
func (d *TLSProvider) GetConfigForClient(*tls.ClientHelloInfo) (*tls.Config, error)
GetConfigForClient returns the current TLS config for a new client connection. This is a single atomic pointer load with no allocations, making it safe and efficient to call on every TLS handshake.
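The atomic pointer pattern described above can be sketched with sync/atomic and crypto/tls. The provider type and its update method are hypothetical stand-ins; how TLSProvider stores and swaps its configuration internally is not shown on this page, so treat this as one plausible shape.

```go
package main

import (
	"crypto/tls"
	"fmt"
	"sync/atomic"
)

// provider holds the current TLS config behind an atomic pointer, so
// handshakes can read it without locks while another goroutine swaps it.
type provider struct {
	current atomic.Pointer[tls.Config]
}

func newProvider(initial *tls.Config) *provider {
	p := &provider{}
	p.current.Store(initial)
	return p
}

// getConfigForClient matches the signature of tls.Config.GetConfigForClient.
// It performs a single atomic load and allocates nothing.
func (p *provider) getConfigForClient(*tls.ClientHelloInfo) (*tls.Config, error) {
	return p.current.Load(), nil
}

// update publishes a new config; handshakes that start later will see it.
func (p *provider) update(next *tls.Config) {
	p.current.Store(next)
}

// serverConfig returns the outer config a server would be created with.
// The per-connection config is resolved through the callback.
func (p *provider) serverConfig() *tls.Config {
	return &tls.Config{
		MinVersion:         tls.VersionTLS12,
		GetConfigForClient: p.getConfigForClient,
	}
}

// demo reads the minimum TLS version before and after an update.
func demo() (before, after uint16) {
	p := newProvider(&tls.Config{MinVersion: tls.VersionTLS12})
	cfg, _ := p.getConfigForClient(nil)
	before = cfg.MinVersion
	p.update(&tls.Config{MinVersion: tls.VersionTLS13})
	cfg, _ = p.getConfigForClient(nil)
	after = cfg.MinVersion
	return before, after
}

func main() {
	before, after := demo()
	fmt.Println(before == tls.VersionTLS12, after == tls.VersionTLS13)
	fmt.Println(p2().GetConfigForClient != nil)
}

// p2 builds a server config to show that the callback is wired in.
func p2() *tls.Config {
	return newProvider(&tls.Config{}).serverConfig()
}
```

Because each update stores a whole new *tls.Config instead of mutating the old one, a handshake already in progress keeps using the config it loaded.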
func (*TLSProvider) GetServerTLSCredentials ¶
func (d *TLSProvider) GetServerTLSCredentials() *tls.Config
GetServerTLSCredentials returns the TLS credentials for the server.