procedure

package
v1.0.0-alpha02 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 30, 2022 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	StateInit      = "init"
	StateRunning   = "running"
	StateFinished  = "finished"
	StateFailed    = "failed"
	StateCancelled = "cancelled"
)
View Source
const (
	Version              = "v1"
	PathProcedure        = "procedure"
	PathDeletedProcedure = "deletedProcedure"
)

Variables

View Source
var (
	ErrShardLeaderNotFound   = coderr.NewCodeError(coderr.Internal, "shard leader not found")
	ErrProcedureNotFound     = coderr.NewCodeError(coderr.Internal, "procedure not found")
	ErrClusterConfigChanged  = coderr.NewCodeError(coderr.Internal, "cluster config changed")
	ErrTableAlreadyExists    = coderr.NewCodeError(coderr.Internal, "table already exists")
	ErrProcedureTypeNotMatch = coderr.NewCodeError(coderr.Internal, "procedure type not match")
	ErrDecodeRawData         = coderr.NewCodeError(coderr.Internal, "decode raw data")
	ErrEncodeRawData         = coderr.NewCodeError(coderr.Internal, "encode raw data")
	ErrGetRequest            = coderr.NewCodeError(coderr.Internal, "get request from event")
	ErrNodeNumberNotEnough   = coderr.NewCodeError(coderr.Internal, "node number not enough")
	ErrEmptyPartitionNames   = coderr.NewCodeError(coderr.Internal, "partition names is empty")
	ErrShardNumberNotEnough  = coderr.NewCodeError(coderr.Internal, "shard number not enough")
)

Functions

func IsContains added in v1.0.0

func IsContains(slice []string, target string) bool

func IsSubSlice added in v1.0.0

func IsSubSlice(subSlice []string, slice []string) bool

func Max added in v1.0.0

func Max(x, y int) int

Types

type CreateNormalTableProcedure

type CreateNormalTableProcedure struct {
	// contains filtered or unexported fields
}

func (*CreateNormalTableProcedure) Cancel

func (*CreateNormalTableProcedure) ID

func (*CreateNormalTableProcedure) Start

func (*CreateNormalTableProcedure) State

func (p *CreateNormalTableProcedure) State() State

func (*CreateNormalTableProcedure) Typ

type CreatePartitionTableCallbackRequest

type CreatePartitionTableCallbackRequest struct {
	// contains filtered or unexported fields
}

type CreatePartitionTableProcedure

type CreatePartitionTableProcedure struct {
	// contains filtered or unexported fields
}

func (*CreatePartitionTableProcedure) Cancel

func (*CreatePartitionTableProcedure) ID

func (*CreatePartitionTableProcedure) Start

func (*CreatePartitionTableProcedure) State

func (*CreatePartitionTableProcedure) Typ

type CreatePartitionTableProcedureRequest

type CreatePartitionTableProcedureRequest struct {
	// contains filtered or unexported fields
}

type CreatePartitionTableRawData

type CreatePartitionTableRawData struct {
	ID       uint64
	FsmState string
	State    State

	CreateTableResult    cluster.CreateTableResult
	PartitionTableShards []cluster.ShardNodeWithVersion
	DataTablesShards     []cluster.ShardNodeWithVersion
}

type CreatePartitionTableRequest

type CreatePartitionTableRequest struct {
	ClusterName string
	SourceReq   *metaservicepb.CreateTableRequest

	PartitionTableRatioOfNodes float32

	OnSucceeded func(cluster.CreateTableResult) error
	OnFailed    func(error) error
}

type CreateTableProcedure

type CreateTableProcedure struct {
	// contains filtered or unexported fields
}

CreateTableProcedure is a proxy procedure; it determines which actual procedure to create according to the request type.

func (CreateTableProcedure) Cancel

func (p CreateTableProcedure) Cancel(ctx context.Context) error

func (CreateTableProcedure) ID

func (CreateTableProcedure) Start

func (CreateTableProcedure) State

func (p CreateTableProcedure) State() State

func (CreateTableProcedure) Typ

func (p CreateTableProcedure) Typ() Typ

type CreateTableRequest

type CreateTableRequest struct {
	Cluster   *cluster.Cluster
	SourceReq *metaservicepb.CreateTableRequest

	OnSucceeded func(cluster.CreateTableResult) error
	OnFailed    func(error) error
}

type DropTableProcedure

type DropTableProcedure struct {
	// contains filtered or unexported fields
}

func (*DropTableProcedure) Cancel

func (p *DropTableProcedure) Cancel(_ context.Context) error

func (*DropTableProcedure) ID

func (p *DropTableProcedure) ID() uint64

func (*DropTableProcedure) Start

func (p *DropTableProcedure) Start(ctx context.Context) error

func (*DropTableProcedure) State

func (p *DropTableProcedure) State() State

func (*DropTableProcedure) Typ

func (p *DropTableProcedure) Typ() Typ

type DropTableRequest

type DropTableRequest struct {
	Cluster   *cluster.Cluster
	SourceReq *metaservicepb.DropTableRequest

	OnSucceeded func(cluster.TableInfo) error
	OnFailed    func(error) error
}

type EtcdStorageImpl

type EtcdStorageImpl struct {
	// contains filtered or unexported fields
}

func (EtcdStorageImpl) CreateOrUpdate

func (e EtcdStorageImpl) CreateOrUpdate(ctx context.Context, meta Meta) error

CreateOrUpdate stores the procedure meta as a key/value pair, for example: /{rootPath}/v1/procedure/{procedureID} -> {procedureType} + {procedureState} + {data}

func (EtcdStorageImpl) List

func (e EtcdStorageImpl) List(ctx context.Context, batchSize int) ([]*Meta, error)

func (EtcdStorageImpl) MarkDeleted

func (e EtcdStorageImpl) MarkDeleted(ctx context.Context, id uint64) error

MarkDeleted performs a soft deletion; the format of the deleted key is: /{rootPath}/v1/historyProcedure/{clusterID}/{procedureID}

type Factory

type Factory struct {
	// contains filtered or unexported fields
}

func NewFactory

func NewFactory(allocator id.Allocator, dispatch eventdispatch.Dispatch, storage Storage, manager cluster.Manager, partitionTableProportionOfNodes float32) *Factory

func (*Factory) CreateDropTableProcedure

func (f *Factory) CreateDropTableProcedure(ctx context.Context, request DropTableRequest) (Procedure, error)

func (*Factory) CreateScatterProcedure

func (f *Factory) CreateScatterProcedure(ctx context.Context, request ScatterRequest) (Procedure, error)

func (*Factory) CreateSplitProcedure

func (f *Factory) CreateSplitProcedure(ctx context.Context, request SplitRequest) (Procedure, error)

func (*Factory) CreateTransferLeaderProcedure

func (f *Factory) CreateTransferLeaderProcedure(ctx context.Context, request TransferLeaderRequest) (Procedure, error)

func (*Factory) MakeCreateTableProcedure

func (f *Factory) MakeCreateTableProcedure(ctx context.Context, request CreateTableRequest) (Procedure, error)

type Info

type Info struct {
	ID    uint64
	Typ   Typ
	State State
}

Info provides an immutable description of a procedure.

type Manager

type Manager interface {
	// Start must be called before manager is used.
	Start(ctx context.Context) error
	// Stop must be called before manager is dropped.
	Stop(ctx context.Context) error

	// Submit procedure to be executed asynchronously.
	// TODO: change result type, add channel to get whether the procedure executed successfully
	Submit(ctx context.Context, procedure Procedure) error
	// Cancel procedure that has been submitted.
	Cancel(ctx context.Context, procedureID uint64) error
	ListRunningProcedure(ctx context.Context) ([]*Info, error)
}

func NewManagerImpl

func NewManagerImpl(storage Storage) (Manager, error)

type ManagerImpl

type ManagerImpl struct {
	// contains filtered or unexported fields
}

func (*ManagerImpl) Cancel

func (m *ManagerImpl) Cancel(ctx context.Context, procedureID uint64) error

func (*ManagerImpl) ListRunningProcedure

func (m *ManagerImpl) ListRunningProcedure(_ context.Context) ([]*Info, error)

func (*ManagerImpl) Start

func (m *ManagerImpl) Start(ctx context.Context) error

func (*ManagerImpl) Stop

func (m *ManagerImpl) Stop(ctx context.Context) error

func (*ManagerImpl) Submit

func (m *ManagerImpl) Submit(_ context.Context, procedure Procedure) error

type Meta

type Meta struct {
	ID      uint64
	Typ     Typ
	State   State
	RawData []byte
}

type Procedure

type Procedure interface {
	// ID of the procedure.
	ID() uint64

	// Typ of the procedure.
	Typ() Typ

	// Start the procedure.
	Start(ctx context.Context) error

	// Cancel the procedure.
	Cancel(ctx context.Context) error

	// State of the procedure. Retrieve the state of this procedure.
	State() State
}

Procedure is used to describe how to execute a set of operations from the scheduler, e.g. SwitchLeaderProcedure, MergeShardProcedure.

func NewCreateNormalTableProcedure

func NewCreateNormalTableProcedure(dispatch eventdispatch.Dispatch, cluster *cluster.Cluster, id uint64, req *metaservicepb.CreateTableRequest, onSucceeded func(cluster.CreateTableResult) error, onFailed func(error) error) Procedure

func NewCreateTableProcedure

func NewCreateTableProcedure(ctx context.Context, factory *Factory, c *cluster.Cluster, sourceReq *metaservicepb.CreateTableRequest, onSucceeded func(cluster.CreateTableResult) error, onFailed func(error) error) (Procedure, error)

func NewDropTableProcedure

func NewDropTableProcedure(dispatch eventdispatch.Dispatch, cluster *cluster.Cluster, id uint64, req *metaservicepb.DropTableRequest, onSucceeded func(cluster.TableInfo) error, onFailed func(error) error) Procedure

func NewScatterProcedure

func NewScatterProcedure(dispatch eventdispatch.Dispatch, cluster *cluster.Cluster, id uint64, shardIDs []storage.ShardID) Procedure

func NewTransferLeaderProcedure

func NewTransferLeaderProcedure(dispatch eventdispatch.Dispatch, c *cluster.Cluster, s Storage, shardID storage.ShardID, oldLeaderNodeName string, newLeaderNodeName string, id uint64) (Procedure, error)

type RandomShardPicker

type RandomShardPicker struct {
	// contains filtered or unexported fields
}

RandomShardPicker randomly picks shards that are not on the same node in the current cluster.

func (*RandomShardPicker) PickShards

func (p *RandomShardPicker) PickShards(ctx context.Context, clusterName string, expectShardNum int, enableDuplicateNode bool) ([]cluster.ShardNodeWithVersion, error)

PickShards picks the number of shards specified by expectShardNum.

type ScatterCallbackRequest

type ScatterCallbackRequest struct {
	// contains filtered or unexported fields
}

ScatterCallbackRequest is the parameter passed to the FSM callbacks.

type ScatterProcedure

type ScatterProcedure struct {
	// contains filtered or unexported fields
}

func (*ScatterProcedure) Cancel

func (p *ScatterProcedure) Cancel(_ context.Context) error

func (*ScatterProcedure) ID

func (p *ScatterProcedure) ID() uint64

func (*ScatterProcedure) Start

func (p *ScatterProcedure) Start(ctx context.Context) error

func (*ScatterProcedure) State

func (p *ScatterProcedure) State() State

func (*ScatterProcedure) Typ

func (p *ScatterProcedure) Typ() Typ

type ScatterRequest

type ScatterRequest struct {
	Cluster  *cluster.Cluster
	ShardIDs []storage.ShardID
}

type ShardPicker

type ShardPicker interface {
	PickShards(ctx context.Context, clusterName string, expectShardNum int, enableDuplicateNode bool) ([]cluster.ShardNodeWithVersion, error)
}

ShardPicker is used to pick the shards suitable for scheduling in the cluster. If expectShardNum is bigger than the cluster node number, the result depends on enableDuplicateNode: if enableDuplicateNode is false, picking shards fails and an error is returned; if enableDuplicateNode is true, PickShards returns shards on the same node. TODO: Consider refactoring this interface to abstract the parameters of PickShards as a PickStrategy.

func NewRandomShardPicker

func NewRandomShardPicker(manager cluster.Manager) ShardPicker

type SplitProcedure

type SplitProcedure struct {
	// contains filtered or unexported fields
}

SplitProcedure fsm: Update ShardTable Metadata -> OpenNewShard -> CloseTable

func NewSplitProcedure

func NewSplitProcedure(id uint64, dispatch eventdispatch.Dispatch, storage Storage, c *cluster.Cluster, schemaName string, shardID storage.ShardID, newShardID storage.ShardID, tableNames []string, targetNodeName string) *SplitProcedure

func (*SplitProcedure) Cancel

func (p *SplitProcedure) Cancel(_ context.Context) error

func (*SplitProcedure) ID

func (p *SplitProcedure) ID() uint64

func (*SplitProcedure) Start

func (p *SplitProcedure) Start(ctx context.Context) error

func (*SplitProcedure) State

func (p *SplitProcedure) State() State

func (*SplitProcedure) Typ

func (p *SplitProcedure) Typ() Typ

type SplitProcedurePersistRawData

type SplitProcedurePersistRawData struct {
	SchemaName     string
	TableNames     []string
	ShardID        uint32
	NewShardID     uint32
	TargetNodeName string
}

type SplitRequest

type SplitRequest struct {
	ClusterName    string
	SchemaName     string
	TableNames     []string
	ShardID        storage.ShardID
	NewShardID     storage.ShardID
	TargetNodeName string
	ClusterVersion uint64
}

type State

type State string

type Storage

type Storage interface {
	Write
	List(ctx context.Context, batchSize int) ([]*Meta, error)
	MarkDeleted(ctx context.Context, id uint64) error
}

func NewEtcdStorageImpl

func NewEtcdStorageImpl(client *clientv3.Client, rootPath string) Storage

type TransferLeaderCallbackRequest

type TransferLeaderCallbackRequest struct {
	// contains filtered or unexported fields
}

TransferLeaderCallbackRequest is the parameter passed to the FSM callbacks.

type TransferLeaderProcedure

type TransferLeaderProcedure struct {
	// contains filtered or unexported fields
}

func (*TransferLeaderProcedure) Cancel

func (*TransferLeaderProcedure) ID

func (*TransferLeaderProcedure) Start

func (*TransferLeaderProcedure) State

func (p *TransferLeaderProcedure) State() State

func (*TransferLeaderProcedure) Typ

func (p *TransferLeaderProcedure) Typ() Typ

type TransferLeaderProcedurePersistRawData

type TransferLeaderProcedurePersistRawData struct {
	ID       uint64
	FsmState string
	State    State

	ShardID           storage.ShardID
	OldLeaderNodeName string
	NewLeaderNodeName string
}

TransferLeaderProcedurePersistRawData is used for storage; a procedure is converted to persist raw data before it is saved in storage.

type TransferLeaderRequest

type TransferLeaderRequest struct {
	ClusterName       string
	ShardID           storage.ShardID
	OldLeaderNodeName string
	NewLeaderNodeName string
	ClusterVersion    uint64
}

type Typ

type Typ uint
const (
	Create Typ = iota
	Delete
	TransferLeader
	Migrate
	Split
	Merge
	Scatter
	CreateTable
	DropTable
	CreatePartitionTable
)

type Write

type Write interface {
	CreateOrUpdate(ctx context.Context, meta Meta) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL