Documentation
¶
Overview ¶
Package storage provides the storage layer for Romancy.
Index ¶
- Variables
- func InitializeTestSchema(ctx context.Context, s *SQLiteStorage) error
- func InitializeTestSchemaForStorage(ctx context.Context, s Storage) error
- func InitializeTestSchemaMySQL(ctx context.Context, s *MySQLStorage) error
- func InitializeTestSchemaPostgres(ctx context.Context, s *PostgresStorage) error
- func ValidateJSONPath(path string) error
- type ArchivedHistoryEvent
- type ChannelDeliveryCursor
- type ChannelDeliveryResult
- type ChannelManager
- type ChannelMessage
- type ChannelMessageClaim
- type ChannelMode
- type ChannelSubscription
- type CompensationEntry
- type CompensationManager
- type Driver
- type Executor
- type GroupManager
- type GroupMembership
- type HistoryArchiveManager
- type HistoryEvent
- type HistoryEventType
- type HistoryIterator
- type HistoryIteratorOptions
- type HistoryManager
- type InputFilterBuilder
- type InstanceManager
- type ListInstancesOptions
- type LockManager
- type MySQLDriver
- func (d *MySQLDriver) DriverName() string
- func (d *MySQLDriver) GetCurrentTimeExpr() string
- func (d *MySQLDriver) JSONCompare(extractExpr string, value any, _ int) (expr string, compArgs []any)
- func (d *MySQLDriver) JSONExtract(column, path string) string
- func (d *MySQLDriver) MakeDatetimeComparable(column string) string
- func (d *MySQLDriver) OnConflictDoNothing(conflictColumns ...string) string
- func (d *MySQLDriver) Placeholder(n int) string
- func (d *MySQLDriver) ReturningClause(columns ...string) string
- func (d *MySQLDriver) SelectForUpdateSkipLocked() string
- type MySQLStorage
- func (s *MySQLStorage) AddCompensation(ctx context.Context, entry *CompensationEntry) error
- func (s *MySQLStorage) AddOutboxEvent(ctx context.Context, event *OutboxEvent) error
- func (s *MySQLStorage) AppendHistory(ctx context.Context, event *HistoryEvent) error
- func (s *MySQLStorage) ArchiveHistory(ctx context.Context, instanceID string) error
- func (s *MySQLStorage) BeginTransaction(ctx context.Context) (context.Context, error)
- func (s *MySQLStorage) CancelInstance(ctx context.Context, instanceID, reason string) error
- func (s *MySQLStorage) ClaimChannelMessage(ctx context.Context, messageID int64, instanceID string) (bool, error)
- func (s *MySQLStorage) CleanupExpiredSystemLocks(ctx context.Context) error
- func (s *MySQLStorage) CleanupInstanceSubscriptions(ctx context.Context, instanceID string) error
- func (s *MySQLStorage) CleanupOldChannelMessages(ctx context.Context, olderThan time.Duration) error
- func (s *MySQLStorage) CleanupOldOutboxEvents(ctx context.Context, olderThan time.Duration) error
- func (s *MySQLStorage) CleanupStaleLocks(ctx context.Context, timeoutSec int) ([]StaleWorkflowInfo, error)
- func (s *MySQLStorage) ClearChannelWaitingState(ctx context.Context, instanceID, channelName string) error
- func (s *MySQLStorage) Close() error
- func (s *MySQLStorage) CommitTransaction(ctx context.Context) error
- func (s *MySQLStorage) Conn(ctx context.Context) Executor
- func (s *MySQLStorage) CreateInstance(ctx context.Context, instance *WorkflowInstance) error
- func (s *MySQLStorage) DB() *sql.DB
- func (s *MySQLStorage) DeleteChannelMessage(ctx context.Context, messageID int64) error
- func (s *MySQLStorage) DeliverChannelMessage(ctx context.Context, instanceID string, message *ChannelMessage) error
- func (s *MySQLStorage) DeliverChannelMessageWithLock(ctx context.Context, instanceID string, channelName string, ...) (*ChannelDeliveryResult, error)
- func (s *MySQLStorage) FindExpiredChannelSubscriptions(ctx context.Context, limit int) ([]*ChannelSubscription, error)
- func (s *MySQLStorage) FindExpiredTimers(ctx context.Context, limit int) ([]*TimerSubscription, error)
- func (s *MySQLStorage) FindResumableWorkflows(ctx context.Context, limit int) ([]*ResumableWorkflow, error)
- func (s *MySQLStorage) GetArchivedHistory(ctx context.Context, instanceID string) ([]*ArchivedHistoryEvent, error)
- func (s *MySQLStorage) GetChannelMode(ctx context.Context, channelName string) (ChannelMode, error)
- func (s *MySQLStorage) GetChannelSubscribersWaiting(ctx context.Context, channelName string) ([]*ChannelSubscription, error)
- func (s *MySQLStorage) GetChannelSubscription(ctx context.Context, instanceID, channelName string) (*ChannelSubscription, error)
- func (s *MySQLStorage) GetCompensations(ctx context.Context, instanceID string) ([]*CompensationEntry, error)
- func (s *MySQLStorage) GetDeliveryCursor(ctx context.Context, instanceID, channelName string) (int64, error)
- func (s *MySQLStorage) GetGroupMembers(ctx context.Context, groupName string) ([]string, error)
- func (s *MySQLStorage) GetHistoryCount(ctx context.Context, instanceID string) (int64, error)
- func (s *MySQLStorage) GetHistoryPaginated(ctx context.Context, instanceID string, afterID int64, limit int) ([]*HistoryEvent, bool, error)
- func (s *MySQLStorage) GetInstance(ctx context.Context, instanceID string) (*WorkflowInstance, error)
- func (s *MySQLStorage) GetPendingChannelMessages(ctx context.Context, channelName string, afterID int64, limit int) ([]*ChannelMessage, error)
- func (s *MySQLStorage) GetPendingChannelMessagesForInstance(ctx context.Context, instanceID, channelName string) ([]*ChannelMessage, error)
- func (s *MySQLStorage) GetPendingOutboxEvents(ctx context.Context, limit int) ([]*OutboxEvent, error)
- func (s *MySQLStorage) InTransaction(ctx context.Context) bool
- func (s *MySQLStorage) IncrementOutboxAttempts(ctx context.Context, eventID string) error
- func (s *MySQLStorage) Initialize(ctx context.Context) error
- func (s *MySQLStorage) JoinGroup(ctx context.Context, instanceID, groupName string) error
- func (s *MySQLStorage) LeaveAllGroups(ctx context.Context, instanceID string) error
- func (s *MySQLStorage) LeaveGroup(ctx context.Context, instanceID, groupName string) error
- func (s *MySQLStorage) ListInstances(ctx context.Context, opts ListInstancesOptions) (*PaginationResult, error)
- func (s *MySQLStorage) MarkOutboxEventFailed(ctx context.Context, eventID string) error
- func (s *MySQLStorage) MarkOutboxEventSent(ctx context.Context, eventID string) error
- func (s *MySQLStorage) PublishToChannel(ctx context.Context, channelName string, dataJSON, metadata []byte) (int64, error)
- func (s *MySQLStorage) RefreshLock(ctx context.Context, instanceID, workerID string, timeoutSec int) error
- func (s *MySQLStorage) RegisterChannelReceiveAndReleaseLock(ctx context.Context, instanceID, channelName, workerID, activityID string, ...) error
- func (s *MySQLStorage) RegisterPostCommitCallback(ctx context.Context, cb func() error) error
- func (s *MySQLStorage) RegisterTimerSubscription(ctx context.Context, sub *TimerSubscription) error
- func (s *MySQLStorage) RegisterTimerSubscriptionAndReleaseLock(ctx context.Context, sub *TimerSubscription, instanceID, workerID string) error
- func (s *MySQLStorage) ReleaseLock(ctx context.Context, instanceID, workerID string) error
- func (s *MySQLStorage) ReleaseSystemLock(ctx context.Context, lockName, workerID string) error
- func (s *MySQLStorage) RemoveTimerSubscription(ctx context.Context, instanceID, timerID string) error
- func (s *MySQLStorage) RollbackTransaction(ctx context.Context) error
- func (s *MySQLStorage) SubscribeToChannel(ctx context.Context, instanceID, channelName string, mode ChannelMode) error
- func (s *MySQLStorage) TryAcquireLock(ctx context.Context, instanceID, workerID string, timeoutSec int) (bool, error)
- func (s *MySQLStorage) TryAcquireSystemLock(ctx context.Context, lockName, workerID string, timeoutSec int) (bool, error)
- func (s *MySQLStorage) UnsubscribeFromChannel(ctx context.Context, instanceID, channelName string) error
- func (s *MySQLStorage) UpdateDeliveryCursor(ctx context.Context, instanceID, channelName string, lastMessageID int64) error
- func (s *MySQLStorage) UpdateInstanceActivity(ctx context.Context, instanceID, activityID string) error
- func (s *MySQLStorage) UpdateInstanceOutput(ctx context.Context, instanceID string, outputData []byte) error
- func (s *MySQLStorage) UpdateInstanceStatus(ctx context.Context, instanceID string, status WorkflowStatus, errorMsg string) error
- type OutboxEvent
- type OutboxManager
- type PaginationResult
- type PostgresDriver
- func (d *PostgresDriver) DriverName() string
- func (d *PostgresDriver) GetCurrentTimeExpr() string
- func (d *PostgresDriver) JSONCompare(extractExpr string, value any, placeholderNum int) (expr string, compArgs []any)
- func (d *PostgresDriver) JSONExtract(column, path string) string
- func (d *PostgresDriver) MakeDatetimeComparable(column string) string
- func (d *PostgresDriver) OnConflictDoNothing(conflictColumns ...string) string
- func (d *PostgresDriver) Placeholder(n int) string
- func (d *PostgresDriver) ReturningClause(columns ...string) string
- func (d *PostgresDriver) SelectForUpdateSkipLocked() string
- type PostgresStorage
- func (s *PostgresStorage) AddCompensation(ctx context.Context, entry *CompensationEntry) error
- func (s *PostgresStorage) AddOutboxEvent(ctx context.Context, event *OutboxEvent) error
- func (s *PostgresStorage) AppendHistory(ctx context.Context, event *HistoryEvent) error
- func (s *PostgresStorage) ArchiveHistory(ctx context.Context, instanceID string) error
- func (s *PostgresStorage) BeginTransaction(ctx context.Context) (context.Context, error)
- func (s *PostgresStorage) CancelInstance(ctx context.Context, instanceID, reason string) error
- func (s *PostgresStorage) ClaimChannelMessage(ctx context.Context, messageID int64, instanceID string) (bool, error)
- func (s *PostgresStorage) CleanupExpiredSystemLocks(ctx context.Context) error
- func (s *PostgresStorage) CleanupInstanceSubscriptions(ctx context.Context, instanceID string) error
- func (s *PostgresStorage) CleanupOldChannelMessages(ctx context.Context, olderThan time.Duration) error
- func (s *PostgresStorage) CleanupOldOutboxEvents(ctx context.Context, olderThan time.Duration) error
- func (s *PostgresStorage) CleanupStaleLocks(ctx context.Context, timeoutSec int) ([]StaleWorkflowInfo, error)
- func (s *PostgresStorage) ClearChannelWaitingState(ctx context.Context, instanceID, channelName string) error
- func (s *PostgresStorage) Close() error
- func (s *PostgresStorage) CommitTransaction(ctx context.Context) error
- func (s *PostgresStorage) Conn(ctx context.Context) Executor
- func (s *PostgresStorage) CreateInstance(ctx context.Context, instance *WorkflowInstance) error
- func (s *PostgresStorage) DB() *sql.DB
- func (s *PostgresStorage) DeleteChannelMessage(ctx context.Context, messageID int64) error
- func (s *PostgresStorage) DeliverChannelMessage(ctx context.Context, instanceID string, message *ChannelMessage) error
- func (s *PostgresStorage) DeliverChannelMessageWithLock(ctx context.Context, instanceID string, channelName string, ...) (*ChannelDeliveryResult, error)
- func (s *PostgresStorage) FindExpiredChannelSubscriptions(ctx context.Context, limit int) ([]*ChannelSubscription, error)
- func (s *PostgresStorage) FindExpiredTimers(ctx context.Context, limit int) ([]*TimerSubscription, error)
- func (s *PostgresStorage) FindResumableWorkflows(ctx context.Context, limit int) ([]*ResumableWorkflow, error)
- func (s *PostgresStorage) GetArchivedHistory(ctx context.Context, instanceID string) ([]*ArchivedHistoryEvent, error)
- func (s *PostgresStorage) GetChannelMode(ctx context.Context, channelName string) (ChannelMode, error)
- func (s *PostgresStorage) GetChannelSubscribersWaiting(ctx context.Context, channelName string) ([]*ChannelSubscription, error)
- func (s *PostgresStorage) GetChannelSubscription(ctx context.Context, instanceID, channelName string) (*ChannelSubscription, error)
- func (s *PostgresStorage) GetCompensations(ctx context.Context, instanceID string) ([]*CompensationEntry, error)
- func (s *PostgresStorage) GetDeliveryCursor(ctx context.Context, instanceID, channelName string) (int64, error)
- func (s *PostgresStorage) GetGroupMembers(ctx context.Context, groupName string) ([]string, error)
- func (s *PostgresStorage) GetHistoryCount(ctx context.Context, instanceID string) (int64, error)
- func (s *PostgresStorage) GetHistoryPaginated(ctx context.Context, instanceID string, afterID int64, limit int) ([]*HistoryEvent, bool, error)
- func (s *PostgresStorage) GetInstance(ctx context.Context, instanceID string) (*WorkflowInstance, error)
- func (s *PostgresStorage) GetPendingChannelMessages(ctx context.Context, channelName string, afterID int64, limit int) ([]*ChannelMessage, error)
- func (s *PostgresStorage) GetPendingChannelMessagesForInstance(ctx context.Context, instanceID, channelName string) ([]*ChannelMessage, error)
- func (s *PostgresStorage) GetPendingOutboxEvents(ctx context.Context, limit int) ([]*OutboxEvent, error)
- func (s *PostgresStorage) InTransaction(ctx context.Context) bool
- func (s *PostgresStorage) IncrementOutboxAttempts(ctx context.Context, eventID string) error
- func (s *PostgresStorage) Initialize(ctx context.Context) error
- func (s *PostgresStorage) IsNotifyEnabled() bool
- func (s *PostgresStorage) JoinGroup(ctx context.Context, instanceID, groupName string) error
- func (s *PostgresStorage) LeaveAllGroups(ctx context.Context, instanceID string) error
- func (s *PostgresStorage) LeaveGroup(ctx context.Context, instanceID, groupName string) error
- func (s *PostgresStorage) ListInstances(ctx context.Context, opts ListInstancesOptions) (*PaginationResult, error)
- func (s *PostgresStorage) MarkOutboxEventFailed(ctx context.Context, eventID string) error
- func (s *PostgresStorage) MarkOutboxEventSent(ctx context.Context, eventID string) error
- func (s *PostgresStorage) PublishToChannel(ctx context.Context, channelName string, dataJSON, metadata []byte) (int64, error)
- func (s *PostgresStorage) RefreshLock(ctx context.Context, instanceID, workerID string, timeoutSec int) error
- func (s *PostgresStorage) RegisterChannelReceiveAndReleaseLock(ctx context.Context, instanceID, channelName, workerID, activityID string, ...) error
- func (s *PostgresStorage) RegisterPostCommitCallback(ctx context.Context, cb func() error) error
- func (s *PostgresStorage) RegisterTimerSubscription(ctx context.Context, sub *TimerSubscription) error
- func (s *PostgresStorage) RegisterTimerSubscriptionAndReleaseLock(ctx context.Context, sub *TimerSubscription, instanceID, workerID string) error
- func (s *PostgresStorage) ReleaseLock(ctx context.Context, instanceID, workerID string) error
- func (s *PostgresStorage) ReleaseSystemLock(ctx context.Context, lockName, workerID string) error
- func (s *PostgresStorage) RemoveTimerSubscription(ctx context.Context, instanceID, timerID string) error
- func (s *PostgresStorage) RollbackTransaction(ctx context.Context) error
- func (s *PostgresStorage) SetNotifyEnabled(enabled bool)
- func (s *PostgresStorage) SubscribeToChannel(ctx context.Context, instanceID, channelName string, mode ChannelMode) error
- func (s *PostgresStorage) TryAcquireLock(ctx context.Context, instanceID, workerID string, timeoutSec int) (bool, error)
- func (s *PostgresStorage) TryAcquireSystemLock(ctx context.Context, lockName, workerID string, timeoutSec int) (bool, error)
- func (s *PostgresStorage) UnsubscribeFromChannel(ctx context.Context, instanceID, channelName string) error
- func (s *PostgresStorage) UpdateDeliveryCursor(ctx context.Context, instanceID, channelName string, lastMessageID int64) error
- func (s *PostgresStorage) UpdateInstanceActivity(ctx context.Context, instanceID, activityID string) error
- func (s *PostgresStorage) UpdateInstanceOutput(ctx context.Context, instanceID string, outputData []byte) error
- func (s *PostgresStorage) UpdateInstanceStatus(ctx context.Context, instanceID string, status WorkflowStatus, errorMsg string) error
- type ResumableWorkflow
- type SQLiteDriver
- func (d *SQLiteDriver) DriverName() string
- func (d *SQLiteDriver) GetCurrentTimeExpr() string
- func (d *SQLiteDriver) JSONCompare(extractExpr string, value any, _ int) (expr string, compArgs []any)
- func (d *SQLiteDriver) JSONExtract(column, path string) string
- func (d *SQLiteDriver) MakeDatetimeComparable(column string) string
- func (d *SQLiteDriver) OnConflictDoNothing(conflictColumns ...string) string
- func (d *SQLiteDriver) Placeholder(n int) string
- func (d *SQLiteDriver) ReturningClause(columns ...string) string
- func (d *SQLiteDriver) SelectForUpdateSkipLocked() string
- type SQLiteStorage
- func (s *SQLiteStorage) AddCompensation(ctx context.Context, entry *CompensationEntry) error
- func (s *SQLiteStorage) AddOutboxEvent(ctx context.Context, event *OutboxEvent) error
- func (s *SQLiteStorage) AppendHistory(ctx context.Context, event *HistoryEvent) error
- func (s *SQLiteStorage) ArchiveHistory(ctx context.Context, instanceID string) error
- func (s *SQLiteStorage) BeginTransaction(ctx context.Context) (context.Context, error)
- func (s *SQLiteStorage) CancelInstance(ctx context.Context, instanceID, reason string) error
- func (s *SQLiteStorage) ClaimChannelMessage(ctx context.Context, messageID int64, instanceID string) (bool, error)
- func (s *SQLiteStorage) CleanupExpiredSystemLocks(ctx context.Context) error
- func (s *SQLiteStorage) CleanupInstanceSubscriptions(ctx context.Context, instanceID string) error
- func (s *SQLiteStorage) CleanupOldChannelMessages(ctx context.Context, olderThan time.Duration) error
- func (s *SQLiteStorage) CleanupOldOutboxEvents(ctx context.Context, olderThan time.Duration) error
- func (s *SQLiteStorage) CleanupStaleLocks(ctx context.Context, timeoutSec int) ([]StaleWorkflowInfo, error)
- func (s *SQLiteStorage) ClearChannelWaitingState(ctx context.Context, instanceID, channelName string) error
- func (s *SQLiteStorage) Close() error
- func (s *SQLiteStorage) CommitTransaction(ctx context.Context) error
- func (s *SQLiteStorage) Conn(ctx context.Context) Executor
- func (s *SQLiteStorage) CreateInstance(ctx context.Context, instance *WorkflowInstance) error
- func (s *SQLiteStorage) DB() *sql.DB
- func (s *SQLiteStorage) DeleteChannelMessage(ctx context.Context, messageID int64) error
- func (s *SQLiteStorage) DeliverChannelMessage(ctx context.Context, instanceID string, message *ChannelMessage) error
- func (s *SQLiteStorage) DeliverChannelMessageWithLock(ctx context.Context, instanceID string, channelName string, ...) (*ChannelDeliveryResult, error)
- func (s *SQLiteStorage) FindExpiredChannelSubscriptions(ctx context.Context, limit int) ([]*ChannelSubscription, error)
- func (s *SQLiteStorage) FindExpiredTimers(ctx context.Context, limit int) ([]*TimerSubscription, error)
- func (s *SQLiteStorage) FindResumableWorkflows(ctx context.Context, limit int) ([]*ResumableWorkflow, error)
- func (s *SQLiteStorage) GetArchivedHistory(ctx context.Context, instanceID string) ([]*ArchivedHistoryEvent, error)
- func (s *SQLiteStorage) GetChannelMode(ctx context.Context, channelName string) (ChannelMode, error)
- func (s *SQLiteStorage) GetChannelSubscribersWaiting(ctx context.Context, channelName string) ([]*ChannelSubscription, error)
- func (s *SQLiteStorage) GetChannelSubscription(ctx context.Context, instanceID, channelName string) (*ChannelSubscription, error)
- func (s *SQLiteStorage) GetCompensations(ctx context.Context, instanceID string) ([]*CompensationEntry, error)
- func (s *SQLiteStorage) GetDeliveryCursor(ctx context.Context, instanceID, channelName string) (int64, error)
- func (s *SQLiteStorage) GetGroupMembers(ctx context.Context, groupName string) ([]string, error)
- func (s *SQLiteStorage) GetHistoryCount(ctx context.Context, instanceID string) (int64, error)
- func (s *SQLiteStorage) GetHistoryPaginated(ctx context.Context, instanceID string, afterID int64, limit int) ([]*HistoryEvent, bool, error)
- func (s *SQLiteStorage) GetInstance(ctx context.Context, instanceID string) (*WorkflowInstance, error)
- func (s *SQLiteStorage) GetPendingChannelMessages(ctx context.Context, channelName string, afterID int64, limit int) ([]*ChannelMessage, error)
- func (s *SQLiteStorage) GetPendingChannelMessagesForInstance(ctx context.Context, instanceID, channelName string) ([]*ChannelMessage, error)
- func (s *SQLiteStorage) GetPendingOutboxEvents(ctx context.Context, limit int) ([]*OutboxEvent, error)
- func (s *SQLiteStorage) InTransaction(ctx context.Context) bool
- func (s *SQLiteStorage) IncrementOutboxAttempts(ctx context.Context, eventID string) error
- func (s *SQLiteStorage) Initialize(ctx context.Context) error
- func (s *SQLiteStorage) JoinGroup(ctx context.Context, instanceID, groupName string) error
- func (s *SQLiteStorage) LeaveAllGroups(ctx context.Context, instanceID string) error
- func (s *SQLiteStorage) LeaveGroup(ctx context.Context, instanceID, groupName string) error
- func (s *SQLiteStorage) ListInstances(ctx context.Context, opts ListInstancesOptions) (*PaginationResult, error)
- func (s *SQLiteStorage) MarkOutboxEventFailed(ctx context.Context, eventID string) error
- func (s *SQLiteStorage) MarkOutboxEventSent(ctx context.Context, eventID string) error
- func (s *SQLiteStorage) PublishToChannel(ctx context.Context, channelName string, dataJSON, metadata []byte) (int64, error)
- func (s *SQLiteStorage) RefreshLock(ctx context.Context, instanceID, workerID string, timeoutSec int) error
- func (s *SQLiteStorage) RegisterChannelReceiveAndReleaseLock(ctx context.Context, instanceID, channelName, workerID, activityID string, ...) error
- func (s *SQLiteStorage) RegisterPostCommitCallback(ctx context.Context, cb func() error) error
- func (s *SQLiteStorage) RegisterTimerSubscription(ctx context.Context, sub *TimerSubscription) error
- func (s *SQLiteStorage) RegisterTimerSubscriptionAndReleaseLock(ctx context.Context, sub *TimerSubscription, instanceID, workerID string) error
- func (s *SQLiteStorage) ReleaseLock(ctx context.Context, instanceID, workerID string) error
- func (s *SQLiteStorage) ReleaseSystemLock(ctx context.Context, lockName, workerID string) error
- func (s *SQLiteStorage) RemoveTimerSubscription(ctx context.Context, instanceID, timerID string) error
- func (s *SQLiteStorage) RollbackTransaction(ctx context.Context) error
- func (s *SQLiteStorage) SubscribeToChannel(ctx context.Context, instanceID, channelName string, mode ChannelMode) error
- func (s *SQLiteStorage) TryAcquireLock(ctx context.Context, instanceID, workerID string, timeoutSec int) (bool, error)
- func (s *SQLiteStorage) TryAcquireSystemLock(ctx context.Context, lockName, workerID string, timeoutSec int) (bool, error)
- func (s *SQLiteStorage) UnsubscribeFromChannel(ctx context.Context, instanceID, channelName string) error
- func (s *SQLiteStorage) UpdateDeliveryCursor(ctx context.Context, instanceID, channelName string, lastMessageID int64) error
- func (s *SQLiteStorage) UpdateInstanceActivity(ctx context.Context, instanceID, activityID string) error
- func (s *SQLiteStorage) UpdateInstanceOutput(ctx context.Context, instanceID string, outputData []byte) error
- func (s *SQLiteStorage) UpdateInstanceStatus(ctx context.Context, instanceID string, status WorkflowStatus, errorMsg string) error
- type StaleWorkflowInfo
- type Storage
- type SystemLock
- type SystemLockManager
- type TimerSubscription
- type TimerSubscriptionManager
- type TransactionManager
- type WorkflowInstance
- type WorkflowStatus
Constants ¶
This section is empty.
Variables ¶
var ErrWorkflowNotCancellable = errors.New("workflow cannot be cancelled")
ErrWorkflowNotCancellable indicates that the workflow cannot be cancelled. This happens when the workflow is already completed, cancelled, failed, or does not exist.
Functions ¶
func InitializeTestSchema ¶ added in v0.3.0
func InitializeTestSchema(ctx context.Context, s *SQLiteStorage) error
InitializeTestSchema applies the test schema to an SQLiteStorage. This is intended for use in tests only.
func InitializeTestSchemaForStorage ¶ added in v0.3.0
func InitializeTestSchemaForStorage(ctx context.Context, s Storage) error
InitializeTestSchemaForStorage initializes the test schema for any Storage implementation. This is useful for external test packages.
func InitializeTestSchemaMySQL ¶ added in v0.3.0
func InitializeTestSchemaMySQL(ctx context.Context, s *MySQLStorage) error
InitializeTestSchemaMySQL applies the test schema to a MySQLStorage. This is intended for use in tests only.
func InitializeTestSchemaPostgres ¶ added in v0.3.0
func InitializeTestSchemaPostgres(ctx context.Context, s *PostgresStorage) error
InitializeTestSchemaPostgres applies the test schema to a PostgresStorage. This is intended for use in tests only.
func ValidateJSONPath ¶
func ValidateJSONPath(path string) error
ValidateJSONPath validates a JSON path for safe use in SQL queries. Returns an error if the path contains potentially dangerous characters.
Types ¶
type ArchivedHistoryEvent ¶
type ArchivedHistoryEvent struct {
ID int64 `json:"id"`
OriginalID int64 `json:"original_id"`
InstanceID string `json:"instance_id"`
ActivityID string `json:"activity_id"`
EventType HistoryEventType `json:"event_type"`
EventData []byte `json:"event_data"`
EventDataBinary []byte `json:"event_data_binary"`
DataType string `json:"data_type"`
OriginalCreatedAt time.Time `json:"original_created_at"`
ArchivedAt time.Time `json:"archived_at"`
}
ArchivedHistoryEvent represents an archived history event (from recur).
type ChannelDeliveryCursor ¶
type ChannelDeliveryCursor struct {
ID int64 `json:"id"`
InstanceID string `json:"instance_id"`
Channel string `json:"channel"` // Channel name (unified with Edda)
LastDeliveredID int64 `json:"last_delivered_id"` // Last delivered message ID (unified with Edda)
UpdatedAt time.Time `json:"updated_at"`
}
ChannelDeliveryCursor tracks message delivery progress for broadcast mode.
type ChannelDeliveryResult ¶
type ChannelDeliveryResult struct {
InstanceID string `json:"instance_id"`
WorkflowName string `json:"workflow_name"`
ActivityID string `json:"activity_id"`
}
ChannelDeliveryResult contains information about a successful message delivery.
type ChannelManager ¶
type ChannelManager interface {
// PublishToChannel publishes a message to a channel.
// For direct messages, use dynamic channel names (e.g., "channel:instance_id").
PublishToChannel(ctx context.Context, channelName string, dataJSON []byte, metadata []byte) (int64, error)
// SubscribeToChannel subscribes an instance to a channel.
SubscribeToChannel(ctx context.Context, instanceID, channelName string, mode ChannelMode) error
// UnsubscribeFromChannel unsubscribes an instance from a channel.
UnsubscribeFromChannel(ctx context.Context, instanceID, channelName string) error
// GetChannelSubscription retrieves a subscription for an instance and channel.
GetChannelSubscription(ctx context.Context, instanceID, channelName string) (*ChannelSubscription, error)
// GetChannelMode retrieves the mode for a channel (from any existing subscription).
// Returns empty string if no subscriptions exist.
GetChannelMode(ctx context.Context, channelName string) (ChannelMode, error)
// RegisterChannelReceiveAndReleaseLock atomically registers a channel receive wait
// and releases the workflow lock.
RegisterChannelReceiveAndReleaseLock(
ctx context.Context,
instanceID, channelName, workerID, activityID string,
timeoutAt *time.Time,
) error
// GetPendingChannelMessages retrieves pending messages for a channel after a given ID.
GetPendingChannelMessages(ctx context.Context, channelName string, afterID int64, limit int) ([]*ChannelMessage, error)
// GetPendingChannelMessagesForInstance gets pending messages for a specific subscriber.
// For broadcast mode: Returns messages with id > cursor (messages not yet seen by this instance)
// For competing mode: Returns unclaimed messages (not yet claimed by any instance)
GetPendingChannelMessagesForInstance(ctx context.Context, instanceID, channelName string) ([]*ChannelMessage, error)
// ClaimChannelMessage claims a message for competing mode (returns false if already claimed).
ClaimChannelMessage(ctx context.Context, messageID int64, instanceID string) (bool, error)
// DeleteChannelMessage deletes a message from the channel.
DeleteChannelMessage(ctx context.Context, messageID int64) error
// UpdateDeliveryCursor updates the delivery cursor for broadcast mode.
UpdateDeliveryCursor(ctx context.Context, instanceID, channelName string, lastMessageID int64) error
// GetDeliveryCursor gets the current delivery cursor for an instance and channel.
GetDeliveryCursor(ctx context.Context, instanceID, channelName string) (int64, error)
// GetChannelSubscribersWaiting finds subscribers waiting for messages on a channel.
GetChannelSubscribersWaiting(ctx context.Context, channelName string) ([]*ChannelSubscription, error)
// ClearChannelWaitingState clears the waiting state for an instance's channel subscription.
ClearChannelWaitingState(ctx context.Context, instanceID, channelName string) error
// DeliverChannelMessage delivers a message to a waiting subscriber (records in history).
DeliverChannelMessage(ctx context.Context, instanceID string, message *ChannelMessage) error
// DeliverChannelMessageWithLock delivers a message using the Lock-First pattern.
// This is used for load balancing in multi-worker environments:
// 1. Try to acquire lock on the target instance
// 2. If lock fails, return (nil, nil) - another worker will handle it
// 3. Record message in history
// 4. Update status to 'running'
// 5. Release lock
// Returns the delivery info if successful, nil if lock was not acquired.
DeliverChannelMessageWithLock(
ctx context.Context,
instanceID string,
channelName string,
message *ChannelMessage,
workerID string,
lockTimeoutSec int,
) (*ChannelDeliveryResult, error)
// CleanupOldChannelMessages removes old channel messages.
CleanupOldChannelMessages(ctx context.Context, olderThan time.Duration) error
// FindExpiredChannelSubscriptions finds channel subscriptions that have timed out.
// limit specifies the maximum number of subscriptions to return (0 = default 100).
FindExpiredChannelSubscriptions(ctx context.Context, limit int) ([]*ChannelSubscription, error)
}
ChannelManager handles channel-based messaging operations.
type ChannelMessage ¶
type ChannelMessage struct {
ID int64 `json:"id"`
Channel string `json:"channel"` // Channel name (unified with Edda)
MessageID string `json:"message_id"` // UUID for message (Edda compatibility)
DataType string `json:"data_type"` // "json" or "binary" (Edda compatibility)
Data []byte `json:"data,omitempty"` // JSON data (unified with Edda)
DataBinary []byte `json:"data_binary,omitempty"` // Binary data
Metadata []byte `json:"metadata,omitempty"` // JSON-encoded metadata
PublishedAt time.Time `json:"published_at"` // When the message was published (unified with Edda)
}
ChannelMessage represents a message in a channel. SendTo uses dynamic channel names (e.g., "channel:instance_id") instead of target_instance_id.
type ChannelMessageClaim ¶
type ChannelMessageClaim struct {
ID int64 `json:"id"`
MessageID int64 `json:"message_id"`
InstanceID string `json:"instance_id"`
ClaimedAt time.Time `json:"claimed_at"`
}
ChannelMessageClaim tracks which instance claimed a message in competing mode.
type ChannelMode ¶
type ChannelMode string
ChannelMode represents the subscription mode for a channel.
const (
ChannelModeBroadcast ChannelMode = "broadcast" // All subscribers receive all messages
ChannelModeCompeting ChannelMode = "competing" // Each message goes to one subscriber
ChannelModeDirect ChannelMode = "direct" // Direct messages to specific instance (via SendTo)
)
type ChannelSubscription ¶
type ChannelSubscription struct {
ID int64 `json:"id"`
InstanceID string `json:"instance_id"`
Channel string `json:"channel"` // Channel name (unified with Edda)
Mode ChannelMode `json:"mode"`
CursorMessageID int64 `json:"cursor_message_id"` // Last delivered message ID (Edda compatibility)
TimeoutAt *time.Time `json:"timeout_at,omitempty"`
ActivityID string `json:"activity_id,omitempty"` // Activity ID for replay matching; non-null = waiting
SubscribedAt time.Time `json:"subscribed_at"` // When the subscription was created (unified with Edda)
}
ChannelSubscription represents a workflow's subscription to a channel. Waiting state is determined by "activity_id IS NOT NULL" instead of an explicit waiting flag.
type CompensationEntry ¶
type CompensationEntry struct {
ID int64 `json:"id"`
InstanceID string `json:"instance_id"`
ActivityID string `json:"activity_id"`
ActivityName string `json:"activity_name"` // Function name (unified with Edda)
Args []byte `json:"args"` // JSON-encoded arguments (unified with Edda)
CreatedAt time.Time `json:"created_at"`
}
CompensationEntry represents a registered compensation action. Order is determined by created_at DESC (LIFO). Status is tracked via history events (CompensationExecuted, CompensationFailed).
type CompensationManager ¶
type CompensationManager interface {
// AddCompensation registers a compensation action.
AddCompensation(ctx context.Context, entry *CompensationEntry) error
// GetCompensations retrieves compensation entries in LIFO order (by created_at DESC).
// Status tracking is done via history events (CompensationExecuted, CompensationFailed).
GetCompensations(ctx context.Context, instanceID string) ([]*CompensationEntry, error)
}
CompensationManager handles saga compensation.
type Driver ¶
type Driver interface {
// DriverName returns the driver name (e.g., "sqlite", "postgres")
DriverName() string
// GetCurrentTimeExpr returns the SQL expression for current time.
// SQLite: datetime('now'), PostgreSQL: NOW()
GetCurrentTimeExpr() string
// MakeDatetimeComparable wraps a datetime column for comparison.
// SQLite needs datetime() wrapper, PostgreSQL doesn't.
MakeDatetimeComparable(column string) string
// SelectForUpdateSkipLocked returns the clause for row locking.
// PostgreSQL: "FOR UPDATE SKIP LOCKED", SQLite: "" (uses table-level locking)
SelectForUpdateSkipLocked() string
// Placeholder returns the placeholder for the nth parameter.
// SQLite: ?, PostgreSQL: $n
Placeholder(n int) string
// OnConflictDoNothing returns the clause for idempotent inserts.
// SQLite and PostgreSQL: "ON CONFLICT DO NOTHING"
OnConflictDoNothing(conflictColumns ...string) string
// ReturningClause returns the RETURNING clause for insert/update.
// SQLite: "", PostgreSQL: "RETURNING id"
ReturningClause(columns ...string) string
// JSONExtract returns the SQL expression to extract a value from a JSON column.
// The path is a dot-separated path like "order.customer.id".
// The returned expression extracts the value as text.
JSONExtract(column, path string) string
// JSONCompare returns the SQL expression to compare a JSON-extracted value
// with a given value, handling type coercion appropriately.
// Returns the comparison expression and any additional args needed.
JSONCompare(extractExpr string, value any, placeholderNum int) (expr string, args []any)
}
Driver abstracts database-specific SQL operations.
type Executor ¶
type Executor interface {
ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}
Executor is a database executor interface that can be either *sql.DB or *sql.Tx. This allows users to execute custom SQL queries within the same transaction as the workflow activity.
type GroupManager ¶
type GroupManager interface {
// JoinGroup adds an instance to a group.
JoinGroup(ctx context.Context, instanceID, groupName string) error
// LeaveGroup removes an instance from a group.
LeaveGroup(ctx context.Context, instanceID, groupName string) error
// GetGroupMembers retrieves all instance IDs in a group.
GetGroupMembers(ctx context.Context, groupName string) ([]string, error)
// LeaveAllGroups removes an instance from all groups.
LeaveAllGroups(ctx context.Context, instanceID string) error
}
GroupManager handles Erlang pg-style group membership operations.
type GroupMembership ¶
type GroupMembership struct {
ID int64 `json:"id"`
InstanceID string `json:"instance_id"`
GroupName string `json:"group_name"`
JoinedAt time.Time `json:"joined_at"` // When the instance joined the group (unified with Edda)
}
GroupMembership represents a workflow's membership in a group.
type HistoryArchiveManager ¶
type HistoryArchiveManager interface {
// ArchiveHistory moves all history events for an instance to the archive table.
ArchiveHistory(ctx context.Context, instanceID string) error
// GetArchivedHistory retrieves archived history events for an instance.
GetArchivedHistory(ctx context.Context, instanceID string) ([]*ArchivedHistoryEvent, error)
// CleanupInstanceSubscriptions removes all subscriptions for an instance.
// This is used during recur to clean up stale subscriptions.
CleanupInstanceSubscriptions(ctx context.Context, instanceID string) error
}
HistoryArchiveManager handles history archival for the recur pattern.
type HistoryEvent ¶
type HistoryEvent struct {
ID int64 `json:"id"`
InstanceID string `json:"instance_id"`
ActivityID string `json:"activity_id"`
EventType HistoryEventType `json:"event_type"`
EventData []byte `json:"event_data"` // JSON-encoded data
EventDataBinary []byte `json:"event_data_binary"` // Binary data (alternative)
DataType string `json:"data_type"` // "json" or "binary"
CreatedAt time.Time `json:"created_at"`
}
HistoryEvent represents a single event in a workflow's execution history.
type HistoryEventType ¶
type HistoryEventType string
HistoryEventType represents the type of history event. Values are unified with Edda (Python) for cross-language compatibility.
const (
// Activity events
HistoryActivityCompleted HistoryEventType = "ActivityCompleted"
HistoryActivityFailed HistoryEventType = "ActivityFailed"
// External event received (via WaitEvent)
HistoryEventReceived HistoryEventType = "EventReceived"
// Timer expired (via Sleep/SleepUntil)
HistoryTimerExpired HistoryEventType = "TimerExpired"
// Channel message received (via Receive)
HistoryChannelMessageReceived HistoryEventType = "ChannelMessageReceived"
// Message receive timeout
HistoryMessageTimeout HistoryEventType = "MessageTimeout"
// Compensation events
HistoryCompensationExecuted HistoryEventType = "CompensationExecuted"
HistoryCompensationFailed HistoryEventType = "CompensationFailed"
// Workflow lifecycle events
HistoryWorkflowFailed HistoryEventType = "WorkflowFailed"
HistoryWorkflowCancelled HistoryEventType = "WorkflowCancelled"
)
type HistoryIterator ¶
type HistoryIterator struct {
// contains filtered or unexported fields
}
HistoryIterator provides streaming access to workflow history events. This avoids loading the entire history into memory at once.
func NewHistoryIterator ¶
func NewHistoryIterator(ctx context.Context, storage Storage, instanceID string, opts *HistoryIteratorOptions) *HistoryIterator
NewHistoryIterator creates a new history iterator.
func (*HistoryIterator) Close ¶
func (it *HistoryIterator) Close() error
Close releases any resources held by the iterator. Currently a no-op, but provided for future compatibility.
func (*HistoryIterator) Collect ¶
func (it *HistoryIterator) Collect() ([]*HistoryEvent, error)
Collect reads all remaining events into a slice. Use with caution for large histories - prefer iterating with Next().
func (*HistoryIterator) Err ¶
func (it *HistoryIterator) Err() error
Err returns any error that occurred during iteration.
func (*HistoryIterator) Next ¶
func (it *HistoryIterator) Next() (*HistoryEvent, bool)
Next returns the next history event. Returns (event, true) if an event is available, (nil, false) if no more events. Check Err() after iteration to see if an error occurred.
type HistoryIteratorOptions ¶
type HistoryIteratorOptions struct {
// BatchSize is the number of events to fetch per batch.
// Default: 1000
BatchSize int
}
HistoryIteratorOptions configures the history iterator.
type HistoryManager ¶
type HistoryManager interface {
// AppendHistory adds a new history event.
AppendHistory(ctx context.Context, event *HistoryEvent) error
// GetHistoryPaginated retrieves history events for an instance with pagination.
// Returns events with id > afterID, up to limit events.
// Returns (events, hasMore, error).
GetHistoryPaginated(ctx context.Context, instanceID string, afterID int64, limit int) ([]*HistoryEvent, bool, error)
// GetHistoryCount returns the total number of history events for an instance.
GetHistoryCount(ctx context.Context, instanceID string) (int64, error)
}
HistoryManager handles workflow execution history.
type InputFilterBuilder ¶
type InputFilterBuilder struct {
// contains filtered or unexported fields
}
InputFilterBuilder builds SQL query conditions for input filters.
func NewInputFilterBuilder ¶
func NewInputFilterBuilder(driver Driver) *InputFilterBuilder
NewInputFilterBuilder creates a new InputFilterBuilder.
func (*InputFilterBuilder) BuildFilterQuery ¶
func (b *InputFilterBuilder) BuildFilterQuery(inputFilters map[string]any, startPlaceholder int) (conditions []string, args []any, err error)
BuildFilterQuery builds the WHERE clause conditions for input filters. Returns the query parts (as a slice of conditions) and args to append, or an error if validation fails. The startPlaceholder is the starting placeholder number (for PostgreSQL).
type InstanceManager ¶
type InstanceManager interface {
// CreateInstance creates a new workflow instance.
CreateInstance(ctx context.Context, instance *WorkflowInstance) error
// GetInstance retrieves a workflow instance by ID.
GetInstance(ctx context.Context, instanceID string) (*WorkflowInstance, error)
// UpdateInstanceStatus updates the status and related fields.
UpdateInstanceStatus(ctx context.Context, instanceID string, status WorkflowStatus, errorMsg string) error
// UpdateInstanceActivity updates the current activity ID.
UpdateInstanceActivity(ctx context.Context, instanceID string, activityID string) error
// UpdateInstanceOutput updates the output data for a completed workflow.
UpdateInstanceOutput(ctx context.Context, instanceID string, outputData []byte) error
// CancelInstance marks a workflow as cancelled.
CancelInstance(ctx context.Context, instanceID string, reason string) error
// ListInstances lists workflow instances with optional filtering and pagination.
// Returns PaginationResult with cursor-based pagination.
ListInstances(ctx context.Context, opts ListInstancesOptions) (*PaginationResult, error)
// FindResumableWorkflows finds workflows that are ready to be resumed.
// Returns workflows with status='running' that don't have an active lock.
// These are typically workflows that had a message delivered and are waiting
// for a worker to pick them up and resume execution.
// limit specifies the maximum number of workflows to return (0 = default 100).
FindResumableWorkflows(ctx context.Context, limit int) ([]*ResumableWorkflow, error)
}
InstanceManager handles workflow instance CRUD operations.
type ListInstancesOptions ¶
type ListInstancesOptions struct {
// Pagination
Limit int // Page size (default: 50)
PageToken string // Cursor token for pagination (format: "ISO_DATETIME||INSTANCE_ID")
// Filters
StatusFilter WorkflowStatus // Filter by status
WorkflowNameFilter string // Filter by workflow name (partial match, case-insensitive)
InstanceIDFilter string // Filter by instance ID (partial match, case-insensitive)
StartedAfter *time.Time // Filter instances started after this time
StartedBefore *time.Time // Filter instances started before this time
InputFilters map[string]any // Filter by JSON paths in input data (exact match)
// Deprecated: Use StatusFilter instead
Status WorkflowStatus
// Deprecated: Use WorkflowNameFilter instead
WorkflowName string
// Deprecated: Use PageToken instead
Offset int
}
ListInstancesOptions defines options for listing instances.
type LockManager ¶
type LockManager interface {
// TryAcquireLock attempts to acquire a lock on a workflow instance.
// Returns true if the lock was acquired, false if already locked.
TryAcquireLock(ctx context.Context, instanceID, workerID string, timeoutSec int) (bool, error)
// ReleaseLock releases the lock on a workflow instance.
ReleaseLock(ctx context.Context, instanceID, workerID string) error
// RefreshLock extends the lock timeout.
RefreshLock(ctx context.Context, instanceID, workerID string, timeoutSec int) error
// CleanupStaleLocks releases locks that have expired and returns
// information about running workflows that need to be resumed.
CleanupStaleLocks(ctx context.Context, timeoutSec int) ([]StaleWorkflowInfo, error)
}
LockManager handles distributed locking operations.
type MySQLDriver ¶
type MySQLDriver struct{}
MySQLDriver implements Driver for MySQL 8.0+.
func (*MySQLDriver) DriverName ¶
func (d *MySQLDriver) DriverName() string
func (*MySQLDriver) GetCurrentTimeExpr ¶
func (d *MySQLDriver) GetCurrentTimeExpr() string
func (*MySQLDriver) JSONCompare ¶
func (d *MySQLDriver) JSONCompare(extractExpr string, value any, _ int) (expr string, compArgs []any)
JSONCompare for MySQLDriver returns the SQL expression to compare a JSON-extracted value.
func (*MySQLDriver) JSONExtract ¶
func (d *MySQLDriver) JSONExtract(column, path string) string
JSONExtract for MySQLDriver returns the SQL expression to extract a value from a JSON column.
func (*MySQLDriver) MakeDatetimeComparable ¶
func (d *MySQLDriver) MakeDatetimeComparable(column string) string
func (*MySQLDriver) OnConflictDoNothing ¶
func (d *MySQLDriver) OnConflictDoNothing(conflictColumns ...string) string
func (*MySQLDriver) Placeholder ¶
func (d *MySQLDriver) Placeholder(n int) string
func (*MySQLDriver) ReturningClause ¶
func (d *MySQLDriver) ReturningClause(columns ...string) string
func (*MySQLDriver) SelectForUpdateSkipLocked ¶
func (d *MySQLDriver) SelectForUpdateSkipLocked() string
type MySQLStorage ¶
type MySQLStorage struct {
// contains filtered or unexported fields
}
MySQLStorage implements the Storage interface using MySQL 8.0+.
func NewMySQLStorage ¶
func NewMySQLStorage(connStr string) (*MySQLStorage, error)
NewMySQLStorage creates a new MySQL storage. The connStr can be given either as a URL or as a MySQL DSN:
URL format: "mysql://user:password@localhost:3306/dbname"
DSN format: "user:password@tcp(localhost:3306)/dbname"
func (*MySQLStorage) AddCompensation ¶
func (s *MySQLStorage) AddCompensation(ctx context.Context, entry *CompensationEntry) error
AddCompensation adds a compensation entry.
func (*MySQLStorage) AddOutboxEvent ¶
func (s *MySQLStorage) AddOutboxEvent(ctx context.Context, event *OutboxEvent) error
AddOutboxEvent adds an event to the outbox.
func (*MySQLStorage) AppendHistory ¶
func (s *MySQLStorage) AppendHistory(ctx context.Context, event *HistoryEvent) error
AppendHistory appends a history event.
func (*MySQLStorage) ArchiveHistory ¶
func (s *MySQLStorage) ArchiveHistory(ctx context.Context, instanceID string) error
ArchiveHistory moves all history events for an instance to the archive table.
func (*MySQLStorage) BeginTransaction ¶
BeginTransaction starts a new transaction.
func (*MySQLStorage) CancelInstance ¶
func (s *MySQLStorage) CancelInstance(ctx context.Context, instanceID, reason string) error
CancelInstance cancels a workflow instance. Returns ErrWorkflowNotCancellable if the workflow is already completed, cancelled, failed, or does not exist.
func (*MySQLStorage) ClaimChannelMessage ¶
func (s *MySQLStorage) ClaimChannelMessage(ctx context.Context, messageID int64, instanceID string) (bool, error)
ClaimChannelMessage claims a message for competing mode.
func (*MySQLStorage) CleanupExpiredSystemLocks ¶
func (s *MySQLStorage) CleanupExpiredSystemLocks(ctx context.Context) error
CleanupExpiredSystemLocks removes expired system locks.
func (*MySQLStorage) CleanupInstanceSubscriptions ¶
func (s *MySQLStorage) CleanupInstanceSubscriptions(ctx context.Context, instanceID string) error
CleanupInstanceSubscriptions removes all subscriptions for an instance. Note: workflow_event_subscriptions was removed in schema unification; events now use channel subscriptions.
func (*MySQLStorage) CleanupOldChannelMessages ¶
func (s *MySQLStorage) CleanupOldChannelMessages(ctx context.Context, olderThan time.Duration) error
CleanupOldChannelMessages removes old channel messages.
func (*MySQLStorage) CleanupOldOutboxEvents ¶
func (s *MySQLStorage) CleanupOldOutboxEvents(ctx context.Context, olderThan time.Duration) error
CleanupOldOutboxEvents removes old published events.
func (*MySQLStorage) CleanupStaleLocks ¶
func (s *MySQLStorage) CleanupStaleLocks(ctx context.Context, timeoutSec int) ([]StaleWorkflowInfo, error)
CleanupStaleLocks cleans up expired locks and returns workflows to resume.
func (*MySQLStorage) ClearChannelWaitingState ¶
func (s *MySQLStorage) ClearChannelWaitingState(ctx context.Context, instanceID, channelName string) error
ClearChannelWaitingState clears the waiting state for an instance's channel subscription.
func (*MySQLStorage) Close ¶
func (s *MySQLStorage) Close() error
Close closes the database connection.
func (*MySQLStorage) CommitTransaction ¶
func (s *MySQLStorage) CommitTransaction(ctx context.Context) error
CommitTransaction commits the current transaction.
func (*MySQLStorage) Conn ¶
func (s *MySQLStorage) Conn(ctx context.Context) Executor
Conn returns the database executor for the current context.
func (*MySQLStorage) CreateInstance ¶
func (s *MySQLStorage) CreateInstance(ctx context.Context, instance *WorkflowInstance) error
CreateInstance creates a new workflow instance.
func (*MySQLStorage) DB ¶
func (s *MySQLStorage) DB() *sql.DB
DB returns the underlying database connection.
func (*MySQLStorage) DeleteChannelMessage ¶
func (s *MySQLStorage) DeleteChannelMessage(ctx context.Context, messageID int64) error
DeleteChannelMessage deletes a message from the channel.
func (*MySQLStorage) DeliverChannelMessage ¶
func (s *MySQLStorage) DeliverChannelMessage(ctx context.Context, instanceID string, message *ChannelMessage) error
DeliverChannelMessage delivers a message to a waiting subscriber.
func (*MySQLStorage) DeliverChannelMessageWithLock ¶
func (s *MySQLStorage) DeliverChannelMessageWithLock( ctx context.Context, instanceID string, channelName string, message *ChannelMessage, workerID string, lockTimeoutSec int, ) (*ChannelDeliveryResult, error)
DeliverChannelMessageWithLock delivers a message using the Lock-First pattern. Returns a nil result if the lock could not be acquired (another worker will handle it).
func (*MySQLStorage) FindExpiredChannelSubscriptions ¶
func (s *MySQLStorage) FindExpiredChannelSubscriptions(ctx context.Context, limit int) ([]*ChannelSubscription, error)
FindExpiredChannelSubscriptions finds channel subscriptions that have timed out.
func (*MySQLStorage) FindExpiredTimers ¶
func (s *MySQLStorage) FindExpiredTimers(ctx context.Context, limit int) ([]*TimerSubscription, error)
FindExpiredTimers finds expired timers.
func (*MySQLStorage) FindResumableWorkflows ¶
func (s *MySQLStorage) FindResumableWorkflows(ctx context.Context, limit int) ([]*ResumableWorkflow, error)
FindResumableWorkflows finds workflows with status='running' that don't have an active lock. These are workflows that had a message delivered and are waiting for a worker to resume them.
func (*MySQLStorage) GetArchivedHistory ¶
func (s *MySQLStorage) GetArchivedHistory(ctx context.Context, instanceID string) ([]*ArchivedHistoryEvent, error)
GetArchivedHistory retrieves archived history events for an instance.
func (*MySQLStorage) GetChannelMode ¶ added in v0.6.0
func (s *MySQLStorage) GetChannelMode(ctx context.Context, channelName string) (ChannelMode, error)
GetChannelMode retrieves the mode for a channel (from any existing subscription). Returns empty string if no subscriptions exist.
func (*MySQLStorage) GetChannelSubscribersWaiting ¶
func (s *MySQLStorage) GetChannelSubscribersWaiting(ctx context.Context, channelName string) ([]*ChannelSubscription, error)
GetChannelSubscribersWaiting finds subscribers waiting for messages on a channel. Returns all waiting subscribers regardless of framework; delivery is handled by the Lock-First pattern.
func (*MySQLStorage) GetChannelSubscription ¶
func (s *MySQLStorage) GetChannelSubscription(ctx context.Context, instanceID, channelName string) (*ChannelSubscription, error)
GetChannelSubscription retrieves a subscription for an instance and channel.
func (*MySQLStorage) GetCompensations ¶
func (s *MySQLStorage) GetCompensations(ctx context.Context, instanceID string) ([]*CompensationEntry, error)
GetCompensations retrieves compensations for a workflow in LIFO order.
func (*MySQLStorage) GetDeliveryCursor ¶
func (s *MySQLStorage) GetDeliveryCursor(ctx context.Context, instanceID, channelName string) (int64, error)
GetDeliveryCursor gets the current delivery cursor for an instance and channel.
func (*MySQLStorage) GetGroupMembers ¶
func (s *MySQLStorage) GetGroupMembers(ctx context.Context, groupName string) ([]string, error)
GetGroupMembers retrieves all instance IDs in a group.
func (*MySQLStorage) GetHistoryCount ¶
func (s *MySQLStorage) GetHistoryCount(ctx context.Context, instanceID string) (int64, error)
GetHistoryCount returns the total number of history events for an instance.
func (*MySQLStorage) GetHistoryPaginated ¶
func (s *MySQLStorage) GetHistoryPaginated(ctx context.Context, instanceID string, afterID int64, limit int) ([]*HistoryEvent, bool, error)
GetHistoryPaginated retrieves history events with pagination. Returns events with id > afterID, up to limit events. Returns (events, hasMore, error).
func (*MySQLStorage) GetInstance ¶
func (s *MySQLStorage) GetInstance(ctx context.Context, instanceID string) (*WorkflowInstance, error)
GetInstance retrieves a workflow instance by ID.
func (*MySQLStorage) GetPendingChannelMessages ¶
func (s *MySQLStorage) GetPendingChannelMessages(ctx context.Context, channelName string, afterID int64, limit int) ([]*ChannelMessage, error)
GetPendingChannelMessages retrieves pending messages for a channel after a given ID.
func (*MySQLStorage) GetPendingChannelMessagesForInstance ¶
func (s *MySQLStorage) GetPendingChannelMessagesForInstance(ctx context.Context, instanceID, channelName string) ([]*ChannelMessage, error)
GetPendingChannelMessagesForInstance gets pending messages for a specific subscriber. In broadcast mode, it returns messages with id > cursor (messages not yet seen by this instance). In competing mode, it returns unclaimed messages (not yet claimed by any instance).
func (*MySQLStorage) GetPendingOutboxEvents ¶
func (s *MySQLStorage) GetPendingOutboxEvents(ctx context.Context, limit int) ([]*OutboxEvent, error)
GetPendingOutboxEvents retrieves pending outbox events. Uses SELECT FOR UPDATE SKIP LOCKED for concurrent workers.
func (*MySQLStorage) InTransaction ¶
func (s *MySQLStorage) InTransaction(ctx context.Context) bool
InTransaction returns whether a transaction is in progress.
func (*MySQLStorage) IncrementOutboxAttempts ¶
func (s *MySQLStorage) IncrementOutboxAttempts(ctx context.Context, eventID string) error
IncrementOutboxAttempts increments the attempt count for an event.
func (*MySQLStorage) Initialize ¶
func (s *MySQLStorage) Initialize(ctx context.Context) error
Initialize is deprecated. Use dbmate for migrations: dbmate -d schema/db/migrations/mysql up
func (*MySQLStorage) JoinGroup ¶
func (s *MySQLStorage) JoinGroup(ctx context.Context, instanceID, groupName string) error
JoinGroup adds an instance to a group.
func (*MySQLStorage) LeaveAllGroups ¶
func (s *MySQLStorage) LeaveAllGroups(ctx context.Context, instanceID string) error
LeaveAllGroups removes an instance from all groups.
func (*MySQLStorage) LeaveGroup ¶
func (s *MySQLStorage) LeaveGroup(ctx context.Context, instanceID, groupName string) error
LeaveGroup removes an instance from a group.
func (*MySQLStorage) ListInstances ¶
func (s *MySQLStorage) ListInstances(ctx context.Context, opts ListInstancesOptions) (*PaginationResult, error)
ListInstances lists workflow instances with cursor-based pagination and filters.
func (*MySQLStorage) MarkOutboxEventFailed ¶
func (s *MySQLStorage) MarkOutboxEventFailed(ctx context.Context, eventID string) error
MarkOutboxEventFailed marks an outbox event as failed.
func (*MySQLStorage) MarkOutboxEventSent ¶
func (s *MySQLStorage) MarkOutboxEventSent(ctx context.Context, eventID string) error
MarkOutboxEventSent marks an outbox event as published.
func (*MySQLStorage) PublishToChannel ¶
func (s *MySQLStorage) PublishToChannel(ctx context.Context, channelName string, dataJSON, metadata []byte) (int64, error)
PublishToChannel publishes a message to a channel.
func (*MySQLStorage) RefreshLock ¶
func (s *MySQLStorage) RefreshLock(ctx context.Context, instanceID, workerID string, timeoutSec int) error
RefreshLock extends the lock expiration time.
func (*MySQLStorage) RegisterChannelReceiveAndReleaseLock ¶
func (s *MySQLStorage) RegisterChannelReceiveAndReleaseLock(ctx context.Context, instanceID, channelName, workerID, activityID string, timeoutAt *time.Time) error
RegisterChannelReceiveAndReleaseLock atomically registers a channel receive wait and releases the lock.
func (*MySQLStorage) RegisterPostCommitCallback ¶
func (s *MySQLStorage) RegisterPostCommitCallback(ctx context.Context, cb func() error) error
RegisterPostCommitCallback registers a callback to be executed after a successful commit. Returns an error if not currently in a transaction.
func (*MySQLStorage) RegisterTimerSubscription ¶
func (s *MySQLStorage) RegisterTimerSubscription(ctx context.Context, sub *TimerSubscription) error
RegisterTimerSubscription registers a timer for a workflow.
func (*MySQLStorage) RegisterTimerSubscriptionAndReleaseLock ¶
func (s *MySQLStorage) RegisterTimerSubscriptionAndReleaseLock(ctx context.Context, sub *TimerSubscription, instanceID, workerID string) error
RegisterTimerSubscriptionAndReleaseLock atomically registers timer and releases lock.
func (*MySQLStorage) ReleaseLock ¶
func (s *MySQLStorage) ReleaseLock(ctx context.Context, instanceID, workerID string) error
ReleaseLock releases a lock on a workflow instance.
func (*MySQLStorage) ReleaseSystemLock ¶
func (s *MySQLStorage) ReleaseSystemLock(ctx context.Context, lockName, workerID string) error
ReleaseSystemLock releases a system lock.
func (*MySQLStorage) RemoveTimerSubscription ¶
func (s *MySQLStorage) RemoveTimerSubscription(ctx context.Context, instanceID, timerID string) error
RemoveTimerSubscription removes a timer subscription.
func (*MySQLStorage) RollbackTransaction ¶
func (s *MySQLStorage) RollbackTransaction(ctx context.Context) error
RollbackTransaction rolls back the current transaction. Callbacks are not executed on rollback.
func (*MySQLStorage) SubscribeToChannel ¶
func (s *MySQLStorage) SubscribeToChannel(ctx context.Context, instanceID, channelName string, mode ChannelMode) error
SubscribeToChannel subscribes an instance to a channel. For broadcast mode, it initializes the delivery cursor to the current max message ID so that only messages published after subscription are received.
func (*MySQLStorage) TryAcquireLock ¶
func (s *MySQLStorage) TryAcquireLock(ctx context.Context, instanceID, workerID string, timeoutSec int) (bool, error)
TryAcquireLock attempts to acquire a lock on a workflow instance.
func (*MySQLStorage) TryAcquireSystemLock ¶
func (s *MySQLStorage) TryAcquireSystemLock(ctx context.Context, lockName, workerID string, timeoutSec int) (bool, error)
TryAcquireSystemLock attempts to acquire a system lock.
func (*MySQLStorage) UnsubscribeFromChannel ¶
func (s *MySQLStorage) UnsubscribeFromChannel(ctx context.Context, instanceID, channelName string) error
UnsubscribeFromChannel unsubscribes an instance from a channel.
func (*MySQLStorage) UpdateDeliveryCursor ¶
func (s *MySQLStorage) UpdateDeliveryCursor(ctx context.Context, instanceID, channelName string, lastMessageID int64) error
UpdateDeliveryCursor updates the delivery cursor for broadcast mode.
func (*MySQLStorage) UpdateInstanceActivity ¶
func (s *MySQLStorage) UpdateInstanceActivity(ctx context.Context, instanceID, activityID string) error
UpdateInstanceActivity updates the current activity ID.
func (*MySQLStorage) UpdateInstanceOutput ¶
func (s *MySQLStorage) UpdateInstanceOutput(ctx context.Context, instanceID string, outputData []byte) error
UpdateInstanceOutput updates the output data.
func (*MySQLStorage) UpdateInstanceStatus ¶
func (s *MySQLStorage) UpdateInstanceStatus(ctx context.Context, instanceID string, status WorkflowStatus, errorMsg string) error
UpdateInstanceStatus updates the status of a workflow instance.
type OutboxEvent ¶
type OutboxEvent struct {
EventID string `json:"event_id"` // CloudEvents ID (PRIMARY KEY)
EventType string `json:"event_type"` // CloudEvents type
EventSource string `json:"event_source"` // CloudEvents source
EventData []byte `json:"event_data"` // JSON payload
EventDataBinary []byte `json:"event_data_binary"` // Binary payload (Edda compatibility)
DataType string `json:"data_type"` // "json" or "binary"
ContentType string `json:"content_type"` // e.g., "application/json"
Status string `json:"status"` // "pending", "processing", "published", "failed"
RetryCount int `json:"retry_count"` // Number of retry attempts (unified with Edda)
LastError string `json:"last_error"` // Last error message (Edda compatibility)
PublishedAt *time.Time `json:"published_at"` // When the event was published (Edda compatibility)
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
OutboxEvent represents an event in the transactional outbox. Uses event_id (TEXT) as primary key for distributed DB compatibility (Spanner, CockroachDB).
type OutboxManager ¶
type OutboxManager interface {
// AddOutboxEvent adds an event to the outbox.
AddOutboxEvent(ctx context.Context, event *OutboxEvent) error
// GetPendingOutboxEvents retrieves pending events for sending.
// Uses SELECT FOR UPDATE SKIP LOCKED for concurrent workers.
GetPendingOutboxEvents(ctx context.Context, limit int) ([]*OutboxEvent, error)
// MarkOutboxEventSent marks an event as successfully sent.
MarkOutboxEventSent(ctx context.Context, eventID string) error
// MarkOutboxEventFailed marks an event as failed to send.
MarkOutboxEventFailed(ctx context.Context, eventID string) error
// IncrementOutboxAttempts increments the attempt count for an event.
IncrementOutboxAttempts(ctx context.Context, eventID string) error
// CleanupOldOutboxEvents removes old sent events.
CleanupOldOutboxEvents(ctx context.Context, olderThan time.Duration) error
}
OutboxManager handles the transactional outbox.
type PaginationResult ¶
type PaginationResult struct {
Instances []*WorkflowInstance `json:"instances"`
NextPageToken string `json:"next_page_token,omitempty"`
HasMore bool `json:"has_more"`
}
PaginationResult represents a paginated list of workflow instances.
type PostgresDriver ¶
type PostgresDriver struct{}
PostgresDriver implements Driver for PostgreSQL.
func (*PostgresDriver) DriverName ¶
func (d *PostgresDriver) DriverName() string
func (*PostgresDriver) GetCurrentTimeExpr ¶
func (d *PostgresDriver) GetCurrentTimeExpr() string
func (*PostgresDriver) JSONCompare ¶
func (d *PostgresDriver) JSONCompare(extractExpr string, value any, placeholderNum int) (expr string, compArgs []any)
JSONCompare for PostgresDriver returns the SQL expression to compare a JSON-extracted value.
func (*PostgresDriver) JSONExtract ¶
func (d *PostgresDriver) JSONExtract(column, path string) string
JSONExtract for PostgresDriver returns the SQL expression to extract a value from a JSON column.
func (*PostgresDriver) MakeDatetimeComparable ¶
func (d *PostgresDriver) MakeDatetimeComparable(column string) string
func (*PostgresDriver) OnConflictDoNothing ¶
func (d *PostgresDriver) OnConflictDoNothing(conflictColumns ...string) string
func (*PostgresDriver) Placeholder ¶
func (d *PostgresDriver) Placeholder(n int) string
func (*PostgresDriver) ReturningClause ¶
func (d *PostgresDriver) ReturningClause(columns ...string) string
func (*PostgresDriver) SelectForUpdateSkipLocked ¶
func (d *PostgresDriver) SelectForUpdateSkipLocked() string
type PostgresStorage ¶
type PostgresStorage struct {
// contains filtered or unexported fields
}
PostgresStorage implements the Storage interface using PostgreSQL.
func NewPostgresStorage ¶
func NewPostgresStorage(connStr string) (*PostgresStorage, error)
NewPostgresStorage creates a new PostgreSQL storage. The connStr should be a PostgreSQL connection string: "postgres://user:password@localhost:5432/dbname?sslmode=disable"
func (*PostgresStorage) AddCompensation ¶
func (s *PostgresStorage) AddCompensation(ctx context.Context, entry *CompensationEntry) error
AddCompensation adds a compensation entry.
func (*PostgresStorage) AddOutboxEvent ¶
func (s *PostgresStorage) AddOutboxEvent(ctx context.Context, event *OutboxEvent) error
AddOutboxEvent adds an event to the outbox.
func (*PostgresStorage) AppendHistory ¶
func (s *PostgresStorage) AppendHistory(ctx context.Context, event *HistoryEvent) error
AppendHistory appends a history event.
func (*PostgresStorage) ArchiveHistory ¶
func (s *PostgresStorage) ArchiveHistory(ctx context.Context, instanceID string) error
ArchiveHistory moves all history events for an instance to the archive table.
func (*PostgresStorage) BeginTransaction ¶
BeginTransaction starts a new transaction.
func (*PostgresStorage) CancelInstance ¶
func (s *PostgresStorage) CancelInstance(ctx context.Context, instanceID, reason string) error
CancelInstance cancels a workflow instance. Returns ErrWorkflowNotCancellable if the workflow is already completed, cancelled, failed, or does not exist.
func (*PostgresStorage) ClaimChannelMessage ¶
func (s *PostgresStorage) ClaimChannelMessage(ctx context.Context, messageID int64, instanceID string) (bool, error)
ClaimChannelMessage claims a message for competing mode.
func (*PostgresStorage) CleanupExpiredSystemLocks ¶
func (s *PostgresStorage) CleanupExpiredSystemLocks(ctx context.Context) error
CleanupExpiredSystemLocks removes expired system locks.
func (*PostgresStorage) CleanupInstanceSubscriptions ¶
func (s *PostgresStorage) CleanupInstanceSubscriptions(ctx context.Context, instanceID string) error
CleanupInstanceSubscriptions removes all subscriptions for an instance. Note: workflow_event_subscriptions was removed in schema unification; events now use channel subscriptions.
func (*PostgresStorage) CleanupOldChannelMessages ¶
func (s *PostgresStorage) CleanupOldChannelMessages(ctx context.Context, olderThan time.Duration) error
CleanupOldChannelMessages removes old channel messages.
func (*PostgresStorage) CleanupOldOutboxEvents ¶
func (s *PostgresStorage) CleanupOldOutboxEvents(ctx context.Context, olderThan time.Duration) error
CleanupOldOutboxEvents removes old published events.
func (*PostgresStorage) CleanupStaleLocks ¶
func (s *PostgresStorage) CleanupStaleLocks(ctx context.Context, timeoutSec int) ([]StaleWorkflowInfo, error)
CleanupStaleLocks cleans up expired locks and returns workflows to resume.
func (*PostgresStorage) ClearChannelWaitingState ¶
func (s *PostgresStorage) ClearChannelWaitingState(ctx context.Context, instanceID, channelName string) error
ClearChannelWaitingState clears the waiting state for an instance's channel subscription.
func (*PostgresStorage) Close ¶
func (s *PostgresStorage) Close() error
Close closes the database connection.
func (*PostgresStorage) CommitTransaction ¶
func (s *PostgresStorage) CommitTransaction(ctx context.Context) error
CommitTransaction commits the current transaction.
func (*PostgresStorage) Conn ¶
func (s *PostgresStorage) Conn(ctx context.Context) Executor
Conn returns the database executor for the current context.
func (*PostgresStorage) CreateInstance ¶
func (s *PostgresStorage) CreateInstance(ctx context.Context, instance *WorkflowInstance) error
CreateInstance creates a new workflow instance.
func (*PostgresStorage) DB ¶
func (s *PostgresStorage) DB() *sql.DB
DB returns the underlying database connection.
func (*PostgresStorage) DeleteChannelMessage ¶
func (s *PostgresStorage) DeleteChannelMessage(ctx context.Context, messageID int64) error
DeleteChannelMessage deletes a message from the channel.
func (*PostgresStorage) DeliverChannelMessage ¶
func (s *PostgresStorage) DeliverChannelMessage(ctx context.Context, instanceID string, message *ChannelMessage) error
DeliverChannelMessage delivers a message to a waiting subscriber.
func (*PostgresStorage) DeliverChannelMessageWithLock ¶
func (s *PostgresStorage) DeliverChannelMessageWithLock( ctx context.Context, instanceID string, channelName string, message *ChannelMessage, workerID string, lockTimeoutSec int, ) (*ChannelDeliveryResult, error)
DeliverChannelMessageWithLock delivers a message using the Lock-First pattern. Returns a nil result if the lock could not be acquired (another worker will handle it).
func (*PostgresStorage) FindExpiredChannelSubscriptions ¶
func (s *PostgresStorage) FindExpiredChannelSubscriptions(ctx context.Context, limit int) ([]*ChannelSubscription, error)
FindExpiredChannelSubscriptions finds channel subscriptions that have timed out.
func (*PostgresStorage) FindExpiredTimers ¶
func (s *PostgresStorage) FindExpiredTimers(ctx context.Context, limit int) ([]*TimerSubscription, error)
FindExpiredTimers finds expired timers.
func (*PostgresStorage) FindResumableWorkflows ¶
func (s *PostgresStorage) FindResumableWorkflows(ctx context.Context, limit int) ([]*ResumableWorkflow, error)
FindResumableWorkflows finds workflows with status='running' that don't have an active lock. These are workflows that had a message delivered and are waiting for a worker to resume them.
func (*PostgresStorage) GetArchivedHistory ¶
func (s *PostgresStorage) GetArchivedHistory(ctx context.Context, instanceID string) ([]*ArchivedHistoryEvent, error)
GetArchivedHistory retrieves archived history events for an instance.
func (*PostgresStorage) GetChannelMode ¶ added in v0.6.0
func (s *PostgresStorage) GetChannelMode(ctx context.Context, channelName string) (ChannelMode, error)
GetChannelMode retrieves the mode for a channel (from any existing subscription). Returns empty string if no subscriptions exist.
func (*PostgresStorage) GetChannelSubscribersWaiting ¶
func (s *PostgresStorage) GetChannelSubscribersWaiting(ctx context.Context, channelName string) ([]*ChannelSubscription, error)
GetChannelSubscribersWaiting finds subscribers waiting for messages on a channel. Returns all waiting subscribers regardless of framework; delivery is handled by the Lock-First pattern.
func (*PostgresStorage) GetChannelSubscription ¶
func (s *PostgresStorage) GetChannelSubscription(ctx context.Context, instanceID, channelName string) (*ChannelSubscription, error)
GetChannelSubscription retrieves a subscription for an instance and channel.
func (*PostgresStorage) GetCompensations ¶
func (s *PostgresStorage) GetCompensations(ctx context.Context, instanceID string) ([]*CompensationEntry, error)
GetCompensations retrieves compensations for a workflow in LIFO order.
func (*PostgresStorage) GetDeliveryCursor ¶
func (s *PostgresStorage) GetDeliveryCursor(ctx context.Context, instanceID, channelName string) (int64, error)
GetDeliveryCursor gets the current delivery cursor for an instance and channel.
func (*PostgresStorage) GetGroupMembers ¶
GetGroupMembers retrieves all instance IDs in a group.
func (*PostgresStorage) GetHistoryCount ¶
GetHistoryCount returns the total number of history events for an instance.
func (*PostgresStorage) GetHistoryPaginated ¶
func (s *PostgresStorage) GetHistoryPaginated(ctx context.Context, instanceID string, afterID int64, limit int) ([]*HistoryEvent, bool, error)
GetHistoryPaginated retrieves history events with pagination. Returns events with id > afterID, up to limit events. Returns (events, hasMore, error).
func (*PostgresStorage) GetInstance ¶
func (s *PostgresStorage) GetInstance(ctx context.Context, instanceID string) (*WorkflowInstance, error)
GetInstance retrieves a workflow instance by ID.
func (*PostgresStorage) GetPendingChannelMessages ¶
func (s *PostgresStorage) GetPendingChannelMessages(ctx context.Context, channelName string, afterID int64, limit int) ([]*ChannelMessage, error)
GetPendingChannelMessages retrieves pending messages for a channel after a given ID.
func (*PostgresStorage) GetPendingChannelMessagesForInstance ¶
func (s *PostgresStorage) GetPendingChannelMessagesForInstance(ctx context.Context, instanceID, channelName string) ([]*ChannelMessage, error)
GetPendingChannelMessagesForInstance gets pending messages for a specific subscriber. For broadcast mode, it returns messages with id > cursor (messages not yet seen by this instance). For competing mode, it returns unclaimed messages (not yet claimed by any instance).
func (*PostgresStorage) GetPendingOutboxEvents ¶
func (s *PostgresStorage) GetPendingOutboxEvents(ctx context.Context, limit int) ([]*OutboxEvent, error)
GetPendingOutboxEvents retrieves pending outbox events. Uses SELECT FOR UPDATE SKIP LOCKED for concurrent workers.
func (*PostgresStorage) InTransaction ¶
func (s *PostgresStorage) InTransaction(ctx context.Context) bool
InTransaction returns whether a transaction is in progress.
func (*PostgresStorage) IncrementOutboxAttempts ¶
func (s *PostgresStorage) IncrementOutboxAttempts(ctx context.Context, eventID string) error
IncrementOutboxAttempts increments the attempt count for an event.
func (*PostgresStorage) Initialize ¶
func (s *PostgresStorage) Initialize(ctx context.Context) error
Initialize is deprecated. Use dbmate for migrations: dbmate -d schema/db/migrations/postgresql up
func (*PostgresStorage) IsNotifyEnabled ¶
func (s *PostgresStorage) IsNotifyEnabled() bool
IsNotifyEnabled returns whether PostgreSQL LISTEN/NOTIFY is enabled.
func (*PostgresStorage) JoinGroup ¶
func (s *PostgresStorage) JoinGroup(ctx context.Context, instanceID, groupName string) error
JoinGroup adds an instance to a group.
func (*PostgresStorage) LeaveAllGroups ¶
func (s *PostgresStorage) LeaveAllGroups(ctx context.Context, instanceID string) error
LeaveAllGroups removes an instance from all groups.
func (*PostgresStorage) LeaveGroup ¶
func (s *PostgresStorage) LeaveGroup(ctx context.Context, instanceID, groupName string) error
LeaveGroup removes an instance from a group.
func (*PostgresStorage) ListInstances ¶
func (s *PostgresStorage) ListInstances(ctx context.Context, opts ListInstancesOptions) (*PaginationResult, error)
ListInstances lists workflow instances with cursor-based pagination and filters.
func (*PostgresStorage) MarkOutboxEventFailed ¶
func (s *PostgresStorage) MarkOutboxEventFailed(ctx context.Context, eventID string) error
MarkOutboxEventFailed marks an outbox event as failed.
func (*PostgresStorage) MarkOutboxEventSent ¶
func (s *PostgresStorage) MarkOutboxEventSent(ctx context.Context, eventID string) error
MarkOutboxEventSent marks an outbox event as published.
func (*PostgresStorage) PublishToChannel ¶
func (s *PostgresStorage) PublishToChannel(ctx context.Context, channelName string, dataJSON, metadata []byte) (int64, error)
PublishToChannel publishes a message to a channel.
func (*PostgresStorage) RefreshLock ¶
func (s *PostgresStorage) RefreshLock(ctx context.Context, instanceID, workerID string, timeoutSec int) error
RefreshLock extends the lock expiration time.
func (*PostgresStorage) RegisterChannelReceiveAndReleaseLock ¶
func (s *PostgresStorage) RegisterChannelReceiveAndReleaseLock(ctx context.Context, instanceID, channelName, workerID, activityID string, timeoutAt *time.Time) error
RegisterChannelReceiveAndReleaseLock atomically registers a channel receive wait and releases the lock.
func (*PostgresStorage) RegisterPostCommitCallback ¶
func (s *PostgresStorage) RegisterPostCommitCallback(ctx context.Context, cb func() error) error
RegisterPostCommitCallback registers a callback to be executed after a successful commit. Returns an error if not currently in a transaction.
func (*PostgresStorage) RegisterTimerSubscription ¶
func (s *PostgresStorage) RegisterTimerSubscription(ctx context.Context, sub *TimerSubscription) error
RegisterTimerSubscription registers a timer for a workflow.
func (*PostgresStorage) RegisterTimerSubscriptionAndReleaseLock ¶
func (s *PostgresStorage) RegisterTimerSubscriptionAndReleaseLock(ctx context.Context, sub *TimerSubscription, instanceID, workerID string) error
RegisterTimerSubscriptionAndReleaseLock atomically registers the timer and releases the lock.
func (*PostgresStorage) ReleaseLock ¶
func (s *PostgresStorage) ReleaseLock(ctx context.Context, instanceID, workerID string) error
ReleaseLock releases a lock on a workflow instance.
func (*PostgresStorage) ReleaseSystemLock ¶
func (s *PostgresStorage) ReleaseSystemLock(ctx context.Context, lockName, workerID string) error
ReleaseSystemLock releases a system lock.
func (*PostgresStorage) RemoveTimerSubscription ¶
func (s *PostgresStorage) RemoveTimerSubscription(ctx context.Context, instanceID, timerID string) error
RemoveTimerSubscription removes a timer subscription.
func (*PostgresStorage) RollbackTransaction ¶
func (s *PostgresStorage) RollbackTransaction(ctx context.Context) error
RollbackTransaction rolls back the current transaction. Callbacks are not executed on rollback.
func (*PostgresStorage) SetNotifyEnabled ¶
func (s *PostgresStorage) SetNotifyEnabled(enabled bool)
SetNotifyEnabled enables or disables PostgreSQL LISTEN/NOTIFY. When enabled, the storage will send pg_notify() calls after key operations.
func (*PostgresStorage) SubscribeToChannel ¶
func (s *PostgresStorage) SubscribeToChannel(ctx context.Context, instanceID, channelName string, mode ChannelMode) error
SubscribeToChannel subscribes an instance to a channel. For broadcast mode, it initializes the delivery cursor to the current max message ID so that only messages published after subscription are received.
func (*PostgresStorage) TryAcquireLock ¶
func (s *PostgresStorage) TryAcquireLock(ctx context.Context, instanceID, workerID string, timeoutSec int) (bool, error)
TryAcquireLock attempts to acquire a lock on a workflow instance.
func (*PostgresStorage) TryAcquireSystemLock ¶
func (s *PostgresStorage) TryAcquireSystemLock(ctx context.Context, lockName, workerID string, timeoutSec int) (bool, error)
TryAcquireSystemLock attempts to acquire a system lock.
func (*PostgresStorage) UnsubscribeFromChannel ¶
func (s *PostgresStorage) UnsubscribeFromChannel(ctx context.Context, instanceID, channelName string) error
UnsubscribeFromChannel unsubscribes an instance from a channel.
func (*PostgresStorage) UpdateDeliveryCursor ¶
func (s *PostgresStorage) UpdateDeliveryCursor(ctx context.Context, instanceID, channelName string, lastMessageID int64) error
UpdateDeliveryCursor updates the delivery cursor for broadcast mode.
func (*PostgresStorage) UpdateInstanceActivity ¶
func (s *PostgresStorage) UpdateInstanceActivity(ctx context.Context, instanceID, activityID string) error
UpdateInstanceActivity updates the current activity ID.
func (*PostgresStorage) UpdateInstanceOutput ¶
func (s *PostgresStorage) UpdateInstanceOutput(ctx context.Context, instanceID string, outputData []byte) error
UpdateInstanceOutput updates the output data.
func (*PostgresStorage) UpdateInstanceStatus ¶
func (s *PostgresStorage) UpdateInstanceStatus(ctx context.Context, instanceID string, status WorkflowStatus, errorMsg string) error
UpdateInstanceStatus updates the status of a workflow instance.
type ResumableWorkflow ¶
ResumableWorkflow contains information about a workflow ready to be resumed. These are workflows with status='running' that don't have an active lock.
type SQLiteDriver ¶
type SQLiteDriver struct{}
SQLiteDriver implements Driver for SQLite.
func (*SQLiteDriver) DriverName ¶
func (d *SQLiteDriver) DriverName() string
func (*SQLiteDriver) GetCurrentTimeExpr ¶
func (d *SQLiteDriver) GetCurrentTimeExpr() string
func (*SQLiteDriver) JSONCompare ¶
func (d *SQLiteDriver) JSONCompare(extractExpr string, value any, _ int) (expr string, compArgs []any)
JSONCompare for SQLiteDriver returns the SQL expression to compare a JSON-extracted value.
func (*SQLiteDriver) JSONExtract ¶
func (d *SQLiteDriver) JSONExtract(column, path string) string
JSONExtract for SQLiteDriver returns the SQL expression to extract a value from a JSON column.
func (*SQLiteDriver) MakeDatetimeComparable ¶
func (d *SQLiteDriver) MakeDatetimeComparable(column string) string
func (*SQLiteDriver) OnConflictDoNothing ¶
func (d *SQLiteDriver) OnConflictDoNothing(conflictColumns ...string) string
func (*SQLiteDriver) Placeholder ¶
func (d *SQLiteDriver) Placeholder(n int) string
func (*SQLiteDriver) ReturningClause ¶
func (d *SQLiteDriver) ReturningClause(columns ...string) string
func (*SQLiteDriver) SelectForUpdateSkipLocked ¶
func (d *SQLiteDriver) SelectForUpdateSkipLocked() string
type SQLiteStorage ¶
type SQLiteStorage struct {
// contains filtered or unexported fields
}
SQLiteStorage implements the Storage interface using SQLite.
func NewSQLiteStorage ¶
func NewSQLiteStorage(dbPath string) (*SQLiteStorage, error)
NewSQLiteStorage creates a new SQLite storage.
func (*SQLiteStorage) AddCompensation ¶
func (s *SQLiteStorage) AddCompensation(ctx context.Context, entry *CompensationEntry) error
AddCompensation adds a compensation entry. Order is determined by created_at DESC (LIFO). Uses millisecond-precision timestamp for reliable ordering.
func (*SQLiteStorage) AddOutboxEvent ¶
func (s *SQLiteStorage) AddOutboxEvent(ctx context.Context, event *OutboxEvent) error
AddOutboxEvent adds an event to the outbox.
func (*SQLiteStorage) AppendHistory ¶
func (s *SQLiteStorage) AppendHistory(ctx context.Context, event *HistoryEvent) error
AppendHistory appends a history event.
func (*SQLiteStorage) ArchiveHistory ¶
func (s *SQLiteStorage) ArchiveHistory(ctx context.Context, instanceID string) error
ArchiveHistory moves all history events for an instance to the archive table.
func (*SQLiteStorage) BeginTransaction ¶
BeginTransaction starts a new transaction.
func (*SQLiteStorage) CancelInstance ¶
func (s *SQLiteStorage) CancelInstance(ctx context.Context, instanceID, reason string) error
CancelInstance cancels a workflow instance. Returns ErrWorkflowNotCancellable if the workflow is already completed, cancelled, failed, or does not exist. Note: reason parameter is kept for interface compatibility but is ignored. Cancellation reasons are tracked via history events.
func (*SQLiteStorage) ClaimChannelMessage ¶
func (s *SQLiteStorage) ClaimChannelMessage(ctx context.Context, messageID int64, instanceID string) (bool, error)
ClaimChannelMessage claims a message for competing mode.
func (*SQLiteStorage) CleanupExpiredSystemLocks ¶
func (s *SQLiteStorage) CleanupExpiredSystemLocks(ctx context.Context) error
CleanupExpiredSystemLocks removes expired system locks.
func (*SQLiteStorage) CleanupInstanceSubscriptions ¶
func (s *SQLiteStorage) CleanupInstanceSubscriptions(ctx context.Context, instanceID string) error
CleanupInstanceSubscriptions removes all subscriptions for an instance. Note: workflow_event_subscriptions was removed in schema unification; events now use channel subscriptions.
func (*SQLiteStorage) CleanupOldChannelMessages ¶
func (s *SQLiteStorage) CleanupOldChannelMessages(ctx context.Context, olderThan time.Duration) error
CleanupOldChannelMessages removes old channel messages.
func (*SQLiteStorage) CleanupOldOutboxEvents ¶
CleanupOldOutboxEvents removes old published events.
func (*SQLiteStorage) CleanupStaleLocks ¶
func (s *SQLiteStorage) CleanupStaleLocks(ctx context.Context, timeoutSec int) ([]StaleWorkflowInfo, error)
CleanupStaleLocks cleans up expired locks and returns workflows to resume.
func (*SQLiteStorage) ClearChannelWaitingState ¶
func (s *SQLiteStorage) ClearChannelWaitingState(ctx context.Context, instanceID, channelName string) error
ClearChannelWaitingState clears the waiting state for an instance's channel subscription. Waiting state is cleared by setting activity_id to NULL.
func (*SQLiteStorage) Close ¶
func (s *SQLiteStorage) Close() error
Close closes the database connection.
func (*SQLiteStorage) CommitTransaction ¶
func (s *SQLiteStorage) CommitTransaction(ctx context.Context) error
CommitTransaction commits the current transaction.
func (*SQLiteStorage) Conn ¶
func (s *SQLiteStorage) Conn(ctx context.Context) Executor
Conn returns the database executor for the current context. If a transaction is active, returns the transaction; otherwise, returns the database. This allows users to execute custom SQL queries within the same transaction as the workflow activity when using transactional activities.
Example:
conn := ctx.Storage().Conn(ctx.Context())
_, err := conn.ExecContext(ctx.Context(), "INSERT INTO orders ...", ...)
func (*SQLiteStorage) CreateInstance ¶
func (s *SQLiteStorage) CreateInstance(ctx context.Context, instance *WorkflowInstance) error
CreateInstance creates a new workflow instance.
func (*SQLiteStorage) DB ¶
func (s *SQLiteStorage) DB() *sql.DB
DB returns the underlying database connection.
func (*SQLiteStorage) DeleteChannelMessage ¶
func (s *SQLiteStorage) DeleteChannelMessage(ctx context.Context, messageID int64) error
DeleteChannelMessage deletes a message from the channel.
func (*SQLiteStorage) DeliverChannelMessage ¶
func (s *SQLiteStorage) DeliverChannelMessage(ctx context.Context, instanceID string, message *ChannelMessage) error
DeliverChannelMessage delivers a message to a waiting subscriber.
func (*SQLiteStorage) DeliverChannelMessageWithLock ¶
func (s *SQLiteStorage) DeliverChannelMessageWithLock( ctx context.Context, instanceID string, channelName string, message *ChannelMessage, workerID string, lockTimeoutSec int, ) (*ChannelDeliveryResult, error)
DeliverChannelMessageWithLock delivers a message using the Lock-First pattern. Returns a nil result if the lock could not be acquired (another worker will handle it).
func (*SQLiteStorage) FindExpiredChannelSubscriptions ¶
func (s *SQLiteStorage) FindExpiredChannelSubscriptions(ctx context.Context, limit int) ([]*ChannelSubscription, error)
FindExpiredChannelSubscriptions finds channel subscriptions that have timed out. Waiting state is determined by activity_id IS NOT NULL.
func (*SQLiteStorage) FindExpiredTimers ¶
func (s *SQLiteStorage) FindExpiredTimers(ctx context.Context, limit int) ([]*TimerSubscription, error)
FindExpiredTimers finds expired timers.
func (*SQLiteStorage) FindResumableWorkflows ¶
func (s *SQLiteStorage) FindResumableWorkflows(ctx context.Context, limit int) ([]*ResumableWorkflow, error)
FindResumableWorkflows finds workflows with status='running' that don't have an active lock. These are workflows that had a message delivered and are waiting for a worker to resume them.
func (*SQLiteStorage) GetArchivedHistory ¶
func (s *SQLiteStorage) GetArchivedHistory(ctx context.Context, instanceID string) ([]*ArchivedHistoryEvent, error)
GetArchivedHistory retrieves archived history events for an instance.
func (*SQLiteStorage) GetChannelMode ¶ added in v0.6.0
func (s *SQLiteStorage) GetChannelMode(ctx context.Context, channelName string) (ChannelMode, error)
GetChannelMode retrieves the mode for a channel (from any existing subscription). Returns empty string if no subscriptions exist.
func (*SQLiteStorage) GetChannelSubscribersWaiting ¶
func (s *SQLiteStorage) GetChannelSubscribersWaiting(ctx context.Context, channelName string) ([]*ChannelSubscription, error)
GetChannelSubscribersWaiting finds subscribers waiting for messages on a channel. Waiting state is determined by activity_id IS NOT NULL. Returns all waiting subscribers regardless of framework; delivery is handled by the Lock-First pattern.
func (*SQLiteStorage) GetChannelSubscription ¶
func (s *SQLiteStorage) GetChannelSubscription(ctx context.Context, instanceID, channelName string) (*ChannelSubscription, error)
GetChannelSubscription retrieves a subscription for an instance and channel.
func (*SQLiteStorage) GetCompensations ¶
func (s *SQLiteStorage) GetCompensations(ctx context.Context, instanceID string) ([]*CompensationEntry, error)
GetCompensations retrieves compensations for a workflow in LIFO order (by created_at DESC). Uses millisecond-precision timestamps set by AddCompensation for reliable ordering. Status tracking is done via history events (CompensationExecuted, CompensationFailed).
func (*SQLiteStorage) GetDeliveryCursor ¶
func (s *SQLiteStorage) GetDeliveryCursor(ctx context.Context, instanceID, channelName string) (int64, error)
GetDeliveryCursor gets the current delivery cursor for an instance and channel.
func (*SQLiteStorage) GetGroupMembers ¶
GetGroupMembers retrieves all instance IDs in a group.
func (*SQLiteStorage) GetHistoryCount ¶
GetHistoryCount returns the total number of history events for an instance.
func (*SQLiteStorage) GetHistoryPaginated ¶
func (s *SQLiteStorage) GetHistoryPaginated(ctx context.Context, instanceID string, afterID int64, limit int) ([]*HistoryEvent, bool, error)
GetHistoryPaginated retrieves history events with pagination. Returns events with id > afterID, up to limit events. Returns (events, hasMore, error).
func (*SQLiteStorage) GetInstance ¶
func (s *SQLiteStorage) GetInstance(ctx context.Context, instanceID string) (*WorkflowInstance, error)
GetInstance retrieves a workflow instance by ID.
func (*SQLiteStorage) GetPendingChannelMessages ¶
func (s *SQLiteStorage) GetPendingChannelMessages(ctx context.Context, channelName string, afterID int64, limit int) ([]*ChannelMessage, error)
GetPendingChannelMessages retrieves pending messages for a channel after a given ID.
func (*SQLiteStorage) GetPendingChannelMessagesForInstance ¶
func (s *SQLiteStorage) GetPendingChannelMessagesForInstance(ctx context.Context, instanceID, channelName string) ([]*ChannelMessage, error)
GetPendingChannelMessagesForInstance gets pending messages for a specific subscriber. For broadcast mode, it returns messages with id > cursor (messages not yet seen by this instance). For competing mode, it returns unclaimed messages (not yet claimed by any instance).
func (*SQLiteStorage) GetPendingOutboxEvents ¶
func (s *SQLiteStorage) GetPendingOutboxEvents(ctx context.Context, limit int) ([]*OutboxEvent, error)
GetPendingOutboxEvents retrieves pending outbox events.
func (*SQLiteStorage) InTransaction ¶
func (s *SQLiteStorage) InTransaction(ctx context.Context) bool
InTransaction returns whether a transaction is in progress.
func (*SQLiteStorage) IncrementOutboxAttempts ¶
func (s *SQLiteStorage) IncrementOutboxAttempts(ctx context.Context, eventID string) error
IncrementOutboxAttempts increments the attempt count for an event.
func (*SQLiteStorage) Initialize ¶
func (s *SQLiteStorage) Initialize(ctx context.Context) error
Initialize is deprecated. Use dbmate for migrations: dbmate -d schema/db/migrations/sqlite up
func (*SQLiteStorage) JoinGroup ¶
func (s *SQLiteStorage) JoinGroup(ctx context.Context, instanceID, groupName string) error
JoinGroup adds an instance to a group.
func (*SQLiteStorage) LeaveAllGroups ¶
func (s *SQLiteStorage) LeaveAllGroups(ctx context.Context, instanceID string) error
LeaveAllGroups removes an instance from all groups.
func (*SQLiteStorage) LeaveGroup ¶
func (s *SQLiteStorage) LeaveGroup(ctx context.Context, instanceID, groupName string) error
LeaveGroup removes an instance from a group.
func (*SQLiteStorage) ListInstances ¶
func (s *SQLiteStorage) ListInstances(ctx context.Context, opts ListInstancesOptions) (*PaginationResult, error)
ListInstances lists workflow instances with cursor-based pagination and filters.
func (*SQLiteStorage) MarkOutboxEventFailed ¶
func (s *SQLiteStorage) MarkOutboxEventFailed(ctx context.Context, eventID string) error
MarkOutboxEventFailed marks an outbox event as failed.
func (*SQLiteStorage) MarkOutboxEventSent ¶
func (s *SQLiteStorage) MarkOutboxEventSent(ctx context.Context, eventID string) error
MarkOutboxEventSent marks an outbox event as published.
func (*SQLiteStorage) PublishToChannel ¶
func (s *SQLiteStorage) PublishToChannel(ctx context.Context, channelName string, dataJSON, metadata []byte) (int64, error)
PublishToChannel publishes a message to a channel. For direct messages, use dynamic channel names (e.g., "channel:instance_id").
func (*SQLiteStorage) RefreshLock ¶
func (s *SQLiteStorage) RefreshLock(ctx context.Context, instanceID, workerID string, timeoutSec int) error
RefreshLock extends the lock expiration time.
func (*SQLiteStorage) RegisterChannelReceiveAndReleaseLock ¶
func (s *SQLiteStorage) RegisterChannelReceiveAndReleaseLock(ctx context.Context, instanceID, channelName, workerID, activityID string, timeoutAt *time.Time) error
RegisterChannelReceiveAndReleaseLock atomically registers a channel receive wait and releases the lock. Waiting state is indicated by activity_id being non-null.
func (*SQLiteStorage) RegisterPostCommitCallback ¶
func (s *SQLiteStorage) RegisterPostCommitCallback(ctx context.Context, cb func() error) error
RegisterPostCommitCallback registers a callback to be executed after a successful commit. Returns an error if not currently in a transaction.
func (*SQLiteStorage) RegisterTimerSubscription ¶
func (s *SQLiteStorage) RegisterTimerSubscription(ctx context.Context, sub *TimerSubscription) error
RegisterTimerSubscription registers a timer for a workflow.
func (*SQLiteStorage) RegisterTimerSubscriptionAndReleaseLock ¶
func (s *SQLiteStorage) RegisterTimerSubscriptionAndReleaseLock(ctx context.Context, sub *TimerSubscription, instanceID, workerID string) error
RegisterTimerSubscriptionAndReleaseLock atomically registers the timer and releases the lock.
func (*SQLiteStorage) ReleaseLock ¶
func (s *SQLiteStorage) ReleaseLock(ctx context.Context, instanceID, workerID string) error
ReleaseLock releases a lock on a workflow instance.
func (*SQLiteStorage) ReleaseSystemLock ¶
func (s *SQLiteStorage) ReleaseSystemLock(ctx context.Context, lockName, workerID string) error
ReleaseSystemLock releases a system lock.
func (*SQLiteStorage) RemoveTimerSubscription ¶
func (s *SQLiteStorage) RemoveTimerSubscription(ctx context.Context, instanceID, timerID string) error
RemoveTimerSubscription removes a timer subscription.
func (*SQLiteStorage) RollbackTransaction ¶
func (s *SQLiteStorage) RollbackTransaction(ctx context.Context) error
RollbackTransaction rolls back the current transaction. Callbacks are not executed on rollback.
func (*SQLiteStorage) SubscribeToChannel ¶
func (s *SQLiteStorage) SubscribeToChannel(ctx context.Context, instanceID, channelName string, mode ChannelMode) error
SubscribeToChannel subscribes an instance to a channel. For broadcast mode, it initializes the delivery cursor to the current max message ID so that only messages published after subscription are received.
func (*SQLiteStorage) TryAcquireLock ¶
func (s *SQLiteStorage) TryAcquireLock(ctx context.Context, instanceID, workerID string, timeoutSec int) (bool, error)
TryAcquireLock attempts to acquire a lock on a workflow instance.
func (*SQLiteStorage) TryAcquireSystemLock ¶
func (s *SQLiteStorage) TryAcquireSystemLock(ctx context.Context, lockName, workerID string, timeoutSec int) (bool, error)
TryAcquireSystemLock attempts to acquire a system lock.
func (*SQLiteStorage) UnsubscribeFromChannel ¶
func (s *SQLiteStorage) UnsubscribeFromChannel(ctx context.Context, instanceID, channelName string) error
UnsubscribeFromChannel unsubscribes an instance from a channel.
func (*SQLiteStorage) UpdateDeliveryCursor ¶
func (s *SQLiteStorage) UpdateDeliveryCursor(ctx context.Context, instanceID, channelName string, lastMessageID int64) error
UpdateDeliveryCursor updates the delivery cursor for broadcast mode.
func (*SQLiteStorage) UpdateInstanceActivity ¶
func (s *SQLiteStorage) UpdateInstanceActivity(ctx context.Context, instanceID, activityID string) error
UpdateInstanceActivity updates the current activity ID.
func (*SQLiteStorage) UpdateInstanceOutput ¶
func (s *SQLiteStorage) UpdateInstanceOutput(ctx context.Context, instanceID string, outputData []byte) error
UpdateInstanceOutput updates the output data.
func (*SQLiteStorage) UpdateInstanceStatus ¶
func (s *SQLiteStorage) UpdateInstanceStatus(ctx context.Context, instanceID string, status WorkflowStatus, errorMsg string) error
UpdateInstanceStatus updates the status of a workflow instance. Note: errorMsg parameter is kept for interface compatibility but is ignored. Error messages are tracked via history events (WorkflowFailed).
type StaleWorkflowInfo ¶
StaleWorkflowInfo contains information about a workflow with a stale lock.
type Storage ¶
// Storage defines the interface for workflow persistence. It combines the
// lifecycle methods declared directly below (Initialize, Close, DB) with the
// embedded manager interfaces, so a single value exposes every storage
// capability. Implementations must be safe for concurrent use.
type Storage interface {
// Initialize sets up the storage (create tables, etc.)
Initialize(ctx context.Context) error
// Close closes the storage connection
Close() error
// DB returns the underlying database connection.
// This is primarily used for migrations.
DB() *sql.DB
// Transaction Management
TransactionManager
// Workflow Instance Operations
InstanceManager
// Locking Operations
LockManager
// History Operations
HistoryManager
// Timer Subscription Operations
TimerSubscriptionManager
// Outbox Operations
OutboxManager
// Compensation Operations
CompensationManager
// Channel Operations
ChannelManager
// Group Operations
GroupManager
// System Lock Operations
SystemLockManager
// History Archive Operations
HistoryArchiveManager
}
Storage defines the interface for workflow persistence. Implementations must be safe for concurrent use.
type SystemLock ¶
// SystemLock represents a system-level lock for background tasks.
// Every field is serialized to JSON under the snake_case key in its tag.
type SystemLock struct {
// LockName identifies the lock.
LockName string `json:"lock_name"`
// LockedBy names the current holder.
// NOTE(review): presumably the workerID passed to TryAcquireSystemLock — confirm.
LockedBy string `json:"locked_by"`
// LockedAt is the time the lock was taken (inferred from the field name — confirm).
LockedAt time.Time `json:"locked_at"`
// ExpiresAt is the time at which the lock expires.
// NOTE(review): presumably what CleanupExpiredSystemLocks compares against — confirm.
ExpiresAt time.Time `json:"expires_at"`
}
SystemLock represents a system-level lock for background tasks.
type SystemLockManager ¶
// SystemLockManager handles system-level locks for background tasks.
type SystemLockManager interface {
// TryAcquireSystemLock attempts to acquire a system lock.
// Returns true if the lock was acquired.
// NOTE(review): timeoutSec is presumably the lock lifetime in seconds
// before it counts as expired — confirm against an implementation.
TryAcquireSystemLock(ctx context.Context, lockName, workerID string, timeoutSec int) (bool, error)
// ReleaseSystemLock releases a system lock.
ReleaseSystemLock(ctx context.Context, lockName, workerID string) error
// CleanupExpiredSystemLocks removes expired system locks.
CleanupExpiredSystemLocks(ctx context.Context) error
}
SystemLockManager handles system-level locks for background tasks.
type TimerSubscription ¶
// TimerSubscription represents a workflow waiting for a timer.
// Every field is serialized to JSON under the snake_case key in its tag.
type TimerSubscription struct {
// ID is the numeric identifier of the subscription record.
// NOTE(review): presumably assigned by the storage backend — confirm.
ID int64 `json:"id"`
// InstanceID is the workflow instance that is waiting.
InstanceID string `json:"instance_id"`
// TimerID identifies the timer; together with InstanceID it is the key
// accepted by RemoveTimerSubscription.
TimerID string `json:"timer_id"`
// ExpiresAt is the time at which the timer expires.
ExpiresAt time.Time `json:"expires_at"`
ActivityID string `json:"activity_id"` // Activity ID for replay matching (Edda compatibility)
// CreatedAt is the creation time of the subscription (inferred from the field name — confirm).
CreatedAt time.Time `json:"created_at"`
}
TimerSubscription represents a workflow waiting for a timer.
type TimerSubscriptionManager ¶
// TimerSubscriptionManager handles timer subscriptions: registering them
// (optionally together with releasing the workflow lock), finding expired
// ones, and removing them.
type TimerSubscriptionManager interface {
// RegisterTimerSubscription registers a workflow to wait for a timer.
RegisterTimerSubscription(ctx context.Context, sub *TimerSubscription) error
// RegisterTimerSubscriptionAndReleaseLock atomically registers a timer
// and releases the workflow lock.
// instanceID and workerID identify the lock to release.
RegisterTimerSubscriptionAndReleaseLock(
ctx context.Context,
sub *TimerSubscription,
instanceID, workerID string,
) error
// FindExpiredTimers finds timers that have expired.
// limit specifies the maximum number of timers to return (0 = default 100).
FindExpiredTimers(ctx context.Context, limit int) ([]*TimerSubscription, error)
// RemoveTimerSubscription removes a timer subscription.
RemoveTimerSubscription(ctx context.Context, instanceID, timerID string) error
}
TimerSubscriptionManager handles timer subscriptions.
type TransactionManager ¶
// TransactionManager handles transaction operations. The active transaction
// travels inside the context.Context returned by BeginTransaction, so callers
// must pass that context to the other methods.
type TransactionManager interface {
// BeginTransaction starts a new transaction.
// Returns a context with the transaction attached.
BeginTransaction(ctx context.Context) (context.Context, error)
// CommitTransaction commits the current transaction.
CommitTransaction(ctx context.Context) error
// RollbackTransaction rolls back the current transaction.
RollbackTransaction(ctx context.Context) error
// InTransaction returns true if there is an active transaction.
InTransaction(ctx context.Context) bool
// Conn returns the database executor for the current context.
// If a transaction is active, returns the transaction; otherwise, returns the database.
// This allows users to execute custom SQL queries within the same transaction.
Conn(ctx context.Context) Executor
// RegisterPostCommitCallback registers a callback to be executed after a successful commit.
// This is used for operations that should only happen if the transaction commits,
// such as notifying waiting subscribers after a message is published.
// Returns an error if not currently in a transaction.
RegisterPostCommitCallback(ctx context.Context, cb func() error) error
}
TransactionManager handles transaction operations.
type WorkflowInstance ¶
// WorkflowInstance represents a single execution of a workflow.
// Every field is serialized to JSON under the snake_case key in its tag.
// The lock-related pointer fields can be nil.
// NOTE(review): nil presumably means the instance is not currently locked — confirm.
type WorkflowInstance struct {
// InstanceID identifies this workflow execution.
InstanceID string `json:"instance_id"`
// WorkflowName is the name of the workflow being executed.
WorkflowName string `json:"workflow_name"`
Framework string `json:"framework"` // "go" or "python" (Edda unification)
// Status is the current state of the instance (a WorkflowStatus value).
Status WorkflowStatus `json:"status"`
InputData []byte `json:"input_data"` // JSON-encoded input
OutputData []byte `json:"output_data"` // JSON-encoded output (if completed)
CurrentActivityID string `json:"current_activity_id"` // Last executed activity ID
SourceHash string `json:"source_hash"` // Source code hash (Edda compatibility)
OwnerService string `json:"owner_service"` // Service that owns this workflow (Edda compatibility)
ContinuedFrom string `json:"continued_from"` // Previous instance ID (for recur)
LockedBy string `json:"locked_by"` // Worker ID holding the lock
LockedAt *time.Time `json:"locked_at"` // When the lock was acquired
// LockTimeoutSec is the lock timeout in seconds; note the JSON key is
// "lock_timeout_seconds", not "lock_timeout_sec".
LockTimeoutSec *int `json:"lock_timeout_seconds"`
// LockExpiresAt is the time at which the lock expires.
// NOTE(review): presumably LockedAt plus LockTimeoutSec — confirm.
LockExpiresAt *time.Time `json:"lock_expires_at"`
StartedAt time.Time `json:"started_at"` // When the workflow was created (unified with Edda)
// UpdatedAt is the time of the last update (inferred from the field name — confirm).
UpdatedAt time.Time `json:"updated_at"`
}
WorkflowInstance represents a single execution of a workflow.
type WorkflowStatus ¶
// WorkflowStatus represents the current state of a workflow instance.
// It is a string type, so values are stored and serialized as plain strings.
type WorkflowStatus string
WorkflowStatus represents the current state of a workflow instance.
const ( StatusRunning WorkflowStatus = "running" StatusCompleted WorkflowStatus = "completed" StatusFailed WorkflowStatus = "failed" StatusCancelled WorkflowStatus = "cancelled" StatusWaitingForEvent WorkflowStatus = "waiting_for_event" StatusWaitingForTimer WorkflowStatus = "waiting_for_timer" StatusWaitingForMessage WorkflowStatus = "waiting_for_message" StatusRecurred WorkflowStatus = "recurred" StatusCompensating WorkflowStatus = "compensating" )