Documentation ¶
Overview ¶
Package store is peerbus's durable message store: a pure-Go SQLite (modernc.org/sqlite, WAL mode) queue with dedupe-by-id and per-sender FIFO.
Delivery model (locked by the plan): at-least-once, durable, dedupe-by-message-id, per-sender FIFO (no global order). A message for an offline recipient is simply a row with delivered=0; it is flushed when the recipient registers/drains. A delivered-but-unacked message is requeued for redelivery on reconnect, so consumers must dedupe by message id.
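Because delivery is at-least-once, a consumer may see the same message id more than once after a requeue. The following is a minimal consumer-side sketch, not part of this package: the Deduper type and its FirstTime method are hypothetical names, and a real consumer would likely bound or persist the seen set.

```go
package main

import "fmt"

// Deduper (hypothetical) remembers processed message ids so that a
// redelivered message is handled only once by the consumer.
type Deduper struct {
	seen map[string]struct{}
}

// NewDeduper returns an empty Deduper.
func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[string]struct{})}
}

// FirstTime reports whether id has not been seen before, and records it.
func (d *Deduper) FirstTime(id string) bool {
	if _, dup := d.seen[id]; dup {
		return false
	}
	d.seen[id] = struct{}{}
	return true
}

func main() {
	d := NewDeduper()
	// "m1" arrives twice, as it could after a requeue on reconnect.
	for _, id := range []string{"m1", "m2", "m1"} {
		if d.FirstTime(id) {
			fmt.Println("process", id)
		} else {
			fmt.Println("skip duplicate", id)
		}
	}
}
```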
Index ¶
- Variables
- type AuditRow
- type Message
- type Peer
- type Store
- func (s *Store) AuditAppend(r AuditRow) error
- func (s *Store) AuditCount() (int64, error)
- func (s *Store) AuditLast() (AuditRow, bool, error)
- func (s *Store) AuditRows() ([]AuditRow, error)
- func (s *Store) AuditTamper(seq int64, event []byte, hash string) error
- func (s *Store) Close() error
- func (s *Store) Enqueue(msg Message) error
- func (s *Store) MarkAcked(id string) error
- func (s *Store) MarkDelivered(id string) error
- func (s *Store) PendingFor(name string) ([]Message, error)
- func (s *Store) Register(p Peer) error
- func (s *Store) RequeueUnacked(name string) (int, error)
Variables ¶
var ErrClosed = errors.New("store: closed")
ErrClosed is returned by any operation invoked after Close.
var ErrDuplicateID = errors.New("store: duplicate message id")
ErrDuplicateID is returned by Enqueue when a message with the same id has already been stored. It is a distinct sentinel (not a silent success) so callers can tell a real insert from a dedupe hit; the row is left untouched.
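A caller can separate a dedupe hit from a real failure with errors.Is. The sketch below is self-contained and does not import this package: errDuplicateID stands in for ErrDuplicateID, and memQueue, enqueue, and classify are hypothetical helpers that only mimic the documented contract.

```go
package main

import (
	"errors"
	"fmt"
)

// errDuplicateID stands in for store.ErrDuplicateID in this sketch.
var errDuplicateID = errors.New("store: duplicate message id")

// memQueue is a hypothetical in-memory stand-in for the store.
type memQueue struct {
	rows map[string][]byte
}

// enqueue mimics the documented contract: the first insert wins, and a
// repeated id returns the sentinel while leaving the stored row untouched.
func (q *memQueue) enqueue(id string, envelope []byte) error {
	if _, ok := q.rows[id]; ok {
		return fmt.Errorf("enqueue %q: %w", id, errDuplicateID)
	}
	q.rows[id] = envelope
	return nil
}

// classify maps an enqueue result to "stored", "duplicate", or "failed".
func classify(err error) string {
	switch {
	case err == nil:
		return "stored"
	case errors.Is(err, errDuplicateID):
		return "duplicate"
	default:
		return "failed"
	}
}

func main() {
	q := &memQueue{rows: make(map[string][]byte)}
	fmt.Println(classify(q.enqueue("m1", []byte("first"))))
	fmt.Println(classify(q.enqueue("m1", []byte("second"))))
	fmt.Println(string(q.rows["m1"])) // the original envelope is kept
}
```

Using errors.Is rather than == keeps the check valid whether or not the error is wrapped on its way to the caller.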
var ErrUnknownPeer = errors.New("store: unknown peer")
ErrUnknownPeer is returned by operations that require a registered peer when no such peer exists.
Types ¶
type AuditRow ¶
AuditRow is one append-only audit-log row. The blake3 hash-chain semantics live in internal/audit; the store only persists and reads back the rows.
- Seq: monotonic chain position (0 = genesis/first row).
- PrevHash: hex hash of the previous row (first row chains off blake3("")).
- Hash: hex blake3(prev_hash || canonical event bytes).
- Event: opaque canonical event blob, stored verbatim.
- TS: unix nanos at append.
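The chaining rule described by these fields can be sketched with the standard library alone. This is an illustration, not the internal/audit implementation: SHA-256 is swapped in for blake3 (which is not in the standard library), the previous hash is assumed to be fed to the hash as its hex string, and row, digest, appendRow, and verify are hypothetical names.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// row mirrors the AuditRow fields that matter for chaining.
type row struct {
	Seq      int64
	PrevHash string
	Hash     string
	Event    []byte
}

// digest returns hex(H(prevHash || event)). SHA-256 replaces blake3 here.
func digest(prevHash string, event []byte) string {
	h := sha256.New()
	h.Write([]byte(prevHash)) // assumption: the hex string is hashed as-is
	h.Write(event)
	return hex.EncodeToString(h.Sum(nil))
}

// genesisPrev is the value the first row chains off: hex(H("")).
func genesisPrev() string {
	sum := sha256.Sum256(nil)
	return hex.EncodeToString(sum[:])
}

// appendRow links a new event onto the end of the chain.
func appendRow(chain []row, event []byte) []row {
	prev := genesisPrev()
	if n := len(chain); n > 0 {
		prev = chain[n-1].Hash
	}
	return append(chain, row{
		Seq:      int64(len(chain)),
		PrevHash: prev,
		Hash:     digest(prev, event),
		Event:    event,
	})
}

// verify returns the index of the first row that breaks the chain, or -1.
func verify(chain []row) int {
	prev := genesisPrev()
	for i, r := range chain {
		if r.Seq != int64(i) || r.PrevHash != prev || r.Hash != digest(r.PrevHash, r.Event) {
			return i
		}
		prev = r.Hash
	}
	return -1
}

func main() {
	var chain []row
	for _, ev := range []string{"register alice", "enqueue m1", "ack m1"} {
		chain = appendRow(chain, []byte(ev))
	}
	fmt.Println("intact chain, first bad row:", verify(chain))
	chain[1].Event = []byte("tampered") // simulate on-disk corruption
	fmt.Println("after tampering, first bad row:", verify(chain))
}
```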
type Message ¶
type Message struct {
ID string
From string
To string
Envelope []byte
Delivered bool
Acked bool
Attempts int
Seq int64
TS int64
}
Message is one durable queue row.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store is a durable SQLite-backed message queue. It is safe for concurrent use; SQLite serialises writes and an internal mutex guards the closed flag.
func Open ¶
Open opens (creating if needed) the SQLite database at path and applies the schema idempotently. Pass ":memory:" for an ephemeral database. WAL journal mode and sane durability/concurrency pragmas are set on the connection.
func (*Store) AuditAppend ¶
AuditAppend durably appends one audit row. It does NOT compute or validate the hash chain (that is internal/audit's job); it only persists the row as given. The UNIQUE constraint on audit.seq is the last line of defence against a duplicated chain position: internal/audit serialises appends, so the constraint should never fire in practice, but a violation surfaces as an error rather than as silent corruption.
func (*Store) AuditCount ¶
AuditCount returns the number of audit rows. Used by internal/audit to find the next chain position without loading the whole log.
func (*Store) AuditLast ¶
AuditLast returns the most recent audit row (highest seq) and ok=true, or ok=false when the log is empty.
func (*Store) AuditRows ¶
AuditRows returns every audit row ordered by seq ascending (genesis first). An empty slice (not an error) means the log is empty.
func (*Store) AuditTamper ¶
AuditTamper overwrites the event and hash of the audit row at the given seq. It exists ONLY to let tests simulate on-disk corruption of the chain; it is never called by production code (the audit log is otherwise append-only).
func (*Store) Close ¶
Close closes the underlying database. Subsequent operations return ErrClosed. Calling Close more than once is a no-op.
func (*Store) Enqueue ¶
Enqueue durably stores msg for delivery. The recipient (msg.To) must be a registered peer (ErrUnknownPeer otherwise). Dedupe is by msg.ID: if a message with the same id already exists the row is left untouched and ErrDuplicateID is returned (a distinct sentinel, not a silent success).
A per-sender monotonic seq is assigned inside the transaction so that PendingFor returns each sender's messages in FIFO order even under concurrent enqueues.
func (*Store) MarkAcked ¶
MarkAcked marks the message with the given id as acked. Once acked a message is never requeued by RequeueUnacked. Unknown ids are a no-op.
func (*Store) MarkDelivered ¶
MarkDelivered marks the message with the given id as delivered and bumps its attempt counter. Unknown ids are a no-op (at-least-once delivery means callers may retry; an unknown id is not an error here).
func (*Store) PendingFor ¶
PendingFor returns the undelivered (delivered=0) messages for recipient name, ordered per-sender FIFO via the monotonic seq. The recipient must be a registered peer (ErrUnknownPeer otherwise). An empty slice (not an error) means nothing is queued.
Ordering note: rows are ordered by (sender, seq) then ts so each sender's stream is strictly FIFO; there is intentionally no global cross-sender order (matches the locked per-sender-FIFO delivery model).
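The ordering described in the note corresponds to a three-key comparison. The sketch below applies it to an in-memory slice; the store presumably does this in SQL, and the pending type and orderPending function are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// pending holds the Message fields that take part in ordering.
type pending struct {
	ID   string
	From string
	Seq  int64
	TS   int64
}

// orderPending sorts rows by (sender, seq) and then ts, so each sender's
// stream is FIFO and no cross-sender order is implied.
func orderPending(rows []pending) {
	sort.SliceStable(rows, func(i, j int) bool {
		a, b := rows[i], rows[j]
		if a.From != b.From {
			return a.From < b.From
		}
		if a.Seq != b.Seq {
			return a.Seq < b.Seq
		}
		return a.TS < b.TS
	})
}

func main() {
	rows := []pending{
		{ID: "b2", From: "bob", Seq: 2, TS: 40},
		{ID: "a1", From: "alice", Seq: 1, TS: 30},
		{ID: "b1", From: "bob", Seq: 1, TS: 10},
		{ID: "a2", From: "alice", Seq: 2, TS: 20},
	}
	orderPending(rows)
	for _, r := range rows {
		fmt.Println(r.From, r.Seq, r.ID)
	}
}
```

Note that a2 sorts after a1 even though its timestamp is earlier: within a sender, seq decides and ts only breaks ties.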
func (*Store) Register ¶
Register records a peer by name. It is idempotent: re-registering an existing peer refreshes its registration timestamp but keeps the original first-seen ts.
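The idempotent behaviour amounts to an upsert that touches only one of two timestamps. A minimal in-memory sketch follows; the peerRow fields FirstSeen and LastRegistered are hypothetical names, since the Peer type's fields are not shown here.

```go
package main

import "fmt"

// peerRow is a hypothetical row shape for a registered peer.
type peerRow struct {
	Name           string
	FirstSeen      int64 // set once, on the first registration
	LastRegistered int64 // refreshed on every registration
}

// registry is an in-memory stand-in for the peers table.
type registry map[string]*peerRow

// register inserts the peer, or refreshes LastRegistered if it exists.
func (r registry) register(name string, now int64) {
	if p, ok := r[name]; ok {
		p.LastRegistered = now
		return
	}
	r[name] = &peerRow{Name: name, FirstSeen: now, LastRegistered: now}
}

func main() {
	r := registry{}
	r.register("alice", 10)
	r.register("alice", 20) // re-registration
	p := r["alice"]
	fmt.Println(len(r), p.FirstSeen, p.LastRegistered)
}
```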
func (*Store) RequeueUnacked ¶
RequeueUnacked makes every delivered-but-unacked message for recipient name eligible for redelivery again (delivered -> 0). Acked messages are left alone. Called when a peer reconnects so in-flight-but-unconfirmed messages are redelivered (consumers dedupe by id). The recipient must be a registered peer (ErrUnknownPeer otherwise). Returns the number of rows requeued.
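Taken together, MarkDelivered, MarkAcked, and RequeueUnacked describe a small per-message state machine. The sketch below models it in memory to show which rows a reconnect makes pending again. The queue type and its lower-case methods are hypothetical, and the peer-registration check is omitted.

```go
package main

import "fmt"

// msg holds the Message fields that drive the delivery state machine.
type msg struct {
	ID        string
	To        string
	Delivered bool
	Acked     bool
	Attempts  int
}

// queue is an in-memory stand-in for the messages table.
type queue struct {
	rows []msg
}

// markDelivered sets delivered and bumps attempts; unknown ids are a no-op.
func (q *queue) markDelivered(id string) {
	for i := range q.rows {
		if q.rows[i].ID == id {
			q.rows[i].Delivered = true
			q.rows[i].Attempts++
		}
	}
}

// markAcked sets acked; unknown ids are a no-op.
func (q *queue) markAcked(id string) {
	for i := range q.rows {
		if q.rows[i].ID == id {
			q.rows[i].Acked = true
		}
	}
}

// requeueUnacked resets delivered for delivered-but-unacked rows addressed
// to name and returns how many rows it changed.
func (q *queue) requeueUnacked(name string) int {
	n := 0
	for i := range q.rows {
		m := &q.rows[i]
		if m.To == name && m.Delivered && !m.Acked {
			m.Delivered = false
			n++
		}
	}
	return n
}

func main() {
	q := &queue{rows: []msg{{ID: "m1", To: "bob"}, {ID: "m2", To: "bob"}}}
	q.markDelivered("m1")
	q.markDelivered("m2")
	q.markAcked("m1") // bob confirmed m1 but dropped before acking m2
	fmt.Println("requeued:", q.requeueUnacked("bob"))
	for _, m := range q.rows {
		fmt.Println(m.ID, "delivered:", m.Delivered, "acked:", m.Acked, "attempts:", m.Attempts)
	}
}
```

After the requeue, m2 is pending again with its attempt count intact, so the next MarkDelivered records a second attempt.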