store

package
v0.1.0 Latest
Published: May 18, 2026 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package store is peerbus's durable message store: a pure-Go SQLite (modernc.org/sqlite, WAL mode) queue with dedupe-by-id and per-sender FIFO.

Delivery model (locked by the plan): at-least-once, durable, dedupe-by-message-id, per-sender FIFO (no global order). A message for an offline recipient is simply a row with delivered=0; it is flushed when the recipient registers/drains. A delivered-but-unacked message is requeued for redelivery on reconnect (consumers dedupe).
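
Because delivery is at-least-once, a consumer can see the same message id again after a reconnect. The following is a minimal sketch of consumer-side dedupe. The `Dedupe` type and its in-memory `seen` set are hypothetical and not part of this package; a real consumer would probably persist the set.

```go
package main

import "fmt"

// Dedupe is a hypothetical consumer-side filter; it is not part of package store.
type Dedupe struct {
	seen map[string]struct{}
}

func NewDedupe() *Dedupe { return &Dedupe{seen: make(map[string]struct{})} }

// FirstTime reports whether id has not been seen before, and records it.
func (d *Dedupe) FirstTime(id string) bool {
	if _, ok := d.seen[id]; ok {
		return false
	}
	d.seen[id] = struct{}{}
	return true
}

func main() {
	d := NewDedupe()
	// "m1" arrives twice, as it could after a requeue on reconnect.
	for _, id := range []string{"m1", "m2", "m1"} {
		if d.FirstTime(id) {
			fmt.Println("process", id)
		} else {
			fmt.Println("skip duplicate", id)
		}
	}
}
```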

Constants

This section is empty.

Variables

var ErrClosed = errors.New("store: closed")

ErrClosed is returned by any operation invoked after Close.

var ErrDuplicateID = errors.New("store: duplicate message id")

ErrDuplicateID is returned by Enqueue when a message with the same id has already been stored. It is a distinct sentinel (not a silent success) so callers can tell a real insert from a dedupe hit; the row is left untouched.

var ErrUnknownPeer = errors.New("store: unknown peer")

ErrUnknownPeer is returned by operations that require a registered peer when no such peer exists.

Functions

This section is empty.

Types

type AuditRow

type AuditRow struct {
	Seq      int64
	PrevHash string
	Hash     string
	Event    []byte
	TS       int64
}

AuditRow is one append-only audit-log row. The blake3 hash-chain semantics live in internal/audit; the store only persists and reads back the rows.

Seq      : monotonic chain position (0 = genesis/first row).
PrevHash : hex hash of the previous row (first row chains off blake3("")).
Hash     : hex blake3(prev_hash || canonical event bytes).
Event    : opaque canonical event blob, stored verbatim.
TS       : unix nanos at append.
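
The chain rule in the field notes above can be sketched as follows. This is not the internal/audit implementation. SHA-256 is swapped in for blake3 only because it is in the standard library, and feeding prev_hash into the hash as its hex text is an assumption. `hashHex`, `link`, `appendRow`, and `verify` are hypothetical helpers.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// AuditRow mirrors the struct documented above.
type AuditRow struct {
	Seq      int64
	PrevHash string
	Hash     string
	Event    []byte
	TS       int64
}

// hashHex is a stand-in for blake3 (SHA-256 here, stdlib only).
func hashHex(b []byte) string {
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:])
}

// link computes hash(prev_hash || event).
// Assumption: prev_hash is hashed as its hex text, not as raw bytes.
func link(prevHash string, event []byte) string {
	return hashHex(append([]byte(prevHash), event...))
}

// appendRow builds the next row; the first row chains off hash("").
func appendRow(chain []AuditRow, event []byte, ts int64) []AuditRow {
	prev := hashHex(nil)
	if n := len(chain); n > 0 {
		prev = chain[n-1].Hash
	}
	return append(chain, AuditRow{
		Seq: int64(len(chain)), PrevHash: prev,
		Hash: link(prev, event), Event: event, TS: ts,
	})
}

// verify returns the position of the first broken row, or -1 if the chain is intact.
func verify(chain []AuditRow) int64 {
	prev := hashHex(nil)
	for i, r := range chain {
		if r.Seq != int64(i) || r.PrevHash != prev || r.Hash != link(prev, r.Event) {
			return int64(i)
		}
		prev = r.Hash
	}
	return -1
}

func main() {
	var chain []AuditRow
	chain = appendRow(chain, []byte(`{"e":"register"}`), 1)
	chain = appendRow(chain, []byte(`{"e":"enqueue"}`), 2)
	fmt.Println(verify(chain)) // -1: intact

	chain[0].Event = []byte(`{"e":"forged"}`) // simulate on-disk tampering
	fmt.Println(verify(chain)) // 0: first row no longer matches its hash
}
```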

type Message

type Message struct {
	ID        string
	From      string
	To        string
	Envelope  []byte
	Delivered bool
	Acked     bool
	Attempts  int
	Seq       int64
	TS        int64
}

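Message is one durable queue row. ID is the dedupe key; From and To name the sender and the recipient; Delivered and Acked track delivery state; Attempts is bumped by MarkDelivered; Seq is the per-sender monotonic sequence assigned by Enqueue.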

type Peer

type Peer struct {
	Name string
}

Peer is a registered participant on the bus.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store is a durable SQLite-backed message queue. It is safe for concurrent use; SQLite serialises writes and an internal mutex guards the closed flag.

func Open

func Open(path string) (*Store, error)

Open opens (creating if needed) the SQLite database at path and applies the schema idempotently. Pass ":memory:" for an ephemeral database. WAL journal mode and sane durability/concurrency pragmas are set on the connection.

func (*Store) AuditAppend

func (s *Store) AuditAppend(r AuditRow) error

AuditAppend durably appends one audit row. It does NOT compute or validate the hash chain (that is internal/audit's job); it only persists the row as given. The UNIQUE constraint on audit.seq is the last line of defence against a duplicated chain position. internal/audit serialises appends, so the constraint should never fire in practice, but a violation surfaces as an error rather than as silent corruption.

func (*Store) AuditCount

func (s *Store) AuditCount() (int64, error)

AuditCount returns the number of audit rows. Used by internal/audit to find the next chain position without loading the whole log.

func (*Store) AuditLast

func (s *Store) AuditLast() (AuditRow, bool, error)

AuditLast returns the most recent audit row (highest seq) and ok=true, or ok=false when the log is empty.

func (*Store) AuditRows

func (s *Store) AuditRows() ([]AuditRow, error)

AuditRows returns every audit row ordered by seq ascending (genesis first). An empty slice (not an error) means the log is empty.

func (*Store) AuditTamper

func (s *Store) AuditTamper(seq int64, event []byte, hash string) error

AuditTamper overwrites the event and hash of the audit row at the given seq. It exists ONLY to let tests simulate on-disk corruption of the chain; it is never called by production code (the audit log is otherwise append-only).

func (*Store) Close

func (s *Store) Close() error

Close closes the underlying database. Subsequent operations return ErrClosed. Calling Close more than once is a no-op.

func (*Store) Enqueue

func (s *Store) Enqueue(msg Message) error

Enqueue durably stores msg for delivery. The recipient (msg.To) must be a registered peer (ErrUnknownPeer otherwise). Dedupe is by msg.ID: if a message with the same id already exists the row is left untouched and ErrDuplicateID is returned (a distinct sentinel, not a silent success).

A per-sender monotonic seq is assigned inside the transaction so that PendingFor returns each sender's messages in FIFO order even under concurrent enqueues.

func (*Store) MarkAcked

func (s *Store) MarkAcked(id string) error

MarkAcked marks the message with the given id as acked. Once acked a message is never requeued by RequeueUnacked. Unknown ids are a no-op.

func (*Store) MarkDelivered

func (s *Store) MarkDelivered(id string) error

MarkDelivered marks the message with the given id as delivered and bumps its attempt counter. Unknown ids are a no-op (at-least-once delivery means callers may retry; an unknown id is not an error here).

func (*Store) PendingFor

func (s *Store) PendingFor(name string) ([]Message, error)

PendingFor returns the undelivered (delivered=0) messages for recipient name, ordered per-sender FIFO via the monotonic seq. The recipient must be a registered peer (ErrUnknownPeer otherwise). An empty slice (not an error) means nothing is queued.

Ordering note: rows are ordered by (sender, seq) then ts so each sender's stream is strictly FIFO; there is intentionally no global cross-sender order (matches the locked per-sender-FIFO delivery model).
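
The ordering rule can be illustrated with an in-memory sort. This is only a sketch of the comparison, since the store itself orders rows in SQL. `orderPending` is a hypothetical helper and the trimmed `Message` type keeps only the fields involved.

```go
package main

import (
	"fmt"
	"sort"
)

// Message keeps only the fields that take part in ordering.
type Message struct {
	ID, From string
	Seq, TS  int64
}

// orderPending sorts in place by (sender, seq), then ts as a tie-breaker.
func orderPending(msgs []Message) {
	sort.SliceStable(msgs, func(i, j int) bool {
		a, b := msgs[i], msgs[j]
		if a.From != b.From {
			return a.From < b.From
		}
		if a.Seq != b.Seq {
			return a.Seq < b.Seq
		}
		return a.TS < b.TS
	})
}

func main() {
	msgs := []Message{
		{ID: "b2", From: "bob", Seq: 2, TS: 40},
		{ID: "a1", From: "alice", Seq: 1, TS: 30},
		{ID: "b1", From: "bob", Seq: 1, TS: 10},
		{ID: "a2", From: "alice", Seq: 2, TS: 20},
	}
	orderPending(msgs)
	ids := make([]string, 0, len(msgs))
	for _, m := range msgs {
		ids = append(ids, m.ID)
	}
	fmt.Println(ids) // [a1 a2 b1 b2]
}
```

Note that b1 has the earliest ts yet sorts after both of alice's messages, and a1 precedes a2 despite its later ts: seq decides order within a sender, and nothing orders messages across senders by time.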

func (*Store) Register

func (s *Store) Register(p Peer) error

Register records a peer by name. It is idempotent: re-registering an existing peer refreshes its registration timestamp but keeps the original first-seen timestamp.

func (*Store) RequeueUnacked

func (s *Store) RequeueUnacked(name string) (int, error)

RequeueUnacked makes every delivered-but-unacked message for recipient name eligible for redelivery again (delivered -> 0). Acked messages are left alone. Called when a peer reconnects so in-flight-but-unconfirmed messages are redelivered (consumers dedupe by id). The recipient must be a registered peer (ErrUnknownPeer otherwise). Returns the number of rows requeued.
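
The delivered/acked lifecycle described for MarkDelivered, MarkAcked, and RequeueUnacked can be sketched as state transitions on an in-memory map. `memQueue` is a hypothetical model, and the ErrUnknownPeer and ErrClosed checks are omitted for brevity.

```go
package main

import "fmt"

// Message keeps only the state fields this sketch needs.
type Message struct {
	ID, To    string
	Delivered bool
	Acked     bool
	Attempts  int
}

// memQueue is a hypothetical in-memory model keyed by message id.
type memQueue struct{ rows map[string]*Message }

// MarkDelivered sets delivered and bumps attempts; unknown ids are a no-op.
func (q *memQueue) MarkDelivered(id string) {
	if m, ok := q.rows[id]; ok {
		m.Delivered = true
		m.Attempts++
	}
}

// MarkAcked sets acked; unknown ids are a no-op.
func (q *memQueue) MarkAcked(id string) {
	if m, ok := q.rows[id]; ok {
		m.Acked = true
	}
}

// RequeueUnacked resets delivered for delivered-but-unacked rows addressed
// to the given recipient and returns how many rows it changed.
func (q *memQueue) RequeueUnacked(to string) int {
	n := 0
	for _, m := range q.rows {
		if m.To == to && m.Delivered && !m.Acked {
			m.Delivered = false
			n++
		}
	}
	return n
}

func main() {
	q := &memQueue{rows: map[string]*Message{
		"m1": {ID: "m1", To: "carol"},
		"m2": {ID: "m2", To: "carol"},
	}}
	q.MarkDelivered("m1")
	q.MarkDelivered("m2")
	q.MarkAcked("m1") // m2 stays in flight, e.g. the peer dropped before acking

	fmt.Println(q.RequeueUnacked("carol")) // 1: only m2 is requeued
	fmt.Println(q.rows["m2"].Delivered, q.rows["m2"].Attempts) // false 1
}
```

After the requeue, m2 is pending again and its next delivery raises Attempts to 2, which is why consumers need to dedupe by id.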
