consensus

package
v1.0.1 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 22, 2025 | License: Apache-2.0 | Imports: 15 | Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Backoff

type Backoff struct {
	InitialInterval time.Duration
	MaxInterval     time.Duration
	Multiplier      float64
	Jitter          float64
	// contains filtered or unexported fields
}

Backoff implements exponential backoff with jitter. The same pattern is used by the AWS SDK, gRPC, Kubernetes client-go, and the etcd client.

func NewBackoff

func NewBackoff() *Backoff

NewBackoff creates a new backoff with sensible defaults. Defaults are based on AWS SDK and gRPC recommendations.

func (*Backoff) Next

func (b *Backoff) Next() time.Duration

Next returns the next backoff duration.

func (*Backoff) Reset

func (b *Backoff) Reset()

Reset resets the backoff to the initial interval. Call this after a successful operation.
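The growth-and-jitter behaviour described for Backoff can be illustrated with a small stand-alone sketch. The type `retryBackoff` below is hypothetical: it mirrors the four exported fields, but the package's unexported state, its defaults, and its exact jitter formula are not documented here, so the numbers and the uniform jitter are assumptions.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// retryBackoff is a hypothetical stand-in that mirrors the exported fields of
// Backoff. The real type's internal state and jitter formula may differ.
type retryBackoff struct {
	InitialInterval time.Duration
	MaxInterval     time.Duration
	Multiplier      float64
	Jitter          float64 // assumed to be a fraction, e.g. 0.2 for +/-20%

	current time.Duration // next interval before jitter is applied
}

// Next returns the current interval with jitter applied, then grows the
// stored interval by Multiplier, capped at MaxInterval.
func (b *retryBackoff) Next() time.Duration {
	if b.current == 0 {
		b.current = b.InitialInterval
	}
	base := b.current

	next := time.Duration(float64(b.current) * b.Multiplier)
	if next > b.MaxInterval {
		next = b.MaxInterval
	}
	b.current = next

	// Spread the result uniformly over [base*(1-Jitter), base*(1+Jitter)].
	delta := b.Jitter * float64(base)
	return time.Duration(float64(base) - delta + rand.Float64()*2*delta)
}

// Reset returns to the initial interval, as after a successful operation.
func (b *retryBackoff) Reset() { b.current = b.InitialInterval }

func main() {
	b := &retryBackoff{
		InitialInterval: 100 * time.Millisecond, // assumed values, not the package defaults
		MaxInterval:     time.Second,
		Multiplier:      2,
		Jitter:          0.2,
	}
	for i := 0; i < 5; i++ {
		fmt.Println(b.Next())
	}
	b.Reset()
}
```

Jitter matters because many clients that fail at the same moment would otherwise retry at the same moment too; randomizing each delay spreads the retries out.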

type BidirectionalTransport

type BidirectionalTransport struct {
	// contains filtered or unexported fields
}

BidirectionalTransport implements bidirectional TCP connections for Raft messages. Each peer connection is used for both sending and receiving.

func NewBidirectionalTransport

func NewBidirectionalTransport(nodeID uint64, bindAddr string, handler MessageHandler) *BidirectionalTransport

NewBidirectionalTransport creates a new bidirectional transport.

func (*BidirectionalTransport) AddPeer

func (t *BidirectionalTransport) AddPeer(nodeID uint64, addr string) error

AddPeer adds a peer to the transport.

func (*BidirectionalTransport) RemovePeer

func (t *BidirectionalTransport) RemovePeer(nodeID uint64) error

RemovePeer removes a peer from the transport.

func (*BidirectionalTransport) Send

Send sends a message to the specified node.

func (*BidirectionalTransport) Start

func (t *BidirectionalTransport) Start() error

Start starts the transport layer.

func (*BidirectionalTransport) Stop

func (t *BidirectionalTransport) Stop() error

Stop stops the transport layer.

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements the circuit breaker pattern. The same pattern appears in Netflix Hystrix, Kafka, Cassandra, and the Google SRE book.

func NewCircuitBreaker

func NewCircuitBreaker() *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker with sensible defaults. Defaults are based on Hystrix recommendations.

func (*CircuitBreaker) Call

func (cb *CircuitBreaker) Call() bool

Call reports whether an operation may proceed through the circuit breaker. It returns true if the call should proceed and false if the circuit is open.

func (*CircuitBreaker) IsOpen

func (cb *CircuitBreaker) IsOpen() bool

IsOpen returns true if the circuit is open (rejecting requests).

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure()

RecordFailure records a failed operation.

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess records a successful operation.

func (*CircuitBreaker) State

func (cb *CircuitBreaker) State() CircuitState

State returns the current circuit state.

type CircuitState

type CircuitState int

CircuitState represents the state of the circuit breaker.

const (
	StateClosed   CircuitState = iota // Normal operation
	StateOpen                         // Circuit open, rejecting requests
	StateHalfOpen                     // Testing if service recovered
)
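The three states and the Call / RecordFailure / RecordSuccess methods suggest a state machine along the following lines. This is a sketch only: the `breaker` type, its `failureThreshold` and `resetTimeout` knobs, and the injectable clock are assumptions made for illustration, and the package's real thresholds and half-open behaviour are not documented here.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type circuitState int

const (
	stateClosed   circuitState = iota // normal operation
	stateOpen                         // rejecting requests
	stateHalfOpen                     // probing whether the service recovered
)

// breaker is a hypothetical circuit breaker, not the package's implementation.
type breaker struct {
	mu               sync.Mutex
	state            circuitState
	failures         int
	failureThreshold int              // assumed: consecutive failures that open the circuit
	resetTimeout     time.Duration    // assumed: wait before allowing a trial request
	openedAt         time.Time        // when the circuit last opened
	now              func() time.Time // injectable clock
}

// Call reports whether the caller may proceed. An open circuit moves to
// half-open once resetTimeout has elapsed, letting trial requests through.
func (cb *breaker) Call() bool {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	if cb.state == stateOpen {
		if cb.now().Sub(cb.openedAt) < cb.resetTimeout {
			return false
		}
		cb.state = stateHalfOpen
	}
	return true
}

// RecordFailure opens the circuit after failureThreshold failures, or
// immediately if a half-open trial request fails.
func (cb *breaker) RecordFailure() {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	cb.failures++
	if cb.state == stateHalfOpen || cb.failures >= cb.failureThreshold {
		cb.state = stateOpen
		cb.openedAt = cb.now()
	}
}

// RecordSuccess closes the circuit and clears the failure count.
func (cb *breaker) RecordSuccess() {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	cb.failures = 0
	cb.state = stateClosed
}

func (cb *breaker) State() circuitState {
	cb.mu.Lock()
	defer cb.mu.Unlock()
	return cb.state
}

// manualClock only moves when advance is called, which keeps the demo deterministic.
type manualClock struct{ t time.Time }

func (c *manualClock) now() time.Time          { return c.t }
func (c *manualClock) advance(d time.Duration) { c.t = c.t.Add(d) }

func main() {
	clk := &manualClock{}
	cb := &breaker{failureThreshold: 2, resetTimeout: time.Second, now: clk.now}
	cb.RecordFailure()
	cb.RecordFailure()
	fmt.Println(cb.Call()) // false: the circuit is open
	clk.advance(time.Second)
	fmt.Println(cb.Call()) // true: half-open trial request
	cb.RecordSuccess()
	fmt.Println(cb.State() == stateClosed) // true
}
```

The caller is expected to check Call before each operation and report the outcome afterwards, so the breaker itself never runs the operation.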

type Config

type Config struct {
	// NodeID is the unique identifier for this node in the cluster.
	// Must be non-zero.
	NodeID uint64

	// Peers is the initial list of peers in the cluster.
	// For bootstrapping a new cluster, include all initial members.
	// For joining an existing cluster, this can be a subset.
	Peers []Peer

	// DataDir is the directory where Raft state is persisted.
	DataDir string

	// TickInterval is the time between Raft ticks.
	// Values in the 100-500ms range are typical.
	// Default: 100ms
	TickInterval time.Duration

	// ElectionTick is the number of ticks before starting an election.
	// A higher value can reduce election frequency in stable networks.
	// Must be greater than HeartbeatTick.
	// Default: 10 (1 second with 100ms tick interval)
	ElectionTick int

	// HeartbeatTick is the number of ticks between heartbeats.
	// Leaders send heartbeats to maintain their leadership.
	// Default: 1 (100ms with 100ms tick interval)
	HeartbeatTick int

	// MaxSizePerMsg is the maximum size of each Raft message.
	// Default: 1MB
	MaxSizePerMsg uint64

	// MaxInflightMsgs is the maximum number of in-flight append messages.
	// This limits the number of messages being sent to followers at once.
	// Default: 256
	MaxInflightMsgs int

	// SnapshotInterval is the number of log entries between snapshots.
	// A snapshot compacts the Raft log to prevent unbounded growth.
	// Default: 10000
	SnapshotInterval uint64

	// SnapshotCatchUpEntries is the number of entries a follower can lag
	// before the leader sends a snapshot instead of log entries.
	// Default: 5000
	SnapshotCatchUpEntries uint64

	// PreVote enables the pre-vote algorithm to reduce disruptions.
	// When enabled, candidates first check if they would win an election
	// before actually starting one.
	// Default: true
	PreVote bool

	// CheckQuorum makes the leader verify that it can still reach a quorum.
	// If the leader doesn't receive a quorum of heartbeat responses,
	// it steps down to follower.
	// Default: true
	CheckQuorum bool

	// DisableProposalForwarding disables proposal forwarding to the leader.
	// When set to true, proposals on non-leaders are rejected immediately
	// instead of being forwarded.
	// Default: false
	DisableProposalForwarding bool
}

Config holds the configuration for a consensus node.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a Config with sensible defaults for a typical deployment.

func (*Config) Clone

func (c *Config) Clone() *Config

Clone creates a deep copy of the configuration.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks if the configuration is valid.
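The field comments on Config state two constraints (NodeID must be non-zero, ElectionTick must be greater than HeartbeatTick) and express timeouts as tick counts multiplied by TickInterval. The sketch below shows that arithmetic on a hypothetical `nodeConfig` subset. The positivity checks on TickInterval and HeartbeatTick are assumptions; the rules Validate actually enforces are not listed in this page.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// nodeConfig is a hypothetical subset of Config holding only the timing fields.
type nodeConfig struct {
	NodeID        uint64
	TickInterval  time.Duration
	ElectionTick  int
	HeartbeatTick int
}

// validate checks the constraints stated in the field comments, plus two
// assumed sanity checks (positive TickInterval and HeartbeatTick).
func (c *nodeConfig) validate() error {
	switch {
	case c.NodeID == 0:
		return errors.New("NodeID must be non-zero")
	case c.TickInterval <= 0:
		return errors.New("TickInterval must be positive")
	case c.HeartbeatTick <= 0:
		return errors.New("HeartbeatTick must be positive")
	case c.ElectionTick <= c.HeartbeatTick:
		return errors.New("ElectionTick must be greater than HeartbeatTick")
	}
	return nil
}

// electionTimeout is the wall-clock time before a follower starts an election.
func (c *nodeConfig) electionTimeout() time.Duration {
	return time.Duration(c.ElectionTick) * c.TickInterval
}

// heartbeatInterval is the wall-clock time between leader heartbeats.
func (c *nodeConfig) heartbeatInterval() time.Duration {
	return time.Duration(c.HeartbeatTick) * c.TickInterval
}

func main() {
	// These values match the defaults given in the field comments.
	cfg := &nodeConfig{NodeID: 1, TickInterval: 100 * time.Millisecond, ElectionTick: 10, HeartbeatTick: 1}
	if err := cfg.validate(); err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println(cfg.electionTimeout(), cfg.heartbeatInterval()) // prints "1s 100ms"
}
```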

type DiskStorage

type DiskStorage struct {
	// contains filtered or unexported fields
}

DiskStorage implements the Storage interface with file-based persistence.

func NewDiskStorage

func NewDiskStorage(dir string) (*DiskStorage, error)

NewDiskStorage creates a new disk-based storage.

func (*DiskStorage) Close

func (s *DiskStorage) Close() error

Close closes the storage.

func (*DiskStorage) Compact

func (s *DiskStorage) Compact(compactIndex uint64) error

Compact compacts the log up to compactIndex.

func (*DiskStorage) Entries

func (s *DiskStorage) Entries(lo, hi uint64) ([]raftpb.Entry, error)

Entries returns a slice of log entries in the range [lo, hi).

func (*DiskStorage) FirstIndex

func (s *DiskStorage) FirstIndex() (uint64, error)

FirstIndex returns the index of the first log entry that is available.

func (*DiskStorage) InitialState

func (s *DiskStorage) InitialState() (raftpb.HardState, raftpb.ConfState, error)

InitialState returns the saved HardState and ConfState.

func (*DiskStorage) LastIndex

func (s *DiskStorage) LastIndex() (uint64, error)

LastIndex returns the index of the last entry in the log.

func (*DiskStorage) SaveEntries

func (s *DiskStorage) SaveEntries(entries []raftpb.Entry) error

SaveEntries saves entries to stable storage.

func (*DiskStorage) SaveSnapshot

func (s *DiskStorage) SaveSnapshot(snap raftpb.Snapshot) error

SaveSnapshot saves the snapshot to stable storage.

func (*DiskStorage) SaveState

func (s *DiskStorage) SaveState(st raftpb.HardState) error

SaveState saves the current HardState.

func (*DiskStorage) Snapshot

func (s *DiskStorage) Snapshot() (raftpb.Snapshot, error)

Snapshot returns the most recent snapshot.

func (*DiskStorage) Term

func (s *DiskStorage) Term(i uint64) (uint64, error)

Term returns the term of entry i.

type MessageHandler

type MessageHandler func(msg raftpb.Message)

MessageHandler is a callback function that handles incoming Raft messages.

type Node

type Node interface {
	// Propose proposes data to be appended to the Raft log.
	// This is used for any state changes that need to be replicated.
	Propose(ctx context.Context, data []byte) error

	// Start starts the Raft node and begins participating in the cluster.
	Start() error

	// Stop gracefully stops the Raft node.
	Stop() error

	// IsLeader returns true if this node is currently the Raft leader.
	IsLeader() bool

	// Leader returns the ID of the current leader, or 0 if unknown.
	Leader() uint64

	// AddNode adds a new node to the Raft cluster.
	AddNode(ctx context.Context, nodeID uint64, addr string) error

	// RemoveNode removes a node from the Raft cluster.
	RemoveNode(ctx context.Context, nodeID uint64) error

	// Campaign causes the node to transition to candidate state and attempt to become leader.
	Campaign(ctx context.Context) error

	// Ready returns a channel that receives Ready structs when the Raft state changes.
	Ready() <-chan Ready

	// Advance notifies the Raft node that the application has applied the committed entries.
	Advance()
}

Node represents a consensus node in the cluster. It wraps the etcd/raft implementation and provides a clean interface for StreamBus to interact with the consensus layer.

type Peer

type Peer struct {
	ID   uint64 // Raft node ID
	Addr string // Network address (host:port)
}

Peer represents a peer node in the Raft cluster.

type RaftNode

type RaftNode struct {
	// contains filtered or unexported fields
}

RaftNode wraps etcd/raft and implements the Node interface.

func NewNode

func NewNode(config *Config, sm StateMachine) (*RaftNode, error)

NewNode creates a new Raft consensus node.

func (*RaftNode) AddNode

func (rn *RaftNode) AddNode(ctx context.Context, nodeID uint64, addr string) error

AddNode adds a new node to the Raft cluster.

func (*RaftNode) Advance

func (rn *RaftNode) Advance()

Advance notifies the Raft node that the application has applied entries.

func (*RaftNode) Campaign

func (rn *RaftNode) Campaign(ctx context.Context) error

Campaign causes the node to transition to candidate state.

func (*RaftNode) IsLeader

func (rn *RaftNode) IsLeader() bool

IsLeader returns true if this node is currently the Raft leader.

func (*RaftNode) Leader

func (rn *RaftNode) Leader() uint64

Leader returns the ID of the current leader.

func (*RaftNode) Propose

func (rn *RaftNode) Propose(ctx context.Context, data []byte) error

Propose proposes data to be appended to the Raft log.

func (*RaftNode) Ready

func (rn *RaftNode) Ready() <-chan Ready

Ready returns a channel that receives Ready structs.

func (*RaftNode) RemoveNode

func (rn *RaftNode) RemoveNode(ctx context.Context, nodeID uint64) error

RemoveNode removes a node from the Raft cluster.

func (*RaftNode) SetBindAddr

func (rn *RaftNode) SetBindAddr(addr string)

SetBindAddr sets the bind address for the transport (for testing).

func (*RaftNode) Start

func (rn *RaftNode) Start() error

Start starts the Raft node.

func (*RaftNode) Status

func (rn *RaftNode) Status() raft.Status

Status returns the current Raft status for debugging.

func (*RaftNode) Stop

func (rn *RaftNode) Stop() error

Stop stops the Raft node.

type Ready

type Ready struct {
	// SoftState provides state that is useful for logging and debugging.
	// It does not need to be persisted.
	SoftState *SoftState

	// HardState contains the durable state that must be persisted to stable storage.
	HardState raftpb.HardState

	// Entries specifies entries to be saved to stable storage.
	Entries []raftpb.Entry

	// Snapshot specifies the snapshot to be saved to stable storage.
	Snapshot raftpb.Snapshot

	// CommittedEntries specifies committed entries to be applied to the state machine.
	CommittedEntries []raftpb.Entry

	// Messages specifies outgoing messages to be sent to other nodes.
	Messages []raftpb.Message

	// MustSync indicates whether the HardState and Entries must be synchronously
	// written to disk before proceeding.
	MustSync bool
}

Ready encapsulates the state changes that need to be processed by the application. This is similar to raft.Ready but simplified for StreamBus use cases.
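Since Ready is described as similar to raft.Ready, a consumer would plausibly follow the order that etcd/raft documents: persist first, then send messages, then apply committed entries, then call Advance. The sketch below uses trimmed stand-in types and hypothetical callbacks (`hooks`, `processReady`, `tracingHooks`) to show that order. It is not the package's own loop, and RaftNode may already perform some of these steps internally.

```go
package main

import "fmt"

// entry and message are simplified stand-ins for raftpb.Entry and raftpb.Message.
type entry struct{ Index uint64 }
type message struct{ To uint64 }

// ready is a trimmed-down stand-in for the Ready struct.
type ready struct {
	Entries          []entry
	CommittedEntries []entry
	Messages         []message
}

// hooks groups the callbacks the loop needs. All four are hypothetical.
type hooks struct {
	persist func([]entry) error // write entries to stable storage
	send    func(message)       // hand a message to the transport
	apply   func(entry) error   // apply a committed entry to the state machine
	advance func()              // signal that this batch is done
}

// processReady handles each batch in the order persist, send, apply, advance,
// and stops at the first error or when the channel is closed.
func processReady(readyc <-chan ready, h hooks) error {
	for rd := range readyc {
		if err := h.persist(rd.Entries); err != nil {
			return fmt.Errorf("persist: %w", err)
		}
		for _, m := range rd.Messages {
			h.send(m)
		}
		for _, e := range rd.CommittedEntries {
			if err := h.apply(e); err != nil {
				return fmt.Errorf("apply index %d: %w", e.Index, err)
			}
		}
		h.advance()
	}
	return nil
}

// tracingHooks records the name of each step so the order can be inspected.
func tracingHooks(trace *[]string) hooks {
	note := func(s string) { *trace = append(*trace, s) }
	return hooks{
		persist: func([]entry) error { note("persist"); return nil },
		send:    func(message) { note("send") },
		apply:   func(entry) error { note("apply"); return nil },
		advance: func() { note("advance") },
	}
}

func main() {
	readyc := make(chan ready, 1)
	readyc <- ready{
		Entries:          []entry{{Index: 1}},
		Messages:         []message{{To: 2}},
		CommittedEntries: []entry{{Index: 1}},
	}
	close(readyc)

	var trace []string
	if err := processReady(readyc, tracingHooks(&trace)); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(trace) // prints "[persist send apply advance]"
}
```

Persisting before sending matters for safety: a node should not tell its peers about entries or votes that it could lose in a crash.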

type SoftState

type SoftState struct {
	Lead      uint64 // Lead is the ID of the current leader
	RaftState StateType
}

SoftState provides state that is useful for logging and debugging.

type StateMachine

type StateMachine interface {
	// Apply applies a committed log entry to the state machine.
	Apply(entry []byte) error

	// Snapshot returns the current state of the state machine as a snapshot.
	Snapshot() ([]byte, error)

	// Restore restores the state machine from a snapshot.
	Restore(snapshot []byte) error
}

StateMachine defines the interface for applying committed log entries. This is the application-specific logic that processes Raft commands.
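As an illustration, a small key-value store with the same three methods could look like the sketch below. The JSON command format (`setCommand`) and the `kvStateMachine` type are assumptions made for this example; the interface itself places no constraints on how entries or snapshots are encoded.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// setCommand is a hypothetical encoding for log entries.
type setCommand struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

// kvStateMachine has the Apply, Snapshot, and Restore methods listed in the
// StateMachine interface.
type kvStateMachine struct {
	mu   sync.Mutex
	data map[string]string
}

func newKVStateMachine() *kvStateMachine {
	return &kvStateMachine{data: make(map[string]string)}
}

// Apply decodes one committed entry and applies it to the map.
func (m *kvStateMachine) Apply(entry []byte) error {
	var cmd setCommand
	if err := json.Unmarshal(entry, &cmd); err != nil {
		return fmt.Errorf("decode command: %w", err)
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[cmd.Key] = cmd.Value
	return nil
}

// Snapshot serializes the whole map.
func (m *kvStateMachine) Snapshot() ([]byte, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return json.Marshal(m.data)
}

// Restore replaces the current state with the contents of a snapshot.
func (m *kvStateMachine) Restore(snapshot []byte) error {
	data := make(map[string]string)
	if err := json.Unmarshal(snapshot, &data); err != nil {
		return fmt.Errorf("decode snapshot: %w", err)
	}
	if data == nil { // a JSON null snapshot would otherwise leave a nil map
		data = make(map[string]string)
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data = data
	return nil
}

// get is a read helper for the demo; it is not part of the interface.
func (m *kvStateMachine) get(key string) (string, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	v, ok := m.data[key]
	return v, ok
}

func main() {
	sm := newKVStateMachine()
	if err := sm.Apply([]byte(`{"key":"topic","value":"orders"}`)); err != nil {
		fmt.Println("apply failed:", err)
		return
	}
	snap, err := sm.Snapshot()
	if err != nil {
		fmt.Println("snapshot failed:", err)
		return
	}
	restored := newKVStateMachine()
	if err := restored.Restore(snap); err != nil {
		fmt.Println("restore failed:", err)
		return
	}
	v, _ := restored.get("topic")
	fmt.Println(v) // prints "orders"
}
```

Apply must be deterministic: every node applies the same entries in the same order, so any dependence on local time or randomness would make replicas diverge.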

type StateType

type StateType uint64

StateType represents the role of the node in the Raft cluster.

const (
	StateFollower StateType = iota
	StateCandidate
	StateLeader
)

func (StateType) String

func (st StateType) String() string

type Storage

type Storage interface {
	// InitialState returns the saved HardState and ConfState.
	InitialState() (raftpb.HardState, raftpb.ConfState, error)

	// SaveState saves the current HardState.
	SaveState(st raftpb.HardState) error

	// Entries returns a slice of log entries in the range [lo, hi).
	Entries(lo, hi uint64) ([]raftpb.Entry, error)

	// Term returns the term of entry i.
	Term(i uint64) (uint64, error)

	// LastIndex returns the index of the last entry in the log.
	LastIndex() (uint64, error)

	// FirstIndex returns the index of the first log entry that is available.
	FirstIndex() (uint64, error)

	// Snapshot returns the most recent snapshot.
	Snapshot() (raftpb.Snapshot, error)

	// SaveEntries saves entries to stable storage.
	SaveEntries(entries []raftpb.Entry) error

	// SaveSnapshot saves the snapshot to stable storage.
	SaveSnapshot(snap raftpb.Snapshot) error

	// Compact compacts the log up to compactIndex.
	Compact(compactIndex uint64) error

	// Close closes the storage.
	Close() error
}

Storage defines the interface for persisting Raft state.
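The half-open range [lo, hi) used by Entries and the effect of Compact on FirstIndex are easiest to see on a tiny in-memory log. The sketch below is not DiskStorage: `memLog`, `logEntry`, and the two error values are hypothetical, errors are dropped from FirstIndex and LastIndex for brevity, and it assumes that Compact keeps the entry at compactIndex as the new first entry, which this page does not specify.

```go
package main

import (
	"errors"
	"fmt"
)

// logEntry is a simplified stand-in for raftpb.Entry.
type logEntry struct {
	Index uint64
	Term  uint64
}

var (
	errCompacted   = errors.New("requested index has been compacted")
	errUnavailable = errors.New("requested index is unavailable")
)

// memLog keeps contiguous entries in a slice; offset is the index of entries[0].
type memLog struct {
	offset  uint64
	entries []logEntry
}

func (l *memLog) FirstIndex() uint64 { return l.offset }
func (l *memLog) LastIndex() uint64  { return l.offset + uint64(len(l.entries)) - 1 }

// Append adds entries; the caller is assumed to pass contiguous indexes.
func (l *memLog) Append(es ...logEntry) { l.entries = append(l.entries, es...) }

// Entries returns the entries with lo <= Index < hi.
func (l *memLog) Entries(lo, hi uint64) ([]logEntry, error) {
	switch {
	case lo < l.offset:
		return nil, errCompacted
	case lo > hi || hi > l.LastIndex()+1:
		return nil, errUnavailable
	}
	return l.entries[lo-l.offset : hi-l.offset], nil
}

// Compact discards entries with Index < compactIndex (an assumed convention).
func (l *memLog) Compact(compactIndex uint64) error {
	switch {
	case compactIndex < l.offset:
		return errCompacted
	case compactIndex > l.LastIndex():
		return errUnavailable
	}
	// Copy the tail so the discarded prefix can be garbage collected.
	l.entries = append([]logEntry(nil), l.entries[compactIndex-l.offset:]...)
	l.offset = compactIndex
	return nil
}

func main() {
	l := &memLog{offset: 1}
	for i := uint64(1); i <= 5; i++ {
		l.Append(logEntry{Index: i, Term: 1})
	}
	es, _ := l.Entries(2, 4)
	fmt.Println(len(es), es[0].Index, es[1].Index) // prints "2 2 3"

	_ = l.Compact(3)
	_, err := l.Entries(2, 4)
	fmt.Println(l.FirstIndex(), err == errCompacted) // prints "3 true"
}
```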

type Transport

type Transport interface {
	// Send sends a message to the specified node.
	Send(msg raftpb.Message) error

	// Start starts the transport layer.
	Start() error

	// Stop stops the transport layer.
	Stop() error

	// AddPeer adds a peer to the transport.
	AddPeer(nodeID uint64, addr string) error

	// RemovePeer removes a peer from the transport.
	RemovePeer(nodeID uint64) error
}

Transport defines the interface for sending Raft messages to other nodes.
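For tests, an implementation with the same five methods can route messages in process instead of over TCP. The sketch below is one hypothetical way to do that: `network`, `memTransport`, and the local `message` type (standing in for raftpb.Message) are all invented for illustration, delivery is synchronous, and nothing here is safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a simplified stand-in for raftpb.Message.
type message struct {
	From, To uint64
	Payload  string
}

var errUnknownPeer = errors.New("unknown peer")

// network is a hypothetical in-process registry: listen address -> handler.
type network struct {
	listeners map[string]func(message)
}

func newNetwork() *network { return &network{listeners: make(map[string]func(message))} }

// memTransport mirrors the method set of Transport, with the local message
// type in place of raftpb.Message. A real transport would need locking.
type memTransport struct {
	net     *network
	addr    string
	handler func(message)
	peers   map[uint64]string // node ID -> address
}

func newMemTransport(n *network, addr string, h func(message)) *memTransport {
	return &memTransport{net: n, addr: addr, handler: h, peers: make(map[uint64]string)}
}

// Start registers the handler so that other transports can deliver to it.
func (t *memTransport) Start() error {
	if _, taken := t.net.listeners[t.addr]; taken {
		return fmt.Errorf("address %s already in use", t.addr)
	}
	t.net.listeners[t.addr] = t.handler
	return nil
}

// Stop unregisters the handler; later sends to this address fail.
func (t *memTransport) Stop() error {
	delete(t.net.listeners, t.addr)
	return nil
}

func (t *memTransport) AddPeer(nodeID uint64, addr string) error {
	t.peers[nodeID] = addr
	return nil
}

func (t *memTransport) RemovePeer(nodeID uint64) error {
	delete(t.peers, nodeID)
	return nil
}

// Send looks up the address for msg.To and calls that peer's handler directly.
func (t *memTransport) Send(msg message) error {
	addr, ok := t.peers[msg.To]
	if !ok {
		return errUnknownPeer
	}
	deliver, up := t.net.listeners[addr]
	if !up {
		return fmt.Errorf("peer %d at %s is not reachable", msg.To, addr)
	}
	deliver(msg)
	return nil
}

func main() {
	n := newNetwork()
	a := newMemTransport(n, "node1:7000", func(message) {})
	b := newMemTransport(n, "node2:7000", func(m message) { fmt.Println("node 2 got:", m.Payload) })
	_, _ = a.Start(), b.Start()
	_ = a.AddPeer(2, "node2:7000")
	if err := a.Send(message{From: 1, To: 2, Payload: "heartbeat"}); err != nil {
		fmt.Println("send failed:", err)
	}
}
```

Keeping the transport behind an interface is what makes this kind of substitution possible: the consensus logic can be exercised without sockets, and failures can be simulated by stopping a transport or removing a peer.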
