Documentation
¶
Index ¶
- Constants
- Variables
- func ValidateReplicationReplicateShard(schemaReader schema.SchemaReader, c *api.ReplicationReplicateShardRequest) error
- type CopyOpConsumer
- type FSMOpProducer
- type Manager
- func (m *Manager) CancelReplication(c *cmd.ApplyRequest) error
- func (m *Manager) DeleteAllReplications(c *cmd.ApplyRequest) error
- func (m *Manager) DeleteReplication(c *cmd.ApplyRequest) error
- func (m *Manager) DeleteReplicationsByCollection(c *cmd.ApplyRequest) error
- func (m *Manager) DeleteReplicationsByTenants(c *cmd.ApplyRequest) error
- func (m *Manager) ForceDeleteAll(c *cmd.ApplyRequest) error
- func (m *Manager) ForceDeleteByCollection(c *cmd.ApplyRequest) error
- func (m *Manager) ForceDeleteByCollectionAndShard(c *cmd.ApplyRequest) error
- func (m *Manager) ForceDeleteByTargetNode(c *cmd.ApplyRequest) error
- func (m *Manager) ForceDeleteByUuid(c *cmd.ApplyRequest) error
- func (m *Manager) GetAllReplicationDetails(c *cmd.QueryRequest) ([]byte, error)
- func (m *Manager) GetReplicationDetailsByCollection(c *cmd.QueryRequest) ([]byte, error)
- func (m *Manager) GetReplicationDetailsByCollectionAndShard(c *cmd.QueryRequest) ([]byte, error)
- func (m *Manager) GetReplicationDetailsByReplicationId(c *cmd.QueryRequest) ([]byte, error)
- func (m *Manager) GetReplicationDetailsByTargetNode(c *cmd.QueryRequest) ([]byte, error)
- func (m *Manager) GetReplicationFSM() *ShardReplicationFSM
- func (m *Manager) GetReplicationOpUUIDFromId(id uint64) (strfmt.UUID, error)
- func (m *Manager) GetReplicationOperationState(c *cmd.QueryRequest) ([]byte, error)
- func (m *Manager) QueryShardingStateByCollection(c *cmd.QueryRequest) ([]byte, error)
- func (m *Manager) QueryShardingStateByCollectionAndShard(c *cmd.QueryRequest) ([]byte, error)
- func (m *Manager) RegisterError(c *cmd.ApplyRequest) error
- func (m *Manager) RemoveReplicaOp(c *cmd.ApplyRequest) error
- func (m *Manager) Replicate(logId uint64, c *cmd.ApplyRequest) error
- func (m *Manager) ReplicationCancellationComplete(c *cmd.ApplyRequest) error
- func (m *Manager) Restore(bytes []byte) error
- func (m *Manager) Snapshot() ([]byte, error)
- func (m *Manager) StoreSchemaVersion(c *cmd.ApplyRequest) error
- func (m *Manager) UpdateReplicateOpState(c *cmd.ApplyRequest) error
- type MockOpConsumer
- type MockOpConsumer_Consume_Call
- func (_c *MockOpConsumer_Consume_Call) Return(_a0 error) *MockOpConsumer_Consume_Call
- func (_c *MockOpConsumer_Consume_Call) Run(run func(ctx context.Context, in <-chan ShardReplicationOpAndStatus)) *MockOpConsumer_Consume_Call
- func (_c *MockOpConsumer_Consume_Call) RunAndReturn(run func(context.Context, <-chan ShardReplicationOpAndStatus) error) *MockOpConsumer_Consume_Call
- type MockOpConsumer_Expecter
- type MockOpProducer
- type MockOpProducer_Expecter
- type MockOpProducer_Produce_Call
- func (_c *MockOpProducer_Produce_Call) Return(_a0 error) *MockOpProducer_Produce_Call
- func (_c *MockOpProducer_Produce_Call) Run(run func(ctx context.Context, out chan<- ShardReplicationOpAndStatus)) *MockOpProducer_Produce_Call
- func (_c *MockOpProducer_Produce_Call) RunAndReturn(run func(context.Context, chan<- ShardReplicationOpAndStatus) error) *MockOpProducer_Produce_Call
- type MockTimeProvider
- type MockTimeProvider_Expecter
- type MockTimeProvider_Now_Call
- type MockTimer
- type MockTimer_AfterFunc_Call
- func (_c *MockTimer_AfterFunc_Call) Return(_a0 *time.Timer) *MockTimer_AfterFunc_Call
- func (_c *MockTimer_AfterFunc_Call) Run(run func(duration time.Duration, fn func())) *MockTimer_AfterFunc_Call
- func (_c *MockTimer_AfterFunc_Call) RunAndReturn(run func(time.Duration, func()) *time.Timer) *MockTimer_AfterFunc_Call
- type MockTimer_Expecter
- type OpConsumer
- type OpProducer
- type OpsCache
- func (c *OpsCache) Cancel(opId uint64) bool
- func (c *OpsCache) CancelAll()
- func (c *OpsCache) DeleteHasBeenCancelled(opId uint64)
- func (c *OpsCache) DeleteInFlight(opId uint64)
- func (c *OpsCache) HasBeenCancelled(opId uint64) bool
- func (c *OpsCache) InFlight(opId uint64) bool
- func (c *OpsCache) LoadCancel(opId uint64) (context.CancelFunc, bool)
- func (c *OpsCache) LoadOrStore(opId uint64) bool
- func (c *OpsCache) StoreCancel(opId uint64, cancel context.CancelFunc)
- func (c *OpsCache) StoreHasBeenCancelled(opId uint64)
- type OpsGateway
- type OpsScheduleMetadata
- type ShardReplicationEngine
- func (e *ShardReplicationEngine) IsRunning() bool
- func (e *ShardReplicationEngine) OpChannelCap() int
- func (e *ShardReplicationEngine) OpChannelLen() int
- func (e *ShardReplicationEngine) Start(ctx context.Context) error
- func (e *ShardReplicationEngine) Stop()
- func (e *ShardReplicationEngine) String() string
- type ShardReplicationFSM
- func (s *ShardReplicationFSM) CancelReplication(c *api.ReplicationCancelRequest) error
- func (s *ShardReplicationFSM) CancellationComplete(c *api.ReplicationCancellationCompleteRequest) error
- func (s *ShardReplicationFSM) DeleteAllReplications(c *api.ReplicationDeleteAllRequest) error
- func (s *ShardReplicationFSM) DeleteReplication(c *api.ReplicationDeleteRequest) error
- func (s *ShardReplicationFSM) DeleteReplicationsByCollection(collection string) error
- func (s *ShardReplicationFSM) DeleteReplicationsByTenants(collection string, tenants []string) error
- func (s *ShardReplicationFSM) FilterOneShardReplicasRead(collection string, shard string, shardReplicasLocation []string) []string
- func (s *ShardReplicationFSM) FilterOneShardReplicasWrite(collection string, shard string, shardReplicasLocation []string) ([]string, []string)
- func (s *ShardReplicationFSM) ForceDeleteAll() error
- func (s *ShardReplicationFSM) ForceDeleteByCollection(collection string) error
- func (s *ShardReplicationFSM) ForceDeleteByCollectionAndShard(collection, shard string) error
- func (s *ShardReplicationFSM) ForceDeleteByTargetNode(node string) error
- func (s *ShardReplicationFSM) ForceDeleteByUuid(uuid strfmt.UUID) error
- func (s *ShardReplicationFSM) GetOpById(id uint64) (ShardReplicationOpAndStatus, bool)
- func (s *ShardReplicationFSM) GetOpByUuid(uuid strfmt.UUID) (ShardReplicationOpAndStatus, bool)
- func (s *ShardReplicationFSM) GetOpState(op ShardReplicationOp) (ShardReplicationOpStatus, bool)
- func (s *ShardReplicationFSM) GetOpsForCollection(collection string) ([]ShardReplicationOpAndStatus, bool)
- func (s *ShardReplicationFSM) GetOpsForCollectionAndShard(collection string, shard string) ([]ShardReplicationOpAndStatus, bool)
- func (s *ShardReplicationFSM) GetOpsForTarget(node string) []ShardReplicationOp
- func (s *ShardReplicationFSM) GetOpsForTargetNode(node string) ([]ShardReplicationOpAndStatus, bool)
- func (s *ShardReplicationFSM) GetReplicationOpUUIDFromId(id uint64) (strfmt.UUID, error)
- func (s *ShardReplicationFSM) GetStatusByOps() map[ShardReplicationOp]ShardReplicationOpStatus
- func (s *ShardReplicationFSM) HasOngoingReplication(collection string, shard string, replica string) bool
- func (s *ShardReplicationFSM) RegisterError(c *api.ReplicationRegisterErrorRequest) error
- func (s *ShardReplicationFSM) RemoveReplicationOp(c *api.ReplicationRemoveOpRequest) error
- func (s *ShardReplicationFSM) Replicate(id uint64, c *api.ReplicationReplicateShardRequest) error
- func (s *ShardReplicationFSM) Restore(bytes []byte) error
- func (s *ShardReplicationFSM) SetUnCancellable(id uint64) error
- func (s *ShardReplicationFSM) Snapshot() ([]byte, error)
- func (s *ShardReplicationFSM) StoreSchemaVersion(c *api.ReplicationStoreSchemaVersionRequest) error
- func (s *ShardReplicationFSM) UpdateReplicationOpStatus(c *api.ReplicationUpdateOpStateRequest) error
- type ShardReplicationOp
- type ShardReplicationOpAndStatus
- type ShardReplicationOpStatus
- func (s *ShardReplicationOpStatus) AddError(error string) error
- func (s *ShardReplicationOpStatus) ChangeState(nextState api.ShardReplicationState)
- func (s *ShardReplicationOpStatus) CompleteCancellation()
- func (s *ShardReplicationOpStatus) GetCurrent() State
- func (s *ShardReplicationOpStatus) GetCurrentState() api.ShardReplicationState
- func (s *ShardReplicationOpStatus) GetHistory() StateHistory
- func (s *ShardReplicationOpStatus) OnlyCancellation() bool
- func (s *ShardReplicationOpStatus) ShouldCleanup() bool
- func (s ShardReplicationOpStatus) ShouldConsumeOps() bool
- func (s *ShardReplicationOpStatus) TriggerCancellation()
- func (s *ShardReplicationOpStatus) TriggerDeletion()
- type State
- type StateHistory
Constants ¶
const DELETED = "deleted"
DELETED is a constant representing a temporary deleted state of a replication operation that should not be stored in the FSM.
const (
MaxErrors = 50
)
Variables ¶
var (
	GatewayBackoffMaxInterval   = 15 * time.Second
	GatewayInitialBackoffPeriod = 5 * time.Second
)
var (
	ErrAlreadyExists = errors.New("already exists")
	ErrNodeNotFound  = errors.New("node not found")
	ErrClassNotFound = errors.New("class not found")
	ErrShardNotFound = errors.New("shard not found")
)
var ErrBadRequest = errors.New("bad request")
var ErrMaxErrorsReached = errors.New("max errors reached")
var ErrShardAlreadyReplicating = errors.New("replica is already being replicated")
Functions ¶
func ValidateReplicationReplicateShard ¶
func ValidateReplicationReplicateShard(schemaReader schema.SchemaReader, c *api.ReplicationReplicateShardRequest) error
ValidateReplicationReplicateShard validates c against the current state of the schema, as read through schemaReader.
Types ¶
type CopyOpConsumer ¶
type CopyOpConsumer struct {
// contains filtered or unexported fields
}
CopyOpConsumer is an implementation of the OpConsumer interface that processes replication operations by executing copy operations from a source shard to a target shard. It uses a ReplicaCopier to actually carry out the copy operation. Moreover, it supports configurable backoff, timeout and concurrency limits.
func NewCopyOpConsumer ¶
func NewCopyOpConsumer( logger *logrus.Logger, leaderClient types.FSMUpdater, replicaCopier types.ReplicaCopier, nodeId string, backoffPolicy backoff.BackOff, ongoingOps *OpsCache, opTimeout time.Duration, maxWorkers int, asyncReplicationMinimumWait *runtime.DynamicValue[time.Duration], engineOpCallbacks *metrics.ReplicationEngineOpsCallbacks, schemaReader schema.SchemaReader, ) *CopyOpConsumer
NewCopyOpConsumer creates a new CopyOpConsumer instance responsible for executing replication operations using a configurable worker pool.
It uses a ReplicaCopier to perform the actual data copy.
func (*CopyOpConsumer) Consume ¶
func (c *CopyOpConsumer) Consume(workerCtx context.Context, in <-chan ShardReplicationOpAndStatus) error
Consume processes replication operations from the input channel, ensuring that only a limited number of consumers are active concurrently based on the maxWorkers value.
type FSMOpProducer ¶
type FSMOpProducer struct {
// contains filtered or unexported fields
}
FSMOpProducer is an implementation of the OpProducer interface that reads replication operations from a ShardReplicationFSM, which tracks the state of replication operations.
func NewFSMOpProducer ¶
func NewFSMOpProducer(logger *logrus.Logger, fsm *ShardReplicationFSM, pollingInterval time.Duration, nodeId string) *FSMOpProducer
NewFSMOpProducer creates a new FSMOpProducer instance, which periodically polls the ShardReplicationFSM for operations assigned to the given node and pushes them to a channel for consumption by the replication engine. The polling interval controls how often the FSM is queried for replication operations.
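Additional configuration via optional FSMProducerOption functions is mentioned here, but the signature above accepts no option arguments, so all configuration is currently supplied through the parameters shown.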
func (*FSMOpProducer) Produce ¶
func (p *FSMOpProducer) Produce(ctx context.Context, out chan<- ShardReplicationOpAndStatus) error
Produce implements the OpProducer interface and starts producing operations for the given node.
It uses a polling mechanism based on time.Ticker to periodically fetch all replication operations that should be executed on the current node. These operations are then sent to the provided output channel to be consumed by the OpConsumer.
The function respects backpressure by using a bounded output channel. If the channel is full (i.e., the consumer is slow or blocked), the producer blocks while trying to send operations. While blocked, any additional ticks from the time.Ticker are dropped, as time.Ticker does not buffer ticks. This means the polling interval is effectively paused while the system is under load.
This behavior is intentional: the producer only generates new work when the system has capacity to process it. Missing some ticks during backpressure is acceptable and avoids accumulating unprocessed work or overloading the system.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
func NewManager ¶
func NewManager(schemaReader schema.SchemaReader, reg prometheus.Registerer) *Manager
func (*Manager) CancelReplication ¶
func (m *Manager) CancelReplication(c *cmd.ApplyRequest) error
func (*Manager) DeleteAllReplications ¶
func (m *Manager) DeleteAllReplications(c *cmd.ApplyRequest) error
func (*Manager) DeleteReplication ¶
func (m *Manager) DeleteReplication(c *cmd.ApplyRequest) error
func (*Manager) DeleteReplicationsByCollection ¶
func (m *Manager) DeleteReplicationsByCollection(c *cmd.ApplyRequest) error
func (*Manager) DeleteReplicationsByTenants ¶
func (m *Manager) DeleteReplicationsByTenants(c *cmd.ApplyRequest) error
func (*Manager) ForceDeleteAll ¶
func (m *Manager) ForceDeleteAll(c *cmd.ApplyRequest) error
func (*Manager) ForceDeleteByCollection ¶
func (m *Manager) ForceDeleteByCollection(c *cmd.ApplyRequest) error
func (*Manager) ForceDeleteByCollectionAndShard ¶
func (m *Manager) ForceDeleteByCollectionAndShard(c *cmd.ApplyRequest) error
func (*Manager) ForceDeleteByTargetNode ¶
func (m *Manager) ForceDeleteByTargetNode(c *cmd.ApplyRequest) error
func (*Manager) ForceDeleteByUuid ¶
func (m *Manager) ForceDeleteByUuid(c *cmd.ApplyRequest) error
func (*Manager) GetAllReplicationDetails ¶
func (m *Manager) GetAllReplicationDetails(c *cmd.QueryRequest) ([]byte, error)
func (*Manager) GetReplicationDetailsByCollection ¶
func (m *Manager) GetReplicationDetailsByCollection(c *cmd.QueryRequest) ([]byte, error)
func (*Manager) GetReplicationDetailsByCollectionAndShard ¶
func (m *Manager) GetReplicationDetailsByCollectionAndShard(c *cmd.QueryRequest) ([]byte, error)
func (*Manager) GetReplicationDetailsByReplicationId ¶
func (m *Manager) GetReplicationDetailsByReplicationId(c *cmd.QueryRequest) ([]byte, error)
func (*Manager) GetReplicationDetailsByTargetNode ¶
func (m *Manager) GetReplicationDetailsByTargetNode(c *cmd.QueryRequest) ([]byte, error)
func (*Manager) GetReplicationFSM ¶
func (m *Manager) GetReplicationFSM() *ShardReplicationFSM
func (*Manager) GetReplicationOpUUIDFromId ¶
func (m *Manager) GetReplicationOpUUIDFromId(id uint64) (strfmt.UUID, error)
func (*Manager) GetReplicationOperationState ¶
func (m *Manager) GetReplicationOperationState(c *cmd.QueryRequest) ([]byte, error)
func (*Manager) QueryShardingStateByCollection ¶
func (m *Manager) QueryShardingStateByCollection(c *cmd.QueryRequest) ([]byte, error)
func (*Manager) QueryShardingStateByCollectionAndShard ¶
func (m *Manager) QueryShardingStateByCollectionAndShard(c *cmd.QueryRequest) ([]byte, error)
func (*Manager) RegisterError ¶
func (m *Manager) RegisterError(c *cmd.ApplyRequest) error
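func (*Manager) RemoveReplicaOp ¶
func (m *Manager) RemoveReplicaOp(c *cmd.ApplyRequest) error
func (*Manager) Replicate ¶
func (m *Manager) Replicate(logId uint64, c *cmd.ApplyRequest) error
func (*Manager) ReplicationCancellationComplete ¶
func (m *Manager) ReplicationCancellationComplete(c *cmd.ApplyRequest) error
func (*Manager) Restore ¶
func (m *Manager) Restore(bytes []byte) error
func (*Manager) Snapshot ¶
func (m *Manager) Snapshot() ([]byte, error)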
func (*Manager) StoreSchemaVersion ¶
func (m *Manager) StoreSchemaVersion(c *cmd.ApplyRequest) error
func (*Manager) UpdateReplicateOpState ¶
func (m *Manager) UpdateReplicateOpState(c *cmd.ApplyRequest) error
type MockOpConsumer ¶
MockOpConsumer is an autogenerated mock type for the OpConsumer type
func NewMockOpConsumer ¶
func NewMockOpConsumer(t interface {
mock.TestingT
Cleanup(func())
}) *MockOpConsumer
NewMockOpConsumer creates a new instance of MockOpConsumer. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.
func (*MockOpConsumer) Consume ¶
func (_m *MockOpConsumer) Consume(ctx context.Context, in <-chan ShardReplicationOpAndStatus) error
Consume provides a mock function with given fields: ctx, in
func (*MockOpConsumer) EXPECT ¶
func (_m *MockOpConsumer) EXPECT() *MockOpConsumer_Expecter
type MockOpConsumer_Consume_Call ¶
MockOpConsumer_Consume_Call is a *mock.Call that shadows Run/Return methods with type-explicit versions for method 'Consume'
func (*MockOpConsumer_Consume_Call) Return ¶
func (_c *MockOpConsumer_Consume_Call) Return(_a0 error) *MockOpConsumer_Consume_Call
func (*MockOpConsumer_Consume_Call) Run ¶
func (_c *MockOpConsumer_Consume_Call) Run(run func(ctx context.Context, in <-chan ShardReplicationOpAndStatus)) *MockOpConsumer_Consume_Call
func (*MockOpConsumer_Consume_Call) RunAndReturn ¶
func (_c *MockOpConsumer_Consume_Call) RunAndReturn(run func(context.Context, <-chan ShardReplicationOpAndStatus) error) *MockOpConsumer_Consume_Call
type MockOpConsumer_Expecter ¶
type MockOpConsumer_Expecter struct {
// contains filtered or unexported fields
}
func (*MockOpConsumer_Expecter) Consume ¶
func (_e *MockOpConsumer_Expecter) Consume(ctx interface{}, in interface{}) *MockOpConsumer_Consume_Call
Consume is a helper method to define mock.On call
- ctx context.Context
- in <-chan ShardReplicationOpAndStatus
type MockOpProducer ¶
MockOpProducer is an autogenerated mock type for the OpProducer type
func NewMockOpProducer ¶
func NewMockOpProducer(t interface {
mock.TestingT
Cleanup(func())
}) *MockOpProducer
NewMockOpProducer creates a new instance of MockOpProducer. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.
func (*MockOpProducer) EXPECT ¶
func (_m *MockOpProducer) EXPECT() *MockOpProducer_Expecter
func (*MockOpProducer) Produce ¶
func (_m *MockOpProducer) Produce(ctx context.Context, out chan<- ShardReplicationOpAndStatus) error
Produce provides a mock function with given fields: ctx, out
type MockOpProducer_Expecter ¶
type MockOpProducer_Expecter struct {
// contains filtered or unexported fields
}
func (*MockOpProducer_Expecter) Produce ¶
func (_e *MockOpProducer_Expecter) Produce(ctx interface{}, out interface{}) *MockOpProducer_Produce_Call
Produce is a helper method to define mock.On call
- ctx context.Context
- out chan<- ShardReplicationOpAndStatus
type MockOpProducer_Produce_Call ¶
MockOpProducer_Produce_Call is a *mock.Call that shadows Run/Return methods with type-explicit versions for method 'Produce'
func (*MockOpProducer_Produce_Call) Return ¶
func (_c *MockOpProducer_Produce_Call) Return(_a0 error) *MockOpProducer_Produce_Call
func (*MockOpProducer_Produce_Call) Run ¶
func (_c *MockOpProducer_Produce_Call) Run(run func(ctx context.Context, out chan<- ShardReplicationOpAndStatus)) *MockOpProducer_Produce_Call
func (*MockOpProducer_Produce_Call) RunAndReturn ¶
func (_c *MockOpProducer_Produce_Call) RunAndReturn(run func(context.Context, chan<- ShardReplicationOpAndStatus) error) *MockOpProducer_Produce_Call
type MockTimeProvider ¶
MockTimeProvider is an autogenerated mock type for the TimeProvider type
func NewMockTimeProvider ¶
func NewMockTimeProvider(t interface {
mock.TestingT
Cleanup(func())
}) *MockTimeProvider
NewMockTimeProvider creates a new instance of MockTimeProvider. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.
func (*MockTimeProvider) EXPECT ¶
func (_m *MockTimeProvider) EXPECT() *MockTimeProvider_Expecter
func (*MockTimeProvider) Now ¶
func (_m *MockTimeProvider) Now() time.Time
Now provides a mock function with no fields
type MockTimeProvider_Expecter ¶
type MockTimeProvider_Expecter struct {
// contains filtered or unexported fields
}
func (*MockTimeProvider_Expecter) Now ¶
func (_e *MockTimeProvider_Expecter) Now() *MockTimeProvider_Now_Call
Now is a helper method to define mock.On call
type MockTimeProvider_Now_Call ¶
MockTimeProvider_Now_Call is a *mock.Call that shadows Run/Return methods with type-explicit versions for method 'Now'
func (*MockTimeProvider_Now_Call) Return ¶
func (_c *MockTimeProvider_Now_Call) Return(_a0 time.Time) *MockTimeProvider_Now_Call
func (*MockTimeProvider_Now_Call) Run ¶
func (_c *MockTimeProvider_Now_Call) Run(run func()) *MockTimeProvider_Now_Call
func (*MockTimeProvider_Now_Call) RunAndReturn ¶
func (_c *MockTimeProvider_Now_Call) RunAndReturn(run func() time.Time) *MockTimeProvider_Now_Call
type MockTimer ¶
MockTimer is an autogenerated mock type for the Timer type
func NewMockTimer ¶
func NewMockTimer(t interface {
mock.TestingT
Cleanup(func())
}) *MockTimer
NewMockTimer creates a new instance of MockTimer. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.
func (*MockTimer) EXPECT ¶
func (_m *MockTimer) EXPECT() *MockTimer_Expecter
type MockTimer_AfterFunc_Call ¶
MockTimer_AfterFunc_Call is a *mock.Call that shadows Run/Return methods with type-explicit versions for method 'AfterFunc'
func (*MockTimer_AfterFunc_Call) Return ¶
func (_c *MockTimer_AfterFunc_Call) Return(_a0 *time.Timer) *MockTimer_AfterFunc_Call
func (*MockTimer_AfterFunc_Call) Run ¶
func (_c *MockTimer_AfterFunc_Call) Run(run func(duration time.Duration, fn func())) *MockTimer_AfterFunc_Call
func (*MockTimer_AfterFunc_Call) RunAndReturn ¶
func (_c *MockTimer_AfterFunc_Call) RunAndReturn(run func(time.Duration, func()) *time.Timer) *MockTimer_AfterFunc_Call
type MockTimer_Expecter ¶
type MockTimer_Expecter struct {
// contains filtered or unexported fields
}
func (*MockTimer_Expecter) AfterFunc ¶
func (_e *MockTimer_Expecter) AfterFunc(duration interface{}, fn interface{}) *MockTimer_AfterFunc_Call
AfterFunc is a helper method to define mock.On call
- duration time.Duration
- fn func()
type OpConsumer ¶
type OpConsumer interface {
// Consume starts consuming operations from the provided channel.
// The consumer processes operations, and a buffered channel is typically used to apply backpressure.
// The consumer should return an error if it fails to process any operation.
Consume(ctx context.Context, in <-chan ShardReplicationOpAndStatus) error
}
OpConsumer is an interface for consuming replication operations.
type OpProducer ¶
type OpProducer interface {
// Produce starts producing replication operations and sends them to the provided channel.
// A buffered channel is typically used for backpressure, but an unbounded channel may cause
// memory growth if the consumer falls behind. Errors during production should be returned.
Produce(ctx context.Context, out chan<- ShardReplicationOpAndStatus) error
}
OpProducer is an interface for producing replication operations.
type OpsCache ¶
type OpsCache struct {
// contains filtered or unexported fields
}
func NewOpsCache ¶
func NewOpsCache() *OpsCache
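func (*OpsCache) Cancel ¶
func (c *OpsCache) Cancel(opId uint64) bool
func (*OpsCache) CancelAll ¶
func (c *OpsCache) CancelAll()
func (*OpsCache) DeleteHasBeenCancelled ¶
func (c *OpsCache) DeleteHasBeenCancelled(opId uint64)
func (*OpsCache) DeleteInFlight ¶
func (c *OpsCache) DeleteInFlight(opId uint64)
func (*OpsCache) HasBeenCancelled ¶
func (c *OpsCache) HasBeenCancelled(opId uint64) bool
func (*OpsCache) InFlight ¶
func (c *OpsCache) InFlight(opId uint64) bool
func (*OpsCache) LoadCancel ¶
func (c *OpsCache) LoadCancel(opId uint64) (context.CancelFunc, bool)
func (*OpsCache) LoadOrStore ¶
func (c *OpsCache) LoadOrStore(opId uint64) bool
func (*OpsCache) StoreCancel ¶
func (c *OpsCache) StoreCancel(opId uint64, cancel context.CancelFunc)
func (*OpsCache) StoreHasBeenCancelled ¶
func (c *OpsCache) StoreHasBeenCancelled(opId uint64)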
type OpsGateway ¶
type OpsGateway struct {
// contains filtered or unexported fields
}
func NewOpsGateway ¶
func NewOpsGateway() *OpsGateway
func (*OpsGateway) CanSchedule ¶
func (og *OpsGateway) CanSchedule(opId uint64) (bool, time.Time)
func (*OpsGateway) RegisterFailure ¶
func (og *OpsGateway) RegisterFailure(opId uint64)
func (*OpsGateway) RegisterFinished ¶
func (og *OpsGateway) RegisterFinished(opId uint64)
func (*OpsGateway) ScheduleNow ¶
func (og *OpsGateway) ScheduleNow(opId uint64)
type OpsScheduleMetadata ¶
type OpsScheduleMetadata struct {
// contains filtered or unexported fields
}
func NewOpsScheduleMetadata ¶
func NewOpsScheduleMetadata() *OpsScheduleMetadata
type ShardReplicationEngine ¶
type ShardReplicationEngine struct {
// contains filtered or unexported fields
}
ShardReplicationEngine coordinates the replication of shard data between nodes in a distributed system.
It uses a producer-consumer pattern where replication operations are pulled from a source (e.g., FSM) and dispatched to workers for execution, enabling parallel processing with built-in backpressure provided by a bounded (buffered) channel.
The engine ensures that operations are processed concurrently, but with limits to avoid overloading the system. It also provides mechanisms for graceful shutdown and error handling. The replication engine is responsible for managing the lifecycle of both producer and consumer goroutines that work together to execute replication tasks.
Key responsibilities of this engine include managing buffered channels for backpressure, starting and stopping the replication operation lifecycle, and ensuring that the engine handles concurrent workers without resource exhaustion.
Each node in the cluster is expected to run its own instance of this engine. The engine uses a pull mechanism: the engine running on a given node is responsible only for the replication operations that have that node as their target.
func NewShardReplicationEngine ¶
func NewShardReplicationEngine( logger *logrus.Logger, nodeId string, producer OpProducer, consumer OpConsumer, opBufferSize int, maxWorkers int, shutdownTimeout time.Duration, engineMetricCallbacks *metrics.ReplicationEngineCallbacks, ) *ShardReplicationEngine
NewShardReplicationEngine creates a new replication engine
func (*ShardReplicationEngine) IsRunning ¶
func (e *ShardReplicationEngine) IsRunning() bool
IsRunning reports whether the replication engine is currently running.
It returns true if the engine has been started and has not yet shut down.
func (*ShardReplicationEngine) OpChannelCap ¶
func (e *ShardReplicationEngine) OpChannelCap() int
OpChannelCap returns the capacity of the internal operation channel.
This reflects the total number of replication operations the channel can queue before the producer blocks; this blocking is what implements the backpressure mechanism.
func (*ShardReplicationEngine) OpChannelLen ¶
func (e *ShardReplicationEngine) OpChannelLen() int
OpChannelLen returns the current number of operations buffered in the internal channel.
This can be used to monitor the backpressure between the producer and the consumer.
func (*ShardReplicationEngine) Start ¶
func (e *ShardReplicationEngine) Start(ctx context.Context) error
Start runs the replication engine's main loop, including the operation producer and consumer.
It starts two goroutines: one for the OpProducer and one for the OpConsumer. These goroutines communicate through a buffered channel, and the engine coordinates their lifecycle. Only one run may be active at a time: if the engine is already running, Start logs a warning and returns without starting another run.
It returns an error if either the producer or consumer fails unexpectedly, or if the context is cancelled.
It is safe to restart the replication engine using this method after it has been stopped.
func (*ShardReplicationEngine) Stop ¶
func (e *ShardReplicationEngine) Stop()
Stop signals the replication engine to shut down gracefully.
It safely transitions the engine's running state to false and closes the internal stop channel, which unblocks the main loop in Start() and initiates the shutdown sequence. Calling Stop multiple times is safe; only the first call has an effect. Note that the ops channel is closed in the Start method after waiting for both the producer and consumers to terminate.
func (*ShardReplicationEngine) String ¶
func (e *ShardReplicationEngine) String() string
String returns a string representation of the ShardReplicationEngine, including the node ID that uniquely identifies the engine for a specific node.
The expectation is that each node runs one and only one replication engine, so the string output is helpful for logging or diagnostics to easily identify the engine associated with the node.
type ShardReplicationFSM ¶
type ShardReplicationFSM struct {
// contains filtered or unexported fields
}
func NewShardReplicationFSM ¶
func NewShardReplicationFSM(reg prometheus.Registerer) *ShardReplicationFSM
func (*ShardReplicationFSM) CancelReplication ¶
func (s *ShardReplicationFSM) CancelReplication(c *api.ReplicationCancelRequest) error
func (*ShardReplicationFSM) CancellationComplete ¶
func (s *ShardReplicationFSM) CancellationComplete(c *api.ReplicationCancellationCompleteRequest) error
func (*ShardReplicationFSM) DeleteAllReplications ¶
func (s *ShardReplicationFSM) DeleteAllReplications(c *api.ReplicationDeleteAllRequest) error
func (*ShardReplicationFSM) DeleteReplication ¶
func (s *ShardReplicationFSM) DeleteReplication(c *api.ReplicationDeleteRequest) error
func (*ShardReplicationFSM) DeleteReplicationsByCollection ¶
func (s *ShardReplicationFSM) DeleteReplicationsByCollection(collection string) error
func (*ShardReplicationFSM) DeleteReplicationsByTenants ¶
func (s *ShardReplicationFSM) DeleteReplicationsByTenants(collection string, tenants []string) error
func (*ShardReplicationFSM) FilterOneShardReplicasRead ¶
func (s *ShardReplicationFSM) FilterOneShardReplicasRead(collection string, shard string, shardReplicasLocation []string) []string
func (*ShardReplicationFSM) FilterOneShardReplicasWrite ¶
func (*ShardReplicationFSM) ForceDeleteAll ¶
func (s *ShardReplicationFSM) ForceDeleteAll() error
func (*ShardReplicationFSM) ForceDeleteByCollection ¶
func (s *ShardReplicationFSM) ForceDeleteByCollection(collection string) error
func (*ShardReplicationFSM) ForceDeleteByCollectionAndShard ¶
func (s *ShardReplicationFSM) ForceDeleteByCollectionAndShard(collection, shard string) error
func (*ShardReplicationFSM) ForceDeleteByTargetNode ¶
func (s *ShardReplicationFSM) ForceDeleteByTargetNode(node string) error
func (*ShardReplicationFSM) ForceDeleteByUuid ¶
func (s *ShardReplicationFSM) ForceDeleteByUuid(uuid strfmt.UUID) error
func (*ShardReplicationFSM) GetOpById ¶
func (s *ShardReplicationFSM) GetOpById(id uint64) (ShardReplicationOpAndStatus, bool)
func (*ShardReplicationFSM) GetOpByUuid ¶
func (s *ShardReplicationFSM) GetOpByUuid(uuid strfmt.UUID) (ShardReplicationOpAndStatus, bool)
func (*ShardReplicationFSM) GetOpState ¶
func (s *ShardReplicationFSM) GetOpState(op ShardReplicationOp) (ShardReplicationOpStatus, bool)
func (*ShardReplicationFSM) GetOpsForCollection ¶
func (s *ShardReplicationFSM) GetOpsForCollection(collection string) ([]ShardReplicationOpAndStatus, bool)
func (*ShardReplicationFSM) GetOpsForCollectionAndShard ¶
func (s *ShardReplicationFSM) GetOpsForCollectionAndShard(collection string, shard string) ([]ShardReplicationOpAndStatus, bool)
func (*ShardReplicationFSM) GetOpsForTarget ¶
func (s *ShardReplicationFSM) GetOpsForTarget(node string) []ShardReplicationOp
func (*ShardReplicationFSM) GetOpsForTargetNode ¶
func (s *ShardReplicationFSM) GetOpsForTargetNode(node string) ([]ShardReplicationOpAndStatus, bool)
func (*ShardReplicationFSM) GetReplicationOpUUIDFromId ¶
func (s *ShardReplicationFSM) GetReplicationOpUUIDFromId(id uint64) (strfmt.UUID, error)
func (*ShardReplicationFSM) GetStatusByOps ¶
func (s *ShardReplicationFSM) GetStatusByOps() map[ShardReplicationOp]ShardReplicationOpStatus
func (*ShardReplicationFSM) HasOngoingReplication ¶
func (s *ShardReplicationFSM) HasOngoingReplication(collection string, shard string, replica string) bool
func (*ShardReplicationFSM) RegisterError ¶
func (s *ShardReplicationFSM) RegisterError(c *api.ReplicationRegisterErrorRequest) error
func (*ShardReplicationFSM) RemoveReplicationOp ¶
func (s *ShardReplicationFSM) RemoveReplicationOp(c *api.ReplicationRemoveOpRequest) error
func (*ShardReplicationFSM) Replicate ¶
func (s *ShardReplicationFSM) Replicate(id uint64, c *api.ReplicationReplicateShardRequest) error
func (*ShardReplicationFSM) Restore ¶
func (s *ShardReplicationFSM) Restore(bytes []byte) error
func (*ShardReplicationFSM) SetUnCancellable ¶
func (s *ShardReplicationFSM) SetUnCancellable(id uint64) error
func (*ShardReplicationFSM) Snapshot ¶
func (s *ShardReplicationFSM) Snapshot() ([]byte, error)
func (*ShardReplicationFSM) StoreSchemaVersion ¶
func (s *ShardReplicationFSM) StoreSchemaVersion(c *api.ReplicationStoreSchemaVersionRequest) error
func (*ShardReplicationFSM) UpdateReplicationOpStatus ¶
func (s *ShardReplicationFSM) UpdateReplicationOpStatus(c *api.ReplicationUpdateOpStateRequest) error
type ShardReplicationOp ¶
type ShardReplicationOp struct {
ID uint64
UUID strfmt.UUID
// Targeting information of the replication operation
SourceShard shardFQDN
TargetShard shardFQDN
TransferType api.ShardReplicationTransferType
}
func NewShardReplicationOp ¶
func NewShardReplicationOp(id uint64, sourceNode, targetNode, collectionId, shardId string, transferType api.ShardReplicationTransferType) ShardReplicationOp
func (ShardReplicationOp) MarshalText ¶
func (s ShardReplicationOp) MarshalText() (text []byte, err error)
func (*ShardReplicationOp) UnmarshalText ¶
func (s *ShardReplicationOp) UnmarshalText(text []byte) error
type ShardReplicationOpAndStatus ¶
type ShardReplicationOpAndStatus struct {
Op ShardReplicationOp
Status ShardReplicationOpStatus
}
ShardReplicationOpAndStatus is a struct that contains a ShardReplicationOp and a ShardReplicationOpStatus
func NewShardReplicationOpAndStatus ¶
func NewShardReplicationOpAndStatus(op ShardReplicationOp, status ShardReplicationOpStatus) ShardReplicationOpAndStatus
NewShardReplicationOpAndStatus creates a new ShardReplicationOpAndStatus from op and status
type ShardReplicationOpStatus ¶
type ShardReplicationOpStatus struct {
// SchemaVersion is the minimum schema version that the shard replication operation can safely proceed with
// It's necessary to track this because the schema version is not always the same across multiple nodes due to EC issues with RAFT.
// By communicating it with remote nodes, we can ensure that they will wait for the schema version to be the same or greater before proceeding with the operation.
SchemaVersion uint64
// Current is the current state of the shard replication operation
Current State
// ShouldCancel is a flag indicating that the operation should be cancelled at the earliest possible time
ShouldCancel bool
// ShouldDelete is a flag indicating that the operation should be cancelled at the earliest possible time and then deleted
ShouldDelete bool
// UnCancellable is a flag indicating that an operation is not capable of being cancelled.
// E.g., an op is not cancellable if it is in the DEHYDRATING state after the replica has been added to the sharding state.
UnCancellable bool
// History is the history of the state changes of the shard replication operation
History StateHistory
}
ShardReplicationOpStatus is the status of a shard replication operation as well as the history of the state changes and their associated errors (if any)
func NewShardReplicationStatus ¶
func NewShardReplicationStatus(state api.ShardReplicationState) ShardReplicationOpStatus
NewShardReplicationStatus creates a new ShardReplicationOpStatus initialized with the given state and an empty history
func (*ShardReplicationOpStatus) AddError ¶
func (s *ShardReplicationOpStatus) AddError(error string) error
AddError adds an error to the current state of the shard replication operation
func (*ShardReplicationOpStatus) ChangeState ¶
func (s *ShardReplicationOpStatus) ChangeState(nextState api.ShardReplicationState)
ChangeState changes the state of the shard replication operation to the next state and keeps the previous state in the history
func (*ShardReplicationOpStatus) CompleteCancellation ¶
func (s *ShardReplicationOpStatus) CompleteCancellation()
func (*ShardReplicationOpStatus) GetCurrent ¶
func (s *ShardReplicationOpStatus) GetCurrent() State
GetCurrent returns the current state and errors of the shard replication operation
func (*ShardReplicationOpStatus) GetCurrentState ¶
func (s *ShardReplicationOpStatus) GetCurrentState() api.ShardReplicationState
GetCurrentState returns the current state of the shard replication operation
func (*ShardReplicationOpStatus) GetHistory ¶
func (s *ShardReplicationOpStatus) GetHistory() StateHistory
GetHistory returns the history of the state changes of the shard replication operation
func (*ShardReplicationOpStatus) OnlyCancellation ¶
func (s *ShardReplicationOpStatus) OnlyCancellation() bool
OnlyCancellation returns true if ShouldCancel is true and ShouldDelete is false
func (*ShardReplicationOpStatus) ShouldCleanup ¶
func (s *ShardReplicationOpStatus) ShouldCleanup() bool
ShouldCleanup returns true if the current state is not READY
func (ShardReplicationOpStatus) ShouldConsumeOps ¶
func (s ShardReplicationOpStatus) ShouldConsumeOps() bool
ShouldConsumeOps returns true if the operation should be consumed by the consumer
It checks the following two conditions:
1. The operation is neither cancelled nor ready, meaning that it is still in progress performing some long-running op like hydrating/finalizing
2. The operation is cancelled or ready and should be deleted, meaning that the operation is finished and should be removed from the FSM
func (*ShardReplicationOpStatus) TriggerCancellation ¶
func (s *ShardReplicationOpStatus) TriggerCancellation()
func (*ShardReplicationOpStatus) TriggerDeletion ¶
func (s *ShardReplicationOpStatus) TriggerDeletion()
type State ¶
type State struct {
// State is the current state of the shard replication operation
State api.ShardReplicationState
// Errors is the list of errors that occurred during this state
Errors []string
}
State is the status of a shard replication operation
func (State) ToAPIFormat ¶
func (s State) ToAPIFormat() api.ReplicationDetailsState
ToAPIFormat converts the State to the API format
type StateHistory ¶
type StateHistory []State
StateHistory is the history of the state changes of the shard replication operation. Defining this as a type allows us to define methods on it
func (StateHistory) ToAPIFormat ¶
func (sh StateHistory) ToAPIFormat() []api.ReplicationDetailsState
ToAPIFormat converts the StateHistory to the API format