migration

package
v1.34.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2026 License: MIT Imports: 38 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	NamespaceTagName           = "namespace"
	ForceReplicationRpsTagName = "force_replication_rps"
)
View Source
var Module = fx.Options(
	fx.Provide(NewResult),
	fx.Provide(workflowVerifierProvider),
)

Functions

func ForceReplicationWorkflow

func ForceReplicationWorkflow(ctx workflow.Context, params ForceReplicationParams) error

func ForceReplicationWorkflowV2

func ForceReplicationWorkflowV2(ctx workflow.Context, params ForceReplicationParams) error

func NamespaceHandoverWorkflow

func NamespaceHandoverWorkflow(ctx workflow.Context, params NamespaceHandoverParams) (retErr error)

func NamespaceHandoverWorkflowV2

func NamespaceHandoverWorkflowV2(ctx workflow.Context, params NamespaceHandoverParams) (retErr error)

func NewResult

func NewResult(params initParams) fxResult

Types

type CatchUpOutput

type CatchUpOutput struct{}

func CatchupWorkflow

func CatchupWorkflow(ctx workflow.Context, params CatchUpParams) (CatchUpOutput, error)

type CatchUpParams

type CatchUpParams struct {
	Namespace      string
	CatchupCluster string
	TargetCluster  string
}

type ExecutionInfo

type ExecutionInfo struct {
	BusinessID  string `json:"business_id,omitempty"`
	RunID       string `json:"run_id,omitempty"`
	ArchetypeID uint32 `json:"archetype_id,omitempty"`
}

func (*ExecutionInfo) UnmarshalJSON

func (e *ExecutionInfo) UnmarshalJSON(data []byte) error

type ForceReplicationOutput

type ForceReplicationOutput struct {
}

type ForceReplicationParams

type ForceReplicationParams struct {
	Namespace               string `validate:"required"`
	Query                   string `validate:"required"` // query to list workflows for replication
	ConcurrentActivityCount int
	OverallRps              float64 // RPS for enqueuing of replication tasks
	GetParentInfoRPS        float64 // RPS for getting parent child info
	ListWorkflowsPageSize   int     // PageSize of ListWorkflow, will paginate through results.
	PageCountPerExecution   int     // number of pages to be processed before continue as new, max is 1000.
	NextPageToken           []byte  // used by continue as new

	// Used for verifying workflow executions were replicated successfully on target cluster.
	EnableVerification      bool
	TargetClusterEndpoint   string
	TargetClusterName       string
	VerifyIntervalInSeconds int `validate:"gte=0"`

	// Used by query handler to indicate overall progress of replication
	LastCloseTime                      time.Time
	LastStartTime                      time.Time
	ContinuedAsNewCount                int
	TaskQueueUserDataReplicationParams TaskQueueUserDataReplicationParams
	ReplicatedWorkflowCount            int64
	TotalForceReplicateWorkflowCount   int64
	ReplicatedWorkflowCountPerSecond   float64

	// Used to calculate QPS
	QPSQueue QPSQueue
	// Queue size is determined by Multiplier * Concurrency
	EstimationMultiplier int

	// Carry over the replication status after continue-as-new.
	TaskQueueUserDataReplicationStatus TaskQueueUserDataReplicationStatus
}

type ForceReplicationStatus

type ForceReplicationStatus struct {
	LastCloseTime                      time.Time
	LastStartTime                      time.Time
	TaskQueueUserDataReplicationStatus TaskQueueUserDataReplicationStatus
	ContinuedAsNewCount                int
	TotalWorkflowCount                 int64
	ReplicatedWorkflowCount            int64
	ReplicatedWorkflowCountPerSecond   float64
	PageTokenForRestart                []byte
}

type NamespaceHandoverParams

type NamespaceHandoverParams struct {
	Namespace     string
	RemoteCluster string

	// how far behind on replication is allowed for remote cluster before handover is initiated
	AllowedLaggingSeconds int
	AllowedLaggingTasks   int64

	// how long to wait for handover to complete before rollback
	HandoverTimeoutSeconds int
}

type QPSData

type QPSData struct {
	Count     int64
	Timestamp time.Time
}

type QPSQueue

type QPSQueue struct {
	MaxSize int
	Data    []QPSData
}

func NewQPSQueue

func NewQPSQueue(concurrentActivityCount int, estimationMultiplier int) QPSQueue

NewQPSQueue initializes a QPSQueue to collect data points for each workflow execution. The queue size is set to concurrency + 1 to account for up to 'concurrency' activities running simultaneously and the initial starting point.

func (*QPSQueue) CalculateQPS

func (q *QPSQueue) CalculateQPS() float64

func (*QPSQueue) Enqueue

func (q *QPSQueue) Enqueue(ctx workflow.Context, count int64)

type TaskQueueUserDataReplicationParams

type TaskQueueUserDataReplicationParams struct {
	// PageSize for the SeedReplicationQueueWithUserDataEntries activity
	PageSize int
	// RPS limits the number of task queue user data entries pages requested per second.
	RPS float64
}

type TaskQueueUserDataReplicationParamsWithNamespace

type TaskQueueUserDataReplicationParamsWithNamespace struct {
	TaskQueueUserDataReplicationParams
	// Namespace name
	Namespace string
}

TaskQueueUserDataReplicationParamsWithNamespace is used for child workflow / activity input

type TaskQueueUserDataReplicationStatus

type TaskQueueUserDataReplicationStatus struct {
	Done           bool
	FailureMessage string
}

type WorkflowVerifier

type WorkflowVerifier func(
	ctx context.Context,
	request *verifyReplicationTasksRequest,
	remoteAdminClient adminservice.AdminServiceClient,
	localAdminClient adminservice.AdminServiceClient,
	ns *namespace.Namespace,
	execution *ExecutionInfo,
	mu *adminservice.DescribeMutableStateResponse,
) (verifyResult, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL