workerdeployment

package
v1.34.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2026 License: MIT Imports: 52 Imported by: 0

Documentation

Index

Constants

View Source
// Names and message formats shared by the Worker Deployment workflow and the
// Worker Deployment Version workflow: workflow type names, the namespace
// division, update/signal/query names, the memo field name, and error message
// format strings.
const (
	// Workflow types
	WorkerDeploymentVersionWorkflowType = "temporal-sys-worker-deployment-version-workflow"
	WorkerDeploymentWorkflowType        = "temporal-sys-worker-deployment-workflow"

	// Namespace division
	WorkerDeploymentNamespaceDivision = "TemporalWorkerDeployment"

	// Updates
	RegisterWorkerInDeploymentVersion = "register-task-queue-worker"    // for Worker Deployment Version workflows
	SyncVersionState                  = "sync-version-state"            // for Worker Deployment Version workflows
	UpdateVersionMetadata             = "update-version-metadata"       // for Worker Deployment Version workflows
	RegisterWorkerInWorkerDeployment  = "register-worker-in-deployment" // for Worker Deployment workflows
	SetCurrentVersion                 = "set-current-version"           // for Worker Deployment workflows
	SetRampingVersion                 = "set-ramping-version"           // for Worker Deployment workflows
	DeleteVersion                     = "delete-version"                // for Worker Deployment workflows
	DeleteDeployment                  = "delete-deployment"             // for Worker Deployment workflows
	SetManagerIdentity                = "set-manager-identity"          // for Worker Deployment workflows

	// Signals
	ForceCANSignalName          = "force-continue-as-new" // for both Worker Deployment Version and Worker Deployment workflows
	SyncDrainageSignalName      = "sync-drainage-status"
	TerminateDrainageSignal     = "terminate-drainage"
	SyncVersionSummarySignal    = "sync-version-summary"
	PropagationCompleteSignal   = "propagation-complete"
	ReactivateVersionSignalName = "reactivate-version" // for Worker Deployment Version workflows

	// Queries
	QueryDescribeVersion    = "describe-version"    // for Worker Deployment Version workflows
	QueryDescribeDeployment = "describe-deployment" // for Worker Deployment workflows

	// Memos
	WorkerDeploymentMemoField = "WorkerDeploymentMemo" // for Worker Deployment workflows

	// Error message format strings for version deletion. Each takes one %s
	// argument, which the message text refers to as the version.
	ErrVersionIsDraining         = "version '%s' cannot be deleted since it is draining"
	ErrVersionHasPollers         = "version '%s' cannot be deleted since it has active pollers"
	ErrVersionIsCurrentOrRamping = "version '%s' cannot be deleted since it is current or ramping"

	// Further error message format strings. The first three literals below are
	// elided by the documentation renderer (see the inline notes), so their
	// real text and format verbs are not visible here.
	ErrRampingVersionDoesNotHaveAllTaskQueues = "" /* 152-byte string literal not displayed */
	ErrCurrentVersionDoesNotHaveAllTaskQueues = "" /* 152-byte string literal not displayed */
	ErrManagerIdentityMismatch                = "" /* 190-byte string literal not displayed */
	ErrWorkerDeploymentNotFound               = "no Worker Deployment found with name '%s'; does your Worker Deployment have pollers?"
	ErrWorkerDeploymentVersionNotFound        = "build ID '%s' not found in Worker Deployment '%s'"
	ErrTooManyRequests                        = "too many requests issued to Worker Deployment '%s'. Please try again later"
)
View Source
// SlowPropagationDelay is a fixed duration of ten seconds.
// NOTE(review): presumably the delay applied when propagation is considered
// slow; its use site is not visible here, so confirm before relying on this.
const SlowPropagationDelay = 10 * time.Second

Variables

Functions

func GenerateDeploymentWorkflowID

func GenerateDeploymentWorkflowID(deploymentName string) string

GenerateDeploymentWorkflowID is a helper that generates a system-accepted workflow ID for use in our Worker Deployment workflows.

func GenerateVersionWorkflowID

func GenerateVersionWorkflowID(deploymentName string, buildID string) string

GenerateVersionWorkflowID is a helper that generates a system-accepted workflow ID for use in our Worker Deployment Version workflows.

func GetDeploymentNameFromWorkflowID

func GetDeploymentNameFromWorkflowID(workflowID string) string

func NewResult

func NewResult(
	dc *dynamicconfig.Collection,
	params activityDeps,
) fxResult

func VersionWorkflow

func VersionWorkflow(
	ctx workflow.Context,
	unsafeWorkflowVersionGetter func() DeploymentWorkflowVersion,
	unsafeRefreshIntervalGetter func() time.Duration,
	unsafeVisibilityGracePeriodGetter func() time.Duration,
	versionWorkflowArgs *deploymentspb.WorkerDeploymentVersionWorkflowArgs,
) error

VersionWorkflow is implemented so that it always continues-as-new (CaN) after some history events have been added to it and it has no pending work to do. This keeps the history clean, so that there is less concern about backward and forward compatibility. In steady state (i.e., in the absence of ongoing updates or signals), the workflow should have only a single workflow task (wft) in its history. For draining versions, the workflow history should contain only a single wft, followed by a scheduled timer for refreshing drainage info.

func Workflow

func Workflow(ctx workflow.Context, unsafeWorkflowVersionGetter func() DeploymentWorkflowVersion, unsafeMaxVersion func() int, args *deploymentspb.WorkerDeploymentWorkflowArgs) error

Workflow is implemented so that it always continues-as-new (CaN) after some history events have been added to it and it has no pending work to do. This keeps the history clean, so that there is less concern about backward and forward compatibility. In steady state (i.e., in the absence of ongoing updates or signals), the workflow should have only a single workflow task (wft) in its history.

Types

type Activities

type Activities struct {
	// contains filtered or unexported fields
}

func (*Activities) CheckUnversionedRampUserDataPropagation

func (a *Activities) CheckUnversionedRampUserDataPropagation(ctx context.Context, input *deploymentspb.CheckWorkerDeploymentUserDataPropagationRequest) error

func (*Activities) DeleteWorkerDeploymentVersion

func (a *Activities) DeleteWorkerDeploymentVersion(ctx context.Context, args *deploymentspb.DeleteVersionActivityArgs) error

func (*Activities) RegisterWorkerInVersion

func (a *Activities) RegisterWorkerInVersion(ctx context.Context, args *deploymentspb.RegisterWorkerInVersionArgs) error

func (*Activities) StartWorkerDeploymentVersionWorkflow

func (a *Activities) StartWorkerDeploymentVersionWorkflow(
	ctx context.Context,
	input *deploymentspb.StartWorkerDeploymentVersionRequest,
) error

type Client

// Client is the interface through which callers perform Worker Deployment and
// Worker Deployment Version operations. ClientImpl implements it. Every method
// takes a context and the namespace entry the operation applies to.
type Client interface {
	RegisterTaskQueueWorker(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		deploymentName, buildId string,
		taskQueueName string,
		taskQueueType enumspb.TaskQueueType,
		identity string,
	) error

	// DescribeVersion returns the version info together with a list of
	// per-version task queue entries.
	DescribeVersion(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		version string,
		reportTaskQueueStats bool,
	) (*deploymentpb.WorkerDeploymentVersionInfo, []*workflowservice.DescribeWorkerDeploymentVersionResponse_VersionTaskQueue, error)

	// DescribeWorkerDeployment returns the deployment info. The []byte result
	// is the conflict token (ClientImpl names this result conflictToken).
	DescribeWorkerDeployment(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		deploymentName string,
	) (*deploymentpb.WorkerDeploymentInfo, []byte, error)

	SetCurrentVersion(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		deploymentName string,
		version string,
		identity string,
		ignoreMissingTaskQueues bool,
		conflictToken []byte,
		allowNoPollers bool,
	) (*deploymentspb.SetCurrentVersionResponse, error)

	// ListWorkerDeployments returns one page of deployment summaries.
	// NOTE(review): the []byte result is presumably the token to pass as
	// nextPageToken for the following page — confirm against the implementation.
	ListWorkerDeployments(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		pageSize int,
		nextPageToken []byte,
	) ([]*deploymentspb.WorkerDeploymentSummary, []byte, error)

	DeleteWorkerDeploymentVersion(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		version string,
		skipDrainage bool,
		identity string,
	) error

	DeleteWorkerDeployment(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		deploymentName string,
		identity string,
	) error

	SetRampingVersion(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		deploymentName string,
		version string,
		percentage float32,
		identity string,
		ignoreMissingTaskQueues bool,
		conflictToken []byte,
		allowNoPollers bool,
	) (*deploymentspb.SetRampingVersionResponse, error)

	// UpdateVersionMetadata takes entries to upsert and keys to remove, and
	// returns a VersionMetadata value.
	UpdateVersionMetadata(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		version *deploymentpb.WorkerDeploymentVersion,
		upsertEntries map[string]*commonpb.Payload,
		removeEntries []string,
		identity string,
	) (*deploymentpb.VersionMetadata, error)

	SetManager(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		request *workflowservice.SetWorkerDeploymentManagerRequest,
	) (*workflowservice.SetWorkerDeploymentManagerResponse, error)

	// Used internally by the Worker Deployment Version workflow in its StartWorkerDeployment Activity
	//
	// Deprecated: this method is marked deprecated; no replacement is named in this declaration.
	StartWorkerDeployment(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		deploymentName string,
		identity string,
		requestID string,
	) error

	// Used internally by the Worker Deployment workflow in its StartWorkerDeploymentVersion Activity
	StartWorkerDeploymentVersion(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		deploymentName, buildID string,
		identity string,
		requestID string,
	) error

	// Used internally by the Worker Deployment workflow in its SyncWorkerDeploymentVersion Activity
	SyncVersionWorkflowFromWorkerDeployment(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		deploymentName, version string,
		args *deploymentspb.SyncVersionStateUpdateArgs,
		identity string,
		requestID string,
	) (*deploymentspb.SyncVersionStateResponse, error)

	// Used internally by the Drainage workflow (child of Worker Deployment Version workflow)
	// in its GetVersionDrainageStatus Activity
	GetVersionDrainageStatus(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		version string) (enumspb.VersionDrainageStatus, error)

	// Used internally by the Worker Deployment workflow in its IsVersionMissingTaskQueues Activity
	// to verify if there are missing task queues in the new current/ramping version.
	IsVersionMissingTaskQueues(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		prevCurrentVersion, newVersion string,
	) (bool, error)

	// Used internally by the Worker Deployment workflow in its RegisterWorkerInVersion Activity
	// to register a task-queue worker in a version.
	RegisterWorkerInVersion(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		args *deploymentspb.RegisterWorkerInVersionArgs,
		identity string,
	) error

	// SignalVersionReactivation sends a reactivation signal to a version workflow.
	// Used when workflows are pinned to a potentially DRAINED/INACTIVE version.
	// Callers may treat this as a fire-and-forget operation: errors are logged,
	// and are also returned so that the caller can decide how to handle them.
	SignalVersionReactivation(
		ctx context.Context,
		namespaceEntry *namespace.Namespace,
		deploymentName, buildID string,
	) error
}

func ClientProvider

func ClientProvider(
	logger log.Logger,
	historyClient resource.HistoryClient,
	matchingClient resource.MatchingClient,
	visibilityManager manager.VisibilityManager,
	dc *dynamicconfig.Collection,
	testHooks testhooks.TestHooks,
	metricsHandler metrics.Handler,
) Client

type ClientImpl

type ClientImpl struct {
	// contains filtered or unexported fields
}

ClientImpl implements Client

func (*ClientImpl) DeleteWorkerDeployment

func (d *ClientImpl) DeleteWorkerDeployment(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	deploymentName string,
	identity string,
) (retErr error)

func (*ClientImpl) DeleteWorkerDeploymentVersion

func (d *ClientImpl) DeleteWorkerDeploymentVersion(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	version string,
	skipDrainage bool,
	identity string,
) (retErr error)

func (*ClientImpl) DescribeVersion

func (d *ClientImpl) DescribeVersion(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	version string,
	reportTaskQueueStats bool,
) (
	_ *deploymentpb.WorkerDeploymentVersionInfo,
	_ []*workflowservice.DescribeWorkerDeploymentVersionResponse_VersionTaskQueue,
	retErr error,
)

func (*ClientImpl) DescribeWorkerDeployment

func (d *ClientImpl) DescribeWorkerDeployment(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	deploymentName string,
) (_ *deploymentpb.WorkerDeploymentInfo, conflictToken []byte, retErr error)

func (*ClientImpl) GetVersionDrainageStatus

func (d *ClientImpl) GetVersionDrainageStatus(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	version string) (enumspb.VersionDrainageStatus, error)

func (*ClientImpl) IsVersionMissingTaskQueues

func (d *ClientImpl) IsVersionMissingTaskQueues(ctx context.Context, namespaceEntry *namespace.Namespace, prevCurrentVersion, newVersion string) (bool, error)

func (*ClientImpl) ListWorkerDeployments

func (d *ClientImpl) ListWorkerDeployments(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	pageSize int,
	nextPageToken []byte,
) (_ []*deploymentspb.WorkerDeploymentSummary, _ []byte, retError error)

func (*ClientImpl) RegisterTaskQueueWorker

func (d *ClientImpl) RegisterTaskQueueWorker(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	deploymentName, buildId string,
	taskQueueName string,
	taskQueueType enumspb.TaskQueueType,
	identity string,
) (retErr error)

func (*ClientImpl) RegisterWorkerInVersion

func (d *ClientImpl) RegisterWorkerInVersion(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	args *deploymentspb.RegisterWorkerInVersionArgs,
	identity string,
) error

func (*ClientImpl) SetCurrentVersion

func (d *ClientImpl) SetCurrentVersion(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	deploymentName string,
	version string,
	identity string,
	ignoreMissingTaskQueues bool,
	conflictToken []byte,
	allowNoPollers bool,
) (_ *deploymentspb.SetCurrentVersionResponse, retErr error)

func (*ClientImpl) SetRampingVersion

func (d *ClientImpl) SetRampingVersion(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	deploymentName string,
	version string,
	percentage float32,
	identity string,
	ignoreMissingTaskQueues bool,
	conflictToken []byte,
	allowNoPollers bool,
) (_ *deploymentspb.SetRampingVersionResponse, retErr error)

func (*ClientImpl) SignalVersionReactivation

func (d *ClientImpl) SignalVersionReactivation(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	deploymentName, buildID string,
) (retErr error)

func (*ClientImpl) StartWorkerDeployment

func (d *ClientImpl) StartWorkerDeployment(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	deploymentName string,
	identity string,
	requestID string,
) (retErr error)

func (*ClientImpl) StartWorkerDeploymentVersion

func (d *ClientImpl) StartWorkerDeploymentVersion(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	deploymentName, buildID string,
	identity string,
	requestID string,
) (retErr error)

func (*ClientImpl) SyncVersionWorkflowFromWorkerDeployment

func (d *ClientImpl) SyncVersionWorkflowFromWorkerDeployment(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	deploymentName, version string,
	args *deploymentspb.SyncVersionStateUpdateArgs,
	identity string,
	requestID string,
) (_ *deploymentspb.SyncVersionStateResponse, retErr error)

func (*ClientImpl) UpdateVersionMetadata

func (d *ClientImpl) UpdateVersionMetadata(
	ctx context.Context,
	namespaceEntry *namespace.Namespace,
	version *deploymentpb.WorkerDeploymentVersion,
	upsertEntries map[string]*commonpb.Payload,
	removeEntries []string,
	identity string,
) (_ *deploymentpb.VersionMetadata, retErr error)

type DeploymentWorkflowVersion

// DeploymentWorkflowVersion is an int64-based enumeration. It is the type
// returned by the unsafeWorkflowVersionGetter functions passed to Workflow and
// VersionWorkflow. The constants below are numbered with iota in declaration
// order.
// NOTE(review): this listing may omit unexported members of the group, so do
// not derive numeric values from it without checking the source file.
type DeploymentWorkflowVersion int64
const (

	// Represents the state before the versioning APIs received the option of becoming async in nature
	InitialVersion DeploymentWorkflowVersion = iota
	// SetCurrent, SetRamping, and DeleteVersion APIs are async
	AsyncSetCurrentAndRamping
	// Version Data has its own revision number, and TaskQueue registration is async as well
	VersionDataRevisionNumber
)

type ErrRegister

type ErrRegister struct {
	// contains filtered or unexported fields
}

type SignalHandler

type SignalHandler struct {
	// contains filtered or unexported fields
}

SignalHandler encapsulates the signal handling logic

type VersionActivities

type VersionActivities struct {
	// contains filtered or unexported fields
}

func (*VersionActivities) CheckIfTaskQueuesHavePollers

func (a *VersionActivities) CheckIfTaskQueuesHavePollers(ctx context.Context, args *deploymentspb.CheckTaskQueuesHavePollersActivityArgs) (bool, error)

CheckIfTaskQueuesHavePollers returns true if any of the given task queues has any pollers

func (*VersionActivities) CheckWorkerDeploymentUserDataPropagation

func (a *VersionActivities) CheckWorkerDeploymentUserDataPropagation(ctx context.Context, input *deploymentspb.CheckWorkerDeploymentUserDataPropagationRequest) error

func (*VersionActivities) GetVersionDrainageStatus

func (*VersionActivities) StartWorkerDeploymentWorkflow

func (a *VersionActivities) StartWorkerDeploymentWorkflow(
	ctx context.Context,
	input *deploymentspb.StartWorkerDeploymentRequest,
) error

type VersionWorkflowRunner

type VersionWorkflowRunner struct {
	*deploymentspb.WorkerDeploymentVersionWorkflowArgs
	// contains filtered or unexported fields
}

VersionWorkflowRunner holds the local state for a Worker Deployment Version workflow

type WorkflowRunner

type WorkflowRunner struct {
	*deploymentspb.WorkerDeploymentWorkflowArgs
	// contains filtered or unexported fields
}

WorkflowRunner holds the local state while running a Worker Deployment workflow

Directories

Path Synopsis
worker command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL