streaming

package
v9.16.0
Published: Oct 23, 2025 License: BSD-2-Clause Imports: 7 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConnReAuthCredentialsListener

type ConnReAuthCredentialsListener struct {
	// contains filtered or unexported fields
}

ConnReAuthCredentialsListener is a credentials listener for a specific connection that triggers re-authentication when credentials change.

This listener implements the auth.CredentialsListener interface and is subscribed to a StreamingCredentialsProvider. When new credentials are received via OnNext, it marks the connection for re-authentication through the manager.

The re-authentication is always performed asynchronously to avoid blocking the credentials provider and to prevent potential deadlocks with the pool semaphore. The actual re-auth happens when the connection is returned to the pool in an idle state.

Lifecycle:

  • Created during connection initialization via Manager.Listener()
  • Subscribed to the StreamingCredentialsProvider
  • Receives credential updates via OnNext()
  • Cleaned up when connection is removed from pool via Manager.RemoveListener()

func (*ConnReAuthCredentialsListener) OnError

func (c *ConnReAuthCredentialsListener) OnError(err error)

OnError is called when an error occurs during credential streaming or re-authentication.

This method can be called from:

  • The StreamingCredentialsProvider when there's an error in the credentials stream
  • The re-auth process when connection acquisition times out
  • The re-auth process when the AUTH command fails

The error is delegated to the onErr callback provided during listener creation.

Thread-safe: Can be called from multiple goroutines (provider, re-auth worker).

func (*ConnReAuthCredentialsListener) OnNext

func (c *ConnReAuthCredentialsListener) OnNext(credentials auth.Credentials)

OnNext is called when new credentials are received from the StreamingCredentialsProvider.

This method marks the connection for asynchronous re-authentication. The actual re-authentication happens in the background when the connection is returned to the pool and is in an idle state.

Asynchronous re-auth is used to:

  • Avoid blocking the credentials provider's notification goroutine
  • Prevent deadlocks with the pool's semaphore (especially with small pool sizes)
  • Ensure re-auth happens when the connection is safe to use (not processing commands)

The reAuthFn callback receives:

  • nil if the connection was successfully acquired for re-auth
  • a non-nil error if acquisition timed out or failed

Thread-safe: Called by the credentials provider's notification goroutine.

type CredentialsListeners

type CredentialsListeners struct {
	// contains filtered or unexported fields
}

CredentialsListeners is a thread-safe collection of credentials listeners indexed by connection ID.

This collection is used by the Manager to maintain a registry of listeners for each connection in the pool. Listeners are reused when connections are reinitialized (e.g., after a handoff) to avoid creating duplicate subscriptions to the StreamingCredentialsProvider.

The collection supports concurrent access from multiple goroutines during connection initialization, credential updates, and connection removal.

func NewCredentialsListeners

func NewCredentialsListeners() *CredentialsListeners

NewCredentialsListeners creates a new thread-safe credentials listeners collection.

func (*CredentialsListeners) Add

func (c *CredentialsListeners) Add(connID uint64, listener auth.CredentialsListener)

Add adds or updates a credentials listener for a connection.

If a listener already exists for the connection ID, it is replaced. This is safe because the old listener should have been unsubscribed before the connection was reinitialized.

Thread-safe: Can be called concurrently from multiple goroutines.

func (*CredentialsListeners) Get

func (c *CredentialsListeners) Get(connID uint64) (auth.CredentialsListener, bool)

Get retrieves the credentials listener for a connection.

Returns:

  • listener: The credentials listener for the connection, or nil if not found
  • ok: true if a listener exists for the connection ID, false otherwise

Thread-safe: Can be called concurrently from multiple goroutines.

func (*CredentialsListeners) Remove

func (c *CredentialsListeners) Remove(connID uint64)

Remove removes the credentials listener for a connection.

This is called when a connection is removed from the pool to prevent memory leaks. If no listener exists for the connection ID, this is a no-op.

Thread-safe: Can be called concurrently from multiple goroutines.
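
As an illustration of the Add, Get, and Remove semantics described above, here is a minimal sketch of a mutex-guarded map keyed by connection ID. It is an assumption about one reasonable way to build such a collection, not the package's actual code; Listener, registry, newRegistry, and namedListener are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// Listener is a hypothetical stand-in for auth.CredentialsListener.
type Listener interface{ OnError(err error) }

// registry sketches a thread-safe collection indexed by connection ID.
type registry struct {
	mu        sync.RWMutex
	listeners map[uint64]Listener
}

func newRegistry() *registry {
	return &registry{listeners: make(map[uint64]Listener)}
}

// Add stores the listener, replacing any previous one for connID.
func (r *registry) Add(connID uint64, l Listener) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.listeners[connID] = l
}

// Get returns the listener and whether one was registered.
func (r *registry) Get(connID uint64) (Listener, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	l, ok := r.listeners[connID]
	return l, ok
}

// Remove deletes the entry; deleting a missing key is a no-op.
func (r *registry) Remove(connID uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.listeners, connID)
}

// namedListener is a trivial listener used only by this example.
type namedListener string

func (namedListener) OnError(error) {}

func main() {
	r := newRegistry()
	var wg sync.WaitGroup
	for id := uint64(1); id <= 4; id++ {
		wg.Add(1)
		go func(id uint64) { // concurrent registration from several goroutines
			defer wg.Done()
			r.Add(id, namedListener(fmt.Sprintf("listener-%d", id)))
		}(id)
	}
	wg.Wait()
	r.Remove(2)
	r.Remove(99) // unknown ID: nothing happens
	_, ok := r.Get(2)
	fmt.Println(ok)
}
```

A read-write mutex is used here because lookups are expected to be more frequent than registrations; a plain sync.Mutex would be equally valid for the sketch.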

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager coordinates streaming credentials and re-authentication for a connection pool.

The manager is responsible for:

  • Creating and managing per-connection credentials listeners
  • Providing the pool hook for re-authentication
  • Coordinating between credentials updates and pool operations

When credentials change via a StreamingCredentialsProvider:

  1. The credentials listener (ConnReAuthCredentialsListener) receives the update
  2. It calls MarkForReAuth on the manager
  3. The manager delegates to the pool hook
  4. The pool hook schedules background re-authentication

The manager maintains a registry of credentials listeners indexed by connection ID, allowing listener reuse when connections are reinitialized (e.g., after handoff).

func NewManager

func NewManager(pl pool.Pooler, reAuthTimeout time.Duration) *Manager

NewManager creates a new streaming credentials manager.

Parameters:

  • pl: The connection pool to manage
  • reAuthTimeout: Maximum time to wait for acquiring a connection for re-authentication

The manager creates a ReAuthPoolHook sized to match the pool size, ensuring that re-auth operations don't exhaust the connection pool.

func (*Manager) Listener

func (m *Manager) Listener(
	poolCn *pool.Conn,
	reAuth func(*pool.Conn, auth.Credentials) error,
	onErr func(*pool.Conn, error),
) (auth.CredentialsListener, error)

Listener returns or creates a credentials listener for a connection.

This method is called during connection initialization to set up the credentials listener. If a listener already exists for the connection ID (e.g., after a handoff), it is reused.

Parameters:

  • poolCn: The connection to create/get a listener for
  • reAuth: Function to re-authenticate the connection with new credentials
  • onErr: Function to call when re-authentication fails

Returns:

  • auth.CredentialsListener: The listener to subscribe to the credentials provider
  • error: Non-nil if poolCn is nil

Note: The reAuth and onErr callbacks are captured once when the listener is created and reused for the connection's lifetime. They should not change.

Thread-safe: Can be called concurrently during connection initialization.
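
The get-or-create behavior described above can be sketched as follows. This is a simplified illustration under stated assumptions: conn, credentials, connListener, manager, and newManager are hypothetical stand-ins, and the real method returns an auth.CredentialsListener rather than a concrete pointer.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// conn and credentials are hypothetical stand-ins for pool.Conn and auth.Credentials.
type conn struct{ id uint64 }
type credentials struct{ user, pass string }

// connListener captures the callbacks once, at creation time.
type connListener struct {
	cn     *conn
	reAuth func(*conn, credentials) error
	onErr  func(*conn, error)
}

// manager sketches the listener registry held by the real Manager.
type manager struct {
	mu        sync.Mutex
	listeners map[uint64]*connListener
}

func newManager() *manager {
	return &manager{listeners: make(map[uint64]*connListener)}
}

// Listener returns the existing listener for the connection ID, or creates one.
func (m *manager) Listener(
	cn *conn,
	reAuth func(*conn, credentials) error,
	onErr func(*conn, error),
) (*connListener, error) {
	if cn == nil {
		return nil, errors.New("poolCn cannot be nil")
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	if l, ok := m.listeners[cn.id]; ok {
		return l, nil // reuse: callbacks from the first call stay in effect
	}
	l := &connListener{cn: cn, reAuth: reAuth, onErr: onErr}
	m.listeners[cn.id] = l
	return l, nil
}

// RemoveListener drops the entry so a later call creates a fresh listener.
func (m *manager) RemoveListener(connID uint64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.listeners, connID)
}

func main() {
	m := newManager()
	cn := &conn{id: 7}
	reAuth := func(*conn, credentials) error { return nil }
	onErr := func(*conn, error) {}

	first, _ := m.Listener(cn, reAuth, onErr)
	second, _ := m.Listener(cn, reAuth, onErr) // e.g. after a handoff
	fmt.Println(first == second)

	_, err := m.Listener(nil, reAuth, onErr)
	fmt.Println(err != nil)
}
```

Because the second call returns the stored listener, any callbacks passed on that call are ignored in this sketch, which matches the note that they should not change over the connection's lifetime.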

func (*Manager) MarkForReAuth

func (m *Manager) MarkForReAuth(poolCn *pool.Conn, reAuthFn func(error))

MarkForReAuth marks a connection for re-authentication.

This method is called by the credentials listener when new credentials are received. It delegates to the pool hook to schedule background re-authentication.

Parameters:

  • poolCn: The connection to re-authenticate
  • reAuthFn: Function to call for re-authentication; it receives a non-nil error if acquiring the connection fails

Thread-safe: Called by credentials listeners when credentials change.

func (*Manager) PoolHook

func (m *Manager) PoolHook() pool.PoolHook

PoolHook returns the pool hook for re-authentication.

This hook should be registered with the connection pool to enable automatic re-authentication when credentials change.

func (*Manager) RemoveListener

func (m *Manager) RemoveListener(connID uint64)

RemoveListener removes the credentials listener for a connection.

This method is called by the pool hook's OnRemove to clean up listeners when connections are removed from the pool.

Parameters:

  • connID: The connection ID whose listener should be removed

Thread-safe: Called during connection removal.

type ReAuthPoolHook

type ReAuthPoolHook struct {
	// contains filtered or unexported fields
}

ReAuthPoolHook is a pool hook that manages background re-authentication of connections when credentials change via a streaming credentials provider.

The hook uses a semaphore-based worker pool to limit concurrent re-authentication operations and prevent pool exhaustion. When credentials change, connections are marked for re-authentication and processed asynchronously in the background.

The re-authentication process:

  1. OnPut: When a connection is returned to the pool, check if it needs re-auth
  2. If yes, schedule it for background processing (move from shouldReAuth to scheduledReAuth)
  3. A worker goroutine acquires the connection (waits until it's not in use)
  4. Executes the re-auth function while holding the connection
  5. Releases the connection back to the pool

The hook ensures that:

  • Only one re-auth operation runs per connection at a time
  • Connections are not used for commands during re-authentication
  • Re-auth operations time out if they can't acquire the connection
  • Resources are properly cleaned up on connection removal

func NewReAuthPoolHook

func NewReAuthPoolHook(poolSize int, reAuthTimeout time.Duration) *ReAuthPoolHook

NewReAuthPoolHook creates a new re-authentication pool hook.

Parameters:

  • poolSize: Maximum number of concurrent re-auth operations (typically matches pool size)
  • reAuthTimeout: Maximum time to wait for acquiring a connection for re-authentication

The poolSize parameter is used to initialize the worker semaphore, ensuring that re-auth operations don't exhaust the connection pool.
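
A common way to build such a worker semaphore in Go is a buffered channel whose capacity equals the limit. The sketch below assumes that approach (the source does not state how the semaphore is implemented); runBounded is a hypothetical helper that reports the peak number of tasks observed running at once.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runBounded starts jobs goroutines but lets at most limit of them do work
// at the same time. It returns the highest concurrency it observed.
func runBounded(limit, jobs int) int32 {
	sem := make(chan struct{}, limit) // capacity plays the role of poolSize
	var running, peak int32
	var wg sync.WaitGroup

	for i := 0; i < jobs; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // wait for a worker slot
			defer func() { <-sem }() // release the slot when done

			n := atomic.AddInt32(&running, 1)
			for { // record a new peak if this is the highest count so far
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			time.Sleep(time.Millisecond) // stand-in for the re-auth round trip
			atomic.AddInt32(&running, -1)
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	peak := runBounded(3, 20)
	fmt.Println(peak <= 3)
}
```

The running counter is decremented before the slot is released, so the observed peak can never exceed the channel capacity.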

func (*ReAuthPoolHook) MarkForReAuth

func (r *ReAuthPoolHook) MarkForReAuth(connID uint64, reAuthFn func(error))

MarkForReAuth marks a connection for re-authentication.

This method is called when credentials change and a connection needs to be re-authenticated. The actual re-authentication happens asynchronously when the connection is returned to the pool (in OnPut).

Parameters:

  • connID: The connection ID to mark for re-authentication
  • reAuthFn: Function to call for re-authentication; it receives a non-nil error if acquiring the connection fails

Thread-safe: Can be called concurrently from multiple goroutines.

func (*ReAuthPoolHook) OnGet

func (r *ReAuthPoolHook) OnGet(_ context.Context, conn *pool.Conn, _ bool) (accept bool, err error)

OnGet is called when a connection is retrieved from the pool.

This hook checks if the connection needs re-authentication or has a scheduled re-auth operation. If so, it rejects the connection (returns accept=false), causing the pool to try another connection.

Returns:

  • accept: false if the connection needs re-auth or has a re-auth scheduled, true otherwise
  • err: always nil (errors are not used in this hook)

Thread-safe: Called concurrently by multiple goroutines getting connections.
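
The rejection rule can be sketched with two maps, one for connections marked for re-auth and one for connections already handed to a background worker. The map names follow the shouldReAuth and scheduledReAuth names mentioned in this documentation, but the hook type, newHook, the single lock, and the simplified OnGet signature are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// hook is an illustrative sketch, not the package's ReAuthPoolHook.
type hook struct {
	mu              sync.RWMutex
	shouldReAuth    map[uint64]func(error) // marked, waiting for the next OnPut
	scheduledReAuth map[uint64]bool        // handed to a background worker
}

func newHook() *hook {
	return &hook{
		shouldReAuth:    make(map[uint64]func(error)),
		scheduledReAuth: make(map[uint64]bool),
	}
}

// OnGet rejects a connection that is in either state so the pool picks another.
// The error is always nil in this sketch, as described for the real hook.
func (h *hook) OnGet(connID uint64) (accept bool, err error) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	if _, pending := h.shouldReAuth[connID]; pending {
		return false, nil
	}
	if h.scheduledReAuth[connID] {
		return false, nil
	}
	return true, nil
}

func main() {
	h := newHook()
	accept, _ := h.OnGet(1)
	fmt.Println(accept) // nothing pending for connection 1

	h.shouldReAuth[1] = func(error) {}
	accept, _ = h.OnGet(1)
	fmt.Println(accept) // marked for re-auth, so rejected
}
```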

func (*ReAuthPoolHook) OnPut

func (r *ReAuthPoolHook) OnPut(_ context.Context, conn *pool.Conn) (bool, bool, error)

OnPut is called when a connection is returned to the pool.

This hook checks if the connection needs re-authentication. If so, it schedules a background goroutine to perform the re-auth asynchronously. The goroutine:

  1. Waits for a worker slot (semaphore)
  2. Acquires the connection (waits until not in use)
  3. Executes the re-auth function
  4. Releases the connection and worker slot

The connection is always pooled (not removed) since re-auth happens in background.

Returns:

  • shouldPool: always true (connection stays in pool during background re-auth)
  • shouldRemove: always false
  • err: always nil

Thread-safe: Called concurrently by multiple goroutines returning connections.
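
The four goroutine steps above can be sketched as below. Everything here is a hypothetical model: conn uses a capacity-1 channel to represent "in use", hook, newHook, errAcquireTimeout, and reAuthOnce are illustrative names, and the WaitGroup exists only so the example can wait for the worker. The real hook works with pool.Conn and its own acquisition mechanism.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errAcquireTimeout = errors.New("timed out acquiring connection for re-auth")

// conn is a hypothetical connection; a token in inUse means it is busy.
type conn struct {
	id    uint64
	inUse chan struct{}
}

type hook struct {
	mu              sync.Mutex
	shouldReAuth    map[uint64]func(error)
	scheduledReAuth map[uint64]bool
	workers         chan struct{} // worker semaphore
	timeout         time.Duration
	done            sync.WaitGroup // example-only: lets callers wait for workers
}

func newHook(poolSize int, timeout time.Duration) *hook {
	return &hook{
		shouldReAuth:    make(map[uint64]func(error)),
		scheduledReAuth: make(map[uint64]bool),
		workers:         make(chan struct{}, poolSize),
		timeout:         timeout,
	}
}

func (h *hook) MarkForReAuth(connID uint64, fn func(error)) {
	h.mu.Lock()
	h.shouldReAuth[connID] = fn
	h.mu.Unlock()
}

// OnPut always keeps the connection pooled; re-auth runs in the background.
func (h *hook) OnPut(c *conn) (shouldPool, shouldRemove bool, err error) {
	h.mu.Lock()
	fn, marked := h.shouldReAuth[c.id]
	if marked { // move from "marked" to "scheduled"
		delete(h.shouldReAuth, c.id)
		h.scheduledReAuth[c.id] = true
	}
	h.mu.Unlock()
	if !marked {
		return true, false, nil
	}
	h.done.Add(1)
	go func() {
		defer h.done.Done()
		h.workers <- struct{}{}        // 1. wait for a worker slot
		defer func() { <-h.workers }() // 4. release the worker slot
		select {
		case c.inUse <- struct{}{}: // 2. acquire the connection
			fn(nil) // 3. run the re-auth function
			<-c.inUse
		case <-time.After(h.timeout):
			fn(errAcquireTimeout)
		}
		h.mu.Lock()
		delete(h.scheduledReAuth, c.id)
		h.mu.Unlock()
	}()
	return true, false, nil
}

// reAuthOnce marks one connection, returns it to the pool, and reports what
// the re-auth callback received. busy simulates a connection still in use.
func reAuthOnce(busy bool) error {
	h := newHook(2, 20*time.Millisecond)
	c := &conn{id: 1, inUse: make(chan struct{}, 1)}
	if busy {
		c.inUse <- struct{}{}
	}
	var got error
	h.MarkForReAuth(c.id, func(err error) { got = err })
	h.OnPut(c)
	h.done.Wait()
	return got
}

func main() {
	fmt.Println(reAuthOnce(false))
	fmt.Println(reAuthOnce(true))
}
```

In this model an idle connection yields a nil callback argument, while a connection that stays busy past the timeout yields errAcquireTimeout, matching the two outcomes documented for reAuthFn.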

func (*ReAuthPoolHook) OnRemove

func (r *ReAuthPoolHook) OnRemove(_ context.Context, conn *pool.Conn, _ error)

OnRemove is called when a connection is removed from the pool.

This hook cleans up all state associated with the connection:

  • Removes from shouldReAuth map (pending re-auth)
  • Removes from scheduledReAuth map (active re-auth)
  • Removes credentials listener from manager

This prevents memory leaks and ensures that removed connections don't have lingering re-auth operations or listeners.

Thread-safe: Called when connections are removed due to errors, timeouts, or pool closure.
