replay

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2025 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsSuspendSignal

func IsSuspendSignal(err error) bool

IsSuspendSignal returns true if the error is a SuspendSignal.

Types

type CachedResult

type CachedResult struct {
	RawJSON []byte // Original JSON bytes from history
	Value   any    // Unmarshaled value
}

CachedResult holds both raw JSON bytes and the unmarshaled value. This avoids re-serialization when the raw JSON is needed.

type CompensationRunner

type CompensationRunner func(ctx context.Context, funcName string, arg []byte) error

CompensationRunner is a function that executes a compensation. It receives the function name and JSON-encoded argument.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine handles workflow execution with deterministic replay.

func NewEngine

func NewEngine(s storage.Storage, h hooks.WorkflowHooks, workerID string) *Engine

NewEngine creates a new replay engine.

func (*Engine) RecordActivityResult

func (e *Engine) RecordActivityResult(
	ctx context.Context,
	instanceID string,
	activityID string,
	result any,
	activityErr error,
) error

RecordActivityResult records an activity result in history.

func (*Engine) ResumeWorkflow

func (e *Engine) ResumeWorkflow(
	ctx context.Context,
	instanceID string,
	runner WorkflowRunner,
) error

ResumeWorkflow resumes a workflow from its history.

func (*Engine) SetCompensationRunner

func (e *Engine) SetCompensationRunner(runner CompensationRunner)

SetCompensationRunner sets the compensation runner for the engine.

func (*Engine) StartWorkflow

func (e *Engine) StartWorkflow(
	ctx context.Context,
	instanceID string,
	workflowName string,
	inputData []byte,
	runner WorkflowRunner,
) error

StartWorkflow starts a new workflow instance.

type ExecutionContext

type ExecutionContext struct {
	// contains filtered or unexported fields
}

ExecutionContext provides context for workflow execution.

func (*ExecutionContext) Context

func (ec *ExecutionContext) Context() context.Context

Context returns the underlying context.Context.

func (*ExecutionContext) GenerateActivityID

func (ec *ExecutionContext) GenerateActivityID(activityName string) string

GenerateActivityID generates a unique activity ID for the given activity name.

func (*ExecutionContext) GetCachedResult

func (ec *ExecutionContext) GetCachedResult(activityID string) (any, bool)

GetCachedResult retrieves a cached result from replay history. Returns (result, true) if found, (nil, false) if not found. The returned value is the unmarshaled any value from CachedResult.Value.

func (*ExecutionContext) GetCachedResultRaw

func (ec *ExecutionContext) GetCachedResultRaw(activityID string) (*CachedResult, bool)

GetCachedResultRaw retrieves a cached result with raw JSON bytes. Returns (*CachedResult, true) if found, (nil, false) if not found. Use this to avoid re-serialization when the raw JSON is needed.

func (*ExecutionContext) Hooks

func (ec *ExecutionContext) Hooks() hooks.WorkflowHooks

Hooks returns the workflow hooks for observability.

func (*ExecutionContext) InstanceID

func (ec *ExecutionContext) InstanceID() string

InstanceID returns the workflow instance ID.

func (*ExecutionContext) IsReplaying

func (ec *ExecutionContext) IsReplaying() bool

IsReplaying returns true if the workflow is being replayed.

func (*ExecutionContext) RecordActivityID

func (ec *ExecutionContext) RecordActivityID(activityID string)

RecordActivityID records that an activity has been executed.

func (*ExecutionContext) RecordActivityResult

func (ec *ExecutionContext) RecordActivityResult(activityID string, result any, activityErr error) error

RecordActivityResult records an activity result to storage. This delegates to the engine's RecordActivityResult method.

func (*ExecutionContext) RecordActivityResultWithContext

func (ec *ExecutionContext) RecordActivityResultWithContext(ctx context.Context, activityID string, result any, activityErr error) error

RecordActivityResultWithContext records an activity result to storage using the provided context. This is useful when recording within a transaction where the transaction context should be used.

func (*ExecutionContext) SetCachedResult

func (ec *ExecutionContext) SetCachedResult(activityID string, result any)

SetCachedResult caches a result for replay.

func (*ExecutionContext) SetCachedResultWithRaw

func (ec *ExecutionContext) SetCachedResultWithRaw(activityID string, rawJSON []byte, value any)

SetCachedResultWithRaw caches a result with raw JSON bytes for replay.

func (*ExecutionContext) Storage

func (ec *ExecutionContext) Storage() storage.Storage

Storage returns the storage interface for advanced use cases.

type SuspendSignal

type SuspendSignal struct {
	// Type indicates what the workflow is waiting for
	Type SuspendType

	// InstanceID is the workflow instance being suspended
	InstanceID string

	// For SuspendForTimer
	TimerID   string
	ExpiresAt time.Time

	// For SuspendForChannelMessage (channel/event waiting) or Timer
	Channel    string         // Channel or event type name
	Timeout    *time.Duration // Optional timeout
	ActivityID string         // Activity ID for replay matching (used for both channel and timer)

	// For SuspendForRecur
	NewInput any // New input for the next iteration
}

SuspendSignal is returned when a workflow needs to suspend execution. It implements the error interface for compatibility with Go's error handling, but it is NOT an error - it's a control flow signal.

When a workflow function returns a SuspendSignal, callers should propagate it:

if err := romancy.Sleep(ctx, time.Hour); err != nil {
    return result, err  // Propagate the suspend signal
}

The replay engine will detect SuspendSignal and handle the suspension appropriately.

func AsSuspendSignal

func AsSuspendSignal(err error) *SuspendSignal

AsSuspendSignal extracts the SuspendSignal from an error if present. Returns nil if the error is not a SuspendSignal.

func NewChannelMessageSuspend

func NewChannelMessageSuspend(instanceID, channel, activityID string, timeout *time.Duration) *SuspendSignal

NewChannelMessageSuspend creates a SuspendSignal for channel message waiting (via Receive or WaitEvent).

func NewRecurSuspend

func NewRecurSuspend(instanceID string, newInput any) *SuspendSignal

NewRecurSuspend creates a SuspendSignal for workflow recursion.

func NewTimerSuspend

func NewTimerSuspend(instanceID, timerID, activityID string, expiresAt time.Time) *SuspendSignal

NewTimerSuspend creates a SuspendSignal for timer waiting.

func (*SuspendSignal) Error

func (s *SuspendSignal) Error() string

type SuspendType

type SuspendType int

SuspendType represents the type of workflow suspension.

const (
	// SuspendForTimer indicates the workflow is waiting for a timer to expire.
	SuspendForTimer SuspendType = iota
	// SuspendForChannelMessage indicates the workflow is waiting for a channel message (via Receive or WaitEvent).
	SuspendForChannelMessage
	// SuspendForRecur indicates the workflow is recursing with new input.
	SuspendForRecur
)

func (SuspendType) String

func (t SuspendType) String() string

type WorkflowRunner

type WorkflowRunner func(ctx *ExecutionContext) (any, error)

WorkflowRunner is a function that executes the workflow logic. It receives the context and returns the result or an error.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL