Documentation
¶
Overview ¶
Package sqlite provides a SQLite implementation of outbox.Store using database/sql. Use it with any SQLite database/sql driver (mattn/go-sqlite3, modernc.org/sqlite, etc.).
SQLite has no FOR UPDATE SKIP LOCKED. FetchPending wraps the claim in a short transaction; concurrent workers serialize on the database write lock, which is fine for most outbox workloads but does not scale to many parallel claimers the way PostgreSQL does.
Index ¶
- Constants
- func CreateProcessedTableSQL(tableName string) string
- func CreateTableSQL(tableName string) string
- func Migrate(ctx context.Context, db osql.Execer, tableName string) error
- func MigrateProcessed(ctx context.Context, db osql.Execer, tableName string) error
- type ProcessedStore
- func (s *ProcessedStore) IsProcessed(ctx context.Context, db any, group, msgID string) (bool, error)
- func (s *ProcessedStore) MarkProcessed(ctx context.Context, db any, group, msgID string, metadata json.RawMessage) (bool, error)
- func (s *ProcessedStore) PurgeProcessedBefore(ctx context.Context, db any, t time.Time) (int64, error)
- type Store
- func (s *Store) DeleteProcessedBefore(ctx context.Context, t time.Time) (int64, error)
- func (s *Store) FetchPending(ctx context.Context, limit int) ([]outbox.Message, error)
- func (s *Store) MarkFailed(ctx context.Context, id string, err error) error
- func (s *Store) MarkProcessed(ctx context.Context, ids ...string) error
- func (s *Store) Save(ctx context.Context, tx any, msgs ...outbox.Message) error
Constants ¶
const DefaultProcessedTable = "processed_messages"
DefaultProcessedTable is the default table name used by ProcessedStore when none is provided.
Variables ¶
This section is empty.
Functions ¶
func CreateProcessedTableSQL ¶ added in v0.11.1
CreateProcessedTableSQL returns the SQLite DDL for creating the processed_messages dedup table used by outbox.ProcessedStore.
SQLite has no native JSON type; metadata is TEXT and the JSON1 extension operates on TEXT. The table is intentionally NOT declared STRICT — see CreateTableSQL above for the reason.
func CreateTableSQL ¶
CreateTableSQL returns the SQLite DDL for creating the outbox messages table.
SQLite has no native UUID, JSON or bytea types — TEXT/BLOB are used instead and IDs are generated by the Store on insert. The partial index requires SQLite 3.8 or newer.
The table is intentionally NOT declared STRICT because STRICT only allows the column types ANY/BLOB/INTEGER/REAL/TEXT, and we rely on DATETIME affinity for created_at/processed_at so that database/sql can scan rows into time.Time via sql.NullTime. Adding STRICT here would reject the table at CREATE time on every supported sqlite driver.
func Migrate ¶
Migrate creates the outbox messages table if it does not exist. db can be a *sql.DB, *sql.Tx, *sql.Conn or any other type implementing osql.Execer. tableName defaults to "outbox_messages" when empty (matches sql.DefaultConfig and MigrateProcessed's symmetric default).
func MigrateProcessed ¶ added in v0.11.1
MigrateProcessed creates the processed_messages table if it does not exist. db can be a *sql.DB, *sql.Tx, *sql.Conn or any other type implementing osql.Execer. tableName defaults to DefaultProcessedTable when empty.
Types ¶
type ProcessedStore ¶ added in v0.11.1
type ProcessedStore struct {
// contains filtered or unexported fields
}
ProcessedStore implements outbox.ProcessedStore for SQLite.
Unlike Store, ProcessedStore does not hold a *sql.DB — every method takes the db/tx handle so callers compose with *sql.Tx for transactional exactly-once or pass *sql.DB for the cleanup loop.
func NewProcessed ¶ added in v0.11.1
func NewProcessed(tableName string) *ProcessedStore
NewProcessed creates a ProcessedStore for SQLite. tableName defaults to DefaultProcessedTable when empty.
SQLite has no schemas; pass an unqualified name. See sql.WithTableName for the equivalent constraint on the messages table.
func (*ProcessedStore) IsProcessed ¶ added in v0.11.1
func (s *ProcessedStore) IsProcessed(ctx context.Context, db any, group, msgID string) (bool, error)
IsProcessed implements outbox.ProcessedStore. db must satisfy osql.DBTX (e.g. *sql.DB or *sql.Tx).
func (*ProcessedStore) MarkProcessed ¶ added in v0.11.1
func (s *ProcessedStore) MarkProcessed(ctx context.Context, db any, group, msgID string, metadata json.RawMessage) (bool, error)
MarkProcessed implements outbox.ProcessedStore. Non-empty metadata must be valid JSON; invalid input is rejected before the INSERT runs.
Validation matters for SQLite: the metadata column is TEXT (no JSON affinity), so without this check garbage would be silently stored.
func (*ProcessedStore) PurgeProcessedBefore ¶ added in v0.11.1
func (s *ProcessedStore) PurgeProcessedBefore(ctx context.Context, db any, t time.Time) (int64, error)
PurgeProcessedBefore implements outbox.ProcessedStore.
SQLite stores DATETIME values as TEXT, and CURRENT_TIMESTAMP renders them as "YYYY-MM-DD HH:MM:SS" (UTC, space-separated, no Z). The default driver binding for time.Time uses a different shape (e.g. "2026-05-07T09:27:21Z"); a raw lexicographic comparison would mix the two and mis-order rows around the space-vs-'T' boundary. We format the cutoff to match CURRENT_TIMESTAMP's exact shape so the lex compare is also a chronological compare.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store implements outbox.Store for SQLite.
func (*Store) DeleteProcessedBefore ¶ added in v0.11.1
DeleteProcessedBefore removes processed messages older than t.
func (*Store) FetchPending ¶
FetchPending atomically claims up to limit pending messages for processing. Wrapped in a transaction so the SELECT and UPDATE see a consistent snapshot.
func (*Store) MarkFailed ¶
MarkFailed increments retries and records the error. Sets status to failed when max retries reached, else back to pending.
func (*Store) MarkProcessed ¶
MarkProcessed marks messages as successfully processed.