callback

package
v1.30.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2026 License: MIT Imports: 44 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// CHASM callback metric definitions. They are declared separately from the
// HSM callback metrics so the two implementations stay decoupled.
var (
	// RequestCounter is the counter definition named
	// "callback_outbound_requests".
	RequestCounter = metrics.NewCounterDef(
		"callback_outbound_requests",
		metrics.WithDescription("The number of callback outbound requests made by the history service."),
	)
	// RequestLatencyHistogram is the timer definition named
	// "callback_outbound_latency".
	RequestLatencyHistogram = metrics.NewTimerDef(
		"callback_outbound_latency",
		metrics.WithDescription("Latency histogram of outbound callback requests made by the history service."),
	)
)

CHASM callback metrics. These are defined independently from HSM callbacks to avoid coupling between the two implementations.

View Source
// AllowedAddresses is the per-namespace dynamic config setting
// "chasm.callback.allowedAddresses". Raw values go through
// allowedAddressConverter (defined elsewhere in the package) and the default
// is an empty AddressMatchRules, i.e. no address rules.
var AllowedAddresses = dynamicconfig.NewNamespaceTypedSettingWithConverter(
	"chasm.callback.allowedAddresses",
	allowedAddressConverter,
	AddressMatchRules{},
	`The per-namespace list of addresses that are allowed for callbacks and whether secure connections (https) are required.
URL: "temporal://system" is always allowed for worker callbacks. The default is no address rules.
URLs are checked against each in order when starting a workflow with attached callbacks and only need to match one to pass validation.
This configuration is required for external endpoint targets; any invalid entries are ignored. Each entry is a map with possible values:
     - "Pattern":string (required) the host:port pattern to which this config applies.
        Wildcards, '*', are supported and can match any number of characters (e.g. '*' matches everything, 'prefix.*.domain' matches 'prefix.a.domain' as well as 'prefix.a.b.domain').
     - "AllowInsecure":bool (optional, default=false) indicates whether https is required`)
View Source
// Module is the fx module "chasm.lib.callback". It provides the config, the
// HTTP caller provider, the invocation and backoff task executors and the
// library, and then invokes register (defined elsewhere in the package).
var Module = fx.Module(
	"chasm.lib.callback",
	fx.Provide(configProvider),
	fx.Provide(httpCallerProviderProvider),
	fx.Provide(NewInvocationTaskExecutor),
	fx.Provide(NewBackoffTaskExecutor),
	fx.Provide(newLibrary),
	fx.Invoke(register),
)
View Source
// RequestTimeout is the destination-scoped dynamic config setting
// "chasm.callback.request.timeout"; it defaults to ten seconds.
var RequestTimeout = dynamicconfig.NewDestinationDurationSetting(
	"chasm.callback.request.timeout",
	10*time.Second,
	`RequestTimeout is the timeout for executing a single callback request.`,
)
View Source
// RetryPolicyInitialInterval is the global dynamic config setting
// "chasm.callback.retryPolicy.initialInterval"; it defaults to one second.
var RetryPolicyInitialInterval = dynamicconfig.NewGlobalDurationSetting(
	"chasm.callback.retryPolicy.initialInterval",
	time.Second,
	`The initial backoff interval between every callback request attempt for a given callback.`,
)
View Source
// RetryPolicyMaximumInterval is the global dynamic config setting
// "chasm.callback.retryPolicy.maxInterval"; it defaults to one hour.
var RetryPolicyMaximumInterval = dynamicconfig.NewGlobalDurationSetting(
	"chasm.callback.retryPolicy.maxInterval",
	time.Hour,
	`The maximum backoff interval between every callback request attempt for a given callback.`,
)
View Source
// TransitionAttemptFailed moves a callback from SCHEDULED to BACKING_OFF.
// It records the attempt, stores the time of the next attempt and the failure
// (flagged as retryable), and adds a BackoffTask scheduled for that time.
var TransitionAttemptFailed = chasm.NewTransition(
	[]callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_SCHEDULED},
	callbackspb.CALLBACK_STATUS_BACKING_OFF,
	func(c *Callback, mctx chasm.MutableContext, ev EventAttemptFailed) error {
		// Kept first: both the delay computation and the task below read c.Attempt.
		// NOTE(review): recordAttempt is defined elsewhere; presumably it updates
		// the attempt count — confirm.
		c.recordAttempt(ev.Time)

		retryAt := ev.Time.Add(ev.RetryPolicy.ComputeNextDelay(0, int(c.Attempt), ev.Err))
		c.NextAttemptScheduleTime = timestamppb.New(retryAt)

		retryableInfo := &failurepb.ApplicationFailureInfo{NonRetryable: false}
		c.LastAttemptFailure = &failurepb.Failure{
			Message: ev.Err.Error(),
			FailureInfo: &failurepb.Failure_ApplicationFailureInfo{
				ApplicationFailureInfo: retryableInfo,
			},
		}

		// The backoff task fires at the computed retry time.
		attrs := chasm.TaskAttributes{ScheduledTime: retryAt}
		mctx.AddTask(c, attrs, &callbackspb.BackoffTask{Attempt: c.Attempt})
		return nil
	},
)
View Source
// TransitionFailed moves a callback from SCHEDULED to FAILED. It records the
// attempt and stores the failure flagged as non-retryable; no follow-up task
// is added.
var TransitionFailed = chasm.NewTransition(
	[]callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_SCHEDULED},
	callbackspb.CALLBACK_STATUS_FAILED,
	func(c *Callback, _ chasm.MutableContext, ev EventFailed) error {
		c.recordAttempt(ev.Time)

		terminalInfo := &failurepb.ApplicationFailureInfo{NonRetryable: true}
		c.LastAttemptFailure = &failurepb.Failure{
			Message: ev.Err.Error(),
			FailureInfo: &failurepb.Failure_ApplicationFailureInfo{
				ApplicationFailureInfo: terminalInfo,
			},
		}
		return nil
	},
)
View Source
// TransitionRescheduled moves a callback from BACKING_OFF back to SCHEDULED.
// It clears the stored next-attempt time and adds an InvocationTask, carrying
// the current attempt number, whose destination is the scheme and host of the
// callback's Nexus URL.
//
// It returns an error if the URL cannot be parsed.
var TransitionRescheduled = chasm.NewTransition(
	[]callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_BACKING_OFF},
	callbackspb.CALLBACK_STATUS_SCHEDULED,
	func(cb *Callback, ctx chasm.MutableContext, event EventRescheduled) error {
		cb.NextAttemptScheduleTime = nil
		// Use the generated GetUrl getter rather than the Url field: GetNexus()
		// returns nil for a non-Nexus callback variant and a direct field read
		// would panic. This also matches TransitionScheduled.
		u, err := url.Parse(cb.Callback.GetNexus().GetUrl())
		if err != nil {
			return fmt.Errorf("failed to parse URL: %v: %w", cb.Callback, err)
		}
		ctx.AddTask(
			cb,
			chasm.TaskAttributes{Destination: u.Scheme + "://" + u.Host},
			&callbackspb.InvocationTask{Attempt: cb.Attempt},
		)
		return nil
	},
)
View Source
// TransitionScheduled moves a callback from STANDBY to SCHEDULED and adds an
// InvocationTask whose destination is the scheme and host of the callback's
// Nexus URL. It returns an error if that URL cannot be parsed.
var TransitionScheduled = chasm.NewTransition(
	[]callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_STANDBY},
	callbackspb.CALLBACK_STATUS_SCHEDULED,
	func(c *Callback, mctx chasm.MutableContext, _ EventScheduled) error {
		target, parseErr := url.Parse(c.Callback.GetNexus().GetUrl())
		if parseErr != nil {
			return fmt.Errorf("failed to parse URL: %v: %w", c.Callback, parseErr)
		}

		// Only scheme and host form the destination; path and query are dropped.
		attrs := chasm.TaskAttributes{Destination: target.Scheme + "://" + target.Host}
		mctx.AddTask(c, attrs, &callbackspb.InvocationTask{})
		return nil
	},
)
View Source
// TransitionSucceeded moves a callback from SCHEDULED to SUCCEEDED. It records
// the attempt and clears any failure stored by an earlier attempt.
var TransitionSucceeded = chasm.NewTransition(
	[]callbackspb.CallbackStatus{callbackspb.CALLBACK_STATUS_SCHEDULED},
	callbackspb.CALLBACK_STATUS_SUCCEEDED,
	func(c *Callback, _ chasm.MutableContext, ev EventSucceeded) error {
		c.recordAttempt(ev.Time)
		// A successful attempt leaves no failure behind.
		c.LastAttemptFailure = nil
		return nil
	},
)

Functions

This section is empty.

Types

type AddressMatchRule

// AddressMatchRule is a single callback address rule.
//
// Regexp is the compiled matcher for the rule (presumably built from the
// "Pattern" entry of the chasm.callback.allowedAddresses setting — confirm
// against allowedAddressConverter). AllowInsecure corresponds to that
// setting's "AllowInsecure" entry, which indicates whether https is required.
type AddressMatchRule struct {
	Regexp        *regexp.Regexp
	AllowInsecure bool
}

func (AddressMatchRule) Allow

func (a AddressMatchRule) Allow(u *url.URL) (bool, error)

Allow checks the URL against the rule and returns: 1. true, nil if the URL matches the rule and passes validation for that rule. 2. false, nil if the URL does not match the rule. 3. false, error if the URL matches the rule but fails validation.

type AddressMatchRules

// AddressMatchRules is a list of AddressMatchRule values. It is the value type
// of the AllowedAddresses setting, whose default is an empty AddressMatchRules.
type AddressMatchRules struct {
	Rules []AddressMatchRule
}

func (AddressMatchRules) Validate

func (a AddressMatchRules) Validate(rawURL string) error

type BackoffTaskExecutor

// BackoffTaskExecutor validates and executes callbackspb.BackoffTask tasks for
// Callback components (see its Validate and Execute methods).
type BackoffTaskExecutor struct {
	// contains filtered or unexported fields
}

func (*BackoffTaskExecutor) Execute

func (e *BackoffTaskExecutor) Execute(
	ctx chasm.MutableContext,
	callback *Callback,
	taskAttrs chasm.TaskAttributes,
	task *callbackspb.BackoffTask,
) error

Execute transitions the callback from BACKING_OFF to SCHEDULED state and generates an InvocationTask for the next attempt.

func (*BackoffTaskExecutor) Validate

func (e *BackoffTaskExecutor) Validate(
	ctx chasm.Context,
	callback *Callback,
	taskAttr chasm.TaskAttributes,
	task *callbackspb.BackoffTask,
) (bool, error)

type BackoffTaskExecutorOptions

// BackoffTaskExecutorOptions groups the injected dependencies of a
// BackoffTaskExecutor. Embedding fx.In marks it as an fx parameter struct.
// NOTE(review): presumably the argument of NewBackoffTaskExecutor — confirm.
type BackoffTaskExecutorOptions struct {
	fx.In

	Config         *Config
	MetricsHandler metrics.Handler
	Logger         log.Logger
}

type Callback

// Callback represents a callback component in CHASM. It embeds the persisted
// callbackspb.CallbackState and holds a pointer to the parent that supplies
// the Nexus completion data.
type Callback struct {
	chasm.UnimplementedComponent

	// Persisted internal state
	*callbackspb.CallbackState

	// Interface to retrieve Nexus operation completion data
	CompletionSource chasm.ParentPtr[CompletionSource]
}

Callback represents a callback component in CHASM.

func NewCallback

func NewCallback(
	requestID string,
	registrationTime *timestamppb.Timestamp,
	state *callbackspb.CallbackState,
	cb *callbackspb.Callback,
) *Callback

func (*Callback) LifecycleState

func (c *Callback) LifecycleState(_ chasm.Context) chasm.LifecycleState

func (*Callback) SetStateMachineState

func (c *Callback) SetStateMachineState(status callbackspb.CallbackStatus)

func (*Callback) StateMachineState

func (c *Callback) StateMachineState() callbackspb.CallbackStatus

func (*Callback) ToAPICallback

func (c *Callback) ToAPICallback() (*commonpb.Callback, error)

ToAPICallback converts a CHASM callback to API callback proto.

type CompletionSource

// CompletionSource is the interface a Callback uses to retrieve Nexus
// operation completion data from its parent.
type CompletionSource interface {
	// GetNexusCompletion returns the Nexus operation completion for the given
	// request ID, or an error.
	GetNexusCompletion(ctx chasm.Context, requestID string) (nexusrpc.OperationCompletion, error)
}

type Config

// Config holds the callback library's configuration accessors.
//
// RequestTimeout is a duration property filtered by destination (presumably
// backed by the RequestTimeout setting — confirm in configProvider).
// RetryPolicy returns the backoff.RetryPolicy to use.
type Config struct {
	RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
	RetryPolicy    func() backoff.RetryPolicy
}

type EventAttemptFailed

// EventAttemptFailed is the input of TransitionAttemptFailed.
//
// Time is the attempt time that gets recorded and is the base for the next
// attempt's schedule time. Err supplies the stored failure message.
// RetryPolicy computes the delay before the next attempt.
type EventAttemptFailed struct {
	Time        time.Time
	Err         error
	RetryPolicy backoff.RetryPolicy
}

EventAttemptFailed is triggered when an attempt fails with a retryable error.

type EventFailed

// EventFailed is the input of TransitionFailed.
//
// Time is the attempt time that gets recorded. Err supplies the message of the
// stored, non-retryable failure.
type EventFailed struct {
	Time time.Time
	Err  error
}

EventFailed is triggered when an attempt fails with a non-retryable error.

type EventRescheduled

// EventRescheduled is the input of TransitionRescheduled; it carries no data.
type EventRescheduled struct{}

EventRescheduled is triggered when the callback is meant to be rescheduled after backing off from a previous attempt.

type EventScheduled

// EventScheduled is the input of TransitionScheduled; it carries no data.
type EventScheduled struct{}

EventScheduled is triggered when the callback is meant to be scheduled for the first time - when its Trigger condition is met.

type EventSucceeded

// EventSucceeded is the input of TransitionSucceeded. Time is the attempt time
// that gets recorded.
type EventSucceeded struct {
	Time time.Time
}

EventSucceeded is triggered when an attempt succeeds.

type HTTPCaller

// HTTPCaller is a function that performs an HTTP request and returns the
// response or an error.
type HTTPCaller func(*http.Request) (*http.Response, error)

HTTPCaller is a function that can be used to invoke HTTP requests.

type HTTPCallerProvider

// HTTPCallerProvider returns the HTTPCaller to use for a given namespace ID
// and destination pair.
type HTTPCallerProvider func(common.NamespaceIDAndDestination) HTTPCaller

type InvocationTaskExecutor

// InvocationTaskExecutor exposes Validate, Execute and Invoke methods.
// NOTE(review): presumably the handler for callbackspb.InvocationTask — the
// method signatures are not shown here; confirm.
type InvocationTaskExecutor struct {
	// contains filtered or unexported fields
}

func (InvocationTaskExecutor) Execute

func (InvocationTaskExecutor) Invoke

func (InvocationTaskExecutor) Validate

type InvocationTaskExecutorOptions

// InvocationTaskExecutorOptions groups the injected dependencies of an
// InvocationTaskExecutor. Embedding fx.In marks it as an fx parameter struct.
// NOTE(review): presumably the argument of NewInvocationTaskExecutor — confirm.
type InvocationTaskExecutorOptions struct {
	fx.In

	Config             *Config
	NamespaceRegistry  namespace.Registry
	MetricsHandler     metrics.Handler
	Logger             log.Logger
	HTTPCallerProvider HTTPCallerProvider
	HTTPTraceProvider  commonnexus.HTTPClientTraceProvider
	HistoryClient      resource.HistoryClient
}

type Library

// Library is the CHASM library for callbacks. It embeds
// chasm.UnimplementedLibrary and holds the two task executors; its methods
// report the library name, components, tasks and gRPC services.
type Library struct {
	chasm.UnimplementedLibrary

	InvocationTaskExecutor *InvocationTaskExecutor
	BackoffTaskExecutor    *BackoffTaskExecutor
}

func (*Library) Components

func (l *Library) Components() []*chasm.RegistrableComponent

func (*Library) Name

func (l *Library) Name() string

func (*Library) RegisterServices

func (l *Library) RegisterServices(server *grpc.Server)

func (*Library) Tasks

func (l *Library) Tasks() []*chasm.RegistrableTask

Directories

Path Synopsis
gen
callbackpb/v1
Code generated by protoc-gen-go-helpers.
Code generated by protoc-gen-go-helpers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL