sqlite

package
v0.11.1
Published: May 18, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package sqlite provides a SQLite implementation of outbox.Store using database/sql. Use it with any SQLite database/sql driver (mattn/go-sqlite3, modernc.org/sqlite, etc.).

SQLite has no FOR UPDATE SKIP LOCKED. FetchPending wraps the claim in a short transaction; concurrent workers serialize on the database write lock, which is fine for most outbox workloads but does not scale to many parallel claimers the way PostgreSQL does.

Constants

const DefaultProcessedTable = "processed_messages"

DefaultProcessedTable is the default table name used by ProcessedStore when none is provided.

Variables

This section is empty.

Functions

func CreateProcessedTableSQL added in v0.11.1

func CreateProcessedTableSQL(tableName string) string

CreateProcessedTableSQL returns the SQLite DDL for creating the processed_messages dedup table used by outbox.ProcessedStore.

SQLite has no native JSON type; metadata is TEXT and the JSON1 extension operates on TEXT. The table is intentionally NOT declared STRICT; see CreateTableSQL for the reason.

func CreateTableSQL

func CreateTableSQL(tableName string) string

CreateTableSQL returns the SQLite DDL for creating the outbox messages table.

SQLite has no native UUID, JSON or bytea types — TEXT/BLOB are used instead and IDs are generated by the Store on insert. The partial index requires SQLite 3.8 or newer.

The table is intentionally NOT declared STRICT because STRICT tables only allow the column types INT, INTEGER, REAL, TEXT, BLOB and ANY, and the schema relies on the DATETIME declared type for created_at/processed_at so that database/sql can scan rows into time.Time via sql.NullTime. Adding STRICT here would make SQLite reject the table at CREATE time on every supported SQLite driver.

func Migrate

func Migrate(ctx context.Context, db osql.Execer, tableName string) error

Migrate creates the outbox messages table if it does not exist. db can be a *sql.DB, *sql.Tx, *sql.Conn or any other type implementing osql.Execer. When tableName is empty it defaults to "outbox_messages", the same name sql.DefaultConfig uses; MigrateProcessed applies the same empty-name fallback with DefaultProcessedTable.
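The following is a minimal sketch of why *sql.DB, *sql.Tx and *sql.Conn are all accepted, and of the empty-name fallback. The execer interface is a hypothetical stand-in for osql.Execer (its real method set is not shown on this page), and the DDL is a placeholder, not the statement this package emits.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// execer is a hypothetical stand-in for osql.Execer. ASSUMPTION: the real
// interface requires ExecContext, which all three database/sql handles provide.
type execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

// Compile-time checks that the standard handles satisfy the stand-in interface.
var (
	_ execer = (*sql.DB)(nil)
	_ execer = (*sql.Tx)(nil)
	_ execer = (*sql.Conn)(nil)
)

const defaultTable = "outbox_messages"

// resolveTable applies the documented fallback for an empty table name.
func resolveTable(name string) string {
	if name == "" {
		return defaultTable
	}
	return name
}

// migrate issues an idempotent CREATE TABLE. The column list is a placeholder.
func migrate(ctx context.Context, db execer, tableName string) error {
	ddl := "CREATE TABLE IF NOT EXISTS " + resolveTable(tableName) + " (id TEXT PRIMARY KEY)"
	_, err := db.ExecContext(ctx, ddl)
	return err
}

// recorder is a fake execer that records statements instead of running them.
type recorder struct{ stmts []string }

func (r *recorder) ExecContext(_ context.Context, query string, _ ...any) (sql.Result, error) {
	r.stmts = append(r.stmts, query)
	return nil, nil
}

// recordedDDL runs migrate against the fake and returns the statement it saw.
func recordedDDL(tableName string) string {
	r := &recorder{}
	if err := migrate(context.Background(), r, tableName); err != nil {
		return ""
	}
	return r.stmts[0]
}

func main() {
	fmt.Println(recordedDDL(""))
	fmt.Println(recordedDDL("events_outbox"))
}
```

Accepting a small interface rather than *sql.DB lets the migration run inside a caller-owned transaction or on a pinned connection.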

func MigrateProcessed added in v0.11.1

func MigrateProcessed(ctx context.Context, db osql.Execer, tableName string) error

MigrateProcessed creates the processed_messages table if it does not exist. db can be a *sql.DB, *sql.Tx, *sql.Conn or any other type implementing osql.Execer. tableName defaults to DefaultProcessedTable when empty.

Types

type ProcessedStore added in v0.11.1

type ProcessedStore struct {
	// contains filtered or unexported fields
}

ProcessedStore implements outbox.ProcessedStore for SQLite.

Unlike Store, ProcessedStore does not hold a *sql.DB — every method takes the db/tx handle so callers compose with *sql.Tx for transactional exactly-once or pass *sql.DB for the cleanup loop.
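As an illustration of the handle-per-call design, here is a sketch of a consumer that runs its handler only the first time a message is seen. The processedStore interface copies the MarkProcessed signature documented on this page; memStore and handleOnce are hypothetical helpers, and the sketch assumes the returned bool means "this call recorded the message for the first time", which this page does not state.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// processedStore mirrors the documented MarkProcessed signature.
type processedStore interface {
	MarkProcessed(ctx context.Context, db any, group, msgID string, metadata json.RawMessage) (bool, error)
}

// memStore is an in-memory stand-in used only to make the sketch runnable.
type memStore struct{ seen map[string]bool }

func (m *memStore) MarkProcessed(_ context.Context, _ any, group, msgID string, _ json.RawMessage) (bool, error) {
	key := group + "\x00" + msgID
	if m.seen[key] {
		return false, nil
	}
	m.seen[key] = true
	return true, nil
}

// handleOnce calls fn only when the store reports the message as new.
// ASSUMPTION: true from MarkProcessed means the dedup row was inserted now.
func handleOnce(ctx context.Context, s processedStore, tx any, group, msgID string, fn func() error) (bool, error) {
	first, err := s.MarkProcessed(ctx, tx, group, msgID, nil)
	if err != nil || !first {
		return false, err
	}
	return true, fn()
}

// deliverTwice simulates a redelivery and returns how often the handler ran.
func deliverTwice() int {
	store := &memStore{seen: map[string]bool{}}
	runs := 0
	for i := 0; i < 2; i++ {
		// With the real store, the nil handle would be a *sql.Tx shared with fn's writes.
		_, _ = handleOnce(context.Background(), store, nil, "billing", "msg-1", func() error {
			runs++
			return nil
		})
	}
	return runs
}

func main() {
	fmt.Println(deliverTwice())
}
```

With a real *sql.Tx, the dedup row and the handler's own writes commit or roll back together, so a failed handler does not leave the message marked. The in-memory stand-in does not model that rollback.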

func NewProcessed added in v0.11.1

func NewProcessed(tableName string) *ProcessedStore

NewProcessed creates a ProcessedStore for SQLite. tableName defaults to DefaultProcessedTable when empty.

SQLite has no schemas; pass an unqualified name. See sql.WithTableName for the equivalent constraint on the messages table.

func (*ProcessedStore) IsProcessed added in v0.11.1

func (s *ProcessedStore) IsProcessed(ctx context.Context, db any, group, msgID string) (bool, error)

IsProcessed implements outbox.ProcessedStore. db must satisfy osql.DBTX (e.g. *sql.DB or *sql.Tx).

func (*ProcessedStore) MarkProcessed added in v0.11.1

func (s *ProcessedStore) MarkProcessed(ctx context.Context, db any, group, msgID string, metadata json.RawMessage) (bool, error)

MarkProcessed implements outbox.ProcessedStore. Non-empty metadata must be valid JSON; invalid input is rejected before the INSERT runs.

Validation matters for SQLite: the metadata column is TEXT (no JSON affinity), so without this check garbage would be silently stored.
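A check of this kind can be sketched with encoding/json alone. The names checkMetadata, metadataOK and errInvalidMetadata are illustrative, not part of this package, and the real error value may differ.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// errInvalidMetadata is a hypothetical sentinel error for this sketch.
var errInvalidMetadata = errors.New("metadata is not valid JSON")

// checkMetadata accepts empty metadata and rejects non-empty input that does
// not parse as JSON, so it never reaches a TEXT column unchecked.
func checkMetadata(metadata json.RawMessage) error {
	if len(metadata) == 0 {
		return nil
	}
	if !json.Valid(metadata) {
		return errInvalidMetadata
	}
	return nil
}

// metadataOK is a small convenience wrapper for string input.
func metadataOK(s string) bool {
	return checkMetadata(json.RawMessage(s)) == nil
}

func main() {
	fmt.Println(metadataOK(``))
	fmt.Println(metadataOK(`{"attempt":1}`))
	fmt.Println(metadataOK(`{not json`))
}
```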

func (*ProcessedStore) PurgeProcessedBefore added in v0.11.1

func (s *ProcessedStore) PurgeProcessedBefore(ctx context.Context, db any, t time.Time) (int64, error)

PurgeProcessedBefore implements outbox.ProcessedStore.

SQLite stores DATETIME values as TEXT, and CURRENT_TIMESTAMP renders them as "YYYY-MM-DD HH:MM:SS" (UTC, space-separated, no Z). The default driver binding for time.Time uses a different shape (e.g. "2026-05-07T09:27:21Z"); a raw lexicographic comparison would mix the two and mis-order rows around the space-vs-'T' boundary. We format the cutoff to match CURRENT_TIMESTAMP's exact shape so the lex compare is also a chronological compare.
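The mis-ordering and the fix can be reproduced with plain string comparison. This is a sketch of the idea only; formatCutoff and formatCutoffUnix are hypothetical names, and the layout is simply the Go equivalent of the "YYYY-MM-DD HH:MM:SS" shape described above.

```go
package main

import (
	"fmt"
	"time"
)

// sqliteTimestampLayout is the Go layout for the shape CURRENT_TIMESTAMP produces.
const sqliteTimestampLayout = "2006-01-02 15:04:05"

// formatCutoff renders t in UTC using the CURRENT_TIMESTAMP shape.
func formatCutoff(t time.Time) string {
	return t.UTC().Format(sqliteTimestampLayout)
}

// formatCutoffUnix is a convenience wrapper taking Unix seconds.
func formatCutoffUnix(sec int64) string {
	return formatCutoff(time.Unix(sec, 0))
}

func main() {
	stored := "2026-05-07 12:00:00" // a row written by CURRENT_TIMESTAMP at noon
	cutoff := time.Date(2026, 5, 7, 9, 27, 21, 0, time.UTC)

	// RFC 3339 cutoff: ' ' sorts before 'T', so the noon row wrongly
	// compares as older than the 09:27 cutoff.
	fmt.Println(stored < cutoff.Format(time.RFC3339))

	// Matching shapes: the string comparison agrees with the clock.
	fmt.Println(stored < formatCutoff(cutoff))
}
```

The first comparison is true and the second is false: only the second reflects that noon is later than 09:27.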

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store implements outbox.Store for SQLite.

func New

func New(db *stdsql.DB, options ...osql.Option) *Store

New creates a new SQLite outbox store backed by *sql.DB.

func (*Store) DeleteProcessedBefore added in v0.11.1

func (s *Store) DeleteProcessedBefore(ctx context.Context, t time.Time) (int64, error)

DeleteProcessedBefore removes processed messages older than t.

func (*Store) FetchPending

func (s *Store) FetchPending(ctx context.Context, limit int) ([]outbox.Message, error)

FetchPending atomically claims up to limit pending messages for processing. The claim is wrapped in a transaction so that the SELECT and the UPDATE see a consistent snapshot.
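One way such a claim could look with database/sql is sketched below. This is not the package's actual query: the column names (id, status, created_at), the 'processing' status value and the helper names are assumptions made for illustration, and the table name is concatenated on the assumption that it is a trusted identifier.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
)

// placeholders returns n comma-separated "?" markers for an IN (...) list.
func placeholders(n int) string {
	if n <= 0 {
		return ""
	}
	return strings.TrimSuffix(strings.Repeat("?,", n), ",")
}

// claimPending selects up to limit pending rows and marks them as claimed
// inside one transaction. Column names and status values are hypothetical.
func claimPending(ctx context.Context, db *sql.DB, table string, limit int) ([]string, error) {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback() // returns sql.ErrTxDone after a successful Commit; ignored

	rows, err := tx.QueryContext(ctx,
		"SELECT id FROM "+table+" WHERE status = 'pending' ORDER BY created_at LIMIT ?", limit)
	if err != nil {
		return nil, err
	}
	var ids []string
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			rows.Close()
			return nil, err
		}
		ids = append(ids, id)
	}
	rows.Close()
	if err := rows.Err(); err != nil {
		return nil, err
	}
	if len(ids) == 0 {
		return nil, tx.Commit()
	}

	args := make([]any, len(ids))
	for i, id := range ids {
		args[i] = id
	}
	_, err = tx.ExecContext(ctx,
		"UPDATE "+table+" SET status = 'processing' WHERE id IN ("+placeholders(len(ids))+")", args...)
	if err != nil {
		return nil, err
	}
	return ids, tx.Commit()
}

func main() {
	// No driver is registered in this sketch, so only the helper is exercised.
	fmt.Println(placeholders(3))
}
```

Because SQLite allows a single writer at a time, two workers running this concurrently take turns at the UPDATE rather than claiming disjoint rows in parallel.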

func (*Store) MarkFailed

func (s *Store) MarkFailed(ctx context.Context, id string, err error) error

MarkFailed increments the retry count and records the error. It sets the status to failed once the maximum number of retries is reached; otherwise it returns the message to pending.
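The retry rule can be written as a small pure function. This is a sketch under assumptions: afterFailure is a hypothetical name, and whether the real comparison is ">=" or ">" against the configured maximum is not stated on this page.

```go
package main

import "fmt"

// afterFailure returns the new retry count and status after one failed attempt.
// ASSUMPTION: the message becomes "failed" when the incremented count reaches
// maxRetries; the package's exact boundary may differ.
func afterFailure(retries, maxRetries int) (int, string) {
	retries++
	if retries >= maxRetries {
		return retries, "failed"
	}
	return retries, "pending"
}

func main() {
	for r := 0; r < 3; r++ {
		n, status := afterFailure(r, 3)
		fmt.Println(n, status)
	}
}
```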

func (*Store) MarkProcessed

func (s *Store) MarkProcessed(ctx context.Context, ids ...string) error

MarkProcessed marks messages as successfully processed.

func (*Store) Save

func (s *Store) Save(ctx context.Context, tx any, msgs ...outbox.Message) error

Save persists messages within the given transaction. tx may be a *sql.Tx, or nil to have the Store create its own transaction. Message IDs are generated by the Store when empty.
