worker_versioning

package
v1.35.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2026 License: MIT Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BuildIdSearchAttributePrefixPinned = "pinned"

	BuildIdSearchAttributeDelimiter = ":"
	BuildIdSearchAttributeEscape    = "|"
	// UnversionedSearchAttribute is the sentinel value used to mark all unversioned workflows
	UnversionedSearchAttribute = buildIdSearchAttributePrefixUnversioned
	UnversionedVersionId       = "__unversioned__"

	// ErrPinnedVersionNotInTaskQueueSubstring is the key substring used to identify
	// when a pinned version is not present in a task queue. This is used for error
	// classification in batch operations.
	ErrPinnedVersionNotInTaskQueueSubstring = "is not present in task queue"

	// WorkerDeploymentVersionIdDelimiterV31 will be deleted once we stop supporting v31 version string fields
	// in external and internal APIs. Until then, both delimiters are banned in deployment name. All
	// deprecated version string fields in APIs keep using the old delimiter. Workflow SA uses new delimiter.
	WorkerDeploymentVersionIDDelimiterV31   = "."
	WorkerDeploymentVersionDelimiter        = ":"
	WorkerDeploymentVersionWorkflowIDEscape = "|"

	// Prefixes, Delimeters and Keys that are used in the internal entity workflows backing worker-versioning
	WorkerDeploymentWorkflowIDPrefix             = "temporal-sys-worker-deployment"
	WorkerDeploymentVersionWorkflowIDPrefix      = "temporal-sys-worker-deployment-version"
	WorkerDeploymentVersionWorkflowIDInitialSize = len(WorkerDeploymentVersionWorkflowIDPrefix) + len(WorkerDeploymentVersionDelimiter) // 39
	WorkerDeploymentNameFieldName                = "WorkerDeploymentName"
	WorkerDeploymentBuildIDFieldName             = "BuildID"
)

Variables

This section is empty.

Functions

func AddV31VersioningInfoToV32

We store versioning info in the modern v0.32 format, so call this before returning the object to readers to mutatively populate the missing fields.

func AssignedBuildIdSearchAttribute

func AssignedBuildIdSearchAttribute(buildId string) string

AssignedBuildIdSearchAttribute returns the search attribute value for the currently assigned build ID

func BuildIDToStringV32

func BuildIDToStringV32(deploymentName, buildID string) string

func BuildIdFromCapabilities

func BuildIdFromCapabilities(capabilities *commonpb.WorkerVersionCapabilities, options *deploymentpb.WorkerDeploymentOptions) string

func BuildIdIfUsingVersioning

func BuildIdIfUsingVersioning(stamp *commonpb.WorkerVersionStamp) string

BuildIdIfUsingVersioning returns the given WorkerVersionStamp if it is using versioning, otherwise returns nil.

func CalculateTaskQueueVersioningInfo

CalculateTaskQueueVersioningInfo calculates the current and ramping versioning info for a task queue.

func CleanupOldDeletedVersions

func CleanupOldDeletedVersions(deploymentData *persistencespb.WorkerDeploymentData, maxVersions int) bool

CleanupOldDeletedVersions removes versions deleted more than 7 days ago. Also removes more deleted versions if the limit is being exceeded. Never removes undeleted versions.

func ConvertOverrideToV32

func ConvertOverrideToV32(override *workflowpb.VersioningOverride) *workflowpb.VersioningOverride

ConvertOverrideToV32 reads from deprecated fields and returns a new object with ONLY the equivalent non-deprecated v0.32 fields. Should be used to replace any passed in override that is stored in persistence.

func CountDeploymentVersions

func CountDeploymentVersions(deployments *persistencespb.DeploymentData) int

func DeploymentFromCapabilities

func DeploymentFromCapabilities(capabilities *commonpb.WorkerVersionCapabilities, options *deploymentpb.WorkerDeploymentOptions) (*deploymentpb.Deployment, error)

DeploymentFromCapabilities returns the deployment if it is using versioning V3, otherwise nil. It returns the deployment from the `options` if present, otherwise, from `capabilities`,

func DeploymentFromDeploymentVersion

func DeploymentFromDeploymentVersion(dv *deploymentspb.WorkerDeploymentVersion) *deploymentpb.Deployment

DeploymentFromDeploymentVersion Temporary helper function to convert WorkerDeploymentVersion to Deployment proto until we update code to use the new proto in all places.

func DeploymentFromExternalDeploymentVersion

func DeploymentFromExternalDeploymentVersion(dv *deploymentpb.WorkerDeploymentVersion) *deploymentpb.Deployment

DeploymentFromExternalDeploymentVersion Temporary helper function to convert WorkerDeploymentVersion to Deployment proto until we update code to use the new proto in all places.

func DeploymentIfValid

func DeploymentIfValid(d *deploymentpb.Deployment) *deploymentpb.Deployment

DeploymentIfValid returns the deployment back if is both of its fields have value.

func DeploymentNameFromCapabilities

func DeploymentNameFromCapabilities(capabilities *commonpb.WorkerVersionCapabilities, options *deploymentpb.WorkerDeploymentOptions) string

func DeploymentOrVersion

DeploymentOrVersion Temporary helper function to return a Deployment based on passed Deployment or WorkerDeploymentVersion objects, if `v` is not nil, it'll take precedence.

func DeploymentVersionFromDeployment

func DeploymentVersionFromDeployment(deployment *deploymentpb.Deployment) *deploymentspb.WorkerDeploymentVersion

DeploymentVersionFromDeployment Temporary helper function to convert Deployment to WorkerDeploymentVersion proto until we update code to use the new proto in all places.

func DirectiveDeployment

func DirectiveDeployment(directive *taskqueuespb.TaskVersionDirective) *deploymentpb.Deployment

DirectiveDeployment Temporary function until Directive proto is removed.

func ExternalWorkerDeploymentVersionFromDeployment

func ExternalWorkerDeploymentVersionFromDeployment(deployment *deploymentpb.Deployment) *deploymentpb.WorkerDeploymentVersion

ExternalWorkerDeploymentVersionFromDeployment Temporary helper function to convert Deployment to WorkerDeploymentVersion proto until we update code to use the new proto in all places.

func ExternalWorkerDeploymentVersionFromStringV31

func ExternalWorkerDeploymentVersionFromStringV31(s string) *deploymentpb.WorkerDeploymentVersion

func ExternalWorkerDeploymentVersionFromVersion

func ExternalWorkerDeploymentVersionFromVersion(version *deploymentspb.WorkerDeploymentVersion) *deploymentpb.WorkerDeploymentVersion

ExternalWorkerDeploymentVersionFromVersion Temporary helper function to convert internal Worker Deployment to WorkerDeploymentVersion proto until we update code to use the new proto in all places.

func ExternalWorkerDeploymentVersionToString

func ExternalWorkerDeploymentVersionToString(v *deploymentpb.WorkerDeploymentVersion) string

func ExternalWorkerDeploymentVersionToStringV31

func ExternalWorkerDeploymentVersionToStringV31(v *deploymentpb.WorkerDeploymentVersion) string

func ExtractVersioningBehaviorFromOverride

func ExtractVersioningBehaviorFromOverride(override *workflowpb.VersioningOverride) enumspb.VersioningBehavior

func FindBuildId

func FindBuildId(versioningData *persistencespb.VersioningData, buildId string) (setIndex, indexInSet int)

FindBuildId finds a build ID in the version data's sets, returning (set index, index within that set). Returns -1, -1 if not found.

func FindTargetDeploymentVersionAndRevisionNumberForWorkflowID

func FindTargetDeploymentVersionAndRevisionNumberForWorkflowID(
	current *deploymentspb.WorkerDeploymentVersion,
	currentRevisionNumber int64,
	ramping *deploymentspb.WorkerDeploymentVersion,
	rampingPercentage float32,
	rampingRevisionNumber int64,
	workflowId string,
) (*deploymentspb.WorkerDeploymentVersion, int64)

FindTargetDeploymentVersionAndRevisionNumberForWorkflowID returns the deployment version and revision number (if applicable) for the particular workflow ID based on the versioning info of the task queue. Nil means unversioned.

func FormatPinnedVersionNotInTaskQueueError

func FormatPinnedVersionNotInTaskQueueError(deploymentName, buildID, taskQueue string, taskQueueType enumspb.TaskQueueType) string

FormatPinnedVersionNotInTaskQueueError formats the error message when a pinned version is not present in a task queue.

func IsUnversionedOrAssignedBuildIdSearchAttribute

func IsUnversionedOrAssignedBuildIdSearchAttribute(buildId string) bool

IsUnversionedOrAssignedBuildIdSearchAttribute returns the value is "unversioned" or "assigned:<bld>"

func MakeBuildIdDirective

func MakeBuildIdDirective(buildId string) *taskqueuespb.TaskVersionDirective

func MakeDirectiveForWorkflowTask

func MakeDirectiveForWorkflowTask(
	inheritedBuildId string,
	assignedBuildId string,
	stamp *commonpb.WorkerVersionStamp,
	hasCompletedWorkflowTask bool,
	behavior enumspb.VersioningBehavior,
	deployment *deploymentpb.Deployment,
	revisionNumber int64,
) *taskqueuespb.TaskVersionDirective

MakeDirectiveForWorkflowTask returns a versioning directive based on the following parameters: - inheritedBuildId: build ID inherited from a past/previous wf execution (for Child WF or CaN) - assignedBuildId: the build ID to which the WF is currently assigned (i.e. mutable state's AssginedBuildId) - stamp: the latest versioning stamp of the execution (only needed for old versioning) - hasCompletedWorkflowTask: if the wf has completed any WFT - behavior: workflow's effective behavior - deployment: workflow's effective deployment

func MakeUseAssignmentRulesDirective

func MakeUseAssignmentRulesDirective() *taskqueuespb.TaskVersionDirective

func OverrideIsPinned

func OverrideIsPinned(override *workflowpb.VersioningOverride) bool

func PickFinalCurrentAndRamping

func PickFinalCurrentAndRamping(
	current *deploymentspb.DeploymentVersionData,
	ramping *deploymentspb.DeploymentVersionData,
	currentVersionRoutingConfig *deploymentpb.RoutingConfig,
	rampingVersionRoutingConfig *deploymentpb.RoutingConfig,
) (
	finalCurrent *deploymentspb.WorkerDeploymentVersion,
	finalCurrentRev int64,
	finalCurrentUpdateTime time.Time,
	finalRamping *deploymentspb.WorkerDeploymentVersion,
	isRamping bool,
	finalRampPercentage float32,
	finalRampingRev int64,
	finalRampingUpdateTime time.Time,
)

PickFinalCurrentAndRamping determines the effective "current" and "ramping" deployment versions by comparing timestamps from the legacy deployment data (old format) and the RoutingConfig (new format). It returns: - final current deployment version and its revision number (0 for old format) - final ramping deployment version, its revision number (0 for old format), and ramp percentage

func PinnedBuildIdSearchAttribute

func PinnedBuildIdSearchAttribute(version string) string

PinnedBuildIdSearchAttribute creates the pinned search attribute for the BuildIds list, used as a visibility optimization. For pinned workflows using WorkerDeployment APIs (ms.GetEffectiveVersioningBehavior() == PINNED && ms.executionInfo.VersioningInfo.Version != ""), this will be `pinned:<version>`. The version used will be the override version if set, or the versioningInfo.Version.

If deprecated Deployment-based APIs are in use and the workflow is pinned, `pinned:<deployment_series_name>:<deployment_build_id>` will. The values used will be the override deployment_series and build_id if set, or versioningInfo.Deployment.

If the workflow becomes unpinned or unversioned, this entry will be removed from that list.

func StampFromBuildId

func StampFromBuildId(buildId string) *commonpb.WorkerVersionStamp

func StampIfUsingVersioning

func StampIfUsingVersioning(stamp *commonpb.WorkerVersionStamp) *commonpb.WorkerVersionStamp

StampIfUsingVersioning returns the given WorkerVersionStamp if it is using versioning, otherwise returns nil.

func UnversionedBuildIdSearchAttribute

func UnversionedBuildIdSearchAttribute(buildId string) string

UnversionedBuildIdSearchAttribute returns the search attribute value for an unversioned build ID

func ValidateDeployment

func ValidateDeployment(deployment *deploymentpb.Deployment) error

ValidateDeployment returns error if the deployment is nil or it has empty build ID or deployment name.

func ValidateDeploymentVersion

func ValidateDeploymentVersion(version *deploymentspb.WorkerDeploymentVersion, maxIDLengthLimit int) error

ValidateDeploymentVersion returns error if the deployment version is not a valid entity.

func ValidateDeploymentVersionFields

func ValidateDeploymentVersionFields(fieldName string, field string, maxIDLengthLimit int) error

ValidateDeploymentVersionFields is a helper that verifies if the fields within a Worker Deployment Version are valid

func ValidateDeploymentVersionStringV31

func ValidateDeploymentVersionStringV31(version string) (*deploymentspb.WorkerDeploymentVersion, error)

ValidateDeploymentVersionStringV31 returns error if the deployment version is nil or it has empty version or deployment name.

func ValidateTaskVersionDirective

func ValidateTaskVersionDirective(
	directive *taskqueuespb.TaskVersionDirective,
	wfBehavior enumspb.VersioningBehavior,
	wfDeployment *deploymentpb.Deployment,
	scheduledDeployment *deploymentpb.Deployment,
) error

func ValidateVersioningOverride

func ValidateVersioningOverride(ctx context.Context,
	override *workflowpb.VersioningOverride,
	matchingClient resource.MatchingClient,
	versionMembershipCache VersionMembershipCache,
	tq string,
	tqType enumspb.TaskQueueType,
	namespaceID string) error

func VersionStampToBuildIdSearchAttribute

func VersionStampToBuildIdSearchAttribute(stamp *commonpb.WorkerVersionStamp) string

VersionStampToBuildIdSearchAttribute returns the search attribute value for a version stamp

func VersionedBuildIdSearchAttribute

func VersionedBuildIdSearchAttribute(buildId string) string

VersionedBuildIdSearchAttribute returns the search attribute value for a versioned build ID

func WorkerDeploymentVersionFromStringV31

func WorkerDeploymentVersionFromStringV31(s string) (*deploymentspb.WorkerDeploymentVersion, error)

func WorkerDeploymentVersionFromStringV32

func WorkerDeploymentVersionFromStringV32(s string) (*deploymentspb.WorkerDeploymentVersion, error)

func WorkerDeploymentVersionToStringV31

func WorkerDeploymentVersionToStringV31(v *deploymentspb.WorkerDeploymentVersion) string

func WorkerDeploymentVersionToStringV32

func WorkerDeploymentVersionToStringV32(v *deploymentspb.WorkerDeploymentVersion) string

func WorkflowsExistForBuildId

func WorkflowsExistForBuildId(ctx context.Context, visibilityManager manager.VisibilityManager, ns *namespace.Namespace, taskQueue, buildId string) (bool, error)

Types

type IsWFTaskQueueInVersionDetector

type IsWFTaskQueueInVersionDetector = func(ctx context.Context, namespaceID, tq string, version *deploymentpb.WorkerDeploymentVersion) (bool, error)

func GetIsWFTaskQueueInVersionDetector

func GetIsWFTaskQueueInVersionDetector(matchingClient resource.MatchingClient, versionMembershipCache VersionMembershipCache) IsWFTaskQueueInVersionDetector

type ReactivationSignalCache

type ReactivationSignalCache interface {
	// ShouldSendSignal returns true if signal should be sent (not recently sent)
	// and atomically marks it as sent. Returns false if recently sent.
	ShouldSendSignal(namespaceID, deploymentName, buildID string) bool
}

ReactivationSignalCache deduplicates reactivation signals to version workflows.

Implementations are expected to be safe for concurrent use.

func NewReactivationSignalCache

func NewReactivationSignalCache(c cache.Cache, metricsHandler metrics.Handler) ReactivationSignalCache

NewReactivationSignalCache wraps the provided cache with a typed API and metrics.

type ReactivationSignalCacheImpl

type ReactivationSignalCacheImpl struct {
	cache.Cache
	// contains filtered or unexported fields
}

ReactivationSignalCache deduplicates reactivation signals to version workflows.

Implementations are expected to be safe for concurrent use.

func (*ReactivationSignalCacheImpl) ShouldSendSignal

func (c *ReactivationSignalCacheImpl) ShouldSendSignal(
	namespaceID, deploymentName, buildID string,
) bool

type RoutingInfo

type RoutingInfo struct {
	Current               *deploymentspb.WorkerDeploymentVersion
	CurrentRevisionNumber int64
	Ramping               *deploymentspb.WorkerDeploymentVersion
	RampPercentage        float32
	RampingRevisionNumber int64
}

RoutingInfoCache is used to cache results of GetTaskQueueUserData calls followed by CalculateTaskQueueVersioningInfo computation.

Implementations are expected to be safe for concurrent use.

type RoutingInfoCache

type RoutingInfoCache interface {
	// Get returns the cached routing info. ok=false means there was no cached value.
	Get(
		namespaceID string,
		taskQueue string,
		taskQueueType enumspb.TaskQueueType,
	) (RoutingInfo, bool)

	Put(
		namespaceID string,
		taskQueue string,
		taskQueueType enumspb.TaskQueueType,
		current *deploymentspb.WorkerDeploymentVersion,
		currentRevisionNumber int64,
		ramping *deploymentspb.WorkerDeploymentVersion,
		rampPercentage float32,
		rampingRevisionNumber int64,
	)
}

RoutingInfoCache is used to cache results of GetTaskQueueUserData calls followed by CalculateTaskQueueVersioningInfo computation.

Implementations are expected to be safe for concurrent use.

func NewRoutingInfoCache

func NewRoutingInfoCache(c cache.Cache, metricsHandler metrics.Handler) RoutingInfoCache

NewRoutingInfoCache wraps the provided cache with a typed API and metrics.

type RoutingInfoCacheImpl

type RoutingInfoCacheImpl struct {
	cache.Cache
	// contains filtered or unexported fields
}

RoutingInfoCache is used to cache results of GetTaskQueueUserData calls followed by CalculateTaskQueueVersioningInfo computation.

Implementations are expected to be safe for concurrent use.

func (*RoutingInfoCacheImpl) Get

func (c *RoutingInfoCacheImpl) Get(
	namespaceID string,
	taskQueue string,
	taskQueueType enumspb.TaskQueueType,
) (RoutingInfo, bool)

func (*RoutingInfoCacheImpl) Put

func (c *RoutingInfoCacheImpl) Put(
	namespaceID string,
	taskQueue string,
	taskQueueType enumspb.TaskQueueType,
	current *deploymentspb.WorkerDeploymentVersion,
	currentRevisionNumber int64,
	ramping *deploymentspb.WorkerDeploymentVersion,
	rampPercentage float32,
	rampingRevisionNumber int64,
)

type VersionMembershipCache

type VersionMembershipCache interface {
	// Get returns (isMember, ok). ok=false means there was no cached value.
	Get(
		namespaceID string,
		taskQueue string,
		taskQueueType enumspb.TaskQueueType,
		deploymentName string,
		buildID string,
	) (isMember bool, ok bool)

	Put(
		namespaceID string,
		taskQueue string,
		taskQueueType enumspb.TaskQueueType,
		deploymentName string,
		buildID string,
		isMember bool,
	)
}

VersionMembershipCache is used to cache results of Matching's CheckTaskQueueVersionMembership calls (used internally by the worker versioning pinned override validation).

Implementations are expected to be safe for concurrent use.

func NewVersionMembershipCache

func NewVersionMembershipCache(c cache.Cache, metricsHandler metrics.Handler) VersionMembershipCache

NewVersionMembershipCache wraps the provided cache with a typed API and metrics.

type VersionMembershipCacheImpl

type VersionMembershipCacheImpl struct {
	cache.Cache
	// contains filtered or unexported fields
}

VersionMembershipCache is used to cache results of Matching's CheckTaskQueueVersionMembership calls (used internally by the worker versioning pinned override validation).

Implementations are expected to be safe for concurrent use.

func (*VersionMembershipCacheImpl) Get

func (c *VersionMembershipCacheImpl) Get(
	namespaceID string,
	taskQueue string,
	taskQueueType enumspb.TaskQueueType,
	deploymentName string,
	buildID string,
) (isMember bool, ok bool)

func (*VersionMembershipCacheImpl) Put

func (c *VersionMembershipCacheImpl) Put(
	namespaceID string,
	taskQueue string,
	taskQueueType enumspb.TaskQueueType,
	deploymentName string,
	buildID string,
	isMember bool,
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL