segment

package
v1.4.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 15, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package segment provides segment file management for LedgerQ.

Segments are append-only log files containing sequential message entries. Each segment has:

  • A .log file containing the segment header and message entries
  • A .idx file containing the sparse index for fast lookups
  • A base offset (first message ID) that uniquely identifies it

File naming convention:

  • Segment: {baseOffset:020d}.log (e.g., 00000000000000001000.log)
  • Index: {baseOffset:020d}.idx (e.g., 00000000000000001000.idx)

The 20-digit zero-padding ensures lexicographic sorting matches numeric ordering.

Index

Constants

View Source
const (
	// SegmentFileExtension is the file extension for segment files
	SegmentFileExtension = ".log"

	// IndexFileExtension is the file extension for index files
	IndexFileExtension = ".idx"

	// SegmentNameWidth is the number of digits in segment filenames (20 digits for uint64)
	SegmentNameWidth = 20
)

Variables

This section is empty.

Functions

func FormatIndexName

func FormatIndexName(baseOffset uint64) string

FormatIndexName creates an index filename from a base offset. Returns a zero-padded 20-digit filename (e.g., "00000000000000001000.idx").

func FormatSegmentName

func FormatSegmentName(baseOffset uint64) string

FormatSegmentName creates a segment filename from a base offset. Returns a zero-padded 20-digit filename (e.g., "00000000000000001000.log").

func ParseIndexName

func ParseIndexName(filename string) (uint64, error)

ParseIndexName extracts the base offset from an index filename. Returns an error if the filename doesn't match the expected format.

func ParseSegmentName

func ParseSegmentName(filename string) (uint64, error)

ParseSegmentName extracts the base offset from a segment filename. Returns an error if the filename doesn't match the expected format.

func ValidateSegmentSequence

func ValidateSegmentSequence(segments []*SegmentInfo) error

ValidateSegmentSequence checks that segments form a valid sequence without gaps. Returns an error if there are missing segments or overlapping base offsets.

Types

type ActiveWriterStats

type ActiveWriterStats struct {
	BaseOffset      uint64
	BytesWritten    uint64
	MessagesWritten uint64
	Age             time.Duration
}

ActiveWriterStats returns statistics about the active writer.

type CompactionResult

type CompactionResult struct {
	// SegmentsRemoved is the number of segments removed
	SegmentsRemoved int

	// BytesFreed is the total bytes freed
	BytesFreed int64

	// OldestSegmentAge is the age of the oldest remaining segment
	OldestSegmentAge time.Duration
}

CompactionResult contains information about a compaction operation.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages a collection of segments with automatic rotation.

func NewManager

func NewManager(opts *ManagerOptions) (*Manager, error)

NewManager creates a new segment manager. Discovers existing segments and prepares for writing.

func (*Manager) Close

func (m *Manager) Close() error

Close closes the manager and all open segments.

func (*Manager) Compact

func (m *Manager) Compact() (*CompactionResult, error)

Compact removes old segments according to the retention policy. Returns information about what was removed.

func (*Manager) GetActiveSegment

func (m *Manager) GetActiveSegment() *SegmentInfo

GetActiveSegment returns the currently active segment for writing.

func (*Manager) GetActiveWriterStats

func (m *Manager) GetActiveWriterStats() *ActiveWriterStats

GetActiveWriterStats returns statistics about the active writer.

func (*Manager) GetSegments

func (m *Manager) GetSegments() []*SegmentInfo

GetSegments returns information about all segments.

func (*Manager) IsClosed

func (m *Manager) IsClosed() bool

IsClosed returns whether the manager has been closed.

func (*Manager) OpenReader

func (m *Manager) OpenReader(baseOffset uint64) (*Reader, error)

OpenReader opens a reader for a specific segment by base offset.

func (*Manager) Sync

func (m *Manager) Sync() error

Sync syncs the active segment to disk.

func (*Manager) Write

func (m *Manager) Write(entry *format.Entry) (uint64, error)

Write writes an entry to the active segment. Automatically rotates to a new segment if rotation policy is met.

type ManagerOptions

type ManagerOptions struct {
	// Directory where segments are stored
	Directory string

	// RotationPolicy determines when to rotate segments
	RotationPolicy RotationPolicy

	// MaxSegmentSize is the max size in bytes before rotation (for RotateBySize, RotateByBoth)
	MaxSegmentSize uint64

	// MaxSegmentMessages is the max message count before rotation (for RotateByCount, RotateByBoth)
	MaxSegmentMessages uint64

	// MaxSegmentAge is the max duration before rotation (for RotateByTime)
	MaxSegmentAge time.Duration

	// WriterOptions for creating new segments
	WriterOptions *WriterOptions

	// RetentionPolicy for cleaning up old segments
	RetentionPolicy *RetentionPolicy
}

ManagerOptions configures segment manager behavior.

func DefaultManagerOptions

func DefaultManagerOptions(dir string) *ManagerOptions

DefaultManagerOptions returns sensible defaults for segment management.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader provides sequential and indexed reads from a segment file.

func NewReader

func NewReader(path string) (*Reader, error)

NewReader opens a segment file for reading. Loads the segment header and index (if available).

func (*Reader) BaseOffset

func (r *Reader) BaseOffset() uint64

BaseOffset returns the base offset of this segment.

func (*Reader) Close

func (r *Reader) Close() error

Close closes the segment reader.

func (*Reader) FindByMessageID

func (r *Reader) FindByMessageID(msgID uint64) (*format.Entry, uint64, uint64, error)

FindByMessageID uses the index to find an entry by message ID. Returns the entry, its file offset, and the next offset. Falls back to sequential scan if no index is available.

func (*Reader) FindByTimestamp

func (r *Reader) FindByTimestamp(timestamp int64) (*format.Entry, uint64, uint64, error)

FindByTimestamp uses the index to find entries near a timestamp. Returns the first entry at or after the given timestamp.

func (*Reader) HasIndex

func (r *Reader) HasIndex() bool

HasIndex returns whether this reader has a loaded index.

func (*Reader) Header

func (r *Reader) Header() *format.SegmentHeader

Header returns the segment header.

func (*Reader) Read

func (r *Reader) Read() (*format.Entry, uint64, uint64, error)

Read reads the next entry from the current file position. Returns the entry, its file offset, and the offset of the next entry.

func (*Reader) ReadAt

func (r *Reader) ReadAt(offset uint64) (*format.Entry, uint64, error)

ReadAt reads an entry at a specific file offset. Returns the entry and the offset of the next entry.

func (*Reader) ScanAll

func (r *Reader) ScanAll(visitor func(*format.Entry, uint64) error) error

ScanAll reads all entries from the segment sequentially. Calls the visitor function for each entry with (entry, fileOffset). Stops if visitor returns an error.

func (*Reader) Seek

func (r *Reader) Seek(offset uint64) error

Seek positions the reader at a specific file offset.

func (*Reader) SeekToStart

func (r *Reader) SeekToStart() error

SeekToStart positions the reader at the first entry (after the header).

func (*Reader) Size

func (r *Reader) Size() int64

Size returns the file size in bytes.

type RetentionPolicy

type RetentionPolicy struct {
	// MaxAge is the max age of segments to keep (0 = no age limit)
	MaxAge time.Duration

	// MaxSize is the max total size of all segments (0 = no size limit)
	MaxSize uint64

	// MaxSegments is the max number of segments to keep (0 = no count limit)
	MaxSegments int

	// MinSegments is the minimum number of segments to always keep (even if they exceed limits)
	MinSegments int
}

RetentionPolicy defines how to retain/remove old segments.

func DefaultRetentionPolicy

func DefaultRetentionPolicy() *RetentionPolicy

DefaultRetentionPolicy returns sensible defaults.

type RotationPolicy

type RotationPolicy int

RotationPolicy defines when to rotate to a new segment.

const (
	// RotateBySize rotates when segment reaches size limit
	RotateBySize RotationPolicy = iota

	// RotateByCount rotates when segment reaches message count limit
	RotateByCount

	// RotateByTime rotates after a time duration
	RotateByTime

	// RotateByBoth rotates when size OR count limit is reached
	RotateByBoth
)

type SegmentInfo

type SegmentInfo struct {
	// BaseOffset is the first message ID in this segment
	BaseOffset uint64

	// Path is the absolute path to the segment file
	Path string

	// IndexPath is the absolute path to the index file (may not exist)
	IndexPath string

	// Size is the segment file size in bytes
	Size int64
}

SegmentInfo holds information about a discovered segment file.

func DiscoverSegments

func DiscoverSegments(dir string) ([]*SegmentInfo, error)

DiscoverSegments finds all segment files in a directory and returns them sorted by base offset. Only returns segments with valid naming format. Missing index files are noted but not an error.

type SyncPolicy

type SyncPolicy int

SyncPolicy defines when to fsync data to disk.

const (
	// SyncImmediate fsyncs after every write (safest, slowest)
	SyncImmediate SyncPolicy = iota

	// SyncInterval fsyncs at regular intervals (balanced)
	SyncInterval

	// SyncManual requires explicit Sync() calls (fastest, riskiest)
	SyncManual
)

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer provides buffered, append-only writes to a segment file.

func NewWriter

func NewWriter(dir string, baseOffset uint64, opts *WriterOptions) (*Writer, error)

NewWriter creates a new segment writer. Creates the segment file and writes the header.

func (*Writer) BaseOffset

func (w *Writer) BaseOffset() uint64

BaseOffset returns the base offset of this segment.

func (*Writer) BytesWritten

func (w *Writer) BytesWritten() uint64

BytesWritten returns the total number of bytes written (excluding header).

func (*Writer) Close

func (w *Writer) Close() error

Close closes the segment writer, flushing all data and writing the index.

func (*Writer) Flush

func (w *Writer) Flush() error

Flush flushes the write buffer to the OS.

func (*Writer) IsClosed

func (w *Writer) IsClosed() bool

IsClosed returns whether the writer has been closed.

func (*Writer) MessagesWritten

func (w *Writer) MessagesWritten() uint64

MessagesWritten returns the total number of messages written.

func (*Writer) Sync

func (w *Writer) Sync() error

Sync flushes the buffer and fsyncs data to disk.

func (*Writer) Write

func (w *Writer) Write(entry *format.Entry) (uint64, error)

Write appends an entry to the segment. Returns the file offset where the entry was written.

type WriterOptions

type WriterOptions struct {
	// SyncPolicy determines when data is fsynced to disk
	SyncPolicy SyncPolicy

	// SyncInterval is the duration between automatic fsyncs (for SyncInterval policy)
	SyncInterval time.Duration

	// BufferSize is the size of the write buffer in bytes
	BufferSize int

	// IndexInterval is the number of bytes between sparse index entries
	IndexInterval int
}

WriterOptions configures segment writer behavior.

func DefaultWriterOptions

func DefaultWriterOptions() *WriterOptions

DefaultWriterOptions returns sensible defaults for segment writers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL