Documentation
¶
Overview ¶
Package durable provides an experimental Go implementation of AWS Durable Execution style workflow helpers.
The package is centered around WithDurableExecution, which wraps a Lambda style handler and provides a DurableContext. A handler uses DurableContext to record durable operations such as Step, Wait, Invoke, callbacks, child contexts, Map, and Parallel. Each operation returns a Future; call Await to either receive a replayed result from the checkpoint state or wait for the operation to complete.
Durable handlers must be deterministic in their durable operation order. On a replay, the SDK validates that the next operation has the same type, subtype, and name as the checkpointed operation. Non-deterministic changes fail the invocation rather than silently binding results to a different operation.
This repository is an independent experimental implementation and is not an official AWS SDK. Public API and checkpoint payload compatibility may change.
Index ¶
- func ClassifyCheckpointError(err error) error
- func DurableOperationErrorFromErrorObject(eo *ErrorObject) error
- func HashID(input string) string
- func InitializeExecutionContext(ctx context.Context, input InvocationInput, client DurableExecutionClient, ...) (*ExecutionContext, DurableExecutionMode, string, error)
- func IsUnrecoverableError(err error) bool
- func IsUnrecoverableInvocationError(err error) bool
- func NewCallbackError(msg string, cause error, data string) error
- func NewCallbackSubmitterError(msg string, cause error, data string) error
- func NewCallbackTimeoutError(msg string, cause error, data string) error
- func NewChildContextError(msg string, cause error, data string) error
- func NewInvokeError(msg string, cause error, data string) error
- func NewStepError(msg string, cause error, data string) error
- func NewUnrecoverableExecutionError(reason TerminationReason, msg string, cause error) error
- func NewUnrecoverableInvocationError(reason TerminationReason, msg string, cause error) error
- func NewWaitForConditionError(msg string, cause error, data string) error
- func SafeDeserialize(serdes Serdes, data string, stepID, stepName string, tm *TerminationManager, ...) (any, error)
- func SafeSerialize(serdes Serdes, value any, stepID, stepName string, tm *TerminationManager, ...) (string, error)
- func ValidateContextUsage(ctx context.Context, operationContextID, operationName string, ...)
- func ValidateReplayConsistency(stepID string, current operationDescriptor, checkpointData *Operation, ...) error
- func WithActiveOperation(ctx context.Context, op ActiveOperation) context.Context
- type APIError
- type AWSDurableExecutionClient
- type ActiveOperation
- type BatchCompletionReason
- type BatchItem
- type BatchItemStatus
- type BatchResult
- func (r *BatchResult) Errors() []error
- func (r *BatchResult) Failed() []BatchItem
- func (r *BatchResult) FailureCount() int
- func (r *BatchResult) HasFailure() bool
- func (r *BatchResult) Results() []any
- func (r *BatchResult) Started() []BatchItem
- func (r *BatchResult) StartedCount() int
- func (r *BatchResult) Status() BatchItemStatus
- func (r *BatchResult) Succeeded() []BatchItem
- func (r *BatchResult) SuccessCount() int
- func (r *BatchResult) ThrowIfError() error
- func (r *BatchResult) TotalCount() int
- type BatchResultSerdes
- type CallbackDetails
- type CallbackOptions
- type ChainedInvokeDetails
- type ChainedInvokeOptions
- type CheckpointManager
- func (m *CheckpointManager) Checkpoint(ctx context.Context, stepID string, update OperationUpdate) error
- func (m *CheckpointManager) ForceCheckpoint(ctx context.Context) error
- func (m *CheckpointManager) GetOperationState(stepID string) OperationLifecycleState
- func (m *CheckpointManager) MarkAncestorFinished(stepID string)
- func (m *CheckpointManager) MarkOperationAwaited(stepID string)
- func (m *CheckpointManager) MarkOperationState(stepID string, state OperationLifecycleState, metadata OperationMetadata, ...)
- func (m *CheckpointManager) SetTerminating()
- func (m *CheckpointManager) WaitForQueueCompletion(ctx context.Context) error
- func (m *CheckpointManager) WaitForRetryTimer(ctx context.Context, stepID string) error
- func (m *CheckpointManager) WaitForStatusChange(ctx context.Context, stepID string) error
- type CheckpointRequest
- type CheckpointResponse
- type ChildConfig
- type ChildFunc
- type CompletionConfig
- type ConcurrencyConfig
- type ConcurrentExecutionItem
- type ConcurrentExecutor
- type ContextDetails
- type ContextOptions
- type CreateCallbackConfig
- type CreateCallbackResult
- type DurableContext
- func (c *DurableContext) CreateCallback(ctx context.Context, name string, cfg *CreateCallbackConfig) *Future[CreateCallbackResult]
- func (c *DurableContext) ExecuteConcurrently(ctx context.Context, name string, items []ConcurrentExecutionItem, ...) *Future[*BatchResult]
- func (c *DurableContext) Invoke(ctx context.Context, name string, functionName string, input any, ...) *Future[any]
- func (c *DurableContext) Map(ctx context.Context, name string, items []any, mapFunc MapFunc, cfg *MapConfig) *Future[*BatchResult]
- func (c *DurableContext) Parallel(ctx context.Context, name string, branches []NamedParallelBranch, ...) *Future[*BatchResult]
- func (c *DurableContext) RunInChildContext(ctx context.Context, name string, fn ChildFunc, cfg *ChildConfig) *Future[any]
- func (c *DurableContext) Step(ctx context.Context, name string, fn StepFunc, cfg *StepConfig) *Future[any]
- func (c *DurableContext) Wait(ctx context.Context, name string, duration Duration) *Future[struct{}]
- func (c *DurableContext) WaitForCallback(ctx context.Context, name string, submitter WaitForCallbackSubmitterFunc, ...) *Future[any]
- func (c *DurableContext) WaitForCondition(ctx context.Context, name string, check WaitForConditionCheckFunc, ...) *Future[any]
- type DurableExecutionClient
- type DurableExecutionConfig
- type DurableExecutionHandler
- type DurableExecutionMode
- type DurableOperationError
- type Duration
- type ErrorObject
- type ExecutionContext
- type ExecutionDetails
- type ExecutionState
- type Future
- type GetExecutionStateRequest
- type GetExecutionStateResponse
- type InMemoryClient
- func (c *InMemoryClient) Checkpoint(_ context.Context, input CheckpointRequest) (CheckpointResponse, error)
- func (c *InMemoryClient) CompleteCallback(callbackID string, payload string)
- func (c *InMemoryClient) FailCallback(callbackID string, errObj *ErrorObject, timedOut bool)
- func (c *InMemoryClient) GetExecutionState(_ context.Context, _ GetExecutionStateRequest) (GetExecutionStateResponse, error)
- func (c *InMemoryClient) SetOperation(op Operation)
- type InvocationInput
- type InvocationOutput
- type InvocationStatus
- type InvokeConfig
- type JSONSerdes
- type Logger
- type MapConfig
- type MapFunc
- type NamedParallelBranch
- type NopLogger
- type Operation
- type OperationAction
- type OperationLifecycleState
- type OperationMetadata
- type OperationStatus
- type OperationSubType
- type OperationType
- type OperationUpdate
- type ParallelConfig
- type ParallelFunc
- type PassThroughSerdes
- type RetryDecision
- type RetryStrategy
- type Serdes
- type SerdesContext
- type StepConfig
- type StepContext
- type StepDetails
- type StepFunc
- type StepOptions
- type StepSemantics
- type TerminationDetails
- type TerminationManager
- type TerminationReason
- type UnrecoverableError
- type WaitDetails
- type WaitForCallbackConfig
- type WaitForCallbackContext
- type WaitForCallbackSubmitterFunc
- type WaitForConditionCheckFunc
- type WaitForConditionConfig
- type WaitForConditionContext
- type WaitForConditionDecision
- type WaitForConditionWaitStrategyFunc
- type WaitOptions
- type WrappedHandler
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ClassifyCheckpointError ¶
ClassifyCheckpointError classifies backend checkpoint errors into unrecoverable invocation or execution errors.
func DurableOperationErrorFromErrorObject ¶
func DurableOperationErrorFromErrorObject(eo *ErrorObject) error
DurableOperationErrorFromErrorObject converts a checkpointed ErrorObject into a Go error.
func InitializeExecutionContext ¶
func InitializeExecutionContext(ctx context.Context, input InvocationInput, client DurableExecutionClient, requestID, tenantID string) (*ExecutionContext, DurableExecutionMode, string, error)
InitializeExecutionContext constructs an ExecutionContext from InvocationInput and loads any additional paginated backend state.
func IsUnrecoverableError ¶
IsUnrecoverableError reports whether err implements UnrecoverableError.
func IsUnrecoverableInvocationError ¶
IsUnrecoverableInvocationError reports whether err is unrecoverable at the invocation level.
func NewCallbackError ¶
NewCallbackError creates an error for a failed callback operation.
func NewCallbackSubmitterError ¶
NewCallbackSubmitterError creates an error for a failed WaitForCallback submitter step.
func NewCallbackTimeoutError ¶
NewCallbackTimeoutError creates an error for a timed-out callback.
func NewChildContextError ¶
NewChildContextError creates an error for a failed child context.
func NewInvokeError ¶
NewInvokeError creates an error for a failed chained invocation.
func NewStepError ¶
NewStepError creates an error for a failed Step operation.
func NewUnrecoverableExecutionError ¶
func NewUnrecoverableExecutionError(reason TerminationReason, msg string, cause error) error
NewUnrecoverableExecutionError creates an unrecoverable error scoped to the whole durable execution.
func NewUnrecoverableInvocationError ¶
func NewUnrecoverableInvocationError(reason TerminationReason, msg string, cause error) error
NewUnrecoverableInvocationError creates an unrecoverable error scoped to the current invocation.
func NewWaitForConditionError ¶
NewWaitForConditionError creates an error for a failed WaitForCondition.
func SafeDeserialize ¶
func SafeDeserialize(serdes Serdes, data string, stepID, stepName string, tm *TerminationManager, durableExecutionArn string) (any, error)
SafeDeserialize deserializes data and terminates the invocation on deserialization failure.
func SafeSerialize ¶
func SafeSerialize(serdes Serdes, value any, stepID, stepName string, tm *TerminationManager, durableExecutionArn string) (string, error)
SafeSerialize serializes value and terminates the invocation on serialization failure.
func ValidateContextUsage ¶
func ValidateContextUsage(ctx context.Context, operationContextID, operationName string, tm *TerminationManager)
ValidateContextUsage terminates the invocation when a durable operation is called with a context from a different DurableContext.
func ValidateReplayConsistency ¶
func ValidateReplayConsistency(stepID string, current operationDescriptor, checkpointData *Operation, execCtx *ExecutionContext) error
ValidateReplayConsistency verifies that the current durable operation matches the checkpointed operation at the same call position.
func WithActiveOperation ¶
func WithActiveOperation(ctx context.Context, op ActiveOperation) context.Context
WithActiveOperation stores active durable operation metadata in ctx.
Types ¶
type APIError ¶
APIError is a normalized backend API error used for checkpoint error classification.
type AWSDurableExecutionClient ¶
type AWSDurableExecutionClient struct {
// contains filtered or unexported fields
}
AWSDurableExecutionClient implements DurableExecutionClient with the AWS Lambda Durable Execution APIs.
func NewAWSDurableExecutionClient ¶
func NewAWSDurableExecutionClient(client *lambdasdk.Client) *AWSDurableExecutionClient
NewAWSDurableExecutionClient wraps an existing AWS Lambda client.
func NewDefaultAWSDurableExecutionClient ¶
func NewDefaultAWSDurableExecutionClient(ctx context.Context, optFns ...func(*awsconfig.LoadOptions) error) (*AWSDurableExecutionClient, error)
NewDefaultAWSDurableExecutionClient loads the default AWS configuration and creates an AWSDurableExecutionClient.
func (*AWSDurableExecutionClient) Checkpoint ¶
func (c *AWSDurableExecutionClient) Checkpoint(ctx context.Context, input CheckpointRequest) (CheckpointResponse, error)
Checkpoint persists durable operation updates through AWS Lambda.
func (*AWSDurableExecutionClient) GetExecutionState ¶
func (c *AWSDurableExecutionClient) GetExecutionState(ctx context.Context, input GetExecutionStateRequest) (GetExecutionStateResponse, error)
GetExecutionState loads durable execution state from AWS Lambda.
type ActiveOperation ¶
type ActiveOperation struct {
ContextID string
ParentID string
Attempt int
Mode DurableExecutionMode
}
ActiveOperation identifies the durable context currently associated with a context.Context.
func GetActiveOperation ¶
func GetActiveOperation(ctx context.Context) (ActiveOperation, bool)
GetActiveOperation returns active durable operation metadata from ctx.
type BatchCompletionReason ¶
type BatchCompletionReason string
BatchCompletionReason explains why a batch operation stopped scheduling or completed.
const ( // BatchCompletionReasonAllCompleted means all scheduled items completed. BatchCompletionReasonAllCompleted BatchCompletionReason = "ALL_COMPLETED" // BatchCompletionReasonMinSuccessfulReached means MinSuccessful was reached. BatchCompletionReasonMinSuccessfulReached BatchCompletionReason = "MIN_SUCCESSFUL_REACHED" // BatchCompletionReasonFailureToleranceExceeded means failure tolerance was // exceeded. BatchCompletionReasonFailureToleranceExceeded BatchCompletionReason = "FAILURE_TOLERANCE_EXCEEDED" )
type BatchItem ¶
type BatchItem struct {
Result any `json:"result,omitempty"`
Error *ErrorObject `json:"error,omitempty"`
Index int `json:"index"`
Status BatchItemStatus `json:"status"`
}
BatchItem is one item result in a BatchResult.
type BatchItemStatus ¶
type BatchItemStatus string
BatchItemStatus is the status of one Map, Parallel, or concurrent item.
const ( // BatchItemStatusSucceeded means the item completed successfully. BatchItemStatusSucceeded BatchItemStatus = "SUCCEEDED" // BatchItemStatusFailed means the item completed with an error. BatchItemStatusFailed BatchItemStatus = "FAILED" // BatchItemStatusStarted means the item was started but not completed before // scheduling stopped. BatchItemStatusStarted BatchItemStatus = "STARTED" )
type BatchResult ¶
type BatchResult struct {
All []BatchItem `json:"all"`
CompletionReason BatchCompletionReason `json:"completionReason"`
}
BatchResult is returned by Map, Parallel, and ExecuteConcurrently.
func (*BatchResult) Errors ¶
func (r *BatchResult) Errors() []error
Errors returns errors for failed items.
func (*BatchResult) Failed ¶
func (r *BatchResult) Failed() []BatchItem
Failed returns all failed items.
func (*BatchResult) FailureCount ¶
func (r *BatchResult) FailureCount() int
FailureCount returns the number of failed items.
func (*BatchResult) HasFailure ¶
func (r *BatchResult) HasFailure() bool
HasFailure reports whether any item failed.
func (*BatchResult) Results ¶
func (r *BatchResult) Results() []any
Results returns successful item results in batch order.
func (*BatchResult) Started ¶
func (r *BatchResult) Started() []BatchItem
Started returns all items that were started but did not complete.
func (*BatchResult) StartedCount ¶
func (r *BatchResult) StartedCount() int
StartedCount returns the number of started but incomplete items.
func (*BatchResult) Status ¶
func (r *BatchResult) Status() BatchItemStatus
Status returns BatchItemStatusFailed if any item failed, otherwise BatchItemStatusSucceeded.
func (*BatchResult) Succeeded ¶
func (r *BatchResult) Succeeded() []BatchItem
Succeeded returns all successful items.
func (*BatchResult) SuccessCount ¶
func (r *BatchResult) SuccessCount() int
SuccessCount returns the number of successful items.
func (*BatchResult) ThrowIfError ¶
func (r *BatchResult) ThrowIfError() error
ThrowIfError returns the first failed item error, if any.
func (*BatchResult) TotalCount ¶
func (r *BatchResult) TotalCount() int
TotalCount returns the number of items represented in the result.
type BatchResultSerdes ¶
type BatchResultSerdes struct{}
BatchResultSerdes serializes BatchResult values for Map, Parallel, and ExecuteConcurrently.
func (BatchResultSerdes) Deserialize ¶
func (BatchResultSerdes) Deserialize(data string, _ SerdesContext) (any, error)
Deserialize converts JSON into a *BatchResult.
func (BatchResultSerdes) Serialize ¶
func (BatchResultSerdes) Serialize(value any, _ SerdesContext) (string, error)
Serialize converts a BatchResult or compatible value to JSON.
type CallbackDetails ¶
type CallbackDetails struct {
CallbackID string `json:"CallbackId,omitempty"`
Result string `json:"Result,omitempty"`
Error *ErrorObject `json:"Error,omitempty"`
ScheduledTimeoutTimestamp *time.Time `json:"ScheduledTimeoutTimestamp,omitempty"`
}
CallbackDetails contains persisted state for a callback operation.
type CallbackOptions ¶
type CallbackOptions struct {
TimeoutSeconds int `json:"TimeoutSeconds,omitempty"`
HeartbeatTimeoutSeconds int `json:"HeartbeatTimeoutSeconds,omitempty"`
}
CallbackOptions are checkpoint options for callbacks.
type ChainedInvokeDetails ¶
type ChainedInvokeDetails struct {
Result string `json:"Result,omitempty"`
Error *ErrorObject `json:"Error,omitempty"`
}
ChainedInvokeDetails contains persisted state for a chained invocation.
type ChainedInvokeOptions ¶
type ChainedInvokeOptions struct {
FunctionName string `json:"FunctionName,omitempty"`
}
ChainedInvokeOptions are checkpoint options for chained invocation.
type CheckpointManager ¶
type CheckpointManager struct {
// contains filtered or unexported fields
}
CheckpointManager batches checkpoint writes and tracks local operation lifecycle state for one invocation.
It is part of the SDK runtime; most applications should not need to use it directly.
func NewCheckpointManager ¶
func NewCheckpointManager( durableExecutionArn string, stepData map[string]Operation, stepDataMu *sync.RWMutex, client DurableExecutionClient, termination *TerminationManager, checkpointToken string, logger Logger, ) *CheckpointManager
NewCheckpointManager creates a CheckpointManager for an execution.
func (*CheckpointManager) Checkpoint ¶
func (m *CheckpointManager) Checkpoint(ctx context.Context, stepID string, update OperationUpdate) error
Checkpoint enqueues one operation update and waits until it has been flushed to the backend.
func (*CheckpointManager) ForceCheckpoint ¶
func (m *CheckpointManager) ForceCheckpoint(ctx context.Context) error
ForceCheckpoint flushes pending backend state without adding an operation update.
func (*CheckpointManager) GetOperationState ¶
func (m *CheckpointManager) GetOperationState(stepID string) OperationLifecycleState
GetOperationState returns the local lifecycle state for stepID.
func (*CheckpointManager) MarkAncestorFinished ¶
func (m *CheckpointManager) MarkAncestorFinished(stepID string)
MarkAncestorFinished records that a child-context ancestor has completed.
func (*CheckpointManager) MarkOperationAwaited ¶
func (m *CheckpointManager) MarkOperationAwaited(stepID string)
MarkOperationAwaited records that a future for stepID has been awaited.
func (*CheckpointManager) MarkOperationState ¶
func (m *CheckpointManager) MarkOperationState(stepID string, state OperationLifecycleState, metadata OperationMetadata, endTimestamp *time.Time)
MarkOperationState updates local lifecycle state for a durable operation.
func (*CheckpointManager) SetTerminating ¶
func (m *CheckpointManager) SetTerminating()
SetTerminating prevents new checkpoint work from being enqueued.
func (*CheckpointManager) WaitForQueueCompletion ¶
func (m *CheckpointManager) WaitForQueueCompletion(ctx context.Context) error
WaitForQueueCompletion waits until all queued checkpoint writes complete.
func (*CheckpointManager) WaitForRetryTimer ¶
func (m *CheckpointManager) WaitForRetryTimer(ctx context.Context, stepID string) error
WaitForRetryTimer waits for a retry timer and then polls for operation completion.
func (*CheckpointManager) WaitForStatusChange ¶
func (m *CheckpointManager) WaitForStatusChange(ctx context.Context, stepID string) error
WaitForStatusChange polls backend state until stepID reaches a terminal status or the context is canceled.
type CheckpointRequest ¶
type CheckpointRequest struct {
DurableExecutionArn string
CheckpointToken string
Updates []OperationUpdate
}
CheckpointRequest persists operation updates to a durable backend.
type CheckpointResponse ¶
type CheckpointResponse struct {
CheckpointToken string
NewExecutionState *ExecutionState
}
CheckpointResponse is returned after checkpoint persistence.
type ChildConfig ¶
type ChildConfig struct {
// Serdes serializes the child context result.
Serdes Serdes
// SubType identifies the child context operation subtype in checkpoints.
SubType OperationSubType
// SummaryGenerator optionally stores a compact summary instead of the full
// result when the child context needs replay-child reconstruction.
SummaryGenerator func(value any) string
// ErrorMapper can rewrite child errors before they are returned.
ErrorMapper func(error) error
}
ChildConfig configures DurableContext.RunInChildContext.
type ChildFunc ¶
type ChildFunc func(ctx context.Context, child *DurableContext) (any, error)
ChildFunc is executed inside a child DurableContext.
type CompletionConfig ¶
type CompletionConfig struct {
MinSuccessful *int
ToleratedFailureCount *int
ToleratedFailurePercentage *float64
}
CompletionConfig controls when a batch-style operation can stop early.
type ConcurrencyConfig ¶
type ConcurrencyConfig struct {
MaxConcurrency int
TopLevelSubType OperationSubType
IterationSubType OperationSubType
SummaryGenerator func(result *BatchResult) string
Serdes Serdes
ItemSerdes Serdes
CompletionConfig *CompletionConfig
}
ConcurrencyConfig configures DurableContext.ExecuteConcurrently.
type ConcurrentExecutionItem ¶
type ConcurrentExecutionItem struct {
ID string `json:"id"`
Data any `json:"data,omitempty"`
Index int `json:"index"`
Name string `json:"name,omitempty"`
}
ConcurrentExecutionItem describes one item for ExecuteConcurrently.
type ConcurrentExecutor ¶
type ConcurrentExecutor func(item ConcurrentExecutionItem, childContext *DurableContext) (any, error)
ConcurrentExecutor processes one ExecuteConcurrently item.
type ContextDetails ¶
type ContextDetails struct {
Result string `json:"Result,omitempty"`
ReplayChildren bool `json:"ReplayChildren,omitempty"`
Error *ErrorObject `json:"Error,omitempty"`
}
ContextDetails contains persisted state for a child context operation.
type ContextOptions ¶
type ContextOptions struct {
ReplayChildren bool `json:"ReplayChildren,omitempty"`
}
ContextOptions are checkpoint options for child contexts.
type CreateCallbackConfig ¶
CreateCallbackConfig configures DurableContext.CreateCallback.
type CreateCallbackResult ¶
CreateCallbackResult is returned by DurableContext.CreateCallback.
type DurableContext ¶
type DurableContext struct {
// contains filtered or unexported fields
}
DurableContext schedules durable workflow operations for one handler invocation.
DurableContext is passed to a DurableExecutionHandler. Its methods create deterministic operation IDs from call order, so durable operations must be called in the same order on every replay.
func NewDurableContext ¶
func NewDurableContext(execCtx *ExecutionContext, mode DurableExecutionMode, logger Logger, stepPrefix, parentID string, checkpoint *CheckpointManager) *DurableContext
NewDurableContext constructs a DurableContext for an execution context.
Most applications should use WithDurableExecution instead of constructing a DurableContext directly.
func (*DurableContext) CreateCallback ¶
func (c *DurableContext) CreateCallback(ctx context.Context, name string, cfg *CreateCallbackConfig) *Future[CreateCallbackResult]
CreateCallback creates a durable callback operation and returns its callback ID plus a promise for the eventual callback result.
The callback ID is intended to be given to an external system. The returned promise completes when the durable backend records callback success, failure, or timeout.
func (*DurableContext) ExecuteConcurrently ¶
func (c *DurableContext) ExecuteConcurrently( ctx context.Context, name string, items []ConcurrentExecutionItem, executor ConcurrentExecutor, cfg *ConcurrencyConfig, ) *Future[*BatchResult]
ExecuteConcurrently executes durable child contexts for items with optional concurrency and completion controls.
It is the lower-level primitive behind Map and Parallel. Most callers should prefer those typed helpers unless they need custom ConcurrentExecutionItem metadata or operation subtypes.
func (*DurableContext) Invoke ¶
func (c *DurableContext) Invoke(ctx context.Context, name string, functionName string, input any, cfg *InvokeConfig) *Future[any]
Invoke schedules a chained Lambda invocation and returns a future for its result.
functionName is passed through to the durable backend. input is serialized with cfg.PayloadSerdes or JSONSerdes, and successful results are deserialized with cfg.ResultSerdes or JSONSerdes.
func (*DurableContext) Map ¶
func (c *DurableContext) Map( ctx context.Context, name string, items []any, mapFunc MapFunc, cfg *MapConfig, ) *Future[*BatchResult]
Map applies mapFunc to each item in a durable child context and returns a BatchResult.
Each item receives a stable index and a child DurableContext. cfg can limit concurrency, customize item names, and stop scheduling after completion criteria are met.
Example ¶
package main
import (
"context"
"fmt"
"github.com/kurochan/aws-durable-execution-go/durable"
)
func exampleInput(arn, token, payload string) durable.InvocationInput {
return durable.InvocationInput{
DurableExecutionArn: arn,
CheckpointToken: token,
InitialExecutionState: durable.ExecutionState{
Operations: []durable.Operation{{
ID: "execution-root",
Type: durable.OperationTypeExecution,
Status: durable.OperationStatusStarted,
ExecutionDetails: &durable.ExecutionDetails{
InputPayload: payload,
},
}},
},
}
}
func main() {
client := durable.NewInMemoryClient()
wrapped := durable.WithDurableExecution(
func(ctx context.Context, _ any, dctx *durable.DurableContext) (any, error) {
result, err := dctx.Map(ctx, "double", []any{1, 2, 3},
func(_ *durable.DurableContext, item any, _ int, _ []any) (any, error) {
return item.(int) * 2, nil
},
&durable.MapConfig{MaxConcurrency: 2},
).Await(ctx)
if err != nil {
return nil, err
}
return map[string]any{
"total": result.TotalCount(),
"success": result.SuccessCount(),
"values": result.Results(),
}, nil
},
durable.DurableExecutionConfig{Client: client},
)
out, err := wrapped(context.Background(), exampleInput(
"arn:test:execution:map-example",
"token-map-example",
`{}`,
))
if err != nil {
panic(err)
}
fmt.Println(out.Status)
fmt.Println(out.Result)
}
Output: SUCCEEDED {"success":3,"total":3,"values":[2,4,6]}
func (*DurableContext) Parallel ¶
func (c *DurableContext) Parallel( ctx context.Context, name string, branches []NamedParallelBranch, cfg *ParallelConfig, ) *Future[*BatchResult]
Parallel executes named branches in durable child contexts and returns a BatchResult.
Branch order is part of replay behavior. Keep the branch slice stable across replays.
func (*DurableContext) RunInChildContext ¶
func (c *DurableContext) RunInChildContext(ctx context.Context, name string, fn ChildFunc, cfg *ChildConfig) *Future[any]
RunInChildContext executes fn with a child DurableContext and checkpoints the child context as one durable operation.
Child contexts are useful for grouping nested operations and for Map and Parallel internals. name and cfg.SubType are part of replay validation.
func (*DurableContext) Step ¶
func (c *DurableContext) Step(ctx context.Context, name string, fn StepFunc, cfg *StepConfig) *Future[any]
Step runs fn as a durable step and returns a future for its result.
On replay, a completed step returns its checkpointed result without running fn again. Failed steps are retried according to cfg.RetryStrategy. Step names are part of replay validation and should be stable.
func (*DurableContext) Wait ¶
func (c *DurableContext) Wait(ctx context.Context, name string, duration Duration) *Future[struct{}]
Wait schedules a durable timer and returns a future that completes after the backend marks the wait operation as succeeded.
Non-positive durations are normalized to one second. While the timer is pending, WithDurableExecution returns InvocationStatusPending.
Example ¶
package main
import (
"context"
"fmt"
"github.com/kurochan/aws-durable-execution-go/durable"
)
func exampleInput(arn, token, payload string) durable.InvocationInput {
return durable.InvocationInput{
DurableExecutionArn: arn,
CheckpointToken: token,
InitialExecutionState: durable.ExecutionState{
Operations: []durable.Operation{{
ID: "execution-root",
Type: durable.OperationTypeExecution,
Status: durable.OperationStatusStarted,
ExecutionDetails: &durable.ExecutionDetails{
InputPayload: payload,
},
}},
},
}
}
func main() {
client := durable.NewInMemoryClient()
wrapped := durable.WithDurableExecution(
func(ctx context.Context, _ any, dctx *durable.DurableContext) (any, error) {
if _, err := dctx.Wait(ctx, "cooldown", durable.Duration{Seconds: 30}).Await(ctx); err != nil {
return nil, err
}
return "done", nil
},
durable.DurableExecutionConfig{Client: client},
)
out, err := wrapped(context.Background(), exampleInput(
"arn:test:execution:wait-example",
"token-wait-example",
`{}`,
))
if err != nil {
panic(err)
}
fmt.Println(out.Status)
}
Output: PENDING
func (*DurableContext) WaitForCallback ¶
func (c *DurableContext) WaitForCallback(ctx context.Context, name string, submitter WaitForCallbackSubmitterFunc, cfg *WaitForCallbackConfig) *Future[any]
WaitForCallback creates a callback, runs submitter in a durable step, and waits for the callback result.
submitter receives the callback ID and should send it to the external system that will complete the callback. submitter failures follow cfg.RetryStrategy when provided.
func (*DurableContext) WaitForCondition ¶
func (c *DurableContext) WaitForCondition(ctx context.Context, name string, check WaitForConditionCheckFunc, cfg *WaitForConditionConfig) *Future[any]
WaitForCondition repeatedly runs check until cfg.WaitStrategy decides to stop waiting.
The state returned by check is checkpointed between attempts. When the wait strategy returns ShouldContinue, the SDK schedules a retry delay and the invocation becomes pending until the backend resumes it.
type DurableExecutionClient ¶
type DurableExecutionClient interface {
GetExecutionState(ctx context.Context, input GetExecutionStateRequest) (GetExecutionStateResponse, error)
Checkpoint(ctx context.Context, input CheckpointRequest) (CheckpointResponse, error)
}
DurableExecutionClient is the backend contract used by the SDK.
Implement this interface to plug in a production backend or use NewInMemoryClient for tests and examples.
type DurableExecutionConfig ¶
type DurableExecutionConfig struct {
// Client persists and loads durable execution state.
Client DurableExecutionClient
// Logger receives SDK log messages. NopLogger is used when Logger is nil.
Logger Logger
// RequestID is optional metadata for the current platform invocation.
RequestID string
// TenantID is optional metadata for multi-tenant deployments.
TenantID string
}
DurableExecutionConfig configures WithDurableExecution.
type DurableExecutionHandler ¶
type DurableExecutionHandler func(ctx context.Context, event any, dctx *DurableContext) (any, error)
DurableExecutionHandler is the user handler executed inside a durable invocation.
event is the customer payload extracted from the execution operation in the invocation input. dctx is used to schedule durable operations. The handler should call Await on futures whose results are required before returning.
type DurableExecutionMode ¶
type DurableExecutionMode string
DurableExecutionMode describes how a DurableContext should interpret checkpoint state for the current invocation.
const ( // ExecutionMode schedules new durable operations. ExecutionMode DurableExecutionMode = "ExecutionMode" // ReplayMode reads existing operation results before switching to execution. ReplayMode DurableExecutionMode = "ReplayMode" // ReplaySucceededContext replays children of a context whose full result was // replaced by a compact summary. ReplaySucceededContext DurableExecutionMode = "ReplaySucceededContext" )
type DurableOperationError ¶
DurableOperationError is the common error type used for durable operation failures that can be serialized into ErrorObject.
func (*DurableOperationError) Error ¶
func (e *DurableOperationError) Error() string
Error returns the durable operation error message.
func (*DurableOperationError) ToErrorObject ¶
func (e *DurableOperationError) ToErrorObject() *ErrorObject
ToErrorObject converts e into a serializable ErrorObject.
func (*DurableOperationError) Unwrap ¶
func (e *DurableOperationError) Unwrap() error
Unwrap returns the underlying cause.
type Duration ¶
type Duration struct {
Days int `json:"days,omitempty"`
Hours int `json:"hours,omitempty"`
Minutes int `json:"minutes,omitempty"`
Seconds int `json:"seconds,omitempty"`
}
Duration is a serializable duration used by durable timers and retry delays.
func (Duration) ToDuration ¶
ToDuration converts d to a time.Duration.
type ErrorObject ¶
type ErrorObject struct {
ErrorType string `json:"ErrorType,omitempty"`
ErrorMessage string `json:"ErrorMessage,omitempty"`
ErrorData string `json:"ErrorData,omitempty"`
StackTrace []string `json:"StackTrace,omitempty"`
}
ErrorObject is the serialized durable error shape stored in checkpoints and invocation outputs.
func CreateErrorObjectFromError ¶
func CreateErrorObjectFromError(err error, data string) *ErrorObject
CreateErrorObjectFromError converts err into the serialized ErrorObject representation used in invocation outputs and checkpoints.
type ExecutionContext ¶
type ExecutionContext struct {
// contains filtered or unexported fields
}
ExecutionContext holds durable execution state for one wrapped handler invocation.
Most applications interact with DurableContext instead of ExecutionContext.
func (*ExecutionContext) DurableExecutionArn ¶
func (e *ExecutionContext) DurableExecutionArn() string
DurableExecutionArn returns the durable execution ARN for this invocation.
func (*ExecutionContext) DurableExecutionClient ¶
func (e *ExecutionContext) DurableExecutionClient() DurableExecutionClient
DurableExecutionClient returns the backend client for this execution.
func (*ExecutionContext) GetStepData ¶
func (e *ExecutionContext) GetStepData(stepID string) *Operation
GetStepData returns checkpoint data for a durable operation ID.
stepID is the SDK-generated unhashed operation ID. The lookup hashes it to match backend operation IDs.
func (*ExecutionContext) TerminationManager ¶
func (e *ExecutionContext) TerminationManager() *TerminationManager
TerminationManager returns the invocation termination manager.
type ExecutionDetails ¶
type ExecutionDetails struct {
InputPayload string `json:"InputPayload,omitempty"`
}
ExecutionDetails contains metadata for the top-level execution operation.
type ExecutionState ¶
type ExecutionState struct {
Operations []Operation `json:"Operations"`
NextMarker string `json:"NextMarker,omitempty"`
}
ExecutionState is the checkpoint state included in an invocation input or returned by a durable backend.
type Future ¶
type Future[T any] struct { // contains filtered or unexported fields }
Future represents a durable operation whose result is produced when Await is called.
DurableContext methods return futures so the SDK can checkpoint an operation before the caller decides to wait for it. A Future starts at most once.
func NewFuture ¶
NewFuture creates a Future backed by exec.
exec is invoked once, lazily, by the first Await call.
func NewNeverFuture ¶
NewNeverFuture returns a Future that only completes when its context is canceled.
It is used during replay paths where a previously succeeded child context should not execute operations that were not part of the checkpointed result.
func NewRejectedFuture ¶
NewRejectedFuture returns a Future that fails with err.
func NewResolvedFuture ¶
NewResolvedFuture returns a Future that resolves to value.
type GetExecutionStateRequest ¶
type GetExecutionStateRequest struct {
DurableExecutionArn string
CheckpointToken string
Marker string
MaxItems int
}
GetExecutionStateRequest requests checkpoint state from a durable backend.
type GetExecutionStateResponse ¶
GetExecutionStateResponse contains a page of durable backend operations.
type InMemoryClient ¶
type InMemoryClient struct {
// contains filtered or unexported fields
}
InMemoryClient is a local DurableExecutionClient implementation for tests and local execution.
func NewInMemoryClient ¶
func NewInMemoryClient() *InMemoryClient
NewInMemoryClient creates an empty in-memory durable backend.
func (*InMemoryClient) Checkpoint ¶
func (c *InMemoryClient) Checkpoint(_ context.Context, input CheckpointRequest) (CheckpointResponse, error)
Checkpoint applies operation updates to the in-memory store.
func (*InMemoryClient) CompleteCallback ¶
func (c *InMemoryClient) CompleteCallback(callbackID string, payload string)
CompleteCallback marks an in-memory callback as succeeded.
func (*InMemoryClient) FailCallback ¶
func (c *InMemoryClient) FailCallback(callbackID string, errObj *ErrorObject, timedOut bool)
FailCallback marks an in-memory callback as failed or timed out.
func (*InMemoryClient) GetExecutionState ¶
func (c *InMemoryClient) GetExecutionState(_ context.Context, _ GetExecutionStateRequest) (GetExecutionStateResponse, error)
GetExecutionState returns all in-memory operations.
func (*InMemoryClient) SetOperation ¶
func (c *InMemoryClient) SetOperation(op Operation)
SetOperation inserts or replaces an operation in the in-memory store.
type InvocationInput ¶
type InvocationInput struct {
DurableExecutionArn string `json:"DurableExecutionArn"`
CheckpointToken string `json:"CheckpointToken"`
InitialExecutionState ExecutionState `json:"InitialExecutionState"`
}
InvocationInput is the payload consumed by a WrappedHandler.
func ParseInvocationInput ¶
func ParseInvocationInput(raw []byte) (InvocationInput, error)
ParseInvocationInput parses invocation payload from durable backend. The backend may encode timestamp fields as either ISO-8601 strings or epoch numbers.
type InvocationOutput ¶
type InvocationOutput struct {
Status InvocationStatus `json:"Status"`
Result string `json:"Result,omitempty"`
Error *ErrorObject `json:"Error,omitempty"`
}
InvocationOutput is returned by a wrapped durable handler.
type InvocationStatus ¶
type InvocationStatus string
InvocationStatus is the top-level status returned from a wrapped durable handler invocation.
const ( // InvocationStatusSucceeded means the handler completed and Result contains // the serialized handler return value when present. InvocationStatusSucceeded InvocationStatus = "SUCCEEDED" // InvocationStatusFailed means the handler completed with a business error. InvocationStatusFailed InvocationStatus = "FAILED" // InvocationStatusPending means execution paused on a durable wait, callback, // retry, invoke, or child operation and should be resumed later. InvocationStatusPending InvocationStatus = "PENDING" )
type InvokeConfig ¶
type InvokeConfig struct {
// PayloadSerdes serializes the invocation payload.
PayloadSerdes Serdes
// ResultSerdes deserializes the invoked function result.
ResultSerdes Serdes
}
InvokeConfig configures DurableContext.Invoke.
type JSONSerdes ¶
type JSONSerdes struct{}
JSONSerdes serializes values with encoding/json.
func (JSONSerdes) Deserialize ¶
func (JSONSerdes) Deserialize(data string, _ SerdesContext) (any, error)
Deserialize converts a JSON string to a Go value.
func (JSONSerdes) Serialize ¶
func (JSONSerdes) Serialize(value any, _ SerdesContext) (string, error)
Serialize converts value to a JSON string.
type Logger ¶
type Logger interface {
Debugf(format string, args ...any)
Infof(format string, args ...any)
Warnf(format string, args ...any)
Errorf(format string, args ...any)
}
Logger is the minimal logging interface used by the SDK.
type MapConfig ¶
type MapConfig struct {
MaxConcurrency int
ItemNamer func(item any, index int) string
Serdes Serdes
ItemSerdes Serdes
CompletionConfig *CompletionConfig
}
MapConfig configures DurableContext.Map.
type NamedParallelBranch ¶
type NamedParallelBranch struct {
Name string
Func ParallelFunc
}
NamedParallelBranch names one Parallel branch and provides its function.
type NopLogger ¶
type NopLogger struct{}
NopLogger discards all log messages.
type Operation ¶
type Operation struct {
ID string `json:"Id,omitempty"`
ParentID string `json:"ParentId,omitempty"`
Name string `json:"Name,omitempty"`
Type OperationType `json:"Type,omitempty"`
SubType OperationSubType `json:"SubType,omitempty"`
Status OperationStatus `json:"Status,omitempty"`
ExecutionDetails *ExecutionDetails `json:"ExecutionDetails,omitempty"`
StepDetails *StepDetails `json:"StepDetails,omitempty"`
WaitDetails *WaitDetails `json:"WaitDetails,omitempty"`
CallbackDetails *CallbackDetails `json:"CallbackDetails,omitempty"`
ChainedInvokeDetails *ChainedInvokeDetails `json:"ChainedInvokeDetails,omitempty"`
ContextDetails *ContextDetails `json:"ContextDetails,omitempty"`
StartTimestamp *time.Time `json:"StartTimestamp,omitempty"`
}
Operation is a durable backend operation record.
type OperationAction ¶
type OperationAction string
OperationAction is the checkpoint update action sent to the durable backend.
const ( // OperationActionStart starts an operation. OperationActionStart OperationAction = "START" // OperationActionSucceed marks an operation as succeeded. OperationActionSucceed OperationAction = "SUCCEED" // OperationActionFail marks an operation as failed. OperationActionFail OperationAction = "FAIL" // OperationActionRetry schedules a retry for an operation. OperationActionRetry OperationAction = "RETRY" )
type OperationLifecycleState ¶
type OperationLifecycleState string
OperationLifecycleState is an in-process state tracked while an invocation is running.
const ( // OperationLifecycleExecuting means the operation is running locally. OperationLifecycleExecuting OperationLifecycleState = "EXECUTING" // OperationLifecycleRetryWaiting means the operation is waiting for a retry // timer. OperationLifecycleRetryWaiting OperationLifecycleState = "RETRY_WAITING" // OperationLifecycleIdleNotAwaited means the operation has been scheduled but // its future has not been awaited. OperationLifecycleIdleNotAwaited OperationLifecycleState = "IDLE_NOT_AWAITED" // OperationLifecycleIdleAwaited means the operation has been awaited and is // waiting for external completion. OperationLifecycleIdleAwaited OperationLifecycleState = "IDLE_AWAITED" // OperationLifecycleCompleted means the operation is terminal locally. OperationLifecycleCompleted OperationLifecycleState = "COMPLETED" )
type OperationMetadata ¶
type OperationMetadata struct {
StepID string
Name string
Type OperationType
SubType OperationSubType
ParentID string
}
OperationMetadata identifies an operation being tracked locally.
type OperationStatus ¶
type OperationStatus string
OperationStatus is the status read from durable execution state.
const ( // OperationStatusStarted means the operation has started. OperationStatusStarted OperationStatus = "STARTED" // OperationStatusReady means the operation is ready to run. OperationStatusReady OperationStatus = "READY" // OperationStatusPending means the operation is waiting for a timer, // callback, retry, or external completion. OperationStatusPending OperationStatus = "PENDING" // OperationStatusSucceeded means the operation completed successfully. OperationStatusSucceeded OperationStatus = "SUCCEEDED" // OperationStatusFailed means the operation completed with an error. OperationStatusFailed OperationStatus = "FAILED" // OperationStatusCancelled means the operation was canceled. OperationStatusCancelled OperationStatus = "CANCELLED" // OperationStatusStopped means the operation was stopped. OperationStatusStopped OperationStatus = "STOPPED" // OperationStatusTimedOut means the operation timed out. OperationStatusTimedOut OperationStatus = "TIMED_OUT" )
type OperationSubType ¶
type OperationSubType string
OperationSubType identifies a higher-level SDK primitive within an OperationType.
const ( // OperationSubTypeStep is used by DurableContext.Step. OperationSubTypeStep OperationSubType = "Step" // OperationSubTypeWait is used by DurableContext.Wait. OperationSubTypeWait OperationSubType = "Wait" // OperationSubTypeCallback is used by DurableContext.CreateCallback. OperationSubTypeCallback OperationSubType = "Callback" // OperationSubTypeRunInChild is used by DurableContext.RunInChildContext. OperationSubTypeRunInChild OperationSubType = "RunInChildContext" // OperationSubTypeMap is the top-level Map context subtype. OperationSubTypeMap OperationSubType = "Map" // OperationSubTypeMapIteration is the per-item Map child subtype. OperationSubTypeMapIteration OperationSubType = "MapIteration" // OperationSubTypeParallel is the top-level Parallel context subtype. OperationSubTypeParallel OperationSubType = "Parallel" // OperationSubTypeParallelBranch is the per-branch Parallel child subtype. OperationSubTypeParallelBranch OperationSubType = "ParallelBranch" // OperationSubTypeWaitForCallback is used by DurableContext.WaitForCallback. OperationSubTypeWaitForCallback OperationSubType = "WaitForCallback" // OperationSubTypeWaitForCondition is used by DurableContext.WaitForCondition. OperationSubTypeWaitForCondition OperationSubType = "WaitForCondition" // OperationSubTypeChainedInvoke is used by DurableContext.Invoke. OperationSubTypeChainedInvoke OperationSubType = "ChainedInvoke" )
type OperationType ¶
type OperationType string
OperationType is the durable backend operation category.
const ( // OperationTypeExecution represents the top-level handler execution. OperationTypeExecution OperationType = "EXECUTION" // OperationTypeStep represents a durable step. OperationTypeStep OperationType = "STEP" // OperationTypeWait represents a durable timer. OperationTypeWait OperationType = "WAIT" // OperationTypeCallback represents an external callback wait. OperationTypeCallback OperationType = "CALLBACK" // OperationTypeContext represents a child durable context. OperationTypeContext OperationType = "CONTEXT" // OperationTypeChainedInvoke represents a durable chained Lambda invoke. OperationTypeChainedInvoke OperationType = "CHAINED_INVOKE" )
type OperationUpdate ¶
type OperationUpdate struct {
ID string `json:"Id,omitempty"`
ParentID string `json:"ParentId,omitempty"`
Name string `json:"Name,omitempty"`
Type OperationType `json:"Type,omitempty"`
SubType OperationSubType `json:"SubType,omitempty"`
Action OperationAction `json:"Action,omitempty"`
Payload string `json:"Payload,omitempty"`
Error *ErrorObject `json:"Error,omitempty"`
StepOptions *StepOptions `json:"StepOptions,omitempty"`
WaitOptions *WaitOptions `json:"WaitOptions,omitempty"`
CallbackOptions *CallbackOptions `json:"CallbackOptions,omitempty"`
ChainedInvokeOptions *ChainedInvokeOptions `json:"ChainedInvokeOptions,omitempty"`
ContextOptions *ContextOptions `json:"ContextOptions,omitempty"`
}
OperationUpdate is sent to DurableExecutionClient.Checkpoint to change operation state.
type ParallelConfig ¶
type ParallelConfig struct {
MaxConcurrency int
Serdes Serdes
ItemSerdes Serdes
CompletionConfig *CompletionConfig
}
ParallelConfig configures DurableContext.Parallel.
type ParallelFunc ¶
type ParallelFunc func(context *DurableContext) (any, error)
ParallelFunc executes one branch in DurableContext.Parallel.
type PassThroughSerdes ¶
type PassThroughSerdes struct{}
PassThroughSerdes passes string values through unchanged and JSON-serializes non-string values.
func (PassThroughSerdes) Deserialize ¶
func (PassThroughSerdes) Deserialize(data string, _ SerdesContext) (any, error)
Deserialize returns data unchanged.
func (PassThroughSerdes) Serialize ¶
func (PassThroughSerdes) Serialize(value any, _ SerdesContext) (string, error)
Serialize returns string values unchanged and JSON-serializes other values.
type RetryDecision ¶
RetryDecision is returned by a RetryStrategy.
type RetryStrategy ¶
type RetryStrategy func(err error, attempt int) RetryDecision
RetryStrategy decides whether a failed operation should be retried.
attempt is one-based for the retry attempt being scheduled.
type Serdes ¶
type Serdes interface {
Serialize(value any, ctx SerdesContext) (string, error)
Deserialize(data string, ctx SerdesContext) (any, error)
}
Serdes serializes values into checkpoint payload strings and deserializes them back into Go values.
type SerdesContext ¶
type SerdesContext struct {
// EntityID is the durable operation ID before backend hashing.
EntityID string
// DurableExecutionArn identifies the current durable execution.
DurableExecutionArn string
}
SerdesContext provides operation metadata to serializers.
type StepConfig ¶
type StepConfig struct {
// RetryStrategy controls retry scheduling after a step failure. A default
// exponential backoff strategy is used when nil.
RetryStrategy RetryStrategy
// Semantics controls how interrupted started steps are treated on replay.
Semantics StepSemantics
// Serdes serializes successful step results into checkpoint payloads.
Serdes Serdes
}
StepConfig configures DurableContext.Step.
type StepContext ¶
type StepContext struct {
// Logger receives log messages for the current step.
Logger Logger
}
StepContext contains metadata and helpers passed to a StepFunc.
type StepDetails ¶
type StepDetails struct {
Result string `json:"Result,omitempty"`
Error *ErrorObject `json:"Error,omitempty"`
Attempt int `json:"Attempt,omitempty"`
NextAttemptTimestamp *time.Time `json:"NextAttemptTimestamp,omitempty"`
}
StepDetails contains persisted state for a step-like operation.
type StepFunc ¶
type StepFunc func(ctx context.Context, stepCtx StepContext) (any, error)
StepFunc is the function executed by DurableContext.Step.
Step functions should be safe to retry according to the configured StepSemantics and RetryStrategy.
type StepOptions ¶
type StepOptions struct {
NextAttemptDelaySeconds int `json:"NextAttemptDelaySeconds,omitempty"`
}
StepOptions are checkpoint options for step retry scheduling.
type StepSemantics ¶
type StepSemantics string
StepSemantics controls how Step handles an operation that was started but not completed before the invocation stopped.
const ( // StepSemanticsAtLeastOncePerRetry allows a started step to run again on the // same retry attempt after replay. StepSemanticsAtLeastOncePerRetry StepSemantics = "AT_LEAST_ONCE_PER_RETRY" // StepSemanticsAtMostOncePerRetry treats an interrupted started step as a // failure and schedules the next retry attempt. StepSemanticsAtMostOncePerRetry StepSemantics = "AT_MOST_ONCE_PER_RETRY" )
type TerminationDetails ¶
type TerminationDetails struct {
Reason TerminationReason
Message string
Error error
}
TerminationDetails describes why the current invocation should stop early.
type TerminationManager ¶
type TerminationManager struct {
// contains filtered or unexported fields
}
TerminationManager coordinates early termination across concurrent durable operations in one invocation.
func NewTerminationManager ¶
func NewTerminationManager() *TerminationManager
NewTerminationManager creates an unterminated manager.
func (*TerminationManager) Channel ¶
func (m *TerminationManager) Channel() <-chan TerminationDetails
Channel is closed by sending the first termination details.
func (*TerminationManager) Details ¶
func (m *TerminationManager) Details() (TerminationDetails, bool)
Details returns termination details and whether termination has happened.
func (*TerminationManager) IsTerminated ¶
func (m *TerminationManager) IsTerminated() bool
IsTerminated reports whether Terminate has been called.
func (*TerminationManager) Terminate ¶
func (m *TerminationManager) Terminate(details TerminationDetails)
Terminate records termination details and notifies waiters once.
type TerminationReason ¶
type TerminationReason string
TerminationReason explains why the current invocation stopped before normal handler completion.
const ( // TerminationReasonOperationTerminated means an operation requested // termination of the current invocation. TerminationReasonOperationTerminated TerminationReason = "OPERATION_TERMINATED" // TerminationReasonRetryScheduled means a retry timer was scheduled. TerminationReasonRetryScheduled TerminationReason = "RETRY_SCHEDULED" // TerminationReasonRetryInterruptedStep means an interrupted at-most-once // step scheduled a retry. TerminationReasonRetryInterruptedStep TerminationReason = "RETRY_INTERRUPTED_STEP" // TerminationReasonWaitScheduled means a wait timer was scheduled. TerminationReasonWaitScheduled TerminationReason = "WAIT_SCHEDULED" // TerminationReasonCallbackPending means a callback is waiting externally. TerminationReasonCallbackPending TerminationReason = "CALLBACK_PENDING" // TerminationReasonCheckpointFailed means checkpoint persistence failed. TerminationReasonCheckpointFailed TerminationReason = "CHECKPOINT_FAILED" // TerminationReasonSerdesFailed means serialization or deserialization failed. TerminationReasonSerdesFailed TerminationReason = "SERDES_FAILED" // TerminationReasonContextValidation means a durable operation used the wrong // context. TerminationReasonContextValidation TerminationReason = "CONTEXT_VALIDATION_ERROR" // TerminationReasonCustom is available for user-defined termination. TerminationReasonCustom TerminationReason = "CUSTOM" )
type UnrecoverableError ¶
type UnrecoverableError interface {
error
TerminationReason() TerminationReason
InvocationLevel() bool
ExecutionLevel() bool
}
UnrecoverableError marks an error that should stop the current invocation or execution instead of being converted into InvocationStatusFailed.
type WaitDetails ¶
type WaitDetails struct {
ScheduledEndTimestamp *time.Time `json:"ScheduledEndTimestamp,omitempty"`
}
WaitDetails contains persisted state for a wait operation.
type WaitForCallbackConfig ¶
type WaitForCallbackConfig struct {
Timeout *Duration
HeartbeatTimeout *Duration
RetryStrategy RetryStrategy
Serdes Serdes
}
WaitForCallbackConfig configures DurableContext.WaitForCallback.
type WaitForCallbackContext ¶
type WaitForCallbackContext struct {
Logger Logger
}
WaitForCallbackContext is passed to a WaitForCallback submitter.
type WaitForCallbackSubmitterFunc ¶
type WaitForCallbackSubmitterFunc func(callbackID string, ctx WaitForCallbackContext) error
WaitForCallbackSubmitterFunc sends a callback ID to an external system.
type WaitForConditionCheckFunc ¶
type WaitForConditionCheckFunc func(state any, ctx WaitForConditionContext) (any, error)
WaitForConditionCheckFunc evaluates and returns the next condition state.
type WaitForConditionConfig ¶
type WaitForConditionConfig struct {
WaitStrategy WaitForConditionWaitStrategyFunc
InitialState any
Serdes Serdes
}
WaitForConditionConfig configures DurableContext.WaitForCondition.
type WaitForConditionContext ¶
type WaitForConditionContext struct {
Logger Logger
}
WaitForConditionContext is passed to a WaitForCondition check function.
type WaitForConditionDecision ¶
WaitForConditionDecision is returned by a WaitForCondition wait strategy.
type WaitForConditionWaitStrategyFunc ¶
type WaitForConditionWaitStrategyFunc func(state any, attempt int) WaitForConditionDecision
WaitForConditionWaitStrategyFunc decides whether WaitForCondition should wait and retry or complete with the current state.
type WaitOptions ¶
type WaitOptions struct {
WaitSeconds int `json:"WaitSeconds,omitempty"`
}
WaitOptions are checkpoint options for durable waits.
type WrappedHandler ¶
type WrappedHandler func(ctx context.Context, input InvocationInput) (InvocationOutput, error)
WrappedHandler is the handler shape returned by WithDurableExecution.
func WithDurableExecution ¶
func WithDurableExecution(handler DurableExecutionHandler, config DurableExecutionConfig) WrappedHandler
WithDurableExecution wraps a DurableExecutionHandler with replay, checkpointing, and pending-status handling.
The returned handler consumes InvocationInput from the durable backend. It returns InvocationStatusPending when execution stopped because a durable wait, callback, retry timer, or child operation is still pending. Infrastructure and unrecoverable invocation errors are returned as Go errors.
Example ¶
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/kurochan/aws-durable-execution-go/durable"
)
func exampleInput(arn, token, payload string) durable.InvocationInput {
return durable.InvocationInput{
DurableExecutionArn: arn,
CheckpointToken: token,
InitialExecutionState: durable.ExecutionState{
Operations: []durable.Operation{{
ID: "execution-root",
Type: durable.OperationTypeExecution,
Status: durable.OperationStatusStarted,
ExecutionDetails: &durable.ExecutionDetails{
InputPayload: payload,
},
}},
},
}
}
func main() {
client := durable.NewInMemoryClient()
wrapped := durable.WithDurableExecution(
func(ctx context.Context, event any, dctx *durable.DurableContext) (any, error) {
result, err := dctx.Step(ctx, "build-greeting", func(context.Context, durable.StepContext) (any, error) {
input := event.(map[string]any)
return "hello " + input["name"].(string), nil
}, nil).Await(ctx)
if err != nil {
return nil, err
}
return result, nil
},
durable.DurableExecutionConfig{Client: client},
)
out, err := wrapped(context.Background(), exampleInput(
"arn:test:execution:example",
"token-example",
`{"name":"alice"}`,
))
if err != nil {
panic(err)
}
var result string
if err := json.Unmarshal([]byte(out.Result), &result); err != nil {
panic(err)
}
fmt.Println(out.Status)
fmt.Println(result)
}
Output: SUCCEEDED hello alice
Source Files
¶
- aws_durable_execution_client.go
- batch_result_serdes.go
- callback_condition.go
- checkpoint_manager.go
- concurrent.go
- context_tracker.go
- doc.go
- durable_context.go
- errors.go
- execution_context.go
- future.go
- hash.go
- invocation_input_parser.go
- memory_client.go
- replay_validation.go
- serdes.go
- termination.go
- types.go
- with_durable_execution.go