Documentation
¶
Index ¶
- Variables
- type CompactionConfig
- type CompactionResult
- type CompactionStrategy
- type CompressionType
- type Config
- type FsyncPolicy
- type Index
- type Iterator
- type Log
- type MemTable
- type MemTableConfig
- type Message
- type MessageBatch
- type Offset
- type RetentionConfig
- type RetentionManager
- func (rm *RetentionManager) EnforceNow() RetentionStats
- func (rm *RetentionManager) RegisterLog(partitionID string, log Log)
- func (rm *RetentionManager) Start(ctx context.Context)
- func (rm *RetentionManager) Stats() RetentionStats
- func (rm *RetentionManager) Stop()
- func (rm *RetentionManager) UnregisterLog(partitionID string)
- type RetentionStats
- type SSTable
- type SSTableConfig
- type Segment
- type Storage
- type WAL
- type WALConfig
Constants ¶
This section is empty.
Variables ¶
var ( ErrOffsetOutOfRange = errors.New("offset out of range") ErrLogClosed = errors.New("log is closed") ErrLogCorrupted = errors.New("log is corrupted") ErrInvalidOffset = errors.New("invalid offset") ErrSegmentNotFound = errors.New("segment not found") ErrChecksumMismatch = errors.New("checksum mismatch") )
Common errors
Functions ¶
This section is empty.
Types ¶
type CompactionConfig ¶
type CompactionConfig struct {
// Strategy
Strategy CompactionStrategy
// Max concurrent compactions
MaxConcurrent int
// Size ratio for leveled compaction
SizeRatio int
}
CompactionConfig holds compaction configuration
type CompactionResult ¶ added in v1.1.0
type CompactionResult struct {
// MergedEntries is the total number of key-value entries in the output.
MergedEntries int
// InputTables is the number of input tables that were compacted.
InputTables int
// OutputSize is the total byte size of the compacted output.
OutputSize int64
}
CompactionResult holds the outcome of a compaction run.
type CompactionStrategy ¶
type CompactionStrategy int
CompactionStrategy represents the compaction strategy
const ( CompactionLeveled CompactionStrategy = iota CompactionSizeTiered CompactionTimeWindow )
type CompressionType ¶
type CompressionType int8
CompressionType represents the compression algorithm
const ( CompressionNone CompressionType = iota CompressionGzip CompressionSnappy CompressionLZ4 CompressionZstd )
type Config ¶
type Config struct {
// Directory for data files
DataDir string
// WAL configuration
WAL WALConfig
// MemTable configuration
MemTable MemTableConfig
// SSTable configuration
SSTable SSTableConfig
// Compaction configuration
Compaction CompactionConfig
}
Config holds storage engine configuration
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns default storage configuration
type FsyncPolicy ¶
type FsyncPolicy int
FsyncPolicy determines when to fsync writes
const ( FsyncAlways FsyncPolicy = iota FsyncInterval FsyncNever )
type Index ¶
type Index interface {
// Lookup finds the file position for an offset
Lookup(offset Offset) (position int64, err error)
// Add adds an offset mapping
Add(offset Offset, position int64) error
// Truncate truncates the index
Truncate(beforeOffset Offset) error
// Sync syncs the index to disk
Sync() error
// Close closes the index
Close() error
}
Index provides offset to file position mapping
type Iterator ¶
type Iterator interface {
// Next advances to the next entry
Next() bool
// Key returns the current key
Key() []byte
// Value returns the current value
Value() []byte
// Err returns any error encountered
Err() error
// Close closes the iterator
Close() error
}
Iterator provides ordered iteration over key-value pairs
type Log ¶
type Log interface {
// Append appends a batch of messages
Append(batch *MessageBatch) ([]Offset, error)
// Read reads messages starting at offset
Read(offset Offset, maxBytes int) ([]*Message, error)
// ReadRange reads messages in range
ReadRange(startOffset, endOffset Offset) ([]*Message, error)
// HighWaterMark returns the high water mark
HighWaterMark() Offset
// StartOffset returns the first available offset
StartOffset() Offset
// EndOffset returns the last offset + 1
EndOffset() Offset
// FindOffsetByTimestamp finds the first offset whose message timestamp is >= the given timestamp.
// Returns the offset and the actual timestamp of that message.
// If no message is found at or after the timestamp, returns EndOffset() and zero timestamp.
// timestamp is in Unix nanoseconds.
FindOffsetByTimestamp(timestamp int64) (Offset, int64, error)
// Flush flushes pending writes
Flush() error
// Compact triggers compaction
Compact() error
// Delete deletes messages before offset
Delete(beforeOffset Offset) error
// Close closes the log
Close() error
}
Log represents a partition log
type MemTable ¶
type MemTable interface {
// Put adds a key-value pair
Put(key []byte, value []byte) error
// Get retrieves a value by key
Get(key []byte) ([]byte, bool, error)
// Delete marks a key as deleted
Delete(key []byte) error
// Size returns the current size in bytes
Size() int64
// Iterator returns an iterator over all entries
Iterator() Iterator
// Clear clears all entries
Clear()
}
MemTable represents an in-memory table
type MemTableConfig ¶
type MemTableConfig struct {
// Max size before flush (default: 64MB)
MaxSize int64
// Number of immutable memtables to keep
NumImmutable int
}
MemTableConfig holds MemTable configuration
type Message ¶
type Message struct {
Offset Offset // Message offset
Key []byte // Message key (optional)
Value []byte // Message value
Headers map[string][]byte // Message headers
Timestamp time.Time // Message timestamp
CRC uint32 // CRC32C checksum
}
Message represents a single message in the log
type MessageBatch ¶
type MessageBatch struct {
Messages []Message
BaseOffset Offset
Compression CompressionType
Timestamp time.Time
ProducerID int64
ProducerEpoch int16
// LeaderEpoch is the epoch of the partition leader when this batch was written.
// Used for leader fencing and offset validation during replication.
LeaderEpoch int64
}
MessageBatch represents a batch of messages
type RetentionConfig ¶ added in v1.1.0
type RetentionConfig struct {
// CheckInterval is how often the retention manager checks for expired data.
// Defaults to 5 minutes.
CheckInterval time.Duration
// RetentionMs is the maximum age of messages in milliseconds.
// Messages older than this are eligible for deletion.
// A value <= 0 means no time-based retention.
RetentionMs int64
// RetentionBytes is the maximum size per partition in bytes.
// When exceeded, the oldest messages are deleted first (FIFO).
// A value <= 0 means no size-based retention.
RetentionBytes int64
}
RetentionConfig holds configuration for retention enforcement.
func DefaultRetentionConfig ¶ added in v1.1.0
func DefaultRetentionConfig() RetentionConfig
DefaultRetentionConfig returns a default retention configuration with no retention limits.
type RetentionManager ¶ added in v1.1.0
type RetentionManager struct {
// contains filtered or unexported fields
}
RetentionManager enforces retention policies on partition logs. It runs a background goroutine that periodically checks all registered logs and deletes data that exceeds the configured retention limits.
func NewRetentionManager ¶ added in v1.1.0
func NewRetentionManager(config RetentionConfig) *RetentionManager
NewRetentionManager creates a new RetentionManager with the given configuration. The manager does not start automatically; call Start to begin enforcement.
func (*RetentionManager) EnforceNow ¶ added in v1.1.0
func (rm *RetentionManager) EnforceNow() RetentionStats
EnforceNow runs retention enforcement immediately (useful for testing). Returns stats for this enforcement cycle.
func (*RetentionManager) RegisterLog ¶ added in v1.1.0
func (rm *RetentionManager) RegisterLog(partitionID string, log Log)
RegisterLog adds a partition log to be managed by this retention manager.
func (*RetentionManager) Start ¶ added in v1.1.0
func (rm *RetentionManager) Start(ctx context.Context)
Start begins the background retention enforcement loop. It blocks until the context is cancelled or Stop is called.
func (*RetentionManager) Stats ¶ added in v1.1.0
func (rm *RetentionManager) Stats() RetentionStats
Stats returns cumulative retention stats.
func (*RetentionManager) Stop ¶ added in v1.1.0
func (rm *RetentionManager) Stop()
Stop signals the background goroutine to stop and waits for it to finish.
func (*RetentionManager) UnregisterLog ¶ added in v1.1.0
func (rm *RetentionManager) UnregisterLog(partitionID string)
UnregisterLog removes a partition log from the retention manager.
type RetentionStats ¶ added in v1.1.0
type RetentionStats struct {
// SegmentsDeleted is the number of logical segments (offset ranges) deleted.
SegmentsDeleted int64
// BytesReclaimed is the estimated bytes reclaimed.
BytesReclaimed int64
// PartitionsChecked is the number of partitions that were evaluated.
PartitionsChecked int64
}
RetentionStats tracks metrics for a single enforcement cycle.
type SSTable ¶
type SSTable interface {
// Get retrieves a value by key
Get(key []byte) ([]byte, bool, error)
// Iterator returns an iterator over all entries
Iterator() Iterator
// Close closes the SSTable
Close() error
}
SSTable represents a Sorted String Table
type SSTableConfig ¶
type SSTableConfig struct {
// Block size (default: 4KB)
BlockSize int
// Enable bloom filters
BloomFilterEnabled bool
// Bloom filter false positive rate
BloomFilterFPRate float64
// Compression for data blocks
Compression CompressionType
}
SSTableConfig holds SSTable configuration
type Segment ¶
type Segment interface {
// BaseOffset returns the base offset of this segment
BaseOffset() Offset
// NextOffset returns the next offset to be written
NextOffset() Offset
// Size returns the size in bytes
Size() int64
// Append appends a batch
Append(batch *MessageBatch) error
// Read reads messages starting at offset
Read(offset Offset, maxBytes int) ([]*Message, error)
// Sync syncs to disk
Sync() error
// Close closes the segment
Close() error
// IsFull checks if segment is full
IsFull() bool
io.Closer
}
Segment represents a log segment
type Storage ¶
type Storage interface {
// Append appends a batch of messages to a partition and returns their offsets
Append(ctx context.Context, partition int, batch *MessageBatch) ([]Offset, error)
// Read reads messages from a partition starting at the given offset
Read(ctx context.Context, partition int, offset Offset, maxBytes int) ([]*Message, error)
// ReadRange reads messages in the specified offset range
ReadRange(ctx context.Context, partition int, startOffset, endOffset Offset) ([]*Message, error)
// GetHighWaterMark returns the high water mark (last committed offset + 1)
GetHighWaterMark(partition int) (Offset, error)
// GetLogStartOffset returns the first available offset
GetLogStartOffset(partition int) (Offset, error)
// GetLogEndOffset returns the last offset + 1
GetLogEndOffset(partition int) (Offset, error)
// Flush forces all pending writes to disk
Flush() error
// Compact triggers compaction for a partition
Compact(partition int) error
// Delete deletes messages before the given offset
Delete(partition int, beforeOffset Offset) error
// Close closes the storage engine
Close() error
}
Storage is the main interface for the storage engine
type WAL ¶
type WAL interface {
// Append appends a record to the WAL
Append(data []byte) (Offset, error)
// Read reads a record at the given offset
Read(offset Offset) ([]byte, error)
// Sync forces all pending writes to disk
Sync() error
// Truncate truncates the WAL before the given offset
Truncate(beforeOffset Offset) error
// NextOffset returns the next available offset
NextOffset() Offset
// Close closes the WAL
Close() error
}
WAL represents the Write-Ahead Log