package faster v0.2.3

Published: Dec 1, 2025 License: AGPL-3.0 Imports: 20 Imported by: 0

README

FASTER: Fast Persistent Recoverable Log for Consensus

A pure-Go implementation of FASTER-style hybrid logging, optimized for consensus protocols like WPaxos. It provides lock-free reads, atomic writes, and direct semantic alignment with the Paxos accept/commit phases.

Architecture

FASTER uses a three-region hybrid log architecture that separates uncommitted (speculative) entries from committed (durable) entries:

┌─────────────────────────────────────────────────────────────┐
│                     FASTER Architecture                      │
├─────────────────┬─────────────────┬─────────────────────────┤
│   In-Memory     │     Mutable     │    Immutable Tail       │
│   Hash Index    │  Ring Buffer    │   (Memory-Mapped)       │
│                 │                 │                         │
│  [slot→offset]  │  [Uncommitted]  │  [Committed entries]    │
│                 │   entries       │                         │
│  • sync.Map     │  • Lock-free    │  • Append-only          │
│  • Fast lookup  │  • CAS alloc    │  • Durable (fsync)      │
│  • Concurrent   │  • 2-64MB       │  • Lock-free reads      │
│                 │                 │  • 1GB+ segments        │
└─────────────────┴─────────────────┴─────────────────────────┘
Why This Design?

Traditional LSM-tree stores (like BadgerDB, LevelDB) have a semantic mismatch with consensus logs:

| Requirement | LSM-Tree Stores | FASTER |
|---|---|---|
| Accept/Commit distinction | ❌ Must track in value | ✅ Mutable vs. immutable regions |
| Lock-free committed reads | ❌ Locks on read path | ✅ Direct mmap reads |
| Overwrite uncommitted | ❌ Complex merge logic | ✅ Natural in mutable region |
| Sequential execution | ⚠️ Manual gap handling | ✅ Natural slot ordering |
| Recovery scanning | ⚠️ Compaction interference | ✅ Fast sequential scan |

Core Components

1. FasterLog - The Main Log

The core log structure that manages entries across the three regions.

log, err := faster.NewFasterLog(faster.Config{
    Path:         "/data/consensus.log",
    MutableSize:  64 * 1024 * 1024,  // 64MB for uncommitted entries
    SegmentSize:  1 * 1024 * 1024 * 1024,  // 1GB per segment
    NumThreads:   128,  // Max concurrent goroutines
    SyncOnCommit: true,  // fsync on every commit (durable)
})
defer log.Close()
Configuration Guidelines

| Parameter | Recommended | Trade-off |
|---|---|---|
| MutableSize | 64-256 MB | Larger = more uncommitted entries in flight, more memory |
| SegmentSize | 1-4 GB | Larger = fewer files, slower recovery |
| NumThreads | 128-1024 | Must be ≥ max concurrent goroutines |
| SyncOnCommit | true (production) | false = faster, but risks data loss on crash |
2. LogManager - Multi-Log Management

Manages multiple FASTER logs with LRU eviction and reference counting.

manager := faster.NewLogManager()
defer manager.CloseAll()

// Get a log (automatically created if needed)
log, release, err := manager.GetLog([]byte("table:users"))
if err != nil {
    return err
}
defer release()  // CRITICAL: Always call release()

// Use the log safely
err = log.Accept(slot, ballot, value)

Key Features:

  • Reference Counting: Prevents closing logs while in use
  • LRU Eviction: Automatically closes idle logs (max 1024 open)
  • Thread-Safe: Concurrent GetLog calls are safe
  • Leak Protection: CloseAll waits for references to drain
3. RingBuffer - Lock-Free Mutable Region

Lock-free buffer for uncommitted entries using atomic CAS operations.

Design Principles:

  • Reserve-Write-Publish Protocol: Prevents readers from seeing partial writes
  • CAS-based Allocation: No locks on write path
  • Sequential Scanning: Fast for small regions (2-64MB)
  • Automatic Reclamation: Space freed as entries commit
// Writers (lock-free):
offset, err := buffer.Append(entry)  // Atomic CAS allocation

// Readers (lock-free):
entry, err := buffer.Read(offset)

// Reclamation (triggered by Commit):
buffer.TryAdvanceTail(indexCheck)
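
The reserve-write-publish protocol can be illustrated with a minimal fixed-slot ring. This is a sketch only: the package's real buffer does byte-granular CAS allocation and reclamation, while this shows just the protocol shape with atomic flags.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Minimal reserve-write-publish sketch with fixed-size slots. Writers
// CAS-advance the head to reserve a slot, write the value, then publish
// it with an atomic flag so readers never observe a partial entry.
type slot struct {
	ready atomic.Bool
	val   uint64
}

type ring struct {
	head  atomic.Uint64
	slots []slot
}

func (r *ring) Append(v uint64) (uint64, bool) {
	for {
		h := r.head.Load()
		if h >= uint64(len(r.slots)) {
			return 0, false // buffer full (would surface as ErrBufferFull)
		}
		if r.head.CompareAndSwap(h, h+1) { // reserve
			r.slots[h].val = v           // write
			r.slots[h].ready.Store(true) // publish
			return h, true
		}
	}
}

func (r *ring) Read(off uint64) (uint64, bool) {
	if off >= r.head.Load() || !r.slots[off].ready.Load() {
		return 0, false // unreserved or not yet published
	}
	return r.slots[off].val, true
}

func main() {
	r := &ring{slots: make([]slot, 1024)}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ { // 8 concurrent lock-free writers
		wg.Add(1)
		go func(base uint64) {
			defer wg.Done()
			for j := uint64(0); j < 100; j++ {
				r.Append(base*100 + j)
			}
		}(uint64(i))
	}
	wg.Wait()
	fmt.Println(r.head.Load()) // 800: every reservation was published
}
```

Because the ready flag is set only after the value is written, a concurrent reader either sees the complete entry or nothing.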
4. Snapshot Manager - State Machine Checkpoints

Handles periodic snapshots for faster recovery.

snapMgr, err := faster.NewSnapshotManager("/data/snapshots", log)

// Create snapshot at slot 1000
stateData := serializeStateMachine()
err = snapMgr.CreateSnapshot(1000, stateData)

// Recovery: load latest snapshot
snapshot, err := snapMgr.GetLatestSnapshot()
restoreStateMachine(snapshot.Data)
replayFrom(snapshot.Slot + 1)

WPaxos Integration

FASTER's three operations map perfectly to WPaxos phases:

Phase-2b: Accept (Uncommitted Entry)
// Acceptor receives Phase-2a message
err := log.Accept(
    slot,    // uint64: Paxos slot number
    ballot,  // faster.Ballot{ID, NodeID}
    value,   // []byte: proposed value
)

// Entry is now in mutable region (uncommitted)
// Can be overwritten by higher ballot

What Happens:

  1. Entry serialized with ballot and committed=false
  2. CAS-allocated space in ring buffer (lock-free!)
  3. Index updated: slot → offset|mutableFlag
  4. Durable: Only in memory (fast!)
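
One way to realize the slot → offset|mutableFlag index from step 3 is to pack a flag bit into the high bit of the stored offset. This is an illustrative sketch; the package's actual index encoding is internal and may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// Illustrative slot index: the high bit of the stored value marks entries
// still in the mutable region; commit clears the flag when the entry moves
// to the immutable tail. (Assumed encoding, not the package's source.)
const mutableFlag = uint64(1) << 63

type slotIndex struct {
	m sync.Map // slot (uint64) -> offset|flag (uint64)
}

func (idx *slotIndex) setMutable(slot, offset uint64) {
	idx.m.Store(slot, offset|mutableFlag)
}

func (idx *slotIndex) setCommitted(slot, tailOffset uint64) {
	idx.m.Store(slot, tailOffset) // no flag: entry now lives in the tail
}

func (idx *slotIndex) lookup(slot uint64) (offset uint64, inMutable, ok bool) {
	v, ok := idx.m.Load(slot)
	if !ok {
		return 0, false, false
	}
	raw := v.(uint64)
	return raw &^ mutableFlag, raw&mutableFlag != 0, true
}

func main() {
	var idx slotIndex
	idx.setMutable(7, 4096) // Accept: entry in ring buffer
	off, inMut, _ := idx.lookup(7)
	fmt.Println(off, inMut) // 4096 true
	idx.setCommitted(7, 1<<20) // Commit: entry appended to tail
	off, inMut, _ = idx.lookup(7)
	fmt.Println(off, inMut) // 1048576 false
}
```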
Phase-3: Commit (Mark as Durable)
// Leader receives Q2 quorum of accepts
err := log.Commit(slot)

// Entry is now in immutable tail (committed)
// Cannot be overwritten

What Happens:

  1. Entry read from mutable region
  2. committed flag set to true
  3. Appended to immutable tail (sequential write)
  4. Optional fsync() if SyncOnCommit=true
  5. Index updated: slot → tailOffset (no mutable flag)
  6. Mutable space reclaimed automatically
Phase-1: Recovery (Scan Uncommitted)
// New leader needs to recover uncommitted entries
uncommitted, err := log.ScanUncommitted()

for _, entry := range uncommitted {
    // Re-propose with new ballot
    if entry.Ballot.Less(myBallot) {
        // Take over this slot
        propose(entry.Slot, myBallot, entry.Value)
    }
}

What Happens:

  1. Scans ring buffer for all entries
  2. Filters to only entries still in mutable region (via index check)
  3. Returns uncommitted entries for re-proposal
  4. Fast: Sequential scan of small region (~64MB)
State Machine Reads (Only Committed)
// Read only committed entries for state machine
entry, err := log.ReadCommittedOnly(slot)
if err == faster.ErrNotCommitted {
    // Slot exists but not yet committed - wait or skip
}

// entry.Value is safe to apply to state machine
applyToStateMachine(entry.Value)

What Happens:

  1. Index lookup: slot → offset
  2. If in mutable region: read from ring buffer, check committed flag
  3. If in tail: lock-free mmap read (fast!)
  4. Returns error if uncommitted

Performance Characteristics

Benchmarked on Intel i7-11800H @ 2.30GHz (16 threads). All benchmarks run with Go 1.21+.

Core Operations

| Operation | Latency | Throughput | Notes |
|---|---|---|---|
| Accept (uncommitted write) | 755 ns | 1.32M ops/sec | Lock-free CAS allocation |
| Commit (no fsync) | 1.76 µs | 568k ops/sec | Sequential append to mmap |
| Commit (with fsync) | 4.47 ms | 224 ops/sec | Disk sync overhead |
| Read (uncommitted) | 150 ns | 6.68M ops/sec | Ring buffer scan |
| Read (committed) | 113 ns | 8.84M ops/sec | Direct mmap read |
| Accept+Commit+Read | 1.85 µs | 541k ops/sec | Full write cycle |
Advanced Operations

| Operation | Latency | Throughput | Details |
|---|---|---|---|
| ScanUncommitted | 131 µs | 7,634 scans/sec | ~1,000 entries |
| Checkpoint | 170 µs | - | Flush committed to tail |
| Recovery (1k entries) | 4.16 ms | 240k entries/sec | Index rebuild |
| Recovery (10k entries) | 5.74 ms | 1.74M entries/sec | Scales well |
| Snapshot Create | 6.23 ms | - | 1MB state |
| Snapshot Read | 201 µs | - | Deserialize + verify |
Value Size Impact

| Value Size | Latency | Throughput (MB/s) |
|---|---|---|
| 10 bytes | 1.81 µs | 5.28 MB/s |
| 100 bytes | 1.76 µs | 54.33 MB/s |

Observation: Larger values improve throughput (better amortization of fixed overhead).

Concurrent Performance
| Scenario | Latency | Throughput | Speedup |
|---|---|---|---|
| Concurrent Reads (16 threads) | 23 ns | 42.9M ops/sec | ~5x single-thread |
| Concurrent Writes (16 threads) | 504 ns | 1.99M ops/sec | ~1.5x single-thread |

Why: Lock-free reads scale linearly. Writes have CAS contention but still scale.

FASTER vs. BadgerDB Comparison

Head-to-head benchmarks against BadgerDB (same hardware, same workload):

| Workload | FASTER | BadgerDB | Speedup |
|---|---|---|---|
| Random Keys (writes) | 2.18 µs (460k ops/sec) | 7.61 µs (131k ops/sec) | 3.5x faster |
| Sequential Writes | 1.53 µs (656k ops/sec) | 6.54 µs (153k ops/sec) | 4.3x faster |
| Read-Heavy (90% reads) | 133 ns (7.5M ops/sec) | 1.40 µs (713k ops/sec) | 10.5x faster |
| Mixed (50/50 read/write) | 849 ns (1.18M ops/sec) | 3.48 µs (287k ops/sec) | 4.1x faster |
| Concurrent Writes | 504 ns (1.99M ops/sec) | 5.08 µs (197k ops/sec) | 10x faster |
| Recovery (10k entries) | 2.15 ms | 21.7 ms | 10x faster |
| ScanUncommitted | 646 µs (1,548 scans/sec) | 557 µs (1,794 scans/sec) | ~Same |

Key Takeaways:

  • Reads are 10x faster: Lock-free mmap reads vs. LSM lookup
  • Writes are 4x faster: No compaction overhead
  • Recovery is 10x faster: Sequential scan vs. LSM rebuild
  • Predictable latency: No background compaction spikes
  • ⚠️ Scan performance similar: both use sequential scans, but FASTER scans only the small uncommitted region while BadgerDB scans all entries
Key Performance Features
  • Lock-free reads from immutable tail (memory-mapped)
  • Lock-free writes to mutable region (atomic CAS)
  • Only commits need mutex (sequential disk I/O)
  • No background compaction (predictable latency)
  • Linear read scaling with concurrent goroutines
  • 10x faster recovery (sequential vs. LSM rebuild)
Performance Tuning

For Maximum Throughput:

Config{
    SyncOnCommit: false,  // Skip fsync (568k vs 224 ops/sec)
    MutableSize:  256 * 1024 * 1024,  // Large buffer for batching
}

⚠️ Trade-off: Risk losing uncommitted data on crash (acceptable for consensus with replication).

For Maximum Durability:

Config{
    SyncOnCommit: true,  // fsync every commit (224 ops/sec)
    MutableSize:  64 * 1024 * 1024,
}

Guarantee: Committed entries survive crashes.

Batch Optimization:

  • Accept 100 entries (~755 ns × 100 = 75.5 µs)
  • Commit batch with single fsync (~4.47 ms)
  • Effective throughput: 100 commits / 4.55 ms = 22k commits/sec (100x improvement!)
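
The batching arithmetic above can be checked directly, using the latencies from the benchmark tables (~755 ns per Accept, ~4.47 ms per fsync'd commit):

```go
package main

import "fmt"

// Amortizing one fsync over a batch of 100 commits, using the measured
// numbers from the benchmark tables above.
func main() {
	const (
		acceptNs = 755.0  // ns per Accept
		fsyncNs  = 4.47e6 // ns per fsync'd commit
		batch    = 100.0
	)
	totalNs := batch*acceptNs + fsyncNs // ~4.55 ms for the whole batch
	perSec := batch / (totalNs / 1e9)   // effective committed entries/sec
	fmt.Printf("%.0f commits/sec\n", perSec) // ≈ 22000, vs ~224 unbatched
}
```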
Hardware Considerations

Results above are from:

  • CPU: Intel i7-11800H (8 cores, 16 threads, 2.3-4.6 GHz)
  • Storage: NVMe SSD (fsync ~4ms)
  • RAM: DDR4-3200

Expected performance on other hardware:

  • Faster SSD (Intel Optane): fsync ~100µs → 10k commits/sec (50x improvement)
  • Slower SSD (SATA): fsync ~10ms → 100 commits/sec (2x slower)
  • HDD: fsync ~50ms → 20 commits/sec (25x slower - not recommended!)
  • More cores: Linear scaling for concurrent reads (up to memory bandwidth)

Usage Patterns

Pattern 1: Single Log (Simple Consensus)
// Single consensus log for all operations
log, err := faster.NewFasterLog(faster.Config{
    Path:         "/data/consensus.log",
    MutableSize:  64 * 1024 * 1024,
    SegmentSize:  1 * 1024 * 1024 * 1024,
    NumThreads:   128,
    SyncOnCommit: true,
})
defer log.Close()

// Accept-Commit cycle
slot := getNextSlot()
ballot := getCurrentBallot()

// Phase-2: Accept
err = log.Accept(slot, ballot, value)

// Phase-3: Commit (after Q2 quorum)
err = log.Commit(slot)

// Read committed state
entry, err := log.ReadCommittedOnly(slot)
Pattern 2: Multi-Log (Table-Partitioned Consensus)
// Manage separate logs per table
manager := faster.NewLogManager()
defer manager.CloseAll()

// Each table gets its own log
func processWrite(table string, slot uint64, ballot faster.Ballot, value []byte) error {
    log, release, err := manager.GetLog([]byte(table))
    if err != nil {
        return err
    }
    defer release()  // CRITICAL: Don't leak references!

    // Accept-Commit for this table's log
    if err := log.Accept(slot, ballot, value); err != nil {
        return err
    }

    // Later: commit after quorum
    if err := log.Commit(slot); err != nil {
        return err
    }

    return nil
}

Benefits of Multi-Log:

  • Parallel consensus per table
  • LRU eviction keeps hot tables in memory
  • Automatic log lifecycle management
Pattern 3: Checkpointing (Long-Running Logs)
log, _ := faster.NewFasterLog(cfg)
snapMgr, _ := faster.NewSnapshotManager("/snapshots", log)

// Periodically checkpoint state machine
ticker := time.NewTicker(5 * time.Minute)
for range ticker.C {
    // Serialize current state
    stateData := marshalStateMachine(currentState)

    // Create snapshot at current slot
    err := snapMgr.CreateSnapshot(currentSlot, stateData)

    // Optionally: truncate old log entries before snapshot
    // (not yet implemented, but planned)
}

// Recovery: load snapshot + replay
snapshot, _ := snapMgr.GetLatestSnapshot()
currentState = unmarshalStateMachine(snapshot.Data)

// Replay entries after snapshot
for slot := snapshot.Slot + 1; slot <= latestSlot; slot++ {
    entry, err := log.ReadCommittedOnly(slot)
    if err == nil {
        applyToStateMachine(entry.Value)
    }
}

Error Handling

Common Errors
// Slot not found (never written)
_, err := log.Read(999)
if errors.Is(err, faster.ErrSlotNotFound) {
    // Slot doesn't exist
}

// Entry exists but not committed
_, err := log.ReadCommittedOnly(100)
if errors.Is(err, faster.ErrNotCommitted) {
    // Slot exists, but still in accept phase
    // Either wait for commit or skip
}

// Buffer full (too many uncommitted entries)
err := log.Accept(slot, ballot, value)
if errors.Is(err, faster.ErrBufferFull) {
    // Mutable region exhausted
    // Either: increase MutableSize, or commit more frequently
}

// Log closed
_, err := log.Read(100)
if errors.Is(err, faster.ErrClosed) {
    // Log has been closed, cannot use
}
Critical Safety Rules
  1. Always call release(): LogManager.GetLog returns a release function that MUST be called

    log, release, err := manager.GetLog(key)
    defer release()  // Don't forget!
    
  2. Don't use log after Close(): Once closed, all operations return ErrClosed

  3. Commit promptly: Mutable region has finite size. Commit entries to avoid ErrBufferFull

  4. Check committed flag: State machine should only read committed entries

    entry, err := log.ReadCommittedOnly(slot)  // Use this for state machine!
    

Thread Safety

All operations are thread-safe:

  • Accept: Lock-free CAS allocation
  • Commit: Mutex-protected tail writes (sequential I/O)
  • Read: Lock-free for tail, epoch-protected for mutable
  • ScanUncommitted: Epoch-protected iteration
Epoch-Based Memory Management

FASTER uses epochs to protect concurrent readers from use-after-free:

// Automatic epoch management (internal)
threadID := getThreadID(slot)
epoch := log.epoch.Load()
log.threadEpochs[threadID].Store(epoch)
defer log.threadEpochs[threadID].Store(0)

// Read is now safe - epoch prevents reclamation
entry := readFromBuffer(offset)

What This Means:

  • Readers announce presence via epoch
  • Writers cannot reclaim memory while readers are active
  • No locks needed (atomic operations only)

State Machine Reconstruction

The Challenge

WPaxos requires sequential execution in slot order (not commit order). FASTER provides efficient iteration over committed entries.

Zero-Allocation Iterator

FASTER uses a callback pattern to iterate without heap allocations:

// ZERO ALLOCATION: Entry is reused between calls
err := log.IterateCommitted(func(entry *LogEntry) error {
    // Process entry immediately
    applyToStateMachine(entry.Slot, entry.Value)

    // WARNING: Do NOT store entry pointer!
    // It will be reused on next iteration.

    // If you need to keep data, copy it:
    valueCopy := make([]byte, len(entry.Value))
    copy(valueCopy, entry.Value)

    return nil
}, faster.IterateOptions{
    MinSlot: 0,    // Start from beginning
    MaxSlot: 0,    // To end (0 = no limit)
})

Key Rules:

  1. Process immediately in the callback
  2. Copy data if you need to keep it
  3. Never store the entry pointer
  4. Never access the entry after callback returns
Pattern 1: Full State Machine Rebuild

Reconstruct state machine from scratch (e.g., after crash):

// Example: Key-value store state machine
type StateMachine struct {
    data map[string][]byte
    mu   sync.RWMutex
}

func (sm *StateMachine) Rebuild(log *faster.FasterLog) error {
    sm.mu.Lock()
    defer sm.mu.Unlock()

    // Clear existing state
    sm.data = make(map[string][]byte)

    // Replay all committed entries in slot order
    return log.ReplayFromSlot(0, func(entry *faster.LogEntry) error {
        // Parse command from entry.Value
        cmd, err := parseCommand(entry.Value)
        if err != nil {
            return err
        }

        // Apply to state machine
        switch cmd.Type {
        case "PUT":
            // Copy value (entry.Value will be reused!)
            value := make([]byte, len(cmd.Value))
            copy(value, cmd.Value)
            sm.data[cmd.Key] = value

        case "DELETE":
            delete(sm.data, cmd.Key)
        }

        return nil
    })
}
Pattern 2: Incremental Replay (From Snapshot)

Resume from a snapshot and replay only recent entries:

func (sm *StateMachine) RecoverFromSnapshot(
    log *faster.FasterLog,
    snapMgr *faster.SnapshotManager,
) error {
    // Step 1: Load latest snapshot
    snapshot, err := snapMgr.GetLatestSnapshot()
    if err != nil {
        // No snapshot, do full rebuild
        return sm.Rebuild(log)
    }

    // Step 2: Restore state from snapshot
    err = sm.Deserialize(snapshot.Data)
    if err != nil {
        return fmt.Errorf("failed to restore snapshot: %w", err)
    }

    // Step 3: Replay entries after snapshot
    startSlot := snapshot.Slot + 1
    return log.ReplayFromSlot(startSlot, func(entry *faster.LogEntry) error {
        return sm.ApplyCommand(entry.Value)
    })
}
Pattern 3: Learner Bootstrap (Catch-Up)

New node joining cluster needs to catch up:

// Learner discovers it's behind and needs to catch up
func (learner *Learner) CatchUp(
    leaderConn pb.ConsensusClient, // generated gRPC client stub (RPCs cannot be called on a raw *grpc.ClientConn)
    localLog *faster.FasterLog,
) error {
    ctx := context.Background()

    // Step 1: Find out what we have locally
    _, maxLocal, _ := localLog.GetCommittedRange()

    // Step 2: Ask leader for its range
    resp, err := leaderConn.GetCommittedRange(ctx, &pb.Empty{})
    if err != nil {
        return err
    }
    leaderMax := resp.MaxSlot

    if maxLocal >= leaderMax {
        // Already caught up!
        return nil
    }

    // Step 3: Stream missing entries from leader
    stream, err := leaderConn.StreamEntries(ctx, &pb.StreamRequest{
        StartSlot: maxLocal + 1,
        EndSlot:   leaderMax,
    })
    if err != nil {
        return err
    }

    // Step 4: Accept and commit each entry locally
    for {
        entry, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }

        // Accept entry (writes to mutable region)
        err = localLog.Accept(entry.Slot, entry.Ballot, entry.Value)
        if err != nil {
            return err
        }

        // Immediately commit (flushes to tail)
        err = localLog.Commit(entry.Slot)
        if err != nil {
            return err
        }
    }

    return nil
}
Pattern 4: Leader Serves Learner (Streaming)

Leader implements the streaming endpoint for learners:

func (server *ConsensusServer) StreamEntries(
    req *pb.StreamRequest,
    stream pb.Consensus_StreamEntriesServer,
) error {
    log := server.log

    // Use zero-allocation iterator to stream entries
    return log.IterateCommitted(func(entry *faster.LogEntry) error {
        // Convert to protobuf (requires copying data)
        pbEntry := &pb.LogEntry{
            Slot:      entry.Slot,
            BallotId:  entry.Ballot.ID,
            BallotNode: entry.Ballot.NodeID,
            Value:     append([]byte(nil), entry.Value...), // Copy!
            Committed: entry.Committed,
        }

        // Send to learner
        return stream.Send(pbEntry)
    }, faster.IterateOptions{
        MinSlot: req.StartSlot,
        MaxSlot: req.EndSlot,
    })
}
Pattern 5: Snapshot Creation

Periodically checkpoint state machine:

func (sm *StateMachine) CreateSnapshot(
    log *faster.FasterLog,
    snapMgr *faster.SnapshotManager,
) error {
    sm.mu.RLock()
    defer sm.mu.RUnlock()

    // Find highest committed slot
    _, maxSlot, _ := log.GetCommittedRange()

    // Serialize current state machine state
    data, err := sm.Serialize()
    if err != nil {
        return err
    }

    // Create snapshot
    return snapMgr.CreateSnapshot(maxSlot, data)
}
Performance Characteristics

Iterator Performance:

  • Slot collection: O(N) where N = number of entries in index
  • Sorting: O(N log N) for >100 slots; insertion sort for ≤100 slots (O(N²) worst case, but faster in practice for small N)
  • Iteration: O(N) with zero allocations per entry
  • Total: ~1-2 µs per entry for 10,000 entries

Example: Rebuild 10,000 entries

  • Collect slots: ~100 µs
  • Sort: ~200 µs
  • Iterate + apply: ~20 ms (assuming 2µs per entry)
  • Total: ~20.3 ms (efficient!)
Advanced: Handling Gaps

WPaxos can have gaps in the log (slots never proposed). Handle them gracefully:

func (sm *StateMachine) RebuildWithGaps(log *faster.FasterLog) error {
    // Get range of committed slots
    minSlot, maxSlot, count := log.GetCommittedRange()

    expectedCount := maxSlot - minSlot + 1
    actualCount := count
    gapCount := expectedCount - actualCount

    if gapCount > 0 {
        fmt.Printf("detected %d gaps in log\n", gapCount) // log is the FasterLog here, so use fmt for diagnostics
    }

    // Iterate only over committed slots (gaps are skipped automatically)
    return log.ReplayFromSlot(minSlot, func(entry *faster.LogEntry) error {
        return sm.ApplyCommand(entry.Value)
    })
}

Why gaps are OK:

  • Slots may be reserved but never committed (leadership change)
  • WPaxos allows out-of-order commits (slot 7 might commit before slot 5)
  • Iterator only returns committed entries (gaps are invisible)

Important Distinction:

  • "No gaps in execution" ≠ Every slot must exist
  • "No gaps in execution" = Must apply committed slots in order
  • Example: If slots 5, 7, 9 are committed (6, 8 missing), apply them as 5→7→9
  • This is what WPaxos paper means by "without any gap" in execution order
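
The 5→7→9 example can be written out as a self-contained sketch: collect the committed slots, sort them, and apply in slot order, with gaps simply absent.

```go
package main

import (
	"fmt"
	"sort"
)

// Apply committed slots in order while tolerating gaps: slots 6 and 8
// were never committed, so execution order is simply 5 -> 7 -> 9.
func applyInOrder(committed map[uint64][]byte, apply func(slot uint64, v []byte)) {
	slots := make([]uint64, 0, len(committed))
	for s := range committed {
		slots = append(slots, s)
	}
	sort.Slice(slots, func(i, j int) bool { return slots[i] < slots[j] })
	for _, s := range slots {
		apply(s, committed[s]) // gaps never appear in the map
	}
}

func main() {
	committed := map[uint64][]byte{5: []byte("a"), 7: []byte("b"), 9: []byte("c")}
	applyInOrder(committed, func(slot uint64, v []byte) {
		fmt.Println(slot, string(v))
	})
}
```

This mirrors what `ReplayFromSlot` does internally: only committed slots exist in the index, so "no gaps in execution" falls out of iterating them in sorted order.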

Crash Recovery

Tail Recovery (Automatic)

On log open, FASTER rebuilds the index from the tail:

log, err := faster.NewFasterLog(cfg)
// Automatically:
// 1. Opens tail file
// 2. Memory-maps it
// 3. Scans entries and rebuilds index
// 4. Validates checksums
// 5. Truncates corrupted tail
Mutable Region Recovery

The mutable region is in-memory only and not persisted. On crash:

  • ✅ Committed entries are safe (in tail)
  • ❌ Uncommitted entries are lost (expected behavior!)

This is correct for consensus:

  • Uncommitted = speculative, can be lost
  • New leader will recover via Phase-1 from other replicas
  • Only committed entries are durable
Complete Recovery Example
func RecoverNode(logPath string, snapPath string) (*StateMachine, error) {
    // Step 1: Open log (rebuilds index automatically)
    log, err := faster.NewFasterLog(faster.Config{
        Path:         logPath,
        MutableSize:  64 * 1024 * 1024,
        SegmentSize:  1 * 1024 * 1024 * 1024,
        NumThreads:   128,
        SyncOnCommit: true,
    })
    if err != nil {
        return nil, fmt.Errorf("failed to open log: %w", err)
    }

    // Step 2: Load snapshot (if available)
    snapMgr, err := faster.NewSnapshotManager(snapPath, log)
    if err != nil {
        return nil, fmt.Errorf("failed to open snapshots: %w", err)
    }

    sm := &StateMachine{data: make(map[string][]byte)}

    snapshot, err := snapMgr.GetLatestSnapshot()
    if err == nil {
        // Snapshot exists - restore from it
        fmt.Printf("restoring from snapshot at slot %d\n", snapshot.Slot) // log is the FasterLog here, so use fmt
        err = sm.Deserialize(snapshot.Data)
        if err != nil {
            return nil, fmt.Errorf("failed to restore snapshot: %w", err)
        }

        // Step 3: Replay entries after snapshot
        replayFrom := snapshot.Slot + 1
        err = log.ReplayFromSlot(replayFrom, func(entry *faster.LogEntry) error {
            return sm.ApplyCommand(entry.Value)
        })
        if err != nil {
            return nil, fmt.Errorf("failed to replay log: %w", err)
        }
    } else {
        // No snapshot - do full rebuild from log
        fmt.Println("no snapshot found, rebuilding from log") // log is the FasterLog here, so use fmt
        err = sm.Rebuild(log)
        if err != nil {
            return nil, fmt.Errorf("failed to rebuild: %w", err)
        }
    }

    // Step 4: Get status
    _, maxSlot, count := log.GetCommittedRange()
    fmt.Printf("recovery complete: %d committed entries, max slot %d\n", count, maxSlot)

    return sm, nil
}

Debugging and Monitoring

Log Statistics
// LogManager provides statistics
stats := manager.Stats()
fmt.Printf("Total logs: %d\n", stats.TotalLogs)
fmt.Printf("Open logs: %d\n", stats.OpenLogs)
fmt.Printf("Active refs: %d\n", stats.ActiveRefs)

// Monitor for reference leaks
if stats.ActiveRefs > stats.OpenLogs*10 {
    fmt.Println("possible reference leak detected")
}
Common Issues

Issue: ErrBufferFull under load

  • Cause: Mutable region exhausted (too many uncommitted entries)
  • Fix 1: Increase MutableSize (e.g., 128MB or 256MB)
  • Fix 2: Commit more frequently (batch commits)
  • Fix 3: Call log.Checkpoint() periodically to flush committed entries

Issue: High memory usage

  • Cause: Too many open logs in LogManager
  • Fix: Decrease MaxHotKeys (default 1024)
  • Check: Call manager.Stats() to see open log count

Issue: Slow recovery after crash

  • Cause: Large tail file, no snapshots
  • Fix: Use SnapshotManager to checkpoint regularly
  • Check: Tail file size should be < 10GB for fast recovery

Issue: Reference leak in LogManager

  • Cause: Forgetting to call release() function
  • Symptom: CloseAll() times out, logs not evicted
  • Fix: Use defer release() immediately after GetLog()

Design Rationale

Why Not Use BadgerDB/LevelDB?

LSM-tree stores are optimized for key-value workloads, not consensus logs:

  1. MVCC vs. Commit: LSM uses versions for MVCC, we need accept/commit distinction
  2. Compaction: Background compaction interferes with recovery scans
  3. Locks: Read path has locks, we need lock-free committed reads
  4. Overwrite semantics: LSM makes it hard to replace uncommitted entries
Why Lock-Free?

Consensus protocols have tight latency requirements (especially Phase-2):

  • Traditional locks: ~50-100ns overhead + contention
  • Lock-free CAS: ~5-10ns, no contention
  • For 1M ops/sec, lock overhead = 5-10% of CPU!
Why Separate Mutable/Immutable?

Matches Paxos semantics perfectly:

Paxos Accept  →  Mutable Region  (uncommitted, can change)
Paxos Commit  →  Immutable Tail  (committed, permanent)

This makes correctness obvious rather than clever.

Future Enhancements

Planned features (not yet implemented):

  1. Log Truncation: Delete entries before snapshots to bound log size
  2. Batch Commits: Commit multiple slots in one fsync() call
  3. Remote Snapshots: Ship snapshots to new replicas for faster bootstrap
  4. Tiered Storage: Move old segments to S3/object storage
  5. Compression: Optional compression of tail segments


License

Atlas-DB is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

Documentation

Constants

const (
	MutableSize = 2 * MB // 2MB per log (small since few in-flight commits expected)
	SegmentSize = 1 * GB // 1GB per segment file
	NumThreads  = 128    // Thread pool size for async operations
	MaxHotKeys  = 1024   // LRU cache size (1024 × 2MB = 2GB max memory)
)

Variables

var (
	// ErrSlotNotFound is returned when a slot is not in the log
	ErrSlotNotFound = errors.New("slot not found")

	// ErrCorruptedEntry is returned when an entry fails checksum validation
	ErrCorruptedEntry = errors.New("corrupted log entry")

	// ErrNotCommitted is returned when trying to read an uncommitted entry with ReadCommittedOnly
	ErrNotCommitted = errors.New("entry not committed")

	// ErrBufferFull is returned when the mutable buffer is full
	ErrBufferFull = errors.New("mutable buffer full")

	// ErrInvalidOffset is returned when an offset is out of bounds
	ErrInvalidOffset = errors.New("invalid offset")

	// ErrClosed is returned when operating on a closed log
	ErrClosed = errors.New("log is closed")

	// ErrNoSnapshot is returned when no snapshot exists
	ErrNoSnapshot = errors.New("no snapshot found")

	// ErrCorruptedSnapshot is returned when a snapshot fails checksum validation
	ErrCorruptedSnapshot = errors.New("corrupted snapshot")

	// ErrEntryTooLarge is returned when an entry value exceeds the maximum allowed size
	ErrEntryTooLarge = errors.New("entry value too large")
)
var (
	ErrRegistryClosed = errors.New("registry is closed")
)

Functions

func CompactOnStartup

func CompactOnStartup(logPath string, snapshotDir string) (uint64, error)

CompactOnStartup performs offline log compaction before the log is opened. It MUST be called before NewFasterLog() if you want to apply pending truncations. Returns the truncation slot if compaction was performed, 0 if not.

Types

type Ballot

type Ballot struct {
	ID     uint64 // Ballot counter
	NodeID uint64 // Node that created this ballot
}

Ballot represents a WPaxos ballot number. Ballots are ordered first by ID, then by NodeID for tie-breaking.

func (Ballot) Less

func (b Ballot) Less(other Ballot) bool

Less returns true if this ballot is less than other
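
The documented ordering (ID first, NodeID as tie-break) can be written out as a sketch; this mirrors the description above, not necessarily the package's exact source.

```go
package main

import "fmt"

// Ballot ordering as documented: compare ID first, then NodeID to break ties.
type Ballot struct {
	ID     uint64 // Ballot counter
	NodeID uint64 // Node that created this ballot
}

func (b Ballot) Less(other Ballot) bool {
	if b.ID != other.ID {
		return b.ID < other.ID
	}
	return b.NodeID < other.NodeID
}

func main() {
	a := Ballot{ID: 3, NodeID: 1}
	b := Ballot{ID: 3, NodeID: 2}
	c := Ballot{ID: 4, NodeID: 0}
	fmt.Println(a.Less(b)) // true: same ID, lower NodeID
	fmt.Println(c.Less(b)) // false: higher ID dominates NodeID
}
```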

type ByteSize

type ByteSize uint64
const (
	B  ByteSize = 1
	KB ByteSize = 1 << (10 * iota)
	MB
	GB
	TB
	PB
)

type Config

type Config struct {
	// Path to the log file on disk
	Path string

	// MutableSize is the size of the in-memory mutable region for uncommitted entries
	// Larger = more uncommitted entries can be held in memory
	// Recommended: 64MB - 256MB
	MutableSize uint64

	// SegmentSize is the size of each immutable log segment
	// Larger = fewer files but slower recovery
	// Recommended: 1GB - 4GB
	SegmentSize uint64

	// NumThreads is the number of concurrent threads/goroutines expected
	// Used for epoch-based memory management
	NumThreads int

	// SyncOnCommit controls whether to fsync on every commit
	// true = durable but slower, false = fast but risk data loss on crash
	SyncOnCommit bool
}

Config holds configuration for the FASTER log

type FasterLog

type FasterLog struct {
	// contains filtered or unexported fields
}

FasterLog implements a FASTER-style hybrid log optimised for consensus protocols. It has three regions:

  1. In-memory index (hash map): slot → offset lookup
  2. Mutable region (ring buffer): recent uncommitted entries (lock-free)
  3. Immutable tail (mmap file): committed entries, append-only

LOCK-FREE DESIGN:

  • Mutable region uses atomic CAS for allocation (no locks)
  • Reads scan the buffer sequentially (fast for a 64MB region)
  • Epochs protect against use-after-free during concurrent access
  • Only tail writes use a mutex (sequential disk writes)

func NewFasterLog

func NewFasterLog(cfg Config) (*FasterLog, error)

NewFasterLog creates a new FASTER-style log

func (*FasterLog) Accept

func (l *FasterLog) Accept(slot uint64, ballot Ballot, value []byte) error

Accept writes an accepted (uncommitted) entry to the mutable region. This corresponds to WPaxos Phase-2b.

func (*FasterLog) Checkpoint

func (l *FasterLog) Checkpoint() error

Checkpoint flushes all committed entries from the mutable region to the tail. LOCK-FREE: scans and drains without locks.

func (*FasterLog) Close

func (l *FasterLog) Close() error

Close closes the log

func (*FasterLog) Commit

func (l *FasterLog) Commit(slot uint64) error

Commit marks an entry as committed and flushes it to the immutable tail. This corresponds to WPaxos Phase-3. Following FASTER's RCU principle, only committed entries go to the immutable tail. The tail is truly immutable: existing entries are never modified in place.

func (*FasterLog) GetCommittedRange

func (l *FasterLog) GetCommittedRange() (minSlot uint64, maxSlot uint64, count uint64)

GetCommittedRange returns the range of committed slots (min, max, count). Useful for learners to know what they need to catch up on. Returns (0, 0, 0) if the log is empty.

func (*FasterLog) IterateCommitted

func (l *FasterLog) IterateCommitted(fn func(entry *LogEntry) error, opts IterateOptions) error

IterateCommitted iterates over committed entries in slot order (ZERO-ALLOCATION). The callback receives each entry but MUST NOT store the pointer: it is reused! Use cases:

  • State machine reconstruction: fn(entry) { applyToStateMachine(entry.Value) }
  • Snapshot creation: fn(entry) { serialize(entry.Slot, entry.Value) }
  • Learner bootstrap: fn(entry) { sendToLearner(entry) }

CRITICAL: The entry parameter is REUSED between calls for zero allocation. If you need to keep data, copy it immediately:

valueCopy := make([]byte, len(entry.Value))
copy(valueCopy, entry.Value)

The iterator:

  1. Collects slot numbers from index (committed entries only)
  2. Sorts slots numerically
  3. Iterates in order, calling fn(entry) for each
  4. Stops on first error from fn()

Options control iteration range and behavior (see IterateOptions)
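Putting the options together, a bounded, corruption-tolerant scan might look like this. It assumes an open *FasterLog named log; process is a hypothetical consumer.

```go
// Sketch: iterate a bounded slot range, continuing past unreadable entries.
err := log.IterateCommitted(func(entry *faster.LogEntry) error {
	// entry is reused between calls; copy Value before retaining it.
	v := append([]byte(nil), entry.Value...)
	process(entry.Slot, v) // hypothetical consumer
	return nil
}, faster.IterateOptions{
	MinSlot:    1000,
	MaxSlot:    2000,
	SkipErrors: true, // tolerate partial corruption
})
```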

func (*FasterLog) Read

func (l *FasterLog) Read(slot uint64) (*LogEntry, error)

Read reads an entry by slot number

func (*FasterLog) ReadCommittedOnly

func (l *FasterLog) ReadCommittedOnly(slot uint64) (*LogEntry, error)

ReadCommittedOnly reads an entry only if it is committed. This is what the state machine should use.

func (*FasterLog) ReplayFromSlot

func (l *FasterLog) ReplayFromSlot(startSlot uint64, fn func(entry *LogEntry) error) error

ReplayFromSlot replays committed entries starting from a slot up to the current max. This is the primary method for learner bootstrap and crash recovery.

CRITICAL: This captures the max slot BEFORE iteration to ensure a consistent snapshot. New commits that arrive during iteration are NOT included (prevents inconsistent reads).

Example - State machine reconstruction:

err := log.ReplayFromSlot(0, func(entry *LogEntry) error {
    return stateMachine.Apply(entry.Slot, entry.Value)
})

Example - Learner catching up from slot 1000:

err := log.ReplayFromSlot(1000, func(entry *LogEntry) error {
    return sendToLearner(entry)
})

The callback receives entries in slot order and MUST NOT store the entry pointer.

func (*FasterLog) ReplayRange

func (l *FasterLog) ReplayRange(startSlot, endSlot uint64, fn func(entry *LogEntry) error) error

ReplayRange replays committed entries in a specific range [startSlot, endSlot]. Use this when you want explicit control over the upper bound (e.g., incremental catch-up).

Example - Learner catching up to specific slot:

leaderMax := getLeaderMaxSlot()
err := log.ReplayRange(1000, leaderMax, func(entry *LogEntry) error {
    return applyEntry(entry)
})

func (*FasterLog) ScanUncommitted

func (l *FasterLog) ScanUncommitted() ([]*LogEntry, error)

ScanUncommitted returns all uncommitted entries. This is used for WPaxos Phase-1 recovery.
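A Phase-1 recovery pass might drain the mutable region and re-propose under a higher ballot. This is a sketch: log and newBallot are assumed, and propose is a hypothetical hook into the proposer.

```go
// Sketch: WPaxos Phase-1 recovery over uncommitted entries.
entries, err := log.ScanUncommitted()
if err != nil {
	return err
}
for _, e := range entries {
	// Per Paxos, the value from the highest ballot per slot must win;
	// propose is a hypothetical re-proposal under the new ballot.
	propose(e.Slot, newBallot, e.Value)
}
```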

type IterateOptions

type IterateOptions struct {
	// MinSlot is the minimum slot to iterate (0 = from beginning)
	MinSlot uint64

	// MaxSlot is the maximum slot to iterate (0 = to end)
	MaxSlot uint64

	// IncludeUncommitted includes uncommitted entries from mutable region
	// Useful for debugging or special recovery scenarios
	// Default: false (only committed entries)
	IncludeUncommitted bool

	// SkipErrors continues iteration even if individual entry reads fail
	// Useful for recovering from partial corruption
	// Default: false (stop on first error)
	SkipErrors bool
}

IterateOptions controls iteration behavior for IterateCommitted

type KeyRegistry

type KeyRegistry struct {
	// contains filtered or unexported fields
}

KeyRegistry tracks all known keys for enumeration and glob matching. It uses an append-only file for durability and an in-memory set for fast access.

func NewKeyRegistry

func NewKeyRegistry(path string) (*KeyRegistry, error)

NewKeyRegistry creates or opens a key registry at the given path.

func (*KeyRegistry) Close

func (kr *KeyRegistry) Close() error

Close closes the registry file.

func (*KeyRegistry) Compact

func (kr *KeyRegistry) Compact() error

Compact rewrites the registry file, removing deleted entries. This reduces file size after many delete operations.

func (*KeyRegistry) Contains

func (kr *KeyRegistry) Contains(key string) bool

Contains checks if a key exists in the registry.

func (*KeyRegistry) Count

func (kr *KeyRegistry) Count() int

Count returns the number of keys in the registry.

func (*KeyRegistry) Glob

func (kr *KeyRegistry) Glob(pattern string) []string

Glob returns all keys matching the given glob pattern. Supports: * (any chars), ? (single char), [abc] (char class), [a-z] (range).

func (*KeyRegistry) List

func (kr *KeyRegistry) List() []string

List returns all keys in the registry.

func (*KeyRegistry) Register

func (kr *KeyRegistry) Register(key string) error

Register adds a key to the registry. Idempotent - registering an existing key is a no-op.

func (*KeyRegistry) Unregister

func (kr *KeyRegistry) Unregister(key string) error

Unregister removes a key from the registry.

type LogEntry

type LogEntry struct {
	Slot      uint64
	Ballot    Ballot
	Value     []byte
	Committed bool
}

LogEntry represents a single consensus log entry. This matches WPaxos semantics: slot, ballot, value, committed flag.

type LogManager

type LogManager struct {
	// contains filtered or unexported fields
}

LogManager manages FASTER logs with LRU eviction and reference counting. This ensures logs are never closed while in use, preventing use-after-free bugs.

func NewLogManager

func NewLogManager() *LogManager

NewLogManager creates a new LogManager with LRU eviction

func NewLogManagerWithDir

func NewLogManagerWithDir(dir string) *LogManager

NewLogManagerWithDir creates a LogManager with a specific base directory. This is useful for tests/benchmarks that need isolated storage.

func (*LogManager) CloseAll

func (l *LogManager) CloseAll() error

CloseAll closes all logs (for shutdown). This will wait for active references to drain before closing. If references don't drain within the timeout, it will NOT force-close.

func (*LogManager) GetLog

func (l *LogManager) GetLog(key []byte) (*FasterLog, func(), error)

GetLog returns a log handle with an acquired reference. CRITICAL: the caller MUST call the returned release function when done! Usage:

log, release, err := manager.GetLog(key)
if err != nil { return err }
defer release()
// ... use log safely ...

func (*LogManager) GetSnapshotManager

func (l *LogManager) GetSnapshotManager(key []byte, log *FasterLog) (*SnapshotManager, error)

func (*LogManager) Glob

func (l *LogManager) Glob(pattern string) []string

Glob returns all keys matching the given glob pattern. Supports: * (any chars), ? (single char), [abc] (char class), [a-z] (range). Returns nil if the registry is not available.

func (*LogManager) HasKey

func (l *LogManager) HasKey(key string) bool

HasKey checks if a key exists in the registry. Returns false if the registry is not available.

func (*LogManager) InitKey

func (l *LogManager) InitKey(key []byte, fromSnapshot func(*Snapshot) error, replay func(*LogEntry) error) (*FasterLog, func(), error)
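
InitKey is undocumented; judging from its signature alone, a plausible use is opening a key's log while restoring state, applying the latest snapshot first and then replaying committed entries after it. Everything below (the callback semantics, the sm state machine and its Restore/Apply methods) is an assumption.

```go
// Hypothetical sketch of InitKey usage inferred from its signature.
log, release, err := manager.InitKey([]byte("user:alice"),
	func(s *faster.Snapshot) error { return sm.Restore(s.Slot, s.Data) }, // assumed: restore from snapshot
	func(e *faster.LogEntry) error { return sm.Apply(e.Slot, e.Value) },  // assumed: replay entries after it
)
if err != nil {
	return err
}
defer release()
```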

func (*LogManager) KeyCount

func (l *LogManager) KeyCount() int

KeyCount returns the number of registered keys. Returns 0 if the registry is not available.

func (*LogManager) ListKeys

func (l *LogManager) ListKeys() []string

ListKeys returns all registered keys. Returns nil if the registry is not available.

func (*LogManager) Stats

func (l *LogManager) Stats() LogManagerStats

Stats returns current statistics

type LogManagerStats

type LogManagerStats struct {
	TotalLogs  int
	OpenLogs   int
	ActiveRefs int
}

LogManagerStats holds statistics about the LogManager.

type RingBuffer

type RingBuffer struct {
	// contains filtered or unexported fields
}

RingBuffer is a lock-free buffer for the mutable region. It stores uncommitted log entries in memory before they are flushed to the immutable tail. Uses atomic CAS operations for true lock-free concurrent writes, following the FASTER design.

LOCK-FREE DESIGN:

  • Writes use atomic CAS to allocate space (no locks!)
  • Reads scan the buffer sequentially (acceptable since the mutable region is small)
  • No index map needed (scanning is fast enough for a ~64MB region)

Note: This is not a true "ring" buffer - it's a simple linear allocator that grows until entries are flushed. This matches WPaxos semantics better than a circular buffer.

func NewRingBuffer

func NewRingBuffer(size uint64) *RingBuffer

NewRingBuffer creates a new ring buffer of the given size

func (*RingBuffer) Append

func (rb *RingBuffer) Append(entry *LogEntry) (uint64, error)

Append adds an entry to the buffer using lock-free CAS allocation and returns the offset within the buffer. LOCK-FREE: uses atomic CompareAndSwap to reserve space without locks.

SAFETY: Uses a reserve-write-publish protocol to prevent readers from seeing partially written entries:

  1. Reserve space via CAS on 'reserved'
  2. Write data into the reserved region
  3. Publish via CAS on 'published' (waits for sequential order)

func (*RingBuffer) AvailableSpace

func (rb *RingBuffer) AvailableSpace() uint64

AvailableSpace returns the available space in the buffer. This accounts for space that has been reclaimed via tail advancement.

func (*RingBuffer) DrainCommitted

func (rb *RingBuffer) DrainCommitted() ([]*LogEntry, error)

DrainCommitted removes committed entries and returns them. This is called during checkpoint to flush committed entries to the tail. LOCK-FREE: scans the buffer sequentially, and only scans published entries to avoid reading partially written data.

func (*RingBuffer) GetAllUncommitted

func (rb *RingBuffer) GetAllUncommitted() ([]*LogEntry, error)

GetAllUncommitted returns all uncommitted entries (legacy wrapper)

func (*RingBuffer) GetAllUncommittedWithIndex

func (rb *RingBuffer) GetAllUncommittedWithIndex(indexCheck func(uint64) bool) ([]*LogEntry, error)

GetAllUncommittedWithIndex returns all uncommitted entries. Used for Phase-1 recovery in WPaxos. LOCK-FREE: scans the buffer sequentially, and only scans published entries to avoid reading partially written data. indexCheck: optional index lookup to check whether an entry has been moved to the tail.

func (*RingBuffer) MarkCommitted

func (rb *RingBuffer) MarkCommitted(slot uint64) error

MarkCommitted marks an entry as committed in the buffer (IN-PLACE UPDATE). WARNING: this is NOT lock-free, but it is safe since we own the epoch. Used by Commit() to mark an entry before flushing to the tail. Only scans published entries to avoid reading partially written data.

func (*RingBuffer) Read

func (rb *RingBuffer) Read(offset uint64) (*LogEntry, error)

Read reads an entry at the given offset

func (*RingBuffer) ReadBySlot

func (rb *RingBuffer) ReadBySlot(slot uint64) (*LogEntry, error)

ReadBySlot reads an entry by slot number. LOCK-FREE: scans the buffer sequentially (fast for small mutable regions). Only scans published entries to avoid reading partially written data.

func (*RingBuffer) Remove

func (rb *RingBuffer) Remove(slot uint64)

Remove is no longer a no-op; it is critical for reclaiming space. It should be called after Commit() moves an entry to the tail. However, the tail cannot be advanced arbitrarily: it must advance to the first uncommitted entry, maintaining the invariant that all entries before the tail have been flushed.

func (*RingBuffer) Reset

func (rb *RingBuffer) Reset()

Reset attempts to reset the buffer when all entries have been flushed. If writers are active or the buffer still holds data, the reset is skipped.

func (*RingBuffer) TryAdvanceTail

func (rb *RingBuffer) TryAdvanceTail(indexCheck func(uint64) bool) uint64

TryAdvanceTail attempts to advance the tail pointer past committed entries that have been moved to the immutable tail, reclaiming space. This should be called after commits to incrementally reclaim space.

indexCheck: a function that returns true if the slot is still in the mutable region. Returns the number of bytes reclaimed.

func (*RingBuffer) UsedSpace

func (rb *RingBuffer) UsedSpace() uint64

UsedSpace returns the space currently in use (not yet reclaimed)

type Snapshot

type Snapshot struct {
	// Slot is the last committed slot included in this snapshot
	Slot uint64

	// Data is the serialized state machine state
	// For WPaxos, this would be the reconstructed Record for each key
	Data []byte

	// Metadata for the snapshot
	NumEntries uint64 // Number of log entries represented
	Checksum   uint32 // CRC32 checksum of Data
}

Snapshot represents a state machine checkpoint at a specific slot. It captures the reconstructed state at that point in the log.

type SnapshotManager

type SnapshotManager struct {
	// contains filtered or unexported fields
}

SnapshotManager handles snapshot creation, storage, and recovery

func NewSnapshotManager

func NewSnapshotManager(snapshotDir string, log *FasterLog) (*SnapshotManager, error)

NewSnapshotManager creates a new snapshot manager

func (*SnapshotManager) CleanupOldSnapshots

func (sm *SnapshotManager) CleanupOldSnapshots(keepCount int) error

CleanupOldSnapshots removes all snapshots older than keepCount

func (*SnapshotManager) CreateSnapshot

func (sm *SnapshotManager) CreateSnapshot(slot uint64, stateData []byte) error

CreateSnapshot creates a snapshot at the given slot. The caller provides the serialized state machine data.
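A periodic snapshot-then-truncate cycle bounds log growth. This is a sketch: log, snapMgr, the hypothetical sm.Serialize, and the retention count of 3 are all assumptions; recall that TruncateLog only marks the point, with compaction deferred to the next reopen.

```go
// Sketch: snapshot the state machine at the newest committed slot,
// then mark everything up to it as truncatable.
_, maxSlot, count := log.GetCommittedRange()
if count == 0 {
	return nil // nothing committed yet
}
state := sm.Serialize() // hypothetical: serialized state machine bytes
if err := snapMgr.CreateSnapshot(maxSlot, state); err != nil {
	return err
}
if err := snapMgr.TruncateLog(maxSlot); err != nil { // marks the point only
	return err
}
return snapMgr.CleanupOldSnapshots(3) // keep the newest three (arbitrary)
```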

func (*SnapshotManager) GetLatestSnapshot

func (sm *SnapshotManager) GetLatestSnapshot() (*Snapshot, error)

GetLatestSnapshot returns the most recent snapshot

func (*SnapshotManager) GetSnapshot

func (sm *SnapshotManager) GetSnapshot(slot uint64) (*Snapshot, error)

GetSnapshot returns the snapshot for a specific slot (or nearest before it)

func (*SnapshotManager) TruncateLog

func (sm *SnapshotManager) TruncateLog(truncateSlot uint64) error

TruncateLog marks a truncation point in metadata; the actual compaction happens offline when the log is reopened. IMPORTANT: this does NOT immediately free disk space! Call CompactOnStartup() during recovery to perform the actual truncation.
