Documentation
¶
Overview ¶
Package cluster provides cluster membership and coordination.
This package handles:
- Node discovery and membership tracking
- Role management (primary, replica, candidate)
- Heartbeat monitoring and health checks
- Quorum detection
- Leader election (simplified Raft-style consensus)
Index ¶
- Variables
- type AnnouncementMessage
- type ClusterConfig
- func DefaultClusterConfig() ClusterConfig
- func (c *ClusterConfig) Validate() error
- type ClusterMembership
- func NewClusterMembership(localNodeID string, localAddr string) *ClusterMembership
- func (cm *ClusterMembership) AddNode(info NodeInfo) error
- func (cm *ClusterMembership) GetAllNodes() []NodeInfo
- func (cm *ClusterMembership) GetEpoch() uint64
- func (cm *ClusterMembership) GetHealthyNodeCount(healthTimeout time.Duration) int
- func (cm *ClusterMembership) GetHealthyNodes(healthTimeout time.Duration) []NodeInfo
- func (cm *ClusterMembership) GetLocalNode() *NodeInfo
- func (cm *ClusterMembership) GetNode(nodeID string) (*NodeInfo, error)
- func (cm *ClusterMembership) GetNodeCount() int
- func (cm *ClusterMembership) GetNodesByRole(role NodeRole) []NodeInfo
- func (cm *ClusterMembership) GetPrimary() *NodeInfo
- func (cm *ClusterMembership) HasQuorum(minQuorum int, healthTimeout time.Duration) bool
- func (cm *ClusterMembership) IncrementEpoch() uint64
- func (cm *ClusterMembership) RemoveNode(nodeID string) error
- func (cm *ClusterMembership) SetLocalLSN(lsn uint64)
- func (cm *ClusterMembership) SetLocalRole(role NodeRole)
- func (cm *ClusterMembership) SetLocalTerm(term uint64)
- func (cm *ClusterMembership) UpdateNodeHeartbeat(nodeID string, seq uint64, epoch uint64, term uint64) error
- func (cm *ClusterMembership) UpdateNodeLSN(nodeID string, lsn uint64) error
- func (cm *ClusterMembership) UpdateNodeRole(nodeID string, role NodeRole) error
- type DiscoveryResponse
- type ElectionManager
- func NewElectionManager(config ClusterConfig, membership *ClusterMembership) *ElectionManager
- func (em *ElectionManager) GetCurrentTerm() uint64
- func (em *ElectionManager) GetLeaderID() string
- func (em *ElectionManager) GetState() ElectionState
- func (em *ElectionManager) HandleVoteRequest(request VoteRequest) VoteResponse
- func (em *ElectionManager) IsLeader() bool
- func (em *ElectionManager) ResetElectionTimer()
- func (em *ElectionManager) SetCallbacks(onLeader, onFollower, onCandidate func())
- func (em *ElectionManager) Start() error
- func (em *ElectionManager) StartElection() error
- func (em *ElectionManager) StepDown(term uint64) error
- func (em *ElectionManager) Stop() error
- type ElectionState
- func (s ElectionState) String() string
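- type NodeDiscovery
- func NewNodeDiscovery(config ClusterConfig, membership *ClusterMembership) *NodeDiscovery
- func (nd *NodeDiscovery) GetSeeds() []string
- func (nd *NodeDiscovery) HandleAnnouncement(announcement AnnouncementMessage) (*DiscoveryResponse, error)
- func (nd *NodeDiscovery) IsHealthy() bool
- func (nd *NodeDiscovery) Start() error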
- type NodeInfo
- type NodeRole
- type VoteRequest
- type VoteResponse
Constants ¶
This section is empty.
Variables ¶
var (
	ErrInvalidNodeID           = errors.New("node ID cannot be empty")
	ErrInvalidNodeAddr         = errors.New("node address cannot be empty")
	ErrElectionTimeoutTooSmall = errors.New("election timeout must be greater than heartbeat interval")
	ErrInvalidQuorumSize       = errors.New("quorum size must be at least 1")
	ErrNoSeedNodes             = errors.New("seed nodes required when auto-failover is enabled")
)
Configuration errors.
var (
	ErrNotLeader           = errors.New("not the current leader")
	ErrAlreadyVoted        = errors.New("already voted in this term")
	ErrStaleTerm           = errors.New("term is older than current term")
	ErrStaleLSN            = errors.New("candidate LSN is behind local LSN")
	ErrElectionTimeout     = errors.New("election timed out without winning")
	ErrInsufficientQuorum  = errors.New("insufficient nodes for quorum")
	ErrDualPrimaryDetected = errors.New("dual primary detected - split brain")
)
Election errors.
var (
	ErrNodeNotFound      = errors.New("node not found in membership")
	ErrNodeAlreadyExists = errors.New("node already exists in membership")
	ErrCannotRemoveSelf  = errors.New("cannot remove self from cluster")
	ErrClusterTooSmall   = errors.New("cluster too small to maintain quorum")
)
Membership errors.
var (
	ErrNoHealthySeeds  = errors.New("no healthy seed nodes available")
	ErrDiscoveryFailed = errors.New("node discovery failed")
)
Discovery errors.
Functions ¶
This section is empty.
Types ¶
type AnnouncementMessage ¶
type AnnouncementMessage struct {
MessageType string `json:"message_type"` // "node_announcement"
NodeID string `json:"node_id"`
NodeAddr string `json:"node_addr"`
Role NodeRole `json:"role"`
Epoch uint64 `json:"epoch"`
Term uint64 `json:"term"`
Priority int `json:"priority"`
Timestamp time.Time `json:"timestamp"`
}
AnnouncementMessage is sent to seed nodes to register the sending node's presence in the cluster.
type ClusterConfig ¶
type ClusterConfig struct {
// Node identification
NodeID string // Unique identifier for this node
NodeAddr string // Address other nodes can reach this node at (host:port)
// Seed nodes for initial discovery
SeedNodes []string // List of seed node addresses for bootstrapping
// Election configuration
ElectionTimeout time.Duration // Time without heartbeat before starting election (default: 5s)
HeartbeatInterval time.Duration // Interval between heartbeats (default: 1s)
MinQuorumSize int // Minimum nodes for quorum (typically N/2 + 1)
Priority int // Election priority (higher wins ties)
// Feature flags for gradual rollout
EnableAutoFailover bool // Enable automatic failover (default: false)
EnableQuorumWrites bool // Enable quorum-based writes (default: false)
// Timeouts and limits
VoteRequestTimeout time.Duration // Timeout waiting for vote responses (default: 2s)
MaxElectionRetries int // Maximum consecutive election failures before backing off
}
ClusterConfig defines the configuration for cluster management and high availability.
func DefaultClusterConfig ¶
func DefaultClusterConfig() ClusterConfig
DefaultClusterConfig returns a safe default configuration, with automatic failover and quorum-based writes disabled.
func (*ClusterConfig) Validate ¶
func (c *ClusterConfig) Validate() error
Validate checks whether the configuration is valid and returns a non-nil error if it is not.
type ClusterMembership ¶
type ClusterMembership struct {
// contains filtered or unexported fields
}
ClusterMembership tracks all nodes in the cluster.
Concurrency safety:
1. All public methods use an RWMutex for thread-safe access.
2. Read operations (GetXxx) take RLock, allowing concurrent reads.
3. Write operations (AddNode, UpdateXxx, RemoveNode) take Lock.
4. Map iteration creates defensive copies to avoid holding the lock.
func NewClusterMembership ¶
func NewClusterMembership(localNodeID string, localAddr string) *ClusterMembership
NewClusterMembership creates a new membership tracker
func (*ClusterMembership) AddNode ¶
func (cm *ClusterMembership) AddNode(info NodeInfo) error
AddNode registers a node in the cluster
func (*ClusterMembership) GetAllNodes ¶
func (cm *ClusterMembership) GetAllNodes() []NodeInfo
GetAllNodes returns all nodes in the cluster
func (*ClusterMembership) GetEpoch ¶
func (cm *ClusterMembership) GetEpoch() uint64
GetEpoch returns the current cluster epoch
func (*ClusterMembership) GetHealthyNodeCount ¶
func (cm *ClusterMembership) GetHealthyNodeCount(healthTimeout time.Duration) int
GetHealthyNodeCount returns the number of healthy nodes
func (*ClusterMembership) GetHealthyNodes ¶
func (cm *ClusterMembership) GetHealthyNodes(healthTimeout time.Duration) []NodeInfo
GetHealthyNodes returns nodes that have sent heartbeats recently
func (*ClusterMembership) GetLocalNode ¶
func (cm *ClusterMembership) GetLocalNode() *NodeInfo
GetLocalNode returns this node's info
func (*ClusterMembership) GetNode ¶
func (cm *ClusterMembership) GetNode(nodeID string) (*NodeInfo, error)
GetNode returns info about a specific node
func (*ClusterMembership) GetNodeCount ¶
func (cm *ClusterMembership) GetNodeCount() int
GetNodeCount returns the total number of nodes
func (*ClusterMembership) GetNodesByRole ¶
func (cm *ClusterMembership) GetNodesByRole(role NodeRole) []NodeInfo
GetNodesByRole returns all nodes with a specific role
func (*ClusterMembership) GetPrimary ¶
func (cm *ClusterMembership) GetPrimary() *NodeInfo
GetPrimary returns the current primary node (if any)
func (*ClusterMembership) HasQuorum ¶
func (cm *ClusterMembership) HasQuorum(minQuorum int, healthTimeout time.Duration) bool
HasQuorum returns true if enough healthy nodes exist for quorum
func (*ClusterMembership) IncrementEpoch ¶
func (cm *ClusterMembership) IncrementEpoch() uint64
IncrementEpoch increments the cluster epoch (used on leadership change)
func (*ClusterMembership) RemoveNode ¶
func (cm *ClusterMembership) RemoveNode(nodeID string) error
RemoveNode removes a node from the cluster
func (*ClusterMembership) SetLocalLSN ¶
func (cm *ClusterMembership) SetLocalLSN(lsn uint64)
SetLocalLSN updates this node's last LSN
func (*ClusterMembership) SetLocalRole ¶
func (cm *ClusterMembership) SetLocalRole(role NodeRole)
SetLocalRole updates this node's role
func (*ClusterMembership) SetLocalTerm ¶
func (cm *ClusterMembership) SetLocalTerm(term uint64)
SetLocalTerm updates this node's term
func (*ClusterMembership) UpdateNodeHeartbeat ¶
func (cm *ClusterMembership) UpdateNodeHeartbeat(nodeID string, seq uint64, epoch uint64, term uint64) error
UpdateNodeHeartbeat updates the last heartbeat info for a node
func (*ClusterMembership) UpdateNodeLSN ¶
func (cm *ClusterMembership) UpdateNodeLSN(nodeID string, lsn uint64) error
UpdateNodeLSN updates a node's last known LSN
func (*ClusterMembership) UpdateNodeRole ¶
func (cm *ClusterMembership) UpdateNodeRole(nodeID string, role NodeRole) error
UpdateNodeRole updates a node's role
type DiscoveryResponse ¶
type DiscoveryResponse struct {
MessageType string `json:"message_type"` // "node_list"
Nodes []NodeInfo `json:"nodes"`
Success bool `json:"success"`
Error string `json:"error,omitempty"`
}
DiscoveryResponse is returned from seed nodes
type ElectionManager ¶
type ElectionManager struct {
// contains filtered or unexported fields
}
ElectionManager handles leader election using simplified Raft-style consensus.
Concurrency safety:
1. All state access is protected by a sync.Mutex.
2. The election timer runs in a dedicated goroutine.
3. Vote collection uses channels to avoid races.
4. State transitions are atomic (performed under the lock).
func NewElectionManager ¶
func NewElectionManager(config ClusterConfig, membership *ClusterMembership) *ElectionManager
NewElectionManager creates a new election manager
func (*ElectionManager) GetCurrentTerm ¶
func (em *ElectionManager) GetCurrentTerm() uint64
GetCurrentTerm returns the current election term
func (*ElectionManager) GetLeaderID ¶
func (em *ElectionManager) GetLeaderID() string
GetLeaderID returns the ID of the current leader (if known)
func (*ElectionManager) GetState ¶
func (em *ElectionManager) GetState() ElectionState
GetState returns the current election state
func (*ElectionManager) HandleVoteRequest ¶
func (em *ElectionManager) HandleVoteRequest(request VoteRequest) VoteResponse
HandleVoteRequest processes a vote request from a candidate
func (*ElectionManager) IsLeader ¶
func (em *ElectionManager) IsLeader() bool
IsLeader returns true if this node is the leader
func (*ElectionManager) ResetElectionTimer ¶
func (em *ElectionManager) ResetElectionTimer()
ResetElectionTimer resets the election timeout (called on heartbeat receipt)
func (*ElectionManager) SetCallbacks ¶
func (em *ElectionManager) SetCallbacks(onLeader, onFollower, onCandidate func())
SetCallbacks registers callbacks for state transitions
func (*ElectionManager) Start ¶
func (em *ElectionManager) Start() error
Start begins election monitoring
func (*ElectionManager) StartElection ¶
func (em *ElectionManager) StartElection() error
StartElection initiates a new election.
func (*ElectionManager) StepDown ¶
func (em *ElectionManager) StepDown(term uint64) error
StepDown forces this node to step down from leader (if it is one)
func (*ElectionManager) Stop ¶
func (em *ElectionManager) Stop() error
Stop stops the election manager
type ElectionState ¶
type ElectionState int
ElectionState represents the current state of this node in the election process
const (
	// StateFollower is a node following the current leader
	StateFollower ElectionState = iota
	// StateCandidate is a node requesting votes
	StateCandidate
	// StateLeader is the elected leader
	StateLeader
)
func (ElectionState) String ¶
func (s ElectionState) String() string
String returns the string representation of an ElectionState
type NodeDiscovery ¶
type NodeDiscovery struct {
// contains filtered or unexported fields
}
NodeDiscovery handles node registration and discovery using seed nodes.
Concurrency safety:
1. Start/Stop use sync.Once to ensure single initialization and cleanup.
2. The background goroutine (announceLoop) respects stopCh for clean shutdown.
3. Node registration goes through the membership's thread-safe methods.
func NewNodeDiscovery ¶
func NewNodeDiscovery(config ClusterConfig, membership *ClusterMembership) *NodeDiscovery
NewNodeDiscovery creates a new discovery service
func (*NodeDiscovery) GetSeeds ¶
func (nd *NodeDiscovery) GetSeeds() []string
GetSeeds returns the configured seed nodes
func (*NodeDiscovery) HandleAnnouncement ¶
func (nd *NodeDiscovery) HandleAnnouncement(announcement AnnouncementMessage) (*DiscoveryResponse, error)
HandleAnnouncement processes an announcement from another node (server side). It is called when this node receives a discovery request from another node.
func (*NodeDiscovery) IsHealthy ¶
func (nd *NodeDiscovery) IsHealthy() bool
IsHealthy returns true if at least one seed is reachable
func (*NodeDiscovery) Start ¶
func (nd *NodeDiscovery) Start() error
Start begins the discovery process
type NodeInfo ¶
type NodeInfo struct {
ID string // Unique node identifier
Addr string // Network address (host:port)
Role NodeRole // Current role in cluster
LastSeen time.Time // Last heartbeat received
LastHeartbeatSeq uint64 // Last heartbeat sequence number
Epoch uint64 // Cluster generation number
Term uint64 // Election term
LastLSN uint64 // Last known LSN for this node
Priority int // Election priority
}
NodeInfo contains information about a cluster node
type VoteRequest ¶
type VoteRequest struct {
MessageType string `json:"message_type"` // "vote_request"
CandidateID string `json:"candidate_id"`
Term uint64 `json:"term"`
LastLSN uint64 `json:"last_lsn"`
Epoch uint64 `json:"epoch"`
Priority int `json:"priority"`
}
VoteRequest is sent by candidates to request votes
type VoteResponse ¶
type VoteResponse struct {
MessageType string `json:"message_type"` // "vote_response"
VoterID string `json:"voter_id"`
Term uint64 `json:"term"`
VoteGranted bool `json:"vote_granted"`
Reason string `json:"reason,omitempty"`
}
VoteResponse is returned in response to a vote request