storage

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 13, 2025 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package storage provides the storage layer for Romancy.

Package storage provides the storage layer for Romancy.

Package storage provides the storage layer for Romancy.

Package storage provides the storage layer for Romancy.

Index

Constants

This section is empty.

Variables

View Source
// ErrWorkflowNotCancellable indicates that the workflow cannot be cancelled.
// This happens when the workflow is already completed, cancelled, failed, or
// does not exist.
var ErrWorkflowNotCancellable = errors.New("workflow cannot be cancelled")

ErrWorkflowNotCancellable indicates that the workflow cannot be cancelled. This happens when the workflow is already completed, cancelled, failed, or does not exist.

Functions

func ValidateJSONPath

func ValidateJSONPath(path string) error

ValidateJSONPath validates a JSON path for safe use in SQL queries. Returns an error if the path contains potentially dangerous characters.

Types

type ArchivedHistoryEvent

// ArchivedHistoryEvent represents an archived history event (from recur).
//
// It carries the same payload fields as HistoryEvent (InstanceID, ActivityID,
// EventType, EventData, EventDataBinary, DataType) plus archive bookkeeping:
// OriginalID and OriginalCreatedAt (presumably the ID and creation time of the
// source history row — TODO confirm) and ArchivedAt.
type ArchivedHistoryEvent struct {
	ID                int64            `json:"id"`
	OriginalID        int64            `json:"original_id"`
	InstanceID        string           `json:"instance_id"`
	ActivityID        string           `json:"activity_id"`
	EventType         HistoryEventType `json:"event_type"`
	EventData         []byte           `json:"event_data"`
	EventDataBinary   []byte           `json:"event_data_binary"`
	DataType          string           `json:"data_type"`
	OriginalCreatedAt time.Time        `json:"original_created_at"`
	ArchivedAt        time.Time        `json:"archived_at"`
}

ArchivedHistoryEvent represents an archived history event (from recur).

type ChannelDeliveryCursor

// ChannelDeliveryCursor tracks message delivery progress for broadcast mode.
//
// One cursor is keyed by (InstanceID, ChannelName); LastMessageID is the
// position that ChannelManager.UpdateDeliveryCursor advances.
// NOTE(review): presumably the ID of the last message delivered to the
// instance — confirm against the storage implementation.
type ChannelDeliveryCursor struct {
	ID            int64     `json:"id"`
	InstanceID    string    `json:"instance_id"`
	ChannelName   string    `json:"channel_name"`
	LastMessageID int64     `json:"last_message_id"`
	UpdatedAt     time.Time `json:"updated_at"`
}

ChannelDeliveryCursor tracks message delivery progress for broadcast mode.

type ChannelDeliveryResult

// ChannelDeliveryResult contains information about a successful message
// delivery. It is the result type of
// ChannelManager.DeliverChannelMessageWithLock.
type ChannelDeliveryResult struct {
	InstanceID   string `json:"instance_id"`
	WorkflowName string `json:"workflow_name"`
	ActivityID   string `json:"activity_id"`
}

ChannelDeliveryResult contains information about a successful message delivery.

type ChannelManager

// ChannelManager handles channel-based messaging operations: publishing,
// subscribing, claiming (competing mode), cursor tracking (broadcast mode),
// delivery to waiting subscribers, and cleanup.
type ChannelManager interface {
	// PublishToChannel publishes a message to a channel.
	// NOTE(review): the returned int64 is presumably the new message ID — confirm.
	PublishToChannel(ctx context.Context, channelName string, dataJSON []byte, metadata []byte, targetInstanceID string) (int64, error)

	// SubscribeToChannel subscribes an instance to a channel in the given mode
	// (see ChannelModeBroadcast and ChannelModeCompeting).
	SubscribeToChannel(ctx context.Context, instanceID, channelName string, mode ChannelMode) error

	// UnsubscribeFromChannel unsubscribes an instance from a channel.
	UnsubscribeFromChannel(ctx context.Context, instanceID, channelName string) error

	// GetChannelSubscription retrieves a subscription for an instance and channel.
	GetChannelSubscription(ctx context.Context, instanceID, channelName string) (*ChannelSubscription, error)

	// RegisterChannelReceiveAndReleaseLock atomically registers a channel receive wait
	// and releases the workflow lock.
	// timeoutAt is optional (pointer); nil presumably means no timeout — TODO confirm.
	RegisterChannelReceiveAndReleaseLock(
		ctx context.Context,
		instanceID, channelName, workerID, activityID string,
		timeoutAt *time.Time,
	) error

	// GetPendingChannelMessages retrieves pending messages for a channel after a given ID.
	GetPendingChannelMessages(ctx context.Context, channelName string, afterID int64, limit int) ([]*ChannelMessage, error)

	// GetPendingChannelMessagesForInstance gets pending messages for a specific subscriber.
	// For broadcast mode: Returns messages with id > cursor (messages not yet seen by this instance)
	// For competing mode: Returns unclaimed messages (not yet claimed by any instance)
	GetPendingChannelMessagesForInstance(ctx context.Context, instanceID, channelName string) ([]*ChannelMessage, error)

	// ClaimChannelMessage claims a message for competing mode (returns false if already claimed).
	ClaimChannelMessage(ctx context.Context, messageID int64, instanceID string) (bool, error)

	// DeleteChannelMessage deletes a message from the channel.
	DeleteChannelMessage(ctx context.Context, messageID int64) error

	// UpdateDeliveryCursor updates the delivery cursor for broadcast mode.
	UpdateDeliveryCursor(ctx context.Context, instanceID, channelName string, lastMessageID int64) error

	// GetDeliveryCursor gets the current delivery cursor for an instance and channel.
	GetDeliveryCursor(ctx context.Context, instanceID, channelName string) (int64, error)

	// GetChannelSubscribersWaiting finds subscribers waiting for messages on a channel.
	GetChannelSubscribersWaiting(ctx context.Context, channelName string) ([]*ChannelSubscription, error)

	// ClearChannelWaitingState clears the waiting state for an instance's channel subscription.
	ClearChannelWaitingState(ctx context.Context, instanceID, channelName string) error

	// DeliverChannelMessage delivers a message to a waiting subscriber (records in history).
	DeliverChannelMessage(ctx context.Context, instanceID string, message *ChannelMessage) error

	// DeliverChannelMessageWithLock delivers a message using Lock-First pattern.
	// This is used for load balancing in multi-worker environments:
	// 1. Try to acquire lock on the target instance
	// 2. If lock fails, return (nil, nil) - another worker will handle it
	// 3. Record message in history
	// 4. Update status to 'running'
	// 5. Release lock
	// Returns the delivery info if successful, nil if lock was not acquired.
	// Callers must therefore check the result for nil even when err is nil.
	DeliverChannelMessageWithLock(
		ctx context.Context,
		instanceID string,
		channelName string,
		message *ChannelMessage,
		workerID string,
		lockTimeoutSec int,
	) (*ChannelDeliveryResult, error)

	// CleanupOldChannelMessages removes old channel messages.
	// NOTE(review): olderThan is presumably a minimum message age — confirm.
	CleanupOldChannelMessages(ctx context.Context, olderThan time.Duration) error

	// FindExpiredChannelSubscriptions finds channel subscriptions that have timed out.
	// limit specifies the maximum number of subscriptions to return (0 = default 100).
	FindExpiredChannelSubscriptions(ctx context.Context, limit int) ([]*ChannelSubscription, error)
}

ChannelManager handles channel-based messaging operations.

type ChannelMessage

// ChannelMessage represents a message in a channel.
//
// The payload is carried in DataJSON or DataBinary; both are omitted from the
// JSON encoding when empty. TargetInstanceID is optional.
// NOTE(review): TargetInstanceID presumably restricts delivery to a single
// instance — confirm against the delivery code.
type ChannelMessage struct {
	ID               int64     `json:"id"`
	ChannelName      string    `json:"channel_name"`
	DataJSON         []byte    `json:"data_json,omitempty"`
	DataBinary       []byte    `json:"data_binary,omitempty"`
	Metadata         []byte    `json:"metadata,omitempty"` // JSON-encoded metadata
	TargetInstanceID string    `json:"target_instance_id,omitempty"`
	CreatedAt        time.Time `json:"created_at"`
}

ChannelMessage represents a message in a channel.

type ChannelMessageClaim

// ChannelMessageClaim tracks which instance claimed a message in competing
// mode: MessageID identifies the claimed message and InstanceID the claimant.
type ChannelMessageClaim struct {
	ID         int64     `json:"id"`
	MessageID  int64     `json:"message_id"`
	InstanceID string    `json:"instance_id"`
	ClaimedAt  time.Time `json:"claimed_at"`
}

ChannelMessageClaim tracks which instance claimed a message in competing mode.

type ChannelMode

// ChannelMode represents the subscription mode for a channel.
// The defined values are ChannelModeBroadcast and ChannelModeCompeting.
type ChannelMode string

ChannelMode represents the subscription mode for a channel.

// Supported ChannelMode values.
const (
	ChannelModeBroadcast ChannelMode = "broadcast" // All subscribers receive all messages
	ChannelModeCompeting ChannelMode = "competing" // Each message goes to one subscriber
)

type ChannelSubscription

// ChannelSubscription represents a workflow's subscription to a channel.
//
// NOTE(review): Waiting and TimeoutAt presumably reflect a pending receive
// registered via ChannelManager.RegisterChannelReceiveAndReleaseLock, with a
// nil TimeoutAt meaning no timeout — confirm against the implementation.
type ChannelSubscription struct {
	ID          int64       `json:"id"`
	InstanceID  string      `json:"instance_id"`
	ChannelName string      `json:"channel_name"`
	Mode        ChannelMode `json:"mode"`
	Waiting     bool        `json:"waiting"`
	TimeoutAt   *time.Time  `json:"timeout_at,omitempty"`
	ActivityID  string      `json:"activity_id,omitempty"` // Activity ID for replay matching
	CreatedAt   time.Time   `json:"created_at"`
}

ChannelSubscription represents a workflow's subscription to a channel.

type CompensationEntry

// CompensationEntry represents a registered compensation action.
//
// Entries are executed in LIFO order: a higher Order value runs first.
type CompensationEntry struct {
	ID              int64     `json:"id"`
	InstanceID      string    `json:"instance_id"`
	ActivityID      string    `json:"activity_id"`
	CompensationFn  string    `json:"compensation_fn"`  // Function name
	CompensationArg []byte    `json:"compensation_arg"` // JSON-encoded arguments
	Order           int       `json:"order"`            // LIFO order (higher = execute first)
	Status          string    `json:"status"`           // "pending", "executed", "failed"
	CreatedAt       time.Time `json:"created_at"`
}

CompensationEntry represents a registered compensation action.

type CompensationManager

// CompensationManager handles saga compensation: registering compensation
// entries, listing them in LIFO order, and recording their outcome.
type CompensationManager interface {
	// AddCompensation registers a compensation action.
	AddCompensation(ctx context.Context, entry *CompensationEntry) error

	// GetCompensations retrieves compensation entries in LIFO order.
	GetCompensations(ctx context.Context, instanceID string) ([]*CompensationEntry, error)

	// MarkCompensationExecuted marks a compensation as executed.
	// id is the CompensationEntry ID.
	MarkCompensationExecuted(ctx context.Context, id int64) error

	// MarkCompensationFailed marks a compensation as failed.
	// id is the CompensationEntry ID.
	MarkCompensationFailed(ctx context.Context, id int64) error
}

CompensationManager handles saga compensation.

type Driver

// Driver abstracts database-specific SQL operations.
//
// NOTE(review): the per-method examples below only cover SQLite and
// PostgreSQL, but MySQLDriver also implements this interface; its outputs
// should be documented here as well.
type Driver interface {
	// DriverName returns the driver name (e.g., "sqlite", "postgres").
	DriverName() string

	// GetCurrentTimeExpr returns the SQL expression for current time.
	// SQLite: datetime('now'), PostgreSQL: NOW()
	GetCurrentTimeExpr() string

	// MakeDatetimeComparable wraps a datetime column for comparison.
	// SQLite needs datetime() wrapper, PostgreSQL doesn't.
	MakeDatetimeComparable(column string) string

	// SelectForUpdateSkipLocked returns the clause for row locking.
	// PostgreSQL: "FOR UPDATE SKIP LOCKED", SQLite: "" (uses table-level locking)
	SelectForUpdateSkipLocked() string

	// Placeholder returns the placeholder for the nth parameter.
	// SQLite: ?, PostgreSQL: $n
	Placeholder(n int) string

	// OnConflictDoNothing returns the clause for idempotent inserts.
	// SQLite and PostgreSQL both use "ON CONFLICT DO NOTHING".
	// NOTE(review): how conflictColumns affects the clause is not visible
	// here — confirm against the implementations.
	OnConflictDoNothing(conflictColumns ...string) string

	// ReturningClause returns the RETURNING clause for insert/update.
	// SQLite: "", PostgreSQL: "RETURNING id"
	ReturningClause(columns ...string) string

	// JSONExtract returns the SQL expression to extract a value from a JSON column.
	// The path is a dot-separated path like "order.customer.id".
	// The returned expression extracts the value as text.
	// NOTE(review): the path ends up inside SQL text; callers presumably
	// validate it first with ValidateJSONPath — confirm.
	JSONExtract(column, path string) string

	// JSONCompare returns the SQL expression to compare a JSON-extracted value
	// with a given value, handling type coercion appropriately.
	// Returns the comparison expression and any additional args needed.
	JSONCompare(extractExpr string, value any, placeholderNum int) (expr string, args []any)
}

Driver abstracts database-specific SQL operations.

func NewDriver

func NewDriver(dbURL string) Driver

NewDriver creates a new driver based on the database URL.

type Executor

// Executor is a database executor interface that can be either *sql.DB or
// *sql.Tx. This allows users to execute custom SQL queries within the same
// transaction as the workflow activity.
//
// The three methods mirror the identically named methods of *sql.DB and
// *sql.Tx in database/sql.
type Executor interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

Executor is a database executor interface that can be either *sql.DB or *sql.Tx. This allows users to execute custom SQL queries within the same transaction as the workflow activity.

type GroupManager

// GroupManager handles Erlang pg-style group membership operations:
// workflow instances join and leave named groups, and the members of a group
// can be listed by name.
type GroupManager interface {
	// JoinGroup adds an instance to a group.
	JoinGroup(ctx context.Context, instanceID, groupName string) error

	// LeaveGroup removes an instance from a group.
	LeaveGroup(ctx context.Context, instanceID, groupName string) error

	// GetGroupMembers retrieves all instance IDs in a group.
	GetGroupMembers(ctx context.Context, groupName string) ([]string, error)

	// LeaveAllGroups removes an instance from all groups.
	LeaveAllGroups(ctx context.Context, instanceID string) error
}

GroupManager handles Erlang pg-style group membership operations.

type GroupMembership

// GroupMembership represents a workflow's membership in a group, pairing an
// InstanceID with a GroupName.
type GroupMembership struct {
	ID         int64     `json:"id"`
	InstanceID string    `json:"instance_id"`
	GroupName  string    `json:"group_name"`
	CreatedAt  time.Time `json:"created_at"`
}

GroupMembership represents a workflow's membership in a group.

type HistoryArchiveManager

// HistoryArchiveManager handles history archival for the recur pattern.
type HistoryArchiveManager interface {
	// ArchiveHistory moves all history events for an instance to the archive table.
	ArchiveHistory(ctx context.Context, instanceID string) error

	// GetArchivedHistory retrieves archived history events for an instance.
	GetArchivedHistory(ctx context.Context, instanceID string) ([]*ArchivedHistoryEvent, error)

	// CleanupInstanceSubscriptions removes all subscriptions for an instance.
	// This is used during recur to clean up stale subscriptions.
	CleanupInstanceSubscriptions(ctx context.Context, instanceID string) error
}

HistoryArchiveManager handles history archival for the recur pattern.

type HistoryEvent

// HistoryEvent represents a single event in a workflow's execution history.
//
// The payload is stored either as JSON in EventData or as raw bytes in
// EventDataBinary; DataType ("json" or "binary") records which.
// NOTE(review): presumably only the field matching DataType is populated —
// confirm.
type HistoryEvent struct {
	ID              int64            `json:"id"`
	InstanceID      string           `json:"instance_id"`
	ActivityID      string           `json:"activity_id"`
	EventType       HistoryEventType `json:"event_type"`
	EventData       []byte           `json:"event_data"`        // JSON-encoded data
	EventDataBinary []byte           `json:"event_data_binary"` // Binary data (alternative)
	DataType        string           `json:"data_type"`         // "json" or "binary"
	CreatedAt       time.Time        `json:"created_at"`
}

HistoryEvent represents a single event in a workflow's execution history.

type HistoryEventType

// HistoryEventType represents the type of history event.
// The defined values are the History* constants of this package.
type HistoryEventType string

HistoryEventType represents the type of history event.

// Defined HistoryEventType values. The string form of each constant is the
// snake_case name of the event.
const (
	HistoryActivityStarted   HistoryEventType = "activity_started"
	HistoryActivityCompleted HistoryEventType = "activity_completed"
	HistoryActivityFailed    HistoryEventType = "activity_failed"
	HistoryEventReceived     HistoryEventType = "event_received"
	HistoryTimerFired        HistoryEventType = "timer_fired"
	HistoryCompensationAdded HistoryEventType = "compensation_added"
)

type HistoryIterator

// HistoryIterator provides streaming access to workflow history events.
// This avoids loading the entire history into memory at once.
//
// Create one with NewHistoryIterator, call Next until it returns false, then
// check Err.
type HistoryIterator struct {
	// contains filtered or unexported fields
}

HistoryIterator provides streaming access to workflow history events. This avoids loading the entire history into memory at once.

func NewHistoryIterator

func NewHistoryIterator(ctx context.Context, storage Storage, instanceID string, opts *HistoryIteratorOptions) *HistoryIterator

NewHistoryIterator creates a new history iterator.

func (*HistoryIterator) Close

func (it *HistoryIterator) Close() error

Close releases any resources held by the iterator. Currently a no-op, but provided for future compatibility.

func (*HistoryIterator) Collect

func (it *HistoryIterator) Collect() ([]*HistoryEvent, error)

Collect reads all remaining events into a slice. Use with caution for large histories - prefer iterating with Next().

func (*HistoryIterator) Err

func (it *HistoryIterator) Err() error

Err returns any error that occurred during iteration.

func (*HistoryIterator) Next

func (it *HistoryIterator) Next() (*HistoryEvent, bool)

Next returns the next history event. Returns (event, true) if an event is available, (nil, false) if no more events. Check Err() after iteration to see if an error occurred.

type HistoryIteratorOptions

// HistoryIteratorOptions configures the history iterator created by
// NewHistoryIterator.
type HistoryIteratorOptions struct {
	// BatchSize is the number of events to fetch per batch.
	// Default: 1000
	BatchSize int
}

HistoryIteratorOptions configures the history iterator.

type HistoryManager

// HistoryManager handles workflow execution history: appending events,
// reading them back page by page, and counting them.
type HistoryManager interface {
	// AppendHistory adds a new history event.
	AppendHistory(ctx context.Context, event *HistoryEvent) error

	// GetHistoryPaginated retrieves history events for an instance with pagination.
	// Returns events with id > afterID, up to limit events.
	// Returns (events, hasMore, error).
	GetHistoryPaginated(ctx context.Context, instanceID string, afterID int64, limit int) ([]*HistoryEvent, bool, error)

	// GetHistoryCount returns the total number of history events for an instance.
	GetHistoryCount(ctx context.Context, instanceID string) (int64, error)
}

HistoryManager handles workflow execution history.

type InputFilterBuilder

// InputFilterBuilder builds SQL query conditions for input filters.
// Create one with NewInputFilterBuilder and call BuildFilterQuery.
type InputFilterBuilder struct {
	// contains filtered or unexported fields
}

InputFilterBuilder builds SQL query conditions for input filters.

func NewInputFilterBuilder

func NewInputFilterBuilder(driver Driver) *InputFilterBuilder

NewInputFilterBuilder creates a new InputFilterBuilder.

func (*InputFilterBuilder) BuildFilterQuery

func (b *InputFilterBuilder) BuildFilterQuery(inputFilters map[string]any, startPlaceholder int) (conditions []string, args []any, err error)

BuildFilterQuery builds the WHERE clause conditions for input filters. Returns the query parts (as a slice of conditions) and args to append, or an error if validation fails. The startPlaceholder is the starting placeholder number (for PostgreSQL).

type InstanceManager

// InstanceManager handles workflow instance CRUD operations, plus listing and
// discovery of workflows that are ready to resume.
type InstanceManager interface {
	// CreateInstance creates a new workflow instance.
	CreateInstance(ctx context.Context, instance *WorkflowInstance) error

	// GetInstance retrieves a workflow instance by ID.
	GetInstance(ctx context.Context, instanceID string) (*WorkflowInstance, error)

	// UpdateInstanceStatus updates the status and related fields.
	UpdateInstanceStatus(ctx context.Context, instanceID string, status WorkflowStatus, errorMsg string) error

	// UpdateInstanceActivity updates the current activity ID.
	UpdateInstanceActivity(ctx context.Context, instanceID string, activityID string) error

	// UpdateInstanceOutput updates the output data for a completed workflow.
	UpdateInstanceOutput(ctx context.Context, instanceID string, outputData []byte) error

	// CancelInstance marks a workflow as cancelled.
	// The MySQLStorage implementation returns ErrWorkflowNotCancellable if the
	// workflow is already completed, cancelled, failed, or does not exist.
	CancelInstance(ctx context.Context, instanceID string, reason string) error

	// ListInstances lists workflow instances with optional filtering and pagination.
	// Returns PaginationResult with cursor-based pagination.
	ListInstances(ctx context.Context, opts ListInstancesOptions) (*PaginationResult, error)

	// FindResumableWorkflows finds workflows that are ready to be resumed.
	// Returns workflows with status='running' that don't have an active lock.
	// These are typically workflows that had a message delivered and are waiting
	// for a worker to pick them up and resume execution.
	// limit specifies the maximum number of workflows to return (0 = default 100).
	FindResumableWorkflows(ctx context.Context, limit int) ([]*ResumableWorkflow, error)
}

InstanceManager handles workflow instance CRUD operations.

type ListInstancesOptions

// ListInstancesOptions defines options for listing instances.
//
// Pagination is cursor-based via PageToken. The Status, WorkflowName and
// Offset fields are deprecated in favour of StatusFilter, WorkflowNameFilter
// and PageToken respectively.
type ListInstancesOptions struct {
	// Pagination
	Limit     int    // Page size (default: 50)
	PageToken string // Cursor token for pagination (format: "ISO_DATETIME||INSTANCE_ID")

	// Filters
	StatusFilter       WorkflowStatus // Filter by status
	WorkflowNameFilter string         // Filter by workflow name (partial match, case-insensitive)
	InstanceIDFilter   string         // Filter by instance ID (partial match, case-insensitive)
	StartedAfter       *time.Time     // Filter instances started after this time
	StartedBefore      *time.Time     // Filter instances started before this time
	InputFilters       map[string]any // Filter by JSON paths in input data (exact match)

	// Deprecated: Use StatusFilter instead.
	Status WorkflowStatus
	// Deprecated: Use WorkflowNameFilter instead.
	WorkflowName string
	// Deprecated: Use PageToken instead.
	Offset int
}

ListInstancesOptions defines options for listing instances.

type LockManager

// LockManager handles distributed locking operations on workflow instances.
// Locks are identified by instance ID, owned by a worker ID, and expire after
// a timeout given in seconds.
type LockManager interface {
	// TryAcquireLock attempts to acquire a lock on a workflow instance.
	// Returns true if the lock was acquired, false if already locked.
	TryAcquireLock(ctx context.Context, instanceID, workerID string, timeoutSec int) (bool, error)

	// ReleaseLock releases the lock on a workflow instance.
	ReleaseLock(ctx context.Context, instanceID, workerID string) error

	// RefreshLock extends the lock timeout.
	RefreshLock(ctx context.Context, instanceID, workerID string, timeoutSec int) error

	// CleanupStaleLocks releases locks that have expired and returns
	// information about running workflows that need to be resumed.
	CleanupStaleLocks(ctx context.Context, timeoutSec int) ([]StaleWorkflowInfo, error)
}

LockManager handles distributed locking operations.

type MySQLDriver

// MySQLDriver implements Driver for MySQL 8.0+.
// It is an empty struct and carries no state.
type MySQLDriver struct{}

MySQLDriver implements Driver for MySQL 8.0+.

func (*MySQLDriver) DriverName

func (d *MySQLDriver) DriverName() string

func (*MySQLDriver) GetCurrentTimeExpr

func (d *MySQLDriver) GetCurrentTimeExpr() string

func (*MySQLDriver) JSONCompare

func (d *MySQLDriver) JSONCompare(extractExpr string, value any, _ int) (expr string, compArgs []any)

JSONCompare for MySQLDriver returns the SQL expression to compare a JSON-extracted value.

func (*MySQLDriver) JSONExtract

func (d *MySQLDriver) JSONExtract(column, path string) string

JSONExtract for MySQLDriver returns the SQL expression to extract a value from a JSON column.

func (*MySQLDriver) MakeDatetimeComparable

func (d *MySQLDriver) MakeDatetimeComparable(column string) string

func (*MySQLDriver) OnConflictDoNothing

func (d *MySQLDriver) OnConflictDoNothing(conflictColumns ...string) string

func (*MySQLDriver) Placeholder

func (d *MySQLDriver) Placeholder(n int) string

func (*MySQLDriver) ReturningClause

func (d *MySQLDriver) ReturningClause(columns ...string) string

func (*MySQLDriver) SelectForUpdateSkipLocked

func (d *MySQLDriver) SelectForUpdateSkipLocked() string

type MySQLStorage

// MySQLStorage implements the Storage interface using MySQL 8.0+.
// Create one with NewMySQLStorage and call Initialize to create the schema.
type MySQLStorage struct {
	// contains filtered or unexported fields
}

MySQLStorage implements the Storage interface using MySQL 8.0+.

func NewMySQLStorage

func NewMySQLStorage(connStr string) (*MySQLStorage, error)

NewMySQLStorage creates a new MySQL storage. The connStr can be given either in URL format or as a MySQL DSN. URL format: "mysql://user:password@localhost:3306/dbname". DSN format: "user:password@tcp(localhost:3306)/dbname".

func (*MySQLStorage) AddCompensation

func (s *MySQLStorage) AddCompensation(ctx context.Context, entry *CompensationEntry) error

AddCompensation adds a compensation entry.

func (*MySQLStorage) AddOutboxEvent

func (s *MySQLStorage) AddOutboxEvent(ctx context.Context, event *OutboxEvent) error

AddOutboxEvent adds an event to the outbox.

func (*MySQLStorage) AppendHistory

func (s *MySQLStorage) AppendHistory(ctx context.Context, event *HistoryEvent) error

AppendHistory appends a history event.

func (*MySQLStorage) ArchiveHistory

func (s *MySQLStorage) ArchiveHistory(ctx context.Context, instanceID string) error

ArchiveHistory moves all history events for an instance to the archive table.

func (*MySQLStorage) BeginTransaction

func (s *MySQLStorage) BeginTransaction(ctx context.Context) (context.Context, error)

BeginTransaction starts a new transaction.

func (*MySQLStorage) CancelInstance

func (s *MySQLStorage) CancelInstance(ctx context.Context, instanceID, reason string) error

CancelInstance cancels a workflow instance. Returns ErrWorkflowNotCancellable if the workflow is already completed, cancelled, failed, or does not exist.

func (*MySQLStorage) ClaimChannelMessage

func (s *MySQLStorage) ClaimChannelMessage(ctx context.Context, messageID int64, instanceID string) (bool, error)

ClaimChannelMessage claims a message for competing mode.

func (*MySQLStorage) CleanupExpiredSystemLocks

func (s *MySQLStorage) CleanupExpiredSystemLocks(ctx context.Context) error

CleanupExpiredSystemLocks removes expired system locks.

func (*MySQLStorage) CleanupInstanceSubscriptions

func (s *MySQLStorage) CleanupInstanceSubscriptions(ctx context.Context, instanceID string) error

CleanupInstanceSubscriptions removes all subscriptions for an instance.

func (*MySQLStorage) CleanupOldChannelMessages

func (s *MySQLStorage) CleanupOldChannelMessages(ctx context.Context, olderThan time.Duration) error

CleanupOldChannelMessages removes old channel messages.

func (*MySQLStorage) CleanupOldOutboxEvents

func (s *MySQLStorage) CleanupOldOutboxEvents(ctx context.Context, olderThan time.Duration) error

CleanupOldOutboxEvents removes old sent events.

func (*MySQLStorage) CleanupStaleLocks

func (s *MySQLStorage) CleanupStaleLocks(ctx context.Context, timeoutSec int) ([]StaleWorkflowInfo, error)

CleanupStaleLocks cleans up expired locks and returns workflows to resume.

func (*MySQLStorage) ClearChannelWaitingState

func (s *MySQLStorage) ClearChannelWaitingState(ctx context.Context, instanceID, channelName string) error

ClearChannelWaitingState clears the waiting state for an instance's channel subscription.

func (*MySQLStorage) Close

func (s *MySQLStorage) Close() error

Close closes the database connection.

func (*MySQLStorage) CommitTransaction

func (s *MySQLStorage) CommitTransaction(ctx context.Context) error

CommitTransaction commits the current transaction.

func (*MySQLStorage) Conn

func (s *MySQLStorage) Conn(ctx context.Context) Executor

Conn returns the database executor for the current context.

func (*MySQLStorage) CreateInstance

func (s *MySQLStorage) CreateInstance(ctx context.Context, instance *WorkflowInstance) error

CreateInstance creates a new workflow instance.

func (*MySQLStorage) DB

func (s *MySQLStorage) DB() *sql.DB

DB returns the underlying database connection.

func (*MySQLStorage) DeleteChannelMessage

func (s *MySQLStorage) DeleteChannelMessage(ctx context.Context, messageID int64) error

DeleteChannelMessage deletes a message from the channel.

func (*MySQLStorage) DeliverChannelMessage

func (s *MySQLStorage) DeliverChannelMessage(ctx context.Context, instanceID string, message *ChannelMessage) error

DeliverChannelMessage delivers a message to a waiting subscriber.

func (*MySQLStorage) DeliverChannelMessageWithLock

func (s *MySQLStorage) DeliverChannelMessageWithLock(
	ctx context.Context,
	instanceID string,
	channelName string,
	message *ChannelMessage,
	workerID string,
	lockTimeoutSec int,
) (*ChannelDeliveryResult, error)

DeliverChannelMessageWithLock delivers a message using Lock-First pattern. Returns nil result if lock could not be acquired (another worker will handle it).

func (*MySQLStorage) FindExpiredChannelSubscriptions

func (s *MySQLStorage) FindExpiredChannelSubscriptions(ctx context.Context, limit int) ([]*ChannelSubscription, error)

FindExpiredChannelSubscriptions finds channel subscriptions that have timed out.

func (*MySQLStorage) FindExpiredTimers

func (s *MySQLStorage) FindExpiredTimers(ctx context.Context, limit int) ([]*TimerSubscription, error)

FindExpiredTimers finds expired timers.

func (*MySQLStorage) FindResumableWorkflows

func (s *MySQLStorage) FindResumableWorkflows(ctx context.Context, limit int) ([]*ResumableWorkflow, error)

FindResumableWorkflows finds workflows with status='running' that don't have an active lock. These are workflows that had a message delivered and are waiting for a worker to resume them.

func (*MySQLStorage) GetArchivedHistory

func (s *MySQLStorage) GetArchivedHistory(ctx context.Context, instanceID string) ([]*ArchivedHistoryEvent, error)

GetArchivedHistory retrieves archived history events for an instance.

func (*MySQLStorage) GetChannelSubscribersWaiting

func (s *MySQLStorage) GetChannelSubscribersWaiting(ctx context.Context, channelName string) ([]*ChannelSubscription, error)

GetChannelSubscribersWaiting finds subscribers waiting for messages on a channel.

func (*MySQLStorage) GetChannelSubscription

func (s *MySQLStorage) GetChannelSubscription(ctx context.Context, instanceID, channelName string) (*ChannelSubscription, error)

GetChannelSubscription retrieves a subscription for an instance and channel.

func (*MySQLStorage) GetCompensations

func (s *MySQLStorage) GetCompensations(ctx context.Context, instanceID string) ([]*CompensationEntry, error)

GetCompensations retrieves compensations for a workflow in LIFO order.

func (*MySQLStorage) GetDeliveryCursor

func (s *MySQLStorage) GetDeliveryCursor(ctx context.Context, instanceID, channelName string) (int64, error)

GetDeliveryCursor gets the current delivery cursor for an instance and channel.

func (*MySQLStorage) GetGroupMembers

func (s *MySQLStorage) GetGroupMembers(ctx context.Context, groupName string) ([]string, error)

GetGroupMembers retrieves all instance IDs in a group.

func (*MySQLStorage) GetHistoryCount

func (s *MySQLStorage) GetHistoryCount(ctx context.Context, instanceID string) (int64, error)

GetHistoryCount returns the total number of history events for an instance.

func (*MySQLStorage) GetHistoryPaginated

func (s *MySQLStorage) GetHistoryPaginated(ctx context.Context, instanceID string, afterID int64, limit int) ([]*HistoryEvent, bool, error)

GetHistoryPaginated retrieves history events with pagination. Returns events with id > afterID, up to limit events. Returns (events, hasMore, error).

func (*MySQLStorage) GetInstance

func (s *MySQLStorage) GetInstance(ctx context.Context, instanceID string) (*WorkflowInstance, error)

GetInstance retrieves a workflow instance by ID.

func (*MySQLStorage) GetPendingChannelMessages

func (s *MySQLStorage) GetPendingChannelMessages(ctx context.Context, channelName string, afterID int64, limit int) ([]*ChannelMessage, error)

GetPendingChannelMessages retrieves pending messages for a channel after a given ID.

func (*MySQLStorage) GetPendingChannelMessagesForInstance

func (s *MySQLStorage) GetPendingChannelMessagesForInstance(ctx context.Context, instanceID, channelName string) ([]*ChannelMessage, error)

GetPendingChannelMessagesForInstance gets pending messages for a specific subscriber. In broadcast mode, it returns messages with id > cursor (messages not yet seen by this instance). In competing mode, it returns unclaimed messages (not yet claimed by any instance).

func (*MySQLStorage) GetPendingOutboxEvents

func (s *MySQLStorage) GetPendingOutboxEvents(ctx context.Context, limit int) ([]*OutboxEvent, error)

GetPendingOutboxEvents retrieves pending outbox events. Uses SELECT FOR UPDATE SKIP LOCKED for concurrent workers.

func (*MySQLStorage) InTransaction

func (s *MySQLStorage) InTransaction(ctx context.Context) bool

InTransaction returns whether a transaction is in progress.

func (*MySQLStorage) IncrementOutboxAttempts

func (s *MySQLStorage) IncrementOutboxAttempts(ctx context.Context, eventID string) error

IncrementOutboxAttempts increments the attempt count for an event.

func (*MySQLStorage) Initialize

func (s *MySQLStorage) Initialize(ctx context.Context) error

Initialize creates the database schema using migrations.

func (*MySQLStorage) JoinGroup

func (s *MySQLStorage) JoinGroup(ctx context.Context, instanceID, groupName string) error

JoinGroup adds an instance to a group.

func (*MySQLStorage) LeaveAllGroups

func (s *MySQLStorage) LeaveAllGroups(ctx context.Context, instanceID string) error

LeaveAllGroups removes an instance from all groups.

func (*MySQLStorage) LeaveGroup

func (s *MySQLStorage) LeaveGroup(ctx context.Context, instanceID, groupName string) error

LeaveGroup removes an instance from a group.

func (*MySQLStorage) ListInstances

func (s *MySQLStorage) ListInstances(ctx context.Context, opts ListInstancesOptions) (*PaginationResult, error)

ListInstances lists workflow instances with cursor-based pagination and filters.

func (*MySQLStorage) MarkCompensationExecuted

func (s *MySQLStorage) MarkCompensationExecuted(ctx context.Context, id int64) error

MarkCompensationExecuted marks a compensation as executed.

func (*MySQLStorage) MarkCompensationFailed

func (s *MySQLStorage) MarkCompensationFailed(ctx context.Context, id int64) error

MarkCompensationFailed marks a compensation as failed.

func (*MySQLStorage) MarkOutboxEventFailed

func (s *MySQLStorage) MarkOutboxEventFailed(ctx context.Context, eventID string) error

MarkOutboxEventFailed marks an outbox event as failed.

func (*MySQLStorage) MarkOutboxEventSent

func (s *MySQLStorage) MarkOutboxEventSent(ctx context.Context, eventID string) error

MarkOutboxEventSent marks an outbox event as sent.

func (*MySQLStorage) PublishToChannel

func (s *MySQLStorage) PublishToChannel(ctx context.Context, channelName string, dataJSON, metadata []byte, targetInstanceID string) (int64, error)

PublishToChannel publishes a message to a channel.

func (*MySQLStorage) RefreshLock

func (s *MySQLStorage) RefreshLock(ctx context.Context, instanceID, workerID string, timeoutSec int) error

RefreshLock extends the lock expiration time.

func (*MySQLStorage) RegisterChannelReceiveAndReleaseLock

func (s *MySQLStorage) RegisterChannelReceiveAndReleaseLock(ctx context.Context, instanceID, channelName, workerID, activityID string, timeoutAt *time.Time) error

RegisterChannelReceiveAndReleaseLock atomically registers a channel receive wait and releases the lock.

func (*MySQLStorage) RegisterPostCommitCallback

func (s *MySQLStorage) RegisterPostCommitCallback(ctx context.Context, cb func() error) error

RegisterPostCommitCallback registers a callback to be executed after a successful commit. Returns an error if not currently in a transaction.

func (*MySQLStorage) RegisterTimerSubscription

func (s *MySQLStorage) RegisterTimerSubscription(ctx context.Context, sub *TimerSubscription) error

RegisterTimerSubscription registers a timer for a workflow.

func (*MySQLStorage) RegisterTimerSubscriptionAndReleaseLock

func (s *MySQLStorage) RegisterTimerSubscriptionAndReleaseLock(ctx context.Context, sub *TimerSubscription, instanceID, workerID string) error

RegisterTimerSubscriptionAndReleaseLock atomically registers a timer and releases the lock.

func (*MySQLStorage) ReleaseLock

func (s *MySQLStorage) ReleaseLock(ctx context.Context, instanceID, workerID string) error

ReleaseLock releases a lock on a workflow instance.

func (*MySQLStorage) ReleaseSystemLock

func (s *MySQLStorage) ReleaseSystemLock(ctx context.Context, lockName, workerID string) error

ReleaseSystemLock releases a system lock.

func (*MySQLStorage) RemoveTimerSubscription

func (s *MySQLStorage) RemoveTimerSubscription(ctx context.Context, instanceID, timerID string) error

RemoveTimerSubscription removes a timer subscription.

func (*MySQLStorage) RollbackTransaction

func (s *MySQLStorage) RollbackTransaction(ctx context.Context) error

RollbackTransaction rolls back the current transaction. Callbacks are not executed on rollback.

func (*MySQLStorage) SubscribeToChannel

func (s *MySQLStorage) SubscribeToChannel(ctx context.Context, instanceID, channelName string, mode ChannelMode) error

SubscribeToChannel subscribes an instance to a channel. For broadcast mode, it initializes the delivery cursor to the current max message ID so that only messages published after subscription are received.

func (*MySQLStorage) TryAcquireLock

func (s *MySQLStorage) TryAcquireLock(ctx context.Context, instanceID, workerID string, timeoutSec int) (bool, error)

TryAcquireLock attempts to acquire a lock on a workflow instance.

func (*MySQLStorage) TryAcquireSystemLock

func (s *MySQLStorage) TryAcquireSystemLock(ctx context.Context, lockName, workerID string, timeoutSec int) (bool, error)

TryAcquireSystemLock attempts to acquire a system lock.

func (*MySQLStorage) UnsubscribeFromChannel

func (s *MySQLStorage) UnsubscribeFromChannel(ctx context.Context, instanceID, channelName string) error

UnsubscribeFromChannel unsubscribes an instance from a channel.

func (*MySQLStorage) UpdateDeliveryCursor

func (s *MySQLStorage) UpdateDeliveryCursor(ctx context.Context, instanceID, channelName string, lastMessageID int64) error

UpdateDeliveryCursor updates the delivery cursor for broadcast mode.

func (*MySQLStorage) UpdateInstanceActivity

func (s *MySQLStorage) UpdateInstanceActivity(ctx context.Context, instanceID, activityID string) error

UpdateInstanceActivity updates the current activity ID.

func (*MySQLStorage) UpdateInstanceOutput

func (s *MySQLStorage) UpdateInstanceOutput(ctx context.Context, instanceID string, outputData []byte) error

UpdateInstanceOutput updates the output data.

func (*MySQLStorage) UpdateInstanceStatus

func (s *MySQLStorage) UpdateInstanceStatus(ctx context.Context, instanceID string, status WorkflowStatus, errorMsg string) error

UpdateInstanceStatus updates the status of a workflow instance.

type OutboxEvent

// OutboxEvent represents an event in the transactional outbox.
// The struct tags give the JSON name of each field.
type OutboxEvent struct {
	ID          int64     `json:"id"`           // numeric identifier; presumably the storage row ID — TODO confirm
	EventID     string    `json:"event_id"`     // CloudEvents ID
	EventType   string    `json:"event_type"`   // CloudEvents type
	EventSource string    `json:"event_source"` // CloudEvents source
	EventData   []byte    `json:"event_data"`   // JSON or binary payload
	DataType    string    `json:"data_type"`    // "json" or "binary"
	ContentType string    `json:"content_type"` // e.g., "application/json"
	Status      string    `json:"status"`       // "pending", "sent", "failed"
	Attempts    int       `json:"attempts"`     // attempt count; see OutboxManager.IncrementOutboxAttempts
	CreatedAt   time.Time `json:"created_at"`   // presumably when the event was added to the outbox — TODO confirm
	UpdatedAt   time.Time `json:"updated_at"`   // presumably when the event was last modified — TODO confirm
}

OutboxEvent represents an event in the transactional outbox.

type OutboxManager

// OutboxManager handles the transactional outbox.
//
// Every method takes a context.Context as its first argument. Methods that
// act on a single event identify it by a string eventID (presumably
// OutboxEvent.EventID rather than the numeric ID — TODO confirm).
type OutboxManager interface {
	// AddOutboxEvent adds an event to the outbox.
	AddOutboxEvent(ctx context.Context, event *OutboxEvent) error

	// GetPendingOutboxEvents retrieves pending events for sending.
	// Uses SELECT FOR UPDATE SKIP LOCKED for concurrent workers.
	// NOTE(review): the SQLiteStorage documentation for this method does not
	// mention SKIP LOCKED — confirm whether the locking applies to every backend.
	// limit presumably caps the number of events returned — TODO confirm.
	GetPendingOutboxEvents(ctx context.Context, limit int) ([]*OutboxEvent, error)

	// MarkOutboxEventSent marks an event as successfully sent.
	MarkOutboxEventSent(ctx context.Context, eventID string) error

	// MarkOutboxEventFailed marks an event as failed to send.
	MarkOutboxEventFailed(ctx context.Context, eventID string) error

	// IncrementOutboxAttempts increments the attempt count for an event.
	IncrementOutboxAttempts(ctx context.Context, eventID string) error

	// CleanupOldOutboxEvents removes old sent events.
	// olderThan is presumably the minimum age of an event to be removed — TODO confirm.
	CleanupOldOutboxEvents(ctx context.Context, olderThan time.Duration) error
}

OutboxManager handles the transactional outbox.

type PaginationResult

// PaginationResult represents a paginated list of workflow instances.
type PaginationResult struct {
	// Instances holds the workflow instances contained in this result.
	Instances     []*WorkflowInstance `json:"instances"`
	// NextPageToken is omitted from the JSON encoding when empty.
	// Presumably the cursor to pass when requesting the following page — TODO confirm.
	NextPageToken string              `json:"next_page_token,omitempty"`
	// HasMore presumably reports whether further results exist beyond this page — TODO confirm.
	HasMore       bool                `json:"has_more"`
}

PaginationResult represents a paginated list of workflow instances.

type PostgresDriver

type PostgresDriver struct{}

PostgresDriver implements Driver for PostgreSQL.

func (*PostgresDriver) DriverName

func (d *PostgresDriver) DriverName() string

func (*PostgresDriver) GetCurrentTimeExpr

func (d *PostgresDriver) GetCurrentTimeExpr() string

func (*PostgresDriver) JSONCompare

func (d *PostgresDriver) JSONCompare(extractExpr string, value any, placeholderNum int) (expr string, compArgs []any)

JSONCompare for PostgresDriver returns the SQL expression to compare a JSON-extracted value.

func (*PostgresDriver) JSONExtract

func (d *PostgresDriver) JSONExtract(column, path string) string

JSONExtract for PostgresDriver returns the SQL expression to extract a value from a JSON column.

func (*PostgresDriver) MakeDatetimeComparable

func (d *PostgresDriver) MakeDatetimeComparable(column string) string

func (*PostgresDriver) OnConflictDoNothing

func (d *PostgresDriver) OnConflictDoNothing(conflictColumns ...string) string

func (*PostgresDriver) Placeholder

func (d *PostgresDriver) Placeholder(n int) string

func (*PostgresDriver) ReturningClause

func (d *PostgresDriver) ReturningClause(columns ...string) string

func (*PostgresDriver) SelectForUpdateSkipLocked

func (d *PostgresDriver) SelectForUpdateSkipLocked() string

type PostgresStorage

type PostgresStorage struct {
	// contains filtered or unexported fields
}

PostgresStorage implements the Storage interface using PostgreSQL.

func NewPostgresStorage

func NewPostgresStorage(connStr string) (*PostgresStorage, error)

NewPostgresStorage creates a new PostgreSQL storage. The connStr should be a PostgreSQL connection string, for example "postgres://user:password@localhost:5432/dbname?sslmode=disable".

func (*PostgresStorage) AddCompensation

func (s *PostgresStorage) AddCompensation(ctx context.Context, entry *CompensationEntry) error

AddCompensation adds a compensation entry.

func (*PostgresStorage) AddOutboxEvent

func (s *PostgresStorage) AddOutboxEvent(ctx context.Context, event *OutboxEvent) error

AddOutboxEvent adds an event to the outbox.

func (*PostgresStorage) AppendHistory

func (s *PostgresStorage) AppendHistory(ctx context.Context, event *HistoryEvent) error

AppendHistory appends a history event.

func (*PostgresStorage) ArchiveHistory

func (s *PostgresStorage) ArchiveHistory(ctx context.Context, instanceID string) error

ArchiveHistory moves all history events for an instance to the archive table.

func (*PostgresStorage) BeginTransaction

func (s *PostgresStorage) BeginTransaction(ctx context.Context) (context.Context, error)

BeginTransaction starts a new transaction.

func (*PostgresStorage) CancelInstance

func (s *PostgresStorage) CancelInstance(ctx context.Context, instanceID, reason string) error

CancelInstance cancels a workflow instance. Returns ErrWorkflowNotCancellable if the workflow is already completed, cancelled, failed, or does not exist.

func (*PostgresStorage) ClaimChannelMessage

func (s *PostgresStorage) ClaimChannelMessage(ctx context.Context, messageID int64, instanceID string) (bool, error)

ClaimChannelMessage claims a message for competing mode.

func (*PostgresStorage) CleanupExpiredSystemLocks

func (s *PostgresStorage) CleanupExpiredSystemLocks(ctx context.Context) error

CleanupExpiredSystemLocks removes expired system locks.

func (*PostgresStorage) CleanupInstanceSubscriptions

func (s *PostgresStorage) CleanupInstanceSubscriptions(ctx context.Context, instanceID string) error

CleanupInstanceSubscriptions removes all subscriptions for an instance.

func (*PostgresStorage) CleanupOldChannelMessages

func (s *PostgresStorage) CleanupOldChannelMessages(ctx context.Context, olderThan time.Duration) error

CleanupOldChannelMessages removes old channel messages.

func (*PostgresStorage) CleanupOldOutboxEvents

func (s *PostgresStorage) CleanupOldOutboxEvents(ctx context.Context, olderThan time.Duration) error

CleanupOldOutboxEvents removes old sent events.

func (*PostgresStorage) CleanupStaleLocks

func (s *PostgresStorage) CleanupStaleLocks(ctx context.Context, timeoutSec int) ([]StaleWorkflowInfo, error)

CleanupStaleLocks cleans up expired locks and returns workflows to resume.

func (*PostgresStorage) ClearChannelWaitingState

func (s *PostgresStorage) ClearChannelWaitingState(ctx context.Context, instanceID, channelName string) error

ClearChannelWaitingState clears the waiting state for an instance's channel subscription.

func (*PostgresStorage) Close

func (s *PostgresStorage) Close() error

Close closes the database connection.

func (*PostgresStorage) CommitTransaction

func (s *PostgresStorage) CommitTransaction(ctx context.Context) error

CommitTransaction commits the current transaction.

func (*PostgresStorage) Conn

func (s *PostgresStorage) Conn(ctx context.Context) Executor

Conn returns the database executor for the current context.

func (*PostgresStorage) CreateInstance

func (s *PostgresStorage) CreateInstance(ctx context.Context, instance *WorkflowInstance) error

CreateInstance creates a new workflow instance.

func (*PostgresStorage) DB

func (s *PostgresStorage) DB() *sql.DB

DB returns the underlying database connection.

func (*PostgresStorage) DeleteChannelMessage

func (s *PostgresStorage) DeleteChannelMessage(ctx context.Context, messageID int64) error

DeleteChannelMessage deletes a message from the channel.

func (*PostgresStorage) DeliverChannelMessage

func (s *PostgresStorage) DeliverChannelMessage(ctx context.Context, instanceID string, message *ChannelMessage) error

DeliverChannelMessage delivers a message to a waiting subscriber.

func (*PostgresStorage) DeliverChannelMessageWithLock

func (s *PostgresStorage) DeliverChannelMessageWithLock(
	ctx context.Context,
	instanceID string,
	channelName string,
	message *ChannelMessage,
	workerID string,
	lockTimeoutSec int,
) (*ChannelDeliveryResult, error)

DeliverChannelMessageWithLock delivers a message using the Lock-First pattern. Returns a nil result if the lock could not be acquired (another worker will handle it).

func (*PostgresStorage) FindExpiredChannelSubscriptions

func (s *PostgresStorage) FindExpiredChannelSubscriptions(ctx context.Context, limit int) ([]*ChannelSubscription, error)

FindExpiredChannelSubscriptions finds channel subscriptions that have timed out.

func (*PostgresStorage) FindExpiredTimers

func (s *PostgresStorage) FindExpiredTimers(ctx context.Context, limit int) ([]*TimerSubscription, error)

FindExpiredTimers finds expired timers.

func (*PostgresStorage) FindResumableWorkflows

func (s *PostgresStorage) FindResumableWorkflows(ctx context.Context, limit int) ([]*ResumableWorkflow, error)

FindResumableWorkflows finds workflows with status='running' that don't have an active lock. These are workflows that had a message delivered and are waiting for a worker to resume them.

func (*PostgresStorage) GetArchivedHistory

func (s *PostgresStorage) GetArchivedHistory(ctx context.Context, instanceID string) ([]*ArchivedHistoryEvent, error)

GetArchivedHistory retrieves archived history events for an instance.

func (*PostgresStorage) GetChannelSubscribersWaiting

func (s *PostgresStorage) GetChannelSubscribersWaiting(ctx context.Context, channelName string) ([]*ChannelSubscription, error)

GetChannelSubscribersWaiting finds subscribers waiting for messages on a channel.

func (*PostgresStorage) GetChannelSubscription

func (s *PostgresStorage) GetChannelSubscription(ctx context.Context, instanceID, channelName string) (*ChannelSubscription, error)

GetChannelSubscription retrieves a subscription for an instance and channel.

func (*PostgresStorage) GetCompensations

func (s *PostgresStorage) GetCompensations(ctx context.Context, instanceID string) ([]*CompensationEntry, error)

GetCompensations retrieves compensations for a workflow in LIFO order.

func (*PostgresStorage) GetDeliveryCursor

func (s *PostgresStorage) GetDeliveryCursor(ctx context.Context, instanceID, channelName string) (int64, error)

GetDeliveryCursor gets the current delivery cursor for an instance and channel.

func (*PostgresStorage) GetGroupMembers

func (s *PostgresStorage) GetGroupMembers(ctx context.Context, groupName string) ([]string, error)

GetGroupMembers retrieves all instance IDs in a group.

func (*PostgresStorage) GetHistoryCount

func (s *PostgresStorage) GetHistoryCount(ctx context.Context, instanceID string) (int64, error)

GetHistoryCount returns the total number of history events for an instance.

func (*PostgresStorage) GetHistoryPaginated

func (s *PostgresStorage) GetHistoryPaginated(ctx context.Context, instanceID string, afterID int64, limit int) ([]*HistoryEvent, bool, error)

GetHistoryPaginated retrieves history events with pagination. Returns events with id > afterID, up to limit events. Returns (events, hasMore, error).

func (*PostgresStorage) GetInstance

func (s *PostgresStorage) GetInstance(ctx context.Context, instanceID string) (*WorkflowInstance, error)

GetInstance retrieves a workflow instance by ID.

func (*PostgresStorage) GetPendingChannelMessages

func (s *PostgresStorage) GetPendingChannelMessages(ctx context.Context, channelName string, afterID int64, limit int) ([]*ChannelMessage, error)

GetPendingChannelMessages retrieves pending messages for a channel after a given ID.

func (*PostgresStorage) GetPendingChannelMessagesForInstance

func (s *PostgresStorage) GetPendingChannelMessagesForInstance(ctx context.Context, instanceID, channelName string) ([]*ChannelMessage, error)

GetPendingChannelMessagesForInstance gets pending messages for a specific subscriber. For broadcast mode, it returns messages with id > cursor (messages not yet seen by this instance). For competing mode, it returns unclaimed messages (not yet claimed by any instance).

func (*PostgresStorage) GetPendingOutboxEvents

func (s *PostgresStorage) GetPendingOutboxEvents(ctx context.Context, limit int) ([]*OutboxEvent, error)

GetPendingOutboxEvents retrieves pending outbox events. Uses SELECT FOR UPDATE SKIP LOCKED for concurrent workers.

func (*PostgresStorage) InTransaction

func (s *PostgresStorage) InTransaction(ctx context.Context) bool

InTransaction returns whether a transaction is in progress.

func (*PostgresStorage) IncrementOutboxAttempts

func (s *PostgresStorage) IncrementOutboxAttempts(ctx context.Context, eventID string) error

IncrementOutboxAttempts increments the attempt count for an event.

func (*PostgresStorage) Initialize

func (s *PostgresStorage) Initialize(ctx context.Context) error

Initialize creates the database schema using migrations.

func (*PostgresStorage) IsNotifyEnabled

func (s *PostgresStorage) IsNotifyEnabled() bool

IsNotifyEnabled returns whether PostgreSQL LISTEN/NOTIFY is enabled.

func (*PostgresStorage) JoinGroup

func (s *PostgresStorage) JoinGroup(ctx context.Context, instanceID, groupName string) error

JoinGroup adds an instance to a group.

func (*PostgresStorage) LeaveAllGroups

func (s *PostgresStorage) LeaveAllGroups(ctx context.Context, instanceID string) error

LeaveAllGroups removes an instance from all groups.

func (*PostgresStorage) LeaveGroup

func (s *PostgresStorage) LeaveGroup(ctx context.Context, instanceID, groupName string) error

LeaveGroup removes an instance from a group.

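func (*PostgresStorage) ListInstances

func (s *PostgresStorage) ListInstances(ctx context.Context, opts ListInstancesOptions) (*PaginationResult, error)

ListInstances lists workflow instances with cursor-based pagination and filters.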

func (*PostgresStorage) MarkCompensationExecuted

func (s *PostgresStorage) MarkCompensationExecuted(ctx context.Context, id int64) error

MarkCompensationExecuted marks a compensation as executed.

func (*PostgresStorage) MarkCompensationFailed

func (s *PostgresStorage) MarkCompensationFailed(ctx context.Context, id int64) error

MarkCompensationFailed marks a compensation as failed.

func (*PostgresStorage) MarkOutboxEventFailed

func (s *PostgresStorage) MarkOutboxEventFailed(ctx context.Context, eventID string) error

MarkOutboxEventFailed marks an outbox event as failed.

func (*PostgresStorage) MarkOutboxEventSent

func (s *PostgresStorage) MarkOutboxEventSent(ctx context.Context, eventID string) error

MarkOutboxEventSent marks an outbox event as sent.

func (*PostgresStorage) PublishToChannel

func (s *PostgresStorage) PublishToChannel(ctx context.Context, channelName string, dataJSON, metadata []byte, targetInstanceID string) (int64, error)

PublishToChannel publishes a message to a channel.

func (*PostgresStorage) RefreshLock

func (s *PostgresStorage) RefreshLock(ctx context.Context, instanceID, workerID string, timeoutSec int) error

RefreshLock extends the lock expiration time.

func (*PostgresStorage) RegisterChannelReceiveAndReleaseLock

func (s *PostgresStorage) RegisterChannelReceiveAndReleaseLock(ctx context.Context, instanceID, channelName, workerID, activityID string, timeoutAt *time.Time) error

RegisterChannelReceiveAndReleaseLock atomically registers a channel receive wait and releases the lock.

func (*PostgresStorage) RegisterPostCommitCallback

func (s *PostgresStorage) RegisterPostCommitCallback(ctx context.Context, cb func() error) error

RegisterPostCommitCallback registers a callback to be executed after a successful commit. Returns an error if not currently in a transaction.

func (*PostgresStorage) RegisterTimerSubscription

func (s *PostgresStorage) RegisterTimerSubscription(ctx context.Context, sub *TimerSubscription) error

RegisterTimerSubscription registers a timer for a workflow.

func (*PostgresStorage) RegisterTimerSubscriptionAndReleaseLock

func (s *PostgresStorage) RegisterTimerSubscriptionAndReleaseLock(ctx context.Context, sub *TimerSubscription, instanceID, workerID string) error

RegisterTimerSubscriptionAndReleaseLock atomically registers a timer and releases the lock.

func (*PostgresStorage) ReleaseLock

func (s *PostgresStorage) ReleaseLock(ctx context.Context, instanceID, workerID string) error

ReleaseLock releases a lock on a workflow instance.

func (*PostgresStorage) ReleaseSystemLock

func (s *PostgresStorage) ReleaseSystemLock(ctx context.Context, lockName, workerID string) error

ReleaseSystemLock releases a system lock.

func (*PostgresStorage) RemoveTimerSubscription

func (s *PostgresStorage) RemoveTimerSubscription(ctx context.Context, instanceID, timerID string) error

RemoveTimerSubscription removes a timer subscription.

func (*PostgresStorage) RollbackTransaction

func (s *PostgresStorage) RollbackTransaction(ctx context.Context) error

RollbackTransaction rolls back the current transaction. Callbacks are not executed on rollback.

func (*PostgresStorage) SetNotifyEnabled

func (s *PostgresStorage) SetNotifyEnabled(enabled bool)

SetNotifyEnabled enables or disables PostgreSQL LISTEN/NOTIFY. When enabled, the storage will send pg_notify() calls after key operations.

func (*PostgresStorage) SubscribeToChannel

func (s *PostgresStorage) SubscribeToChannel(ctx context.Context, instanceID, channelName string, mode ChannelMode) error

SubscribeToChannel subscribes an instance to a channel. For broadcast mode, it initializes the delivery cursor to the current max message ID so that only messages published after subscription are received.

func (*PostgresStorage) TryAcquireLock

func (s *PostgresStorage) TryAcquireLock(ctx context.Context, instanceID, workerID string, timeoutSec int) (bool, error)

TryAcquireLock attempts to acquire a lock on a workflow instance.

func (*PostgresStorage) TryAcquireSystemLock

func (s *PostgresStorage) TryAcquireSystemLock(ctx context.Context, lockName, workerID string, timeoutSec int) (bool, error)

TryAcquireSystemLock attempts to acquire a system lock.

func (*PostgresStorage) UnsubscribeFromChannel

func (s *PostgresStorage) UnsubscribeFromChannel(ctx context.Context, instanceID, channelName string) error

UnsubscribeFromChannel unsubscribes an instance from a channel.

func (*PostgresStorage) UpdateDeliveryCursor

func (s *PostgresStorage) UpdateDeliveryCursor(ctx context.Context, instanceID, channelName string, lastMessageID int64) error

UpdateDeliveryCursor updates the delivery cursor for broadcast mode.

func (*PostgresStorage) UpdateInstanceActivity

func (s *PostgresStorage) UpdateInstanceActivity(ctx context.Context, instanceID, activityID string) error

UpdateInstanceActivity updates the current activity ID.

func (*PostgresStorage) UpdateInstanceOutput

func (s *PostgresStorage) UpdateInstanceOutput(ctx context.Context, instanceID string, outputData []byte) error

UpdateInstanceOutput updates the output data.

func (*PostgresStorage) UpdateInstanceStatus

func (s *PostgresStorage) UpdateInstanceStatus(ctx context.Context, instanceID string, status WorkflowStatus, errorMsg string) error

UpdateInstanceStatus updates the status of a workflow instance.

type ResumableWorkflow

// ResumableWorkflow contains information about a workflow ready to be resumed.
// These are workflows with status='running' that don't have an active lock.
type ResumableWorkflow struct {
	InstanceID   string // ID of the workflow instance that is ready to be resumed
	WorkflowName string // name of the workflow; presumably used to look up its definition — TODO confirm
}

ResumableWorkflow contains information about a workflow ready to be resumed. These are workflows with status='running' that don't have an active lock.

type SQLiteDriver

type SQLiteDriver struct{}

SQLiteDriver implements Driver for SQLite.

func (*SQLiteDriver) DriverName

func (d *SQLiteDriver) DriverName() string

func (*SQLiteDriver) GetCurrentTimeExpr

func (d *SQLiteDriver) GetCurrentTimeExpr() string

func (*SQLiteDriver) JSONCompare

func (d *SQLiteDriver) JSONCompare(extractExpr string, value any, _ int) (expr string, compArgs []any)

JSONCompare for SQLiteDriver returns the SQL expression to compare a JSON-extracted value.

func (*SQLiteDriver) JSONExtract

func (d *SQLiteDriver) JSONExtract(column, path string) string

JSONExtract for SQLiteDriver returns the SQL expression to extract a value from a JSON column.

func (*SQLiteDriver) MakeDatetimeComparable

func (d *SQLiteDriver) MakeDatetimeComparable(column string) string

func (*SQLiteDriver) OnConflictDoNothing

func (d *SQLiteDriver) OnConflictDoNothing(conflictColumns ...string) string

func (*SQLiteDriver) Placeholder

func (d *SQLiteDriver) Placeholder(n int) string

func (*SQLiteDriver) ReturningClause

func (d *SQLiteDriver) ReturningClause(columns ...string) string

func (*SQLiteDriver) SelectForUpdateSkipLocked

func (d *SQLiteDriver) SelectForUpdateSkipLocked() string

type SQLiteStorage

type SQLiteStorage struct {
	// contains filtered or unexported fields
}

SQLiteStorage implements the Storage interface using SQLite.

func NewSQLiteStorage

func NewSQLiteStorage(dbPath string) (*SQLiteStorage, error)

NewSQLiteStorage creates a new SQLite storage.

func (*SQLiteStorage) AddCompensation

func (s *SQLiteStorage) AddCompensation(ctx context.Context, entry *CompensationEntry) error

AddCompensation adds a compensation entry.

func (*SQLiteStorage) AddOutboxEvent

func (s *SQLiteStorage) AddOutboxEvent(ctx context.Context, event *OutboxEvent) error

AddOutboxEvent adds an event to the outbox.

func (*SQLiteStorage) AppendHistory

func (s *SQLiteStorage) AppendHistory(ctx context.Context, event *HistoryEvent) error

AppendHistory appends a history event.

func (*SQLiteStorage) ArchiveHistory

func (s *SQLiteStorage) ArchiveHistory(ctx context.Context, instanceID string) error

ArchiveHistory moves all history events for an instance to the archive table.

func (*SQLiteStorage) BeginTransaction

func (s *SQLiteStorage) BeginTransaction(ctx context.Context) (context.Context, error)

BeginTransaction starts a new transaction.

func (*SQLiteStorage) CancelInstance

func (s *SQLiteStorage) CancelInstance(ctx context.Context, instanceID, reason string) error

CancelInstance cancels a workflow instance. Returns ErrWorkflowNotCancellable if the workflow is already completed, cancelled, failed, or does not exist.

func (*SQLiteStorage) ClaimChannelMessage

func (s *SQLiteStorage) ClaimChannelMessage(ctx context.Context, messageID int64, instanceID string) (bool, error)

ClaimChannelMessage claims a message for competing mode.

func (*SQLiteStorage) CleanupExpiredSystemLocks

func (s *SQLiteStorage) CleanupExpiredSystemLocks(ctx context.Context) error

CleanupExpiredSystemLocks removes expired system locks.

func (*SQLiteStorage) CleanupInstanceSubscriptions

func (s *SQLiteStorage) CleanupInstanceSubscriptions(ctx context.Context, instanceID string) error

CleanupInstanceSubscriptions removes all subscriptions for an instance.

func (*SQLiteStorage) CleanupOldChannelMessages

func (s *SQLiteStorage) CleanupOldChannelMessages(ctx context.Context, olderThan time.Duration) error

CleanupOldChannelMessages removes old channel messages.

func (*SQLiteStorage) CleanupOldOutboxEvents

func (s *SQLiteStorage) CleanupOldOutboxEvents(ctx context.Context, olderThan time.Duration) error

CleanupOldOutboxEvents removes old sent events.

func (*SQLiteStorage) CleanupStaleLocks

func (s *SQLiteStorage) CleanupStaleLocks(ctx context.Context, timeoutSec int) ([]StaleWorkflowInfo, error)

CleanupStaleLocks cleans up expired locks and returns workflows to resume.

func (*SQLiteStorage) ClearChannelWaitingState

func (s *SQLiteStorage) ClearChannelWaitingState(ctx context.Context, instanceID, channelName string) error

ClearChannelWaitingState clears the waiting state for an instance's channel subscription.

func (*SQLiteStorage) Close

func (s *SQLiteStorage) Close() error

Close closes the database connection.

func (*SQLiteStorage) CommitTransaction

func (s *SQLiteStorage) CommitTransaction(ctx context.Context) error

CommitTransaction commits the current transaction.

func (*SQLiteStorage) Conn

func (s *SQLiteStorage) Conn(ctx context.Context) Executor

Conn returns the database executor for the current context. If a transaction is active, returns the transaction; otherwise, returns the database. This allows users to execute custom SQL queries within the same transaction as the workflow activity when using transactional activities.

Example:

conn := ctx.Storage().Conn(ctx.Context())
_, err := conn.ExecContext(ctx.Context(), "INSERT INTO orders ...", ...)

func (*SQLiteStorage) CreateInstance

func (s *SQLiteStorage) CreateInstance(ctx context.Context, instance *WorkflowInstance) error

CreateInstance creates a new workflow instance.

func (*SQLiteStorage) DB

func (s *SQLiteStorage) DB() *sql.DB

DB returns the underlying database connection.

func (*SQLiteStorage) DeleteChannelMessage

func (s *SQLiteStorage) DeleteChannelMessage(ctx context.Context, messageID int64) error

DeleteChannelMessage deletes a message from the channel.

func (*SQLiteStorage) DeliverChannelMessage

func (s *SQLiteStorage) DeliverChannelMessage(ctx context.Context, instanceID string, message *ChannelMessage) error

DeliverChannelMessage delivers a message to a waiting subscriber.

func (*SQLiteStorage) DeliverChannelMessageWithLock

func (s *SQLiteStorage) DeliverChannelMessageWithLock(
	ctx context.Context,
	instanceID string,
	channelName string,
	message *ChannelMessage,
	workerID string,
	lockTimeoutSec int,
) (*ChannelDeliveryResult, error)

DeliverChannelMessageWithLock delivers a message using the Lock-First pattern. Returns a nil result if the lock could not be acquired (another worker will handle it).

func (*SQLiteStorage) FindExpiredChannelSubscriptions

func (s *SQLiteStorage) FindExpiredChannelSubscriptions(ctx context.Context, limit int) ([]*ChannelSubscription, error)

FindExpiredChannelSubscriptions finds channel subscriptions that have timed out.

func (*SQLiteStorage) FindExpiredTimers

func (s *SQLiteStorage) FindExpiredTimers(ctx context.Context, limit int) ([]*TimerSubscription, error)

FindExpiredTimers finds expired timers.

func (*SQLiteStorage) FindResumableWorkflows

func (s *SQLiteStorage) FindResumableWorkflows(ctx context.Context, limit int) ([]*ResumableWorkflow, error)

FindResumableWorkflows finds workflows with status='running' that don't have an active lock. These are workflows that had a message delivered and are waiting for a worker to resume them.

func (*SQLiteStorage) GetArchivedHistory

func (s *SQLiteStorage) GetArchivedHistory(ctx context.Context, instanceID string) ([]*ArchivedHistoryEvent, error)

GetArchivedHistory retrieves archived history events for an instance.

func (*SQLiteStorage) GetChannelSubscribersWaiting

func (s *SQLiteStorage) GetChannelSubscribersWaiting(ctx context.Context, channelName string) ([]*ChannelSubscription, error)

GetChannelSubscribersWaiting finds subscribers waiting for messages on a channel.

func (*SQLiteStorage) GetChannelSubscription

func (s *SQLiteStorage) GetChannelSubscription(ctx context.Context, instanceID, channelName string) (*ChannelSubscription, error)

GetChannelSubscription retrieves a subscription for an instance and channel.

func (*SQLiteStorage) GetCompensations

func (s *SQLiteStorage) GetCompensations(ctx context.Context, instanceID string) ([]*CompensationEntry, error)

GetCompensations retrieves compensations for a workflow in LIFO order.

func (*SQLiteStorage) GetDeliveryCursor

func (s *SQLiteStorage) GetDeliveryCursor(ctx context.Context, instanceID, channelName string) (int64, error)

GetDeliveryCursor gets the current delivery cursor for an instance and channel.

func (*SQLiteStorage) GetGroupMembers

func (s *SQLiteStorage) GetGroupMembers(ctx context.Context, groupName string) ([]string, error)

GetGroupMembers retrieves all instance IDs in a group.

func (*SQLiteStorage) GetHistoryCount

func (s *SQLiteStorage) GetHistoryCount(ctx context.Context, instanceID string) (int64, error)

GetHistoryCount returns the total number of history events for an instance.

func (*SQLiteStorage) GetHistoryPaginated

func (s *SQLiteStorage) GetHistoryPaginated(ctx context.Context, instanceID string, afterID int64, limit int) ([]*HistoryEvent, bool, error)

GetHistoryPaginated retrieves history events with pagination. It returns up to limit events with id > afterID, as the tuple (events, hasMore, error).

func (*SQLiteStorage) GetInstance

func (s *SQLiteStorage) GetInstance(ctx context.Context, instanceID string) (*WorkflowInstance, error)

GetInstance retrieves a workflow instance by ID.

func (*SQLiteStorage) GetPendingChannelMessages

func (s *SQLiteStorage) GetPendingChannelMessages(ctx context.Context, channelName string, afterID int64, limit int) ([]*ChannelMessage, error)

GetPendingChannelMessages retrieves pending messages for a channel after a given ID.

func (*SQLiteStorage) GetPendingChannelMessagesForInstance

func (s *SQLiteStorage) GetPendingChannelMessagesForInstance(ctx context.Context, instanceID, channelName string) ([]*ChannelMessage, error)

GetPendingChannelMessagesForInstance gets pending messages for a specific subscriber. In broadcast mode, it returns messages with id > cursor (messages not yet seen by this instance). In competing mode, it returns unclaimed messages (not yet claimed by any instance).

func (*SQLiteStorage) GetPendingOutboxEvents

func (s *SQLiteStorage) GetPendingOutboxEvents(ctx context.Context, limit int) ([]*OutboxEvent, error)

GetPendingOutboxEvents retrieves pending outbox events.

func (*SQLiteStorage) InTransaction

func (s *SQLiteStorage) InTransaction(ctx context.Context) bool

InTransaction returns whether a transaction is in progress.

func (*SQLiteStorage) IncrementOutboxAttempts

func (s *SQLiteStorage) IncrementOutboxAttempts(ctx context.Context, eventID string) error

IncrementOutboxAttempts increments the attempt count for an event.

func (*SQLiteStorage) Initialize

func (s *SQLiteStorage) Initialize(ctx context.Context) error

Initialize creates the database schema using migrations.

func (*SQLiteStorage) JoinGroup

func (s *SQLiteStorage) JoinGroup(ctx context.Context, instanceID, groupName string) error

JoinGroup adds an instance to a group.

func (*SQLiteStorage) LeaveAllGroups

func (s *SQLiteStorage) LeaveAllGroups(ctx context.Context, instanceID string) error

LeaveAllGroups removes an instance from all groups.

func (*SQLiteStorage) LeaveGroup

func (s *SQLiteStorage) LeaveGroup(ctx context.Context, instanceID, groupName string) error

LeaveGroup removes an instance from a group.

func (*SQLiteStorage) ListInstances

func (s *SQLiteStorage) ListInstances(ctx context.Context, opts ListInstancesOptions) (*PaginationResult, error)

ListInstances lists workflow instances with cursor-based pagination and filters.

func (*SQLiteStorage) MarkCompensationExecuted

func (s *SQLiteStorage) MarkCompensationExecuted(ctx context.Context, id int64) error

MarkCompensationExecuted marks a compensation as executed.

func (*SQLiteStorage) MarkCompensationFailed

func (s *SQLiteStorage) MarkCompensationFailed(ctx context.Context, id int64) error

MarkCompensationFailed marks a compensation as failed.

func (*SQLiteStorage) MarkOutboxEventFailed

func (s *SQLiteStorage) MarkOutboxEventFailed(ctx context.Context, eventID string) error

MarkOutboxEventFailed marks an outbox event as failed.

func (*SQLiteStorage) MarkOutboxEventSent

func (s *SQLiteStorage) MarkOutboxEventSent(ctx context.Context, eventID string) error

MarkOutboxEventSent marks an outbox event as sent.

func (*SQLiteStorage) PublishToChannel

func (s *SQLiteStorage) PublishToChannel(ctx context.Context, channelName string, dataJSON, metadata []byte, targetInstanceID string) (int64, error)

PublishToChannel publishes a message to a channel.

func (*SQLiteStorage) RefreshLock

func (s *SQLiteStorage) RefreshLock(ctx context.Context, instanceID, workerID string, timeoutSec int) error

RefreshLock extends the lock expiration time.

func (*SQLiteStorage) RegisterChannelReceiveAndReleaseLock

func (s *SQLiteStorage) RegisterChannelReceiveAndReleaseLock(ctx context.Context, instanceID, channelName, workerID, activityID string, timeoutAt *time.Time) error

RegisterChannelReceiveAndReleaseLock atomically registers a channel receive wait and releases the lock.

func (*SQLiteStorage) RegisterPostCommitCallback

func (s *SQLiteStorage) RegisterPostCommitCallback(ctx context.Context, cb func() error) error

RegisterPostCommitCallback registers a callback to be executed after a successful commit. Returns an error if not currently in a transaction.

func (*SQLiteStorage) RegisterTimerSubscription

func (s *SQLiteStorage) RegisterTimerSubscription(ctx context.Context, sub *TimerSubscription) error

RegisterTimerSubscription registers a timer for a workflow.

func (*SQLiteStorage) RegisterTimerSubscriptionAndReleaseLock

func (s *SQLiteStorage) RegisterTimerSubscriptionAndReleaseLock(ctx context.Context, sub *TimerSubscription, instanceID, workerID string) error

RegisterTimerSubscriptionAndReleaseLock atomically registers a timer and releases the workflow lock.

func (*SQLiteStorage) ReleaseLock

func (s *SQLiteStorage) ReleaseLock(ctx context.Context, instanceID, workerID string) error

ReleaseLock releases a lock on a workflow instance.

func (*SQLiteStorage) ReleaseSystemLock

func (s *SQLiteStorage) ReleaseSystemLock(ctx context.Context, lockName, workerID string) error

ReleaseSystemLock releases a system lock.

func (*SQLiteStorage) RemoveTimerSubscription

func (s *SQLiteStorage) RemoveTimerSubscription(ctx context.Context, instanceID, timerID string) error

RemoveTimerSubscription removes a timer subscription.

func (*SQLiteStorage) RollbackTransaction

func (s *SQLiteStorage) RollbackTransaction(ctx context.Context) error

RollbackTransaction rolls back the current transaction. Callbacks are not executed on rollback.

func (*SQLiteStorage) SubscribeToChannel

func (s *SQLiteStorage) SubscribeToChannel(ctx context.Context, instanceID, channelName string, mode ChannelMode) error

SubscribeToChannel subscribes an instance to a channel. For broadcast mode, it initializes the delivery cursor to the current max message ID so that only messages published after subscription are received.

func (*SQLiteStorage) TryAcquireLock

func (s *SQLiteStorage) TryAcquireLock(ctx context.Context, instanceID, workerID string, timeoutSec int) (bool, error)

TryAcquireLock attempts to acquire a lock on a workflow instance.

func (*SQLiteStorage) TryAcquireSystemLock

func (s *SQLiteStorage) TryAcquireSystemLock(ctx context.Context, lockName, workerID string, timeoutSec int) (bool, error)

TryAcquireSystemLock attempts to acquire a system lock.

func (*SQLiteStorage) UnsubscribeFromChannel

func (s *SQLiteStorage) UnsubscribeFromChannel(ctx context.Context, instanceID, channelName string) error

UnsubscribeFromChannel unsubscribes an instance from a channel.

func (*SQLiteStorage) UpdateDeliveryCursor

func (s *SQLiteStorage) UpdateDeliveryCursor(ctx context.Context, instanceID, channelName string, lastMessageID int64) error

UpdateDeliveryCursor updates the delivery cursor for broadcast mode.

func (*SQLiteStorage) UpdateInstanceActivity

func (s *SQLiteStorage) UpdateInstanceActivity(ctx context.Context, instanceID, activityID string) error

UpdateInstanceActivity updates the current activity ID of a workflow instance.

func (*SQLiteStorage) UpdateInstanceOutput

func (s *SQLiteStorage) UpdateInstanceOutput(ctx context.Context, instanceID string, outputData []byte) error

UpdateInstanceOutput updates the output data of a workflow instance.

func (*SQLiteStorage) UpdateInstanceStatus

func (s *SQLiteStorage) UpdateInstanceStatus(ctx context.Context, instanceID string, status WorkflowStatus, errorMsg string) error

UpdateInstanceStatus updates the status of a workflow instance.

type StaleWorkflowInfo

// StaleWorkflowInfo contains information about a workflow with a stale lock.
type StaleWorkflowInfo struct {
	// InstanceID identifies the workflow instance (presumably the one whose
	// lock went stale; confirm against the code that produces this value).
	InstanceID   string
	// WorkflowName is the workflow name associated with that instance.
	WorkflowName string
}

StaleWorkflowInfo contains information about a workflow with a stale lock.

type Storage

// Storage defines the interface for workflow persistence.
// Implementations must be safe for concurrent use.
//
// It consists of three lifecycle methods (Initialize, Close, DB) plus one
// embedded manager interface per functional area.
type Storage interface {
	// Initialize sets up the storage (create tables, etc.)
	Initialize(ctx context.Context) error

	// Close closes the storage connection
	Close() error

	// DB returns the underlying database connection.
	// This is primarily used for migrations.
	DB() *sql.DB

	// Transaction Management: begin/commit/rollback, a transaction-aware
	// Conn, and post-commit callbacks.
	TransactionManager

	// Workflow Instance Operations
	InstanceManager

	// Locking Operations
	LockManager

	// History Operations
	HistoryManager

	// Timer Subscription Operations: register timers, find expired ones,
	// and remove them.
	TimerSubscriptionManager

	// Outbox Operations
	OutboxManager

	// Compensation Operations
	CompensationManager

	// Channel Operations
	ChannelManager

	// Group Operations
	GroupManager

	// System Lock Operations: acquire, release, and clean up system-level
	// locks used by background tasks.
	SystemLockManager

	// History Archive Operations
	HistoryArchiveManager
}

Storage defines the interface for workflow persistence. Implementations must be safe for concurrent use.

type SystemLock

// SystemLock represents a system-level lock for background tasks.
type SystemLock struct {
	// LockName is the name identifying the lock.
	LockName  string    `json:"lock_name"`
	// LockedBy names the current holder; presumably the workerID passed to
	// TryAcquireSystemLock (TODO confirm).
	LockedBy  string    `json:"locked_by"`
	// LockedAt is the time the lock was taken.
	LockedAt  time.Time `json:"locked_at"`
	// ExpiresAt is the time the lock expires; presumably derived from the
	// timeoutSec given at acquisition (TODO confirm).
	ExpiresAt time.Time `json:"expires_at"`
}

SystemLock represents a system-level lock for background tasks.

type SystemLockManager

// SystemLockManager handles system-level locks for background tasks.
type SystemLockManager interface {
	// TryAcquireSystemLock attempts to acquire a system lock.
	// Returns true if the lock was acquired.
	// NOTE(review): timeoutSec is presumably the lock lifetime in seconds
	// before it counts as expired; confirm against the implementation.
	TryAcquireSystemLock(ctx context.Context, lockName, workerID string, timeoutSec int) (bool, error)

	// ReleaseSystemLock releases a system lock.
	// It takes the lock name and the worker ID of the caller.
	ReleaseSystemLock(ctx context.Context, lockName, workerID string) error

	// CleanupExpiredSystemLocks removes expired system locks.
	CleanupExpiredSystemLocks(ctx context.Context) error
}

SystemLockManager handles system-level locks for background tasks.

type TimerSubscription

// TimerSubscription represents a workflow waiting for a timer.
type TimerSubscription struct {
	// ID is the numeric identifier of the subscription record.
	ID         int64     `json:"id"`
	// InstanceID is the ID of the waiting workflow instance.
	InstanceID string    `json:"instance_id"`
	// TimerID identifies the timer; RemoveTimerSubscription addresses a
	// subscription by (instanceID, timerID).
	TimerID    string    `json:"timer_id"`
	// ExpiresAt is the time at which the timer expires.
	ExpiresAt  time.Time `json:"expires_at"`
	Step       int       `json:"step"` // Activity step for replay
	// CreatedAt is presumably when the subscription was registered (TODO confirm).
	CreatedAt  time.Time `json:"created_at"`
}

TimerSubscription represents a workflow waiting for a timer.

type TimerSubscriptionManager

// TimerSubscriptionManager handles timer subscriptions: registering them,
// finding expired ones, and removing them.
type TimerSubscriptionManager interface {
	// RegisterTimerSubscription registers a workflow to wait for a timer.
	RegisterTimerSubscription(ctx context.Context, sub *TimerSubscription) error

	// RegisterTimerSubscriptionAndReleaseLock atomically registers a timer
	// and releases the workflow lock.
	// instanceID and workerID presumably identify the lock to release
	// (TODO confirm against the implementation).
	RegisterTimerSubscriptionAndReleaseLock(
		ctx context.Context,
		sub *TimerSubscription,
		instanceID, workerID string,
	) error

	// FindExpiredTimers finds timers that have expired.
	// limit specifies the maximum number of timers to return (0 = default 100).
	FindExpiredTimers(ctx context.Context, limit int) ([]*TimerSubscription, error)

	// RemoveTimerSubscription removes a timer subscription.
	// The subscription is addressed by its instance ID and timer ID.
	RemoveTimerSubscription(ctx context.Context, instanceID, timerID string) error
}

TimerSubscriptionManager handles timer subscriptions.

type TransactionManager

// TransactionManager handles transaction operations.
//
// The transaction is carried by the context: BeginTransaction returns a
// derived context, and the remaining methods take a context as their handle
// on the current transaction.
type TransactionManager interface {
	// BeginTransaction starts a new transaction.
	// Returns a context with the transaction attached.
	BeginTransaction(ctx context.Context) (context.Context, error)

	// CommitTransaction commits the current transaction.
	CommitTransaction(ctx context.Context) error

	// RollbackTransaction rolls back the current transaction.
	RollbackTransaction(ctx context.Context) error

	// InTransaction returns true if there is an active transaction.
	InTransaction(ctx context.Context) bool

	// Conn returns the database executor for the current context.
	// If a transaction is active, returns the transaction; otherwise, returns the database.
	// This allows users to execute custom SQL queries within the same transaction.
	Conn(ctx context.Context) Executor

	// RegisterPostCommitCallback registers a callback to be executed after a successful commit.
	// This is used for operations that should only happen if the transaction commits,
	// such as notifying waiting subscribers after a message is published.
	// Returns an error if not currently in a transaction.
	RegisterPostCommitCallback(ctx context.Context, cb func() error) error
}

TransactionManager handles transaction operations.

type WorkflowInstance

// WorkflowInstance represents a single execution of a workflow.
type WorkflowInstance struct {
	InstanceID        string         `json:"instance_id"`
	WorkflowName      string         `json:"workflow_name"`
	Status            WorkflowStatus `json:"status"`
	InputData         []byte         `json:"input_data"`          // JSON-encoded input
	OutputData        []byte         `json:"output_data"`         // JSON-encoded output (if completed)
	ErrorMessage      string         `json:"error_message"`       // Error message (if failed)
	CurrentActivityID string         `json:"current_activity_id"` // Last executed activity ID
	SourceCode        string         `json:"source_code"`         // Source code snapshot (optional)
	ContinuedFrom     string         `json:"continued_from"`      // Previous instance ID (for recur)
	LockedBy          string         `json:"locked_by"`           // Worker ID holding the lock
	LockedAt          *time.Time     `json:"locked_at"`           // When the lock was acquired
	// LockTimeoutSec is presumably the lock timeout in seconds, nil when no
	// lock is held (TODO confirm).
	LockTimeoutSec    *int           `json:"lock_timeout_seconds"`
	// LockExpiresAt is presumably the time the current lock expires, nil
	// when no lock is held (TODO confirm).
	LockExpiresAt     *time.Time     `json:"lock_expires_at"`
	// CreatedAt is presumably when the instance record was created (TODO confirm).
	CreatedAt         time.Time      `json:"created_at"`
	// UpdatedAt is presumably when the instance record was last modified (TODO confirm).
	UpdatedAt         time.Time      `json:"updated_at"`
}

WorkflowInstance represents a single execution of a workflow.

type WorkflowStatus

// WorkflowStatus represents the current state of a workflow instance.
// It is a string type; the defined values are the Status* constants.
type WorkflowStatus string

WorkflowStatus represents the current state of a workflow instance.

// Defined WorkflowStatus values.
//
// What this page establishes about them:
//   - StatusRunning is the status FindResumableWorkflows looks for
//     (status='running' without an active lock).
//   - StatusCompleted, StatusCancelled and StatusFailed are the states that
//     ErrWorkflowNotCancellable documents as no longer cancellable.
//
// NOTE(review): the remaining meanings are inferred from the names only and
// should be confirmed against the engine: the three StatusWaitingFor* values
// presumably mark a workflow suspended on an event, timer, or channel message;
// StatusRecurred presumably marks an instance that continued as a new one
// (see WorkflowInstance.ContinuedFrom); StatusCompensating presumably marks
// compensations being run.
const (
	StatusPending           WorkflowStatus = "pending"
	StatusRunning           WorkflowStatus = "running"
	StatusCompleted         WorkflowStatus = "completed"
	StatusFailed            WorkflowStatus = "failed"
	StatusCancelled         WorkflowStatus = "cancelled"
	StatusWaitingForEvent   WorkflowStatus = "waiting_for_event"
	StatusWaitingForTimer   WorkflowStatus = "waiting_for_timer"
	StatusWaitingForMessage WorkflowStatus = "waiting_for_message"
	StatusRecurred          WorkflowStatus = "recurred"
	StatusCompensating      WorkflowStatus = "compensating"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL