Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( RequestCounter = metrics.NewCounterDef( "callback_outbound_requests", metrics.WithDescription("The number of callback outbound requests made by the history service."), ) RequestLatencyHistogram = metrics.NewTimerDef( "callback_outbound_latency", metrics.WithDescription("Latency histogram of outbound callback requests made by the history service."), ) )
CHASM callback metrics. These are defined independently from HSM callbacks to avoid coupling between the two implementations.
// AllowedAddresses is the namespace-scoped dynamic config setting stored
// under the key "chasm.callback.allowedAddresses". Raw values go through
// allowedAddressConverter and the default is an empty AddressMatchRules, i.e.
// no address rules. The description string below is the user-facing help
// text for the setting.
var AllowedAddresses = dynamicconfig.NewNamespaceTypedSettingWithConverter(
	"chasm.callback.allowedAddresses",
	allowedAddressConverter,
	AddressMatchRules{},
	`The per-namespace list of addresses that are allowed for callbacks and whether secure connections (https) are required. URL: "temporal://system" is always allowed for worker callbacks. The default is no address rules. URLs are checked against each in order when starting a workflow with attached callbacks and only need to match one to pass validation. This configuration is required for external endpoint targets; any invalid entries are ignored. Each entry is a map with possible values: - "Pattern":string (required) the host:port pattern to which this config applies. Wildcards, '*', are supported and can match any number of characters (e.g. '*' matches everything, 'prefix.*.domain' matches 'prefix.a.domain' as well as 'prefix.a.b.domain'). - "AllowInsecure":bool (optional, default=false) indicates whether https is required`,
)
// Module is the fx module named "chasm.lib.callback". It provides the
// package's constructors (config, HTTP caller provider, the two task
// handlers, and the library) and invokes register.
var Module = fx.Module(
	"chasm.lib.callback",
	// All constructors are handed to a single variadic Provide call.
	fx.Provide(
		configProvider,
		httpCallerProviderProvider,
		newInvocationTaskHandler,
		newBackoffTaskHandler,
		newLibrary,
	),
	fx.Invoke(register),
)
// RequestTimeout is the destination-scoped duration setting stored under the
// key "callback.request.timeout". Its default value is ten seconds.
var RequestTimeout = dynamicconfig.NewDestinationDurationSetting(
	"callback.request.timeout",
	10*time.Second,
	`RequestTimeout is the timeout for executing a single callback request.`,
)
// RetryPolicyInitialInterval is the global duration setting stored under the
// key "callback.retryPolicy.initialInterval". Its default value is one second.
var RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting(
	"callback.retryPolicy.initialInterval",
	time.Second,
	`The initial backoff interval between every callback request attempt for a given callback.`,
)
// RetryPolicyMaximumInterval is the global duration setting stored under the
// key "callback.retryPolicy.maxInterval". Its default value is one hour.
var RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting(
	"callback.retryPolicy.maxInterval",
	time.Hour,
	`The maximum backoff interval between every callback request attempt for a given callback.`,
)
var TransitionAttemptFailed = chasm.NewTransition( []callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_SCHEDULED}, callbackspb.CALLBACK_STATUS_BACKING_OFF, func(cb *Callback, ctx chasm.MutableContext, event EventAttemptFailed) error { cb.recordAttempt(event.Time) nextDelay := event.RetryPolicy.ComputeNextDelay(0, int(cb.Attempt), event.Err) nextAttemptScheduleTime := event.Time.Add(nextDelay) cb.NextAttemptScheduleTime = timestamppb.New(nextAttemptScheduleTime) cb.LastAttemptFailure = &failurepb.Failure{ Message: event.Err.Error(), FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ NonRetryable: false, }, }, } ctx.AddTask( cb, chasm.TaskAttributes{ScheduledTime: nextAttemptScheduleTime}, &callbackspb.BackoffTask{Attempt: cb.Attempt}, ) return nil }, )
var TransitionFailed = chasm.NewTransition( []callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_SCHEDULED}, callbackspb.CALLBACK_STATUS_FAILED, func(cb *Callback, ctx chasm.MutableContext, event EventFailed) error { cb.recordAttempt(event.Time) cb.LastAttemptFailure = &failurepb.Failure{ Message: event.Err.Error(), FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ NonRetryable: true, }, }, } return nil }, )
var TransitionRescheduled = chasm.NewTransition( []callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_BACKING_OFF}, callbackspb.CALLBACK_STATUS_SCHEDULED, func(cb *Callback, ctx chasm.MutableContext, event EventRescheduled) error { cb.NextAttemptScheduleTime = nil u, err := url.Parse(cb.Callback.GetNexus().Url) if err != nil { return fmt.Errorf("failed to parse URL: %v: %w", cb.Callback, err) } ctx.AddTask( cb, chasm.TaskAttributes{Destination: u.Scheme + "://" + u.Host}, &callbackspb.InvocationTask{Attempt: cb.Attempt}, ) return nil }, )
var TransitionScheduled = chasm.NewTransition( []callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_STANDBY}, callbackspb.CALLBACK_STATUS_SCHEDULED, func(cb *Callback, ctx chasm.MutableContext, event EventScheduled) error { u, err := url.Parse(cb.Callback.GetNexus().GetUrl()) if err != nil { return fmt.Errorf("failed to parse URL: %v: %w", cb.Callback, err) } ctx.AddTask(cb, chasm.TaskAttributes{Destination: u.Scheme + "://" + u.Host}, &callbackspb.InvocationTask{}) return nil }, )
var TransitionSucceeded = chasm.NewTransition( []callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_SCHEDULED}, callbackspb.CALLBACK_STATUS_SUCCEEDED, func(cb *Callback, ctx chasm.MutableContext, event EventSucceeded) error { cb.recordAttempt(event.Time) cb.LastAttemptFailure = nil return nil }, )
Functions ¶
This section is empty.
Types ¶
type AddressMatchRule ¶
func (AddressMatchRule) Allow ¶
func (a AddressMatchRule) Allow(u *url.URL) (bool, error)
Allow validates the URL against the rule and returns: 1. true, nil if the provided URL matches the rule and passes validation for the given rule. 2. false, nil if the URL does not match the rule. 3. false, error if there is a match and the URL fails validation.
type AddressMatchRules ¶
// AddressMatchRules is a collection of AddressMatchRule values. An empty
// AddressMatchRules is the default value of the AllowedAddresses setting.
type AddressMatchRules struct {
// Rules is the list of individual address rules.
// NOTE(review): presumably evaluated in slice order by Validate — confirm.
Rules []AddressMatchRule
}
func (AddressMatchRules) Validate ¶
func (a AddressMatchRules) Validate(rawURL string) error
type Callback ¶
// Callback is the CHASM callback component. It embeds
// chasm.UnimplementedComponent and the persisted *callbackspb.CallbackState,
// so the state's fields (for example Attempt, NextAttemptScheduleTime and
// LastAttemptFailure, as used by the transitions in this package) are
// accessible directly on the Callback.
type Callback struct {
chasm.UnimplementedComponent
// Persisted internal state (embedded, so its fields are promoted).
*callbackspb.CallbackState
// Interface to retrieve Nexus operation completion data.
// Held as a chasm.ParentPtr to a CompletionSource.
CompletionSource chasm.ParentPtr[CompletionSource]
}
Callback represents a callback component in CHASM.
func NewCallback ¶
func NewCallback( requestID string, registrationTime *timestamppb.Timestamp, state *callbackspb.CallbackState, cb *callbackspb.Callback, ) *Callback
func (*Callback) LifecycleState ¶
func (c *Callback) LifecycleState(_ chasm.Context) chasm.LifecycleState
func (*Callback) SetStateMachineState ¶
func (c *Callback) SetStateMachineState(status callbackspb.CallbackStatus)
func (*Callback) StateMachineState ¶
func (c *Callback) StateMachineState() callbackspb.CallbackStatus
type CompletionSource ¶
type Config ¶
// Config holds the runtime configuration accessors used by the callback
// library.
type Config struct {
// RequestTimeout resolves a duration using a destination filter.
// NOTE(review): presumably backed by the RequestTimeout dynamic config
// setting — confirm against configProvider.
RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
// RetryPolicy returns the backoff.RetryPolicy to use; it is a function, so
// the policy is obtained on each call rather than stored.
RetryPolicy func() backoff.RetryPolicy
}
type EventAttemptFailed ¶
// EventAttemptFailed is the event consumed by TransitionAttemptFailed.
type EventAttemptFailed struct {
// Time is passed to recordAttempt and is the base instant to which the
// computed retry delay is added.
Time time.Time
// Err is the attempt's error. Its message is stored in LastAttemptFailure,
// and TransitionAttemptFailed calls Err.Error(), so it must be non-nil.
Err error
// RetryPolicy computes the delay before the next attempt.
RetryPolicy backoff.RetryPolicy
}
EventAttemptFailed is triggered when an attempt fails with a retryable error.
type EventFailed ¶
EventFailed is triggered when an attempt fails with a non-retryable error.
type EventRescheduled ¶
// EventRescheduled is the event consumed by TransitionRescheduled. It has no
// fields.
type EventRescheduled struct{}
EventRescheduled is triggered when the callback is meant to be rescheduled after backing off from a previous attempt.
type EventScheduled ¶
// EventScheduled is the event consumed by TransitionScheduled. It has no
// fields.
type EventScheduled struct{}
EventScheduled is triggered when the callback is meant to be scheduled for the first time - when its Trigger condition is met.
type EventSucceeded ¶
EventSucceeded is triggered when an attempt succeeds.
type HTTPCaller ¶
HTTPCaller is a method that can be used to invoke HTTP requests.
type HTTPCallerProvider ¶
// HTTPCallerProvider is a function type that takes a
// common.NamespaceIDAndDestination and returns the HTTPCaller to use for it.
type HTTPCallerProvider func(common.NamespaceIDAndDestination) HTTPCaller
HTTPCallerProvider is a function type that can be used to retrieve an HTTPCaller for a given namespace and destination.
type Library ¶
// Library is the CHASM library for callbacks. It embeds
// chasm.UnimplementedLibrary and holds the two task handlers of this package.
type Library struct {
chasm.UnimplementedLibrary
// InvocationTaskHandler is the handler for invocation tasks.
InvocationTaskHandler *invocationTaskHandler
// BackoffTaskHandler is the handler for backoff tasks.
BackoffTaskHandler *backoffTaskHandler
}
func (*Library) Components ¶
func (l *Library) Components() []*chasm.RegistrableComponent
func (*Library) RegisterServices ¶
func (*Library) Tasks ¶
func (l *Library) Tasks() []*chasm.RegistrableTask
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
gen
|
|
|
callbackpb/v1
Code generated by protoc-gen-go-helpers.
|
Code generated by protoc-gen-go-helpers. |