nexusoperations

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2026 License: MIT Imports: 49 Imported by: 0

Documentation

Index

Constants

View Source
// OperationMachineType is the unique type identifier under which the
// Operation state machine is known.
const OperationMachineType = "nexusoperations.Operation"

// CancelationMachineType is the unique type identifier under which the
// Cancelation state machine is known.
const CancelationMachineType = "nexusoperations.Cancelation"
View Source
// Task type identifiers for the invocation and cancelation attempts and
// their backoff timers.
const (
	TaskTypeInvocation         = "nexusoperations.Invocation"
	TaskTypeBackoff            = "nexusoperations.Backoff"
	TaskTypeCancelation        = "nexusoperations.Cancelation"
	TaskTypeCancelationBackoff = "nexusoperations.CancelationBackoff"
)

// Task type identifiers for the operation timeout tasks.
const (
	// TaskTypeScheduleToCloseTimeout keeps the legacy value `Timeout`: it
	// predates the more flexible timeout types and must stay unchanged so that
	// previously persisted tasks remain readable.
	TaskTypeScheduleToCloseTimeout = "nexusoperations.Timeout"
	TaskTypeScheduleToStartTimeout = "nexusoperations.ScheduleToStartTimeout"
	TaskTypeStartToCloseTimeout    = "nexusoperations.StartToCloseTimeout"
)
View Source
// NexusCallbackSourceHeader is the header key "Nexus-Callback-Source".
// NOTE(review): presumably carries the identity of the callback's source on
// Nexus requests; confirm against the call sites.
const NexusCallbackSourceHeader = "Nexus-Callback-Source"

Variables

View Source
// CallbackURLTemplate is the global string setting
// "component.nexusoperations.callback.endpoint.template" holding the template
// used to build the callback URL attached to Nexus operation requests.
// The default value "unset" means no template has been configured.
var CallbackURLTemplate = dynamicconfig.NewGlobalStringSetting(
	"component.nexusoperations.callback.endpoint.template",
	"unset",
	`Controls the template for generating callback URLs included in Nexus operation requests, which are used to deliver asynchronous completion.
The template can be used to interpolate the {{.NamespaceName}} and {{.NamespaceID}} parameters to construct a publicly accessible URL.
Must be set in order to use Nexus Operations.`,
)
View Source
// CancelationMachineKey is a fixed key for the cancelation machine as a child
// of the operation machine. The ID is empty; the key is distinguished by its
// type alone.
var CancelationMachineKey = hsm.Key{Type: CancelationMachineType, ID: ""}

CancelationMachineKey is a fixed key for the cancelation machine as a child of the operation machine.

View Source
// DisallowedOperationHeaders is the global typed setting
// "component.nexusoperations.disallowedHeaders": the list of header keys that
// may not appear in the "nexus_header" field of a ScheduleNexusOperation command.
var DisallowedOperationHeaders = dynamicconfig.NewGlobalTypedSettingWithConverter(
	"component.nexusoperations.disallowedHeaders",
	// Converter: decode the raw value into a string slice, then lower-case
	// every key in place so configured values are stored in a single case.
	func(in any) ([]string, error) {
		headerKeys, convErr := dynamicconfig.ConvertStructure[[]string](nil)(in)
		if convErr != nil {
			return nil, convErr
		}
		for idx := range headerKeys {
			headerKeys[idx] = strings.ToLower(headerKeys[idx])
		}
		return headerKeys, nil
	},
	[]string{
		"request-timeout",
		interceptor.DCRedirectionApiHeaderName,
		interceptor.DCRedirectionContextHeaderName,
		headers.CallerNameHeaderName,
		headers.CallerTypeHeaderName,
		headers.CallOriginHeaderName,
	},
	`Case insensitive list of disallowed header keys for Nexus Operations.
ScheduleNexusOperation commands with a "nexus_header" field that contains any of these disallowed keys will be
rejected.`,
)
View Source
// ErrInvalidOperationToken is a sentinel error with the message
// "invalid operation token". Compare against it with errors.Is.
var ErrInvalidOperationToken = errors.New("invalid operation token")
View Source
// ErrResponseBodyTooLarge is the sentinel error that LimitedReadCloser.Read
// returns once its remaining byte allowance N is <= 0.
var ErrResponseBodyTooLarge = errors.New("http: response body too large")
View Source
// MaxConcurrentOperations is the namespace integer setting
// "component.nexusoperations.limit.operation.concurrency" (default 30) that
// caps concurrent Nexus Operations per workflow execution.
var MaxConcurrentOperations = dynamicconfig.NewNamespaceIntSetting(
	"component.nexusoperations.limit.operation.concurrency",

	30,
	`MaxConcurrentOperations limits the maximum allowed concurrent Nexus Operations for a given workflow execution.
Once the limit is reached, ScheduleNexusOperation commands will be rejected.`,
)
View Source
// MaxOperationHeaderSize is the namespace integer setting
// "component.nexusoperations.limit.header.size" (default 8192) that caps the
// total len() of header keys and values on a Nexus Operation.
var MaxOperationHeaderSize = dynamicconfig.NewNamespaceIntSetting(
	"component.nexusoperations.limit.header.size",
	8192,
	`The maximum allowed header size for a Nexus Operation.
ScheduleNexusOperation commands with a "nexus_header" field that exceeds this limit will be rejected.
Uses Go's len() function on header keys and values to determine the total size.`,
)
View Source
// MaxOperationNameLength is the namespace integer setting
// "component.nexusoperations.limit.operation.name.length" (default 1000) that
// caps the len() of a Nexus Operation name.
var MaxOperationNameLength = dynamicconfig.NewNamespaceIntSetting(
	"component.nexusoperations.limit.operation.name.length",
	1000,
	`MaxOperationNameLength limits the maximum allowed length for a Nexus Operation name.
ScheduleNexusOperation commands with an operation name that exceeds this limit will be rejected.
Uses Go's len() function to determine the length.`,
)
View Source
// MaxOperationScheduleToCloseTimeout is the namespace duration setting
// "component.nexusoperations.limit.scheduleToCloseTimeout" that caps the
// schedule-to-close timeout of a Nexus Operation. The default of 0 means
// no limit is applied.
var MaxOperationScheduleToCloseTimeout = dynamicconfig.NewNamespaceDurationSetting(
	"component.nexusoperations.limit.scheduleToCloseTimeout",
	0,
	`MaxOperationScheduleToCloseTimeout limits the maximum allowed duration of a Nexus Operation. ScheduleNexusOperation
commands that specify no schedule-to-close timeout or a longer timeout than permitted will have their
schedule-to-close timeout capped to this value. 0 implies no limit.`,
)
View Source
// MaxOperationTokenLength is the namespace integer setting
// "component.nexusoperations.limit.operation.token.length" (default 4096) that
// caps the len() of a Nexus Operation token received from start responses or
// async completions.
var MaxOperationTokenLength = dynamicconfig.NewNamespaceIntSetting(
	"component.nexusoperations.limit.operation.token.length",
	4096,
	`Limits the maximum allowed length for a Nexus Operation token. Tokens returned via start responses or via async
completions that exceed this limit will be rejected. Uses Go's len() function to determine the length.
Leave this limit long enough to fit a workflow ID and namespace name plus padding at minimum since that's what the SDKs
use as the token.`,
)
View Source
// MaxServiceNameLength is the namespace integer setting
// "component.nexusoperations.limit.service.name.length" (default 1000) that
// caps the len() of a Nexus Service name.
var MaxServiceNameLength = dynamicconfig.NewNamespaceIntSetting(
	"component.nexusoperations.limit.service.name.length",
	1000,
	`MaxServiceNameLength limits the maximum allowed length for a Nexus Service name.
ScheduleNexusOperation commands with a service name that exceeds this limit will be rejected.
Uses Go's len() function to determine the length.`,
)
View Source
// MetricTagConfiguration is the global typed setting
// "component.nexusoperations.metrics.tags" selecting which optional tags are
// attached to Nexus operation metrics. The default is the zero value of
// chasmnexus.NexusMetricTagConfig.
var MetricTagConfiguration = dynamicconfig.NewGlobalTypedSetting(
	"component.nexusoperations.metrics.tags",
	chasmnexus.NexusMetricTagConfig{},
	`Controls which metric tags are included with Nexus operation metrics. This configuration supports:
1. Service name tag - adds the Nexus service name as a metric dimension (IncludeServiceTag)
2. Operation name tag - adds the Nexus operation name as a metric dimension (IncludeOperationTag)
3. Header-based tags - maps values from request headers to metric tags (HeaderTagMappings)

Note: default metric tags (like namespace, endpoint) are always included and not affected by this configuration.
Adding high-cardinality tags (like unique operation names) can significantly increase metric storage
requirements and query complexity. Consider the cardinality impact when enabling these tags.`,
)
View Source
// MinDispatchTaskTimeout is the namespace duration setting
// "component.nexusoperations.limit.dispatch.task.timeout.min" (default one
// second): the minimum remaining request time required to dispatch a request
// to the handler worker.
var MinDispatchTaskTimeout = dynamicconfig.NewNamespaceDurationSetting(
	"component.nexusoperations.limit.dispatch.task.timeout.min",
	time.Second,
	`MinDispatchTaskTimeout is the minimum time remaining for a request to be dispatched to the handler worker.
If the remaining request timeout is less than this value, a timeout error will be returned. Working in conjunction with
MinRequestTimeout, both configs help ensure that the server has enough time to complete a Nexus request.`,
)
View Source
// MinRequestTimeout is the namespace duration setting
// "component.nexusoperations.limit.request.timeout.min" (default 1.5 seconds):
// the minimum remaining request time required for the server to make RPCs.
var MinRequestTimeout = dynamicconfig.NewNamespaceDurationSetting(
	"component.nexusoperations.limit.request.timeout.min",
	time.Millisecond*1500,
	`MinRequestTimeout is the minimum time remaining for a request to complete for the server to make
RPCs. If the remaining request timeout is less than this value, a non-retryable timeout error will be returned.`,
)
View Source
// RecordCancelRequestCompletionEvents is the global boolean setting
// "component.nexusoperations.recordCancelRequestCompletionEvents" (default
// true) controlling whether cancel-request completion and failure events are
// recorded.
var RecordCancelRequestCompletionEvents = dynamicconfig.NewGlobalBoolSetting(
	"component.nexusoperations.recordCancelRequestCompletionEvents",
	true,
	`Boolean flag to control whether to record NexusOperationCancelRequestCompleted and
NexusOperationCancelRequestFailed events. Default true.`,
)
View Source
// RequestTimeout is the destination duration setting
// "component.nexusoperations.request.timeout" (default ten seconds) bounding a
// single Nexus start or cancel request.
var RequestTimeout = dynamicconfig.NewDestinationDurationSetting(
	"component.nexusoperations.request.timeout",
	time.Second*10,
	`RequestTimeout is the timeout for making a single nexus start or cancel request.`,
)
View Source
// RetryPolicyInitialInterval is the global duration setting
// "component.nexusoperations.retryPolicy.initialInterval" (default one second):
// the initial backoff interval between StartOperation or CancelOperation
// requests for a given operation.
var RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting(
	"component.nexusoperations.retryPolicy.initialInterval",
	time.Second,
	`The initial backoff interval between every nexus StartOperation or CancelOperation request for a given operation.`,
)
View Source
// RetryPolicyMaximumInterval is the global duration setting
// "component.nexusoperations.retryPolicy.maxInterval" (default one hour): the
// maximum backoff interval between StartOperation or CancelOperation requests
// for a given operation.
var RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting(
	"component.nexusoperations.retryPolicy.maxInterval",
	time.Hour,
	`The maximum backoff interval between every nexus StartOperation or CancelOperation request for a given operation.`,
)
View Source
// TransitionAttemptFailed moves an Operation from SCHEDULED to BACKING_OFF.
// It records the attempt, stores the failure carried by the event, and sets
// the time of the next attempt from the event's retry policy.
var TransitionAttemptFailed = hsm.NewTransition(
	[]enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_SCHEDULED},
	enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF,
	func(op Operation, event EventAttemptFailed) (hsm.TransitionOutput, error) {
		// recordAttempt must run first: the backoff delay is computed from
		// op.Attempt as it stands after the attempt has been recorded.
		op.recordAttempt(event.Time)

		backoffDelay := event.RetryPolicy.ComputeNextDelay(0, int(op.Attempt), nil)
		op.NextAttemptScheduleTime = timestamppb.New(event.Time.Add(backoffDelay))
		op.LastAttemptFailure = event.Failure

		return op.output()
	},
)
View Source
// TransitionCancelationAttemptFailed moves a Cancelation from SCHEDULED to
// BACKING_OFF. It records the attempt, stores the failure carried by the
// event, and sets the time of the next attempt from the event's retry policy.
var TransitionCancelationAttemptFailed = hsm.NewTransition(
	[]enumspb.NexusOperationCancellationState{enumspb.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED},
	enumspb.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF,
	func(c Cancelation, event EventCancelationAttemptFailed) (hsm.TransitionOutput, error) {
		// recordAttempt must run first: the backoff delay is computed from
		// c.Attempt as it stands after the attempt has been recorded.
		c.recordAttempt(event.Time)

		backoffDelay := event.RetryPolicy.ComputeNextDelay(0, int(c.Attempt), nil)
		c.NextAttemptScheduleTime = timestamppb.New(event.Time.Add(backoffDelay))
		c.LastAttemptFailure = event.Failure

		return c.output(event.Node)
	},
)
View Source
// TransitionRescheduled moves an Operation from BACKING_OFF back to SCHEDULED
// and returns the operation's output for the new state.
var TransitionRescheduled = hsm.NewTransition(
	[]enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF},
	enumsspb.NEXUS_OPERATION_STATE_SCHEDULED,
	func(op Operation, event EventRescheduled) (hsm.TransitionOutput, error) {
		// The backoff is over, so there is no pending "next attempt" time anymore.
		op.NextAttemptScheduleTime = nil
		return op.output()
	},
)
View Source
// TransitionStarted moves an Operation from SCHEDULED or BACKING_OFF to
// STARTED. It records the attempt, stores the operation token and start time,
// schedules any cancelation that was requested before the operation started,
// and returns the operation's output together with the start-to-close
// timeout task.
var TransitionStarted = hsm.NewTransition(
	[]enumsspb.NexusOperationState{enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF},
	enumsspb.NEXUS_OPERATION_STATE_STARTED,
	func(op Operation, event EventStarted) (hsm.TransitionOutput, error) {
		op.recordAttempt(event.Time)
		if event.Attributes.OperationToken != "" {
			op.OperationToken = event.Attributes.OperationToken
		} else if event.Attributes.OperationId != "" {
			// Fall back to OperationId when the event carries no token.
			op.OperationToken = event.Attributes.OperationId
		}

		op.StartedTime = timestamppb.New(event.Time)

		child, err := op.CancelationNode(event.Node)
		if err != nil {
			return hsm.TransitionOutput{}, err
		}
		if child != nil {
			// A cancelation was requested before the operation started; now
			// that it has started, the cancelation can be scheduled.
			err := hsm.MachineTransition(child, func(c Cancelation) (hsm.TransitionOutput, error) {
				return TransitionCancelationScheduled.Apply(c, EventCancelationScheduled{
					Time: event.Time,
					Node: child,
				})
			})
			if err != nil {
				return hsm.TransitionOutput{}, err
			}
		}

		// Emit the operation's own tasks whether or not a cancelation is
		// pending. Returning early in the cancelation case would drop the
		// start-to-close timeout task for operations that were asked to
		// cancel before they started.
		output, err := op.output()
		if err != nil {
			return output, err
		}

		output.Tasks = append(output.Tasks, op.startToCloseTimeoutTask()...)

		return output, nil
	},
)
View Source
// UseSystemCallbackURL is the global boolean setting
// "component.nexusoperations.useSystemCallbackURL" (default true). When true,
// the fixed system callback URL is used for worker targets; when false, URLs
// are generated from the callback URL template.
var UseSystemCallbackURL = dynamicconfig.NewGlobalBoolSetting(
	"component.nexusoperations.useSystemCallbackURL",
	true,
	`UseSystemCallbackURL is a global feature toggle that controls how the executor generates
	callback URLs for worker targets in Nexus Operations. When set to true,
	the executor will use the fixed system callback URL ("temporal://system") for all worker targets,
	instead of generating URLs from the callback URL template.
	This simplifies configuration and improves reliability for worker callbacks.
	- false: The executor uses the callback URL template to generate callback URLs for worker targets.
	- true (default): The executor uses the fixed system callback URL ("temporal://system") for worker targets.`,
)

Functions

func AddChild

func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventToken []byte) (*hsm.Node, error)

AddChild adds a new operation child machine to the given node and transitions it to the SCHEDULED state.

func CallbackTokenGeneratorProvider

func CallbackTokenGeneratorProvider() *commonnexus.CallbackTokenGenerator

func CompletionHandler

func CompletionHandler(
	ctx context.Context,
	env hsm.Environment,
	ref hsm.Ref,
	requestID string,
	operationToken string,
	startTime *timestamppb.Timestamp,
	links []*commonpb.Link,
	result *commonpb.Payload,
	opFailedError *nexus.OperationError,
) error

func EndpointRegistryLifetimeHooks

func EndpointRegistryLifetimeHooks(lc fx.Lifecycle, registry commonnexus.EndpointRegistry)

func EndpointRegistryProvider

func EndpointRegistryProvider(
	matchingClient resource.MatchingClient,
	endpointManager persistence.NexusEndpointManager,
	dc *dynamicconfig.Collection,
	logger log.Logger,
	metricsHandler metrics.Handler,
) commonnexus.EndpointRegistry

func MachineCollection

func MachineCollection(tree *hsm.Node) hsm.Collection[Operation]

MachineCollection creates a new typed [hsm.Collection] for operations.

func RegisterEventDefinitions

func RegisterEventDefinitions(reg *hsm.Registry) error

func RegisterExecutor

func RegisterExecutor(
	registry *hsm.Registry,
	options TaskExecutorOptions,
) error

func RegisterStateMachines

func RegisterStateMachines(r *hsm.Registry) error

func RegisterTaskSerializers

func RegisterTaskSerializers(reg *hsm.Registry) error

Types

type BackoffTask

type BackoffTask struct {
	// contains filtered or unexported fields
}

func (BackoffTask) Deadline

func (t BackoffTask) Deadline() time.Time

func (BackoffTask) Destination

func (t BackoffTask) Destination() string

func (BackoffTask) Type

func (BackoffTask) Type() string

func (BackoffTask) Validate

func (t BackoffTask) Validate(_ *persistencespb.StateMachineRef, node *hsm.Node) error

type BackoffTaskSerializer

type BackoffTaskSerializer struct{}

func (BackoffTaskSerializer) Deserialize

func (BackoffTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)

func (BackoffTaskSerializer) Serialize

func (BackoffTaskSerializer) Serialize(hsm.Task) ([]byte, error)

type CancelRequestCompletedEventDefinition

type CancelRequestCompletedEventDefinition struct{}

func (CancelRequestCompletedEventDefinition) Apply

func (CancelRequestCompletedEventDefinition) CherryPick

func (CancelRequestCompletedEventDefinition) IsWorkflowTaskTrigger

func (d CancelRequestCompletedEventDefinition) IsWorkflowTaskTrigger() bool

func (CancelRequestCompletedEventDefinition) Type

type CancelRequestFailedEventDefinition

type CancelRequestFailedEventDefinition struct{}

func (CancelRequestFailedEventDefinition) Apply

func (CancelRequestFailedEventDefinition) CherryPick

func (CancelRequestFailedEventDefinition) IsWorkflowTaskTrigger

func (d CancelRequestFailedEventDefinition) IsWorkflowTaskTrigger() bool

func (CancelRequestFailedEventDefinition) Type

type CancelRequestedEventDefinition

type CancelRequestedEventDefinition struct{}

func (CancelRequestedEventDefinition) Apply

func (CancelRequestedEventDefinition) CherryPick

func (CancelRequestedEventDefinition) IsWorkflowTaskTrigger

func (d CancelRequestedEventDefinition) IsWorkflowTaskTrigger() bool

func (CancelRequestedEventDefinition) Type

type Cancelation

type Cancelation struct {
	*persistencespb.NexusOperationCancellationInfo
}

Cancelation state machine for canceling an operation.

func (Cancelation) RegenerateTasks

func (c Cancelation) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error)

func (Cancelation) SetState

func (Cancelation) State

type CancelationBackoffTask

type CancelationBackoffTask struct {
	// contains filtered or unexported fields
}

func (CancelationBackoffTask) Deadline

func (t CancelationBackoffTask) Deadline() time.Time

func (CancelationBackoffTask) Destination

func (CancelationBackoffTask) Destination() string

func (CancelationBackoffTask) Type

func (CancelationBackoffTask) Validate

type CancelationBackoffTaskSerializer

type CancelationBackoffTaskSerializer struct{}

func (CancelationBackoffTaskSerializer) Deserialize

func (CancelationBackoffTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)

func (CancelationBackoffTaskSerializer) Serialize

type CancelationTask

type CancelationTask struct {
	EndpointName string
	Attempt      int32
}

func (CancelationTask) Deadline

func (CancelationTask) Deadline() time.Time

func (CancelationTask) Destination

func (t CancelationTask) Destination() string

func (CancelationTask) Type

func (CancelationTask) Type() string

func (CancelationTask) Validate

type CancelationTaskSerializer

type CancelationTaskSerializer struct{}

func (CancelationTaskSerializer) Deserialize

func (CancelationTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)

func (CancelationTaskSerializer) Serialize

func (CancelationTaskSerializer) Serialize(task hsm.Task) ([]byte, error)

type CanceledEventDefinition

type CanceledEventDefinition struct{}

func (CanceledEventDefinition) Apply

func (CanceledEventDefinition) CherryPick

func (d CanceledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error

func (CanceledEventDefinition) IsWorkflowTaskTrigger

func (d CanceledEventDefinition) IsWorkflowTaskTrigger() bool

func (CanceledEventDefinition) Type

type ClientProvider

type ClientProvider func(ctx context.Context, namespaceID string, entry *persistencespb.NexusEndpointEntry, service string) (*nexusrpc.HTTPClient, error)

ClientProvider provides a nexus client for a given endpoint.

func ClientProviderFactory

func ClientProviderFactory(
	httpTransportProvider NexusTransportProvider,
	clusterMetadata cluster.Metadata,
	rpcFactory common.RPCFactory,
	config *Config,
) (ClientProvider, error)

type CompletedEventDefinition

type CompletedEventDefinition struct{}

func (CompletedEventDefinition) Apply

func (CompletedEventDefinition) CherryPick

func (d CompletedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error

func (CompletedEventDefinition) IsWorkflowTaskTrigger

func (d CompletedEventDefinition) IsWorkflowTaskTrigger() bool

func (CompletedEventDefinition) Type

type Config

type Config struct {
	NumHistoryShards                    int32
	RequestTimeout                      dynamicconfig.DurationPropertyFnWithDestinationFilter
	MinRequestTimeout                   dynamicconfig.DurationPropertyFnWithNamespaceFilter
	MaxConcurrentOperations             dynamicconfig.IntPropertyFnWithNamespaceFilter
	MaxServiceNameLength                dynamicconfig.IntPropertyFnWithNamespaceFilter
	MaxOperationNameLength              dynamicconfig.IntPropertyFnWithNamespaceFilter
	MaxOperationTokenLength             dynamicconfig.IntPropertyFnWithNamespaceFilter
	MaxOperationHeaderSize              dynamicconfig.IntPropertyFnWithNamespaceFilter
	DisallowedOperationHeaders          dynamicconfig.TypedPropertyFn[[]string]
	MaxOperationScheduleToCloseTimeout  dynamicconfig.DurationPropertyFnWithNamespaceFilter
	PayloadSizeLimit                    dynamicconfig.IntPropertyFnWithNamespaceFilter
	CallbackURLTemplate                 dynamicconfig.StringPropertyFn
	UseSystemCallbackURL                dynamicconfig.BoolPropertyFn
	UseNewFailureWireFormat             dynamicconfig.BoolPropertyFnWithNamespaceFilter
	RecordCancelRequestCompletionEvents dynamicconfig.BoolPropertyFn
	RetryPolicy                         func() backoff.RetryPolicy
}

func ConfigProvider

func ConfigProvider(dc *dynamicconfig.Collection, cfg *config.Persistence) *Config

type EventAttemptFailed

type EventAttemptFailed struct {
	Time        time.Time
	Failure     *failurepb.Failure
	Node        *hsm.Node
	RetryPolicy backoff.RetryPolicy
}

EventAttemptFailed is triggered when an invocation attempt is failed with a retryable error.

type EventCancelationAttemptFailed

type EventCancelationAttemptFailed struct {
	Time        time.Time
	Failure     *failurepb.Failure
	Node        *hsm.Node
	RetryPolicy backoff.RetryPolicy
}

EventCancelationAttemptFailed is triggered when a cancelation attempt is failed with a retryable error.

type EventCancelationFailed

type EventCancelationFailed struct {
	Time    time.Time
	Failure *failurepb.Failure
	Node    *hsm.Node
}

EventCancelationFailed is triggered when a cancelation attempt is failed with a non retryable error.

type EventCancelationRescheduled

type EventCancelationRescheduled struct {
	Node *hsm.Node
}

EventCancelationRescheduled is triggered when cancelation is meant to be rescheduled after backing off from a previous attempt.

type EventCancelationScheduled

type EventCancelationScheduled struct {
	Time time.Time
	Node *hsm.Node
}

EventCancelationScheduled is triggered when cancelation is meant to be scheduled for the first time - immediately after it has been requested.

type EventCancelationSucceeded

type EventCancelationSucceeded struct {
	Time time.Time
	Node *hsm.Node
}

EventCancelationSucceeded is triggered when a cancelation attempt succeeds.

type EventCanceled

type EventCanceled struct {
	Time time.Time
	Node *hsm.Node
}

EventCanceled is triggered when the operation is canceled.

type EventFailed

type EventFailed struct {
	Time       time.Time
	Node       *hsm.Node
	Attributes *historypb.NexusOperationFailedEventAttributes
}

EventFailed is triggered when an invocation attempt is failed with a non retryable error.

type EventRescheduled

type EventRescheduled struct {
	Node *hsm.Node
}

EventRescheduled is triggered when the operation is meant to be rescheduled after backing off from a previous attempt.

type EventScheduled

type EventScheduled struct {
	Node *hsm.Node
}

EventScheduled is triggered when the operation is meant to be scheduled - immediately after initialization.

type EventStarted

type EventStarted struct {
	Time       time.Time
	Node       *hsm.Node
	Attributes *historypb.NexusOperationStartedEventAttributes
}

EventStarted is triggered when an invocation attempt succeeds and the handler indicates that it started an asynchronous operation.

type EventSucceeded

type EventSucceeded struct {
	// Only set if the operation completed synchronously, as a response to a StartOperation RPC.
	Time time.Time
	Node *hsm.Node
}

EventSucceeded is triggered when an invocation attempt succeeds.

type EventTimedOut

type EventTimedOut struct {
	Node *hsm.Node
}

EventTimedOut is triggered when the schedule-to-close timeout is triggered for an operation.

type FailedEventDefinition

type FailedEventDefinition struct{}

func (FailedEventDefinition) Apply

func (d FailedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error

func (FailedEventDefinition) CherryPick

func (d FailedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error

func (FailedEventDefinition) IsWorkflowTaskTrigger

func (d FailedEventDefinition) IsWorkflowTaskTrigger() bool

func (FailedEventDefinition) Type

type InvocationTask

type InvocationTask struct {
	EndpointName string
	Attempt      int32
}

func (InvocationTask) Deadline

func (InvocationTask) Deadline() time.Time

func (InvocationTask) Destination

func (t InvocationTask) Destination() string

func (InvocationTask) Type

func (InvocationTask) Type() string

func (InvocationTask) Validate

type InvocationTaskSerializer

type InvocationTaskSerializer struct{}

func (InvocationTaskSerializer) Deserialize

func (InvocationTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)

func (InvocationTaskSerializer) Serialize

func (InvocationTaskSerializer) Serialize(task hsm.Task) ([]byte, error)

type LimitedReadCloser

type LimitedReadCloser struct {
	R io.ReadCloser
	N int64
}

A LimitedReadCloser reads from R but limits the amount of data returned to just N bytes. Each call to Read updates N to reflect the new amount remaining. Read returns ErrResponseBodyTooLarge when N <= 0.

func NewLimitedReadCloser

func NewLimitedReadCloser(rc io.ReadCloser, l int64) *LimitedReadCloser

func (*LimitedReadCloser) Close

func (l *LimitedReadCloser) Close() error

func (*LimitedReadCloser) Read

func (l *LimitedReadCloser) Read(p []byte) (n int, err error)

type NexusTransportProvider

type NexusTransportProvider func(namespaceID, serviceName string) http.RoundTripper

NexusTransportProvider is a function type that allows a provider to customize the default transport implementation specifically for Nexus.

func DefaultNexusTransportProvider

func DefaultNexusTransportProvider() NexusTransportProvider

type Operation

type Operation struct {
	*persistencespb.NexusOperationInfo
}

Operation state machine.

func (Operation) Cancel

func (o Operation) Cancel(node *hsm.Node, t time.Time, requestedEventID int64) (hsm.TransitionOutput, error)

Cancel marks the Operation machine as canceled by spawning a child Cancelation machine. If the Operation has already completed, it can no longer be canceled, and the Cancelation machine will stay in the UNSPECIFIED state. If the Operation is in the STARTED state, the Cancelation machine transitions to the SCHEDULED state. Otherwise, the Cancelation machine will wait for the Operation machine to transition to the STARTED state.

func (Operation) Cancelation

func (o Operation) Cancelation(node *hsm.Node) (*Cancelation, error)

func (Operation) CancelationNode

func (o Operation) CancelationNode(node *hsm.Node) (*hsm.Node, error)

func (Operation) RegenerateTasks

func (o Operation) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error)

func (Operation) SetState

func (o Operation) SetState(state enumsspb.NexusOperationState)

func (Operation) State

type ResponseSizeLimiter

type ResponseSizeLimiter struct {
	// contains filtered or unexported fields
}

func (ResponseSizeLimiter) RoundTrip

func (r ResponseSizeLimiter) RoundTrip(request *http.Request) (*http.Response, error)

type ScheduleToCloseTimeoutTask

type ScheduleToCloseTimeoutTask struct {
	// contains filtered or unexported fields
}

func (ScheduleToCloseTimeoutTask) Deadline

func (t ScheduleToCloseTimeoutTask) Deadline() time.Time

func (ScheduleToCloseTimeoutTask) Destination

func (ScheduleToCloseTimeoutTask) Destination() string

func (ScheduleToCloseTimeoutTask) Type

func (ScheduleToCloseTimeoutTask) Validate

Validate checks if the timeout task is still valid to execute for the given node state.

type ScheduleToStartTimeoutTask

type ScheduleToStartTimeoutTask struct {
	// contains filtered or unexported fields
}

func (ScheduleToStartTimeoutTask) Deadline

func (t ScheduleToStartTimeoutTask) Deadline() time.Time

func (ScheduleToStartTimeoutTask) Destination

func (ScheduleToStartTimeoutTask) Destination() string

func (ScheduleToStartTimeoutTask) Type

func (ScheduleToStartTimeoutTask) Validate

Validate checks if the schedule-to-start timeout task is still valid. Only valid if operation is still in SCHEDULED or BACKING_OFF state.

type ScheduleToStartTimeoutTaskSerializer

type ScheduleToStartTimeoutTaskSerializer struct{}

func (ScheduleToStartTimeoutTaskSerializer) Deserialize

func (ScheduleToStartTimeoutTaskSerializer) Serialize

type ScheduledEventDefinition

type ScheduledEventDefinition struct{}

func (ScheduledEventDefinition) Apply

func (ScheduledEventDefinition) CherryPick

func (d ScheduledEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, _ map[enumspb.ResetReapplyExcludeType]struct{}) error

func (ScheduledEventDefinition) IsWorkflowTaskTrigger

func (d ScheduledEventDefinition) IsWorkflowTaskTrigger() bool

func (ScheduledEventDefinition) Type

type StartToCloseTimeoutTask

type StartToCloseTimeoutTask struct {
	// contains filtered or unexported fields
}

func (StartToCloseTimeoutTask) Deadline

func (t StartToCloseTimeoutTask) Deadline() time.Time

func (StartToCloseTimeoutTask) Destination

func (StartToCloseTimeoutTask) Destination() string

func (StartToCloseTimeoutTask) Type

func (StartToCloseTimeoutTask) Validate

Validate checks if the start-to-close timeout task is still valid. Only valid if operation is in STARTED state.

type StartToCloseTimeoutTaskSerializer

type StartToCloseTimeoutTaskSerializer struct{}

func (StartToCloseTimeoutTaskSerializer) Deserialize

func (StartToCloseTimeoutTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)

func (StartToCloseTimeoutTaskSerializer) Serialize

type StartedEventDefinition

type StartedEventDefinition struct{}

func (StartedEventDefinition) Apply

func (d StartedEventDefinition) Apply(root *hsm.Node, event *historypb.HistoryEvent) error

func (StartedEventDefinition) CherryPick

func (d StartedEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error

func (StartedEventDefinition) IsWorkflowTaskTrigger

func (d StartedEventDefinition) IsWorkflowTaskTrigger() bool

func (StartedEventDefinition) Type

type TaskExecutorOptions

type TaskExecutorOptions struct {
	fx.In

	Config                 *Config
	NamespaceRegistry      namespace.Registry
	MetricsHandler         metrics.Handler
	Logger                 log.Logger
	CallbackTokenGenerator *commonnexus.CallbackTokenGenerator
	ClientProvider         ClientProvider
	EndpointRegistry       commonnexus.EndpointRegistry
	HTTPTraceProvider      commonnexus.HTTPClientTraceProvider
	HistoryClient          resource.HistoryClient
	ChasmRegistry          *chasm.Registry
}

type TimedOutEventDefinition

type TimedOutEventDefinition struct{}

func (TimedOutEventDefinition) Apply

func (TimedOutEventDefinition) CherryPick

func (d TimedOutEventDefinition) CherryPick(root *hsm.Node, event *historypb.HistoryEvent, excludeTypes map[enumspb.ResetReapplyExcludeType]struct{}) error

func (TimedOutEventDefinition) IsWorkflowTaskTrigger

func (d TimedOutEventDefinition) IsWorkflowTaskTrigger() bool

func (TimedOutEventDefinition) Type

type TimeoutTaskSerializer

type TimeoutTaskSerializer struct{}

func (TimeoutTaskSerializer) Deserialize

func (TimeoutTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error)

func (TimeoutTaskSerializer) Serialize

func (TimeoutTaskSerializer) Serialize(hsm.Task) ([]byte, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL