Documentation ¶
Index ¶
- func IsSuspendSignal(err error) bool
- type CachedResult
- type CompensationRunner
- type Engine
- func (e *Engine) RecordActivityResult(ctx context.Context, instanceID string, activityID string, result any, ...) error
- func (e *Engine) ResumeWorkflow(ctx context.Context, instanceID string, runner WorkflowRunner) error
- func (e *Engine) SetCompensationRunner(runner CompensationRunner)
- func (e *Engine) StartWorkflow(ctx context.Context, instanceID string, workflowName string, inputData []byte, ...) error
- type ExecutionContext
- func (ec *ExecutionContext) Context() context.Context
- func (ec *ExecutionContext) GenerateActivityID(activityName string) string
- func (ec *ExecutionContext) GetCachedResult(activityID string) (any, bool)
- func (ec *ExecutionContext) GetCachedResultRaw(activityID string) (*CachedResult, bool)
- func (ec *ExecutionContext) Hooks() hooks.WorkflowHooks
- func (ec *ExecutionContext) InstanceID() string
- func (ec *ExecutionContext) IsReplaying() bool
- func (ec *ExecutionContext) RecordActivityID(activityID string)
- func (ec *ExecutionContext) RecordActivityResult(activityID string, result any, activityErr error) error
- func (ec *ExecutionContext) RecordActivityResultWithContext(ctx context.Context, activityID string, result any, activityErr error) error
- func (ec *ExecutionContext) SetCachedResult(activityID string, result any)
- func (ec *ExecutionContext) SetCachedResultWithRaw(activityID string, rawJSON []byte, value any)
- func (ec *ExecutionContext) Storage() storage.Storage
- type SuspendSignal
- func AsSuspendSignal(err error) *SuspendSignal
- func NewChannelMessageSuspend(instanceID, channel, activityID string, timeout *time.Duration) *SuspendSignal
- func NewRecurSuspend(instanceID string, newInput any) *SuspendSignal
- func NewTimerSuspend(instanceID, timerID, activityID string, expiresAt time.Time) *SuspendSignal
- type SuspendType
- type WorkflowRunner
Functions ¶
func IsSuspendSignal ¶
func IsSuspendSignal(err error) bool
IsSuspendSignal reports whether the error is a SuspendSignal.
Types ¶
type CachedResult ¶
type CachedResult struct {
	RawJSON []byte // Original JSON bytes from history
	Value   any    // Unmarshaled value
}
CachedResult holds both raw JSON bytes and the unmarshaled value. This avoids re-serialization when the raw JSON is needed.
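As a rough illustration of why keeping both forms helps, the sketch below redeclares CachedResult locally so it compiles on its own. The helpers newCachedResult and decodeOrder and the order type are hypothetical and not part of this package; the point is only that a caller can decode RawJSON directly into a concrete type instead of re-marshaling Value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CachedResult is redeclared here only so the sketch is self-contained.
type CachedResult struct {
	RawJSON []byte // Original JSON bytes from history
	Value   any    // Unmarshaled value
}

// newCachedResult is a hypothetical helper: it decodes raw once and keeps
// the original bytes next to the decoded value.
func newCachedResult(raw []byte) (*CachedResult, error) {
	var v any
	if err := json.Unmarshal(raw, &v); err != nil {
		return nil, err
	}
	return &CachedResult{RawJSON: raw, Value: v}, nil
}

// order is a hypothetical caller-side result type.
type order struct {
	ID    string `json:"id"`
	Total int    `json:"total"`
}

// decodeOrder decodes straight from RawJSON, so Value (a generic
// map[string]any here) never has to be marshaled back to JSON.
func decodeOrder(cr *CachedResult) (order, error) {
	var o order
	err := json.Unmarshal(cr.RawJSON, &o)
	return o, err
}

func main() {
	cr, err := newCachedResult([]byte(`{"id":"A-1","total":42}`))
	if err != nil {
		panic(err)
	}
	o, err := decodeOrder(cr)
	if err != nil {
		panic(err)
	}
	fmt.Println(o.ID, o.Total)
}
```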
type CompensationRunner ¶
CompensationRunner is a function that executes a compensation. It receives the function name and JSON-encoded argument.
type Engine ¶
type Engine struct {
	// contains filtered or unexported fields
}
Engine handles workflow execution with deterministic replay.
func (*Engine) RecordActivityResult ¶
func (e *Engine) RecordActivityResult(
	ctx context.Context,
	instanceID string,
	activityID string,
	result any,
	activityErr error,
) error
RecordActivityResult records an activity result in history.
func (*Engine) ResumeWorkflow ¶
func (e *Engine) ResumeWorkflow(
	ctx context.Context,
	instanceID string,
	runner WorkflowRunner,
) error
ResumeWorkflow resumes a workflow from its history.
func (*Engine) SetCompensationRunner ¶
func (e *Engine) SetCompensationRunner(runner CompensationRunner)
SetCompensationRunner sets the compensation runner for the engine.
func (*Engine) StartWorkflow ¶
func (e *Engine) StartWorkflow(
	ctx context.Context,
	instanceID string,
	workflowName string,
	inputData []byte,
	runner WorkflowRunner,
) error
StartWorkflow starts a new workflow instance.
type ExecutionContext ¶
type ExecutionContext struct {
	// contains filtered or unexported fields
}
ExecutionContext provides context for workflow execution.
func (*ExecutionContext) Context ¶
func (ec *ExecutionContext) Context() context.Context
Context returns the underlying context.Context.
func (*ExecutionContext) GenerateActivityID ¶
func (ec *ExecutionContext) GenerateActivityID(activityName string) string
GenerateActivityID generates a unique activity ID for the given activity name.
func (*ExecutionContext) GetCachedResult ¶
func (ec *ExecutionContext) GetCachedResult(activityID string) (any, bool)
GetCachedResult retrieves a cached result from replay history. Returns (result, true) if found, (nil, false) if not found. The returned value is the unmarshaled value held in CachedResult.Value.
func (*ExecutionContext) GetCachedResultRaw ¶
func (ec *ExecutionContext) GetCachedResultRaw(activityID string) (*CachedResult, bool)
GetCachedResultRaw retrieves a cached result with raw JSON bytes. Returns (*CachedResult, true) if found, (nil, false) if not found. Use this to avoid re-serialization when the raw JSON is needed.
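The lookup contract above, (value, true) on a hit and (nil, false) on a miss, is what lets replay skip work that already ran. The sketch below is a minimal illustration under assumptions: replayCache is a hypothetical stand-in for the cache behind ExecutionContext, and runOnce is a hypothetical helper. How the package itself wires activities to the cache is not shown in this reference.

```go
package main

import "fmt"

// replayCache is a hypothetical stand-in for the result cache that
// ExecutionContext exposes through GetCachedResult and SetCachedResult.
type replayCache struct {
	results map[string]any
}

// GetCachedResult returns (result, true) if found, (nil, false) otherwise.
func (c *replayCache) GetCachedResult(activityID string) (any, bool) {
	v, ok := c.results[activityID]
	return v, ok
}

// SetCachedResult stores a result under the activity ID.
func (c *replayCache) SetCachedResult(activityID string, result any) {
	c.results[activityID] = result
}

// runOnce is a hypothetical helper: it returns the cached result when one
// exists and only calls fn (the side-effecting activity) on a cache miss.
func runOnce(c *replayCache, activityID string, fn func() (any, error)) (any, error) {
	if v, ok := c.GetCachedResult(activityID); ok {
		return v, nil
	}
	v, err := fn()
	if err != nil {
		return nil, err
	}
	c.SetCachedResult(activityID, v)
	return v, nil
}

func main() {
	calls := 0
	c := &replayCache{results: map[string]any{}}
	charge := func() (any, error) {
		calls++
		return "charged", nil
	}

	first, _ := runOnce(c, "charge:1", charge)  // executes the activity
	second, _ := runOnce(c, "charge:1", charge) // served from the cache
	fmt.Println(first, second, calls)
}
```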
func (*ExecutionContext) Hooks ¶
func (ec *ExecutionContext) Hooks() hooks.WorkflowHooks
Hooks returns the workflow hooks for observability.
func (*ExecutionContext) InstanceID ¶
func (ec *ExecutionContext) InstanceID() string
InstanceID returns the workflow instance ID.
func (*ExecutionContext) IsReplaying ¶
func (ec *ExecutionContext) IsReplaying() bool
IsReplaying returns true if the workflow is being replayed.
func (*ExecutionContext) RecordActivityID ¶
func (ec *ExecutionContext) RecordActivityID(activityID string)
RecordActivityID records that an activity has been executed.
func (*ExecutionContext) RecordActivityResult ¶
func (ec *ExecutionContext) RecordActivityResult(activityID string, result any, activityErr error) error
RecordActivityResult records an activity result to storage. This delegates to the engine's RecordActivityResult method.
func (*ExecutionContext) RecordActivityResultWithContext ¶
func (ec *ExecutionContext) RecordActivityResultWithContext(ctx context.Context, activityID string, result any, activityErr error) error
RecordActivityResultWithContext records an activity result to storage using the provided context. This is useful when recording within a transaction where the transaction context should be used.
func (*ExecutionContext) SetCachedResult ¶
func (ec *ExecutionContext) SetCachedResult(activityID string, result any)
SetCachedResult caches a result for replay.
func (*ExecutionContext) SetCachedResultWithRaw ¶
func (ec *ExecutionContext) SetCachedResultWithRaw(activityID string, rawJSON []byte, value any)
SetCachedResultWithRaw caches a result with raw JSON bytes for replay.
func (*ExecutionContext) Storage ¶
func (ec *ExecutionContext) Storage() storage.Storage
Storage returns the storage interface for advanced use cases.
type SuspendSignal ¶
type SuspendSignal struct {
	// Type indicates what the workflow is waiting for
	Type SuspendType
	// InstanceID is the workflow instance being suspended
	InstanceID string

	// For SuspendForTimer
	TimerID   string
	ExpiresAt time.Time

	// For SuspendForChannelMessage (channel/event waiting) or Timer
	Channel    string         // Channel or event type name
	Timeout    *time.Duration // Optional timeout
	ActivityID string         // Activity ID for replay matching (used for both channel and timer)

	// For SuspendForRecur
	NewInput any // New input for the next iteration
}
SuspendSignal is returned when a workflow needs to suspend execution. It implements the error interface for compatibility with Go's error handling, but it does not indicate a failure: it is a control-flow signal.
When a workflow function returns a SuspendSignal, callers should propagate it:
if err := romancy.Sleep(ctx, time.Hour); err != nil {
	return result, err // Propagate the suspend signal
}
The replay engine will detect SuspendSignal and handle the suspension appropriately.
func AsSuspendSignal ¶
func AsSuspendSignal(err error) *SuspendSignal
AsSuspendSignal extracts the SuspendSignal from an error if present. Returns nil if the error is not a SuspendSignal.
func NewChannelMessageSuspend ¶
func NewChannelMessageSuspend(instanceID, channel, activityID string, timeout *time.Duration) *SuspendSignal
NewChannelMessageSuspend creates a SuspendSignal for channel message waiting (via Receive or WaitEvent).
func NewRecurSuspend ¶
func NewRecurSuspend(instanceID string, newInput any) *SuspendSignal
NewRecurSuspend creates a SuspendSignal for workflow recursion.
func NewTimerSuspend ¶
func NewTimerSuspend(instanceID, timerID, activityID string, expiresAt time.Time) *SuspendSignal
NewTimerSuspend creates a SuspendSignal for timer waiting.
func (*SuspendSignal) Error ¶
func (s *SuspendSignal) Error() string
type SuspendType ¶
type SuspendType int
SuspendType represents the type of workflow suspension.
const (
	// SuspendForTimer indicates the workflow is waiting for a timer to expire.
	SuspendForTimer SuspendType = iota
	// SuspendForChannelMessage indicates the workflow is waiting for a channel message (via Receive or WaitEvent).
	SuspendForChannelMessage
	// SuspendForRecur indicates the workflow is recursing with new input.
	SuspendForRecur
)
func (SuspendType) String ¶
func (t SuspendType) String() string
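A caller that receives a signal will typically branch on its Type. The sketch below redeclares the type and constants locally so it compiles on its own. The describe function and the labels it returns are hypothetical; this reference does not document what String actually returns.

```go
package main

import "fmt"

// SuspendType and its constants are redeclared here only so the sketch
// is self-contained; the values follow from iota (0, 1, 2).
type SuspendType int

const (
	SuspendForTimer SuspendType = iota
	SuspendForChannelMessage
	SuspendForRecur
)

// describe is a hypothetical helper that maps each suspend type to a
// short label and falls back to the numeric value for unknown types.
func describe(t SuspendType) string {
	switch t {
	case SuspendForTimer:
		return "timer"
	case SuspendForChannelMessage:
		return "channel message"
	case SuspendForRecur:
		return "recur"
	default:
		return fmt.Sprintf("SuspendType(%d)", int(t))
	}
}

func main() {
	for _, t := range []SuspendType{SuspendForTimer, SuspendForChannelMessage, SuspendForRecur} {
		fmt.Println(int(t), describe(t))
	}
}
```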
type WorkflowRunner ¶
type WorkflowRunner func(ctx *ExecutionContext) (any, error)
WorkflowRunner is a function that executes the workflow logic. It receives the ExecutionContext and returns the workflow result or an error.
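Because WorkflowRunner is a plain function type, any function with a matching signature can serve as a runner. The sketch below uses a hypothetical executionContext stand-in (only InstanceID is modeled) and a hypothetical run driver in place of the engine, so it shows the calling shape rather than real engine behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// executionContext is a hypothetical stand-in for ExecutionContext.
type executionContext struct {
	instanceID string
}

// InstanceID returns the workflow instance ID.
func (ec *executionContext) InstanceID() string { return ec.instanceID }

// workflowRunner mirrors the shape of WorkflowRunner against the stand-in context.
type workflowRunner func(ctx *executionContext) (any, error)

// greet is a hypothetical workflow body with the runner signature.
func greet(ctx *executionContext) (any, error) {
	if ctx.InstanceID() == "" {
		return nil, errors.New("missing instance ID")
	}
	return "hello from " + ctx.InstanceID(), nil
}

// run is a hypothetical driver standing in for the engine: it simply
// invokes the runner with the given context.
func run(ec *executionContext, runner workflowRunner) (any, error) {
	return runner(ec)
}

func main() {
	out, err := run(&executionContext{instanceID: "wf-1"}, greet)
	fmt.Println(out, err)
}
```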