Documentation
Overview ¶
Package format provides binary encoding/decoding for LedgerQ file formats.
This package implements:
- Entry format: message entries with CRC32C checksums
- Segment format: append-only log segments with headers
- Index format: sparse indexes for offset-to-position mapping
- Metadata format: JSON-encoded queue state with atomic updates
- Checksum utilities: CRC32C (Castagnoli) computation and verification
Index ¶
- Constants
- func CompressPayload(payload []byte, compression CompressionType, level int) ([]byte, error)
- func ComputeCRC32C(data []byte) uint32
- func DecompressPayload(payload []byte, compression CompressionType) ([]byte, error)
- func ShouldCompress(originalSize, compressedSize, minSize int) bool
- func VerifyCRC32C(data []byte, expected uint32) bool
- func WriteIndexFile(path string, idx *Index) error
- func WriteMetadata(path string, meta *QueueMetadata) error
- func WriteSegmentHeader(path string, header *SegmentHeader) error
- type CompressionType
- type Entry
- type Index
- type IndexEntry
- type IndexHeader
- type MetadataFile
- type PriorityIndex
- func (pi *PriorityIndex) Clear()
- func (pi *PriorityIndex) Count() int
- func (pi *PriorityIndex) CountByPriority(priority uint8) int
- func (pi *PriorityIndex) Get(offset uint64) *PriorityIndexEntry
- func (pi *PriorityIndex) Insert(offset uint64, position uint32, priority uint8, timestamp int64)
- func (pi *PriorityIndex) NextInPriority(priority uint8, afterOffset uint64) (uint64, bool)
- func (pi *PriorityIndex) OldestInPriority(priority uint8) *PriorityIndexEntry
- func (pi *PriorityIndex) Remove(offset uint64)
- type PriorityIndexEntry
- type QueueMetadata
- type SegmentHeader
Constants ¶
const (
	EntryTypeData       uint8 = 1 // Regular data message
	EntryTypeTombstone  uint8 = 2 // Deletion marker
	EntryTypeCheckpoint uint8 = 3 // Checkpoint marker
)
Entry types
const (
	EntryFlagNone       uint8 = 0
	EntryFlagCompressed uint8 = 1 << 0 // Payload is compressed
	EntryFlagEncrypted  uint8 = 1 << 1 // Payload is encrypted
	EntryFlagTTL        uint8 = 1 << 2 // Entry has TTL (expiration time)
	EntryFlagHeaders    uint8 = 1 << 3 // Entry has headers (metadata)
	EntryFlagPriority   uint8 = 1 << 4 // Entry has priority (v1.1.0+)
)
Entry flags
const (
	PriorityLow    uint8 = 0 // Default for backward compatibility
	PriorityMedium uint8 = 1
	PriorityHigh   uint8 = 2
)
Priority levels for messages (v1.1.0+)
const (
	SegmentMagic  uint32 = 0x4C445151 // "LDQQ" - LedgerQ Queue
	IndexMagic    uint32 = 0x4C445149 // "LDQI" - LedgerQ Index
	MetadataMagic uint32 = 0x4C44514D // "LDQM" - LedgerQ Metadata
)
Magic numbers for file type identification
const (
	FormatVersion1 uint16 = 1
	CurrentVersion uint16 = FormatVersion1
)
Format version
const (
	SegmentFlagNone       uint16 = 0
	SegmentFlagCompressed uint16 = 1 << 0
	SegmentFlagEncrypted  uint16 = 1 << 1
)
Segment header flags
const DefaultIndexInterval = 4096
DefaultIndexInterval is the default number of bytes between index entries (4KB)
const EntryHeaderSize = 22
EntryHeaderSize is the size of the fixed entry header in bytes (22 bytes). Layout: Length(4) + Type(1) + Flags(1) + MsgID(8) + Timestamp(8) = 22 bytes
const IndexEntrySize = 24
IndexEntrySize is the fixed size of each index entry (24 bytes)
const IndexHeaderSize = 32
IndexHeaderSize is the fixed size of the index file header (32 bytes)
const MaxDecompressedSize = 100 * 1024 * 1024 // 100 MB
MaxDecompressedSize is the maximum size allowed for a decompressed payload to prevent decompression bomb attacks.
const SegmentHeaderSize = 64
SegmentHeaderSize is the fixed size of the segment header (64 bytes, cache-line aligned). Layout: Magic(4) + Version(2) + Flags(2) + BaseOffset(8) + CreatedAt(8) + MinMsgID(8) + MaxMsgID(8) + Reserved(20) + HeaderCRC(4) = 64 bytes
Variables ¶
This section is empty.
Functions ¶
func CompressPayload ¶
func CompressPayload(payload []byte, compression CompressionType, level int) ([]byte, error)
CompressPayload compresses a payload using the specified algorithm and level. Returns the compressed data or an error if compression fails.
Parameters:
- payload: The data to compress
- compression: The compression algorithm to use
- level: The compression level (algorithm-specific, 0 = default)
For GZIP:
- Level 1 (gzip.BestSpeed): Fastest, ~50% compression
- Level 6 (gzip.DefaultCompression): Balanced, ~60-70% compression
- Level 9 (gzip.BestCompression): Best ratio, ~65-75% compression
func ComputeCRC32C ¶
func ComputeCRC32C(data []byte) uint32
ComputeCRC32C computes a CRC32C checksum over the given data. Uses the Castagnoli polynomial for better performance on modern hardware.
func DecompressPayload ¶
func DecompressPayload(payload []byte, compression CompressionType) ([]byte, error)
DecompressPayload decompresses a payload using the specified algorithm. Returns the decompressed data or an error if decompression fails.
This function includes protection against decompression bombs by limiting the maximum decompressed size to MaxDecompressedSize.
func ShouldCompress ¶
func ShouldCompress(originalSize, compressedSize, minSize int) bool
ShouldCompress determines whether a payload should be compressed based on the minimum size threshold and compression efficiency.
Parameters:
- originalSize: Size of the original payload in bytes
- compressedSize: Size of the compressed payload in bytes
- minSize: Minimum payload size to consider for compression
Returns true if:
- Original size >= minSize
- Compressed size < original size * 0.95 (more than 5% savings)
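The two conditions above can be sketched as a small predicate. The function shouldCompress below is a hypothetical stand-in, not the package's code; it uses an integer form of the 0.95 comparison so that the boundary case is exact.

```go
package main

import "fmt"

// shouldCompress is a sketch of the two documented conditions.
func shouldCompress(originalSize, compressedSize, minSize int) bool {
	if originalSize < minSize {
		return false
	}
	// Integer form of compressedSize < originalSize*0.95.
	return compressedSize*100 < originalSize*95
}

func main() {
	fmt.Println(shouldCompress(1000, 900, 512)) // true: 10% smaller
	fmt.Println(shouldCompress(1000, 950, 512)) // false: exactly 5% smaller
	fmt.Println(shouldCompress(100, 10, 512))   // false: below minSize
}
```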
func VerifyCRC32C ¶
func VerifyCRC32C(data []byte, expected uint32) bool
VerifyCRC32C verifies that the computed CRC matches the expected value.
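For illustration, CRC32C computation and verification can be written with the standard hash/crc32 package and its Castagnoli table. The lower-case helpers below are hypothetical equivalents, not the package's source. The value 0xE3069283 is the standard CRC-32C check value for the ASCII string "123456789".

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// castagnoli is built once; crc32 uses hardware instructions for this
// polynomial where the platform supports them.
var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// computeCRC32C returns the CRC32C checksum of data.
func computeCRC32C(data []byte) uint32 {
	return crc32.Checksum(data, castagnoli)
}

// verifyCRC32C reports whether data hashes to expected.
func verifyCRC32C(data []byte, expected uint32) bool {
	return computeCRC32C(data) == expected
}

func main() {
	sum := computeCRC32C([]byte("123456789"))
	fmt.Printf("0x%08X\n", sum) // 0xE3069283
	fmt.Println(verifyCRC32C([]byte("123456789"), sum))
	fmt.Println(verifyCRC32C([]byte("123456780"), sum))
}
```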
func WriteIndexFile ¶
func WriteIndexFile(path string, idx *Index) error
WriteIndexFile writes an index to a file.
func WriteMetadata ¶
func WriteMetadata(path string, meta *QueueMetadata) error
WriteMetadata atomically writes metadata to a file using double-buffering.
This ensures crash safety: if the process crashes during a write, either the old metadata file remains intact or the new one is complete. A partially written file is never visible at the final path.
func WriteSegmentHeader ¶
func WriteSegmentHeader(path string, header *SegmentHeader) error
WriteSegmentHeader writes a segment header to a file.
Types ¶
type CompressionType ¶
type CompressionType uint8
CompressionType represents the compression algorithm used for a message payload.
const (
	// CompressionNone indicates no compression (default)
	CompressionNone CompressionType = 0
	// CompressionGzip indicates GZIP compression (compress/gzip)
	CompressionGzip CompressionType = 1
)
func (CompressionType) String ¶
func (c CompressionType) String() string
String returns the string representation of the compression type.
type Entry ¶
type Entry struct {
// Length is the total size of the entry including header and CRC (excludes the length field itself)
Length uint32
// Type indicates the entry type (Data, Tombstone, Checkpoint)
Type uint8
// Flags contains entry flags (compressed, encrypted, TTL, headers, priority, etc.)
Flags uint8
// MsgID is the unique message identifier
MsgID uint64
// Timestamp is the Unix time in nanoseconds when the entry was created
Timestamp int64
// Priority is the message priority level (Low/Medium/High) (v1.1.0+)
// Only serialized if EntryFlagPriority is set
// Defaults to PriorityLow for backward compatibility
Priority uint8
// ExpiresAt is the Unix time in nanoseconds when the message expires (0 = no expiration)
// Only serialized if EntryFlagTTL is set
ExpiresAt int64
// Headers contains key-value metadata for the message
// Only serialized if EntryFlagHeaders is set
Headers map[string]string
// Compression is the compression algorithm used for the payload (v1.3.0+)
// Only serialized if EntryFlagCompressed is set
// Defaults to CompressionNone for backward compatibility
Compression CompressionType
// Payload is the message data (may be compressed if Compression != CompressionNone)
Payload []byte
}
Entry represents a single message entry in a segment file.
Binary format (little-endian):
[Length:4][Type:1][Flags:1][MsgID:8][Timestamp:8][Priority:1?][ExpiresAt:8?][HeadersSize:2?][Headers:N?][Compression:1?][Payload:N][CRC32C:4]
Optional fields (included based on flags):
- Priority (1 byte): Present if EntryFlagPriority is set (v1.1.0+)
- ExpiresAt (8 bytes): Present if EntryFlagTTL is set
- HeadersSize (2 bytes) + Headers (N bytes): Present if EntryFlagHeaders is set
- Compression (1 byte): Present if EntryFlagCompressed is set (v1.3.0+)
Headers encoding:
[NumHeaders:2][Key1Len:2][Key1:N][Value1Len:2][Value1:N]...[KeyNLen:2][KeyN:N][ValueNLen:2][ValueN:N]
Total header size: 22 bytes (base), plus each optional field that is present: 1 (Priority) + 8 (TTL) + 2+N (Headers) + 1 (Compression)
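The size arithmetic above can be sketched as a function of the flag bits. The helper headerSize is hypothetical; headersLen stands for N, the encoded length of the headers block, and the flag values are copied from the constants section.

```go
package main

import "fmt"

// Flag values and base size copied from the constants section.
const (
	EntryFlagCompressed uint8 = 1 << 0
	EntryFlagTTL        uint8 = 1 << 2
	EntryFlagHeaders    uint8 = 1 << 3
	EntryFlagPriority   uint8 = 1 << 4

	EntryHeaderSize = 22
)

// headerSize returns the number of bytes that precede the payload for an
// entry with the given flags. headersLen is the encoded size of the
// headers block and is ignored unless EntryFlagHeaders is set.
func headerSize(flags uint8, headersLen int) int {
	size := EntryHeaderSize
	if flags&EntryFlagPriority != 0 {
		size += 1 // Priority
	}
	if flags&EntryFlagTTL != 0 {
		size += 8 // ExpiresAt
	}
	if flags&EntryFlagHeaders != 0 {
		size += 2 + headersLen // HeadersSize + Headers
	}
	if flags&EntryFlagCompressed != 0 {
		size += 1 // Compression
	}
	return size
}

func main() {
	fmt.Println(headerSize(0, 0))                                  // 22
	fmt.Println(headerSize(EntryFlagPriority|EntryFlagTTL, 0))     // 31
	fmt.Println(headerSize(EntryFlagHeaders|EntryFlagCompressed, 10)) // 35
}
```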
func Unmarshal ¶
Unmarshal decodes an entry from the given reader. Returns an error if the entry is corrupted or invalid.
func (*Entry) IsExpired ¶
IsExpired returns true if the message has expired based on the given current time. Returns false if the message has no TTL.
func (*Entry) Marshal ¶
Marshal encodes the entry into binary format with CRC32C checksum. Returns the complete entry bytes ready to be written to disk.
type Index ¶
type Index struct {
Header *IndexHeader
Entries []*IndexEntry
}
Index represents an in-memory sparse index.
func ReadIndexFile ¶
ReadIndexFile reads an index from a file.
func (*Index) Find ¶
func (idx *Index) Find(messageID uint64) *IndexEntry
Find performs a binary search to find the entry for the given message ID. Returns the entry with the largest ID that is <= the given ID. Returns nil if no such entry exists.
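The lookup described above, the largest ID less than or equal to the target, maps directly onto sort.Search. The sketch below is a hypothetical free function over a slice, assuming the entries are sorted by ascending MessageID; it is not the package's implementation.

```go
package main

import (
	"fmt"
	"sort"
)

// IndexEntry repeats the fields of the documented type.
type IndexEntry struct {
	MessageID  uint64
	FileOffset uint64
	Timestamp  int64
}

// find returns the entry with the largest MessageID <= id, or nil if every
// entry has a larger ID. entries must be sorted by ascending MessageID.
func find(entries []*IndexEntry, id uint64) *IndexEntry {
	// i is the first position whose MessageID is greater than id.
	i := sort.Search(len(entries), func(i int) bool {
		return entries[i].MessageID > id
	})
	if i == 0 {
		return nil
	}
	return entries[i-1]
}

func main() {
	entries := []*IndexEntry{
		{MessageID: 100, FileOffset: 64},
		{MessageID: 200, FileOffset: 4200},
		{MessageID: 300, FileOffset: 8400},
	}
	if e := find(entries, 250); e != nil {
		// A reader would seek to FileOffset and scan forward to ID 250.
		fmt.Println(e.MessageID, e.FileOffset) // 200 4200
	}
	fmt.Println(find(entries, 99) == nil) // true
}
```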
type IndexEntry ¶
type IndexEntry struct {
// MessageID is the message identifier
MessageID uint64
// FileOffset is the byte position in the segment file
FileOffset uint64
// Timestamp is the Unix time in nanoseconds
Timestamp int64
}
IndexEntry represents a single entry in the sparse index.
Binary format (little-endian, 24 bytes):
[MessageID:8][FileOffset:8][Timestamp:8]
func UnmarshalIndexEntry ¶
func UnmarshalIndexEntry(r io.Reader) (*IndexEntry, error)
UnmarshalIndexEntry decodes an index entry from the given reader.
func (*IndexEntry) Marshal ¶
func (e *IndexEntry) Marshal() []byte
Marshal encodes an index entry into binary format.
type IndexHeader ¶
type IndexHeader struct {
// Magic is the file format identifier (0x4C445149 for indexes)
Magic uint32
// Version is the format version
Version uint16
// BaseID is the base message ID for this index
BaseID uint64
// EntryCount is the number of index entries
EntryCount uint64
// Reserved is space for future extensions (4 bytes)
Reserved uint32
// HeaderCRC is the checksum of the header (excluding this field)
HeaderCRC uint32
// contains filtered or unexported fields
}
IndexHeader represents the header of an index file.
Binary format (little-endian, 32 bytes):
[Magic:4][Version:2][_:2][BaseID:8][EntryCount:8][Reserved:4][HeaderCRC:4]
func NewIndexHeader ¶
func NewIndexHeader(baseID uint64) *IndexHeader
NewIndexHeader creates a new index header with default values.
func UnmarshalIndexHeader ¶
func UnmarshalIndexHeader(r io.Reader) (*IndexHeader, error)
UnmarshalIndexHeader decodes an index header from the given reader.
func (*IndexHeader) Marshal ¶
func (h *IndexHeader) Marshal() []byte
Marshal encodes the index header into binary format with CRC32C checksum.
func (*IndexHeader) Validate ¶
func (h *IndexHeader) Validate() error
Validate checks if the index header is valid.
type MetadataFile ¶
type MetadataFile struct {
// contains filtered or unexported fields
}
MetadataFile manages atomic reads and writes of queue metadata.
The implementation is thread-safe: a mutex serializes concurrent access. Writes use double-buffering: write to a .tmp file, fsync it, then atomically rename it over the final path.
func NewMetadataFile ¶
func NewMetadataFile(path string) *MetadataFile
NewMetadataFile creates a new metadata file manager.
func (*MetadataFile) Read ¶
func (mf *MetadataFile) Read() (*QueueMetadata, error)
Read reads and validates the metadata from disk.
Returns an error if the file doesn't exist or contains invalid data.
func (*MetadataFile) Write ¶
func (mf *MetadataFile) Write(meta *QueueMetadata) error
Write atomically writes metadata to disk using double-buffering.
Process:
- Marshal metadata to JSON
- Write to temporary file (.tmp)
- Fsync temporary file
- Atomic rename to final path
- Fsync directory (ensures rename is durable)
type PriorityIndex ¶
type PriorityIndex struct {
// contains filtered or unexported fields
}
PriorityIndex maintains separate indices for each priority level. This allows efficient O(log n) lookups within each priority tier while maintaining FIFO order within the same priority.
func NewPriorityIndex ¶
func NewPriorityIndex() *PriorityIndex
NewPriorityIndex creates a new empty priority index.
func (*PriorityIndex) Clear ¶
func (pi *PriorityIndex) Clear()
Clear removes all entries from the index.
func (*PriorityIndex) Count ¶
func (pi *PriorityIndex) Count() int
Count returns the total number of entries across all priorities.
func (*PriorityIndex) CountByPriority ¶
func (pi *PriorityIndex) CountByPriority(priority uint8) int
CountByPriority returns the number of entries for a specific priority.
func (*PriorityIndex) Get ¶
func (pi *PriorityIndex) Get(offset uint64) *PriorityIndexEntry
Get retrieves an entry by offset. Returns nil if not found.
func (*PriorityIndex) Insert ¶
func (pi *PriorityIndex) Insert(offset uint64, position uint32, priority uint8, timestamp int64)
Insert adds a new entry to the appropriate priority slice.
func (*PriorityIndex) NextInPriority ¶
func (pi *PriorityIndex) NextInPriority(priority uint8, afterOffset uint64) (uint64, bool)
NextInPriority returns the first offset in the given priority level that is greater than afterOffset. Returns (0, false) if not found.
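One way to get O(log n) lookups per tier is to keep each priority level as a slice of offsets in ascending order and binary-search it. The type tiers below is a hypothetical illustration of that idea; the real PriorityIndex has unexported fields and may be structured differently.

```go
package main

import (
	"fmt"
	"sort"
)

// tiers holds, for each of the three priority levels, the message offsets
// in ascending order. Ascending offset order doubles as FIFO order.
type tiers struct {
	offsets [3][]uint64
}

// insert places offset into its priority tier, keeping the tier sorted.
func (t *tiers) insert(offset uint64, priority uint8) {
	if int(priority) >= len(t.offsets) {
		return // unknown priority level; ignored in this sketch
	}
	s := t.offsets[priority]
	i := sort.Search(len(s), func(i int) bool { return s[i] >= offset })
	s = append(s, 0)
	copy(s[i+1:], s[i:])
	s[i] = offset
	t.offsets[priority] = s
}

// nextInPriority returns the first offset in the tier that is greater than
// after, or (0, false) if there is none.
func (t *tiers) nextInPriority(priority uint8, after uint64) (uint64, bool) {
	if int(priority) >= len(t.offsets) {
		return 0, false
	}
	s := t.offsets[priority]
	i := sort.Search(len(s), func(i int) bool { return s[i] > after })
	if i == len(s) {
		return 0, false
	}
	return s[i], true
}

func main() {
	var t tiers
	t.insert(5, 2)
	t.insert(1, 0)
	t.insert(9, 2)
	t.insert(7, 2)

	fmt.Println(t.nextInPriority(2, 5)) // 7 true
	fmt.Println(t.nextInPriority(2, 9)) // 0 false
	fmt.Println(t.nextInPriority(0, 0)) // 1 true
}
```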
func (*PriorityIndex) OldestInPriority ¶
func (pi *PriorityIndex) OldestInPriority(priority uint8) *PriorityIndexEntry
OldestInPriority returns the oldest (first) entry in the given priority level. Returns nil if the priority level is empty.
func (*PriorityIndex) Remove ¶
func (pi *PriorityIndex) Remove(offset uint64)
Remove removes an entry from the index by offset.
type PriorityIndexEntry ¶
type PriorityIndexEntry struct {
Offset uint64 // Message offset
Position uint32 // Position in segment file
Priority uint8 // Priority level
Timestamp int64 // For starvation prevention
}
PriorityIndexEntry represents a single entry in the priority index.
type QueueMetadata ¶
type QueueMetadata struct {
// Version is the metadata format version
Version uint16 `json:"version"`
// WriteOffset is the next offset to write to (monotonically increasing)
WriteOffset uint64 `json:"write_offset"`
// ReadOffset is the next offset to read from
ReadOffset uint64 `json:"read_offset"`
// AckOffset is the highest contiguously acknowledged offset
AckOffset uint64 `json:"ack_offset"`
// ActiveSegmentID is the ID of the current write segment
ActiveSegmentID uint64 `json:"active_segment_id"`
// OldestSegmentID is the ID of the oldest segment (for cleanup)
OldestSegmentID uint64 `json:"oldest_segment_id"`
// MessageCount is the total number of messages written
MessageCount uint64 `json:"message_count"`
// AckCount is the total number of messages acknowledged
AckCount uint64 `json:"ack_count"`
// LastWriteTime is the Unix timestamp (nanoseconds) of the last write
LastWriteTime int64 `json:"last_write_time"`
// LastAckTime is the Unix timestamp (nanoseconds) of the last acknowledgment
LastAckTime int64 `json:"last_ack_time"`
// LastCompactTime is the Unix timestamp (nanoseconds) of the last compaction
LastCompactTime int64 `json:"last_compact_time,omitempty"`
}
QueueMetadata represents the persistent state of a queue.
This structure is serialized to JSON for debuggability and extensibility. Updates use double-buffering (write to .tmp, then atomic rename) for crash safety.
func NewQueueMetadata ¶
func NewQueueMetadata() *QueueMetadata
NewQueueMetadata creates a new metadata structure with default values.
func ReadFrom ¶
func ReadFrom(r io.Reader) (*QueueMetadata, error)
ReadFrom reads metadata from a reader.
func ReadMetadata ¶
func ReadMetadata(path string) (*QueueMetadata, error)
ReadMetadata reads metadata from a file.
func ReadMetadataOrCreate ¶
func ReadMetadataOrCreate(path string) (*QueueMetadata, error)
ReadMetadataOrCreate reads metadata from a file, or creates a new one if it doesn't exist.
func UnmarshalMetadata ¶
func UnmarshalMetadata(data []byte) (*QueueMetadata, error)
UnmarshalMetadata decodes metadata from JSON.
func (*QueueMetadata) Marshal ¶
func (m *QueueMetadata) Marshal() ([]byte, error)
Marshal encodes the metadata to JSON with indentation for readability.
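A sketch of the JSON round trip using encoding/json. The struct metadata is a trimmed copy of QueueMetadata that keeps the documented field names and tags, and the two-space indent is an assumption, since the indentation string is not documented.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// metadata is a trimmed copy of QueueMetadata; the remaining fields are
// omitted to keep the example short.
type metadata struct {
	Version         uint16 `json:"version"`
	WriteOffset     uint64 `json:"write_offset"`
	ReadOffset      uint64 `json:"read_offset"`
	AckOffset       uint64 `json:"ack_offset"`
	LastCompactTime int64  `json:"last_compact_time,omitempty"`
}

// marshal encodes m as indented JSON.
// ASSUMPTION: two-space indentation.
func marshal(m *metadata) ([]byte, error) {
	return json.MarshalIndent(m, "", "  ")
}

// unmarshal decodes JSON produced by marshal.
func unmarshal(data []byte) (*metadata, error) {
	var m metadata
	if err := json.Unmarshal(data, &m); err != nil {
		return nil, err
	}
	return &m, nil
}

func main() {
	data, err := marshal(&metadata{Version: 1, WriteOffset: 42, ReadOffset: 40, AckOffset: 39})
	if err != nil {
		panic(err)
	}
	// last_compact_time is absent because of omitempty.
	fmt.Println(string(data))

	back, err := unmarshal(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(back.WriteOffset, back.ReadOffset, back.AckOffset)
}
```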
func (*QueueMetadata) Validate ¶
func (m *QueueMetadata) Validate() error
Validate checks if the metadata is consistent.
type SegmentHeader ¶
type SegmentHeader struct {
// Magic is the file format identifier (0x4C445151 for segments)
Magic uint32
// Version is the format version
Version uint16
// Flags contains segment properties (compressed, encrypted, etc.)
Flags uint16
// BaseOffset is the starting offset of this segment in the queue
BaseOffset uint64
// CreatedAt is the Unix timestamp (nanoseconds) when segment was created
CreatedAt int64
// MinMsgID is the smallest message ID in this segment
MinMsgID uint64
// MaxMsgID is the largest message ID in this segment
MaxMsgID uint64
// Reserved is space for future extensions (20 bytes)
Reserved [20]byte
// HeaderCRC is the checksum of the header (excluding this field)
HeaderCRC uint32
}
SegmentHeader represents the header of a segment file.
Binary format (little-endian, 64 bytes):
[Magic:4][Version:2][Flags:2][BaseOffset:8][CreatedAt:8][MinMsgID:8][MaxMsgID:8][Reserved:20][HeaderCRC:4]
func NewSegmentHeader ¶
func NewSegmentHeader(baseOffset uint64) *SegmentHeader
NewSegmentHeader creates a new segment header with default values.
func ReadSegmentHeader ¶
func ReadSegmentHeader(path string) (*SegmentHeader, error)
ReadSegmentHeader reads and validates a segment header from a file.
func UnmarshalSegmentHeader ¶
func UnmarshalSegmentHeader(r io.Reader) (*SegmentHeader, error)
UnmarshalSegmentHeader decodes a segment header from the given reader.
func (*SegmentHeader) Marshal ¶
func (h *SegmentHeader) Marshal() []byte
Marshal encodes the segment header into binary format with CRC32C checksum.
func (*SegmentHeader) Validate ¶
func (h *SegmentHeader) Validate() error
Validate checks if the segment header is valid.