Documentation
¶
Index ¶
- Constants
- Variables
- type AppPanicError
- type Config
- type DownReplica
- type FatalError
- type Future
- type MultiTransport
- type RaftConfig
- type RaftServer
- func (rs *RaftServer) AppliedIndex(id uint64) uint64
- func (rs *RaftServer) ChangeMember(ctx context.Context, id uint64, changeType proto.ConfChangeType, ...) (future *Future)
- func (rs *RaftServer) CreateRaft(raftConfig *RaftConfig) error
- func (rs *RaftServer) GetDownReplicas(id uint64) (downReplicas []DownReplica)
- func (rs *RaftServer) GetPendingReplica(id uint64) (peers []uint64)
- func (rs *RaftServer) GetUnreachable(id uint64) (nodes []uint64)
- func (rs *RaftServer) IsLeader(id uint64) bool
- func (rs *RaftServer) LeaderTerm(id uint64) (leader, term uint64)
- func (rs *RaftServer) RemoveRaft(id uint64) error
- func (rs *RaftServer) Status(id uint64) (status *Status)
- func (rs *RaftServer) Stop()
- func (rs *RaftServer) Submit(ctx context.Context, id uint64, cmd []byte) (future *Future)
- func (rs *RaftServer) Truncate(id uint64, index uint64)
- func (rs *RaftServer) TryToLeader(ctx context.Context, id uint64) (future *Future)
- type ReplicaStatus
- type SocketResolver
- type SocketType
- type StateMachine
- type Status
- type Transport
- type TransportConfig
Constants ¶
const ( KB = 1 << (10 * iota) MB )
const NoLeader uint64 = 0
NoLeader is a placeholder nodeID used when there is no leader.
Variables ¶
var ( ErrCompacted = errors.New("requested index is unavailable due to compaction.") ErrRaftExists = errors.New("raft already exists.") ErrRaftNotExists = errors.New("raft not exists.") ErrNotLeader = errors.New("raft is not the leader.") ErrStopped = errors.New("raft is already shutdown.") ErrSnapping = errors.New("raft is doing snapshot.") ErrCanceled = errors.New("raft request canceled by caller") )
Functions ¶
This section is empty.
Types ¶
type AppPanicError ¶
type AppPanicError string
AppPanicError is panic error when repl occurred fatal error. The server will recover this panic and stop the shard repl.
func (*AppPanicError) Error ¶
func (pe *AppPanicError) Error() string
type Config ¶
type Config struct {
TransportConfig
// NodeID is the identity of the local node. NodeID cannot be 0.
// This parameter is required.
NodeID uint64
// TickInterval is the interval of timer which check heartbeat and election timeout.
// The default value is 2s.
TickInterval time.Duration
// HeartbeatTick is the heartbeat interval. A leader sends heartbeat
// message to maintain the leadership every heartbeat interval.
// The default value is 2s.
HeartbeatTick int
// ElectionTick is the election timeout. If a follower does not receive any message
// from the leader of current term during ElectionTick, it will become candidate and start an election.
// ElectionTick must be greater than HeartbeatTick.
// We suggest to use ElectionTick = 10 * HeartbeatTick to avoid unnecessary leader switching.
// The default value is 10s.
ElectionTick int
// MaxSizePerMsg limits the max size of each append message.
// The default value is 1M.
MaxSizePerMsg uint64
// MaxInflightMsgs limits the max number of in-flight append messages during optimistic replication phase.
// The application transportation layer usually has its own sending buffer over TCP/UDP.
// Setting MaxInflightMsgs to avoid overflowing that sending buffer.
// The default value is 128.
MaxInflightMsgs int
// ReqBufferSize limits the max number of recive request chan buffer.
// The default value is 1024.
ReqBufferSize int
// AppBufferSize limits the max number of apply chan buffer.
// The default value is 2048.
AppBufferSize int
// RetainLogs controls how many logs we leave after truncate.
// This is used so that we can quickly replay logs on a follower instead of being forced to send an entire snapshot.
// The default value is 20000.
RetainLogs uint64
// LeaseCheck whether to use the lease mechanism.
// The default value is false.
LeaseCheck bool
// contains filtered or unexported fields
}
Config contains the parameters to start a raft server. Default: Do not use lease mechanism. NOTE: NodeID and Resolver must be required.Other parameter has default value.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a Config with usable defaults.
type DownReplica ¶
DownReplica down replica
type FatalError ¶
type MultiTransport ¶
type MultiTransport struct {
// contains filtered or unexported fields
}
func (*MultiTransport) Send ¶
func (t *MultiTransport) Send(m *proto.Message)
func (*MultiTransport) SendSnapshot ¶
func (t *MultiTransport) SendSnapshot(m *proto.Message, rs *snapshotStatus)
func (*MultiTransport) Stop ¶
func (t *MultiTransport) Stop()
type RaftConfig ¶
type RaftConfig struct {
ID uint64
Term uint64
Leader uint64
Applied uint64
Peers []proto.Peer
Storage storage.Storage
StateMachine StateMachine
}
ReplConfig contains the parameters to create a replication.
type RaftServer ¶
type RaftServer struct {
// contains filtered or unexported fields
}
func NewRaftServer ¶
func NewRaftServer(config *Config) (*RaftServer, error)
func (*RaftServer) AppliedIndex ¶
func (rs *RaftServer) AppliedIndex(id uint64) uint64
func (*RaftServer) ChangeMember ¶
func (rs *RaftServer) ChangeMember(ctx context.Context, id uint64, changeType proto.ConfChangeType, peer proto.Peer, context []byte) (future *Future)
func (*RaftServer) CreateRaft ¶
func (rs *RaftServer) CreateRaft(raftConfig *RaftConfig) error
func (*RaftServer) GetDownReplicas ¶
func (rs *RaftServer) GetDownReplicas(id uint64) (downReplicas []DownReplica)
GetDownReplicas 获取down的副本
func (*RaftServer) GetPendingReplica ¶
func (rs *RaftServer) GetPendingReplica(id uint64) (peers []uint64)
GetPendingReplica get snapshot pending followers
func (*RaftServer) GetUnreachable ¶
func (rs *RaftServer) GetUnreachable(id uint64) (nodes []uint64)
func (*RaftServer) IsLeader ¶
func (rs *RaftServer) IsLeader(id uint64) bool
func (*RaftServer) LeaderTerm ¶
func (rs *RaftServer) LeaderTerm(id uint64) (leader, term uint64)
func (*RaftServer) RemoveRaft ¶
func (rs *RaftServer) RemoveRaft(id uint64) error
func (*RaftServer) Status ¶
func (rs *RaftServer) Status(id uint64) (status *Status)
func (*RaftServer) Stop ¶
func (rs *RaftServer) Stop()
func (*RaftServer) Truncate ¶
func (rs *RaftServer) Truncate(id uint64, index uint64)
func (*RaftServer) TryToLeader ¶
func (rs *RaftServer) TryToLeader(ctx context.Context, id uint64) (future *Future)
type ReplicaStatus ¶
type ReplicaStatus struct {
Match uint64 // 复制进度
Commit uint64 // commmit位置
Next uint64
State string
Snapshoting bool
Paused bool
Active bool
LastActive time.Time
Inflight int
}
ReplicaStatus replica status
type SocketResolver ¶
type SocketResolver interface {
NodeAddress(nodeID uint64, stype SocketType) (addr string, err error)
}
The SocketResolver interface is supplied by the application to resolve NodeID to net.Addr addresses.
type SocketType ¶
type SocketType byte
const ( HeartBeat SocketType = 0 Replicate SocketType = 1 )
func (SocketType) String ¶
func (t SocketType) String() string
type StateMachine ¶
type StateMachine interface {
Apply(command []byte, index uint64) (interface{}, error)
ApplyMemberChange(confChange *proto.ConfChange, index uint64) (interface{}, error)
Snapshot() (proto.Snapshot, error)
ApplySnapshot(peers []proto.Peer, iter proto.SnapIterator) error
HandleFatalEvent(err *FatalError)
HandleLeaderChange(leader uint64)
}
The StateMachine interface is supplied by the application to persist/snapshot data of application.
type Status ¶
type Status struct {
ID uint64
NodeID uint64
Leader uint64
Term uint64
Index uint64
Commit uint64
Applied uint64
Vote uint64
PendQueue int
RecvQueue int
AppQueue int
Stopped bool
RestoringSnapshot bool
State string // leader、follower、candidate
Replicas map[uint64]*ReplicaStatus
}
Status raft status
type Transport ¶
type Transport interface {
Send(m *proto.Message)
SendSnapshot(m *proto.Message, rs *snapshotStatus)
Stop()
}
func NewMultiTransport ¶
func NewMultiTransport(raft *RaftServer, config *TransportConfig) (Transport, error)
type TransportConfig ¶
type TransportConfig struct {
// HeartbeatAddr is the Heartbeat port.
// The default value is 3016.
HeartbeatAddr string
// ReplicateAddr is the Replation port.
// The default value is 2015.
ReplicateAddr string
// 发送队列大小
SendBufferSize int
//复制并发数(node->node)
MaxReplConcurrency int
// MaxSnapConcurrency limits the max number of snapshot concurrency.
// The default value is 10.
MaxSnapConcurrency int
// This parameter is required.
Resolver SocketResolver
}
Source Files
¶
- config.go
- errors.go
- future.go
- pool.go
- raft.go
- raft_fsm.go
- raft_fsm_candidate.go
- raft_fsm_follower.go
- raft_fsm_leader.go
- raft_fsm_state.go
- raft_log.go
- raft_log_unstable.go
- raft_replica.go
- raft_snapshot.go
- server.go
- statemachine.go
- status.go
- transport.go
- transport_heartbeat.go
- transport_multi.go
- transport_replicate.go
- transport_sender.go