Versions in this module Expand all Collapse all v1 v1.2.0 Jan 25, 2026 Changes in this version + var ErrMaxExtensionReached = fmt.Errorf("maximum lease extension reached") + type BaseSQL struct + Clock *Clock + DB *sql.DB + Dialect SQLDialect + KeyManager *keymanager.EncryptionKeyManager + LeaseRuntime *LeaseRuntimeCalculator + Logger *log.Logger + Serializer *common.ProtoSerializer + StateManager *StateManager + func NewBaseSQL(db *sql.DB, logger *log.Logger, keyManager *keymanager.EncryptionKeyManager, ...) *BaseSQL + func (b *BaseSQL) ApplyConfig(ctx context.Context, cfg *Config) error + func (b *BaseSQL) Close() error + func (b *BaseSQL) WithReadOnlyTransaction(ctx context.Context, fn func(tx *sql.Tx) error) error + func (b *BaseSQL) WithSerializableTransaction(ctx context.Context, fn func(tx *sql.Tx) error) error + func (b *BaseSQL) WithTransaction(ctx context.Context, opts *TxOptions, fn func(tx *sql.Tx) error) error + type Clock struct + func NewClock() *Clock + func (c *Clock) FromMs(ms int64) time.Time + func (c *Clock) Now() time.Time + func (c *Clock) NowMs() int64 + func (c *Clock) ToMs(t time.Time) int64 + type Config struct + BusyTimeout int + ConnMaxLifetime int + MaxIdleConns int + MaxOpenConns int + Synchronous string + WALMode bool + type LeaseRuntime struct + HeartbeatExpiry int64 + LastHeartbeatAt int64 + LeaseExpiry int64 + LeaseExtensionUsed int64 + LeaseStartedAt int64 + type LeaseRuntimeCalculator struct + func NewLeaseRuntimeCalculator(clock *Clock) *LeaseRuntimeCalculator + func (lrc *LeaseRuntimeCalculator) CalculateLeaseRuntime(policy *commonpb.LeasePolicy) *LeaseRuntime + func (lrc *LeaseRuntimeCalculator) ExtendLease(policy *commonpb.LeasePolicy, currentExtensionUsed int64, ...) (*LeaseRuntime, error) + func (lrc *LeaseRuntimeCalculator) Heartbeat() *LeaseRuntime + type QueryBuilder struct + func NewQueryBuilder(dialect SQLDialect) *QueryBuilder + func (qb *QueryBuilder) BuildClaimMessageQuery() string + func (qb *QueryBuilder) BuildCountByStateQuery() string + func (qb *QueryBuilder) BuildDeleteMessageQuery() string + func (qb *QueryBuilder) BuildFindEarliestDeadlineQuery() string + func (qb *QueryBuilder) BuildFindExpiredMessagesQuery() string + func (qb *QueryBuilder) BuildGetQueueMetadataQuery() string + func (qb *QueryBuilder) BuildGetScheduledMessagesQuery() string + func (qb *QueryBuilder) BuildInsertMessageQuery() string + func (qb *QueryBuilder) BuildInsertQueueQuery() string + func (qb *QueryBuilder) BuildOldestMessageAgeQuery(_ string) string + func (qb *QueryBuilder) BuildUpdateMessageStateQuery() string + func (qb *QueryBuilder) BuildUpdateStateCountersQuery(increment bool, stateKey string) string + type SQLDialect interface + BigIntType func() string + BlobType func() string + CurrentTimestamp func() string + JSONExtract func() string + JSONExtractPath func(key string) string + JSONSet func() string + JSONSetPath func(key string) string + JSONType func() string + Placeholder func(n int) string + RequiresSerializableForClaim func() bool + SetBusyTimeout func(ms int) string + SetSynchronous func(level string) string + SetWALMode func() string + SupportsAdvisoryLocks func() bool + SupportsReturning func() bool + SupportsSkipLocked func() bool + TimestampType func() string + ToJSON func(value string) string + UnixMillis func(column string) string + type StateManager struct + func NewStateManager(dialect SQLDialect) *StateManager + func (sm *StateManager) GetStateCounts(ctx context.Context, db *sql.DB, queueName string) (map[string]int64, error) + func (sm *StateManager) UpdateCounters(ctx context.Context, tx *sql.Tx, queueName string, ...) error + type TxOptions struct + Isolation sql.IsolationLevel + ReadOnly bool + Timeout time.Duration + func DefaultTxOptions() *TxOptions