durable

package
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Package durable provides an experimental Go implementation of AWS Durable Execution style workflow helpers.

The package is centered around WithDurableExecution, which wraps a Lambda style handler and provides a DurableContext. A handler uses DurableContext to record durable operations such as Step, Wait, Invoke, callbacks, child contexts, Map, and Parallel. Each operation returns a Future; call Await to either receive a replayed result from the checkpoint state or wait for the operation to complete.

Durable handlers must be deterministic in their durable operation order. On a replay, the SDK validates that the next operation has the same type, subtype, and name as the checkpointed operation. Non-deterministic changes fail the invocation rather than silently binding results to a different operation.

This repository is an independent experimental implementation and is not an official AWS SDK. Public API and checkpoint payload compatibility may change.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func ClassifyCheckpointError

func ClassifyCheckpointError(err error) error

ClassifyCheckpointError classifies backend checkpoint errors into unrecoverable invocation or execution errors.

func DurableOperationErrorFromErrorObject

func DurableOperationErrorFromErrorObject(eo *ErrorObject) error

DurableOperationErrorFromErrorObject converts a checkpointed ErrorObject into a Go error.

func HashID

func HashID(input string) string

HashID returns the stable short operation ID used for backend checkpoint records.

func InitializeExecutionContext

func InitializeExecutionContext(ctx context.Context, input InvocationInput, client DurableExecutionClient, requestID, tenantID string) (*ExecutionContext, DurableExecutionMode, string, error)

InitializeExecutionContext constructs an ExecutionContext from InvocationInput and loads any additional paginated backend state.

func IsUnrecoverableError

func IsUnrecoverableError(err error) bool

IsUnrecoverableError reports whether err implements UnrecoverableError.

func IsUnrecoverableInvocationError

func IsUnrecoverableInvocationError(err error) bool

IsUnrecoverableInvocationError reports whether err is unrecoverable at the invocation level.

func NewCallbackError

func NewCallbackError(msg string, cause error, data string) error

NewCallbackError creates an error for a failed callback operation.

func NewCallbackSubmitterError

func NewCallbackSubmitterError(msg string, cause error, data string) error

NewCallbackSubmitterError creates an error for a failed WaitForCallback submitter step.

func NewCallbackTimeoutError

func NewCallbackTimeoutError(msg string, cause error, data string) error

NewCallbackTimeoutError creates an error for a timed-out callback.

func NewChildContextError

func NewChildContextError(msg string, cause error, data string) error

NewChildContextError creates an error for a failed child context.

func NewInvokeError

func NewInvokeError(msg string, cause error, data string) error

NewInvokeError creates an error for a failed chained invocation.

func NewStepError

func NewStepError(msg string, cause error, data string) error

NewStepError creates an error for a failed Step operation.

func NewUnrecoverableExecutionError

func NewUnrecoverableExecutionError(reason TerminationReason, msg string, cause error) error

NewUnrecoverableExecutionError creates an unrecoverable error scoped to the whole durable execution.

func NewUnrecoverableInvocationError

func NewUnrecoverableInvocationError(reason TerminationReason, msg string, cause error) error

NewUnrecoverableInvocationError creates an unrecoverable error scoped to the current invocation.

func NewWaitForConditionError

func NewWaitForConditionError(msg string, cause error, data string) error

NewWaitForConditionError creates an error for a failed WaitForCondition.

func SafeDeserialize

func SafeDeserialize(serdes Serdes, data string, stepID, stepName string, tm *TerminationManager, durableExecutionArn string) (any, error)

SafeDeserialize deserializes data and terminates the invocation on deserialization failure.

func SafeSerialize

func SafeSerialize(serdes Serdes, value any, stepID, stepName string, tm *TerminationManager, durableExecutionArn string) (string, error)

SafeSerialize serializes value and terminates the invocation on serialization failure.

func ValidateContextUsage

func ValidateContextUsage(ctx context.Context, operationContextID, operationName string, tm *TerminationManager)

ValidateContextUsage terminates the invocation when a durable operation is called with a context from a different DurableContext.

func ValidateReplayConsistency

func ValidateReplayConsistency(stepID string, current operationDescriptor, checkpointData *Operation, execCtx *ExecutionContext) error

ValidateReplayConsistency verifies that the current durable operation matches the checkpointed operation at the same call position.

func WithActiveOperation

func WithActiveOperation(ctx context.Context, op ActiveOperation) context.Context

WithActiveOperation stores active durable operation metadata in ctx.

Types

type APIError

type APIError struct {
	StatusCode int
	Code       string
	Message    string
}

APIError is a normalized backend API error used for checkpoint error classification.

func (*APIError) Error

func (e *APIError) Error() string

Error returns a formatted backend API error message.

type AWSDurableExecutionClient

type AWSDurableExecutionClient struct {
	// contains filtered or unexported fields
}

AWSDurableExecutionClient implements DurableExecutionClient with the AWS Lambda Durable Execution APIs.

func NewAWSDurableExecutionClient

func NewAWSDurableExecutionClient(client *lambdasdk.Client) *AWSDurableExecutionClient

NewAWSDurableExecutionClient wraps an existing AWS Lambda client.

func NewDefaultAWSDurableExecutionClient

func NewDefaultAWSDurableExecutionClient(ctx context.Context, optFns ...func(*awsconfig.LoadOptions) error) (*AWSDurableExecutionClient, error)

NewDefaultAWSDurableExecutionClient loads the default AWS configuration and creates an AWSDurableExecutionClient.

func (*AWSDurableExecutionClient) Checkpoint

Checkpoint persists durable operation updates through AWS Lambda.

func (*AWSDurableExecutionClient) GetExecutionState

GetExecutionState loads durable execution state from AWS Lambda.

type ActiveOperation

type ActiveOperation struct {
	ContextID string
	ParentID  string
	Attempt   int
	Mode      DurableExecutionMode
}

ActiveOperation identifies the durable context currently associated with a context.Context.

func GetActiveOperation

func GetActiveOperation(ctx context.Context) (ActiveOperation, bool)

GetActiveOperation returns active durable operation metadata from ctx.

type BatchCompletionReason

type BatchCompletionReason string

BatchCompletionReason explains why a batch operation stopped scheduling or completed.

const (
	// BatchCompletionReasonAllCompleted means all scheduled items completed.
	BatchCompletionReasonAllCompleted BatchCompletionReason = "ALL_COMPLETED"
	// BatchCompletionReasonMinSuccessfulReached means MinSuccessful was reached.
	BatchCompletionReasonMinSuccessfulReached BatchCompletionReason = "MIN_SUCCESSFUL_REACHED"
	// BatchCompletionReasonFailureToleranceExceeded means failure tolerance was
	// exceeded.
	BatchCompletionReasonFailureToleranceExceeded BatchCompletionReason = "FAILURE_TOLERANCE_EXCEEDED"
)

type BatchItem

type BatchItem struct {
	Result any             `json:"result,omitempty"`
	Error  *ErrorObject    `json:"error,omitempty"`
	Index  int             `json:"index"`
	Status BatchItemStatus `json:"status"`
}

BatchItem is one item result in a BatchResult.

type BatchItemStatus

type BatchItemStatus string

BatchItemStatus is the status of one Map, Parallel, or concurrent item.

const (
	// BatchItemStatusSucceeded means the item completed successfully.
	BatchItemStatusSucceeded BatchItemStatus = "SUCCEEDED"
	// BatchItemStatusFailed means the item completed with an error.
	BatchItemStatusFailed BatchItemStatus = "FAILED"
	// BatchItemStatusStarted means the item was started but not completed before
	// scheduling stopped.
	BatchItemStatusStarted BatchItemStatus = "STARTED"
)

type BatchResult

type BatchResult struct {
	All              []BatchItem           `json:"all"`
	CompletionReason BatchCompletionReason `json:"completionReason"`
}

BatchResult is returned by Map, Parallel, and ExecuteConcurrently.

func (*BatchResult) Errors

func (r *BatchResult) Errors() []error

Errors returns errors for failed items.

func (*BatchResult) Failed

func (r *BatchResult) Failed() []BatchItem

Failed returns all failed items.

func (*BatchResult) FailureCount

func (r *BatchResult) FailureCount() int

FailureCount returns the number of failed items.

func (*BatchResult) HasFailure

func (r *BatchResult) HasFailure() bool

HasFailure reports whether any item failed.

func (*BatchResult) Results

func (r *BatchResult) Results() []any

Results returns successful item results in batch order.

func (*BatchResult) Started

func (r *BatchResult) Started() []BatchItem

Started returns all items that were started but did not complete.

func (*BatchResult) StartedCount

func (r *BatchResult) StartedCount() int

StartedCount returns the number of started but incomplete items.

func (*BatchResult) Status

func (r *BatchResult) Status() BatchItemStatus

Status returns BatchItemStatusFailed if any item failed, otherwise BatchItemStatusSucceeded.

func (*BatchResult) Succeeded

func (r *BatchResult) Succeeded() []BatchItem

Succeeded returns all successful items.

func (*BatchResult) SuccessCount

func (r *BatchResult) SuccessCount() int

SuccessCount returns the number of successful items.

func (*BatchResult) ThrowIfError

func (r *BatchResult) ThrowIfError() error

ThrowIfError returns the first failed item error, if any.

func (*BatchResult) TotalCount

func (r *BatchResult) TotalCount() int

TotalCount returns the number of items represented in the result.

type BatchResultSerdes

type BatchResultSerdes struct{}

BatchResultSerdes serializes BatchResult values for Map, Parallel, and ExecuteConcurrently.

func (BatchResultSerdes) Deserialize

func (BatchResultSerdes) Deserialize(data string, _ SerdesContext) (any, error)

Deserialize converts JSON into a *BatchResult.

func (BatchResultSerdes) Serialize

func (BatchResultSerdes) Serialize(value any, _ SerdesContext) (string, error)

Serialize converts a BatchResult or compatible value to JSON.

type CallbackDetails

type CallbackDetails struct {
	CallbackID                string       `json:"CallbackId,omitempty"`
	Result                    string       `json:"Result,omitempty"`
	Error                     *ErrorObject `json:"Error,omitempty"`
	ScheduledTimeoutTimestamp *time.Time   `json:"ScheduledTimeoutTimestamp,omitempty"`
}

CallbackDetails contains persisted state for a callback operation.

type CallbackOptions

type CallbackOptions struct {
	TimeoutSeconds          int `json:"TimeoutSeconds,omitempty"`
	HeartbeatTimeoutSeconds int `json:"HeartbeatTimeoutSeconds,omitempty"`
}

CallbackOptions are checkpoint options for callbacks.

type ChainedInvokeDetails

type ChainedInvokeDetails struct {
	Result string       `json:"Result,omitempty"`
	Error  *ErrorObject `json:"Error,omitempty"`
}

ChainedInvokeDetails contains persisted state for a chained invocation.

type ChainedInvokeOptions

type ChainedInvokeOptions struct {
	FunctionName string `json:"FunctionName,omitempty"`
}

ChainedInvokeOptions are checkpoint options for chained invocation.

type CheckpointManager

type CheckpointManager struct {
	// contains filtered or unexported fields
}

CheckpointManager batches checkpoint writes and tracks local operation lifecycle state for one invocation.

It is part of the SDK runtime; most applications should not need to use it directly.

func NewCheckpointManager

func NewCheckpointManager(
	durableExecutionArn string,
	stepData map[string]Operation,
	stepDataMu *sync.RWMutex,
	client DurableExecutionClient,
	termination *TerminationManager,
	checkpointToken string,
	logger Logger,
) *CheckpointManager

NewCheckpointManager creates a CheckpointManager for an execution.

func (*CheckpointManager) Checkpoint

func (m *CheckpointManager) Checkpoint(ctx context.Context, stepID string, update OperationUpdate) error

Checkpoint enqueues one operation update and waits until it has been flushed to the backend.

func (*CheckpointManager) ForceCheckpoint

func (m *CheckpointManager) ForceCheckpoint(ctx context.Context) error

ForceCheckpoint flushes pending backend state without adding an operation update.

func (*CheckpointManager) GetOperationState

func (m *CheckpointManager) GetOperationState(stepID string) OperationLifecycleState

GetOperationState returns the local lifecycle state for stepID.

func (*CheckpointManager) MarkAncestorFinished

func (m *CheckpointManager) MarkAncestorFinished(stepID string)

MarkAncestorFinished records that a child-context ancestor has completed.

func (*CheckpointManager) MarkOperationAwaited

func (m *CheckpointManager) MarkOperationAwaited(stepID string)

MarkOperationAwaited records that a future for stepID has been awaited.

func (*CheckpointManager) MarkOperationState

func (m *CheckpointManager) MarkOperationState(stepID string, state OperationLifecycleState, metadata OperationMetadata, endTimestamp *time.Time)

MarkOperationState updates local lifecycle state for a durable operation.

func (*CheckpointManager) SetTerminating

func (m *CheckpointManager) SetTerminating()

SetTerminating prevents new checkpoint work from being enqueued.

func (*CheckpointManager) WaitForQueueCompletion

func (m *CheckpointManager) WaitForQueueCompletion(ctx context.Context) error

WaitForQueueCompletion waits until all queued checkpoint writes complete.

func (*CheckpointManager) WaitForRetryTimer

func (m *CheckpointManager) WaitForRetryTimer(ctx context.Context, stepID string) error

WaitForRetryTimer waits for a retry timer and then polls for operation completion.

func (*CheckpointManager) WaitForStatusChange

func (m *CheckpointManager) WaitForStatusChange(ctx context.Context, stepID string) error

WaitForStatusChange polls backend state until stepID reaches a terminal status or the context is canceled.

type CheckpointRequest

type CheckpointRequest struct {
	DurableExecutionArn string
	CheckpointToken     string
	Updates             []OperationUpdate
}

CheckpointRequest persists operation updates to a durable backend.

type CheckpointResponse

type CheckpointResponse struct {
	CheckpointToken   string
	NewExecutionState *ExecutionState
}

CheckpointResponse is returned after checkpoint persistence.

type ChildConfig

type ChildConfig struct {
	// Serdes serializes the child context result.
	Serdes Serdes
	// SubType identifies the child context operation subtype in checkpoints.
	SubType OperationSubType
	// SummaryGenerator optionally stores a compact summary instead of the full
	// result when the child context needs replay-child reconstruction.
	SummaryGenerator func(value any) string
	// ErrorMapper can rewrite child errors before they are returned.
	ErrorMapper func(error) error
}

ChildConfig configures DurableContext.RunInChildContext.

type ChildFunc

type ChildFunc func(ctx context.Context, child *DurableContext) (any, error)

ChildFunc is executed inside a child DurableContext.

type CompletionConfig

type CompletionConfig struct {
	MinSuccessful              *int
	ToleratedFailureCount      *int
	ToleratedFailurePercentage *float64
}

CompletionConfig controls when a batch-style operation can stop early.

type ConcurrencyConfig

type ConcurrencyConfig struct {
	MaxConcurrency   int
	TopLevelSubType  OperationSubType
	IterationSubType OperationSubType
	SummaryGenerator func(result *BatchResult) string
	Serdes           Serdes
	ItemSerdes       Serdes
	CompletionConfig *CompletionConfig
}

ConcurrencyConfig configures DurableContext.ExecuteConcurrently.

type ConcurrentExecutionItem

type ConcurrentExecutionItem struct {
	ID    string `json:"id"`
	Data  any    `json:"data,omitempty"`
	Index int    `json:"index"`
	Name  string `json:"name,omitempty"`
}

ConcurrentExecutionItem describes one item for ExecuteConcurrently.

type ConcurrentExecutor

type ConcurrentExecutor func(item ConcurrentExecutionItem, childContext *DurableContext) (any, error)

ConcurrentExecutor processes one ExecuteConcurrently item.

type ContextDetails

type ContextDetails struct {
	Result         string       `json:"Result,omitempty"`
	ReplayChildren bool         `json:"ReplayChildren,omitempty"`
	Error          *ErrorObject `json:"Error,omitempty"`
}

ContextDetails contains persisted state for a child context operation.

type ContextOptions

type ContextOptions struct {
	ReplayChildren bool `json:"ReplayChildren,omitempty"`
}

ContextOptions are checkpoint options for child contexts.

type CreateCallbackConfig

type CreateCallbackConfig struct {
	Timeout          *Duration
	HeartbeatTimeout *Duration
	Serdes           Serdes
}

CreateCallbackConfig configures DurableContext.CreateCallback.

type CreateCallbackResult

type CreateCallbackResult struct {
	Promise    *Future[any]
	CallbackID string
}

CreateCallbackResult is returned by DurableContext.CreateCallback.

type DurableContext

type DurableContext struct {
	// contains filtered or unexported fields
}

DurableContext schedules durable workflow operations for one handler invocation.

DurableContext is passed to a DurableExecutionHandler. Its methods create deterministic operation IDs from call order, so durable operations must be called in the same order on every replay.

func NewDurableContext

func NewDurableContext(execCtx *ExecutionContext, mode DurableExecutionMode, logger Logger, stepPrefix, parentID string, checkpoint *CheckpointManager) *DurableContext

NewDurableContext constructs a DurableContext for an execution context.

Most applications should use WithDurableExecution instead of constructing a DurableContext directly.

func (*DurableContext) CreateCallback

CreateCallback creates a durable callback operation and returns its callback ID plus a promise for the eventual callback result.

The callback ID is intended to be given to an external system. The returned promise completes when the durable backend records callback success, failure, or timeout.

func (*DurableContext) ExecuteConcurrently

func (c *DurableContext) ExecuteConcurrently(
	ctx context.Context,
	name string,
	items []ConcurrentExecutionItem,
	executor ConcurrentExecutor,
	cfg *ConcurrencyConfig,
) *Future[*BatchResult]

ExecuteConcurrently executes durable child contexts for items with optional concurrency and completion controls.

It is the lower-level primitive behind Map and Parallel. Most callers should prefer those typed helpers unless they need custom ConcurrentExecutionItem metadata or operation subtypes.

func (*DurableContext) Invoke

func (c *DurableContext) Invoke(ctx context.Context, name string, functionName string, input any, cfg *InvokeConfig) *Future[any]

Invoke schedules a chained Lambda invocation and returns a future for its result.

functionName is passed through to the durable backend. input is serialized with cfg.PayloadSerdes or JSONSerdes, and successful results are deserialized with cfg.ResultSerdes or JSONSerdes.

func (*DurableContext) Map

func (c *DurableContext) Map(
	ctx context.Context,
	name string,
	items []any,
	mapFunc MapFunc,
	cfg *MapConfig,
) *Future[*BatchResult]

Map applies mapFunc to each item in a durable child context and returns a BatchResult.

Each item receives a stable index and a child DurableContext. cfg can limit concurrency, customize item names, and stop scheduling after completion criteria are met.

Example
package main

import (
	"context"
	"fmt"

	"github.com/kurochan/aws-durable-execution-go/durable"
)

func exampleInput(arn, token, payload string) durable.InvocationInput {
	return durable.InvocationInput{
		DurableExecutionArn: arn,
		CheckpointToken:     token,
		InitialExecutionState: durable.ExecutionState{
			Operations: []durable.Operation{{
				ID:     "execution-root",
				Type:   durable.OperationTypeExecution,
				Status: durable.OperationStatusStarted,
				ExecutionDetails: &durable.ExecutionDetails{
					InputPayload: payload,
				},
			}},
		},
	}
}

func main() {
	client := durable.NewInMemoryClient()
	wrapped := durable.WithDurableExecution(
		func(ctx context.Context, _ any, dctx *durable.DurableContext) (any, error) {
			result, err := dctx.Map(ctx, "double", []any{1, 2, 3},
				func(_ *durable.DurableContext, item any, _ int, _ []any) (any, error) {
					return item.(int) * 2, nil
				},
				&durable.MapConfig{MaxConcurrency: 2},
			).Await(ctx)
			if err != nil {
				return nil, err
			}
			return map[string]any{
				"total":   result.TotalCount(),
				"success": result.SuccessCount(),
				"values":  result.Results(),
			}, nil
		},
		durable.DurableExecutionConfig{Client: client},
	)

	out, err := wrapped(context.Background(), exampleInput(
		"arn:test:execution:map-example",
		"token-map-example",
		`{}`,
	))
	if err != nil {
		panic(err)
	}
	fmt.Println(out.Status)
	fmt.Println(out.Result)

}
Output:
SUCCEEDED
{"success":3,"total":3,"values":[2,4,6]}

func (*DurableContext) Parallel

func (c *DurableContext) Parallel(
	ctx context.Context,
	name string,
	branches []NamedParallelBranch,
	cfg *ParallelConfig,
) *Future[*BatchResult]

Parallel executes named branches in durable child contexts and returns a BatchResult.

Branch order is part of replay behavior. Keep the branch slice stable across replays.

func (*DurableContext) RunInChildContext

func (c *DurableContext) RunInChildContext(ctx context.Context, name string, fn ChildFunc, cfg *ChildConfig) *Future[any]

RunInChildContext executes fn with a child DurableContext and checkpoints the child context as one durable operation.

Child contexts are useful for grouping nested operations and for Map and Parallel internals. name and cfg.SubType are part of replay validation.

func (*DurableContext) Step

func (c *DurableContext) Step(ctx context.Context, name string, fn StepFunc, cfg *StepConfig) *Future[any]

Step runs fn as a durable step and returns a future for its result.

On replay, a completed step returns its checkpointed result without running fn again. Failed steps are retried according to cfg.RetryStrategy. Step names are part of replay validation and should be stable.

func (*DurableContext) Wait

func (c *DurableContext) Wait(ctx context.Context, name string, duration Duration) *Future[struct{}]

Wait schedules a durable timer and returns a future that completes after the backend marks the wait operation as succeeded.

Non-positive durations are normalized to one second. While the timer is pending, WithDurableExecution returns InvocationStatusPending.

Example
package main

import (
	"context"
	"fmt"

	"github.com/kurochan/aws-durable-execution-go/durable"
)

func exampleInput(arn, token, payload string) durable.InvocationInput {
	return durable.InvocationInput{
		DurableExecutionArn: arn,
		CheckpointToken:     token,
		InitialExecutionState: durable.ExecutionState{
			Operations: []durable.Operation{{
				ID:     "execution-root",
				Type:   durable.OperationTypeExecution,
				Status: durable.OperationStatusStarted,
				ExecutionDetails: &durable.ExecutionDetails{
					InputPayload: payload,
				},
			}},
		},
	}
}

func main() {
	client := durable.NewInMemoryClient()
	wrapped := durable.WithDurableExecution(
		func(ctx context.Context, _ any, dctx *durable.DurableContext) (any, error) {
			if _, err := dctx.Wait(ctx, "cooldown", durable.Duration{Seconds: 30}).Await(ctx); err != nil {
				return nil, err
			}
			return "done", nil
		},
		durable.DurableExecutionConfig{Client: client},
	)

	out, err := wrapped(context.Background(), exampleInput(
		"arn:test:execution:wait-example",
		"token-wait-example",
		`{}`,
	))
	if err != nil {
		panic(err)
	}
	fmt.Println(out.Status)

}
Output:
PENDING

func (*DurableContext) WaitForCallback

func (c *DurableContext) WaitForCallback(ctx context.Context, name string, submitter WaitForCallbackSubmitterFunc, cfg *WaitForCallbackConfig) *Future[any]

WaitForCallback creates a callback, runs submitter in a durable step, and waits for the callback result.

submitter receives the callback ID and should send it to the external system that will complete the callback. submitter failures follow cfg.RetryStrategy when provided.

func (*DurableContext) WaitForCondition

func (c *DurableContext) WaitForCondition(ctx context.Context, name string, check WaitForConditionCheckFunc, cfg *WaitForConditionConfig) *Future[any]

WaitForCondition repeatedly runs check until cfg.WaitStrategy decides to stop waiting.

The state returned by check is checkpointed between attempts. When the wait strategy returns ShouldContinue, the SDK schedules a retry delay and the invocation becomes pending until the backend resumes it.

type DurableExecutionClient

type DurableExecutionClient interface {
	GetExecutionState(ctx context.Context, input GetExecutionStateRequest) (GetExecutionStateResponse, error)
	Checkpoint(ctx context.Context, input CheckpointRequest) (CheckpointResponse, error)
}

DurableExecutionClient is the backend contract used by the SDK.

Implement this interface to plug in a production backend or use NewInMemoryClient for tests and examples.

type DurableExecutionConfig

type DurableExecutionConfig struct {
	// Client persists and loads durable execution state.
	Client DurableExecutionClient
	// Logger receives SDK log messages. NopLogger is used when Logger is nil.
	Logger Logger
	// RequestID is optional metadata for the current platform invocation.
	RequestID string
	// TenantID is optional metadata for multi-tenant deployments.
	TenantID string
}

DurableExecutionConfig configures WithDurableExecution.

type DurableExecutionHandler

type DurableExecutionHandler func(ctx context.Context, event any, dctx *DurableContext) (any, error)

DurableExecutionHandler is the user handler executed inside a durable invocation.

event is the customer payload extracted from the execution operation in the invocation input. dctx is used to schedule durable operations. The handler should call Await on futures whose results are required before returning.

type DurableExecutionMode

type DurableExecutionMode string

DurableExecutionMode describes how a DurableContext should interpret checkpoint state for the current invocation.

const (
	// ExecutionMode schedules new durable operations.
	ExecutionMode DurableExecutionMode = "ExecutionMode"
	// ReplayMode reads existing operation results before switching to execution.
	ReplayMode DurableExecutionMode = "ReplayMode"
	// ReplaySucceededContext replays children of a context whose full result was
	// replaced by a compact summary.
	ReplaySucceededContext DurableExecutionMode = "ReplaySucceededContext"
)

type DurableOperationError

type DurableOperationError struct {
	Type    string
	Message string
	Cause   error
	Data    string
}

DurableOperationError is the common error type used for durable operation failures that can be serialized into ErrorObject.

func (*DurableOperationError) Error

func (e *DurableOperationError) Error() string

Error returns the durable operation error message.

func (*DurableOperationError) ToErrorObject

func (e *DurableOperationError) ToErrorObject() *ErrorObject

ToErrorObject converts e into a serializable ErrorObject.

func (*DurableOperationError) Unwrap

func (e *DurableOperationError) Unwrap() error

Unwrap returns the underlying cause.

type Duration

type Duration struct {
	Days    int `json:"days,omitempty"`
	Hours   int `json:"hours,omitempty"`
	Minutes int `json:"minutes,omitempty"`
	Seconds int `json:"seconds,omitempty"`
}

Duration is a serializable duration used by durable timers and retry delays.

func (Duration) ToDuration

func (d Duration) ToDuration() time.Duration

ToDuration converts d to a time.Duration.

func (Duration) ToSeconds

func (d Duration) ToSeconds() int

ToSeconds returns the total duration in seconds.

type ErrorObject

type ErrorObject struct {
	ErrorType    string   `json:"ErrorType,omitempty"`
	ErrorMessage string   `json:"ErrorMessage,omitempty"`
	ErrorData    string   `json:"ErrorData,omitempty"`
	StackTrace   []string `json:"StackTrace,omitempty"`
}

ErrorObject is the serialized durable error shape stored in checkpoints and invocation outputs.

func CreateErrorObjectFromError

func CreateErrorObjectFromError(err error, data string) *ErrorObject

CreateErrorObjectFromError converts err into the serialized ErrorObject representation used in invocation outputs and checkpoints.

type ExecutionContext

type ExecutionContext struct {
	// contains filtered or unexported fields
}

ExecutionContext holds durable execution state for one wrapped handler invocation.

Most applications interact with DurableContext instead of ExecutionContext.

func (*ExecutionContext) DurableExecutionArn

func (e *ExecutionContext) DurableExecutionArn() string

DurableExecutionArn returns the durable execution ARN for this invocation.

func (*ExecutionContext) DurableExecutionClient

func (e *ExecutionContext) DurableExecutionClient() DurableExecutionClient

DurableExecutionClient returns the backend client for this execution.

func (*ExecutionContext) GetStepData

func (e *ExecutionContext) GetStepData(stepID string) *Operation

GetStepData returns checkpoint data for a durable operation ID.

stepID is the SDK-generated unhashed operation ID. The lookup hashes it to match backend operation IDs.

func (*ExecutionContext) TerminationManager

func (e *ExecutionContext) TerminationManager() *TerminationManager

TerminationManager returns the invocation termination manager.

type ExecutionDetails

type ExecutionDetails struct {
	InputPayload string `json:"InputPayload,omitempty"`
}

ExecutionDetails contains metadata for the top-level execution operation.

type ExecutionState

type ExecutionState struct {
	Operations []Operation `json:"Operations"`
	NextMarker string      `json:"NextMarker,omitempty"`
}

ExecutionState is the checkpoint state included in an invocation input or returned by a durable backend.

type Future

type Future[T any] struct {
	// contains filtered or unexported fields
}

Future represents a durable operation whose result is produced when Await is called.

DurableContext methods return futures so the SDK can checkpoint an operation before the caller decides to wait for it. A Future starts at most once.

func NewFuture

func NewFuture[T any](exec func(context.Context) (T, error)) *Future[T]

NewFuture creates a Future backed by exec.

exec is invoked once, lazily, by the first Await call.

func NewNeverFuture

func NewNeverFuture[T any]() *Future[T]

NewNeverFuture returns a Future that only completes when its context is canceled.

It is used during replay paths where a previously succeeded child context should not execute operations that were not part of the checkpointed result.

func NewRejectedFuture

func NewRejectedFuture[T any](err error) *Future[T]

NewRejectedFuture returns a Future that fails with err.

func NewResolvedFuture

func NewResolvedFuture[T any](value T) *Future[T]

NewResolvedFuture returns a Future that resolves to value.

func (*Future[T]) Await

func (f *Future[T]) Await(ctx context.Context) (T, error)

Await starts the future if needed and waits for its result.

If ctx is canceled before the future completes, Await returns ctx.Err().

func (*Future[T]) Executed

func (f *Future[T]) Executed() bool

Executed reports whether the future has been started by Await.

type GetExecutionStateRequest

type GetExecutionStateRequest struct {
	DurableExecutionArn string
	CheckpointToken     string
	Marker              string
	MaxItems            int
}

GetExecutionStateRequest requests checkpoint state from a durable backend.

type GetExecutionStateResponse

type GetExecutionStateResponse struct {
	Operations []Operation
	NextMarker string
}

GetExecutionStateResponse contains a page of durable backend operations.

type InMemoryClient

type InMemoryClient struct {
	// contains filtered or unexported fields
}

InMemoryClient is a local DurableExecutionClient implementation for tests and local execution.

func NewInMemoryClient

func NewInMemoryClient() *InMemoryClient

NewInMemoryClient creates an empty in-memory durable backend.

func (*InMemoryClient) Checkpoint

Checkpoint applies operation updates to the in-memory store.

func (*InMemoryClient) CompleteCallback

func (c *InMemoryClient) CompleteCallback(callbackID string, payload string)

CompleteCallback marks an in-memory callback as succeeded.

func (*InMemoryClient) FailCallback

func (c *InMemoryClient) FailCallback(callbackID string, errObj *ErrorObject, timedOut bool)

FailCallback marks an in-memory callback as failed or timed out.

func (*InMemoryClient) GetExecutionState

GetExecutionState returns all in-memory operations.

func (*InMemoryClient) SetOperation

func (c *InMemoryClient) SetOperation(op Operation)

SetOperation inserts or replaces an operation in the in-memory store.

type InvocationInput

type InvocationInput struct {
	DurableExecutionArn   string         `json:"DurableExecutionArn"`
	CheckpointToken       string         `json:"CheckpointToken"`
	InitialExecutionState ExecutionState `json:"InitialExecutionState"`
}

InvocationInput is the payload consumed by a WrappedHandler.

func ParseInvocationInput

func ParseInvocationInput(raw []byte) (InvocationInput, error)

ParseInvocationInput parses invocation payload from durable backend. The backend may encode timestamp fields as either ISO-8601 strings or epoch numbers.

type InvocationOutput

type InvocationOutput struct {
	Status InvocationStatus `json:"Status"`
	Result string           `json:"Result,omitempty"`
	Error  *ErrorObject     `json:"Error,omitempty"`
}

InvocationOutput is returned by a wrapped durable handler.

type InvocationStatus

type InvocationStatus string

InvocationStatus is the top-level status returned from a wrapped durable handler invocation.

const (
	// InvocationStatusSucceeded means the handler completed and Result contains
	// the serialized handler return value when present.
	InvocationStatusSucceeded InvocationStatus = "SUCCEEDED"
	// InvocationStatusFailed means the handler completed with a business error.
	InvocationStatusFailed InvocationStatus = "FAILED"
	// InvocationStatusPending means execution paused on a durable wait, callback,
	// retry, invoke, or child operation and should be resumed later.
	InvocationStatusPending InvocationStatus = "PENDING"
)

type InvokeConfig

type InvokeConfig struct {
	// PayloadSerdes serializes the invocation payload.
	PayloadSerdes Serdes
	// ResultSerdes deserializes the invoked function result.
	ResultSerdes Serdes
}

InvokeConfig configures DurableContext.Invoke.

type JSONSerdes

type JSONSerdes struct{}

JSONSerdes serializes values with encoding/json.

func (JSONSerdes) Deserialize

func (JSONSerdes) Deserialize(data string, _ SerdesContext) (any, error)

Deserialize converts a JSON string to a Go value.

func (JSONSerdes) Serialize

func (JSONSerdes) Serialize(value any, _ SerdesContext) (string, error)

Serialize converts value to a JSON string.

type Logger

type Logger interface {
	Debugf(format string, args ...any)
	Infof(format string, args ...any)
	Warnf(format string, args ...any)
	Errorf(format string, args ...any)
}

Logger is the minimal logging interface used by the SDK.

type MapConfig

type MapConfig struct {
	MaxConcurrency   int
	ItemNamer        func(item any, index int) string
	Serdes           Serdes
	ItemSerdes       Serdes
	CompletionConfig *CompletionConfig
}

MapConfig configures DurableContext.Map.

type MapFunc

type MapFunc func(context *DurableContext, item any, index int, array []any) (any, error)

MapFunc processes one item in DurableContext.Map.

type NamedParallelBranch

type NamedParallelBranch struct {
	Name string
	Func ParallelFunc
}

NamedParallelBranch names one Parallel branch and provides its function.

type NopLogger

type NopLogger struct{}

NopLogger discards all log messages.

func (NopLogger) Debugf

func (NopLogger) Debugf(string, ...any)

Debugf discards a debug log message.

func (NopLogger) Errorf

func (NopLogger) Errorf(string, ...any)

Errorf discards an error log message.

func (NopLogger) Infof

func (NopLogger) Infof(string, ...any)

Infof discards an info log message.

func (NopLogger) Warnf

func (NopLogger) Warnf(string, ...any)

Warnf discards a warning log message.

type Operation

type Operation struct {
	ID       string           `json:"Id,omitempty"`
	ParentID string           `json:"ParentId,omitempty"`
	Name     string           `json:"Name,omitempty"`
	Type     OperationType    `json:"Type,omitempty"`
	SubType  OperationSubType `json:"SubType,omitempty"`
	Status   OperationStatus  `json:"Status,omitempty"`

	ExecutionDetails     *ExecutionDetails     `json:"ExecutionDetails,omitempty"`
	StepDetails          *StepDetails          `json:"StepDetails,omitempty"`
	WaitDetails          *WaitDetails          `json:"WaitDetails,omitempty"`
	CallbackDetails      *CallbackDetails      `json:"CallbackDetails,omitempty"`
	ChainedInvokeDetails *ChainedInvokeDetails `json:"ChainedInvokeDetails,omitempty"`
	ContextDetails       *ContextDetails       `json:"ContextDetails,omitempty"`

	StartTimestamp *time.Time `json:"StartTimestamp,omitempty"`
}

Operation is a durable backend operation record.

type OperationAction

type OperationAction string

OperationAction is the checkpoint update action sent to the durable backend.

const (
	// OperationActionStart starts an operation.
	OperationActionStart OperationAction = "START"
	// OperationActionSucceed marks an operation as succeeded.
	OperationActionSucceed OperationAction = "SUCCEED"
	// OperationActionFail marks an operation as failed.
	OperationActionFail OperationAction = "FAIL"
	// OperationActionRetry schedules a retry for an operation.
	OperationActionRetry OperationAction = "RETRY"
)

type OperationLifecycleState

type OperationLifecycleState string

OperationLifecycleState is an in-process state tracked while an invocation is running.

const (
	// OperationLifecycleExecuting means the operation is running locally.
	OperationLifecycleExecuting OperationLifecycleState = "EXECUTING"
	// OperationLifecycleRetryWaiting means the operation is waiting for a retry
	// timer.
	OperationLifecycleRetryWaiting OperationLifecycleState = "RETRY_WAITING"
	// OperationLifecycleIdleNotAwaited means the operation has been scheduled but
	// its future has not been awaited.
	OperationLifecycleIdleNotAwaited OperationLifecycleState = "IDLE_NOT_AWAITED"
	// OperationLifecycleIdleAwaited means the operation has been awaited and is
	// waiting for external completion.
	OperationLifecycleIdleAwaited OperationLifecycleState = "IDLE_AWAITED"
	// OperationLifecycleCompleted means the operation is terminal locally.
	OperationLifecycleCompleted OperationLifecycleState = "COMPLETED"
)

type OperationMetadata

type OperationMetadata struct {
	StepID   string
	Name     string
	Type     OperationType
	SubType  OperationSubType
	ParentID string
}

OperationMetadata identifies an operation being tracked locally.

type OperationStatus

type OperationStatus string

OperationStatus is the status read from durable execution state.

const (
	// OperationStatusStarted means the operation has started.
	OperationStatusStarted OperationStatus = "STARTED"
	// OperationStatusReady means the operation is ready to run.
	OperationStatusReady OperationStatus = "READY"
	// OperationStatusPending means the operation is waiting for a timer,
	// callback, retry, or external completion.
	OperationStatusPending OperationStatus = "PENDING"
	// OperationStatusSucceeded means the operation completed successfully.
	OperationStatusSucceeded OperationStatus = "SUCCEEDED"
	// OperationStatusFailed means the operation completed with an error.
	OperationStatusFailed OperationStatus = "FAILED"
	// OperationStatusCancelled means the operation was canceled.
	OperationStatusCancelled OperationStatus = "CANCELLED"
	// OperationStatusStopped means the operation was stopped.
	OperationStatusStopped OperationStatus = "STOPPED"
	// OperationStatusTimedOut means the operation timed out.
	OperationStatusTimedOut OperationStatus = "TIMED_OUT"
)

type OperationSubType

type OperationSubType string

OperationSubType identifies a higher-level SDK primitive within an OperationType.

const (
	// OperationSubTypeStep is used by DurableContext.Step.
	OperationSubTypeStep OperationSubType = "Step"
	// OperationSubTypeWait is used by DurableContext.Wait.
	OperationSubTypeWait OperationSubType = "Wait"
	// OperationSubTypeCallback is used by DurableContext.CreateCallback.
	OperationSubTypeCallback OperationSubType = "Callback"
	// OperationSubTypeRunInChild is used by DurableContext.RunInChildContext.
	OperationSubTypeRunInChild OperationSubType = "RunInChildContext"
	// OperationSubTypeMap is the top-level Map context subtype.
	OperationSubTypeMap OperationSubType = "Map"
	// OperationSubTypeMapIteration is the per-item Map child subtype.
	OperationSubTypeMapIteration OperationSubType = "MapIteration"
	// OperationSubTypeParallel is the top-level Parallel context subtype.
	OperationSubTypeParallel OperationSubType = "Parallel"
	// OperationSubTypeParallelBranch is the per-branch Parallel child subtype.
	OperationSubTypeParallelBranch OperationSubType = "ParallelBranch"
	// OperationSubTypeWaitForCallback is used by DurableContext.WaitForCallback.
	OperationSubTypeWaitForCallback OperationSubType = "WaitForCallback"
	// OperationSubTypeWaitForCondition is used by DurableContext.WaitForCondition.
	OperationSubTypeWaitForCondition OperationSubType = "WaitForCondition"
	// OperationSubTypeChainedInvoke is used by DurableContext.Invoke.
	OperationSubTypeChainedInvoke OperationSubType = "ChainedInvoke"
)

type OperationType

type OperationType string

OperationType is the durable backend operation category.

const (
	// OperationTypeExecution represents the top-level handler execution.
	OperationTypeExecution OperationType = "EXECUTION"
	// OperationTypeStep represents a durable step.
	OperationTypeStep OperationType = "STEP"
	// OperationTypeWait represents a durable timer.
	OperationTypeWait OperationType = "WAIT"
	// OperationTypeCallback represents an external callback wait.
	OperationTypeCallback OperationType = "CALLBACK"
	// OperationTypeContext represents a child durable context.
	OperationTypeContext OperationType = "CONTEXT"
	// OperationTypeChainedInvoke represents a durable chained Lambda invoke.
	OperationTypeChainedInvoke OperationType = "CHAINED_INVOKE"
)

type OperationUpdate

type OperationUpdate struct {
	ID       string           `json:"Id,omitempty"`
	ParentID string           `json:"ParentId,omitempty"`
	Name     string           `json:"Name,omitempty"`
	Type     OperationType    `json:"Type,omitempty"`
	SubType  OperationSubType `json:"SubType,omitempty"`
	Action   OperationAction  `json:"Action,omitempty"`

	Payload string       `json:"Payload,omitempty"`
	Error   *ErrorObject `json:"Error,omitempty"`

	StepOptions          *StepOptions          `json:"StepOptions,omitempty"`
	WaitOptions          *WaitOptions          `json:"WaitOptions,omitempty"`
	CallbackOptions      *CallbackOptions      `json:"CallbackOptions,omitempty"`
	ChainedInvokeOptions *ChainedInvokeOptions `json:"ChainedInvokeOptions,omitempty"`
	ContextOptions       *ContextOptions       `json:"ContextOptions,omitempty"`
}

OperationUpdate is sent to DurableExecutionClient.Checkpoint to change operation state.

type ParallelConfig

type ParallelConfig struct {
	MaxConcurrency   int
	Serdes           Serdes
	ItemSerdes       Serdes
	CompletionConfig *CompletionConfig
}

ParallelConfig configures DurableContext.Parallel.

type ParallelFunc

type ParallelFunc func(context *DurableContext) (any, error)

ParallelFunc executes one branch in DurableContext.Parallel.

type PassThroughSerdes

type PassThroughSerdes struct{}

PassThroughSerdes passes string values through unchanged and JSON-serializes non-string values.

func (PassThroughSerdes) Deserialize

func (PassThroughSerdes) Deserialize(data string, _ SerdesContext) (any, error)

Deserialize returns data unchanged.

func (PassThroughSerdes) Serialize

func (PassThroughSerdes) Serialize(value any, _ SerdesContext) (string, error)

Serialize returns string values unchanged and JSON-serializes other values.

type RetryDecision

type RetryDecision struct {
	ShouldRetry bool
	Delay       Duration
}

RetryDecision is returned by a RetryStrategy.

type RetryStrategy

type RetryStrategy func(err error, attempt int) RetryDecision

RetryStrategy decides whether a failed operation should be retried.

attempt is one-based for the retry attempt being scheduled.

type Serdes

type Serdes interface {
	Serialize(value any, ctx SerdesContext) (string, error)
	Deserialize(data string, ctx SerdesContext) (any, error)
}

Serdes serializes values into checkpoint payload strings and deserializes them back into Go values.

type SerdesContext

type SerdesContext struct {
	// EntityID is the durable operation ID before backend hashing.
	EntityID string
	// DurableExecutionArn identifies the current durable execution.
	DurableExecutionArn string
}

SerdesContext provides operation metadata to serializers.

type StepConfig

type StepConfig struct {
	// RetryStrategy controls retry scheduling after a step failure. A default
	// exponential backoff strategy is used when nil.
	RetryStrategy RetryStrategy
	// Semantics controls how interrupted started steps are treated on replay.
	Semantics StepSemantics
	// Serdes serializes successful step results into checkpoint payloads.
	Serdes Serdes
}

StepConfig configures DurableContext.Step.

type StepContext

type StepContext struct {
	// Logger receives log messages for the current step.
	Logger Logger
}

StepContext contains metadata and helpers passed to a StepFunc.

type StepDetails

type StepDetails struct {
	Result               string       `json:"Result,omitempty"`
	Error                *ErrorObject `json:"Error,omitempty"`
	Attempt              int          `json:"Attempt,omitempty"`
	NextAttemptTimestamp *time.Time   `json:"NextAttemptTimestamp,omitempty"`
}

StepDetails contains persisted state for a step-like operation.

type StepFunc

type StepFunc func(ctx context.Context, stepCtx StepContext) (any, error)

StepFunc is the function executed by DurableContext.Step.

Step functions should be safe to retry according to the configured StepSemantics and RetryStrategy.

type StepOptions

type StepOptions struct {
	NextAttemptDelaySeconds int `json:"NextAttemptDelaySeconds,omitempty"`
}

StepOptions are checkpoint options for step retry scheduling.

type StepSemantics

type StepSemantics string

StepSemantics controls how Step handles an operation that was started but not completed before the invocation stopped.

const (
	// StepSemanticsAtLeastOncePerRetry allows a started step to run again on the
	// same retry attempt after replay.
	StepSemanticsAtLeastOncePerRetry StepSemantics = "AT_LEAST_ONCE_PER_RETRY"
	// StepSemanticsAtMostOncePerRetry treats an interrupted started step as a
	// failure and schedules the next retry attempt.
	StepSemanticsAtMostOncePerRetry StepSemantics = "AT_MOST_ONCE_PER_RETRY"
)

type TerminationDetails

type TerminationDetails struct {
	Reason  TerminationReason
	Message string
	Error   error
}

TerminationDetails describes why the current invocation should stop early.

type TerminationManager

type TerminationManager struct {
	// contains filtered or unexported fields
}

TerminationManager coordinates early termination across concurrent durable operations in one invocation.

func NewTerminationManager

func NewTerminationManager() *TerminationManager

NewTerminationManager creates an unterminated manager.

func (*TerminationManager) Channel

func (m *TerminationManager) Channel() <-chan TerminationDetails

Channel is closed by sending the first termination details.

func (*TerminationManager) Details

func (m *TerminationManager) Details() (TerminationDetails, bool)

Details returns termination details and whether termination has happened.

func (*TerminationManager) IsTerminated

func (m *TerminationManager) IsTerminated() bool

IsTerminated reports whether Terminate has been called.

func (*TerminationManager) Terminate

func (m *TerminationManager) Terminate(details TerminationDetails)

Terminate records termination details and notifies waiters once.

type TerminationReason

type TerminationReason string

TerminationReason explains why the current invocation stopped before normal handler completion.

const (
	// TerminationReasonOperationTerminated means an operation requested
	// termination of the current invocation.
	TerminationReasonOperationTerminated TerminationReason = "OPERATION_TERMINATED"
	// TerminationReasonRetryScheduled means a retry timer was scheduled.
	TerminationReasonRetryScheduled TerminationReason = "RETRY_SCHEDULED"
	// TerminationReasonRetryInterruptedStep means an interrupted at-most-once
	// step scheduled a retry.
	TerminationReasonRetryInterruptedStep TerminationReason = "RETRY_INTERRUPTED_STEP"
	// TerminationReasonWaitScheduled means a wait timer was scheduled.
	TerminationReasonWaitScheduled TerminationReason = "WAIT_SCHEDULED"
	// TerminationReasonCallbackPending means a callback is waiting externally.
	TerminationReasonCallbackPending TerminationReason = "CALLBACK_PENDING"
	// TerminationReasonCheckpointFailed means checkpoint persistence failed.
	TerminationReasonCheckpointFailed TerminationReason = "CHECKPOINT_FAILED"
	// TerminationReasonSerdesFailed means serialization or deserialization failed.
	TerminationReasonSerdesFailed TerminationReason = "SERDES_FAILED"
	// TerminationReasonContextValidation means a durable operation used the wrong
	// context.
	TerminationReasonContextValidation TerminationReason = "CONTEXT_VALIDATION_ERROR"
	// TerminationReasonCustom is available for user-defined termination.
	TerminationReasonCustom TerminationReason = "CUSTOM"
)

type UnrecoverableError

type UnrecoverableError interface {
	error
	TerminationReason() TerminationReason
	InvocationLevel() bool
	ExecutionLevel() bool
}

UnrecoverableError marks an error that should stop the current invocation or execution instead of being converted into InvocationStatusFailed.

type WaitDetails

type WaitDetails struct {
	ScheduledEndTimestamp *time.Time `json:"ScheduledEndTimestamp,omitempty"`
}

WaitDetails contains persisted state for a wait operation.

type WaitForCallbackConfig

type WaitForCallbackConfig struct {
	Timeout          *Duration
	HeartbeatTimeout *Duration
	RetryStrategy    RetryStrategy
	Serdes           Serdes
}

WaitForCallbackConfig configures DurableContext.WaitForCallback.

type WaitForCallbackContext

type WaitForCallbackContext struct {
	Logger Logger
}

WaitForCallbackContext is passed to a WaitForCallback submitter.

type WaitForCallbackSubmitterFunc

type WaitForCallbackSubmitterFunc func(callbackID string, ctx WaitForCallbackContext) error

WaitForCallbackSubmitterFunc sends a callback ID to an external system.

type WaitForConditionCheckFunc

type WaitForConditionCheckFunc func(state any, ctx WaitForConditionContext) (any, error)

WaitForConditionCheckFunc evaluates and returns the next condition state.

type WaitForConditionConfig

type WaitForConditionConfig struct {
	WaitStrategy WaitForConditionWaitStrategyFunc
	InitialState any
	Serdes       Serdes
}

WaitForConditionConfig configures DurableContext.WaitForCondition.

type WaitForConditionContext

type WaitForConditionContext struct {
	Logger Logger
}

WaitForConditionContext is passed to a WaitForCondition check function.

type WaitForConditionDecision

type WaitForConditionDecision struct {
	ShouldContinue bool
	Delay          Duration
}

WaitForConditionDecision is returned by a WaitForCondition wait strategy.

type WaitForConditionWaitStrategyFunc

type WaitForConditionWaitStrategyFunc func(state any, attempt int) WaitForConditionDecision

WaitForConditionWaitStrategyFunc decides whether WaitForCondition should wait and retry or complete with the current state.

type WaitOptions

type WaitOptions struct {
	WaitSeconds int `json:"WaitSeconds,omitempty"`
}

WaitOptions are checkpoint options for durable waits.

type WrappedHandler

type WrappedHandler func(ctx context.Context, input InvocationInput) (InvocationOutput, error)

WrappedHandler is the handler shape returned by WithDurableExecution.

func WithDurableExecution

func WithDurableExecution(handler DurableExecutionHandler, config DurableExecutionConfig) WrappedHandler

WithDurableExecution wraps a DurableExecutionHandler with replay, checkpointing, and pending-status handling.

The returned handler consumes InvocationInput from the durable backend. It returns InvocationStatusPending when execution stopped because a durable wait, callback, retry timer, or child operation is still pending. Infrastructure and unrecoverable invocation errors are returned as Go errors.

Example
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/kurochan/aws-durable-execution-go/durable"
)

func exampleInput(arn, token, payload string) durable.InvocationInput {
	return durable.InvocationInput{
		DurableExecutionArn: arn,
		CheckpointToken:     token,
		InitialExecutionState: durable.ExecutionState{
			Operations: []durable.Operation{{
				ID:     "execution-root",
				Type:   durable.OperationTypeExecution,
				Status: durable.OperationStatusStarted,
				ExecutionDetails: &durable.ExecutionDetails{
					InputPayload: payload,
				},
			}},
		},
	}
}

func main() {
	client := durable.NewInMemoryClient()
	wrapped := durable.WithDurableExecution(
		func(ctx context.Context, event any, dctx *durable.DurableContext) (any, error) {
			result, err := dctx.Step(ctx, "build-greeting", func(context.Context, durable.StepContext) (any, error) {
				input := event.(map[string]any)
				return "hello " + input["name"].(string), nil
			}, nil).Await(ctx)
			if err != nil {
				return nil, err
			}
			return result, nil
		},
		durable.DurableExecutionConfig{Client: client},
	)

	out, err := wrapped(context.Background(), exampleInput(
		"arn:test:execution:example",
		"token-example",
		`{"name":"alice"}`,
	))
	if err != nil {
		panic(err)
	}

	var result string
	if err := json.Unmarshal([]byte(out.Result), &result); err != nil {
		panic(err)
	}
	fmt.Println(out.Status)
	fmt.Println(result)

}
Output:
SUCCEEDED
hello alice

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL