Documentation
¶
Index ¶
- Variables
- type Cancellation
- type CancellationBackoffTaskExecutor
- type CancellationTaskExecutor
- type CancellationTaskExecutorOptions
- type Config
- type EventAttemptFailed
- type EventCanceled
- type EventCancellationAttemptFailed
- type EventCancellationFailed
- type EventCancellationRescheduled
- type EventCancellationScheduled
- type EventCancellationSucceeded
- type EventFailed
- type EventRescheduled
- type EventScheduled
- type EventStarted
- type EventSucceeded
- type EventTimedOut
- type Library
- type NexusHeaderTagMapping
- type NexusMetricTagConfig
- type Operation
- type OperationBackoffTaskExecutor
- type OperationInvocationTaskExecutor
- type OperationTaskExecutorOptions
- type OperationTimeoutTaskExecutor
Constants ¶
This section is empty.
Variables ¶
var CallbackURLTemplate = dynamicconfig.NewGlobalStringSetting(
"nexusoperation.callback.endpoint.template",
"unset",
`Controls the template for generating callback URLs included in Nexus operation requests, which are used to deliver
asynchronous completion for external endpoint targets. The template can be used to interpolate the {{.NamespaceName}}
and {{.NamespaceID}} parameters to construct a publicly accessible URL.
Must be set to call external endpoints.`,
)
var ChasmNexusEnabled = dynamicconfig.NewGlobalBoolSetting( "nexusoperation.enableChasm", false, `Feature flag that controls whether the legacy HSM-based implementation (when flag is false; default) or the newer CHASM-based implementation of Nexus will be used when scheduling new Nexus Operations.`, )
var DisallowedOperationHeaders = dynamicconfig.NewGlobalTypedSettingWithConverter( "nexusoperation.disallowedHeaders", func(in any) ([]string, error) { keys, err := dynamicconfig.ConvertStructure[[]string](nil)(in) if err != nil { return nil, err } for i, k := range keys { keys[i] = strings.ToLower(k) } return keys, nil }, []string{ "request-timeout", interceptor.DCRedirectionApiHeaderName, interceptor.DCRedirectionContextHeaderName, headers.CallerNameHeaderName, headers.CallerTypeHeaderName, headers.CallOriginHeaderName, }, `Case insensitive list of disallowed header keys for Nexus Operations. ScheduleNexusOperation commands with a "nexus_header" field that contains any of these disallowed keys will be rejected.`, )
var MaxConcurrentOperations = dynamicconfig.NewNamespaceIntSetting(
"nexusoperation.limit.operation.concurrency",
2000,
`Limits the maximum allowed concurrent Nexus Operations for a given workflow execution. Once the limit is reached,
ScheduleNexusOperation commands will be rejected.`,
)
var MaxOperationHeaderSize = dynamicconfig.NewNamespaceIntSetting(
"nexusoperation.limit.header.size",
8192,
`The maximum allowed header size for a Nexus Operation.
ScheduleNexusOperation commands with a "nexus_header" field that exceeds this limit will be rejected.
Uses Go's len() function on header keys and values to determine the total size.`,
)
var MaxOperationNameLength = dynamicconfig.NewNamespaceIntSetting(
"nexusoperation.limit.operation.name.length",
1000,
`Limits the maximum allowed length for a Nexus Operation name. ScheduleNexusOperation commands with an operation name
that exceeds this limit will be rejected. Uses Go's len() function to determine the length.`,
)
var MaxOperationScheduleToCloseTimeout = dynamicconfig.NewNamespaceDurationSetting(
"nexusoperation.limit.scheduleToCloseTimeout",
0,
`Maximum allowed duration of a Nexus Operation. ScheduleNexusOperation commands that specify no schedule-to-close timeout
or a longer timeout than permitted will have their schedule-to-close timeout capped to this value. 0 implies no limit.`,
)
var MaxOperationTokenLength = dynamicconfig.NewNamespaceIntSetting(
"nexusoperation.limit.operation.token.length",
4096,
`Limits the maximum allowed length for a Nexus Operation token. Tokens returned via start responses or via async
completions that exceed this limit will be rejected. Uses Go's len() function to determine the length.
Leave this limit long enough to fit a workflow ID and namespace name plus padding at minimum since that's what the SDKs
use as the token.`,
)
var MaxServiceNameLength = dynamicconfig.NewNamespaceIntSetting(
"nexusoperation.limit.service.name.length",
1000,
`Limits the maximum allowed length for a Nexus Service name. ScheduleNexusOperation commands with a service name that
exceeds this limit will be rejected. Uses Go's len() function to determine the length.`,
)
var MetricTagConfiguration = dynamicconfig.NewGlobalTypedSetting( "nexusoperation.metrics.tags", NexusMetricTagConfig{}, `Controls which metric tags are included with Nexus operation metrics. This configuration supports: 1. Service name tag - adds the Nexus service name as a metric dimension (IncludeServiceTag) 2. Operation name tag - adds the Nexus operation name as a metric dimension (IncludeOperationTag) 3. Header-based tags - maps values from request headers to metric tags (HeaderTagMappings) Note: default metric tags (like namespace, endpoint) are always included and not affected by this configuration. Adding high-cardinality tags (like unique operation names) can significantly increase metric storage requirements and query complexity. Consider the cardinality impact when enabling these tags.`, )
var MinDispatchTaskTimeout = dynamicconfig.NewNamespaceDurationSetting( "nexusoperation.limit.dispatch.task.timeout.min", time.Second, `Minimum time remaining for a request to be dispatched to the handler worker. If the remaining request timeout is less than this value, a timeout error will be returned. Working in conjunction with MinRequestTimeout, both configs help ensure that the server has enough time to complete a Nexus request.`, )
var MinRequestTimeout = dynamicconfig.NewNamespaceDurationSetting( "nexusoperation.limit.request.timeout.min", time.Millisecond*1500, `Minimum time remaining for a request to complete for the server to make RPCs. If the remaining request timeout is less than this value, a non-retryable timeout error will be returned.`, )
var Module = fx.Module( "chasm.lib.nexusoperations", fx.Provide(configProvider), fx.Provide(NewOperationInvocationTaskExecutor), fx.Provide(NewOperationBackoffTaskExecutor), fx.Provide(NewOperationTimeoutTaskExecutor), fx.Provide(NewCancellationTaskExecutor), fx.Provide(NewCancellationBackoffTaskExecutor), fx.Provide(newLibrary), fx.Invoke(register), )
var OutboundRequestCounter = metrics.NewCounterDef( "nexus_outbound_requests", metrics.WithDescription("The number of Nexus outbound requests made by the history service."), )
var OutboundRequestLatency = metrics.NewTimerDef( "nexus_outbound_latency", metrics.WithDescription("Latency of outbound Nexus requests made by the history service."), )
var RequestTimeout = dynamicconfig.NewDestinationDurationSetting( "nexusoperation.request.timeout", time.Second*10, `Timeout for making a single nexus start or cancel request.`, )
var RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting( "nexusoperation.retryPolicy.initialInterval", time.Second, `The initial backoff interval between every nexus StartOperation or CancelOperation request for a given operation.`, )
var RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting( "nexusoperation.retryPolicy.maxInterval", time.Hour, `The maximum backoff interval between every nexus StartOperation or CancelOperation request for a given operation.`, )
var UseNewFailureWireFormat = dynamicconfig.NewNamespaceBoolSetting( "nexusoperation.useNewFailureWireFormat", true, `Controls whether to use the new failure wire format via an HTTP header that is attached to StartOperation requests. Added for safety. Defaults to true. Likely to be removed in future server versions.`, )
Functions ¶
This section is empty.
Types ¶
type Cancellation ¶
type Cancellation struct {
chasm.UnimplementedComponent
// Persisted internal state
*nexusoperationpb.CancellationState
}
func NewCancellation ¶
func NewCancellation() *Cancellation
func (*Cancellation) LifecycleState ¶
func (o *Cancellation) LifecycleState(_ chasm.Context) chasm.LifecycleState
func (*Cancellation) SetStateMachineState ¶
func (o *Cancellation) SetStateMachineState(status nexusoperationpb.CancellationStatus)
func (*Cancellation) StateMachineState ¶
func (o *Cancellation) StateMachineState() nexusoperationpb.CancellationStatus
type CancellationBackoffTaskExecutor ¶
type CancellationBackoffTaskExecutor struct {
// contains filtered or unexported fields
}
func NewCancellationBackoffTaskExecutor ¶
func NewCancellationBackoffTaskExecutor(opts CancellationTaskExecutorOptions) *CancellationBackoffTaskExecutor
func (*CancellationBackoffTaskExecutor) Execute ¶
func (e *CancellationBackoffTaskExecutor) Execute( ctx chasm.MutableContext, cancellation *Cancellation, attrs chasm.TaskAttributes, task *nexusoperationpb.CancellationBackoffTask, ) error
func (*CancellationBackoffTaskExecutor) Validate ¶
func (e *CancellationBackoffTaskExecutor) Validate( ctx chasm.Context, cancellation *Cancellation, attrs chasm.TaskAttributes, task *nexusoperationpb.CancellationBackoffTask, ) (bool, error)
type CancellationTaskExecutor ¶
type CancellationTaskExecutor struct {
// contains filtered or unexported fields
}
func NewCancellationTaskExecutor ¶
func NewCancellationTaskExecutor(opts CancellationTaskExecutorOptions) *CancellationTaskExecutor
func (*CancellationTaskExecutor) Execute ¶
func (e *CancellationTaskExecutor) Execute( ctx context.Context, cancelRef chasm.ComponentRef, attrs chasm.TaskAttributes, task *nexusoperationpb.CancellationTask, ) error
func (*CancellationTaskExecutor) Validate ¶
func (e *CancellationTaskExecutor) Validate( ctx chasm.Context, cancellation *Cancellation, attrs chasm.TaskAttributes, task *nexusoperationpb.CancellationTask, ) (bool, error)
type Config ¶
type Config struct {
ChasmEnabled dynamicconfig.BoolPropertyFnWithNamespaceFilter
ChasmNexusEnabled dynamicconfig.BoolPropertyFn
RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
MinRequestTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
MaxConcurrentOperations dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxServiceNameLength dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxOperationNameLength dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxOperationTokenLength dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxOperationHeaderSize dynamicconfig.IntPropertyFnWithNamespaceFilter
DisallowedOperationHeaders dynamicconfig.TypedPropertyFn[[]string]
MaxOperationScheduleToCloseTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
PayloadSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
CallbackURLTemplate dynamicconfig.StringPropertyFn
UseNewFailureWireFormat dynamicconfig.BoolPropertyFnWithNamespaceFilter
RecordCancelRequestCompletionEvents dynamicconfig.BoolPropertyFn
RetryPolicy func() backoff.RetryPolicy
}
type EventAttemptFailed ¶
type EventAttemptFailed struct {
}
EventAttemptFailed is triggered when an invocation attempt fails with a retryable error.
type EventCanceled ¶
type EventCanceled struct {
}
EventCanceled is triggered when an operation is completed as canceled.
type EventCancellationAttemptFailed ¶
type EventCancellationAttemptFailed struct {
}
EventCancellationAttemptFailed is triggered when a cancellation attempt fails with a retryable error.
type EventCancellationFailed ¶
type EventCancellationFailed struct {
}
EventCancellationFailed is triggered when a cancellation attempt fails with a non-retryable error.
type EventCancellationRescheduled ¶
type EventCancellationRescheduled struct {
}
EventCancellationRescheduled is triggered when cancellation is meant to be rescheduled after backing off from a previous attempt.
type EventCancellationScheduled ¶
type EventCancellationScheduled struct {
}
EventCancellationScheduled is triggered when cancellation is meant to be scheduled for the first time - immediately after it has been requested.
type EventCancellationSucceeded ¶
type EventCancellationSucceeded struct {
}
EventCancellationSucceeded is triggered when a cancellation attempt succeeds.
type EventFailed ¶
type EventFailed struct {
}
EventFailed is triggered when an invocation attempt fails with a non-retryable error.
type EventRescheduled ¶
type EventRescheduled struct {
}
EventRescheduled is triggered when the operation is meant to be rescheduled after backing off from a previous attempt.
type EventScheduled ¶
type EventScheduled struct {
}
EventScheduled is triggered when the operation is meant to be scheduled - immediately after initialization.
type EventStarted ¶
type EventStarted struct {
}
EventStarted is triggered when an invocation attempt succeeds and the handler indicates that it started an asynchronous operation.
type EventSucceeded ¶
type EventSucceeded struct {
}
EventSucceeded is triggered when an invocation attempt succeeds.
type EventTimedOut ¶
type EventTimedOut struct {
}
EventTimedOut is triggered when the schedule-to-close timeout is triggered for an operation.
type Library ¶
type Library struct {
chasm.UnimplementedLibrary
OperationInvocationTaskExecutor *OperationInvocationTaskExecutor
OperationBackoffTaskExecutor *OperationBackoffTaskExecutor
OperationTimeoutTaskExecutor *OperationTimeoutTaskExecutor
CancellationTaskExecutor *CancellationTaskExecutor
CancellationBackoffTaskExecutor *CancellationBackoffTaskExecutor
}
func (*Library) Components ¶
func (l *Library) Components() []*chasm.RegistrableComponent
func (*Library) RegisterServices ¶
func (*Library) Tasks ¶
func (l *Library) Tasks() []*chasm.RegistrableTask
type NexusHeaderTagMapping ¶
type NexusMetricTagConfig ¶
type NexusMetricTagConfig struct {
// Include service name as a metric tag
IncludeServiceTag bool
// Include operation name as a metric tag
IncludeOperationTag bool
// Configuration for mapping request headers to metric tags
HeaderTagMappings []NexusHeaderTagMapping
}
type Operation ¶
type Operation struct {
chasm.UnimplementedComponent
// Persisted internal state
*nexusoperationpb.OperationState
}
func NewOperation ¶
func NewOperation() *Operation
func (*Operation) LifecycleState ¶
func (o *Operation) LifecycleState(_ chasm.Context) chasm.LifecycleState
func (*Operation) SetStateMachineState ¶
func (o *Operation) SetStateMachineState(status nexusoperationpb.OperationStatus)
func (*Operation) StateMachineState ¶
func (o *Operation) StateMachineState() nexusoperationpb.OperationStatus
type OperationBackoffTaskExecutor ¶
type OperationBackoffTaskExecutor struct {
// contains filtered or unexported fields
}
func NewOperationBackoffTaskExecutor ¶
func NewOperationBackoffTaskExecutor(opts OperationTaskExecutorOptions) *OperationBackoffTaskExecutor
func (*OperationBackoffTaskExecutor) Execute ¶
func (e *OperationBackoffTaskExecutor) Execute( ctx chasm.MutableContext, op *Operation, attrs chasm.TaskAttributes, task *nexusoperationpb.InvocationBackoffTask, ) error
func (*OperationBackoffTaskExecutor) Validate ¶
func (e *OperationBackoffTaskExecutor) Validate( ctx chasm.Context, op *Operation, attrs chasm.TaskAttributes, task *nexusoperationpb.InvocationBackoffTask, ) (bool, error)
type OperationInvocationTaskExecutor ¶
type OperationInvocationTaskExecutor struct {
// contains filtered or unexported fields
}
func NewOperationInvocationTaskExecutor ¶
func NewOperationInvocationTaskExecutor(opts OperationTaskExecutorOptions) *OperationInvocationTaskExecutor
func (*OperationInvocationTaskExecutor) Execute ¶
func (e *OperationInvocationTaskExecutor) Execute( ctx context.Context, opRef chasm.ComponentRef, attrs chasm.TaskAttributes, task *nexusoperationpb.InvocationTask, ) error
func (*OperationInvocationTaskExecutor) Validate ¶
func (e *OperationInvocationTaskExecutor) Validate( ctx chasm.Context, op *Operation, attrs chasm.TaskAttributes, task *nexusoperationpb.InvocationTask, ) (bool, error)
type OperationTaskExecutorOptions ¶
type OperationTaskExecutorOptions struct {
fx.In
Config *Config
MetricsHandler metrics.Handler
Logger log.Logger
}
OperationTaskExecutorOptions is the fx parameter object for common options supplied to all operation task executors.
type OperationTimeoutTaskExecutor ¶
type OperationTimeoutTaskExecutor struct {
// contains filtered or unexported fields
}
func NewOperationTimeoutTaskExecutor ¶
func NewOperationTimeoutTaskExecutor(opts OperationTaskExecutorOptions) *OperationTimeoutTaskExecutor
func (*OperationTimeoutTaskExecutor) Execute ¶
func (e *OperationTimeoutTaskExecutor) Execute( ctx chasm.MutableContext, op *Operation, attrs chasm.TaskAttributes, task *nexusoperationpb.InvocationTimeoutTask, ) error
func (*OperationTimeoutTaskExecutor) Validate ¶
func (e *OperationTimeoutTaskExecutor) Validate( ctx chasm.Context, op *Operation, attrs chasm.TaskAttributes, task *nexusoperationpb.InvocationTimeoutTask, ) (bool, error)
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| gen | |
| nexusoperationpb/v1 | Code generated by protoc-gen-go-helpers. |