Versions in this module Expand all Collapse all v0 v0.6.1 Dec 25, 2025 v0.6.0 Dec 21, 2025 Changes in this version + type ChannelModeConflictError struct + Channel string + ExistingMode string + RequestedMode string + func (e *ChannelModeConflictError) Error() string v0.5.0 Dec 20, 2025 v0.4.0 Dec 20, 2025 v0.3.0 Dec 18, 2025 Changes in this version + func EmbeddedMigrationsFS() fs.FS type ChannelMode + const ModeDirect type Option + func WithLeaderHeartbeatInterval(d time.Duration) Option + func WithLeaderLeaseDuration(d time.Duration) Option + func WithMigrationsFS(migrationsFS fs.FS) Option v0.2.0 Dec 14, 2025 Changes in this version type WorkflowContext + func (c *WorkflowContext) App() *App + func (c *WorkflowContext) SetApp(app *App) v0.1.0 Dec 13, 2025 Changes in this version + const SuspendForChannelMessage + const SuspendForRecur + const SuspendForTimer + var AsSuspendSignal = replay.AsSuspendSignal + var ErrActivityIDConflict = errors.New("activity ID conflict: duplicate activity ID in workflow") + var ErrChannelNotSubscribed = errors.New("not subscribed to channel") + var ErrDeterminismViolation = errors.New("determinism violation during replay") + var ErrGroupNotJoined = errors.New("not a member of group") + var ErrInvalidWorkflowState = errors.New("invalid workflow state") + var ErrWorkflowAlreadyCancelled = errors.New("workflow already cancelled") + var ErrWorkflowAlreadyCompleted = errors.New("workflow already completed") + var ErrWorkflowAlreadyFailed = errors.New("workflow already failed") + var ErrWorkflowNotCancellable = errors.New("workflow cannot be cancelled") + var IsSuspendSignal = replay.IsSuspendSignal + var NewChannelMessageSuspend = replay.NewChannelMessageSuspend + var NewRecurSuspend = replay.NewRecurSuspend + var NewTimerSuspend = replay.NewTimerSuspend + func CancelWorkflow(ctx context.Context, app *App, instanceID, reason string) error + func ContextWithWorkflowContext(ctx context.Context, wfCtx *WorkflowContext) context.Context + func GetGroupMembers(ctx context.Context, s storage.Storage, groupName string) ([]string, error) + func IsTerminalError(err error) bool + func JoinGroup(ctx *WorkflowContext, groupName string) error + func LeaveGroup(ctx *WorkflowContext, groupName string) error + func Publish[T any](ctx *WorkflowContext, channelName string, data T, opts ...PublishOption) error + func Recur[T any](ctx *WorkflowContext, input T) (T, error) + func RegisterActivity[I, O any](activity *Activity[I, O]) + func RegisterWorkflow[I, O any](app *App, workflow Workflow[I, O], opts ...WorkflowOption) + func SendEventTransactional[T any](ctx *WorkflowContext, eventType, source string, data T, ...) error + func SendEvent[T any](ctx *WorkflowContext, eventType, source string, data T) error + func SendTo[T any](ctx *WorkflowContext, targetInstanceID, channelName string, data T, ...) error + func Sleep(ctx *WorkflowContext, duration time.Duration, opts ...SleepOption) error + func SleepUntil(ctx *WorkflowContext, t time.Time, opts ...SleepOption) error + func StartWorkflow[I, O any](ctx context.Context, app *App, workflow Workflow[I, O], input I, ...) (string, error) + func Subscribe(ctx *WorkflowContext, channelName string, mode ChannelMode) error + func Unsubscribe(ctx *WorkflowContext, channelName string) error + type Activity struct + func DefineActivity[I, O any](name string, fn func(ctx context.Context, input I) (O, error), ...) *Activity[I, O] + func (a *Activity[I, O]) Compensate(ctx context.Context, input I) error + func (a *Activity[I, O]) Execute(ctx *WorkflowContext, input I, opts ...ExecuteOption) (O, error) + func (a *Activity[I, O]) HasCompensation() bool + func (a *Activity[I, O]) Name() string + type ActivityOption func(*Activity[I, O]) + func WithCompensation[I, O any](fn func(ctx context.Context, input I) error) ActivityOption[I, O] + func WithRetryPolicy[I, O any](policy *retry.Policy) ActivityOption[I, O] + func WithTimeout[I, O any](d time.Duration) ActivityOption[I, O] + func WithTransactional[I, O any](transactional bool) ActivityOption[I, O] + type App struct + func NewApp(opts ...Option) *App + func (a *App) FindInstances(ctx context.Context, inputFilters map[string]any) ([]*storage.WorkflowInstance, error) + func (a *App) FindInstancesWithOptions(ctx context.Context, opts storage.ListInstancesOptions) ([]*storage.WorkflowInstance, error) + func (a *App) GetInstance(ctx context.Context, instanceID string) (*storage.WorkflowInstance, error) + func (a *App) Handler() http.Handler + func (a *App) ListenAndServe(addr string) error + func (a *App) Shutdown(ctx context.Context) error + func (a *App) Start(ctx context.Context) error + func (a *App) Storage() storage.Storage + func (a *App) WorkerID() string + type ChannelMessageTimeoutError struct + ChannelName string + InstanceID string + func (e *ChannelMessageTimeoutError) Error() string + type ChannelMode = storage.ChannelMode + const ModeBroadcast + const ModeCompeting + type CloudEvent struct + Data json.RawMessage + DataSchema string + Extensions map[string]any + ID string + Source string + SpecVersion string + Subject string + Time *time.Time + Type string + func (e *CloudEvent) UnmarshalJSON(data []byte) error + type CompensationExecutor func(ctx context.Context, input []byte) error + func GetCompensationExecutor(activityName string) (CompensationExecutor, bool) + type EventTimeoutError struct + EventType string + InstanceID string + func (e *EventTimeoutError) Error() string + type ExecuteOption func(*executeOptions) + func WithActivityID(id string) ExecuteOption + type LockAcquisitionError struct + InstanceID string + Reason string + WorkerID string + func (e *LockAcquisitionError) Error() string + type Option func(*appConfig) + func WithAutoMigrate(enabled bool) Option + func WithBrokerURL(url string) Option + func WithChannelCleanupInterval(d time.Duration) Option + func WithChannelMessageRetention(d time.Duration) Option + func WithDatabase(url string) Option + func WithEventTimeoutInterval(d time.Duration) Option + func WithHooks(h hooks.WorkflowHooks) Option + func WithListenNotify(enabled *bool) Option + func WithMaxConcurrentMessages(n int) Option + func WithMaxConcurrentResumptions(n int) Option + func WithMaxConcurrentTimers(n int) Option + func WithMaxMessagesPerBatch(n int) Option + func WithMaxTimersPerBatch(n int) Option + func WithMaxWorkflowsPerBatch(n int) Option + func WithMessageCheckInterval(d time.Duration) Option + func WithNotifyReconnectDelay(d time.Duration) Option + func WithOutbox(enabled bool) Option + func WithOutboxBatchSize(size int) Option + func WithOutboxInterval(d time.Duration) Option + func WithRecurCheckInterval(d time.Duration) Option + func WithServiceName(name string) Option + func WithShutdownTimeout(d time.Duration) Option + func WithSingletonChannelCleanup(enabled bool) Option + func WithSingletonStaleLockCleanup(enabled bool) Option + func WithStaleLockInterval(d time.Duration) Option + func WithStaleLockTimeout(d time.Duration) Option + func WithTimerCheckInterval(d time.Duration) Option + func WithWorkerID(id string) Option + func WithWorkflowResumptionInterval(d time.Duration) Option + type PublishOption func(*publishOptions) + func WithMetadata(metadata map[string]any) PublishOption + type ReceiveOption func(*receiveOptions) + func WithReceiveTimeout(d time.Duration) ReceiveOption + type ReceivedEvent struct + Data T + Extensions map[string]any + ID string + Source string + SpecVersion string + Time *time.Time + Type string + func WaitEvent[T any](ctx *WorkflowContext, eventType string, opts ...WaitEventOption) (*ReceivedEvent[T], error) + type ReceivedMessage struct + ChannelName string + CreatedAt time.Time + Data T + ID int64 + Metadata map[string]any + SenderInstanceID string + func Receive[T any](ctx *WorkflowContext, channelName string, opts ...ReceiveOption) (*ReceivedMessage[T], error) + type RetryExhaustedError struct + ActivityName string + Attempts int + LastErr error + func (e *RetryExhaustedError) Error() string + func (e *RetryExhaustedError) Unwrap() error + type SleepOption func(*sleepOptions) + func WithSleepID(id string) SleepOption + type StartOption func(*startOptions) + func WithInstanceID(id string) StartOption + type SuspendSignal = replay.SuspendSignal + type SuspendType = replay.SuspendType + type TerminalError struct + Err error + func NewTerminalError(err error) *TerminalError + func NewTerminalErrorf(format string, args ...any) *TerminalError + func (e *TerminalError) Error() string + func (e *TerminalError) Unwrap() error + type WaitEventOption func(*waitEventOptions) + func WithEventTimeout(d time.Duration) WaitEventOption + type Workflow interface + Execute func(ctx *WorkflowContext, input I) (O, error) + Name func() string + func GetWorkflow[I, O any](name string) Workflow[I, O] + type WorkflowCancelledError struct + InstanceID string + Reason string + func (e *WorkflowCancelledError) Error() string + type WorkflowContext struct + func GetWorkflowContext(ctx context.Context) *WorkflowContext + func NewWorkflowContext(ctx context.Context, instanceID, workerID, workflowName string, ...) *WorkflowContext + func NewWorkflowContextFromExecution(execCtx *replay.ExecutionContext) *WorkflowContext + func (c *WorkflowContext) Cancel() + func (c *WorkflowContext) Context() context.Context + func (c *WorkflowContext) Done() <-chan struct{} + func (c *WorkflowContext) Err() error + func (c *WorkflowContext) GenerateActivityID(functionName string) string + func (c *WorkflowContext) GetCachedResult(activityID string) (any, bool) + func (c *WorkflowContext) GetCachedResultRaw(activityID string) ([]byte, bool) + func (c *WorkflowContext) Hooks() hooks.WorkflowHooks + func (c *WorkflowContext) InstanceID() string + func (c *WorkflowContext) IsReplaying() bool + func (c *WorkflowContext) RecordActivityID(activityID string) + func (c *WorkflowContext) RecordActivityResult(activityID string, result any, err error) error + func (c *WorkflowContext) Session() storage.Executor + func (c *WorkflowContext) SetCachedResult(activityID string, result any) + func (c *WorkflowContext) Storage() storage.Storage + func (c *WorkflowContext) WithContext(ctx context.Context) *WorkflowContext + func (c *WorkflowContext) WorkerID() string + func (c *WorkflowContext) WorkflowName() string + type WorkflowFunc struct + func DefineWorkflow[I, O any](name string, fn func(ctx *WorkflowContext, input I) (O, error)) *WorkflowFunc[I, O] + func (w *WorkflowFunc[I, O]) Execute(ctx *WorkflowContext, input I) (O, error) + func (w *WorkflowFunc[I, O]) Name() string + type WorkflowNotFoundError struct + InstanceID string + func (e *WorkflowNotFoundError) Error() string + type WorkflowOption func(*workflowOptions) + func WithEventHandler(enabled bool) WorkflowOption + type WorkflowResult struct + Error error + InstanceID string + Output O + Status string + func GetWorkflowResult[O any](ctx context.Context, app *App, instanceID string) (*WorkflowResult[O], error)