package statemachine

v0.8.6
Warning: this package is not in the latest version of its module.
Published: Jan 3, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

README

State Machine Implementation

Overview

The consensus extension provides three state machine implementations, each optimized for different use cases:

  1. Memory State Machine (memory.go) - In-memory, fast, for testing/development
  2. Persistent State Machine (persistent.go) - Disk-backed with caching, for production
  3. Apply Manager (apply.go) - Optimized log application with batching and parallelism

Memory State Machine

File: memory.go

Simple in-memory key-value store using Go maps with mutex protection.

Features
  • Fast read/write operations
  • Zero persistence overhead
  • Snapshot support (serialized to memory)
  • Thread-safe operations
Use Cases
  • Development and testing
  • Ephemeral data
  • Prototyping
  • Performance benchmarking
Example
sm := statemachine.NewMemoryStateMachine(statemachine.MemoryStateMachineConfig{}, logger)
sm.Apply(entry) // in-memory write, no I/O
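A slightly fuller sketch, assuming log entries carry a JSON-encoded Command; the LogEntry field names (Index, Data) and the encoding are illustrative, not part of the documented API:

// Requires "encoding/json"; field names on internal.LogEntry are assumptions.
cmd := statemachine.Command{Op: "set", Key: "greeting", Value: []byte("hello")}
data, _ := json.Marshal(cmd)

if err := sm.Apply(internal.LogEntry{Index: 1, Data: data}); err != nil {
    // handle apply error
}

state := sm.GetState() // testing helper: returns a copy of the current state
_ = state["greeting"]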

Persistent State Machine

File: persistent.go

Production-ready state machine with persistent storage and intelligent caching.

Features
1. Persistent Storage
  • Backed by any internal.Storage implementation
  • Atomic batch operations
  • Crash recovery
  • Durable snapshots
2. Intelligent Caching
  • In-memory cache for hot data
  • Configurable cache size
  • Cache hit/miss tracking
  • LRU-style eviction (simplified)
3. Batched Writes
  • Configurable batch size (default: 100)
  • Time-based flushing (default: 10ms)
  • Reduces I/O operations
  • Improves write throughput
4. Async Application
  • Non-blocking apply operations
  • Background worker for batch processing
  • Automatic queue flushing
  • Graceful shutdown with flush
Configuration
config := statemachine.PersistentStateMachineConfig{
    Storage:       storage,      // BadgerDB, BoltDB, etc.
    EnableCache:   true,          // Enable in-memory cache
    MaxCacheSize:  10000,         // Max cached entries
    BatchSize:     100,           // Entries per batch
    BatchInterval: 10 * time.Millisecond,
    SyncWrites:    true,          // Fsync on write
}

psm, err := statemachine.NewPersistentStateMachine(config, logger)
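A minimal lifecycle sketch (error handling trimmed; entry stands for a replicated log entry handed over by the Raft layer, and Stop is assumed to accept the same context as Start):

ctx := context.Background()
if err := psm.Start(ctx); err != nil {
    // handle startup error
}

// Apply a replicated entry (normally driven by the Raft node).
if err := psm.Apply(entry); err != nil {
    // handle apply error
}

// Reads are served from the cache when hot, from storage otherwise.
value, err := psm.Get("key")
if err != nil {
    // handle read error
}
_ = value

_ = psm.Stop(ctx)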
Performance Characteristics
Operation         Latency   Throughput    Notes
Apply (async)     ~1μs      100k+ ops/s   Queue only
Apply (sync)      ~10ms     10k+ ops/s    Wait for batch
Get (cache hit)   ~1μs      1M+ ops/s     In-memory
Get (cache miss)  ~1ms      50k+ ops/s    Disk read
Snapshot          ~500ms    N/A           Full state
Statistics
stats := psm.GetStats()
// Returns:
// - applied_count: Total entries applied
// - last_applied: Last applied index
// - last_snapshot: Last snapshot index
// - queue_size: Pending entries
// - cache_size: Cached entries
// - cache_hits: Cache hits
// - cache_misses: Cache misses
// - cache_hit_rate: Hit rate percentage
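The hit rate is a useful signal for sizing the cache; a small sketch (the float64 assertion assumes the stats value types, which may differ):

stats := psm.GetStats()
if rate, ok := stats["cache_hit_rate"].(float64); ok && rate < 80 {
    // working set exceeds the cache; consider raising MaxCacheSize
}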

Apply Manager

File: apply.go

High-performance log application engine with batching, pipelining, and parallelism.

Features
1. Worker Pool
  • Configurable worker count (default: 4)
  • Parallel entry processing
  • Load distribution
  • Worker isolation
2. Apply Pipeline
  • Buffered pipeline (default: 1000 entries)
  • Async submission
  • Result channels
  • Backpressure handling
3. Batching
  • Configurable batch size (default: 100)
  • Time-based batching (default: 10ms)
  • Reduces overhead
  • Improves throughput
4. Advanced Features
  • Sync/Async modes: Choose based on requirements
  • Retry logic: Exponential backoff
  • Timeout handling: Per-operation timeouts
  • Pipeline monitoring: Utilization tracking
  • Flush control: Wait for pending applies
Configuration
config := statemachine.ApplyManagerConfig{
    PipelineDepth: 1000,           // Pipeline buffer size
    WorkerCount:   4,              // Parallel workers
    BatchSize:     100,            // Entries per batch
    BatchTimeout:  10 * time.Millisecond,
}

am := statemachine.NewApplyManager(stateMachine, config, logger)
Usage Patterns
1. Async Apply (High Throughput)
// Submit and continue
err := am.ApplyEntries(entries)

// Later, wait for completion
err = am.WaitForApply(lastIndex, 30*time.Second)
2. Sync Apply (Immediate Consistency)
// Wait for completion
err := am.ApplyEntriesSync(entries, 30*time.Second)
3. Batch Apply (Parallel Processing)
// Automatically splits into batches and processes in parallel
err := am.ApplyBatch(largeEntryList)
4. Apply with Retry
// Automatic retry with exponential backoff
err := am.ApplyWithRetry(entries, 3)
Performance Characteristics
Mode    Latency    Throughput     Use Case
Async   ~1μs       1M+ ops/s      High throughput
Sync    ~10ms      100k+ ops/s    Consistency
Batch   ~50ms      500k+ ops/s    Bulk operations
Retry   Variable   Variable       Reliability
Monitoring
stats := am.GetStats()
// Returns:
// - total_applied: Total entries applied
// - total_failed: Failed applications
// - average_latency_ms: Average latency
// - pending_count: Pending applies
// - pipeline_size: Current pipeline size
// - pipeline_capacity: Max pipeline size
// - worker_count: Active workers
// - success_rate: Success percentage

// Check pipeline health
utilization := am.GetPipelineUtilization() // 0-100%
isFull := am.IsPipelineFull()
pending := am.GetPendingCount()

// Flush all pending
err := am.FlushPending(30*time.Second)
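Putting the calls above together, a sketch of the full lifecycle (assumes the caller owns a context and tracks the last submitted index):

ctx := context.Background()
if err := am.Start(ctx); err != nil {
    // handle startup error
}

// High-throughput path: submit asynchronously, then wait for a known index.
if err := am.ApplyEntries(entries); err != nil {
    // submission failed; consider ApplyEntriesSync or ApplyWithRetry
}
if err := am.WaitForApply(lastIndex, 30*time.Second); err != nil {
    // entries were not applied within the timeout
}

// Drain anything still in flight before shutting down.
if err := am.FlushPending(30 * time.Second); err != nil {
    // pending applies did not complete in time
}
_ = am.Stop(ctx)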

Architecture

┌─────────────────────────────────────────────────┐
│                 Raft Node                       │
└───────────────────┬─────────────────────────────┘
                    │
                    │ Log Entries
                    ▼
┌─────────────────────────────────────────────────┐
│              Apply Manager                      │
│  ┌─────────────────────────────────────────┐   │
│  │         Apply Pipeline (1000)            │   │
│  └─────────────────┬───────────────────────┘   │
│                    │                            │
│  ┌─────────┬──────┴──────┬─────────┐          │
│  │ Worker  │   Worker     │ Worker  │          │
│  │   #1    │     #2       │   #3    │          │
│  └────┬────┴──────┬───────┴────┬────┘          │
└───────┼───────────┼────────────┼───────────────┘
        │           │            │
        │    Batched Operations  │
        ▼           ▼            ▼
┌─────────────────────────────────────────────────┐
│         Persistent State Machine                │
│  ┌─────────────────────────────────────────┐   │
│  │         Apply Queue (100)                │   │
│  └─────────────────┬───────────────────────┘   │
│                    │                            │
│  ┌─────────────────▼───────────────────────┐   │
│  │       Background Worker                  │   │
│  │  - Batches operations                    │   │
│  │  - Writes to storage                     │   │
│  │  - Updates cache                         │   │
│  └─────────────────┬───────────────────────┘   │
└────────────────────┼────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────┐
│              Storage Backend                    │
│         (BadgerDB / BoltDB / etc.)             │
└─────────────────────────────────────────────────┘

Production Recommendations

Small Cluster (3-5 nodes)
// Memory state machine is fine
sm := statemachine.NewMemoryStateMachine(statemachine.MemoryStateMachineConfig{}, logger)
Medium Cluster (5-10 nodes)
// Persistent with moderate caching
config := statemachine.PersistentStateMachineConfig{
    Storage:       badgerStorage,
    EnableCache:   true,
    MaxCacheSize:  5000,
    BatchSize:     50,
    BatchInterval: 20 * time.Millisecond,
}
psm, _ := statemachine.NewPersistentStateMachine(config, logger)
Large Cluster (10+ nodes) or High Throughput
// Persistent + Apply Manager for maximum performance
psmConfig := statemachine.PersistentStateMachineConfig{
    Storage:       badgerStorage,
    EnableCache:   true,
    MaxCacheSize:  10000,
    BatchSize:     100,
    BatchInterval: 10 * time.Millisecond,
}
psm, _ := statemachine.NewPersistentStateMachine(psmConfig, logger)

amConfig := statemachine.ApplyManagerConfig{
    PipelineDepth: 2000,
    WorkerCount:   8,
    BatchSize:     200,
    BatchTimeout:  10 * time.Millisecond,
}
am := statemachine.NewApplyManager(psm, amConfig, logger)

Tuning Guide

For Low Latency
  • Reduce batch sizes
  • Reduce batch intervals
  • Increase worker count
  • Enable caching
For High Throughput
  • Increase batch sizes
  • Increase pipeline depth
  • Enable batching
  • Use more workers
For Memory Efficiency
  • Reduce cache size
  • Reduce pipeline depth
  • Smaller batch sizes
  • Disable cache
For Durability
  • Enable sync writes
  • Smaller batch intervals
  • Use persistent storage
  • Regular snapshots
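As a concrete illustration, the low-latency and high-throughput guidance above might translate into apply-manager configurations like these (values are starting points to benchmark, not recommendations):

// Low latency: small batches, short flush interval, more workers.
lowLatency := statemachine.ApplyManagerConfig{
    PipelineDepth: 500,
    WorkerCount:   8,
    BatchSize:     10,
    BatchTimeout:  1 * time.Millisecond,
}

// High throughput: deep pipeline, large batches.
highThroughput := statemachine.ApplyManagerConfig{
    PipelineDepth: 5000,
    WorkerCount:   8,
    BatchSize:     500,
    BatchTimeout:  20 * time.Millisecond,
}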

Error Handling

All operations return errors that should be checked:

// Apply
if err := sm.Apply(entry); err != nil {
    // handle apply error (retry, surface to caller, etc.)
}

// Get (persistent state machine)
value, err := psm.Get("key")
if errors.Is(err, internal.ErrNodeNotFound) {
    // key doesn't exist
} else if err != nil {
    // other error
}

// Snapshot (persistent state machine)
data, err := psm.CreateSnapshot()
if err != nil {
    // snapshot failed
}

Testing

Use the in-memory state machine for tests:

func TestMyFeature(t *testing.T) {
    logger := &testLogger{t}
    sm := statemachine.NewMemoryStateMachine(statemachine.MemoryStateMachineConfig{}, logger)

    // Test away!
    if err := sm.Apply(entry); err != nil {
        t.Fatalf("apply failed: %v", err)
    }
    state := sm.GetState()
    assert.Equal(t, expected, state["key"])
}

Metrics Integration

The persistent state machine and apply manager expose statistics via GetStats:

// Export a statistic to Prometheus; the gauge reads fresh stats on each scrape.
// The uint64 assertion assumes the stats value type, which may differ.
prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
    Name: "statemachine_applied_total",
}, func() float64 {
    stats := psm.GetStats()
    return float64(stats["applied_count"].(uint64))
}))

Best Practices

  1. Always use persistent storage in production
  2. Enable caching for read-heavy workloads
  3. Tune batch sizes based on entry size
  4. Monitor cache hit rates
  5. Use Apply Manager for high throughput
  6. Implement custom state machines for complex data models
  7. Take snapshots regularly (see the sketch after this list)
  8. Test recovery scenarios
  9. Monitor apply latency
  10. Use retry logic for transient failures
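Regarding item 7, a periodic-snapshot sketch against the persistent state machine (the interval and where the snapshot bytes go are application-specific):

// Run in its own goroutine; stop the ticker on shutdown.
ticker := time.NewTicker(10 * time.Minute)
for range ticker.C {
    data, err := psm.CreateSnapshot()
    if err != nil {
        continue // log the error and retry on the next tick
    }
    _ = data // write the snapshot bytes to durable storage
}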

Summary

Implementation   Persistence   Caching   Batching   Parallelism   Best For
Memory           No            N/A       No         No            Testing
Persistent       Yes           Yes       Yes        No            Production
Apply Manager    N/A           N/A       Yes        Yes           High Performance

Recommended Setup: Persistent State Machine + Apply Manager for production deployments requiring high performance and durability.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ApplyManager

type ApplyManager struct {
	// contains filtered or unexported fields
}

ApplyManager manages optimized log entry application.

func NewApplyManager

func NewApplyManager(
	stateMachine internal.StateMachine,
	config ApplyManagerConfig,
	logger forge.Logger,
) *ApplyManager

NewApplyManager creates a new apply manager.

func (*ApplyManager) ApplyBatch

func (am *ApplyManager) ApplyBatch(entries []internal.LogEntry) error

ApplyBatch applies a batch of entries with optimizations.

func (*ApplyManager) ApplyEntries

func (am *ApplyManager) ApplyEntries(entries []internal.LogEntry) error

ApplyEntries applies log entries asynchronously.

func (*ApplyManager) ApplyEntriesSync

func (am *ApplyManager) ApplyEntriesSync(entries []internal.LogEntry, timeout time.Duration) error

ApplyEntriesSync applies log entries synchronously.

func (*ApplyManager) ApplyWithRetry

func (am *ApplyManager) ApplyWithRetry(entries []internal.LogEntry, maxRetries int) error

ApplyWithRetry applies entries with retry logic.

func (*ApplyManager) FlushPending

func (am *ApplyManager) FlushPending(timeout time.Duration) error

FlushPending waits for all pending applies to complete.

func (*ApplyManager) GetPendingCount

func (am *ApplyManager) GetPendingCount() int

GetPendingCount returns the number of pending applies.

func (*ApplyManager) GetPipelineUtilization

func (am *ApplyManager) GetPipelineUtilization() float64

GetPipelineUtilization returns pipeline utilization percentage.

func (*ApplyManager) GetStats

func (am *ApplyManager) GetStats() map[string]any

GetStats returns apply manager statistics.

func (*ApplyManager) IsPipelineFull

func (am *ApplyManager) IsPipelineFull() bool

IsPipelineFull returns true if the pipeline is full.

func (*ApplyManager) Start

func (am *ApplyManager) Start(ctx context.Context) error

Start starts the apply manager.

func (*ApplyManager) Stop

func (am *ApplyManager) Stop(ctx context.Context) error

Stop stops the apply manager.

func (*ApplyManager) WaitForApply

func (am *ApplyManager) WaitForApply(index uint64, timeout time.Duration) error

WaitForApply waits for entries up to index to be applied.

type ApplyManagerConfig

type ApplyManagerConfig struct {
	PipelineDepth int
	WorkerCount   int
	BatchSize     int
	BatchTimeout  time.Duration
}

ApplyManagerConfig contains apply manager configuration.

type Command

type Command struct {
	Op    string
	Key   string
	Value []byte
}

Command represents a state machine command.

type MemoryStateMachine

type MemoryStateMachine struct {
	// contains filtered or unexported fields
}

MemoryStateMachine implements a simple in-memory state machine.

func NewMemoryStateMachine

func NewMemoryStateMachine(config MemoryStateMachineConfig, logger forge.Logger) *MemoryStateMachine

NewMemoryStateMachine creates a new memory state machine.

func (*MemoryStateMachine) Apply

func (sm *MemoryStateMachine) Apply(entry internal.LogEntry) error

Apply applies a log entry to the state machine.

func (*MemoryStateMachine) Clear

func (sm *MemoryStateMachine) Clear()

Clear clears the state machine (for testing).

func (*MemoryStateMachine) GetState

func (sm *MemoryStateMachine) GetState() map[string]any

GetState returns a copy of the current state (for testing).

func (*MemoryStateMachine) Query

func (sm *MemoryStateMachine) Query(query any) (any, error)

Query performs a read-only query.

func (*MemoryStateMachine) Restore

func (sm *MemoryStateMachine) Restore(snapshot *internal.Snapshot) error

Restore restores the state machine from a snapshot.

func (*MemoryStateMachine) Snapshot

func (sm *MemoryStateMachine) Snapshot() (*internal.Snapshot, error)

Snapshot creates a snapshot of the current state.

type MemoryStateMachineConfig

type MemoryStateMachineConfig struct {
	InitialCapacity int
}

MemoryStateMachineConfig contains configuration for memory state machine.

type PersistentStateMachine

type PersistentStateMachine struct {
	// contains filtered or unexported fields
}

PersistentStateMachine is a state machine backed by persistent storage.

func NewPersistentStateMachine

func NewPersistentStateMachine(config PersistentStateMachineConfig, logger forge.Logger) (*PersistentStateMachine, error)

NewPersistentStateMachine creates a new persistent state machine.

func (*PersistentStateMachine) Apply

func (psm *PersistentStateMachine) Apply(entry internal.LogEntry) error

Apply applies a log entry to the state machine.

func (*PersistentStateMachine) CreateSnapshot

func (psm *PersistentStateMachine) CreateSnapshot() ([]byte, error)

CreateSnapshot creates a snapshot of the state machine.

func (*PersistentStateMachine) Get

func (psm *PersistentStateMachine) Get(key string) ([]byte, error)

Get retrieves a value from the state machine.

func (*PersistentStateMachine) GetStats

func (psm *PersistentStateMachine) GetStats() map[string]any

GetStats returns state machine statistics.

func (*PersistentStateMachine) RestoreSnapshot

func (psm *PersistentStateMachine) RestoreSnapshot(data []byte) error

RestoreSnapshot restores a snapshot to the state machine.

func (*PersistentStateMachine) Start

func (psm *PersistentStateMachine) Start(ctx context.Context) error

Start starts the persistent state machine.

func (*PersistentStateMachine) Stop

Stop stops the persistent state machine.

type PersistentStateMachineConfig

type PersistentStateMachineConfig struct {
	Storage       internal.Storage
	EnableCache   bool
	MaxCacheSize  int
	BatchSize     int
	BatchInterval time.Duration
	SyncWrites    bool
}

PersistentStateMachineConfig contains persistent state machine configuration.
