Documentation ¶
Index ¶
- type ConnReAuthCredentialsListener
- type CredentialsListeners
- type Manager
- type ReAuthPoolHook
- func (r *ReAuthPoolHook) MarkForReAuth(connID uint64, reAuthFn func(error))
- func (r *ReAuthPoolHook) OnGet(_ context.Context, conn *pool.Conn, _ bool) (accept bool, err error)
- func (r *ReAuthPoolHook) OnPut(_ context.Context, conn *pool.Conn) (bool, bool, error)
- func (r *ReAuthPoolHook) OnRemove(_ context.Context, conn *pool.Conn, _ error)
Types ¶
type ConnReAuthCredentialsListener ¶
type ConnReAuthCredentialsListener struct {
// contains filtered or unexported fields
}
ConnReAuthCredentialsListener is a credentials listener for a specific connection that triggers re-authentication when credentials change.
This listener implements the auth.CredentialsListener interface and is subscribed to a StreamingCredentialsProvider. When new credentials are received via OnNext, it marks the connection for re-authentication through the manager.
The re-authentication is always performed asynchronously to avoid blocking the credentials provider and to prevent potential deadlocks with the pool semaphore. The actual re-auth happens when the connection is returned to the pool in an idle state.
Lifecycle:
- Created during connection initialization via Manager.Listener()
- Subscribed to the StreamingCredentialsProvider
- Receives credential updates via OnNext()
- Cleaned up when connection is removed from pool via Manager.RemoveListener()
func (*ConnReAuthCredentialsListener) OnError ¶
func (c *ConnReAuthCredentialsListener) OnError(err error)
OnError is called when an error occurs during credential streaming or re-authentication.
This method can be called from:
- The StreamingCredentialsProvider when there's an error in the credentials stream
- The re-auth process when connection acquisition times out
- The re-auth process when the AUTH command fails
The error is delegated to the onErr callback provided during listener creation.
Thread-safe: Can be called from multiple goroutines (provider, re-auth worker).
func (*ConnReAuthCredentialsListener) OnNext ¶
func (c *ConnReAuthCredentialsListener) OnNext(credentials auth.Credentials)
OnNext is called when new credentials are received from the StreamingCredentialsProvider.
This method marks the connection for asynchronous re-authentication. The actual re-authentication happens in the background when the connection is returned to the pool and is in an idle state.
Asynchronous re-auth is used to:
- Avoid blocking the credentials provider's notification goroutine
- Prevent deadlocks with the pool's semaphore (especially with small pool sizes)
- Ensure re-auth happens when the connection is safe to use (not processing commands)
The reAuthFn callback receives:
- nil if the connection was successfully acquired for re-auth
- a non-nil error if acquisition timed out or failed
Thread-safe: Called by the credentials provider's notification goroutine.
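The callback contract above can be sketched as follows. This is a minimal illustration, not the package's code: `credentials`, `listener`, and the `mark` field are hypothetical stand-ins for auth.Credentials, ConnReAuthCredentialsListener, and the call into Manager.MarkForReAuth.

```go
package main

import (
	"errors"
	"fmt"
)

// credentials is a hypothetical stand-in for auth.Credentials.
type credentials struct{ user, pass string }

// listener is a hypothetical, simplified per-connection listener.
type listener struct {
	mark   func(reAuthFn func(error)) // stand-in for marking the connection
	reAuth func(credentials) error    // sends AUTH with the new credentials
	onErr  func(error)                // receives acquisition and AUTH errors
}

// OnNext does not re-authenticate directly. It hands a callback to mark,
// which is expected to invoke it later, once the connection is idle.
func (l *listener) OnNext(c credentials) {
	l.mark(func(acquireErr error) {
		if acquireErr != nil {
			l.onErr(acquireErr) // connection could not be acquired
			return
		}
		if err := l.reAuth(c); err != nil {
			l.onErr(err) // AUTH failed
		}
	})
}

var errAcquireTimeout = errors.New("acquire timeout")

// run drives one OnNext and returns the user passed to reAuth and any
// error reported to onErr.
func run(acquireErr error) (authedUser string, reported error) {
	var pending func(error)
	l := &listener{
		mark:   func(fn func(error)) { pending = fn },
		reAuth: func(c credentials) error { authedUser = c.user; return nil },
		onErr:  func(err error) { reported = err },
	}
	l.OnNext(credentials{user: "alice", pass: "s3cret"})
	pending(acquireErr) // the pool hook would do this in the background
	return authedUser, reported
}

func main() {
	fmt.Println(run(nil))
	fmt.Println(run(errAcquireTimeout))
}
```

Note that OnNext returns before any AUTH command is sent, which is what keeps the provider's notification goroutine from blocking.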
type CredentialsListeners ¶
type CredentialsListeners struct {
// contains filtered or unexported fields
}
CredentialsListeners is a thread-safe collection of credentials listeners indexed by connection ID.
This collection is used by the Manager to maintain a registry of listeners for each connection in the pool. Listeners are reused when connections are reinitialized (e.g., after a handoff) to avoid creating duplicate subscriptions to the StreamingCredentialsProvider.
The collection supports concurrent access from multiple goroutines during connection initialization, credential updates, and connection removal.
func NewCredentialsListeners ¶
func NewCredentialsListeners() *CredentialsListeners
NewCredentialsListeners creates a new thread-safe credentials listeners collection.
func (*CredentialsListeners) Add ¶
func (c *CredentialsListeners) Add(connID uint64, listener auth.CredentialsListener)
Add adds or updates a credentials listener for a connection.
If a listener already exists for the connection ID, it is replaced. This is safe because the old listener should have been unsubscribed before the connection was reinitialized.
Thread-safe: Can be called concurrently from multiple goroutines.
func (*CredentialsListeners) Get ¶
func (c *CredentialsListeners) Get(connID uint64) (auth.CredentialsListener, bool)
Get retrieves the credentials listener for a connection.
Returns:
- listener: The credentials listener for the connection, or nil if not found
- ok: true if a listener exists for the connection ID, false otherwise
Thread-safe: Can be called concurrently from multiple goroutines.
func (*CredentialsListeners) Remove ¶
func (c *CredentialsListeners) Remove(connID uint64)
Remove removes the credentials listener for a connection.
This is called when a connection is removed from the pool to prevent memory leaks. If no listener exists for the connection ID, this is a no-op.
Thread-safe: Can be called concurrently from multiple goroutines.
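A registry with these Add, Get, and Remove semantics can be sketched with a map guarded by a read-write mutex. The types below (`registry`, `credentialsListener`, `noopListener`) are hypothetical, and the choice of sync.RWMutex is an assumption; the package's internal representation is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// credentialsListener is a hypothetical stand-in for auth.CredentialsListener.
type credentialsListener interface {
	OnNext(creds string)
	OnError(err error)
}

// registry is a hypothetical collection of listeners keyed by connection ID.
type registry struct {
	mu        sync.RWMutex
	listeners map[uint64]credentialsListener
}

func newRegistry() *registry {
	return &registry{listeners: make(map[uint64]credentialsListener)}
}

// Add stores the listener, replacing any previous one for connID.
func (r *registry) Add(connID uint64, l credentialsListener) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.listeners[connID] = l
}

// Get returns the listener for connID and whether it exists.
func (r *registry) Get(connID uint64) (credentialsListener, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	l, ok := r.listeners[connID]
	return l, ok
}

// Remove deletes the listener for connID; a missing entry is a no-op.
func (r *registry) Remove(connID uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.listeners, connID)
}

// noopListener is a hypothetical listener used only for the demo.
type noopListener struct{ name string }

func (noopListener) OnNext(string) {}
func (noopListener) OnError(error) {}

func main() {
	r := newRegistry()
	var wg sync.WaitGroup
	for id := uint64(1); id <= 4; id++ {
		wg.Add(1)
		go func(id uint64) {
			defer wg.Done()
			r.Add(id, noopListener{name: fmt.Sprintf("conn-%d", id)})
		}(id)
	}
	wg.Wait()
	r.Remove(2)
	_, ok := r.Get(2)
	fmt.Println("listener for conn 2 present:", ok)
}
```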
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager coordinates streaming credentials and re-authentication for a connection pool.
The manager is responsible for:
- Creating and managing per-connection credentials listeners
- Providing the pool hook for re-authentication
- Coordinating between credentials updates and pool operations
When credentials change via a StreamingCredentialsProvider:
- The credentials listener (ConnReAuthCredentialsListener) receives the update
- It calls MarkForReAuth on the manager
- The manager delegates to the pool hook
- The pool hook schedules background re-authentication
The manager maintains a registry of credentials listeners indexed by connection ID, allowing listener reuse when connections are reinitialized (e.g., after handoff).
func NewManager ¶
NewManager creates a new streaming credentials manager.
Parameters:
- pl: The connection pool to manage
- reAuthTimeout: Maximum time to wait for acquiring a connection for re-authentication
The manager creates a ReAuthPoolHook sized to match the pool size, ensuring that re-auth operations don't exhaust the connection pool.
func (*Manager) Listener ¶
func (m *Manager) Listener(poolCn *pool.Conn, reAuth func(*pool.Conn, auth.Credentials) error, onErr func(*pool.Conn, error)) (auth.CredentialsListener, error)
Listener returns or creates a credentials listener for a connection.
This method is called during connection initialization to set up the credentials listener. If a listener already exists for the connection ID (e.g., after a handoff), it is reused.
Parameters:
- poolCn: The connection to create/get a listener for
- reAuth: Function to re-authenticate the connection with new credentials
- onErr: Function to call when re-authentication fails
Returns:
- auth.CredentialsListener: The listener to subscribe to the credentials provider
- error: Non-nil if poolCn is nil
Note: The reAuth and onErr callbacks are captured once when the listener is created and reused for the connection's lifetime. They should not change.
Thread-safe: Can be called concurrently during connection initialization.
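The get-or-create behaviour described above might look like the sketch below. Everything here is hypothetical: `conn` stands in for *pool.Conn, `label` stands in for the captured reAuth and onErr callbacks, and the error value is invented for the example.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for *pool.Conn.
type conn struct{ id uint64 }

// connListener is a hypothetical listener; label stands in for the
// callbacks captured when the listener is first created.
type connListener struct {
	cn    *conn
	label string
}

// manager is a hypothetical registry owner.
type manager struct {
	mu        sync.Mutex
	listeners map[uint64]*connListener
}

func newManager() *manager {
	return &manager{listeners: make(map[uint64]*connListener)}
}

// errNilConn is an invented error for this sketch.
var errNilConn = errors.New("poolCn cannot be nil")

// Listener returns the existing listener for the connection, or creates
// and registers one. Callbacks passed on later calls are ignored.
func (m *manager) Listener(cn *conn, label string) (*connListener, error) {
	if cn == nil {
		return nil, errNilConn
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	if l, ok := m.listeners[cn.id]; ok {
		return l, nil // reuse, e.g. after a handoff
	}
	l := &connListener{cn: cn, label: label}
	m.listeners[cn.id] = l
	return l, nil
}

func main() {
	m := newManager()
	cn := &conn{id: 7}
	first, _ := m.Listener(cn, "initial")
	second, _ := m.Listener(cn, "after handoff")
	fmt.Println("same listener reused:", first == second)
	fmt.Println("callbacks kept from:", second.label)
}
```

Reusing the listener avoids a second subscription to the provider for the same connection, at the cost of ignoring any callbacks passed on later calls.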
func (*Manager) MarkForReAuth ¶
MarkForReAuth marks a connection for re-authentication.
This method is called by the credentials listener when new credentials are received. It delegates to the pool hook to schedule background re-authentication.
Parameters:
- poolCn: The connection to re-authenticate
- reAuthFn: Function to call for re-authentication; it receives a non-nil error if connection acquisition fails
Thread-safe: Called by credentials listeners when credentials change.
func (*Manager) PoolHook ¶
PoolHook returns the pool hook for re-authentication.
This hook should be registered with the connection pool to enable automatic re-authentication when credentials change.
func (*Manager) RemoveListener ¶
RemoveListener removes the credentials listener for a connection.
This method is called by the pool hook's OnRemove to clean up listeners when connections are removed from the pool.
Parameters:
- connID: The connection ID whose listener should be removed
Thread-safe: Called during connection removal.
type ReAuthPoolHook ¶
type ReAuthPoolHook struct {
// contains filtered or unexported fields
}
ReAuthPoolHook is a pool hook that manages background re-authentication of connections when credentials change via a streaming credentials provider.
The hook uses a semaphore-based worker pool to limit concurrent re-authentication operations and prevent pool exhaustion. When credentials change, connections are marked for re-authentication and processed asynchronously in the background.
The re-authentication process:
- OnPut: When a connection is returned to the pool, check if it needs re-auth
- If yes, schedule it for background processing (move from shouldReAuth to scheduledReAuth)
- A worker goroutine acquires the connection (waits until it's not in use)
- Executes the re-auth function while holding the connection
- Releases the connection back to the pool
The hook ensures that:
- Only one re-auth operation runs per connection at a time
- Connections are not used for commands during re-authentication
- Re-auth operations time out if they can't acquire the connection
- Resources are properly cleaned up on connection removal
func NewReAuthPoolHook ¶
func NewReAuthPoolHook(poolSize int, reAuthTimeout time.Duration) *ReAuthPoolHook
NewReAuthPoolHook creates a new re-authentication pool hook.
Parameters:
- poolSize: Maximum number of concurrent re-auth operations (typically matches pool size)
- reAuthTimeout: Maximum time to wait for acquiring a connection for re-authentication
The poolSize parameter is used to initialize the worker semaphore, ensuring that re-auth operations don't exhaust the connection pool.
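One common way to build such a worker semaphore in Go is a buffered channel whose capacity is the permit count. The sketch below assumes that representation (the documentation does not say how the semaphore is implemented), and `maxConcurrent` is a hypothetical helper that measures how many jobs overlap.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// maxConcurrent runs jobs goroutines through a semaphore with slots permits
// and reports the highest number that held a permit at the same time.
func maxConcurrent(slots, jobs int) int32 {
	sem := make(chan struct{}, slots)
	var running, peak int32
	var wg sync.WaitGroup
	for i := 0; i < jobs; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire a worker slot
			defer func() { <-sem }() // release it when done
			n := atomic.AddInt32(&running, 1)
			for { // record a new peak if this is the highest count seen
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			time.Sleep(time.Millisecond) // simulated re-auth work
			atomic.AddInt32(&running, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	fmt.Println("peak concurrency:", maxConcurrent(3, 20))
}
```

With the permit count set to the pool size, the number of in-flight re-auth workers can never exceed the number of connections.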
func (*ReAuthPoolHook) MarkForReAuth ¶
func (r *ReAuthPoolHook) MarkForReAuth(connID uint64, reAuthFn func(error))
MarkForReAuth marks a connection for re-authentication.
This method is called when credentials change and a connection needs to be re-authenticated. The actual re-authentication happens asynchronously when the connection is returned to the pool (in OnPut).
Parameters:
- connID: The connection ID to mark for re-authentication
- reAuthFn: Function to call for re-authentication; it receives a non-nil error if connection acquisition fails
Thread-safe: Can be called concurrently from multiple goroutines.
func (*ReAuthPoolHook) OnGet ¶
func (r *ReAuthPoolHook) OnGet(_ context.Context, conn *pool.Conn, _ bool) (accept bool, err error)
OnGet is called when a connection is retrieved from the pool.
This hook checks if the connection needs re-authentication or has a scheduled re-auth operation. If so, it rejects the connection (returns accept=false), causing the pool to try another connection.
Returns:
- accept: false if connection needs re-auth, true otherwise
- err: always nil (errors are not used in this hook)
Thread-safe: Called concurrently by multiple goroutines getting connections.
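The accept/reject decision can be sketched with the two maps named in the type description, shouldReAuth and scheduledReAuth. The `hook` type, its field types, and the connection-ID-only method signatures below are hypothetical simplifications.

```go
package main

import (
	"fmt"
	"sync"
)

// hook is a hypothetical, reduced model of the re-auth hook's state.
type hook struct {
	mu              sync.RWMutex
	shouldReAuth    map[uint64]func(error) // marked, waiting for OnPut
	scheduledReAuth map[uint64]bool        // handed to a background worker
}

func newHook() *hook {
	return &hook{
		shouldReAuth:    make(map[uint64]func(error)),
		scheduledReAuth: make(map[uint64]bool),
	}
}

// MarkForReAuth records that the connection needs re-authentication.
func (h *hook) MarkForReAuth(connID uint64, fn func(error)) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.shouldReAuth[connID] = fn
}

// OnGet reports whether the pool may hand out the connection.
// It rejects connections that are marked or already scheduled.
func (h *hook) OnGet(connID uint64) (accept bool, err error) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	if _, pending := h.shouldReAuth[connID]; pending {
		return false, nil
	}
	if h.scheduledReAuth[connID] {
		return false, nil
	}
	return true, nil
}

func main() {
	h := newHook()
	h.MarkForReAuth(1, func(error) {})
	a1, _ := h.OnGet(1)
	a2, _ := h.OnGet(2)
	fmt.Println("conn 1 accepted:", a1)
	fmt.Println("conn 2 accepted:", a2)
}
```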
func (*ReAuthPoolHook) OnPut ¶
func (r *ReAuthPoolHook) OnPut(_ context.Context, conn *pool.Conn) (bool, bool, error)
OnPut is called when a connection is returned to the pool.
This hook checks if the connection needs re-authentication. If so, it schedules a background goroutine to perform the re-auth asynchronously. The goroutine:
- Waits for a worker slot (semaphore)
- Acquires the connection (waits until not in use)
- Executes the re-auth function
- Releases the connection and worker slot
The connection is always pooled (not removed) because re-auth happens in the background.
Returns:
- shouldPool: always true (connection stays in pool during background re-auth)
- shouldRemove: always false
- err: always nil
Thread-safe: Called concurrently by multiple goroutines returning connections.
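The goroutine steps listed above can be sketched as follows. This is an illustration under several assumptions: `conn` models "in use" as a single atomic flag, acquisition is a polling loop, and the `hook` type and error value are hypothetical. The real hook works with *pool.Conn and may acquire connections differently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// conn is a hypothetical connection whose in-use state is one flag.
type conn struct {
	id   uint64
	used atomic.Bool
}

// hook is a hypothetical, reduced model of the re-auth hook.
type hook struct {
	mu              sync.Mutex
	shouldReAuth    map[uint64]func(error)
	scheduledReAuth map[uint64]bool
	workers         chan struct{} // semaphore limiting concurrent re-auths
	timeout         time.Duration
}

var errAcquireTimeout = errors.New("timed out acquiring connection for re-auth")

func newHook(poolSize int, timeout time.Duration) *hook {
	return &hook{
		shouldReAuth:    make(map[uint64]func(error)),
		scheduledReAuth: make(map[uint64]bool),
		workers:         make(chan struct{}, poolSize),
		timeout:         timeout,
	}
}

func (h *hook) MarkForReAuth(id uint64, fn func(error)) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.shouldReAuth[id] = fn
}

// OnPut keeps the connection pooled and, if it was marked, moves it from
// shouldReAuth to scheduledReAuth and starts a background worker.
func (h *hook) OnPut(cn *conn) (shouldPool, shouldRemove bool, err error) {
	h.mu.Lock()
	fn, marked := h.shouldReAuth[cn.id]
	if marked {
		delete(h.shouldReAuth, cn.id)
		h.scheduledReAuth[cn.id] = true
	}
	h.mu.Unlock()
	if marked {
		go h.reAuth(cn, fn)
	}
	return true, false, nil
}

func (h *hook) reAuth(cn *conn, fn func(error)) {
	h.workers <- struct{}{}        // wait for a worker slot
	defer func() { <-h.workers }() // release the worker slot
	defer func() {                 // clear the scheduled state
		h.mu.Lock()
		delete(h.scheduledReAuth, cn.id)
		h.mu.Unlock()
	}()
	deadline := time.Now().Add(h.timeout)
	for !cn.used.CompareAndSwap(false, true) { // wait until not in use
		if time.Now().After(deadline) {
			fn(errAcquireTimeout)
			return
		}
		time.Sleep(time.Millisecond)
	}
	defer cn.used.Store(false) // release the connection
	fn(nil)                    // run re-auth while holding the connection
}

// putAndWait marks one connection, returns it to the hook, and waits for
// the re-auth callback. busy simulates a connection that stays in use.
func putAndWait(busy bool) error {
	h := newHook(2, 20*time.Millisecond)
	cn := &conn{id: 1}
	cn.used.Store(busy)
	done := make(chan error, 1)
	h.MarkForReAuth(cn.id, func(err error) { done <- err })
	h.OnPut(cn)
	return <-done
}

func main() {
	fmt.Println("idle connection:", putAndWait(false))
	fmt.Println("busy connection:", putAndWait(true))
}
```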
func (*ReAuthPoolHook) OnRemove ¶
func (r *ReAuthPoolHook) OnRemove(_ context.Context, conn *pool.Conn, _ error)
OnRemove is called when a connection is removed from the pool.
This hook cleans up all state associated with the connection:
- Removes from shouldReAuth map (pending re-auth)
- Removes from scheduledReAuth map (active re-auth)
- Removes credentials listener from manager
This prevents memory leaks and ensures that removed connections don't have lingering re-auth operations or listeners.
Thread-safe: Called when connections are removed due to errors, timeouts, or pool closure.
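The three cleanup steps can be sketched as below. The `hook` type is a hypothetical reduction, and `removeListener` is a stand-in for the call into Manager.RemoveListener.

```go
package main

import (
	"fmt"
	"sync"
)

// hook is a hypothetical, reduced model of the re-auth hook's state.
type hook struct {
	mu              sync.Mutex
	shouldReAuth    map[uint64]func(error)
	scheduledReAuth map[uint64]bool
	removeListener  func(connID uint64) // stand-in for Manager.RemoveListener
}

// OnRemove drops all per-connection state so nothing refers to the
// removed connection afterwards.
func (h *hook) OnRemove(connID uint64) {
	h.mu.Lock()
	delete(h.shouldReAuth, connID)
	delete(h.scheduledReAuth, connID)
	h.mu.Unlock()
	if h.removeListener != nil {
		h.removeListener(connID)
	}
}

// cleanup removes connections 1 and 3 and reports what state is left.
func cleanup() (pending, scheduled int, removed []uint64) {
	h := &hook{
		shouldReAuth:    map[uint64]func(error){1: func(error) {}, 2: func(error) {}},
		scheduledReAuth: map[uint64]bool{3: true},
	}
	h.removeListener = func(id uint64) { removed = append(removed, id) }
	h.OnRemove(1)
	h.OnRemove(3)
	return len(h.shouldReAuth), len(h.scheduledReAuth), removed
}

func main() {
	pending, scheduled, removed := cleanup()
	fmt.Println("pending:", pending, "scheduled:", scheduled, "listeners removed:", removed)
}
```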