Documentation
¶
Index ¶
- Constants
- Variables
- func InternalStatusToAPIStatus(status activitypb.ActivityExecutionStatus) enumspb.ActivityExecutionStatus
- func NewEmbeddedActivity(ctx chasm.MutableContext, state *activitypb.ActivityState, ...)
- func ValidateAndNormalizeEmbeddedActivity(activityID string, activityType string, ...) error
- func ValidateAndNormalizeStandaloneActivity(activityID string, activityType string, ...) error
- type Activity
- func (a *Activity) GenerateRecordActivityTaskStartedResponse(ctx chasm.Context, namespace string) (*historyservice.RecordActivityTaskStartedResponse, error)
- func (a *Activity) HandleCanceled(ctx chasm.MutableContext, event RespondCancelledEvent) (*historyservice.RespondActivityTaskCanceledResponse, error)
- func (a *Activity) HandleCompleted(ctx chasm.MutableContext, event RespondCompletedEvent) (*historyservice.RespondActivityTaskCompletedResponse, error)
- func (a *Activity) HandleFailed(ctx chasm.MutableContext, event RespondFailedEvent) (*historyservice.RespondActivityTaskFailedResponse, error)
- func (a *Activity) HandleStarted(ctx chasm.MutableContext, ...) (*historyservice.RecordActivityTaskStartedResponse, error)
- func (a *Activity) LifecycleState(_ chasm.Context) chasm.LifecycleState
- func (a *Activity) RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error
- func (a *Activity) RecordHeartbeat(ctx chasm.MutableContext, ...) (*historyservice.RecordActivityTaskHeartbeatResponse, error)
- func (a *Activity) SearchAttributes(_ chasm.Context) []chasm.SearchAttributeKeyValue
- func (a *Activity) SetStateMachineState(state activitypb.ActivityExecutionStatus)
- func (a *Activity) StateMachineState() activitypb.ActivityExecutionStatus
- func (a *Activity) StoreOrSelf(ctx chasm.Context) ActivityStore
- func (a *Activity) Terminate(_ chasm.MutableContext, _ chasm.TerminateComponentRequest) (chasm.TerminateComponentResponse, error)
- type ActivityStore
- type Config
- type FrontendHandler
- type MetricsHandlerBuilderParams
- type RespondCancelledEvent
- type RespondCompletedEvent
- type RespondFailedEvent
- type WithToken
Constants ¶
// WorkflowTypeTag is the workflow-type tag value that standalone activities must
// carry, so that metric labels stay consistent between workflows and activities.
const WorkflowTypeTag = "__temporal_standalone_activity__"
Variables ¶
var ( TypeSearchAttribute = chasm.NewSearchAttributeKeyword("ActivityType", chasm.SearchAttributeFieldKeyword01) StatusSearchAttribute = chasm.NewSearchAttributeKeyword("ExecutionStatus", chasm.SearchAttributeFieldLowCardinalityKeyword01) )
var ( Enabled = dynamicconfig.NewNamespaceBoolSetting( "activity.enableStandalone", false, `Toggles standalone activity functionality on the server.`, ) LongPollTimeout = dynamicconfig.NewNamespaceDurationSetting( "activity.longPollTimeout", 20*time.Second, `Timeout for activity long-poll requests.`, ) LongPollBuffer = dynamicconfig.NewNamespaceDurationSetting( "activity.longPollBuffer", time.Second, `A buffer used to adjust the activity long-poll timeouts. Specifically, activity long-poll requests are timed out at a time which leaves at least the buffer's duration remaining before the caller's deadline, if permitted by the caller's deadline.`, ) )
var ( Archetype = chasm.FullyQualifiedName(libraryName, componentName) ArchetypeID = chasm.GenerateTypeID(Archetype) )
// ErrStandaloneActivityDisabled is an Unimplemented service error carrying the
// message "Standalone activity is disabled".
// NOTE(review): presumably returned when the "activity.enableStandalone" setting
// is off for the caller's namespace — confirm at the call sites.
var ErrStandaloneActivityDisabled = serviceerror.NewUnimplemented("Standalone activity is disabled")
// FrontendModule is the fx module named "activity-frontend". It provides the
// activity Config, the layered activity service client, the FrontendHandler, and
// the search attribute validator, and on startup registers the component-only
// activity library with the chasm registry.
var FrontendModule = fx.Module(
	"activity-frontend",
	fx.Provide(
		ConfigProvider,
		activitypb.NewActivityServiceLayeredClient,
		NewFrontendHandler,
		resource.SearchAttributeValidatorProvider,
	),
	fx.Invoke(func(reg *chasm.Registry) error {
		return reg.Register(newComponentOnlyLibrary())
	}),
)
// HistoryModule is the fx module named "activity-history". It provides the
// activity Config, the dispatch and timeout task executors, the handler, and the
// activity library, and on startup registers that library with the chasm registry.
var HistoryModule = fx.Module(
	"activity-history",
	fx.Provide(
		ConfigProvider,
		newActivityDispatchTaskExecutor,
		newScheduleToStartTimeoutTaskExecutor,
		newScheduleToCloseTimeoutTaskExecutor,
		newStartToCloseTimeoutTaskExecutor,
		newHeartbeatTimeoutTaskExecutor,
		newHandler,
		newLibrary,
	),
	fx.Invoke(
		func(lib *library, reg *chasm.Registry) error {
			return reg.Register(lib)
		},
	),
)
var TransitionCancelRequested = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, func(a *Activity, ctx chasm.MutableContext, req *workflowservice.RequestCancelActivityExecutionRequest) error { a.CancelState = &activitypb.ActivityCancelState{ Identity: req.GetIdentity(), RequestId: req.GetRequestId(), Reason: req.GetReason(), RequestTime: timestamppb.New(ctx.Now(a)), } return nil }, )
TransitionCancelRequested transitions to CancelRequested status.
var TransitionCanceled = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED, func(a *Activity, ctx chasm.MutableContext, event cancelEvent) error { return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { outcome := a.Outcome.Get(ctx) failure := &failurepb.Failure{ Message: "Activity canceled", FailureInfo: &failurepb.Failure_CanceledFailureInfo{ CanceledFailureInfo: &failurepb.CanceledFailureInfo{ Details: event.details, }, }, } outcome.Variant = &activitypb.ActivityOutcome_Failed_{ Failed: &activitypb.ActivityOutcome_Failed{ Failure: failure, }, } a.emitOnCanceledMetrics(ctx, event.handler, event.fromStatus) return nil }) }, )
TransitionCanceled transitions to Canceled status.
var TransitionCompleted = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED, func(a *Activity, ctx chasm.MutableContext, event completeEvent) error { return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { req := event.req.GetCompleteRequest() attempt := a.LastAttempt.Get(ctx) attempt.CompleteTime = timestamppb.New(ctx.Now(a)) attempt.LastWorkerIdentity = req.GetIdentity() outcome := a.Outcome.Get(ctx) outcome.Variant = &activitypb.ActivityOutcome_Successful_{ Successful: &activitypb.ActivityOutcome_Successful{ Output: req.GetResult(), }, } a.emitOnCompletedMetrics(ctx, event.metricsHandler) return nil }) }, )
TransitionCompleted transitions to Completed status.
var TransitionFailed = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_FAILED, func(a *Activity, ctx chasm.MutableContext, event failedEvent) error { return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { req := event.req.GetFailedRequest() if details := req.GetLastHeartbeatDetails(); details != nil { heartbeat := a.getOrCreateLastHeartbeat(ctx) heartbeat.Details = details heartbeat.RecordedTime = timestamppb.New(ctx.Now(a)) } attempt := a.LastAttempt.Get(ctx) attempt.LastWorkerIdentity = req.GetIdentity() if err := a.recordFailedAttempt(ctx, 0, req.GetFailure(), ctx.Now(a), true); err != nil { return err } a.emitOnFailedMetrics(ctx, event.metricsHandler) return nil }) }, )
TransitionFailed transitions to Failed status.
var TransitionRescheduled = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, func(a *Activity, ctx chasm.MutableContext, event rescheduleEvent) error { attempt := a.LastAttempt.Get(ctx) currentTime := ctx.Now(a) attempt.Count++ attempt.Stamp++ err := a.recordFailedAttempt(ctx, event.retryInterval, event.failure, currentTime, false) if err != nil { return err } retryScheduledTime := attemptScheduleTimeForRetry(attempt).AsTime() if timeout := a.GetScheduleToStartTimeout().AsDuration(); timeout > 0 { ctx.AddTask( a, chasm.TaskAttributes{ ScheduledTime: retryScheduledTime.Add(timeout), }, &activitypb.ScheduleToStartTimeoutTask{ Stamp: attempt.GetStamp(), }) } ctx.AddTask( a, chasm.TaskAttributes{ ScheduledTime: retryScheduledTime, }, &activitypb.ActivityDispatchTask{ Stamp: attempt.GetStamp(), }) return nil }, )
TransitionRescheduled transitions to Scheduled from Started, which happens on retries. The event to pass in is the failure to be recorded from the previously failed attempt.
var TransitionScheduled = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_UNSPECIFIED, }, activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, func(a *Activity, ctx chasm.MutableContext, _ any) error { attempt := a.LastAttempt.Get(ctx) currentTime := ctx.Now(a) attempt.Count++ attempt.Stamp++ if timeout := a.GetScheduleToStartTimeout().AsDuration(); timeout > 0 { ctx.AddTask( a, chasm.TaskAttributes{ ScheduledTime: currentTime.Add(timeout), }, &activitypb.ScheduleToStartTimeoutTask{ Stamp: attempt.GetStamp(), }) } if timeout := a.GetScheduleToCloseTimeout().AsDuration(); timeout > 0 { ctx.AddTask( a, chasm.TaskAttributes{ ScheduledTime: currentTime.Add(timeout), }, &activitypb.ScheduleToCloseTimeoutTask{}) } ctx.AddTask( a, chasm.TaskAttributes{}, &activitypb.ActivityDispatchTask{ Stamp: attempt.GetStamp(), }) return nil }, )
TransitionScheduled transitions to Scheduled status. This is only called on the initial scheduling of the activity.
var TransitionStarted = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, }, activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, func(a *Activity, ctx chasm.MutableContext, request *historyservice.RecordActivityTaskStartedRequest) error { attempt := a.LastAttempt.Get(ctx) attempt.StartedTime = timestamppb.New(ctx.Now(a)) attempt.StartRequestId = request.GetRequestId() attempt.LastWorkerIdentity = request.GetPollRequest().GetIdentity() if versionDirective := request.GetVersionDirective().GetDeploymentVersion(); versionDirective != nil { attempt.LastDeploymentVersion = &deploymentpb.WorkerDeploymentVersion{ BuildId: versionDirective.GetBuildId(), DeploymentName: versionDirective.GetDeploymentName(), } } startTime := attempt.GetStartedTime().AsTime() ctx.AddTask( a, chasm.TaskAttributes{ ScheduledTime: startTime.Add(a.GetStartToCloseTimeout().AsDuration()), }, &activitypb.StartToCloseTimeoutTask{ Stamp: a.LastAttempt.Get(ctx).GetStamp(), }) if heartbeatTimeout := a.GetHeartbeatTimeout().AsDuration(); heartbeatTimeout > 0 { ctx.AddTask( a, chasm.TaskAttributes{ ScheduledTime: startTime.Add(heartbeatTimeout), }, &activitypb.HeartbeatTimeoutTask{ Stamp: attempt.GetStamp(), }) } return nil }, )
TransitionStarted transitions to Started status.
var TransitionTerminated = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED, func(a *Activity, ctx chasm.MutableContext, event terminateEvent) error { return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { req := event.request.GetFrontendRequest() a.TerminateState = &activitypb.ActivityTerminateState{ RequestId: req.GetRequestId(), } outcome := a.Outcome.Get(ctx) failure := &failurepb.Failure{ Message: req.GetReason(), FailureInfo: &failurepb.Failure_TerminatedFailureInfo{}, } outcome.Variant = &activitypb.ActivityOutcome_Failed_{ Failed: &activitypb.ActivityOutcome_Failed{ Failure: failure, }, } metricsHandler := enrichMetricsHandler( a, event.MetricsHandlerBuilderParams.Handler, event.MetricsHandlerBuilderParams.NamespaceName, metrics.ActivityTerminatedScope, event.MetricsHandlerBuilderParams.BreakdownMetricsByTaskQueue) metrics.ActivityTerminate.With(metricsHandler).Record(1) return nil }) }, )
TransitionTerminated transitions to Terminated status.
var TransitionTimedOut = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, func(a *Activity, ctx chasm.MutableContext, event timeoutEvent) error { timeoutType := event.timeoutType return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { var err error switch timeoutType { case enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE: err = a.recordScheduleToStartOrCloseTimeoutFailure(ctx, timeoutType) case enumspb.TIMEOUT_TYPE_START_TO_CLOSE: failure := createStartToCloseTimeoutFailure() err = a.recordFailedAttempt(ctx, 0, failure, ctx.Now(a), true) case enumspb.TIMEOUT_TYPE_HEARTBEAT: failure := createHeartbeatTimeoutFailure() err = a.recordFailedAttempt(ctx, 0, failure, ctx.Now(a), true) default: err = fmt.Errorf("unhandled activity timeout: %v", timeoutType) } if err != nil { return err } a.emitOnTimedOutMetrics(ctx, event.metricsHandler, timeoutType, event.fromStatus) return nil }) }, )
TransitionTimedOut transitions to TimedOut status.
Functions ¶
func InternalStatusToAPIStatus ¶
func InternalStatusToAPIStatus(status activitypb.ActivityExecutionStatus) enumspb.ActivityExecutionStatus
InternalStatusToAPIStatus converts internal activity execution status to API status.
func NewEmbeddedActivity ¶
func NewEmbeddedActivity( ctx chasm.MutableContext, state *activitypb.ActivityState, parent ActivityStore, )
func ValidateAndNormalizeEmbeddedActivity ¶
func ValidateAndNormalizeEmbeddedActivity( activityID string, activityType string, getDefaultActivityRetrySettings dynamicconfig.TypedPropertyFnWithNamespaceFilter[retrypolicy.DefaultRetrySettings], maxIDLengthLimit int, namespaceID namespace.ID, options *activitypb.ActivityOptions, priority *commonpb.Priority, runTimeout *durationpb.Duration, ) error
ValidateAndNormalizeEmbeddedActivity validates and normalizes the attributes for an embedded activity.
func ValidateAndNormalizeStandaloneActivity ¶
func ValidateAndNormalizeStandaloneActivity( activityID string, activityType string, getDefaultActivityRetrySettings dynamicconfig.TypedPropertyFnWithNamespaceFilter[retrypolicy.DefaultRetrySettings], maxIDLengthLimit int, namespaceID namespace.ID, options *activitypb.ActivityOptions, priority *commonpb.Priority, runTimeout *durationpb.Duration, ) error
ValidateAndNormalizeStandaloneActivity validates and normalizes the attributes for a standalone activity.
Types ¶
type Activity ¶
type Activity struct {
// Embedded chasm base component.
// NOTE(review): presumably supplies default implementations of component methods that
// Activity does not define itself — confirm in the chasm package.
chasm.UnimplementedComponent
// Embedded activity state message.
// NOTE(review): the timeout getters (e.g. GetScheduleToStartTimeout) and the
// CancelState / TerminateState fields used by the transitions appear to be promoted
// from this message — confirm against the proto definition.
*activitypb.ActivityState
// Visibility sub-component of the activity.
Visibility chasm.Field[*chasm.Visibility]
// State of the most recent attempt. The transitions update its Count, Stamp,
// StartedTime, CompleteTime, and LastWorkerIdentity, and copy its Stamp onto the
// timeout and dispatch tasks they schedule.
LastAttempt chasm.Field[*activitypb.ActivityAttemptState]
// Heartbeat state of the activity.
// NOTE(review): presumably what RecordHeartbeat and TransitionFailed write the latest
// heartbeat details and recorded time into — confirm.
LastHeartbeat chasm.Field[*activitypb.ActivityHeartbeatState]
// Standalone only
// NOTE(review): this marker appears to cover both RequestData and Outcome — confirm.
// Outcome is what the terminal transitions populate: its Variant is set to the
// successful output on completion and to a failure on cancel and terminate.
RequestData chasm.Field[*activitypb.ActivityRequestData]
Outcome chasm.Field[*activitypb.ActivityOutcome]
// Pointer to an implementation of the "store". For a workflow activity this would be a parent
// pointer back to the workflow. For a standalone activity this is nil (Activity itself
// implements the ActivityStore interface).
// TODO(saa-preview): figure out better naming.
Store chasm.ParentPtr[ActivityStore]
}
Activity component represents an activity execution persistence object and can be either a standalone activity or one embedded within a workflow.
func NewStandaloneActivity ¶
func NewStandaloneActivity( ctx chasm.MutableContext, request *workflowservice.StartActivityExecutionRequest, ) (*Activity, error)
NewStandaloneActivity creates a new activity component and adds associated tasks to start execution.
func (*Activity) GenerateRecordActivityTaskStartedResponse ¶
func (a *Activity) GenerateRecordActivityTaskStartedResponse( ctx chasm.Context, namespace string, ) (*historyservice.RecordActivityTaskStartedResponse, error)
GenerateRecordActivityTaskStartedResponse generates the response for HandleStarted.
func (*Activity) HandleCanceled ¶
func (a *Activity) HandleCanceled( ctx chasm.MutableContext, event RespondCancelledEvent, ) (*historyservice.RespondActivityTaskCanceledResponse, error)
HandleCanceled updates the activity on activity canceled.
func (*Activity) HandleCompleted ¶
func (a *Activity) HandleCompleted( ctx chasm.MutableContext, event RespondCompletedEvent, ) (*historyservice.RespondActivityTaskCompletedResponse, error)
HandleCompleted updates the activity on activity completion.
func (*Activity) HandleFailed ¶
func (a *Activity) HandleFailed( ctx chasm.MutableContext, event RespondFailedEvent, ) (*historyservice.RespondActivityTaskFailedResponse, error)
HandleFailed updates the activity on activity failure. If the activity is retryable, it will be rescheduled for retry instead.
func (*Activity) HandleStarted ¶
func (a *Activity) HandleStarted(ctx chasm.MutableContext, request *historyservice.RecordActivityTaskStartedRequest) ( *historyservice.RecordActivityTaskStartedResponse, error, )
HandleStarted updates the activity on recording activity task started and populates the response.
func (*Activity) LifecycleState ¶
func (a *Activity) LifecycleState(_ chasm.Context) chasm.LifecycleState
LifecycleState implements the chasm.Component interface.
func (*Activity) RecordCompleted ¶
func (a *Activity) RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error
RecordCompleted applies the provided function to record activity completion.
func (*Activity) RecordHeartbeat ¶
func (a *Activity) RecordHeartbeat( ctx chasm.MutableContext, input WithToken[*historyservice.RecordActivityTaskHeartbeatRequest], ) (*historyservice.RecordActivityTaskHeartbeatResponse, error)
RecordHeartbeat records a heartbeat for the activity.
func (*Activity) SearchAttributes ¶
func (a *Activity) SearchAttributes(_ chasm.Context) []chasm.SearchAttributeKeyValue
SearchAttributes implements the chasm.VisibilitySearchAttributesProvider interface. Returns the current search attribute values for this activity execution.
func (*Activity) SetStateMachineState ¶
func (a *Activity) SetStateMachineState(state activitypb.ActivityExecutionStatus)
SetStateMachineState sets the status of the activity.
func (*Activity) StateMachineState ¶
func (a *Activity) StateMachineState() activitypb.ActivityExecutionStatus
StateMachineState returns the current status of the activity.
func (*Activity) StoreOrSelf ¶
func (a *Activity) StoreOrSelf(ctx chasm.Context) ActivityStore
StoreOrSelf returns the store for the activity. If the store is not set as a field (e.g. standalone activities), it returns the activity itself.
func (*Activity) Terminate ¶
func (a *Activity) Terminate( _ chasm.MutableContext, _ chasm.TerminateComponentRequest, ) (chasm.TerminateComponentResponse, error)
Terminate implements the chasm.RootComponent interface.
type ActivityStore ¶
// ActivityStore is the owner through which an activity records its completion. The
// terminal transitions obtain it with Activity.StoreOrSelf and apply their state
// changes inside RecordCompleted. Activity implements this interface itself.
type ActivityStore interface {
// RecordCompleted applies the provided function to record activity completion.
// The error returned is an error value; NOTE(review): presumably applyFn's error is
// propagated unchanged — confirm in each implementation.
RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error
}
type Config ¶
// Config holds the dynamic config properties used by the activity package. It is
// built by ConfigProvider from a dynamicconfig.Collection.
type Config struct {
// NOTE(review): presumably the per-namespace blob size above which a request is
// rejected — confirm where it is read.
BlobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
// NOTE(review): presumably the per-namespace blob size above which a warning is
// emitted — confirm where it is read.
BlobSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
// Per-task-queue flag; the same property type is carried in
// MetricsHandlerBuilderParams and handed to enrichMetricsHandler.
BreakdownMetricsByTaskQueue dynamicconfig.TypedPropertyFnWithTaskQueueFilter[bool]
// Per-namespace boolean.
// NOTE(review): presumably bound to the "activity.enableStandalone" setting — confirm
// in ConfigProvider.
Enabled dynamicconfig.BoolPropertyFnWithNamespaceFilter
// Per-namespace duration.
// NOTE(review): presumably bound to the "activity.longPollBuffer" setting — confirm.
LongPollBuffer dynamicconfig.DurationPropertyFnWithNamespaceFilter
// Per-namespace duration.
// NOTE(review): presumably bound to the "activity.longPollTimeout" setting — confirm.
LongPollTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
// Global (unfiltered) integer.
// NOTE(review): presumably the value passed as maxIDLengthLimit to the
// ValidateAndNormalize* functions — confirm.
MaxIDLengthLimit dynamicconfig.IntPropertyFn
// Per-namespace default retry settings; same type as the
// getDefaultActivityRetrySettings parameter of the ValidateAndNormalize* functions.
DefaultActivityRetryPolicy dynamicconfig.TypedPropertyFnWithNamespaceFilter[retrypolicy.DefaultRetrySettings]
// NOTE(review): presumably caps the page size of visibility list queries per
// namespace — confirm where it is read.
VisibilityMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter
}
func ConfigProvider ¶
func ConfigProvider(dc *dynamicconfig.Collection) *Config
type FrontendHandler ¶
// FrontendHandler is the set of activity execution operations served for frontend
// requests. Each RPC-style method takes a context and the matching workflowservice
// request and returns the matching response or an error. Instances are created with
// NewFrontendHandler.
type FrontendHandler interface {
StartActivityExecution(ctx context.Context, req *workflowservice.StartActivityExecutionRequest) (*workflowservice.StartActivityExecutionResponse, error)
DescribeActivityExecution(ctx context.Context, req *workflowservice.DescribeActivityExecutionRequest) (*workflowservice.DescribeActivityExecutionResponse, error)
PollActivityExecution(ctx context.Context, req *workflowservice.PollActivityExecutionRequest) (*workflowservice.PollActivityExecutionResponse, error)
CountActivityExecutions(context.Context, *workflowservice.CountActivityExecutionsRequest) (*workflowservice.CountActivityExecutionsResponse, error)
DeleteActivityExecution(context.Context, *workflowservice.DeleteActivityExecutionRequest) (*workflowservice.DeleteActivityExecutionResponse, error)
ListActivityExecutions(context.Context, *workflowservice.ListActivityExecutionsRequest) (*workflowservice.ListActivityExecutionsResponse, error)
RequestCancelActivityExecution(context.Context, *workflowservice.RequestCancelActivityExecutionRequest) (*workflowservice.RequestCancelActivityExecutionResponse, error)
TerminateActivityExecution(context.Context, *workflowservice.TerminateActivityExecutionRequest) (*workflowservice.TerminateActivityExecutionResponse, error)
// IsStandaloneActivityEnabled reports a boolean for the given namespace name.
// NOTE(review): presumably reflects the Enabled dynamic config for that namespace —
// confirm in the implementation.
IsStandaloneActivityEnabled(namespaceName string) bool
}
func NewFrontendHandler ¶
func NewFrontendHandler( client activitypb.ActivityServiceClient, config *Config, logger log.Logger, metricsHandler metrics.Handler, namespaceRegistry namespace.Registry, saMapperProvider searchattribute.MapperProvider, saValidator *searchattribute.Validator, ) FrontendHandler
NewFrontendHandler creates a new FrontendHandler instance for processing activity frontend requests.
type MetricsHandlerBuilderParams ¶
type MetricsHandlerBuilderParams struct {
// Base metrics handler that gets enriched (passed to enrichMetricsHandler by
// TransitionTerminated).
Handler metrics.Handler
// Namespace name passed to enrichMetricsHandler alongside Handler.
NamespaceName string
// Per-task-queue boolean property passed to enrichMetricsHandler.
// NOTE(review): presumably decides whether metrics are tagged with the task queue
// name — confirm in enrichMetricsHandler.
BreakdownMetricsByTaskQueue dynamicconfig.TypedPropertyFnWithTaskQueueFilter[bool]
}
MetricsHandlerBuilderParams contains parameters for building/enriching a metrics handler for activity operations.
type RespondCancelledEvent ¶
type RespondCancelledEvent struct {
// The history-service request reporting the activity task as canceled.
Request *historyservice.RespondActivityTaskCanceledRequest
// NOTE(review): presumably the deserialized task token that accompanied the request —
// confirm at the call site.
Token *tokenspb.Task
// Parameters for building/enriching the metrics handler for this operation.
MetricsHandlerBuilderParams MetricsHandlerBuilderParams
}
RespondCancelledEvent wraps the RespondActivityTaskCanceledRequest with context-specific data.
type RespondCompletedEvent ¶
type RespondCompletedEvent struct {
// The history-service request reporting the activity task as completed.
Request *historyservice.RespondActivityTaskCompletedRequest
// NOTE(review): presumably the deserialized task token that accompanied the request —
// confirm at the call site.
Token *tokenspb.Task
// Parameters for building/enriching the metrics handler for this operation.
MetricsHandlerBuilderParams MetricsHandlerBuilderParams
}
RespondCompletedEvent wraps the RespondActivityTaskCompletedRequest with context-specific data.
type RespondFailedEvent ¶
type RespondFailedEvent struct {
// The history-service request reporting the activity task as failed.
Request *historyservice.RespondActivityTaskFailedRequest
// NOTE(review): presumably the deserialized task token that accompanied the request —
// confirm at the call site.
Token *tokenspb.Task
// Parameters for building/enriching the metrics handler for this operation.
MetricsHandlerBuilderParams MetricsHandlerBuilderParams
}
RespondFailedEvent wraps the RespondActivityTaskFailedRequest with context-specific data.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
gen
|
|
|
activitypb/v1
Code generated by protoc-gen-go-helpers.
|
Code generated by protoc-gen-go-helpers. |