storage

package
v1.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 15, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exposed by the storage package. Each one is a plain
// errors.New value, so callers can match them with errors.Is.
// NOTE(review): which operations return which error is not visible from
// these declarations — confirm against the implementations.
var (
	ErrOffsetOutOfRange = errors.New("offset out of range")
	ErrLogClosed        = errors.New("log is closed")
	ErrLogCorrupted     = errors.New("log is corrupted")
	ErrInvalidOffset    = errors.New("invalid offset")
	ErrSegmentNotFound  = errors.New("segment not found")
	ErrChecksumMismatch = errors.New("checksum mismatch")
)

Common errors

Functions

This section is empty.

Types

type CompactionConfig

type CompactionConfig struct {
	// Strategy selects the compaction strategy; its values are the
	// CompactionStrategy constants.
	Strategy CompactionStrategy

	// MaxConcurrent is the maximum number of concurrent compactions.
	MaxConcurrent int

	// SizeRatio is the size ratio for leveled compaction.
	// NOTE(review): whether this field is ignored by the other strategies
	// is not visible here — confirm.
	SizeRatio int
}

CompactionConfig holds compaction configuration

type CompactionResult added in v1.1.0

type CompactionResult struct {
	// MergedEntries is the total number of key-value entries in the output.
	MergedEntries int
	// InputTables is the number of input tables that were compacted.
	InputTables int
	// OutputSize is the total size of the compacted output, in bytes.
	OutputSize int64
}

CompactionResult holds the outcome of a compaction run.

type CompactionStrategy

type CompactionStrategy int

CompactionStrategy represents the compaction strategy

const (
	// CompactionLeveled is the zero value of CompactionStrategy (iota == 0),
	// so an unset CompactionConfig.Strategy field equals it.
	CompactionLeveled CompactionStrategy = iota
	// CompactionSizeTiered has the value 1.
	CompactionSizeTiered
	// CompactionTimeWindow has the value 2.
	CompactionTimeWindow
)

type CompressionType

type CompressionType int8

CompressionType represents the compression algorithm

const (
	// CompressionNone is the zero value of CompressionType (iota == 0), so
	// an unset Compression field equals it.
	CompressionNone CompressionType = iota
	// CompressionGzip has the value 1.
	CompressionGzip
	// CompressionSnappy has the value 2.
	CompressionSnappy
	// CompressionLZ4 has the value 3.
	CompressionLZ4
	// CompressionZstd has the value 4.
	CompressionZstd
)

type Config

type Config struct {
	// DataDir is the directory for data files.
	DataDir string

	// WAL is the write-ahead log configuration.
	WAL WALConfig

	// MemTable is the in-memory table configuration.
	MemTable MemTableConfig

	// SSTable is the SSTable configuration.
	SSTable SSTableConfig

	// Compaction is the compaction configuration.
	Compaction CompactionConfig
}

Config holds storage engine configuration

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns default storage configuration

type FsyncPolicy

type FsyncPolicy int

FsyncPolicy determines when to fsync writes

const (
	// FsyncAlways is the zero value of FsyncPolicy (iota == 0), so an unset
	// WALConfig.FsyncPolicy field equals it.
	FsyncAlways FsyncPolicy = iota
	// FsyncInterval has the value 1. WALConfig.FsyncInterval carries the
	// interval used with this policy.
	FsyncInterval
	// FsyncNever has the value 2.
	// NOTE(review): presumably no explicit fsync is issued under this
	// policy — confirm against the WAL implementation.
	FsyncNever
)

type Index

type Index interface {
	// Lookup finds the file position for an offset.
	// NOTE(review): the behavior for an offset with no exact entry (error
	// versus nearest lower entry) is not specified here — confirm.
	Lookup(offset Offset) (position int64, err error)

	// Add adds an offset-to-position mapping.
	Add(offset Offset, position int64) error

	// Truncate truncates the index.
	// NOTE(review): the parameter name suggests entries before beforeOffset
	// are dropped — confirm, including whether the bound is inclusive.
	Truncate(beforeOffset Offset) error

	// Sync syncs the index to disk.
	Sync() error

	// Close closes the index.
	Close() error
}

Index provides offset to file position mapping

func NewIndex

func NewIndex(path string) (Index, error)

NewIndex creates a new index

type Iterator

type Iterator interface {
	// Next advances to the next entry.
	// NOTE(review): the bool result presumably reports whether an entry is
	// available after advancing — confirm.
	Next() bool

	// Key returns the current key.
	// NOTE(review): whether the returned slice stays valid after the next
	// call to Next is not specified — confirm before retaining it.
	Key() []byte

	// Value returns the current value.
	// NOTE(review): same slice-lifetime question as Key — confirm.
	Value() []byte

	// Err returns any error encountered.
	Err() error

	// Close closes the iterator.
	Close() error
}

Iterator provides ordered iteration over key-value pairs

type Log

type Log interface {
	// Append appends a batch of messages.
	// NOTE(review): the returned offsets are presumably those assigned to
	// the appended messages, as described on Storage.Append — confirm.
	Append(batch *MessageBatch) ([]Offset, error)

	// Read reads messages starting at offset.
	// NOTE(review): maxBytes presumably caps the amount of data returned —
	// confirm the exact limit semantics.
	Read(offset Offset, maxBytes int) ([]*Message, error)

	// ReadRange reads messages in the range from startOffset to endOffset.
	// NOTE(review): whether endOffset is inclusive is not specified — confirm.
	ReadRange(startOffset, endOffset Offset) ([]*Message, error)

	// HighWaterMark returns the high water mark. Storage.GetHighWaterMark
	// describes this value as the last committed offset + 1.
	HighWaterMark() Offset

	// StartOffset returns the first available offset.
	StartOffset() Offset

	// EndOffset returns the last offset + 1.
	EndOffset() Offset

	// FindOffsetByTimestamp finds the first offset whose message timestamp is >= the given timestamp.
	// Returns the offset and the actual timestamp of that message.
	// If no message is found at or after the timestamp, returns EndOffset() and zero timestamp.
	// timestamp is in Unix nanoseconds.
	FindOffsetByTimestamp(timestamp int64) (Offset, int64, error)

	// Flush flushes pending writes.
	Flush() error

	// Compact triggers compaction.
	Compact() error

	// Delete deletes messages before beforeOffset.
	Delete(beforeOffset Offset) error

	// Close closes the log.
	Close() error
}

Log represents a partition log

func NewLog

func NewLog(dir string, config Config) (Log, error)

NewLog creates a new partition log

type MemTable

type MemTable interface {
	// Put adds a key-value pair.
	Put(key []byte, value []byte) error

	// Get retrieves a value by key.
	// NOTE(review): the bool result presumably reports whether the key was
	// found; how a deleted key is reported is not specified — confirm.
	Get(key []byte) ([]byte, bool, error)

	// Delete marks a key as deleted.
	Delete(key []byte) error

	// Size returns the current size in bytes.
	Size() int64

	// Iterator returns an iterator over all entries.
	Iterator() Iterator

	// Clear clears all entries.
	Clear()
}

MemTable represents an in-memory table

func NewMemTable

func NewMemTable() MemTable

NewMemTable creates a new in-memory table

type MemTableConfig

type MemTableConfig struct {
	// MaxSize is the maximum size before a flush (default: 64MB).
	// NOTE(review): presumably expressed in bytes, matching MemTable.Size —
	// confirm.
	MaxSize int64

	// NumImmutable is the number of immutable memtables to keep.
	NumImmutable int
}

MemTableConfig holds MemTable configuration

type Message

type Message struct {
	Offset    Offset            // Message offset
	Key       []byte            // Message key (optional)
	Value     []byte            // Message value
	Headers   map[string][]byte // Message headers (string key to raw byte value)
	Timestamp time.Time         // Message timestamp
	CRC       uint32            // CRC32C checksum; NOTE(review): covered bytes not specified here — confirm
}

Message represents a single message in the log

type MessageBatch

type MessageBatch struct {
	// Messages holds the batch's messages by value (Read methods in this
	// package return []*Message instead).
	Messages []Message
	// BaseOffset is the batch's base offset.
	// NOTE(review): presumably the offset of the first message — confirm.
	BaseOffset Offset
	// Compression is the compression algorithm for the batch.
	Compression CompressionType
	// Timestamp is the batch timestamp.
	// NOTE(review): how it relates to per-message timestamps is not
	// specified — confirm.
	Timestamp time.Time
	// ProducerID and ProducerEpoch identify the producer of the batch.
	// NOTE(review): their exact use is not visible here — confirm.
	ProducerID    int64
	ProducerEpoch int16
	// LeaderEpoch is the epoch of the partition leader when this batch was written.
	// Used for leader fencing and offset validation during replication.
	LeaderEpoch int64
}

MessageBatch represents a batch of messages

type Offset

type Offset int64

Offset represents a message offset in a partition

type RetentionConfig added in v1.1.0

type RetentionConfig struct {
	// CheckInterval is how often the retention manager checks for expired data.
	// Defaults to 5 minutes.
	// NOTE(review): whether a zero value triggers the default is not
	// visible here — confirm.
	CheckInterval time.Duration

	// RetentionMs is the maximum age of messages in milliseconds.
	// Messages older than this are eligible for deletion.
	// A value <= 0 means no time-based retention.
	// Unlike CheckInterval this is a plain integer, not a time.Duration.
	RetentionMs int64

	// RetentionBytes is the maximum size per partition in bytes.
	// When exceeded, the oldest messages are deleted first (FIFO).
	// A value <= 0 means no size-based retention.
	RetentionBytes int64
}

RetentionConfig holds configuration for retention enforcement.

func DefaultRetentionConfig added in v1.1.0

func DefaultRetentionConfig() RetentionConfig

DefaultRetentionConfig returns a default retention configuration with no retention limits.

type RetentionManager added in v1.1.0

type RetentionManager struct {
	// contains filtered or unexported fields
}

RetentionManager enforces retention policies on partition logs. It runs a background goroutine that periodically checks all registered logs and deletes data that exceeds the configured retention limits.

func NewRetentionManager added in v1.1.0

func NewRetentionManager(config RetentionConfig) *RetentionManager

NewRetentionManager creates a new RetentionManager with the given configuration. The manager does not start automatically; call Start to begin enforcement.

func (*RetentionManager) EnforceNow added in v1.1.0

func (rm *RetentionManager) EnforceNow() RetentionStats

EnforceNow runs retention enforcement immediately (useful for testing). Returns stats for this enforcement cycle.

func (*RetentionManager) RegisterLog added in v1.1.0

func (rm *RetentionManager) RegisterLog(partitionID string, log Log)

RegisterLog adds a partition log to be managed by this retention manager.

func (*RetentionManager) Start added in v1.1.0

func (rm *RetentionManager) Start(ctx context.Context)

Start begins the background retention enforcement loop. It blocks until the context is cancelled or Stop is called.

func (*RetentionManager) Stats added in v1.1.0

func (rm *RetentionManager) Stats() RetentionStats

Stats returns cumulative retention stats.

func (*RetentionManager) Stop added in v1.1.0

func (rm *RetentionManager) Stop()

Stop signals the background goroutine to stop and waits for it to finish.

func (*RetentionManager) UnregisterLog added in v1.1.0

func (rm *RetentionManager) UnregisterLog(partitionID string)

UnregisterLog removes a partition log from the retention manager.

type RetentionStats added in v1.1.0

// The same struct is returned by RetentionManager.EnforceNow (values for
// one enforcement cycle) and RetentionManager.Stats (cumulative values).
type RetentionStats struct {
	// SegmentsDeleted is the number of logical segments (offset ranges) deleted.
	SegmentsDeleted int64

	// BytesReclaimed is the estimated bytes reclaimed.
	BytesReclaimed int64

	// PartitionsChecked is the number of partitions that were evaluated.
	PartitionsChecked int64
}

RetentionStats tracks retention metrics. EnforceNow returns the values for a single enforcement cycle, while Stats returns cumulative totals.

type SSTable

type SSTable interface {
	// Get retrieves a value by key.
	// NOTE(review): the bool result presumably reports whether the key was
	// found — confirm.
	Get(key []byte) ([]byte, bool, error)

	// Iterator returns an iterator over all entries.
	Iterator() Iterator

	// Close closes the SSTable.
	Close() error
}

SSTable represents a Sorted String Table

type SSTableConfig

type SSTableConfig struct {
	// BlockSize is the block size (default: 4KB).
	BlockSize int

	// BloomFilterEnabled enables bloom filters.
	BloomFilterEnabled bool

	// BloomFilterFPRate is the bloom filter false positive rate.
	// NOTE(review): presumably only meaningful when BloomFilterEnabled is
	// true — confirm.
	BloomFilterFPRate float64

	// Compression is the compression used for data blocks.
	Compression CompressionType
}

SSTableConfig holds SSTable configuration

type Segment

type Segment interface {
	// BaseOffset returns the base offset of this segment.
	BaseOffset() Offset

	// NextOffset returns the next offset to be written.
	NextOffset() Offset

	// Size returns the size in bytes.
	Size() int64

	// Append appends a batch.
	Append(batch *MessageBatch) error

	// Read reads messages starting at offset.
	// NOTE(review): maxBytes presumably caps the amount of data returned —
	// confirm the exact limit semantics.
	Read(offset Offset, maxBytes int) ([]*Message, error)

	// Sync syncs to disk.
	Sync() error

	// IsFull checks if segment is full.
	IsFull() bool

	// io.Closer supplies Close() error, which closes the segment. Close is
	// declared only through this embedding: listing it explicitly as well
	// is redundant and a "duplicate method" compile error before Go 1.14.
	io.Closer
}

Segment represents a log segment

type Storage

type Storage interface {
	// Append appends a batch of messages to a partition and returns their offsets.
	Append(ctx context.Context, partition int, batch *MessageBatch) ([]Offset, error)

	// Read reads messages from a partition starting at the given offset.
	// NOTE(review): maxBytes presumably caps the amount of data returned —
	// confirm the exact limit semantics.
	Read(ctx context.Context, partition int, offset Offset, maxBytes int) ([]*Message, error)

	// ReadRange reads messages in the specified offset range.
	// NOTE(review): whether endOffset is inclusive is not specified — confirm.
	ReadRange(ctx context.Context, partition int, startOffset, endOffset Offset) ([]*Message, error)

	// GetHighWaterMark returns the high water mark (last committed offset + 1).
	GetHighWaterMark(partition int) (Offset, error)

	// GetLogStartOffset returns the first available offset.
	GetLogStartOffset(partition int) (Offset, error)

	// GetLogEndOffset returns the last offset + 1.
	GetLogEndOffset(partition int) (Offset, error)

	// Flush forces all pending writes to disk. Unlike Compact and Delete it
	// takes no partition argument.
	Flush() error

	// Compact triggers compaction for a partition.
	Compact(partition int) error

	// Delete deletes messages before the given offset.
	Delete(partition int, beforeOffset Offset) error

	// Close closes the storage engine.
	Close() error
}

Storage is the main interface for the storage engine

type WAL

type WAL interface {
	// Append appends a record to the WAL.
	// NOTE(review): the returned Offset is presumably the one assigned to
	// the appended record — confirm.
	Append(data []byte) (Offset, error)

	// Read reads a record at the given offset.
	Read(offset Offset) ([]byte, error)

	// Sync forces all pending writes to disk.
	Sync() error

	// Truncate truncates the WAL before the given offset.
	// NOTE(review): whether the record at beforeOffset itself is kept is
	// not specified — confirm.
	Truncate(beforeOffset Offset) error

	// NextOffset returns the next available offset.
	NextOffset() Offset

	// Close closes the WAL.
	Close() error
}

WAL represents the Write-Ahead Log

func NewWAL

func NewWAL(dir string, config WALConfig) (WAL, error)

NewWAL creates a new Write-Ahead Log

type WALConfig

type WALConfig struct {
	// SegmentSize is the segment size in bytes (default: 1GB).
	SegmentSize int64

	// FsyncPolicy is the fsync policy; its values are the FsyncPolicy
	// constants.
	FsyncPolicy FsyncPolicy

	// FsyncInterval is the fsync interval used with the FsyncInterval
	// policy. The field shares its name with that policy constant.
	FsyncInterval time.Duration
}

WALConfig holds Write-Ahead Log configuration

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL