Documentation ¶
Index ¶
- Constants
- Variables
- func AddressString[T WithAddress](addresses ...T) string
- func CloseConnections[T io.Closer](connections ...T) error
- func CloseConnectionsLog[T io.Closer](connections ...T)
- func DefaultHealthCheckService() *health.Server
- func FilterStreamRPCError(rpcErr error) error
- func IsStreamContextEnd(rpcErr error) bool
- func IsStreamEnd(rpcErr error) bool
- func NewConnection(p Parameters) (*grpc.ClientConn, error)
- func NewConnectionPerEndpoint(config *MultiClientConfig) ([]*grpc.ClientConn, error)
- func NewLoadBalancedConnection(config *MultiClientConfig) (*grpc.ClientConn, error)
- func NewSingleConnection(config *ClientConfig) (*grpc.ClientConn, error)
- func RunGrpcServer(ctx context.Context, serverConfig *ServerConfig, ...) error
- func StartService(ctx context.Context, service Service, serverConfigs ...*ServerConfig) error
- func Sustain(ctx context.Context, op func() error) error
- func WaitForNextBackOffDuration(ctx context.Context, b *backoff.ExponentialBackOff) error
- type ClientConfig
- type Endpoint
- type MultiClientConfig
- type Parameters
- type RetryProfile
- func (p *RetryProfile) Execute(ctx context.Context, o backoff.Operation) error
- func (p *RetryProfile) ExecuteSQL(ctx context.Context, executor *pgxpool.Pool, sqlStmt string, args ...any) error
- func (p *RetryProfile) MakeGrpcRetryPolicyJSON() string
- func (p *RetryProfile) NewBackoff() *backoff.ExponentialBackOff
- type ServerConfig
- type ServerKeepAliveConfig
- type ServerKeepAliveEnforcementPolicyConfig
- type ServerKeepAliveParamsConfig
- type Service
- type TLSConfig
- type WithAddress
Constants ¶
const (
	// Connected indicates that the connection to the service is currently established.
	Connected = 1.0
	// Disconnected indicates that the connection to the service is currently not established.
	Disconnected = 0
)
const (
	//nolint:revive // usage: TLS configuration modes.
	UnmentionedTLSMode = ""
	NoneTLSMode = "none"
	OneSideTLSMode = "tls"
	MutualTLSMode = "mtls"
	// DefaultTLSMinVersion is the minimum version required to achieve secure connections.
	DefaultTLSMinVersion = tls.VersionTLS12
)
Variables ¶
var (
	// ErrRetryTimeout is returned if the retry attempts were exhausted due to timeout.
	ErrRetryTimeout = errors.New("retry timed out")
	// ErrNonRetryable represents an error that should not trigger a retry.
	// It's used to wrap an underlying error condition when an operation
	// fails and retrying would be useless.
	// The code performing retries will check for this error type
	// (e.g., using errors.Is) and stop the retry loop if found.
	ErrNonRetryable = errors.New("cannot recover from error")
	// ErrBackOff is returned for transient errors specifically on an existing connection,
	// implying the initial connection was successful. Retry the operation after a backoff delay.
	ErrBackOff = errors.New("backoff required before retrying")
)
Functions ¶
func AddressString ¶
func AddressString[T WithAddress](addresses ...T) string
AddressString returns the given addresses as a single comma-separated string.
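As a rough illustration of the comma-joining behaviour described here, the following stdlib-only sketch defines local stand-ins. The names `withAddress`, `endpoint`, and `addressString` are hypothetical, and the `host:port` format returned by `Address()` is an assumption rather than something this page states.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

// withAddress mirrors the shape of the documented WithAddress interface.
type withAddress interface {
	Address() string
}

// endpoint is a local stand-in; its host:port address format is an assumption.
type endpoint struct {
	Host string
	Port int
}

func (e *endpoint) Address() string {
	return net.JoinHostPort(e.Host, strconv.Itoa(e.Port))
}

// addressString joins the address of every item with a comma separator.
func addressString[T withAddress](addresses ...T) string {
	parts := make([]string, len(addresses))
	for i, a := range addresses {
		parts[i] = a.Address()
	}
	return strings.Join(parts, ",")
}

func main() {
	// Prints: localhost:8080,example.com:443
	fmt.Println(addressString(&endpoint{"localhost", 8080}, &endpoint{"example.com", 443}))
}
```

The type parameter lets a slice of any concrete type be passed without first converting it to a slice of the interface type.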
func CloseConnections ¶
func CloseConnections[T io.Closer](connections ...T) error
CloseConnections calls [closer.Close()] for all the given connections and returns the close errors.
func CloseConnectionsLog ¶
func CloseConnectionsLog[T io.Closer](connections ...T)
CloseConnectionsLog calls [closer.Close()] for all the given connections and logs the close errors.
func DefaultHealthCheckService ¶ added in v0.1.5
DefaultHealthCheckService returns a health-check service that returns SERVING for all services.
func FilterStreamRPCError ¶
func FilterStreamRPCError(rpcErr error) error
FilterStreamRPCError filters out RPC errors that are caused by the stream ending.
func IsStreamContextEnd ¶
func IsStreamContextEnd(rpcErr error) bool
IsStreamContextEnd returns true if an RPC error indicates stream context end.
func IsStreamEnd ¶
func IsStreamEnd(rpcErr error) bool
IsStreamEnd returns true if an RPC error indicates stream end.
func NewConnection ¶ added in v0.1.7
func NewConnection(p Parameters) (*grpc.ClientConn, error)
NewConnection creates a connection with the given parameters. It does not attempt to connect to the remote endpoint.
func NewConnectionPerEndpoint ¶ added in v0.1.7
func NewConnectionPerEndpoint(config *MultiClientConfig) ([]*grpc.ClientConn, error)
NewConnectionPerEndpoint creates a list of connections; one for each endpoint in the given config.
func NewLoadBalancedConnection ¶ added in v0.1.7
func NewLoadBalancedConnection(config *MultiClientConfig) (*grpc.ClientConn, error)
NewLoadBalancedConnection creates a connection with load balancing between the endpoints in the given config.
func NewSingleConnection ¶ added in v0.1.7
func NewSingleConnection(config *ClientConfig) (*grpc.ClientConn, error)
NewSingleConnection creates a single connection given a client config.
func RunGrpcServer ¶ added in v0.1.5
func RunGrpcServer(
	ctx context.Context,
	serverConfig *ServerConfig,
	register func(server *grpc.Server),
) error
RunGrpcServer runs a server and returns an error if it fails.
func StartService ¶
func StartService(
	ctx context.Context,
	service Service,
	serverConfigs ...*ServerConfig,
) error
StartService runs a service, waits until it is ready, and registers the gRPC server(s). It stops if either the service or its respective gRPC server ends.
func Sustain ¶
func Sustain(ctx context.Context, op func() error) error
Sustain attempts to keep an operation `op` running successfully against the connection, handling transient issues (like ErrBackOff) via retry with backoff.
It stops retrying if:
- op returns an error wrapping ErrNonRetryable (permanent failure).
- The context ctx is cancelled.
- The internal backoff strategy times out (via ErrRetryTimeout).
func WaitForNextBackOffDuration ¶
func WaitForNextBackOffDuration(ctx context.Context, b *backoff.ExponentialBackOff) error
WaitForNextBackOffDuration waits for the next backoff duration. It stops if the context ends. If the backoff should stop, it returns ErrRetryTimeout.
Types ¶
type ClientConfig ¶
type ClientConfig struct {
Endpoint *Endpoint `mapstructure:"endpoint" yaml:"endpoint"`
TLS TLSConfig `mapstructure:"tls" yaml:"tls"`
Retry *RetryProfile `mapstructure:"reconnect" yaml:"reconnect"`
}
ClientConfig contains a single endpoint, TLS config, and retry profile.
type Endpoint ¶
type Endpoint struct {
Host string `mapstructure:"host" json:"host,omitempty" yaml:"host,omitempty"`
Port int `mapstructure:"port" json:"port,omitempty" yaml:"port,omitempty"`
}
Endpoint describes a remote endpoint.
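The `omitempty` options in the struct tags mean that zero-valued fields are left out when the value is encoded. The sketch below copies only the `json` tags into a local stand-in type (`endpoint` and `encode` are hypothetical names) and assumes the real type has no custom marshaller.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// endpoint copies the json tags of the documented Endpoint struct.
type endpoint struct {
	Host string `json:"host,omitempty"`
	Port int    `json:"port,omitempty"`
}

// encode returns the JSON form of e, or the error text if encoding fails.
func encode(e endpoint) string {
	b, err := json.Marshal(e)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	// Prints: {"host":"localhost","port":8080}
	fmt.Println(encode(endpoint{Host: "localhost", Port: 8080}))
	// Prints: {"host":"localhost"} because a zero port is omitted.
	fmt.Println(encode(endpoint{Host: "localhost"}))
}
```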
func CreateEndpointHP ¶
CreateEndpointHP parses an endpoint from the given host and port. It panics if it fails to parse.
func NewEndpoint ¶
NewEndpoint parses an endpoint from an address string.
func NewLocalHost ¶
func NewLocalHost() *Endpoint
NewLocalHost returns a default endpoint "localhost:0".
type MultiClientConfig ¶ added in v0.1.7
type MultiClientConfig struct {
Endpoints []*Endpoint `mapstructure:"endpoints" yaml:"endpoints"`
TLS TLSConfig `mapstructure:"tls" yaml:"tls"`
Retry *RetryProfile `mapstructure:"reconnect" yaml:"reconnect"`
}
MultiClientConfig contains the endpoints, TLS config, and retry profile. This config supports a number of different endpoints for multiple service instances.
type Parameters ¶ added in v0.1.7
type Parameters struct {
Address string
Creds credentials.TransportCredentials
Retry *RetryProfile
Resolver *manual.Resolver
AdditionalOpts []grpc.DialOption
}
Parameters contains connection parameters.
type RetryProfile ¶
type RetryProfile struct {
InitialInterval time.Duration `mapstructure:"initial-interval" yaml:"initial-interval"`
RandomizationFactor float64 `mapstructure:"randomization-factor" yaml:"randomization-factor"`
Multiplier float64 `mapstructure:"multiplier" yaml:"multiplier"`
MaxInterval time.Duration `mapstructure:"max-interval" yaml:"max-interval"`
// After MaxElapsedTime the ExponentialBackOff returns RetryStopDuration.
// It never stops if MaxElapsedTime == 0.
MaxElapsedTime time.Duration `mapstructure:"max-elapsed-time" yaml:"max-elapsed-time"`
}
RetryProfile can be used to define the backoff properties for retries.
We use it as a workaround for known issues:
- Dropping a database with proximity to accessing it. See: https://support.yugabyte.com/hc/en-us/articles/10552861830541-Unable-to-Drop-Database.
- Creating/dropping tables immediately after creating a database. See: https://github.com/yugabyte/yugabyte-db/issues/14519.
func (*RetryProfile) Execute ¶
func (p *RetryProfile) Execute(ctx context.Context, o backoff.Operation) error
Execute executes the given operation repeatedly until it succeeds or a timeout occurs. It returns nil on success, or the error returned by the final attempt on timeout.
func (*RetryProfile) ExecuteSQL ¶
func (p *RetryProfile) ExecuteSQL(ctx context.Context, executor *pgxpool.Pool, sqlStmt string, args ...any) error
ExecuteSQL executes the given SQL statement until it succeeds or a timeout occurs.
func (*RetryProfile) MakeGrpcRetryPolicyJSON ¶
func (p *RetryProfile) MakeGrpcRetryPolicyJSON() string
MakeGrpcRetryPolicyJSON defines the retry policy for a gRPC client connection. The retry policy applies to all subsequent gRPC calls made through the client connection. Our gRPC retry policy is applicable only for the following status codes:
(1) UNAVAILABLE: The service is currently unavailable (e.g., transient network issue, server down).
(2) DEADLINE_EXCEEDED: The operation took too long (deadline passed).
(3) RESOURCE_EXHAUSTED: Some resource (e.g., quota) has been exhausted; the operation cannot proceed.
func (*RetryProfile) NewBackoff ¶
func (p *RetryProfile) NewBackoff() *backoff.ExponentialBackOff
NewBackoff creates a new backoff.ExponentialBackOff instance with this profile.
type ServerConfig ¶
type ServerConfig struct {
Endpoint Endpoint `mapstructure:"endpoint"`
TLS TLSConfig `mapstructure:"tls"`
KeepAlive *ServerKeepAliveConfig `mapstructure:"keep-alive"`
// contains filtered or unexported fields
}
ServerConfig describes the connection parameters for a server.
func NewLocalHostServerWithTLS ¶ added in v0.1.7
func NewLocalHostServerWithTLS(creds TLSConfig) *ServerConfig
NewLocalHostServerWithTLS returns a default server config with endpoint "localhost:0" given server credentials.
func (*ServerConfig) GrpcServer ¶
func (c *ServerConfig) GrpcServer() (*grpc.Server, error)
GrpcServer instantiates a grpc.Server.
func (*ServerConfig) Listener ¶
Listener instantiates a net.Listener and updates the config port with the effective port. If the port is predefined, it retries binding to the port until successful or until the context ends.
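The "effective port" matters when the configured port is 0, in which case the operating system picks a free port at bind time. The stdlib-only sketch below shows that write-back step; `endpoint` and `listen` are hypothetical names, and the retry-until-bound behaviour for predefined ports is deliberately left out.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// endpoint is a local stand-in for a host/port pair.
type endpoint struct {
	Host string
	Port int
}

// listen binds the endpoint and writes the effective port back into it.
func listen(e *endpoint) (net.Listener, error) {
	l, err := net.Listen("tcp", net.JoinHostPort(e.Host, strconv.Itoa(e.Port)))
	if err != nil {
		return nil, err
	}
	// With port 0 the OS chooses a free port; read it from the listener.
	if addr, ok := l.Addr().(*net.TCPAddr); ok {
		e.Port = addr.Port
	}
	return l, nil
}

func main() {
	e := &endpoint{Host: "127.0.0.1"} // Port 0: let the OS choose
	l, err := listen(e)
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer l.Close()
	fmt.Println("bound to port", e.Port)
}
```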
func (*ServerConfig) PreAllocateListener ¶
func (c *ServerConfig) PreAllocateListener() (net.Listener, error)
PreAllocateListener is used to allocate a port and bind to it ahead of the server initialization. It stores the listener object internally to be reused on subsequent calls to Listener().
type ServerKeepAliveConfig ¶
type ServerKeepAliveConfig struct {
Params *ServerKeepAliveParamsConfig `mapstructure:"params"`
EnforcementPolicy *ServerKeepAliveEnforcementPolicyConfig `mapstructure:"enforcement-policy"`
}
ServerKeepAliveConfig describes the keep alive parameters.
type ServerKeepAliveEnforcementPolicyConfig ¶
type ServerKeepAliveEnforcementPolicyConfig struct {
MinTime time.Duration `mapstructure:"min-time"`
PermitWithoutStream bool `mapstructure:"permit-without-stream"`
}
ServerKeepAliveEnforcementPolicyConfig describes the keep alive enforcement policy.
type ServerKeepAliveParamsConfig ¶
type ServerKeepAliveParamsConfig struct {
MaxConnectionIdle time.Duration `mapstructure:"max-connection-idle"`
MaxConnectionAge time.Duration `mapstructure:"max-connection-age"`
MaxConnectionAgeGrace time.Duration `mapstructure:"max-connection-age-grace"`
Time time.Duration `mapstructure:"time"`
Timeout time.Duration `mapstructure:"timeout"`
}
ServerKeepAliveParamsConfig describes the keep alive policy.
type Service ¶
type Service interface {
// Run executes the service until the context is done.
Run(ctx context.Context) error
// WaitForReady waits for the service resources to initialize.
// If the context ended before the service is ready, returns false.
WaitForReady(ctx context.Context) bool
// RegisterService registers the supported APIs for this service.
RegisterService(server *grpc.Server)
}
Service describes the methods that are required for a service to run.
type TLSConfig ¶ added in v0.1.7
type TLSConfig struct {
Mode string `mapstructure:"mode"`
// CertPath is the path to the certificate file (public key).
CertPath string `mapstructure:"cert-path"`
// KeyPath is the path to the key file (private key).
KeyPath string `mapstructure:"key-path"`
CACertPaths []string `mapstructure:"ca-cert-paths"`
}
TLSConfig holds the TLS options and certificate paths used for secure communication between servers and clients. Credentials are built based on the configuration mode. For example, if only server-side TLS is required, the certificate pool (certPool) is not built (for a server), since the relevant certificate paths are defined in the YAML according to the selected mode.
func (TLSConfig) ClientCredentials ¶ added in v0.1.7
func (c TLSConfig) ClientCredentials() (credentials.TransportCredentials, error)
ClientCredentials returns the gRPC transport credentials to be used by a client, based on the provided TLS configuration.
func (TLSConfig) ServerCredentials ¶ added in v0.1.7
func (c TLSConfig) ServerCredentials() (credentials.TransportCredentials, error)
ServerCredentials returns the gRPC transport credentials to be used by a server, based on the provided TLS configuration.
type WithAddress ¶
type WithAddress interface {
Address() string
}
WithAddress represents any type that can generate an address.