Documentation
¶
Index ¶
- type Backoff
- type BidirectionalTransport
- type CircuitBreaker
- type CircuitState
- type Config
- type DiskStorage
- func (s *DiskStorage) Close() error
- func (s *DiskStorage) Compact(compactIndex uint64) error
- func (s *DiskStorage) Entries(lo, hi uint64) ([]raftpb.Entry, error)
- func (s *DiskStorage) FirstIndex() (uint64, error)
- func (s *DiskStorage) InitialState() (raftpb.HardState, raftpb.ConfState, error)
- func (s *DiskStorage) LastIndex() (uint64, error)
- func (s *DiskStorage) SaveEntries(entries []raftpb.Entry) error
- func (s *DiskStorage) SaveSnapshot(snap raftpb.Snapshot) error
- func (s *DiskStorage) SaveState(st raftpb.HardState) error
- func (s *DiskStorage) Snapshot() (raftpb.Snapshot, error)
- func (s *DiskStorage) Term(i uint64) (uint64, error)
- type MessageHandler
- type Node
- type Peer
- type RaftNode
- func (rn *RaftNode) AddNode(ctx context.Context, nodeID uint64, addr string) error
- func (rn *RaftNode) Advance()
- func (rn *RaftNode) Campaign(ctx context.Context) error
- func (rn *RaftNode) IsLeader() bool
- func (rn *RaftNode) Leader() uint64
- func (rn *RaftNode) Propose(ctx context.Context, data []byte) error
- func (rn *RaftNode) Ready() <-chan Ready
- func (rn *RaftNode) RemoveNode(ctx context.Context, nodeID uint64) error
- func (rn *RaftNode) SetBindAddr(addr string)
- func (rn *RaftNode) Start() error
- func (rn *RaftNode) Status() raft.Status
- func (rn *RaftNode) Stop() error
- type Ready
- type SoftState
- type StateMachine
- type StateType
- type Storage
- type Transport
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Backoff ¶
type Backoff struct {
InitialInterval time.Duration
MaxInterval time.Duration
Multiplier float64
Jitter float64
// contains filtered or unexported fields
}
Backoff implements exponential backoff with jitter. This is an industry-standard retry strategy used by the AWS SDK, gRPC, Kubernetes client-go, and the etcd client.
func NewBackoff ¶
func NewBackoff() *Backoff
NewBackoff creates a new backoff with sensible defaults. Defaults are based on AWS SDK and gRPC recommendations.
type BidirectionalTransport ¶
type BidirectionalTransport struct {
// contains filtered or unexported fields
}
BidirectionalTransport implements bidirectional TCP connections for Raft messages. Key improvement: Each peer connection is used for both sending and receiving.
func NewBidirectionalTransport ¶
func NewBidirectionalTransport(nodeID uint64, bindAddr string, handler MessageHandler) *BidirectionalTransport
NewBidirectionalTransport creates a new bidirectional transport.
func (*BidirectionalTransport) AddPeer ¶
func (t *BidirectionalTransport) AddPeer(nodeID uint64, addr string) error
AddPeer adds a peer to the transport.
func (*BidirectionalTransport) RemovePeer ¶
func (t *BidirectionalTransport) RemovePeer(nodeID uint64) error
RemovePeer removes a peer from the transport.
func (*BidirectionalTransport) Send ¶
func (t *BidirectionalTransport) Send(msg raftpb.Message) error
Send sends a message to the specified node.
func (*BidirectionalTransport) Start ¶
func (t *BidirectionalTransport) Start() error
Start starts the transport layer.
func (*BidirectionalTransport) Stop ¶
func (t *BidirectionalTransport) Stop() error
Stop stops the transport layer.
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements the circuit breaker pattern. This is an industry-standard pattern used by Netflix Hystrix, Kafka, and Cassandra, and described in the Google SRE book.
func NewCircuitBreaker ¶
func NewCircuitBreaker() *CircuitBreaker
NewCircuitBreaker creates a new circuit breaker with sensible defaults. Defaults are based on Hystrix recommendations.
func (*CircuitBreaker) Call ¶
func (cb *CircuitBreaker) Call() bool
Call reports whether an operation may proceed through the circuit breaker. It returns true if the call should proceed and false if the circuit is open.
func (*CircuitBreaker) IsOpen ¶
func (cb *CircuitBreaker) IsOpen() bool
IsOpen returns true if the circuit is open (rejecting requests).
func (*CircuitBreaker) RecordFailure ¶
func (cb *CircuitBreaker) RecordFailure()
RecordFailure records a failed operation.
func (*CircuitBreaker) RecordSuccess ¶
func (cb *CircuitBreaker) RecordSuccess()
RecordSuccess records a successful operation.
func (*CircuitBreaker) State ¶
func (cb *CircuitBreaker) State() CircuitState
State returns the current circuit state.
type CircuitState ¶
type CircuitState int
CircuitState represents the state of the circuit breaker.
const (
StateClosed CircuitState = iota // Normal operation
StateOpen // Circuit open, rejecting requests
StateHalfOpen // Testing if service recovered
)
type Config ¶
type Config struct {
// NodeID is the unique identifier for this node in the cluster.
// Must be non-zero.
NodeID uint64
// Peers is the initial list of peers in the cluster.
// For bootstrapping a new cluster, include all initial members.
// For joining an existing cluster, this can be a subset.
Peers []Peer
// DataDir is the directory where Raft state is persisted.
DataDir string
// TickInterval is the time between Raft ticks.
// The Raft paper recommends 100-500ms for typical deployments.
// Default: 100ms
TickInterval time.Duration
// ElectionTick is the number of ticks before starting an election.
// A higher value can reduce election frequency in stable networks.
// Must be greater than HeartbeatTick.
// Default: 10 (1 second with 100ms tick interval)
ElectionTick int
// HeartbeatTick is the number of ticks between heartbeats.
// Leaders send heartbeats to maintain their leadership.
// Default: 1 (100ms with 100ms tick interval)
HeartbeatTick int
// MaxSizePerMsg is the maximum size of each Raft message.
// Default: 1MB
MaxSizePerMsg uint64
// MaxInflightMsgs is the maximum number of in-flight append messages.
// This limits the number of messages being sent to followers at once.
// Default: 256
MaxInflightMsgs int
// SnapshotInterval is the number of log entries between snapshots.
// A snapshot compacts the Raft log to prevent unbounded growth.
// Default: 10000
SnapshotInterval uint64
// SnapshotCatchUpEntries is the number of entries a follower can lag
// before the leader sends a snapshot instead of log entries.
// Default: 5000
SnapshotCatchUpEntries uint64
// PreVote enables the pre-vote algorithm to reduce disruptions.
// When enabled, candidates first check if they would win an election
// before actually starting one.
// Default: true
PreVote bool
// CheckQuorum enables the leader to check that it is still in contact with a quorum of the cluster.
// If the leader doesn't receive a quorum of heartbeat responses,
// it steps down to follower.
// Default: true
CheckQuorum bool
// DisableProposalForwarding disables proposal forwarding to the leader.
// When set to true, proposals on non-leaders will be rejected immediately instead of being forwarded.
// Default: false
DisableProposalForwarding bool
}
Config holds the configuration for a consensus node.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a Config with sensible defaults for a typical deployment.
type DiskStorage ¶
type DiskStorage struct {
// contains filtered or unexported fields
}
DiskStorage implements the Storage interface with file-based persistence.
func NewDiskStorage ¶
func NewDiskStorage(dir string) (*DiskStorage, error)
NewDiskStorage creates a new disk-based storage.
func (*DiskStorage) Compact ¶
func (s *DiskStorage) Compact(compactIndex uint64) error
Compact compacts the log up to compactIndex.
func (*DiskStorage) Entries ¶
func (s *DiskStorage) Entries(lo, hi uint64) ([]raftpb.Entry, error)
Entries returns a slice of log entries in the range [lo, hi).
func (*DiskStorage) FirstIndex ¶
func (s *DiskStorage) FirstIndex() (uint64, error)
FirstIndex returns the index of the first log entry that is available.
func (*DiskStorage) InitialState ¶
func (s *DiskStorage) InitialState() (raftpb.HardState, raftpb.ConfState, error)
InitialState returns the saved HardState and ConfState.
func (*DiskStorage) LastIndex ¶
func (s *DiskStorage) LastIndex() (uint64, error)
LastIndex returns the index of the last entry in the log.
func (*DiskStorage) SaveEntries ¶
func (s *DiskStorage) SaveEntries(entries []raftpb.Entry) error
SaveEntries saves entries to stable storage.
func (*DiskStorage) SaveSnapshot ¶
func (s *DiskStorage) SaveSnapshot(snap raftpb.Snapshot) error
SaveSnapshot saves the snapshot to stable storage.
func (*DiskStorage) SaveState ¶
func (s *DiskStorage) SaveState(st raftpb.HardState) error
SaveState saves the current HardState.
type MessageHandler ¶
MessageHandler is a callback function that handles incoming Raft messages.
type Node ¶
type Node interface {
// Propose proposes data to be appended to the Raft log.
// This is used for any state changes that need to be replicated.
Propose(ctx context.Context, data []byte) error
// Start starts the Raft node and begins participating in the cluster.
Start() error
// Stop gracefully stops the Raft node.
Stop() error
// IsLeader returns true if this node is currently the Raft leader.
IsLeader() bool
// Leader returns the ID of the current leader, or 0 if unknown.
Leader() uint64
// AddNode adds a new node to the Raft cluster.
AddNode(ctx context.Context, nodeID uint64, addr string) error
// RemoveNode removes a node from the Raft cluster.
RemoveNode(ctx context.Context, nodeID uint64) error
// Campaign causes the node to transition to candidate state and attempt to become leader.
Campaign(ctx context.Context) error
// Ready returns a channel that receives Ready structs when the Raft state changes.
Ready() <-chan Ready
// Advance notifies the Raft node that the application has applied the committed entries.
Advance()
}
Node represents a consensus node in the cluster. It wraps the etcd/raft implementation and provides a clean interface for StreamBus to interact with the consensus layer.
type RaftNode ¶
type RaftNode struct {
// contains filtered or unexported fields
}
RaftNode wraps etcd/raft and implements the Node interface.
func NewNode ¶
func NewNode(config *Config, sm StateMachine) (*RaftNode, error)
NewNode creates a new Raft consensus node.
func (*RaftNode) Advance ¶
func (rn *RaftNode) Advance()
Advance notifies the Raft node that the application has applied entries.
func (*RaftNode) RemoveNode ¶
func (rn *RaftNode) RemoveNode(ctx context.Context, nodeID uint64) error
RemoveNode removes a node from the Raft cluster.
func (*RaftNode) SetBindAddr ¶
func (rn *RaftNode) SetBindAddr(addr string)
SetBindAddr sets the bind address for the transport (for testing).
type Ready ¶
type Ready struct {
// SoftState provides state that is useful for logging and debugging.
// It does not need to be persisted.
SoftState *SoftState
// HardState contains the durable state that must be persisted to stable storage.
HardState raftpb.HardState
// Entries specifies entries to be saved to stable storage.
Entries []raftpb.Entry
// Snapshot specifies the snapshot to be saved to stable storage.
Snapshot raftpb.Snapshot
// CommittedEntries specifies entries that have been committed and must now be applied to the state machine.
CommittedEntries []raftpb.Entry
// Messages specifies outgoing messages to be sent to other nodes.
Messages []raftpb.Message
// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk before proceeding.
MustSync bool
}
Ready encapsulates the state changes that need to be processed by the application. This is similar to raft.Ready but simplified for StreamBus use cases.
type StateMachine ¶
type StateMachine interface {
// Apply applies a committed log entry to the state machine.
Apply(entry []byte) error
// Snapshot returns the current state of the state machine as a snapshot.
Snapshot() ([]byte, error)
// Restore restores the state machine from a snapshot.
Restore(snapshot []byte) error
}
StateMachine defines the interface for applying committed log entries. This is the application-specific logic that processes Raft commands.
type StateType ¶
type StateType uint64
StateType represents the role of the node in the Raft cluster.
type Storage ¶
type Storage interface {
// InitialState returns the saved HardState and ConfState.
InitialState() (raftpb.HardState, raftpb.ConfState, error)
// SaveState saves the current HardState.
SaveState(st raftpb.HardState) error
// Entries returns a slice of log entries in the range [lo, hi).
Entries(lo, hi uint64) ([]raftpb.Entry, error)
// Term returns the term of entry i.
Term(i uint64) (uint64, error)
// LastIndex returns the index of the last entry in the log.
LastIndex() (uint64, error)
// FirstIndex returns the index of the first log entry that is available.
FirstIndex() (uint64, error)
// Snapshot returns the most recent snapshot.
Snapshot() (raftpb.Snapshot, error)
// SaveEntries saves entries to stable storage.
SaveEntries(entries []raftpb.Entry) error
// SaveSnapshot saves the snapshot to stable storage.
SaveSnapshot(snap raftpb.Snapshot) error
// Compact compacts the log up to compactIndex.
Compact(compactIndex uint64) error
// Close closes the storage.
Close() error
}
Storage defines the interface for persisting Raft state.
type Transport ¶
type Transport interface {
// Send sends a message to the specified node.
Send(msg raftpb.Message) error
// Start starts the transport layer.
Start() error
// Stop stops the transport layer.
Stop() error
// AddPeer adds a peer to the transport.
AddPeer(nodeID uint64, addr string) error
// RemovePeer removes a peer from the transport.
RemovePeer(nodeID uint64) error
}
Transport defines the interface for sending Raft messages to other nodes.