sqlite

package
v1.2.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 25, 2026 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func OpenConnection

func OpenConnection(ctx context.Context, config *ConnectionConfig) (*sql.DB, error)

OpenConnection opens and configures a SQLite database connection

Types

type Config

type Config struct {
	Path       string
	Logger     *log.Logger
	KeyManager *keymanager.EncryptionKeyManager
}

Config holds SQLite storage configuration

type ConnectionConfig

type ConnectionConfig struct {
	Path            string        // Database file path or ":memory:"
	MaxOpenConns    int           // Maximum number of open connections (default: 1 for SQLite)
	MaxIdleConns    int           // Maximum number of idle connections
	ConnMaxLifetime time.Duration // Maximum connection lifetime
	BusyTimeout     time.Duration // Busy timeout (default: 30s)
	Synchronous     string        // Synchronous mode: OFF, NORMAL, FULL (default: NORMAL)
}

ConnectionConfig holds SQLite connection configuration

func DefaultConnectionConfig

func DefaultConnectionConfig(path string) *ConnectionConfig

DefaultConnectionConfig returns default SQLite connection configuration

type Dialect

type Dialect struct{}

Dialect implements sql.SQLDialect for SQLite databases. It provides SQLite-specific SQL syntax and capabilities.

func NewDialect

func NewDialect() *Dialect

NewDialect creates a new SQLite dialect

func (*Dialect) BigIntType

func (d *Dialect) BigIntType() string

BigIntType returns the SQLite integer type (SQLite uses INTEGER for all integers)

func (*Dialect) BlobType

func (d *Dialect) BlobType() string

BlobType returns the SQLite BLOB type

func (*Dialect) CurrentTimestamp

func (d *Dialect) CurrentTimestamp() string

CurrentTimestamp returns the SQLite current timestamp function

func (*Dialect) JSONExtract

func (d *Dialect) JSONExtract() string

JSONExtract returns the SQLite JSON extraction function

func (*Dialect) JSONExtractPath

func (d *Dialect) JSONExtractPath(key string) string

JSONExtractPath returns a JSON path argument for json_extract

func (*Dialect) JSONSet

func (d *Dialect) JSONSet() string

JSONSet returns the SQLite JSON modification function

func (*Dialect) JSONSetPath

func (d *Dialect) JSONSetPath(key string) string

JSONSetPath returns a JSON path for json_set

func (*Dialect) JSONType

func (d *Dialect) JSONType() string

JSONType returns the SQLite JSON type (stored as TEXT)

func (*Dialect) Placeholder

func (d *Dialect) Placeholder(n int) string

Placeholder returns the SQLite positional parameter syntax

func (*Dialect) RequiresSerializableForClaim

func (d *Dialect) RequiresSerializableForClaim() bool

RequiresSerializableForClaim reports whether SERIALIZABLE isolation is required for claiming

func (*Dialect) SetBusyTimeout

func (d *Dialect) SetBusyTimeout(ms int) string

SetBusyTimeout returns the PRAGMA to set busy timeout

func (*Dialect) SetSynchronous

func (d *Dialect) SetSynchronous(level string) string

SetSynchronous returns the PRAGMA to set synchronous mode

func (*Dialect) SetWALMode

func (d *Dialect) SetWALMode() string

SetWALMode returns the PRAGMA to enable WAL mode

func (*Dialect) SupportsAdvisoryLocks

func (d *Dialect) SupportsAdvisoryLocks() bool

SupportsAdvisoryLocks indicates if advisory locks are supported

func (*Dialect) SupportsReturning

func (d *Dialect) SupportsReturning() bool

SupportsReturning indicates if the RETURNING clause is supported

func (*Dialect) SupportsSkipLocked

func (d *Dialect) SupportsSkipLocked() bool

SupportsSkipLocked indicates if SELECT FOR UPDATE SKIP LOCKED is supported

func (*Dialect) TimestampType

func (d *Dialect) TimestampType() string

TimestampType returns the SQLite timestamp type

func (*Dialect) ToJSON

func (d *Dialect) ToJSON(value string) string

ToJSON converts a value to JSON (SQLite json_set accepts raw values directly)

func (*Dialect) UnixMillis

func (d *Dialect) UnixMillis(column string) string

UnixMillis extracts Unix milliseconds from a timestamp column

type SchemaManager

type SchemaManager struct {
	// contains filtered or unexported fields
}

func NewSchemaManager

func NewSchemaManager() *SchemaManager

func (*SchemaManager) EnsureVersionTable

func (m *SchemaManager) EnsureVersionTable(ctx context.Context, db *sql.DB) error

func (*SchemaManager) GetVersion

func (m *SchemaManager) GetVersion(ctx context.Context, db *sql.DB) (uint, bool, error)

func (*SchemaManager) Initialize

func (m *SchemaManager) Initialize(ctx context.Context, db *sql.DB) error

func (*SchemaManager) Migrate

func (m *SchemaManager) Migrate(ctx context.Context, db *sql.DB, targetVersion uint) error

func (*SchemaManager) SetVersion

func (m *SchemaManager) SetVersion(ctx context.Context, db *sql.DB, version uint, description string) error

func (*SchemaManager) Version

func (m *SchemaManager) Version(ctx context.Context, db *sql.DB) (uint, bool, error)

type Storage

type Storage struct {
	*repositorysql.BaseSQL
	// contains filtered or unexported fields
}

Storage implements the persistence.Storage interface for SQLite

func NewStorage

func NewStorage(ctx context.Context, config *Config) (*Storage, error)

NewStorage creates a new SQLite storage instance

func (*Storage) AcknowledgeMessage

func (s *Storage) AcknowledgeMessage(ctx context.Context, queueName string, messageId string, attemptId string) error

AcknowledgeMessage marks a message as completed

func (*Storage) ClaimMessage

func (s *Storage) ClaimMessage(ctx context.Context, queueName string, workerId string, attemptId string) (*messagepb.Message, error)

ClaimMessage claims the next available message from a queue

func (*Storage) Close

func (s *Storage) Close() error

Close closes the storage

func (*Storage) CreateQueue

func (s *Storage) CreateQueue(ctx context.Context, queue *queuepb.Queue) error

CreateQueue creates a new queue

func (*Storage) CreateSchedule

func (s *Storage) CreateSchedule(ctx context.Context, schedule *schedulepb.Schedule) error

func (*Storage) DeleteDLQMessage

func (s *Storage) DeleteDLQMessage(ctx context.Context, queueName string, messageId string) error

DeleteDLQMessage permanently deletes a message from the DLQ

func (*Storage) DeleteQueue

func (s *Storage) DeleteQueue(ctx context.Context, name string) error

DeleteQueue deletes a queue

func (*Storage) DeleteSchedule

func (s *Storage) DeleteSchedule(ctx context.Context, scheduleId string) error

DeleteSchedule deletes a schedule

func (*Storage) EnqueueMessage

func (s *Storage) EnqueueMessage(ctx context.Context, queueName string, message *messagepb.Message) error

func (*Storage) ExtendMessageLease

func (s *Storage) ExtendMessageLease(ctx context.Context, queueName string, messageId string, attemptId string, extensionMs int64) error

ExtendMessageLease extends the lease on a message

func (*Storage) FindExpiredMessages

func (s *Storage) FindExpiredMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error)

func (*Storage) GetDLQMessages

func (s *Storage) GetDLQMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error)

func (*Storage) GetQueue

func (s *Storage) GetQueue(ctx context.Context, name string) (*queuepb.Queue, error)

GetQueue retrieves a queue by name

func (*Storage) GetQueueMetadata

func (s *Storage) GetQueueMetadata(ctx context.Context, name string) (*queuepb.QueueMetadata, error)

GetQueueMetadata retrieves queue metadata by name

func (*Storage) GetSchedule

func (s *Storage) GetSchedule(ctx context.Context, scheduleId string) (*schedulepb.Schedule, error)

GetSchedule retrieves a schedule by ID

func (*Storage) GetScheduleHistory

func (s *Storage) GetScheduleHistory(ctx context.Context, scheduleId string, limit int64) (*schedulepb.ScheduleHistory, error)

GetScheduleHistory returns the execution history for a schedule

func (*Storage) HeartbeatMessage

func (s *Storage) HeartbeatMessage(ctx context.Context, queueName string, messageId string, attemptId string) error

HeartbeatMessage updates the heartbeat for a message

func (*Storage) ListQueues

func (s *Storage) ListQueues(ctx context.Context) ([]*queuepb.Queue, error)

ListQueues returns all queues

func (*Storage) ListSchedules

func (s *Storage) ListSchedules(ctx context.Context, queueName string) ([]*schedulepb.Schedule, error)

ListSchedules returns schedules for a queue

func (*Storage) NackMessage

func (s *Storage) NackMessage(ctx context.Context, queueName string, messageId string, attemptId string) error

NackMessage marks a message as failed

func (*Storage) PauseSchedule

func (s *Storage) PauseSchedule(ctx context.Context, scheduleId string) error

PauseSchedule pauses a schedule

func (*Storage) PeekMessages

func (s *Storage) PeekMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error)

PeekMessages retrieves messages without claiming them

func (*Storage) PurgeDLQ

func (s *Storage) PurgeDLQ(ctx context.Context, queueName string) (int64, error)

PurgeDLQ bulk-deletes all messages in the ERRORED state for a queue

func (*Storage) ReclaimExpiredMessage

func (s *Storage) ReclaimExpiredMessage(ctx context.Context, queueName string, message *messagepb.Message) error

ReclaimExpiredMessage moves an expired message back to pending or to the DLQ

func (*Storage) RecordScheduleExecution

func (s *Storage) RecordScheduleExecution(ctx context.Context, scheduleId string, messageId string, executionTime int64) error

RecordScheduleExecution records a schedule execution

func (*Storage) ResumeSchedule

func (s *Storage) ResumeSchedule(ctx context.Context, scheduleId string) error

ResumeSchedule resumes a paused schedule

func (*Storage) RetryDLQMessage

func (s *Storage) RetryDLQMessage(ctx context.Context, queueName string, messageId string) error

RetryDLQMessage moves a message from the DLQ back to pending

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL