Versions in this module Expand all Collapse all v1 v1.2.0 Jan 25, 2026 Changes in this version + func OpenConnection(ctx context.Context, config *ConnectionConfig) (*sql.DB, error) + type Config struct + KeyManager *keymanager.EncryptionKeyManager + Logger *log.Logger + Path string + type ConnectionConfig struct + BusyTimeout time.Duration + ConnMaxLifetime time.Duration + MaxIdleConns int + MaxOpenConns int + Path string + Synchronous string + func DefaultConnectionConfig(path string) *ConnectionConfig + type Dialect struct + func NewDialect() *Dialect + func (d *Dialect) BigIntType() string + func (d *Dialect) BlobType() string + func (d *Dialect) CurrentTimestamp() string + func (d *Dialect) JSONExtract() string + func (d *Dialect) JSONExtractPath(key string) string + func (d *Dialect) JSONSet() string + func (d *Dialect) JSONSetPath(key string) string + func (d *Dialect) JSONType() string + func (d *Dialect) Placeholder(n int) string + func (d *Dialect) RequiresSerializableForClaim() bool + func (d *Dialect) SetBusyTimeout(ms int) string + func (d *Dialect) SetSynchronous(level string) string + func (d *Dialect) SetWALMode() string + func (d *Dialect) SupportsAdvisoryLocks() bool + func (d *Dialect) SupportsReturning() bool + func (d *Dialect) SupportsSkipLocked() bool + func (d *Dialect) TimestampType() string + func (d *Dialect) ToJSON(value string) string + func (d *Dialect) UnixMillis(column string) string + type SchemaManager struct + func NewSchemaManager() *SchemaManager + func (m *SchemaManager) EnsureVersionTable(ctx context.Context, db *sql.DB) error + func (m *SchemaManager) GetVersion(ctx context.Context, db *sql.DB) (uint, bool, error) + func (m *SchemaManager) Initialize(ctx context.Context, db *sql.DB) error + func (m *SchemaManager) Migrate(ctx context.Context, db *sql.DB, targetVersion uint) error + func (m *SchemaManager) SetVersion(ctx context.Context, db *sql.DB, version uint, description string) error + func (m *SchemaManager) Version(ctx context.Context, db *sql.DB) (uint, bool, error) + type Storage struct + func NewStorage(ctx context.Context, config *Config) (*Storage, error) + func (s *Storage) AcknowledgeMessage(ctx context.Context, queueName string, messageId string, attemptId string) error + func (s *Storage) ClaimMessage(ctx context.Context, queueName string, workerId string, attemptId string) (*messagepb.Message, error) + func (s *Storage) Close() error + func (s *Storage) CreateQueue(ctx context.Context, queue *queuepb.Queue) error + func (s *Storage) CreateSchedule(ctx context.Context, schedule *schedulepb.Schedule) error + func (s *Storage) DeleteDLQMessage(ctx context.Context, queueName string, messageId string) error + func (s *Storage) DeleteQueue(ctx context.Context, name string) error + func (s *Storage) DeleteSchedule(ctx context.Context, scheduleId string) error + func (s *Storage) EnqueueMessage(ctx context.Context, queueName string, message *messagepb.Message) error + func (s *Storage) ExtendMessageLease(ctx context.Context, queueName string, messageId string, attemptId string, ...) error + func (s *Storage) FindExpiredMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error) + func (s *Storage) GetDLQMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error) + func (s *Storage) GetQueue(ctx context.Context, name string) (*queuepb.Queue, error) + func (s *Storage) GetQueueMetadata(ctx context.Context, name string) (*queuepb.QueueMetadata, error) + func (s *Storage) GetSchedule(ctx context.Context, scheduleId string) (*schedulepb.Schedule, error) + func (s *Storage) GetScheduleHistory(ctx context.Context, scheduleId string, limit int64) (*schedulepb.ScheduleHistory, error) + func (s *Storage) HeartbeatMessage(ctx context.Context, queueName string, messageId string, attemptId string) error + func (s *Storage) ListQueues(ctx context.Context) ([]*queuepb.Queue, error) + func (s *Storage) ListSchedules(ctx context.Context, queueName string) ([]*schedulepb.Schedule, error) + func (s *Storage) NackMessage(ctx context.Context, queueName string, messageId string, attemptId string) error + func (s *Storage) PauseSchedule(ctx context.Context, scheduleId string) error + func (s *Storage) PeekMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error) + func (s *Storage) PurgeDLQ(ctx context.Context, queueName string) (int64, error) + func (s *Storage) ReclaimExpiredMessage(ctx context.Context, queueName string, message *messagepb.Message) error + func (s *Storage) RecordScheduleExecution(ctx context.Context, scheduleId string, messageId string, executionTime int64) error + func (s *Storage) ResumeSchedule(ctx context.Context, scheduleId string) error + func (s *Storage) RetryDLQMessage(ctx context.Context, queueName string, messageId string) error