Documentation ¶
Overview ¶
Copyright 2020 MatrixOrigin.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Variables
- func NewResourceAdapterWithShard(meta Shard) metadata.Resource
- type Epoch
- type FailureCallback
- type LogReader
- func (lr *LogReader) Append(entries []pb.Entry) error
- func (lr *LogReader) ApplySnapshot(snapshot pb.Snapshot) error
- func (lr *LogReader) Compact(index uint64) error
- func (lr *LogReader) CreateSnapshot(snapshot pb.Snapshot) error
- func (lr *LogReader) Entries(low uint64, high uint64, maxSize uint64) ([]pb.Entry, error)
- func (lr *LogReader) FirstIndex() (uint64, error)
- func (lr *LogReader) GetSnapshotRequested() bool
- func (lr *LogReader) InitialState() (pb.HardState, pb.ConfState, error)
- func (lr *LogReader) LastIndex() (uint64, error)
- func (lr *LogReader) SetConfState(cs pb.ConfState)
- func (lr *LogReader) SetRange(firstIndex uint64, length uint64)
- func (lr *LogReader) SetState(s pb.HardState)
- func (lr *LogReader) Snapshot() (pb.Snapshot, error)
- func (lr *LogReader) Term(index uint64) (uint64, error)
- type Replica
- type RetryController
- type Router
- type Shard
- type ShardsPool
- type ShardsProxy
- type Store
- type SuccessCallback
- type TestClusterOption
- func WithAppendTestClusterAdjustConfigFunc(value func(node int, cfg *config.Config)) TestClusterOption
- func WithDataStorageOption(dataOpts *cpebble.Options) TestClusterOption
- func WithEnableTestParallel() TestClusterOption
- func WithTestClusterDataPath(path string) TestClusterOption
- func WithTestClusterDisableSchedule() TestClusterOption
- func WithTestClusterEnableAdvertiseAddr() TestClusterOption
- func WithTestClusterLogLevel(level zapcore.Level) TestClusterOption
- func WithTestClusterNodeCount(n int) TestClusterOption
- func WithTestClusterNodeStartFunc(value func(node int, store Store)) TestClusterOption
- func WithTestClusterRecreate(value bool) TestClusterOption
- func WithTestClusterStoreFactory(value func(node int, cfg *config.Config) Store) TestClusterOption
- func WithTestClusterUseDisk() TestClusterOption
- type TestDataBuilder
- type TestKVClient
- type TestRaftCluster
Constants ¶
This section is empty.
Variables ¶
var (
	ErrInvalidConfigChangeRequest = errors.New("invalid config change request")
	ErrRemoveVoter                = errors.New("removing voter")
	ErrRemoveLeader               = errors.New("removing leader")
	ErrPendingConfigChange        = errors.New("pending config change")
	ErrDuplicatedRequest          = errors.New("duplicated config change request")
	ErrLearnerOnlyChange          = errors.New("learner only change")
)
var (
	ErrNotLearnerReplica = errors.New("not learner")
	ErrReplicaNotFound   = errors.New("replica not found")
	ErrReplicaDuplicated = errors.New("replica duplicated")
)
var (
	// SingleTestCluster single-node test raft cluster
	SingleTestCluster = WithTestClusterNodeCount(1)
	// DiskTestCluster test raft store using pebble storage
	DiskTestCluster = WithTestClusterUseDisk()
	// DisableScheduleTestCluster test raft store with prophet schedulers disabled
	DisableScheduleTestCluster = WithTestClusterDisableSchedule()
	// NewTestCluster cleans the data before the test cluster starts
	NewTestCluster = WithTestClusterRecreate(true)
	// OldTestCluster uses the existing data before the test cluster starts
	OldTestCluster = WithTestClusterRecreate(false)
)
var (
	ErrRemoveShardKeyRange = errors.New("failed to delete shard key range")
)
var (
	// ErrTimeout timeout error
	ErrTimeout = errors.New("exec timeout")
)
var (
	// ErrUnknownReplica indicates that the replica is unknown.
	ErrUnknownReplica = errors.New("unknown replica")
)
Functions ¶
func NewResourceAdapterWithShard ¶
NewResourceAdapterWithShard creates a prophet resource from a shard.
Types ¶
type Epoch ¶ added in v0.2.0
type Epoch = metapb.ResourceEpoch
type FailureCallback ¶ added in v0.2.0
FailureCallback is the callback invoked when a request fails.
type LogReader ¶ added in v0.2.0
LogReader is the struct used to manage logs that have already been persisted into LogDB. LogReader implements the raft.Storage interface.
func NewLogReader ¶ added in v0.2.0
NewLogReader creates and returns a new LogReader instance.
func (*LogReader) Append ¶ added in v0.2.0
Append marks the specified entries as persisted and makes them available from the LogReader.
func (*LogReader) ApplySnapshot ¶ added in v0.2.0
ApplySnapshot applies the specified snapshot.
func (*LogReader) CreateSnapshot ¶ added in v0.2.0
CreateSnapshot keeps the metadata of the specified snapshot.
func (*LogReader) Entries ¶ added in v0.2.0
Entries returns persisted entries between [low, high) with a total limit of up to maxSize bytes.
func (*LogReader) FirstIndex ¶ added in v0.2.0
func (*LogReader) GetSnapshotRequested ¶ added in v0.2.0
GetSnapshotRequested returns a boolean value indicating whether creating a new snapshot has been requested.
func (*LogReader) InitialState ¶ added in v0.2.0
InitialState returns the saved HardState and ConfState information.
func (*LogReader) SetConfState ¶ added in v0.2.0
func (*LogReader) SetRange ¶ added in v0.2.0
SetRange updates the LogReader to reflect what is available in it.
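A minimal usage sketch of the LogReader methods listed above, assuming an existing *LogReader instance (NewLogReader's exact signature is not reproduced here) and that `pb` is the raft protobuf package used by this module:

func logReaderSketch(lr *LogReader) error {
	// Mark two entries as persisted so raft can read them back.
	entries := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}
	if err := lr.Append(entries); err != nil {
		return err
	}
	// Tell the LogReader which index range is now available: first index 1, length 2.
	lr.SetRange(1, uint64(len(entries)))

	// Read the persisted entries in [1, 3) back, limited to 1MB in total size.
	got, err := lr.Entries(1, 3, 1024*1024)
	if err != nil {
		return err
	}
	_ = got
	return nil
}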
type RetryController ¶ added in v0.2.0
type RetryController interface {
// Retry controls retrying when a retryable error is encountered. Returning false stops the retry.
Retry(requestID []byte) (rpc.Request, bool)
}
RetryController controls how failed requests are retried.
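A sketch of one possible RetryController implementation that gives up after a fixed number of attempts. The pending-request bookkeeping and field names are hypothetical, and the surrounding file is assumed to import sync and the module's rpc package:

type maxAttemptsRetryController struct {
	sync.Mutex
	max      int
	pending  map[string]rpc.Request // requestID -> original request
	attempts map[string]int
}

func (c *maxAttemptsRetryController) Retry(requestID []byte) (rpc.Request, bool) {
	c.Lock()
	defer c.Unlock()

	id := string(requestID)
	req, ok := c.pending[id]
	if !ok {
		// Unknown request: nothing to retry.
		return rpc.Request{}, false
	}
	c.attempts[id]++
	if c.attempts[id] > c.max {
		// Returning false stops the retry.
		return rpc.Request{}, false
	}
	return req, true
}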
type Router ¶
type Router interface {
// Start the router
Start() error
// Stop stops the router
Stop()
// SelectShard returns the shard whose range [shard.Start, shard.End) contains the key, and the address of its leader store.
// If the returned leader address is "", the current shard has no leader
SelectShard(group uint64, key []byte) (Shard, string)
// Every runs fn on every shard in the group
Every(group uint64, mustLeader bool, fn func(shard Shard, store meta.Store) bool)
// ForeachShards iterates over the shards in the group
ForeachShards(group uint64, fn func(shard Shard) bool)
// GetShard returns the shard by shard id
GetShard(id uint64) Shard
// UpdateLeader updates the leader replica of the shard
UpdateLeader(shardID uint64, leaderReplicaID uint64)
// LeaderReplicaStore returns the store of the shard's leader replica
LeaderReplicaStore(shardID uint64) meta.Store
// RandomReplicaStore returns the store of a random replica of the shard
RandomReplicaStore(shardID uint64) meta.Store
// GetShardStats returns the runtime stats info of the shard
GetShardStats(id uint64) metapb.ResourceStats
// GetStoreStats returns the runtime stats info of the store
GetStoreStats(id uint64) metapb.ContainerStats
}
Router routes requests to the corresponding shard.
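A sketch of typical Router usage, assuming `router` was obtained from Store.GetRouter() and has already been started:

func routerSketch(router Router, group uint64, key []byte) {
	// Find the shard whose range contains the key and the address of its leader store.
	shard, leaderAddr := router.SelectShard(group, key)
	if leaderAddr == "" {
		// The shard currently has no leader; the caller may retry later.
		return
	}
	_ = shard

	// Visit every shard in the group, restricted to leader replicas.
	router.Every(group, true, func(s Shard, store meta.Store) bool {
		// Returning true continues the iteration.
		return true
	})
}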
type ShardsPool ¶
type ShardsPool interface {
// Alloc allocates a shard from the pool, returning an error if no idle shards are left.
// The `purpose` is used to avoid duplicate allocation.
Alloc(group uint64, purpose []byte) (meta.AllocatedShard, error)
}
ShardsPool is a pool of shards. It keeps creating shards until the number of available shards reaches the value specified by `capacity`; these are called `Idle Shards`.
The pool creates a Job in the prophet. The job starts once a node becomes the prophet leader and stops when that node becomes a follower, so the job can be executed on any node. After the job starts, it uses the prophet client to create shards.
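A sketch of allocating an idle shard from a ShardsPool; the pool is assumed to come from Store.CreateResourcePool, and the `purpose` bytes are illustrative:

func allocShardSketch(pool ShardsPool, group uint64) (meta.AllocatedShard, error) {
	// A stable purpose key avoids duplicate allocation by the same caller.
	purpose := []byte("table-1")
	allocated, err := pool.Alloc(group, purpose)
	if err != nil {
		// No idle shards are left; the pool keeps creating shards in the background.
		return meta.AllocatedShard{}, err
	}
	return allocated, nil
}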
type ShardsProxy ¶
type ShardsProxy interface {
Start() error
Stop() error
Dispatch(req rpc.Request) error
DispatchTo(req rpc.Request, shard Shard, store string) error
SetCallback(SuccessCallback, FailureCallback)
SetRetryController(retryController RetryController)
OnResponse(rpc.ResponseBatch)
Router() Router
}
ShardsProxy dispatches each request to the corresponding backend and retries requests that fail with retryable errors.
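A sketch of wiring callbacks into a ShardsProxy and dispatching a request. The callbacks are taken as parameters because the exact SuccessCallback and FailureCallback signatures are not reproduced here:

func proxySketch(proxy ShardsProxy, req rpc.Request, onSuccess SuccessCallback, onFailure FailureCallback) error {
	// Register the application's callbacks before dispatching any request.
	proxy.SetCallback(onSuccess, onFailure)

	// Dispatch routes the request to the backend that owns the request's key.
	if err := proxy.Dispatch(req); err != nil {
		// Dispatch failed locally, e.g. no route was found for the request.
		return err
	}
	return nil
}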
type Store ¶
type Store interface {
// Start the raft store
Start()
// Stop the raft store
Stop()
// GetConfig returns the config of the store
GetConfig() *config.Config
// Meta returns store meta
Meta() meta.Store
// GetRouter returns a router
GetRouter() Router
// GetShardsProxy returns the shards proxy used to dispatch requests
GetShardsProxy() ShardsProxy
// OnRequest receives a request; the registered callback is invoked when the request completes
OnRequest(rpc.Request) error
// OnRequestWithCB receives a request and calls cb when the request completes
OnRequestWithCB(req rpc.Request, cb func(resp rpc.ResponseBatch)) error
// DataStorageByGroup returns the DataStorage of the shard group
DataStorageByGroup(uint64) storage.DataStorage
// MaybeLeader returns whether the shard replica might be the leader
MaybeLeader(uint64) bool
// MustAllocID returns a uint64 id, panicking if an error occurs
MustAllocID() uint64
// Prophet returns the current prophet instance
Prophet() prophet.Prophet
// CreateResourcePool creates resource pools; each resource pool creates shards
// and tries to keep the number of shards in the pool at no less than the `capacity`
// parameter. This is an idempotent operation.
CreateResourcePool(...metapb.ResourcePool) (ShardsPool, error)
// GetResourcePool returns the `ShardsPool`, or nil if `CreateResourcePool` has not completed
GetResourcePool() ShardsPool
}
Store manages a set of raft groups.
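A sketch of driving a Store: start it, send a request, and receive the responses through a callback. Config and request construction are application specific and omitted here:

func storeSketch(s Store, req rpc.Request) error {
	s.Start()
	defer s.Stop()

	// Send the request; cb is called once the request completes.
	return s.OnRequestWithCB(req, func(resp rpc.ResponseBatch) {
		// Handle the responses for this request here.
	})
}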
type SuccessCallback ¶ added in v0.2.0
SuccessCallback is the callback invoked when a request succeeds.
type TestClusterOption ¶
type TestClusterOption func(*testClusterOptions)
TestClusterOption is an option for creating a test cluster.
func WithAppendTestClusterAdjustConfigFunc ¶
func WithAppendTestClusterAdjustConfigFunc(value func(node int, cfg *config.Config)) TestClusterOption
WithAppendTestClusterAdjustConfigFunc appends a function used to adjust the config of each node.
func WithDataStorageOption ¶
func WithDataStorageOption(dataOpts *cpebble.Options) TestClusterOption
WithDataStorageOption sets the options used to create the data storage.
func WithEnableTestParallel ¶ added in v0.2.0
func WithEnableTestParallel() TestClusterOption
WithEnableTestParallel enables parallel testing.
func WithTestClusterDataPath ¶
func WithTestClusterDataPath(path string) TestClusterOption
WithTestClusterDataPath sets the data storage directory.
func WithTestClusterDisableSchedule ¶
func WithTestClusterDisableSchedule() TestClusterOption
WithTestClusterDisableSchedule disables prophet (pd) scheduling.
func WithTestClusterEnableAdvertiseAddr ¶ added in v0.2.0
func WithTestClusterEnableAdvertiseAddr() TestClusterOption
WithTestClusterEnableAdvertiseAddr enables using the advertise address for the test cluster.
func WithTestClusterLogLevel ¶
func WithTestClusterLogLevel(level zapcore.Level) TestClusterOption
WithTestClusterLogLevel sets the raftstore log level.
func WithTestClusterNodeCount ¶
func WithTestClusterNodeCount(n int) TestClusterOption
WithTestClusterNodeCount sets the node count of the test cluster.
func WithTestClusterNodeStartFunc ¶
func WithTestClusterNodeStartFunc(value func(node int, store Store)) TestClusterOption
WithTestClusterNodeStartFunc sets a custom node start function.
func WithTestClusterRecreate ¶
func WithTestClusterRecreate(value bool) TestClusterOption
WithTestClusterRecreate if true, the test cluster will clean and recreate the data dir
func WithTestClusterStoreFactory ¶
func WithTestClusterStoreFactory(value func(node int, cfg *config.Config) Store) TestClusterOption
WithTestClusterStoreFactory sets a custom factory for creating the raftstore.
func WithTestClusterUseDisk ¶
func WithTestClusterUseDisk() TestClusterOption
WithTestClusterUseDisk uses disk storage for testing.
type TestDataBuilder ¶ added in v0.2.0
type TestDataBuilder struct {
}
TestDataBuilder builds test data.
func NewTestDataBuilder ¶ added in v0.2.0
func NewTestDataBuilder() *TestDataBuilder
NewTestDataBuilder creates and returns a TestDataBuilder.
func (*TestDataBuilder) CreateShard ¶ added in v0.2.0
func (b *TestDataBuilder) CreateShard(id uint64, replicasFormater string) Shard
CreateShard creates a shard for testing.
Format:
- id: shard id; the shard range is [id, id+1)
- replicasFormater: describes the replicas, separated by ','; the first replica is the current replica
- Voter format: pid/cid
- Learner format: pid/cid/[l|v], default v
- Initial member format: pid/cid/l/[t|f], default f
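A sketch of building a test shard with the format above: two voters and one learner, where the first replica is the current one. The ids are illustrative:

func buildTestShard() Shard {
	b := NewTestDataBuilder()
	// Shard 1 covers the range [1, 2). Replicas: voter 100/1000 (current),
	// voter 200/2000, and learner 300/3000.
	return b.CreateShard(1, "100/1000,200/2000,300/3000/l")
}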
type TestKVClient ¶
type TestKVClient interface {
// Set sets a key-value pair in the backend kv storage
Set(key, value string, timeout time.Duration) error
// Get returns the value of the specified key from the backend kv storage
Get(key string, timeout time.Duration) (string, error)
// UpdateLabel updates the shard label
UpdateLabel(shard, group uint64, key, value string, timeout time.Duration) error
// Close closes the test client
Close()
}
TestKVClient is a kv client that uses a `TestRaftCluster` as its backend KV storage engine.
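A sketch of exercising a TestKVClient obtained from TestRaftCluster.CreateTestKVClient; the timeouts are illustrative:

func kvClientSketch(kv TestKVClient) error {
	defer kv.Close()

	// Write through the raft groups of the test cluster.
	if err := kv.Set("key", "value", 10*time.Second); err != nil {
		return err
	}
	// Read the value back from the backend kv storage.
	value, err := kv.Get("key", 10*time.Second)
	if err != nil {
		return err
	}
	_ = value
	return nil
}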
type TestRaftCluster ¶
type TestRaftCluster interface {
// EveryStore runs fn on every store; it can be used to initialize per-store registrations
EveryStore(fn func(i int, store Store))
// GetStore returns the store of the node
GetStore(node int) Store
// GetStoreByID returns the store by id
GetStoreByID(id uint64) Store
// Start starts each node sequentially
Start()
// Stop stops each node sequentially
Stop()
// StartWithConcurrent after starting the first node, the other nodes start concurrently
StartWithConcurrent(bool)
// Restart restarts the cluster
Restart()
// RestartWithFunc restarts the cluster; `beforeStartFunc` is called before starting
RestartWithFunc(beforeStartFunc func())
// StartNode starts the node
StartNode(node int)
// StopNode stops the node
StopNode(node int)
// RestartNode restarts the node
RestartNode(node int)
// StartNetworkPartition puts the nodes into network partitions; must be called after the nodes have started
StartNetworkPartition(partitions [][]int)
// StopNetworkPartition stops the network partition
StopNetworkPartition()
// GetPRCount returns the number of replicas on the node
GetPRCount(node int) int
// GetShardByIndex returns the shard by `shardIndex`, `shardIndex` is the order in which
// the shard is created on the node
GetShardByIndex(node int, shardIndex int) Shard
// GetShardByID returns the shard from the node by shard id
GetShardByID(node int, shardID uint64) Shard
// CheckShardCount checks whether the number of shards on each node is correct
CheckShardCount(countPerNode int)
// CheckShardRange checks whether the range field of the shard on each node is correct;
// `shardIndex` is the order in which the shard is created on the node
CheckShardRange(shardIndex int, start, end []byte)
// WaitRemovedByShardID checks that the specified shard is removed from every node before the timeout
WaitRemovedByShardID(shardID uint64, timeout time.Duration)
// WaitRemovedByShardIDAt checks that the specified shard is removed from the specified nodes before the timeout
WaitRemovedByShardIDAt(shardID uint64, nodes []int, timeout time.Duration)
// WaitLeadersByCount checks that the number of leaders in the cluster reaches at least the specified value
// before the timeout
WaitLeadersByCount(count int, timeout time.Duration)
// WaitLeadersByCountsAndShardGroupAndLabel checks that the number of leaders in the cluster reaches at least the specified values
// before the timeout
WaitLeadersByCountsAndShardGroupAndLabel(counts []int, group uint64, key, value string, timeout time.Duration)
// WaitShardByCount checks that the number of shards in the cluster reaches at least the specified value
// before the timeout
WaitShardByCount(count int, timeout time.Duration)
// WaitShardByLabel checks that the shard has the specified label before the timeout
WaitShardByLabel(id uint64, label, value string, timeout time.Duration)
// WaitVoterReplicaByCountPerNode checks that the number of voter replicas on each node reaches at least the specified value
// before the timeout
WaitVoterReplicaByCountPerNode(count int, timeout time.Duration)
// WaitVoterReplicaByCounts checks that the number of voter replicas in the cluster reaches at least the specified values
// before the timeout
WaitVoterReplicaByCounts(counts []int, timeout time.Duration)
// WaitVoterReplicaByCountsAndShardGroup checks that the number of voter replicas of the shard group reaches at least the specified values
// before the timeout
WaitVoterReplicaByCountsAndShardGroup(counts []int, shardGroup uint64, timeout time.Duration)
// WaitVoterReplicaByCountsAndShardGroupAndLabel checks that the number of voter replicas of the shard group with the given label reaches at least the specified values
// before the timeout
WaitVoterReplicaByCountsAndShardGroupAndLabel(counts []int, shardGroup uint64, label, value string, timeout time.Duration)
// WaitShardByCountPerNode checks that the number of shards on each node reaches at least the specified value
// before the timeout
WaitShardByCountPerNode(count int, timeout time.Duration)
// WaitAllReplicasChangeToVoter checks that the role of the shard's replica on each node changes to voter before the timeout
WaitAllReplicasChangeToVoter(shard uint64, timeout time.Duration)
// WaitShardByCountOnNode checks that the number of shards on the specified node reaches at least the specified value
// before the timeout
WaitShardByCountOnNode(node, count int, timeout time.Duration)
// WaitShardSplitByCount checks whether the number of splits of the shard reaches the specified value before the timeout
WaitShardSplitByCount(id uint64, count int, timeout time.Duration)
// WaitShardByCounts checks whether the number of shards reaches the specified values before the timeout
WaitShardByCounts(counts []int, timeout time.Duration)
// WaitShardStateChangedTo checks whether the state of the shard changes to the specified value before the timeout
WaitShardStateChangedTo(shardID uint64, to metapb.ResourceState, timeout time.Duration)
// GetShardLeaderStore returns the store of the shard's leader
GetShardLeaderStore(shardID uint64) Store
// GetProphet returns the prophet instance
GetProphet() prophet.Prophet
// CreateTestKVClient creates and returns a kv client
CreateTestKVClient(node int) TestKVClient
}
TestRaftCluster is a test cluster used to start N nodes in a single process, providing the ability to start and stop individual nodes so that `raftstore` can be tested more easily.
func NewSingleTestClusterStore ¶
func NewSingleTestClusterStore(t *testing.T, opts ...TestClusterOption) TestRaftCluster
NewSingleTestClusterStore creates a test cluster with 1 node.
func NewTestClusterStore ¶
func NewTestClusterStore(t *testing.T, opts ...TestClusterOption) TestRaftCluster
NewTestClusterStore creates a test cluster using the given options.
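A sketch of a test that starts a 3-node cluster, waits for leader election, and writes through a TestKVClient; the option values and timeouts are illustrative:

func TestThreeNodeClusterSketch(t *testing.T) {
	c := NewTestClusterStore(t,
		WithTestClusterNodeCount(3),
		WithTestClusterRecreate(true))
	c.Start()
	defer c.Stop()

	// Wait until at least one leader has been elected.
	c.WaitLeadersByCount(1, 30*time.Second)

	kv := c.CreateTestKVClient(0)
	defer kv.Close()
	if err := kv.Set("k", "v", 10*time.Second); err != nil {
		t.Fatalf("set failed: %v", err)
	}
}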
Source Files ¶
- batch.go
- codec_rpc.go
- errors.go
- execute_context.go
- group_controller.go
- log_reader.go
- metric.go
- pending_proposal.go
- prophet_adapter.go
- proposal_batch.go
- proxy.go
- proxy_backend.go
- proxy_rpc.go
- read_index_queue.go
- replica.go
- replica_apply_result.go
- replica_create.go
- replica_destroy.go
- replica_destroy_task.go
- replica_event_loop.go
- replica_event_proposal.go
- replica_event_raft_ready.go
- replica_snapshot.go
- replica_split.go
- replica_state_machine.go
- replica_state_machine_exec.go
- replica_stats.go
- resp.go
- router.go
- snapshotter.go
- split_checker.go
- store.go
- store_bootstrap.go
- store_debug.go
- store_handler.go
- store_route_handler.go
- store_shards_pool.go
- store_timer_task.go
- testutil.go
- type.go
- util.go
- vacuum_cleaner.go
- worker_pool.go