Documentation
¶
Index ¶
- Variables
- func NewK8SClientConfig() *rest.Config
- func NewK8SClientset(config *rest.Config) kubernetes.Interface
- type Client
- type Coordinator
- type MetadataContainer
- type MetadataProvider
- type NodeAvailabilityListener
- type NodeController
- type NodeStatus
- type ResourceInterface
- type RpcProvider
- type ServerRank
- type ShardAssignmentsProvider
- type ShardController
- type SwapNodeAction
- type Version
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrorMetadataNotInitialized = errors.New("metadata not initialized") ErrorMetadataBadVersion = errors.New("metadata bad version") )
View Source
var (
ErrorNamespaceNotFound = errors.New("namespace not found")
)
Functions ¶
func NewK8SClientConfig ¶
func NewK8SClientset ¶
func NewK8SClientset(config *rest.Config) kubernetes.Interface
Types ¶
type Client ¶
type Client[Resource resource] interface {
Upsert(namespace string, resource *Resource) (*Resource, error)
Delete(namespace, name string) error
Get(namespace, name string) (*Resource, error)
}
func K8SConfigMaps ¶
func K8SConfigMaps(kubernetes kubernetes.Interface) Client[coreV1.ConfigMap]
type Coordinator ¶
type Coordinator interface {
io.Closer
ShardAssignmentsProvider
InitiateLeaderElection(namespace string, shard int64, metadata model.ShardMetadata) error
ElectedLeader(namespace string, shard int64, metadata model.ShardMetadata) error
ShardDeleted(namespace string, shard int64) error
NodeAvailabilityListener
ClusterStatus() model.ClusterStatus
}
func NewCoordinator ¶
func NewCoordinator(metadataProvider MetadataProvider, clusterConfigProvider func() (model.ClusterConfig, error), clusterConfigRefreshTime time.Duration, rpc RpcProvider) (Coordinator, error)
type MetadataContainer ¶
type MetadataContainer struct {
ClusterStatus *model.ClusterStatus `json:"clusterStatus"`
Version Version `json:"version"`
}
type MetadataProvider ¶
type MetadataProvider interface {
io.Closer
Get() (cs *model.ClusterStatus, version Version, err error)
Store(cs *model.ClusterStatus, expectedVersion Version) (newVersion Version, err error)
}
func NewMetadataProviderConfigMap ¶
func NewMetadataProviderConfigMap(k8s k8s.Interface, namespace, name string) MetadataProvider
func NewMetadataProviderFile ¶
func NewMetadataProviderFile(path string) MetadataProvider
func NewMetadataProviderMemory ¶
func NewMetadataProviderMemory() MetadataProvider
type NodeAvailabilityListener ¶
type NodeAvailabilityListener interface {
}
type NodeController ¶
type NodeController interface {
io.Closer
Status() NodeStatus
}
The NodeController takes care of checking the health-status of each node and to push all the service discovery updates
func NewNodeController ¶
func NewNodeController(addr model.ServerAddress, shardAssignmentsProvider ShardAssignmentsProvider, nodeAvailabilityListener NodeAvailabilityListener, rpc RpcProvider) NodeController
type ResourceInterface ¶
type ResourceInterface[Resource resource] interface {
Create(ctx context.Context, Resource *Resource, opts metaV1.CreateOptions) (*Resource, error)
Update(ctx context.Context, Resource *Resource, opts metaV1.UpdateOptions) (*Resource, error)
Delete(ctx context.Context, name string, opts metaV1.DeleteOptions) error
Get(ctx context.Context, name string, opts metaV1.GetOptions) (*Resource, error)
}
type RpcProvider ¶
type RpcProvider interface {
PushShardAssignments(ctx context.Context, node model.ServerAddress) (proto.OxiaCoordination_PushShardAssignmentsClient, error)
NewTerm(ctx context.Context, node model.ServerAddress, req *proto.NewTermRequest) (*proto.NewTermResponse, error)
BecomeLeader(ctx context.Context, node model.ServerAddress, req *proto.BecomeLeaderRequest) (*proto.BecomeLeaderResponse, error)
AddFollower(ctx context.Context, node model.ServerAddress, req *proto.AddFollowerRequest) (*proto.AddFollowerResponse, error)
GetStatus(ctx context.Context, node model.ServerAddress, req *proto.GetStatusRequest) (*proto.GetStatusResponse, error)
DeleteShard(ctx context.Context, node model.ServerAddress, req *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error)
GetHealthClient(node model.ServerAddress) (grpc_health_v1.HealthClient, error)
}
func NewRpcProvider ¶
func NewRpcProvider(pool common.ClientPool) RpcProvider
type ServerRank ¶
type ServerRank struct {
Addr model.ServerAddress
Shards common.Set[int64]
}
type ShardAssignmentsProvider ¶
type ShardAssignmentsProvider interface {
WaitForNextUpdate(ctx context.Context, currentValue *proto.ShardAssignments) (*proto.ShardAssignments, error)
}
type ShardController ¶
type ShardController interface {
io.Closer
HandleNodeFailure(failedNode model.ServerAddress)
SwapNode(from model.ServerAddress, to model.ServerAddress) error
DeleteShard()
Term() int64
Leader() *model.ServerAddress
Status() model.ShardStatus
}
The ShardController is responsible to handle all the state transition for a given a shard e.g. electing a new leader
func NewShardController ¶
func NewShardController(namespace string, shard int64, shardMetadata model.ShardMetadata, rpc RpcProvider, coordinator Coordinator) ShardController
type SwapNodeAction ¶
type SwapNodeAction struct {
Shard int64
From model.ServerAddress
To model.ServerAddress
}
Click to show internal directories.
Click to hide internal directories.