Documentation ¶
Overview ¶
Package raft implements a Consensus component for IPFS Cluster which uses Raft (go-libp2p-raft).
Index ¶
- Variables
- func NewConsensusWithRPCClient(staging bool) ...
- func ValidateConfig(cfg *ClusterRaftConfig) error
- type ClusterRaftConfig
- type Consensus
- func (cc *Consensus) AddPeer(ctx context.Context, pid peer.ID) error
- func (cc *Consensus) Clean(ctx context.Context) error
- func (cc *Consensus) Commit(ctx context.Context, op *ConsensusOp) error
- func (cc *Consensus) Distrust(ctx context.Context, pid peer.ID) error
- func (cc *Consensus) IsLeader(ctx context.Context) bool
- func (cc *Consensus) IsTrustedPeer(ctx context.Context, p peer.ID) bool
- func (cc *Consensus) Leader(ctx context.Context) (peer.ID, error)
- func (cc *Consensus) Peers(ctx context.Context) ([]peer.ID, error)
- func (cc *Consensus) Ready(ctx context.Context) <-chan struct{}
- func (cc *Consensus) RedirectToLeader(method string, arg interface{}, ret interface{}) (bool, error)
- func (cc *Consensus) RmPeer(ctx context.Context, pid peer.ID) error
- func (cc *Consensus) Shutdown(ctx context.Context) error
- func (cc *Consensus) State(ctx context.Context) (*RaftState, error)
- func (cc *Consensus) Trust(ctx context.Context, pid peer.ID) error
- func (cc *Consensus) WaitForSync(ctx context.Context) error
- type ConsensusAPI
- type ConsensusOp
- type RaftState
Constants ¶
This section is empty.
Variables ¶
var (
	DefaultDataSubFolder        = "raft-cluster"
	DefaultWaitForLeaderTimeout = 15 * time.Second
	DefaultCommitRetries        = 1
	DefaultNetworkTimeout       = 100 * time.Second
	DefaultCommitRetryDelay     = 200 * time.Millisecond
	DefaultBackupsRotate        = 6
)
Configuration defaults
var RaftLogCacheSize = 512
RaftLogCacheSize is the maximum number of logs to cache in-memory. This is used to reduce disk I/O for the recently committed entries.
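To illustrate how a bounded in-memory log cache can reduce disk reads for recent entries, here is a minimal sketch. It assumes a ring-buffer design in which each entry lands in the slot given by its index modulo the cache size; the `logEntry` and `logCache` types are hypothetical stand-ins and not part of this package or of the underlying Raft library.

```go
package main

import "fmt"

// logEntry is a hypothetical stand-in for a Raft log entry.
type logEntry struct {
	Index uint64
	Data  string
}

// logCache keeps the most recently stored entries in a fixed-size ring.
type logCache struct {
	slots []*logEntry
}

// newLogCache creates a cache with the given number of slots (must be > 0).
func newLogCache(size int) *logCache {
	return &logCache{slots: make([]*logEntry, size)}
}

// put stores an entry in the slot selected by its index,
// overwriting whatever older entry occupied that slot.
func (c *logCache) put(e logEntry) {
	c.slots[e.Index%uint64(len(c.slots))] = &e
}

// get returns the cached entry for idx. On a miss the caller would
// fall back to the on-disk log store.
func (c *logCache) get(idx uint64) (logEntry, bool) {
	e := c.slots[idx%uint64(len(c.slots))]
	if e == nil || e.Index != idx {
		return logEntry{}, false
	}
	return *e, true
}

func main() {
	// A tiny cache for demonstration; the package default is RaftLogCacheSize.
	cache := newLogCache(4)
	for i := uint64(1); i <= 6; i++ {
		cache.put(logEntry{Index: i, Data: fmt.Sprintf("op-%d", i)})
	}
	_, hit := cache.get(1)
	fmt.Println("index 1 cached:", hit)
	e, hit := cache.get(6)
	fmt.Println("index 6 cached:", hit, e.Data)
}
```

With this layout, only the newest entries survive, which matches the stated goal of serving recently committed entries from memory.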
var RaftMaxSnapshots = 5
RaftMaxSnapshots indicates how many snapshots to keep in the consensus data folder. TODO: Maybe include this in Config. Not sure how useful it is to touch this anyways.
Functions ¶
func NewConsensusWithRPCClient ¶
func NewConsensusWithRPCClient(staging bool) func(
	host host.Host,
	cfg *ClusterRaftConfig,
	rpcClient *rpc.Client,
	mpool *messagepool.MessagePool,
	repo repo.LockedRepo,
) (*Consensus, error)
TODO: Merge with NewConsensus and remove the rpcReady chan
func ValidateConfig ¶
func ValidateConfig(cfg *ClusterRaftConfig) error
ValidateConfig checks that this configuration has working values, at least in appearance.
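The kind of sanity checking described here might look like the following sketch. It is not the package's implementation: `raftSettings`, `defaultSettings`, and `validate` are hypothetical names, the struct mirrors only a few `ClusterRaftConfig` fields, and the specific rules (positive timeouts, non-negative retries) are assumptions. The default values are taken from the Variables section.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// raftSettings is a hypothetical stand-in mirroring some ClusterRaftConfig fields.
type raftSettings struct {
	WaitForLeaderTimeout time.Duration
	NetworkTimeout       time.Duration
	CommitRetries        int
	CommitRetryDelay     time.Duration
	BackupsRotate        int
}

// defaultSettings uses the default values listed in the Variables section.
func defaultSettings() raftSettings {
	return raftSettings{
		WaitForLeaderTimeout: 15 * time.Second,
		NetworkTimeout:       100 * time.Second,
		CommitRetries:        1,
		CommitRetryDelay:     200 * time.Millisecond,
		BackupsRotate:        6,
	}
}

// validate rejects values that cannot work. The exact rules are assumed.
func validate(s raftSettings) error {
	if s.WaitForLeaderTimeout <= 0 {
		return errors.New("WaitForLeaderTimeout must be positive")
	}
	if s.NetworkTimeout <= 0 {
		return errors.New("NetworkTimeout must be positive")
	}
	if s.CommitRetries < 0 {
		return errors.New("CommitRetries must not be negative")
	}
	if s.CommitRetryDelay <= 0 {
		return errors.New("CommitRetryDelay must be positive")
	}
	if s.BackupsRotate <= 0 {
		return errors.New("BackupsRotate must be positive")
	}
	return nil
}

func main() {
	s := defaultSettings()
	fmt.Println("defaults:", validate(s))

	s.CommitRetries = -1
	fmt.Println("negative retries:", validate(s))
}
```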
Types ¶
type ClusterRaftConfig ¶
type ClusterRaftConfig struct {
// ClusterModeEnabled enables running the node in a cluster with Raft consensus.
ClusterModeEnabled bool
// A folder to store Raft's data.
DataFolder string
// InitPeerset provides the list of initial cluster peers for new Raft
// peers (with no prior state). It is ignored when Raft was already
// initialized or when starting in staging mode.
InitPeerset []string
// WaitForLeaderTimeout specifies how long to wait for a leader before
// failing an operation.
WaitForLeaderTimeout time.Duration
// NetworkTimeout specifies how long before a Raft network
// operation is timed out.
NetworkTimeout time.Duration
// CommitRetries specifies how many times we retry a failed commit until
// we give up.
CommitRetries int
// How long to wait between retries
CommitRetryDelay time.Duration
// BackupsRotate specifies the maximum number of Raft's DataFolder
// copies that we keep as backups (renaming) after cleanup.
BackupsRotate int
// A Hashicorp Raft's configuration object.
RaftConfig *hraft.Config
// Tracing enables propagation of contexts across binary boundaries.
Tracing bool
}
ClusterRaftConfig configures the Raft Consensus component for the node cluster.
func DefaultClusterRaftConfig ¶
func DefaultClusterRaftConfig() *ClusterRaftConfig
func NewClusterRaftConfig ¶
func NewClusterRaftConfig(userRaftConfig *config.UserRaftConfig) *ClusterRaftConfig
func (*ClusterRaftConfig) GetDataFolder ¶
func (cfg *ClusterRaftConfig) GetDataFolder(repo repo.LockedRepo) string
GetDataFolder returns the Raft data folder that we are using.
type Consensus ¶
Consensus handles the work of keeping a shared state between the peers of a Lotus Cluster, as well as modifying that state and applying any updates in a thread-safe manner.
func NewConsensus ¶
func NewConsensus(host host.Host, cfg *ClusterRaftConfig, mpool *messagepool.MessagePool, repo repo.LockedRepo, staging bool) (*Consensus, error)
NewConsensus builds a new ClusterConsensus component using Raft.
Raft saves state snapshots regularly and persists log data in a bolt datastore. Therefore, unless memory usage is a concern, it is recommended to use an in-memory go-datastore as store parameter.
The staging parameter controls if the Raft peer should start in staging mode (used when joining a new Raft peerset with other peers).
func (*Consensus) AddPeer ¶
AddPeer adds a new peer to participate in this consensus. It will forward the operation to the leader if this is not it.
func (*Consensus) Commit ¶
func (cc *Consensus) Commit(ctx context.Context, op *ConsensusOp) error
Commit submits a cc.consensus commit. It retries upon failure.
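The retry behaviour, governed by `CommitRetries` and `CommitRetryDelay`, can be sketched roughly as follows. This is an illustration rather than the actual method body: `commitWithRetry`, `runDemo`, and `errTransient` are hypothetical names, and the sketch assumes that `CommitRetries` counts additional attempts after the first one.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errTransient is a hypothetical error used to simulate a failed commit.
var errTransient = errors.New("transient commit failure")

// commitWithRetry runs commit once and then up to `retries` more times,
// waiting `delay` between attempts and stopping early if ctx is cancelled.
func commitWithRetry(ctx context.Context, retries int, delay time.Duration, commit func() error) error {
	var lastErr error
	for attempt := 0; attempt <= retries; attempt++ {
		if attempt > 0 {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(delay):
			}
		}
		if lastErr = commit(); lastErr == nil {
			return nil
		}
	}
	return fmt.Errorf("commit failed after %d attempts: %w", retries+1, lastErr)
}

// runDemo simulates a commit that fails `failures` times before succeeding
// and reports how many times it was called.
func runDemo(retries, failures int) (int, error) {
	calls := 0
	commit := func() error {
		calls++
		if calls <= failures {
			return errTransient
		}
		return nil
	}
	err := commitWithRetry(context.Background(), retries, time.Millisecond, commit)
	return calls, err
}

func main() {
	calls, err := runDemo(1, 1)
	fmt.Println("one failure, one retry:", calls, err)

	calls, err = runDemo(1, 5)
	fmt.Println("persistent failure:", calls, err)
}
```

Wrapping the last error with `%w` lets callers still inspect the underlying cause with `errors.Is`.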
func (*Consensus) IsTrustedPeer ¶
IsTrustedPeer returns true. In Raft we trust all peers.
func (*Consensus) Leader ¶
Leader returns the peerID of the Leader of the cluster. It returns an error when there is no leader.
func (*Consensus) Peers ¶
Peers returns the current list of peers in the consensus. The list will be sorted alphabetically.
func (*Consensus) Ready ¶
Ready returns a channel which is signaled when the Consensus algorithm has finished bootstrapping and is ready to use.
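A caller would typically block on this channel with a deadline, as in the sketch below. `waitReady` and `demoWait` are hypothetical helpers; the demo closes a locally created channel to stand in for the component becoming ready, which is an assumption about how the signal is delivered (receiving works the same way if a value is sent instead).

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitReady blocks until ready is signaled or ctx expires.
func waitReady(ctx context.Context, ready <-chan struct{}) error {
	select {
	case <-ready:
		return nil
	case <-ctx.Done():
		return fmt.Errorf("consensus not ready: %w", ctx.Err())
	}
}

// demoWait simulates a component that either is ready immediately
// or never becomes ready within the timeout.
func demoWait(becomesReady bool) error {
	ready := make(chan struct{})
	if becomesReady {
		close(ready)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	return waitReady(ctx, ready)
}

func main() {
	fmt.Println("ready:", demoWait(true))
	fmt.Println("not ready:", demoWait(false))
}
```

In real use, the channel would come from `cc.Ready(ctx)` instead of being created locally.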
func (*Consensus) RedirectToLeader ¶
func (cc *Consensus) RedirectToLeader(method string, arg interface{}, ret interface{}) (bool, error)
RedirectToLeader returns true if the operation was redirected to the leader. Note that if the leader has just disappeared, the RPC call will fail because this peer has not yet learned that it is gone.
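The control flow implied by that return value can be sketched as follows. This is a simplified illustration under stated assumptions, not the method's real body: the `node` type, its `leader` and `call` fields, `newDemoNode`, and the method string `"Consensus.AddPeer"` are all hypothetical, peers are identified by plain strings, and any retrying or waiting for a leader is omitted.

```go
package main

import (
	"errors"
	"fmt"
)

// node is a hypothetical stand-in for a consensus peer.
type node struct {
	id     string
	leader func() (string, error)
	call   func(dest, method string, arg, ret interface{}) error
}

// redirectToLeader returns false when this node is the leader (the caller
// should then run the operation locally) and true when the call was
// forwarded, together with the result of the forwarded call.
func (n *node) redirectToLeader(method string, arg, ret interface{}) (bool, error) {
	leader, err := n.leader()
	if err != nil {
		return false, err
	}
	if leader == n.id {
		return false, nil
	}
	return true, n.call(leader, method, arg, ret)
}

// newDemoNode builds a node whose RPC calls are recorded instead of sent.
// An empty leader simulates a cluster without a leader.
func newDemoNode(self, leader string) (*node, *[]string) {
	calls := &[]string{}
	n := &node{
		id: self,
		leader: func() (string, error) {
			if leader == "" {
				return "", errors.New("no leader")
			}
			return leader, nil
		},
		call: func(dest, method string, arg, ret interface{}) error {
			*calls = append(*calls, dest+":"+method)
			return nil
		},
	}
	return n, calls
}

func main() {
	n, calls := newDemoNode("B", "A")
	redirected, err := n.redirectToLeader("Consensus.AddPeer", nil, nil)
	fmt.Println(redirected, err, *calls)
}
```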
func (*Consensus) RmPeer ¶
RmPeer removes a peer from this consensus. It will forward the operation to the leader if this is not it.
func (*Consensus) Shutdown ¶
Shutdown stops the component so it will not process any more updates. The underlying consensus is permanently shut down, along with the libp2p transport.
func (*Consensus) State ¶
State retrieves the current consensus RaftState. It may error if no RaftState has been agreed upon or the state is not consistent. The returned RaftState is the last agreed-upon RaftState known by this node. No writes are allowed, as all writes to the shared state should happen through the Consensus component methods.
type ConsensusAPI ¶
type ConsensusAPI interface {
// Returns a channel to signal that the consensus layer is ready
// allowing the main component to wait for it during start.
Ready(context.Context) <-chan struct{}
AddPeer(context.Context, peer.ID) error
RmPeer(context.Context, peer.ID) error
State(context.Context) (consensus.State, error)
// Provide a node which is responsible to perform
// specific tasks which must only run in 1 cluster peer.
Leader(context.Context) (peer.ID, error)
// Only returns when the consensus state has all log
// updates applied to it.
WaitForSync(context.Context) error
// Clean removes all consensus data.
Clean(context.Context) error
// Peers returns the peerset participating in the Consensus.
Peers(context.Context) ([]peer.ID, error)
// IsTrustedPeer returns true if the given peer is "trusted".
// This will grant access to more rpc endpoints than a
// non-trusted one. This should be fast as it will be
// called repeatedly for every remote RPC request.
IsTrustedPeer(context.Context, peer.ID) bool
// Trust marks a peer as "trusted".
Trust(context.Context, peer.ID) error
// Distrust removes a peer from the "trusted" set.
Distrust(context.Context, peer.ID) error
// Returns true if the current node is the cluster leader.
IsLeader(ctx context.Context) bool
Shutdown(context.Context) error
}
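One way to use this interface for tasks that must run on a single cluster peer is to depend only on the method needed, as in this sketch. `leaderChecker`, `fakeConsensus`, `runIfLeader`, and `demoRun` are hypothetical names introduced for illustration; only the `IsLeader` signature is taken from the interface above.

```go
package main

import (
	"context"
	"fmt"
)

// leaderChecker is the subset of ConsensusAPI this sketch depends on.
type leaderChecker interface {
	IsLeader(ctx context.Context) bool
}

// fakeConsensus is a hypothetical test double.
type fakeConsensus struct{ leader bool }

func (f fakeConsensus) IsLeader(ctx context.Context) bool { return f.leader }

// runIfLeader executes task only on the cluster leader and reports
// whether the task was run.
func runIfLeader(ctx context.Context, c leaderChecker, task func() error) (bool, error) {
	if !c.IsLeader(ctx) {
		return false, nil
	}
	return true, task()
}

// demoRun counts how often the task executes for a leader or a follower.
func demoRun(isLeader bool) (bool, int) {
	count := 0
	ran, _ := runIfLeader(context.Background(), fakeConsensus{leader: isLeader}, func() error {
		count++
		return nil
	})
	return ran, count
}

func main() {
	ran, count := demoRun(true)
	fmt.Println("leader:", ran, count)
	ran, count = demoRun(false)
	fmt.Println("follower:", ran, count)
}
```

Accepting a narrow interface keeps such helpers easy to test without starting a real Raft peer.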
type ConsensusOp ¶
type RaftState ¶
type RaftState struct {
NonceMap api.NonceMapType
MsgUuids api.MsgUuidMapType
// This is because the miner only stores signed CIDs but the message received in a
// block will be unsigned (for BLS). Hence, the process relies on the node to store the
// signed message which holds a copy of the unsigned message to properly perform all the
// needed checks
Mpool *messagepool.MessagePool
}