Documentation
¶
Index ¶
- Constants
- Variables
- func ExampleCircuitBreakerMonitor(poolHook *PoolHook)
- type CircuitBreaker
- type CircuitBreakerEntry
- type CircuitBreakerManager
- type CircuitBreakerState
- type CircuitBreakerStats
- type Config
- type EndpointType
- type HandoffRequest
- type LoggingHook
- type Manager
- func (hm *Manager) AddNotificationHook(notificationHook NotificationHook)
- func (hm *Manager) Close() error
- func (hm *Manager) GetActiveMovingOperations() map[MovingOperationKey]*MovingOperation
- func (hm *Manager) GetActiveOperationCount() int64
- func (hm *Manager) GetState() State
- func (hm *Manager) InitPoolHook(baseDialer func(context.Context, string, string) (net.Conn, error))
- func (hm *Manager) IsHandoffInProgress() bool
- func (hm *Manager) TrackMovingOperationWithConnID(ctx context.Context, newEndpoint string, deadline time.Time, seqID int64, ...) error
- func (hm *Manager) UntrackOperationWithConnID(seqID int64, connID uint64)
- type MetricsHook
- type Mode
- type MovingOperation
- type MovingOperationKey
- type NotificationHandler
- type NotificationHook
- type OperationsManagerInterface
- type PoolHook
- func (ph *PoolHook) GetCircuitBreakerStats() []CircuitBreakerStats
- func (ph *PoolHook) GetCurrentWorkers() int
- func (ph *PoolHook) GetHandoffQueue() chan HandoffRequest
- func (ph *PoolHook) GetMaxWorkers() int
- func (ph *PoolHook) GetPendingMap() *sync.Map
- func (ph *PoolHook) IsHandoffPending(conn *pool.Conn) bool
- func (ph *PoolHook) OnGet(ctx context.Context, conn *pool.Conn, _ bool) error
- func (ph *PoolHook) OnPut(ctx context.Context, conn *pool.Conn) (shouldPool bool, shouldRemove bool, err error)
- func (ph *PoolHook) ResetCircuitBreakers()
- func (ph *PoolHook) SetPool(pooler pool.Pooler)
- func (ph *PoolHook) Shutdown(ctx context.Context) error
- type State
Constants ¶
const ( NotificationMoving = "MOVING" NotificationMigrating = "MIGRATING" NotificationMigrated = "MIGRATED" NotificationFailingOver = "FAILING_OVER" NotificationFailedOver = "FAILED_OVER" )
Push notification type constants for maintenance
Variables ¶
var ( ErrInvalidRelaxedTimeout = errors.New(logs.InvalidRelaxedTimeoutError()) ErrInvalidHandoffTimeout = errors.New(logs.InvalidHandoffTimeoutError()) ErrInvalidHandoffWorkers = errors.New(logs.InvalidHandoffWorkersError()) ErrInvalidHandoffQueueSize = errors.New(logs.InvalidHandoffQueueSizeError()) ErrInvalidPostHandoffRelaxedDuration = errors.New(logs.InvalidPostHandoffRelaxedDurationError()) ErrInvalidEndpointType = errors.New(logs.InvalidEndpointTypeError()) ErrInvalidMaintNotifications = errors.New(logs.InvalidMaintNotificationsError()) ErrMaxHandoffRetriesReached = errors.New(logs.MaxHandoffRetriesReachedError()) // Configuration validation errors ErrInvalidHandoffRetries = errors.New(logs.InvalidHandoffRetriesError()) )
Configuration errors
var ( // ErrConnectionMarkedForHandoff is returned when a connection is marked for handoff // and should not be used until the handoff is complete ErrConnectionMarkedForHandoff = errors.New("" + logs.ConnectionMarkedForHandoffErrorMessage) // ErrConnectionInvalidHandoffState is returned when a connection is in an invalid state for handoff ErrConnectionInvalidHandoffState = errors.New("" + logs.ConnectionInvalidHandoffStateErrorMessage) )
Connection handoff errors
var ( ErrInvalidCircuitBreakerFailureThreshold = errors.New(logs.InvalidCircuitBreakerFailureThresholdError()) ErrInvalidCircuitBreakerResetTimeout = errors.New(logs.InvalidCircuitBreakerResetTimeoutError()) ErrInvalidCircuitBreakerMaxRequests = errors.New(logs.InvalidCircuitBreakerMaxRequestsError()) )
Circuit breaker configuration errors
var (
ErrCircuitBreakerOpen = errors.New("" + logs.CircuitBreakerOpenErrorMessage)
)
Circuit breaker errors
var (
ErrHandoffQueueFull = errors.New(logs.HandoffQueueFullError())
)
Handoff errors
var (
ErrInvalidClient = errors.New(logs.InvalidClientError())
)
Integration errors
var (
ErrInvalidNotification = errors.New(logs.InvalidNotificationError())
)
Notification errors
var (
ErrShutdown = errors.New(logs.ShutdownError())
)
General errors
Functions ¶
func ExampleCircuitBreakerMonitor ¶
func ExampleCircuitBreakerMonitor(poolHook *PoolHook)
ExampleCircuitBreakerMonitor demonstrates how to monitor circuit breaker status
Types ¶
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements the circuit breaker pattern for endpoint-specific failure handling
func (*CircuitBreaker) Execute ¶
func (cb *CircuitBreaker) Execute(fn func() error) error
Execute runs the given function with circuit breaker protection
func (*CircuitBreaker) GetState ¶
func (cb *CircuitBreaker) GetState() CircuitBreakerState
GetState returns the current state of the circuit breaker
func (*CircuitBreaker) GetStats ¶
func (cb *CircuitBreaker) GetStats() CircuitBreakerStats
GetStats returns current statistics for monitoring
func (*CircuitBreaker) IsOpen ¶
func (cb *CircuitBreaker) IsOpen() bool
IsOpen returns true if the circuit breaker is open (rejecting requests)
type CircuitBreakerEntry ¶
type CircuitBreakerEntry struct {
// contains filtered or unexported fields
}
CircuitBreakerEntry wraps a circuit breaker with access tracking
type CircuitBreakerManager ¶
type CircuitBreakerManager struct {
// contains filtered or unexported fields
}
CircuitBreakerManager manages circuit breakers for multiple endpoints
func (*CircuitBreakerManager) GetAllStats ¶
func (cbm *CircuitBreakerManager) GetAllStats() []CircuitBreakerStats
GetAllStats returns statistics for all circuit breakers
func (*CircuitBreakerManager) GetCircuitBreaker ¶
func (cbm *CircuitBreakerManager) GetCircuitBreaker(endpoint string) *CircuitBreaker
GetCircuitBreaker returns the circuit breaker for an endpoint, creating it if necessary
func (*CircuitBreakerManager) Reset ¶
func (cbm *CircuitBreakerManager) Reset()
Reset resets all circuit breakers (useful for testing)
func (*CircuitBreakerManager) Shutdown ¶
func (cbm *CircuitBreakerManager) Shutdown()
Shutdown stops the cleanup goroutine
type CircuitBreakerState ¶
type CircuitBreakerState int32
CircuitBreakerState represents the state of a circuit breaker
const ( // CircuitBreakerClosed - normal operation, requests allowed CircuitBreakerClosed CircuitBreakerState = iota // CircuitBreakerOpen - failing fast, requests rejected CircuitBreakerOpen // CircuitBreakerHalfOpen - testing if service recovered CircuitBreakerHalfOpen )
func (CircuitBreakerState) String ¶
func (s CircuitBreakerState) String() string
type CircuitBreakerStats ¶
type CircuitBreakerStats struct { Endpoint string State CircuitBreakerState Failures int64 Successes int64 Requests int64 LastFailureTime time.Time LastSuccessTime time.Time }
CircuitBreakerStats provides statistics about a circuit breaker
type Config ¶
type Config struct { // Mode controls how client maintenance notifications are handled. // Valid values: ModeDisabled, ModeEnabled, ModeAuto // Default: ModeAuto Mode Mode // EndpointType specifies the type of endpoint to request in MOVING notifications. // Valid values: EndpointTypeAuto, EndpointTypeInternalIP, EndpointTypeInternalFQDN, // EndpointTypeExternalIP, EndpointTypeExternalFQDN, EndpointTypeNone // Default: EndpointTypeAuto EndpointType EndpointType // RelaxedTimeout is the concrete timeout value to use during // MIGRATING/FAILING_OVER states to accommodate increased latency. // This applies to both read and write timeouts. // Default: 10 seconds RelaxedTimeout time.Duration // HandoffTimeout is the maximum time to wait for connection handoff to complete. // If handoff takes longer than this, the old connection will be forcibly closed. // Default: 15 seconds (matches server-side eviction timeout) HandoffTimeout time.Duration // MaxWorkers is the maximum number of worker goroutines for processing handoff requests. // Workers are created on-demand and automatically cleaned up when idle. // If zero, defaults to min(10, PoolSize/2) to handle bursts effectively. // If explicitly set, enforces minimum of PoolSize/2 // // Default: min(PoolSize/2, max(10, PoolSize/3)), Minimum when set: PoolSize/2 MaxWorkers int // HandoffQueueSize is the size of the buffered channel used to queue handoff requests. // If the queue is full, new handoff requests will be rejected. // Scales with both worker count and pool size for better burst handling. // // Default: max(20×MaxWorkers, PoolSize), capped by MaxActiveConns+1 (if set) or 5×PoolSize // When set: minimum 200, capped by MaxActiveConns+1 (if set) or 5×PoolSize HandoffQueueSize int // PostHandoffRelaxedDuration is how long to keep relaxed timeouts on the new connection // after a handoff completes. This provides additional resilience during cluster transitions. 
// Default: 2 * RelaxedTimeout PostHandoffRelaxedDuration time.Duration // Circuit breaker configuration for endpoint failure handling // CircuitBreakerFailureThreshold is the number of failures before opening the circuit. // Default: 5 CircuitBreakerFailureThreshold int // CircuitBreakerResetTimeout is how long to wait before testing if the endpoint recovered. // Default: 60 seconds CircuitBreakerResetTimeout time.Duration // CircuitBreakerMaxRequests is the maximum number of requests allowed in half-open state. // Default: 3 CircuitBreakerMaxRequests int // MaxHandoffRetries is the maximum number of times to retry a failed handoff. // After this many retries, the connection will be removed from the pool. // Default: 3 MaxHandoffRetries int }
Config provides configuration options for maintenance notifications
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a Config with sensible defaults.
func (*Config) ApplyDefaults ¶
ApplyDefaults applies default values to any zero-value fields in the configuration. This ensures that partially configured structs get sensible defaults for missing fields.
func (*Config) ApplyDefaultsWithPoolConfig ¶
ApplyDefaultsWithPoolConfig applies default values to any zero-value fields in the configuration, using the provided pool size and max active connections to calculate worker and queue defaults. This ensures that partially configured structs get sensible defaults for missing fields.
func (*Config) ApplyDefaultsWithPoolSize ¶
ApplyDefaultsWithPoolSize applies default values to any zero-value fields in the configuration, using the provided pool size to calculate worker defaults. This ensures that partially configured structs get sensible defaults for missing fields.
type EndpointType ¶
type EndpointType string
EndpointType represents the type of endpoint to request in MOVING notifications
const ( EndpointTypeAuto EndpointType = "auto" // Auto-detect based on connection EndpointTypeInternalIP EndpointType = "internal-ip" // Internal IP address EndpointTypeInternalFQDN EndpointType = "internal-fqdn" // Internal FQDN EndpointTypeExternalIP EndpointType = "external-ip" // External IP address EndpointTypeExternalFQDN EndpointType = "external-fqdn" // External FQDN EndpointTypeNone EndpointType = "none" // No endpoint (reconnect with current config) )
Constants for endpoint types
func DetectEndpointType ¶
func DetectEndpointType(addr string, tlsEnabled bool) EndpointType
DetectEndpointType automatically detects the appropriate endpoint type based on the connection address and TLS configuration.
For IP addresses:
- If TLS is enabled: requests FQDN for proper certificate validation
- If TLS is disabled: requests IP for better performance
For hostnames:
- If TLS is enabled: always requests FQDN for proper certificate validation
- If TLS is disabled: requests IP for better performance
Internal vs External detection:
- For IPs: uses private IP range detection
- For hostnames: uses heuristics based on common internal naming patterns
func (EndpointType) IsValid ¶
func (e EndpointType) IsValid() bool
IsValid returns true if the endpoint type is valid
func (EndpointType) String ¶
func (e EndpointType) String() string
String returns the string representation of the endpoint type
type HandoffRequest ¶
type HandoffRequest struct { Conn *pool.Conn ConnID uint64 // Unique connection identifier Endpoint string SeqID int64 Pool pool.Pooler // Pool to remove connection from on failure }
HandoffRequest represents a request to handoff a connection to a new endpoint
type LoggingHook ¶
type LoggingHook struct {
LogLevel int // 0=Error, 1=Warn, 2=Info, 3=Debug
}
LoggingHook is an example hook implementation that logs all notifications.
func NewLoggingHook ¶
func NewLoggingHook(logLevel int) *LoggingHook
NewLoggingHook creates a new logging hook with the specified log level. Log levels: 0=Error, 1=Warn, 2=Info, 3=Debug
func (*LoggingHook) PostHook ¶
func (lh *LoggingHook) PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error)
PostHook logs the result after processing.
func (*LoggingHook) PreHook ¶
func (lh *LoggingHook) PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool)
PreHook logs the notification before processing and allows modification.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager provides simplified upgrade functionality with hooks and atomic state.
func NewManager ¶
func NewManager(client interfaces.ClientInterface, pool pool.Pooler, config *Config) (*Manager, error)
NewManager creates a new simplified manager.
func (*Manager) AddNotificationHook ¶
func (hm *Manager) AddNotificationHook(notificationHook NotificationHook)
func (*Manager) GetActiveMovingOperations ¶
func (hm *Manager) GetActiveMovingOperations() map[MovingOperationKey]*MovingOperation
GetActiveMovingOperations returns active operations with composite keys. WARNING: This method creates a new map and copies all operations on every call. Use sparingly, especially in hot paths or high-frequency logging.
func (*Manager) GetActiveOperationCount ¶
GetActiveOperationCount returns the number of active operations. Uses atomic counter for lock-free operation.
func (*Manager) GetState ¶
GetState returns current state using atomic counter for lock-free operation.
func (*Manager) InitPoolHook ¶
InitPoolHook creates a pool hook with a custom dialer.
func (*Manager) IsHandoffInProgress ¶
IsHandoffInProgress returns true if any handoff is in progress. Uses atomic counter for lock-free operation.
func (*Manager) TrackMovingOperationWithConnID ¶
func (hm *Manager) TrackMovingOperationWithConnID(ctx context.Context, newEndpoint string, deadline time.Time, seqID int64, connID uint64) error
TrackMovingOperationWithConnID starts a new MOVING operation with a specific connection ID.
func (*Manager) UntrackOperationWithConnID ¶
UntrackOperationWithConnID completes a MOVING operation with a specific connection ID.
type MetricsHook ¶
type MetricsHook struct { NotificationCounts map[string]int64 ProcessingTimes map[string]time.Duration ErrorCounts map[string]int64 HandoffCounts int64 // Total handoffs initiated HandoffSuccesses int64 // Successful handoffs HandoffFailures int64 // Failed handoffs }
MetricsHook collects metrics about notification processing.
func NewMetricsHook ¶
func NewMetricsHook() *MetricsHook
NewMetricsHook creates a new metrics collection hook.
func (*MetricsHook) GetMetrics ¶
func (mh *MetricsHook) GetMetrics() map[string]interface{}
GetMetrics returns a summary of collected metrics.
func (*MetricsHook) PostHook ¶
func (mh *MetricsHook) PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error)
PostHook records processing completion and any errors.
func (*MetricsHook) PreHook ¶
func (mh *MetricsHook) PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool)
PreHook records the start time for processing metrics.
type Mode ¶
type Mode string
Mode represents the maintenance notifications mode
const ( ModeDisabled Mode = "disabled" // Client doesn't send CLIENT MAINT_NOTIFICATIONS ON command ModeEnabled Mode = "enabled" // Client forcefully sends command, interrupts connection on error ModeAuto Mode = "auto" // Client tries to send command, disables feature on error )
Constants for maintenance push notification modes
type MovingOperation ¶
type MovingOperation struct { SeqID int64 NewEndpoint string StartTime time.Time Deadline time.Time }
MovingOperation tracks an active MOVING operation.
type MovingOperationKey ¶
type MovingOperationKey struct { SeqID int64 // Sequence ID from MOVING notification ConnID uint64 // Unique connection identifier }
MovingOperationKey provides a unique key for tracking MOVING operations that combines sequence ID with connection identifier to handle duplicate sequence IDs across multiple connections to the same node.
func (MovingOperationKey) String ¶
func (k MovingOperationKey) String() string
String returns a string representation of the key for debugging
type NotificationHandler ¶
type NotificationHandler struct {
// contains filtered or unexported fields
}
NotificationHandler handles push notifications for the simplified manager.
func (*NotificationHandler) HandlePushNotification ¶
func (snh *NotificationHandler) HandlePushNotification(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error
HandlePushNotification processes push notifications with hook support.
type NotificationHook ¶
type NotificationHook interface { PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool) PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error) }
NotificationHook is called before and after notification processing. PreHook can modify the notification and return false to skip processing. PostHook is called after successful processing.
type OperationsManagerInterface ¶
type OperationsManagerInterface interface { TrackMovingOperationWithConnID(ctx context.Context, newEndpoint string, deadline time.Time, seqID int64, connID uint64) error UntrackOperationWithConnID(seqID int64, connID uint64) }
OperationsManagerInterface defines the interface for completing handoff operations
type PoolHook ¶
type PoolHook struct {
// contains filtered or unexported fields
}
PoolHook implements pool.PoolHook for Redis-specific connection handling with maintenance notifications support.
func NewPoolHook ¶
func NewPoolHook(baseDialer func(context.Context, string, string) (net.Conn, error), network string, config *Config, operationsManager OperationsManagerInterface) *PoolHook
NewPoolHook creates a new pool hook
func NewPoolHookWithPoolSize ¶
func NewPoolHookWithPoolSize(baseDialer func(context.Context, string, string) (net.Conn, error), network string, config *Config, operationsManager OperationsManagerInterface, poolSize int) *PoolHook
NewPoolHookWithPoolSize creates a new pool hook with pool size for better worker defaults
func (*PoolHook) GetCircuitBreakerStats ¶
func (ph *PoolHook) GetCircuitBreakerStats() []CircuitBreakerStats
GetCircuitBreakerStats returns circuit breaker statistics for monitoring
func (*PoolHook) GetCurrentWorkers ¶
GetCurrentWorkers returns the current number of active workers (for testing)
func (*PoolHook) GetHandoffQueue ¶
func (ph *PoolHook) GetHandoffQueue() chan HandoffRequest
GetHandoffQueue returns the handoff queue for testing purposes
func (*PoolHook) GetMaxWorkers ¶
GetMaxWorkers returns the max workers for testing purposes
func (*PoolHook) GetPendingMap ¶
GetPendingMap returns the pending map for testing purposes
func (*PoolHook) IsHandoffPending ¶
IsHandoffPending returns true if the given connection has a pending handoff
func (*PoolHook) OnPut ¶
func (ph *PoolHook) OnPut(ctx context.Context, conn *pool.Conn) (shouldPool bool, shouldRemove bool, err error)
OnPut is called when a connection is returned to the pool
func (*PoolHook) ResetCircuitBreakers ¶
func (ph *PoolHook) ResetCircuitBreakers()
ResetCircuitBreakers resets all circuit breakers (useful for testing)