valuelog

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 7, 2026 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Version = 1

	HeaderSize = 4 + 1 + 1 + 2 + 8 + 4
)
View Source
const (
	FrameVersion    = 1
	FrameHeaderSize = 12
	MaxFrameK       = 128
)
View Source
const DefaultKeepSafetyMargin = 0.10
View Source
const (
	FrameFlagCompressed byte = 1 << 0
)

Variables

View Source
var (
	ErrCorrupt         = errors.New("valuelog: corrupt record")
	ErrRecordTooLarge  = errors.New("valuelog: record too large")
	ErrMissingDict     = errors.New("valuelog: missing dict bytes")
	ErrMissingTemplate = errors.New("valuelog: missing template bytes")
)
View Source
var ErrSegmentIDRange = errors.New("valuelog: segment id out of range")
View Source
var MaxDeadMappings = 64

MaxDeadMappings caps the number of old mmaps retained, to avoid exhausting vm.max_map_count. Set it to a value <= 0 to disable the cap.

Each remap retains the previous mapping until the file is closed to avoid use-after-unmap with concurrent readers.

Functions

func ChooseBlockGroupK added in v0.3.0

func ChooseBlockGroupK(records, rawPayloadBytes, targetCompressedBytes int, observedRatio float64) int

ChooseBlockGroupK estimates grouped frame size K for block compression.

observedRatio is the ratio of stored payload bytes to raw payload bytes (stored/raw). Values >= 0.98 are treated as expansion-risk/incompressible and force K=1.

func DecodeFileID

func DecodeFileID(fileID uint32) (lane uint32, seq uint32)

func DecodeSegmentID

func DecodeSegmentID(id uint32) (lane uint32, seq uint32)

func EncodeFileID

func EncodeFileID(lane uint32, seq uint32) (uint32, error)

func EncodeSegmentID

func EncodeSegmentID(lane uint32, seq uint32) (uint32, error)

func GenerateAutotuneValues

func GenerateAutotuneValues(workload AutotuneWorkload, size, count int, seed int64) [][]byte

GenerateAutotuneValues builds deterministic samples for a workload.

func NormalizeBlockTargetCompressedBytes added in v0.3.0

func NormalizeBlockTargetCompressedBytes(v int) int

NormalizeBlockTargetCompressedBytes returns a bounded grouped block target.

func ReadAt

func ReadAt(f *os.File, ptr page.ValuePtr, verifyCRC bool) ([]byte, error)

func ReadAtWithDict

func ReadAtWithDict(f *os.File, ptr page.ValuePtr, verifyCRC bool, dictLookup DictLookup, templateLookup TemplateLookup, templateCache *templateDefCache, templateOpts templ.DecodeOptions) ([]byte, error)

func ShouldKeepCompressed

func ShouldKeepCompressed(rawBytes, encodedBytes int, encodeNs int64, ioNsPerStoredByte, safetyMargin float64) bool

ShouldKeepCompressed decides whether to keep compressed bytes for a frame. It compares estimated IO savings against encode cost with a safety margin.

Types

type AutotuneMode

type AutotuneMode uint8
const (
	// AutotuneUnset indicates the caller did not explicitly configure the
	// autotuner mode. NormalizeAutotuneOptions maps this to a sensible default
	// based on whether the value log is enabled.
	AutotuneUnset AutotuneMode = iota
	AutotuneOff
	AutotuneMedium
	AutotuneAggressive
)

type AutotuneOptions

type AutotuneOptions struct {
	Mode AutotuneMode

	CandidateK            []int
	CandidateHistoryBytes []int
	CandidateDictBytes    []int

	MinGainToSwitch float64
	MinDwellFrames  uint64

	SampleStride     uint64
	MaxSampleBytes   uint64
	TrainCPUFraction float64

	ProbeBytes uint64
	PauseBytes uint64

	DisableBelowValueBytes int
}

func NormalizeAutotuneOptions

func NormalizeAutotuneOptions(opts AutotuneOptions, valueLogEnabled bool) AutotuneOptions

type AutotuneWorkload

type AutotuneWorkload struct {
	Name string
	Make func(rng *rand.Rand, size int) []byte
}

AutotuneWorkload describes a deterministic value generator.

func AutotuneWorkloads

func AutotuneWorkloads() []AutotuneWorkload

AutotuneWorkloads returns the standard deterministic workloads used by the autotune bench suite.

func LookupAutotuneWorkload

func LookupAutotuneWorkload(name string) (AutotuneWorkload, bool)

LookupAutotuneWorkload returns the workload with the given name.

type BlockCodec added in v0.3.0

type BlockCodec uint8

BlockCodec identifies non-dictionary value-log frame compression codecs.

const (
	BlockCodecNone BlockCodec = iota
	BlockCodecSnappy
	BlockCodecLZ4
)

type Clock

type Clock interface {
	Now() time.Time
}

type DictLookup

type DictLookup func(dictID uint64) ([]byte, error)

type EncodeCostModel

type EncodeCostModel interface {
	EncodeNs(rawPayloadBytes int, records int) int64
}

EncodeCostModel provides deterministic encode cost estimates (ns) for tests/benches. When set on a Writer, it overrides wall-time measurement for sampled frames.

type EncodeCostModelFunc

type EncodeCostModelFunc func(rawPayloadBytes int, records int) int64

EncodeCostModelFunc adapts a function to the EncodeCostModel interface.

func (EncodeCostModelFunc) EncodeNs

func (f EncodeCostModelFunc) EncodeNs(rawPayloadBytes int, records int) int64

type File

type File struct {
	ID       uint32
	Path     string
	File     *os.File
	RefCount atomic.Int64
	IsZombie atomic.Bool
	// contains filtered or unexported fields
}

File represents a value-log segment on disk.

func (*File) Close

func (f *File) Close() error

func (*File) Read

func (f *File) Read(ptr page.ValuePtr, verifyCRC bool) ([]byte, error)

func (*File) ReadAppend

func (f *File) ReadAppend(ptr page.ValuePtr, verifyCRC bool, dst []byte) ([]byte, error)

func (*File) ReadUnsafe

func (f *File) ReadUnsafe(ptr page.ValuePtr, verifyCRC bool) ([]byte, error)

func (*File) ReadUnsafeAppend

func (f *File) ReadUnsafeAppend(ptr page.ValuePtr, verifyCRC bool, dst []byte) ([]byte, error)

type FrameHeader

type FrameHeader struct {
	Version byte
	Flags   byte
	K       uint8
	// Reserved stores per-frame metadata. For compressed block frames
	// (Flags&FrameFlagCompressed, DictID==0), this holds the BlockCodec ID.
	Reserved uint8
	DictID   uint64
}

func DecodeFrame

func DecodeFrame(body []byte) (FrameHeader, []uint64, []uint32, []byte, error)

func EncodeFrame

func EncodeFrame(dictID uint64, dict []byte, records []Record) ([]byte, FrameHeader, error)

func EncodeFrameWithOptions added in v0.3.0

func EncodeFrameWithOptions(dictID uint64, dict []byte, records []Record, level zstd.EncoderLevel, enableEntropy bool) ([]byte, FrameHeader, error)

type FramePreparer added in v0.3.0

type FramePreparer struct {
	// contains filtered or unexported fields
}

FramePreparer reuses dict codecs and scratch buffers to prepare grouped frame bodies outside the value-log write lock.

It is not safe for concurrent use.

func NewFramePreparer added in v0.3.0

func NewFramePreparer() *FramePreparer

func (*FramePreparer) PrepareFrame added in v0.3.0

func (p *FramePreparer) PrepareFrame(dictID uint64, dict []byte, records []Record) ([]byte, FrameStats, error)

func (*FramePreparer) PrepareFrameInto added in v0.3.0

func (p *FramePreparer) PrepareFrameInto(dst []byte, dictID uint64, dict []byte, records []Record) ([]byte, FrameStats, error)

PrepareFrameInto behaves like PrepareFrame, but writes the frame body into dst when capacity allows. Callers can reuse large frame buffers across requests.

func (*FramePreparer) ResetCompressionHints added in v0.3.0

func (p *FramePreparer) ResetCompressionHints()

func (*FramePreparer) SetBlockCompression added in v0.3.0

func (p *FramePreparer) SetBlockCompression(codec BlockCodec, enabled bool)

func (*FramePreparer) SetClock added in v0.3.0

func (p *FramePreparer) SetClock(clock Clock)

func (*FramePreparer) SetDictFrameEncoderOptions added in v0.3.0

func (p *FramePreparer) SetDictFrameEncoderOptions(level zstd.EncoderLevel, enableEntropy bool)

func (*FramePreparer) SetEncodeCostModel added in v0.3.0

func (p *FramePreparer) SetEncodeCostModel(model EncodeCostModel)

func (*FramePreparer) SetEncodeSampleStride added in v0.3.0

func (p *FramePreparer) SetEncodeSampleStride(stride uint64)

func (*FramePreparer) SetKeepPolicy added in v0.3.0

func (p *FramePreparer) SetKeepPolicy(ioNsPerStoredByte, encodeNsPerRawByte, safetyMargin float64)

type FrameStats

type FrameStats struct {
	Records            int
	RawPayloadBytes    int
	StoredPayloadBytes int
	// Attempted is true when zstd encoding was executed (even if we fell back to raw).
	Attempted bool
	// Kept is true when compressed bytes were stored.
	Kept bool
	// EncodeNs is the sampled encode time (monotonic ns); 0 when not measured.
	EncodeNs int64
}

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager(dir string) (*Manager, error)

func (*Manager) Acquire

func (m *Manager) Acquire(set *Set)

Acquire increments the Set refcount (O(1)).

func (*Manager) Close

func (m *Manager) Close() error

func (*Manager) CurrentSet

func (m *Manager) CurrentSet() *Set

CurrentSet returns a snapshot of the current value-log files.

func (*Manager) EvictSegment added in v0.3.0

func (m *Manager) EvictSegment(id uint32) error

EvictSegment closes and forgets a segment without deleting it from disk. This is useful when another component owns lifecycle/deletion.

func (*Manager) MarkZombie

func (m *Manager) MarkZombie(id uint32) error

func (*Manager) Read

func (m *Manager) Read(ptr page.ValuePtr) ([]byte, error)

func (*Manager) ReadAppend

func (m *Manager) ReadAppend(ptr page.ValuePtr, dst []byte) ([]byte, error)

func (*Manager) ReadUnsafe

func (m *Manager) ReadUnsafe(ptr page.ValuePtr) ([]byte, error)

func (*Manager) ReadUnsafeAppend

func (m *Manager) ReadUnsafeAppend(ptr page.ValuePtr, dst []byte) ([]byte, error)

func (*Manager) Refresh

func (m *Manager) Refresh() error

Refresh scans the directory and registers any new segments.

func (*Manager) Release

func (m *Manager) Release(set *Set) error

Release decrements the Set refcount and removes zombie files once unpinned.

func (*Manager) RemapStats

func (m *Manager) RemapStats() (remaps uint64, deadMappings uint64)

func (*Manager) RemoveSegment

func (m *Manager) RemoveSegment(id uint32) error

func (*Manager) RemoveSegmentForce

func (m *Manager) RemoveSegmentForce(id uint32) error

RemoveSegmentForce removes a segment without refcount checks. Intended for recovery cleanup before any snapshots are live.

func (*Manager) SegmentPath

func (m *Manager) SegmentPath(id uint32) string

func (*Manager) SetDictLookup

func (m *Manager) SetDictLookup(lookup DictLookup)

func (*Manager) SetDisableReadChecksum

func (m *Manager) SetDisableReadChecksum(disable bool)

func (*Manager) SetTemplateLookup added in v0.3.0

func (m *Manager) SetTemplateLookup(lookup TemplateLookup, opts templ.DecodeOptions)

func (*Manager) TemplateDefCacheStats added in v0.3.0

func (m *Manager) TemplateDefCacheStats() (hits, misses uint64, entries, capacity int)

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

func NewReader

func NewReader(path string, fileID uint32) (*Reader, error)

func (*Reader) Close

func (r *Reader) Close() error

func (*Reader) DisableChecksum

func (r *Reader) DisableChecksum()

func (*Reader) DisableValueDecode

func (r *Reader) DisableValueDecode()

func (*Reader) ReadNext

func (r *Reader) ReadNext() (uint64, []byte, page.ValuePtr, error)

func (*Reader) SetDictLookup

func (r *Reader) SetDictLookup(lookup DictLookup)

func (*Reader) SetTemplateLookup added in v0.3.0

func (r *Reader) SetTemplateLookup(lookup TemplateLookup, opts templ.DecodeOptions)

func (*Reader) ValidateDicts

func (r *Reader) ValidateDicts()

ValidateDicts enables dictionary existence checks even when value decoding is disabled. This provides a low-cost "fail fast" validation pass during WAL replay and open.

type RealClock

type RealClock struct{}

func (RealClock) Now

func (RealClock) Now() time.Time

type Record

type Record struct {
	RID   uint64
	Value []byte
}

type Set

type Set struct {
	Files    map[uint32]*File
	RefCount atomic.Int64
	// contains filtered or unexported fields
}

Set is an immutable snapshot of value-log files for snapshot isolation.

func (*Set) Read

func (s *Set) Read(ptr page.ValuePtr) ([]byte, error)

func (*Set) ReadAppend

func (s *Set) ReadAppend(ptr page.ValuePtr, dst []byte) ([]byte, error)

func (*Set) ReadUnsafe

func (s *Set) ReadUnsafe(ptr page.ValuePtr) ([]byte, error)

func (*Set) ReadUnsafeAppend

func (s *Set) ReadUnsafeAppend(ptr page.ValuePtr, dst []byte) ([]byte, error)

type TemplateLookup added in v0.3.0

type TemplateLookup func(templateID uint64) ([]byte, error)

type UnsupportedBlockCodecError added in v0.3.0

type UnsupportedBlockCodecError struct {
	CodecID uint8
}

UnsupportedBlockCodecError reports an unknown per-frame block codec ID.

func (UnsupportedBlockCodecError) Error added in v0.3.0

func (UnsupportedBlockCodecError) Error() string

type VirtualClock

type VirtualClock struct {
	// contains filtered or unexported fields
}

VirtualClock provides deterministic time control for tests.

func NewVirtualClock

func NewVirtualClock(start time.Time) *VirtualClock

func (*VirtualClock) Advance

func (c *VirtualClock) Advance(ns int64)

func (*VirtualClock) Now

func (c *VirtualClock) Now() time.Time

type VirtualSink

type VirtualSink struct {
	Clock          interface{ Advance(int64) }
	NsPerByte      int64
	SyncPenaltyNs  int64
	FlushPenaltyNs int64
}

VirtualSink advances a clock based on written bytes to simulate IO wall time. Intended for deterministic tests/benchmarks.

func (*VirtualSink) Flush

func (s *VirtualSink) Flush() error

func (*VirtualSink) Sync

func (s *VirtualSink) Sync() error

func (*VirtualSink) Write

func (s *VirtualSink) Write(p []byte) (int, error)

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter(path string, fileID uint32) (*Writer, error)

func NewWriterWithSink

func NewWriterWithSink(sink io.Writer, fileID uint32) *Writer

NewWriterWithSink creates a value-log writer that writes to the provided sink. Intended for deterministic tests/benchmarks (no file-backed durability).

func (*Writer) Append

func (w *Writer) Append(dictID uint64, dict []byte, rid uint64, value []byte) (page.ValuePtr, error)

func (*Writer) AppendEncodedFrameInto added in v0.3.0

func (w *Writer) AppendEncodedFrameInto(body []byte, k int, dst []page.ValuePtr) ([]page.ValuePtr, error)

AppendEncodedFrameInto appends a pre-encoded grouped frame body and fills dst with grouped value pointers (dst must be at least k long).

The body must be produced by EncodeFrame/EncodeFrameWithOptions.

func (*Writer) AppendFrame

func (w *Writer) AppendFrame(dictID uint64, dict []byte, records []Record) ([]page.ValuePtr, error)

func (*Writer) AppendFrameWithStats

func (w *Writer) AppendFrameWithStats(dictID uint64, dict []byte, records []Record) ([]page.ValuePtr, FrameStats, error)

func (*Writer) AppendFrameWithStatsInto

func (w *Writer) AppendFrameWithStatsInto(dictID uint64, dict []byte, records []Record, dst []page.ValuePtr) ([]page.ValuePtr, FrameStats, error)

AppendFrameWithStatsInto appends a grouped frame and fills dst with the returned pointers (dst must be at least len(records) long).

This is a performance-oriented helper to avoid allocating a new pointer slice on every frame append.

func (*Writer) AppendRawFramesWritevInto

func (w *Writer) AppendRawFramesWritevInto(records []Record, k int, dst []page.ValuePtr) ([]page.ValuePtr, FrameStats, error)

AppendRawFramesWritevInto appends raw (uncompressed) grouped frames using a writev batching strategy.

This avoids concatenating payloads into a contiguous frame buffer, reducing user-space copying for large value workloads. It always flushes the writev queue before returning, so it does not retain references to the input slices.

dst must be at least len(records) long.

func (*Writer) AppendRawRecord added in v0.3.0

func (w *Writer) AppendRawRecord(raw []byte, length uint32) (page.ValuePtr, error)

AppendRawRecord appends a raw value-log record (CRC + header + payload) without re-encoding. The length argument should include any pointer flags.

func (*Writer) Close

func (w *Writer) Close() error

func (*Writer) FileID

func (w *Writer) FileID() uint32

func (*Writer) Flush

func (w *Writer) Flush() error

func (*Writer) ResetCompressionHints

func (w *Writer) ResetCompressionHints()

ResetCompressionHints clears skip/backoff state for deterministic benches.

func (*Writer) RotateTo

func (w *Writer) RotateTo(path string, fileID uint32) error

func (*Writer) SetBlockCompression added in v0.3.0

func (w *Writer) SetBlockCompression(codec BlockCodec, enabled bool)

SetBlockCompression configures grouped frame block compression for dictID=0 append paths.

func (*Writer) SetClock

func (w *Writer) SetClock(clock Clock)

func (*Writer) SetDictFrameEncoderOptions added in v0.3.0

func (w *Writer) SetDictFrameEncoderOptions(level zstd.EncoderLevel, enableEntropy bool)

func (*Writer) SetEncodeCostModel

func (w *Writer) SetEncodeCostModel(model EncodeCostModel)

func (*Writer) SetEncodeSampleStride

func (w *Writer) SetEncodeSampleStride(stride uint64)

func (*Writer) SetKeepPolicy

func (w *Writer) SetKeepPolicy(ioNsPerStoredByte, encodeNsPerRawByte, safetyMargin float64)

func (*Writer) SetRawWritevStrategy added in v0.3.0

func (w *Writer) SetRawWritevStrategy(minAvgBytes, minBatchRecs int)

SetRawWritevStrategy configures grouped raw writev usage heuristics.

minAvgBytes:

  • 0 enables adaptive mode (no average-bytes floor).
  • >0 requires average payload bytes/record to meet this floor.

minBatchRecs:

  • <=0 uses the default.

func (*Writer) Size

func (w *Writer) Size() int64

func (*Writer) Sync

func (w *Writer) Sync() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL