Documentation
¶
Index ¶
- Constants
- Variables
- func CompactOnStartup(logPath string, snapshotDir string) (uint64, error)
- type Ballot
- type ByteSize
- type Config
- type FasterLog
- func (l *FasterLog) Accept(slot uint64, ballot Ballot, value []byte) error
- func (l *FasterLog) Checkpoint() error
- func (l *FasterLog) Close() error
- func (l *FasterLog) Commit(slot uint64) error
- func (l *FasterLog) GetCommittedRange() (minSlot uint64, maxSlot uint64, count uint64)
- func (l *FasterLog) IterateCommitted(fn func(entry *LogEntry) error, opts IterateOptions) error
- func (l *FasterLog) Read(slot uint64) (*LogEntry, error)
- func (l *FasterLog) ReadCommittedOnly(slot uint64) (*LogEntry, error)
- func (l *FasterLog) ReplayFromSlot(startSlot uint64, fn func(entry *LogEntry) error) error
- func (l *FasterLog) ReplayRange(startSlot, endSlot uint64, fn func(entry *LogEntry) error) error
- func (l *FasterLog) ScanUncommitted() ([]*LogEntry, error)
- type IterateOptions
- type KeyRegistry
- func (kr *KeyRegistry) Close() error
- func (kr *KeyRegistry) Compact() error
- func (kr *KeyRegistry) Contains(key string) bool
- func (kr *KeyRegistry) Count() int
- func (kr *KeyRegistry) Glob(pattern string) []string
- func (kr *KeyRegistry) List() []string
- func (kr *KeyRegistry) Register(key string) error
- func (kr *KeyRegistry) Unregister(key string) error
- type LogEntry
- type LogManager
- func (l *LogManager) CloseAll() error
- func (l *LogManager) GetLog(key []byte) (*FasterLog, func(), error)
- func (l *LogManager) GetSnapshotManager(key []byte, log *FasterLog) (*SnapshotManager, error)
- func (l *LogManager) Glob(pattern string) []string
- func (l *LogManager) HasKey(key string) bool
- func (l *LogManager) InitKey(key []byte, fromSnapshot func(*Snapshot) error, replay func(*LogEntry) error) (*FasterLog, func(), error)
- func (l *LogManager) KeyCount() int
- func (l *LogManager) ListKeys() []string
- func (l *LogManager) Stats() LogManagerStats
- type LogManagerStats
- type RingBuffer
- func (rb *RingBuffer) Append(entry *LogEntry) (uint64, error)
- func (rb *RingBuffer) AvailableSpace() uint64
- func (rb *RingBuffer) DrainCommitted() ([]*LogEntry, error)
- func (rb *RingBuffer) GetAllUncommitted() ([]*LogEntry, error)
- func (rb *RingBuffer) GetAllUncommittedWithIndex(indexCheck func(uint64) bool) ([]*LogEntry, error)
- func (rb *RingBuffer) MarkCommitted(slot uint64) error
- func (rb *RingBuffer) Read(offset uint64) (*LogEntry, error)
- func (rb *RingBuffer) ReadBySlot(slot uint64) (*LogEntry, error)
- func (rb *RingBuffer) Remove(slot uint64)
- func (rb *RingBuffer) Reset()
- func (rb *RingBuffer) TryAdvanceTail(indexCheck func(uint64) bool) uint64
- func (rb *RingBuffer) UsedSpace() uint64
- type Snapshot
- type SnapshotManager
- func (sm *SnapshotManager) CleanupOldSnapshots(keepCount int) error
- func (sm *SnapshotManager) CreateSnapshot(slot uint64, stateData []byte) error
- func (sm *SnapshotManager) GetLatestSnapshot() (*Snapshot, error)
- func (sm *SnapshotManager) GetSnapshot(slot uint64) (*Snapshot, error)
- func (sm *SnapshotManager) TruncateLog(truncateSlot uint64) error
Constants ¶
const ( MutableSize = 2 * MB // 2MB per log (small since few in-flight commits expected) SegmentSize = 1 * GB // 1GB per segment file NumThreads = 128 // Thread pool size for async operations MaxHotKeys = 1024 // LRU cache size (1024 × 2MB = 2GB max memory) )
Variables ¶
var ( // ErrSlotNotFound is returned when a slot is not in the log ErrSlotNotFound = errors.New("slot not found") // ErrCorruptedEntry is returned when an entry fails checksum validation ErrCorruptedEntry = errors.New("corrupted log entry") // ErrNotCommitted is returned when trying to read an uncommitted entry with ReadCommittedOnly ErrNotCommitted = errors.New("entry not committed") // ErrBufferFull is returned when the mutable buffer is full ErrBufferFull = errors.New("mutable buffer full") // ErrInvalidOffset is returned when an offset is out of bounds ErrInvalidOffset = errors.New("invalid offset") // ErrClosed is returned when operating on a closed log ErrClosed = errors.New("log is closed") // ErrNoSnapshot is returned when no snapshot exists ErrNoSnapshot = errors.New("no snapshot found") // ErrCorruptedSnapshot is returned when a snapshot fails checksum validation ErrCorruptedSnapshot = errors.New("corrupted snapshot") // ErrEntryTooLarge is returned when an entry value exceeds the maximum allowed size ErrEntryTooLarge = errors.New("entry value too large") )
var (
ErrRegistryClosed = errors.New("registry is closed")
)
Functions ¶
func CompactOnStartup ¶
CompactOnStartup performs offline log compaction before the log is opened. This MUST be called before NewFasterLog() if you want to apply pending truncations. Returns the truncation slot if compaction was performed, 0 if not.
Types ¶
type Ballot ¶
Ballot represents a WPaxos ballot number. Ballots are ordered first by ID, then by NodeID for tie-breaking.
type Config ¶
type Config struct {
// Path to the log file on disk
Path string
// MutableSize is the size of the in-memory mutable region for uncommitted entries
// Larger = more uncommitted entries can be held in memory
// Recommended: 64MB - 256MB
MutableSize uint64
// SegmentSize is the size of each immutable log segment
// Larger = fewer files but slower recovery
// Recommended: 1GB - 4GB
SegmentSize uint64
// NumThreads is the number of concurrent threads/goroutines expected
// Used for epoch-based memory management
NumThreads int
// SyncOnCommit controls whether to fsync on every commit
// true = durable but slower, false = fast but risk data loss on crash
SyncOnCommit bool
}
Config holds configuration for the FASTER log
type FasterLog ¶
type FasterLog struct {
// contains filtered or unexported fields
}
FasterLog implements a FASTER-style hybrid log optimised for consensus protocols. It has three regions: 1. In-memory index (hash map): slot -> offset lookup. 2. Mutable region (ring buffer): recent uncommitted entries (LOCK-FREE!). 3. Immutable tail (mmap file): committed entries, append-only.
LOCK-FREE DESIGN: - Mutable region uses atomic CAS for allocation (no locks!) - Reads scan buffer sequentially (fast for 64MB region) - Epochs protect against use-after-free during concurrent access - Only tail writes use a mutex (sequential disk writes)
func NewFasterLog ¶
NewFasterLog creates a new FASTER-style log
func (*FasterLog) Accept ¶
Accept writes an accepted (uncommitted) entry to the mutable region. This corresponds to WPaxos Phase-2b.
func (*FasterLog) Checkpoint ¶
Checkpoint flushes all committed entries from the mutable region to the tail. LOCK-FREE: Scans and drains without locks.
func (*FasterLog) Commit ¶
Commit marks an entry as committed and flushes it to the immutable tail. This corresponds to WPaxos Phase-3. Following FASTER's RCU principle: only committed entries go to the immutable tail. The tail is truly immutable - we never modify existing entries in place.
func (*FasterLog) GetCommittedRange ¶
GetCommittedRange returns the range of committed slots (min, max, count). Useful for learners to know what they need to catch up on. Returns (0, 0, 0) if the log is empty.
func (*FasterLog) IterateCommitted ¶
func (l *FasterLog) IterateCommitted(fn func(entry *LogEntry) error, opts IterateOptions) error
IterateCommitted iterates over committed entries in slot order (ZERO-ALLOCATION). The callback receives each entry but MUST NOT store the pointer - it's reused! Use cases:
- State machine reconstruction: fn(entry) { applyToStateMachine(entry.Value) }
- Snapshot creation: fn(entry) { serialize(entry.Slot, entry.Value) }
- Learner bootstrap: fn(entry) { sendToLearner(entry) }
CRITICAL: The entry parameter is REUSED between calls for zero allocation. If you need to keep data, copy it immediately:
valueCopy := make([]byte, len(entry.Value)) copy(valueCopy, entry.Value)
The iterator:
- Collects slot numbers from index (committed entries only)
- Sorts slots numerically
- Iterates in order, calling fn(entry) for each
- Stops on first error from fn()
Options control iteration range and behavior (see IterateOptions)
func (*FasterLog) ReadCommittedOnly ¶
ReadCommittedOnly reads an entry only if it's committed. This is what the state machine should use.
func (*FasterLog) ReplayFromSlot ¶
ReplayFromSlot replays committed entries starting from a slot up to the current max. This is the primary method for learner bootstrap and crash recovery.
CRITICAL: This captures the max slot BEFORE iteration to ensure a consistent snapshot. New commits that arrive during iteration are NOT included (prevents inconsistent reads).
Example - State machine reconstruction:
err := log.ReplayFromSlot(0, func(entry *LogEntry) error {
return stateMachine.Apply(entry.Slot, entry.Value)
})
Example - Learner catching up from slot 1000:
err := log.ReplayFromSlot(1000, func(entry *LogEntry) error {
return sendToLearner(entry)
})
The callback receives entries in slot order and MUST NOT store the entry pointer.
func (*FasterLog) ReplayRange ¶
ReplayRange replays committed entries in a specific range [startSlot, endSlot]. Use this when you want explicit control over the upper bound (e.g., incremental catch-up).
Example - Learner catching up to specific slot:
leaderMax := getLeaderMaxSlot()
err := log.ReplayRange(1000, leaderMax, func(entry *LogEntry) error {
return applyEntry(entry)
})
func (*FasterLog) ScanUncommitted ¶
ScanUncommitted returns all uncommitted entries. This is used for WPaxos Phase-1 recovery.
type IterateOptions ¶
type IterateOptions struct {
// MinSlot is the minimum slot to iterate (0 = from beginning)
MinSlot uint64
// MaxSlot is the maximum slot to iterate (0 = to end)
MaxSlot uint64
// IncludeUncommitted includes uncommitted entries from mutable region
// Useful for debugging or special recovery scenarios
// Default: false (only committed entries)
IncludeUncommitted bool
// SkipErrors continues iteration even if individual entry reads fail
// Useful for recovering from partial corruption
// Default: false (stop on first error)
SkipErrors bool
}
IterateOptions controls iteration behavior for IterateCommitted
type KeyRegistry ¶
type KeyRegistry struct {
// contains filtered or unexported fields
}
KeyRegistry tracks all known keys for enumeration and glob matching. It uses an append-only file for durability and an in-memory set for fast access.
func NewKeyRegistry ¶
func NewKeyRegistry(path string) (*KeyRegistry, error)
NewKeyRegistry creates or opens a key registry at the given path.
func (*KeyRegistry) Compact ¶
func (kr *KeyRegistry) Compact() error
Compact rewrites the registry file, removing deleted entries. This reduces file size after many delete operations.
func (*KeyRegistry) Contains ¶
func (kr *KeyRegistry) Contains(key string) bool
Contains checks if a key exists in the registry.
func (*KeyRegistry) Count ¶
func (kr *KeyRegistry) Count() int
Count returns the number of keys in the registry.
func (*KeyRegistry) Glob ¶
func (kr *KeyRegistry) Glob(pattern string) []string
Glob returns all keys matching the given glob pattern. Supports: * (any chars), ? (single char), [abc] (char class), [a-z] (range)
func (*KeyRegistry) List ¶
func (kr *KeyRegistry) List() []string
List returns all keys in the registry.
func (*KeyRegistry) Register ¶
func (kr *KeyRegistry) Register(key string) error
Register adds a key to the registry. Idempotent - registering an existing key is a no-op.
func (*KeyRegistry) Unregister ¶
func (kr *KeyRegistry) Unregister(key string) error
Unregister removes a key from the registry.
type LogEntry ¶
LogEntry represents a single consensus log entry. This matches WPaxos semantics: slot, ballot, value, committed flag.
type LogManager ¶
type LogManager struct {
// contains filtered or unexported fields
}
LogManager manages FASTER logs with LRU eviction and reference counting. This ensures logs are never closed while in use, preventing use-after-free bugs.
func NewLogManager ¶
func NewLogManager() *LogManager
NewLogManager creates a new LogManager with LRU eviction
func NewLogManagerWithDir ¶
func NewLogManagerWithDir(dir string) *LogManager
NewLogManagerWithDir creates a LogManager with a specific base directory. This is useful for tests/benchmarks that need isolated storage.
func (*LogManager) CloseAll ¶
func (l *LogManager) CloseAll() error
CloseAll closes all logs (for shutdown). This will wait for active references to drain before closing. If references don't drain within the timeout, it will NOT force-close.
func (*LogManager) GetLog ¶
func (l *LogManager) GetLog(key []byte) (*FasterLog, func(), error)
GetLog returns a log handle with an acquired reference. CRITICAL: Caller MUST call the returned release function when done! Usage:
log, release, err := manager.GetLog(key)
if err != nil { return err }
defer release()
// ... use log safely ...
func (*LogManager) GetSnapshotManager ¶
func (l *LogManager) GetSnapshotManager(key []byte, log *FasterLog) (*SnapshotManager, error)
func (*LogManager) Glob ¶
func (l *LogManager) Glob(pattern string) []string
Glob returns all keys matching the given glob pattern. Supports: * (any chars), ? (single char), [abc] (char class), [a-z] (range) Returns nil if registry is not available.
func (*LogManager) HasKey ¶
func (l *LogManager) HasKey(key string) bool
HasKey checks if a key exists in the registry. Returns false if registry is not available.
func (*LogManager) KeyCount ¶
func (l *LogManager) KeyCount() int
KeyCount returns the number of registered keys. Returns 0 if registry is not available.
func (*LogManager) ListKeys ¶
func (l *LogManager) ListKeys() []string
ListKeys returns all registered keys. Returns nil if registry is not available.
func (*LogManager) Stats ¶
func (l *LogManager) Stats() LogManagerStats
Stats returns current statistics
type LogManagerStats ¶
LogManagerStats holds statistics about the LogManager.
type RingBuffer ¶
type RingBuffer struct {
// contains filtered or unexported fields
}
RingBuffer is a lock-free buffer for the mutable region. It stores uncommitted log entries in memory before they're flushed to the immutable tail. Uses atomic CAS operations for true lock-free concurrent writes following FASTER design.
LOCK-FREE DESIGN: - Writes use atomic CAS to allocate space (no locks!) - Reads scan the buffer sequentially (acceptable since mutable region is small) - No index map needed (scanning is fast enough for ~64MB region)
Note: This is not a true "ring" buffer - it's a simple linear allocator that grows until entries are flushed. This matches WPaxos semantics better than a circular buffer.
func NewRingBuffer ¶
func NewRingBuffer(size uint64) *RingBuffer
NewRingBuffer creates a new ring buffer of the given size
func (*RingBuffer) Append ¶
func (rb *RingBuffer) Append(entry *LogEntry) (uint64, error)
Append adds an entry to the buffer using lock-free CAS allocation. Returns the offset within the buffer. LOCK-FREE: Uses atomic CompareAndSwap to reserve space without locks.
SAFETY: Uses reserve-write-publish protocol to prevent readers from seeing partially-written entries: 1. Reserve space via CAS on 'reserved' 2. Write data into reserved region 3. Publish via CAS on 'published' (waits for sequential order)
func (*RingBuffer) AvailableSpace ¶
func (rb *RingBuffer) AvailableSpace() uint64
AvailableSpace returns the available space in the buffer. This accounts for space that has been reclaimed via tail advancement.
func (*RingBuffer) DrainCommitted ¶
func (rb *RingBuffer) DrainCommitted() ([]*LogEntry, error)
DrainCommitted removes committed entries and returns them. This is called during checkpoint to flush committed entries to the tail. LOCK-FREE: Scans the buffer sequentially. Only scans published entries to avoid reading partially-written data.
func (*RingBuffer) GetAllUncommitted ¶
func (rb *RingBuffer) GetAllUncommitted() ([]*LogEntry, error)
GetAllUncommitted returns all uncommitted entries (legacy wrapper)
func (*RingBuffer) GetAllUncommittedWithIndex ¶
func (rb *RingBuffer) GetAllUncommittedWithIndex(indexCheck func(uint64) bool) ([]*LogEntry, error)
GetAllUncommittedWithIndex returns all uncommitted entries. Used for Phase-1 recovery in WPaxos. LOCK-FREE: Scans the buffer sequentially. Only scans published entries to avoid reading partially-written data. indexCheck: optional index to check if an entry has been moved to the tail.
func (*RingBuffer) MarkCommitted ¶
func (rb *RingBuffer) MarkCommitted(slot uint64) error
MarkCommitted marks an entry as committed in the buffer (IN-PLACE UPDATE). WARNING: This is NOT lock-free but safe since we own the epoch. Used by Commit() to mark an entry before flushing to tail. Only scans published entries to avoid reading partially-written data.
func (*RingBuffer) Read ¶
func (rb *RingBuffer) Read(offset uint64) (*LogEntry, error)
Read reads an entry at the given offset
func (*RingBuffer) ReadBySlot ¶
func (rb *RingBuffer) ReadBySlot(slot uint64) (*LogEntry, error)
ReadBySlot reads an entry by slot number. LOCK-FREE: Scans the buffer sequentially (fast for small mutable regions). Only scans published entries to avoid reading partially-written data.
func (*RingBuffer) Remove ¶
func (rb *RingBuffer) Remove(slot uint64)
Remove reclaims buffer space; it is not a no-op and is critical for reclaiming space. It should be called after Commit() moves an entry to the tail. However, the tail cannot be advanced arbitrarily - it must be advanced only up to the first uncommitted entry, to maintain the invariant that all entries before the tail have been flushed.
func (*RingBuffer) Reset ¶
func (rb *RingBuffer) Reset()
Reset attempts to reset the buffer when all entries have been flushed. If writers are active or the buffer still holds data, the reset is skipped.
func (*RingBuffer) TryAdvanceTail ¶
func (rb *RingBuffer) TryAdvanceTail(indexCheck func(uint64) bool) uint64
TryAdvanceTail attempts to advance the tail pointer past committed entries that have been moved to the immutable tail, reclaiming space. This should be called after commits to incrementally reclaim space.
indexCheck: function that returns true if the slot is still in the mutable region. Returns: number of bytes reclaimed.
func (*RingBuffer) UsedSpace ¶
func (rb *RingBuffer) UsedSpace() uint64
UsedSpace returns the space currently in use (not yet reclaimed)
type Snapshot ¶
type Snapshot struct {
// Slot is the last committed slot included in this snapshot
Slot uint64
// Data is the serialized state machine state
// For WPaxos, this would be the reconstructed Record for each key
Data []byte
// Metadata for the snapshot
NumEntries uint64 // Number of log entries represented
Checksum uint32 // CRC32 checksum of Data
}
Snapshot represents a state machine checkpoint at a specific slot. It captures the reconstructed state at that point in the log.
type SnapshotManager ¶
type SnapshotManager struct {
// contains filtered or unexported fields
}
SnapshotManager handles snapshot creation, storage, and recovery
func NewSnapshotManager ¶
func NewSnapshotManager(snapshotDir string, log *FasterLog) (*SnapshotManager, error)
NewSnapshotManager creates a new snapshot manager
func (*SnapshotManager) CleanupOldSnapshots ¶
func (sm *SnapshotManager) CleanupOldSnapshots(keepCount int) error
CleanupOldSnapshots removes old snapshots, keeping only the keepCount most recent ones.
func (*SnapshotManager) CreateSnapshot ¶
func (sm *SnapshotManager) CreateSnapshot(slot uint64, stateData []byte) error
CreateSnapshot creates a snapshot at the given slot. The caller provides the serialized state machine data.
func (*SnapshotManager) GetLatestSnapshot ¶
func (sm *SnapshotManager) GetLatestSnapshot() (*Snapshot, error)
GetLatestSnapshot returns the most recent snapshot
func (*SnapshotManager) GetSnapshot ¶
func (sm *SnapshotManager) GetSnapshot(slot uint64) (*Snapshot, error)
GetSnapshot returns the snapshot for a specific slot (or nearest before it)
func (*SnapshotManager) TruncateLog ¶
func (sm *SnapshotManager) TruncateLog(truncateSlot uint64) error
TruncateLog marks a truncation point in metadata. The actual compaction happens offline when the log is reopened. IMPORTANT: This does NOT immediately free disk space! Call CompactOnStartup() during recovery to perform the actual truncation.