Documentation
¶
Overview ¶
Package durable provides an experimental Go implementation of AWS Durable Execution-style workflow helpers.
Index ¶
- func ClassifyCheckpointError(err error) error
- func DurableOperationErrorFromErrorObject(eo *ErrorObject) error
- func HashID(input string) string
- func InitializeExecutionContext(input InvocationInput, client DurableExecutionClient, ...) (*ExecutionContext, DurableExecutionMode, string, error)
- func IsUnrecoverableError(err error) bool
- func IsUnrecoverableInvocationError(err error) bool
- func NewCallbackError(msg string, cause error, data string) error
- func NewCallbackSubmitterError(msg string, cause error, data string) error
- func NewCallbackTimeoutError(msg string, cause error, data string) error
- func NewChildContextError(msg string, cause error, data string) error
- func NewInvokeError(msg string, cause error, data string) error
- func NewStepError(msg string, cause error, data string) error
- func NewUnrecoverableExecutionError(reason TerminationReason, msg string, cause error) error
- func NewUnrecoverableInvocationError(reason TerminationReason, msg string, cause error) error
- func NewWaitForConditionError(msg string, cause error, data string) error
- func SafeDeserialize(serdes Serdes, data string, stepID, stepName string, tm *TerminationManager, ...) (any, error)
- func SafeSerialize(serdes Serdes, value any, stepID, stepName string, tm *TerminationManager, ...) (string, error)
- func ValidateContextUsage(ctx context.Context, operationContextID, operationName string, ...)
- func ValidateReplayConsistency(stepID string, current operationDescriptor, checkpointData *Operation, ...) error
- func WithActiveOperation(ctx context.Context, op ActiveOperation) context.Context
- type APIError
- type AWSDurableExecutionClient
- type ActiveOperation
- type BatchCompletionReason
- type BatchItem
- type BatchItemStatus
- type BatchResult
- func (r *BatchResult) Errors() []error
- func (r *BatchResult) Failed() []BatchItem
- func (r *BatchResult) FailureCount() int
- func (r *BatchResult) HasFailure() bool
- func (r *BatchResult) Results() []any
- func (r *BatchResult) Started() []BatchItem
- func (r *BatchResult) StartedCount() int
- func (r *BatchResult) Status() BatchItemStatus
- func (r *BatchResult) Succeeded() []BatchItem
- func (r *BatchResult) SuccessCount() int
- func (r *BatchResult) ThrowIfError() error
- func (r *BatchResult) TotalCount() int
- type BatchResultSerdes
- type CallbackDetails
- type CallbackOptions
- type ChainedInvokeDetails
- type ChainedInvokeOptions
- type CheckpointManager
- func (m *CheckpointManager) Checkpoint(ctx context.Context, stepID string, update OperationUpdate) error
- func (m *CheckpointManager) ForceCheckpoint(ctx context.Context) error
- func (m *CheckpointManager) GetOperationState(stepID string) OperationLifecycleState
- func (m *CheckpointManager) MarkAncestorFinished(stepID string)
- func (m *CheckpointManager) MarkOperationAwaited(stepID string)
- func (m *CheckpointManager) MarkOperationState(stepID string, state OperationLifecycleState, metadata OperationMetadata, ...)
- func (m *CheckpointManager) SetTerminating()
- func (m *CheckpointManager) WaitForQueueCompletion(ctx context.Context) error
- func (m *CheckpointManager) WaitForRetryTimer(ctx context.Context, stepID string) error
- func (m *CheckpointManager) WaitForStatusChange(ctx context.Context, stepID string) error
- type CheckpointRequest
- type CheckpointResponse
- type ChildConfig
- type ChildFunc
- type CompletionConfig
- type ConcurrencyConfig
- type ConcurrentExecutionItem
- type ConcurrentExecutor
- type ContextDetails
- type ContextOptions
- type CreateCallbackConfig
- type CreateCallbackResult
- type DurableContext
- func (c *DurableContext) CreateCallback(ctx context.Context, name string, cfg *CreateCallbackConfig) *Future[CreateCallbackResult]
- func (c *DurableContext) ExecuteConcurrently(ctx context.Context, name string, items []ConcurrentExecutionItem, ...) *Future[*BatchResult]
- func (c *DurableContext) Invoke(ctx context.Context, name string, functionName string, input any, ...) *Future[any]
- func (c *DurableContext) Map(ctx context.Context, name string, items []any, mapFunc MapFunc, cfg *MapConfig) *Future[*BatchResult]
- func (c *DurableContext) Parallel(ctx context.Context, name string, branches []NamedParallelBranch, ...) *Future[*BatchResult]
- func (c *DurableContext) RunInChildContext(ctx context.Context, name string, fn ChildFunc, cfg *ChildConfig) *Future[any]
- func (c *DurableContext) Step(ctx context.Context, name string, fn StepFunc, cfg *StepConfig) *Future[any]
- func (c *DurableContext) Wait(ctx context.Context, name string, duration Duration) *Future[struct{}]
- func (c *DurableContext) WaitForCallback(ctx context.Context, name string, submitter WaitForCallbackSubmitterFunc, ...) *Future[any]
- func (c *DurableContext) WaitForCondition(ctx context.Context, name string, check WaitForConditionCheckFunc, ...) *Future[any]
- type DurableExecutionClient
- type DurableExecutionConfig
- type DurableExecutionHandler
- type DurableExecutionMode
- type DurableOperationError
- type Duration
- type ErrorObject
- type ExecutionContext
- type ExecutionDetails
- type ExecutionState
- type Future
- type GetExecutionStateRequest
- type GetExecutionStateResponse
- type InMemoryClient
- func (c *InMemoryClient) Checkpoint(input CheckpointRequest) (CheckpointResponse, error)
- func (c *InMemoryClient) CompleteCallback(callbackID string, payload string)
- func (c *InMemoryClient) FailCallback(callbackID string, errObj *ErrorObject, timedOut bool)
- func (c *InMemoryClient) GetExecutionState(_ GetExecutionStateRequest) (GetExecutionStateResponse, error)
- func (c *InMemoryClient) SetOperation(op Operation)
- type InvocationInput
- type InvocationOutput
- type InvocationStatus
- type InvokeConfig
- type JSONSerdes
- type Logger
- type MapConfig
- type MapFunc
- type NamedParallelBranch
- type NopLogger
- type Operation
- type OperationAction
- type OperationLifecycleState
- type OperationMetadata
- type OperationStatus
- type OperationSubType
- type OperationType
- type OperationUpdate
- type ParallelConfig
- type ParallelFunc
- type PassThroughSerdes
- type RetryDecision
- type RetryStrategy
- type Serdes
- type SerdesContext
- type StepConfig
- type StepContext
- type StepDetails
- type StepFunc
- type StepOptions
- type StepSemantics
- type TerminationDetails
- type TerminationManager
- type TerminationReason
- type UnrecoverableError
- type WaitDetails
- type WaitForCallbackConfig
- type WaitForCallbackContext
- type WaitForCallbackSubmitterFunc
- type WaitForConditionCheckFunc
- type WaitForConditionConfig
- type WaitForConditionContext
- type WaitForConditionDecision
- type WaitForConditionWaitStrategyFunc
- type WaitOptions
- type WrappedHandler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ClassifyCheckpointError ¶
func DurableOperationErrorFromErrorObject ¶
func DurableOperationErrorFromErrorObject(eo *ErrorObject) error
func InitializeExecutionContext ¶
func InitializeExecutionContext(input InvocationInput, client DurableExecutionClient, requestID, tenantID string) (*ExecutionContext, DurableExecutionMode, string, error)
func IsUnrecoverableError ¶
func NewCallbackTimeoutError ¶
func NewUnrecoverableExecutionError ¶
func NewUnrecoverableExecutionError(reason TerminationReason, msg string, cause error) error
func NewUnrecoverableInvocationError ¶
func NewUnrecoverableInvocationError(reason TerminationReason, msg string, cause error) error
func SafeDeserialize ¶
func SafeSerialize ¶
func ValidateContextUsage ¶
func ValidateContextUsage(ctx context.Context, operationContextID, operationName string, tm *TerminationManager)
func ValidateReplayConsistency ¶
func ValidateReplayConsistency(stepID string, current operationDescriptor, checkpointData *Operation, execCtx *ExecutionContext) error
func WithActiveOperation ¶
func WithActiveOperation(ctx context.Context, op ActiveOperation) context.Context
Types ¶
type AWSDurableExecutionClient ¶
type AWSDurableExecutionClient struct {
// contains filtered or unexported fields
}
func NewAWSDurableExecutionClient ¶
func NewAWSDurableExecutionClient(client *lambdasdk.Client) *AWSDurableExecutionClient
func NewDefaultAWSDurableExecutionClient ¶
func NewDefaultAWSDurableExecutionClient(ctx context.Context, optFns ...func(*awsconfig.LoadOptions) error) (*AWSDurableExecutionClient, error)
func (*AWSDurableExecutionClient) Checkpoint ¶
func (c *AWSDurableExecutionClient) Checkpoint(input CheckpointRequest) (CheckpointResponse, error)
func (*AWSDurableExecutionClient) GetExecutionState ¶
func (c *AWSDurableExecutionClient) GetExecutionState(input GetExecutionStateRequest) (GetExecutionStateResponse, error)
type ActiveOperation ¶
type ActiveOperation struct {
ContextID string
ParentID string
Attempt int
Mode DurableExecutionMode
}
func GetActiveOperation ¶
func GetActiveOperation(ctx context.Context) (ActiveOperation, bool)
type BatchCompletionReason ¶
type BatchCompletionReason string
const ( BatchCompletionReasonAllCompleted BatchCompletionReason = "ALL_COMPLETED" BatchCompletionReasonMinSuccessfulReached BatchCompletionReason = "MIN_SUCCESSFUL_REACHED" BatchCompletionReasonFailureToleranceExceeded BatchCompletionReason = "FAILURE_TOLERANCE_EXCEEDED" )
type BatchItem ¶
type BatchItem struct {
Result any `json:"result,omitempty"`
Error *ErrorObject `json:"error,omitempty"`
Index int `json:"index"`
Status BatchItemStatus `json:"status"`
}
type BatchItemStatus ¶
type BatchItemStatus string
const ( BatchItemStatusSucceeded BatchItemStatus = "SUCCEEDED" BatchItemStatusFailed BatchItemStatus = "FAILED" BatchItemStatusStarted BatchItemStatus = "STARTED" )
type BatchResult ¶
type BatchResult struct {
All []BatchItem `json:"all"`
CompletionReason BatchCompletionReason `json:"completionReason"`
}
func (*BatchResult) Errors ¶
func (r *BatchResult) Errors() []error
func (*BatchResult) Failed ¶
func (r *BatchResult) Failed() []BatchItem
func (*BatchResult) FailureCount ¶
func (r *BatchResult) FailureCount() int
func (*BatchResult) HasFailure ¶
func (r *BatchResult) HasFailure() bool
func (*BatchResult) Results ¶
func (r *BatchResult) Results() []any
func (*BatchResult) Started ¶
func (r *BatchResult) Started() []BatchItem
func (*BatchResult) StartedCount ¶
func (r *BatchResult) StartedCount() int
func (*BatchResult) Status ¶
func (r *BatchResult) Status() BatchItemStatus
func (*BatchResult) Succeeded ¶
func (r *BatchResult) Succeeded() []BatchItem
func (*BatchResult) SuccessCount ¶
func (r *BatchResult) SuccessCount() int
func (*BatchResult) ThrowIfError ¶
func (r *BatchResult) ThrowIfError() error
func (*BatchResult) TotalCount ¶
func (r *BatchResult) TotalCount() int
type BatchResultSerdes ¶
type BatchResultSerdes struct{}
func (BatchResultSerdes) Deserialize ¶
func (BatchResultSerdes) Deserialize(data string, _ SerdesContext) (any, error)
func (BatchResultSerdes) Serialize ¶
func (BatchResultSerdes) Serialize(value any, _ SerdesContext) (string, error)
type CallbackDetails ¶
type CallbackDetails struct {
CallbackID string `json:"CallbackId,omitempty"`
Result string `json:"Result,omitempty"`
Error *ErrorObject `json:"Error,omitempty"`
ScheduledTimeoutTimestamp *time.Time `json:"ScheduledTimeoutTimestamp,omitempty"`
}
type CallbackOptions ¶
type ChainedInvokeDetails ¶
type ChainedInvokeDetails struct {
Result string `json:"Result,omitempty"`
Error *ErrorObject `json:"Error,omitempty"`
}
type ChainedInvokeOptions ¶
type ChainedInvokeOptions struct {
FunctionName string `json:"FunctionName,omitempty"`
}
type CheckpointManager ¶
type CheckpointManager struct {
// contains filtered or unexported fields
}
func NewCheckpointManager ¶
func NewCheckpointManager( durableExecutionArn string, stepData map[string]Operation, stepDataMu *sync.RWMutex, client DurableExecutionClient, termination *TerminationManager, checkpointToken string, logger Logger, ) *CheckpointManager
func (*CheckpointManager) Checkpoint ¶
func (m *CheckpointManager) Checkpoint(ctx context.Context, stepID string, update OperationUpdate) error
func (*CheckpointManager) ForceCheckpoint ¶
func (m *CheckpointManager) ForceCheckpoint(ctx context.Context) error
func (*CheckpointManager) GetOperationState ¶
func (m *CheckpointManager) GetOperationState(stepID string) OperationLifecycleState
func (*CheckpointManager) MarkAncestorFinished ¶
func (m *CheckpointManager) MarkAncestorFinished(stepID string)
func (*CheckpointManager) MarkOperationAwaited ¶
func (m *CheckpointManager) MarkOperationAwaited(stepID string)
func (*CheckpointManager) MarkOperationState ¶
func (m *CheckpointManager) MarkOperationState(stepID string, state OperationLifecycleState, metadata OperationMetadata, endTimestamp *time.Time)
func (*CheckpointManager) SetTerminating ¶
func (m *CheckpointManager) SetTerminating()
func (*CheckpointManager) WaitForQueueCompletion ¶
func (m *CheckpointManager) WaitForQueueCompletion(ctx context.Context) error
func (*CheckpointManager) WaitForRetryTimer ¶
func (m *CheckpointManager) WaitForRetryTimer(ctx context.Context, stepID string) error
func (*CheckpointManager) WaitForStatusChange ¶
func (m *CheckpointManager) WaitForStatusChange(ctx context.Context, stepID string) error
type CheckpointRequest ¶
type CheckpointRequest struct {
DurableExecutionArn string
CheckpointToken string
Updates []OperationUpdate
}
type CheckpointResponse ¶
type CheckpointResponse struct {
CheckpointToken string
NewExecutionState *ExecutionState
}
type ChildConfig ¶
type CompletionConfig ¶
type ConcurrencyConfig ¶
type ConcurrencyConfig struct {
MaxConcurrency int
TopLevelSubType OperationSubType
IterationSubType OperationSubType
SummaryGenerator func(result *BatchResult) string
Serdes Serdes
ItemSerdes Serdes
CompletionConfig *CompletionConfig
}
type ConcurrentExecutionItem ¶
type ConcurrentExecutor ¶
type ConcurrentExecutor func(item ConcurrentExecutionItem, childContext *DurableContext) (any, error)
type ContextDetails ¶
type ContextDetails struct {
Result string `json:"Result,omitempty"`
ReplayChildren bool `json:"ReplayChildren,omitempty"`
Error *ErrorObject `json:"Error,omitempty"`
}
type ContextOptions ¶
type ContextOptions struct {
ReplayChildren bool `json:"ReplayChildren,omitempty"`
}
type CreateCallbackConfig ¶
type CreateCallbackResult ¶
type DurableContext ¶
type DurableContext struct {
// contains filtered or unexported fields
}
func NewDurableContext ¶
func NewDurableContext(execCtx *ExecutionContext, mode DurableExecutionMode, logger Logger, stepPrefix, parentID string, checkpoint *CheckpointManager) *DurableContext
func (*DurableContext) CreateCallback ¶
func (c *DurableContext) CreateCallback(ctx context.Context, name string, cfg *CreateCallbackConfig) *Future[CreateCallbackResult]
func (*DurableContext) ExecuteConcurrently ¶
func (c *DurableContext) ExecuteConcurrently( ctx context.Context, name string, items []ConcurrentExecutionItem, executor ConcurrentExecutor, cfg *ConcurrencyConfig, ) *Future[*BatchResult]
func (*DurableContext) Invoke ¶
func (c *DurableContext) Invoke(ctx context.Context, name string, functionName string, input any, cfg *InvokeConfig) *Future[any]
func (*DurableContext) Map ¶
func (c *DurableContext) Map( ctx context.Context, name string, items []any, mapFunc MapFunc, cfg *MapConfig, ) *Future[*BatchResult]
func (*DurableContext) Parallel ¶
func (c *DurableContext) Parallel( ctx context.Context, name string, branches []NamedParallelBranch, cfg *ParallelConfig, ) *Future[*BatchResult]
func (*DurableContext) RunInChildContext ¶
func (c *DurableContext) RunInChildContext(ctx context.Context, name string, fn ChildFunc, cfg *ChildConfig) *Future[any]
func (*DurableContext) Step ¶
func (c *DurableContext) Step(ctx context.Context, name string, fn StepFunc, cfg *StepConfig) *Future[any]
func (*DurableContext) WaitForCallback ¶
func (c *DurableContext) WaitForCallback(ctx context.Context, name string, submitter WaitForCallbackSubmitterFunc, cfg *WaitForCallbackConfig) *Future[any]
func (*DurableContext) WaitForCondition ¶
func (c *DurableContext) WaitForCondition(ctx context.Context, name string, check WaitForConditionCheckFunc, cfg *WaitForConditionConfig) *Future[any]
type DurableExecutionClient ¶
type DurableExecutionClient interface {
GetExecutionState(input GetExecutionStateRequest) (GetExecutionStateResponse, error)
Checkpoint(input CheckpointRequest) (CheckpointResponse, error)
}
type DurableExecutionConfig ¶
type DurableExecutionConfig struct {
Client DurableExecutionClient
Logger Logger
RequestID string
TenantID string
}
type DurableExecutionHandler ¶
type DurableExecutionMode ¶
type DurableExecutionMode string
const ( ExecutionMode DurableExecutionMode = "ExecutionMode" ReplayMode DurableExecutionMode = "ReplayMode" ReplaySucceededContext DurableExecutionMode = "ReplaySucceededContext" )
type DurableOperationError ¶
func (*DurableOperationError) Error ¶
func (e *DurableOperationError) Error() string
func (*DurableOperationError) ToErrorObject ¶
func (e *DurableOperationError) ToErrorObject() *ErrorObject
func (*DurableOperationError) Unwrap ¶
func (e *DurableOperationError) Unwrap() error
type Duration ¶
type Duration struct {
Days int `json:"days,omitempty"`
Hours int `json:"hours,omitempty"`
Minutes int `json:"minutes,omitempty"`
Seconds int `json:"seconds,omitempty"`
}
func (Duration) ToDuration ¶
type ErrorObject ¶
type ErrorObject struct {
ErrorType string `json:"ErrorType,omitempty"`
ErrorMessage string `json:"ErrorMessage,omitempty"`
ErrorData string `json:"ErrorData,omitempty"`
StackTrace []string `json:"StackTrace,omitempty"`
}
func CreateErrorObjectFromError ¶
func CreateErrorObjectFromError(err error, data string) *ErrorObject
type ExecutionContext ¶
type ExecutionContext struct {
// contains filtered or unexported fields
}
func (*ExecutionContext) DurableExecutionArn ¶
func (e *ExecutionContext) DurableExecutionArn() string
func (*ExecutionContext) DurableExecutionClient ¶
func (e *ExecutionContext) DurableExecutionClient() DurableExecutionClient
func (*ExecutionContext) GetStepData ¶
func (e *ExecutionContext) GetStepData(stepID string) *Operation
func (*ExecutionContext) TerminationManager ¶
func (e *ExecutionContext) TerminationManager() *TerminationManager
type ExecutionDetails ¶
type ExecutionDetails struct {
InputPayload string `json:"InputPayload,omitempty"`
}
type ExecutionState ¶
type Future ¶
type Future[T any] struct { // contains filtered or unexported fields }
func NewNeverFuture ¶
func NewRejectedFuture ¶
func NewResolvedFuture ¶
type InMemoryClient ¶
type InMemoryClient struct {
// contains filtered or unexported fields
}
InMemoryClient is a local DurableExecutionClient implementation for tests and local execution.
func NewInMemoryClient ¶
func NewInMemoryClient() *InMemoryClient
func (*InMemoryClient) Checkpoint ¶
func (c *InMemoryClient) Checkpoint(input CheckpointRequest) (CheckpointResponse, error)
func (*InMemoryClient) CompleteCallback ¶
func (c *InMemoryClient) CompleteCallback(callbackID string, payload string)
func (*InMemoryClient) FailCallback ¶
func (c *InMemoryClient) FailCallback(callbackID string, errObj *ErrorObject, timedOut bool)
func (*InMemoryClient) GetExecutionState ¶
func (c *InMemoryClient) GetExecutionState(_ GetExecutionStateRequest) (GetExecutionStateResponse, error)
func (*InMemoryClient) SetOperation ¶
func (c *InMemoryClient) SetOperation(op Operation)
type InvocationInput ¶
type InvocationInput struct {
DurableExecutionArn string `json:"DurableExecutionArn"`
CheckpointToken string `json:"CheckpointToken"`
InitialExecutionState ExecutionState `json:"InitialExecutionState"`
}
func ParseInvocationInput ¶
func ParseInvocationInput(raw []byte) (InvocationInput, error)
ParseInvocationInput parses the invocation payload from the durable backend. The backend may encode timestamp fields as either ISO-8601 strings or epoch numbers.
type InvocationOutput ¶
type InvocationOutput struct {
Status InvocationStatus `json:"Status"`
Result string `json:"Result,omitempty"`
Error *ErrorObject `json:"Error,omitempty"`
}
type InvocationStatus ¶
type InvocationStatus string
const ( InvocationStatusSucceeded InvocationStatus = "SUCCEEDED" InvocationStatusFailed InvocationStatus = "FAILED" InvocationStatusPending InvocationStatus = "PENDING" )
type InvokeConfig ¶
type JSONSerdes ¶
type JSONSerdes struct{}
func (JSONSerdes) Deserialize ¶
func (JSONSerdes) Deserialize(data string, _ SerdesContext) (any, error)
func (JSONSerdes) Serialize ¶
func (JSONSerdes) Serialize(value any, _ SerdesContext) (string, error)
type NamedParallelBranch ¶
type NamedParallelBranch struct {
Name string
Func ParallelFunc
}
type Operation ¶
type Operation struct {
ID string `json:"Id,omitempty"`
ParentID string `json:"ParentId,omitempty"`
Name string `json:"Name,omitempty"`
Type OperationType `json:"Type,omitempty"`
SubType OperationSubType `json:"SubType,omitempty"`
Status OperationStatus `json:"Status,omitempty"`
ExecutionDetails *ExecutionDetails `json:"ExecutionDetails,omitempty"`
StepDetails *StepDetails `json:"StepDetails,omitempty"`
WaitDetails *WaitDetails `json:"WaitDetails,omitempty"`
CallbackDetails *CallbackDetails `json:"CallbackDetails,omitempty"`
ChainedInvokeDetails *ChainedInvokeDetails `json:"ChainedInvokeDetails,omitempty"`
ContextDetails *ContextDetails `json:"ContextDetails,omitempty"`
StartTimestamp *time.Time `json:"StartTimestamp,omitempty"`
}
type OperationAction ¶
type OperationAction string
const ( OperationActionStart OperationAction = "START" OperationActionSucceed OperationAction = "SUCCEED" OperationActionFail OperationAction = "FAIL" OperationActionRetry OperationAction = "RETRY" )
type OperationLifecycleState ¶
type OperationLifecycleState string
const ( OperationLifecycleExecuting OperationLifecycleState = "EXECUTING" OperationLifecycleRetryWaiting OperationLifecycleState = "RETRY_WAITING" OperationLifecycleIdleNotAwaited OperationLifecycleState = "IDLE_NOT_AWAITED" OperationLifecycleIdleAwaited OperationLifecycleState = "IDLE_AWAITED" OperationLifecycleCompleted OperationLifecycleState = "COMPLETED" )
type OperationMetadata ¶
type OperationMetadata struct {
StepID string
Name string
Type OperationType
SubType OperationSubType
ParentID string
}
type OperationStatus ¶
type OperationStatus string
const ( OperationStatusStarted OperationStatus = "STARTED" OperationStatusReady OperationStatus = "READY" OperationStatusPending OperationStatus = "PENDING" OperationStatusSucceeded OperationStatus = "SUCCEEDED" OperationStatusFailed OperationStatus = "FAILED" OperationStatusCancelled OperationStatus = "CANCELLED" OperationStatusStopped OperationStatus = "STOPPED" OperationStatusTimedOut OperationStatus = "TIMED_OUT" )
type OperationSubType ¶
type OperationSubType string
const ( OperationSubTypeStep OperationSubType = "Step" OperationSubTypeWait OperationSubType = "Wait" OperationSubTypeCallback OperationSubType = "Callback" OperationSubTypeRunInChild OperationSubType = "RunInChildContext" OperationSubTypeMap OperationSubType = "Map" OperationSubTypeMapIteration OperationSubType = "MapIteration" OperationSubTypeParallel OperationSubType = "Parallel" OperationSubTypeParallelBranch OperationSubType = "ParallelBranch" OperationSubTypeWaitForCallback OperationSubType = "WaitForCallback" OperationSubTypeWaitForCondition OperationSubType = "WaitForCondition" OperationSubTypeChainedInvoke OperationSubType = "ChainedInvoke" )
type OperationType ¶
type OperationType string
const ( OperationTypeExecution OperationType = "EXECUTION" OperationTypeStep OperationType = "STEP" OperationTypeWait OperationType = "WAIT" OperationTypeCallback OperationType = "CALLBACK" OperationTypeContext OperationType = "CONTEXT" OperationTypeChainedInvoke OperationType = "CHAINED_INVOKE" )
type OperationUpdate ¶
type OperationUpdate struct {
ID string `json:"Id,omitempty"`
ParentID string `json:"ParentId,omitempty"`
Name string `json:"Name,omitempty"`
Type OperationType `json:"Type,omitempty"`
SubType OperationSubType `json:"SubType,omitempty"`
Action OperationAction `json:"Action,omitempty"`
Payload string `json:"Payload,omitempty"`
Error *ErrorObject `json:"Error,omitempty"`
StepOptions *StepOptions `json:"StepOptions,omitempty"`
WaitOptions *WaitOptions `json:"WaitOptions,omitempty"`
CallbackOptions *CallbackOptions `json:"CallbackOptions,omitempty"`
ChainedInvokeOptions *ChainedInvokeOptions `json:"ChainedInvokeOptions,omitempty"`
ContextOptions *ContextOptions `json:"ContextOptions,omitempty"`
}
type ParallelConfig ¶
type ParallelConfig struct {
MaxConcurrency int
Serdes Serdes
ItemSerdes Serdes
CompletionConfig *CompletionConfig
}
type ParallelFunc ¶
type ParallelFunc func(context *DurableContext) (any, error)
type PassThroughSerdes ¶
type PassThroughSerdes struct{}
func (PassThroughSerdes) Deserialize ¶
func (PassThroughSerdes) Deserialize(data string, _ SerdesContext) (any, error)
func (PassThroughSerdes) Serialize ¶
func (PassThroughSerdes) Serialize(value any, _ SerdesContext) (string, error)
type RetryDecision ¶
type RetryStrategy ¶
type RetryStrategy func(err error, attempt int) RetryDecision
type Serdes ¶
type Serdes interface {
Serialize(value any, ctx SerdesContext) (string, error)
Deserialize(data string, ctx SerdesContext) (any, error)
}
type SerdesContext ¶
type StepConfig ¶
type StepConfig struct {
RetryStrategy RetryStrategy
Semantics StepSemantics
Serdes Serdes
}
type StepContext ¶
type StepContext struct {
Logger Logger
}
type StepDetails ¶
type StepDetails struct {
Result string `json:"Result,omitempty"`
Error *ErrorObject `json:"Error,omitempty"`
Attempt int `json:"Attempt,omitempty"`
NextAttemptTimestamp *time.Time `json:"NextAttemptTimestamp,omitempty"`
}
type StepOptions ¶
type StepOptions struct {
NextAttemptDelaySeconds int `json:"NextAttemptDelaySeconds,omitempty"`
}
type StepSemantics ¶
type StepSemantics string
const ( StepSemanticsAtLeastOncePerRetry StepSemantics = "AT_LEAST_ONCE_PER_RETRY" StepSemanticsAtMostOncePerRetry StepSemantics = "AT_MOST_ONCE_PER_RETRY" )
type TerminationDetails ¶
type TerminationDetails struct {
Reason TerminationReason
Message string
Error error
}
type TerminationManager ¶
type TerminationManager struct {
// contains filtered or unexported fields
}
func NewTerminationManager ¶
func NewTerminationManager() *TerminationManager
func (*TerminationManager) Channel ¶
func (m *TerminationManager) Channel() <-chan TerminationDetails
func (*TerminationManager) Details ¶
func (m *TerminationManager) Details() (TerminationDetails, bool)
func (*TerminationManager) IsTerminated ¶
func (m *TerminationManager) IsTerminated() bool
func (*TerminationManager) Terminate ¶
func (m *TerminationManager) Terminate(details TerminationDetails)
type TerminationReason ¶
type TerminationReason string
const ( TerminationReasonOperationTerminated TerminationReason = "OPERATION_TERMINATED" TerminationReasonRetryScheduled TerminationReason = "RETRY_SCHEDULED" TerminationReasonRetryInterruptedStep TerminationReason = "RETRY_INTERRUPTED_STEP" TerminationReasonWaitScheduled TerminationReason = "WAIT_SCHEDULED" TerminationReasonCallbackPending TerminationReason = "CALLBACK_PENDING" TerminationReasonCheckpointFailed TerminationReason = "CHECKPOINT_FAILED" TerminationReasonSerdesFailed TerminationReason = "SERDES_FAILED" TerminationReasonContextValidation TerminationReason = "CONTEXT_VALIDATION_ERROR" TerminationReasonCustom TerminationReason = "CUSTOM" )
type UnrecoverableError ¶
type UnrecoverableError interface {
error
TerminationReason() TerminationReason
InvocationLevel() bool
ExecutionLevel() bool
}
type WaitDetails ¶
type WaitForCallbackConfig ¶
type WaitForCallbackConfig struct {
Timeout *Duration
HeartbeatTimeout *Duration
RetryStrategy RetryStrategy
Serdes Serdes
}
type WaitForCallbackContext ¶
type WaitForCallbackContext struct {
Logger Logger
}
type WaitForCallbackSubmitterFunc ¶
type WaitForCallbackSubmitterFunc func(callbackID string, ctx WaitForCallbackContext) error
type WaitForConditionCheckFunc ¶
type WaitForConditionCheckFunc func(state any, ctx WaitForConditionContext) (any, error)
type WaitForConditionConfig ¶
type WaitForConditionConfig struct {
WaitStrategy WaitForConditionWaitStrategyFunc
InitialState any
Serdes Serdes
}
type WaitForConditionContext ¶
type WaitForConditionContext struct {
Logger Logger
}
type WaitForConditionWaitStrategyFunc ¶
type WaitForConditionWaitStrategyFunc func(state any, attempt int) WaitForConditionDecision
type WaitOptions ¶
type WaitOptions struct {
WaitSeconds int `json:"WaitSeconds,omitempty"`
}
type WrappedHandler ¶
type WrappedHandler func(ctx context.Context, input InvocationInput) (InvocationOutput, error)
func WithDurableExecution ¶
func WithDurableExecution(handler DurableExecutionHandler, config DurableExecutionConfig) WrappedHandler
Source Files
¶
- aws_durable_execution_client.go
- batch_result_serdes.go
- callback_condition.go
- checkpoint_manager.go
- concurrent.go
- context_tracker.go
- doc.go
- durable_context.go
- errors.go
- execution_context.go
- future.go
- hash.go
- invocation_input_parser.go
- memory_client.go
- replay_validation.go
- serdes.go
- termination.go
- types.go
- with_durable_execution.go
Click to show internal directories.
Click to hide internal directories.