durable

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Package durable provides an experimental Go implementation of AWS Durable Execution style workflow helpers.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ClassifyCheckpointError

func ClassifyCheckpointError(err error) error

func DurableOperationErrorFromErrorObject

func DurableOperationErrorFromErrorObject(eo *ErrorObject) error

func HashID

func HashID(input string) string

func InitializeExecutionContext

func InitializeExecutionContext(input InvocationInput, client DurableExecutionClient, requestID, tenantID string) (*ExecutionContext, DurableExecutionMode, string, error)

func IsUnrecoverableError

func IsUnrecoverableError(err error) bool

func IsUnrecoverableInvocationError

func IsUnrecoverableInvocationError(err error) bool

func NewCallbackError

func NewCallbackError(msg string, cause error, data string) error

func NewCallbackSubmitterError

func NewCallbackSubmitterError(msg string, cause error, data string) error

func NewCallbackTimeoutError

func NewCallbackTimeoutError(msg string, cause error, data string) error

func NewChildContextError

func NewChildContextError(msg string, cause error, data string) error

func NewInvokeError

func NewInvokeError(msg string, cause error, data string) error

func NewStepError

func NewStepError(msg string, cause error, data string) error

func NewUnrecoverableExecutionError

func NewUnrecoverableExecutionError(reason TerminationReason, msg string, cause error) error

func NewUnrecoverableInvocationError

func NewUnrecoverableInvocationError(reason TerminationReason, msg string, cause error) error

func NewWaitForConditionError

func NewWaitForConditionError(msg string, cause error, data string) error

func SafeDeserialize

func SafeDeserialize(serdes Serdes, data string, stepID, stepName string, tm *TerminationManager, durableExecutionArn string) (any, error)

func SafeSerialize

func SafeSerialize(serdes Serdes, value any, stepID, stepName string, tm *TerminationManager, durableExecutionArn string) (string, error)

func ValidateContextUsage

func ValidateContextUsage(ctx context.Context, operationContextID, operationName string, tm *TerminationManager)

func ValidateReplayConsistency

func ValidateReplayConsistency(stepID string, current operationDescriptor, checkpointData *Operation, execCtx *ExecutionContext) error

func WithActiveOperation

func WithActiveOperation(ctx context.Context, op ActiveOperation) context.Context

Types

type APIError

type APIError struct {
	StatusCode int
	Code       string
	Message    string
}

func (*APIError) Error

func (e *APIError) Error() string

type AWSDurableExecutionClient

type AWSDurableExecutionClient struct {
	// contains filtered or unexported fields
}

func NewAWSDurableExecutionClient

func NewAWSDurableExecutionClient(client *lambdasdk.Client) *AWSDurableExecutionClient

func NewDefaultAWSDurableExecutionClient

func NewDefaultAWSDurableExecutionClient(ctx context.Context, optFns ...func(*awsconfig.LoadOptions) error) (*AWSDurableExecutionClient, error)

func (*AWSDurableExecutionClient) Checkpoint

func (*AWSDurableExecutionClient) GetExecutionState

type ActiveOperation

type ActiveOperation struct {
	ContextID string
	ParentID  string
	Attempt   int
	Mode      DurableExecutionMode
}

func GetActiveOperation

func GetActiveOperation(ctx context.Context) (ActiveOperation, bool)

type BatchCompletionReason

type BatchCompletionReason string
const (
	BatchCompletionReasonAllCompleted             BatchCompletionReason = "ALL_COMPLETED"
	BatchCompletionReasonMinSuccessfulReached     BatchCompletionReason = "MIN_SUCCESSFUL_REACHED"
	BatchCompletionReasonFailureToleranceExceeded BatchCompletionReason = "FAILURE_TOLERANCE_EXCEEDED"
)

type BatchItem

type BatchItem struct {
	Result any             `json:"result,omitempty"`
	Error  *ErrorObject    `json:"error,omitempty"`
	Index  int             `json:"index"`
	Status BatchItemStatus `json:"status"`
}

type BatchItemStatus

type BatchItemStatus string
const (
	BatchItemStatusSucceeded BatchItemStatus = "SUCCEEDED"
	BatchItemStatusFailed    BatchItemStatus = "FAILED"
	BatchItemStatusStarted   BatchItemStatus = "STARTED"
)

type BatchResult

type BatchResult struct {
	All              []BatchItem           `json:"all"`
	CompletionReason BatchCompletionReason `json:"completionReason"`
}

func (*BatchResult) Errors

func (r *BatchResult) Errors() []error

func (*BatchResult) Failed

func (r *BatchResult) Failed() []BatchItem

func (*BatchResult) FailureCount

func (r *BatchResult) FailureCount() int

func (*BatchResult) HasFailure

func (r *BatchResult) HasFailure() bool

func (*BatchResult) Results

func (r *BatchResult) Results() []any

func (*BatchResult) Started

func (r *BatchResult) Started() []BatchItem

func (*BatchResult) StartedCount

func (r *BatchResult) StartedCount() int

func (*BatchResult) Status

func (r *BatchResult) Status() BatchItemStatus

func (*BatchResult) Succeeded

func (r *BatchResult) Succeeded() []BatchItem

func (*BatchResult) SuccessCount

func (r *BatchResult) SuccessCount() int

func (*BatchResult) ThrowIfError

func (r *BatchResult) ThrowIfError() error

func (*BatchResult) TotalCount

func (r *BatchResult) TotalCount() int

type BatchResultSerdes

type BatchResultSerdes struct{}

func (BatchResultSerdes) Deserialize

func (BatchResultSerdes) Deserialize(data string, _ SerdesContext) (any, error)

func (BatchResultSerdes) Serialize

func (BatchResultSerdes) Serialize(value any, _ SerdesContext) (string, error)

type CallbackDetails

type CallbackDetails struct {
	CallbackID                string       `json:"CallbackId,omitempty"`
	Result                    string       `json:"Result,omitempty"`
	Error                     *ErrorObject `json:"Error,omitempty"`
	ScheduledTimeoutTimestamp *time.Time   `json:"ScheduledTimeoutTimestamp,omitempty"`
}

type CallbackOptions

type CallbackOptions struct {
	TimeoutSeconds          int `json:"TimeoutSeconds,omitempty"`
	HeartbeatTimeoutSeconds int `json:"HeartbeatTimeoutSeconds,omitempty"`
}

type ChainedInvokeDetails

type ChainedInvokeDetails struct {
	Result string       `json:"Result,omitempty"`
	Error  *ErrorObject `json:"Error,omitempty"`
}

type ChainedInvokeOptions

type ChainedInvokeOptions struct {
	FunctionName string `json:"FunctionName,omitempty"`
}

type CheckpointManager

type CheckpointManager struct {
	// contains filtered or unexported fields
}

func NewCheckpointManager

func NewCheckpointManager(
	durableExecutionArn string,
	stepData map[string]Operation,
	stepDataMu *sync.RWMutex,
	client DurableExecutionClient,
	termination *TerminationManager,
	checkpointToken string,
	logger Logger,
) *CheckpointManager

func (*CheckpointManager) Checkpoint

func (m *CheckpointManager) Checkpoint(ctx context.Context, stepID string, update OperationUpdate) error

func (*CheckpointManager) ForceCheckpoint

func (m *CheckpointManager) ForceCheckpoint(ctx context.Context) error

func (*CheckpointManager) GetOperationState

func (m *CheckpointManager) GetOperationState(stepID string) OperationLifecycleState

func (*CheckpointManager) MarkAncestorFinished

func (m *CheckpointManager) MarkAncestorFinished(stepID string)

func (*CheckpointManager) MarkOperationAwaited

func (m *CheckpointManager) MarkOperationAwaited(stepID string)

func (*CheckpointManager) MarkOperationState

func (m *CheckpointManager) MarkOperationState(stepID string, state OperationLifecycleState, metadata OperationMetadata, endTimestamp *time.Time)

func (*CheckpointManager) SetTerminating

func (m *CheckpointManager) SetTerminating()

func (*CheckpointManager) WaitForQueueCompletion

func (m *CheckpointManager) WaitForQueueCompletion(ctx context.Context) error

func (*CheckpointManager) WaitForRetryTimer

func (m *CheckpointManager) WaitForRetryTimer(ctx context.Context, stepID string) error

func (*CheckpointManager) WaitForStatusChange

func (m *CheckpointManager) WaitForStatusChange(ctx context.Context, stepID string) error

type CheckpointRequest

type CheckpointRequest struct {
	DurableExecutionArn string
	CheckpointToken     string
	Updates             []OperationUpdate
}

type CheckpointResponse

type CheckpointResponse struct {
	CheckpointToken   string
	NewExecutionState *ExecutionState
}

type ChildConfig

type ChildConfig struct {
	Serdes           Serdes
	SubType          OperationSubType
	SummaryGenerator func(value any) string
	ErrorMapper      func(error) error
}

type ChildFunc

type ChildFunc func(ctx context.Context, child *DurableContext) (any, error)

type CompletionConfig

type CompletionConfig struct {
	MinSuccessful              *int
	ToleratedFailureCount      *int
	ToleratedFailurePercentage *float64
}

type ConcurrencyConfig

type ConcurrencyConfig struct {
	MaxConcurrency   int
	TopLevelSubType  OperationSubType
	IterationSubType OperationSubType
	SummaryGenerator func(result *BatchResult) string
	Serdes           Serdes
	ItemSerdes       Serdes
	CompletionConfig *CompletionConfig
}

type ConcurrentExecutionItem

type ConcurrentExecutionItem struct {
	ID    string `json:"id"`
	Data  any    `json:"data,omitempty"`
	Index int    `json:"index"`
	Name  string `json:"name,omitempty"`
}

type ConcurrentExecutor

type ConcurrentExecutor func(item ConcurrentExecutionItem, childContext *DurableContext) (any, error)

type ContextDetails

type ContextDetails struct {
	Result         string       `json:"Result,omitempty"`
	ReplayChildren bool         `json:"ReplayChildren,omitempty"`
	Error          *ErrorObject `json:"Error,omitempty"`
}

type ContextOptions

type ContextOptions struct {
	ReplayChildren bool `json:"ReplayChildren,omitempty"`
}

type CreateCallbackConfig

type CreateCallbackConfig struct {
	Timeout          *Duration
	HeartbeatTimeout *Duration
	Serdes           Serdes
}

type CreateCallbackResult

type CreateCallbackResult struct {
	Promise    *Future[any]
	CallbackID string
}

type DurableContext

type DurableContext struct {
	// contains filtered or unexported fields
}

func NewDurableContext

func NewDurableContext(execCtx *ExecutionContext, mode DurableExecutionMode, logger Logger, stepPrefix, parentID string, checkpoint *CheckpointManager) *DurableContext

func (*DurableContext) CreateCallback

func (*DurableContext) ExecuteConcurrently

func (c *DurableContext) ExecuteConcurrently(
	ctx context.Context,
	name string,
	items []ConcurrentExecutionItem,
	executor ConcurrentExecutor,
	cfg *ConcurrencyConfig,
) *Future[*BatchResult]

func (*DurableContext) Invoke

func (c *DurableContext) Invoke(ctx context.Context, name string, functionName string, input any, cfg *InvokeConfig) *Future[any]

func (*DurableContext) Map

func (c *DurableContext) Map(
	ctx context.Context,
	name string,
	items []any,
	mapFunc MapFunc,
	cfg *MapConfig,
) *Future[*BatchResult]

func (*DurableContext) Parallel

func (c *DurableContext) Parallel(
	ctx context.Context,
	name string,
	branches []NamedParallelBranch,
	cfg *ParallelConfig,
) *Future[*BatchResult]

func (*DurableContext) RunInChildContext

func (c *DurableContext) RunInChildContext(ctx context.Context, name string, fn ChildFunc, cfg *ChildConfig) *Future[any]

func (*DurableContext) Step

func (c *DurableContext) Step(ctx context.Context, name string, fn StepFunc, cfg *StepConfig) *Future[any]

func (*DurableContext) Wait

func (c *DurableContext) Wait(ctx context.Context, name string, duration Duration) *Future[struct{}]

func (*DurableContext) WaitForCallback

func (c *DurableContext) WaitForCallback(ctx context.Context, name string, submitter WaitForCallbackSubmitterFunc, cfg *WaitForCallbackConfig) *Future[any]

func (*DurableContext) WaitForCondition

func (c *DurableContext) WaitForCondition(ctx context.Context, name string, check WaitForConditionCheckFunc, cfg *WaitForConditionConfig) *Future[any]

type DurableExecutionClient

type DurableExecutionClient interface {
	GetExecutionState(input GetExecutionStateRequest) (GetExecutionStateResponse, error)
	Checkpoint(input CheckpointRequest) (CheckpointResponse, error)
}

type DurableExecutionConfig

type DurableExecutionConfig struct {
	Client    DurableExecutionClient
	Logger    Logger
	RequestID string
	TenantID  string
}

type DurableExecutionHandler

type DurableExecutionHandler func(ctx context.Context, event any, dctx *DurableContext) (any, error)

type DurableExecutionMode

type DurableExecutionMode string
const (
	ExecutionMode          DurableExecutionMode = "ExecutionMode"
	ReplayMode             DurableExecutionMode = "ReplayMode"
	ReplaySucceededContext DurableExecutionMode = "ReplaySucceededContext"
)

type DurableOperationError

type DurableOperationError struct {
	Type    string
	Message string
	Cause   error
	Data    string
}

func (*DurableOperationError) Error

func (e *DurableOperationError) Error() string

func (*DurableOperationError) ToErrorObject

func (e *DurableOperationError) ToErrorObject() *ErrorObject

func (*DurableOperationError) Unwrap

func (e *DurableOperationError) Unwrap() error

type Duration

type Duration struct {
	Days    int `json:"days,omitempty"`
	Hours   int `json:"hours,omitempty"`
	Minutes int `json:"minutes,omitempty"`
	Seconds int `json:"seconds,omitempty"`
}

func (Duration) ToDuration

func (d Duration) ToDuration() time.Duration

func (Duration) ToSeconds

func (d Duration) ToSeconds() int

type ErrorObject

type ErrorObject struct {
	ErrorType    string   `json:"ErrorType,omitempty"`
	ErrorMessage string   `json:"ErrorMessage,omitempty"`
	ErrorData    string   `json:"ErrorData,omitempty"`
	StackTrace   []string `json:"StackTrace,omitempty"`
}

func CreateErrorObjectFromError

func CreateErrorObjectFromError(err error, data string) *ErrorObject

type ExecutionContext

type ExecutionContext struct {
	// contains filtered or unexported fields
}

func (*ExecutionContext) DurableExecutionArn

func (e *ExecutionContext) DurableExecutionArn() string

func (*ExecutionContext) DurableExecutionClient

func (e *ExecutionContext) DurableExecutionClient() DurableExecutionClient

func (*ExecutionContext) GetStepData

func (e *ExecutionContext) GetStepData(stepID string) *Operation

func (*ExecutionContext) TerminationManager

func (e *ExecutionContext) TerminationManager() *TerminationManager

type ExecutionDetails

type ExecutionDetails struct {
	InputPayload string `json:"InputPayload,omitempty"`
}

type ExecutionState

type ExecutionState struct {
	Operations []Operation `json:"Operations"`
	NextMarker string      `json:"NextMarker,omitempty"`
}

type Future

type Future[T any] struct {
	// contains filtered or unexported fields
}

func NewFuture

func NewFuture[T any](exec func(context.Context) (T, error)) *Future[T]

func NewNeverFuture

func NewNeverFuture[T any]() *Future[T]

func NewRejectedFuture

func NewRejectedFuture[T any](err error) *Future[T]

func NewResolvedFuture

func NewResolvedFuture[T any](value T) *Future[T]

func (*Future[T]) Await

func (f *Future[T]) Await(ctx context.Context) (T, error)

func (*Future[T]) Executed

func (f *Future[T]) Executed() bool

type GetExecutionStateRequest

type GetExecutionStateRequest struct {
	DurableExecutionArn string
	CheckpointToken     string
	Marker              string
	MaxItems            int
}

type GetExecutionStateResponse

type GetExecutionStateResponse struct {
	Operations []Operation
	NextMarker string
}

type InMemoryClient

type InMemoryClient struct {
	// contains filtered or unexported fields
}

InMemoryClient is a local DurableExecutionClient implementation for tests and local execution.

func NewInMemoryClient

func NewInMemoryClient() *InMemoryClient

func (*InMemoryClient) Checkpoint

func (c *InMemoryClient) Checkpoint(input CheckpointRequest) (CheckpointResponse, error)

func (*InMemoryClient) CompleteCallback

func (c *InMemoryClient) CompleteCallback(callbackID string, payload string)

func (*InMemoryClient) FailCallback

func (c *InMemoryClient) FailCallback(callbackID string, errObj *ErrorObject, timedOut bool)

func (*InMemoryClient) GetExecutionState

func (*InMemoryClient) SetOperation

func (c *InMemoryClient) SetOperation(op Operation)

type InvocationInput

type InvocationInput struct {
	DurableExecutionArn   string         `json:"DurableExecutionArn"`
	CheckpointToken       string         `json:"CheckpointToken"`
	InitialExecutionState ExecutionState `json:"InitialExecutionState"`
}

func ParseInvocationInput

func ParseInvocationInput(raw []byte) (InvocationInput, error)

ParseInvocationInput parses invocation payload from durable backend. The backend may encode timestamp fields as either ISO-8601 strings or epoch numbers.

type InvocationOutput

type InvocationOutput struct {
	Status InvocationStatus `json:"Status"`
	Result string           `json:"Result,omitempty"`
	Error  *ErrorObject     `json:"Error,omitempty"`
}

type InvocationStatus

type InvocationStatus string
const (
	InvocationStatusSucceeded InvocationStatus = "SUCCEEDED"
	InvocationStatusFailed    InvocationStatus = "FAILED"
	InvocationStatusPending   InvocationStatus = "PENDING"
)

type InvokeConfig

type InvokeConfig struct {
	PayloadSerdes Serdes
	ResultSerdes  Serdes
}

type JSONSerdes

type JSONSerdes struct{}

func (JSONSerdes) Deserialize

func (JSONSerdes) Deserialize(data string, _ SerdesContext) (any, error)

func (JSONSerdes) Serialize

func (JSONSerdes) Serialize(value any, _ SerdesContext) (string, error)

type Logger

type Logger interface {
	Debugf(format string, args ...any)
	Infof(format string, args ...any)
	Warnf(format string, args ...any)
	Errorf(format string, args ...any)
}

type MapConfig

type MapConfig struct {
	MaxConcurrency   int
	ItemNamer        func(item any, index int) string
	Serdes           Serdes
	ItemSerdes       Serdes
	CompletionConfig *CompletionConfig
}

type MapFunc

type MapFunc func(context *DurableContext, item any, index int, array []any) (any, error)

type NamedParallelBranch

type NamedParallelBranch struct {
	Name string
	Func ParallelFunc
}

type NopLogger

type NopLogger struct{}

func (NopLogger) Debugf

func (NopLogger) Debugf(string, ...any)

func (NopLogger) Errorf

func (NopLogger) Errorf(string, ...any)

func (NopLogger) Infof

func (NopLogger) Infof(string, ...any)

func (NopLogger) Warnf

func (NopLogger) Warnf(string, ...any)

type Operation

type Operation struct {
	ID       string           `json:"Id,omitempty"`
	ParentID string           `json:"ParentId,omitempty"`
	Name     string           `json:"Name,omitempty"`
	Type     OperationType    `json:"Type,omitempty"`
	SubType  OperationSubType `json:"SubType,omitempty"`
	Status   OperationStatus  `json:"Status,omitempty"`

	ExecutionDetails     *ExecutionDetails     `json:"ExecutionDetails,omitempty"`
	StepDetails          *StepDetails          `json:"StepDetails,omitempty"`
	WaitDetails          *WaitDetails          `json:"WaitDetails,omitempty"`
	CallbackDetails      *CallbackDetails      `json:"CallbackDetails,omitempty"`
	ChainedInvokeDetails *ChainedInvokeDetails `json:"ChainedInvokeDetails,omitempty"`
	ContextDetails       *ContextDetails       `json:"ContextDetails,omitempty"`

	StartTimestamp *time.Time `json:"StartTimestamp,omitempty"`
}

type OperationAction

type OperationAction string
const (
	OperationActionStart   OperationAction = "START"
	OperationActionSucceed OperationAction = "SUCCEED"
	OperationActionFail    OperationAction = "FAIL"
	OperationActionRetry   OperationAction = "RETRY"
)

type OperationLifecycleState

type OperationLifecycleState string
const (
	OperationLifecycleExecuting      OperationLifecycleState = "EXECUTING"
	OperationLifecycleRetryWaiting   OperationLifecycleState = "RETRY_WAITING"
	OperationLifecycleIdleNotAwaited OperationLifecycleState = "IDLE_NOT_AWAITED"
	OperationLifecycleIdleAwaited    OperationLifecycleState = "IDLE_AWAITED"
	OperationLifecycleCompleted      OperationLifecycleState = "COMPLETED"
)

type OperationMetadata

type OperationMetadata struct {
	StepID   string
	Name     string
	Type     OperationType
	SubType  OperationSubType
	ParentID string
}

type OperationStatus

type OperationStatus string
const (
	OperationStatusStarted   OperationStatus = "STARTED"
	OperationStatusReady     OperationStatus = "READY"
	OperationStatusPending   OperationStatus = "PENDING"
	OperationStatusSucceeded OperationStatus = "SUCCEEDED"
	OperationStatusFailed    OperationStatus = "FAILED"
	OperationStatusCancelled OperationStatus = "CANCELLED"
	OperationStatusStopped   OperationStatus = "STOPPED"
	OperationStatusTimedOut  OperationStatus = "TIMED_OUT"
)

type OperationSubType

type OperationSubType string
const (
	OperationSubTypeStep             OperationSubType = "Step"
	OperationSubTypeWait             OperationSubType = "Wait"
	OperationSubTypeCallback         OperationSubType = "Callback"
	OperationSubTypeRunInChild       OperationSubType = "RunInChildContext"
	OperationSubTypeMap              OperationSubType = "Map"
	OperationSubTypeMapIteration     OperationSubType = "MapIteration"
	OperationSubTypeParallel         OperationSubType = "Parallel"
	OperationSubTypeParallelBranch   OperationSubType = "ParallelBranch"
	OperationSubTypeWaitForCallback  OperationSubType = "WaitForCallback"
	OperationSubTypeWaitForCondition OperationSubType = "WaitForCondition"
	OperationSubTypeChainedInvoke    OperationSubType = "ChainedInvoke"
)

type OperationType

type OperationType string
const (
	OperationTypeExecution     OperationType = "EXECUTION"
	OperationTypeStep          OperationType = "STEP"
	OperationTypeWait          OperationType = "WAIT"
	OperationTypeCallback      OperationType = "CALLBACK"
	OperationTypeContext       OperationType = "CONTEXT"
	OperationTypeChainedInvoke OperationType = "CHAINED_INVOKE"
)

type OperationUpdate

type OperationUpdate struct {
	ID       string           `json:"Id,omitempty"`
	ParentID string           `json:"ParentId,omitempty"`
	Name     string           `json:"Name,omitempty"`
	Type     OperationType    `json:"Type,omitempty"`
	SubType  OperationSubType `json:"SubType,omitempty"`
	Action   OperationAction  `json:"Action,omitempty"`

	Payload string       `json:"Payload,omitempty"`
	Error   *ErrorObject `json:"Error,omitempty"`

	StepOptions          *StepOptions          `json:"StepOptions,omitempty"`
	WaitOptions          *WaitOptions          `json:"WaitOptions,omitempty"`
	CallbackOptions      *CallbackOptions      `json:"CallbackOptions,omitempty"`
	ChainedInvokeOptions *ChainedInvokeOptions `json:"ChainedInvokeOptions,omitempty"`
	ContextOptions       *ContextOptions       `json:"ContextOptions,omitempty"`
}

type ParallelConfig

type ParallelConfig struct {
	MaxConcurrency   int
	Serdes           Serdes
	ItemSerdes       Serdes
	CompletionConfig *CompletionConfig
}

type ParallelFunc

type ParallelFunc func(context *DurableContext) (any, error)

type PassThroughSerdes

type PassThroughSerdes struct{}

func (PassThroughSerdes) Deserialize

func (PassThroughSerdes) Deserialize(data string, _ SerdesContext) (any, error)

func (PassThroughSerdes) Serialize

func (PassThroughSerdes) Serialize(value any, _ SerdesContext) (string, error)

type RetryDecision

type RetryDecision struct {
	ShouldRetry bool
	Delay       Duration
}

type RetryStrategy

type RetryStrategy func(err error, attempt int) RetryDecision

type Serdes

type Serdes interface {
	Serialize(value any, ctx SerdesContext) (string, error)
	Deserialize(data string, ctx SerdesContext) (any, error)
}

type SerdesContext

type SerdesContext struct {
	EntityID            string
	DurableExecutionArn string
}

type StepConfig

type StepConfig struct {
	RetryStrategy RetryStrategy
	Semantics     StepSemantics
	Serdes        Serdes
}

type StepContext

type StepContext struct {
	Logger Logger
}

type StepDetails

type StepDetails struct {
	Result               string       `json:"Result,omitempty"`
	Error                *ErrorObject `json:"Error,omitempty"`
	Attempt              int          `json:"Attempt,omitempty"`
	NextAttemptTimestamp *time.Time   `json:"NextAttemptTimestamp,omitempty"`
}

type StepFunc

type StepFunc func(ctx context.Context, stepCtx StepContext) (any, error)

type StepOptions

type StepOptions struct {
	NextAttemptDelaySeconds int `json:"NextAttemptDelaySeconds,omitempty"`
}

type StepSemantics

type StepSemantics string
const (
	StepSemanticsAtLeastOncePerRetry StepSemantics = "AT_LEAST_ONCE_PER_RETRY"
	StepSemanticsAtMostOncePerRetry  StepSemantics = "AT_MOST_ONCE_PER_RETRY"
)

type TerminationDetails

type TerminationDetails struct {
	Reason  TerminationReason
	Message string
	Error   error
}

type TerminationManager

type TerminationManager struct {
	// contains filtered or unexported fields
}

func NewTerminationManager

func NewTerminationManager() *TerminationManager

func (*TerminationManager) Channel

func (m *TerminationManager) Channel() <-chan TerminationDetails

func (*TerminationManager) Details

func (m *TerminationManager) Details() (TerminationDetails, bool)

func (*TerminationManager) IsTerminated

func (m *TerminationManager) IsTerminated() bool

func (*TerminationManager) Terminate

func (m *TerminationManager) Terminate(details TerminationDetails)

type TerminationReason

type TerminationReason string
const (
	TerminationReasonOperationTerminated  TerminationReason = "OPERATION_TERMINATED"
	TerminationReasonRetryScheduled       TerminationReason = "RETRY_SCHEDULED"
	TerminationReasonRetryInterruptedStep TerminationReason = "RETRY_INTERRUPTED_STEP"
	TerminationReasonWaitScheduled        TerminationReason = "WAIT_SCHEDULED"
	TerminationReasonCallbackPending      TerminationReason = "CALLBACK_PENDING"
	TerminationReasonCheckpointFailed     TerminationReason = "CHECKPOINT_FAILED"
	TerminationReasonSerdesFailed         TerminationReason = "SERDES_FAILED"
	TerminationReasonContextValidation    TerminationReason = "CONTEXT_VALIDATION_ERROR"
	TerminationReasonCustom               TerminationReason = "CUSTOM"
)

type UnrecoverableError

type UnrecoverableError interface {
	error
	TerminationReason() TerminationReason
	InvocationLevel() bool
	ExecutionLevel() bool
}

type WaitDetails

type WaitDetails struct {
	ScheduledEndTimestamp *time.Time `json:"ScheduledEndTimestamp,omitempty"`
}

type WaitForCallbackConfig

type WaitForCallbackConfig struct {
	Timeout          *Duration
	HeartbeatTimeout *Duration
	RetryStrategy    RetryStrategy
	Serdes           Serdes
}

type WaitForCallbackContext

type WaitForCallbackContext struct {
	Logger Logger
}

type WaitForCallbackSubmitterFunc

type WaitForCallbackSubmitterFunc func(callbackID string, ctx WaitForCallbackContext) error

type WaitForConditionCheckFunc

type WaitForConditionCheckFunc func(state any, ctx WaitForConditionContext) (any, error)

type WaitForConditionConfig

type WaitForConditionConfig struct {
	WaitStrategy WaitForConditionWaitStrategyFunc
	InitialState any
	Serdes       Serdes
}

type WaitForConditionContext

type WaitForConditionContext struct {
	Logger Logger
}

type WaitForConditionDecision

type WaitForConditionDecision struct {
	ShouldContinue bool
	Delay          Duration
}

type WaitForConditionWaitStrategyFunc

type WaitForConditionWaitStrategyFunc func(state any, attempt int) WaitForConditionDecision

type WaitOptions

type WaitOptions struct {
	WaitSeconds int `json:"WaitSeconds,omitempty"`
}

type WrappedHandler

type WrappedHandler func(ctx context.Context, input InvocationInput) (InvocationOutput, error)

func WithDurableExecution

func WithDurableExecution(handler DurableExecutionHandler, config DurableExecutionConfig) WrappedHandler

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL