Documentation
¶
Overview ¶
Copyright 2020 MatrixOrigin.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Variables
- func DecodeDataKey(key []byte) []byte
- func EncodeDataKey(group uint64, key []byte) []byte
- func GetMaxKey() []byte
- func GetMinKey() []byte
- func GetStoreIdentKey() []byte
- func NewResourceAdapterWithShard(meta bhmetapb.Shard) metadata.Resource
- func WriteGroupPrefix(group uint64, key []byte)
- type Router
- type ShardsPool
- type ShardsProxy
- type Store
- type TestClusterOption
- func WithAppendTestClusterAdjustConfigFunc(value func(node int, cfg *config.Config)) TestClusterOption
- func WithDataStorageOption(dataOpts *cpebble.Options) TestClusterOption
- func WithMetadataStorageOption(metaOpts *cpebble.Options) TestClusterOption
- func WithTestClusterDataPath(path string) TestClusterOption
- func WithTestClusterDisableSchedule() TestClusterOption
- func WithTestClusterLogLevel(level string) TestClusterOption
- func WithTestClusterNodeCount(n int) TestClusterOption
- func WithTestClusterNodeStartFunc(value func(node int, store Store)) TestClusterOption
- func WithTestClusterReadHandler(cmd uint64, value command.ReadCommandFunc) TestClusterOption
- func WithTestClusterRecreate(value bool) TestClusterOption
- func WithTestClusterStoreFactory(value func(node int, cfg *config.Config) Store) TestClusterOption
- func WithTestClusterUseDisk() TestClusterOption
- func WithTestClusterWriteHandler(cmd uint64, value command.WriteCommandFunc) TestClusterOption
- type TestKVClient
- type TestRaftCluster
Constants ¶
This section is empty.
Variables ¶
var ( // SingleTestCluster single test raft cluster SingleTestCluster = WithTestClusterNodeCount(1) // DiskTestCluster using pebble storage test raft store DiskTestCluster = WithTestClusterUseDisk() // DisableScheduleTestCluster disable prophet schedulers test raft store DisableScheduleTestCluster = WithTestClusterDisableSchedule() // NewTestCluster clean data before test cluster start NewTestCluster = WithTestClusterRecreate(true) // OldTestCluster using existing data before test cluster start OldTestCluster = WithTestClusterRecreate(false) // SetCMDTestClusterHandler set cmd handler SetCMDTestClusterHandler = WithTestClusterWriteHandler(1, func(s bhmetapb.Shard, r *raftcmdpb.Request, c command.Context) (uint64, int64, *raftcmdpb.Response) { resp := pb.AcquireResponse() c.WriteBatch().Set(r.Key, r.Cmd) resp.Value = []byte("OK") changed := uint64(len(r.Key)) + uint64(len(r.Cmd)) return changed, int64(changed), resp }) // GetCMDTestClusterHandler get cmd handler GetCMDTestClusterHandler = WithTestClusterReadHandler(2, func(s bhmetapb.Shard, r *raftcmdpb.Request, c command.Context) (*raftcmdpb.Response, uint64) { resp := pb.AcquireResponse() value, err := c.DataStorage().(storage.KVStorage).Get(r.Key) if err != nil { panic("BUG: can not error") } resp.Value = value return resp, uint64(len(value)) }) )
var (
// DataPrefixSize is the size of the data key prefix: dataPrefixKeySize plus 8.
// NOTE(review): the extra 8 bytes presumably hold the uint64 group written by
// WriteGroupPrefix — confirm against keys.go.
DataPrefixSize = dataPrefixKeySize + 8
)
data is in (z, z+1)
var ( // ErrTimeout is the timeout error ErrTimeout = errors.New("exec timeout") )
var ( // RetryInterval is the retry interval RetryInterval = time.Second )
Functions ¶
func EncodeDataKey ¶
EncodeDataKey encodes a data key.
func NewResourceAdapterWithShard ¶
NewResourceAdapterWithShard creates a prophet resource from a shard.
func WriteGroupPrefix ¶
WriteGroupPrefix writes the group prefix.
Types ¶
type Router ¶
// Router routes requests to the corresponding shard.
type Router interface {
// Start starts the router.
Start() error
// SelectShard returns the shard and the leader store for the shard whose range
// [shard.Start, shard.End) contains the key.
// If the returned leader address is "", the shard currently has no leader.
// NOTE(review): the uint64 result is presumably the shard ID — confirm.
SelectShard(group uint64, key []byte) (uint64, string)
// Every calls fn with all shards of the group.
// NOTE(review): mustLeader presumably restricts the passed store to the leader
// store of each shard — confirm against the implementation.
Every(group uint64, mustLeader bool, fn func(shard *bhmetapb.Shard, store bhmetapb.Store))
// ForeachShards iterates over the shards of the group, calling fn for each.
// NOTE(review): the bool returned by fn presumably controls whether iteration
// continues — confirm.
ForeachShards(group uint64, fn func(shard *bhmetapb.Shard) bool)
// LeaderPeerStore returns the store of the shard's leader peer.
LeaderPeerStore(shardID uint64) bhmetapb.Store
// RandomPeerStore returns the store of a random peer of the shard.
RandomPeerStore(shardID uint64) bhmetapb.Store
// GetShardStats returns the runtime stats info of the shard.
GetShardStats(id uint64) *metapb.ResourceStats
// GetStoreStats returns the runtime stats info of the store.
GetStoreStats(id uint64) *metapb.ContainerStats
// GetWatcher returns the prophet event watcher.
GetWatcher() prophet.Watcher
}
Router route the request to the corresponding shard
type ShardsPool ¶
// ShardsPool is a pool of shards from which callers allocate shards.
type ShardsPool interface {
// Alloc allocates a shard from the shards pool and returns an error if no idle
// shards are left. The `purpose` is used to avoid duplicate allocation.
Alloc(group uint64, purpose []byte) (bhmetapb.AllocatedShard, error)
}
ShardsPool is a shards pool. It keeps creating shards until the number of available shards reaches the value specified by `capacity`; we call these `Idle Shards`.
The pool creates a Job in the prophet. Once a node becomes the prophet leader, the shards pool job starts, and it stops if the node becomes a follower, so the job can be executed on any node. After the job starts, it uses the prophet client to create shards.
type ShardsProxy ¶
// ShardsProxy distributes requests to the corresponding backend.
type ShardsProxy interface {
// Dispatch dispatches the request.
// NOTE(review): presumably the target shard and store are chosen via the
// Router — confirm against proxy.go.
Dispatch(req *raftcmdpb.Request) error
// DispatchTo dispatches the request to the given shard and store.
// NOTE(review): `store` is presumably the store address — confirm.
DispatchTo(req *raftcmdpb.Request, shard uint64, store string) error
// Router returns a Router.
Router() Router
}
ShardsProxy is a shards proxy: it distributes each request to the corresponding backend and retries the request on error.
func NewShardsProxy ¶
func NewShardsProxy(router Router, doneCB doneFunc, errorDoneCB errorDoneFunc) ShardsProxy
NewShardsProxy returns a shard proxy
func NewShardsProxyWithStore ¶
func NewShardsProxyWithStore(store Store, doneCB doneFunc, errorDoneCB errorDoneFunc, ) (ShardsProxy, error)
NewShardsProxyWithStore returns a shard proxy with a raftstore
type Store ¶
// Store manages a set of raft groups.
type Store interface {
// Start starts the raft store.
Start()
// Stop stops the raft store.
Stop()
// GetConfig returns the config of the store.
GetConfig() *config.Config
// Meta returns the store meta.
Meta() bhmetapb.Store
// GetRouter returns a router.
GetRouter() Router
// RegisterReadFunc registers a read command handler.
RegisterReadFunc(uint64, command.ReadCommandFunc)
// RegisterWriteFunc registers a write command handler.
RegisterWriteFunc(uint64, command.WriteCommandFunc)
// RegisterLocalFunc registers a local command handler.
RegisterLocalFunc(uint64, command.LocalCommandFunc)
// RegisterLocalRequestCB registers the local request callback used to process responses.
RegisterLocalRequestCB(func(*raftcmdpb.RaftResponseHeader, *raftcmdpb.Response))
// RegisterRPCRequestCB registers the RPC request callback used to process responses.
RegisterRPCRequestCB(func(*raftcmdpb.RaftResponseHeader, *raftcmdpb.Response))
// OnRequest receives a request, and calls the callback when the request is completed.
OnRequest(*raftcmdpb.Request) error
// MetadataStorage returns a MetadataStorage of the shard group.
MetadataStorage() storage.MetadataStorage
// DataStorageByGroup returns a DataStorage of the shard group.
// NOTE(review): the two unnamed uint64 parameters are presumably the group and
// the shard ID — confirm against the implementation.
DataStorageByGroup(uint64, uint64) storage.DataStorage
// MaybeLeader reports whether the shard replica may be the leader.
MaybeLeader(uint64) bool
// MustAllocID returns a uint64 id, and panics if an error occurs.
MustAllocID() uint64
// Prophet returns the current prophet instance.
Prophet() prophet.Prophet
// CreateRPCCliendSideCodec returns the RPC codec for the client side.
CreateRPCCliendSideCodec() (codec.Encoder, codec.Decoder)
// CreateResourcePool creates resource pools. The resource pool will create shards,
// and try to keep the number of shards in the pool not less than the `capacity`
// parameter. This is an idempotent operation.
CreateResourcePool(...metapb.ResourcePool) (ShardsPool, error)
// GetResourcePool returns the `ShardsPool`, or nil if `CreateResourcePool` has not completed.
GetResourcePool() ShardsPool
}
Store manages a set of raft groups
type TestClusterOption ¶
type TestClusterOption func(*testClusterOptions)
TestClusterOption is an option for creating a test cluster
func WithAppendTestClusterAdjustConfigFunc ¶
func WithAppendTestClusterAdjustConfigFunc(value func(node int, cfg *config.Config)) TestClusterOption
WithAppendTestClusterAdjustConfigFunc adjust config
func WithDataStorageOption ¶
func WithDataStorageOption(dataOpts *cpebble.Options) TestClusterOption
WithDataStorageOption set options to create data storage
func WithMetadataStorageOption ¶
func WithMetadataStorageOption(metaOpts *cpebble.Options) TestClusterOption
WithMetadataStorageOption set options to create metadata storage
func WithTestClusterDataPath ¶
func WithTestClusterDataPath(path string) TestClusterOption
WithTestClusterDataPath sets the data storage directory
func WithTestClusterDisableSchedule ¶
func WithTestClusterDisableSchedule() TestClusterOption
WithTestClusterDisableSchedule disable pd schedule
func WithTestClusterLogLevel ¶
func WithTestClusterLogLevel(level string) TestClusterOption
WithTestClusterLogLevel set raftstore log level
func WithTestClusterNodeCount ¶
func WithTestClusterNodeCount(n int) TestClusterOption
WithTestClusterNodeCount set node count of test cluster
func WithTestClusterNodeStartFunc ¶
func WithTestClusterNodeStartFunc(value func(node int, store Store)) TestClusterOption
WithTestClusterNodeStartFunc custom node start func
func WithTestClusterReadHandler ¶
func WithTestClusterReadHandler(cmd uint64, value command.ReadCommandFunc) TestClusterOption
WithTestClusterReadHandler read handlers
func WithTestClusterRecreate ¶
func WithTestClusterRecreate(value bool) TestClusterOption
WithTestClusterRecreate if true, the test cluster will clean and recreate the data dir
func WithTestClusterStoreFactory ¶
func WithTestClusterStoreFactory(value func(node int, cfg *config.Config) Store) TestClusterOption
WithTestClusterStoreFactory custom create raftstore factory
func WithTestClusterUseDisk ¶
func WithTestClusterUseDisk() TestClusterOption
WithTestClusterUseDisk use disk storage for testing
func WithTestClusterWriteHandler ¶
func WithTestClusterWriteHandler(cmd uint64, value command.WriteCommandFunc) TestClusterOption
WithTestClusterWriteHandler write handlers
type TestKVClient ¶
// TestKVClient is a kv client that uses `TestRaftCluster` as its backend KV storage engine.
type TestKVClient interface {
// Set sets the key-value pair in the backend kv storage.
Set(key, value string, timeout time.Duration) error
// Get returns the value of the specific key from the backend kv storage.
Get(key string, timeout time.Duration) (string, error)
// Close closes the test client.
Close()
}
TestKVClient is a kv client that uses `TestRaftCluster` as Backend's KV storage engine
type TestRaftCluster ¶
// TestRaftCluster is a test cluster that runs multiple nodes in a single process.
type TestRaftCluster interface {
// EveryStore calls fn for every store; it can be used to register handlers on each store.
EveryStore(fn func(i int, store Store))
// GetStore returns the store of the node.
GetStore(node int) Store
// GetWatcher returns the event watcher of the node.
GetWatcher(node int) prophet.Watcher
// Start starts each node sequentially.
Start()
// Stop stops each node sequentially.
Stop()
// StartWithConcurrent starts the first node, then starts the other nodes concurrently.
// NOTE(review): the meaning of the bool parameter is not visible here — confirm
// against testutil.go.
StartWithConcurrent(bool)
// Restart restarts the cluster.
Restart()
// RestartWithFunc restarts the cluster; `beforeStartFunc` is called before starting.
RestartWithFunc(beforeStartFunc func())
// GetPRCount returns the number of replicas on the node.
GetPRCount(node int) int
// GetShardByIndex returns the shard by `shardIndex`; `shardIndex` is the order in which
// the shard was created on the node.
GetShardByIndex(node int, shardIndex int) bhmetapb.Shard
// GetShardByID returns the shard from the node by shard id.
GetShardByID(node int, shardID uint64) bhmetapb.Shard
// CheckShardCount checks whether the number of shards on each node is correct.
CheckShardCount(countPerNode int)
// CheckShardRange checks whether the range field of the shard on each node is correct;
// `shardIndex` is the order in which the shard was created on the node.
CheckShardRange(shardIndex int, start, end []byte)
// WaitRemovedByShardID checks, until timeout, whether the specific shard has been removed from every node.
WaitRemovedByShardID(shardID uint64, timeout time.Duration)
// WaitLeadersByCount checks, until timeout, that the number of leaders in the cluster reaches
// at least the specified value.
WaitLeadersByCount(count int, timeout time.Duration)
// WaitShardByCount checks, until timeout, that the number of shards in the cluster reaches
// at least the specified value.
WaitShardByCount(count int, timeout time.Duration)
// WaitShardByCountPerNode checks, until timeout, that the number of shards on each node reaches
// at least the specified value.
WaitShardByCountPerNode(count int, timeout time.Duration)
// WaitShardSplitByCount checks, until timeout, whether the count of shard splits reaches a specific value.
WaitShardSplitByCount(id uint64, count int, timeout time.Duration)
// WaitShardByCounts checks, until timeout, whether the number of shards reaches a specific value.
// NOTE(review): `counts` presumably holds one expected count per node — confirm.
WaitShardByCounts(counts []int, timeout time.Duration)
// WaitShardStateChangedTo checks, until timeout, whether the state of the shard changes to the specific value.
WaitShardStateChangedTo(shardID uint64, to metapb.ResourceState, timeout time.Duration)
// GetShardLeaderStore returns the leader node's store for the shard.
GetShardLeaderStore(shardID uint64) Store
// GetProphet returns the prophet instance.
GetProphet() prophet.Prophet
// Set writes the key-value pair to the `DataStorage` of the node.
Set(node int, key, value []byte)
// CreateTestKVClient creates and returns a kv client.
CreateTestKVClient(node int) TestKVClient
}
TestRaftCluster is a test cluster used to start N nodes in a single process. It also provides the ability to start and stop a single node, which makes `raftstore` easier to test.
func NewSingleTestClusterStore ¶
func NewSingleTestClusterStore(t *testing.T, opts ...TestClusterOption) TestRaftCluster
NewSingleTestClusterStore create test cluster with 1 node
func NewTestClusterStore ¶
func NewTestClusterStore(t *testing.T, opts ...TestClusterOption) TestRaftCluster
NewTestClusterStore create test cluster using options
Source Files
¶
- batch.go
- cmd.go
- codec_rpc.go
- errors.go
- keys.go
- metric.go
- peer_apply.go
- peer_apply_exec.go
- peer_event_loop.go
- peer_event_post_apply.go
- peer_event_proposal.go
- peer_event_raft_ready.go
- peer_job.go
- peer_replica.go
- peer_storage.go
- pool.go
- prophet_adapter.go
- proxy.go
- proxy_backend.go
- router.go
- rpc.go
- snap.go
- store.go
- store_bootstrap.go
- store_handler.go
- store_ready.go
- store_route_handler.go
- store_shards_pool.go
- testutil.go
- util.go