replication

package
v1.3.7-prerelease21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 2, 2025 License: Apache-2.0 Imports: 36 Imported by: 0

Documentation

Overview

Package replication is a generated GoMock package.

Package replication is a generated GoMock package.

Index

Constants

This section is empty.

Variables

View Source
var ErrUnknownCluster = errors.New("unknown cluster")

ErrUnknownCluster is returned when given cluster is not defined in cluster metadata

View Source
var (
	// ErrUnknownReplicationTask is the error to indicate unknown replication task type
	ErrUnknownReplicationTask = &types.BadRequestError{Message: "unknown replication task"}
)

Functions

This section is empty.

Types

type DLQHandler

type DLQHandler interface {
	common.Daemon

	GetMessageCount(
		ctx context.Context,
		forceFetch bool,
	) (map[string]int64, error)
	ReadMessages(
		ctx context.Context,
		sourceCluster string,
		lastMessageID int64,
		pageSize int,
		pageToken []byte,
	) ([]*types.ReplicationTask, []*types.ReplicationTaskInfo, []byte, error)
	PurgeMessages(
		ctx context.Context,
		sourceCluster string,
		lastMessageID int64,
	) error
	MergeMessages(
		ctx context.Context,
		sourceCluster string,
		lastMessageID int64,
		pageSize int,
		pageToken []byte,
	) ([]byte, error)
}

DLQHandler is the interface handles replication DLQ messages

func NewDLQHandler

func NewDLQHandler(
	shard shard.Context,
	taskExecutors map[string]TaskExecutor,
) DLQHandler

NewDLQHandler initialize the replication message DLQ handler

type DynamicTaskBatchSizer added in v1.3.0

type DynamicTaskBatchSizer interface {
	// contains filtered or unexported methods
}

DynamicTaskBatchSizer is responsible for the batch size used to retrieve ReplicationTasks by TaskAckManager It adjusts the task batch size based on the error and the getTasksResult, and use the following rules:

  1. If there is an error, decrease the task batch size. In case of an increased load to the database, we should reduce the load to the database Emitted metric with tag reason:"error"

  2. If the task batch size is shrunk, decrease the task batch size. The payload size of messages is too big, so we should decrease the number of messages to be sure that future messages will not be shrunk Emitted metric with tag reason:"shrunk"

  3. If the read level of a passive cluster has not been changed and there are no fetched tasks, not change the task batch size. There is no need to change because the replication is not stuck, there just are no new tasks Metric is not emitted

  4. If the read level of a passive cluster has not been changed and if there are fetched tasks, and number of previously fetched tasks is not zero, decrease the task batch size. The replication is stuck on the passive side Emitted metric with tag reason:"possible_stuck"

  5. If the read level of a passive cluster has not been changed and if there are fetched tasks, and number of previously fetched tasks is zero, not change the task batch size. The replication is not stuck, and there are new tasks to be replicated Metric is not emitted

  6. If the read level of a passive cluster has been changed and if there are more tasks in db, increase the task batch size. We should retrieve the maximum possible value at the next time, as there are more tasks to be replicated Emitted metric with tag reason:"more_tasks"

  7. If the read level of a passive cluster has been changed and if there are no more tasks in db, not change the size. The existing size is already enough, and there are no more tasks to be replicated Metric is not emitted

func NewDynamicTaskBatchSizer added in v1.3.0

func NewDynamicTaskBatchSizer(shardID int, logger log.Logger, config *config.Config, metricsClient metrics.Client) DynamicTaskBatchSizer

NewDynamicTaskBatchSizer creates a new dynamicTaskBatchSizerImpl

type MetricsEmitterImpl added in v0.25.0

type MetricsEmitterImpl struct {
	// contains filtered or unexported fields
}

MetricsEmitterImpl is responsible for emitting source side replication metrics occasionally.

func NewMetricsEmitter added in v0.25.0

func NewMetricsEmitter(
	shardID int,
	shardData metricsEmitterShardData,
	reader taskReader,
	metricsClient metrics.Client,
) *MetricsEmitterImpl

NewMetricsEmitter creates a new metrics emitter, which starts a goroutine to emit replication metrics occasionally.

func (*MetricsEmitterImpl) Start added in v0.25.0

func (m *MetricsEmitterImpl) Start()

func (*MetricsEmitterImpl) Stop added in v0.25.0

func (m *MetricsEmitterImpl) Stop()

type MockTaskExecutor

type MockTaskExecutor struct {
	// contains filtered or unexported fields
}

MockTaskExecutor is a mock of TaskExecutor interface.

func NewMockTaskExecutor

func NewMockTaskExecutor(ctrl *gomock.Controller) *MockTaskExecutor

NewMockTaskExecutor creates a new mock instance.

func (*MockTaskExecutor) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

type MockTaskExecutorMockRecorder

type MockTaskExecutorMockRecorder struct {
	// contains filtered or unexported fields
}

MockTaskExecutorMockRecorder is the mock recorder for MockTaskExecutor.

type MockTaskFetcher

type MockTaskFetcher struct {
	// contains filtered or unexported fields
}

MockTaskFetcher is a mock of TaskFetcher interface.

func NewMockTaskFetcher

func NewMockTaskFetcher(ctrl *gomock.Controller) *MockTaskFetcher

NewMockTaskFetcher creates a new mock instance.

func (*MockTaskFetcher) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockTaskFetcher) GetRateLimiter added in v0.14.0

func (m *MockTaskFetcher) GetRateLimiter() quotas.Limiter

GetRateLimiter mocks base method.

func (*MockTaskFetcher) GetRequestChan

func (m *MockTaskFetcher) GetRequestChan(shardID int) chan<- *request

GetRequestChan mocks base method.

func (*MockTaskFetcher) GetSourceCluster

func (m *MockTaskFetcher) GetSourceCluster() string

GetSourceCluster mocks base method.

func (*MockTaskFetcher) Start

func (m *MockTaskFetcher) Start()

Start mocks base method.

func (*MockTaskFetcher) Stop

func (m *MockTaskFetcher) Stop()

Stop mocks base method.

type MockTaskFetcherMockRecorder

type MockTaskFetcherMockRecorder struct {
	// contains filtered or unexported fields
}

MockTaskFetcherMockRecorder is the mock recorder for MockTaskFetcher.

func (*MockTaskFetcherMockRecorder) GetRateLimiter added in v0.14.0

func (mr *MockTaskFetcherMockRecorder) GetRateLimiter() *gomock.Call

GetRateLimiter indicates an expected call of GetRateLimiter.

func (*MockTaskFetcherMockRecorder) GetRequestChan

func (mr *MockTaskFetcherMockRecorder) GetRequestChan(shardID any) *gomock.Call

GetRequestChan indicates an expected call of GetRequestChan.

func (*MockTaskFetcherMockRecorder) GetSourceCluster

func (mr *MockTaskFetcherMockRecorder) GetSourceCluster() *gomock.Call

GetSourceCluster indicates an expected call of GetSourceCluster.

func (*MockTaskFetcherMockRecorder) Start

Start indicates an expected call of Start.

func (*MockTaskFetcherMockRecorder) Stop

Stop indicates an expected call of Stop.

type MockTaskFetchers

type MockTaskFetchers struct {
	// contains filtered or unexported fields
}

MockTaskFetchers is a mock of TaskFetchers interface.

func NewMockTaskFetchers

func NewMockTaskFetchers(ctrl *gomock.Controller) *MockTaskFetchers

NewMockTaskFetchers creates a new mock instance.

func (*MockTaskFetchers) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockTaskFetchers) GetFetchers

func (m *MockTaskFetchers) GetFetchers() []TaskFetcher

GetFetchers mocks base method.

func (*MockTaskFetchers) Start

func (m *MockTaskFetchers) Start()

Start mocks base method.

func (*MockTaskFetchers) Stop

func (m *MockTaskFetchers) Stop()

Stop mocks base method.

type MockTaskFetchersMockRecorder

type MockTaskFetchersMockRecorder struct {
	// contains filtered or unexported fields
}

MockTaskFetchersMockRecorder is the mock recorder for MockTaskFetchers.

func (*MockTaskFetchersMockRecorder) GetFetchers

func (mr *MockTaskFetchersMockRecorder) GetFetchers() *gomock.Call

GetFetchers indicates an expected call of GetFetchers.

func (*MockTaskFetchersMockRecorder) Start

Start indicates an expected call of Start.

func (*MockTaskFetchersMockRecorder) Stop

Stop indicates an expected call of Stop.

type TaskAckManager added in v0.15.0

type TaskAckManager struct {
	// contains filtered or unexported fields
}

TaskAckManager is the ack manager for replication tasks

func NewTaskAckManager added in v0.15.0

func NewTaskAckManager(
	shardID int,
	ackLevels ackLevelStore,
	metricsClient metrics.Client,
	logger log.Logger,
	reader taskReader,
	store *TaskStore,
	timeSource clock.TimeSource,
	config *config.Config,
	replicationMessagesSizeFn types.ReplicationMessagesSizeFn,
	dynamicTaskBatchSizer DynamicTaskBatchSizer,
) TaskAckManager

NewTaskAckManager initializes a new replication task ack manager

func (*TaskAckManager) GetTasks added in v0.15.0

func (t *TaskAckManager) GetTasks(ctx context.Context, pollingCluster string, lastReadTaskID int64) (_ *types.ReplicationMessages, err error)

type TaskExecutor

type TaskExecutor interface {
	// contains filtered or unexported methods
}

TaskExecutor is the executor for replication task

func NewTaskExecutor

func NewTaskExecutor(
	sourceCluster string,
	shard shard.Context,
	domainCache cache.DomainCache,
	historyResender ndc.HistoryResender,
	historyEngine engine.Engine,
	metricsClient metrics.Client,
	logger log.Logger,
) TaskExecutor

NewTaskExecutor creates an replication task executor The executor uses by 1) DLQ replication task handler 2) history replication task processor

type TaskFetcher

type TaskFetcher interface {
	common.Daemon

	GetSourceCluster() string
	GetRequestChan(shardID int) chan<- *request
	GetRateLimiter() quotas.Limiter
}

TaskFetcher is responsible for fetching replication messages from remote DC.

type TaskFetchers

type TaskFetchers interface {
	common.Daemon

	GetFetchers() []TaskFetcher
}

TaskFetchers is a group of fetchers, one per source DC.

func NewTaskFetchers

func NewTaskFetchers(
	logger log.Logger,
	config *config.Config,
	clusterMetadata cluster.Metadata,
	clientBean client.Bean,
	metricsClient metrics.Client,
) (TaskFetchers, error)

NewTaskFetchers creates an instance of ReplicationTaskFetchers with given configs.

type TaskHydrator added in v0.25.0

type TaskHydrator struct {
	// contains filtered or unexported fields
}

TaskHydrator will enrich replication task with additional information from mutable state and history events. Mutable state and history providers can be either in-memory or persistence based implementations; depending whether we have available data already or need to load it.

func NewDeferredTaskHydrator added in v0.25.0

func NewDeferredTaskHydrator(shardID int, historyManager persistence.HistoryManager, executionCache execution.Cache, domains domainCache) TaskHydrator

NewDeferredTaskHydrator will enrich replication tasks with additional information that is not available on hand, but is rather loaded in a deferred way later from a database and cache.

func NewImmediateTaskHydrator added in v0.25.0

func NewImmediateTaskHydrator(isRunning bool, vh *persistence.VersionHistories, activities map[int64]*persistence.ActivityInfo, blob, nextBlob *persistence.DataBlob) TaskHydrator

NewImmediateTaskHydrator will enrich replication tasks with additional information that is immediately available.

func (TaskHydrator) Hydrate added in v0.25.0

func (h TaskHydrator) Hydrate(ctx context.Context, task persistence.Task) (retTask *types.ReplicationTask, retErr error)

Hydrate will enrich replication task with additional information from mutable state and history events.

type TaskProcessor

type TaskProcessor interface {
	common.Daemon
}

TaskProcessor is responsible for processing replication tasks for a shard.

func NewTaskProcessor

func NewTaskProcessor(
	shard shard.Context,
	historyEngine engine.Engine,
	config *config.Config,
	metricsClient metrics.Client,
	taskFetcher TaskFetcher,
	taskExecutor TaskExecutor,
	clock clock.TimeSource,
) TaskProcessor

NewTaskProcessor creates a new replication task processor.

type TaskReader added in v1.3.0

type TaskReader struct {
	// contains filtered or unexported fields
}

TaskReader will read replication tasks from database

func NewTaskReader added in v1.3.0

func NewTaskReader(shardID int, executionManager persistence.ExecutionManager) *TaskReader

NewTaskReader creates new TaskReader

func (*TaskReader) Read added in v1.3.0

func (r *TaskReader) Read(ctx context.Context, readLevel int64, maxReadLevel int64, batchSize int) ([]persistence.Task, bool, error)

Read reads and returns replications tasks from readLevel to maxReadLevel

type TaskStore added in v0.25.0

type TaskStore struct {
	// contains filtered or unexported fields
}

TaskStore is a component that hydrates and caches replication messages so that they can be reused across several polling source clusters. It also exposes public Put method. This allows pre-store already hydrated messages at the end of successful transaction, saving a DB call to fetch history events.

TaskStore uses a separate cache per each source cluster allowing messages to be fetched at different rates. Once a cache becomes full it will not accept further messages for that cluster. Later those messages be fetched from DB and hydrated again. A cache stores only a pointer to the message. It is hydrates once and shared across caches. Cluster acknowledging the message will remove it from that corresponding cache. Once all clusters acknowledge it, no more references will be held, and GC will eventually pick it up.

func NewTaskStore added in v0.25.0

func NewTaskStore(
	config *config.Config,
	clusterMetadata cluster.Metadata,
	domains domainCache,
	metricsClient metrics.Client,
	logger log.Logger,
	hydrator taskHydrator,
	budgetManager cache.Manager,
	shardID int,
) *TaskStore

NewTaskStore create new instance of TaskStore

func (*TaskStore) Ack added in v0.25.0

func (m *TaskStore) Ack(cluster string, lastTaskID int64) error

Ack will acknowledge replication message for a given cluster. This will result in all messages removed from the cache up to a given lastTaskID.

func (*TaskStore) Get added in v0.25.0

func (m *TaskStore) Get(ctx context.Context, cluster string, info persistence.Task) (*types.ReplicationTask, error)

Get will return a hydrated replication message for a given cluster based on raw task info. It will either return it immediately from cache or hydrate it, store in cache and then return.

Returned task may be nil. This may be due domain not existing in a given cluster or replication message is not longer relevant. Either case is valid and such replication message should be ignored and not returned in the response.

func (*TaskStore) Put added in v0.25.0

func (m *TaskStore) Put(task *types.ReplicationTask)

Put will try to store hydrated replication to all cluster caches. Tasks may not be relevant, as domain is not enabled in some clusters. Ignore task for that cluster. Some clusters may already have full cache. Ignore the task, it will be fetched and hydrated again later. Some clusters may have already acknowledged such task. Ignore task, it is no longer relevant for such cluster.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL