Documentation
¶
Overview ¶
Package replication is a generated GoMock package.
Package replication is a generated GoMock package.
Index ¶
- Variables
- type DLQHandler
- type DynamicTaskBatchSizer
- type MetricsEmitterImpl
- type MockTaskExecutor
- type MockTaskExecutorMockRecorder
- type MockTaskFetcher
- func (m *MockTaskFetcher) EXPECT() *MockTaskFetcherMockRecorder
- func (m *MockTaskFetcher) GetRateLimiter() quotas.Limiter
- func (m *MockTaskFetcher) GetRequestChan(shardID int) chan<- *request
- func (m *MockTaskFetcher) GetSourceCluster() string
- func (m *MockTaskFetcher) Start()
- func (m *MockTaskFetcher) Stop()
- type MockTaskFetcherMockRecorder
- func (mr *MockTaskFetcherMockRecorder) GetRateLimiter() *gomock.Call
- func (mr *MockTaskFetcherMockRecorder) GetRequestChan(shardID any) *gomock.Call
- func (mr *MockTaskFetcherMockRecorder) GetSourceCluster() *gomock.Call
- func (mr *MockTaskFetcherMockRecorder) Start() *gomock.Call
- func (mr *MockTaskFetcherMockRecorder) Stop() *gomock.Call
- type MockTaskFetchers
- type MockTaskFetchersMockRecorder
- type TaskAckManager
- type TaskExecutor
- type TaskFetcher
- type TaskFetchers
- type TaskHydrator
- type TaskProcessor
- type TaskReader
- type TaskStore
Constants ¶
This section is empty.
Variables ¶
var ErrUnknownCluster = errors.New("unknown cluster")
ErrUnknownCluster is returned when given cluster is not defined in cluster metadata
var ( // ErrUnknownReplicationTask is the error to indicate unknown replication task type ErrUnknownReplicationTask = &types.BadRequestError{Message: "unknown replication task"} )
Functions ¶
This section is empty.
Types ¶
type DLQHandler ¶
type DLQHandler interface {
common.Daemon
GetMessageCount(
ctx context.Context,
forceFetch bool,
) (map[string]int64, error)
ReadMessages(
ctx context.Context,
sourceCluster string,
lastMessageID int64,
pageSize int,
pageToken []byte,
) ([]*types.ReplicationTask, []*types.ReplicationTaskInfo, []byte, error)
PurgeMessages(
ctx context.Context,
sourceCluster string,
lastMessageID int64,
) error
MergeMessages(
ctx context.Context,
sourceCluster string,
lastMessageID int64,
pageSize int,
pageToken []byte,
) ([]byte, error)
}
DLQHandler is the interface handles replication DLQ messages
func NewDLQHandler ¶
func NewDLQHandler( shard shard.Context, taskExecutors map[string]TaskExecutor, ) DLQHandler
NewDLQHandler initialize the replication message DLQ handler
type DynamicTaskBatchSizer ¶ added in v1.3.0
type DynamicTaskBatchSizer interface {
// contains filtered or unexported methods
}
DynamicTaskBatchSizer is responsible for the batch size used to retrieve ReplicationTasks by TaskAckManager It adjusts the task batch size based on the error and the getTasksResult, and use the following rules:
If there is an error, decrease the task batch size. In case of an increased load to the database, we should reduce the load to the database Emitted metric with tag reason:"error"
If the task batch size is shrunk, decrease the task batch size. The payload size of messages is too big, so we should decrease the number of messages to be sure that future messages will not be shrunk Emitted metric with tag reason:"shrunk"
If the read level of a passive cluster has not been changed and there are no fetched tasks, not change the task batch size. There is no need to change because the replication is not stuck, there just are no new tasks Metric is not emitted
If the read level of a passive cluster has not been changed and if there are fetched tasks, and number of previously fetched tasks is not zero, decrease the task batch size. The replication is stuck on the passive side Emitted metric with tag reason:"possible_stuck"
If the read level of a passive cluster has not been changed and if there are fetched tasks, and number of previously fetched tasks is zero, not change the task batch size. The replication is not stuck, and there are new tasks to be replicated Metric is not emitted
If the read level of a passive cluster has been changed and if there are more tasks in db, increase the task batch size. We should retrieve the maximum possible value at the next time, as there are more tasks to be replicated Emitted metric with tag reason:"more_tasks"
If the read level of a passive cluster has been changed and if there are no more tasks in db, not change the size. The existing size is already enough, and there are no more tasks to be replicated Metric is not emitted
func NewDynamicTaskBatchSizer ¶ added in v1.3.0
func NewDynamicTaskBatchSizer(shardID int, logger log.Logger, config *config.Config, metricsClient metrics.Client) DynamicTaskBatchSizer
NewDynamicTaskBatchSizer creates a new dynamicTaskBatchSizerImpl
type MetricsEmitterImpl ¶ added in v0.25.0
type MetricsEmitterImpl struct {
// contains filtered or unexported fields
}
MetricsEmitterImpl is responsible for emitting source side replication metrics occasionally.
func NewMetricsEmitter ¶ added in v0.25.0
func NewMetricsEmitter( shardID int, shardData metricsEmitterShardData, reader taskReader, metricsClient metrics.Client, ) *MetricsEmitterImpl
NewMetricsEmitter creates a new metrics emitter, which starts a goroutine to emit replication metrics occasionally.
func (*MetricsEmitterImpl) Start ¶ added in v0.25.0
func (m *MetricsEmitterImpl) Start()
func (*MetricsEmitterImpl) Stop ¶ added in v0.25.0
func (m *MetricsEmitterImpl) Stop()
type MockTaskExecutor ¶
type MockTaskExecutor struct {
// contains filtered or unexported fields
}
MockTaskExecutor is a mock of TaskExecutor interface.
func NewMockTaskExecutor ¶
func NewMockTaskExecutor(ctrl *gomock.Controller) *MockTaskExecutor
NewMockTaskExecutor creates a new mock instance.
func (*MockTaskExecutor) EXPECT ¶
func (m *MockTaskExecutor) EXPECT() *MockTaskExecutorMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
type MockTaskExecutorMockRecorder ¶
type MockTaskExecutorMockRecorder struct {
// contains filtered or unexported fields
}
MockTaskExecutorMockRecorder is the mock recorder for MockTaskExecutor.
type MockTaskFetcher ¶
type MockTaskFetcher struct {
// contains filtered or unexported fields
}
MockTaskFetcher is a mock of TaskFetcher interface.
func NewMockTaskFetcher ¶
func NewMockTaskFetcher(ctrl *gomock.Controller) *MockTaskFetcher
NewMockTaskFetcher creates a new mock instance.
func (*MockTaskFetcher) EXPECT ¶
func (m *MockTaskFetcher) EXPECT() *MockTaskFetcherMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockTaskFetcher) GetRateLimiter ¶ added in v0.14.0
func (m *MockTaskFetcher) GetRateLimiter() quotas.Limiter
GetRateLimiter mocks base method.
func (*MockTaskFetcher) GetRequestChan ¶
func (m *MockTaskFetcher) GetRequestChan(shardID int) chan<- *request
GetRequestChan mocks base method.
func (*MockTaskFetcher) GetSourceCluster ¶
func (m *MockTaskFetcher) GetSourceCluster() string
GetSourceCluster mocks base method.
type MockTaskFetcherMockRecorder ¶
type MockTaskFetcherMockRecorder struct {
// contains filtered or unexported fields
}
MockTaskFetcherMockRecorder is the mock recorder for MockTaskFetcher.
func (*MockTaskFetcherMockRecorder) GetRateLimiter ¶ added in v0.14.0
func (mr *MockTaskFetcherMockRecorder) GetRateLimiter() *gomock.Call
GetRateLimiter indicates an expected call of GetRateLimiter.
func (*MockTaskFetcherMockRecorder) GetRequestChan ¶
func (mr *MockTaskFetcherMockRecorder) GetRequestChan(shardID any) *gomock.Call
GetRequestChan indicates an expected call of GetRequestChan.
func (*MockTaskFetcherMockRecorder) GetSourceCluster ¶
func (mr *MockTaskFetcherMockRecorder) GetSourceCluster() *gomock.Call
GetSourceCluster indicates an expected call of GetSourceCluster.
func (*MockTaskFetcherMockRecorder) Start ¶
func (mr *MockTaskFetcherMockRecorder) Start() *gomock.Call
Start indicates an expected call of Start.
func (*MockTaskFetcherMockRecorder) Stop ¶
func (mr *MockTaskFetcherMockRecorder) Stop() *gomock.Call
Stop indicates an expected call of Stop.
type MockTaskFetchers ¶
type MockTaskFetchers struct {
// contains filtered or unexported fields
}
MockTaskFetchers is a mock of TaskFetchers interface.
func NewMockTaskFetchers ¶
func NewMockTaskFetchers(ctrl *gomock.Controller) *MockTaskFetchers
NewMockTaskFetchers creates a new mock instance.
func (*MockTaskFetchers) EXPECT ¶
func (m *MockTaskFetchers) EXPECT() *MockTaskFetchersMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockTaskFetchers) GetFetchers ¶
func (m *MockTaskFetchers) GetFetchers() []TaskFetcher
GetFetchers mocks base method.
type MockTaskFetchersMockRecorder ¶
type MockTaskFetchersMockRecorder struct {
// contains filtered or unexported fields
}
MockTaskFetchersMockRecorder is the mock recorder for MockTaskFetchers.
func (*MockTaskFetchersMockRecorder) GetFetchers ¶
func (mr *MockTaskFetchersMockRecorder) GetFetchers() *gomock.Call
GetFetchers indicates an expected call of GetFetchers.
func (*MockTaskFetchersMockRecorder) Start ¶
func (mr *MockTaskFetchersMockRecorder) Start() *gomock.Call
Start indicates an expected call of Start.
func (*MockTaskFetchersMockRecorder) Stop ¶
func (mr *MockTaskFetchersMockRecorder) Stop() *gomock.Call
Stop indicates an expected call of Stop.
type TaskAckManager ¶ added in v0.15.0
type TaskAckManager struct {
// contains filtered or unexported fields
}
TaskAckManager is the ack manager for replication tasks
func NewTaskAckManager ¶ added in v0.15.0
func NewTaskAckManager( shardID int, ackLevels ackLevelStore, metricsClient metrics.Client, logger log.Logger, reader taskReader, store *TaskStore, timeSource clock.TimeSource, config *config.Config, replicationMessagesSizeFn types.ReplicationMessagesSizeFn, dynamicTaskBatchSizer DynamicTaskBatchSizer, ) TaskAckManager
NewTaskAckManager initializes a new replication task ack manager
func (*TaskAckManager) GetTasks ¶ added in v0.15.0
func (t *TaskAckManager) GetTasks(ctx context.Context, pollingCluster string, lastReadTaskID int64) (_ *types.ReplicationMessages, err error)
type TaskExecutor ¶
type TaskExecutor interface {
// contains filtered or unexported methods
}
TaskExecutor is the executor for replication task
func NewTaskExecutor ¶
func NewTaskExecutor( sourceCluster string, shard shard.Context, domainCache cache.DomainCache, historyResender ndc.HistoryResender, historyEngine engine.Engine, metricsClient metrics.Client, logger log.Logger, ) TaskExecutor
NewTaskExecutor creates an replication task executor The executor uses by 1) DLQ replication task handler 2) history replication task processor
type TaskFetcher ¶
type TaskFetcher interface {
common.Daemon
GetSourceCluster() string
GetRequestChan(shardID int) chan<- *request
GetRateLimiter() quotas.Limiter
}
TaskFetcher is responsible for fetching replication messages from remote DC.
type TaskFetchers ¶
type TaskFetchers interface {
common.Daemon
GetFetchers() []TaskFetcher
}
TaskFetchers is a group of fetchers, one per source DC.
type TaskHydrator ¶ added in v0.25.0
type TaskHydrator struct {
// contains filtered or unexported fields
}
TaskHydrator will enrich replication task with additional information from mutable state and history events. Mutable state and history providers can be either in-memory or persistence based implementations; depending whether we have available data already or need to load it.
func NewDeferredTaskHydrator ¶ added in v0.25.0
func NewDeferredTaskHydrator(shardID int, historyManager persistence.HistoryManager, executionCache execution.Cache, domains domainCache) TaskHydrator
NewDeferredTaskHydrator will enrich replication tasks with additional information that is not available on hand, but is rather loaded in a deferred way later from a database and cache.
func NewImmediateTaskHydrator ¶ added in v0.25.0
func NewImmediateTaskHydrator(isRunning bool, vh *persistence.VersionHistories, activities map[int64]*persistence.ActivityInfo, blob, nextBlob *persistence.DataBlob) TaskHydrator
NewImmediateTaskHydrator will enrich replication tasks with additional information that is immediately available.
func (TaskHydrator) Hydrate ¶ added in v0.25.0
func (h TaskHydrator) Hydrate(ctx context.Context, task persistence.Task) (retTask *types.ReplicationTask, retErr error)
Hydrate will enrich replication task with additional information from mutable state and history events.
type TaskProcessor ¶
TaskProcessor is responsible for processing replication tasks for a shard.
func NewTaskProcessor ¶
func NewTaskProcessor( shard shard.Context, historyEngine engine.Engine, config *config.Config, metricsClient metrics.Client, taskFetcher TaskFetcher, taskExecutor TaskExecutor, clock clock.TimeSource, ) TaskProcessor
NewTaskProcessor creates a new replication task processor.
type TaskReader ¶ added in v1.3.0
type TaskReader struct {
// contains filtered or unexported fields
}
TaskReader will read replication tasks from database
func NewTaskReader ¶ added in v1.3.0
func NewTaskReader(shardID int, executionManager persistence.ExecutionManager) *TaskReader
NewTaskReader creates new TaskReader
type TaskStore ¶ added in v0.25.0
type TaskStore struct {
// contains filtered or unexported fields
}
TaskStore is a component that hydrates and caches replication messages so that they can be reused across several polling source clusters. It also exposes public Put method. This allows pre-store already hydrated messages at the end of successful transaction, saving a DB call to fetch history events.
TaskStore uses a separate cache per each source cluster allowing messages to be fetched at different rates. Once a cache becomes full it will not accept further messages for that cluster. Later those messages be fetched from DB and hydrated again. A cache stores only a pointer to the message. It is hydrates once and shared across caches. Cluster acknowledging the message will remove it from that corresponding cache. Once all clusters acknowledge it, no more references will be held, and GC will eventually pick it up.
func NewTaskStore ¶ added in v0.25.0
func NewTaskStore( config *config.Config, clusterMetadata cluster.Metadata, domains domainCache, metricsClient metrics.Client, logger log.Logger, hydrator taskHydrator, budgetManager cache.Manager, shardID int, ) *TaskStore
NewTaskStore create new instance of TaskStore
func (*TaskStore) Ack ¶ added in v0.25.0
Ack will acknowledge replication message for a given cluster. This will result in all messages removed from the cache up to a given lastTaskID.
func (*TaskStore) Get ¶ added in v0.25.0
func (m *TaskStore) Get(ctx context.Context, cluster string, info persistence.Task) (*types.ReplicationTask, error)
Get will return a hydrated replication message for a given cluster based on raw task info. It will either return it immediately from cache or hydrate it, store in cache and then return.
Returned task may be nil. This may be due domain not existing in a given cluster or replication message is not longer relevant. Either case is valid and such replication message should be ignored and not returned in the response.
func (*TaskStore) Put ¶ added in v0.25.0
func (m *TaskStore) Put(task *types.ReplicationTask)
Put will try to store hydrated replication to all cluster caches. Tasks may not be relevant, as domain is not enabled in some clusters. Ignore task for that cluster. Some clusters may already have full cache. Ignore the task, it will be fetched and hydrated again later. Some clusters may have already acknowledged such task. Ignore task, it is no longer relevant for such cluster.