replication

package
v1.33.6
Published: Nov 27, 2025 License: BSD-3-Clause Imports: 31 Imported by: 0

Documentation

Constants

const DELETED = "deleted"

DELETED is a constant representing a temporary deleted state of a replication operation that should not be stored in the FSM.

const (
	MaxErrors = 50
)

Variables

var (
	GatewayBackoffMaxInterval   = 15 * time.Second
	GatewayInitialBackoffPeriod = 5 * time.Second
)
var (
	ErrAlreadyExists = errors.New("already exists")
	ErrNodeNotFound  = errors.New("node not found")
	ErrClassNotFound = errors.New("class not found")
	ErrShardNotFound = errors.New("shard not found")
)
var ErrBadRequest = errors.New("bad request")
var ErrMaxErrorsReached = errors.New("max errors reached")
var ErrShardAlreadyReplicating = errors.New("replica is already being replicated")
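
The sentinel errors above are plain errors.New values, so callers would normally match them with errors.Is rather than by comparing strings. The following is a minimal sketch under that assumption. The two variables are local stand-ins that mirror the package's values so the example compiles on its own, and lookupShard and classify are hypothetical helpers that are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring two of the package's sentinel errors.
// Real code would reference the exported values from the package instead.
var (
	ErrNodeNotFound  = errors.New("node not found")
	ErrShardNotFound = errors.New("shard not found")
)

// lookupShard is a hypothetical helper that wraps a sentinel with context.
func lookupShard(node, shard string) error {
	if node == "" {
		return fmt.Errorf("lookup shard %q: %w", shard, ErrNodeNotFound)
	}
	if shard == "" {
		return fmt.Errorf("lookup on node %q: %w", node, ErrShardNotFound)
	}
	return nil
}

// classify maps an error to a short label. errors.Is sees through any
// wrapping added with %w, so the added context does not break the match.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrNodeNotFound):
		return "node-missing"
	case errors.Is(err, ErrShardNotFound):
		return "shard-missing"
	default:
		return "other"
	}
}

func main() {
	fmt.Println(classify(lookupShard("", "s1")))
	fmt.Println(classify(lookupShard("n1", "")))
	fmt.Println(classify(lookupShard("n1", "s1")))
}
```

Whether a given function in this package returns these values wrapped or unwrapped is not stated here; errors.Is handles both cases.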

Functions

func ValidateReplicationReplicateShard

func ValidateReplicationReplicateShard(schemaReader schema.SchemaReader, c *api.ReplicationReplicateShardRequest) error

ValidateReplicationReplicateShard validates that c is valid given the current state of the schema read using schemaReader

Types

type CopyOpConsumer

type CopyOpConsumer struct {
	// contains filtered or unexported fields
}

CopyOpConsumer is an implementation of the OpConsumer interface that processes replication operations by executing copy operations from a source shard to a target shard. It uses a ReplicaCopier to actually carry out the copy operation. Moreover, it supports configurable backoff, timeout and concurrency limits.

func NewCopyOpConsumer

func NewCopyOpConsumer(
	logger *logrus.Logger,
	leaderClient types.FSMUpdater,
	replicaCopier types.ReplicaCopier,
	nodeId string,
	backoffPolicy backoff.BackOff,
	ongoingOps *OpsCache,
	opTimeout time.Duration,
	maxWorkers int,
	asyncReplicationMinimumWait *runtime.DynamicValue[time.Duration],
	engineOpCallbacks *metrics.ReplicationEngineOpsCallbacks,
	schemaReader schema.SchemaReader,
) *CopyOpConsumer

NewCopyOpConsumer creates a new CopyOpConsumer instance responsible for executing replication operations using a configurable worker pool.

It uses a ReplicaCopier to perform the actual data copy.

func (*CopyOpConsumer) Consume

func (c *CopyOpConsumer) Consume(workerCtx context.Context, in <-chan ShardReplicationOpAndStatus) error

Consume processes replication operations from the input channel, ensuring that only a limited number of consumers are active concurrently based on the maxWorkers value.
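
One common way to cap concurrency as described for Consume is a buffered channel used as a semaphore. The sketch below illustrates that technique only; it is not the CopyOpConsumer implementation. The op type, the consume function and the handle callback are hypothetical stand-ins.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// op is a hypothetical stand-in for ShardReplicationOpAndStatus.
type op struct{ ID uint64 }

// consume reads ops from in until the channel is closed or ctx is done.
// The sem channel caps the number of concurrent handlers at maxWorkers.
func consume(ctx context.Context, in <-chan op, maxWorkers int, handle func(op)) error {
	sem := make(chan struct{}, maxWorkers)
	var wg sync.WaitGroup
	defer wg.Wait() // let in-flight handlers finish before returning

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case o, ok := <-in:
			if !ok {
				return nil
			}
			// Acquire a worker slot, or give up if the context ends first.
			select {
			case sem <- struct{}{}:
			case <-ctx.Done():
				return ctx.Err()
			}
			wg.Add(1)
			go func() {
				defer wg.Done()
				defer func() { <-sem }() // release the slot
				handle(o)
			}()
		}
	}
}

// runConsumeDemo pushes n ops through consume and reports how many were
// handled and the highest number of handlers observed running at once.
func runConsumeDemo(n, maxWorkers int) (handled, peak int64) {
	in := make(chan op, n)
	for i := 0; i < n; i++ {
		in <- op{ID: uint64(i)}
	}
	close(in)

	var mu sync.Mutex
	var active int64
	_ = consume(context.Background(), in, maxWorkers, func(op) {
		mu.Lock()
		active++
		if active > peak {
			peak = active
		}
		mu.Unlock()

		mu.Lock()
		active--
		handled++
		mu.Unlock()
	})
	return handled, peak
}

func main() {
	handled, peak := runConsumeDemo(20, 3)
	fmt.Println("handled:", handled, "peak within limit:", peak <= 3)
}
```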

type FSMOpProducer

type FSMOpProducer struct {
	// contains filtered or unexported fields
}

FSMOpProducer is an implementation of the OpProducer interface that reads replication operations from a ShardReplicationFSM, which tracks the state of replication operations.

func NewFSMOpProducer

func NewFSMOpProducer(logger *logrus.Logger, fsm *ShardReplicationFSM, pollingInterval time.Duration, nodeId string) *FSMOpProducer

NewFSMOpProducer creates a new FSMOpProducer instance, which periodically polls the ShardReplicationFSM for operations assigned to the given node and pushes them to a channel for consumption by the replication engine. The polling interval controls how often the FSM is queried for replication operations.

Additional configuration can be applied using optional FSMProducerOption functions.

func (*FSMOpProducer) Produce

func (p *FSMOpProducer) Produce(ctx context.Context, out chan<- ShardReplicationOpAndStatus) error

Produce implements the OpProducer interface and starts producing operations for the given node.

It uses a polling mechanism based on time.Ticker to periodically fetch all replication operations that should be executed on the current node. These operations are then sent to the provided output channel to be consumed by the OpConsumer.

The function respects backpressure by using a bounded output channel. If the channel is full (i.e., the consumer is slow or blocked), the producer blocks while trying to send operations. While blocked, any additional ticks from the time.Ticker are dropped, as time.Ticker does not buffer ticks. This means the polling interval is effectively paused while the system is under load.

This behavior is intentional: the producer only generates new work when the system has capacity to process it. Missing some ticks during backpressure is acceptable and avoids accumulating unprocessed work or overloading the system.
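
The polling and backpressure behavior described above can be sketched as follows. This is an illustration under stated assumptions, not the FSMOpProducer code: op, produce and fetch are hypothetical names, and the tick source is passed in as a channel so that the demo can drive it by hand. Real code would pass the C field of a time.Ticker.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// op is a hypothetical stand-in for ShardReplicationOpAndStatus.
type op struct{ ID uint64 }

// produce waits for a tick, fetches the current ops and sends each one to
// out. The send blocks while out is full, and no new tick is read during
// that time, so polling pauses while the consumer is behind.
func produce(ctx context.Context, ticks <-chan time.Time, fetch func() []op, out chan<- op) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticks:
			for _, o := range fetch() {
				select {
				case out <- o: // blocks when the bounded channel is full
				case <-ctx.Done():
					return ctx.Err()
				}
			}
		}
	}
}

// runProduceDemo fires one tick by hand, drains the resulting ops and
// then cancels the producer.
func runProduceDemo() (received int, err error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ticks := make(chan time.Time, 1) // hand-driven stand-in for ticker.C
	out := make(chan op, 2)          // bounded: the third send must wait
	done := make(chan error, 1)

	fetch := func() []op { return []op{{ID: 1}, {ID: 2}, {ID: 3}} }
	go func() { done <- produce(ctx, ticks, fetch, out) }()

	ticks <- time.Now()
	for i := 0; i < 3; i++ {
		<-out
		received++
	}
	cancel()
	return received, <-done
}

func main() {
	n, err := runProduceDemo()
	fmt.Println(n, err)
}
```

Selecting on ctx.Done alongside the send keeps the producer from hanging on a full channel during shutdown.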

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager(schemaReader schema.SchemaReader, nodeSelector cluster.NodeSelector, reg prometheus.Registerer) *Manager

func (*Manager) CancelReplication

func (m *Manager) CancelReplication(c *cmd.ApplyRequest) error

func (*Manager) DeleteAllReplications

func (m *Manager) DeleteAllReplications(c *cmd.ApplyRequest) error

func (*Manager) DeleteReplication

func (m *Manager) DeleteReplication(c *cmd.ApplyRequest) error

func (*Manager) DeleteReplicationsByCollection

func (m *Manager) DeleteReplicationsByCollection(c *cmd.ApplyRequest) error

func (*Manager) DeleteReplicationsByTenants

func (m *Manager) DeleteReplicationsByTenants(c *cmd.ApplyRequest) error

func (*Manager) ForceDeleteAll

func (m *Manager) ForceDeleteAll(c *cmd.ApplyRequest) error

func (*Manager) ForceDeleteByCollection

func (m *Manager) ForceDeleteByCollection(c *cmd.ApplyRequest) error

func (*Manager) ForceDeleteByCollectionAndShard

func (m *Manager) ForceDeleteByCollectionAndShard(c *cmd.ApplyRequest) error

func (*Manager) ForceDeleteByTargetNode

func (m *Manager) ForceDeleteByTargetNode(c *cmd.ApplyRequest) error

func (*Manager) ForceDeleteByUuid

func (m *Manager) ForceDeleteByUuid(c *cmd.ApplyRequest) error

func (*Manager) GetAllReplicationDetails

func (m *Manager) GetAllReplicationDetails(c *cmd.QueryRequest) ([]byte, error)

func (*Manager) GetReplicationDetailsByCollection

func (m *Manager) GetReplicationDetailsByCollection(c *cmd.QueryRequest) ([]byte, error)

func (*Manager) GetReplicationDetailsByCollectionAndShard

func (m *Manager) GetReplicationDetailsByCollectionAndShard(c *cmd.QueryRequest) ([]byte, error)

func (*Manager) GetReplicationDetailsByReplicationId

func (m *Manager) GetReplicationDetailsByReplicationId(c *cmd.QueryRequest) ([]byte, error)

func (*Manager) GetReplicationDetailsByTargetNode

func (m *Manager) GetReplicationDetailsByTargetNode(c *cmd.QueryRequest) ([]byte, error)

func (*Manager) GetReplicationFSM

func (m *Manager) GetReplicationFSM() *ShardReplicationFSM

func (*Manager) GetReplicationOpUUIDFromId

func (m *Manager) GetReplicationOpUUIDFromId(id uint64) (strfmt.UUID, error)

func (*Manager) GetReplicationOperationState

func (m *Manager) GetReplicationOperationState(c *cmd.QueryRequest) ([]byte, error)

func (*Manager) QueryReplicationScalePlan added in v1.31.20

func (m *Manager) QueryReplicationScalePlan(c *cmd.QueryRequest) ([]byte, error)

func (*Manager) QueryShardingStateByCollection

func (m *Manager) QueryShardingStateByCollection(c *cmd.QueryRequest) ([]byte, error)

func (*Manager) QueryShardingStateByCollectionAndShard

func (m *Manager) QueryShardingStateByCollectionAndShard(c *cmd.QueryRequest) ([]byte, error)

func (*Manager) RegisterError

func (m *Manager) RegisterError(c *cmd.ApplyRequest) error

func (*Manager) RemoveReplicaOp

func (m *Manager) RemoveReplicaOp(c *cmd.ApplyRequest) error

func (*Manager) Replicate

func (m *Manager) Replicate(logId uint64, c *cmd.ApplyRequest) error

func (*Manager) ReplicationCancellationComplete

func (m *Manager) ReplicationCancellationComplete(c *cmd.ApplyRequest) error

func (*Manager) Restore

func (m *Manager) Restore(bytes []byte) error

func (*Manager) Snapshot

func (m *Manager) Snapshot() ([]byte, error)

func (*Manager) StoreSchemaVersion

func (m *Manager) StoreSchemaVersion(c *cmd.ApplyRequest) error

func (*Manager) UpdateReplicateOpState

func (m *Manager) UpdateReplicateOpState(c *cmd.ApplyRequest) error

type MockOpConsumer

type MockOpConsumer struct {
	mock.Mock
}

MockOpConsumer is an autogenerated mock type for the OpConsumer type

func NewMockOpConsumer

func NewMockOpConsumer(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockOpConsumer

NewMockOpConsumer creates a new instance of MockOpConsumer. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockOpConsumer) Consume

func (_m *MockOpConsumer) Consume(ctx context.Context, in <-chan ShardReplicationOpAndStatus) error

Consume provides a mock function with given fields: ctx, in

func (*MockOpConsumer) EXPECT

type MockOpConsumer_Consume_Call

type MockOpConsumer_Consume_Call struct {
	*mock.Call
}

MockOpConsumer_Consume_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Consume'

func (*MockOpConsumer_Consume_Call) Return

func (*MockOpConsumer_Consume_Call) Run

func (*MockOpConsumer_Consume_Call) RunAndReturn

type MockOpConsumer_Expecter

type MockOpConsumer_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockOpConsumer_Expecter) Consume

func (_e *MockOpConsumer_Expecter) Consume(ctx interface{}, in interface{}) *MockOpConsumer_Consume_Call

Consume is a helper method to define mock.On call

  • ctx context.Context
  • in <-chan ShardReplicationOpAndStatus

type MockOpProducer

type MockOpProducer struct {
	mock.Mock
}

MockOpProducer is an autogenerated mock type for the OpProducer type

func NewMockOpProducer

func NewMockOpProducer(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockOpProducer

NewMockOpProducer creates a new instance of MockOpProducer. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockOpProducer) EXPECT

func (*MockOpProducer) Produce

func (_m *MockOpProducer) Produce(ctx context.Context, out chan<- ShardReplicationOpAndStatus) error

Produce provides a mock function with given fields: ctx, out

type MockOpProducer_Expecter

type MockOpProducer_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockOpProducer_Expecter) Produce

func (_e *MockOpProducer_Expecter) Produce(ctx interface{}, out interface{}) *MockOpProducer_Produce_Call

Produce is a helper method to define mock.On call

  • ctx context.Context
  • out chan<- ShardReplicationOpAndStatus

type MockOpProducer_Produce_Call

type MockOpProducer_Produce_Call struct {
	*mock.Call
}

MockOpProducer_Produce_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Produce'

func (*MockOpProducer_Produce_Call) Return

func (*MockOpProducer_Produce_Call) Run

func (*MockOpProducer_Produce_Call) RunAndReturn

type MockTimeProvider

type MockTimeProvider struct {
	mock.Mock
}

MockTimeProvider is an autogenerated mock type for the TimeProvider type

func NewMockTimeProvider

func NewMockTimeProvider(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockTimeProvider

NewMockTimeProvider creates a new instance of MockTimeProvider. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockTimeProvider) EXPECT

func (*MockTimeProvider) Now

func (_m *MockTimeProvider) Now() time.Time

Now provides a mock function with no fields

type MockTimeProvider_Expecter

type MockTimeProvider_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockTimeProvider_Expecter) Now

Now is a helper method to define mock.On call

type MockTimeProvider_Now_Call

type MockTimeProvider_Now_Call struct {
	*mock.Call
}

MockTimeProvider_Now_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Now'

func (*MockTimeProvider_Now_Call) Return

func (*MockTimeProvider_Now_Call) Run

func (*MockTimeProvider_Now_Call) RunAndReturn

func (_c *MockTimeProvider_Now_Call) RunAndReturn(run func() time.Time) *MockTimeProvider_Now_Call

type MockTimer

type MockTimer struct {
	mock.Mock
}

MockTimer is an autogenerated mock type for the Timer type

func NewMockTimer

func NewMockTimer(t interface {
	mock.TestingT
	Cleanup(func())
}) *MockTimer

NewMockTimer creates a new instance of MockTimer. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.

func (*MockTimer) AfterFunc

func (_m *MockTimer) AfterFunc(duration time.Duration, fn func()) *time.Timer

AfterFunc provides a mock function with given fields: duration, fn

func (*MockTimer) EXPECT

func (_m *MockTimer) EXPECT() *MockTimer_Expecter

type MockTimer_AfterFunc_Call

type MockTimer_AfterFunc_Call struct {
	*mock.Call
}

MockTimer_AfterFunc_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AfterFunc'

func (*MockTimer_AfterFunc_Call) Return

func (*MockTimer_AfterFunc_Call) Run

func (_c *MockTimer_AfterFunc_Call) Run(run func(duration time.Duration, fn func())) *MockTimer_AfterFunc_Call

func (*MockTimer_AfterFunc_Call) RunAndReturn

func (_c *MockTimer_AfterFunc_Call) RunAndReturn(run func(time.Duration, func()) *time.Timer) *MockTimer_AfterFunc_Call

type MockTimer_Expecter

type MockTimer_Expecter struct {
	// contains filtered or unexported fields
}

func (*MockTimer_Expecter) AfterFunc

func (_e *MockTimer_Expecter) AfterFunc(duration interface{}, fn interface{}) *MockTimer_AfterFunc_Call

AfterFunc is a helper method to define mock.On call

  • duration time.Duration
  • fn func()

type OpConsumer

type OpConsumer interface {
	// Consume starts consuming operations from the provided channel.
	// The consumer processes operations, and a buffered channel is typically used to apply backpressure.
	// The consumer should return an error if it fails to process any operation.
	Consume(ctx context.Context, in <-chan ShardReplicationOpAndStatus) error
}

OpConsumer is an interface for consuming replication operations.

type OpProducer

type OpProducer interface {
	// Produce starts producing replication operations and sends them to the provided channel.
	// A buffered channel is typically used for backpressure, but an unbounded channel may cause
	// memory growth if the consumer falls behind. Errors during production should be returned.
	Produce(ctx context.Context, out chan<- ShardReplicationOpAndStatus) error
}

OpProducer is an interface for producing replication operations.

type OpsCache

type OpsCache struct {
	// contains filtered or unexported fields
}

func NewOpsCache

func NewOpsCache() *OpsCache

func (*OpsCache) Cancel

func (c *OpsCache) Cancel(opId uint64) bool

func (*OpsCache) CancelAll

func (c *OpsCache) CancelAll()

func (*OpsCache) DeleteHasBeenCancelled

func (c *OpsCache) DeleteHasBeenCancelled(opId uint64)

func (*OpsCache) DeleteInFlight

func (c *OpsCache) DeleteInFlight(opId uint64)

func (*OpsCache) HasBeenCancelled

func (c *OpsCache) HasBeenCancelled(opId uint64) bool

func (*OpsCache) InFlight

func (c *OpsCache) InFlight(opId uint64) bool

func (*OpsCache) LoadCancel

func (c *OpsCache) LoadCancel(opId uint64) (context.CancelFunc, bool)

func (*OpsCache) LoadOrStore

func (c *OpsCache) LoadOrStore(opId uint64) bool

func (*OpsCache) StoreCancel

func (c *OpsCache) StoreCancel(opId uint64, cancel context.CancelFunc)

func (*OpsCache) StoreHasBeenCancelled

func (c *OpsCache) StoreHasBeenCancelled(opId uint64)

type OpsGateway

type OpsGateway struct {
	// contains filtered or unexported fields
}

func NewOpsGateway

func NewOpsGateway() *OpsGateway

func (*OpsGateway) CanSchedule

func (og *OpsGateway) CanSchedule(opId uint64) (bool, time.Time)

func (*OpsGateway) RegisterFailure

func (og *OpsGateway) RegisterFailure(opId uint64)

func (*OpsGateway) RegisterFinished

func (og *OpsGateway) RegisterFinished(opId uint64)

func (*OpsGateway) ScheduleNow

func (og *OpsGateway) ScheduleNow(opId uint64)

type OpsScheduleMetadata

type OpsScheduleMetadata struct {
	// contains filtered or unexported fields
}

func NewOpsScheduleMetadata

func NewOpsScheduleMetadata() *OpsScheduleMetadata

type ShardReplicationEngine

type ShardReplicationEngine struct {
	// contains filtered or unexported fields
}

ShardReplicationEngine coordinates the replication of shard data between nodes in a distributed system.

It uses a producer-consumer pattern where replication operations are pulled from a source (e.g., FSM) and dispatched to workers for execution, enabling parallel processing with built-in backpressure implemented by means of a limited channel.

The engine ensures that operations are processed concurrently, but with limits to avoid overloading the system. It also provides mechanisms for graceful shutdown and error handling. The replication engine is responsible for managing the lifecycle of both producer and consumer goroutines that work together to execute replication tasks.

Key responsibilities of this engine include managing buffered channels for backpressure, starting and stopping the replication operation lifecycle, and ensuring that the engine handles concurrent workers without resource exhaustion.

This engine is expected to run on a single node within a cluster and to process only the replication operations relevant to that node. It uses a pull mechanism: the engine running on a given node is responsible for executing the replication operations that have that node as their target.

func NewShardReplicationEngine

func NewShardReplicationEngine(
	logger *logrus.Logger,
	nodeId string,
	producer OpProducer,
	consumer OpConsumer,
	opBufferSize int,
	maxWorkers int,
	shutdownTimeout time.Duration,
	engineMetricCallbacks *metrics.ReplicationEngineCallbacks,
) *ShardReplicationEngine

NewShardReplicationEngine creates a new replication engine

func (*ShardReplicationEngine) IsRunning

func (e *ShardReplicationEngine) IsRunning() bool

IsRunning reports whether the replication engine is currently running.

It returns true if the engine has been started and has not yet shut down.

func (*ShardReplicationEngine) OpChannelCap

func (e *ShardReplicationEngine) OpChannelCap() int

OpChannelCap returns the capacity of the internal operation channel.

This reflects the total number of replication operations the channel can queue before the producer blocks, which is how the backpressure mechanism is implemented.

func (*ShardReplicationEngine) OpChannelLen

func (e *ShardReplicationEngine) OpChannelLen() int

OpChannelLen returns the current number of operations buffered in the internal channel.

This can be used to monitor the backpressure between the producer and the consumer.
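
For a buffered Go channel, the built-in len and cap functions report the number of queued items and the capacity. That is presumably what OpChannelLen and OpChannelCap expose, though this page does not show the implementation. A minimal sketch of deriving a fill ratio from those two numbers, where usage is a hypothetical helper:

```go
package main

import "fmt"

// usage reports how many items are queued in ch, the capacity of ch, and
// the fill ratio between 0 and 1. A ratio close to 1 means the producer
// is about to block on its next send.
func usage(ch chan int) (length, capacity int, ratio float64) {
	length, capacity = len(ch), cap(ch)
	if capacity > 0 {
		ratio = float64(length) / float64(capacity)
	}
	return length, capacity, ratio
}

func main() {
	ops := make(chan int, 4)
	ops <- 1
	ops <- 2
	fmt.Println(usage(ops))
}
```

Both values are snapshots. With a concurrent producer and consumer they may be stale by the time they are read, so they suit metrics better than control flow.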

func (*ShardReplicationEngine) Start

Start runs the replication engine's main loop, including the operation producer and consumer.

It starts two goroutines: one for the OpProducer and one for the OpConsumer. These goroutines communicate through a buffered channel, and the engine coordinates their lifecycle. This method is safe to call only once; if the engine is already running, it logs a warning and returns.

It returns an error if either the producer or consumer fails unexpectedly, or if the context is cancelled.

It is safe to restart the replication engine using this method after it has been stopped.

func (*ShardReplicationEngine) Stop

func (e *ShardReplicationEngine) Stop()

Stop signals the replication engine to shut down gracefully.

It safely transitions the engine's running state to false and closes the internal stop channel, which unblocks the main loop in Start() and initiates the shutdown sequence. Calling Stop multiple times is safe; only the first call has an effect. Note that the ops channel is closed in the Start method after waiting for both the producer and consumers to terminate.
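
The lifecycle described for Start and Stop can be sketched as follows: a single active run, an idempotent Stop, the ops channel closed only after both goroutines have exited, and a restart after stopping. This is a simplified illustration and not the ShardReplicationEngine source. The engine type, its fields, the function types and runOnce are all hypothetical, and a mutex is used here in place of whatever synchronization the real engine uses.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// engine is a hypothetical, simplified stand-in for ShardReplicationEngine.
type engine struct {
	mu      sync.Mutex
	running bool
	stop    chan struct{}
	bufSize int
}

type (
	produceFn func(ctx context.Context, out chan<- int) error
	consumeFn func(ctx context.Context, in <-chan int) error
)

// Start runs one producer and one consumer until Stop is called, ctx is
// cancelled, or either side returns. It does nothing if already running.
func (e *engine) Start(ctx context.Context, produce produceFn, consume consumeFn) error {
	e.mu.Lock()
	if e.running {
		e.mu.Unlock()
		return nil
	}
	e.running = true
	stop := make(chan struct{}) // a fresh stop channel per run allows restarts
	e.stop = stop
	e.mu.Unlock()

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	ops := make(chan int, e.bufSize)
	errs := make(chan error, 2)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); errs <- produce(ctx, ops) }()
	go func() { defer wg.Done(); errs <- consume(ctx, ops) }()

	var err error
	select {
	case <-stop:
	case <-ctx.Done():
		err = ctx.Err()
	case err = <-errs:
	}
	cancel()   // ask both goroutines to exit
	wg.Wait()  // wait until they have
	close(ops) // safe now: nobody can send any more

	e.mu.Lock()
	if e.stop == stop { // only reset the flag if no newer run took over
		e.running = false
	}
	e.mu.Unlock()
	return err
}

// Stop ends the current run. Extra calls are no-ops.
func (e *engine) Stop() {
	e.mu.Lock()
	defer e.mu.Unlock()
	if !e.running {
		return
	}
	e.running = false
	close(e.stop)
}

// runOnce starts e, sums three produced values and stops e from the consumer.
func runOnce(e *engine) (int, error) {
	total, seen := 0, 0
	produce := func(ctx context.Context, out chan<- int) error {
		for i := 1; i <= 3; i++ {
			select {
			case out <- i:
			case <-ctx.Done():
				return nil
			}
		}
		<-ctx.Done()
		return nil
	}
	consume := func(ctx context.Context, in <-chan int) error {
		for {
			select {
			case v := <-in:
				total += v
				if seen++; seen == 3 {
					e.Stop()
					e.Stop() // the second call has no effect
				}
			case <-ctx.Done():
				return nil
			}
		}
	}
	err := e.Start(context.Background(), produce, consume)
	return total, err
}

func main() {
	e := &engine{bufSize: 2}
	fmt.Println(runOnce(e))
	fmt.Println(runOnce(e)) // restart after a stop
}
```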

func (*ShardReplicationEngine) String

func (e *ShardReplicationEngine) String() string

String returns a string representation of the ShardReplicationEngine, including the node ID that uniquely identifies the engine for a specific node.

The expectation is that each node runs one and only one replication engine, so the string output is helpful for logging or diagnostics to easily identify the engine associated with the node.

type ShardReplicationFSM

type ShardReplicationFSM struct {
	// contains filtered or unexported fields
}

func NewShardReplicationFSM

func NewShardReplicationFSM(reg prometheus.Registerer) *ShardReplicationFSM

func (*ShardReplicationFSM) CancelReplication

func (s *ShardReplicationFSM) CancelReplication(c *api.ReplicationCancelRequest) error

func (*ShardReplicationFSM) CancellationComplete

func (*ShardReplicationFSM) DeleteAllReplications

func (s *ShardReplicationFSM) DeleteAllReplications(c *api.ReplicationDeleteAllRequest) error

func (*ShardReplicationFSM) DeleteReplication

func (s *ShardReplicationFSM) DeleteReplication(c *api.ReplicationDeleteRequest) error

func (*ShardReplicationFSM) DeleteReplicationsByCollection

func (s *ShardReplicationFSM) DeleteReplicationsByCollection(collection string) error

func (*ShardReplicationFSM) DeleteReplicationsByTenants

func (s *ShardReplicationFSM) DeleteReplicationsByTenants(collection string, tenants []string) error

func (*ShardReplicationFSM) FilterOneShardReplicasRead

func (s *ShardReplicationFSM) FilterOneShardReplicasRead(collection string, shard string, shardReplicasLocation []string) []string

func (*ShardReplicationFSM) FilterOneShardReplicasWrite

func (s *ShardReplicationFSM) FilterOneShardReplicasWrite(collection string, shard string, shardReplicasLocation []string) ([]string, []string)

func (*ShardReplicationFSM) ForceDeleteAll

func (s *ShardReplicationFSM) ForceDeleteAll() error

func (*ShardReplicationFSM) ForceDeleteByCollection

func (s *ShardReplicationFSM) ForceDeleteByCollection(collection string) error

func (*ShardReplicationFSM) ForceDeleteByCollectionAndShard

func (s *ShardReplicationFSM) ForceDeleteByCollectionAndShard(collection, shard string) error

func (*ShardReplicationFSM) ForceDeleteByTargetNode

func (s *ShardReplicationFSM) ForceDeleteByTargetNode(node string) error

func (*ShardReplicationFSM) ForceDeleteByUuid

func (s *ShardReplicationFSM) ForceDeleteByUuid(uuid strfmt.UUID) error

func (*ShardReplicationFSM) GetOpById

func (*ShardReplicationFSM) GetOpByUuid

func (*ShardReplicationFSM) GetOpState

func (*ShardReplicationFSM) GetOpsForCollection

func (s *ShardReplicationFSM) GetOpsForCollection(collection string) ([]ShardReplicationOpAndStatus, bool)

func (*ShardReplicationFSM) GetOpsForCollectionAndShard

func (s *ShardReplicationFSM) GetOpsForCollectionAndShard(collection string, shard string) ([]ShardReplicationOpAndStatus, bool)

func (*ShardReplicationFSM) GetOpsForTarget

func (s *ShardReplicationFSM) GetOpsForTarget(node string) []ShardReplicationOp

func (*ShardReplicationFSM) GetOpsForTargetNode

func (s *ShardReplicationFSM) GetOpsForTargetNode(node string) ([]ShardReplicationOpAndStatus, bool)

func (*ShardReplicationFSM) GetReplicationOpUUIDFromId

func (s *ShardReplicationFSM) GetReplicationOpUUIDFromId(id uint64) (strfmt.UUID, error)

func (*ShardReplicationFSM) GetStatusByOps

func (*ShardReplicationFSM) HasOngoingReplication

func (s *ShardReplicationFSM) HasOngoingReplication(collection string, shard string, replica string) bool

func (*ShardReplicationFSM) RegisterError

func (*ShardReplicationFSM) RemoveReplicationOp

func (s *ShardReplicationFSM) RemoveReplicationOp(c *api.ReplicationRemoveOpRequest) error

func (*ShardReplicationFSM) Replicate

func (*ShardReplicationFSM) Restore

func (s *ShardReplicationFSM) Restore(bytes []byte) error

func (*ShardReplicationFSM) SetUnCancellable

func (s *ShardReplicationFSM) SetUnCancellable(id uint64) error

func (*ShardReplicationFSM) Snapshot

func (s *ShardReplicationFSM) Snapshot() ([]byte, error)

func (*ShardReplicationFSM) StoreSchemaVersion

func (*ShardReplicationFSM) UpdateReplicationOpStatus

func (s *ShardReplicationFSM) UpdateReplicationOpStatus(c *api.ReplicationUpdateOpStateRequest) error

type ShardReplicationOp

type ShardReplicationOp struct {
	ID   uint64
	UUID strfmt.UUID

	// Targeting information of the replication operation
	SourceShard shardFQDN
	TargetShard shardFQDN

	TransferType    api.ShardReplicationTransferType
	StartTimeUnixMs int64 // Unix timestamp when the operation started
}

func NewShardReplicationOp

func NewShardReplicationOp(id uint64, sourceNode, targetNode, collectionId, shardId string, transferType api.ShardReplicationTransferType) ShardReplicationOp

func (ShardReplicationOp) MarshalText

func (s ShardReplicationOp) MarshalText() (text []byte, err error)

func (*ShardReplicationOp) UnmarshalText

func (s *ShardReplicationOp) UnmarshalText(text []byte) error

type ShardReplicationOpAndStatus

type ShardReplicationOpAndStatus struct {
	Op     ShardReplicationOp
	Status ShardReplicationOpStatus
}

ShardReplicationOpAndStatus is a struct that contains a ShardReplicationOp and a ShardReplicationOpStatus

func NewShardReplicationOpAndStatus

func NewShardReplicationOpAndStatus(op ShardReplicationOp, status ShardReplicationOpStatus) ShardReplicationOpAndStatus

NewShardReplicationOpAndStatus creates a new ShardReplicationOpAndStatus from op and status

type ShardReplicationOpStatus

type ShardReplicationOpStatus struct {
	// SchemaVersion is the minimum schema version that the shard replication operation can safely proceed with
	// It's necessary to track this because the schema version is not always the same across multiple nodes due to EC issues with RAFT.
	// By communicating it with remote nodes, we can ensure that they will wait for the schema version to be the same or greater before proceeding with the operation.
	SchemaVersion uint64

	// Current is the current state of the shard replication operation
	Current State

	// ShouldCancel is a flag indicating that the operation should be cancelled at the earliest possible time
	ShouldCancel bool
	// ShouldDelete is a flag indicating that the operation should be cancelled at the earliest possible time and then deleted
	ShouldDelete bool
	// UnCancellable is a flag indicating that an operation is not capable of being cancelled.
	// E.g., an op is not cancellable if it is in the DEHYDRATING state after the replica has been added to the sharding state.
	UnCancellable bool

	// History is the history of the state changes of the shard replication operation
	History StateHistory
}

ShardReplicationOpStatus is the status of a shard replication operation as well as the history of the state changes and their associated errors (if any)

func NewShardReplicationStatus

func NewShardReplicationStatus(state api.ShardReplicationState) ShardReplicationOpStatus

NewShardReplicationStatus creates a new ShardReplicationOpStatus initialized with the given state and an empty history

func (*ShardReplicationOpStatus) AddError

func (s *ShardReplicationOpStatus) AddError(error string, timeUnixMs int64) error

AddError adds an error to the current state of the shard replication operation

func (*ShardReplicationOpStatus) ChangeState

func (s *ShardReplicationOpStatus) ChangeState(nextState api.ShardReplicationState)

ChangeState changes the state of the shard replication operation to the next state and keeps the previous state in the history
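
A minimal sketch of the bookkeeping ChangeState describes: the outgoing state is appended to the history before the new state becomes current. The types, the state names ("REGISTERED", "HYDRATING") and the explicit timestamp parameter are hypothetical simplifications, not the package's definitions.

```go
package main

import "fmt"

// state is a hypothetical, reduced version of the State struct.
type state struct {
	name    string
	startMs int64
}

// status is a hypothetical, reduced version of ShardReplicationOpStatus.
type status struct {
	current state
	history []state
}

// newStatus starts in the given state with an empty history.
func newStatus(name string, nowMs int64) status {
	return status{current: state{name: name, startMs: nowMs}}
}

// changeState moves the current state into the history and then makes
// next the current state. The timestamp is passed in here only to keep
// the example deterministic.
func (s *status) changeState(next string, nowMs int64) {
	s.history = append(s.history, s.current)
	s.current = state{name: next, startMs: nowMs}
}

func main() {
	s := newStatus("REGISTERED", 1)
	s.changeState("HYDRATING", 2)
	s.changeState("READY", 3)
	fmt.Println("current:", s.current.name)
	for _, h := range s.history {
		fmt.Println("was:", h.name, "since", h.startMs)
	}
}
```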

func (*ShardReplicationOpStatus) CompleteCancellation

func (s *ShardReplicationOpStatus) CompleteCancellation()

func (*ShardReplicationOpStatus) GetCurrent

func (s *ShardReplicationOpStatus) GetCurrent() State

GetCurrent returns the current state and errors of the shard replication operation

func (*ShardReplicationOpStatus) GetCurrentState

func (s *ShardReplicationOpStatus) GetCurrentState() api.ShardReplicationState

GetCurrentState returns the current state of the shard replication operation

func (*ShardReplicationOpStatus) GetHistory

func (s *ShardReplicationOpStatus) GetHistory() StateHistory

GetHistory returns the history of the state changes of the shard replication operation

func (*ShardReplicationOpStatus) OnlyCancellation

func (s *ShardReplicationOpStatus) OnlyCancellation() bool

OnlyCancellation returns true if ShouldCancel is true and ShouldDelete is false

func (*ShardReplicationOpStatus) ShouldCleanup

func (s *ShardReplicationOpStatus) ShouldCleanup() bool

ShouldCleanup returns true if the current state is not READY

func (ShardReplicationOpStatus) ShouldConsumeOps

func (s ShardReplicationOpStatus) ShouldConsumeOps() bool

ShouldConsumeOps returns true if the operation should be consumed by the consumer

It checks the following two conditions:

1. The operation is neither cancelled nor ready, meaning that it is still in progress performing some long-running op like hydrating/finalizing

2. The operation is cancelled or ready and should be deleted, meaning that the operation is finished and should be removed from the FSM
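
The two conditions above can be written as a single predicate. The sketch below uses hypothetical types and state names ("HYDRATING", "READY", "CANCELLED") chosen for illustration; the real method works on api.ShardReplicationState values.

```go
package main

import "fmt"

// state is a hypothetical stand-in for api.ShardReplicationState.
type state string

const (
	stateHydrating state = "HYDRATING"
	stateReady     state = "READY"
	stateCancelled state = "CANCELLED"
)

// status is a hypothetical, reduced version of ShardReplicationOpStatus.
type status struct {
	current      state
	shouldDelete bool
}

// shouldConsume mirrors the two documented conditions:
//  1. the op is neither cancelled nor ready, so it is still in progress
//  2. the op is cancelled or ready, and it is flagged for deletion
func (s status) shouldConsume() bool {
	finished := s.current == stateReady || s.current == stateCancelled
	inProgress := !finished
	pendingDelete := finished && s.shouldDelete
	return inProgress || pendingDelete
}

func main() {
	cases := []status{
		{current: stateHydrating},
		{current: stateReady},
		{current: stateReady, shouldDelete: true},
		{current: stateCancelled},
		{current: stateCancelled, shouldDelete: true},
	}
	for _, c := range cases {
		fmt.Printf("%-10s delete=%-5v consume=%v\n", c.current, c.shouldDelete, c.shouldConsume())
	}
}
```

A finished operation that is not flagged for deletion is the only combination the consumer skips.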

func (*ShardReplicationOpStatus) TriggerCancellation

func (s *ShardReplicationOpStatus) TriggerCancellation()

func (*ShardReplicationOpStatus) TriggerDeletion

func (s *ShardReplicationOpStatus) TriggerDeletion()

type State

type State struct {
	// State is the current state of the shard replication operation
	State api.ShardReplicationState
	// Errors is the list of errors that occurred during this state
	Errors []api.ReplicationDetailsError
	// StartTimeUnixMs is the Unix timestamp in milliseconds when the state was first entered
	StartTimeUnixMs int64
}

State is the status of a shard replication operation

func (State) ToAPIFormat

func (s State) ToAPIFormat() api.ReplicationDetailsState

ToAPIFormat converts the State to the API format

type StateHistory

type StateHistory []State

StateHistory is the history of the state changes of the shard replication operation. Defining this as a type allows us to define methods on it.

func (StateHistory) ToAPIFormat

func (sh StateHistory) ToAPIFormat() []api.ReplicationDetailsState

ToAPIFormat converts the StateHistory to the API format
