Documentation
¶
Index ¶
- type ApplyMsg
- type CandidateState
- func (cs *CandidateState) GetElectionDuration() time.Duration
- func (cs *CandidateState) GetQuorumProgress() float64
- func (cs *CandidateState) GetStatus() CandidateStatus
- func (cs *CandidateState) GetTerm() uint64
- func (cs *CandidateState) GetVoteCount() (granted, denied, needed int)
- func (cs *CandidateState) GetVoters() map[string]bool
- func (cs *CandidateState) HasElectionTimedOut() bool
- func (cs *CandidateState) HasLostElection(totalNodes int) bool
- func (cs *CandidateState) HasWonElection() bool
- func (cs *CandidateState) IsStillViable(totalNodes int) bool
- func (cs *CandidateState) RecordVote(voterID string, granted bool)
- func (cs *CandidateState) Start(ctx context.Context, term uint64, clusterSize int) error
- func (cs *CandidateState) Stop(ctx context.Context) error
- type CandidateStatus
- type Config
- func (c *Config) Validate() error
- func (c *Config) WithElectionTimeout(min, max time.Duration) *Config
- func (c *Config) WithHeartbeatInterval(interval time.Duration) *Config
- func (c *Config) WithLogSettings(maxEntries, trailingLogs uint64, compactionEnabled bool) *Config
- func (c *Config) WithNodeID(nodeID string) *Config
- func (c *Config) WithObservability(metrics, tracing bool) *Config
- func (c *Config) WithPerformanceSettings(maxInflight int, batchInterval time.Duration, maxBatchSize int) *Config
- func (c *Config) WithReplicationSettings(maxEntries, batchSize int, timeout time.Duration) *Config
- func (c *Config) WithSafetySettings(preVote, checkQuorum bool) *Config
- func (c *Config) WithSnapshotSettings(interval time.Duration, threshold uint64) *Config
- type ConfigChangeMsg
- type ConfigChangeType
- type ElectionResult
- type FollowerState
- func (fs *FollowerState) ClearLeader()
- func (fs *FollowerState) GetCurrentLeader() string
- func (fs *FollowerState) GetElectionTimeout() time.Duration
- func (fs *FollowerState) GetLastHeartbeat() time.Time
- func (fs *FollowerState) GetStatus() FollowerStatus
- func (fs *FollowerState) GetTimeSinceLastHeartbeat() time.Duration
- func (fs *FollowerState) HasHeartbeatTimedOut() bool
- func (fs *FollowerState) IsHealthy() bool
- func (fs *FollowerState) RecordHeartbeat(leaderID string)
- func (fs *FollowerState) ResetHeartbeatTimer()
- func (fs *FollowerState) SetElectionTimeout(timeout time.Duration)
- func (fs *FollowerState) Start(ctx context.Context) error
- func (fs *FollowerState) Stop(ctx context.Context) error
- type FollowerStatus
- type LeaderState
- func (ls *LeaderState) AddPeer(peerID string, nextIndex uint64)
- func (ls *LeaderState) CalculateCommitIndex(currentCommitIndex, lastLogIndex uint64) uint64
- func (ls *LeaderState) DecrementNextIndex(peerID string) uint64
- func (ls *LeaderState) GetAllMatchIndexes() map[string]uint64
- func (ls *LeaderState) GetHealthyPeerCount() int
- func (ls *LeaderState) GetLastHeartbeat(peerID string) time.Time
- func (ls *LeaderState) GetMatchIndex(peerID string) uint64
- func (ls *LeaderState) GetNextIndex(peerID string) uint64
- func (ls *LeaderState) GetPeersNeedingHeartbeat() []string
- func (ls *LeaderState) GetReplicationStatus() map[string]ReplicationStatus
- func (ls *LeaderState) HasQuorum(totalNodes int) bool
- func (ls *LeaderState) NeedsHeartbeat(peerID string) bool
- func (ls *LeaderState) RecordHeartbeat(peerID string)
- func (ls *LeaderState) RemovePeer(peerID string)
- func (ls *LeaderState) SetMatchIndex(peerID string, index uint64)
- func (ls *LeaderState) SetNextIndex(peerID string, index uint64)
- func (ls *LeaderState) Start(ctx context.Context, lastLogIndex uint64, peers []string) error
- func (ls *LeaderState) Stop(ctx context.Context) error
- func (ls *LeaderState) UpdateMatchIndex(peerID string, matchIndex uint64)
- type Log
- func (l *Log) Append(entry internal.LogEntry) error
- func (l *Log) AppendBatch(entries []internal.LogEntry) error
- func (l *Log) Compact(index uint64, term uint64) error
- func (l *Log) Count() int
- func (l *Log) FirstIndex() uint64
- func (l *Log) Get(index uint64) (internal.LogEntry, error)
- func (l *Log) GetEntriesAfter(index uint64, maxEntries int) ([]internal.LogEntry, error)
- func (l *Log) GetRange(start, end uint64) ([]internal.LogEntry, error)
- func (l *Log) LastIndex() uint64
- func (l *Log) LastTerm() uint64
- func (l *Log) MatchesTerm(index, term uint64) bool
- func (l *Log) String() string
- func (l *Log) TruncateAfter(index uint64) error
- type LogEntryBatch
- type Node
- func (n *Node) AddNode(ctx context.Context, nodeID, address string, port int) error
- func (n *Node) AddPeer(peerID string)
- func (n *Node) AppendEntries(ctx context.Context, req *internal.AppendEntriesRequest) (*internal.AppendEntriesResponse, error)
- func (n *Node) Apply(ctx context.Context, entry internal.LogEntry) error
- func (n *Node) GetCommitIndex() uint64
- func (n *Node) GetCurrentTerm() uint64
- func (n *Node) GetLeader() string
- func (n *Node) GetRole() internal.NodeRole
- func (n *Node) GetStats() internal.RaftStats
- func (n *Node) GetTerm() uint64
- func (n *Node) InstallSnapshot(ctx context.Context, req *internal.InstallSnapshotRequest) (*internal.InstallSnapshotResponse, error)
- func (n *Node) IsLeader() bool
- func (n *Node) Propose(ctx context.Context, command []byte) error
- func (n *Node) RemoveNode(ctx context.Context, nodeID string) error
- func (n *Node) RequestVote(ctx context.Context, req *internal.RequestVoteRequest) (*internal.RequestVoteResponse, error)
- func (n *Node) Start(ctx context.Context) error
- func (n *Node) StepDown(ctx context.Context) error
- func (n *Node) Stop(ctx context.Context) error
- func (n *Node) TransferLeadership(ctx context.Context, targetNodeID string) error
- type PeerState
- type RPCHandler
- func (rh *RPCHandler) BroadcastAppendEntries(ctx context.Context, peers []string, req internal.AppendEntriesRequest) map[string]error
- func (rh *RPCHandler) BroadcastRequestVote(ctx context.Context, peers []string, req internal.RequestVoteRequest) map[string]error
- func (rh *RPCHandler) HandleAppendEntries(ctx context.Context, req internal.AppendEntriesRequest) internal.AppendEntriesResponse
- func (rh *RPCHandler) HandleInstallSnapshot(ctx context.Context, req internal.InstallSnapshotRequest) internal.InstallSnapshotResponse
- func (rh *RPCHandler) HandleRequestVote(ctx context.Context, req internal.RequestVoteRequest) internal.RequestVoteResponse
- func (rh *RPCHandler) SendAppendEntries(ctx context.Context, target string, req internal.AppendEntriesRequest) (internal.AppendEntriesResponse, error)
- func (rh *RPCHandler) SendInstallSnapshot(ctx context.Context, target string, req internal.InstallSnapshotRequest) (internal.InstallSnapshotResponse, error)
- func (rh *RPCHandler) SendRequestVote(ctx context.Context, target string, req internal.RequestVoteRequest) (internal.RequestVoteResponse, error)
- type ReplicationResult
- type ReplicationState
- type ReplicationStatus
- type SnapshotManager
- func (sm *SnapshotManager) CompactLog(snapshotIndex uint64) error
- func (sm *SnapshotManager) CreateSnapshot(ctx context.Context, lastIncludedIndex, lastIncludedTerm uint64) error
- func (sm *SnapshotManager) GetLastSnapshotIndex() uint64
- func (sm *SnapshotManager) GetLastSnapshotTerm() uint64
- func (sm *SnapshotManager) GetLatestSnapshot() (*internal.SnapshotMetadata, []byte, error)
- func (sm *SnapshotManager) InstallSnapshot(req internal.InstallSnapshotRequest) error
- func (sm *SnapshotManager) ListSnapshots() ([]internal.SnapshotMetadata, error)
- func (sm *SnapshotManager) RestoreSnapshot(ctx context.Context, snapshotData []byte) error
- func (sm *SnapshotManager) ShouldCreateSnapshot(currentIndex uint64) bool
- func (sm *SnapshotManager) StreamSnapshot(writer io.Writer) error
- type SnapshotManagerConfig
- type SnapshotMeta
- type SnapshotMsg
- type State
- type VoteResponse
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CandidateState ¶
type CandidateState struct {
// contains filtered or unexported fields
}
CandidateState manages candidate-specific state and operations.
func NewCandidateState ¶
func NewCandidateState(nodeID string, electionTimeout time.Duration, logger forge.Logger) *CandidateState
NewCandidateState creates a new candidate state.
func (*CandidateState) GetElectionDuration ¶
func (cs *CandidateState) GetElectionDuration() time.Duration
GetElectionDuration returns how long the election has been running.
func (*CandidateState) GetQuorumProgress ¶
func (cs *CandidateState) GetQuorumProgress() float64
GetQuorumProgress returns progress towards quorum as a percentage.
func (*CandidateState) GetStatus ¶
func (cs *CandidateState) GetStatus() CandidateStatus
GetStatus returns candidate status.
func (*CandidateState) GetTerm ¶
func (cs *CandidateState) GetTerm() uint64
GetTerm returns the election term.
func (*CandidateState) GetVoteCount ¶
func (cs *CandidateState) GetVoteCount() (granted, denied, needed int)
GetVoteCount returns the current vote counts.
func (*CandidateState) GetVoters ¶
func (cs *CandidateState) GetVoters() map[string]bool
GetVoters returns all nodes that have voted and their decisions.
func (*CandidateState) HasElectionTimedOut ¶
func (cs *CandidateState) HasElectionTimedOut() bool
HasElectionTimedOut checks if the election has timed out.
func (*CandidateState) HasLostElection ¶
func (cs *CandidateState) HasLostElection(totalNodes int) bool
HasLostElection checks if the candidate has definitively lost.
func (*CandidateState) HasWonElection ¶
func (cs *CandidateState) HasWonElection() bool
HasWonElection checks if the candidate has won the election.
func (*CandidateState) IsStillViable ¶
func (cs *CandidateState) IsStillViable(totalNodes int) bool
IsStillViable checks if the election is still viable.
func (*CandidateState) RecordVote ¶
func (cs *CandidateState) RecordVote(voterID string, granted bool)
RecordVote records a vote response.
type CandidateStatus ¶
type CandidateStatus struct {
NodeID string
Term uint64
VotesGranted int
VotesDenied int
VotesNeeded int
TotalVotes int
ElectionStarted time.Time
ElectionDuration time.Duration
ElectionTimeout time.Duration
TimeRemaining time.Duration
}
CandidateStatus represents candidate status.
type Config ¶
type Config struct {
// Node identification
NodeID string `json:"node_id" yaml:"node_id"`
ClusterID string `json:"cluster_id" yaml:"cluster_id"`
// Timing parameters
ElectionTimeoutMin time.Duration `json:"election_timeout_min" yaml:"election_timeout_min"`
ElectionTimeoutMax time.Duration `json:"election_timeout_max" yaml:"election_timeout_max"`
HeartbeatInterval time.Duration `json:"heartbeat_interval" yaml:"heartbeat_interval"`
HeartbeatTimeout time.Duration `json:"heartbeat_timeout" yaml:"heartbeat_timeout"`
ApplyInterval time.Duration `json:"apply_interval" yaml:"apply_interval"`
CommitTimeout time.Duration `json:"commit_timeout" yaml:"commit_timeout"`
// Snapshot configuration
SnapshotInterval time.Duration `json:"snapshot_interval" yaml:"snapshot_interval"`
SnapshotThreshold uint64 `json:"snapshot_threshold" yaml:"snapshot_threshold"`
EnableSnapshots bool `json:"enable_snapshots" yaml:"enable_snapshots"`
// Replication configuration
MaxAppendEntries int `json:"max_append_entries" yaml:"max_append_entries"`
ReplicationBatchSize int `json:"replication_batch_size" yaml:"replication_batch_size"`
ReplicationTimeout time.Duration `json:"replication_timeout" yaml:"replication_timeout"`
EnablePipeline bool `json:"enable_pipeline" yaml:"enable_pipeline"`
// Performance tuning
MaxInflightReplications int `json:"max_inflight_replications" yaml:"max_inflight_replications"`
BatchInterval time.Duration `json:"batch_interval" yaml:"batch_interval"`
MaxBatchSize int `json:"max_batch_size" yaml:"max_batch_size"`
// Log configuration
MaxLogEntries uint64 `json:"max_log_entries" yaml:"max_log_entries"`
LogCacheSize int `json:"log_cache_size" yaml:"log_cache_size"`
LogCompactionEnabled bool `json:"log_compaction_enabled" yaml:"log_compaction_enabled"`
TrailingLogs uint64 `json:"trailing_logs" yaml:"trailing_logs"`
// Safety
PreVoteEnabled bool `json:"pre_vote_enabled" yaml:"pre_vote_enabled"`
CheckQuorum bool `json:"check_quorum" yaml:"check_quorum"`
// Observability
EnableMetrics bool `json:"enable_metrics" yaml:"enable_metrics"`
EnableTracing bool `json:"enable_tracing" yaml:"enable_tracing"`
}
Config contains Raft algorithm configuration.
func DevelopmentConfig ¶
func DevelopmentConfig() Config
DevelopmentConfig returns a development-friendly configuration.
func ProductionConfig ¶
func ProductionConfig() Config
ProductionConfig returns a production-ready configuration.
func TestConfig ¶
func TestConfig() Config
TestConfig returns a configuration optimized for testing.
func (*Config) WithElectionTimeout ¶
WithElectionTimeout sets the election timeout range.
func (*Config) WithHeartbeatInterval ¶
WithHeartbeatInterval sets the heartbeat interval.
func (*Config) WithLogSettings ¶
WithLogSettings sets log configuration.
func (*Config) WithNodeID ¶
WithNodeID sets the node ID.
func (*Config) WithObservability ¶
WithObservability sets observability configuration.
func (*Config) WithPerformanceSettings ¶
func (c *Config) WithPerformanceSettings(maxInflight int, batchInterval time.Duration, maxBatchSize int) *Config
WithPerformanceSettings sets performance tuning parameters.
func (*Config) WithReplicationSettings ¶
WithReplicationSettings sets replication configuration.
func (*Config) WithSafetySettings ¶
WithSafetySettings sets safety configuration.
type ConfigChangeMsg ¶
type ConfigChangeMsg struct {
Type ConfigChangeType
NodeID string
Address string
Port int
ResultCh chan error
}
ConfigChangeMsg represents a configuration change message.
type ConfigChangeType ¶
type ConfigChangeType int
ConfigChangeType represents the type of configuration change.
const ( // ConfigChangeAdd adds a node to the cluster. ConfigChangeAdd ConfigChangeType = iota // ConfigChangeRemove removes a node from the cluster. ConfigChangeRemove )
type ElectionResult ¶
ElectionResult represents the result of an election.
type FollowerState ¶
type FollowerState struct {
// contains filtered or unexported fields
}
FollowerState manages follower-specific state and operations.
func NewFollowerState ¶
func NewFollowerState(nodeID string, electionTimeout time.Duration, logger forge.Logger) *FollowerState
NewFollowerState creates a new follower state.
func (*FollowerState) ClearLeader ¶
func (fs *FollowerState) ClearLeader()
ClearLeader clears the current leader.
func (*FollowerState) GetCurrentLeader ¶
func (fs *FollowerState) GetCurrentLeader() string
GetCurrentLeader returns the current leader.
func (*FollowerState) GetElectionTimeout ¶
func (fs *FollowerState) GetElectionTimeout() time.Duration
GetElectionTimeout returns the election timeout.
func (*FollowerState) GetLastHeartbeat ¶
func (fs *FollowerState) GetLastHeartbeat() time.Time
GetLastHeartbeat returns the last heartbeat time.
func (*FollowerState) GetStatus ¶
func (fs *FollowerState) GetStatus() FollowerStatus
GetStatus returns follower status.
func (*FollowerState) GetTimeSinceLastHeartbeat ¶
func (fs *FollowerState) GetTimeSinceLastHeartbeat() time.Duration
GetTimeSinceLastHeartbeat returns time since last heartbeat.
func (*FollowerState) HasHeartbeatTimedOut ¶
func (fs *FollowerState) HasHeartbeatTimedOut() bool
HasHeartbeatTimedOut checks if election timeout has expired.
func (*FollowerState) IsHealthy ¶
func (fs *FollowerState) IsHealthy() bool
IsHealthy checks if follower is healthy (receiving heartbeats).
func (*FollowerState) RecordHeartbeat ¶
func (fs *FollowerState) RecordHeartbeat(leaderID string)
RecordHeartbeat records a heartbeat from the leader.
func (*FollowerState) ResetHeartbeatTimer ¶
func (fs *FollowerState) ResetHeartbeatTimer()
ResetHeartbeatTimer resets the heartbeat timer.
func (*FollowerState) SetElectionTimeout ¶
func (fs *FollowerState) SetElectionTimeout(timeout time.Duration)
SetElectionTimeout sets the election timeout.
type FollowerStatus ¶
type FollowerStatus struct {
NodeID string
CurrentLeader string
LastHeartbeat time.Time
TimeSinceHeartbeat time.Duration
ElectionTimeout time.Duration
TimeUntilTimeout time.Duration
Healthy bool
}
FollowerStatus represents follower status.
type LeaderState ¶
type LeaderState struct {
// contains filtered or unexported fields
}
LeaderState manages leader-specific state and operations.
func NewLeaderState ¶
func NewLeaderState(nodeID string, heartbeatInterval time.Duration, logger forge.Logger) *LeaderState
NewLeaderState creates a new leader state.
func (*LeaderState) AddPeer ¶
func (ls *LeaderState) AddPeer(peerID string, nextIndex uint64)
AddPeer adds a new peer to track.
func (*LeaderState) CalculateCommitIndex ¶
func (ls *LeaderState) CalculateCommitIndex(currentCommitIndex, lastLogIndex uint64) uint64
CalculateCommitIndex calculates the new commit index based on majority replication.
func (*LeaderState) DecrementNextIndex ¶
func (ls *LeaderState) DecrementNextIndex(peerID string) uint64
DecrementNextIndex decrements the next index for a peer (on replication failure).
func (*LeaderState) GetAllMatchIndexes ¶
func (ls *LeaderState) GetAllMatchIndexes() map[string]uint64
GetAllMatchIndexes returns all match indexes.
func (*LeaderState) GetHealthyPeerCount ¶
func (ls *LeaderState) GetHealthyPeerCount() int
GetHealthyPeerCount returns the number of healthy peers.
func (*LeaderState) GetLastHeartbeat ¶
func (ls *LeaderState) GetLastHeartbeat(peerID string) time.Time
GetLastHeartbeat returns the last heartbeat time for a peer.
func (*LeaderState) GetMatchIndex ¶
func (ls *LeaderState) GetMatchIndex(peerID string) uint64
GetMatchIndex returns the match index for a peer.
func (*LeaderState) GetNextIndex ¶
func (ls *LeaderState) GetNextIndex(peerID string) uint64
GetNextIndex returns the next index for a peer.
func (*LeaderState) GetPeersNeedingHeartbeat ¶
func (ls *LeaderState) GetPeersNeedingHeartbeat() []string
GetPeersNeedingHeartbeat returns peers that need heartbeats.
func (*LeaderState) GetReplicationStatus ¶
func (ls *LeaderState) GetReplicationStatus() map[string]ReplicationStatus
GetReplicationStatus returns replication status for all peers.
func (*LeaderState) HasQuorum ¶
func (ls *LeaderState) HasQuorum(totalNodes int) bool
HasQuorum checks if we have a healthy quorum.
func (*LeaderState) NeedsHeartbeat ¶
func (ls *LeaderState) NeedsHeartbeat(peerID string) bool
NeedsHeartbeat checks if a peer needs a heartbeat.
func (*LeaderState) RecordHeartbeat ¶
func (ls *LeaderState) RecordHeartbeat(peerID string)
RecordHeartbeat records a successful heartbeat to a peer.
func (*LeaderState) RemovePeer ¶
func (ls *LeaderState) RemovePeer(peerID string)
RemovePeer removes a peer from tracking.
func (*LeaderState) SetMatchIndex ¶
func (ls *LeaderState) SetMatchIndex(peerID string, index uint64)
SetMatchIndex sets the match index for a peer.
func (*LeaderState) SetNextIndex ¶
func (ls *LeaderState) SetNextIndex(peerID string, index uint64)
SetNextIndex sets the next index for a peer.
func (*LeaderState) Stop ¶
func (ls *LeaderState) Stop(ctx context.Context) error
Stop stops the leader state.
func (*LeaderState) UpdateMatchIndex ¶
func (ls *LeaderState) UpdateMatchIndex(peerID string, matchIndex uint64)
UpdateMatchIndex updates match index and next index atomically.
type Log ¶
type Log struct {
// contains filtered or unexported fields
}
Log represents the replicated log.
func (*Log) AppendBatch ¶
AppendBatch appends multiple log entries.
func (*Log) FirstIndex ¶
FirstIndex returns the index of the first log entry.
func (*Log) GetEntriesAfter ¶
GetEntriesAfter returns all entries after the given index.
func (*Log) MatchesTerm ¶
MatchesTerm checks if the log entry at index has the given term.
func (*Log) TruncateAfter ¶
TruncateAfter truncates the log after the given index.
type LogEntryBatch ¶
LogEntryBatch represents a batch of log entries.
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node implements the Raft consensus algorithm.
func NewNode ¶
func NewNode( config Config, logger forge.Logger, stateMachine internal.StateMachine, transport internal.Transport, storage internal.Storage, ) (*Node, error)
NewNode creates a new Raft node.
func (*Node) AppendEntries ¶
func (n *Node) AppendEntries(ctx context.Context, req *internal.AppendEntriesRequest) (*internal.AppendEntriesResponse, error)
AppendEntries handles an AppendEntries RPC from the leader.
func (*Node) GetCommitIndex ¶
GetCommitIndex returns the current commit index.
func (*Node) GetCurrentTerm ¶
GetCurrentTerm returns the current term (alias for GetTerm for RPC handler compatibility).
func (*Node) InstallSnapshot ¶
func (n *Node) InstallSnapshot(ctx context.Context, req *internal.InstallSnapshotRequest) (*internal.InstallSnapshotResponse, error)
InstallSnapshot handles an InstallSnapshot RPC.
func (*Node) RemoveNode ¶
RemoveNode removes a node from the cluster (must be called on leader).
func (*Node) RequestVote ¶
func (n *Node) RequestVote(ctx context.Context, req *internal.RequestVoteRequest) (*internal.RequestVoteResponse, error)
RequestVote handles a RequestVote RPC.
type PeerState ¶
type PeerState struct {
ID string
Address string
Port int
NextIndex uint64
MatchIndex uint64
LastContact time.Time
Replicating bool
ReplicationMu sync.Mutex
}
PeerState represents the state of a peer node.
type RPCHandler ¶
type RPCHandler struct {
// contains filtered or unexported fields
}
RPCHandler handles Raft RPC requests.
func NewRPCHandler ¶
func NewRPCHandler(node *Node, logger forge.Logger) *RPCHandler
NewRPCHandler creates a new RPC handler.
func (*RPCHandler) BroadcastAppendEntries ¶
func (rh *RPCHandler) BroadcastAppendEntries(ctx context.Context, peers []string, req internal.AppendEntriesRequest) map[string]error
BroadcastAppendEntries broadcasts AppendEntries to all peers.
func (*RPCHandler) BroadcastRequestVote ¶
func (rh *RPCHandler) BroadcastRequestVote(ctx context.Context, peers []string, req internal.RequestVoteRequest) map[string]error
BroadcastRequestVote broadcasts RequestVote to all peers.
func (*RPCHandler) HandleAppendEntries ¶
func (rh *RPCHandler) HandleAppendEntries(ctx context.Context, req internal.AppendEntriesRequest) internal.AppendEntriesResponse
HandleAppendEntries handles AppendEntries RPC.
func (*RPCHandler) HandleInstallSnapshot ¶
func (rh *RPCHandler) HandleInstallSnapshot(ctx context.Context, req internal.InstallSnapshotRequest) internal.InstallSnapshotResponse
HandleInstallSnapshot handles InstallSnapshot RPC.
func (*RPCHandler) HandleRequestVote ¶
func (rh *RPCHandler) HandleRequestVote(ctx context.Context, req internal.RequestVoteRequest) internal.RequestVoteResponse
HandleRequestVote handles RequestVote RPC.
func (*RPCHandler) SendAppendEntries ¶
func (rh *RPCHandler) SendAppendEntries(ctx context.Context, target string, req internal.AppendEntriesRequest) (internal.AppendEntriesResponse, error)
SendAppendEntries sends AppendEntries RPC to a peer.
func (*RPCHandler) SendInstallSnapshot ¶
func (rh *RPCHandler) SendInstallSnapshot(ctx context.Context, target string, req internal.InstallSnapshotRequest) (internal.InstallSnapshotResponse, error)
SendInstallSnapshot sends InstallSnapshot RPC to a peer.
func (*RPCHandler) SendRequestVote ¶
func (rh *RPCHandler) SendRequestVote(ctx context.Context, target string, req internal.RequestVoteRequest) (internal.RequestVoteResponse, error)
SendRequestVote sends RequestVote RPC to a peer.
type ReplicationResult ¶
type ReplicationResult struct {
Success bool
MatchIndex uint64
NodeID string
Term uint64
Error error
}
ReplicationResult represents the result of log replication.
type ReplicationState ¶
type ReplicationState struct {
NextIndex uint64
MatchIndex uint64
LastContact time.Time
Inflight int
LastHeartbeat time.Time
}
ReplicationState represents the replication state for a peer.
type ReplicationStatus ¶
type ReplicationStatus struct {
PeerID string
NextIndex uint64
MatchIndex uint64
Lag uint64
LastHeartbeat time.Time
Healthy bool
}
ReplicationStatus represents replication status for a peer.
type SnapshotManager ¶
type SnapshotManager struct {
// contains filtered or unexported fields
}
SnapshotManager manages Raft snapshots.
func NewSnapshotManager ¶
func NewSnapshotManager( config SnapshotManagerConfig, storage internal.Storage, stateMachine internal.StateMachine, logger forge.Logger, ) *SnapshotManager
NewSnapshotManager creates a new snapshot manager.
func (*SnapshotManager) CompactLog ¶
func (sm *SnapshotManager) CompactLog(snapshotIndex uint64) error
CompactLog compacts the log up to the snapshot point.
func (*SnapshotManager) CreateSnapshot ¶
func (sm *SnapshotManager) CreateSnapshot(ctx context.Context, lastIncludedIndex, lastIncludedTerm uint64) error
CreateSnapshot creates a new snapshot.
func (*SnapshotManager) GetLastSnapshotIndex ¶
func (sm *SnapshotManager) GetLastSnapshotIndex() uint64
GetLastSnapshotIndex returns the last snapshot index.
func (*SnapshotManager) GetLastSnapshotTerm ¶
func (sm *SnapshotManager) GetLastSnapshotTerm() uint64
GetLastSnapshotTerm returns the last snapshot term.
func (*SnapshotManager) GetLatestSnapshot ¶
func (sm *SnapshotManager) GetLatestSnapshot() (*internal.SnapshotMetadata, []byte, error)
GetLatestSnapshot retrieves the latest snapshot.
func (*SnapshotManager) InstallSnapshot ¶
func (sm *SnapshotManager) InstallSnapshot(req internal.InstallSnapshotRequest) error
InstallSnapshot installs a snapshot received from leader.
func (*SnapshotManager) ListSnapshots ¶
func (sm *SnapshotManager) ListSnapshots() ([]internal.SnapshotMetadata, error)
ListSnapshots lists all available snapshots.
func (*SnapshotManager) RestoreSnapshot ¶
func (sm *SnapshotManager) RestoreSnapshot(ctx context.Context, snapshotData []byte) error
RestoreSnapshot restores from a snapshot.
func (*SnapshotManager) ShouldCreateSnapshot ¶
func (sm *SnapshotManager) ShouldCreateSnapshot(currentIndex uint64) bool
ShouldCreateSnapshot checks if a snapshot should be created.
func (*SnapshotManager) StreamSnapshot ¶
func (sm *SnapshotManager) StreamSnapshot(writer io.Writer) error
StreamSnapshot streams a snapshot to a writer (for sending to peers).
type SnapshotManagerConfig ¶
type SnapshotManagerConfig struct {
NodeID string
SnapshotInterval time.Duration
SnapshotThreshold uint64
}
SnapshotManagerConfig contains snapshot manager configuration.
type SnapshotMeta ¶
SnapshotMeta represents snapshot metadata.
type SnapshotMsg ¶
SnapshotMsg represents a snapshot message.
type VoteResponse ¶
VoteResponse represents a vote response from a peer.