pool

package
v9.16.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2025 License: BSD-2-Clause Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrClosed performs any operation on the closed client will return this error.
	ErrClosed = errors.New("redis: client is closed")

	// ErrPoolExhausted is returned from a pool connection method
	// when the maximum number of database connections in the pool has been reached.
	ErrPoolExhausted = errors.New("redis: connection pool exhausted")

	// ErrPoolTimeout timed out waiting to get a connection from the connection pool.
	ErrPoolTimeout = errors.New("redis: connection pool timeout")

	// ErrConnUnusableTimeout is returned when a connection is not usable and we timed out trying to mark it as unusable.
	ErrConnUnusableTimeout = errors.New("redis: timed out trying to mark connection as unusable")
)

Functions

This section is empty.

Types

type BadConnError

type BadConnError struct {
	// contains filtered or unexported fields
}

func (BadConnError) Error

func (e BadConnError) Error() string

func (BadConnError) Unwrap

func (e BadConnError) Unwrap() error

type Conn

type Conn struct {

	// Inited flag to mark connection as initialized, this is almost the same as usable
	// but it is used to make sure we don't initialize a network connection twice
	// On handoff, the network connection is replaced, but the Conn struct is reused
	// this flag will be set to false when the network connection is replaced and
	// set to true after the new network connection is initialized
	Inited atomic.Bool
	// contains filtered or unexported fields
}

func NewConn

func NewConn(netConn net.Conn) *Conn

func NewConnWithBufferSize added in v9.12.0

func NewConnWithBufferSize(netConn net.Conn, readBufSize, writeBufSize int) *Conn

func (*Conn) ClearHandoffState added in v9.15.0

func (cn *Conn) ClearHandoffState()

ClearHandoffState clears the handoff state after successful handoff (lock-free).

func (*Conn) ClearRelaxedTimeout added in v9.15.0

func (cn *Conn) ClearRelaxedTimeout()

ClearRelaxedTimeout removes relaxed timeouts, returning to normal timeout behavior. Uses atomic operations for lock-free access.

func (*Conn) Close

func (cn *Conn) Close() error

func (*Conn) CompareAndSwapUsable added in v9.16.0

func (cn *Conn) CompareAndSwapUsable(old, new bool) bool

CompareAndSwapUsable atomically compares and swaps the usable flag (lock-free).

This is used by background operations (handoff, re-auth) to acquire exclusive access to a connection. The operation sets usable to false, preventing the pool from returning the connection to clients.

Returns true if the swap was successful (old value matched), false otherwise.

func (*Conn) CompareAndSwapUsed added in v9.16.0

func (cn *Conn) CompareAndSwapUsed(old, new bool) bool

CompareAndSwapUsed atomically compares and swaps the used flag (lock-free).

This is the preferred method for acquiring a connection from the pool, as it ensures that only one goroutine marks the connection as used.

Returns true if the swap was successful (old value matched), false otherwise.

func (*Conn) ExecuteInitConn added in v9.15.0

func (cn *Conn) ExecuteInitConn(ctx context.Context) error

ExecuteInitConn runs the stored connection initialization function if available.

func (*Conn) GetHandoffEndpoint added in v9.15.0

func (cn *Conn) GetHandoffEndpoint() string

GetHandoffEndpoint returns the new endpoint for handoff (lock-free).

func (*Conn) GetHandoffInfo added in v9.15.0

func (cn *Conn) GetHandoffInfo() (bool, string, int64)

GetHandoffInfo returns all handoff information atomically (lock-free). This method prevents race conditions by returning all handoff state in a single atomic operation. Returns (shouldHandoff, endpoint, seqID).

func (*Conn) GetID added in v9.15.0

func (cn *Conn) GetID() uint64

GetID returns the unique identifier for this connection.

func (*Conn) GetMovingSeqID added in v9.15.0

func (cn *Conn) GetMovingSeqID() int64

GetMovingSeqID returns the sequence ID from the MOVING notification (lock-free).

func (*Conn) GetNetConn added in v9.15.0

func (cn *Conn) GetNetConn() net.Conn

GetNetConn safely returns the current network connection using atomic load (lock-free). This method is used by the pool for health checks and provides better performance.

func (*Conn) HandoffRetries added in v9.15.0

func (cn *Conn) HandoffRetries() int

GetHandoffRetries returns the current handoff retry count (lock-free).

func (*Conn) HasBufferedData added in v9.15.0

func (cn *Conn) HasBufferedData() bool

HasBufferedData safely checks if the connection has buffered data. This method is used to avoid data races when checking for push notifications.

func (*Conn) HasRelaxedTimeout added in v9.15.0

func (cn *Conn) HasRelaxedTimeout() bool

HasRelaxedTimeout returns true if relaxed timeouts are currently active on this connection. This checks both the timeout values and the deadline (if set). Uses atomic operations for lock-free access.

func (*Conn) IncrementAndGetHandoffRetries added in v9.15.0

func (cn *Conn) IncrementAndGetHandoffRetries(n int) int

IncrementAndGetHandoffRetries atomically increments and returns handoff retries (lock-free).

func (*Conn) IsClosed added in v9.15.0

func (cn *Conn) IsClosed() bool

func (*Conn) IsInited added in v9.15.0

func (cn *Conn) IsInited() bool

func (*Conn) IsPooled added in v9.15.0

func (cn *Conn) IsPooled() bool

IsPooled returns true if the connection is managed by a pool and will be pooled on Put.

func (*Conn) IsPubSub added in v9.15.0

func (cn *Conn) IsPubSub() bool

IsPubSub returns true if the connection is used for PubSub.

func (*Conn) IsUsable added in v9.15.0

func (cn *Conn) IsUsable() bool

IsUsable returns true if the connection is safe to use for new commands (lock-free).

A connection is "usable" when it's in a stable state and can be returned to clients. It becomes unusable during:

  • Initialization (before first use)
  • Handoff operations (network connection replacement)
  • Re-authentication (credential updates)
  • Other background operations that need exclusive access

func (*Conn) IsUsed added in v9.16.0

func (cn *Conn) IsUsed() bool

IsUsed returns true if the connection is currently in use (lock-free).

A connection is "used" when it has been retrieved from the pool and is actively processing a command. Background operations (like re-auth) should wait until the connection is not used before executing commands.

func (*Conn) MarkForHandoff added in v9.15.0

func (cn *Conn) MarkForHandoff(newEndpoint string, seqID int64) error

MarkForHandoff marks the connection for handoff due to MOVING notification (lock-free). Returns an error if the connection is already marked for handoff. This method uses atomic compare-and-swap to ensure all handoff state is updated atomically.

func (*Conn) MarkQueuedForHandoff added in v9.15.0

func (cn *Conn) MarkQueuedForHandoff() error

func (*Conn) MaybeHasData added in v9.15.0

func (cn *Conn) MaybeHasData() bool

MaybeHasData tries to peek at the next byte in the socket without consuming it This is used to check if there are push notifications available Important: This will work on Linux, but not on Windows

func (*Conn) PeekReplyTypeSafe added in v9.15.0

func (cn *Conn) PeekReplyTypeSafe() (byte, error)

PeekReplyTypeSafe safely peeks at the reply type. This method is used to avoid data races when checking for push notifications.

func (*Conn) RemoteAddr

func (cn *Conn) RemoteAddr() net.Addr

func (*Conn) SetInitConnFunc added in v9.15.0

func (cn *Conn) SetInitConnFunc(fn func(context.Context, *Conn) error)

SetInitConnFunc sets the connection initialization function to be called on reconnections.

func (*Conn) SetNetConn

func (cn *Conn) SetNetConn(netConn net.Conn)

func (*Conn) SetNetConnAndInitConn added in v9.15.0

func (cn *Conn) SetNetConnAndInitConn(ctx context.Context, netConn net.Conn) error

SetNetConnAndInitConn replaces the underlying connection and executes the initialization.

func (*Conn) SetOnClose added in v9.9.0

func (cn *Conn) SetOnClose(fn func() error)

func (*Conn) SetRelaxedTimeout added in v9.15.0

func (cn *Conn) SetRelaxedTimeout(readTimeout, writeTimeout time.Duration)

SetRelaxedTimeout sets relaxed timeouts for this connection during maintenanceNotifications upgrades. These timeouts will be used for all subsequent commands until the deadline expires. Uses atomic operations for lock-free access.

func (*Conn) SetRelaxedTimeoutWithDeadline added in v9.15.0

func (cn *Conn) SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout time.Duration, deadline time.Time)

SetRelaxedTimeoutWithDeadline sets relaxed timeouts with an expiration deadline. After the deadline, timeouts automatically revert to normal values. Uses atomic operations for lock-free access.

func (*Conn) SetUsable added in v9.15.0

func (cn *Conn) SetUsable(usable bool)

SetUsable sets the usable flag for the connection (lock-free).

This should be called to mark a connection as usable after initialization or to release it after a background operation completes.

Prefer CompareAndSwapUsable() when acquiring exclusive access to avoid race conditions.

func (*Conn) SetUsed added in v9.16.0

func (cn *Conn) SetUsed(val bool)

SetUsed sets the used flag for the connection (lock-free).

This should be called when returning a connection to the pool (set to false) or when a single-connection pool retrieves its connection (set to true).

Prefer CompareAndSwapUsed() when acquiring from a multi-connection pool to avoid race conditions.

func (*Conn) SetUsedAt

func (cn *Conn) SetUsedAt(tm time.Time)

func (*Conn) ShouldHandoff added in v9.15.0

func (cn *Conn) ShouldHandoff() bool

ShouldHandoff returns true if the connection needs to be handed off (lock-free).

func (*Conn) UsedAt

func (cn *Conn) UsedAt() time.Time

func (*Conn) WithReader

func (cn *Conn) WithReader(
	ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error,
) error

func (*Conn) WithWriter

func (cn *Conn) WithWriter(
	ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error,
) error

func (*Conn) Write

func (cn *Conn) Write(b []byte) (int, error)

type ConnPool

type ConnPool struct {
	// contains filtered or unexported fields
}

func NewConnPool

func NewConnPool(opt *Options) *ConnPool

func (*ConnPool) AddPoolHook added in v9.15.0

func (p *ConnPool) AddPoolHook(hook PoolHook)

AddPoolHook adds a pool hook to the pool.

func (*ConnPool) Close

func (p *ConnPool) Close() error

func (*ConnPool) CloseConn

func (p *ConnPool) CloseConn(cn *Conn) error

func (*ConnPool) Filter

func (p *ConnPool) Filter(fn func(*Conn) bool) error

func (*ConnPool) Get

func (p *ConnPool) Get(ctx context.Context) (*Conn, error)

Get returns existed connection from the pool or creates a new one.

func (*ConnPool) IdleLen

func (p *ConnPool) IdleLen() int

IdleLen returns number of idle connections.

func (*ConnPool) Len

func (p *ConnPool) Len() int

Len returns total number of connections.

func (*ConnPool) NewConn

func (p *ConnPool) NewConn(ctx context.Context) (*Conn, error)

NewConn creates a new connection and returns it to the user. This will still obey MaxActiveConns but will not include it in the pool and won't increase the pool size.

NOTE: If you directly get a connection from the pool, it won't be pooled and won't support maintnotifications upgrades.

func (*ConnPool) Put

func (p *ConnPool) Put(ctx context.Context, cn *Conn)

func (*ConnPool) Remove

func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error)

func (*ConnPool) RemovePoolHook added in v9.15.0

func (p *ConnPool) RemovePoolHook(hook PoolHook)

RemovePoolHook removes a pool hook from the pool.

func (*ConnPool) Size added in v9.16.0

func (p *ConnPool) Size() int

Size returns the maximum pool size (capacity).

This is used by the streaming credentials manager to size the re-auth worker pool, ensuring that re-auth operations don't exhaust the connection pool.

func (*ConnPool) Stats

func (p *ConnPool) Stats() *Stats

type HandoffState added in v9.15.0

type HandoffState struct {
	ShouldHandoff bool   // Whether connection should be handed off
	Endpoint      string // New endpoint for handoff
	SeqID         int64  // Sequence ID from MOVING notification
}

HandoffState represents the atomic state for connection handoffs This struct is stored atomically to prevent race conditions between checking handoff status and reading handoff parameters

type Options

type Options struct {
	Dialer          func(context.Context) (net.Conn, error)
	ReadBufferSize  int
	WriteBufferSize int

	PoolFIFO                 bool
	PoolSize                 int32
	DialTimeout              time.Duration
	PoolTimeout              time.Duration
	MinIdleConns             int32
	MaxIdleConns             int32
	MaxActiveConns           int32
	ConnMaxIdleTime          time.Duration
	ConnMaxLifetime          time.Duration
	PushNotificationsEnabled bool

	// DialerRetries is the maximum number of retry attempts when dialing fails.
	// Default: 5
	DialerRetries int

	// DialerRetryTimeout is the backoff duration between retry attempts.
	// Default: 100ms
	DialerRetryTimeout time.Duration
}

type PoolHook added in v9.15.0

type PoolHook interface {
	// OnGet is called when a connection is retrieved from the pool.
	// It can modify the connection or return an error to prevent its use.
	// The accept flag can be used to prevent the connection from being used.
	// On Accept = false the connection is rejected and returned to the pool.
	// The error can be used to prevent the connection from being used and returned to the pool.
	// On Errors, the connection is removed from the pool.
	// It has isNewConn flag to indicate if this is a new connection (rather than idle from the pool)
	// The flag can be used for gathering metrics on pool hit/miss ratio.
	OnGet(ctx context.Context, conn *Conn, isNewConn bool) (accept bool, err error)

	// OnPut is called when a connection is returned to the pool.
	// It returns whether the connection should be pooled and whether it should be removed.
	OnPut(ctx context.Context, conn *Conn) (shouldPool bool, shouldRemove bool, err error)

	// OnRemove is called when a connection is removed from the pool.
	// This happens when:
	// - Connection fails health check
	// - Connection exceeds max lifetime
	// - Pool is being closed
	// - Connection encounters an error
	// Implementations should clean up any per-connection state.
	// The reason parameter indicates why the connection was removed.
	OnRemove(ctx context.Context, conn *Conn, reason error)
}

PoolHook defines the interface for connection lifecycle hooks.

type PoolHookManager added in v9.15.0

type PoolHookManager struct {
	// contains filtered or unexported fields
}

PoolHookManager manages multiple pool hooks.

func NewPoolHookManager added in v9.15.0

func NewPoolHookManager() *PoolHookManager

NewPoolHookManager creates a new pool hook manager.

func (*PoolHookManager) AddHook added in v9.15.0

func (phm *PoolHookManager) AddHook(hook PoolHook)

AddHook adds a pool hook to the manager. Hooks are called in the order they were added.

func (*PoolHookManager) GetHookCount added in v9.15.0

func (phm *PoolHookManager) GetHookCount() int

GetHookCount returns the number of registered hooks (for testing).

func (*PoolHookManager) GetHooks added in v9.15.0

func (phm *PoolHookManager) GetHooks() []PoolHook

GetHooks returns a copy of all registered hooks.

func (*PoolHookManager) ProcessOnGet added in v9.15.0

func (phm *PoolHookManager) ProcessOnGet(ctx context.Context, conn *Conn, isNewConn bool) (acceptConn bool, err error)

ProcessOnGet calls all OnGet hooks in order. If any hook returns an error, processing stops and the error is returned.

func (*PoolHookManager) ProcessOnPut added in v9.15.0

func (phm *PoolHookManager) ProcessOnPut(ctx context.Context, conn *Conn) (shouldPool bool, shouldRemove bool, err error)

ProcessOnPut calls all OnPut hooks in order. The first hook that returns shouldRemove=true or shouldPool=false will stop processing.

func (*PoolHookManager) ProcessOnRemove added in v9.16.0

func (phm *PoolHookManager) ProcessOnRemove(ctx context.Context, conn *Conn, reason error)

ProcessOnRemove calls all OnRemove hooks in order.

func (*PoolHookManager) RemoveHook added in v9.15.0

func (phm *PoolHookManager) RemoveHook(hook PoolHook)

RemoveHook removes a pool hook from the manager.

type Pooler

type Pooler interface {
	NewConn(context.Context) (*Conn, error)
	CloseConn(*Conn) error

	Get(context.Context) (*Conn, error)
	Put(context.Context, *Conn)
	Remove(context.Context, *Conn, error)

	Len() int
	IdleLen() int
	Stats() *Stats

	// Size returns the maximum pool size (capacity).
	// This is used by the streaming credentials manager to size the re-auth worker pool.
	Size() int

	AddPoolHook(hook PoolHook)
	RemovePoolHook(hook PoolHook)

	Close() error
}

type PubSubPool added in v9.15.0

type PubSubPool struct {
	// contains filtered or unexported fields
}

PubSubPool manages a pool of PubSub connections.

func NewPubSubPool added in v9.15.0

func NewPubSubPool(opt *Options, netDialer func(ctx context.Context, network, addr string) (net.Conn, error)) *PubSubPool

PubSubPool implements a pool for PubSub connections. It intentionally does not implement the Pooler interface

func (*PubSubPool) Close added in v9.15.0

func (p *PubSubPool) Close() error

func (*PubSubPool) NewConn added in v9.15.0

func (p *PubSubPool) NewConn(ctx context.Context, network string, addr string, channels []string) (*Conn, error)

func (*PubSubPool) Stats added in v9.15.0

func (p *PubSubPool) Stats() *PubSubStats

func (*PubSubPool) TrackConn added in v9.15.0

func (p *PubSubPool) TrackConn(cn *Conn)

func (*PubSubPool) UntrackConn added in v9.15.0

func (p *PubSubPool) UntrackConn(cn *Conn)

type PubSubStats added in v9.15.0

type PubSubStats struct {
	Created   uint32
	Untracked uint32
	Active    uint32
}

type SingleConnPool

type SingleConnPool struct {
	// contains filtered or unexported fields
}

SingleConnPool is a pool that always returns the same connection. Note: This pool is not thread-safe. It is intended to be used by clients that need a single connection.

func NewSingleConnPool

func NewSingleConnPool(pool Pooler, cn *Conn) *SingleConnPool

NewSingleConnPool creates a new single connection pool. The pool will always return the same connection. The pool will not: - Close the connection - Reconnect the connection - Track the connection in any way

func (*SingleConnPool) AddPoolHook added in v9.15.0

func (p *SingleConnPool) AddPoolHook(_ PoolHook)

func (*SingleConnPool) Close

func (p *SingleConnPool) Close() error

func (*SingleConnPool) CloseConn

func (p *SingleConnPool) CloseConn(cn *Conn) error

func (*SingleConnPool) Get

func (p *SingleConnPool) Get(_ context.Context) (*Conn, error)

func (*SingleConnPool) IdleLen

func (p *SingleConnPool) IdleLen() int

func (*SingleConnPool) Len

func (p *SingleConnPool) Len() int

func (*SingleConnPool) NewConn

func (p *SingleConnPool) NewConn(ctx context.Context) (*Conn, error)

func (*SingleConnPool) Put

func (p *SingleConnPool) Put(_ context.Context, cn *Conn)

func (*SingleConnPool) Remove

func (p *SingleConnPool) Remove(_ context.Context, cn *Conn, reason error)

func (*SingleConnPool) RemovePoolHook added in v9.15.0

func (p *SingleConnPool) RemovePoolHook(_ PoolHook)

func (*SingleConnPool) Size added in v9.16.0

func (p *SingleConnPool) Size() int

Size returns the maximum pool size, which is always 1 for SingleConnPool.

func (*SingleConnPool) Stats

func (p *SingleConnPool) Stats() *Stats

type Stats

type Stats struct {
	Hits           uint32 // number of times free connection was found in the pool
	Misses         uint32 // number of times free connection was NOT found in the pool
	Timeouts       uint32 // number of times a wait timeout occurred
	WaitCount      uint32 // number of times a connection was waited
	Unusable       uint32 // number of times a connection was found to be unusable
	WaitDurationNs int64  // total time spent for waiting a connection in nanoseconds

	TotalConns uint32 // number of total connections in the pool
	IdleConns  uint32 // number of idle connections in the pool
	StaleConns uint32 // number of stale connections removed from the pool

	PubSubStats PubSubStats
}

Stats contains pool state information and accumulated stats.

type StickyConnPool

type StickyConnPool struct {
	// contains filtered or unexported fields
}

func NewStickyConnPool

func NewStickyConnPool(pool Pooler) *StickyConnPool

func (*StickyConnPool) AddPoolHook added in v9.15.0

func (p *StickyConnPool) AddPoolHook(hook PoolHook)

func (*StickyConnPool) Close

func (p *StickyConnPool) Close() error

func (*StickyConnPool) CloseConn

func (p *StickyConnPool) CloseConn(cn *Conn) error

func (*StickyConnPool) Get

func (p *StickyConnPool) Get(ctx context.Context) (*Conn, error)

func (*StickyConnPool) IdleLen

func (p *StickyConnPool) IdleLen() int

func (*StickyConnPool) Len

func (p *StickyConnPool) Len() int

func (*StickyConnPool) NewConn

func (p *StickyConnPool) NewConn(ctx context.Context) (*Conn, error)

func (*StickyConnPool) Put

func (p *StickyConnPool) Put(ctx context.Context, cn *Conn)

func (*StickyConnPool) Remove

func (p *StickyConnPool) Remove(ctx context.Context, cn *Conn, reason error)

func (*StickyConnPool) RemovePoolHook added in v9.15.0

func (p *StickyConnPool) RemovePoolHook(hook PoolHook)

func (*StickyConnPool) Reset

func (p *StickyConnPool) Reset(ctx context.Context) error

func (*StickyConnPool) Size added in v9.16.0

func (p *StickyConnPool) Size() int

Size returns the maximum pool size, which is always 1 for StickyConnPool.

func (*StickyConnPool) Stats

func (p *StickyConnPool) Stats() *Stats

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL