Documentation
¶
Index ¶
- Variables
- func GetNamespaceConfig(namespaces []model.NamespaceConfig, namespace string) *model.NamespaceConfig
- func NewK8SClientConfig() *rest.Config
- func NewK8SClientset(config *rest.Config) kubernetes.Interface
- func SimpleEnsembleSupplier(candidates []model.Server, nc *model.NamespaceConfig, cs *model.ClusterStatus) []model.Server
- type Client
- type Coordinator
- type MetadataContainer
- type MetadataProvider
- type NodeAvailabilityListener
- type NodeController
- type NodeStatus
- type ResourceInterface
- type RpcProvider
- type ServerContext
- type ShardAssignmentsProvider
- type ShardController
- type SwapNodeAction
- type Version
Constants ¶
This section is empty.
Variables ¶
View Source
var (
	ErrMetadataNotInitialized = errors.New("metadata not initialized")
	ErrMetadataBadVersion     = errors.New("metadata bad version")
)
View Source
var (
ErrNamespaceNotFound = errors.New("namespace not found")
)
Functions ¶
func GetNamespaceConfig ¶ added in v0.10.0
func GetNamespaceConfig(namespaces []model.NamespaceConfig, namespace string) *model.NamespaceConfig
func NewK8SClientConfig ¶
func NewK8SClientConfig() *rest.Config
func NewK8SClientset ¶
func NewK8SClientset(config *rest.Config) kubernetes.Interface
func SimpleEnsembleSupplier ¶ added in v0.12.0
func SimpleEnsembleSupplier(candidates []model.Server, nc *model.NamespaceConfig, cs *model.ClusterStatus) []model.Server
Types ¶
type Client ¶
type Client[Resource resource] interface {
Upsert(namespace, name string, resource *Resource) (*Resource, error)
Delete(namespace, name string) error
Get(namespace, name string) (*Resource, error)
}
func K8SConfigMaps ¶
func K8SConfigMaps(kc kubernetes.Interface) Client[corev1.ConfigMap]
type Coordinator ¶
type Coordinator interface {
io.Closer
ShardAssignmentsProvider
InitiateLeaderElection(namespace string, shard int64, metadata model.ShardMetadata) error
ElectedLeader(namespace string, shard int64, metadata model.ShardMetadata) error
ShardDeleted(namespace string, shard int64) error
NodeAvailabilityListener
ClusterStatus() model.ClusterStatus
// FindServerByIdentifier searches for a server in the cluster by its identifier and returns it if found.
FindServerByIdentifier(identifier string) (*model.Server, bool)
}
func NewCoordinator ¶
func NewCoordinator(metadataProvider MetadataProvider, clusterConfigProvider func() (model.ClusterConfig, error), clusterConfigNotificationsCh chan any, rpc RpcProvider) (Coordinator, error)
type MetadataContainer ¶
type MetadataContainer struct {
ClusterStatus *model.ClusterStatus `json:"clusterStatus"`
Version Version `json:"version"`
}
type MetadataProvider ¶
type MetadataProvider interface {
io.Closer
Get() (cs *model.ClusterStatus, version Version, err error)
Store(cs *model.ClusterStatus, expectedVersion Version) (newVersion Version, err error)
}
func NewMetadataProviderConfigMap ¶
func NewMetadataProviderConfigMap(kc k8s.Interface, namespace, name string) MetadataProvider
func NewMetadataProviderFile ¶
func NewMetadataProviderFile(path string) MetadataProvider
func NewMetadataProviderMemory ¶
func NewMetadataProviderMemory() MetadataProvider
type NodeAvailabilityListener ¶
type NodeAvailabilityListener interface {
}
type NodeController ¶
type NodeController interface {
io.Closer
Status() NodeStatus
SetStatus(status NodeStatus)
}
The NodeController takes care of checking the health status of each node and of pushing all the service discovery updates.
func NewNodeController ¶
func NewNodeController(addr model.Server, shardAssignmentsProvider ShardAssignmentsProvider, nodeAvailabilityListener NodeAvailabilityListener, rpc RpcProvider) NodeController
type ResourceInterface ¶
type ResourceInterface[Resource resource] interface {
Create(ctx context.Context, resource *Resource, opts metav1.CreateOptions) (*Resource, error)
Update(ctx context.Context, resource *Resource, opts metav1.UpdateOptions) (*Resource, error)
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
Get(ctx context.Context, name string, opts metav1.GetOptions) (*Resource, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions,
subresources ...string) (*Resource, error)
}
type RpcProvider ¶
type RpcProvider interface {
PushShardAssignments(ctx context.Context, node model.Server) (proto.OxiaCoordination_PushShardAssignmentsClient, error)
NewTerm(ctx context.Context, node model.Server, req *proto.NewTermRequest) (*proto.NewTermResponse, error)
BecomeLeader(ctx context.Context, node model.Server, req *proto.BecomeLeaderRequest) (*proto.BecomeLeaderResponse, error)
AddFollower(ctx context.Context, node model.Server, req *proto.AddFollowerRequest) (*proto.AddFollowerResponse, error)
GetStatus(ctx context.Context, node model.Server, req *proto.GetStatusRequest) (*proto.GetStatusResponse, error)
DeleteShard(ctx context.Context, node model.Server, req *proto.DeleteShardRequest) (*proto.DeleteShardResponse, error)
GetHealthClient(node model.Server) (grpc_health_v1.HealthClient, io.Closer, error)
ClearPooledConnections(node model.Server)
}
func NewRpcProvider ¶
func NewRpcProvider(pool rpc.ClientPool) RpcProvider
type ServerContext ¶ added in v0.11.4
type ServerContext struct {
Server model.Server
Shards collection.Set[int64]
}
type ShardAssignmentsProvider ¶
type ShardAssignmentsProvider interface {
WaitForNextUpdate(ctx context.Context, currentValue *proto.ShardAssignments) (*proto.ShardAssignments, error)
}
type ShardController ¶
type ShardController interface {
io.Closer
HandleNodeFailure(failedNode model.Server)
SyncServerAddress()
SwapNode(from model.Server, to model.Server) error
DeleteShard()
Term() int64
Leader() *model.Server
Status() model.ShardStatus
}
The ShardController is responsible for handling all the state transitions for a given shard, e.g. electing a new leader.
func NewShardController ¶
func NewShardController(namespace string, shard int64, namespaceConfig *model.NamespaceConfig, shardMetadata model.ShardMetadata, rpc RpcProvider, coordinator Coordinator) ShardController
Click to show internal directories.
Click to hide internal directories.