Documentation
¶
Index ¶
- Constants
- Variables
- func AddressString[T WithAddress](addresses ...T) string
- func CloseConnections[T io.Closer](connections ...T) error
- func CloseConnectionsLog[T io.Closer](connections ...T)
- func Connect(config *DialConfig) (*grpc.ClientConn, error)
- func DefaultHealthCheckService() *health.Server
- func FilterStreamRPCError(rpcErr error) error
- func IsStreamContextEnd(rpcErr error) bool
- func IsStreamEnd(rpcErr error) bool
- func OpenConnections[T WithAddress](endpoints []T, transportCredentials credentials.TransportCredentials) ([]*grpc.ClientConn, error)
- func RunGrpcServer(ctx context.Context, serverConfig *ServerConfig, ...) error
- func StartService(ctx context.Context, service Service, serverConfigs ...*ServerConfig) error
- func Sustain(ctx context.Context, op func() error) error
- func WaitForNextBackOffDuration(ctx context.Context, b *backoff.ExponentialBackOff) error
- type ClientConfig
- type DialConfig
- type Endpoint
- type RetryProfile
- func (p *RetryProfile) Execute(ctx context.Context, o backoff.Operation) error
- func (p *RetryProfile) ExecuteSQL(ctx context.Context, executor *pgxpool.Pool, sqlStmt string, args ...any) error
- func (p *RetryProfile) MakeGrpcRetryPolicyJSON() string
- func (p *RetryProfile) NewBackoff() *backoff.ExponentialBackOff
- type ServerConfig
- type ServerCredsConfig
- type ServerKeepAliveConfig
- type ServerKeepAliveEnforcementPolicyConfig
- type ServerKeepAliveParamsConfig
- type Service
- type WithAddress
Constants ¶
const ( // Connected indicates that the connection to the service is currently established. Connected = 1.0 // Disconnected indicates that the connection to the service is currently not established. Disconnected = 0 )
Variables ¶
var ( // ErrRetryTimeout is returned if the retry attempts were exhausted due to timeout. ErrRetryTimeout = errors.New("retry timed out") // ErrNonRetryable represents an error that should not trigger a retry. // It's used to wrap an underlying error condition when an operation // fails and retrying would be useless. // The code performing retries will check for this error type // (e.g., using errors.Is) and stop the retry loop if found. ErrNonRetryable = errors.New("cannot recover from error") // ErrBackOff is returned for transient errors specifically on an existing connection, // implying the initial connection was successful. Retry the operation after a backoff delay. ErrBackOff = errors.New("backoff required before retrying") )
Functions ¶
func AddressString ¶
func AddressString[T WithAddress](addresses ...T) string
AddressString returns the addresses as a string with comma as a separator between them.
func CloseConnections ¶
CloseConnections calls [closer.Close()] for all the given connections and return the close errors.
func CloseConnectionsLog ¶
CloseConnectionsLog calls [closer.Close()] for all the given connections and log the close errors.
func Connect ¶
func Connect(config *DialConfig) (*grpc.ClientConn, error)
Connect creates a new grpc.ClientConn with the given DialConfig. It will not attempt to create a connection with the remote.
func DefaultHealthCheckService ¶ added in v0.1.5
DefaultHealthCheckService returns a health-check service that returns SERVING for all services.
func FilterStreamRPCError ¶
FilterStreamRPCError filters RPC errors that caused due to ending stream.
func IsStreamContextEnd ¶
IsStreamContextEnd returns true if an RPC error indicates stream context end.
func IsStreamEnd ¶
IsStreamEnd returns true if an RPC error indicates stream end.
func OpenConnections ¶
func OpenConnections[T WithAddress]( endpoints []T, transportCredentials credentials.TransportCredentials, ) ([]*grpc.ClientConn, error)
OpenConnections opens connections with multiple remotes.
func RunGrpcServer ¶ added in v0.1.5
func RunGrpcServer( ctx context.Context, serverConfig *ServerConfig, register func(server *grpc.Server), ) error
RunGrpcServer runs a server and returns error if failed.
func StartService ¶
func StartService( ctx context.Context, service Service, serverConfigs ...*ServerConfig, ) error
StartService runs a service, waits until it is ready, and register the gRPC server(s). It will stop if either the service ended or its respective gRPC server.
func Sustain ¶
Sustain attempts to keep an operation `op` running successfully against the connection, handling transient issues (like ErrBackOff) via retry with backoff.
It stops retrying if:
- op returns an error wrapping ErrNonRetryable (permanent failure).
- The context ctx is cancelled.
- The internal backoff strategy times out (via ErrRetryTimeout).
func WaitForNextBackOffDuration ¶
WaitForNextBackOffDuration waits for the next backoff duration. It stops if the context ends. If the backoff should stop, it returns ErrRetryTimeout.
Types ¶
type ClientConfig ¶
type ClientConfig struct { Endpoints []*Endpoint `mapstructure:"endpoints"` Retry *RetryProfile `mapstructure:"reconnect"` RootCA [][]byte `mapstructure:"root-ca"` // RootCAPaths The path to the root CAs (alternative to the raw data). RootCAPaths []string `mapstructure:"root-ca-paths"` }
ClientConfig contains the endpoints, CAs, and retry profile.
type DialConfig ¶
type DialConfig struct { Address string DialOpts []grpc.DialOption // Resolver may be updated to update the connection's endpoints. Resolver *manual.Resolver }
DialConfig represents the dial options to create a connection.
func NewDialConfigPerEndpoint ¶
func NewDialConfigPerEndpoint(config *ClientConfig) ([]*DialConfig, error)
NewDialConfigPerEndpoint creates a list of dial configs; one for each endpoint in the given config.
func NewInsecureDialConfig ¶
func NewInsecureDialConfig(endpoint WithAddress) *DialConfig
NewInsecureDialConfig creates the default dial config with insecure credentials.
func NewInsecureLoadBalancedDialConfig ¶
func NewInsecureLoadBalancedDialConfig(endpoint []*Endpoint) *DialConfig
NewInsecureLoadBalancedDialConfig creates the default dial config with insecure credentials.
func NewLoadBalancedDialConfig ¶
func NewLoadBalancedDialConfig(config *ClientConfig) (*DialConfig, error)
NewLoadBalancedDialConfig creates a dial config with load balancing between the endpoints in the given config.
func (*DialConfig) SetRetryProfile ¶
func (d *DialConfig) SetRetryProfile(profile *RetryProfile)
SetRetryProfile replaces the GRPC retry policy.
type Endpoint ¶
type Endpoint struct { Host string `mapstructure:"host" json:"host,omitempty" yaml:"host,omitempty"` Port int `mapstructure:"port" json:"port,omitempty" yaml:"port,omitempty"` }
Endpoint describes a remote endpoint.
func CreateEndpoint ¶
CreateEndpoint parses an endpoint from an address string. It panics if it fails to parse.
func CreateEndpointHP ¶
CreateEndpointHP parses an endpoint from give host and port. It panics if it fails to parse.
func NewEndpoint ¶
NewEndpoint parses an endpoint from an address string.
func NewLocalHost ¶
func NewLocalHost() *Endpoint
NewLocalHost returns a default endpoint "localhost:0".
type RetryProfile ¶
type RetryProfile struct { InitialInterval time.Duration `mapstructure:"initial-interval" yaml:"initial-interval"` RandomizationFactor float64 `mapstructure:"randomization-factor" yaml:"randomization-factor"` Multiplier float64 `mapstructure:"multiplier" yaml:"multiplier"` MaxInterval time.Duration `mapstructure:"max-interval" yaml:"max-interval"` // After MaxElapsedTime the ExponentialBackOff returns RetryStopDuration. // It never stops if MaxElapsedTime == 0. MaxElapsedTime time.Duration `mapstructure:"max-elapsed-time" yaml:"max-elapsed-time"` }
RetryProfile can be used to define the backoff properties for retries.
We use it as a workaround for a known issues:
- Dropping a database with proximity to accessing it. See: https://support.yugabyte.com/hc/en-us/articles/10552861830541-Unable-to-Drop-Database.
- Creating/dropping tables immediately after creating a database. See: https://github.com/yugabyte/yugabyte-db/issues/14519.
var DefaultGrpcRetryProfile RetryProfile
DefaultGrpcRetryProfile defines the retry policy for a gRPC client connection.
func (*RetryProfile) Execute ¶
func (p *RetryProfile) Execute(ctx context.Context, o backoff.Operation) error
Execute executes the given operation repeatedly until it succeeds or a timeout occurs. It returns nil on success, or the error returned by the final attempt on timeout.
func (*RetryProfile) ExecuteSQL ¶
func (p *RetryProfile) ExecuteSQL(ctx context.Context, executor *pgxpool.Pool, sqlStmt string, args ...any) error
ExecuteSQL executes the given SQL statement until it succeeds or a timeout occurs.
func (*RetryProfile) MakeGrpcRetryPolicyJSON ¶
func (p *RetryProfile) MakeGrpcRetryPolicyJSON() string
MakeGrpcRetryPolicyJSON defines the retry policy for a gRPC client connection. The retry policy applies to all subsequent gRPC calls made through the client connection. Our GRPC retry policy is applicable only for the following status codes:
(1) UNAVAILABLE The service is currently unavailable (e.g., transient network issue, server down). (2) DEADLINE_EXCEEDED Operation took too long (deadline passed). (3) RESOURCE_EXHAUSTED Some resource (e.g., quota) has been exhausted; the operation cannot proceed.
func (*RetryProfile) NewBackoff ¶
func (p *RetryProfile) NewBackoff() *backoff.ExponentialBackOff
NewBackoff creates a new backoff.ExponentialBackOff instance with this profile.
type ServerConfig ¶
type ServerConfig struct { Endpoint Endpoint `mapstructure:"endpoint"` Creds *ServerCredsConfig `mapstructure:"creds"` KeepAlive *ServerKeepAliveConfig `mapstructure:"keep-alive"` // contains filtered or unexported fields }
ServerConfig describes the connection parameter for a server.
func NewLocalHostServer ¶
func NewLocalHostServer() *ServerConfig
NewLocalHostServer returns a default server config with endpoint "localhost:0".
func (*ServerConfig) GrpcServer ¶
func (c *ServerConfig) GrpcServer() *grpc.Server
GrpcServer instantiate a grpc.Server.
func (*ServerConfig) Listener ¶
func (c *ServerConfig) Listener() (net.Listener, error)
Listener instantiate a net.Listener and updates the config port with the effective port.
func (*ServerConfig) PreAllocateListener ¶
func (c *ServerConfig) PreAllocateListener() (net.Listener, error)
PreAllocateListener is used to allocate a port and bind to ahead of the server initialization. It stores the listener object internally to be reused on subsequent calls to Listener().
type ServerCredsConfig ¶
type ServerCredsConfig struct { CertPath string `mapstructure:"cert-path"` KeyPath string `mapstructure:"key-path"` }
ServerCredsConfig describes the server's credentials configuration.
type ServerKeepAliveConfig ¶
type ServerKeepAliveConfig struct { Params *ServerKeepAliveParamsConfig `mapstructure:"params"` EnforcementPolicy *ServerKeepAliveEnforcementPolicyConfig `mapstructure:"enforcement-policy"` }
ServerKeepAliveConfig describes the keep alive parameters.
type ServerKeepAliveEnforcementPolicyConfig ¶
type ServerKeepAliveEnforcementPolicyConfig struct { MinTime time.Duration `mapstructure:"min-time"` PermitWithoutStream bool `mapstructure:"permit-without-stream"` }
ServerKeepAliveEnforcementPolicyConfig describes the keep alive enforcement policy.
type ServerKeepAliveParamsConfig ¶
type ServerKeepAliveParamsConfig struct { MaxConnectionIdle time.Duration `mapstructure:"max-connection-idle"` MaxConnectionAge time.Duration `mapstructure:"max-connection-age"` MaxConnectionAgeGrace time.Duration `mapstructure:"max-connection-age-grace"` Time time.Duration `mapstructure:"time"` Timeout time.Duration `mapstructure:"timeout"` }
ServerKeepAliveParamsConfig describes the keep alive policy.
type Service ¶
type Service interface { // Run executes the service until the context is done. Run(ctx context.Context) error // WaitForReady waits for the service resources to initialize. // If the context ended before the service is ready, returns false. WaitForReady(ctx context.Context) bool // RegisterService registers the supported APIs for this service. RegisterService(server *grpc.Server) }
Service describes the method that are required for a service to run.
type WithAddress ¶
type WithAddress interface {
Address() string
}
WithAddress represents any type that can generate an address.