Documentation
¶
Overview ¶
Package romancy provides a durable execution framework for Go.
Package romancy provides a durable execution framework for Go.
Index ¶
- Constants
- Variables
- func CancelWorkflow(ctx context.Context, app *App, instanceID, reason string) error
- func ContextWithWorkflowContext(ctx context.Context, wfCtx *WorkflowContext) context.Context
- func EmbeddedMigrationsFS() fs.FS
- func GetGroupMembers(ctx context.Context, s storage.Storage, groupName string) ([]string, error)
- func IsTerminalError(err error) bool
- func JoinGroup(ctx *WorkflowContext, groupName string) error
- func LeaveGroup(ctx *WorkflowContext, groupName string) error
- func Publish[T any](ctx *WorkflowContext, channelName string, data T, opts ...PublishOption) error
- func Recur[T any](ctx *WorkflowContext, input T) (T, error)
- func RegisterActivity[I, O any](activity *Activity[I, O])
- func RegisterWorkflow[I, O any](app *App, workflow Workflow[I, O], opts ...WorkflowOption)
- func SendEvent[T any](ctx *WorkflowContext, eventType, source string, data T) error
- func SendEventTransactional[T any](ctx *WorkflowContext, eventType, source string, data T, ...) error
- func SendTo[T any](ctx *WorkflowContext, targetInstanceID, channelName string, data T, ...) error
- func Sleep(ctx *WorkflowContext, duration time.Duration, opts ...SleepOption) error
- func SleepUntil(ctx *WorkflowContext, t time.Time, opts ...SleepOption) error
- func StartWorkflow[I, O any](ctx context.Context, app *App, workflow Workflow[I, O], input I, ...) (string, error)
- func Subscribe(ctx *WorkflowContext, channelName string, mode ChannelMode) error
- func Unsubscribe(ctx *WorkflowContext, channelName string) error
- type Activity
- type ActivityOption
- func WithCompensation[I, O any](fn func(ctx context.Context, input I) error) ActivityOption[I, O]
- func WithRetryPolicy[I, O any](policy *retry.Policy) ActivityOption[I, O]
- func WithTimeout[I, O any](d time.Duration) ActivityOption[I, O]
- func WithTransactional[I, O any](transactional bool) ActivityOption[I, O]
- type App
- func (a *App) FindInstances(ctx context.Context, inputFilters map[string]any) ([]*storage.WorkflowInstance, error)
- func (a *App) FindInstancesWithOptions(ctx context.Context, opts storage.ListInstancesOptions) ([]*storage.WorkflowInstance, error)
- func (a *App) GetInstance(ctx context.Context, instanceID string) (*storage.WorkflowInstance, error)
- func (a *App) Handler() http.Handler
- func (a *App) ListenAndServe(addr string) error
- func (a *App) Shutdown(ctx context.Context) error
- func (a *App) Start(ctx context.Context) error
- func (a *App) Storage() storage.Storage
- func (a *App) WorkerID() string
- type ChannelMessageTimeoutError
- type ChannelMode
- type ChannelModeConflictError
- type CloudEvent
- type CompensationExecutor
- type EventTimeoutError
- type ExecuteOption
- type LockAcquisitionError
- type Option
- func WithAutoMigrate(enabled bool) Option
- func WithBrokerURL(url string) Option
- func WithChannelCleanupInterval(d time.Duration) Option
- func WithChannelMessageRetention(d time.Duration) Option
- func WithDatabase(url string) Option
- func WithEventTimeoutInterval(d time.Duration) Option
- func WithHooks(h hooks.WorkflowHooks) Option
- func WithLeaderHeartbeatInterval(d time.Duration) Option
- func WithLeaderLeaseDuration(d time.Duration) Option
- func WithListenNotify(enabled *bool) Option
- func WithMaxConcurrentMessages(n int) Option
- func WithMaxConcurrentResumptions(n int) Option
- func WithMaxConcurrentTimers(n int) Option
- func WithMaxMessagesPerBatch(n int) Option
- func WithMaxTimersPerBatch(n int) Option
- func WithMaxWorkflowsPerBatch(n int) Option
- func WithMessageCheckInterval(d time.Duration) Option
- func WithMigrationsFS(migrationsFS fs.FS) Option
- func WithNotifyReconnectDelay(d time.Duration) Option
- func WithOutbox(enabled bool) Option
- func WithOutboxBatchSize(size int) Option
- func WithOutboxInterval(d time.Duration) Option
- func WithRecurCheckInterval(d time.Duration) Option
- func WithServiceName(name string) Option
- func WithShutdownTimeout(d time.Duration) Option
- func WithStaleLockInterval(d time.Duration) Option
- func WithStaleLockTimeout(d time.Duration) Option
- func WithTimerCheckInterval(d time.Duration) Option
- func WithWorkerID(id string) Option
- func WithWorkflowResumptionInterval(d time.Duration) Option
- type PublishOption
- type ReceiveOption
- type ReceivedEvent
- type ReceivedMessage
- type RetryExhaustedError
- type SleepOption
- type StartOption
- type SuspendSignal
- type SuspendType
- type TerminalError
- type WaitEventOption
- type Workflow
- type WorkflowCancelledError
- type WorkflowContext
- func (c *WorkflowContext) App() *App
- func (c *WorkflowContext) Cancel()
- func (c *WorkflowContext) Context() context.Context
- func (c *WorkflowContext) Done() <-chan struct{}
- func (c *WorkflowContext) Err() error
- func (c *WorkflowContext) GenerateActivityID(functionName string) string
- func (c *WorkflowContext) GetCachedResult(activityID string) (any, bool)
- func (c *WorkflowContext) GetCachedResultRaw(activityID string) ([]byte, bool)
- func (c *WorkflowContext) Hooks() hooks.WorkflowHooks
- func (c *WorkflowContext) InstanceID() string
- func (c *WorkflowContext) IsReplaying() bool
- func (c *WorkflowContext) RecordActivityID(activityID string)
- func (c *WorkflowContext) RecordActivityResult(activityID string, result any, err error) error
- func (c *WorkflowContext) Session() storage.Executor
- func (c *WorkflowContext) SetApp(app *App)
- func (c *WorkflowContext) SetCachedResult(activityID string, result any)
- func (c *WorkflowContext) Storage() storage.Storage
- func (c *WorkflowContext) WithContext(ctx context.Context) *WorkflowContext
- func (c *WorkflowContext) WorkerID() string
- func (c *WorkflowContext) WorkflowName() string
- type WorkflowFunc
- type WorkflowNotFoundError
- type WorkflowOption
- type WorkflowResult
Constants ¶
const ( // SuspendForTimer indicates the workflow is waiting for a timer to expire. SuspendForTimer = replay.SuspendForTimer // SuspendForChannelMessage indicates the workflow is waiting for a channel message (via Receive or WaitEvent). SuspendForChannelMessage = replay.SuspendForChannelMessage // SuspendForRecur indicates the workflow is recursing with new input. SuspendForRecur = replay.SuspendForRecur )
Variables ¶
var ( // IsSuspendSignal returns true if the error is a SuspendSignal. IsSuspendSignal = replay.IsSuspendSignal // AsSuspendSignal extracts the SuspendSignal from an error if present. AsSuspendSignal = replay.AsSuspendSignal // NewTimerSuspend creates a SuspendSignal for timer waiting. NewTimerSuspend = replay.NewTimerSuspend // NewChannelMessageSuspend creates a SuspendSignal for channel message waiting (via Receive or WaitEvent). NewChannelMessageSuspend = replay.NewChannelMessageSuspend // NewRecurSuspend creates a SuspendSignal for workflow recursion. NewRecurSuspend = replay.NewRecurSuspend )
var ErrActivityIDConflict = errors.New("activity ID conflict: duplicate activity ID in workflow")
ErrActivityIDConflict indicates duplicate activity IDs in a workflow.
var ErrChannelNotSubscribed = errors.New("not subscribed to channel")
ErrChannelNotSubscribed indicates an operation on a channel without subscription.
var ErrDeterminismViolation = errors.New("determinism violation during replay")
ErrDeterminismViolation indicates non-deterministic behavior during replay.
var ErrGroupNotJoined = errors.New("not a member of group")
ErrGroupNotJoined indicates an operation on a group without membership.
var ErrInvalidWorkflowState = errors.New("invalid workflow state")
ErrInvalidWorkflowState indicates an invalid workflow state transition.
var ErrWorkflowAlreadyCancelled = errors.New("workflow already cancelled")
ErrWorkflowAlreadyCancelled indicates an operation on a cancelled workflow.
var ErrWorkflowAlreadyCompleted = errors.New("workflow already completed")
ErrWorkflowAlreadyCompleted indicates an operation on a completed workflow.
var ErrWorkflowAlreadyFailed = errors.New("workflow already failed")
ErrWorkflowAlreadyFailed indicates an operation on a failed workflow.
var ErrWorkflowNotCancellable = errors.New("workflow cannot be cancelled")
ErrWorkflowNotCancellable indicates that the workflow cannot be cancelled. This happens when the workflow is already completed, cancelled, failed, or does not exist.
Functions ¶
func CancelWorkflow ¶
CancelWorkflow cancels a running workflow instance.
func ContextWithWorkflowContext ¶
func ContextWithWorkflowContext(ctx context.Context, wfCtx *WorkflowContext) context.Context
ContextWithWorkflowContext returns a new context with the WorkflowContext embedded. This is used internally to pass WorkflowContext to activities.
func EmbeddedMigrationsFS ¶ added in v0.3.0
EmbeddedMigrationsFS returns a filesystem rooted at schema/db/migrations for use with the migrations package.
The returned FS contains subdirectories for each database type:
- sqlite/
- postgresql/
- mysql/
func GetGroupMembers ¶
GetGroupMembers returns the instance IDs of all workflows in a group. This is useful for broadcasting messages to group members.
func IsTerminalError ¶
IsTerminalError returns true if the error is or wraps a TerminalError.
func JoinGroup ¶
func JoinGroup(ctx *WorkflowContext, groupName string) error
JoinGroup adds the workflow to a named group. Groups are useful for scenarios where you need to send messages to a set of related workflows.
Groups persist across workflow restarts and members are automatically removed when the workflow completes.
func LeaveGroup ¶
func LeaveGroup(ctx *WorkflowContext, groupName string) error
LeaveGroup removes the workflow from a named group.
func Publish ¶
func Publish[T any](ctx *WorkflowContext, channelName string, data T, opts ...PublishOption) error
Publish sends a message to all subscribers of a channel. This is an activity that persists the message to storage.
The message will be delivered to: - Broadcast subscribers: All receive the message - Competing subscribers: Exactly one receives the message
func Recur ¶
func Recur[T any](ctx *WorkflowContext, input T) (T, error)
Recur implements Erlang-style tail recursion for workflows. It archives the current workflow's history and starts a new instance with the provided input, maintaining the same instance ID.
This is useful for long-running workflows that need to periodically "reset" their history to prevent unbounded growth.
The workflow will: 1. Archive current history to workflow_history_archive 2. Clean up all subscriptions (events, timers, channels, groups) 3. Mark current instance as "recurred" 4. Create a new instance with continued_from set to current instance 5. Start executing with the new input
Example:
workflow := romancy.DefineWorkflow("counter", func(ctx *romancy.WorkflowContext, input CounterInput) (CounterResult, error) {
// Process batch
newCount := input.Count + 1
if newCount < 1000 {
// Continue with new input (tail recursion)
return romancy.Recur(ctx, CounterInput{Count: newCount})
}
return CounterResult{FinalCount: newCount}, nil
})
func RegisterActivity ¶
RegisterActivity registers an activity's compensation function globally. This is called automatically when an activity with compensation is created.
func RegisterWorkflow ¶
func RegisterWorkflow[I, O any](app *App, workflow Workflow[I, O], opts ...WorkflowOption)
RegisterWorkflow registers a workflow with the application. The workflow can later be started by name or by type.
func SendEvent ¶
func SendEvent[T any](ctx *WorkflowContext, eventType, source string, data T) error
SendEvent sends an event through the transactional outbox. This is a convenience wrapper for outbox.SendEventTransactional.
The event is stored in the database within the current transaction (if any) and will be delivered asynchronously by the outbox relayer.
This ensures that the event is only sent if the activity/transaction commits, providing exactly-once delivery guarantees when combined with idempotent consumers.
Example:
activity := romancy.DefineActivity("process_order", func(ctx context.Context, input OrderInput) (OrderResult, error) {
wfCtx := romancy.GetWorkflowContext(ctx)
if wfCtx == nil {
return OrderResult{}, fmt.Errorf("workflow context not available")
}
err := romancy.SendEvent(wfCtx, "order.created", "order-service", map[string]any{
"order_id": input.OrderID,
"amount": input.Amount,
})
if err != nil {
return OrderResult{}, err
}
return OrderResult{Status: "created"}, nil
})
func SendEventTransactional ¶
func SendEventTransactional[T any](ctx *WorkflowContext, eventType, source string, data T, opts ...outbox.SendEventOption) error
SendEventTransactional is an alias for outbox.SendEventTransactional. Use this when you need more control over event options.
Example:
err := romancy.SendEventTransactional(wfCtx, "order.created", "order-service", orderData,
outbox.WithEventID("custom-event-id"),
outbox.WithContentType("application/json"),
)
func SendTo ¶
func SendTo[T any](ctx *WorkflowContext, targetInstanceID, channelName string, data T, opts ...PublishOption) error
SendTo sends a direct message to a specific workflow instance. The target instance must be subscribed to the channel. Uses dynamic channel names (Edda approach): publishes to "channel:instance_id" so only the target instance receives the message.
func Sleep ¶
func Sleep(ctx *WorkflowContext, duration time.Duration, opts ...SleepOption) error
Sleep suspends the workflow for a specified duration. The workflow will resume after the duration has elapsed.
During replay, if the timer already fired, this returns immediately. Otherwise, it returns a SuspendSignal that signals the engine to: 1. Register a timer subscription 2. Update workflow status to "waiting_for_timer" 3. Release the workflow lock
When the timer expires, the workflow will be resumed.
func SleepUntil ¶
func SleepUntil(ctx *WorkflowContext, t time.Time, opts ...SleepOption) error
SleepUntil suspends the workflow until a specific time.
func StartWorkflow ¶
func StartWorkflow[I, O any]( ctx context.Context, app *App, workflow Workflow[I, O], input I, opts ...StartOption, ) (string, error)
StartWorkflow starts a new workflow instance. Returns the instance ID.
func Subscribe ¶
func Subscribe(ctx *WorkflowContext, channelName string, mode ChannelMode) error
Subscribe registers the workflow to receive messages from a channel. The mode determines how messages are delivered: - ModeBroadcast: All subscribers receive every message - ModeCompeting: Each message goes to exactly one subscriber - ModeDirect: Receives messages sent via SendTo to this instance
Subscriptions persist across workflow restarts and are automatically cleaned up when the workflow completes.
Once a channel has subscribers, its mode is locked. Attempting to subscribe with a different mode will return a ChannelModeConflictError.
func Unsubscribe ¶
func Unsubscribe(ctx *WorkflowContext, channelName string) error
Unsubscribe removes the workflow's subscription to a channel.
Types ¶
type Activity ¶
type Activity[I, O any] struct { // contains filtered or unexported fields }
Activity represents a single unit of work within a workflow. Activities are the only way to perform I/O or side effects in workflows. I is the input type, O is the output type.
func DefineActivity ¶
func DefineActivity[I, O any]( name string, fn func(ctx context.Context, input I) (O, error), opts ...ActivityOption[I, O], ) *Activity[I, O]
DefineActivity creates a new activity. By default, activities are wrapped in a transaction for atomicity with history recording and outbox events.
func (*Activity[I, O]) Compensate ¶
Compensate executes the compensation function if defined.
func (*Activity[I, O]) Execute ¶
func (a *Activity[I, O]) Execute( ctx *WorkflowContext, input I, opts ...ExecuteOption, ) (O, error)
Execute runs the activity within a workflow context. If activityID is empty, it will be auto-generated.
When transactional=true (default), the activity execution, history recording, and outbox events are wrapped in a database transaction for atomicity.
func (*Activity[I, O]) HasCompensation ¶
HasCompensation returns true if the activity has a compensation function.
type ActivityOption ¶
ActivityOption configures an activity.
func WithCompensation ¶
func WithCompensation[I, O any](fn func(ctx context.Context, input I) error) ActivityOption[I, O]
WithCompensation sets the compensation function for the activity. This function will be called during saga rollback.
func WithRetryPolicy ¶
func WithRetryPolicy[I, O any](policy *retry.Policy) ActivityOption[I, O]
WithRetryPolicy sets the retry policy for the activity.
func WithTimeout ¶
func WithTimeout[I, O any](d time.Duration) ActivityOption[I, O]
WithTimeout sets the timeout for each activity execution attempt.
func WithTransactional ¶
func WithTransactional[I, O any](transactional bool) ActivityOption[I, O]
WithTransactional sets whether the activity execution should be wrapped in a database transaction. Default is true.
When transactional=true (default):
- Activity execution, history recording, and outbox events are atomic
- Rollback occurs on failure
- Use ctx.Storage() for database operations within the same transaction
When transactional=false:
- Useful for activities that call external APIs or don't need atomicity
- History and outbox events are still recorded, but not atomically
type App ¶
type App struct {
// contains filtered or unexported fields
}
App is the main entry point for Romancy. It manages workflow execution, storage, and background tasks.
func (*App) FindInstances ¶
func (a *App) FindInstances(ctx context.Context, inputFilters map[string]any) ([]*storage.WorkflowInstance, error)
FindInstances searches for workflow instances with input filters. This is a convenience method for searching workflows by their input data.
The inputFilters parameter maps JSON paths to expected values:
- "customer.id" -> "12345" matches input like {"customer": {"id": "12345"}}
- "status" -> "active" matches input like {"status": "active"}
Values are compared with exact match. Supported value types:
- string: Exact string match
- int/float64: Numeric comparison
- bool: Boolean comparison
- nil: Matches null values
Example:
instances, err := app.FindInstances(ctx, map[string]any{
"order.customer_id": "cust_123",
"order.status": "pending",
})
func (*App) FindInstancesWithOptions ¶
func (a *App) FindInstancesWithOptions(ctx context.Context, opts storage.ListInstancesOptions) ([]*storage.WorkflowInstance, error)
FindInstancesWithOptions searches for workflow instances with full options. Use this when you need pagination, status filters, or other advanced options in addition to input filters.
func (*App) GetInstance ¶
func (a *App) GetInstance(ctx context.Context, instanceID string) (*storage.WorkflowInstance, error)
GetInstance retrieves a workflow instance by ID.
func (*App) Handler ¶
Handler returns an http.Handler for CloudEvents and health endpoints. This allows integration with existing HTTP routers (gin, echo, chi, etc.).
Example with chi:
r := chi.NewRouter()
r.Mount("/romancy", app.Handler())
http.ListenAndServe(":8080", r)
Example with standard http.ServeMux:
mux := http.NewServeMux()
mux.Handle("/api/workflows/", app.Handler())
http.ListenAndServe(":8080", mux)
func (*App) ListenAndServe ¶
ListenAndServe starts the HTTP server for CloudEvents. For integration with existing HTTP routers, use Handler() instead.
type ChannelMessageTimeoutError ¶
ChannelMessageTimeoutError indicates that waiting for a channel message timed out.
func (*ChannelMessageTimeoutError) Error ¶
func (e *ChannelMessageTimeoutError) Error() string
type ChannelMode ¶
type ChannelMode = storage.ChannelMode
ChannelMode defines the message delivery mode for a channel subscription.
const ( // ModeBroadcast delivers messages to all subscribers. // Each subscriber receives every message. ModeBroadcast ChannelMode = storage.ChannelModeBroadcast // ModeCompeting delivers each message to exactly one subscriber. // Multiple subscribers compete for messages (work queue pattern). ModeCompeting ChannelMode = storage.ChannelModeCompeting // ModeDirect receives messages sent via SendTo to this workflow instance. // This is syntactic sugar that subscribes to "channel:instanceID" internally. // Use with SendTo for point-to-point messaging. // // Example: // // Receiver workflow // romancy.Subscribe(ctx, "requests", romancy.ModeDirect) // msg, err := romancy.Receive[Request](ctx, "requests") // // // Sender workflow // romancy.SendTo(ctx, targetInstanceID, "requests", request) ModeDirect ChannelMode = storage.ChannelModeDirect )
type ChannelModeConflictError ¶ added in v0.6.0
ChannelModeConflictError indicates subscribing with a different mode than the channel's established mode.
func (*ChannelModeConflictError) Error ¶ added in v0.6.0
func (e *ChannelModeConflictError) Error() string
type CloudEvent ¶
type CloudEvent struct {
ID string `json:"id"`
Type string `json:"type"`
Source string `json:"source"`
SpecVersion string `json:"specversion"`
Time *time.Time `json:"time,omitempty"`
DataSchema string `json:"dataschema,omitempty"`
Subject string `json:"subject,omitempty"`
Data json.RawMessage `json:"data,omitempty"`
Extensions map[string]any `json:"-"` // CloudEvents extension attributes
}
CloudEvent represents a CloudEvents v1.0 event.
func (*CloudEvent) UnmarshalJSON ¶
func (e *CloudEvent) UnmarshalJSON(data []byte) error
UnmarshalJSON implements custom JSON unmarshaling to capture extension attributes.
type CompensationExecutor ¶
CompensationExecutor is a function that executes a compensation. It takes JSON-encoded input and returns an error.
func GetCompensationExecutor ¶
func GetCompensationExecutor(activityName string) (CompensationExecutor, bool)
GetCompensationExecutor returns the compensation executor for an activity.
type EventTimeoutError ¶
EventTimeoutError indicates that waiting for an event timed out.
func (*EventTimeoutError) Error ¶
func (e *EventTimeoutError) Error() string
type ExecuteOption ¶
type ExecuteOption func(*executeOptions)
ExecuteOption configures activity execution.
func WithActivityID ¶
func WithActivityID(id string) ExecuteOption
WithActivityID specifies a manual activity ID. Required for concurrent activity execution to maintain determinism.
type LockAcquisitionError ¶
LockAcquisitionError indicates failure to acquire a workflow lock.
func (*LockAcquisitionError) Error ¶
func (e *LockAcquisitionError) Error() string
type Option ¶
type Option func(*appConfig)
Option configures an App.
func WithAutoMigrate ¶
WithAutoMigrate enables or disables automatic database migrations on startup.
When enabled (default), romancy will automatically apply pending dbmate-compatible migrations from the embedded schema/db/migrations/ directory during App.Start().
This is compatible with the dbmate CLI tool and uses the same schema_migrations table for tracking applied migrations.
Set to false if you prefer to manage migrations manually using dbmate CLI:
dbmate -d schema/db/migrations/sqlite up
func WithBrokerURL ¶
WithBrokerURL sets the CloudEvents broker URL for outbox event publishing. Example: "http://broker-ingress.knative-eventing.svc.cluster.local/default/default"
func WithChannelCleanupInterval ¶
WithChannelCleanupInterval sets the interval for cleaning up old channel messages.
func WithChannelMessageRetention ¶
WithChannelMessageRetention sets how long to keep channel messages before cleanup.
func WithDatabase ¶
WithDatabase sets the database connection URL. Supported formats:
- SQLite: "file:path/to/db.db" or "sqlite://path/to/db.db"
- PostgreSQL: "postgres://user:pass@host:port/dbname"
func WithEventTimeoutInterval ¶
WithEventTimeoutInterval sets the interval for checking event timeouts.
func WithHooks ¶
func WithHooks(h hooks.WorkflowHooks) Option
WithHooks sets the workflow lifecycle hooks.
func WithLeaderHeartbeatInterval ¶ added in v0.3.0
WithLeaderHeartbeatInterval sets the interval for leader election heartbeat. The leader will renew its lease at this interval. Default: 15 seconds.
func WithLeaderLeaseDuration ¶ added in v0.3.0
WithLeaderLeaseDuration sets the duration for which a leader holds its lease. Should be at least 3x the heartbeat interval to allow for missed heartbeats. Default: 45 seconds.
func WithListenNotify ¶
WithListenNotify configures PostgreSQL LISTEN/NOTIFY usage. - nil (default): auto-detect based on database URL (enabled for PostgreSQL) - true: force enable (fails if not PostgreSQL) - false: force disable (use polling only)
func WithMaxConcurrentMessages ¶
WithMaxConcurrentMessages sets the maximum number of concurrent message handlers. Default: 10.
func WithMaxConcurrentResumptions ¶
WithMaxConcurrentResumptions sets the maximum number of concurrent workflow resumptions. This limits goroutine spawning in background tasks to prevent resource exhaustion. Default: 10.
func WithMaxConcurrentTimers ¶
WithMaxConcurrentTimers sets the maximum number of concurrent timer handlers. Default: 10.
func WithMaxMessagesPerBatch ¶
WithMaxMessagesPerBatch sets the maximum number of channel messages to process per polling cycle. Default: 100.
func WithMaxTimersPerBatch ¶
WithMaxTimersPerBatch sets the maximum number of timers to process per polling cycle. Default: 100.
func WithMaxWorkflowsPerBatch ¶
WithMaxWorkflowsPerBatch sets the maximum number of workflows to process per polling cycle. Default: 100.
func WithMessageCheckInterval ¶
WithMessageCheckInterval sets the interval for checking channel message subscriptions.
func WithMigrationsFS ¶ added in v0.3.0
WithMigrationsFS sets a custom filesystem for database migrations.
By default, romancy uses embedded migrations from schema/db/migrations/. Use this option to provide custom migrations from a different source.
The filesystem should contain subdirectories for each database type:
- sqlite/
- postgresql/
- mysql/
Each subdirectory should contain .sql files in dbmate format with -- migrate:up and -- migrate:down sections.
func WithNotifyReconnectDelay ¶
WithNotifyReconnectDelay sets the delay before reconnecting after a LISTEN/NOTIFY connection failure. Default: 60 seconds.
func WithOutbox ¶
WithOutbox enables the transactional outbox pattern.
func WithOutboxBatchSize ¶
WithOutboxBatchSize sets the batch size for outbox processing.
func WithOutboxInterval ¶
WithOutboxInterval sets the interval for the outbox relayer.
func WithRecurCheckInterval ¶
WithRecurCheckInterval sets the interval for checking recurred workflows.
func WithServiceName ¶
WithServiceName sets the service name for identification.
func WithShutdownTimeout ¶
WithShutdownTimeout sets the timeout for graceful shutdown.
func WithStaleLockInterval ¶
WithStaleLockInterval sets the interval for stale lock cleanup.
func WithStaleLockTimeout ¶
WithStaleLockTimeout sets the timeout after which a lock is considered stale.
func WithTimerCheckInterval ¶
WithTimerCheckInterval sets the interval for checking expired timers.
func WithWorkerID ¶
WithWorkerID sets a custom worker ID. If not set, a UUID will be generated.
func WithWorkflowResumptionInterval ¶
WithWorkflowResumptionInterval sets the interval for the workflow resumption task. This task finds workflows with status='running' that don't have an active lock and resumes them. This is essential for load balancing in multi-worker environments. Default: 1 second (same as Edda).
type PublishOption ¶
type PublishOption func(*publishOptions)
PublishOption configures publish behavior.
func WithMetadata ¶
func WithMetadata(metadata map[string]any) PublishOption
WithMetadata attaches metadata to the published message.
type ReceiveOption ¶
type ReceiveOption func(*receiveOptions)
ReceiveOption configures channel receive behavior.
func WithReceiveTimeout ¶
func WithReceiveTimeout(d time.Duration) ReceiveOption
WithReceiveTimeout sets a timeout for waiting for a message.
type ReceivedEvent ¶
type ReceivedEvent[T any] struct { // CloudEvents metadata ID string `json:"id"` Type string `json:"type"` Source string `json:"source"` SpecVersion string `json:"specversion"` Time *time.Time `json:"time,omitempty"` Extensions map[string]any `json:"extensions,omitempty"` // Event data Data T `json:"data"` }
ReceivedEvent represents an event received by a workflow.
func WaitEvent ¶
func WaitEvent[T any](ctx *WorkflowContext, eventType string, opts ...WaitEventOption) (*ReceivedEvent[T], error)
WaitEvent suspends the workflow until an event of the specified type is received. The event data will be deserialized into type T.
Internally, this uses the channel messaging system. The event_type is used as the channel name, and the workflow subscribes in broadcast mode.
During replay, if the event was already received, this returns immediately. Otherwise, the workflow is suspended until an event arrives on the channel.
When the event arrives (published to the channel), the workflow will be resumed.
type ReceivedMessage ¶
type ReceivedMessage[T any] struct { // Message ID ID int64 `json:"id"` // Channel name ChannelName string `json:"channel_name"` // Message data Data T `json:"data"` // Metadata (optional) Metadata map[string]any `json:"metadata,omitempty"` // Sender instance ID (if sent via SendTo) SenderInstanceID string `json:"sender_instance_id,omitempty"` // When the message was created CreatedAt time.Time `json:"created_at"` }
ReceivedMessage represents a message received from a channel.
func Receive ¶
func Receive[T any](ctx *WorkflowContext, channelName string, opts ...ReceiveOption) (*ReceivedMessage[T], error)
Receive waits for and receives a message from a channel. The workflow must be subscribed to the channel before calling Receive.
This is a blocking operation - the workflow will be suspended until a message is available or the optional timeout expires.
During replay, if the message was already received, this returns immediately.
For channels subscribed with ModeDirect, this automatically receives from the direct channel (channel:instanceID) without needing to specify it.
type RetryExhaustedError ¶
RetryExhaustedError indicates that all retry attempts have been exhausted.
func (*RetryExhaustedError) Error ¶
func (e *RetryExhaustedError) Error() string
func (*RetryExhaustedError) Unwrap ¶
func (e *RetryExhaustedError) Unwrap() error
type SleepOption ¶
type SleepOption func(*sleepOptions)
SleepOption configures sleep behavior.
func WithSleepID ¶
func WithSleepID(id string) SleepOption
WithSleepID sets a custom timer ID for the sleep. This is useful for identifying timers in logs and for deterministic replay.
type StartOption ¶
type StartOption func(*startOptions)
StartOption configures workflow start options.
func WithInstanceID ¶
func WithInstanceID(id string) StartOption
WithInstanceID specifies a custom instance ID. If not provided, a UUID will be generated.
type SuspendSignal ¶
type SuspendSignal = replay.SuspendSignal
SuspendSignal is returned when a workflow needs to suspend execution. It implements the error interface for compatibility with Go's error handling, but it is NOT an error - it's a control flow signal.
type SuspendType ¶
type SuspendType = replay.SuspendType
SuspendType represents the type of workflow suspension.
type TerminalError ¶
type TerminalError struct {
Err error
}
TerminalError wraps an error to indicate it should not be retried. When an activity returns a TerminalError, the workflow will fail without attempting any retries.
func NewTerminalError ¶
func NewTerminalError(err error) *TerminalError
NewTerminalError creates a new TerminalError wrapping the given error.
func NewTerminalErrorf ¶
func NewTerminalErrorf(format string, args ...any) *TerminalError
NewTerminalErrorf creates a new TerminalError with a formatted message.
func (*TerminalError) Error ¶
func (e *TerminalError) Error() string
func (*TerminalError) Unwrap ¶
func (e *TerminalError) Unwrap() error
type WaitEventOption ¶
type WaitEventOption func(*waitEventOptions)
WaitEventOption configures event waiting behavior.
func WithEventTimeout ¶
func WithEventTimeout(d time.Duration) WaitEventOption
WithEventTimeout sets a timeout for waiting for an event.
type Workflow ¶
type Workflow[I, O any] interface { // Name returns the unique name of the workflow. Name() string // Execute runs the workflow logic. Execute(ctx *WorkflowContext, input I) (O, error) }
Workflow defines the interface for a durable workflow. I is the input type, O is the output type.
func GetWorkflow ¶
GetWorkflow retrieves a registered workflow by name. Returns nil if not found.
type WorkflowCancelledError ¶
WorkflowCancelledError indicates that a workflow has been cancelled.
func (*WorkflowCancelledError) Error ¶
func (e *WorkflowCancelledError) Error() string
type WorkflowContext ¶
type WorkflowContext struct {
// contains filtered or unexported fields
}
WorkflowContext provides context for workflow execution. It manages activity ID generation, replay state, and history caching.
func GetWorkflowContext ¶
func GetWorkflowContext(ctx context.Context) *WorkflowContext
GetWorkflowContext retrieves the WorkflowContext from a context.Context. This is useful in activities to access workflow-level features like Session() for database operations within the same transaction.
Returns nil if the context does not contain a WorkflowContext.
Example:
activity := romancy.DefineActivity("process_order", func(ctx context.Context, input OrderInput) (OrderResult, error) {
wfCtx := romancy.GetWorkflowContext(ctx)
if wfCtx == nil {
return OrderResult{}, fmt.Errorf("workflow context not available")
}
session := wfCtx.Session()
// Use session for database operations...
return OrderResult{}, nil
})
func NewWorkflowContext ¶
func NewWorkflowContext( ctx context.Context, instanceID, workerID, workflowName string, historyCache map[string]any, isReplaying bool, ) *WorkflowContext
NewWorkflowContext creates a new WorkflowContext.
func NewWorkflowContextFromExecution ¶
func NewWorkflowContextFromExecution(execCtx *replay.ExecutionContext) *WorkflowContext
NewWorkflowContextFromExecution creates a WorkflowContext from a replay ExecutionContext.
func (*WorkflowContext) App ¶ added in v0.2.0
func (c *WorkflowContext) App() *App
App returns the App reference for this workflow context. Returns nil if not set.
func (*WorkflowContext) Cancel ¶
func (c *WorkflowContext) Cancel()
Cancel cancels the workflow execution.
func (*WorkflowContext) Context ¶
func (c *WorkflowContext) Context() context.Context
Context returns the underlying context.Context.
func (*WorkflowContext) Done ¶
func (c *WorkflowContext) Done() <-chan struct{}
Done returns a channel that's closed when the context is cancelled.
func (*WorkflowContext) Err ¶
func (c *WorkflowContext) Err() error
Err returns any error from the context.
func (*WorkflowContext) GenerateActivityID ¶
func (c *WorkflowContext) GenerateActivityID(functionName string) string
GenerateActivityID generates a unique activity ID for the given function name. Format: {function_name}:{counter} This is used for deterministic replay - the same sequence of calls will always generate the same IDs.
func (*WorkflowContext) GetCachedResult ¶
func (c *WorkflowContext) GetCachedResult(activityID string) (any, bool)
GetCachedResult retrieves a cached result for the given activity ID. Returns the result and true if found, or nil and false if not cached.
func (*WorkflowContext) GetCachedResultRaw ¶
func (c *WorkflowContext) GetCachedResultRaw(activityID string) ([]byte, bool)
GetCachedResultRaw retrieves a cached result with raw JSON bytes. Returns (rawJSON, true) if found, (nil, false) if not found. Use this to avoid re-serialization when the raw JSON is needed.
func (*WorkflowContext) Hooks ¶
func (c *WorkflowContext) Hooks() hooks.WorkflowHooks
Hooks returns the workflow hooks for observability. Returns nil if hooks are not available (e.g., not using replay engine).
func (*WorkflowContext) InstanceID ¶
func (c *WorkflowContext) InstanceID() string
InstanceID returns the workflow instance ID.
func (*WorkflowContext) IsReplaying ¶
func (c *WorkflowContext) IsReplaying() bool
IsReplaying returns true if the workflow is being replayed.
func (*WorkflowContext) RecordActivityID ¶
func (c *WorkflowContext) RecordActivityID(activityID string)
RecordActivityID records that an activity ID has been used. This is used for tracking the current activity during execution.
func (*WorkflowContext) RecordActivityResult ¶
func (c *WorkflowContext) RecordActivityResult(activityID string, result any, err error) error
RecordActivityResult records an activity result to storage and caches it. This is called after activity execution when NOT replaying.
func (*WorkflowContext) Session ¶
func (c *WorkflowContext) Session() storage.Executor
Session returns the database executor for the current context. When called within a transactional activity, this returns the transaction, allowing you to execute custom SQL queries in the same transaction as the activity execution, history recording, and outbox events.
Returns nil if storage is not available.
Example:
activity := romancy.DefineActivity("process_order", func(ctx context.Context, input OrderInput) (OrderResult, error) {
// Get the database session (transaction if in transactional activity)
wfCtx := romancy.GetWorkflowContext(ctx)
session := wfCtx.Session()
if session != nil {
// Execute custom SQL in the same transaction
_, err := session.ExecContext(ctx, "INSERT INTO orders ...", input.OrderID)
if err != nil {
return OrderResult{}, err
}
}
return OrderResult{Status: "completed"}, nil
})
func (*WorkflowContext) SetApp ¶ added in v0.2.0
func (c *WorkflowContext) SetApp(app *App)
SetApp sets the App reference for this workflow context. This is typically called internally when creating the context.
func (*WorkflowContext) SetCachedResult ¶
func (c *WorkflowContext) SetCachedResult(activityID string, result any)
SetCachedResult stores a result in the history cache. This is typically called after activity execution to cache results locally.
func (*WorkflowContext) Storage ¶
func (c *WorkflowContext) Storage() storage.Storage
Storage returns the storage interface for advanced operations like compensation. Returns nil if not using the replay engine.
func (*WorkflowContext) WithContext ¶
func (c *WorkflowContext) WithContext(ctx context.Context) *WorkflowContext
WithContext returns a copy of the WorkflowContext with a new underlying context. This is useful for passing transaction contexts to storage operations.
func (*WorkflowContext) WorkerID ¶
func (c *WorkflowContext) WorkerID() string
WorkerID returns the worker ID that is executing this workflow.
func (*WorkflowContext) WorkflowName ¶
func (c *WorkflowContext) WorkflowName() string
WorkflowName returns the workflow name.
type WorkflowFunc ¶
type WorkflowFunc[I, O any] struct { // contains filtered or unexported fields }
WorkflowFunc is a convenience type for workflows defined as functions.
func DefineWorkflow ¶
func DefineWorkflow[I, O any](name string, fn func(ctx *WorkflowContext, input I) (O, error)) *WorkflowFunc[I, O]
DefineWorkflow creates a new workflow from a function.
func (*WorkflowFunc[I, O]) Execute ¶
func (w *WorkflowFunc[I, O]) Execute(ctx *WorkflowContext, input I) (O, error)
Execute runs the workflow function.
func (*WorkflowFunc[I, O]) Name ¶
func (w *WorkflowFunc[I, O]) Name() string
Name returns the workflow name.
type WorkflowNotFoundError ¶
type WorkflowNotFoundError struct {
InstanceID string
}
WorkflowNotFoundError indicates that a workflow instance was not found.
func (*WorkflowNotFoundError) Error ¶
func (e *WorkflowNotFoundError) Error() string
type WorkflowOption ¶
type WorkflowOption func(*workflowOptions)
WorkflowOption configures workflow registration.
func WithEventHandler ¶
func WithEventHandler(enabled bool) WorkflowOption
WithEventHandler marks the workflow as an event handler. When true, the workflow will be automatically started when a CloudEvent with a matching type is received.
type WorkflowResult ¶
WorkflowResult represents the result of a workflow execution.
func GetWorkflowResult ¶
func GetWorkflowResult[O any](ctx context.Context, app *App, instanceID string) (*WorkflowResult[O], error)
GetWorkflowResult retrieves the result of a workflow instance.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
cmd
|
|
|
crosstest
command
Package main provides a CLI tool for cross-language channel testing between Romancy (Go) and Edda (Python).
|
Package main provides a CLI tool for cross-language channel testing between Romancy (Go) and Edda (Python). |
|
romancy
command
Package main provides a CLI tool for interacting with Romancy workflows.
|
Package main provides a CLI tool for interacting with Romancy workflows. |
|
Package compensation provides saga compensation execution.
|
Package compensation provides saga compensation execution. |
|
examples
|
|
|
booking
command
Package main demonstrates human-in-the-loop workflow patterns.
|
Package main demonstrates human-in-the-loop workflow patterns. |
|
channel
command
Package main demonstrates channel-based message passing between workflows.
|
Package main demonstrates channel-based message passing between workflows. |
|
demo
command
Package main demonstrates a simple order processing workflow using Romancy.
|
Package main demonstrates a simple order processing workflow using Romancy. |
|
event_handler
command
Package main demonstrates event-driven workflow patterns.
|
Package main demonstrates event-driven workflow patterns. |
|
llm
command
Package main demonstrates LLM integration with Romancy workflows.
|
Package main demonstrates LLM integration with Romancy workflows. |
|
mcp
command
Package main demonstrates MCP (Model Context Protocol) integration with Romancy.
|
Package main demonstrates MCP (Model Context Protocol) integration with Romancy. |
|
retry
command
Package main demonstrates retry policies in activities.
|
Package main demonstrates retry policies in activities. |
|
saga
command
Package main demonstrates the Saga pattern with compensation.
|
Package main demonstrates the Saga pattern with compensation. |
|
simple
command
Package main demonstrates a simple workflow without events.
|
Package main demonstrates a simple workflow without events. |
|
timer
command
Package main demonstrates sleep/timer functionality in workflows.
|
Package main demonstrates sleep/timer functionality in workflows. |
|
Package hooks provides lifecycle hooks for workflow observability.
|
Package hooks provides lifecycle hooks for workflow observability. |
|
otel
Package otel provides OpenTelemetry integration for Romancy workflow hooks.
|
Package otel provides OpenTelemetry integration for Romancy workflow hooks. |
|
internal
|
|
|
migrations
Package migrations provides automatic dbmate-compatible migration support.
|
Package migrations provides automatic dbmate-compatible migration support. |
|
notify
Package notify provides PostgreSQL LISTEN/NOTIFY functionality for real-time notifications.
|
Package notify provides PostgreSQL LISTEN/NOTIFY functionality for real-time notifications. |
|
storage
Package storage provides the storage layer for Romancy.
|
Package storage provides the storage layer for Romancy. |
|
Package llm provides durable LLM integration for Romancy workflows.
|
Package llm provides durable LLM integration for Romancy workflows. |
|
Package mcp provides Model Context Protocol (MCP) integration for Romancy.
|
Package mcp provides Model Context Protocol (MCP) integration for Romancy. |
|
Package outbox provides transactional outbox pattern for reliable event delivery.
|
Package outbox provides transactional outbox pattern for reliable event delivery. |
|
Package retry provides retry policies for activities.
|
Package retry provides retry policies for activities. |