storage

package
v0.6.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 25, 2025 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package storage provides the storage layer for Romancy.

Package storage provides the storage layer for Romancy.

Package storage provides the storage layer for Romancy.

Package storage provides the storage layer for Romancy.

Package storage provides the storage layer for Romancy.

Index

Constants

This section is empty.

Variables

View Source
var ErrWorkflowNotCancellable = errors.New("workflow cannot be cancelled")

ErrWorkflowNotCancellable indicates that the workflow cannot be cancelled. This happens when the workflow is already completed, cancelled, failed, or does not exist.

Functions

func InitializeTestSchema added in v0.3.0

func InitializeTestSchema(ctx context.Context, s *SQLiteStorage) error

InitializeTestSchema applies the test schema to an SQLiteStorage. This is intended for use in tests only.

func InitializeTestSchemaForStorage added in v0.3.0

func InitializeTestSchemaForStorage(ctx context.Context, s Storage) error

InitializeTestSchemaForStorage initializes the test schema for any Storage interface. This is useful for external test packages.

func InitializeTestSchemaMySQL added in v0.3.0

func InitializeTestSchemaMySQL(ctx context.Context, s *MySQLStorage) error

InitializeTestSchemaMySQL applies the test schema to a MySQLStorage. This is intended for use in tests only.

func InitializeTestSchemaPostgres added in v0.3.0

func InitializeTestSchemaPostgres(ctx context.Context, s *PostgresStorage) error

InitializeTestSchemaPostgres applies the test schema to a PostgresStorage. This is intended for use in tests only.

func ValidateJSONPath

func ValidateJSONPath(path string) error

ValidateJSONPath validates a JSON path for safe use in SQL queries. Returns an error if the path contains potentially dangerous characters.

Types

type ArchivedHistoryEvent

type ArchivedHistoryEvent struct {
	ID                int64            `json:"id"`
	OriginalID        int64            `json:"original_id"`
	InstanceID        string           `json:"instance_id"`
	ActivityID        string           `json:"activity_id"`
	EventType         HistoryEventType `json:"event_type"`
	EventData         []byte           `json:"event_data"`
	EventDataBinary   []byte           `json:"event_data_binary"`
	DataType          string           `json:"data_type"`
	OriginalCreatedAt time.Time        `json:"original_created_at"`
	ArchivedAt        time.Time        `json:"archived_at"`
}

ArchivedHistoryEvent represents an archived history event (from recur).

type ChannelDeliveryCursor

type ChannelDeliveryCursor struct {
	ID              int64     `json:"id"`
	InstanceID      string    `json:"instance_id"`
	Channel         string    `json:"channel"`           // Channel name (unified with Edda)
	LastDeliveredID int64     `json:"last_delivered_id"` // Last delivered message ID (unified with Edda)
	UpdatedAt       time.Time `json:"updated_at"`
}

ChannelDeliveryCursor tracks message delivery progress for broadcast mode.

type ChannelDeliveryResult

type ChannelDeliveryResult struct {
	InstanceID   string `json:"instance_id"`
	WorkflowName string `json:"workflow_name"`
	ActivityID   string `json:"activity_id"`
}

ChannelDeliveryResult contains information about a successful message delivery.

type ChannelManager

type ChannelManager interface {
	// PublishToChannel publishes a message to a channel.
	// For direct messages, use dynamic channel names (e.g., "channel:instance_id").
	PublishToChannel(ctx context.Context, channelName string, dataJSON []byte, metadata []byte) (int64, error)

	// SubscribeToChannel subscribes an instance to a channel.
	SubscribeToChannel(ctx context.Context, instanceID, channelName string, mode ChannelMode) error

	// UnsubscribeFromChannel unsubscribes an instance from a channel.
	UnsubscribeFromChannel(ctx context.Context, instanceID, channelName string) error

	// GetChannelSubscription retrieves a subscription for an instance and channel.
	GetChannelSubscription(ctx context.Context, instanceID, channelName string) (*ChannelSubscription, error)

	// GetChannelMode retrieves the mode for a channel (from any existing subscription).
	// Returns empty string if no subscriptions exist.
	GetChannelMode(ctx context.Context, channelName string) (ChannelMode, error)

	// RegisterChannelReceiveAndReleaseLock atomically registers a channel receive wait
	// and releases the workflow lock.
	RegisterChannelReceiveAndReleaseLock(
		ctx context.Context,
		instanceID, channelName, workerID, activityID string,
		timeoutAt *time.Time,
	) error

	// GetPendingChannelMessages retrieves pending messages for a channel after a given ID.
	GetPendingChannelMessages(ctx context.Context, channelName string, afterID int64, limit int) ([]*ChannelMessage, error)

	// GetPendingChannelMessagesForInstance gets pending messages for a specific subscriber.
	// For broadcast mode: Returns messages with id > cursor (messages not yet seen by this instance)
	// For competing mode: Returns unclaimed messages (not yet claimed by any instance)
	GetPendingChannelMessagesForInstance(ctx context.Context, instanceID, channelName string) ([]*ChannelMessage, error)

	// ClaimChannelMessage claims a message for competing mode (returns false if already claimed).
	ClaimChannelMessage(ctx context.Context, messageID int64, instanceID string) (bool, error)

	// DeleteChannelMessage deletes a message from the channel.
	DeleteChannelMessage(ctx context.Context, messageID int64) error

	// UpdateDeliveryCursor updates the delivery cursor for broadcast mode.
	UpdateDeliveryCursor(ctx context.Context, instanceID, channelName string, lastMessageID int64) error

	// GetDeliveryCursor gets the current delivery cursor for an instance and channel.
	GetDeliveryCursor(ctx context.Context, instanceID, channelName string) (int64, error)

	// GetChannelSubscribersWaiting finds subscribers waiting for messages on a channel.
	GetChannelSubscribersWaiting(ctx context.Context, channelName string) ([]*ChannelSubscription, error)

	// ClearChannelWaitingState clears the waiting state for an instance's channel subscription.
	ClearChannelWaitingState(ctx context.Context, instanceID, channelName string) error

	// DeliverChannelMessage delivers a message to a waiting subscriber (records in history).
	DeliverChannelMessage(ctx context.Context, instanceID string, message *ChannelMessage) error

	// DeliverChannelMessageWithLock delivers a message using Lock-First pattern.
	// This is used for load balancing in multi-worker environments:
	// 1. Try to acquire lock on the target instance
	// 2. If lock fails, return (nil, nil) - another worker will handle it
	// 3. Record message in history
	// 4. Update status to 'running'
	// 5. Release lock
	// Returns the delivery info if successful, nil if lock was not acquired.
	DeliverChannelMessageWithLock(
		ctx context.Context,
		instanceID string,
		channelName string,
		message *ChannelMessage,
		workerID string,
		lockTimeoutSec int,
	) (*ChannelDeliveryResult, error)

	// CleanupOldChannelMessages removes old channel messages.
	CleanupOldChannelMessages(ctx context.Context, olderThan time.Duration) error

	// FindExpiredChannelSubscriptions finds channel subscriptions that have timed out.
	// limit specifies the maximum number of subscriptions to return (0 = default 100).
	FindExpiredChannelSubscriptions(ctx context.Context, limit int) ([]*ChannelSubscription, error)
}

ChannelManager handles channel-based messaging operations.

type ChannelMessage

type ChannelMessage struct {
	ID          int64     `json:"id"`
	Channel     string    `json:"channel"`               // Channel name (unified with Edda)
	MessageID   string    `json:"message_id"`            // UUID for message (Edda compatibility)
	DataType    string    `json:"data_type"`             // "json" or "binary" (Edda compatibility)
	Data        []byte    `json:"data,omitempty"`        // JSON data (unified with Edda)
	DataBinary  []byte    `json:"data_binary,omitempty"` // Binary data
	Metadata    []byte    `json:"metadata,omitempty"`    // JSON-encoded metadata
	PublishedAt time.Time `json:"published_at"`          // When the message was published (unified with Edda)
}

ChannelMessage represents a message in a channel. SendTo uses dynamic channel names (e.g., "channel:instance_id") instead of target_instance_id.

type ChannelMessageClaim

type ChannelMessageClaim struct {
	ID         int64     `json:"id"`
	MessageID  int64     `json:"message_id"`
	InstanceID string    `json:"instance_id"`
	ClaimedAt  time.Time `json:"claimed_at"`
}

ChannelMessageClaim tracks which instance claimed a message in competing mode.

type ChannelMode

type ChannelMode string

ChannelMode represents the subscription mode for a channel.

const (
	ChannelModeBroadcast ChannelMode = "broadcast" // All subscribers receive all messages
	ChannelModeCompeting ChannelMode = "competing" // Each message goes to one subscriber
	ChannelModeDirect    ChannelMode = "direct"    // Direct messages to specific instance (via SendTo)
)

type ChannelSubscription

type ChannelSubscription struct {
	ID              int64       `json:"id"`
	InstanceID      string      `json:"instance_id"`
	Channel         string      `json:"channel"` // Channel name (unified with Edda)
	Mode            ChannelMode `json:"mode"`
	CursorMessageID int64       `json:"cursor_message_id"` // Last delivered message ID (Edda compatibility)
	TimeoutAt       *time.Time  `json:"timeout_at,omitempty"`
	ActivityID      string      `json:"activity_id,omitempty"` // Activity ID for replay matching; non-null = waiting
	SubscribedAt    time.Time   `json:"subscribed_at"`         // When the subscription was created (unified with Edda)
}

ChannelSubscription represents a workflow's subscription to a channel. Waiting state is determined by "activity_id IS NOT NULL" instead of explicit waiting flag.

type CompensationEntry

type CompensationEntry struct {
	ID           int64     `json:"id"`
	InstanceID   string    `json:"instance_id"`
	ActivityID   string    `json:"activity_id"`
	ActivityName string    `json:"activity_name"` // Function name (unified with Edda)
	Args         []byte    `json:"args"`          // JSON-encoded arguments (unified with Edda)
	CreatedAt    time.Time `json:"created_at"`
}

CompensationEntry represents a registered compensation action. Order is determined by created_at DESC (LIFO). Status is tracked via history events (CompensationExecuted, CompensationFailed).

type CompensationManager

type CompensationManager interface {
	// AddCompensation registers a compensation action.
	AddCompensation(ctx context.Context, entry *CompensationEntry) error

	// GetCompensations retrieves compensation entries in LIFO order (by created_at DESC).
	// Status tracking is done via history events (CompensationExecuted, CompensationFailed).
	GetCompensations(ctx context.Context, instanceID string) ([]*CompensationEntry, error)
}

CompensationManager handles saga compensation.

type Driver

type Driver interface {
	// DriverName returns the driver name (e.g., "sqlite", "postgres")
	DriverName() string

	// GetCurrentTimeExpr returns the SQL expression for current time.
	// SQLite: datetime('now'), PostgreSQL: NOW()
	GetCurrentTimeExpr() string

	// MakeDatetimeComparable wraps a datetime column for comparison.
	// SQLite needs datetime() wrapper, PostgreSQL doesn't.
	MakeDatetimeComparable(column string) string

	// SelectForUpdateSkipLocked returns the clause for row locking.
	// PostgreSQL: "FOR UPDATE SKIP LOCKED", SQLite: "" (uses table-level locking)
	SelectForUpdateSkipLocked() string

	// Placeholder returns the placeholder for the nth parameter.
	// SQLite: ?, PostgreSQL: $n
	Placeholder(n int) string

	// OnConflictDoNothing returns the clause for idempotent inserts.
	// SQLite: "ON CONFLICT DO NOTHING", PostgreSQL: "ON CONFLICT DO NOTHING"
	OnConflictDoNothing(conflictColumns ...string) string

	// ReturningClause returns the RETURNING clause for insert/update.
	// SQLite: "", PostgreSQL: "RETURNING id"
	ReturningClause(columns ...string) string

	// JSONExtract returns the SQL expression to extract a value from a JSON column.
	// The path is a dot-separated path like "order.customer.id".
	// The returned expression extracts the value as text.
	JSONExtract(column, path string) string

	// JSONCompare returns the SQL expression to compare a JSON-extracted value
	// with a given value, handling type coercion appropriately.
	// Returns the comparison expression and any additional args needed.
	JSONCompare(extractExpr string, value any, placeholderNum int) (expr string, args []any)
}

Driver abstracts database-specific SQL operations.

func NewDriver

func NewDriver(dbURL string) Driver

NewDriver creates a new driver based on the database URL.

type Executor

type Executor interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

Executor is a database executor interface that can be either *sql.DB or *sql.Tx. This allows users to execute custom SQL queries within the same transaction as the workflow activity.

type GroupManager

type GroupManager interface {
	// JoinGroup adds an instance to a group.
	JoinGroup(ctx context.Context, instanceID, groupName string) error

	// LeaveGroup removes an instance from a group.
	LeaveGroup(ctx context.Context, instanceID, groupName string) error

	// GetGroupMembers retrieves all instance IDs in a group.
	GetGroupMembers(ctx context.Context, groupName string) ([]string, error)

	// LeaveAllGroups removes an instance from all groups.
	LeaveAllGroups(ctx context.Context, instanceID string) error
}

GroupManager handles Erlang pg-style group membership operations.

type GroupMembership

type GroupMembership struct {
	ID         int64     `json:"id"`
	InstanceID string    `json:"instance_id"`
	GroupName  string    `json:"group_name"`
	JoinedAt   time.Time `json:"joined_at"` // When the instance joined the group (unified with Edda)
}

GroupMembership represents a workflow's membership in a group.

type HistoryArchiveManager

type HistoryArchiveManager interface {
	// ArchiveHistory moves all history events for an instance to the archive table.
	ArchiveHistory(ctx context.Context, instanceID string) error

	// GetArchivedHistory retrieves archived history events for an instance.
	GetArchivedHistory(ctx context.Context, instanceID string) ([]*ArchivedHistoryEvent, error)

	// CleanupInstanceSubscriptions removes all subscriptions for an instance.
	// This is used during recur to clean up stale subscriptions.
	CleanupInstanceSubscriptions(ctx context.Context, instanceID string) error
}

HistoryArchiveManager handles history archival for the recur pattern.

type HistoryEvent

type HistoryEvent struct {
	ID              int64            `json:"id"`
	InstanceID      string           `json:"instance_id"`
	ActivityID      string           `json:"activity_id"`
	EventType       HistoryEventType `json:"event_type"`
	EventData       []byte           `json:"event_data"`        // JSON-encoded data
	EventDataBinary []byte           `json:"event_data_binary"` // Binary data (alternative)
	DataType        string           `json:"data_type"`         // "json" or "binary"
	CreatedAt       time.Time        `json:"created_at"`
}

HistoryEvent represents a single event in a workflow's execution history.

type HistoryEventType

type HistoryEventType string

HistoryEventType represents the type of history event. Values are unified with Edda (Python) for cross-language compatibility.

const (
	// Activity events
	HistoryActivityCompleted HistoryEventType = "ActivityCompleted"
	HistoryActivityFailed    HistoryEventType = "ActivityFailed"

	// External event received (via WaitEvent)
	HistoryEventReceived HistoryEventType = "EventReceived"

	// Timer expired (via Sleep/SleepUntil)
	HistoryTimerExpired HistoryEventType = "TimerExpired"

	// Channel message received (via Receive)
	HistoryChannelMessageReceived HistoryEventType = "ChannelMessageReceived"

	// Message receive timeout
	HistoryMessageTimeout HistoryEventType = "MessageTimeout"

	// Compensation events
	HistoryCompensationExecuted HistoryEventType = "CompensationExecuted"
	HistoryCompensationFailed   HistoryEventType = "CompensationFailed"

	// Workflow lifecycle events
	HistoryWorkflowFailed    HistoryEventType = "WorkflowFailed"
	HistoryWorkflowCancelled HistoryEventType = "WorkflowCancelled"
)

type HistoryIterator

type HistoryIterator struct {
	// contains filtered or unexported fields
}

HistoryIterator provides streaming access to workflow history events. This avoids loading the entire history into memory at once.

func NewHistoryIterator

func NewHistoryIterator(ctx context.Context, storage Storage, instanceID string, opts *HistoryIteratorOptions) *HistoryIterator

NewHistoryIterator creates a new history iterator.

func (*HistoryIterator) Close

func (it *HistoryIterator) Close() error

Close releases any resources held by the iterator. Currently a no-op, but provided for future compatibility.

func (*HistoryIterator) Collect

func (it *HistoryIterator) Collect() ([]*HistoryEvent, error)

Collect reads all remaining events into a slice. Use with caution for large histories - prefer iterating with Next().

func (*HistoryIterator) Err

func (it *HistoryIterator) Err() error

Err returns any error that occurred during iteration.

func (*HistoryIterator) Next

func (it *HistoryIterator) Next() (*HistoryEvent, bool)

Next returns the next history event. Returns (event, true) if an event is available, (nil, false) if no more events. Check Err() after iteration to see if an error occurred.

type HistoryIteratorOptions

type HistoryIteratorOptions struct {
	// BatchSize is the number of events to fetch per batch.
	// Default: 1000
	BatchSize int
}

HistoryIteratorOptions configures the history iterator.

type HistoryManager

type HistoryManager interface {
	// AppendHistory adds a new history event.
	AppendHistory(ctx context.Context, event *HistoryEvent) error

	// GetHistoryPaginated retrieves history events for an instance with pagination.
	// Returns events with id > afterID, up to limit events.
	// Returns (events, hasMore, error).
	GetHistoryPaginated(ctx context.Context, instanceID string, afterID int64, limit int) ([]*HistoryEvent, bool, error)

	// GetHistoryCount returns the total number of history events for an instance.
	GetHistoryCount(ctx context.Context, instanceID string) (int64, error)
}

HistoryManager handles workflow execution history.

type InputFilterBuilder

type InputFilterBuilder struct {
	// contains filtered or unexported fields
}

InputFilterBuilder builds SQL query conditions for input filters.

func NewInputFilterBuilder

func NewInputFilterBuilder(driver Driver) *InputFilterBuilder

NewInputFilterBuilder creates a new InputFilterBuilder.

func (*InputFilterBuilder) BuildFilterQuery

func (b *InputFilterBuilder) BuildFilterQuery(inputFilters map[string]any, startPlaceholder int) (conditions []string, args []any, err error)

BuildFilterQuery builds the WHERE clause conditions for input filters. Returns the query parts (as a slice of conditions) and args to append, or an error if validation fails. The startPlaceholder is the starting placeholder number (for PostgreSQL).

type InstanceManager

type InstanceManager interface {
	// CreateInstance creates a new workflow instance.
	CreateInstance(ctx context.Context, instance *WorkflowInstance) error

	// GetInstance retrieves a workflow instance by ID.
	GetInstance(ctx context.Context, instanceID string) (*WorkflowInstance, error)

	// UpdateInstanceStatus updates the status and related fields.
	UpdateInstanceStatus(ctx context.Context, instanceID string, status WorkflowStatus, errorMsg string) error

	// UpdateInstanceActivity updates the current activity ID.
	UpdateInstanceActivity(ctx context.Context, instanceID string, activityID string) error

	// UpdateInstanceOutput updates the output data for a completed workflow.
	UpdateInstanceOutput(ctx context.Context, instanceID string, outputData []byte) error

	// CancelInstance marks a workflow as cancelled.
	CancelInstance(ctx context.Context, instanceID string, reason string) error

	// ListInstances lists workflow instances with optional filtering and pagination.
	// Returns PaginationResult with cursor-based pagination.
	ListInstances(ctx context.Context, opts ListInstancesOptions) (*PaginationResult, error)

	// FindResumableWorkflows finds workflows that are ready to be resumed.
	// Returns workflows with status='running' that don't have an active lock.
	// These are typically workflows that had a message delivered and are waiting
	// for a worker to pick them up and resume execution.
	// limit specifies the maximum number of workflows to return (0 = default 100).
	FindResumableWorkflows(ctx context.Context, limit int) ([]*ResumableWorkflow, error)
}

InstanceManager handles workflow instance CRUD operations.

type ListInstancesOptions

type ListInstancesOptions struct {
	// Pagination
	Limit     int    // Page size (default: 50)
	PageToken string // Cursor token for pagination (format: "ISO_DATETIME||INSTANCE_ID")

	// Filters
	StatusFilter       WorkflowStatus // Filter by status
	WorkflowNameFilter string         // Filter by workflow name (partial match, case-insensitive)
	InstanceIDFilter   string         // Filter by instance ID (partial match, case-insensitive)
	StartedAfter       *time.Time     // Filter instances started after this time
	StartedBefore      *time.Time     // Filter instances started before this time
	InputFilters       map[string]any // Filter by JSON paths in input data (exact match)

	// Deprecated: Use StatusFilter instead
	Status WorkflowStatus
	// Deprecated: Use WorkflowNameFilter instead
	WorkflowName string
	// Deprecated: Use PageToken instead
	Offset int
}

ListInstancesOptions defines options for listing instances.

type LockManager

type LockManager interface {
	// TryAcquireLock attempts to acquire a lock on a workflow instance.
	// Returns true if the lock was acquired, false if already locked.
	TryAcquireLock(ctx context.Context, instanceID, workerID string, timeoutSec int) (bool, error)

	// ReleaseLock releases the lock on a workflow instance.
	ReleaseLock(ctx context.Context, instanceID, workerID string) error

	// RefreshLock extends the lock timeout.
	RefreshLock(ctx context.Context, instanceID, workerID string, timeoutSec int) error

	// CleanupStaleLocks releases locks that have expired and returns
	// information about running workflows that need to be resumed.
	CleanupStaleLocks(ctx context.Context, timeoutSec int) ([]StaleWorkflowInfo, error)
}

LockManager handles distributed locking operations.

type MySQLDriver

type MySQLDriver struct{}

MySQLDriver implements Driver for MySQL 8.0+.

func (*MySQLDriver) DriverName

func (d *MySQLDriver) DriverName() string

func (*MySQLDriver) GetCurrentTimeExpr

func (d *MySQLDriver) GetCurrentTimeExpr() string

func (*MySQLDriver) JSONCompare

func (d *MySQLDriver) JSONCompare(extractExpr string, value any, _ int) (expr string, compArgs []any)

JSONCompare for MySQLDriver returns the SQL expression to compare a JSON-extracted value.

func (*MySQLDriver) JSONExtract

func (d *MySQLDriver) JSONExtract(column, path string) string

JSONExtract for MySQLDriver returns the SQL expression to extract a value from a JSON column.

func (*MySQLDriver) MakeDatetimeComparable

func (d *MySQLDriver) MakeDatetimeComparable(column string) string

func (*MySQLDriver) OnConflictDoNothing

func (d *MySQLDriver) OnConflictDoNothing(conflictColumns ...string) string

func (*MySQLDriver) Placeholder

func (d *MySQLDriver) Placeholder(n int) string

func (*MySQLDriver) ReturningClause

func (d *MySQLDriver) ReturningClause(columns ...string) string

func (*MySQLDriver) SelectForUpdateSkipLocked

func (d *MySQLDriver) SelectForUpdateSkipLocked() string

type MySQLStorage

type MySQLStorage struct {
	// contains filtered or unexported fields
}

MySQLStorage implements the Storage interface using MySQL 8.0+.

func NewMySQLStorage

func NewMySQLStorage(connStr string) (*MySQLStorage, error)

NewMySQLStorage creates a new MySQL storage. The connStr can be either a MySQL DSN or a URL format: URL format: "mysql://user:password@localhost:3306/dbname" DSN format: "user:password@tcp(localhost:3306)/dbname"

func (*MySQLStorage) AddCompensation

func (s *MySQLStorage) AddCompensation(ctx context.Context, entry *CompensationEntry) error

AddCompensation adds a compensation entry.

func (*MySQLStorage) AddOutboxEvent

func (s *MySQLStorage) AddOutboxEvent(ctx context.Context, event *OutboxEvent) error

AddOutboxEvent adds an event to the outbox.

func (*MySQLStorage) AppendHistory

func (s *MySQLStorage) AppendHistory(ctx context.Context, event *HistoryEvent) error

AppendHistory appends a history event.

func (*MySQLStorage) ArchiveHistory

func (s *MySQLStorage) ArchiveHistory(ctx context.Context, instanceID string) error

ArchiveHistory moves all history events for an instance to the archive table.

func (*MySQLStorage) BeginTransaction

func (s *MySQLStorage) BeginTransaction(ctx context.Context) (context.Context, error)

BeginTransaction starts a new transaction.

func (*MySQLStorage) CancelInstance

func (s *MySQLStorage) CancelInstance(ctx context.Context, instanceID, reason string) error

CancelInstance cancels a workflow instance. Returns ErrWorkflowNotCancellable if the workflow is already completed, cancelled, failed, or does not exist.

func (*MySQLStorage) ClaimChannelMessage

func (s *MySQLStorage) ClaimChannelMessage(ctx context.Context, messageID int64, instanceID string) (bool, error)

ClaimChannelMessage claims a message for competing mode.

func (*MySQLStorage) CleanupExpiredSystemLocks

func (s *MySQLStorage) CleanupExpiredSystemLocks(ctx context.Context) error

CleanupExpiredSystemLocks removes expired system locks.

func (*MySQLStorage) CleanupInstanceSubscriptions

func (s *MySQLStorage) CleanupInstanceSubscriptions(ctx context.Context, instanceID string) error

CleanupInstanceSubscriptions removes all subscriptions for an instance. Note: workflow_event_subscriptions was removed in schema unification; events now use channel subscriptions.

func (*MySQLStorage) CleanupOldChannelMessages

func (s *MySQLStorage) CleanupOldChannelMessages(ctx context.Context, olderThan time.Duration) error

CleanupOldChannelMessages removes old channel messages.

func (*MySQLStorage) CleanupOldOutboxEvents

func (s *MySQLStorage) CleanupOldOutboxEvents(ctx context.Context, olderThan time.Duration) error

CleanupOldOutboxEvents removes old published events.

func (*MySQLStorage) CleanupStaleLocks

func (s *MySQLStorage) CleanupStaleLocks(ctx context.Context, timeoutSec int) ([]StaleWorkflowInfo, error)

CleanupStaleLocks cleans up expired locks and returns workflows to resume.

func (*MySQLStorage) ClearChannelWaitingState

func (s *MySQLStorage) ClearChannelWaitingState(ctx context.Context, instanceID, channelName string) error

ClearChannelWaitingState clears the waiting state for an instance's channel subscription.

func (*MySQLStorage) Close

func (s *MySQLStorage) Close() error

Close closes the database connection.

func (*MySQLStorage) CommitTransaction

func (s *MySQLStorage) CommitTransaction(ctx context.Context) error

CommitTransaction commits the current transaction.

func (*MySQLStorage) Conn

func (s *MySQLStorage) Conn(ctx context.Context) Executor

Conn returns the database executor for the current context.

func (*MySQLStorage) CreateInstance

func (s *MySQLStorage) CreateInstance(ctx context.Context, instance *WorkflowInstance) error

CreateInstance creates a new workflow instance.

func (*MySQLStorage) DB

func (s *MySQLStorage) DB() *sql.DB

DB returns the underlying database connection.

func (*MySQLStorage) DeleteChannelMessage

func (s *MySQLStorage) DeleteChannelMessage(ctx context.Context, messageID int64) error

DeleteChannelMessage deletes a message from the channel.

func (*MySQLStorage) DeliverChannelMessage

func (s *MySQLStorage) DeliverChannelMessage(ctx context.Context, instanceID string, message *ChannelMessage) error

DeliverChannelMessage delivers a message to a waiting subscriber.

func (*MySQLStorage) DeliverChannelMessageWithLock

func (s *MySQLStorage) DeliverChannelMessageWithLock(
	ctx context.Context,
	instanceID string,
	channelName string,
	message *ChannelMessage,
	workerID string,
	lockTimeoutSec int,
) (*ChannelDeliveryResult, error)

DeliverChannelMessageWithLock delivers a message using Lock-First pattern. Returns nil result if lock could not be acquired (another worker will handle it).

func (*MySQLStorage) FindExpiredChannelSubscriptions

func (s *MySQLStorage) FindExpiredChannelSubscriptions(ctx context.Context, limit int) ([]*ChannelSubscription, error)

FindExpiredChannelSubscriptions finds channel subscriptions that have timed out.

func (*MySQLStorage) FindExpiredTimers

func (s *MySQLStorage) FindExpiredTimers(ctx context.Context, limit int) ([]*TimerSubscription, error)

FindExpiredTimers finds expired timers.

func (*MySQLStorage) FindResumableWorkflows

func (s *MySQLStorage) FindResumableWorkflows(ctx context.Context, limit int) ([]*ResumableWorkflow, error)

FindResumableWorkflows finds workflows with status='running' that don't have an active lock. These are workflows that had a message delivered and are waiting for a worker to resume them.

func (*MySQLStorage) GetArchivedHistory

func (s *MySQLStorage) GetArchivedHistory(ctx context.Context, instanceID string) ([]*ArchivedHistoryEvent, error)

GetArchivedHistory retrieves archived history events for an instance.

func (*MySQLStorage) GetChannelMode added in v0.6.0

func (s *MySQLStorage) GetChannelMode(ctx context.Context, channelName string) (ChannelMode, error)

GetChannelMode retrieves the mode for a channel (from any existing subscription). Returns empty string if no subscriptions exist.

func (*MySQLStorage) GetChannelSubscribersWaiting

func (s *MySQLStorage) GetChannelSubscribersWaiting(ctx context.Context, channelName string) ([]*ChannelSubscription, error)

GetChannelSubscribersWaiting finds subscribers waiting for messages on a channel. Returns all waiting subscribers regardless of framework - delivery is handled by Lock-First pattern.

func (*MySQLStorage) GetChannelSubscription

func (s *MySQLStorage) GetChannelSubscription(ctx context.Context, instanceID, channelName string) (*ChannelSubscription, error)

GetChannelSubscription retrieves a subscription for an instance and channel.

func (*MySQLStorage) GetCompensations

func (s *MySQLStorage) GetCompensations(ctx context.Context, instanceID string) ([]*CompensationEntry, error)

GetCompensations retrieves compensations for a workflow in LIFO order.

func (*MySQLStorage) GetDeliveryCursor

func (s *MySQLStorage) GetDeliveryCursor(ctx context.Context, instanceID, channelName string) (int64, error)

GetDeliveryCursor gets the current delivery cursor for an instance and channel.

func (*MySQLStorage) GetGroupMembers

func (s *MySQLStorage) GetGroupMembers(ctx context.Context, groupName string) ([]string, error)

GetGroupMembers retrieves all instance IDs in a group.

func (*MySQLStorage) GetHistoryCount

func (s *MySQLStorage) GetHistoryCount(ctx context.Context, instanceID string) (int64, error)

GetHistoryCount returns the total number of history events for an instance.

func (*MySQLStorage) GetHistoryPaginated

func (s *MySQLStorage) GetHistoryPaginated(ctx context.Context, instanceID string, afterID int64, limit int) ([]*HistoryEvent, bool, error)

GetHistoryPaginated retrieves history events with pagination. Returns events with id > afterID, up to limit events. Returns (events, hasMore, error).

func (*MySQLStorage) GetInstance

func (s *MySQLStorage) GetInstance(ctx context.Context, instanceID string) (*WorkflowInstance, error)

GetInstance retrieves a workflow instance by ID.

func (*MySQLStorage) GetPendingChannelMessages

func (s *MySQLStorage) GetPendingChannelMessages(ctx context.Context, channelName string, afterID int64, limit int) ([]*ChannelMessage, error)

GetPendingChannelMessages retrieves pending messages for a channel after a given ID.

func (*MySQLStorage) GetPendingChannelMessagesForInstance

func (s *MySQLStorage) GetPendingChannelMessagesForInstance(ctx context.Context, instanceID, channelName string) ([]*ChannelMessage, error)

GetPendingChannelMessagesForInstance gets pending messages for a specific subscriber. For broadcast mode: Returns messages with id > cursor (messages not yet seen by this instance) For competing mode: Returns unclaimed messages (not yet claimed by any instance)

func (*MySQLStorage) GetPendingOutboxEvents

func (s *MySQLStorage) GetPendingOutboxEvents(ctx context.Context, limit int) ([]*OutboxEvent, error)

GetPendingOutboxEvents retrieves pending outbox events. Uses SELECT FOR UPDATE SKIP LOCKED for concurrent workers.

func (*MySQLStorage) InTransaction

func (s *MySQLStorage) InTransaction(ctx context.Context) bool

InTransaction returns whether a transaction is in progress.

func (*MySQLStorage) IncrementOutboxAttempts

func (s *MySQLStorage) IncrementOutboxAttempts(ctx context.Context, eventID string) error

IncrementOutboxAttempts increments the attempt count for an event.

func (*MySQLStorage) Initialize

func (s *MySQLStorage) Initialize(ctx context.Context) error

Initialize is deprecated. Use dbmate for migrations: dbmate -d schema/db/migrations/mysql up

func (*MySQLStorage) JoinGroup

func (s *MySQLStorage) JoinGroup(ctx context.Context, instanceID, groupName string) error

JoinGroup adds an instance to a group.

func (*MySQLStorage) LeaveAllGroups

func (s *MySQLStorage) LeaveAllGroups(ctx context.Context, instanceID string) error

LeaveAllGroups removes an instance from all groups.

func (*MySQLStorage) LeaveGroup

func (s *MySQLStorage) LeaveGroup(ctx context.Context, instanceID, groupName string) error

LeaveGroup removes an instance from a group.

func (*MySQLStorage) ListInstances

func (s *MySQLStorage) ListInstances(ctx context.Context, opts ListInstancesOptions) (*PaginationResult, error)

ListInstances lists workflow instances with cursor-based pagination and filters.

func (*MySQLStorage) MarkOutboxEventFailed

func (s *MySQLStorage) MarkOutboxEventFailed(ctx context.Context, eventID string) error

MarkOutboxEventFailed marks an outbox event as failed.

func (*MySQLStorage) MarkOutboxEventSent

func (s *MySQLStorage) MarkOutboxEventSent(ctx context.Context, eventID string) error

MarkOutboxEventSent marks an outbox event as published.

func (*MySQLStorage) PublishToChannel

func (s *MySQLStorage) PublishToChannel(ctx context.Context, channelName string, dataJSON, metadata []byte) (int64, error)

PublishToChannel publishes a message to a channel.

func (*MySQLStorage) RefreshLock

func (s *MySQLStorage) RefreshLock(ctx context.Context, instanceID, workerID string, timeoutSec int) error

RefreshLock extends the lock expiration time.

func (*MySQLStorage) RegisterChannelReceiveAndReleaseLock

func (s *MySQLStorage) RegisterChannelReceiveAndReleaseLock(ctx context.Context, instanceID, channelName, workerID, activityID string, timeoutAt *time.Time) error

RegisterChannelReceiveAndReleaseLock atomically registers a channel receive wait and releases the lock.

func (*MySQLStorage) RegisterPostCommitCallback

func (s *MySQLStorage) RegisterPostCommitCallback(ctx context.Context, cb func() error) error

RegisterPostCommitCallback registers a callback to be executed after a successful commit. Returns an error if not currently in a transaction.

func (*MySQLStorage) RegisterTimerSubscription

func (s *MySQLStorage) RegisterTimerSubscription(ctx context.Context, sub *TimerSubscription) error

RegisterTimerSubscription registers a timer for a workflow.

func (*MySQLStorage) RegisterTimerSubscriptionAndReleaseLock

func (s *MySQLStorage) RegisterTimerSubscriptionAndReleaseLock(ctx context.Context, sub *TimerSubscription, instanceID, workerID string) error

RegisterTimerSubscriptionAndReleaseLock atomically registers timer and releases lock.

func (*MySQLStorage) ReleaseLock

func (s *MySQLStorage) ReleaseLock(ctx context.Context, instanceID, workerID string) error

ReleaseLock releases a lock on a workflow instance.

func (*MySQLStorage) ReleaseSystemLock

func (s *MySQLStorage) ReleaseSystemLock(ctx context.Context, lockName, workerID string) error

ReleaseSystemLock releases a system lock.

func (*MySQLStorage) RemoveTimerSubscription

func (s *MySQLStorage) RemoveTimerSubscription(ctx context.Context, instanceID, timerID string) error

RemoveTimerSubscription removes a timer subscription.

func (*MySQLStorage) RollbackTransaction

func (s *MySQLStorage) RollbackTransaction(ctx context.Context) error

RollbackTransaction rolls back the current transaction. Callbacks are not executed on rollback.

func (*MySQLStorage) SubscribeToChannel

func (s *MySQLStorage) SubscribeToChannel(ctx context.Context, instanceID, channelName string, mode ChannelMode) error

SubscribeToChannel subscribes an instance to a channel. For broadcast mode, it initializes the delivery cursor to the current max message ID so that only messages published after subscription are received.

func (*MySQLStorage) TryAcquireLock

func (s *MySQLStorage) TryAcquireLock(ctx context.Context, instanceID, workerID string, timeoutSec int) (bool, error)

TryAcquireLock attempts to acquire a lock on a workflow instance.

func (*MySQLStorage) TryAcquireSystemLock

func (s *MySQLStorage) TryAcquireSystemLock(ctx context.Context, lockName, workerID string, timeoutSec int) (bool, error)

TryAcquireSystemLock attempts to acquire a system lock.

func (*MySQLStorage) UnsubscribeFromChannel

func (s *MySQLStorage) UnsubscribeFromChannel(ctx context.Context, instanceID, channelName string) error

UnsubscribeFromChannel unsubscribes an instance from a channel.

func (*MySQLStorage) UpdateDeliveryCursor

func (s *MySQLStorage) UpdateDeliveryCursor(ctx context.Context, instanceID, channelName string, lastMessageID int64) error

UpdateDeliveryCursor updates the delivery cursor for broadcast mode.

func (*MySQLStorage) UpdateInstanceActivity

func (s *MySQLStorage) UpdateInstanceActivity(ctx context.Context, instanceID, activityID string) error

UpdateInstanceActivity updates the current activity ID.

func (*MySQLStorage) UpdateInstanceOutput

func (s *MySQLStorage) UpdateInstanceOutput(ctx context.Context, instanceID string, outputData []byte) error

UpdateInstanceOutput updates the output data.

func (*MySQLStorage) UpdateInstanceStatus

func (s *MySQLStorage) UpdateInstanceStatus(ctx context.Context, instanceID string, status WorkflowStatus, errorMsg string) error

UpdateInstanceStatus updates the status of a workflow instance.

type OutboxEvent

type OutboxEvent struct {
	EventID         string     `json:"event_id"`          // CloudEvents ID (PRIMARY KEY)
	EventType       string     `json:"event_type"`        // CloudEvents type
	EventSource     string     `json:"event_source"`      // CloudEvents source
	EventData       []byte     `json:"event_data"`        // JSON payload
	EventDataBinary []byte     `json:"event_data_binary"` // Binary payload (Edda compatibility)
	DataType        string     `json:"data_type"`         // "json" or "binary"
	ContentType     string     `json:"content_type"`      // e.g., "application/json"
	Status          string     `json:"status"`            // "pending", "processing", "published", "failed"
	RetryCount      int        `json:"retry_count"`       // Number of retry attempts (unified with Edda)
	LastError       string     `json:"last_error"`        // Last error message (Edda compatibility)
	PublishedAt     *time.Time `json:"published_at"`      // When the event was published (Edda compatibility)
	CreatedAt       time.Time  `json:"created_at"`
	UpdatedAt       time.Time  `json:"updated_at"`
}

OutboxEvent represents an event in the transactional outbox. Uses event_id (TEXT) as primary key for distributed DB compatibility (Spanner, CockroachDB).

type OutboxManager

type OutboxManager interface {
	// AddOutboxEvent adds an event to the outbox.
	AddOutboxEvent(ctx context.Context, event *OutboxEvent) error

	// GetPendingOutboxEvents retrieves pending events for sending.
	// Uses SELECT FOR UPDATE SKIP LOCKED for concurrent workers.
	GetPendingOutboxEvents(ctx context.Context, limit int) ([]*OutboxEvent, error)

	// MarkOutboxEventSent marks an event as successfully sent.
	MarkOutboxEventSent(ctx context.Context, eventID string) error

	// MarkOutboxEventFailed marks an event as failed to send.
	MarkOutboxEventFailed(ctx context.Context, eventID string) error

	// IncrementOutboxAttempts increments the attempt count for an event.
	IncrementOutboxAttempts(ctx context.Context, eventID string) error

	// CleanupOldOutboxEvents removes old sent events.
	CleanupOldOutboxEvents(ctx context.Context, olderThan time.Duration) error
}

OutboxManager handles the transactional outbox.

type PaginationResult

type PaginationResult struct {
	Instances     []*WorkflowInstance `json:"instances"`
	NextPageToken string              `json:"next_page_token,omitempty"`
	HasMore       bool                `json:"has_more"`
}

PaginationResult represents a paginated list of workflow instances.

type PostgresDriver

type PostgresDriver struct{}

PostgresDriver implements Driver for PostgreSQL.

func (*PostgresDriver) DriverName

func (d *PostgresDriver) DriverName() string

func (*PostgresDriver) GetCurrentTimeExpr

func (d *PostgresDriver) GetCurrentTimeExpr() string

func (*PostgresDriver) JSONCompare

func (d *PostgresDriver) JSONCompare(extractExpr string, value any, placeholderNum int) (expr string, compArgs []any)

JSONCompare for PostgresDriver returns the SQL expression to compare a JSON-extracted value.

func (*PostgresDriver) JSONExtract

func (d *PostgresDriver) JSONExtract(column, path string) string

JSONExtract for PostgresDriver returns the SQL expression to extract a value from a JSON column.

func (*PostgresDriver) MakeDatetimeComparable

func (d *PostgresDriver) MakeDatetimeComparable(column string) string

func (*PostgresDriver) OnConflictDoNothing

func (d *PostgresDriver) OnConflictDoNothing(conflictColumns ...string) string

func (*PostgresDriver) Placeholder

func (d *PostgresDriver) Placeholder(n int) string

func (*PostgresDriver) ReturningClause

func (d *PostgresDriver) ReturningClause(columns ...string) string

func (*PostgresDriver) SelectForUpdateSkipLocked

func (d *PostgresDriver) SelectForUpdateSkipLocked() string

type PostgresStorage

type PostgresStorage struct {
	// contains filtered or unexported fields
}

PostgresStorage implements the Storage interface using PostgreSQL.

func NewPostgresStorage

func NewPostgresStorage(connStr string) (*PostgresStorage, error)

NewPostgresStorage creates a new PostgreSQL storage. The connStr should be a PostgreSQL connection string: "postgres://user:password@localhost:5432/dbname?sslmode=disable"

func (*PostgresStorage) AddCompensation

func (s *PostgresStorage) AddCompensation(ctx context.Context, entry *CompensationEntry) error

AddCompensation adds a compensation entry.

func (*PostgresStorage) AddOutboxEvent

func (s *PostgresStorage) AddOutboxEvent(ctx context.Context, event *OutboxEvent) error

AddOutboxEvent adds an event to the outbox.

func (*PostgresStorage) AppendHistory

func (s *PostgresStorage) AppendHistory(ctx context.Context, event *HistoryEvent) error

AppendHistory appends a history event.

func (*PostgresStorage) ArchiveHistory

func (s *PostgresStorage) ArchiveHistory(ctx context.Context, instanceID string) error

ArchiveHistory moves all history events for an instance to the archive table.

func (*PostgresStorage) BeginTransaction

func (s *PostgresStorage) BeginTransaction(ctx context.Context) (context.Context, error)

BeginTransaction starts a new transaction.

func (*PostgresStorage) CancelInstance

func (s *PostgresStorage) CancelInstance(ctx context.Context, instanceID, reason string) error

CancelInstance cancels a workflow instance. Returns ErrWorkflowNotCancellable if the workflow is already completed, cancelled, failed, or does not exist.

func (*PostgresStorage) ClaimChannelMessage

func (s *PostgresStorage) ClaimChannelMessage(ctx context.Context, messageID int64, instanceID string) (bool, error)

ClaimChannelMessage claims a message for competing mode.

func (*PostgresStorage) CleanupExpiredSystemLocks

func (s *PostgresStorage) CleanupExpiredSystemLocks(ctx context.Context) error

CleanupExpiredSystemLocks removes expired system locks.

func (*PostgresStorage) CleanupInstanceSubscriptions

func (s *PostgresStorage) CleanupInstanceSubscriptions(ctx context.Context, instanceID string) error

CleanupInstanceSubscriptions removes all subscriptions for an instance. Note: workflow_event_subscriptions was removed in schema unification; events now use channel subscriptions.

func (*PostgresStorage) CleanupOldChannelMessages

func (s *PostgresStorage) CleanupOldChannelMessages(ctx context.Context, olderThan time.Duration) error

CleanupOldChannelMessages removes old channel messages.

func (*PostgresStorage) CleanupOldOutboxEvents

func (s *PostgresStorage) CleanupOldOutboxEvents(ctx context.Context, olderThan time.Duration) error

CleanupOldOutboxEvents removes old published events.

func (*PostgresStorage) CleanupStaleLocks

func (s *PostgresStorage) CleanupStaleLocks(ctx context.Context, timeoutSec int) ([]StaleWorkflowInfo, error)

CleanupStaleLocks cleans up expired locks and returns workflows to resume.

func (*PostgresStorage) ClearChannelWaitingState

func (s *PostgresStorage) ClearChannelWaitingState(ctx context.Context, instanceID, channelName string) error

ClearChannelWaitingState clears the waiting state for an instance's channel subscription.

func (*PostgresStorage) Close

func (s *PostgresStorage) Close() error

Close closes the database connection.

func (*PostgresStorage) CommitTransaction

func (s *PostgresStorage) CommitTransaction(ctx context.Context) error

CommitTransaction commits the current transaction.

func (*PostgresStorage) Conn

func (s *PostgresStorage) Conn(ctx context.Context) Executor

Conn returns the database executor for the current context.

func (*PostgresStorage) CreateInstance

func (s *PostgresStorage) CreateInstance(ctx context.Context, instance *WorkflowInstance) error

CreateInstance creates a new workflow instance.

func (*PostgresStorage) DB

func (s *PostgresStorage) DB() *sql.DB

DB returns the underlying database connection.

func (*PostgresStorage) DeleteChannelMessage

func (s *PostgresStorage) DeleteChannelMessage(ctx context.Context, messageID int64) error

DeleteChannelMessage deletes a message from the channel.

func (*PostgresStorage) DeliverChannelMessage

func (s *PostgresStorage) DeliverChannelMessage(ctx context.Context, instanceID string, message *ChannelMessage) error

DeliverChannelMessage delivers a message to a waiting subscriber.

func (*PostgresStorage) DeliverChannelMessageWithLock

func (s *PostgresStorage) DeliverChannelMessageWithLock(
	ctx context.Context,
	instanceID string,
	channelName string,
	message *ChannelMessage,
	workerID string,
	lockTimeoutSec int,
) (*ChannelDeliveryResult, error)

DeliverChannelMessageWithLock delivers a message using Lock-First pattern. Returns nil result if lock could not be acquired (another worker will handle it).

func (*PostgresStorage) FindExpiredChannelSubscriptions

func (s *PostgresStorage) FindExpiredChannelSubscriptions(ctx context.Context, limit int) ([]*ChannelSubscription, error)

FindExpiredChannelSubscriptions finds channel subscriptions that have timed out.

func (*PostgresStorage) FindExpiredTimers

func (s *PostgresStorage) FindExpiredTimers(ctx context.Context, limit int) ([]*TimerSubscription, error)

FindExpiredTimers finds expired timers.

func (*PostgresStorage) FindResumableWorkflows

func (s *PostgresStorage) FindResumableWorkflows(ctx context.Context, limit int) ([]*ResumableWorkflow, error)

FindResumableWorkflows finds workflows with status='running' that don't have an active lock. These are workflows that had a message delivered and are waiting for a worker to resume them.

func (*PostgresStorage) GetArchivedHistory

func (s *PostgresStorage) GetArchivedHistory(ctx context.Context, instanceID string) ([]*ArchivedHistoryEvent, error)

GetArchivedHistory retrieves archived history events for an instance.

func (*PostgresStorage) GetChannelMode added in v0.6.0

func (s *PostgresStorage) GetChannelMode(ctx context.Context, channelName string) (ChannelMode, error)

GetChannelMode retrieves the mode for a channel (from any existing subscription). Returns empty string if no subscriptions exist.

func (*PostgresStorage) GetChannelSubscribersWaiting

func (s *PostgresStorage) GetChannelSubscribersWaiting(ctx context.Context, channelName string) ([]*ChannelSubscription, error)

GetChannelSubscribersWaiting finds subscribers waiting for messages on a channel. Returns all waiting subscribers regardless of framework - delivery is handled by Lock-First pattern.

func (*PostgresStorage) GetChannelSubscription

func (s *PostgresStorage) GetChannelSubscription(ctx context.Context, instanceID, channelName string) (*ChannelSubscription, error)

GetChannelSubscription retrieves a subscription for an instance and channel.

func (*PostgresStorage) GetCompensations

func (s *PostgresStorage) GetCompensations(ctx context.Context, instanceID string) ([]*CompensationEntry, error)

GetCompensations retrieves compensations for a workflow in LIFO order.

func (*PostgresStorage) GetDeliveryCursor

func (s *PostgresStorage) GetDeliveryCursor(ctx context.Context, instanceID, channelName string) (int64, error)

GetDeliveryCursor gets the current delivery cursor for an instance and channel.

func (*PostgresStorage) GetGroupMembers

func (s *PostgresStorage) GetGroupMembers(ctx context.Context, groupName string) ([]string, error)

GetGroupMembers retrieves all instance IDs in a group.

func (*PostgresStorage) GetHistoryCount

func (s *PostgresStorage) GetHistoryCount(ctx context.Context, instanceID string) (int64, error)

GetHistoryCount returns the total number of history events for an instance.

func (*PostgresStorage) GetHistoryPaginated

func (s *PostgresStorage) GetHistoryPaginated(ctx context.Context, instanceID string, afterID int64, limit int) ([]*HistoryEvent, bool, error)

GetHistoryPaginated retrieves history events with pagination. Returns events with id > afterID, up to limit events. Returns (events, hasMore, error).

func (*PostgresStorage) GetInstance

func (s *PostgresStorage) GetInstance(ctx context.Context, instanceID string) (*WorkflowInstance, error)

GetInstance retrieves a workflow instance by ID.

func (*PostgresStorage) GetPendingChannelMessages

func (s *PostgresStorage) GetPendingChannelMessages(ctx context.Context, channelName string, afterID int64, limit int) ([]*ChannelMessage, error)

GetPendingChannelMessages retrieves pending messages for a channel after a given ID.

func (*PostgresStorage) GetPendingChannelMessagesForInstance

func (s *PostgresStorage) GetPendingChannelMessagesForInstance(ctx context.Context, instanceID, channelName string) ([]*ChannelMessage, error)

GetPendingChannelMessagesForInstance gets pending messages for a specific subscriber. For broadcast mode: Returns messages with id > cursor (messages not yet seen by this instance) For competing mode: Returns unclaimed messages (not yet claimed by any instance)

func (*PostgresStorage) GetPendingOutboxEvents

func (s *PostgresStorage) GetPendingOutboxEvents(ctx context.Context, limit int) ([]*OutboxEvent, error)

GetPendingOutboxEvents retrieves pending outbox events. Uses SELECT FOR UPDATE SKIP LOCKED for concurrent workers.

func (*PostgresStorage) InTransaction

func (s *PostgresStorage) InTransaction(ctx context.Context) bool

InTransaction returns whether a transaction is in progress.

func (*PostgresStorage) IncrementOutboxAttempts

func (s *PostgresStorage) IncrementOutboxAttempts(ctx context.Context, eventID string) error

IncrementOutboxAttempts increments the attempt count for an event.

func (*PostgresStorage) Initialize

func (s *PostgresStorage) Initialize(ctx context.Context) error

Initialize is deprecated. Use dbmate for migrations: dbmate -d schema/db/migrations/postgresql up

func (*PostgresStorage) IsNotifyEnabled

func (s *PostgresStorage) IsNotifyEnabled() bool

IsNotifyEnabled returns whether PostgreSQL LISTEN/NOTIFY is enabled.

func (*PostgresStorage) JoinGroup

func (s *PostgresStorage) JoinGroup(ctx context.Context, instanceID, groupName string) error

JoinGroup adds an instance to a group.

func (*PostgresStorage) LeaveAllGroups

func (s *PostgresStorage) LeaveAllGroups(ctx context.Context, instanceID string) error

LeaveAllGroups removes an instance from all groups.

func (*PostgresStorage) LeaveGroup

func (s *PostgresStorage) LeaveGroup(ctx context.Context, instanceID, groupName string) error

LeaveGroup removes an instance from a group.

func (*PostgresStorage) ListInstances

ListInstances lists workflow instances with cursor-based pagination and filters.

func (*PostgresStorage) MarkOutboxEventFailed

func (s *PostgresStorage) MarkOutboxEventFailed(ctx context.Context, eventID string) error

MarkOutboxEventFailed marks an outbox event as failed.

func (*PostgresStorage) MarkOutboxEventSent

func (s *PostgresStorage) MarkOutboxEventSent(ctx context.Context, eventID string) error

MarkOutboxEventSent marks an outbox event as published.

func (*PostgresStorage) PublishToChannel

func (s *PostgresStorage) PublishToChannel(ctx context.Context, channelName string, dataJSON, metadata []byte) (int64, error)

PublishToChannel publishes a message to a channel.

func (*PostgresStorage) RefreshLock

func (s *PostgresStorage) RefreshLock(ctx context.Context, instanceID, workerID string, timeoutSec int) error

RefreshLock extends the lock expiration time.

func (*PostgresStorage) RegisterChannelReceiveAndReleaseLock

func (s *PostgresStorage) RegisterChannelReceiveAndReleaseLock(ctx context.Context, instanceID, channelName, workerID, activityID string, timeoutAt *time.Time) error

RegisterChannelReceiveAndReleaseLock atomically registers a channel receive wait and releases the lock.

func (*PostgresStorage) RegisterPostCommitCallback

func (s *PostgresStorage) RegisterPostCommitCallback(ctx context.Context, cb func() error) error

RegisterPostCommitCallback registers a callback to be executed after a successful commit. Returns an error if not currently in a transaction.

func (*PostgresStorage) RegisterTimerSubscription

func (s *PostgresStorage) RegisterTimerSubscription(ctx context.Context, sub *TimerSubscription) error

RegisterTimerSubscription registers a timer for a workflow.

func (*PostgresStorage) RegisterTimerSubscriptionAndReleaseLock

func (s *PostgresStorage) RegisterTimerSubscriptionAndReleaseLock(ctx context.Context, sub *TimerSubscription, instanceID, workerID string) error

RegisterTimerSubscriptionAndReleaseLock atomically registers timer and releases lock.

func (*PostgresStorage) ReleaseLock

func (s *PostgresStorage) ReleaseLock(ctx context.Context, instanceID, workerID string) error

ReleaseLock releases a lock on a workflow instance.

func (*PostgresStorage) ReleaseSystemLock

func (s *PostgresStorage) ReleaseSystemLock(ctx context.Context, lockName, workerID string) error

ReleaseSystemLock releases a system lock.

func (*PostgresStorage) RemoveTimerSubscription

func (s *PostgresStorage) RemoveTimerSubscription(ctx context.Context, instanceID, timerID string) error

RemoveTimerSubscription removes a timer subscription.

func (*PostgresStorage) RollbackTransaction

func (s *PostgresStorage) RollbackTransaction(ctx context.Context) error

RollbackTransaction rolls back the current transaction. Callbacks are not executed on rollback.

func (*PostgresStorage) SetNotifyEnabled

func (s *PostgresStorage) SetNotifyEnabled(enabled bool)

SetNotifyEnabled enables or disables PostgreSQL LISTEN/NOTIFY. When enabled, the storage will send pg_notify() calls after key operations.

func (*PostgresStorage) SubscribeToChannel

func (s *PostgresStorage) SubscribeToChannel(ctx context.Context, instanceID, channelName string, mode ChannelMode) error

SubscribeToChannel subscribes an instance to a channel. For broadcast mode, it initializes the delivery cursor to the current max message ID so that only messages published after subscription are received.

func (*PostgresStorage) TryAcquireLock

func (s *PostgresStorage) TryAcquireLock(ctx context.Context, instanceID, workerID string, timeoutSec int) (bool, error)

TryAcquireLock attempts to acquire a lock on a workflow instance.

func (*PostgresStorage) TryAcquireSystemLock

func (s *PostgresStorage) TryAcquireSystemLock(ctx context.Context, lockName, workerID string, timeoutSec int) (bool, error)

TryAcquireSystemLock attempts to acquire a system lock.

func (*PostgresStorage) UnsubscribeFromChannel

func (s *PostgresStorage) UnsubscribeFromChannel(ctx context.Context, instanceID, channelName string) error

UnsubscribeFromChannel unsubscribes an instance from a channel.

func (*PostgresStorage) UpdateDeliveryCursor

func (s *PostgresStorage) UpdateDeliveryCursor(ctx context.Context, instanceID, channelName string, lastMessageID int64) error

UpdateDeliveryCursor updates the delivery cursor for broadcast mode.

func (*PostgresStorage) UpdateInstanceActivity

func (s *PostgresStorage) UpdateInstanceActivity(ctx context.Context, instanceID, activityID string) error

UpdateInstanceActivity updates the current activity ID.

func (*PostgresStorage) UpdateInstanceOutput

func (s *PostgresStorage) UpdateInstanceOutput(ctx context.Context, instanceID string, outputData []byte) error

UpdateInstanceOutput updates the output data.

func (*PostgresStorage) UpdateInstanceStatus

func (s *PostgresStorage) UpdateInstanceStatus(ctx context.Context, instanceID string, status WorkflowStatus, errorMsg string) error

UpdateInstanceStatus updates the status of a workflow instance.

type ResumableWorkflow

type ResumableWorkflow struct {
	InstanceID   string
	WorkflowName string
}

ResumableWorkflow contains information about a workflow ready to be resumed. These are workflows with status='running' that don't have an active lock.

type SQLiteDriver

type SQLiteDriver struct{}

SQLiteDriver implements Driver for SQLite.

func (*SQLiteDriver) DriverName

func (d *SQLiteDriver) DriverName() string

func (*SQLiteDriver) GetCurrentTimeExpr

func (d *SQLiteDriver) GetCurrentTimeExpr() string

func (*SQLiteDriver) JSONCompare

func (d *SQLiteDriver) JSONCompare(extractExpr string, value any, _ int) (expr string, compArgs []any)

JSONCompare for SQLiteDriver returns the SQL expression to compare a JSON-extracted value.

func (*SQLiteDriver) JSONExtract

func (d *SQLiteDriver) JSONExtract(column, path string) string

JSONExtract for SQLiteDriver returns the SQL expression to extract a value from a JSON column.

func (*SQLiteDriver) MakeDatetimeComparable

func (d *SQLiteDriver) MakeDatetimeComparable(column string) string

func (*SQLiteDriver) OnConflictDoNothing

func (d *SQLiteDriver) OnConflictDoNothing(conflictColumns ...string) string

func (*SQLiteDriver) Placeholder

func (d *SQLiteDriver) Placeholder(n int) string

func (*SQLiteDriver) ReturningClause

func (d *SQLiteDriver) ReturningClause(columns ...string) string

func (*SQLiteDriver) SelectForUpdateSkipLocked

func (d *SQLiteDriver) SelectForUpdateSkipLocked() string

type SQLiteStorage

type SQLiteStorage struct {
	// contains filtered or unexported fields
}

SQLiteStorage implements the Storage interface using SQLite.

func NewSQLiteStorage

func NewSQLiteStorage(dbPath string) (*SQLiteStorage, error)

NewSQLiteStorage creates a new SQLite storage.

func (*SQLiteStorage) AddCompensation

func (s *SQLiteStorage) AddCompensation(ctx context.Context, entry *CompensationEntry) error

AddCompensation adds a compensation entry. Order is determined by created_at DESC (LIFO). Uses millisecond-precision timestamp for reliable ordering.

func (*SQLiteStorage) AddOutboxEvent

func (s *SQLiteStorage) AddOutboxEvent(ctx context.Context, event *OutboxEvent) error

AddOutboxEvent adds an event to the outbox.

func (*SQLiteStorage) AppendHistory

func (s *SQLiteStorage) AppendHistory(ctx context.Context, event *HistoryEvent) error

AppendHistory appends a history event.

func (*SQLiteStorage) ArchiveHistory

func (s *SQLiteStorage) ArchiveHistory(ctx context.Context, instanceID string) error

ArchiveHistory moves all history events for an instance to the archive table.

func (*SQLiteStorage) BeginTransaction

func (s *SQLiteStorage) BeginTransaction(ctx context.Context) (context.Context, error)

BeginTransaction starts a new transaction.

func (*SQLiteStorage) CancelInstance

func (s *SQLiteStorage) CancelInstance(ctx context.Context, instanceID, reason string) error

CancelInstance cancels a workflow instance. Returns ErrWorkflowNotCancellable if the workflow is already completed, cancelled, failed, or does not exist. Note: reason parameter is kept for interface compatibility but is ignored. Cancellation reasons are tracked via history events.

func (*SQLiteStorage) ClaimChannelMessage

func (s *SQLiteStorage) ClaimChannelMessage(ctx context.Context, messageID int64, instanceID string) (bool, error)

ClaimChannelMessage claims a message for competing mode.

func (*SQLiteStorage) CleanupExpiredSystemLocks

func (s *SQLiteStorage) CleanupExpiredSystemLocks(ctx context.Context) error

CleanupExpiredSystemLocks removes expired system locks.

func (*SQLiteStorage) CleanupInstanceSubscriptions

func (s *SQLiteStorage) CleanupInstanceSubscriptions(ctx context.Context, instanceID string) error

CleanupInstanceSubscriptions removes all subscriptions for an instance. Note: workflow_event_subscriptions was removed in schema unification; events now use channel subscriptions.

func (*SQLiteStorage) CleanupOldChannelMessages

func (s *SQLiteStorage) CleanupOldChannelMessages(ctx context.Context, olderThan time.Duration) error

CleanupOldChannelMessages removes old channel messages.

func (*SQLiteStorage) CleanupOldOutboxEvents

func (s *SQLiteStorage) CleanupOldOutboxEvents(ctx context.Context, olderThan time.Duration) error

CleanupOldOutboxEvents removes old published events.

func (*SQLiteStorage) CleanupStaleLocks

func (s *SQLiteStorage) CleanupStaleLocks(ctx context.Context, timeoutSec int) ([]StaleWorkflowInfo, error)

CleanupStaleLocks cleans up expired locks and returns workflows to resume.

func (*SQLiteStorage) ClearChannelWaitingState

func (s *SQLiteStorage) ClearChannelWaitingState(ctx context.Context, instanceID, channelName string) error

ClearChannelWaitingState clears the waiting state for an instance's channel subscription. Waiting state is cleared by setting activity_id to NULL.

func (*SQLiteStorage) Close

func (s *SQLiteStorage) Close() error

Close closes the database connection.

func (*SQLiteStorage) CommitTransaction

func (s *SQLiteStorage) CommitTransaction(ctx context.Context) error

CommitTransaction commits the current transaction.

func (*SQLiteStorage) Conn

func (s *SQLiteStorage) Conn(ctx context.Context) Executor

Conn returns the database executor for the current context. If a transaction is active, returns the transaction; otherwise, returns the database. This allows users to execute custom SQL queries within the same transaction as the workflow activity when using transactional activities.

Example:

conn := ctx.Storage().Conn(ctx.Context())
_, err := conn.ExecContext(ctx.Context(), "INSERT INTO orders ...", ...)

func (*SQLiteStorage) CreateInstance

func (s *SQLiteStorage) CreateInstance(ctx context.Context, instance *WorkflowInstance) error

CreateInstance creates a new workflow instance.

func (*SQLiteStorage) DB

func (s *SQLiteStorage) DB() *sql.DB

DB returns the underlying database connection.

func (*SQLiteStorage) DeleteChannelMessage

func (s *SQLiteStorage) DeleteChannelMessage(ctx context.Context, messageID int64) error

DeleteChannelMessage deletes a message from the channel.

func (*SQLiteStorage) DeliverChannelMessage

func (s *SQLiteStorage) DeliverChannelMessage(ctx context.Context, instanceID string, message *ChannelMessage) error

DeliverChannelMessage delivers a message to a waiting subscriber.

func (*SQLiteStorage) DeliverChannelMessageWithLock

func (s *SQLiteStorage) DeliverChannelMessageWithLock(
	ctx context.Context,
	instanceID string,
	channelName string,
	message *ChannelMessage,
	workerID string,
	lockTimeoutSec int,
) (*ChannelDeliveryResult, error)

DeliverChannelMessageWithLock delivers a message using Lock-First pattern. Returns nil result if lock could not be acquired (another worker will handle it).

func (*SQLiteStorage) FindExpiredChannelSubscriptions

func (s *SQLiteStorage) FindExpiredChannelSubscriptions(ctx context.Context, limit int) ([]*ChannelSubscription, error)

FindExpiredChannelSubscriptions finds channel subscriptions that have timed out. Waiting state is determined by activity_id IS NOT NULL.

func (*SQLiteStorage) FindExpiredTimers

func (s *SQLiteStorage) FindExpiredTimers(ctx context.Context, limit int) ([]*TimerSubscription, error)

FindExpiredTimers finds expired timers.

func (*SQLiteStorage) FindResumableWorkflows

func (s *SQLiteStorage) FindResumableWorkflows(ctx context.Context, limit int) ([]*ResumableWorkflow, error)

FindResumableWorkflows finds workflows with status='running' that don't have an active lock. These are workflows that had a message delivered and are waiting for a worker to resume them.

func (*SQLiteStorage) GetArchivedHistory

func (s *SQLiteStorage) GetArchivedHistory(ctx context.Context, instanceID string) ([]*ArchivedHistoryEvent, error)

GetArchivedHistory retrieves archived history events for an instance.

func (*SQLiteStorage) GetChannelMode added in v0.6.0

func (s *SQLiteStorage) GetChannelMode(ctx context.Context, channelName string) (ChannelMode, error)

GetChannelMode retrieves the mode for a channel (from any existing subscription). Returns empty string if no subscriptions exist.

func (*SQLiteStorage) GetChannelSubscribersWaiting

func (s *SQLiteStorage) GetChannelSubscribersWaiting(ctx context.Context, channelName string) ([]*ChannelSubscription, error)

GetChannelSubscribersWaiting finds subscribers waiting for messages on a channel. Waiting state is determined by activity_id IS NOT NULL. Returns all waiting subscribers regardless of framework - delivery is handled by Lock-First pattern.

func (*SQLiteStorage) GetChannelSubscription

func (s *SQLiteStorage) GetChannelSubscription(ctx context.Context, instanceID, channelName string) (*ChannelSubscription, error)

GetChannelSubscription retrieves a subscription for an instance and channel.

func (*SQLiteStorage) GetCompensations

func (s *SQLiteStorage) GetCompensations(ctx context.Context, instanceID string) ([]*CompensationEntry, error)

GetCompensations retrieves compensations for a workflow in LIFO order (by created_at DESC). Uses millisecond-precision timestamps set by AddCompensation for reliable ordering. Status tracking is done via history events (CompensationExecuted, CompensationFailed).

func (*SQLiteStorage) GetDeliveryCursor

func (s *SQLiteStorage) GetDeliveryCursor(ctx context.Context, instanceID, channelName string) (int64, error)

GetDeliveryCursor gets the current delivery cursor for an instance and channel.

func (*SQLiteStorage) GetGroupMembers

func (s *SQLiteStorage) GetGroupMembers(ctx context.Context, groupName string) ([]string, error)

GetGroupMembers retrieves all instance IDs in a group.

func (*SQLiteStorage) GetHistoryCount

func (s *SQLiteStorage) GetHistoryCount(ctx context.Context, instanceID string) (int64, error)

GetHistoryCount returns the total number of history events for an instance.

func (*SQLiteStorage) GetHistoryPaginated

func (s *SQLiteStorage) GetHistoryPaginated(ctx context.Context, instanceID string, afterID int64, limit int) ([]*HistoryEvent, bool, error)

GetHistoryPaginated retrieves history events with pagination. Returns events with id > afterID, up to limit events. Returns (events, hasMore, error).

func (*SQLiteStorage) GetInstance

func (s *SQLiteStorage) GetInstance(ctx context.Context, instanceID string) (*WorkflowInstance, error)

GetInstance retrieves a workflow instance by ID.

func (*SQLiteStorage) GetPendingChannelMessages

func (s *SQLiteStorage) GetPendingChannelMessages(ctx context.Context, channelName string, afterID int64, limit int) ([]*ChannelMessage, error)

GetPendingChannelMessages retrieves pending messages for a channel after a given ID.

func (*SQLiteStorage) GetPendingChannelMessagesForInstance

func (s *SQLiteStorage) GetPendingChannelMessagesForInstance(ctx context.Context, instanceID, channelName string) ([]*ChannelMessage, error)

GetPendingChannelMessagesForInstance gets pending messages for a specific subscriber. For broadcast mode: Returns messages with id > cursor (messages not yet seen by this instance) For competing mode: Returns unclaimed messages (not yet claimed by any instance)

func (*SQLiteStorage) GetPendingOutboxEvents

func (s *SQLiteStorage) GetPendingOutboxEvents(ctx context.Context, limit int) ([]*OutboxEvent, error)

GetPendingOutboxEvents retrieves pending outbox events.

func (*SQLiteStorage) InTransaction

func (s *SQLiteStorage) InTransaction(ctx context.Context) bool

InTransaction returns whether a transaction is in progress.

func (*SQLiteStorage) IncrementOutboxAttempts

func (s *SQLiteStorage) IncrementOutboxAttempts(ctx context.Context, eventID string) error

IncrementOutboxAttempts increments the attempt count for an event.

func (*SQLiteStorage) Initialize

func (s *SQLiteStorage) Initialize(ctx context.Context) error

Initialize is deprecated. Use dbmate for migrations: dbmate -d schema/db/migrations/sqlite up

func (*SQLiteStorage) JoinGroup

func (s *SQLiteStorage) JoinGroup(ctx context.Context, instanceID, groupName string) error

JoinGroup adds an instance to a group.

func (*SQLiteStorage) LeaveAllGroups

func (s *SQLiteStorage) LeaveAllGroups(ctx context.Context, instanceID string) error

LeaveAllGroups removes an instance from all groups.

func (*SQLiteStorage) LeaveGroup

func (s *SQLiteStorage) LeaveGroup(ctx context.Context, instanceID, groupName string) error

LeaveGroup removes an instance from a group.

func (*SQLiteStorage) ListInstances

func (s *SQLiteStorage) ListInstances(ctx context.Context, opts ListInstancesOptions) (*PaginationResult, error)

ListInstances lists workflow instances with cursor-based pagination and filters.

func (*SQLiteStorage) MarkOutboxEventFailed

func (s *SQLiteStorage) MarkOutboxEventFailed(ctx context.Context, eventID string) error

MarkOutboxEventFailed marks an outbox event as failed.

func (*SQLiteStorage) MarkOutboxEventSent

func (s *SQLiteStorage) MarkOutboxEventSent(ctx context.Context, eventID string) error

MarkOutboxEventSent marks an outbox event as published.

func (*SQLiteStorage) PublishToChannel

func (s *SQLiteStorage) PublishToChannel(ctx context.Context, channelName string, dataJSON, metadata []byte) (int64, error)

PublishToChannel publishes a message to a channel. For direct messages, use dynamic channel names (e.g., "channel:instance_id").

func (*SQLiteStorage) RefreshLock

func (s *SQLiteStorage) RefreshLock(ctx context.Context, instanceID, workerID string, timeoutSec int) error

RefreshLock extends the lock expiration time.

func (*SQLiteStorage) RegisterChannelReceiveAndReleaseLock

func (s *SQLiteStorage) RegisterChannelReceiveAndReleaseLock(ctx context.Context, instanceID, channelName, workerID, activityID string, timeoutAt *time.Time) error

RegisterChannelReceiveAndReleaseLock atomically registers a channel receive wait and releases the lock. Waiting state is indicated by activity_id being non-null.

func (*SQLiteStorage) RegisterPostCommitCallback

func (s *SQLiteStorage) RegisterPostCommitCallback(ctx context.Context, cb func() error) error

RegisterPostCommitCallback registers a callback to be executed after a successful commit. Returns an error if not currently in a transaction.

func (*SQLiteStorage) RegisterTimerSubscription

func (s *SQLiteStorage) RegisterTimerSubscription(ctx context.Context, sub *TimerSubscription) error

RegisterTimerSubscription registers a timer for a workflow.

func (*SQLiteStorage) RegisterTimerSubscriptionAndReleaseLock

func (s *SQLiteStorage) RegisterTimerSubscriptionAndReleaseLock(ctx context.Context, sub *TimerSubscription, instanceID, workerID string) error

RegisterTimerSubscriptionAndReleaseLock atomically registers timer and releases lock.

func (*SQLiteStorage) ReleaseLock

func (s *SQLiteStorage) ReleaseLock(ctx context.Context, instanceID, workerID string) error

ReleaseLock releases a lock on a workflow instance.

func (*SQLiteStorage) ReleaseSystemLock

func (s *SQLiteStorage) ReleaseSystemLock(ctx context.Context, lockName, workerID string) error

ReleaseSystemLock releases a system lock.

func (*SQLiteStorage) RemoveTimerSubscription

func (s *SQLiteStorage) RemoveTimerSubscription(ctx context.Context, instanceID, timerID string) error

RemoveTimerSubscription removes a timer subscription.

func (*SQLiteStorage) RollbackTransaction

func (s *SQLiteStorage) RollbackTransaction(ctx context.Context) error

RollbackTransaction rolls back the current transaction. Callbacks are not executed on rollback.

func (*SQLiteStorage) SubscribeToChannel

func (s *SQLiteStorage) SubscribeToChannel(ctx context.Context, instanceID, channelName string, mode ChannelMode) error

SubscribeToChannel subscribes an instance to a channel. For broadcast mode, it initializes the delivery cursor to the current max message ID so that only messages published after subscription are received.

func (*SQLiteStorage) TryAcquireLock

func (s *SQLiteStorage) TryAcquireLock(ctx context.Context, instanceID, workerID string, timeoutSec int) (bool, error)

TryAcquireLock attempts to acquire a lock on a workflow instance.

func (*SQLiteStorage) TryAcquireSystemLock

func (s *SQLiteStorage) TryAcquireSystemLock(ctx context.Context, lockName, workerID string, timeoutSec int) (bool, error)

TryAcquireSystemLock attempts to acquire a system lock.

func (*SQLiteStorage) UnsubscribeFromChannel

func (s *SQLiteStorage) UnsubscribeFromChannel(ctx context.Context, instanceID, channelName string) error

UnsubscribeFromChannel unsubscribes an instance from a channel.

func (*SQLiteStorage) UpdateDeliveryCursor

func (s *SQLiteStorage) UpdateDeliveryCursor(ctx context.Context, instanceID, channelName string, lastMessageID int64) error

UpdateDeliveryCursor updates the delivery cursor for broadcast mode.

func (*SQLiteStorage) UpdateInstanceActivity

func (s *SQLiteStorage) UpdateInstanceActivity(ctx context.Context, instanceID, activityID string) error

UpdateInstanceActivity updates the current activity ID.

func (*SQLiteStorage) UpdateInstanceOutput

func (s *SQLiteStorage) UpdateInstanceOutput(ctx context.Context, instanceID string, outputData []byte) error

UpdateInstanceOutput updates the output data.

func (*SQLiteStorage) UpdateInstanceStatus

func (s *SQLiteStorage) UpdateInstanceStatus(ctx context.Context, instanceID string, status WorkflowStatus, errorMsg string) error

UpdateInstanceStatus updates the status of a workflow instance. Note: errorMsg parameter is kept for interface compatibility but is ignored. Error messages are tracked via history events (WorkflowFailed).

type StaleWorkflowInfo

type StaleWorkflowInfo struct {
	InstanceID   string
	WorkflowName string
}

StaleWorkflowInfo contains information about a workflow with a stale lock.

type Storage

type Storage interface {
	// Initialize sets up the storage (create tables, etc.)
	Initialize(ctx context.Context) error

	// Close closes the storage connection
	Close() error

	// DB returns the underlying database connection.
	// This is primarily used for migrations.
	DB() *sql.DB

	// Transaction Management
	TransactionManager

	// Workflow Instance Operations
	InstanceManager

	// Locking Operations
	LockManager

	// History Operations
	HistoryManager

	// Timer Subscription Operations
	TimerSubscriptionManager

	// Outbox Operations
	OutboxManager

	// Compensation Operations
	CompensationManager

	// Channel Operations
	ChannelManager

	// Group Operations
	GroupManager

	// System Lock Operations
	SystemLockManager

	// History Archive Operations
	HistoryArchiveManager
}

Storage defines the interface for workflow persistence. Implementations must be safe for concurrent use.

type SystemLock

type SystemLock struct {
	LockName  string    `json:"lock_name"`
	LockedBy  string    `json:"locked_by"`
	LockedAt  time.Time `json:"locked_at"`
	ExpiresAt time.Time `json:"expires_at"`
}

SystemLock represents a system-level lock for background tasks.

type SystemLockManager

type SystemLockManager interface {
	// TryAcquireSystemLock attempts to acquire a system lock.
	// Returns true if the lock was acquired.
	TryAcquireSystemLock(ctx context.Context, lockName, workerID string, timeoutSec int) (bool, error)

	// ReleaseSystemLock releases a system lock.
	ReleaseSystemLock(ctx context.Context, lockName, workerID string) error

	// CleanupExpiredSystemLocks removes expired system locks.
	CleanupExpiredSystemLocks(ctx context.Context) error
}

SystemLockManager handles system-level locks for background tasks.

type TimerSubscription

type TimerSubscription struct {
	ID         int64     `json:"id"`
	InstanceID string    `json:"instance_id"`
	TimerID    string    `json:"timer_id"`
	ExpiresAt  time.Time `json:"expires_at"`
	ActivityID string    `json:"activity_id"` // Activity ID for replay matching (Edda compatibility)
	CreatedAt  time.Time `json:"created_at"`
}

TimerSubscription represents a workflow waiting for a timer.

type TimerSubscriptionManager

type TimerSubscriptionManager interface {
	// RegisterTimerSubscription registers a workflow to wait for a timer.
	RegisterTimerSubscription(ctx context.Context, sub *TimerSubscription) error

	// RegisterTimerSubscriptionAndReleaseLock atomically registers a timer
	// and releases the workflow lock.
	RegisterTimerSubscriptionAndReleaseLock(
		ctx context.Context,
		sub *TimerSubscription,
		instanceID, workerID string,
	) error

	// FindExpiredTimers finds timers that have expired.
	// limit specifies the maximum number of timers to return (0 = default 100).
	FindExpiredTimers(ctx context.Context, limit int) ([]*TimerSubscription, error)

	// RemoveTimerSubscription removes a timer subscription.
	RemoveTimerSubscription(ctx context.Context, instanceID, timerID string) error
}

TimerSubscriptionManager handles timer subscriptions.

type TransactionManager

type TransactionManager interface {
	// BeginTransaction starts a new transaction.
	// Returns a context with the transaction attached.
	BeginTransaction(ctx context.Context) (context.Context, error)

	// CommitTransaction commits the current transaction.
	CommitTransaction(ctx context.Context) error

	// RollbackTransaction rolls back the current transaction.
	RollbackTransaction(ctx context.Context) error

	// InTransaction returns true if there is an active transaction.
	InTransaction(ctx context.Context) bool

	// Conn returns the database executor for the current context.
	// If a transaction is active, returns the transaction; otherwise, returns the database.
	// This allows users to execute custom SQL queries within the same transaction.
	Conn(ctx context.Context) Executor

	// RegisterPostCommitCallback registers a callback to be executed after a successful commit.
	// This is used for operations that should only happen if the transaction commits,
	// such as notifying waiting subscribers after a message is published.
	// Returns an error if not currently in a transaction.
	RegisterPostCommitCallback(ctx context.Context, cb func() error) error
}

TransactionManager handles transaction operations.

type WorkflowInstance

type WorkflowInstance struct {
	InstanceID        string         `json:"instance_id"`
	WorkflowName      string         `json:"workflow_name"`
	Framework         string         `json:"framework"` // "go" or "python" (Edda unification)
	Status            WorkflowStatus `json:"status"`
	InputData         []byte         `json:"input_data"`          // JSON-encoded input
	OutputData        []byte         `json:"output_data"`         // JSON-encoded output (if completed)
	CurrentActivityID string         `json:"current_activity_id"` // Last executed activity ID
	SourceHash        string         `json:"source_hash"`         // Source code hash (Edda compatibility)
	OwnerService      string         `json:"owner_service"`       // Service that owns this workflow (Edda compatibility)
	ContinuedFrom     string         `json:"continued_from"`      // Previous instance ID (for recur)
	LockedBy          string         `json:"locked_by"`           // Worker ID holding the lock
	LockedAt          *time.Time     `json:"locked_at"`           // When the lock was acquired
	LockTimeoutSec    *int           `json:"lock_timeout_seconds"`
	LockExpiresAt     *time.Time     `json:"lock_expires_at"`
	StartedAt         time.Time      `json:"started_at"` // When the workflow was created (unified with Edda)
	UpdatedAt         time.Time      `json:"updated_at"`
}

WorkflowInstance represents a single execution of a workflow.

type WorkflowStatus

type WorkflowStatus string

WorkflowStatus represents the current state of a workflow instance.

const (
	StatusRunning           WorkflowStatus = "running"
	StatusCompleted         WorkflowStatus = "completed"
	StatusFailed            WorkflowStatus = "failed"
	StatusCancelled         WorkflowStatus = "cancelled"
	StatusWaitingForEvent   WorkflowStatus = "waiting_for_event"
	StatusWaitingForTimer   WorkflowStatus = "waiting_for_timer"
	StatusWaitingForMessage WorkflowStatus = "waiting_for_message"
	StatusRecurred          WorkflowStatus = "recurred"
	StatusCompensating      WorkflowStatus = "compensating"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL