cluster

package
v0.5.0
Warning

This package is not in the latest version of its module.

Published: Jun 16, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package cluster contains graphdb's cluster-coordination substrate: Raft-style leader election, membership tracking, and node discovery.

Key types are ClusterMembership (nodes and their Primary/Replica/Candidate roles, heartbeats, quorum detection), ElectionManager (election timeouts, term tracking, vote collection), and NodeDiscovery (seed-based node discovery).

Status: substrate only — not wired to the live write path

This code is real and tested in isolation, but it is NOT connected to the graph store. There is no replication log or append path; EnableAutoFailover and EnableQuorumWrites default to false; and nothing outside this package's own tests imports it. graphdb runs single-node — the write path assumes one node. Treat this package as a foundation for future clustering work, not a shipping feature.

Index

Constants

This section is empty.

Variables

var (
	ErrInvalidNodeID           = errors.New("node ID cannot be empty")
	ErrInvalidNodeAddr         = errors.New("node address cannot be empty")
	ErrElectionTimeoutTooSmall = errors.New("election timeout must be greater than heartbeat interval")
	ErrInvalidQuorumSize       = errors.New("quorum size must be at least 1")
	ErrNoSeedNodes             = errors.New("seed nodes required when auto-failover is enabled")
)

Configuration errors

var (
	ErrNotLeader           = errors.New("not the current leader")
	ErrAlreadyVoted        = errors.New("already voted in this term")
	ErrStaleTerm           = errors.New("term is older than current term")
	ErrStaleLSN            = errors.New("candidate LSN is behind local LSN")
	ErrElectionTimeout     = errors.New("election timed out without winning")
	ErrInsufficientQuorum  = errors.New("insufficient nodes for quorum")
	ErrDualPrimaryDetected = errors.New("dual primary detected - split brain")
)

Election errors

var (
	ErrNodeNotFound      = errors.New("node not found in membership")
	ErrNodeAlreadyExists = errors.New("node already exists in membership")
	ErrCannotRemoveSelf  = errors.New("cannot remove self from cluster")
	ErrClusterTooSmall   = errors.New("cluster too small to maintain quorum")
)

Membership errors

var (
	ErrNoHealthySeeds  = errors.New("no healthy seed nodes available")
	ErrDiscoveryFailed = errors.New("node discovery failed")
)

Discovery errors
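
Because the errors above are sentinel values created with errors.New, callers can match them with errors.Is even after they have been wrapped. The following is a minimal sketch, not code from this package: the sentinel is redeclared locally so the snippet compiles on its own, and proposeWrite and shouldRedirect are hypothetical caller-side helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotLeader is a local stand-in for the package's sentinel of the same name.
var ErrNotLeader = errors.New("not the current leader")

// proposeWrite is a hypothetical helper (an assumption, not part of the package).
// It wraps the sentinel with %w so the original value stays matchable.
func proposeWrite(isLeader bool) error {
	if !isLeader {
		return fmt.Errorf("propose write: %w", ErrNotLeader)
	}
	return nil
}

// shouldRedirect reports whether err, at any depth of wrapping, is ErrNotLeader.
func shouldRedirect(err error) bool {
	return errors.Is(err, ErrNotLeader)
}
```

Comparing with == would miss the wrapped error here; errors.Is walks the wrap chain, which is why it is the usual choice for sentinels like these.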

Functions

This section is empty.

Types

type AnnouncementMessage

type AnnouncementMessage struct {
	MessageType string    `json:"message_type"` // "node_announcement"
	NodeID      string    `json:"node_id"`
	NodeAddr    string    `json:"node_addr"`
	Role        NodeRole  `json:"role"`
	Epoch       uint64    `json:"epoch"`
	Term        uint64    `json:"term"`
	Priority    int       `json:"priority"`
	Timestamp   time.Time `json:"timestamp"`
}

AnnouncementMessage is sent to seed nodes to register presence

type ClusterConfig

type ClusterConfig struct {
	// Node identification
	NodeID   string // Unique identifier for this node
	NodeAddr string // Address other nodes can reach this node at (host:port)

	// Seed nodes for initial discovery
	SeedNodes []string // List of seed node addresses for bootstrapping

	// Election configuration
	ElectionTimeout   time.Duration // Time without heartbeat before starting election (default: 5s)
	HeartbeatInterval time.Duration // Interval between heartbeats (default: 1s)
	MinQuorumSize     int           // Minimum nodes for quorum (typically N/2 + 1)
	Priority          int           // Election priority (higher wins ties)

	// Feature flags for gradual rollout
	EnableAutoFailover bool // Enable automatic failover (default: false)
	EnableQuorumWrites bool // Enable quorum-based writes (default: false)

	// Timeouts and limits
	VoteRequestTimeout time.Duration // Timeout waiting for vote responses (default: 2s)
	MaxElectionRetries int           // Maximum consecutive election failures before backing off
}

ClusterConfig defines configuration for cluster management and high availability

func DefaultClusterConfig

func DefaultClusterConfig() ClusterConfig

DefaultClusterConfig returns a safe default configuration

func (*ClusterConfig) Validate

func (c *ClusterConfig) Validate() error

Validate checks if configuration is valid
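
The body of Validate is not shown on this page. The sketch below is an assumption that reconstructs plausible checks purely from the messages of the configuration errors listed under Variables; the real method may check more, less, or in a different order. The config type is a trimmed local stand-in for ClusterConfig, and sampleConfig is a hypothetical helper that uses the documented 5s and 1s defaults plus an assumed quorum size of 1.

```go
package main

import (
	"errors"
	"time"
)

// Local copies of the documented configuration errors.
var (
	ErrInvalidNodeID           = errors.New("node ID cannot be empty")
	ErrInvalidNodeAddr         = errors.New("node address cannot be empty")
	ErrElectionTimeoutTooSmall = errors.New("election timeout must be greater than heartbeat interval")
	ErrInvalidQuorumSize       = errors.New("quorum size must be at least 1")
	ErrNoSeedNodes             = errors.New("seed nodes required when auto-failover is enabled")
)

// config is a trimmed stand-in for ClusterConfig (only the validated fields).
type config struct {
	NodeID             string
	NodeAddr           string
	SeedNodes          []string
	ElectionTimeout    time.Duration
	HeartbeatInterval  time.Duration
	MinQuorumSize      int
	EnableAutoFailover bool
}

// validate applies one check per documented error message (assumed rules).
func validate(c config) error {
	switch {
	case c.NodeID == "":
		return ErrInvalidNodeID
	case c.NodeAddr == "":
		return ErrInvalidNodeAddr
	case c.ElectionTimeout <= c.HeartbeatInterval:
		return ErrElectionTimeoutTooSmall
	case c.MinQuorumSize < 1:
		return ErrInvalidQuorumSize
	case c.EnableAutoFailover && len(c.SeedNodes) == 0:
		return ErrNoSeedNodes
	}
	return nil
}

// sampleConfig returns a hypothetical single-node configuration.
func sampleConfig() config {
	return config{
		NodeID:            "node-1",
		NodeAddr:          "127.0.0.1:7000",
		ElectionTimeout:   5 * time.Second,
		HeartbeatInterval: time.Second,
		MinQuorumSize:     1,
	}
}
```

Under these assumed rules, enabling EnableAutoFailover on sampleConfig() without adding SeedNodes yields ErrNoSeedNodes.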

type ClusterMembership

type ClusterMembership struct {
	// contains filtered or unexported fields
}

ClusterMembership tracks all nodes in the cluster

Concurrent Safety:

 1. All public methods use RWMutex for thread-safe access
 2. Read operations (GetXxx) use RLock for concurrent reads
 3. Write operations (AddNode, UpdateXxx, RemoveNode) use Lock
 4. Map iteration creates defensive copies to avoid holding lock
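
The locking discipline described above can be sketched as follows. This is an illustration of the pattern only: membership, node, add, and all are hypothetical local names, and the unexported fields of ClusterMembership are not visible on this page.

```go
package main

import "sync"

// node is a reduced stand-in for NodeInfo.
type node struct {
	ID   string
	Addr string
}

// membership guards a map with an RWMutex, as the doc comment describes.
type membership struct {
	mu    sync.RWMutex
	nodes map[string]node
}

func newMembership() *membership {
	return &membership{nodes: make(map[string]node)}
}

// add takes the write lock; it reports false if the ID is already present.
func (m *membership) add(n node) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, exists := m.nodes[n.ID]; exists {
		return false
	}
	m.nodes[n.ID] = n
	return true
}

// all takes the read lock and returns a copy, so callers can range over
// or modify the result without holding the lock or touching shared state.
func (m *membership) all() []node {
	m.mu.RLock()
	defer m.mu.RUnlock()
	out := make([]node, 0, len(m.nodes))
	for _, n := range m.nodes {
		out = append(out, n)
	}
	return out
}
```

Returning values rather than pointers into the map is what makes the copy defensive: editing an element of the returned slice leaves the tracked membership unchanged.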

func NewClusterMembership

func NewClusterMembership(localNodeID string, localAddr string) *ClusterMembership

NewClusterMembership creates a new membership tracker

func (*ClusterMembership) AddNode

func (cm *ClusterMembership) AddNode(info NodeInfo) error

AddNode registers a node in the cluster

func (*ClusterMembership) GetAllNodes

func (cm *ClusterMembership) GetAllNodes() []NodeInfo

GetAllNodes returns all nodes in the cluster

func (*ClusterMembership) GetEpoch

func (cm *ClusterMembership) GetEpoch() uint64

GetEpoch returns the current cluster epoch

func (*ClusterMembership) GetHealthyNodeCount

func (cm *ClusterMembership) GetHealthyNodeCount(healthTimeout time.Duration) int

GetHealthyNodeCount returns the number of healthy nodes

func (*ClusterMembership) GetHealthyNodes

func (cm *ClusterMembership) GetHealthyNodes(healthTimeout time.Duration) []NodeInfo

GetHealthyNodes returns nodes that have sent heartbeats recently

func (*ClusterMembership) GetLocalNode

func (cm *ClusterMembership) GetLocalNode() *NodeInfo

GetLocalNode returns this node's info

func (*ClusterMembership) GetNode

func (cm *ClusterMembership) GetNode(nodeID string) (*NodeInfo, error)

GetNode returns info about a specific node

func (*ClusterMembership) GetNodeCount

func (cm *ClusterMembership) GetNodeCount() int

GetNodeCount returns the total number of nodes

func (*ClusterMembership) GetNodesByRole

func (cm *ClusterMembership) GetNodesByRole(role NodeRole) []NodeInfo

GetNodesByRole returns all nodes with a specific role

func (*ClusterMembership) GetPrimary

func (cm *ClusterMembership) GetPrimary() *NodeInfo

GetPrimary returns the current primary node (if any)

func (*ClusterMembership) HasQuorum

func (cm *ClusterMembership) HasQuorum(minQuorum int, healthTimeout time.Duration) bool

HasQuorum returns true if enough healthy nodes exist for quorum
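
ClusterConfig describes MinQuorumSize as typically N/2 + 1. A small worked sketch of that arithmetic follows; majority and hasQuorum are hypothetical helpers, and the >= comparison is an assumption about how HasQuorum compares the healthy count against minQuorum.

```go
package main

// majority returns the smallest count that is a strict majority of n nodes.
// Integer division makes this n/2 + 1 for both odd and even n.
func majority(n int) int {
	return n/2 + 1
}

// hasQuorum reports whether the healthy count reaches the required quorum
// (assumed comparison; the package's implementation is not shown).
func hasQuorum(healthy, minQuorum int) bool {
	return healthy >= minQuorum
}

// tolerated returns how many nodes may fail while a majority remains.
func tolerated(n int) int {
	return n - majority(n)
}
```

For example, a 3-node cluster needs 2 healthy nodes and tolerates 1 failure, while a 4-node cluster needs 3 and still tolerates only 1, which is why odd cluster sizes are the usual choice.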

func (*ClusterMembership) IncrementEpoch

func (cm *ClusterMembership) IncrementEpoch() uint64

IncrementEpoch increments the cluster epoch (used on leadership change)

func (*ClusterMembership) RemoveNode

func (cm *ClusterMembership) RemoveNode(nodeID string) error

RemoveNode removes a node from the cluster

func (*ClusterMembership) SetLocalLSN

func (cm *ClusterMembership) SetLocalLSN(lsn uint64)

SetLocalLSN updates this node's last LSN

func (*ClusterMembership) SetLocalRole

func (cm *ClusterMembership) SetLocalRole(role NodeRole)

SetLocalRole updates this node's role

func (*ClusterMembership) SetLocalTerm

func (cm *ClusterMembership) SetLocalTerm(term uint64)

SetLocalTerm updates this node's term

func (*ClusterMembership) UpdateNodeHeartbeat

func (cm *ClusterMembership) UpdateNodeHeartbeat(nodeID string, seq uint64, epoch uint64, term uint64) error

UpdateNodeHeartbeat updates the last heartbeat info for a node

func (*ClusterMembership) UpdateNodeLSN

func (cm *ClusterMembership) UpdateNodeLSN(nodeID string, lsn uint64) error

UpdateNodeLSN updates a node's last known LSN

func (*ClusterMembership) UpdateNodeRole

func (cm *ClusterMembership) UpdateNodeRole(nodeID string, role NodeRole) error

UpdateNodeRole updates a node's role

type DiscoveryResponse

type DiscoveryResponse struct {
	MessageType string     `json:"message_type"` // "node_list"
	Nodes       []NodeInfo `json:"nodes"`
	Success     bool       `json:"success"`
	Error       string     `json:"error,omitempty"`
}

DiscoveryResponse is returned from seed nodes

type ElectionManager

type ElectionManager struct {
	// contains filtered or unexported fields
}

ElectionManager handles leader election using simplified Raft-style consensus

Concurrent Safety:

 1. All state access protected by sync.Mutex
 2. Election timer runs in dedicated goroutine
 3. Vote collection uses channels to avoid races
 4. State transitions are atomic (under lock)

func NewElectionManager

func NewElectionManager(config ClusterConfig, membership *ClusterMembership) *ElectionManager

NewElectionManager creates a new election manager

func (*ElectionManager) GetCurrentTerm

func (em *ElectionManager) GetCurrentTerm() uint64

GetCurrentTerm returns the current election term

func (*ElectionManager) GetLeaderID

func (em *ElectionManager) GetLeaderID() string

GetLeaderID returns the ID of the current leader (if known)

func (*ElectionManager) GetState

func (em *ElectionManager) GetState() ElectionState

GetState returns the current election state

func (*ElectionManager) HandleVoteRequest

func (em *ElectionManager) HandleVoteRequest(request VoteRequest) VoteResponse

HandleVoteRequest processes a vote request from a candidate
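
How HandleVoteRequest decides is not documented here. As a hedged illustration, the sketch below applies the usual Raft-style rules in the order suggested by the ErrStaleTerm, ErrAlreadyVoted, and ErrStaleLSN messages. The voter type and decide method are hypothetical, and the real method may also weigh Epoch and Priority, which this sketch ignores.

```go
package main

// voteRequest keeps only the fields this sketch inspects.
type voteRequest struct {
	CandidateID string
	Term        uint64
	LastLSN     uint64
}

// voter is a hypothetical holder of the local election state.
type voter struct {
	currentTerm uint64
	votedFor    string // candidate voted for in currentTerm, "" if none
	lastLSN     uint64
}

// decide returns whether the vote is granted and, if not, a reason.
func (v *voter) decide(req voteRequest) (bool, string) {
	// Reject candidates from an older term.
	if req.Term < v.currentTerm {
		return false, "term is older than current term"
	}
	// A newer term starts a fresh voting round.
	if req.Term > v.currentTerm {
		v.currentTerm = req.Term
		v.votedFor = ""
	}
	// At most one candidate per term; repeating the same vote is allowed.
	if v.votedFor != "" && v.votedFor != req.CandidateID {
		return false, "already voted in this term"
	}
	// Do not elect a candidate whose data is behind ours.
	if req.LastLSN < v.lastLSN {
		return false, "candidate LSN is behind local LSN"
	}
	v.votedFor = req.CandidateID
	return true, ""
}
```

Note that a request with a newer term advances currentTerm even when the vote is then refused on LSN grounds; that mirrors standard Raft behavior and is an assumption about this package.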

func (*ElectionManager) IsLeader

func (em *ElectionManager) IsLeader() bool

IsLeader returns true if this node is the leader

func (*ElectionManager) ResetElectionTimer

func (em *ElectionManager) ResetElectionTimer()

ResetElectionTimer resets the election timeout (called on heartbeat receipt)

func (*ElectionManager) SetCallbacks

func (em *ElectionManager) SetCallbacks(onLeader, onFollower, onCandidate func())

SetCallbacks registers callbacks for state transitions

func (*ElectionManager) Start

func (em *ElectionManager) Start() error

Start begins election monitoring

func (*ElectionManager) StartElection

func (em *ElectionManager) StartElection() error

StartElection initiates a new election (public method)

func (*ElectionManager) StepDown

func (em *ElectionManager) StepDown(term uint64) error

StepDown forces this node to step down from leader (if it is one)

func (*ElectionManager) Stop

func (em *ElectionManager) Stop() error

Stop stops the election manager

type ElectionState

type ElectionState int

ElectionState represents the current state of this node in the election process

const (
	// StateFollower is a node following the current leader
	StateFollower ElectionState = iota
	// StateCandidate is a node requesting votes
	StateCandidate
	// StateLeader is the elected leader
	StateLeader
)

func (ElectionState) String

func (s ElectionState) String() string

String returns the string representation of an ElectionState

type NodeDiscovery

type NodeDiscovery struct {
	// contains filtered or unexported fields
}

NodeDiscovery handles node registration and discovery using seed nodes

Concurrent Safety:

 1. Start/Stop use sync.Once to ensure single initialization/cleanup
 2. Background goroutine (announceLoop) respects stopCh for clean shutdown
 3. Uses membership's thread-safe methods for node registration
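
The Start/Stop pattern described above might look roughly like the sketch below. It is an assumption about the general shape, not the package's code: announcer, newAnnouncer, and the done channel are hypothetical, and the real announceLoop contacts seed nodes rather than calling a plain callback.

```go
package main

import (
	"sync"
	"time"
)

// announcer runs a periodic callback until stopped.
type announcer struct {
	interval  time.Duration // must be > 0 (time.NewTicker panics otherwise)
	announce  func()
	startOnce sync.Once
	stopOnce  sync.Once
	stopCh    chan struct{} // closed to request shutdown
	done      chan struct{} // closed when the loop has exited
}

func newAnnouncer(interval time.Duration, announce func()) *announcer {
	return &announcer{
		interval: interval,
		announce: announce,
		stopCh:   make(chan struct{}),
		done:     make(chan struct{}),
	}
}

// Start launches the loop at most once, however often it is called.
func (a *announcer) Start() {
	a.startOnce.Do(func() { go a.loop() })
}

// Stop signals shutdown once and waits for the loop to exit.
func (a *announcer) Stop() {
	a.stopOnce.Do(func() {
		close(a.stopCh)
		// If Start never ran, consume startOnce so a later Start is a
		// no-op, and mark the (never started) loop as finished.
		a.startOnce.Do(func() { close(a.done) })
	})
	<-a.done
}

func (a *announcer) loop() {
	defer close(a.done)
	ticker := time.NewTicker(a.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			a.announce()
		case <-a.stopCh:
			return
		}
	}
}
```

Waiting on done in Stop is a design choice of this sketch: it guarantees that no announce call is in flight once Stop returns, at the cost of blocking the caller for at most one callback.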

func NewNodeDiscovery

func NewNodeDiscovery(config ClusterConfig, membership *ClusterMembership) *NodeDiscovery

NewNodeDiscovery creates a new discovery service

func (*NodeDiscovery) GetSeeds

func (nd *NodeDiscovery) GetSeeds() []string

GetSeeds returns the configured seed nodes

func (*NodeDiscovery) HandleAnnouncement

func (nd *NodeDiscovery) HandleAnnouncement(announcement AnnouncementMessage) (*DiscoveryResponse, error)

HandleAnnouncement processes an announcement from another node (server-side). It is called when a discovery request arrives from another node.

func (*NodeDiscovery) IsHealthy

func (nd *NodeDiscovery) IsHealthy() bool

IsHealthy returns true if at least one seed is reachable

func (*NodeDiscovery) Start

func (nd *NodeDiscovery) Start() error

Start begins the discovery process

func (*NodeDiscovery) Stop

func (nd *NodeDiscovery) Stop() error

Stop stops the discovery process

type NodeInfo

type NodeInfo struct {
	ID               string    // Unique node identifier
	Addr             string    // Network address (host:port)
	Role             NodeRole  // Current role in cluster
	LastSeen         time.Time // Last heartbeat received
	LastHeartbeatSeq uint64    // Last heartbeat sequence number
	Epoch            uint64    // Cluster generation number
	Term             uint64    // Election term
	LastLSN          uint64    // Last known LSN for this node
	Priority         int       // Election priority
}

NodeInfo contains information about a cluster node

func (*NodeInfo) IsHealthy

func (n *NodeInfo) IsHealthy(timeout time.Duration) bool

IsHealthy returns true if the node has been seen recently
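
A minimal sketch of this kind of heartbeat-age check, assuming a node counts as healthy when the time since LastSeen does not exceed the timeout. Whether the package treats the boundary as inclusive is not stated; nodeInfo, healthyAt, healthyNodes, and demoHealthyIDs are hypothetical names, and the clock is passed in explicitly so the example is deterministic.

```go
package main

import "time"

// nodeInfo keeps only the fields needed for the health check.
type nodeInfo struct {
	ID       string
	LastSeen time.Time
}

// healthyAt reports whether the last heartbeat is at most timeout old at now.
func (n nodeInfo) healthyAt(now time.Time, timeout time.Duration) bool {
	return now.Sub(n.LastSeen) <= timeout
}

// healthyNodes filters nodes by heartbeat age, preserving input order.
func healthyNodes(nodes []nodeInfo, now time.Time, timeout time.Duration) []nodeInfo {
	var out []nodeInfo
	for _, n := range nodes {
		if n.healthyAt(now, timeout) {
			out = append(out, n)
		}
	}
	return out
}

// demoHealthyIDs evaluates three nodes against a 3s timeout at a fixed instant.
func demoHealthyIDs() []string {
	now := time.Date(2026, time.June, 16, 12, 0, 0, 0, time.UTC)
	nodes := []nodeInfo{
		{ID: "node-1", LastSeen: now.Add(-1 * time.Second)},
		{ID: "node-2", LastSeen: now.Add(-10 * time.Second)},
		{ID: "node-3", LastSeen: now.Add(-3 * time.Second)},
	}
	var ids []string
	for _, n := range healthyNodes(nodes, now, 3*time.Second) {
		ids = append(ids, n.ID)
	}
	return ids
}
```

With the inclusive comparison assumed here, demoHealthyIDs returns node-1 and node-3; node-2, last seen 10s ago, is filtered out.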

type NodeRole

type NodeRole int

NodeRole represents the role of a node in the cluster

const (
	// RoleReplica is a follower node that replicates data
	RoleReplica NodeRole = iota
	// RoleCandidate is a node in the process of election
	RoleCandidate
	// RolePrimary is the elected leader that accepts writes
	RolePrimary
)

func (NodeRole) String

func (r NodeRole) String() string

String returns the string representation of a NodeRole

type VoteRequest

type VoteRequest struct {
	MessageType string `json:"message_type"` // "vote_request"
	CandidateID string `json:"candidate_id"`
	Term        uint64 `json:"term"`
	LastLSN     uint64 `json:"last_lsn"`
	Epoch       uint64 `json:"epoch"`
	Priority    int    `json:"priority"`
}

VoteRequest is sent by candidates to request votes

type VoteResponse

type VoteResponse struct {
	MessageType string `json:"message_type"` // "vote_response"
	VoterID     string `json:"voter_id"`
	Term        uint64 `json:"term"`
	VoteGranted bool   `json:"vote_granted"`
	Reason      string `json:"reason,omitempty"`
}

VoteResponse is returned in response to a vote request
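
The struct tags on the message types suggest a JSON wire format, although the transport itself is not described on this page. As a sketch under that assumption, the snippet below round-trips a VoteResponse through encoding/json. The struct is a local copy of the declaration above, and encodeVote and decodeVote are hypothetical helpers.

```go
package main

import "encoding/json"

// VoteResponse is a local copy of the documented struct, tags included.
type VoteResponse struct {
	MessageType string `json:"message_type"` // "vote_response"
	VoterID     string `json:"voter_id"`
	Term        uint64 `json:"term"`
	VoteGranted bool   `json:"vote_granted"`
	Reason      string `json:"reason,omitempty"`
}

// encodeVote serializes a response; an empty Reason is omitted from the output.
func encodeVote(r VoteResponse) (string, error) {
	b, err := json.Marshal(r)
	return string(b), err
}

// decodeVote parses a response from its JSON form.
func decodeVote(s string) (VoteResponse, error) {
	var r VoteResponse
	err := json.Unmarshal([]byte(s), &r)
	return r, err
}
```

A granted vote from node-2 in term 3 encodes as {"message_type":"vote_response","voter_id":"node-2","term":3,"vote_granted":true}. The reason key appears only when Reason is non-empty, because of the omitempty tag.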
