Documentation
¶
Index ¶
- Variables
- type AddressMatchRule
- type AddressMatchRules
- type BackoffTaskExecutor
- type BackoffTaskExecutorOptions
- type Callback
- type CompletionSource
- type Config
- type EventAttemptFailed
- type EventFailed
- type EventRescheduled
- type EventScheduled
- type EventSucceeded
- type HTTPCaller
- type HTTPCallerProvider
- type InvocationTaskExecutor
- func (e InvocationTaskExecutor) Execute(ctx context.Context, ref chasm.ComponentRef, attrs chasm.TaskAttributes, ...) error
- func (e InvocationTaskExecutor) Invoke(ctx context.Context, ref chasm.ComponentRef, taskAttr chasm.TaskAttributes, ...) error
- func (e InvocationTaskExecutor) Validate(ctx chasm.Context, cb *Callback, attrs chasm.TaskAttributes, ...) (bool, error)
- type InvocationTaskExecutorOptions
- type Library
Constants ¶
This section is empty.
Variables ¶
var ( RequestCounter = metrics.NewCounterDef( "callback_outbound_requests", metrics.WithDescription("The number of callback outbound requests made by the history service."), ) RequestLatencyHistogram = metrics.NewTimerDef( "callback_outbound_latency", metrics.WithDescription("Latency histogram of outbound callback requests made by the history service."), ) )
CHASM callback metrics. These are defined independently from HSM callbacks to avoid coupling between the two implementations.
// AllowedAddresses is the namespace-scoped dynamic config setting registered
// under the key "chasm.callback.allowedAddresses". Raw values are converted by
// allowedAddressConverter and the default is an empty AddressMatchRules.
// NOTE(review): the description string below appears on one line in this view;
// confirm its original line breaks against the source file before editing it.
var AllowedAddresses = dynamicconfig.NewNamespaceTypedSettingWithConverter( "chasm.callback.allowedAddresses", allowedAddressConverter, AddressMatchRules{}, `The per-namespace list of addresses that are allowed for callbacks and whether secure connections (https) are required. URL: "temporal://system" is always allowed for worker callbacks. The default is no address rules. URLs are checked against each in order when starting a workflow with attached callbacks and only need to match one to pass validation. This configuration is required for external endpoint targets; any invalid entries are ignored. Each entry is a map with possible values: - "Pattern":string (required) the host:port pattern to which this config applies. Wildcards, '*', are supported and can match any number of characters (e.g. '*' matches everything, 'prefix.*.domain' matches 'prefix.a.domain' as well as 'prefix.a.b.domain'). - "AllowInsecure":bool (optional, default=false) indicates whether https is required`)
// Module is the fx module named "chasm.lib.callback". It provides the package
// config, the HTTP caller provider, both task executors and the library, and
// invokes register.
var Module = fx.Module(
	"chasm.lib.callback",
	fx.Provide(configProvider),
	fx.Provide(httpCallerProviderProvider),
	fx.Provide(NewInvocationTaskExecutor),
	fx.Provide(NewBackoffTaskExecutor),
	fx.Provide(newLibrary),
	fx.Invoke(register),
)
// RequestTimeout is the destination-scoped dynamic config setting
// "chasm.callback.request.timeout"; it defaults to ten seconds.
var RequestTimeout = dynamicconfig.NewDestinationDurationSetting(
	"chasm.callback.request.timeout",
	10*time.Second,
	`RequestTimeout is the timeout for executing a single callback request.`,
)
// RetryPolicyInitialInterval is the global dynamic config setting
// "chasm.callback.retryPolicy.initialInterval"; it defaults to one second.
var RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting(
	"chasm.callback.retryPolicy.initialInterval",
	time.Second,
	`The initial backoff interval between every callback request attempt for a given callback.`,
)
// RetryPolicyMaximumInterval is the global dynamic config setting
// "chasm.callback.retryPolicy.maxInterval"; it defaults to one hour.
var RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting(
	"chasm.callback.retryPolicy.maxInterval",
	time.Hour,
	`The maximum backoff interval between every callback request attempt for a given callback.`,
)
var TransitionAttemptFailed = chasm.NewTransition( []callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_SCHEDULED}, callbackspb.CALLBACK_STATUS_BACKING_OFF, func(cb *Callback, ctx chasm.MutableContext, event EventAttemptFailed) error { cb.recordAttempt(event.Time) nextDelay := event.RetryPolicy.ComputeNextDelay(0, int(cb.Attempt), event.Err) nextAttemptScheduleTime := event.Time.Add(nextDelay) cb.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime) cb.LastAttemptFailure = &failurepb.Failure{ Message: event.Err.Error(), FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ NonRetryable: false, }, }, } ctx.AddTask( cb, chasm.TaskAttributes{ScheduledTime: nextAttemptScheduleTime}, &callbackspb.BackoffTask{Attempt: cb.Attempt}, ) return nil }, )
var TransitionFailed = chasm.NewTransition( []callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_SCHEDULED}, callbackspb.CALLBACK_STATUS_FAILED, func(cb *Callback, ctx chasm.MutableContext, event EventFailed) error { cb.recordAttempt(event.Time) cb.LastAttemptFailure = &failurepb.Failure{ Message: event.Err.Error(), FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ NonRetryable: true, }, }, } return nil }, )
var TransitionRescheduled = chasm.NewTransition( []callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_BACKING_OFF}, callbackspb.CALLBACK_STATUS_SCHEDULED, func(cb *Callback, ctx chasm.MutableContext, event EventRescheduled) error { cb.NextAttemptScheduleTime = nil u, err := url.Parse(cb.Callback.GetNexus().Url) if err != nil { return fmt.Errorf("failed to parse URL: %v: %w", cb.Callback, err) } ctx.AddTask( cb, chasm.TaskAttributes{Destination: u.Scheme + "://" + u.Host}, &callbackspb.InvocationTask{Attempt: cb.Attempt}, ) return nil }, )
var TransitionScheduled = chasm.NewTransition( []callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_STANDBY}, callbackspb.CALLBACK_STATUS_SCHEDULED, func(cb *Callback, ctx chasm.MutableContext, event EventScheduled) error { u, err := url.Parse(cb.Callback.GetNexus().GetUrl()) if err != nil { return fmt.Errorf("failed to parse URL: %v: %w", cb.Callback, err) } ctx.AddTask(cb, chasm.TaskAttributes{Destination: u.Scheme + "://" + u.Host}, &callbackspb.InvocationTask{}) return nil }, )
var TransitionSucceeded = chasm.NewTransition( []callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_SCHEDULED}, callbackspb.CALLBACK_STATUS_SUCCEEDED, func(cb *Callback, ctx chasm.MutableContext, event EventSucceeded) error { cb.recordAttempt(event.Time) cb.LastAttemptFailure = nil return nil }, )
Functions ¶
This section is empty.
Types ¶
type AddressMatchRule ¶
func (AddressMatchRule) Allow ¶
func (a AddressMatchRule) Allow(u *url.URL) (bool, error)
Allow validates the URL against the rule and returns: 1. true, nil if the provided URL matches the rule and passes validation for the given rule. 2. false, nil if the URL does not match the rule. 3. false, error if there is a match and the URL fails validation.
type AddressMatchRules ¶
// AddressMatchRules is a list of AddressMatchRule values. It is the value type
// of the AllowedAddresses dynamic config setting, whose default is the empty
// AddressMatchRules{}.
type AddressMatchRules struct {
// Rules holds the individual address rules.
// NOTE(review): presumably evaluated in slice order by Validate — confirm.
Rules []AddressMatchRule
}
func (AddressMatchRules) Validate ¶
func (a AddressMatchRules) Validate(rawURL string) error
type BackoffTaskExecutor ¶
type BackoffTaskExecutor struct {
// contains filtered or unexported fields
}
func NewBackoffTaskExecutor ¶
func NewBackoffTaskExecutor(opts BackoffTaskExecutorOptions) *BackoffTaskExecutor
func (*BackoffTaskExecutor) Execute ¶
func (e *BackoffTaskExecutor) Execute( ctx chasm.MutableContext, callback *Callback, taskAttrs chasm.TaskAttributes, task *callbackspb.BackoffTask, ) error
Execute transitions the callback from BACKING_OFF to SCHEDULED state and generates an InvocationTask for the next attempt.
func (*BackoffTaskExecutor) Validate ¶
func (e *BackoffTaskExecutor) Validate( ctx chasm.Context, callback *Callback, taskAttr chasm.TaskAttributes, task *callbackspb.BackoffTask, ) (bool, error)
type Callback ¶
// Callback is the CHASM callback component. It embeds the default component
// implementation and the persisted callback state, so the state's fields are
// accessible directly on the Callback.
type Callback struct {
chasm.UnimplementedComponent
// Persisted internal state.
*callbackspb.CallbackState
// Pointer to the parent, used to retrieve Nexus operation completion data.
CompletionSource chasm.ParentPtr[CompletionSource]
}
Callback represents a callback component in CHASM.
func NewCallback ¶
func NewCallback( requestID string, registrationTime *timestamppb.Timestamp, state *callbackspb.CallbackState, cb *callbackspb.Callback, ) *Callback
func (*Callback) LifecycleState ¶
func (c *Callback) LifecycleState(_ chasm.Context) chasm.LifecycleState
func (*Callback) SetStateMachineState ¶
func (c *Callback) SetStateMachineState(status callbackspb.CallbackStatus)
func (*Callback) StateMachineState ¶
func (c *Callback) StateMachineState() callbackspb.CallbackStatus
type CompletionSource ¶
type Config ¶
// Config holds the callback settings consumed by the task executors.
type Config struct {
// RequestTimeout returns the request timeout, filtered by destination.
// NOTE(review): presumably backed by the RequestTimeout dynamic config setting — confirm in configProvider.
RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
// RetryPolicy returns the retry policy to apply to callback attempts.
RetryPolicy func() backoff.RetryPolicy
}
type EventAttemptFailed ¶
// EventAttemptFailed is the input to TransitionAttemptFailed.
type EventAttemptFailed struct {
// Time is passed to recordAttempt and is the base time to which the retry
// delay is added.
Time time.Time
// Err is the attempt's error; its message is stored in LastAttemptFailure and
// it is passed to the retry policy when computing the delay.
Err error
// RetryPolicy computes the delay before the next attempt.
RetryPolicy backoff.RetryPolicy
}
EventAttemptFailed is triggered when an attempt fails with a retryable error.
type EventFailed ¶
EventFailed is triggered when an attempt fails with a non-retryable error.
type EventRescheduled ¶
// EventRescheduled is the input to TransitionRescheduled; it carries no data.
type EventRescheduled struct{}
EventRescheduled is triggered when the callback is meant to be rescheduled after backing off from a previous attempt.
type EventScheduled ¶
// EventScheduled is the input to TransitionScheduled; it carries no data.
type EventScheduled struct{}
EventScheduled is triggered when the callback is meant to be scheduled for the first time - when its Trigger condition is met.
type EventSucceeded ¶
EventSucceeded is triggered when an attempt succeeds.
type HTTPCaller ¶
HTTPCaller is a method that can be used to invoke HTTP requests.
type HTTPCallerProvider ¶
// HTTPCallerProvider returns the HTTPCaller to use for a given namespace ID
// and destination pair.
type HTTPCallerProvider func(common.NamespaceIDAndDestination) HTTPCaller
type InvocationTaskExecutor ¶
type InvocationTaskExecutor struct {
// contains filtered or unexported fields
}
func NewInvocationTaskExecutor ¶
func NewInvocationTaskExecutor(opts InvocationTaskExecutorOptions) *InvocationTaskExecutor
func (InvocationTaskExecutor) Execute ¶
func (e InvocationTaskExecutor) Execute(ctx context.Context, ref chasm.ComponentRef, attrs chasm.TaskAttributes, task *callbackspb.InvocationTask) error
func (InvocationTaskExecutor) Invoke ¶
func (e InvocationTaskExecutor) Invoke( ctx context.Context, ref chasm.ComponentRef, taskAttr chasm.TaskAttributes, task *callbackspb.InvocationTask, ) error
func (InvocationTaskExecutor) Validate ¶
func (e InvocationTaskExecutor) Validate(ctx chasm.Context, cb *Callback, attrs chasm.TaskAttributes, task *callbackspb.InvocationTask) (bool, error)
type InvocationTaskExecutorOptions ¶
// InvocationTaskExecutorOptions is the set of dependencies accepted by
// NewInvocationTaskExecutor.
type InvocationTaskExecutorOptions struct {
// fx.In marks this struct as an fx parameter object, so each field below is
// supplied by the dependency injection container.
fx.In
Config *Config
NamespaceRegistry namespace.Registry
MetricsHandler metrics.Handler
Logger log.Logger
HTTPCallerProvider HTTPCallerProvider
HTTPTraceProvider commonnexus.HTTPClientTraceProvider
HistoryClient resource.HistoryClient
}
type Library ¶
// Library is the CHASM library for callbacks. It embeds the default library
// implementation and holds the two task executors of this package.
type Library struct {
chasm.UnimplementedLibrary
// InvocationTaskExecutor handles callbackspb.InvocationTask tasks.
InvocationTaskExecutor *InvocationTaskExecutor
// BackoffTaskExecutor handles callbackspb.BackoffTask tasks.
BackoffTaskExecutor *BackoffTaskExecutor
}
func (*Library) Components ¶
func (l *Library) Components() []*chasm.RegistrableComponent
func (*Library) RegisterServices ¶
func (*Library) Tasks ¶
func (l *Library) Tasks() []*chasm.RegistrableTask
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
gen
|
|
|
callbackpb/v1
Code generated by protoc-gen-go-helpers.
|
Code generated by protoc-gen-go-helpers. |