sql

package
v1.2.0

This package is not in the latest version of its module.
Published: Jan 25, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	ErrMaxExtensionReached = fmt.Errorf("maximum lease extension reached")
)

Common errors
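Callers would typically match a sentinel error like this with errors.Is rather than comparing strings. The following is a minimal sketch: errMaxExtensionReached is a local stand-in for ErrMaxExtensionReached, and extend and isMaxExtension are hypothetical helpers, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// errMaxExtensionReached is a local stand-in for the package's
// ErrMaxExtensionReached so that this sketch compiles on its own.
var errMaxExtensionReached = fmt.Errorf("maximum lease extension reached")

// extend is a hypothetical operation that fails once the extension budget
// is spent. It wraps the sentinel with %w so errors.Is still matches it.
func extend(usedMs, maxMs int64) error {
	if usedMs >= maxMs {
		return fmt.Errorf("extend lease: %w", errMaxExtensionReached)
	}
	return nil
}

// isMaxExtension reports whether err is, or wraps, the sentinel.
func isMaxExtension(err error) bool {
	return errors.Is(err, errMaxExtensionReached)
}

func main() {
	err := extend(30_000, 30_000)
	fmt.Println(err, isMaxExtension(err))
}
```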

Functions

This section is empty.

Types

type BaseSQL

type BaseSQL struct {
	DB           *sql.DB
	Logger       *log.Logger
	Serializer   *common.ProtoSerializer
	KeyManager   *keymanager.EncryptionKeyManager
	Dialect      SQLDialect
	Clock        *Clock
	StateManager *StateManager
	LeaseRuntime *LeaseRuntimeCalculator
}

BaseSQL provides SQL storage functionality shared by all SQL backends, which avoids duplicating code across SQLite, Postgres, MySQL, and others.

func NewBaseSQL

func NewBaseSQL(
	db *sql.DB,
	logger *log.Logger,
	keyManager *keymanager.EncryptionKeyManager,
	dialect SQLDialect,
) *BaseSQL

NewBaseSQL creates a new BaseSQL instance

func (*BaseSQL) ApplyConfig

func (b *BaseSQL) ApplyConfig(ctx context.Context, cfg *Config) error

ApplyConfig applies configuration to the database connection

func (*BaseSQL) Close

func (b *BaseSQL) Close() error

Close closes the database connection

func (*BaseSQL) WithReadOnlyTransaction

func (b *BaseSQL) WithReadOnlyTransaction(
	ctx context.Context,
	fn func(tx *sql.Tx) error,
) error

WithReadOnlyTransaction executes fn within a read-only transaction

func (*BaseSQL) WithSerializableTransaction

func (b *BaseSQL) WithSerializableTransaction(
	ctx context.Context,
	fn func(tx *sql.Tx) error,
) error

WithSerializableTransaction executes fn within a serializable transaction. Used for operations requiring strict isolation (e.g., message claiming in SQLite).

func (*BaseSQL) WithTransaction

func (b *BaseSQL) WithTransaction(
	ctx context.Context,
	opts *TxOptions,
	fn func(tx *sql.Tx) error,
) error

WithTransaction executes fn within a transaction with proper error handling and rollback. This provides a consistent transaction pattern across all SQL backends.
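A common shape for this kind of helper is "run fn, commit on success, roll back on error or panic". The sketch below illustrates that pattern only; it is not this package's implementation. The txn interface and fakeTx type are hypothetical and exist so the example runs without a database driver (*sql.Tx has matching Commit and Rollback methods).

```go
package main

import (
	"errors"
	"fmt"
)

// txn is a hypothetical minimal subset of *sql.Tx.
type txn interface {
	Commit() error
	Rollback() error
}

// errBoom is an example failure returned by the callback.
var errBoom = errors.New("boom")

// withTx runs fn, commits if fn succeeds, and rolls back if fn fails or panics.
func withTx(tx txn, fn func(tx txn) error) error {
	defer func() {
		if p := recover(); p != nil {
			_ = tx.Rollback() // best effort; the panic is re-raised
			panic(p)
		}
	}()
	if err := fn(tx); err != nil {
		if rbErr := tx.Rollback(); rbErr != nil {
			return fmt.Errorf("%w (rollback also failed: %v)", err, rbErr)
		}
		return err
	}
	return tx.Commit()
}

// fakeTx records which terminal call was made.
type fakeTx struct{ committed, rolledBack bool }

func (f *fakeTx) Commit() error   { f.committed = true; return nil }
func (f *fakeTx) Rollback() error { f.rolledBack = true; return nil }

func main() {
	ok := &fakeTx{}
	fmt.Println(withTx(ok, func(txn) error { return nil }), ok.committed)

	bad := &fakeTx{}
	fmt.Println(withTx(bad, func(txn) error { return errBoom }), bad.rolledBack)
}
```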

type Clock

type Clock struct{}

Clock provides timestamp operations for consistency across the storage layer.

func NewClock

func NewClock() *Clock

NewClock creates a new Clock instance

func (*Clock) FromMs

func (c *Clock) FromMs(ms int64) time.Time

FromMs converts Unix milliseconds to time.Time

func (*Clock) Now

func (c *Clock) Now() time.Time

Now returns the current time

func (*Clock) NowMs

func (c *Clock) NowMs() int64

NowMs returns the current Unix timestamp in milliseconds

func (*Clock) ToMs

func (c *Clock) ToMs(t time.Time) int64

ToMs converts a time.Time to Unix milliseconds
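The millisecond conversions can be expressed with the standard library's time.UnixMilli and Time.UnixMilli (Go 1.17+). The sketch below uses free functions toMs and fromMs as hypothetical stand-ins for the Clock methods; whether the package implements them this way is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// toMs converts a time.Time to Unix milliseconds.
func toMs(t time.Time) int64 { return t.UnixMilli() }

// fromMs converts Unix milliseconds back to a time.Time.
func fromMs(ms int64) time.Time { return time.UnixMilli(ms) }

func main() {
	now := time.Now()
	ms := toMs(now)
	back := fromMs(ms)
	// The round trip drops sub-millisecond precision.
	fmt.Println(ms, back.Equal(now.Truncate(time.Millisecond)))
}
```

Storing int64 milliseconds keeps timestamps comparable across backends whose native timestamp types differ in precision and time zone handling.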

type Config

type Config struct {
	// Connection pooling
	MaxOpenConns    int
	MaxIdleConns    int
	ConnMaxLifetime int // seconds

	// SQLite-specific
	BusyTimeout int // milliseconds
	WALMode     bool
	Synchronous string // NORMAL, FULL, etc.
}

Config holds SQL storage configuration
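As an illustration of how such a configuration might be applied, the sketch below maps the pooling fields onto the corresponding *sql.DB setters and renders the SQLite fields as PRAGMA statements. The Config type is a local copy of the struct above; the pool interface, applyPool, sqlitePragmas, and recorder are hypothetical, and the actual behavior of ApplyConfig may differ.

```go
package main

import (
	"database/sql"
	"fmt"
	"time"
)

// Config is a local copy of the struct documented above.
type Config struct {
	MaxOpenConns    int
	MaxIdleConns    int
	ConnMaxLifetime int // seconds
	BusyTimeout     int // milliseconds
	WALMode         bool
	Synchronous     string
}

// pool is the subset of *sql.DB used for connection pooling.
type pool interface {
	SetMaxOpenConns(n int)
	SetMaxIdleConns(n int)
	SetConnMaxLifetime(d time.Duration)
}

var _ pool = (*sql.DB)(nil) // compile-time check that *sql.DB satisfies pool

// applyPool converts the seconds-based lifetime into a time.Duration.
func applyPool(p pool, cfg Config) {
	p.SetMaxOpenConns(cfg.MaxOpenConns)
	p.SetMaxIdleConns(cfg.MaxIdleConns)
	p.SetConnMaxLifetime(time.Duration(cfg.ConnMaxLifetime) * time.Second)
}

// sqlitePragmas renders the SQLite-specific fields as PRAGMA statements.
// Synchronous is interpolated as-is, so callers should validate it first.
func sqlitePragmas(cfg Config) []string {
	var out []string
	if cfg.WALMode {
		out = append(out, "PRAGMA journal_mode=WAL")
	}
	if cfg.BusyTimeout > 0 {
		out = append(out, fmt.Sprintf("PRAGMA busy_timeout=%d", cfg.BusyTimeout))
	}
	if cfg.Synchronous != "" {
		out = append(out, "PRAGMA synchronous="+cfg.Synchronous)
	}
	return out
}

// recorder is a fake pool that records the values it receives.
type recorder struct {
	open, idle int
	lifetime   time.Duration
}

func (r *recorder) SetMaxOpenConns(n int)              { r.open = n }
func (r *recorder) SetMaxIdleConns(n int)              { r.idle = n }
func (r *recorder) SetConnMaxLifetime(d time.Duration) { r.lifetime = d }

func main() {
	cfg := Config{MaxOpenConns: 10, MaxIdleConns: 5, ConnMaxLifetime: 300,
		BusyTimeout: 5000, WALMode: true, Synchronous: "NORMAL"}
	r := &recorder{}
	applyPool(r, cfg)
	fmt.Println(r.open, r.idle, r.lifetime, sqlitePragmas(cfg))
}
```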

type LeaseRuntime

type LeaseRuntime struct {
	LeaseStartedAt     int64
	LeaseExpiry        int64
	LastHeartbeatAt    int64
	HeartbeatExpiry    int64
	LeaseExtensionUsed int64
}

LeaseRuntime represents the calculated runtime values for a message lease. All times are Unix milliseconds for consistency with database storage.

type LeaseRuntimeCalculator

type LeaseRuntimeCalculator struct {
	// contains filtered or unexported fields
}

LeaseRuntimeCalculator calculates lease expiry times and manages lease extensions. This provides a centralized way to compute lease-related timestamps.

func NewLeaseRuntimeCalculator

func NewLeaseRuntimeCalculator(clock *Clock) *LeaseRuntimeCalculator

NewLeaseRuntimeCalculator creates a new lease runtime calculator

func (*LeaseRuntimeCalculator) CalculateLeaseRuntime

func (lrc *LeaseRuntimeCalculator) CalculateLeaseRuntime(policy *commonpb.LeasePolicy) *LeaseRuntime

CalculateLeaseRuntime computes all lease-related timestamps from a lease policy. This is called when a message transitions to RUNNING state.
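To make the calculation concrete, here is a minimal sketch of deriving the five LeaseRuntime fields from a policy at claim time. The fields of commonpb.LeasePolicy are not shown in this documentation, so leasePolicy and its two fields are hypothetical, as is the calculate function.

```go
package main

import "fmt"

// LeaseRuntime mirrors the struct documented above (Unix milliseconds).
type LeaseRuntime struct {
	LeaseStartedAt     int64
	LeaseExpiry        int64
	LastHeartbeatAt    int64
	HeartbeatExpiry    int64
	LeaseExtensionUsed int64
}

// leasePolicy is a hypothetical stand-in for commonpb.LeasePolicy.
type leasePolicy struct {
	LeaseDurationMs    int64 // assumed: how long a claim is valid
	HeartbeatTimeoutMs int64 // assumed: max gap between heartbeats
}

// calculate derives all timestamps from a single "now" so they stay consistent.
func calculate(nowMs int64, p leasePolicy) LeaseRuntime {
	return LeaseRuntime{
		LeaseStartedAt:     nowMs,
		LeaseExpiry:        nowMs + p.LeaseDurationMs,
		LastHeartbeatAt:    nowMs,
		HeartbeatExpiry:    nowMs + p.HeartbeatTimeoutMs,
		LeaseExtensionUsed: 0, // a fresh lease has used no extension yet
	}
}

func main() {
	rt := calculate(1_000, leasePolicy{LeaseDurationMs: 30_000, HeartbeatTimeoutMs: 5_000})
	fmt.Printf("%+v\n", rt)
}
```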

func (*LeaseRuntimeCalculator) ExtendLease

func (lrc *LeaseRuntimeCalculator) ExtendLease(
	policy *commonpb.LeasePolicy,
	currentExtensionUsed int64,
	requestedExtensionMs int64,
) (*LeaseRuntime, error)

ExtendLease calculates a new lease expiry when extending a lease. It returns the updated lease runtime, or an error if the maximum extension has been reached.
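One plausible reading of the extension check is a cumulative budget: each request adds to the extension already used, and the call fails once the total would exceed a policy maximum. The sketch below illustrates that reading only. The maxExtensionMs parameter, the choice to measure the new expiry from the current time, and the extendLease function itself are assumptions, not the package's documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// errMaxExtensionReached is a local stand-in for ErrMaxExtensionReached.
var errMaxExtensionReached = errors.New("maximum lease extension reached")

// extendLease returns the new expiry and the updated cumulative extension.
// It rejects requests that would push the total past maxExtensionMs.
func extendLease(nowMs, maxExtensionMs, usedMs, requestedMs int64) (expiryMs, newUsedMs int64, err error) {
	if usedMs+requestedMs > maxExtensionMs {
		return 0, usedMs, errMaxExtensionReached
	}
	return nowMs + requestedMs, usedMs + requestedMs, nil
}

func main() {
	fmt.Println(extendLease(1_000, 60_000, 20_000, 30_000)) // within budget
	fmt.Println(extendLease(1_000, 60_000, 50_000, 30_000)) // over budget
}
```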

func (*LeaseRuntimeCalculator) Heartbeat

func (lrc *LeaseRuntimeCalculator) Heartbeat() *LeaseRuntime

Heartbeat updates the heartbeat expiry for an active lease. It returns the updated runtime with a new heartbeat timestamp.

type QueryBuilder

type QueryBuilder struct {
	// contains filtered or unexported fields
}

QueryBuilder helps construct SQL queries with dialect-specific syntax. This allows the same logical query to work across different SQL databases.

func NewQueryBuilder

func NewQueryBuilder(dialect SQLDialect) *QueryBuilder

NewQueryBuilder creates a new QueryBuilder for the given dialect

func (*QueryBuilder) BuildClaimMessageQuery

func (qb *QueryBuilder) BuildClaimMessageQuery() string

BuildClaimMessageQuery builds a query to claim a PENDING message. Returns different queries based on dialect capabilities.
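To show what "different queries based on dialect capabilities" can mean, the sketch below adds FOR UPDATE SKIP LOCKED only when the dialect supports it. The messages table, its columns, and the buildClaimQuery and usesSkipLocked functions are hypothetical; the query this package actually generates is not shown in the documentation.

```go
package main

import (
	"fmt"
	"strings"
)

// buildClaimQuery claims the oldest PENDING message in a queue.
// With skipLocked, concurrent claimers skip rows locked by others
// (Postgres, MySQL 8+). Without it, the caller is expected to run the
// statement under stricter isolation, e.g. a serializable transaction.
func buildClaimQuery(placeholder string, skipLocked bool) string {
	lock := ""
	if skipLocked {
		lock = " FOR UPDATE SKIP LOCKED"
	}
	return "UPDATE messages SET state = 'RUNNING' WHERE id = (" +
		"SELECT id FROM messages WHERE state = 'PENDING' AND queue_name = " + placeholder +
		" ORDER BY created_at LIMIT 1" + lock + ") RETURNING id"
}

// usesSkipLocked reports whether the query relies on SKIP LOCKED.
func usesSkipLocked(q string) bool {
	return strings.Contains(q, "SKIP LOCKED")
}

func main() {
	fmt.Println(buildClaimQuery("$1", true)) // Postgres-style
	fmt.Println(buildClaimQuery("?", false)) // SQLite-style
}
```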

func (*QueryBuilder) BuildCountByStateQuery

func (qb *QueryBuilder) BuildCountByStateQuery() string

BuildCountByStateQuery builds a query to count messages by state

func (*QueryBuilder) BuildDeleteMessageQuery

func (qb *QueryBuilder) BuildDeleteMessageQuery() string

BuildDeleteMessageQuery builds a query to delete a message

func (*QueryBuilder) BuildFindEarliestDeadlineQuery

func (qb *QueryBuilder) BuildFindEarliestDeadlineQuery() string

BuildFindEarliestDeadlineQuery builds a query to find the earliest lease expiry

func (*QueryBuilder) BuildFindExpiredMessagesQuery

func (qb *QueryBuilder) BuildFindExpiredMessagesQuery() string

BuildFindExpiredMessagesQuery builds a query to find messages with expired leases

func (*QueryBuilder) BuildGetQueueMetadataQuery

func (qb *QueryBuilder) BuildGetQueueMetadataQuery() string

BuildGetQueueMetadataQuery builds a query to retrieve queue metadata

func (*QueryBuilder) BuildGetScheduledMessagesQuery

func (qb *QueryBuilder) BuildGetScheduledMessagesQuery() string

BuildGetScheduledMessagesQuery builds a query to find messages ready for activation

func (*QueryBuilder) BuildInsertMessageQuery

func (qb *QueryBuilder) BuildInsertMessageQuery() string

BuildInsertMessageQuery builds a query to insert a new message with idempotency support

func (*QueryBuilder) BuildInsertQueueQuery

func (qb *QueryBuilder) BuildInsertQueueQuery() string

BuildInsertQueueQuery builds a query to insert a new queue

func (*QueryBuilder) BuildOldestMessageAgeQuery

func (qb *QueryBuilder) BuildOldestMessageAgeQuery(_ string) string

BuildOldestMessageAgeQuery returns the minimum created_at for pending messages in a priority range. Used for aging-based priority weighting.

func (*QueryBuilder) BuildUpdateMessageStateQuery

func (qb *QueryBuilder) BuildUpdateMessageStateQuery() string

BuildUpdateMessageStateQuery builds a query to update message state

func (*QueryBuilder) BuildUpdateStateCountersQuery

func (qb *QueryBuilder) BuildUpdateStateCountersQuery(increment bool, stateKey string) string

BuildUpdateStateCountersQuery builds a query to update queue state counters. Uses JSON functions to increment/decrement state counts.
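As a rough illustration of a JSON-based counter update, the sketch below builds a SQLite-flavored statement with json_set and json_extract. The queues table, the state_counts and name columns, and the buildCounterUpdate function are hypothetical. Because the state key is interpolated into the SQL text, the sketch validates it against a strict pattern first.

```go
package main

import (
	"fmt"
	"regexp"
)

// validKey restricts state keys to upper-case identifiers such as PENDING.
var validKey = regexp.MustCompile(`^[A-Z_]+$`)

// buildCounterUpdate returns an UPDATE that adds or subtracts 1 from the
// counter stored under stateKey in a JSON column.
func buildCounterUpdate(increment bool, stateKey string) (string, error) {
	if !validKey.MatchString(stateKey) {
		return "", fmt.Errorf("invalid state key %q", stateKey)
	}
	op := "+"
	if !increment {
		op = "-"
	}
	path := "'$." + stateKey + "'"
	return fmt.Sprintf(
		"UPDATE queues SET state_counts = json_set(state_counts, %s, "+
			"COALESCE(json_extract(state_counts, %s), 0) %s 1) WHERE name = ?",
		path, path, op), nil
}

func main() {
	q, err := buildCounterUpdate(true, "PENDING")
	fmt.Println(q, err)
}
```

The COALESCE guards against a missing key, so the first increment of a new state starts from 0 rather than NULL.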

type SQLDialect

type SQLDialect interface {
	// Query syntax
	Placeholder(n int) string          // Positional parameter syntax: $1 (Postgres) vs ? (SQLite/MySQL)
	CurrentTimestamp() string          // CURRENT_TIMESTAMP vs NOW()
	JSONSet() string                   // json_set (SQLite) vs jsonb_set (Postgres)
	JSONSetPath(key string) string     // Path argument for JSONSet
	JSONExtract() string               // json_extract (SQLite) vs jsonb_extract_path_text (Postgres)
	JSONExtractPath(key string) string // Path argument for JSONExtract
	ToJSON(value string) string        // Convert value to JSON: raw value (SQLite) vs to_jsonb() (Postgres)
	UnixMillis(column string) string   // Extract unix milliseconds from timestamp column

	// Type mappings
	BlobType() string      // BLOB vs BYTEA
	TimestampType() string // TIMESTAMP vs TIMESTAMPTZ
	BigIntType() string    // INTEGER vs BIGINT
	JSONType() string      // TEXT vs JSONB

	// Feature support
	SupportsReturning() bool            // RETURNING clause support
	SupportsSkipLocked() bool           // SELECT FOR UPDATE SKIP LOCKED
	SupportsAdvisoryLocks() bool        // Postgres advisory locks
	RequiresSerializableForClaim() bool // Needs SERIALIZABLE isolation for claiming

	// Connection configuration
	SetWALMode() string                 // SQLite: PRAGMA journal_mode=WAL
	SetBusyTimeout(ms int) string       // SQLite: PRAGMA busy_timeout
	SetSynchronous(level string) string // PRAGMA synchronous
}

SQLDialect abstracts database-specific SQL syntax and capabilities. Each SQL backend (SQLite, Postgres, MySQL) implements this interface.
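The sketch below illustrates the idea with just the Placeholder method: two toy dialects produce different parameter syntax, and one query-building function works with both. The placeholderer interface, postgresDialect, sqliteDialect, deleteByID, and the messages table are hypothetical and cover only a small slice of the full interface.

```go
package main

import (
	"fmt"
	"strconv"
)

// placeholderer is the one-method subset of SQLDialect used in this sketch.
type placeholderer interface {
	Placeholder(n int) string
}

// postgresDialect uses numbered parameters: $1, $2, ...
type postgresDialect struct{}

func (postgresDialect) Placeholder(n int) string { return "$" + strconv.Itoa(n) }

// sqliteDialect uses positional question marks and ignores n.
type sqliteDialect struct{}

func (sqliteDialect) Placeholder(int) string { return "?" }

// deleteByID builds the same logical query for any dialect.
func deleteByID(d placeholderer) string {
	return "DELETE FROM messages WHERE queue_name = " + d.Placeholder(1) +
		" AND id = " + d.Placeholder(2)
}

func main() {
	fmt.Println(deleteByID(postgresDialect{}))
	fmt.Println(deleteByID(sqliteDialect{}))
}
```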

type StateManager

type StateManager struct {
	// contains filtered or unexported fields
}

StateManager handles message state counter updates for queue statistics. This provides O(1) queue state lookups by maintaining pre-computed counters.

func NewStateManager

func NewStateManager(dialect SQLDialect) *StateManager

NewStateManager creates a new StateManager instance

func (*StateManager) GetStateCounts

func (sm *StateManager) GetStateCounts(
	ctx context.Context,
	db *sql.DB,
	queueName string,
) (map[string]int64, error)

GetStateCounts retrieves the current state counts for a queue

func (*StateManager) UpdateCounters

func (sm *StateManager) UpdateCounters(
	ctx context.Context,
	tx *sql.Tx,
	queueName string,
	oldState, newState messagepb.Message_Metadata_State,
) error

UpdateCounters atomically updates state counters when a message transitions. This is called within a transaction to ensure consistency.
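The bookkeeping behind pre-computed counters is small: each transition decrements the old state's count and increments the new one, so reading a queue's statistics never requires scanning messages. The in-memory sketch below shows only that arithmetic. The transition function and the use of an empty string for "no state" are assumptions, and the real method performs the update in SQL inside the caller's transaction.

```go
package main

import "fmt"

// transition moves one message from oldState to newState in the counters.
// An empty state name stands for "no state" (message created or deleted).
func transition(counts map[string]int64, oldState, newState string) {
	if oldState == newState {
		return
	}
	if oldState != "" {
		counts[oldState]--
	}
	if newState != "" {
		counts[newState]++
	}
}

func main() {
	counts := map[string]int64{}
	transition(counts, "", "PENDING")        // message created
	transition(counts, "PENDING", "RUNNING") // message claimed
	fmt.Println(counts)
}
```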

type TxOptions

type TxOptions struct {
	Isolation sql.IsolationLevel
	Timeout   time.Duration
	ReadOnly  bool
}

TxOptions holds transaction configuration

func DefaultTxOptions

func DefaultTxOptions() *TxOptions

DefaultTxOptions returns default transaction options
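Options of this shape map naturally onto the standard library: Isolation and ReadOnly correspond to the fields of sql.TxOptions, and Timeout can be enforced through a context deadline. The sketch below shows that mapping under those assumptions. TxOptions is a local copy of the struct above, and toStd, withTimeout, hasDeadline, and serializableReadOnly are hypothetical helpers; the values DefaultTxOptions actually returns are not documented here.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"time"
)

// TxOptions is a local copy of the struct documented above.
type TxOptions struct {
	Isolation sql.IsolationLevel
	Timeout   time.Duration
	ReadOnly  bool
}

// toStd maps the isolation and read-only fields onto database/sql's options.
func toStd(o *TxOptions) *sql.TxOptions {
	return &sql.TxOptions{Isolation: o.Isolation, ReadOnly: o.ReadOnly}
}

// withTimeout derives a context that expires after o.Timeout.
// A non-positive timeout is treated as "no deadline" (an assumption).
func withTimeout(ctx context.Context, o *TxOptions) (context.Context, context.CancelFunc) {
	if o.Timeout <= 0 {
		return context.WithCancel(ctx)
	}
	return context.WithTimeout(ctx, o.Timeout)
}

// hasDeadline reports whether withTimeout attaches a deadline for o.
func hasDeadline(o *TxOptions) bool {
	ctx, cancel := withTimeout(context.Background(), o)
	defer cancel()
	_, ok := ctx.Deadline()
	return ok
}

// serializableReadOnly is an example value, not the package's default.
func serializableReadOnly() *TxOptions {
	return &TxOptions{Isolation: sql.LevelSerializable, Timeout: 5 * time.Second, ReadOnly: true}
}

func main() {
	o := serializableReadOnly()
	std := toStd(o)
	fmt.Println(std.Isolation, std.ReadOnly, hasDeadline(o))
}
```

The derived context and the converted options would then be passed to (*sql.DB).BeginTx.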
