Documentation
¶
Index ¶
- Variables
- func ReadHeaderInt64(md metadata.MD, key string) (v int64, err error)
- func SessionKey(sessionId SessionId) string
- func ShadowKey(sessionId SessionId, key string) string
- type Client
- type Config
- type CursorAcker
- type FollowerController
- type FollowerCursor
- type GetResult
- type LeaderController
- type MessageWithTerm
- type QuorumAckTracker
- type ReplicateStreamProvider
- type ReplicationRpcProvider
- type Server
- func (s Server) AddFollower(c context.Context, req *proto.AddFollowerRequest) (*proto.AddFollowerResponse, error)
- func (s Server) BecomeLeader(c context.Context, req *proto.BecomeLeaderRequest) (*proto.BecomeLeaderResponse, error)
- func (s *Server) Close() error
- func (s Server) CloseSession(ctx context.Context, req *proto.CloseSessionRequest) (*proto.CloseSessionResponse, error)
- func (s Server) CreateSession(ctx context.Context, req *proto.CreateSessionRequest) (*proto.CreateSessionResponse, error)
- func (s Server) DeleteShard(_ context.Context, req *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error)
- func (s Server) GetNotifications(req *proto.NotificationsRequest, ...) error
- func (s Server) GetShardAssignments(req *proto.ShardAssignmentsRequest, ...) error
- func (s Server) GetStatus(_ context.Context, req *proto.GetStatusRequest) (*proto.GetStatusResponse, error)
- func (s *Server) InternalPort() int
- func (s Server) KeepAlive(ctx context.Context, req *proto.SessionHeartbeat) (*proto.KeepAliveResponse, error)
- func (s Server) List(request *proto.ListRequest, stream proto.OxiaClient_ListServer) error
- func (s Server) NewTerm(c context.Context, req *proto.NewTermRequest) (*proto.NewTermResponse, error)
- func (s Server) Port() int
- func (s *Server) PublicPort() int
- func (s Server) PushShardAssignments(srv proto.OxiaCoordination_PushShardAssignmentsServer) error
- func (s Server) Read(request *proto.ReadRequest, stream proto.OxiaClient_ReadServer) error
- func (s Server) Replicate(srv proto.OxiaLogReplication_ReplicateServer) error
- func (s Server) SendSnapshot(srv proto.OxiaLogReplication_SendSnapshotServer) error
- func (s Server) Truncate(c context.Context, req *proto.TruncateRequest) (*proto.TruncateResponse, error)
- func (s Server) Write(ctx context.Context, write *proto.WriteRequest) (*proto.WriteResponse, error)
- type SessionId
- type SessionManager
- type ShardAssignmentsDispatcher
- type ShardsDirector
- type Standalone
- type StandaloneConfig
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrTooManyCursors = errors.New("too many cursors") ErrInvalidHeadOffset = errors.New("invalid head offset") )
View Source
var InvalidEntryId = &proto.EntryId{ Term: wal.InvalidTerm, Offset: wal.InvalidOffset, }
View Source
var SessionUpdateOperationCallback kv.UpdateOperationCallback = &updateCallback{}
Functions ¶
func SessionKey ¶
Types ¶
type Client ¶
type Client interface {
Send(*proto.ShardAssignments) error
Context() context.Context
}
type CursorAcker ¶
type CursorAcker interface {
Ack(offset int64)
}
type FollowerController ¶
type FollowerController interface {
io.Closer
// NewTerm
//
// Node handles a new term request
//
// A node receives a new term request, fences itself and responds
// with its head offset.
//
// When a node is fenced it cannot:
// - accept any writes from a client.
// - accept append from a leader.
// - send any entries to followers if it was a leader.
//
// Any existing follower cursors are destroyed, as is any state
// regarding reconfigurations.
NewTerm(req *proto.NewTermRequest) (*proto.NewTermResponse, error)
// Truncate
//
// A node that receives a truncate request knows that it
// has been selected as a follower. It truncates its log
// to the indicated entry id, updates its term and changes
// to a Follower.
Truncate(req *proto.TruncateRequest) (*proto.TruncateResponse, error)
Replicate(stream proto.OxiaLogReplication_ReplicateServer) error
SendSnapshot(stream proto.OxiaLogReplication_SendSnapshotServer) error
GetStatus(request *proto.GetStatusRequest) (*proto.GetStatusResponse, error)
DeleteShard(request *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error)
Term() int64
CommitOffset() int64
Status() proto.ServingStatus
}
FollowerController handles all the operations of a given shard's follower.
type FollowerCursor ¶
type FollowerCursor interface {
io.Closer
ShardId() int64
// LastPushed
// The last entry that was sent to this follower
LastPushed() int64
// AckOffset The highest entry already acknowledged by this follower
AckOffset() int64
}
FollowerCursor represents a cursor on the leader WAL that sends entries to a specific follower and receives a stream of acknowledgments from that follower.
func NewFollowerCursor ¶
func NewFollowerCursor( follower string, term int64, namespace string, shardId int64, replicateStreamProvider ReplicateStreamProvider, ackTracker QuorumAckTracker, walObject wal.Wal, db kv.DB, ackOffset int64) (FollowerCursor, error)
type GetResult ¶
type GetResult struct {
Response *proto.GetResponse
Err error
}
type LeaderController ¶
type LeaderController interface {
io.Closer
Write(ctx context.Context, write *proto.WriteRequest) (*proto.WriteResponse, error)
Read(ctx context.Context, request *proto.ReadRequest) <-chan GetResult
List(ctx context.Context, request *proto.ListRequest) (<-chan string, error)
ListSliceNoMutex(ctx context.Context, request *proto.ListRequest) ([]string, error)
// NewTerm Handle new term requests
NewTerm(req *proto.NewTermRequest) (*proto.NewTermResponse, error)
// BecomeLeader Handles BecomeLeaderRequest from coordinator and prepares to be leader for the shard
BecomeLeader(ctx context.Context, req *proto.BecomeLeaderRequest) (*proto.BecomeLeaderResponse, error)
AddFollower(request *proto.AddFollowerRequest) (*proto.AddFollowerResponse, error)
GetNotifications(req *proto.NotificationsRequest, stream proto.OxiaClient_GetNotificationsServer) error
GetStatus(request *proto.GetStatusRequest) (*proto.GetStatusResponse, error)
DeleteShard(request *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error)
// Term The current term of the leader
Term() int64
// Status The Status of the leader
Status() proto.ServingStatus
CreateSession(*proto.CreateSessionRequest) (*proto.CreateSessionResponse, error)
KeepAlive(sessionId int64) error
CloseSession(*proto.CloseSessionRequest) (*proto.CloseSessionResponse, error)
}
func NewLeaderController ¶
func NewLeaderController(config Config, namespace string, shardId int64, rpcClient ReplicationRpcProvider, walFactory wal.Factory, kvFactory kv.Factory) (LeaderController, error)
type MessageWithTerm ¶
type MessageWithTerm interface {
GetTerm() int64
}
type QuorumAckTracker ¶
type QuorumAckTracker interface {
io.Closer
CommitOffset() int64
// WaitForCommitOffset
// Waits for the specific entry id to be fully committed.
// After that, invokes the function f
WaitForCommitOffset(ctx context.Context, offset int64, f func() (*proto.WriteResponse, error)) (*proto.WriteResponse, error)
// NextOffset returns the offset for the next entry to write
// Note this can go ahead of the head-offset as there can be multiple operations in flight.
NextOffset() int64
HeadOffset() int64
AdvanceHeadOffset(headOffset int64)
// WaitForHeadOffset
// Waits until the specified entry is written on the wal
WaitForHeadOffset(ctx context.Context, offset int64) error
// NewCursorAcker creates a tracker for a new cursor
// The `ackOffset` is the previous last-acked position for the cursor
NewCursorAcker(ackOffset int64) (CursorAcker, error)
}
QuorumAckTracker is responsible for keeping track of the head offset and commit offset of a shard:
- Head offset: the last entry written in the local WAL of the leader
- Commit offset: the most recent entry that is considered "fully committed", as it has received the required number of acks from the followers
The quorum ack tracker is also used to block until the head offset or commit offset are advanced.
func NewQuorumAckTracker ¶
func NewQuorumAckTracker(replicationFactor uint32, headOffset int64, commitOffset int64) QuorumAckTracker
type ReplicateStreamProvider ¶
type ReplicateStreamProvider interface {
GetReplicateStream(ctx context.Context, follower string, namespace string, shard int64) (proto.OxiaLogReplication_ReplicateClient, error)
SendSnapshot(ctx context.Context, follower string, namespace string, shard int64) (proto.OxiaLogReplication_SendSnapshotClient, error)
}
ReplicateStreamProvider is a provider for the ReplicateStream gRPC handler. It is used to allow passing in a mocked version of the gRPC service.
type ReplicationRpcProvider ¶
type ReplicationRpcProvider interface {
io.Closer
ReplicateStreamProvider
Truncate(follower string, req *proto.TruncateRequest) (*proto.TruncateResponse, error)
}
func NewReplicationRpcProvider ¶
func NewReplicationRpcProvider(tlsConf *tls.Config) ReplicationRpcProvider
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
func NewWithGrpcProvider ¶
func NewWithGrpcProvider(config Config, provider container.GrpcProvider, replicationRpcProvider ReplicationRpcProvider) (*Server, error)
func (Server) AddFollower ¶
func (s Server) AddFollower(c context.Context, req *proto.AddFollowerRequest) (*proto.AddFollowerResponse, error)
func (Server) BecomeLeader ¶
func (s Server) BecomeLeader(c context.Context, req *proto.BecomeLeaderRequest) (*proto.BecomeLeaderResponse, error)
func (Server) CloseSession ¶
func (s Server) CloseSession(ctx context.Context, req *proto.CloseSessionRequest) (*proto.CloseSessionResponse, error)
func (Server) CreateSession ¶
func (s Server) CreateSession(ctx context.Context, req *proto.CreateSessionRequest) (*proto.CreateSessionResponse, error)
func (Server) DeleteShard ¶
func (s Server) DeleteShard(_ context.Context, req *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error)
func (Server) GetNotifications ¶
func (s Server) GetNotifications(req *proto.NotificationsRequest, stream proto.OxiaClient_GetNotificationsServer) error
func (Server) GetShardAssignments ¶
func (s Server) GetShardAssignments(req *proto.ShardAssignmentsRequest, srv proto.OxiaClient_GetShardAssignmentsServer) error
func (Server) GetStatus ¶
func (s Server) GetStatus(_ context.Context, req *proto.GetStatusRequest) (*proto.GetStatusResponse, error)
func (*Server) InternalPort ¶
func (Server) KeepAlive ¶
func (s Server) KeepAlive(ctx context.Context, req *proto.SessionHeartbeat) (*proto.KeepAliveResponse, error)
func (Server) List ¶
func (s Server) List(request *proto.ListRequest, stream proto.OxiaClient_ListServer) error
func (Server) NewTerm ¶
func (s Server) NewTerm(c context.Context, req *proto.NewTermRequest) (*proto.NewTermResponse, error)
func (*Server) PublicPort ¶
func (Server) PushShardAssignments ¶
func (s Server) PushShardAssignments(srv proto.OxiaCoordination_PushShardAssignmentsServer) error
func (Server) Read ¶
func (s Server) Read(request *proto.ReadRequest, stream proto.OxiaClient_ReadServer) error
func (Server) Replicate ¶
func (s Server) Replicate(srv proto.OxiaLogReplication_ReplicateServer) error
func (Server) SendSnapshot ¶
func (s Server) SendSnapshot(srv proto.OxiaLogReplication_SendSnapshotServer) error
func (Server) Truncate ¶
func (s Server) Truncate(c context.Context, req *proto.TruncateRequest) (*proto.TruncateResponse, error)
func (Server) Write ¶
func (s Server) Write(ctx context.Context, write *proto.WriteRequest) (*proto.WriteResponse, error)
type SessionManager ¶
type SessionManager interface {
io.Closer
CreateSession(request *proto.CreateSessionRequest) (*proto.CreateSessionResponse, error)
KeepAlive(sessionId int64) error
CloseSession(request *proto.CloseSessionRequest) (*proto.CloseSessionResponse, error)
Initialize() error
}
func NewSessionManager ¶
func NewSessionManager(ctx context.Context, namespace string, shardId int64, controller *leaderController) SessionManager
type ShardAssignmentsDispatcher ¶
type ShardAssignmentsDispatcher interface {
io.Closer
Initialized() bool
PushShardAssignments(stream proto.OxiaCoordination_PushShardAssignmentsServer) error
RegisterForUpdates(req *proto.ShardAssignmentsRequest, client Client) error
}
func NewShardAssignmentDispatcher ¶
func NewShardAssignmentDispatcher(healthServer *health.Server) ShardAssignmentsDispatcher
func NewStandaloneShardAssignmentDispatcher ¶
func NewStandaloneShardAssignmentDispatcher(numShards uint32) ShardAssignmentsDispatcher
type ShardsDirector ¶
type ShardsDirector interface {
io.Closer
GetLeader(shardId int64) (LeaderController, error)
GetFollower(shardId int64) (FollowerController, error)
GetOrCreateLeader(namespace string, shardId int64) (LeaderController, error)
GetOrCreateFollower(namespace string, shardId int64) (FollowerController, error)
DeleteShard(req *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error)
}
func NewShardsDirector ¶
func NewShardsDirector(config Config, walFactory wal.Factory, kvFactory kv.Factory, provider ReplicationRpcProvider) ShardsDirector
type Standalone ¶
type Standalone struct {
// contains filtered or unexported fields
}
func NewStandalone ¶
func NewStandalone(config StandaloneConfig) (*Standalone, error)
func (*Standalone) Close ¶
func (s *Standalone) Close() error
func (*Standalone) RpcPort ¶
func (s *Standalone) RpcPort() int
type StandaloneConfig ¶
func NewTestConfig ¶
func NewTestConfig(dir string) StandaloneConfig
Source Files
¶
Click to show internal directories.
Click to hide internal directories.