Documentation
¶
Index ¶
- Variables
- type BadConnError
- type Conn
- func (cn *Conn) ClearHandoffState()
- func (cn *Conn) ClearRelaxedTimeout()
- func (cn *Conn) Close() error
- func (cn *Conn) CompareAndSwapUsable(old, new bool) bool
- func (cn *Conn) CompareAndSwapUsed(old, new bool) bool
- func (cn *Conn) ExecuteInitConn(ctx context.Context) error
- func (cn *Conn) GetHandoffEndpoint() string
- func (cn *Conn) GetHandoffInfo() (bool, string, int64)
- func (cn *Conn) GetID() uint64
- func (cn *Conn) GetMovingSeqID() int64
- func (cn *Conn) GetNetConn() net.Conn
- func (cn *Conn) HandoffRetries() int
- func (cn *Conn) HasBufferedData() bool
- func (cn *Conn) HasRelaxedTimeout() bool
- func (cn *Conn) IncrementAndGetHandoffRetries(n int) int
- func (cn *Conn) IsClosed() bool
- func (cn *Conn) IsInited() bool
- func (cn *Conn) IsPooled() bool
- func (cn *Conn) IsPubSub() bool
- func (cn *Conn) IsUsable() bool
- func (cn *Conn) IsUsed() bool
- func (cn *Conn) MarkForHandoff(newEndpoint string, seqID int64) error
- func (cn *Conn) MarkQueuedForHandoff() error
- func (cn *Conn) MaybeHasData() bool
- func (cn *Conn) PeekReplyTypeSafe() (byte, error)
- func (cn *Conn) RemoteAddr() net.Addr
- func (cn *Conn) SetInitConnFunc(fn func(context.Context, *Conn) error)
- func (cn *Conn) SetNetConn(netConn net.Conn)
- func (cn *Conn) SetNetConnAndInitConn(ctx context.Context, netConn net.Conn) error
- func (cn *Conn) SetOnClose(fn func() error)
- func (cn *Conn) SetRelaxedTimeout(readTimeout, writeTimeout time.Duration)
- func (cn *Conn) SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout time.Duration, deadline time.Time)
- func (cn *Conn) SetUsable(usable bool)
- func (cn *Conn) SetUsed(val bool)
- func (cn *Conn) SetUsedAt(tm time.Time)
- func (cn *Conn) ShouldHandoff() bool
- func (cn *Conn) UsedAt() time.Time
- func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error
- func (cn *Conn) WithWriter(ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error) error
- func (cn *Conn) Write(b []byte) (int, error)
- type ConnPool
- func (p *ConnPool) AddPoolHook(hook PoolHook)
- func (p *ConnPool) Close() error
- func (p *ConnPool) CloseConn(cn *Conn) error
- func (p *ConnPool) Filter(fn func(*Conn) bool) error
- func (p *ConnPool) Get(ctx context.Context) (*Conn, error)
- func (p *ConnPool) IdleLen() int
- func (p *ConnPool) Len() int
- func (p *ConnPool) NewConn(ctx context.Context) (*Conn, error)
- func (p *ConnPool) Put(ctx context.Context, cn *Conn)
- func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error)
- func (p *ConnPool) RemovePoolHook(hook PoolHook)
- func (p *ConnPool) Size() int
- func (p *ConnPool) Stats() *Stats
- type HandoffState
- type Options
- type PoolHook
- type PoolHookManager
- func (phm *PoolHookManager) AddHook(hook PoolHook)
- func (phm *PoolHookManager) GetHookCount() int
- func (phm *PoolHookManager) GetHooks() []PoolHook
- func (phm *PoolHookManager) ProcessOnGet(ctx context.Context, conn *Conn, isNewConn bool) (acceptConn bool, err error)
- func (phm *PoolHookManager) ProcessOnPut(ctx context.Context, conn *Conn) (shouldPool bool, shouldRemove bool, err error)
- func (phm *PoolHookManager) ProcessOnRemove(ctx context.Context, conn *Conn, reason error)
- func (phm *PoolHookManager) RemoveHook(hook PoolHook)
- type Pooler
- type PubSubPool
- type PubSubStats
- type SingleConnPool
- func (p *SingleConnPool) AddPoolHook(_ PoolHook)
- func (p *SingleConnPool) Close() error
- func (p *SingleConnPool) CloseConn(cn *Conn) error
- func (p *SingleConnPool) Get(_ context.Context) (*Conn, error)
- func (p *SingleConnPool) IdleLen() int
- func (p *SingleConnPool) Len() int
- func (p *SingleConnPool) NewConn(ctx context.Context) (*Conn, error)
- func (p *SingleConnPool) Put(_ context.Context, cn *Conn)
- func (p *SingleConnPool) Remove(_ context.Context, cn *Conn, reason error)
- func (p *SingleConnPool) RemovePoolHook(_ PoolHook)
- func (p *SingleConnPool) Size() int
- func (p *SingleConnPool) Stats() *Stats
- type Stats
- type StickyConnPool
- func (p *StickyConnPool) AddPoolHook(hook PoolHook)
- func (p *StickyConnPool) Close() error
- func (p *StickyConnPool) CloseConn(cn *Conn) error
- func (p *StickyConnPool) Get(ctx context.Context) (*Conn, error)
- func (p *StickyConnPool) IdleLen() int
- func (p *StickyConnPool) Len() int
- func (p *StickyConnPool) NewConn(ctx context.Context) (*Conn, error)
- func (p *StickyConnPool) Put(ctx context.Context, cn *Conn)
- func (p *StickyConnPool) Remove(ctx context.Context, cn *Conn, reason error)
- func (p *StickyConnPool) RemovePoolHook(hook PoolHook)
- func (p *StickyConnPool) Reset(ctx context.Context) error
- func (p *StickyConnPool) Size() int
- func (p *StickyConnPool) Stats() *Stats
Constants ¶
This section is empty.
Variables ¶
var ( // ErrClosed performs any operation on the closed client will return this error. ErrClosed = errors.New("redis: client is closed") // ErrPoolExhausted is returned from a pool connection method // when the maximum number of database connections in the pool has been reached. ErrPoolExhausted = errors.New("redis: connection pool exhausted") // ErrPoolTimeout timed out waiting to get a connection from the connection pool. ErrPoolTimeout = errors.New("redis: connection pool timeout") // ErrConnUnusableTimeout is returned when a connection is not usable and we timed out trying to mark it as unusable. ErrConnUnusableTimeout = errors.New("redis: timed out trying to mark connection as unusable") )
Functions ¶
This section is empty.
Types ¶
type BadConnError ¶
type BadConnError struct {
// contains filtered or unexported fields
}
func (BadConnError) Error ¶
func (e BadConnError) Error() string
func (BadConnError) Unwrap ¶
func (e BadConnError) Unwrap() error
type Conn ¶
type Conn struct {
// Inited flag to mark connection as initialized, this is almost the same as usable
// but it is used to make sure we don't initialize a network connection twice
// On handoff, the network connection is replaced, but the Conn struct is reused
// this flag will be set to false when the network connection is replaced and
// set to true after the new network connection is initialized
Inited atomic.Bool
// contains filtered or unexported fields
}
func NewConnWithBufferSize ¶ added in v9.12.0
func (*Conn) ClearHandoffState ¶ added in v9.15.0
func (cn *Conn) ClearHandoffState()
ClearHandoffState clears the handoff state after successful handoff (lock-free).
func (*Conn) ClearRelaxedTimeout ¶ added in v9.15.0
func (cn *Conn) ClearRelaxedTimeout()
ClearRelaxedTimeout removes relaxed timeouts, returning to normal timeout behavior. Uses atomic operations for lock-free access.
func (*Conn) CompareAndSwapUsable ¶ added in v9.16.0
CompareAndSwapUsable atomically compares and swaps the usable flag (lock-free).
This is used by background operations (handoff, re-auth) to acquire exclusive access to a connection. The operation sets usable to false, preventing the pool from returning the connection to clients.
Returns true if the swap was successful (old value matched), false otherwise.
func (*Conn) CompareAndSwapUsed ¶ added in v9.16.0
CompareAndSwapUsed atomically compares and swaps the used flag (lock-free).
This is the preferred method for acquiring a connection from the pool, as it ensures that only one goroutine marks the connection as used.
Returns true if the swap was successful (old value matched), false otherwise.
func (*Conn) ExecuteInitConn ¶ added in v9.15.0
ExecuteInitConn runs the stored connection initialization function if available.
func (*Conn) GetHandoffEndpoint ¶ added in v9.15.0
GetHandoffEndpoint returns the new endpoint for handoff (lock-free).
func (*Conn) GetHandoffInfo ¶ added in v9.15.0
GetHandoffInfo returns all handoff information atomically (lock-free). This method prevents race conditions by returning all handoff state in a single atomic operation. Returns (shouldHandoff, endpoint, seqID).
func (*Conn) GetMovingSeqID ¶ added in v9.15.0
GetMovingSeqID returns the sequence ID from the MOVING notification (lock-free).
func (*Conn) GetNetConn ¶ added in v9.15.0
GetNetConn safely returns the current network connection using atomic load (lock-free). This method is used by the pool for health checks and provides better performance.
func (*Conn) HandoffRetries ¶ added in v9.15.0
GetHandoffRetries returns the current handoff retry count (lock-free).
func (*Conn) HasBufferedData ¶ added in v9.15.0
HasBufferedData safely checks if the connection has buffered data. This method is used to avoid data races when checking for push notifications.
func (*Conn) HasRelaxedTimeout ¶ added in v9.15.0
HasRelaxedTimeout returns true if relaxed timeouts are currently active on this connection. This checks both the timeout values and the deadline (if set). Uses atomic operations for lock-free access.
func (*Conn) IncrementAndGetHandoffRetries ¶ added in v9.15.0
IncrementAndGetHandoffRetries atomically increments and returns handoff retries (lock-free).
func (*Conn) IsPooled ¶ added in v9.15.0
IsPooled returns true if the connection is managed by a pool and will be pooled on Put.
func (*Conn) IsPubSub ¶ added in v9.15.0
IsPubSub returns true if the connection is used for PubSub.
func (*Conn) IsUsable ¶ added in v9.15.0
IsUsable returns true if the connection is safe to use for new commands (lock-free).
A connection is "usable" when it's in a stable state and can be returned to clients. It becomes unusable during:
- Initialization (before first use)
- Handoff operations (network connection replacement)
- Re-authentication (credential updates)
- Other background operations that need exclusive access
func (*Conn) IsUsed ¶ added in v9.16.0
IsUsed returns true if the connection is currently in use (lock-free).
A connection is "used" when it has been retrieved from the pool and is actively processing a command. Background operations (like re-auth) should wait until the connection is not used before executing commands.
func (*Conn) MarkForHandoff ¶ added in v9.15.0
MarkForHandoff marks the connection for handoff due to MOVING notification (lock-free). Returns an error if the connection is already marked for handoff. This method uses atomic compare-and-swap to ensure all handoff state is updated atomically.
func (*Conn) MarkQueuedForHandoff ¶ added in v9.15.0
func (*Conn) MaybeHasData ¶ added in v9.15.0
MaybeHasData tries to peek at the next byte in the socket without consuming it This is used to check if there are push notifications available Important: This will work on Linux, but not on Windows
func (*Conn) PeekReplyTypeSafe ¶ added in v9.15.0
PeekReplyTypeSafe safely peeks at the reply type. This method is used to avoid data races when checking for push notifications.
func (*Conn) RemoteAddr ¶
func (*Conn) SetInitConnFunc ¶ added in v9.15.0
SetInitConnFunc sets the connection initialization function to be called on reconnections.
func (*Conn) SetNetConn ¶
func (*Conn) SetNetConnAndInitConn ¶ added in v9.15.0
SetNetConnAndInitConn replaces the underlying connection and executes the initialization.
func (*Conn) SetOnClose ¶ added in v9.9.0
func (*Conn) SetRelaxedTimeout ¶ added in v9.15.0
SetRelaxedTimeout sets relaxed timeouts for this connection during maintenanceNotifications upgrades. These timeouts will be used for all subsequent commands until the deadline expires. Uses atomic operations for lock-free access.
func (*Conn) SetRelaxedTimeoutWithDeadline ¶ added in v9.15.0
func (cn *Conn) SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout time.Duration, deadline time.Time)
SetRelaxedTimeoutWithDeadline sets relaxed timeouts with an expiration deadline. After the deadline, timeouts automatically revert to normal values. Uses atomic operations for lock-free access.
func (*Conn) SetUsable ¶ added in v9.15.0
SetUsable sets the usable flag for the connection (lock-free).
This should be called to mark a connection as usable after initialization or to release it after a background operation completes.
Prefer CompareAndSwapUsable() when acquiring exclusive access to avoid race conditions.
func (*Conn) SetUsed ¶ added in v9.16.0
SetUsed sets the used flag for the connection (lock-free).
This should be called when returning a connection to the pool (set to false) or when a single-connection pool retrieves its connection (set to true).
Prefer CompareAndSwapUsed() when acquiring from a multi-connection pool to avoid race conditions.
func (*Conn) ShouldHandoff ¶ added in v9.15.0
ShouldHandoff returns true if the connection needs to be handed off (lock-free).
func (*Conn) WithReader ¶
func (*Conn) WithWriter ¶
type ConnPool ¶
type ConnPool struct {
// contains filtered or unexported fields
}
func NewConnPool ¶
func (*ConnPool) AddPoolHook ¶ added in v9.15.0
AddPoolHook adds a pool hook to the pool.
func (*ConnPool) NewConn ¶
NewConn creates a new connection and returns it to the user. This will still obey MaxActiveConns but will not include it in the pool and won't increase the pool size.
NOTE: If you directly get a connection from the pool, it won't be pooled and won't support maintnotifications upgrades.
func (*ConnPool) RemovePoolHook ¶ added in v9.15.0
RemovePoolHook removes a pool hook from the pool.
type HandoffState ¶ added in v9.15.0
type HandoffState struct {
ShouldHandoff bool // Whether connection should be handed off
Endpoint string // New endpoint for handoff
SeqID int64 // Sequence ID from MOVING notification
}
HandoffState represents the atomic state for connection handoffs This struct is stored atomically to prevent race conditions between checking handoff status and reading handoff parameters
type Options ¶
type Options struct {
Dialer func(context.Context) (net.Conn, error)
ReadBufferSize int
WriteBufferSize int
PoolFIFO bool
PoolSize int32
DialTimeout time.Duration
PoolTimeout time.Duration
MinIdleConns int32
MaxIdleConns int32
MaxActiveConns int32
ConnMaxIdleTime time.Duration
ConnMaxLifetime time.Duration
PushNotificationsEnabled bool
// DialerRetries is the maximum number of retry attempts when dialing fails.
// Default: 5
DialerRetries int
// DialerRetryTimeout is the backoff duration between retry attempts.
// Default: 100ms
DialerRetryTimeout time.Duration
}
type PoolHook ¶ added in v9.15.0
type PoolHook interface {
// OnGet is called when a connection is retrieved from the pool.
// It can modify the connection or return an error to prevent its use.
// The accept flag can be used to prevent the connection from being used.
// On Accept = false the connection is rejected and returned to the pool.
// The error can be used to prevent the connection from being used and returned to the pool.
// On Errors, the connection is removed from the pool.
// It has isNewConn flag to indicate if this is a new connection (rather than idle from the pool)
// The flag can be used for gathering metrics on pool hit/miss ratio.
OnGet(ctx context.Context, conn *Conn, isNewConn bool) (accept bool, err error)
// OnPut is called when a connection is returned to the pool.
// It returns whether the connection should be pooled and whether it should be removed.
OnPut(ctx context.Context, conn *Conn) (shouldPool bool, shouldRemove bool, err error)
// OnRemove is called when a connection is removed from the pool.
// This happens when:
// - Connection fails health check
// - Connection exceeds max lifetime
// - Pool is being closed
// - Connection encounters an error
// Implementations should clean up any per-connection state.
// The reason parameter indicates why the connection was removed.
OnRemove(ctx context.Context, conn *Conn, reason error)
}
PoolHook defines the interface for connection lifecycle hooks.
type PoolHookManager ¶ added in v9.15.0
type PoolHookManager struct {
// contains filtered or unexported fields
}
PoolHookManager manages multiple pool hooks.
func NewPoolHookManager ¶ added in v9.15.0
func NewPoolHookManager() *PoolHookManager
NewPoolHookManager creates a new pool hook manager.
func (*PoolHookManager) AddHook ¶ added in v9.15.0
func (phm *PoolHookManager) AddHook(hook PoolHook)
AddHook adds a pool hook to the manager. Hooks are called in the order they were added.
func (*PoolHookManager) GetHookCount ¶ added in v9.15.0
func (phm *PoolHookManager) GetHookCount() int
GetHookCount returns the number of registered hooks (for testing).
func (*PoolHookManager) GetHooks ¶ added in v9.15.0
func (phm *PoolHookManager) GetHooks() []PoolHook
GetHooks returns a copy of all registered hooks.
func (*PoolHookManager) ProcessOnGet ¶ added in v9.15.0
func (phm *PoolHookManager) ProcessOnGet(ctx context.Context, conn *Conn, isNewConn bool) (acceptConn bool, err error)
ProcessOnGet calls all OnGet hooks in order. If any hook returns an error, processing stops and the error is returned.
func (*PoolHookManager) ProcessOnPut ¶ added in v9.15.0
func (phm *PoolHookManager) ProcessOnPut(ctx context.Context, conn *Conn) (shouldPool bool, shouldRemove bool, err error)
ProcessOnPut calls all OnPut hooks in order. The first hook that returns shouldRemove=true or shouldPool=false will stop processing.
func (*PoolHookManager) ProcessOnRemove ¶ added in v9.16.0
func (phm *PoolHookManager) ProcessOnRemove(ctx context.Context, conn *Conn, reason error)
ProcessOnRemove calls all OnRemove hooks in order.
func (*PoolHookManager) RemoveHook ¶ added in v9.15.0
func (phm *PoolHookManager) RemoveHook(hook PoolHook)
RemoveHook removes a pool hook from the manager.
type Pooler ¶
type Pooler interface {
NewConn(context.Context) (*Conn, error)
CloseConn(*Conn) error
Get(context.Context) (*Conn, error)
Put(context.Context, *Conn)
Remove(context.Context, *Conn, error)
Len() int
IdleLen() int
Stats() *Stats
// Size returns the maximum pool size (capacity).
// This is used by the streaming credentials manager to size the re-auth worker pool.
Size() int
AddPoolHook(hook PoolHook)
RemovePoolHook(hook PoolHook)
Close() error
}
type PubSubPool ¶ added in v9.15.0
type PubSubPool struct {
// contains filtered or unexported fields
}
PubSubPool manages a pool of PubSub connections.
func NewPubSubPool ¶ added in v9.15.0
func NewPubSubPool(opt *Options, netDialer func(ctx context.Context, network, addr string) (net.Conn, error)) *PubSubPool
PubSubPool implements a pool for PubSub connections. It intentionally does not implement the Pooler interface
func (*PubSubPool) Close ¶ added in v9.15.0
func (p *PubSubPool) Close() error
func (*PubSubPool) Stats ¶ added in v9.15.0
func (p *PubSubPool) Stats() *PubSubStats
func (*PubSubPool) TrackConn ¶ added in v9.15.0
func (p *PubSubPool) TrackConn(cn *Conn)
func (*PubSubPool) UntrackConn ¶ added in v9.15.0
func (p *PubSubPool) UntrackConn(cn *Conn)
type PubSubStats ¶ added in v9.15.0
type SingleConnPool ¶
type SingleConnPool struct {
// contains filtered or unexported fields
}
SingleConnPool is a pool that always returns the same connection. Note: This pool is not thread-safe. It is intended to be used by clients that need a single connection.
func NewSingleConnPool ¶
func NewSingleConnPool(pool Pooler, cn *Conn) *SingleConnPool
NewSingleConnPool creates a new single connection pool. The pool will always return the same connection. The pool will not: - Close the connection - Reconnect the connection - Track the connection in any way
func (*SingleConnPool) AddPoolHook ¶ added in v9.15.0
func (p *SingleConnPool) AddPoolHook(_ PoolHook)
func (*SingleConnPool) Close ¶
func (p *SingleConnPool) Close() error
func (*SingleConnPool) CloseConn ¶
func (p *SingleConnPool) CloseConn(cn *Conn) error
func (*SingleConnPool) IdleLen ¶
func (p *SingleConnPool) IdleLen() int
func (*SingleConnPool) Len ¶
func (p *SingleConnPool) Len() int
func (*SingleConnPool) NewConn ¶
func (p *SingleConnPool) NewConn(ctx context.Context) (*Conn, error)
func (*SingleConnPool) Remove ¶
func (p *SingleConnPool) Remove(_ context.Context, cn *Conn, reason error)
func (*SingleConnPool) RemovePoolHook ¶ added in v9.15.0
func (p *SingleConnPool) RemovePoolHook(_ PoolHook)
func (*SingleConnPool) Size ¶ added in v9.16.0
func (p *SingleConnPool) Size() int
Size returns the maximum pool size, which is always 1 for SingleConnPool.
func (*SingleConnPool) Stats ¶
func (p *SingleConnPool) Stats() *Stats
type Stats ¶
type Stats struct {
Hits uint32 // number of times free connection was found in the pool
Misses uint32 // number of times free connection was NOT found in the pool
Timeouts uint32 // number of times a wait timeout occurred
WaitCount uint32 // number of times a connection was waited
Unusable uint32 // number of times a connection was found to be unusable
WaitDurationNs int64 // total time spent for waiting a connection in nanoseconds
TotalConns uint32 // number of total connections in the pool
IdleConns uint32 // number of idle connections in the pool
StaleConns uint32 // number of stale connections removed from the pool
PubSubStats PubSubStats
}
Stats contains pool state information and accumulated stats.
type StickyConnPool ¶
type StickyConnPool struct {
// contains filtered or unexported fields
}
func NewStickyConnPool ¶
func NewStickyConnPool(pool Pooler) *StickyConnPool
func (*StickyConnPool) AddPoolHook ¶ added in v9.15.0
func (p *StickyConnPool) AddPoolHook(hook PoolHook)
func (*StickyConnPool) Close ¶
func (p *StickyConnPool) Close() error
func (*StickyConnPool) CloseConn ¶
func (p *StickyConnPool) CloseConn(cn *Conn) error
func (*StickyConnPool) IdleLen ¶
func (p *StickyConnPool) IdleLen() int
func (*StickyConnPool) Len ¶
func (p *StickyConnPool) Len() int
func (*StickyConnPool) NewConn ¶
func (p *StickyConnPool) NewConn(ctx context.Context) (*Conn, error)
func (*StickyConnPool) Remove ¶
func (p *StickyConnPool) Remove(ctx context.Context, cn *Conn, reason error)
func (*StickyConnPool) RemovePoolHook ¶ added in v9.15.0
func (p *StickyConnPool) RemovePoolHook(hook PoolHook)
func (*StickyConnPool) Size ¶ added in v9.16.0
func (p *StickyConnPool) Size() int
Size returns the maximum pool size, which is always 1 for StickyConnPool.
func (*StickyConnPool) Stats ¶
func (p *StickyConnPool) Stats() *Stats