connection

package
v0.1.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 10, 2025 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Connected indicates that the connection to the service is currently established.
	Connected = 1.0
	// Disconnected indicates that the connection to the service is currently not established.
	Disconnected = 0
)

Variables

View Source
var (
	// ErrRetryTimeout is returned if the retry attempts were exhausted due to timeout.
	ErrRetryTimeout = errors.New("retry timed out")
	// ErrNonRetryable represents an error that should not trigger a retry.
	// It's used to wrap an underlying error condition when an operation
	// fails and retrying would be useless.
	// The code performing retries will check for this error type
	// (e.g., using errors.Is) and stop the retry loop if found.
	ErrNonRetryable = errors.New("cannot recover from error")
	// ErrBackOff is returned for transient errors specifically on an existing connection,
	// implying the initial connection was successful. Retry the operation after a backoff delay.
	ErrBackOff = errors.New("backoff required before retrying")
)

Functions

func AddressString

func AddressString[T WithAddress](addresses ...T) string

AddressString returns the addresses as a string with comma as a separator between them.

func CloseConnections

func CloseConnections[T io.Closer](connections ...T) error

CloseConnections calls [closer.Close()] for all the given connections and return the close errors.

func CloseConnectionsLog

func CloseConnectionsLog[T io.Closer](connections ...T)

CloseConnectionsLog calls [closer.Close()] for all the given connections and log the close errors.

func Connect

func Connect(config *DialConfig) (*grpc.ClientConn, error)

Connect creates a new grpc.ClientConn with the given DialConfig. It will not attempt to create a connection with the remote.

func DefaultHealthCheckService added in v0.1.5

func DefaultHealthCheckService() *health.Server

DefaultHealthCheckService returns a health-check service that returns SERVING for all services.

func FilterStreamRPCError

func FilterStreamRPCError(rpcErr error) error

FilterStreamRPCError filters RPC errors that caused due to ending stream.

func IsStreamContextEnd

func IsStreamContextEnd(rpcErr error) bool

IsStreamContextEnd returns true if an RPC error indicates stream context end.

func IsStreamEnd

func IsStreamEnd(rpcErr error) bool

IsStreamEnd returns true if an RPC error indicates stream end.

func OpenConnections

func OpenConnections[T WithAddress](
	endpoints []T,
	transportCredentials credentials.TransportCredentials,
) ([]*grpc.ClientConn, error)

OpenConnections opens connections with multiple remotes.

func RunGrpcServer added in v0.1.5

func RunGrpcServer(
	ctx context.Context,
	serverConfig *ServerConfig,
	register func(server *grpc.Server),
) error

RunGrpcServer runs a server and returns error if failed.

func StartService

func StartService(
	ctx context.Context,
	service Service,
	serverConfigs ...*ServerConfig,
) error

StartService runs a service, waits until it is ready, and register the gRPC server(s). It will stop if either the service ended or its respective gRPC server.

func Sustain

func Sustain(ctx context.Context, op func() error) error

Sustain attempts to keep an operation `op` running successfully against the connection, handling transient issues (like ErrBackOff) via retry with backoff.

It stops retrying if:

  • op returns an error wrapping ErrNonRetryable (permanent failure).
  • The context ctx is cancelled.
  • The internal backoff strategy times out (via ErrRetryTimeout).

func WaitForNextBackOffDuration

func WaitForNextBackOffDuration(ctx context.Context, b *backoff.ExponentialBackOff) error

WaitForNextBackOffDuration waits for the next backoff duration. It stops if the context ends. If the backoff should stop, it returns ErrRetryTimeout.

Types

type ClientConfig

type ClientConfig struct {
	Endpoints []*Endpoint   `mapstructure:"endpoints"`
	Retry     *RetryProfile `mapstructure:"reconnect"`
	RootCA    [][]byte      `mapstructure:"root-ca"`
	// RootCAPaths The path to the root CAs (alternative to the raw data).
	RootCAPaths []string `mapstructure:"root-ca-paths"`
}

ClientConfig contains the endpoints, CAs, and retry profile.

type DialConfig

type DialConfig struct {
	Address  string
	DialOpts []grpc.DialOption
	// Resolver may be updated to update the connection's endpoints.
	Resolver *manual.Resolver
}

DialConfig represents the dial options to create a connection.

func NewDialConfigPerEndpoint

func NewDialConfigPerEndpoint(config *ClientConfig) ([]*DialConfig, error)

NewDialConfigPerEndpoint creates a list of dial configs; one for each endpoint in the given config.

func NewInsecureDialConfig

func NewInsecureDialConfig(endpoint WithAddress) *DialConfig

NewInsecureDialConfig creates the default dial config with insecure credentials.

func NewInsecureLoadBalancedDialConfig

func NewInsecureLoadBalancedDialConfig(endpoint []*Endpoint) *DialConfig

NewInsecureLoadBalancedDialConfig creates the default dial config with insecure credentials.

func NewLoadBalancedDialConfig

func NewLoadBalancedDialConfig(config *ClientConfig) (*DialConfig, error)

NewLoadBalancedDialConfig creates a dial config with load balancing between the endpoints in the given config.

func (*DialConfig) SetRetryProfile

func (d *DialConfig) SetRetryProfile(profile *RetryProfile)

SetRetryProfile replaces the GRPC retry policy.

type Endpoint

type Endpoint struct {
	Host string `mapstructure:"host" json:"host,omitempty" yaml:"host,omitempty"`
	Port int    `mapstructure:"port" json:"port,omitempty" yaml:"port,omitempty"`
}

Endpoint describes a remote endpoint.

func CreateEndpoint

func CreateEndpoint(value string) *Endpoint

CreateEndpoint parses an endpoint from an address string. It panics if it fails to parse.

func CreateEndpointHP

func CreateEndpointHP(host, port string) *Endpoint

CreateEndpointHP parses an endpoint from give host and port. It panics if it fails to parse.

func NewEndpoint

func NewEndpoint(hostPort string) (*Endpoint, error)

NewEndpoint parses an endpoint from an address string.

func NewLocalHost

func NewLocalHost() *Endpoint

NewLocalHost returns a default endpoint "localhost:0".

func (*Endpoint) Address

func (e *Endpoint) Address() string

Address returns a string representation of the endpoint's address.

func (*Endpoint) Empty

func (e *Endpoint) Empty() bool

Empty returns true if no port is assigned.

func (*Endpoint) GetHost

func (e *Endpoint) GetHost() string

GetHost returns the host of the endpoint.

func (*Endpoint) String

func (e *Endpoint) String() string

String returns a string representation of the endpoint.

type RetryProfile

type RetryProfile struct {
	InitialInterval     time.Duration `mapstructure:"initial-interval" yaml:"initial-interval"`
	RandomizationFactor float64       `mapstructure:"randomization-factor" yaml:"randomization-factor"`
	Multiplier          float64       `mapstructure:"multiplier" yaml:"multiplier"`
	MaxInterval         time.Duration `mapstructure:"max-interval" yaml:"max-interval"`
	// After MaxElapsedTime the ExponentialBackOff returns RetryStopDuration.
	// It never stops if MaxElapsedTime == 0.
	MaxElapsedTime time.Duration `mapstructure:"max-elapsed-time" yaml:"max-elapsed-time"`
}

RetryProfile can be used to define the backoff properties for retries.

We use it as a workaround for a known issues:

var DefaultGrpcRetryProfile RetryProfile

DefaultGrpcRetryProfile defines the retry policy for a gRPC client connection.

func (*RetryProfile) Execute

func (p *RetryProfile) Execute(ctx context.Context, o backoff.Operation) error

Execute executes the given operation repeatedly until it succeeds or a timeout occurs. It returns nil on success, or the error returned by the final attempt on timeout.

func (*RetryProfile) ExecuteSQL

func (p *RetryProfile) ExecuteSQL(ctx context.Context, executor *pgxpool.Pool, sqlStmt string, args ...any) error

ExecuteSQL executes the given SQL statement until it succeeds or a timeout occurs.

func (*RetryProfile) MakeGrpcRetryPolicyJSON

func (p *RetryProfile) MakeGrpcRetryPolicyJSON() string

MakeGrpcRetryPolicyJSON defines the retry policy for a gRPC client connection. The retry policy applies to all subsequent gRPC calls made through the client connection. Our GRPC retry policy is applicable only for the following status codes:

(1) UNAVAILABLE	The service is currently unavailable (e.g., transient network issue, server down).
(2) DEADLINE_EXCEEDED	Operation took too long (deadline passed).
(3) RESOURCE_EXHAUSTED	Some resource (e.g., quota) has been exhausted; the operation cannot proceed.

func (*RetryProfile) NewBackoff

func (p *RetryProfile) NewBackoff() *backoff.ExponentialBackOff

NewBackoff creates a new backoff.ExponentialBackOff instance with this profile.

type ServerConfig

type ServerConfig struct {
	Endpoint  Endpoint               `mapstructure:"endpoint"`
	Creds     *ServerCredsConfig     `mapstructure:"creds"`
	KeepAlive *ServerKeepAliveConfig `mapstructure:"keep-alive"`
	// contains filtered or unexported fields
}

ServerConfig describes the connection parameter for a server.

func NewLocalHostServer

func NewLocalHostServer() *ServerConfig

NewLocalHostServer returns a default server config with endpoint "localhost:0".

func (*ServerConfig) GrpcServer

func (c *ServerConfig) GrpcServer() *grpc.Server

GrpcServer instantiate a grpc.Server.

func (*ServerConfig) Listener

func (c *ServerConfig) Listener() (net.Listener, error)

Listener instantiate a net.Listener and updates the config port with the effective port.

func (*ServerConfig) PreAllocateListener

func (c *ServerConfig) PreAllocateListener() (net.Listener, error)

PreAllocateListener is used to allocate a port and bind to ahead of the server initialization. It stores the listener object internally to be reused on subsequent calls to Listener().

type ServerCredsConfig

type ServerCredsConfig struct {
	CertPath string `mapstructure:"cert-path"`
	KeyPath  string `mapstructure:"key-path"`
}

ServerCredsConfig describes the server's credentials configuration.

type ServerKeepAliveConfig

type ServerKeepAliveConfig struct {
	Params            *ServerKeepAliveParamsConfig            `mapstructure:"params"`
	EnforcementPolicy *ServerKeepAliveEnforcementPolicyConfig `mapstructure:"enforcement-policy"`
}

ServerKeepAliveConfig describes the keep alive parameters.

type ServerKeepAliveEnforcementPolicyConfig

type ServerKeepAliveEnforcementPolicyConfig struct {
	MinTime             time.Duration `mapstructure:"min-time"`
	PermitWithoutStream bool          `mapstructure:"permit-without-stream"`
}

ServerKeepAliveEnforcementPolicyConfig describes the keep alive enforcement policy.

type ServerKeepAliveParamsConfig

type ServerKeepAliveParamsConfig struct {
	MaxConnectionIdle     time.Duration `mapstructure:"max-connection-idle"`
	MaxConnectionAge      time.Duration `mapstructure:"max-connection-age"`
	MaxConnectionAgeGrace time.Duration `mapstructure:"max-connection-age-grace"`
	Time                  time.Duration `mapstructure:"time"`
	Timeout               time.Duration `mapstructure:"timeout"`
}

ServerKeepAliveParamsConfig describes the keep alive policy.

type Service

type Service interface {
	// Run executes the service until the context is done.
	Run(ctx context.Context) error
	// WaitForReady waits for the service resources to initialize.
	// If the context ended before the service is ready, returns false.
	WaitForReady(ctx context.Context) bool
	// RegisterService registers the supported APIs for this service.
	RegisterService(server *grpc.Server)
}

Service describes the method that are required for a service to run.

type WithAddress

type WithAddress interface {
	Address() string
}

WithAddress represents any type that can generate an address.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL