format

package
v1.4.2 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 15, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package format provides binary encoding/decoding for LedgerQ file formats.

This package implements:

  • Entry format: message entries with CRC32C checksums
  • Segment format: append-only log segments with headers
  • Index format: sparse indexes for offset-to-position mapping
  • Metadata format: JSON-encoded queue state with atomic updates
  • Checksum utilities: CRC32C (Castagnoli) computation and verification

Index

Constants

const (
	EntryTypeData       uint8 = 1 // Regular data message
	EntryTypeTombstone  uint8 = 2 // Deletion marker
	EntryTypeCheckpoint uint8 = 3 // Checkpoint marker
)

Entry types

const (
	EntryFlagNone       uint8 = 0
	EntryFlagCompressed uint8 = 1 << 0 // Payload is compressed
	EntryFlagEncrypted  uint8 = 1 << 1 // Payload is encrypted
	EntryFlagTTL        uint8 = 1 << 2 // Entry has TTL (expiration time)
	EntryFlagHeaders    uint8 = 1 << 3 // Entry has headers (metadata)
	EntryFlagPriority   uint8 = 1 << 4 // Entry has priority (v1.1.0+)
)

Entry flags

const (
	PriorityLow    uint8 = 0 // Default for backward compatibility
	PriorityMedium uint8 = 1
	PriorityHigh   uint8 = 2
)

Priority levels for messages (v1.1.0+)

const (
	SegmentMagic  uint32 = 0x4C445151 // "LDQQ" - LedgerQ Queue
	IndexMagic    uint32 = 0x4C445149 // "LDQI" - LedgerQ Index
	MetadataMagic uint32 = 0x4C44514D // "LDQM" - LedgerQ Metadata
)

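Magic numbers for file type identification. Each comment spells the constant's bytes in big-endian order; because segment and index headers are encoded little-endian, those bytes appear reversed on disk (SegmentMagic is stored as "QQDL", IndexMagic as "IQDL").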
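
To make the byte order concrete, the sketch below encodes the magic constants little-endian (the byte order the segment and index header formats specify) with encoding/binary and checks the first four bytes of a buffer. The helper names magicBytes and hasMagic are hypothetical and not part of this package.

```go
package main

import "encoding/binary"

// Values copied from the magic-number constants above.
const (
	SegmentMagic  uint32 = 0x4C445151
	IndexMagic    uint32 = 0x4C445149
	MetadataMagic uint32 = 0x4C44514D
)

// magicBytes returns the on-disk representation of a magic number
// when it is written little-endian.
func magicBytes(m uint32) []byte {
	b := make([]byte, 4)
	binary.LittleEndian.PutUint32(b, m)
	return b
}

// hasMagic (hypothetical helper) reports whether buf starts with the
// little-endian encoding of want.
func hasMagic(buf []byte, want uint32) bool {
	return len(buf) >= 4 && binary.LittleEndian.Uint32(buf[:4]) == want
}
```

For example, magicBytes(SegmentMagic) yields the bytes "QQDL", which is what a hex dump of a segment file would show at offset 0 under this encoding.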

const (
	FormatVersion1 uint16 = 1
	CurrentVersion uint16 = FormatVersion1
)

Format version

const (
	SegmentFlagNone       uint16 = 0
	SegmentFlagCompressed uint16 = 1 << 0
	SegmentFlagEncrypted  uint16 = 1 << 1
)

Segment header flags

const DefaultIndexInterval = 4096

DefaultIndexInterval is the default number of bytes between index entries (4096 bytes, i.e. 4 KiB).

const EntryHeaderSize = 22

EntryHeaderSize is the size of the fixed entry header in bytes (22 bytes). Layout: Length(4) + Type(1) + Flags(1) + MsgID(8) + Timestamp(8) = 22 bytes

const IndexEntrySize = 24

IndexEntrySize is the fixed size of each index entry (24 bytes)

const IndexHeaderSize = 32

IndexHeaderSize is the fixed size of the index file header (32 bytes)

const MaxDecompressedSize = 100 * 1024 * 1024 // 100 MB

MaxDecompressedSize is the maximum size allowed for a decompressed payload (100 MiB, i.e. 104,857,600 bytes), to guard against decompression-bomb attacks.

const SegmentHeaderSize = 64

SegmentHeaderSize is the fixed size of the segment header (64 bytes, cache-line aligned). Layout: Magic(4) + Version(2) + Flags(2) + BaseOffset(8) + CreatedAt(8) + MinMsgID(8) + MaxMsgID(8) + Reserved(20) + HeaderCRC(4) = 64 bytes

Variables

This section is empty.

Functions

func CompressPayload

func CompressPayload(payload []byte, compression CompressionType, level int) ([]byte, error)

CompressPayload compresses a payload using the specified algorithm and level. Returns the compressed data or an error if compression fails.

Parameters:

  • payload: The data to compress
  • compression: The compression algorithm to use
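  • level: The compression level (algorithm-specific; 0 selects the algorithm's default level)

For GZIP (ratios are approximate and depend heavily on the input data):

  • Level 1 (gzip.BestSpeed): Fastest, ~50% size reduction
  • Level 6 (the level that gzip.DefaultCompression, which is -1, resolves to): Balanced, ~60-70% size reduction
  • Level 9 (gzip.BestCompression): Best ratio, ~65-75% size reduction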

func ComputeCRC32C

func ComputeCRC32C(data []byte) uint32

ComputeCRC32C computes a CRC32C checksum over the given data. It uses the Castagnoli polynomial, which modern CPUs can compute with dedicated instructions (SSE4.2 on x86-64, the CRC32 extension on ARMv8).
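
With the standard library, CRC32C amounts to building a Castagnoli table once and calling crc32.Checksum. The sketch below mirrors the documented behavior of ComputeCRC32C and VerifyCRC32C; the lowercase names are hypothetical stand-ins, not the package's code.

```go
package main

import "hash/crc32"

// castagnoli is built once and reused; with crc32.Castagnoli, the
// standard library can use hardware CRC instructions where available.
var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// computeCRC32C returns the CRC32C (Castagnoli) checksum of data.
func computeCRC32C(data []byte) uint32 {
	return crc32.Checksum(data, castagnoli)
}

// verifyCRC32C recomputes the checksum and compares it with expected.
func verifyCRC32C(data []byte, expected uint32) bool {
	return computeCRC32C(data) == expected
}
```

The standard CRC-32C check value applies here: computeCRC32C([]byte("123456789")) returns 0xE3069283, which is a quick way to confirm the right polynomial is in use.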

func DecompressPayload

func DecompressPayload(payload []byte, compression CompressionType) ([]byte, error)

DecompressPayload decompresses a payload using the specified algorithm. Returns the decompressed data or an error if decompression fails.

This function includes protection against decompression bombs by limiting the maximum decompressed size to MaxDecompressedSize.

func ShouldCompress

func ShouldCompress(originalSize, compressedSize, minSize int) bool

ShouldCompress determines whether a payload should be compressed based on the minimum size threshold and compression efficiency.

Parameters:

  • originalSize: Size of the original payload in bytes
  • compressedSize: Size of the compressed payload in bytes
  • minSize: Minimum payload size to consider for compression

Returns true if:

  1. Original size >= minSize
  2. Compressed size < original size * 0.95 (more than 5% savings)
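
The two rules translate directly into code. This sketch (hypothetical name shouldCompress) uses integer arithmetic, compressed*100 < original*95, to avoid floating-point rounding at the boundary; whether the real function computes it this way is an assumption, but the strict inequality matches the documented rule.

```go
package main

// shouldCompress reports whether storing the compressed form is
// worthwhile: the original must reach minSize, and compression must
// save more than 5% (compressed < 0.95 * original).
func shouldCompress(originalSize, compressedSize, minSize int) bool {
	if originalSize < minSize {
		return false
	}
	return compressedSize*100 < originalSize*95
}
```

For a 1000-byte payload, a 949-byte result qualifies, while a 950-byte result (exactly 5% savings) does not.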

func VerifyCRC32C

func VerifyCRC32C(data []byte, expected uint32) bool

VerifyCRC32C reports whether the CRC32C checksum computed over data equals expected.

func WriteIndexFile

func WriteIndexFile(path string, idx *Index) error

WriteIndexFile writes an index to a file.

func WriteMetadata

func WriteMetadata(path string, meta *QueueMetadata) error

WriteMetadata atomically writes metadata to a file using double-buffering (write to a temporary file, then atomically rename it over the original).

This ensures crash safety: if the process crashes during the write, either the old metadata file remains intact or the new one is complete.

func WriteSegmentHeader

func WriteSegmentHeader(path string, header *SegmentHeader) error

WriteSegmentHeader writes a segment header to a file.

Types

type CompressionType

type CompressionType uint8

CompressionType represents the compression algorithm used for a message payload.

const (
	// CompressionNone indicates no compression (default)
	CompressionNone CompressionType = 0

	// CompressionGzip indicates GZIP compression (compress/gzip)
	CompressionGzip CompressionType = 1
)

func (CompressionType) String

func (c CompressionType) String() string

String returns the string representation of the compression type.

type Entry

type Entry struct {
	// Length is the number of bytes that follow this field: the rest of the header, any optional fields, the payload, and the CRC
	Length uint32

	// Type indicates the entry type (Data, Tombstone, Checkpoint)
	Type uint8

	// Flags contains entry flags (compressed, encrypted, TTL, headers, priority, etc.)
	Flags uint8

	// MsgID is the unique message identifier
	MsgID uint64

	// Timestamp is the Unix time in nanoseconds when the entry was created
	Timestamp int64

	// Priority is the message priority level (Low/Medium/High) (v1.1.0+)
	// Only serialized if EntryFlagPriority is set
	// Defaults to PriorityLow for backward compatibility
	Priority uint8

	// ExpiresAt is the Unix time in nanoseconds when the message expires (0 = no expiration)
	// Only serialized if EntryFlagTTL is set
	ExpiresAt int64

	// Headers contains key-value metadata for the message
	// Only serialized if EntryFlagHeaders is set
	Headers map[string]string

	// Compression is the compression algorithm used for the payload (v1.3.0+)
	// Only serialized if EntryFlagCompressed is set
	// Defaults to CompressionNone for backward compatibility
	Compression CompressionType

	// Payload is the message data (may be compressed if Compression != CompressionNone)
	Payload []byte
}

Entry represents a single message entry in a segment file.

Binary format (little-endian):

[Length:4][Type:1][Flags:1][MsgID:8][Timestamp:8][Priority:1?][ExpiresAt:8?][HeadersSize:2?][Headers:N?][Compression:1?][Payload:N][CRC32C:4]

Optional fields (included based on flags):

  • Priority (1 byte): Present if EntryFlagPriority is set (v1.1.0+)
  • ExpiresAt (8 bytes): Present if EntryFlagTTL is set
  • HeadersSize (2 bytes) + Headers (N bytes): Present if EntryFlagHeaders is set
  • Compression (1 byte): Present if EntryFlagCompressed is set (v1.3.0+)

Headers encoding:

[NumHeaders:2][Key1Len:2][Key1:N][Value1Len:2][Value1:N]...[KeyNLen:2][KeyN:N][ValueNLen:2][ValueN:N]

Total header size: 22 bytes (base, including the 4-byte Length field), plus 1 (Priority), 8 (TTL), 2+N (Headers), and 1 (Compression) for each optional field whose flag is set.

func Unmarshal

func Unmarshal(r io.Reader) (*Entry, error)

Unmarshal decodes an entry from the given reader. Returns an error if the entry is corrupted or invalid.

func (*Entry) IsExpired

func (e *Entry) IsExpired(now int64) bool

IsExpired returns true if the message has expired based on the given current time. Returns false if the message has no TTL.

func (*Entry) Marshal

func (e *Entry) Marshal() []byte

Marshal encodes the entry into binary format with CRC32C checksum. Returns the complete entry bytes ready to be written to disk.

func (*Entry) TotalSize

func (e *Entry) TotalSize() int

TotalSize returns the total size of the entry on disk (including length field).

func (*Entry) Validate

func (e *Entry) Validate() error

Validate checks if the entry is valid.

type Index

type Index struct {
	Header  *IndexHeader
	Entries []*IndexEntry
}

Index represents an in-memory sparse index.

func NewIndex

func NewIndex(baseID uint64) *Index

NewIndex creates a new index with the given base ID.

func ReadIndex

func ReadIndex(r io.Reader) (*Index, error)

ReadIndex reads an index from the given reader.

func ReadIndexFile

func ReadIndexFile(path string) (*Index, error)

ReadIndexFile reads an index from a file.

func (*Index) Add

func (idx *Index) Add(entry *IndexEntry)

Add adds an entry to the index.

func (*Index) Find

func (idx *Index) Find(messageID uint64) *IndexEntry

Find performs a binary search to find the entry for the given message ID. Returns the entry with the largest ID that is <= the given ID. Returns nil if no such entry exists.
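
The "largest ID <= target" lookup maps neatly onto sort.Search: find the first entry whose ID is greater than the target, then step back one. This sketch assumes the entries are sorted by MessageID in ascending order; find and indexEntry are hypothetical stand-ins for Index.Find and IndexEntry.

```go
package main

import "sort"

// indexEntry mirrors the fields of IndexEntry.
type indexEntry struct {
	MessageID  uint64
	FileOffset uint64
	Timestamp  int64
}

// find returns the entry with the largest MessageID <= id, or nil.
// entries must be sorted by MessageID in ascending order.
func find(entries []*indexEntry, id uint64) *indexEntry {
	// i is the first position whose MessageID is greater than id.
	i := sort.Search(len(entries), func(i int) bool { return entries[i].MessageID > id })
	if i == 0 {
		return nil // id precedes every indexed message (or the index is empty)
	}
	return entries[i-1]
}
```

In a sparse index, a reader would then seek to the returned FileOffset and scan forward entry by entry until it reaches the requested message.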

func (*Index) WriteTo

func (idx *Index) WriteTo(w io.Writer) (int64, error)

WriteTo writes the index to the given writer (implements io.WriterTo).

type IndexEntry

type IndexEntry struct {
	// MessageID is the message identifier
	MessageID uint64

	// FileOffset is the byte position in the segment file
	FileOffset uint64

	// Timestamp is the Unix time in nanoseconds
	Timestamp int64
}

IndexEntry represents a single entry in the sparse index.

Binary format (little-endian, 24 bytes):

[MessageID:8][FileOffset:8][Timestamp:8]

func UnmarshalIndexEntry

func UnmarshalIndexEntry(r io.Reader) (*IndexEntry, error)

UnmarshalIndexEntry decodes an index entry from the given reader.

func (*IndexEntry) Marshal

func (e *IndexEntry) Marshal() []byte

Marshal encodes an index entry into binary format.

type IndexHeader

type IndexHeader struct {
	// Magic is the file format identifier (0x4C445149 for indexes)
	Magic uint32

	// Version is the format version
	Version uint16

	// BaseID is the base message ID for this index
	BaseID uint64

	// EntryCount is the number of index entries
	EntryCount uint64

	// Reserved is space for future extensions (4 bytes)
	Reserved uint32

	// HeaderCRC is the checksum of the header (excluding this field)
	HeaderCRC uint32
	// contains filtered or unexported fields
}

IndexHeader represents the header of an index file.

Binary format (little-endian, 32 bytes):

[Magic:4][Version:2][_:2][BaseID:8][EntryCount:8][Reserved:4][HeaderCRC:4]
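
The byte offsets follow directly from this layout: Magic at 0, Version at 4, two padding bytes at 6, BaseID at 8, EntryCount at 16, Reserved at 24, and HeaderCRC at 28. Since HeaderCRC is the last field and excludes itself, it covers bytes 0 to 27. The sketch below uses hypothetical names and leaves the padding and Reserved bytes zero.

```go
package main

import (
	"encoding/binary"
	"errors"
	"hash/crc32"
)

const (
	indexMagic      uint32 = 0x4C445149
	indexHeaderSize        = 32
)

// indexHeader holds the non-constant fields of IndexHeader.
type indexHeader struct {
	Version    uint16
	BaseID     uint64
	EntryCount uint64
}

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// marshalIndexHeader fills the 32-byte layout; bytes 6-7 (padding) and
// 24-27 (Reserved) stay zero.
func marshalIndexHeader(h indexHeader) []byte {
	b := make([]byte, indexHeaderSize)
	le := binary.LittleEndian
	le.PutUint32(b[0:4], indexMagic)
	le.PutUint16(b[4:6], h.Version)
	le.PutUint64(b[8:16], h.BaseID)
	le.PutUint64(b[16:24], h.EntryCount)
	le.PutUint32(b[28:32], crc32.Checksum(b[:28], castagnoli))
	return b
}

// unmarshalIndexHeader checks length, magic, and CRC before decoding.
func unmarshalIndexHeader(b []byte) (indexHeader, error) {
	le := binary.LittleEndian
	switch {
	case len(b) < indexHeaderSize:
		return indexHeader{}, errors.New("index header too short")
	case le.Uint32(b[0:4]) != indexMagic:
		return indexHeader{}, errors.New("bad index magic")
	case le.Uint32(b[28:32]) != crc32.Checksum(b[:28], castagnoli):
		return indexHeader{}, errors.New("index header CRC mismatch")
	}
	return indexHeader{Version: le.Uint16(b[4:6]), BaseID: le.Uint64(b[8:16]), EntryCount: le.Uint64(b[16:24])}, nil
}
```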

func NewIndexHeader

func NewIndexHeader(baseID uint64) *IndexHeader

NewIndexHeader creates a new index header with default values.

func UnmarshalIndexHeader

func UnmarshalIndexHeader(r io.Reader) (*IndexHeader, error)

UnmarshalIndexHeader decodes an index header from the given reader.

func (*IndexHeader) Marshal

func (h *IndexHeader) Marshal() []byte

Marshal encodes the index header into binary format with CRC32C checksum.

func (*IndexHeader) Validate

func (h *IndexHeader) Validate() error

Validate checks if the index header is valid.

type MetadataFile

type MetadataFile struct {
	// contains filtered or unexported fields
}

MetadataFile manages atomic reads and writes of queue metadata.

MetadataFile is safe for concurrent use; a mutex serializes access. Writes use double-buffering: write to a .tmp file, fsync it, then atomically rename it over the original.

func NewMetadataFile

func NewMetadataFile(path string) *MetadataFile

NewMetadataFile creates a new metadata file manager.

func (*MetadataFile) Read

func (mf *MetadataFile) Read() (*QueueMetadata, error)

Read reads and validates the metadata from disk.

Returns an error if the file doesn't exist or contains invalid data.

func (*MetadataFile) Write

func (mf *MetadataFile) Write(meta *QueueMetadata) error

Write atomically writes metadata to disk using double-buffering.

Process:

  1. Marshal metadata to JSON
  2. Write to temporary file (.tmp)
  3. Fsync temporary file
  4. Atomic rename to final path
  5. Fsync directory (ensures rename is durable)

type PriorityIndex

type PriorityIndex struct {
	// contains filtered or unexported fields
}

PriorityIndex maintains separate indices for each priority level. This allows efficient O(log n) lookups within each priority tier while maintaining FIFO order within the same priority.
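
PriorityIndex's fields are unexported, so the following is only a sketch of the described idea under assumptions: one slice per priority level (Low=0, Medium=1, High=2), each kept sorted by offset so that binary search gives O(log n) lookups and the lowest offset is the oldest (FIFO) entry. The names are hypothetical, and priorities are assumed to be validated to 0..2 beforehand (out-of-range values would panic here).

```go
package main

import "sort"

// priorityEntry mirrors PriorityIndexEntry.
type priorityEntry struct {
	Offset    uint64
	Position  uint32
	Priority  uint8
	Timestamp int64
}

// priorityIndex keeps one offset-sorted slice per priority level.
type priorityIndex struct {
	tiers [3][]priorityEntry
}

// insert places e in its tier, keeping the tier sorted by Offset.
// Offsets normally arrive in increasing order, so this is usually an append.
func (pi *priorityIndex) insert(e priorityEntry) {
	t := pi.tiers[e.Priority]
	i := sort.Search(len(t), func(i int) bool { return t[i].Offset >= e.Offset })
	t = append(t, priorityEntry{})
	copy(t[i+1:], t[i:])
	t[i] = e
	pi.tiers[e.Priority] = t
}

// nextInPriority returns the first offset in level p greater than after.
func (pi *priorityIndex) nextInPriority(p uint8, after uint64) (uint64, bool) {
	t := pi.tiers[p]
	i := sort.Search(len(t), func(i int) bool { return t[i].Offset > after })
	if i == len(t) {
		return 0, false
	}
	return t[i].Offset, true
}

// oldestInPriority returns the first (lowest-offset) entry of level p, or nil.
func (pi *priorityIndex) oldestInPriority(p uint8) *priorityEntry {
	if len(pi.tiers[p]) == 0 {
		return nil
	}
	return &pi.tiers[p][0]
}
```

A consumer could check the High tier first, then Medium, then Low; the Timestamp field leaves room for starvation prevention, for example by promoting lower-tier entries that have waited too long.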

func NewPriorityIndex

func NewPriorityIndex() *PriorityIndex

NewPriorityIndex creates a new empty priority index.

func (*PriorityIndex) Clear

func (pi *PriorityIndex) Clear()

Clear removes all entries from the index.

func (*PriorityIndex) Count

func (pi *PriorityIndex) Count() int

Count returns the total number of entries across all priorities.

func (*PriorityIndex) CountByPriority

func (pi *PriorityIndex) CountByPriority(priority uint8) int

CountByPriority returns the number of entries for a specific priority.

func (*PriorityIndex) Get

func (pi *PriorityIndex) Get(offset uint64) *PriorityIndexEntry

Get retrieves an entry by offset. Returns nil if not found.

func (*PriorityIndex) Insert

func (pi *PriorityIndex) Insert(offset uint64, position uint32, priority uint8, timestamp int64)

Insert adds a new entry to the appropriate priority slice.

func (*PriorityIndex) NextInPriority

func (pi *PriorityIndex) NextInPriority(priority uint8, afterOffset uint64) (uint64, bool)

NextInPriority returns the first offset in the given priority level that is greater than afterOffset. Returns (0, false) if not found.

func (*PriorityIndex) OldestInPriority

func (pi *PriorityIndex) OldestInPriority(priority uint8) *PriorityIndexEntry

OldestInPriority returns the oldest (first) entry in the given priority level. Returns nil if the priority level is empty.

func (*PriorityIndex) Remove

func (pi *PriorityIndex) Remove(offset uint64)

Remove removes an entry from the index by offset.

type PriorityIndexEntry

type PriorityIndexEntry struct {
	Offset    uint64 // Message offset
	Position  uint32 // Position in segment file
	Priority  uint8  // Priority level
	Timestamp int64  // For starvation prevention
}

PriorityIndexEntry represents a single entry in the priority index.

type QueueMetadata

type QueueMetadata struct {
	// Version is the metadata format version
	Version uint16 `json:"version"`

	// WriteOffset is the next offset to write to (monotonically increasing)
	WriteOffset uint64 `json:"write_offset"`

	// ReadOffset is the next offset to read from
	ReadOffset uint64 `json:"read_offset"`

	// AckOffset is the highest contiguously acknowledged offset
	AckOffset uint64 `json:"ack_offset"`

	// ActiveSegmentID is the ID of the current write segment
	ActiveSegmentID uint64 `json:"active_segment_id"`

	// OldestSegmentID is the ID of the oldest segment (for cleanup)
	OldestSegmentID uint64 `json:"oldest_segment_id"`

	// MessageCount is the total number of messages written
	MessageCount uint64 `json:"message_count"`

	// AckCount is the total number of messages acknowledged
	AckCount uint64 `json:"ack_count"`

	// LastWriteTime is the Unix timestamp (nanoseconds) of the last write
	LastWriteTime int64 `json:"last_write_time"`

	// LastAckTime is the Unix timestamp (nanoseconds) of the last acknowledgment
	LastAckTime int64 `json:"last_ack_time"`

	// LastCompactTime is the Unix timestamp (nanoseconds) of the last compaction
	LastCompactTime int64 `json:"last_compact_time,omitempty"`
}

QueueMetadata represents the persistent state of a queue.

This structure is serialized to JSON for debuggability and extensibility. Updates use double-buffering (write to .tmp, then atomic rename) for crash safety.

func NewQueueMetadata

func NewQueueMetadata() *QueueMetadata

NewQueueMetadata creates a new metadata structure with default values.

func ReadFrom

func ReadFrom(r io.Reader) (*QueueMetadata, error)

ReadFrom reads metadata from a reader.

func ReadMetadata

func ReadMetadata(path string) (*QueueMetadata, error)

ReadMetadata reads metadata from a file.

func ReadMetadataOrCreate

func ReadMetadataOrCreate(path string) (*QueueMetadata, error)

ReadMetadataOrCreate reads metadata from a file, or creates a new one if it doesn't exist.

func UnmarshalMetadata

func UnmarshalMetadata(data []byte) (*QueueMetadata, error)

UnmarshalMetadata decodes metadata from JSON.

func (*QueueMetadata) Marshal

func (m *QueueMetadata) Marshal() ([]byte, error)

Marshal encodes the metadata to JSON with indentation for readability.

func (*QueueMetadata) Validate

func (m *QueueMetadata) Validate() error

Validate checks if the metadata is consistent.

func (*QueueMetadata) WriteTo

func (m *QueueMetadata) WriteTo(w io.Writer) (int64, error)

WriteTo writes metadata in a streaming fashion to the given writer (implements io.WriterTo). This is useful for testing and debugging.

type SegmentHeader

type SegmentHeader struct {
	// Magic is the file format identifier (0x4C445151 for segments)
	Magic uint32

	// Version is the format version
	Version uint16

	// Flags contains segment properties (compressed, encrypted, etc.)
	Flags uint16

	// BaseOffset is the starting offset of this segment in the queue
	BaseOffset uint64

	// CreatedAt is the Unix timestamp (nanoseconds) when segment was created
	CreatedAt int64

	// MinMsgID is the smallest message ID in this segment
	MinMsgID uint64

	// MaxMsgID is the largest message ID in this segment
	MaxMsgID uint64

	// Reserved is space for future extensions (20 bytes)
	Reserved [20]byte

	// HeaderCRC is the checksum of the header (excluding this field)
	HeaderCRC uint32
}

SegmentHeader represents the header of a segment file.

Binary format (little-endian, 64 bytes):

[Magic:4][Version:2][Flags:2][BaseOffset:8][CreatedAt:8]
[MinMsgID:8][MaxMsgID:8][Reserved:20][HeaderCRC:4]
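
Working through the 64-byte layout gives these offsets: Magic 0, Version 4, Flags 6, BaseOffset 8, CreatedAt 16, MinMsgID 24, MaxMsgID 32, Reserved 40 to 59, HeaderCRC 60. Because HeaderCRC is last and excludes itself, it covers bytes 0 to 59, Reserved included. The sketch below encodes and validates a header under that reading; the names are hypothetical, not SegmentHeader.Marshal or UnmarshalSegmentHeader.

```go
package main

import (
	"encoding/binary"
	"errors"
	"hash/crc32"
)

const (
	segmentMagic      uint32 = 0x4C445151
	segmentHeaderSize        = 64
)

// segmentHeader holds the non-constant fields of SegmentHeader.
type segmentHeader struct {
	Version    uint16
	Flags      uint16
	BaseOffset uint64
	CreatedAt  int64
	MinMsgID   uint64
	MaxMsgID   uint64
}

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

func marshalSegmentHeader(h segmentHeader) []byte {
	b := make([]byte, segmentHeaderSize)
	le := binary.LittleEndian
	le.PutUint32(b[0:4], segmentMagic)
	le.PutUint16(b[4:6], h.Version)
	le.PutUint16(b[6:8], h.Flags)
	le.PutUint64(b[8:16], h.BaseOffset)
	le.PutUint64(b[16:24], uint64(h.CreatedAt))
	le.PutUint64(b[24:32], h.MinMsgID)
	le.PutUint64(b[32:40], h.MaxMsgID)
	// b[40:60] is Reserved (left zero); the CRC covers bytes 0-59.
	le.PutUint32(b[60:64], crc32.Checksum(b[:60], castagnoli))
	return b
}

func unmarshalSegmentHeader(b []byte) (segmentHeader, error) {
	le := binary.LittleEndian
	switch {
	case len(b) < segmentHeaderSize:
		return segmentHeader{}, errors.New("segment header too short")
	case le.Uint32(b[0:4]) != segmentMagic:
		return segmentHeader{}, errors.New("bad segment magic")
	case le.Uint32(b[60:64]) != crc32.Checksum(b[:60], castagnoli):
		return segmentHeader{}, errors.New("segment header CRC mismatch")
	}
	return segmentHeader{
		Version: le.Uint16(b[4:6]), Flags: le.Uint16(b[6:8]),
		BaseOffset: le.Uint64(b[8:16]), CreatedAt: int64(le.Uint64(b[16:24])),
		MinMsgID: le.Uint64(b[24:32]), MaxMsgID: le.Uint64(b[32:40]),
	}, nil
}
```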

func NewSegmentHeader

func NewSegmentHeader(baseOffset uint64) *SegmentHeader

NewSegmentHeader creates a new segment header with default values.

func ReadSegmentHeader

func ReadSegmentHeader(path string) (*SegmentHeader, error)

ReadSegmentHeader reads and validates a segment header from a file.

func UnmarshalSegmentHeader

func UnmarshalSegmentHeader(r io.Reader) (*SegmentHeader, error)

UnmarshalSegmentHeader decodes a segment header from the given reader.

func (*SegmentHeader) Marshal

func (h *SegmentHeader) Marshal() []byte

Marshal encodes the segment header into binary format with CRC32C checksum.

func (*SegmentHeader) Validate

func (h *SegmentHeader) Validate() error

Validate checks if the segment header is valid.
