Documentation
¶
Index ¶
- Constants
- Variables
- type Addresses
- type AppendEntriesRequest
- type AppendEntriesResponse
- type ClusterRequest
- type ClusterResponse
- type Configuration
- type FSM
- type FSMSnapshot
- type FileSnapshotSink
- type FsmReadRequest
- type FsmReadResponse
- type Future
- type Handler
- type Handlers
- type HeartbeatPayload
- type HeartbeatRequest
- type HeartbeatResponse
- type InstallSnapshotRequest
- type InstallSnapshotResponse
- type LeaderOptions
- type Log
- type LogMeta
- type LogType
- type Message
- type MessageReader
- type MessageRequestType
- type MessageWriter
- type Node
- type Options
- type OptionsBuilder
- func (builder *OptionsBuilder) Addresses(addresses Addresses)
- func (builder *OptionsBuilder) Fsm(fsm FSM)
- func (builder *OptionsBuilder) Id(id string)
- func (builder *OptionsBuilder) LeaderCommitTimeout(timeout time.Duration)
- func (builder *OptionsBuilder) LeaderElectionTimeout(timeout time.Duration)
- func (builder *OptionsBuilder) LeaderHeartbeatTimeout(timeout time.Duration)
- func (builder *OptionsBuilder) LeaderLeaseTimeout(timeout time.Duration)
- func (builder *OptionsBuilder) Nonvoter(yes bool)
- func (builder *OptionsBuilder) SnapshotInterval(interval time.Duration)
- func (builder *OptionsBuilder) SnapshotNotRestoreOnStart(yes bool)
- func (builder *OptionsBuilder) SnapshotStore(store SnapshotStore)
- func (builder *OptionsBuilder) SnapshotThreshold(threshold uint64)
- func (builder *OptionsBuilder) SnapshotTrailingLogs(trailingLogs uint64)
- func (builder *OptionsBuilder) Store(store Store)
- func (builder *OptionsBuilder) TLS(v TLS)
- func (builder *OptionsBuilder) Transport(transport Transport)
- type Promise
- type RPC
- type RPCHeader
- type Raft
- type Server
- type ServerSuffrage
- type SnapshotMeta
- type SnapshotMetas
- type SnapshotOptions
- type SnapshotSink
- type SnapshotStore
- type State
- type Store
- type TLS
- type TimeoutNowRequest
- type TimeoutNowResponse
- type Transport
- type Trunk
- type VoteRequest
- type VoteResponse
Constants ¶
View Source
const ( AppendEntriesRequestType = MessageRequestType(iota + 1) AppendEntriesResponseType VoteRequestType VoteResponseType InstallSnapshotRequestType InstallSnapshotResponseType TimeoutNowRequestType TimeoutNowResponseType HeartbeatRequestType HeartbeatResponseType ClusterRequestType ClusterResponseType FsmReadRequestType FsmReadResponseType )
Variables ¶
View Source
var ( ErrLogNotFound = fmt.Errorf("log not found") ErrNotFound = fmt.Errorf("data not found") )
View Source
var (
FutureWaitTimeoutErr = fmt.Errorf("future wait timeout")
)
Functions ¶
This section is empty.
Types ¶
type Addresses ¶
type Addresses interface {
Local() (local string, err error)
Members() (addresses []string, err error)
}
func DesignatedAddresses ¶
type AppendEntriesRequest ¶
type AppendEntriesRequest struct {
RPCHeader
Term uint64
PrevLogEntry uint64
PrevLogTerm uint64
LeaderCommitIndex uint64
Key []byte
Entries []*Log
}
func (*AppendEntriesRequest) Decode ¶
func (request *AppendEntriesRequest) Decode(msg MessageReader) (err error)
func (*AppendEntriesRequest) Encode ¶
func (request *AppendEntriesRequest) Encode() (writer MessageWriter, err error)
type AppendEntriesResponse ¶
type AppendEntriesResponse struct {
RPCHeader
Term uint64
LastLog uint64
Succeed bool
NoRetryBackoff bool
}
func (*AppendEntriesResponse) Decode ¶
func (response *AppendEntriesResponse) Decode(msg MessageReader) (err error)
func (*AppendEntriesResponse) Encode ¶
func (response *AppendEntriesResponse) Encode() (writer MessageWriter, err error)
type ClusterRequest ¶
func (*ClusterRequest) Decode ¶
func (request *ClusterRequest) Decode(msg MessageReader) (err error)
func (*ClusterRequest) Encode ¶
func (request *ClusterRequest) Encode() (writer MessageWriter, err error)
type ClusterResponse ¶
func (*ClusterResponse) Decode ¶
func (response *ClusterResponse) Decode(msg MessageReader) (err error)
func (*ClusterResponse) Encode ¶
func (response *ClusterResponse) Encode() (writer MessageWriter, err error)
type Configuration ¶
type Configuration struct {
Servers []Server
}
type FSMSnapshot ¶
type FSMSnapshot interface {
Persist(sink SnapshotSink) error
Release()
}
type FileSnapshotSink ¶
type FileSnapshotSink struct {
// contains filtered or unexported fields
}
func (*FileSnapshotSink) Cancel ¶
func (sink *FileSnapshotSink) Cancel() (err error)
func (*FileSnapshotSink) Close ¶
func (sink *FileSnapshotSink) Close() (err error)
func (*FileSnapshotSink) Id ¶
func (sink *FileSnapshotSink) Id() (id string)
type FsmReadRequest ¶
func (*FsmReadRequest) Decode ¶
func (request *FsmReadRequest) Decode(msg MessageReader) (err error)
func (*FsmReadRequest) Encode ¶
func (request *FsmReadRequest) Encode() (writer MessageWriter, err error)
type FsmReadResponse ¶
func (*FsmReadResponse) Decode ¶
func (response *FsmReadResponse) Decode(msg MessageReader) (err error)
func (*FsmReadResponse) Encode ¶
func (response *FsmReadResponse) Encode() (writer MessageWriter, err error)
type Handler ¶
type Handler struct {
}
func (*Handler) Handle ¶
func (handler *Handler) Handle(request *AppendEntriesRequest) (response *AppendEntriesResponse)
type Handlers ¶
type Handlers struct {
// contains filtered or unexported fields
}
func (*Handlers) Dispatch ¶
func (handlers *Handlers) Dispatch(request *AppendEntriesRequest) (response *AppendEntriesResponse)
type HeartbeatPayload ¶
type HeartbeatRequest ¶
type HeartbeatRequest struct {
RPCHeader
Term uint64
Key []byte
// contains filtered or unexported fields
}
func (*HeartbeatRequest) Decode ¶
func (request *HeartbeatRequest) Decode(msg MessageReader) (err error)
func (*HeartbeatRequest) Encode ¶
func (request *HeartbeatRequest) Encode() (writer MessageWriter, err error)
type HeartbeatResponse ¶
type HeartbeatResponse struct {
RPCHeader
}
func (*HeartbeatResponse) Decode ¶
func (response *HeartbeatResponse) Decode(msg MessageReader) (err error)
func (*HeartbeatResponse) Encode ¶
func (response *HeartbeatResponse) Encode() (writer MessageWriter, err error)
type InstallSnapshotRequest ¶
type InstallSnapshotRequest struct {
RPCHeader
Term uint64
LastLogIndex uint64
LastLogTerm uint64
ConfigurationIndex uint64
Size uint64
Leader []byte
Configuration []byte
Snapshot io.Reader
}
func (*InstallSnapshotRequest) Decode ¶
func (request *InstallSnapshotRequest) Decode(msg MessageReader) (err error)
func (*InstallSnapshotRequest) Encode ¶
func (request *InstallSnapshotRequest) Encode() (writer MessageWriter, err error)
type InstallSnapshotResponse ¶
func (*InstallSnapshotResponse) Decode ¶
func (response *InstallSnapshotResponse) Decode(msg MessageReader) (err error)
func (*InstallSnapshotResponse) Encode ¶
func (response *InstallSnapshotResponse) Encode() (writer MessageWriter, err error)
type LeaderOptions ¶
type LeaderOptions struct {
HeartbeatTimeout time.Duration
ElectionTimeout time.Duration
CommitTimeout time.Duration
// LeaseTimeout
// 用于控制“租约”的持续时间
// 作为领导者而无法联系法定人数
// 个节点。如果我们在没有联系的情况下达到这个间隔,我们将
// 辞去领导职务。
LeaseTimeout time.Duration
}
func (LeaderOptions) Verify ¶
func (options LeaderOptions) Verify() (err error)
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message +---------------------------------------------------------+-----------+-----------+ | Header | Body | Trunk | +---------------------+-----------------+-----------------+-----------+-----------+ | 4(BigEndian) | 2(BigEndian) | 2(BigEndian) | n | reader | +---------------------+-----------------+-----------------+-----------+-----------+ | Len(data) | request type | kind | data | snappy | +---------------------+-----------------+-----------------+-----------+-----------+
func (*Message) RequestType ¶
func (msg *Message) RequestType() (typ MessageRequestType)
type MessageReader ¶
type MessageReader interface {
ReadFrom(r io.Reader) (n int64, err error)
RequestType() (typ MessageRequestType)
Bytes() (p []byte)
Trunk() (trunk *Trunk, has bool)
}
func NewMessageReader ¶
func NewMessageReader() (msg MessageReader)
type MessageRequestType ¶
type MessageRequestType uint16
type MessageWriter ¶
func NewMessageWriter ¶
func NewMessageWriter(requestType MessageRequestType, data []byte) (msg MessageWriter)
func NewMessageWriterWithTrunk ¶
func NewMessageWriterWithTrunk(requestType MessageRequestType, data []byte, sink io.Reader) (msg MessageWriter)
type Options ¶
type Options struct {
Id string
Addresses Addresses
Nonvoter bool
TLS TLS
Leader LeaderOptions
Snapshot SnapshotOptions
Store Store
FSM FSM
Transport Transport
}
type OptionsBuilder ¶
type OptionsBuilder struct {
// contains filtered or unexported fields
}
func NewOptionsBuilder ¶
func NewOptionsBuilder() (builder *OptionsBuilder)
func (*OptionsBuilder) Addresses ¶
func (builder *OptionsBuilder) Addresses(addresses Addresses)
func (*OptionsBuilder) Fsm ¶
func (builder *OptionsBuilder) Fsm(fsm FSM)
func (*OptionsBuilder) Id ¶
func (builder *OptionsBuilder) Id(id string)
func (*OptionsBuilder) LeaderCommitTimeout ¶
func (builder *OptionsBuilder) LeaderCommitTimeout(timeout time.Duration)
func (*OptionsBuilder) LeaderElectionTimeout ¶
func (builder *OptionsBuilder) LeaderElectionTimeout(timeout time.Duration)
func (*OptionsBuilder) LeaderHeartbeatTimeout ¶
func (builder *OptionsBuilder) LeaderHeartbeatTimeout(timeout time.Duration)
func (*OptionsBuilder) LeaderLeaseTimeout ¶
func (builder *OptionsBuilder) LeaderLeaseTimeout(timeout time.Duration)
func (*OptionsBuilder) Nonvoter ¶
func (builder *OptionsBuilder) Nonvoter(yes bool)
func (*OptionsBuilder) SnapshotInterval ¶
func (builder *OptionsBuilder) SnapshotInterval(interval time.Duration)
func (*OptionsBuilder) SnapshotNotRestoreOnStart ¶
func (builder *OptionsBuilder) SnapshotNotRestoreOnStart(yes bool)
func (*OptionsBuilder) SnapshotStore ¶
func (builder *OptionsBuilder) SnapshotStore(store SnapshotStore)
func (*OptionsBuilder) SnapshotThreshold ¶
func (builder *OptionsBuilder) SnapshotThreshold(threshold uint64)
func (*OptionsBuilder) SnapshotTrailingLogs ¶
func (builder *OptionsBuilder) SnapshotTrailingLogs(trailingLogs uint64)
func (*OptionsBuilder) Store ¶
func (builder *OptionsBuilder) Store(store Store)
func (*OptionsBuilder) TLS ¶
func (builder *OptionsBuilder) TLS(v TLS)
func (*OptionsBuilder) Transport ¶
func (builder *OptionsBuilder) Transport(transport Transport)
type RPC ¶
type RPC interface {
Encode() (writer MessageWriter, err error)
Decode(msg MessageReader) (err error)
}
type Raft ¶
type Raft interface {
// Run start node
//
// check cluster was serving, when serving, then call leader to add this node,
// when not serving, then boot a cluster.
Run(ctx context.Context) (err error)
// Close shutdown node
//
// check leader, when this node is leader, then just shutdown,
// when this nod is not leader, then call leader to remove this node and shutdown.
Close(ctx context.Context) (err error)
// Apply make fsm to apply a log
Apply(ctx context.Context, key []byte, body []byte, timeout time.Duration) (result []byte, err error)
}
type Server ¶
type Server struct {
Suffrage ServerSuffrage
Id string
Address string
}
type ServerSuffrage ¶
type ServerSuffrage int
const ( Voter ServerSuffrage = iota + 1 Nonvoter )
func (ServerSuffrage) String ¶
func (s ServerSuffrage) String() string
type SnapshotMeta ¶
type SnapshotMeta struct {
Id string
Index uint64
Term uint64
Configuration Configuration
ConfigurationIndex uint64
Size uint64
CRC []byte
CreateAT time.Time
}
func (*SnapshotMeta) Encode ¶
func (meta *SnapshotMeta) Encode() (p []byte)
type SnapshotMetas ¶
type SnapshotMetas []*SnapshotMeta
func (SnapshotMetas) Len ¶
func (metas SnapshotMetas) Len() int
func (SnapshotMetas) Less ¶
func (metas SnapshotMetas) Less(i, j int) bool
func (SnapshotMetas) Swap ¶
func (metas SnapshotMetas) Swap(i, j int)
type SnapshotOptions ¶
type SnapshotOptions struct {
Store SnapshotStore
// TrailingLogs控制快照后留下的日志数量。这是为了让我们可以快速回放跟踪者的日志,而不是被迫发送整个快照。此处传递的值是使用的初始设置。这可以在运行期间使用ReloadConfig进行调整。
TrailingLogs uint64
// SnapshotThreshold控制在执行快照之前必须有多少未完成的日志。这是为了通过重放一小组日志来防止过度快照。此处传递的值是使用的初始设置。这可以在运行期间使用ReloadConfig进行调整。
Threshold uint64
Interval time.Duration
NoSnapshotRestoreOnStart bool
}
func (SnapshotOptions) Verify ¶
func (options SnapshotOptions) Verify() (err error)
type SnapshotSink ¶
type SnapshotSink interface {
io.WriteCloser
Id() string
Cancel() error
}
type SnapshotStore ¶
type SnapshotStore interface {
Create(index uint64, term uint64, configuration Configuration, configurationIndex uint64) (sink SnapshotSink, err error)
List() (metas []*SnapshotMeta, err error)
Open(id string) (meta *SnapshotMeta, reader io.ReadCloser, err error)
}
func FileSnapshotStore ¶
func FileSnapshotStore(dir string) (store SnapshotStore, err error)
type Store ¶
type Store interface {
AcquireIndex() (term uint64, index uint64, err error)
Meta() (meta LogMeta)
FirstIndex() (index uint64, err error)
LastIndex() (index uint64, err error)
Read(index uint64) (log *Log, err error)
Write(logs ...*Log) (err error)
Commit(logs ...*Log) (err error)
Remove(logs ...*Log) (err error)
Close()
}
type TimeoutNowRequest ¶
type TimeoutNowRequest struct {
RPCHeader
}
func (*TimeoutNowRequest) Decode ¶
func (request *TimeoutNowRequest) Decode(msg MessageReader) (err error)
func (*TimeoutNowRequest) Encode ¶
func (request *TimeoutNowRequest) Encode() (writer MessageWriter, err error)
type TimeoutNowResponse ¶
type TimeoutNowResponse struct {
RPCHeader
}
func (*TimeoutNowResponse) Decode ¶
func (response *TimeoutNowResponse) Decode(msg MessageReader) (err error)
func (*TimeoutNowResponse) Encode ¶
func (response *TimeoutNowResponse) Encode() (writer MessageWriter, err error)
type Transport ¶
type Transport interface {
Dial(address string) (conn net.Conn, err error)
Listen(address string) (ln net.Listener, err error)
}
func TcpTransport ¶
func TcpTransport() Transport
type Trunk ¶
type Trunk struct {
// contains filtered or unexported fields
}
func (*Trunk) SetLimiter ¶
type VoteRequest ¶
type VoteRequest struct {
RPCHeader
Term uint64
LastLogIndex uint64
LastLogTerm uint64
LeadershipTransfer bool
}
func (*VoteRequest) Decode ¶
func (request *VoteRequest) Decode(msg MessageReader) (err error)
func (*VoteRequest) Encode ¶
func (request *VoteRequest) Encode() (writer MessageWriter, err error)
type VoteResponse ¶
func (*VoteResponse) Decode ¶
func (response *VoteResponse) Decode(msg MessageReader) (err error)
func (*VoteResponse) Encode ¶
func (response *VoteResponse) Encode() (writer MessageWriter, err error)
Source Files
¶
Click to show internal directories.
Click to hide internal directories.