Documentation
¶
Index ¶
- Variables
- func ReadHeaderInt64(md metadata.MD, key string) (v int64, err error)
- func SessionKey(sessionId SessionId) string
- func ShadowKey(sessionId SessionId, key string) string
- type Client
- type Config
- type CursorAcker
- type FollowerController
- type FollowerCursor
- type LeaderController
- type MessageWithTerm
- type QuorumAckTracker
- type ReplicateStreamProvider
- type ReplicationRpcProvider
- type Server
- func (s Server) AddFollower(c context.Context, req *proto.AddFollowerRequest) (*proto.AddFollowerResponse, error)
- func (s Server) BecomeLeader(c context.Context, req *proto.BecomeLeaderRequest) (*proto.BecomeLeaderResponse, error)
- func (s *Server) Close() error
- func (s Server) CloseSession(ctx context.Context, req *proto.CloseSessionRequest) (*proto.CloseSessionResponse, error)
- func (s Server) CreateSession(ctx context.Context, req *proto.CreateSessionRequest) (*proto.CreateSessionResponse, error)
- func (s Server) DeleteShard(_ context.Context, req *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error)
- func (s Server) GetNotifications(req *proto.NotificationsRequest, ...) error
- func (s Server) GetShardAssignments(req *proto.ShardAssignmentsRequest, ...) error
- func (s Server) GetStatus(_ context.Context, req *proto.GetStatusRequest) (*proto.GetStatusResponse, error)
- func (s *Server) InternalPort() int
- func (s Server) KeepAlive(ctx context.Context, req *proto.SessionHeartbeat) (*proto.KeepAliveResponse, error)
- func (s Server) List(request *proto.ListRequest, stream proto.OxiaClient_ListServer) error
- func (s Server) NewTerm(c context.Context, req *proto.NewTermRequest) (*proto.NewTermResponse, error)
- func (s Server) Port() int
- func (s *Server) PublicPort() int
- func (s Server) PushShardAssignments(srv proto.OxiaCoordination_PushShardAssignmentsServer) error
- func (s Server) RangeScan(request *proto.RangeScanRequest, stream proto.OxiaClient_RangeScanServer) error
- func (s Server) Read(request *proto.ReadRequest, stream proto.OxiaClient_ReadServer) error
- func (s Server) Replicate(srv proto.OxiaLogReplication_ReplicateServer) error
- func (s Server) SendSnapshot(srv proto.OxiaLogReplication_SendSnapshotServer) error
- func (s Server) Truncate(c context.Context, req *proto.TruncateRequest) (*proto.TruncateResponse, error)
- func (s Server) Write(ctx context.Context, write *proto.WriteRequest) (*proto.WriteResponse, error)
- func (s Server) WriteStream(stream proto.OxiaClient_WriteStreamServer) error
- type SessionId
- type SessionManager
- type ShardAssignmentsDispatcher
- type ShardsDirector
- type Standalone
- type StandaloneConfig
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrTooManyCursors = errors.New("too many cursors") ErrInvalidHeadOffset = errors.New("invalid head offset") )
View Source
var ErrLeaderClosed = errors.New("the leader has been closed")
View Source
var InvalidEntryId = &proto.EntryId{ Term: wal.InvalidTerm, Offset: wal.InvalidOffset, }
View Source
var WrapperUpdateOperationCallback kv.UpdateOperationCallback = &wrapperUpdateCallback{}
Functions ¶
func SessionKey ¶
Types ¶
type Client ¶
// Client is the handle passed to ShardAssignmentsDispatcher.RegisterForUpdates.
// It accepts shard-assignment messages and exposes a context.
type Client interface {
// Send passes a proto.ShardAssignments message to the client and reports
// any error encountered while doing so.
Send(*proto.ShardAssignments) error
// Context returns the context associated with this client.
// NOTE(review): presumably the context of the underlying stream — confirm.
Context() context.Context
}
type Config ¶
// Config holds the server settings. It is the configuration value accepted by
// NewWithGrpcProvider, NewLeaderController and NewShardsDirector.
type Config struct {
// Addresses of the public and the internal service.
PublicServiceAddr string
InternalServiceAddr string
// TLS settings.
// NOTE(review): presumably PeerTLS is used for connections to peers, while
// ServerTLS and InternalServerTLS apply to the public and internal
// services respectively — confirm against the callers.
PeerTLS *tls.Config
ServerTLS *tls.Config
InternalServerTLS *tls.Config
// Address of the metrics service.
MetricsServiceAddr string
// Authentication options.
AuthOptions auth.Options
// Directories for the data and for the WAL.
DataDir string
WalDir string
// WAL retention time and data-sync flag.
WalRetentionTime time.Duration
WalSyncData bool
// Retention time for notifications.
NotificationsRetentionTime time.Duration
// DB block cache size; the unit is presumably megabytes, per the field name.
DbBlockCacheMB int64
}
type CursorAcker ¶
// CursorAcker is the per-cursor tracker returned by
// QuorumAckTracker.NewCursorAcker.
type CursorAcker interface {
// Ack reports an acknowledged offset for the cursor.
// NOTE(review): presumably the latest offset acked by the follower that
// owns the cursor — confirm against the implementation.
Ack(offset int64)
}
type FollowerController ¶
type FollowerController interface {
io.Closer
// NewTerm handles a new term request.
//
// A node receives a new term request, fences itself and responds
// with its head offset.
//
// When a node is fenced it cannot:
// - accept any writes from a client.
// - accept append from a leader.
// - send any entries to followers if it was a leader.
//
// Any existing follower cursors are destroyed, as is any state
// regarding reconfigurations.
NewTerm(req *proto.NewTermRequest) (*proto.NewTermResponse, error)
// Truncate handles a truncate request.
//
// A node that receives a truncate request knows that it
// has been selected as a follower. It truncates its log
// to the indicated entry id, updates its term and changes
// to a Follower.
Truncate(req *proto.TruncateRequest) (*proto.TruncateResponse, error)
// Replicate handles the server side of a log replication stream.
Replicate(stream proto.OxiaLogReplication_ReplicateServer) error
// SendSnapshot handles the server side of a snapshot stream.
SendSnapshot(stream proto.OxiaLogReplication_SendSnapshotServer) error
// GetStatus answers a status request.
GetStatus(request *proto.GetStatusRequest) (*proto.GetStatusResponse, error)
// DeleteShard handles a delete-shard request.
DeleteShard(request *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error)
// Term returns the follower's term.
Term() int64
// CommitOffset returns the follower's commit offset.
CommitOffset() int64
// Status returns the follower's serving status.
Status() proto.ServingStatus
}
FollowerController handles all the operations of a given shard's follower.
type FollowerCursor ¶
type FollowerCursor interface {
io.Closer
// ShardId returns the shard id of the cursor.
// NOTE(review): presumably the shardId given to NewFollowerCursor — confirm.
ShardId() int64
// LastPushed returns the last entry that was sent to this follower.
LastPushed() int64
// AckOffset returns the highest entry already acknowledged by this follower.
AckOffset() int64
}
FollowerCursor represents a cursor on the leader WAL that sends entries to a specific follower and receives a stream of acknowledgments from that follower.
func NewFollowerCursor ¶
func NewFollowerCursor( follower string, term int64, namespace string, shardId int64, replicateStreamProvider ReplicateStreamProvider, ackTracker QuorumAckTracker, walObject wal.Wal, db kv.DB, ackOffset int64) (FollowerCursor, error)
type LeaderController ¶
// LeaderController is the set of operations available on a shard's leader.
// It is created by NewLeaderController and handed out by ShardsDirector.
type LeaderController interface {
io.Closer
// Write handles a write request and returns its response.
Write(ctx context.Context, write *proto.WriteRequest) (*proto.WriteResponse, error)
// ListBlock handles a list request and returns all results at once as a slice.
// NOTE(review): the returned strings are presumably keys — confirm.
ListBlock(ctx context.Context, request *proto.ListRequest) ([]string, error)
// WriteStream handles the server side of a client write stream.
WriteStream(stream proto.OxiaClient_WriteStreamServer) error
// List handles a list request, delivering results through cb instead of
// returning them.
List(ctx context.Context, request *proto.ListRequest, cb callback.StreamCallback[string])
// Read handles a read request, delivering each proto.GetResponse through cb.
Read(ctx context.Context, request *proto.ReadRequest, cb callback.StreamCallback[*proto.GetResponse])
// RangeScan handles a range-scan request, delivering each proto.GetResponse
// through cb.
RangeScan(ctx context.Context, request *proto.RangeScanRequest, cb callback.StreamCallback[*proto.GetResponse])
// NewTerm handles new term requests.
NewTerm(req *proto.NewTermRequest) (*proto.NewTermResponse, error)
// BecomeLeader handles a BecomeLeaderRequest from the coordinator and
// prepares to be leader for the shard.
BecomeLeader(ctx context.Context, req *proto.BecomeLeaderRequest) (*proto.BecomeLeaderResponse, error)
// AddFollower handles an add-follower request.
AddFollower(request *proto.AddFollowerRequest) (*proto.AddFollowerResponse, error)
// GetNotifications serves a notifications request over the given stream.
GetNotifications(req *proto.NotificationsRequest, stream proto.OxiaClient_GetNotificationsServer) error
// GetStatus answers a status request.
GetStatus(request *proto.GetStatusRequest) (*proto.GetStatusResponse, error)
// DeleteShard handles a delete-shard request.
DeleteShard(request *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error)
// Term returns the current term of the leader.
Term() int64
// Status returns the status of the leader.
Status() proto.ServingStatus
// CreateSession handles a create-session request.
CreateSession(*proto.CreateSessionRequest) (*proto.CreateSessionResponse, error)
// KeepAlive handles a keep-alive for the session with the given id.
KeepAlive(sessionId int64) error
// CloseSession handles a close-session request.
CloseSession(*proto.CloseSessionRequest) (*proto.CloseSessionResponse, error)
}
func NewLeaderController ¶
func NewLeaderController(config Config, namespace string, shardId int64, rpcClient ReplicationRpcProvider, walFactory wal.Factory, kvFactory kv.Factory) (LeaderController, error)
type MessageWithTerm ¶
// MessageWithTerm is satisfied by any message that exposes a term.
type MessageWithTerm interface {
// GetTerm returns the term carried by the message.
GetTerm() int64
}
type QuorumAckTracker ¶
type QuorumAckTracker interface {
io.Closer
// CommitOffset returns the current commit offset.
CommitOffset() int64
// WaitForCommitOffset
// Waits until the specified entry is fully committed.
// NOTE(review): presumably returns an error if ctx is done before the
// entry is committed — confirm against the implementation.
WaitForCommitOffset(ctx context.Context, offset int64) error
// WaitForCommitOffsetAsync
// Asynchronously waits for the specific entry, identified by its offset, to be fully committed.
// Once the commit is confirmed, the provided callback function (cb) is invoked.
//
// Parameters:
// - ctx: The context used for managing cancellation and deadlines for the operation.
// - offset: The unique identifier (offset) of the entry to wait for.
// - cb: The callback function to invoke after the commit is confirmed. The callback
// will receive the result or error from the operation.
//
// Returns:
// - This method does not return anything immediately. The callback will handle
// the result or error asynchronously.
//
// Note:
// This method returns immediately and does not block the caller, allowing other
// operations to continue while waiting for the commit.
WaitForCommitOffsetAsync(ctx context.Context, offset int64, cb callback.Callback[any])
// NextOffset returns the offset for the next entry to write.
// Note this can go ahead of the head-offset as there can be multiple operations in flight.
NextOffset() int64
// HeadOffset returns the current head offset.
HeadOffset() int64
// AdvanceHeadOffset advances the head offset to the given value.
AdvanceHeadOffset(headOffset int64)
// WaitForHeadOffset
// Waits until the specified entry is written on the wal
WaitForHeadOffset(ctx context.Context, offset int64) error
// NewCursorAcker creates a tracker for a new cursor
// The `ackOffset` is the previous last-acked position for the cursor
NewCursorAcker(ackOffset int64) (CursorAcker, error)
}
QuorumAckTracker is responsible for keeping track of the head offset and commit offset of a shard:
- Head offset: the last entry written in the local WAL of the leader
- Commit offset: the oldest entry that is considered "fully committed", as it has received the requested amount of acks from the followers
The quorum ack tracker is also used to block until the head offset or commit offset are advanced.
func NewQuorumAckTracker ¶
func NewQuorumAckTracker(replicationFactor uint32, headOffset int64, commitOffset int64) QuorumAckTracker
type ReplicateStreamProvider ¶
type ReplicateStreamProvider interface {
// GetReplicateStream returns a replicate-stream client towards the given
// follower for the given namespace, shard and term.
GetReplicateStream(ctx context.Context, follower string, namespace string, shard int64, term int64) (proto.OxiaLogReplication_ReplicateClient, error)
// SendSnapshot returns a send-snapshot stream client towards the given
// follower for the given namespace, shard and term.
SendSnapshot(ctx context.Context, follower string, namespace string, shard int64, term int64) (proto.OxiaLogReplication_SendSnapshotClient, error)
}
ReplicateStreamProvider is a provider for the ReplicateStream gRPC handler. It is used to allow passing in a mocked version of the gRPC service.
type ReplicationRpcProvider ¶
// ReplicationRpcProvider extends ReplicateStreamProvider with a Truncate call
// and a Close method. It is created by NewReplicationRpcProvider.
type ReplicationRpcProvider interface {
io.Closer
ReplicateStreamProvider
// Truncate sends a truncate request to the given follower and returns its
// response.
Truncate(follower string, req *proto.TruncateRequest) (*proto.TruncateResponse, error)
}
func NewReplicationRpcProvider ¶
func NewReplicationRpcProvider(tlsConf *tls.Config) ReplicationRpcProvider
type Server ¶
// Server is the server type returned by NewWithGrpcProvider. All of its fields
// are unexported; it is used through its methods.
type Server struct {
// contains filtered or unexported fields
}
func NewWithGrpcProvider ¶
func NewWithGrpcProvider(config Config, provider container.GrpcProvider, replicationRpcProvider ReplicationRpcProvider) (*Server, error)
func (Server) AddFollower ¶
func (s Server) AddFollower(c context.Context, req *proto.AddFollowerRequest) (*proto.AddFollowerResponse, error)
func (Server) BecomeLeader ¶
func (s Server) BecomeLeader(c context.Context, req *proto.BecomeLeaderRequest) (*proto.BecomeLeaderResponse, error)
func (Server) CloseSession ¶
func (s Server) CloseSession(ctx context.Context, req *proto.CloseSessionRequest) (*proto.CloseSessionResponse, error)
func (Server) CreateSession ¶
func (s Server) CreateSession(ctx context.Context, req *proto.CreateSessionRequest) (*proto.CreateSessionResponse, error)
func (Server) DeleteShard ¶
func (s Server) DeleteShard(_ context.Context, req *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error)
func (Server) GetNotifications ¶
func (s Server) GetNotifications(req *proto.NotificationsRequest, stream proto.OxiaClient_GetNotificationsServer) error
func (Server) GetShardAssignments ¶
func (s Server) GetShardAssignments(req *proto.ShardAssignmentsRequest, srv proto.OxiaClient_GetShardAssignmentsServer) error
func (Server) GetStatus ¶
func (s Server) GetStatus(_ context.Context, req *proto.GetStatusRequest) (*proto.GetStatusResponse, error)
func (*Server) InternalPort ¶
func (Server) KeepAlive ¶
func (s Server) KeepAlive(ctx context.Context, req *proto.SessionHeartbeat) (*proto.KeepAliveResponse, error)
func (Server) List ¶
func (s Server) List(request *proto.ListRequest, stream proto.OxiaClient_ListServer) error
func (Server) NewTerm ¶
func (s Server) NewTerm(c context.Context, req *proto.NewTermRequest) (*proto.NewTermResponse, error)
func (*Server) PublicPort ¶
func (Server) PushShardAssignments ¶
func (s Server) PushShardAssignments(srv proto.OxiaCoordination_PushShardAssignmentsServer) error
func (Server) RangeScan ¶ added in v0.5.0
func (s Server) RangeScan(request *proto.RangeScanRequest, stream proto.OxiaClient_RangeScanServer) error
func (Server) Read ¶
func (s Server) Read(request *proto.ReadRequest, stream proto.OxiaClient_ReadServer) error
func (Server) Replicate ¶
func (s Server) Replicate(srv proto.OxiaLogReplication_ReplicateServer) error
func (Server) SendSnapshot ¶
func (s Server) SendSnapshot(srv proto.OxiaLogReplication_SendSnapshotServer) error
func (Server) Truncate ¶
func (s Server) Truncate(c context.Context, req *proto.TruncateRequest) (*proto.TruncateResponse, error)
func (Server) Write ¶
func (s Server) Write(ctx context.Context, write *proto.WriteRequest) (*proto.WriteResponse, error)
func (Server) WriteStream ¶ added in v0.7.0
func (s Server) WriteStream(stream proto.OxiaClient_WriteStreamServer) error
type SessionManager ¶
// SessionManager handles session creation, keep-alive and closing. It is
// created by NewSessionManager for a given namespace and shard.
type SessionManager interface {
io.Closer
// CreateSession handles a create-session request.
CreateSession(request *proto.CreateSessionRequest) (*proto.CreateSessionResponse, error)
// KeepAlive handles a keep-alive for the session with the given id.
KeepAlive(sessionId int64) error
// CloseSession handles a close-session request.
CloseSession(request *proto.CloseSessionRequest) (*proto.CloseSessionResponse, error)
// Initialize prepares the session manager.
// NOTE(review): what is initialized, and when this must be called, is not
// visible here — confirm against the implementation.
Initialize() error
}
func NewSessionManager ¶
func NewSessionManager(ctx context.Context, namespace string, shardId int64, controller *leaderController) SessionManager
type ShardAssignmentsDispatcher ¶
// ShardAssignmentsDispatcher receives shard assignments and lets clients
// register for updates. It is created by NewShardAssignmentDispatcher or
// NewStandaloneShardAssignmentDispatcher.
type ShardAssignmentsDispatcher interface {
io.Closer
// Initialized reports whether the dispatcher has been initialized.
// NOTE(review): presumably true once the first assignments are available —
// confirm.
Initialized() bool
// PushShardAssignments handles the server side of a shard-assignments
// push stream.
PushShardAssignments(stream proto.OxiaCoordination_PushShardAssignmentsServer) error
// RegisterForUpdates registers client for the given shard-assignments
// request.
RegisterForUpdates(req *proto.ShardAssignmentsRequest, client Client) error
}
func NewShardAssignmentDispatcher ¶
func NewShardAssignmentDispatcher(healthServer *health.Server) ShardAssignmentsDispatcher
func NewStandaloneShardAssignmentDispatcher ¶
func NewStandaloneShardAssignmentDispatcher(numShards uint32) ShardAssignmentsDispatcher
type ShardsDirector ¶
// ShardsDirector gives access to the per-shard LeaderController and
// FollowerController instances. It is created by NewShardsDirector.
type ShardsDirector interface {
io.Closer
// GetLeader returns the LeaderController for the given shard.
// NOTE(review): presumably returns an error when there is no leader for
// the shard on this node — confirm.
GetLeader(shardId int64) (LeaderController, error)
// GetFollower returns the FollowerController for the given shard.
// NOTE(review): presumably returns an error when there is no follower for
// the shard on this node — confirm.
GetFollower(shardId int64) (FollowerController, error)
// GetOrCreateLeader returns the LeaderController for the given namespace
// and shard, creating it if needed.
GetOrCreateLeader(namespace string, shardId int64) (LeaderController, error)
// GetOrCreateFollower returns the FollowerController for the given
// namespace, shard and term, creating it if needed.
GetOrCreateFollower(namespace string, shardId int64, term int64) (FollowerController, error)
// DeleteShard handles a delete-shard request.
DeleteShard(req *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error)
}
func NewShardsDirector ¶
func NewShardsDirector(config Config, walFactory wal.Factory, kvFactory kv.Factory, provider ReplicationRpcProvider) ShardsDirector
type Standalone ¶
// Standalone is the type returned by NewStandalone from a StandaloneConfig.
// All of its fields are unexported; it exposes Close and RpcPort.
type Standalone struct {
// contains filtered or unexported fields
}
func NewStandalone ¶
func NewStandalone(config StandaloneConfig) (*Standalone, error)
func (*Standalone) Close ¶
func (s *Standalone) Close() error
func (*Standalone) RpcPort ¶
func (s *Standalone) RpcPort() int
type StandaloneConfig ¶
func NewTestConfig ¶
func NewTestConfig(dir string) StandaloneConfig
Source Files
¶
Click to show internal directories.
Click to hide internal directories.