Documentation
¶
Index ¶
- Constants
- Variables
- func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventToken []byte) (*hsm.Node, error)
- func CallbackTokenGeneratorProvider() *commonnexus.CallbackTokenGenerator
- func CompletionHandler(ctx context.Context, env hsm.Environment, ref hsm.Ref, requestID string, ...) error
- func EndpointRegistryLifetimeHooks(lc fx.Lifecycle, registry commonnexus.EndpointRegistry)
- func EndpointRegistryProvider(matchingClient resource.MatchingClient, ...) commonnexus.EndpointRegistry
- func MachineCollection(tree *hsm.Node) hsm.Collection[Operation]
- func RegisterEventDefinitions(reg *hsm.Registry) error
- func RegisterExecutor(registry *hsm.Registry, options TaskExecutorOptions) error
- func RegisterStateMachines(r *hsm.Registry) error
- func RegisterTaskSerializers(reg *hsm.Registry) error
- type BackoffTask
- type BackoffTaskSerializer
- type CancelRequestCompletedEventDefinition
- func (d CancelRequestCompletedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d CancelRequestCompletedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d CancelRequestCompletedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d CancelRequestCompletedEventDefinition) Type() enumspb.EventType
- type CancelRequestFailedEventDefinition
- func (d CancelRequestFailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d CancelRequestFailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d CancelRequestFailedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d CancelRequestFailedEventDefinition) Type() enumspb.EventType
- type CancelRequestedEventDefinition
- func (d CancelRequestedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d CancelRequestedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d CancelRequestedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d CancelRequestedEventDefinition) Type() enumspb.EventType
- type Cancelation
- type CancelationBackoffTask
- type CancelationBackoffTaskSerializer
- type CancelationTask
- type CancelationTaskSerializer
- type CanceledEventDefinition
- func (d CanceledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d CanceledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d CanceledEventDefinition) IsWorkflowTaskTrigger() bool
- func (d CanceledEventDefinition) Type() enumspb.EventType
- type ClientProvider
- type CompletedEventDefinition
- func (d CompletedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d CompletedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d CompletedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d CompletedEventDefinition) Type() enumspb.EventType
- type Config
- type EventAttemptFailed
- type EventCancelationAttemptFailed
- type EventCancelationFailed
- type EventCancelationRescheduled
- type EventCancelationScheduled
- type EventCancelationSucceeded
- type EventCanceled
- type EventFailed
- type EventRescheduled
- type EventScheduled
- type EventStarted
- type EventSucceeded
- type EventTimedOut
- type FailedEventDefinition
- func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d FailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d FailedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d FailedEventDefinition) Type() enumspb.EventType
- type InvocationTask
- type InvocationTaskSerializer
- type LimitedReadCloser
- type NexusTransportProvider
- type Operation
- func (o Operation) Cancel(node *hsm.Node, t time.Time, requestedEventID int64) (hsm.TransitionOutput, error)
- func (o Operation) Cancelation(node *hsm.Node) (*Cancelation, error)
- func (o Operation) CancelationNode(node *hsm.Node) (*hsm.Node, error)
- func (o Operation) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error)
- func (o Operation) SetState(state enumsspb.NexusOperationState)
- func (o Operation) State() enumsspb.NexusOperationState
- type ResponseSizeLimiter
- type ScheduleToCloseTimeoutTask
- type ScheduleToStartTimeoutTask
- type ScheduleToStartTimeoutTaskSerializer
- type ScheduledEventDefinition
- func (d ScheduledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d ScheduledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d ScheduledEventDefinition) IsWorkflowTaskTrigger() bool
- func (d ScheduledEventDefinition) Type() enumspb.EventType
- type StartToCloseTimeoutTask
- type StartToCloseTimeoutTaskSerializer
- type StartedEventDefinition
- func (d StartedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d StartedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d StartedEventDefinition) IsWorkflowTaskTrigger() bool
- func (d StartedEventDefinition) Type() enumspb.EventType
- type TaskExecutorOptions
- type TimedOutEventDefinition
- func (d TimedOutEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
- func (d TimedOutEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, ...) error
- func (d TimedOutEventDefinition) IsWorkflowTaskTrigger() bool
- func (d TimedOutEventDefinition) Type() enumspb.EventType
- type TimeoutTaskSerializer
Constants ¶
// State machine type identifiers registered by this package.
const (
	// OperationMachineType is a unique type identifier for the Operation state machine.
	OperationMachineType = "nexusoperations.Operation"
	// CancelationMachineType is a unique type identifier for the Cancelation state machine.
	CancelationMachineType = "nexusoperations.Cancelation"
)
// Task type identifier strings for the tasks declared in this package.
const (
	TaskTypeInvocation         = "nexusoperations.Invocation"
	TaskTypeBackoff            = "nexusoperations.Backoff"
	TaskTypeCancelation        = "nexusoperations.Cancelation"
	TaskTypeCancelationBackoff = "nexusoperations.CancelationBackoff"
	// NOTE: the name `Timeout` is used for backward compatibility with existing persisted tasks and predates the
	// addition of more flexible timeout types.
	TaskTypeScheduleToCloseTimeout = "nexusoperations.Timeout"
	TaskTypeScheduleToStartTimeout = "nexusoperations.ScheduleToStartTimeout"
	TaskTypeStartToCloseTimeout    = "nexusoperations.StartToCloseTimeout"
)
// NexusCallbackSourceHeader is the header name "Nexus-Callback-Source".
// NOTE(review): presumably set on callback requests to identify where they originated — confirm against usage.
const NexusCallbackSourceHeader = "Nexus-Callback-Source"
Variables ¶
// CallbackURLTemplate is the global string dynamic config setting holding the template used to build the callback
// URLs included in Nexus operation requests. Its default value is the placeholder "unset".
var CallbackURLTemplate = dynamicconfig.NewGlobalStringSetting(
	"component.nexusoperations.callback.endpoint.template",
	"unset",
	`Controls the template for generating callback URLs included in Nexus operation requests, which are used to deliver asynchronous completion.
The template can be used to interpolate the {{.NamespaceName}} and {{.NamespaceID}} parameters to construct a publicly accessible URL.
Must be set in order to use Nexus Operations.`,
)
// CancelationMachineKey is the fixed key (Cancelation machine type, empty ID) used for the cancelation machine
// stored as a child of the operation machine.
var CancelationMachineKey = hsm.Key{
	Type: CancelationMachineType,
	ID:   "",
}
CancelationMachineKey is a fixed key for the cancelation machine as a child of the operation machine.
var DisallowedOperationHeaders = dynamicconfig.NewGlobalTypedSettingWithConverter( "component.nexusoperations.disallowedHeaders", func(in any) ([]string, error) { keys, err := dynamicconfig.ConvertStructure[[]string](nil)(in) if err != nil { return nil, err } for i, k := range keys { keys[i] = strings.ToLower(k) } return keys, nil }, []string{ "request-timeout", interceptor.DCRedirectionApiHeaderName, interceptor.DCRedirectionContextHeaderName, headers.CallerNameHeaderName, headers.CallerTypeHeaderName, headers.CallOriginHeaderName, }, `Case insensitive list of disallowed header keys for Nexus Operations. ScheduleNexusOperation commands with a "nexus_header" field that contains any of these disallowed keys will be rejected.`, )
// Sentinel errors exported by this package.
var (
	// ErrInvalidOperationToken reports an invalid operation token.
	ErrInvalidOperationToken = errors.New("invalid operation token")
	// ErrResponseBodyTooLarge reports a response body that exceeds the allowed size.
	ErrResponseBodyTooLarge = errors.New("http: response body too large")
)
// MaxConcurrentOperations is the namespace-scoped integer setting (default 30) capping the number of concurrent
// Nexus Operations per workflow execution.
var MaxConcurrentOperations = dynamicconfig.NewNamespaceIntSetting(
"component.nexusoperations.limit.operation.concurrency",
30,
`MaxConcurrentOperations limits the maximum allowed concurrent Nexus Operations for a given workflow execution.
Once the limit is reached, ScheduleNexusOperation commands will be rejected.`,
)
// MaxOperationHeaderSize is the namespace-scoped integer setting (default 8192) capping the total size of a Nexus
// Operation's header, measured with len() over header keys and values.
var MaxOperationHeaderSize = dynamicconfig.NewNamespaceIntSetting(
"component.nexusoperations.limit.header.size",
8192,
`The maximum allowed header size for a Nexus Operation.
ScheduleNexusOperation commands with a "nexus_header" field that exceeds this limit will be rejected.
Uses Go's len() function on header keys and values to determine the total size.`,
)
// MaxOperationNameLength is the namespace-scoped integer setting (default 1000) capping the length of a Nexus
// Operation name, measured with len().
var MaxOperationNameLength = dynamicconfig.NewNamespaceIntSetting(
"component.nexusoperations.limit.operation.name.length",
1000,
`MaxOperationNameLength limits the maximum allowed length for a Nexus Operation name.
ScheduleNexusOperation commands with an operation name that exceeds this limit will be rejected.
Uses Go's len() function to determine the length.`,
)
// MaxOperationScheduleToCloseTimeout is the namespace-scoped duration setting that caps an operation's
// schedule-to-close timeout. The default of 0 means no limit.
var MaxOperationScheduleToCloseTimeout = dynamicconfig.NewNamespaceDurationSetting(
	"component.nexusoperations.limit.scheduleToCloseTimeout",
	0,
	`MaxOperationScheduleToCloseTimeout limits the maximum allowed duration of a Nexus Operation. ScheduleNexusOperation
commands that specify no schedule-to-close timeout or a longer timeout than permitted will have their
schedule-to-close timeout capped to this value. 0 implies no limit.`,
)
// MaxOperationTokenLength is the namespace-scoped integer setting (default 4096) capping the length of a Nexus
// Operation token, measured with len().
var MaxOperationTokenLength = dynamicconfig.NewNamespaceIntSetting(
"component.nexusoperations.limit.operation.token.length",
4096,
`Limits the maximum allowed length for a Nexus Operation token. Tokens returned via start responses or via async
completions that exceed this limit will be rejected. Uses Go's len() function to determine the length.
Leave this limit long enough to fit a workflow ID and namespace name plus padding at minimum since that's what the SDKs
use as the token.`,
)
// MaxServiceNameLength is the namespace-scoped integer setting (default 1000) capping the length of a Nexus
// Service name, measured with len().
var MaxServiceNameLength = dynamicconfig.NewNamespaceIntSetting(
"component.nexusoperations.limit.service.name.length",
1000,
`MaxServiceNameLength limits the maximum allowed length for a Nexus Service name.
ScheduleNexusOperation commands with a service name that exceeds this limit will be rejected.
Uses Go's len() function to determine the length.`,
)
// MetricTagConfiguration is the global typed setting selecting which optional tags are attached to Nexus
// operation metrics. Its default is the zero-value chasmnexus.NexusMetricTagConfig.
var MetricTagConfiguration = dynamicconfig.NewGlobalTypedSetting( "component.nexusoperations.metrics.tags", chasmnexus.NexusMetricTagConfig{}, `Controls which metric tags are included with Nexus operation metrics. This configuration supports: 1. Service name tag - adds the Nexus service name as a metric dimension (IncludeServiceTag) 2. Operation name tag - adds the Nexus operation name as a metric dimension (IncludeOperationTag) 3. Header-based tags - maps values from request headers to metric tags (HeaderTagMappings) Note: default metric tags (like namespace, endpoint) are always included and not affected by this configuration. Adding high-cardinality tags (like unique operation names) can significantly increase metric storage requirements and query complexity. Consider the cardinality impact when enabling these tags.`, )
// MinDispatchTaskTimeout is the namespace-scoped duration setting (default one second) for the minimum time that
// must remain on a request for it to be dispatched to the handler worker.
var MinDispatchTaskTimeout = dynamicconfig.NewNamespaceDurationSetting( "component.nexusoperations.limit.dispatch.task.timeout.min", time.Second, `MinDispatchTaskTimeout is the minimum time remaining for a request to be dispatched to the handler worker. If the remaining request timeout is less than this value, a timeout error will be returned. Working in conjunction with MinRequestTimeout, both configs help ensure that the server has enough time to complete a Nexus request.`, )
// MinRequestTimeout is the namespace-scoped duration setting (default 1.5 seconds) for the minimum time that must
// remain on a request for the server to make RPCs.
var MinRequestTimeout = dynamicconfig.NewNamespaceDurationSetting(
	"component.nexusoperations.limit.request.timeout.min",
	1500*time.Millisecond,
	`MinRequestTimeout is the minimum time remaining for a request to complete for the server to make RPCs. If the remaining request timeout is less than this value, a non-retryable timeout error will be returned.`,
)
// Module is the fx module "component.nexusoperations". It provides the config, client provider, transport
// provider, callback token generator and endpoint registry constructors, and invokes the endpoint registry
// lifetime hooks plus the state machine, task serializer, event definition and executor registrations.
var Module = fx.Module( "component.nexusoperations", fx.Provide(ConfigProvider), fx.Provide(ClientProviderFactory), fx.Provide(DefaultNexusTransportProvider), fx.Provide(CallbackTokenGeneratorProvider), fx.Provide(EndpointRegistryProvider), fx.Invoke(EndpointRegistryLifetimeHooks), fx.Invoke(RegisterStateMachines), fx.Invoke(RegisterTaskSerializers), fx.Invoke(RegisterEventDefinitions), fx.Invoke(RegisterExecutor), )
// RecordCancelRequestCompletionEvents is the global boolean setting (default true) controlling whether
// NexusOperationCancelRequestCompleted and NexusOperationCancelRequestFailed events are recorded.
var RecordCancelRequestCompletionEvents = dynamicconfig.NewGlobalBoolSetting( "component.nexusoperations.recordCancelRequestCompletionEvents", true, `Boolean flag to control whether to record NexusOperationCancelRequestCompleted and NexusOperationCancelRequestFailed events. Default true.`, )
// RequestTimeout is the destination-scoped duration setting (default ten seconds) bounding a single Nexus start
// or cancel request.
var RequestTimeout = dynamicconfig.NewDestinationDurationSetting(
	"component.nexusoperations.request.timeout",
	10*time.Second,
	`RequestTimeout is the timeout for making a single nexus start or cancel request.`,
)
// RetryPolicyInitialInterval is the global duration setting (default one second) for the initial backoff interval
// between StartOperation or CancelOperation requests for a given operation.
var RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting( "component.nexusoperations.retryPolicy.initialInterval", time.Second, `The initial backoff interval between every nexus StartOperation or CancelOperation request for a given operation.`, )
// RetryPolicyMaximumInterval is the global duration setting (default one hour) for the maximum backoff interval
// between StartOperation or CancelOperation requests for a given operation.
var RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting( "component.nexusoperations.retryPolicy.maxInterval", time.Hour, `The maximum backoff interval between every nexus StartOperation or CancelOperation request for a given operation.`, )
var TransitionAttemptFailed = hsm.NewTransition( []enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_SCHEDULED}, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, func(op Operation, event EventAttemptFailed) (hsm.TransitionOutput, error) { op.recordAttempt(event.Time) nextDelay := event.RetryPolicy.ComputeNextDelay(0, int(op.Attempt), nil) nextAttemptScheduleTime := event.Time.Add(nextDelay) op.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime) op.LastAttemptFailure = event.Failure return op.output() }, )
var TransitionCancelationAttemptFailed = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF, func(c Cancelation, event EventCancelationAttemptFailed) (hsm.TransitionOutput, error) { c.recordAttempt(event.Time) nextDelay := event.RetryPolicy.ComputeNextDelay(0, int(c.Attempt), nil) nextAttemptScheduleTime := event.Time.Add(nextDelay) c.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime) c.LastAttemptFailure = event.Failure return c.output(event.Node) }, )
var TransitionCancelationFailed = hsm.NewTransition( []enumspb.NexusOperationCancellationState{ enumspb.NEXUS_OPERATION_CANCELLATION_STATE_UNSPECIFIED, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED, }, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_FAILED, func(c Cancelation, event EventCancelationFailed) (hsm.TransitionOutput, error) { c.recordAttempt(event.Time) c.LastAttemptFailure = event.Failure return c.output(event.Node) }, )
var TransitionCancelationRescheduled = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED, func(c Cancelation, event EventCancelationRescheduled) (hsm.TransitionOutput, error) { c.NextAttemptScheduleTime = nil return c.output(event.Node) }, )
var TransitionCancelationScheduled = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_UNSPECIFIED}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED, func(op Cancelation, event EventCancelationScheduled) (hsm.TransitionOutput, error) { op.RequestedTime = timestamppb.New(event.Time) return op.output(event.Node) }, )
var TransitionCancelationSucceeded = hsm.NewTransition( []enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED}, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SUCCEEDED, func(c Cancelation, event EventCancelationSucceeded) (hsm.TransitionOutput, error) { c.recordAttempt(event.Time) return c.output(event.Node) }, )
// TransitionCanceled moves an operation from SCHEDULED, BACKING_OFF or STARTED to CANCELED.
// The event payload is not read by the transition.
var TransitionCanceled = hsm.NewTransition(
	[]enumsspb.NexusOperationState{
		enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
		enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF,
		enumsspb.NEXUS_OPERATION_STATE_STARTED,
	},
	enumsspb.NEXUS_OPERATION_STATE_CANCELED,
	func(operation Operation, _ EventCanceled) (hsm.TransitionOutput, error) {
		return operation.output()
	},
)
// TransitionFailed moves an operation from SCHEDULED, BACKING_OFF or STARTED to FAILED.
// The event payload is not read by the transition.
var TransitionFailed = hsm.NewTransition(
	[]enumsspb.NexusOperationState{
		enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
		enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF,
		enumsspb.NEXUS_OPERATION_STATE_STARTED,
	},
	enumsspb.NEXUS_OPERATION_STATE_FAILED,
	func(operation Operation, _ EventFailed) (hsm.TransitionOutput, error) {
		return operation.output()
	},
)
var TransitionRescheduled = hsm.NewTransition( []enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF}, enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, func(op Operation, event EventRescheduled) (hsm.TransitionOutput, error) { op.NextAttemptScheduleTime = nil return op.output() }, )
// TransitionScheduled moves an operation from UNSPECIFIED to SCHEDULED.
// The event payload is not read by the transition.
var TransitionScheduled = hsm.NewTransition(
	[]enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_UNSPECIFIED},
	enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
	func(operation Operation, _ EventScheduled) (hsm.TransitionOutput, error) {
		return operation.output()
	},
)
// TransitionStarted moves an operation from SCHEDULED or BACKING_OFF to STARTED.
// It records the attempt, stores the operation token (event.Attributes.OperationToken, falling back to
// OperationId when the token is empty) and sets StartedTime from event.Time.
// If a cancelation child node exists, it applies TransitionCancelationScheduled to that child and returns an
// empty TransitionOutput together with the result of hsm.MachineTransition.
// Otherwise it returns op.output() with the start-to-close timeout tasks appended.
// NOTE(review): on the cancelation-child path neither op.output() nor op.startToCloseTimeoutTask() is called —
// confirm that skipping those tasks is intended.
var TransitionStarted = hsm.NewTransition( []enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF}, enumsspb.NEXUS_OPERATION_STATE_STARTED, func(op Operation, event EventStarted) (hsm.TransitionOutput, error) { op.recordAttempt(event.Time) if event.Attributes.OperationToken != "" { op.OperationToken = event.Attributes.OperationToken } else if event.Attributes.OperationId != "" { op.OperationToken = event.Attributes.OperationId } op.StartedTime = timestamppb.New(event.Time) child, err := op.CancelationNode(event.Node) if err != nil { return hsm.TransitionOutput{}, err } if child != nil { return hsm.TransitionOutput{}, hsm.MachineTransition(child, func(c Cancelation) (hsm.TransitionOutput, error) { return TransitionCancelationScheduled.Apply(c, EventCancelationScheduled{ Time: event.Time, Node: child, }) }) } output, err := op.output() if err != nil { return output, err } output.Tasks = append(output.Tasks, op.startToCloseTimeoutTask()...) return output, nil }, )
// TransitionSucceeded moves an operation from SCHEDULED, BACKING_OFF or STARTED to SUCCEEDED.
// The event payload is not read by the transition.
var TransitionSucceeded = hsm.NewTransition(
	[]enumsspb.NexusOperationState{
		enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
		enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF,
		enumsspb.NEXUS_OPERATION_STATE_STARTED,
	},
	enumsspb.NEXUS_OPERATION_STATE_SUCCEEDED,
	func(operation Operation, _ EventSucceeded) (hsm.TransitionOutput, error) {
		return operation.output()
	},
)
// TransitionTimedOut moves an operation from SCHEDULED, BACKING_OFF or STARTED to TIMED_OUT.
// The event payload is not read by the transition.
var TransitionTimedOut = hsm.NewTransition(
	[]enumsspb.NexusOperationState{
		enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
		enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF,
		enumsspb.NEXUS_OPERATION_STATE_STARTED,
	},
	enumsspb.NEXUS_OPERATION_STATE_TIMED_OUT,
	func(operation Operation, _ EventTimedOut) (hsm.TransitionOutput, error) {
		return operation.output()
	},
)
// UseSystemCallbackURL is the global boolean setting (default true) that selects between the fixed system
// callback URL ("temporal://system") and the callback URL template when the executor builds callback URLs for
// worker targets.
var UseSystemCallbackURL = dynamicconfig.NewGlobalBoolSetting(
	"component.nexusoperations.useSystemCallbackURL",
	true,
	`UseSystemCallbackURL is a global feature toggle that controls how the executor generates callback URLs for worker targets in Nexus Operations. When set to true, the executor will use the fixed system callback URL ("temporal://system") for all worker targets, instead of generating URLs from the callback URL template. This simplifies configuration and improves reliability for worker callbacks. - false: The executor uses the callback URL template to generate callback URLs for worker targets. - true (default): The executor uses the fixed system callback URL ("temporal://system") for worker targets.`,
)
Functions ¶
func AddChild ¶
func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventToken []byte) (*hsm.Node, error)
AddChild adds a new operation child machine to the given node and transitions it to the SCHEDULED state.
func CallbackTokenGeneratorProvider ¶
func CallbackTokenGeneratorProvider() *commonnexus.CallbackTokenGenerator
func CompletionHandler ¶
func EndpointRegistryLifetimeHooks ¶
func EndpointRegistryLifetimeHooks(lc fx.Lifecycle, registry commonnexus.EndpointRegistry)
func EndpointRegistryProvider ¶
func EndpointRegistryProvider( matchingClient resource.MatchingClient, endpointManager persistence.NexusEndpointManager, dc *dynamicconfig.Collection, logger log.Logger, metricsHandler metrics.Handler, ) commonnexus.EndpointRegistry
func MachineCollection ¶
func MachineCollection(tree *hsm.Node) hsm.Collection[Operation]
MachineCollection creates a new typed [hsm.Collection] for operations.
func RegisterExecutor ¶
func RegisterExecutor( registry *hsm.Registry, options TaskExecutorOptions, ) error
func RegisterStateMachines ¶
func RegisterTaskSerializers ¶
Types ¶
type BackoffTask ¶
type BackoffTask struct {
// contains filtered or unexported fields
}
func (BackoffTask) Deadline ¶
func (t BackoffTask) Deadline() time.Time
func (BackoffTask) Destination ¶
func (t BackoffTask) Destination() string
func (BackoffTask) Type ¶
func (BackoffTask) Type() string
func (BackoffTask) Validate ¶
func (t BackoffTask) Validate(_ *persistencespb.StateMachineRef, node *hsm.Node) error
type BackoffTaskSerializer ¶
type BackoffTaskSerializer struct{}
func (BackoffTaskSerializer) Deserialize ¶
func (BackoffTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)
type CancelRequestCompletedEventDefinition ¶
type CancelRequestCompletedEventDefinition struct{}
func (CancelRequestCompletedEventDefinition) Apply ¶
func (d CancelRequestCompletedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (CancelRequestCompletedEventDefinition) CherryPick ¶
func (d CancelRequestCompletedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, _ map[enumspb.ResetReapplyExcludeType]struct{}) error
func (CancelRequestCompletedEventDefinition) IsWorkflowTaskTrigger ¶
func (d CancelRequestCompletedEventDefinition) IsWorkflowTaskTrigger() bool
func (CancelRequestCompletedEventDefinition) Type ¶
func (d CancelRequestCompletedEventDefinition) Type() enumspb.EventType
type CancelRequestFailedEventDefinition ¶
type CancelRequestFailedEventDefinition struct{}
func (CancelRequestFailedEventDefinition) Apply ¶
func (d CancelRequestFailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (CancelRequestFailedEventDefinition) CherryPick ¶
func (d CancelRequestFailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, _ map[enumspb.ResetReapplyExcludeType]struct{}) error
func (CancelRequestFailedEventDefinition) IsWorkflowTaskTrigger ¶
func (d CancelRequestFailedEventDefinition) IsWorkflowTaskTrigger() bool
func (CancelRequestFailedEventDefinition) Type ¶
func (d CancelRequestFailedEventDefinition) Type() enumspb.EventType
type CancelRequestedEventDefinition ¶
type CancelRequestedEventDefinition struct{}
func (CancelRequestedEventDefinition) Apply ¶
func (d CancelRequestedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (CancelRequestedEventDefinition) CherryPick ¶
func (d CancelRequestedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, _ map[enumspb.ResetReapplyExcludeType]struct{}) error
func (CancelRequestedEventDefinition) IsWorkflowTaskTrigger ¶
func (d CancelRequestedEventDefinition) IsWorkflowTaskTrigger() bool
func (CancelRequestedEventDefinition) Type ¶
func (d CancelRequestedEventDefinition) Type() enumspb.EventType
type Cancelation ¶
// Cancelation wraps the persisted cancellation info by embedding *persistencespb.NexusOperationCancellationInfo,
// so the info's fields are accessible directly on the Cancelation value.
type Cancelation struct {
*persistencespb.NexusOperationCancellationInfo
}
Cancelation state machine for canceling an operation.
func (Cancelation) RegenerateTasks ¶
func (Cancelation) SetState ¶
func (c Cancelation) SetState(state enumspb.NexusOperationCancellationState)
func (Cancelation) State ¶
func (c Cancelation) State() enumspb.NexusOperationCancellationState
type CancelationBackoffTask ¶
type CancelationBackoffTask struct {
// contains filtered or unexported fields
}
func (CancelationBackoffTask) Deadline ¶
func (t CancelationBackoffTask) Deadline() time.Time
func (CancelationBackoffTask) Destination ¶
func (CancelationBackoffTask) Destination() string
func (CancelationBackoffTask) Type ¶
func (CancelationBackoffTask) Type() string
func (CancelationBackoffTask) Validate ¶
func (CancelationBackoffTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
type CancelationBackoffTaskSerializer ¶
type CancelationBackoffTaskSerializer struct{}
func (CancelationBackoffTaskSerializer) Deserialize ¶
func (CancelationBackoffTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)
type CancelationTask ¶
func (CancelationTask) Deadline ¶
func (CancelationTask) Deadline() time.Time
func (CancelationTask) Destination ¶
func (t CancelationTask) Destination() string
func (CancelationTask) Type ¶
func (CancelationTask) Type() string
func (CancelationTask) Validate ¶
func (CancelationTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
type CancelationTaskSerializer ¶
type CancelationTaskSerializer struct{}
func (CancelationTaskSerializer) Deserialize ¶
func (CancelationTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)
type CanceledEventDefinition ¶
type CanceledEventDefinition struct{}
func (CanceledEventDefinition) Apply ¶
func (d CanceledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (CanceledEventDefinition) CherryPick ¶
func (d CanceledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (CanceledEventDefinition) IsWorkflowTaskTrigger ¶
func (d CanceledEventDefinition) IsWorkflowTaskTrigger() bool
func (CanceledEventDefinition) Type ¶
func (d CanceledEventDefinition) Type() enumspb.EventType
type ClientProvider ¶
// ClientProvider is a function type that takes a context, namespace ID, endpoint entry and service name and
// returns a *nexusrpc.HTTPClient or an error.
type ClientProvider func(ctx context.Context, namespaceID string, entry *persistencespb.NexusEndpointEntry, service string) (*nexusrpc.HTTPClient, error)
ClientProvider provides a nexus client for a given endpoint.
func ClientProviderFactory ¶
func ClientProviderFactory( httpTransportProvider NexusTransportProvider, clusterMetadata cluster.Metadata, rpcFactory common.RPCFactory, config *Config, ) (ClientProvider, error)
type CompletedEventDefinition ¶
type CompletedEventDefinition struct{}
func (CompletedEventDefinition) Apply ¶
func (d CompletedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (CompletedEventDefinition) CherryPick ¶
func (d CompletedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (CompletedEventDefinition) IsWorkflowTaskTrigger ¶
func (d CompletedEventDefinition) IsWorkflowTaskTrigger() bool
func (CompletedEventDefinition) Type ¶
func (d CompletedEventDefinition) Type() enumspb.EventType
type Config ¶
// Config bundles the settings consumed by this package. Apart from NumHistoryShards and RetryPolicy, every field
// is a dynamic config property function that is evaluated on each call.
type Config struct {
// NumHistoryShards is a plain integer rather than a dynamic property.
// NOTE(review): presumably taken from the persistence config passed to ConfigProvider — confirm.
NumHistoryShards int32
RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
MinRequestTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
MaxConcurrentOperations dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxServiceNameLength dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxOperationNameLength dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxOperationTokenLength dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxOperationHeaderSize dynamicconfig.IntPropertyFnWithNamespaceFilter
DisallowedOperationHeaders dynamicconfig.TypedPropertyFn[[]string]
MaxOperationScheduleToCloseTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
PayloadSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
CallbackURLTemplate dynamicconfig.StringPropertyFn
UseSystemCallbackURL dynamicconfig.BoolPropertyFn
UseNewFailureWireFormat dynamicconfig.BoolPropertyFnWithNamespaceFilter
RecordCancelRequestCompletionEvents dynamicconfig.BoolPropertyFn
// RetryPolicy returns the backoff policy to use.
// NOTE(review): presumably built from RetryPolicyInitialInterval and RetryPolicyMaximumInterval — confirm.
RetryPolicy func() backoff.RetryPolicy
}
func ConfigProvider ¶
func ConfigProvider(dc *dynamicconfig.Collection, cfg *config.Persistence) *Config
type EventAttemptFailed ¶
// EventAttemptFailed is the event payload consumed by TransitionAttemptFailed.
type EventAttemptFailed struct {
// Time is recorded as the attempt time and is the base for the next attempt schedule time.
Time time.Time
// Failure is stored on the operation as its last attempt failure.
Failure *failurepb.Failure
Node *hsm.Node
// RetryPolicy computes the delay before the next attempt.
RetryPolicy backoff.RetryPolicy
}
EventAttemptFailed is triggered when an invocation attempt fails with a retryable error.
type EventCancelationAttemptFailed ¶
// EventCancelationAttemptFailed is the event payload consumed by TransitionCancelationAttemptFailed.
type EventCancelationAttemptFailed struct {
// Time is recorded as the attempt time and is the base for the next attempt schedule time.
Time time.Time
// Failure is stored on the cancelation as its last attempt failure.
Failure *failurepb.Failure
// Node is passed to the cancelation's output generation.
Node *hsm.Node
// RetryPolicy computes the delay before the next attempt.
RetryPolicy backoff.RetryPolicy
}
EventCancelationAttemptFailed is triggered when a cancelation attempt fails with a retryable error.
type EventCancelationFailed ¶
EventCancelationFailed is triggered when a cancelation attempt fails with a non-retryable error.
type EventCancelationRescheduled ¶
EventCancelationRescheduled is triggered when cancelation is meant to be rescheduled after backing off from a previous attempt.
type EventCancelationScheduled ¶
EventCancelationScheduled is triggered when cancelation is meant to be scheduled for the first time - immediately after it has been requested.
type EventCancelationSucceeded ¶
EventCancelationSucceeded is triggered when a cancelation attempt succeeds.
type EventCanceled ¶
EventCanceled is triggered when the operation completes as canceled.
type EventFailed ¶
// EventFailed is the event payload accepted by TransitionFailed, which does not read any of its fields.
type EventFailed struct {
Time time.Time
Node *hsm.Node
Attributes *historypb.NexusOperationFailedEventAttributes
}
EventFailed is triggered when an invocation attempt fails with a non-retryable error.
type EventRescheduled ¶
EventRescheduled is triggered when the operation is meant to be rescheduled after backing off from a previous attempt.
type EventScheduled ¶
EventScheduled is triggered when the operation is meant to be scheduled - immediately after initialization.
type EventStarted ¶
// EventStarted is the event payload consumed by TransitionStarted.
type EventStarted struct {
// Time is recorded as the attempt time and becomes the operation's started time.
Time time.Time
// Node is used to look up the operation's cancelation child node.
Node *hsm.Node
// Attributes supplies the operation token: OperationToken when non-empty, otherwise OperationId.
Attributes *historypb.NexusOperationStartedEventAttributes
}
EventStarted is triggered when an invocation attempt succeeds and the handler indicates that it started an asynchronous operation.
type EventSucceeded ¶
// EventSucceeded is the event payload accepted by TransitionSucceeded, which does not read any of its fields.
type EventSucceeded struct {
// Only set if the operation completed synchronously, as a response to a StartOperation RPC.
// NOTE(review): Time is a non-pointer time.Time, so "only set" would mean the zero value otherwise — confirm
// this comment still describes the field.
Time time.Time
Node *hsm.Node
}
EventSucceeded is triggered when an invocation attempt succeeds.
type EventTimedOut ¶
EventTimedOut is triggered when the schedule-to-close timeout is triggered for an operation.
type FailedEventDefinition ¶
type FailedEventDefinition struct{}
func (FailedEventDefinition) Apply ¶
func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (FailedEventDefinition) CherryPick ¶
func (d FailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (FailedEventDefinition) IsWorkflowTaskTrigger ¶
func (d FailedEventDefinition) IsWorkflowTaskTrigger() bool
func (FailedEventDefinition) Type ¶
func (d FailedEventDefinition) Type() enumspb.EventType
type InvocationTask ¶
func (InvocationTask) Deadline ¶
func (InvocationTask) Deadline() time.Time
func (InvocationTask) Destination ¶
func (t InvocationTask) Destination() string
func (InvocationTask) Type ¶
func (InvocationTask) Type() string
func (InvocationTask) Validate ¶
func (InvocationTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
type InvocationTaskSerializer ¶
type InvocationTaskSerializer struct{}
func (InvocationTaskSerializer) Deserialize ¶
func (InvocationTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)
type LimitedReadCloser ¶
// LimitedReadCloser pairs an io.ReadCloser with a byte budget.
type LimitedReadCloser struct {
// R is the underlying reader.
R io.ReadCloser
// N is the maximum number of bytes remaining.
N int64
}
A LimitedReadCloser reads from R but limits the amount of data returned to just N bytes. Each call to Read updates N to reflect the new amount remaining. Read returns ErrResponseBodyTooLarge when N <= 0.
func NewLimitedReadCloser ¶
func NewLimitedReadCloser(rc io.ReadCloser, l int64) *LimitedReadCloser
func (*LimitedReadCloser) Close ¶
func (l *LimitedReadCloser) Close() error
type NexusTransportProvider ¶
type NexusTransportProvider func(namespaceID, serviceName string) http.RoundTripper
NexusTransportProvider is a function type that allows a provider to customize the default implementation specifically for Nexus.
func DefaultNexusTransportProvider ¶
func DefaultNexusTransportProvider() NexusTransportProvider
type Operation ¶
type Operation struct {
*persistencespb.NexusOperationInfo
}
Operation state machine.
func (Operation) Cancel ¶
func (o Operation) Cancel(node *hsm.Node, t time.Time, requestedEventID int64) (hsm.TransitionOutput, error)
Cancel marks the Operation machine as canceled by spawning a child Cancelation machine. If the Operation already completed, then the Operation cannot be canceled anymore, and the Cancelation machine will stay in UNSPECIFIED state. If the Operation is in STARTED state, then the Cancelation machine is transitioned to the SCHEDULED state. Otherwise, the Cancelation machine will wait for the Operation machine to transition to the STARTED state.
func (Operation) Cancelation ¶
func (o Operation) Cancelation(node *hsm.Node) (*Cancelation, error)
func (Operation) CancelationNode ¶
func (Operation) RegenerateTasks ¶
func (Operation) SetState ¶
func (o Operation) SetState(state enumsspb.NexusOperationState)
func (Operation) State ¶
func (o Operation) State() enumsspb.NexusOperationState
type ResponseSizeLimiter ¶
type ResponseSizeLimiter struct {
// contains filtered or unexported fields
}
type ScheduleToCloseTimeoutTask ¶
type ScheduleToCloseTimeoutTask struct {
// contains filtered or unexported fields
}
func (ScheduleToCloseTimeoutTask) Deadline ¶
func (t ScheduleToCloseTimeoutTask) Deadline() time.Time
func (ScheduleToCloseTimeoutTask) Destination ¶
func (ScheduleToCloseTimeoutTask) Destination() string
func (ScheduleToCloseTimeoutTask) Type ¶
func (ScheduleToCloseTimeoutTask) Type() string
func (ScheduleToCloseTimeoutTask) Validate ¶
func (t ScheduleToCloseTimeoutTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
Validate checks if the timeout task is still valid to execute for the given node state.
type ScheduleToStartTimeoutTask ¶
type ScheduleToStartTimeoutTask struct {
// contains filtered or unexported fields
}
func (ScheduleToStartTimeoutTask) Deadline ¶
func (t ScheduleToStartTimeoutTask) Deadline() time.Time
func (ScheduleToStartTimeoutTask) Destination ¶
func (ScheduleToStartTimeoutTask) Destination() string
func (ScheduleToStartTimeoutTask) Type ¶
func (ScheduleToStartTimeoutTask) Type() string
func (ScheduleToStartTimeoutTask) Validate ¶
func (t ScheduleToStartTimeoutTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
Validate checks if the schedule-to-start timeout task is still valid. Only valid if operation is still in SCHEDULED or BACKING_OFF state.
type ScheduleToStartTimeoutTaskSerializer ¶
type ScheduleToStartTimeoutTaskSerializer struct{}
func (ScheduleToStartTimeoutTaskSerializer) Deserialize ¶
func (ScheduleToStartTimeoutTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)
type ScheduledEventDefinition ¶
type ScheduledEventDefinition struct{}
func (ScheduledEventDefinition) Apply ¶
func (d ScheduledEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (ScheduledEventDefinition) CherryPick ¶
func (d ScheduledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, _ map[enumspb.ResetReapplyExcludeType]struct{}) error
func (ScheduledEventDefinition) IsWorkflowTaskTrigger ¶
func (d ScheduledEventDefinition) IsWorkflowTaskTrigger() bool
func (ScheduledEventDefinition) Type ¶
func (d ScheduledEventDefinition) Type() enumspb.EventType
type StartToCloseTimeoutTask ¶
type StartToCloseTimeoutTask struct {
// contains filtered or unexported fields
}
func (StartToCloseTimeoutTask) Deadline ¶
func (t StartToCloseTimeoutTask) Deadline() time.Time
func (StartToCloseTimeoutTask) Destination ¶
func (StartToCloseTimeoutTask) Destination() string
func (StartToCloseTimeoutTask) Type ¶
func (StartToCloseTimeoutTask) Type() string
func (StartToCloseTimeoutTask) Validate ¶
func (t StartToCloseTimeoutTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error
Validate checks if the start-to-close timeout task is still valid. Only valid if operation is in STARTED state.
type StartToCloseTimeoutTaskSerializer ¶
type StartToCloseTimeoutTaskSerializer struct{}
func (StartToCloseTimeoutTaskSerializer) Deserialize ¶
func (StartToCloseTimeoutTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)
type StartedEventDefinition ¶
type StartedEventDefinition struct{}
func (StartedEventDefinition) Apply ¶
func (d StartedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (StartedEventDefinition) CherryPick ¶
func (d StartedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (StartedEventDefinition) IsWorkflowTaskTrigger ¶
func (d StartedEventDefinition) IsWorkflowTaskTrigger() bool
func (StartedEventDefinition) Type ¶
func (d StartedEventDefinition) Type() enumspb.EventType
type TaskExecutorOptions ¶
type TaskExecutorOptions struct {
fx.In
Config *Config
NamespaceRegistry namespace.Registry
MetricsHandler metrics.Handler
Logger log.Logger
CallbackTokenGenerator *commonnexus.CallbackTokenGenerator
ClientProvider ClientProvider
EndpointRegistry commonnexus.EndpointRegistry
HTTPTraceProvider commonnexus.HTTPClientTraceProvider
HistoryClient resource.HistoryClient
ChasmRegistry *chasm.Registry
}
type TimedOutEventDefinition ¶
type TimedOutEventDefinition struct{}
func (TimedOutEventDefinition) Apply ¶
func (d TimedOutEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error
func (TimedOutEventDefinition) CherryPick ¶
func (d TimedOutEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error
func (TimedOutEventDefinition) IsWorkflowTaskTrigger ¶
func (d TimedOutEventDefinition) IsWorkflowTaskTrigger() bool
func (TimedOutEventDefinition) Type ¶
func (d TimedOutEventDefinition) Type() enumspb.EventType
type TimeoutTaskSerializer ¶
type TimeoutTaskSerializer struct{}
func (TimeoutTaskSerializer) Deserialize ¶
func (TimeoutTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)