maintnotifications

Version: v9.16.0-beta.1

Note: This package is not in the latest version of its module.

Published: Sep 27, 2025 | License: BSD-2-Clause | Imports: 16 | Imported by: 0

README

Maintenance Notifications

Seamless Redis connection handoffs during cluster maintenance operations without dropping connections.

⚠️ Important Note

Maintenance notifications are currently supported only in standalone Redis clients. Cluster clients (ClusterClient, FailoverClient, etc.) do not yet support this functionality.

Quick Start

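import (
    "github.com/redis/go-redis/v9"
    "github.com/redis/go-redis/v9/maintnotifications"
)

client := redis.NewClient(&redis.Options{
    Addr:     "localhost:6379",
    Protocol: 3, // RESP3 is required for push notifications
    MaintNotificationsConfig: &maintnotifications.Config{
        Mode: maintnotifications.ModeEnabled,
    },
})
defer client.Close()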

Modes

  • ModeDisabled - Maintenance notifications disabled
  • ModeEnabled - Forcefully enabled (the connection fails if the server doesn't support it)
  • ModeAuto - Auto-detect server support; the feature is disabled if unsupported (default)

Configuration

cfg := &maintnotifications.Config{
    // The values shown are the defaults; 0 means "calculate automatically".
    Mode:                       maintnotifications.ModeAuto,
    EndpointType:               maintnotifications.EndpointTypeAuto,
    RelaxedTimeout:             10 * time.Second,
    HandoffTimeout:             15 * time.Second,
    MaxHandoffRetries:          3,
    MaxWorkers:                 0, // auto-calculated from PoolSize
    HandoffQueueSize:           0, // auto-calculated from MaxWorkers and PoolSize
    PostHandoffRelaxedDuration: 0, // defaults to 2 * RelaxedTimeout
}

Endpoint Types
  • EndpointTypeAuto - Auto-detect based on connection (default)
  • EndpointTypeInternalIP - Internal IP address
  • EndpointTypeInternalFQDN - Internal FQDN
  • EndpointTypeExternalIP - External IP address
  • EndpointTypeExternalFQDN - External FQDN
  • EndpointTypeNone - No endpoint (reconnect with current config)

Auto-Scaling

  • Workers (when auto-calculated): min(PoolSize/2, max(10, PoolSize/3))
  • Queue: max(20×Workers, PoolSize), capped at MaxActiveConns+1 if MaxActiveConns is set, otherwise at 5×PoolSize

Examples:

  • PoolSize 100: 33 workers, queue 500 (20×33 = 660, capped at 5×100 = 500)
  • PoolSize 100, MaxActiveConns 150: 33 workers, queue 151 (capped at MaxActiveConns+1)
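The two examples can be checked with a few lines of Go. This is a minimal sketch that re-applies the formulas above, not the package's internal code; the helper names autoWorkers and autoQueueSize are hypothetical, and it assumes MaxActiveConns counts as "set" when it is greater than zero.

```go
package main

import "fmt"

// minInt and maxInt avoid depending on the Go 1.21 min/max builtins.
func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func maxInt(a, b int) int {
	if a > b {
		return a
	}
	return b
}

// autoWorkers applies the documented default: min(PoolSize/2, max(10, PoolSize/3)).
func autoWorkers(poolSize int) int {
	return minInt(poolSize/2, maxInt(10, poolSize/3))
}

// autoQueueSize applies max(20×workers, PoolSize), capped at MaxActiveConns+1
// when maxActiveConns > 0 (assumed to mean "set"), otherwise at 5×PoolSize.
func autoQueueSize(workers, poolSize, maxActiveConns int) int {
	size := maxInt(20*workers, poolSize)
	limit := 5 * poolSize
	if maxActiveConns > 0 {
		limit = maxActiveConns + 1
	}
	return minInt(size, limit)
}

func main() {
	w := autoWorkers(100)
	fmt.Println(w, autoQueueSize(w, 100, 0))   // 33 500
	fmt.Println(w, autoQueueSize(w, 100, 150)) // 33 151
}
```

For small pools the PoolSize/2 term wins: with PoolSize 10 the sketch yields 5 workers and a queue of 50 (20×5 = 100, capped at 5×10 = 50).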

How It Works

  1. Redis sends push notifications about cluster maintenance operations
  2. Client creates new connections to updated endpoints
  3. Active operations transfer to new connections
  4. Old connections close gracefully

Supported Notifications

  • MOVING - Slot moving to new node
  • MIGRATING - Slot in migration state
  • MIGRATED - Migration completed
  • FAILING_OVER - Node failing over
  • FAILED_OVER - Failover completed
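Each notification type implies a different client reaction. The mapping below is a sketch based on the Config documentation (RelaxedTimeout applies during MIGRATING/FAILING_OVER, and MOVING triggers a connection handoff). The action names, the "restore" step on MIGRATED/FAILED_OVER, and the assumption that the notification name is the first element of the push payload are illustrative, not taken from the package's code.

```go
package main

import "fmt"

// Local copies of the package's notification type constants.
const (
	NotificationMoving      = "MOVING"
	NotificationMigrating   = "MIGRATING"
	NotificationMigrated    = "MIGRATED"
	NotificationFailingOver = "FAILING_OVER"
	NotificationFailedOver  = "FAILED_OVER"
)

// action is a hypothetical summary of what the client does for a notification.
type action string

const (
	actionHandoff       action = "handoff"        // reconnect to the new endpoint
	actionRelaxTimeouts action = "relax-timeouts" // switch to RelaxedTimeout
	actionRestore       action = "restore"        // return to normal timeouts
	actionIgnore        action = "ignore"         // unknown or malformed payload
)

// classify reads the notification name from the first payload element
// (an assumption about the payload layout) and maps it to an action.
func classify(notification []interface{}) action {
	if len(notification) == 0 {
		return actionIgnore
	}
	name, ok := notification[0].(string)
	if !ok {
		return actionIgnore
	}
	switch name {
	case NotificationMoving:
		return actionHandoff
	case NotificationMigrating, NotificationFailingOver:
		return actionRelaxTimeouts
	case NotificationMigrated, NotificationFailedOver:
		return actionRestore
	default:
		return actionIgnore
	}
}

func main() {
	for _, name := range []string{NotificationMoving, NotificationMigrating, NotificationMigrated} {
		fmt.Printf("%s -> %s\n", name, classify([]interface{}{name}))
	}
}
```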

Hooks (Optional)

Monitor and customize maintenance notification operations:

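type NotificationHook interface {
    PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool)
    PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error)
}

// Register a custom hook (MyHook is your own type implementing NotificationHook)
manager.AddNotificationHook(&MyHook{})

Metrics Hook Example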
// Create metrics hook
metricsHook := maintnotifications.NewMetricsHook()
manager.AddNotificationHook(metricsHook)

// Access collected metrics
metrics := metricsHook.GetMetrics()
fmt.Printf("Notification counts: %v\n", metrics["notification_counts"])
fmt.Printf("Processing times: %v\n", metrics["processing_times"])
fmt.Printf("Error counts: %v\n", metrics["error_counts"])

Documentation

Constants

const (
	NotificationMoving      = "MOVING"
	NotificationMigrating   = "MIGRATING"
	NotificationMigrated    = "MIGRATED"
	NotificationFailingOver = "FAILING_OVER"
	NotificationFailedOver  = "FAILED_OVER"
)

Push notification type constants for maintenance

Variables

var (
	ErrInvalidRelaxedTimeout             = errors.New(logs.InvalidRelaxedTimeoutError())
	ErrInvalidHandoffTimeout             = errors.New(logs.InvalidHandoffTimeoutError())
	ErrInvalidHandoffWorkers             = errors.New(logs.InvalidHandoffWorkersError())
	ErrInvalidHandoffQueueSize           = errors.New(logs.InvalidHandoffQueueSizeError())
	ErrInvalidPostHandoffRelaxedDuration = errors.New(logs.InvalidPostHandoffRelaxedDurationError())
	ErrInvalidEndpointType               = errors.New(logs.InvalidEndpointTypeError())
	ErrInvalidMaintNotifications         = errors.New(logs.InvalidMaintNotificationsError())
	ErrMaxHandoffRetriesReached          = errors.New(logs.MaxHandoffRetriesReachedError())

	// Configuration validation errors
	ErrInvalidHandoffRetries = errors.New(logs.InvalidHandoffRetriesError())
)

Configuration errors

var (
	// ErrConnectionMarkedForHandoff is returned when a connection is marked for handoff
	// and should not be used until the handoff is complete
	ErrConnectionMarkedForHandoff = errors.New("" + logs.ConnectionMarkedForHandoffErrorMessage)
	// ErrConnectionInvalidHandoffState is returned when a connection is in an invalid state for handoff
	ErrConnectionInvalidHandoffState = errors.New("" + logs.ConnectionInvalidHandoffStateErrorMessage)
)

connection handoff errors

var (
	ErrInvalidCircuitBreakerFailureThreshold = errors.New(logs.InvalidCircuitBreakerFailureThresholdError())
	ErrInvalidCircuitBreakerResetTimeout     = errors.New(logs.InvalidCircuitBreakerResetTimeoutError())
	ErrInvalidCircuitBreakerMaxRequests      = errors.New(logs.InvalidCircuitBreakerMaxRequestsError())
)

circuit breaker configuration errors

var (
	ErrCircuitBreakerOpen = errors.New("" + logs.CircuitBreakerOpenErrorMessage)
)

circuit breaker errors

var (
	ErrHandoffQueueFull = errors.New(logs.HandoffQueueFullError())
)

Handoff errors

var (
	ErrInvalidClient = errors.New(logs.InvalidClientError())
)

Integration errors

var (
	ErrInvalidNotification = errors.New(logs.InvalidNotificationError())
)

Notification errors

var (
	ErrShutdown = errors.New(logs.ShutdownError())
)

general errors

Functions

func ExampleCircuitBreakerMonitor

func ExampleCircuitBreakerMonitor(poolHook *PoolHook)

ExampleCircuitBreakerMonitor demonstrates how to monitor circuit breaker status

Types

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements the circuit breaker pattern for endpoint-specific failure handling

func (*CircuitBreaker) Execute

func (cb *CircuitBreaker) Execute(fn func() error) error

Execute runs the given function with circuit breaker protection

func (*CircuitBreaker) GetState

func (cb *CircuitBreaker) GetState() CircuitBreakerState

GetState returns the current state of the circuit breaker

func (*CircuitBreaker) GetStats

func (cb *CircuitBreaker) GetStats() CircuitBreakerStats

GetStats returns current statistics for monitoring

func (*CircuitBreaker) IsOpen

func (cb *CircuitBreaker) IsOpen() bool

IsOpen returns true if the circuit breaker is open (rejecting requests)

type CircuitBreakerEntry

type CircuitBreakerEntry struct {
	// contains filtered or unexported fields
}

CircuitBreakerEntry wraps a circuit breaker with access tracking

type CircuitBreakerManager

type CircuitBreakerManager struct {
	// contains filtered or unexported fields
}

CircuitBreakerManager manages circuit breakers for multiple endpoints

func (*CircuitBreakerManager) GetAllStats

func (cbm *CircuitBreakerManager) GetAllStats() []CircuitBreakerStats

GetAllStats returns statistics for all circuit breakers

func (*CircuitBreakerManager) GetCircuitBreaker

func (cbm *CircuitBreakerManager) GetCircuitBreaker(endpoint string) *CircuitBreaker

GetCircuitBreaker returns the circuit breaker for an endpoint, creating it if necessary

func (*CircuitBreakerManager) Reset

func (cbm *CircuitBreakerManager) Reset()

Reset resets all circuit breakers (useful for testing)

func (*CircuitBreakerManager) Shutdown

func (cbm *CircuitBreakerManager) Shutdown()

Shutdown stops the cleanup goroutine

type CircuitBreakerState

type CircuitBreakerState int32

CircuitBreakerState represents the state of a circuit breaker

const (
	// CircuitBreakerClosed - normal operation, requests allowed
	CircuitBreakerClosed CircuitBreakerState = iota
	// CircuitBreakerOpen - failing fast, requests rejected
	CircuitBreakerOpen
	// CircuitBreakerHalfOpen - testing if service recovered
	CircuitBreakerHalfOpen
)

func (CircuitBreakerState) String

func (s CircuitBreakerState) String() string

type CircuitBreakerStats

type CircuitBreakerStats struct {
	Endpoint        string
	State           CircuitBreakerState
	Failures        int64
	Successes       int64
	Requests        int64
	LastFailureTime time.Time
	LastSuccessTime time.Time
}

CircuitBreakerStats provides statistics about a circuit breaker

type Config

type Config struct {
	// Mode controls how client maintenance notifications are handled.
	// Valid values: ModeDisabled, ModeEnabled, ModeAuto
	// Default: ModeAuto
	Mode Mode

	// EndpointType specifies the type of endpoint to request in MOVING notifications.
	// Valid values: EndpointTypeAuto, EndpointTypeInternalIP, EndpointTypeInternalFQDN,
	//               EndpointTypeExternalIP, EndpointTypeExternalFQDN, EndpointTypeNone
	// Default: EndpointTypeAuto
	EndpointType EndpointType

	// RelaxedTimeout is the concrete timeout value to use during
	// MIGRATING/FAILING_OVER states to accommodate increased latency.
	// This applies to both read and write timeouts.
	// Default: 10 seconds
	RelaxedTimeout time.Duration

	// HandoffTimeout is the maximum time to wait for connection handoff to complete.
	// If handoff takes longer than this, the old connection will be forcibly closed.
	// Default: 15 seconds (matches server-side eviction timeout)
	HandoffTimeout time.Duration

	// MaxWorkers is the maximum number of worker goroutines for processing handoff requests.
	// Workers are created on-demand and automatically cleaned up when idle.
	// If zero, defaults to min(PoolSize/2, max(10, PoolSize/3)) to handle bursts effectively.
	// If explicitly set, a minimum of PoolSize/2 is enforced.
	//
	// Default: min(PoolSize/2, max(10, PoolSize/3)); minimum when set: PoolSize/2
	MaxWorkers int

	// HandoffQueueSize is the size of the buffered channel used to queue handoff requests.
	// If the queue is full, new handoff requests will be rejected.
	// Scales with both worker count and pool size for better burst handling.
	//
	// Default: max(20×MaxWorkers, PoolSize), capped by MaxActiveConns+1 (if set) or 5×PoolSize
	// When set: minimum 200, capped by MaxActiveConns+1 (if set) or 5×PoolSize
	HandoffQueueSize int

	// PostHandoffRelaxedDuration is how long to keep relaxed timeouts on the new connection
	// after a handoff completes. This provides additional resilience during cluster transitions.
	// Default: 2 * RelaxedTimeout
	PostHandoffRelaxedDuration time.Duration

	// Circuit breaker configuration for endpoint failure handling
	// CircuitBreakerFailureThreshold is the number of failures before opening the circuit.
	// Default: 5
	CircuitBreakerFailureThreshold int

	// CircuitBreakerResetTimeout is how long to wait before testing if the endpoint recovered.
	// Default: 60 seconds
	CircuitBreakerResetTimeout time.Duration

	// CircuitBreakerMaxRequests is the maximum number of requests allowed in half-open state.
	// Default: 3
	CircuitBreakerMaxRequests int

	// MaxHandoffRetries is the maximum number of times to retry a failed handoff.
	// After this many retries, the connection will be removed from the pool.
	// Default: 3
	MaxHandoffRetries int
}

Config provides configuration options for maintenance notifications

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a Config with sensible defaults.

func (*Config) ApplyDefaults

func (c *Config) ApplyDefaults() *Config

ApplyDefaults applies default values to any zero-value fields in the configuration. This ensures that partially configured structs get sensible defaults for missing fields.

func (*Config) ApplyDefaultsWithPoolConfig

func (c *Config) ApplyDefaultsWithPoolConfig(poolSize int, maxActiveConns int) *Config

ApplyDefaultsWithPoolConfig applies default values to any zero-value fields in the configuration, using the provided pool size and max active connections to calculate worker and queue defaults. This ensures that partially configured structs get sensible defaults for missing fields.

func (*Config) ApplyDefaultsWithPoolSize

func (c *Config) ApplyDefaultsWithPoolSize(poolSize int) *Config

ApplyDefaultsWithPoolSize applies default values to any zero-value fields in the configuration, using the provided pool size to calculate worker defaults. This ensures that partially configured structs get sensible defaults for missing fields.

func (*Config) Clone

func (c *Config) Clone() *Config

Clone creates a deep copy of the configuration.

func (*Config) IsEnabled

func (c *Config) IsEnabled() bool

func (*Config) Validate

func (c *Config) Validate() error

Validate checks if the configuration is valid.

type EndpointType

type EndpointType string

EndpointType represents the type of endpoint to request in MOVING notifications

const (
	EndpointTypeAuto         EndpointType = "auto"          // Auto-detect based on connection
	EndpointTypeInternalIP   EndpointType = "internal-ip"   // Internal IP address
	EndpointTypeInternalFQDN EndpointType = "internal-fqdn" // Internal FQDN
	EndpointTypeExternalIP   EndpointType = "external-ip"   // External IP address
	EndpointTypeExternalFQDN EndpointType = "external-fqdn" // External FQDN
	EndpointTypeNone         EndpointType = "none"          // No endpoint (reconnect with current config)
)

Constants for endpoint types

func DetectEndpointType

func DetectEndpointType(addr string, tlsEnabled bool) EndpointType

DetectEndpointType automatically detects the appropriate endpoint type based on the connection address and TLS configuration.

For IP addresses:

  • If TLS is enabled: requests FQDN for proper certificate validation
  • If TLS is disabled: requests IP for better performance

For hostnames:

  • If TLS is enabled: always requests FQDN for proper certificate validation
  • If TLS is disabled: requests IP for better performance

Internal vs External detection:

  • For IPs: uses private IP range detection
  • For hostnames: uses heuristics based on common internal naming patterns
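A sketch of the detection rules listed above. The TLS rule (FQDN with TLS, IP without) follows the description; the internal/external test is an assumption: IPs are checked with net.IP.IsPrivate plus loopback and link-local checks (Go 1.17+), and hostnames use a hypothetical suffix list and treat single-label names as internal. The package's real heuristics may differ.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

type endpointType string

const (
	internalIP   endpointType = "internal-ip"
	internalFQDN endpointType = "internal-fqdn"
	externalIP   endpointType = "external-ip"
	externalFQDN endpointType = "external-fqdn"
)

// internalSuffixes is a hypothetical heuristic list, not the library's.
var internalSuffixes = []string{".local", ".internal", ".svc", ".corp", ".lan"}

func isInternal(host string) bool {
	if ip := net.ParseIP(host); ip != nil {
		return ip.IsPrivate() || ip.IsLoopback() || ip.IsLinkLocalUnicast()
	}
	h := strings.ToLower(strings.TrimSuffix(host, "."))
	if h == "localhost" || !strings.Contains(h, ".") {
		return true // single-label names usually resolve only inside a network
	}
	for _, suffix := range internalSuffixes {
		if strings.HasSuffix(h, suffix) {
			return true
		}
	}
	return false
}

func detectEndpointType(addr string, tlsEnabled bool) endpointType {
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		host = addr // no port present
	}
	internal := isInternal(host)
	switch {
	case internal && tlsEnabled:
		return internalFQDN
	case internal:
		return internalIP
	case tlsEnabled:
		return externalFQDN
	default:
		return externalIP
	}
}

func main() {
	fmt.Println(detectEndpointType("10.0.0.5:6379", false))         // internal-ip
	fmt.Println(detectEndpointType("redis.example.com:6380", true)) // external-fqdn
}
```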

func (EndpointType) IsValid

func (e EndpointType) IsValid() bool

IsValid returns true if the endpoint type is valid

func (EndpointType) String

func (e EndpointType) String() string

String returns the string representation of the endpoint type

type HandoffRequest

type HandoffRequest struct {
	Conn     *pool.Conn
	ConnID   uint64 // Unique connection identifier
	Endpoint string
	SeqID    int64
	Pool     pool.Pooler // Pool to remove connection from on failure
}

HandoffRequest represents a request to handoff a connection to a new endpoint

type LoggingHook

type LoggingHook struct {
	LogLevel int // 0=Error, 1=Warn, 2=Info, 3=Debug
}

LoggingHook is an example hook implementation that logs all notifications.

func NewLoggingHook

func NewLoggingHook(logLevel int) *LoggingHook

NewLoggingHook creates a new logging hook with the specified log level. Log levels: 0=Error, 1=Warn, 2=Info, 3=Debug

func (*LoggingHook) PostHook

func (lh *LoggingHook) PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error)

PostHook logs the result after processing.

func (*LoggingHook) PreHook

func (lh *LoggingHook) PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool)

PreHook logs the notification before processing and allows modification.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager provides simplified handoff (connection upgrade) functionality, with hook support and atomic state.

func NewManager

func NewManager(client interfaces.ClientInterface, pool pool.Pooler, config *Config) (*Manager, error)

NewManager creates a new simplified manager.

func (*Manager) AddNotificationHook

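func (hm *Manager) AddNotificationHook(notificationHook NotificationHook)

AddNotificationHook registers a hook that is called before and after each notification is processed.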

func (*Manager) Close

func (hm *Manager) Close() error

Close closes the manager.

func (*Manager) GetActiveMovingOperations

func (hm *Manager) GetActiveMovingOperations() map[MovingOperationKey]*MovingOperation

GetActiveMovingOperations returns active operations with composite keys. WARNING: This method creates a new map and copies all operations on every call. Use sparingly, especially in hot paths or high-frequency logging.

func (*Manager) GetActiveOperationCount

func (hm *Manager) GetActiveOperationCount() int64

GetActiveOperationCount returns the number of active operations. Uses atomic counter for lock-free operation.

func (*Manager) GetState

func (hm *Manager) GetState() State

GetState returns current state using atomic counter for lock-free operation.

func (*Manager) InitPoolHook

func (hm *Manager) InitPoolHook(baseDialer func(context.Context, string, string) (net.Conn, error))

InitPoolHook creates the manager's pool hook using a custom base dialer.

func (*Manager) IsHandoffInProgress

func (hm *Manager) IsHandoffInProgress() bool

IsHandoffInProgress returns true if any handoff is in progress. Uses atomic counter for lock-free operation.

func (*Manager) TrackMovingOperationWithConnID

func (hm *Manager) TrackMovingOperationWithConnID(ctx context.Context, newEndpoint string, deadline time.Time, seqID int64, connID uint64) error

TrackMovingOperationWithConnID starts a new MOVING operation with a specific connection ID.

func (*Manager) UntrackOperationWithConnID

func (hm *Manager) UntrackOperationWithConnID(seqID int64, connID uint64)

UntrackOperationWithConnID completes a MOVING operation with a specific connection ID.

type MetricsHook

type MetricsHook struct {
	NotificationCounts map[string]int64
	ProcessingTimes    map[string]time.Duration
	ErrorCounts        map[string]int64
	HandoffCounts      int64 // Total handoffs initiated
	HandoffSuccesses   int64 // Successful handoffs
	HandoffFailures    int64 // Failed handoffs
}

MetricsHook collects metrics about notification processing.

func NewMetricsHook

func NewMetricsHook() *MetricsHook

NewMetricsHook creates a new metrics collection hook.

func (*MetricsHook) GetMetrics

func (mh *MetricsHook) GetMetrics() map[string]interface{}

GetMetrics returns a summary of collected metrics.

func (*MetricsHook) PostHook

func (mh *MetricsHook) PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error)

PostHook records processing completion and any errors.

func (*MetricsHook) PreHook

func (mh *MetricsHook) PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool)

PreHook records the start time for processing metrics.

type Mode

type Mode string

Mode represents the maintenance notifications mode

const (
	ModeDisabled Mode = "disabled" // Client doesn't send CLIENT MAINT_NOTIFICATIONS ON command
	ModeEnabled  Mode = "enabled"  // Client forcefully sends command, interrupts connection on error
	ModeAuto     Mode = "auto"     // Client tries to send command, disables feature on error
)

Constants for maintenance push notifications modes

func (Mode) IsValid

func (m Mode) IsValid() bool

IsValid returns true if the maintenance notifications mode is valid

func (Mode) String

func (m Mode) String() string

String returns the string representation of the mode

type MovingOperation

type MovingOperation struct {
	SeqID       int64
	NewEndpoint string
	StartTime   time.Time
	Deadline    time.Time
}

MovingOperation tracks an active MOVING operation.

type MovingOperationKey

type MovingOperationKey struct {
	SeqID  int64  // Sequence ID from MOVING notification
	ConnID uint64 // Unique connection identifier
}

MovingOperationKey provides a unique key for tracking MOVING operations that combines sequence ID with connection identifier to handle duplicate sequence IDs across multiple connections to the same node.

func (MovingOperationKey) String

func (k MovingOperationKey) String() string

String returns a string representation of the key for debugging

type NotificationHandler

type NotificationHandler struct {
	// contains filtered or unexported fields
}

NotificationHandler handles push notifications for the simplified manager.

func (*NotificationHandler) HandlePushNotification

func (snh *NotificationHandler) HandlePushNotification(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error

HandlePushNotification processes push notifications with hook support.

type NotificationHook

type NotificationHook interface {
	PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool)
	PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error)
}

NotificationHook is called before and after notification processing. PreHook can modify the notification, and can return false to skip processing. PostHook is called after successful processing.

type OperationsManagerInterface

type OperationsManagerInterface interface {
	TrackMovingOperationWithConnID(ctx context.Context, newEndpoint string, deadline time.Time, seqID int64, connID uint64) error
	UntrackOperationWithConnID(seqID int64, connID uint64)
}

OperationsManagerInterface defines the interface for completing handoff operations

type PoolHook

type PoolHook struct {
	// contains filtered or unexported fields
}

PoolHook implements pool.PoolHook for Redis-specific connection handling with maintenance notifications support.

func NewPoolHook

func NewPoolHook(baseDialer func(context.Context, string, string) (net.Conn, error), network string, config *Config, operationsManager OperationsManagerInterface) *PoolHook

NewPoolHook creates a new pool hook

func NewPoolHookWithPoolSize

func NewPoolHookWithPoolSize(baseDialer func(context.Context, string, string) (net.Conn, error), network string, config *Config, operationsManager OperationsManagerInterface, poolSize int) *PoolHook

NewPoolHookWithPoolSize creates a new pool hook with pool size for better worker defaults

func (*PoolHook) GetCircuitBreakerStats

func (ph *PoolHook) GetCircuitBreakerStats() []CircuitBreakerStats

GetCircuitBreakerStats returns circuit breaker statistics for monitoring

func (*PoolHook) GetCurrentWorkers

func (ph *PoolHook) GetCurrentWorkers() int

GetCurrentWorkers returns the current number of active workers (for testing)

func (*PoolHook) GetHandoffQueue

func (ph *PoolHook) GetHandoffQueue() chan HandoffRequest

GetHandoffQueue returns the handoff queue for testing purposes

func (*PoolHook) GetMaxWorkers

func (ph *PoolHook) GetMaxWorkers() int

GetMaxWorkers returns the max workers for testing purposes

func (*PoolHook) GetPendingMap

func (ph *PoolHook) GetPendingMap() *sync.Map

GetPendingMap returns the pending map for testing purposes

func (*PoolHook) IsHandoffPending

func (ph *PoolHook) IsHandoffPending(conn *pool.Conn) bool

IsHandoffPending returns true if the given connection has a pending handoff

func (*PoolHook) OnGet

func (ph *PoolHook) OnGet(ctx context.Context, conn *pool.Conn, _ bool) error

OnGet is called when a connection is retrieved from the pool

func (*PoolHook) OnPut

func (ph *PoolHook) OnPut(ctx context.Context, conn *pool.Conn) (shouldPool bool, shouldRemove bool, err error)

OnPut is called when a connection is returned to the pool

func (*PoolHook) ResetCircuitBreakers

func (ph *PoolHook) ResetCircuitBreakers()

ResetCircuitBreakers resets all circuit breakers (useful for testing)

func (*PoolHook) SetPool

func (ph *PoolHook) SetPool(pooler pool.Pooler)

SetPool sets the pool interface for removing connections on handoff failure

func (*PoolHook) Shutdown

func (ph *PoolHook) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the processor, waiting for workers to complete

type State

type State int

State represents the current state of a maintenance operation

const (
	// StateIdle indicates no upgrade is in progress
	StateIdle State = iota

	// StateMoving indicates a MOVING operation (connection handoff) is in progress
	StateMoving
)

func (State) String

func (s State) String() string

String returns a string representation of the state.

Directories

Path Synopsis
Package e2e provides end-to-end testing scenarios for the maintenance notifications system.