Documentation
¶
Overview ¶
Package segment provides segment file management for LedgerQ.
Segments are append-only log files containing sequential message entries. Each segment has:
- A .log file containing the segment header and message entries
- A .idx file containing the sparse index for fast lookups
- A base offset (first message ID) that uniquely identifies it
File naming convention:
- Segment: {baseOffset:020d}.log (e.g., 00000000000000001000.log)
- Index: {baseOffset:020d}.idx (e.g., 00000000000000001000.idx)
The 20-digit zero-padding ensures lexicographic sorting matches numeric ordering.
Index ¶
- Constants
- func FormatIndexName(baseOffset uint64) string
- func FormatSegmentName(baseOffset uint64) string
- func ParseIndexName(filename string) (uint64, error)
- func ParseSegmentName(filename string) (uint64, error)
- func ValidateSegmentSequence(segments []*SegmentInfo) error
- type ActiveWriterStats
- type CompactionResult
- type Manager
- func (m *Manager) Close() error
- func (m *Manager) Compact() (*CompactionResult, error)
- func (m *Manager) GetActiveSegment() *SegmentInfo
- func (m *Manager) GetActiveWriterStats() *ActiveWriterStats
- func (m *Manager) GetSegments() []*SegmentInfo
- func (m *Manager) IsClosed() bool
- func (m *Manager) OpenReader(baseOffset uint64) (*Reader, error)
- func (m *Manager) Sync() error
- func (m *Manager) Write(entry *format.Entry) (uint64, error)
- type ManagerOptions
- type Reader
- func (r *Reader) BaseOffset() uint64
- func (r *Reader) Close() error
- func (r *Reader) FindByMessageID(msgID uint64) (*format.Entry, uint64, uint64, error)
- func (r *Reader) FindByTimestamp(timestamp int64) (*format.Entry, uint64, uint64, error)
- func (r *Reader) HasIndex() bool
- func (r *Reader) Header() *format.SegmentHeader
- func (r *Reader) Read() (*format.Entry, uint64, uint64, error)
- func (r *Reader) ReadAt(offset uint64) (*format.Entry, uint64, error)
- func (r *Reader) ScanAll(visitor func(*format.Entry, uint64) error) error
- func (r *Reader) Seek(offset uint64) error
- func (r *Reader) SeekToStart() error
- func (r *Reader) Size() int64
- type RetentionPolicy
- type RotationPolicy
- type SegmentInfo
- type SyncPolicy
- type Writer
- func (w *Writer) BaseOffset() uint64
- func (w *Writer) BytesWritten() uint64
- func (w *Writer) Close() error
- func (w *Writer) Flush() error
- func (w *Writer) IsClosed() bool
- func (w *Writer) MessagesWritten() uint64
- func (w *Writer) Sync() error
- func (w *Writer) Write(entry *format.Entry) (uint64, error)
- type WriterOptions
Constants ¶
const (
// SegmentFileExtension is the file extension for segment files
SegmentFileExtension = ".log"
// IndexFileExtension is the file extension for index files
IndexFileExtension = ".idx"
// SegmentNameWidth is the number of digits in segment filenames (20 digits for uint64)
SegmentNameWidth = 20
)
Variables ¶
This section is empty.
Functions ¶
func FormatIndexName ¶
FormatIndexName creates an index filename from a base offset. Returns a zero-padded 20-digit filename (e.g., "00000000000000001000.idx").
func FormatSegmentName ¶
FormatSegmentName creates a segment filename from a base offset. Returns a zero-padded 20-digit filename (e.g., "00000000000000001000.log").
func ParseIndexName ¶
ParseIndexName extracts the base offset from an index filename. Returns an error if the filename doesn't match the expected format.
func ParseSegmentName ¶
ParseSegmentName extracts the base offset from a segment filename. Returns an error if the filename doesn't match the expected format.
func ValidateSegmentSequence ¶
func ValidateSegmentSequence(segments []*SegmentInfo) error
ValidateSegmentSequence checks that segments form a valid sequence without gaps. Returns an error if there are missing segments or overlapping base offsets.
Types ¶
type ActiveWriterStats ¶
type ActiveWriterStats struct {
BaseOffset uint64
BytesWritten uint64
MessagesWritten uint64
Age time.Duration
}
ActiveWriterStats holds statistics about the active writer.
type CompactionResult ¶
type CompactionResult struct {
// SegmentsRemoved is the number of segments removed
SegmentsRemoved int
// BytesFreed is the total bytes freed
BytesFreed int64
// OldestSegmentAge is the age of the oldest remaining segment
OldestSegmentAge time.Duration
}
CompactionResult contains information about a compaction operation.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages a collection of segments with automatic rotation.
func NewManager ¶
func NewManager(opts *ManagerOptions) (*Manager, error)
NewManager creates a new segment manager. Discovers existing segments and prepares for writing.
func (*Manager) Compact ¶
func (m *Manager) Compact() (*CompactionResult, error)
Compact removes old segments according to the retention policy. Returns information about what was removed.
func (*Manager) GetActiveSegment ¶
func (m *Manager) GetActiveSegment() *SegmentInfo
GetActiveSegment returns the currently active segment for writing.
func (*Manager) GetActiveWriterStats ¶
func (m *Manager) GetActiveWriterStats() *ActiveWriterStats
GetActiveWriterStats returns statistics about the active writer.
func (*Manager) GetSegments ¶
func (m *Manager) GetSegments() []*SegmentInfo
GetSegments returns information about all segments.
func (*Manager) OpenReader ¶
OpenReader opens a reader for a specific segment by base offset.
type ManagerOptions ¶
type ManagerOptions struct {
// Directory where segments are stored
Directory string
// RotationPolicy determines when to rotate segments
RotationPolicy RotationPolicy
// MaxSegmentSize is the max size in bytes before rotation (for RotateBySize, RotateByBoth)
MaxSegmentSize uint64
// MaxSegmentMessages is the max message count before rotation (for RotateByCount, RotateByBoth)
MaxSegmentMessages uint64
// MaxSegmentAge is the max duration before rotation (for RotateByTime)
MaxSegmentAge time.Duration
// WriterOptions for creating new segments
WriterOptions *WriterOptions
// RetentionPolicy for cleaning up old segments
RetentionPolicy *RetentionPolicy
}
ManagerOptions configures segment manager behavior.
func DefaultManagerOptions ¶
func DefaultManagerOptions(dir string) *ManagerOptions
DefaultManagerOptions returns sensible defaults for segment management.
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader provides sequential and indexed reads from a segment file.
func NewReader ¶
NewReader opens a segment file for reading. Loads the segment header and index (if available).
func (*Reader) BaseOffset ¶
BaseOffset returns the base offset of this segment.
func (*Reader) FindByMessageID ¶
FindByMessageID uses the index to find an entry by message ID. Returns the entry, its file offset, and the next offset. Falls back to a sequential scan if no index is available.
func (*Reader) FindByTimestamp ¶
FindByTimestamp uses the index to locate an entry by timestamp. Returns the first entry at or after the given timestamp.
func (*Reader) Header ¶
func (r *Reader) Header() *format.SegmentHeader
Header returns the segment header.
func (*Reader) Read ¶
Read reads the next entry from the current file position. Returns the entry, its file offset, and the offset of the next entry.
func (*Reader) ReadAt ¶
ReadAt reads an entry at a specific file offset. Returns the entry and the offset of the next entry.
func (*Reader) ScanAll ¶
ScanAll reads all entries from the segment sequentially. Calls the visitor function for each entry with (entry, fileOffset). Stops if visitor returns an error.
func (*Reader) SeekToStart ¶
SeekToStart positions the reader at the first entry (after the header).
type RetentionPolicy ¶
type RetentionPolicy struct {
// MaxAge is the max age of segments to keep (0 = no age limit)
MaxAge time.Duration
// MaxSize is the max total size of all segments (0 = no size limit)
MaxSize uint64
// MaxSegments is the max number of segments to keep (0 = no count limit)
MaxSegments int
// MinSegments is the minimum number of segments to always keep (even if they exceed limits)
MinSegments int
}
RetentionPolicy defines how to retain/remove old segments.
func DefaultRetentionPolicy ¶
func DefaultRetentionPolicy() *RetentionPolicy
DefaultRetentionPolicy returns sensible defaults for segment retention.
type RotationPolicy ¶
type RotationPolicy int
RotationPolicy defines when to rotate to a new segment.
const (
// RotateBySize rotates when segment reaches size limit
RotateBySize RotationPolicy = iota
// RotateByCount rotates when segment reaches message count limit
RotateByCount
// RotateByTime rotates after a time duration
RotateByTime
// RotateByBoth rotates when size OR count limit is reached
RotateByBoth
)
type SegmentInfo ¶
type SegmentInfo struct {
// BaseOffset is the first message ID in this segment
BaseOffset uint64
// Path is the absolute path to the segment file
Path string
// IndexPath is the absolute path to the index file (may not exist)
IndexPath string
// Size is the segment file size in bytes
Size int64
}
SegmentInfo holds information about a discovered segment file.
func DiscoverSegments ¶
func DiscoverSegments(dir string) ([]*SegmentInfo, error)
DiscoverSegments finds all segment files in a directory and returns them sorted by base offset. Only segments with a valid naming format are returned. A missing index file is noted but is not treated as an error.
type SyncPolicy ¶
type SyncPolicy int
SyncPolicy defines when to fsync data to disk.
const (
// SyncImmediate fsyncs after every write (safest, slowest)
SyncImmediate SyncPolicy = iota
// SyncInterval fsyncs at regular intervals (balanced)
SyncInterval
// SyncManual requires explicit Sync() calls (fastest, riskiest)
SyncManual
)
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer provides buffered, append-only writes to a segment file.
func NewWriter ¶
func NewWriter(dir string, baseOffset uint64, opts *WriterOptions) (*Writer, error)
NewWriter creates a new segment writer. Creates the segment file and writes the header.
func (*Writer) BaseOffset ¶
BaseOffset returns the base offset of this segment.
func (*Writer) BytesWritten ¶
BytesWritten returns the total number of bytes written (excluding header).
func (*Writer) MessagesWritten ¶
MessagesWritten returns the total number of messages written.
type WriterOptions ¶
type WriterOptions struct {
// SyncPolicy determines when data is fsynced to disk
SyncPolicy SyncPolicy
// SyncInterval is the duration between automatic fsyncs (for SyncInterval policy)
SyncInterval time.Duration
// BufferSize is the size of the write buffer in bytes
BufferSize int
// IndexInterval is the number of bytes between sparse index entries
IndexInterval int
}
WriterOptions configures segment writer behavior.
func DefaultWriterOptions ¶
func DefaultWriterOptions() *WriterOptions
DefaultWriterOptions returns sensible defaults for segment writers.