activity

package
v1.36.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2026 License: MIT Imports: 41 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// WorkflowTypeTag is a required workflow tag for standalone activities to ensure consistent
	// metric labeling between workflows and activities.
	WorkflowTypeTag = "__temporal_standalone_activity__"
)

Variables

View Source
var (
	Enabled = dynamicconfig.NewNamespaceBoolSetting(
		"activity.enableStandalone",
		false,
		`Toggles standalone activity functionality on the server.`,
	)

	LongPollTimeout = dynamicconfig.NewNamespaceDurationSetting(
		"activity.longPollTimeout",
		20*time.Second,
		`Timeout for activity long-poll requests.`,
	)

	LongPollBuffer = dynamicconfig.NewNamespaceDurationSetting(
		"activity.longPollBuffer",
		time.Second,
		`A buffer used to adjust the activity long-poll timeouts.
 Specifically, activity long-poll requests are timed out at a time which leaves at least the buffer's duration
 remaining before the caller's deadline, if permitted by the caller's deadline.`,
	)
)
View Source
var (
	Archetype   = chasm.FullyQualifiedName(libraryName, componentName)
	ArchetypeID = chasm.GenerateTypeID(Archetype)
)
View Source
var ErrStandaloneActivityDisabled = serviceerror.NewUnimplemented("Standalone activity is disabled")
View Source
var FrontendModule = fx.Module(
	"activity-frontend",
	fx.Provide(ConfigProvider),
	fx.Provide(activitypb.NewActivityServiceLayeredClient),
	fx.Provide(NewFrontendHandler),
	fx.Provide(resource.SearchAttributeValidatorProvider),
	fx.Invoke(func(registry *chasm.Registry) error {

		return registry.Register(newComponentOnlyLibrary())
	}),
)
View Source
var HistoryModule = fx.Module(
	"activity-history",
	fx.Provide(
		ConfigProvider,
		newActivityDispatchTaskExecutor,
		newScheduleToStartTimeoutTaskExecutor,
		newScheduleToCloseTimeoutTaskExecutor,
		newStartToCloseTimeoutTaskExecutor,
		newHeartbeatTimeoutTaskExecutor,
		newHandler,
		newLibrary,
	),
	fx.Invoke(func(l *library, registry *chasm.Registry) error {
		return registry.Register(l)
	}),
)

TransitionCancelRequested transitions to CancelRequested status.

View Source
var TransitionCanceled = chasm.NewTransition(
	[]activitypb.ActivityExecutionStatus{
		activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED,
	},
	activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED,
	func(a *Activity, ctx chasm.MutableContext, event cancelEvent) error {
		return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error {
			outcome := a.Outcome.Get(ctx)
			failure := &failurepb.Failure{
				Message: "Activity canceled",
				FailureInfo: &failurepb.Failure_CanceledFailureInfo{
					CanceledFailureInfo: &failurepb.CanceledFailureInfo{
						Details: event.details,
					},
				},
			}
			outcome.Variant = &activitypb.ActivityOutcome_Failed_{
				Failed: &activitypb.ActivityOutcome_Failed{
					Failure: failure,
				},
			}

			a.emitOnCanceledMetrics(ctx, event.handler, event.fromStatus)

			return nil
		})
	},
)

TransitionCanceled transitions to Canceled status.

View Source
var TransitionCompleted = chasm.NewTransition(
	[]activitypb.ActivityExecutionStatus{
		activitypb.ACTIVITY_EXECUTION_STATUS_STARTED,
		activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED,
	},
	activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED,
	func(a *Activity, ctx chasm.MutableContext, event completeEvent) error {
		return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error {
			req := event.req.GetCompleteRequest()

			attempt := a.LastAttempt.Get(ctx)
			attempt.CompleteTime = timestamppb.New(ctx.Now(a))
			attempt.LastWorkerIdentity = req.GetIdentity()
			outcome := a.Outcome.Get(ctx)
			outcome.Variant = &activitypb.ActivityOutcome_Successful_{
				Successful: &activitypb.ActivityOutcome_Successful{
					Output: req.GetResult(),
				},
			}

			a.emitOnCompletedMetrics(ctx, event.metricsHandler)

			return nil
		})
	},
)

TransitionCompleted transitions to Completed status.

View Source
var TransitionFailed = chasm.NewTransition(
	[]activitypb.ActivityExecutionStatus{
		activitypb.ACTIVITY_EXECUTION_STATUS_STARTED,
		activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED,
	},
	activitypb.ACTIVITY_EXECUTION_STATUS_FAILED,
	func(a *Activity, ctx chasm.MutableContext, event failedEvent) error {
		return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error {
			req := event.req.GetFailedRequest()

			if details := req.GetLastHeartbeatDetails(); details != nil {
				heartbeat := a.getOrCreateLastHeartbeat(ctx)
				heartbeat.Details = details
				heartbeat.RecordedTime = timestamppb.New(ctx.Now(a))
			}
			attempt := a.LastAttempt.Get(ctx)
			attempt.LastWorkerIdentity = req.GetIdentity()

			if err := a.recordFailedAttempt(ctx, 0, req.GetFailure(), ctx.Now(a), true); err != nil {
				return err
			}

			a.emitOnFailedMetrics(ctx, event.metricsHandler)

			return nil
		})
	},
)

TransitionFailed transitions to Failed status.

View Source
var TransitionRescheduled = chasm.NewTransition(
	[]activitypb.ActivityExecutionStatus{
		activitypb.ACTIVITY_EXECUTION_STATUS_STARTED,
	},
	activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED,
	func(a *Activity, ctx chasm.MutableContext, event rescheduleEvent) error {
		attempt := a.LastAttempt.Get(ctx)
		currentTime := ctx.Now(a)
		attempt.Count++
		attempt.Stamp++

		err := a.recordFailedAttempt(ctx, event.retryInterval, event.failure, currentTime, false)
		if err != nil {
			return err
		}

		retryScheduledTime := attemptScheduleTimeForRetry(attempt).AsTime()

		if timeout := a.GetScheduleToStartTimeout().AsDuration(); timeout > 0 {
			ctx.AddTask(
				a,
				chasm.TaskAttributes{
					ScheduledTime: retryScheduledTime.Add(timeout),
				},
				&activitypb.ScheduleToStartTimeoutTask{
					Stamp: attempt.GetStamp(),
				})
		}

		ctx.AddTask(
			a,
			chasm.TaskAttributes{
				ScheduledTime: retryScheduledTime,
			},
			&activitypb.ActivityDispatchTask{
				Stamp: attempt.GetStamp(),
			})

		return nil
	},
)

TransitionRescheduled transitions to Scheduled from Started, which happens on retries. The event to pass in is the failure to be recorded from the previously failed attempt.

View Source
var TransitionScheduled = chasm.NewTransition(
	[]activitypb.ActivityExecutionStatus{
		activitypb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED,
	},
	activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED,
	func(a *Activity, ctx chasm.MutableContext, _ any) error {
		attempt := a.LastAttempt.Get(ctx)
		currentTime := ctx.Now(a)
		attempt.Count++
		attempt.Stamp++

		if timeout := a.GetScheduleToStartTimeout().AsDuration(); timeout > 0 {
			ctx.AddTask(
				a,
				chasm.TaskAttributes{
					ScheduledTime: currentTime.Add(timeout),
				},
				&activitypb.ScheduleToStartTimeoutTask{
					Stamp: attempt.GetStamp(),
				})
		}

		if timeout := a.GetScheduleToCloseTimeout().AsDuration(); timeout > 0 {
			ctx.AddTask(
				a,
				chasm.TaskAttributes{
					ScheduledTime: currentTime.Add(timeout),
				},
				&activitypb.ScheduleToCloseTimeoutTask{})
		}

		ctx.AddTask(
			a,
			chasm.TaskAttributes{},
			&activitypb.ActivityDispatchTask{
				Stamp: attempt.GetStamp(),
			})

		return nil
	},
)

TransitionScheduled transitions to Scheduled status. This is only called on the initial scheduling of the activity.

View Source
var TransitionStarted = chasm.NewTransition(
	[]activitypb.ActivityExecutionStatus{
		activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED,
	},
	activitypb.ACTIVITY_EXECUTION_STATUS_STARTED,
	func(a *Activity, ctx chasm.MutableContext, request *historyservice.RecordActivityTaskStartedRequest) error {
		attempt := a.LastAttempt.Get(ctx)
		attempt.StartedTime = timestamppb.New(ctx.Now(a))
		attempt.StartRequestId = request.GetRequestId()
		attempt.LastWorkerIdentity = request.GetPollRequest().GetIdentity()
		if versionDirective := request.GetVersionDirective().GetDeploymentVersion(); versionDirective != nil {
			attempt.LastDeploymentVersion = &deploymentpb.WorkerDeploymentVersion{
				BuildId:        versionDirective.GetBuildId(),
				DeploymentName: versionDirective.GetDeploymentName(),
			}
		}
		startTime := attempt.GetStartedTime().AsTime()
		ctx.AddTask(
			a,
			chasm.TaskAttributes{
				ScheduledTime: startTime.Add(a.GetStartToCloseTimeout().AsDuration()),
			},
			&activitypb.StartToCloseTimeoutTask{
				Stamp: a.LastAttempt.Get(ctx).GetStamp(),
			})

		if heartbeatTimeout := a.GetHeartbeatTimeout().AsDuration(); heartbeatTimeout > 0 {
			ctx.AddTask(
				a,
				chasm.TaskAttributes{
					ScheduledTime: startTime.Add(heartbeatTimeout),
				},
				&activitypb.HeartbeatTimeoutTask{
					Stamp: attempt.GetStamp(),
				})
		}

		return nil
	},
)

TransitionStarted transitions to Started status.

View Source
var TransitionTerminated = chasm.NewTransition(
	[]activitypb.ActivityExecutionStatus{
		activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED,
		activitypb.ACTIVITY_EXECUTION_STATUS_STARTED,
		activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED,
	},
	activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED,
	func(a *Activity, ctx chasm.MutableContext, event terminateEvent) error {
		return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error {
			req := event.request.GetFrontendRequest()

			a.TerminateState = &activitypb.ActivityTerminateState{
				RequestId: req.GetRequestId(),
			}
			outcome := a.Outcome.Get(ctx)
			failure := &failurepb.Failure{

				Message:     req.GetReason(),
				FailureInfo: &failurepb.Failure_TerminatedFailureInfo{},
			}
			outcome.Variant = &activitypb.ActivityOutcome_Failed_{
				Failed: &activitypb.ActivityOutcome_Failed{
					Failure: failure,
				},
			}

			metricsHandler := enrichMetricsHandler(
				a,
				event.MetricsHandlerBuilderParams.Handler,
				event.MetricsHandlerBuilderParams.NamespaceName,
				metrics.ActivityTerminatedScope,
				event.MetricsHandlerBuilderParams.BreakdownMetricsByTaskQueue)

			metrics.ActivityTerminate.With(metricsHandler).Record(1)

			return nil
		})
	},
)

TransitionTerminated transitions to Terminated status.

View Source
var TransitionTimedOut = chasm.NewTransition(
	[]activitypb.ActivityExecutionStatus{
		activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED,
		activitypb.ACTIVITY_EXECUTION_STATUS_STARTED,
		activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED,
	},
	activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT,
	func(a *Activity, ctx chasm.MutableContext, event timeoutEvent) error {
		timeoutType := event.timeoutType

		return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error {
			var err error
			switch timeoutType {
			case enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START,
				enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE:
				err = a.recordScheduleToStartOrCloseTimeoutFailure(ctx, timeoutType)
			case enumspb.TIMEOUT_TYPE_START_TO_CLOSE:
				failure := createStartToCloseTimeoutFailure()
				err = a.recordFailedAttempt(ctx, 0, failure, ctx.Now(a), true)
			case enumspb.TIMEOUT_TYPE_HEARTBEAT:
				failure := createHeartbeatTimeoutFailure()
				err = a.recordFailedAttempt(ctx, 0, failure, ctx.Now(a), true)
			default:
				err = fmt.Errorf("unhandled activity timeout: %v", timeoutType)
			}
			if err != nil {
				return err
			}

			a.emitOnTimedOutMetrics(ctx, event.metricsHandler, timeoutType, event.fromStatus)

			return nil
		})
	},
)

TransitionTimedOut transitions to TimedOut status.

Functions

func InternalStatusToAPIStatus

func InternalStatusToAPIStatus(status activitypb.ActivityExecutionStatus) enumspb.ActivityExecutionStatus

InternalStatusToAPIStatus converts internal activity execution status to API status.

func NewEmbeddedActivity

func NewEmbeddedActivity(
	ctx chasm.MutableContext,
	state *activitypb.ActivityState,
	parent ActivityStore,
)

func ValidateAndNormalizeEmbeddedActivity

func ValidateAndNormalizeEmbeddedActivity(
	activityID string,
	activityType string,
	getDefaultActivityRetrySettings dynamicconfig.TypedPropertyFnWithNamespaceFilter[retrypolicy.DefaultRetrySettings],
	maxIDLengthLimit int,
	namespaceID namespace.ID,
	options *activitypb.ActivityOptions,
	priority *commonpb.Priority,
	runTimeout *durationpb.Duration,
) error

ValidateAndNormalizeEmbeddedActivity validates and normalizes the attributes for an embedded activity.

func ValidateAndNormalizeStandaloneActivity

func ValidateAndNormalizeStandaloneActivity(
	activityID string,
	activityType string,
	getDefaultActivityRetrySettings dynamicconfig.TypedPropertyFnWithNamespaceFilter[retrypolicy.DefaultRetrySettings],
	maxIDLengthLimit int,
	namespaceID namespace.ID,
	options *activitypb.ActivityOptions,
	priority *commonpb.Priority,
	runTimeout *durationpb.Duration,
) error

ValidateAndNormalizeStandaloneActivity validates and normalizes the attributes for a standalone activity.

Types

type Activity

type Activity struct {
	chasm.UnimplementedComponent

	*activitypb.ActivityState

	Visibility    chasm.Field[*chasm.Visibility]
	LastAttempt   chasm.Field[*activitypb.ActivityAttemptState]
	LastHeartbeat chasm.Field[*activitypb.ActivityHeartbeatState]
	// Standalone only
	RequestData chasm.Field[*activitypb.ActivityRequestData]
	Outcome     chasm.Field[*activitypb.ActivityOutcome]
	// Pointer to an implementation of the "store". For a workflow activity this would be a parent
	// pointer back to the workflow. For a standalone activity this is nil (Activity itself
	// implements the ActivityStore interface).
	// TODO(saa-preview): figure out better naming.
	Store chasm.ParentPtr[ActivityStore]
}

Activity component represents an activity execution persistence object and can be either standalone activity or one embedded within a workflow.

func NewStandaloneActivity

func NewStandaloneActivity(
	ctx chasm.MutableContext,
	request *workflowservice.StartActivityExecutionRequest,
) (*Activity, error)

NewStandaloneActivity creates a new activity component and adds associated tasks to start execution.

func (*Activity) GenerateRecordActivityTaskStartedResponse

func (a *Activity) GenerateRecordActivityTaskStartedResponse(
	ctx chasm.Context,
	namespace string,
) (*historyservice.RecordActivityTaskStartedResponse, error)

GenerateRecordActivityTaskStartedResponse generates the response for HandleStarted.

func (*Activity) HandleCanceled

HandleCanceled updates the activity on activity canceled.

func (*Activity) HandleCompleted

HandleCompleted updates the activity on activity completion.

func (*Activity) HandleFailed

HandleFailed updates the activity on activity failure. if the activity is retryable, it will be rescheduled for retry instead.

func (*Activity) HandleStarted

HandleStarted updates the activity on recording activity task started and populates the response.

func (*Activity) LifecycleState

func (a *Activity) LifecycleState(_ chasm.Context) chasm.LifecycleState

LifecycleState implements the chasm.Component interface.

func (*Activity) RecordCompleted

func (a *Activity) RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error

RecordCompleted applies the provided function to record activity completion.

func (*Activity) RecordHeartbeat

RecordHeartbeat records a heartbeat for the activity.

func (*Activity) SearchAttributes

func (a *Activity) SearchAttributes(_ chasm.Context) []chasm.SearchAttributeKeyValue

SearchAttributes implements chasm.VisibilitySearchAttributesProvider interface. Returns the current search attribute values for this activity execution.

func (*Activity) SetStateMachineState

func (a *Activity) SetStateMachineState(state activitypb.ActivityExecutionStatus)

SetStateMachineState sets the status of the activity.

func (*Activity) StateMachineState

func (a *Activity) StateMachineState() activitypb.ActivityExecutionStatus

StateMachineState returns the current status of the activity.

func (*Activity) StoreOrSelf

func (a *Activity) StoreOrSelf(ctx chasm.Context) ActivityStore

StoreOrSelf returns the store for the activity. If the store is not set as a field (e.g. standalone activities), it returns the activity itself.

func (*Activity) Terminate

Terminate implements the chasm.RootComponent interface.

type ActivityStore

type ActivityStore interface {
	// RecordCompleted applies the provided function to record activity completion
	RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error
}

type FrontendHandler

func NewFrontendHandler

func NewFrontendHandler(
	client activitypb.ActivityServiceClient,
	config *Config,
	logger log.Logger,
	metricsHandler metrics.Handler,
	namespaceRegistry namespace.Registry,
	saMapperProvider searchattribute.MapperProvider,
	saValidator *searchattribute.Validator,
) FrontendHandler

NewFrontendHandler creates a new FrontendHandler instance for processing activity frontend requests.

type MetricsHandlerBuilderParams

type MetricsHandlerBuilderParams struct {
	Handler                     metrics.Handler
	NamespaceName               string
	BreakdownMetricsByTaskQueue dynamicconfig.TypedPropertyFnWithTaskQueueFilter[bool]
}

MetricsHandlerBuilderParams contains parameters for building/enriching a metrics handler for activity operations

type RespondCancelledEvent

type RespondCancelledEvent struct {
	Request                     *historyservice.RespondActivityTaskCanceledRequest
	Token                       *tokenspb.Task
	MetricsHandlerBuilderParams MetricsHandlerBuilderParams
}

RespondCancelledEvent wraps the RespondActivityTaskCanceledRequest with context-specific data.

type RespondCompletedEvent

type RespondCompletedEvent struct {
	Request                     *historyservice.RespondActivityTaskCompletedRequest
	Token                       *tokenspb.Task
	MetricsHandlerBuilderParams MetricsHandlerBuilderParams
}

RespondCompletedEvent wraps the RespondActivityTaskCompletedRequest with context-specific data.

type RespondFailedEvent

type RespondFailedEvent struct {
	Request                     *historyservice.RespondActivityTaskFailedRequest
	Token                       *tokenspb.Task
	MetricsHandlerBuilderParams MetricsHandlerBuilderParams
}

RespondFailedEvent wraps the RespondActivityTaskFailedRequest with context-specific data.

type WithToken

type WithToken[R any] struct {
	Token   *tokenspb.Task
	Request R
}

WithToken wraps a request with its deserialized task token.

Directories

Path Synopsis
gen
activitypb/v1
Code generated by protoc-gen-go-helpers.
Code generated by protoc-gen-go-helpers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL