wal

package
v0.0.0-...-706b979 Latest
Warning

This package is not in the latest version of its module.

Published: May 4, 2026 License: AGPL-3.0 Imports: 16 Imported by: 0

Documentation

Constants

const (
	WALChecksumCRC32 = 0x01 // CRC32 checksum type

	// Entry format: [Length: 4 bytes] [Timestamp: 8 bytes] [Checksum: 4 bytes] [Payload: N bytes]
	WALEntryHeaderSize = 16
	WALFileHeaderSize  = 7 // Magic(4) + Version(2) + ChecksumType(1)

	// MaxWALPayloadSize is the maximum allowed payload size for a single WAL entry.
	// This limit prevents integer overflow during buffer allocation (CWE-190) and
	// aligns with the replication protocol limit (100MB).
	MaxWALPayloadSize = 100 * 1024 * 1024 // 100MB

	// WALEnvelopeMarker is the first byte of an enveloped WAL payload.
	// Enveloped format: [0x01][2-byte db name length][db name][original msgpack]
	// Since msgpack maps/arrays always start with bytes >= 0x80, 0x01 is unambiguous.
	WALEnvelopeMarker = 0x01
)
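
The entry layout in the comment above can be sketched as a small encoder and decoder. This is an illustration only, not the package's implementation: the little-endian field encoding, the CRC-32 (IEEE) checksum computed over the payload alone, and the names encodeEntry, decodeEntry, and errCorrupt are all assumptions that the documentation does not confirm. The constants are local copies of the documented values.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
)

// Local copies of the documented constants so the sketch compiles on its own.
const (
	entryHeaderSize = 16                // Length(4) + Timestamp(8) + Checksum(4)
	maxPayloadSize  = 100 * 1024 * 1024 // 100MB
)

var (
	errPayloadTooLarge = errors.New("WAL payload exceeds maximum allowed size")
	errCorrupt         = errors.New("WAL entry is truncated or fails its checksum") // hypothetical
)

// encodeEntry lays out [Length][Timestamp][Checksum][Payload].
// ASSUMPTION: little-endian fields, CRC-32 (IEEE) over the payload only.
func encodeEntry(timestampUS uint64, payload []byte) ([]byte, error) {
	// Reject oversized payloads before sizing the buffer.
	if len(payload) > maxPayloadSize {
		return nil, errPayloadTooLarge
	}
	buf := make([]byte, entryHeaderSize+len(payload))
	binary.LittleEndian.PutUint32(buf[0:4], uint32(len(payload)))
	binary.LittleEndian.PutUint64(buf[4:12], timestampUS)
	binary.LittleEndian.PutUint32(buf[12:16], crc32.ChecksumIEEE(payload))
	copy(buf[entryHeaderSize:], payload)
	return buf, nil
}

// decodeEntry checks the declared length before slicing, then the checksum.
func decodeEntry(buf []byte) (uint64, []byte, error) {
	if len(buf) < entryHeaderSize {
		return 0, nil, errCorrupt
	}
	n := binary.LittleEndian.Uint32(buf[0:4])
	if n > maxPayloadSize {
		return 0, nil, errPayloadTooLarge
	}
	if uint64(len(buf)-entryHeaderSize) < uint64(n) {
		return 0, nil, errCorrupt
	}
	payload := buf[entryHeaderSize : entryHeaderSize+int(n)]
	if crc32.ChecksumIEEE(payload) != binary.LittleEndian.Uint32(buf[12:16]) {
		return 0, nil, errCorrupt
	}
	return binary.LittleEndian.Uint64(buf[4:12]), payload, nil
}

func main() {
	raw, _ := encodeEntry(1_700_000_000_000_000, []byte{0x81, 0xa1, 'k', 0x01})
	ts, payload, err := decodeEntry(raw)
	fmt.Println(len(raw), ts, len(payload), err)
}
```

Checking the declared length against the limit before allocating or slicing is what keeps a corrupted length field from triggering a huge allocation or an out-of-range slice.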

Variables

var (
	WALMagic   = []byte{'A', 'R', 'C', 'W'} // Magic bytes
	WALVersion = uint16(0x0001)             // Version 1
)

WAL file format constants
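
A 7-byte file header built from these values might look like the sketch below. The field order (magic, version, checksum type) follows the WALFileHeaderSize comment; the little-endian encoding of the version and the helper names buildFileHeader and checkFileHeader are assumptions made for illustration.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
)

// Local copies of the documented values.
const (
	fileHeaderSize = 7 // Magic(4) + Version(2) + ChecksumType(1)
	checksumCRC32  = 0x01
)

var magic = []byte{'A', 'R', 'C', 'W'}

// buildFileHeader writes magic, version, and checksum type in that order.
// ASSUMPTION: the 2-byte version is little-endian.
func buildFileHeader(version uint16) []byte {
	h := make([]byte, fileHeaderSize)
	copy(h[0:4], magic)
	binary.LittleEndian.PutUint16(h[4:6], version)
	h[6] = checksumCRC32
	return h
}

// checkFileHeader validates the magic bytes and checksum type and
// returns the version found in the header.
func checkFileHeader(h []byte) (uint16, error) {
	if len(h) < fileHeaderSize {
		return 0, errors.New("short WAL file header")
	}
	if !bytes.Equal(h[0:4], magic) {
		return 0, errors.New("bad WAL magic")
	}
	if h[6] != checksumCRC32 {
		return 0, errors.New("unsupported checksum type")
	}
	return binary.LittleEndian.Uint16(h[4:6]), nil
}

func main() {
	h := buildFileHeader(0x0001)
	v, err := checkFileHeader(h)
	fmt.Printf("% x version=%d err=%v\n", h, v, err)
}
```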

var ErrPayloadTooLarge = errors.New("WAL payload exceeds maximum allowed size")

ErrPayloadTooLarge indicates the payload exceeds MaxWALPayloadSize.

var ErrWALDropped = errors.New("WAL entry dropped: async buffer full")

ErrWALDropped is returned by Append, AppendRaw, and AppendRawWithMeta when the async entry channel is full and the entry is dropped. Previously these methods returned nil and silently incremented DroppedEntries, so callers that logged "data preserved in WAL for recovery" downstream were reporting durability they did not actually have. Returning a sentinel lets callers (the ingestion buffer in particular) increment their own error counters and surface accurate operator-facing messages. Use errors.Is to detect it.
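
The pattern described here, a non-blocking send that reports a sentinel error when the buffer is full, can be sketched without the package itself. In the example below errWALDropped is a local stand-in for ErrWALDropped, and tryEnqueue and ingest are hypothetical helpers; whether the real Writer wraps the sentinel is not stated, which is one reason to match with errors.Is rather than ==.

```go
package main

import (
	"errors"
	"fmt"
)

// errWALDropped is a local stand-in for the package's ErrWALDropped.
var errWALDropped = errors.New("WAL entry dropped: async buffer full")

// tryEnqueue is a hypothetical helper: it sends without blocking and
// reports the sentinel (wrapped) when the channel has no room.
func tryEnqueue(ch chan<- []byte, entry []byte) error {
	select {
	case ch <- entry:
		return nil
	default:
		return fmt.Errorf("enqueue: %w", errWALDropped)
	}
}

// ingest counts accepted and dropped entries the way a caller such as
// an ingestion buffer might, using errors.Is to detect the sentinel.
func ingest(ch chan<- []byte, entries [][]byte) (accepted, dropped int) {
	for _, e := range entries {
		err := tryEnqueue(ch, e)
		switch {
		case err == nil:
			accepted++
		case errors.Is(err, errWALDropped):
			dropped++ // Not durable: do not report the data as preserved.
		}
	}
	return accepted, dropped
}

func main() {
	ch := make(chan []byte, 2) // nothing drains this channel
	accepted, dropped := ingest(ch, [][]byte{{1}, {2}, {3}})
	fmt.Println(accepted, dropped)
}
```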

Functions

func ParseEnvelope

func ParseEnvelope(payload []byte, defaultDB string) (database string, msgpackData []byte)

ParseEnvelope extracts the database name and msgpack payload from a WAL entry. If the payload uses the envelope format [0x01][2-byte dbLen][dbName]msgpack, it returns the database name and the inner msgpack bytes. Otherwise, it returns defaultDB and the original payload unchanged.
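
As a rough illustration of the envelope check, the sketch below reimplements the documented behavior locally. It is not the package's code: the big-endian byte order of the 2-byte length and the fallback to defaultDB for a truncated envelope are assumptions, and parseEnvelope is a local name.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const envelopeMarker = 0x01 // local copy of WALEnvelopeMarker

// parseEnvelope splits [0x01][2-byte dbLen][dbName][msgpack] into its parts.
// ASSUMPTIONS: the length is big-endian, and a payload too short to hold
// the declared name is treated as a legacy (non-enveloped) payload.
func parseEnvelope(payload []byte, defaultDB string) (string, []byte) {
	if len(payload) < 3 || payload[0] != envelopeMarker {
		return defaultDB, payload
	}
	n := int(binary.BigEndian.Uint16(payload[1:3]))
	if len(payload) < 3+n {
		return defaultDB, payload
	}
	return string(payload[3 : 3+n]), payload[3+n:]
}

func main() {
	// 0x80 is an empty msgpack map; it stands in for a real payload.
	enveloped := []byte{0x01, 0x00, 0x02, 'd', 'b', 0x80}
	db, body := parseEnvelope(enveloped, "default")
	fmt.Printf("%q % x\n", db, body)

	// A legacy payload starts with a msgpack map or array byte (>= 0x80).
	db, body = parseEnvelope([]byte{0x80}, "default")
	fmt.Printf("%q % x\n", db, body)
}
```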

Types

type ColumnarEntry

type ColumnarEntry struct {
	Database    string // From envelope metadata (empty = "default")
	Measurement string
	Columns     map[string][]interface{}
}

ColumnarEntry represents a columnar WAL entry written via the zero-copy path

type ColumnarRecoveryCallback

type ColumnarRecoveryCallback func(ctx context.Context, database, measurement string, columns map[string][]interface{}) error

ColumnarRecoveryCallback is called for columnar WAL entries during recovery. The database argument may be empty if the WAL entry predates the envelope format (defaults to "default").
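
A callback with this signature might normalize the empty database name itself, as in the sketch below. The columnarCallback type is a local mirror of the documented signature, and rowCounter and runDemo are hypothetical names used only to make the example self-contained.

```go
package main

import (
	"context"
	"fmt"
)

// columnarCallback mirrors the documented ColumnarRecoveryCallback signature.
type columnarCallback func(ctx context.Context, database, measurement string, columns map[string][]interface{}) error

// rowCounter is a hypothetical sink that tallies recovered rows per
// "database/measurement" key.
type rowCounter map[string]int

// callback returns a function that maps an empty database name to
// "default" and counts the rows of the longest column.
func (rc rowCounter) callback() columnarCallback {
	return func(ctx context.Context, database, measurement string, columns map[string][]interface{}) error {
		if err := ctx.Err(); err != nil {
			return err // stop replay promptly when recovery is cancelled
		}
		if database == "" {
			database = "default" // entry predates the envelope format
		}
		rows := 0
		for _, col := range columns {
			if len(col) > rows {
				rows = len(col)
			}
		}
		rc[database+"/"+measurement] += rows
		return nil
	}
}

// runDemo feeds one legacy entry (empty database) through the callback.
func runDemo() (rowCounter, error) {
	rc := rowCounter{}
	err := rc.callback()(context.Background(), "", "cpu", map[string][]interface{}{
		"time":  {int64(1), int64(2)},
		"usage": {0.5, 0.7},
	})
	return rc, err
}

func main() {
	rc, err := runDemo()
	fmt.Println(rc, err)
}
```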

type Entry

type Entry struct {
	TimestampUS  uint64                   // Microseconds since epoch
	Records      []map[string]interface{} // Row format (from Append path)
	ColumnarData *ColumnarEntry           // Columnar format (from AppendRaw path)
}

Entry represents a single WAL entry

type Reader

type Reader struct {

	// Metrics
	TotalEntries     int64
	TotalBytes       int64
	CorruptedEntries int64
	// contains filtered or unexported fields
}

Reader reads WAL files for recovery operations

func NewReader

func NewReader(filePath string, logger zerolog.Logger) *Reader

NewReader creates a new WAL reader

func (*Reader) ReadAll

func (r *Reader) ReadAll() ([]Entry, error)

ReadAll reads all entries from the WAL file

type Recovery

type Recovery struct {
	// contains filtered or unexported fields
}

Recovery manages WAL recovery operations

func NewRecovery

func NewRecovery(walDir string, logger zerolog.Logger) *Recovery

NewRecovery creates a new WAL recovery manager

func (*Recovery) CleanupOldWALs

func (r *Recovery) CleanupOldWALs(maxAge time.Duration) (int, int64, error)

CleanupOldWALs removes legacy .recovered WAL files older than the specified age. Note: As of the current implementation, WAL files are deleted immediately after successful recovery, so this function is primarily for cleaning up legacy files from previous versions that renamed files to .recovered instead of deleting them.

func (*Recovery) ListWALFiles

func (r *Recovery) ListWALFiles() (active []string, recovered []string, err error)

ListWALFiles lists all WAL files in the directory. Returns active (pending) WAL files and legacy .recovered files. Note: As of the current implementation, WAL files are deleted immediately after successful recovery, so the recovered list will typically be empty or contain only legacy files from previous versions.

func (*Recovery) Recover

func (r *Recovery) Recover(ctx context.Context, callback RecoveryCallback) (*RecoveryStats, error)

Recover scans the WAL directory and replays all WAL files

func (*Recovery) RecoverWithOptions

func (r *Recovery) RecoverWithOptions(ctx context.Context, callback RecoveryCallback, opts *RecoveryOptions) (*RecoveryStats, error)

RecoverWithOptions scans the WAL directory and replays WAL files with configurable options

type RecoveryCallback

type RecoveryCallback func(ctx context.Context, records []map[string]interface{}) error

RecoveryCallback is called for each batch of records during recovery (row format)

type RecoveryOptions

type RecoveryOptions struct {
	// SkipActiveFile is the path to the currently active WAL file that should be skipped
	// during periodic recovery (to avoid reading a file being actively written)
	SkipActiveFile string

	// BatchSize limits how many records are replayed per callback invocation
	// This provides backpressure during mass recovery after prolonged outages
	// 0 means no limit (all records in an entry replayed at once)
	BatchSize int

	// ColumnarCallback handles columnar WAL entries from the zero-copy write path
	ColumnarCallback ColumnarRecoveryCallback
}

RecoveryOptions configures WAL recovery behavior
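
The BatchSize behavior can be pictured as slicing one entry's records into fixed-size chunks before each callback call. The sketch below shows that idea with a local mirror of the RecoveryCallback signature; replayInBatches and batchSizes are hypothetical helpers, and treating a negative BatchSize the same as 0 is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// recoveryCallback mirrors the documented RecoveryCallback signature.
type recoveryCallback func(ctx context.Context, records []map[string]interface{}) error

// replayInBatches calls cb with at most batchSize records per invocation
// and returns how many invocations succeeded.
// ASSUMPTION: batchSize <= 0 means "no limit" (the docs only define 0).
func replayInBatches(ctx context.Context, records []map[string]interface{}, batchSize int, cb recoveryCallback) (int, error) {
	if len(records) == 0 {
		return 0, nil
	}
	if batchSize <= 0 {
		batchSize = len(records)
	}
	calls := 0
	for start := 0; start < len(records); start += batchSize {
		if err := ctx.Err(); err != nil {
			return calls, err
		}
		end := start + batchSize
		if end > len(records) {
			end = len(records)
		}
		if err := cb(ctx, records[start:end]); err != nil {
			return calls, err
		}
		calls++
	}
	return calls, nil
}

// batchSizes reports the size of each batch produced for n records.
func batchSizes(n, batchSize int) []int {
	records := make([]map[string]interface{}, n)
	var sizes []int
	_, _ = replayInBatches(context.Background(), records, batchSize,
		func(_ context.Context, batch []map[string]interface{}) error {
			sizes = append(sizes, len(batch))
			return nil
		})
	return sizes
}

func main() {
	fmt.Println(batchSizes(5, 2)) // [2 2 1]
	fmt.Println(batchSizes(5, 0)) // [5]
}
```

Because the callback returns before the next chunk is sliced, a slow sink naturally throttles replay, which is the backpressure the field comment refers to.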

type RecoveryStats

type RecoveryStats struct {
	RecoveredFiles   int
	RecoveredBatches int
	RecoveredEntries int
	CorruptedEntries int
	SkippedFiles     int
	RecoveryDuration time.Duration
}

RecoveryStats holds statistics about WAL recovery

type ReplicationEntry

type ReplicationEntry struct {
	// Sequence is a monotonically increasing number for ordering
	Sequence uint64

	// TimestampUS is the entry timestamp in microseconds since epoch
	TimestampUS uint64

	// Payload is the raw msgpack data
	Payload []byte
}

ReplicationEntry represents a WAL entry for replication. This is passed to the replication hook for streaming to readers.

type ReplicationHook

type ReplicationHook func(entry *ReplicationEntry)

ReplicationHook is called for each WAL entry before it's written locally. This enables real-time streaming of entries to reader nodes.

type SyncMode

type SyncMode string

SyncMode defines how WAL syncs to disk

const (
	SyncModeFsync     SyncMode = "fsync"     // Full sync: data + metadata (safest)
	SyncModeFdatasync SyncMode = "fdatasync" // Data sync only (balanced, default)
	SyncModeAsync     SyncMode = "async"     // No explicit sync (fastest, least safe)
)

type Writer

type Writer struct {

	// Metrics (atomic for lock-free reads)
	TotalEntries   int64
	TotalBytes     int64
	TotalSyncs     int64
	TotalRotations int64
	DroppedEntries int64 // Entries dropped due to full buffer
	// contains filtered or unexported fields
}

Writer is a Write-Ahead Log writer with configurable durability

func NewWriter

func NewWriter(cfg *WriterConfig) (*Writer, error)

NewWriter creates a new WAL writer

func (*Writer) Append

func (w *Writer) Append(records []map[string]interface{}) error

Append writes records to the WAL asynchronously (non-blocking)

func (*Writer) AppendRaw

func (w *Writer) AppendRaw(payload []byte) error

AppendRaw writes raw (already serialized) msgpack bytes to the WAL asynchronously. This is a zero-copy optimization: use it when you already have msgpack bytes.

func (*Writer) AppendRawWithMeta

func (w *Writer) AppendRawWithMeta(database string, payload []byte) error

AppendRawWithMeta writes raw msgpack bytes with database metadata envelope. Format: [0x01 marker][2-byte db name length][db name][original msgpack] This preserves the database name for correct recovery routing.

Unlike calling AppendRaw with a pre-built envelope, this method builds the WAL entry in a single allocation to avoid copying the payload twice.
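
The single-allocation idea can be shown on the envelope alone: size the buffer once for marker, length, name, and payload, then append into it. This sketch does not include the WAL entry header that the real method also writes, and it assumes a big-endian 2-byte length; buildEnvelope is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

const envelopeMarker = 0x01 // local copy of WALEnvelopeMarker

// buildEnvelope produces [0x01][2-byte db name length][db name][payload]
// with exactly one allocation.
// ASSUMPTION: the 2-byte length is big-endian.
func buildEnvelope(database string, payload []byte) ([]byte, error) {
	n := len(database)
	if n > 0xFFFF {
		return nil, errors.New("database name does not fit in a 2-byte length")
	}
	// Exact capacity up front, so the appends below never reallocate.
	buf := make([]byte, 0, 3+n+len(payload))
	buf = append(buf, envelopeMarker, byte(n>>8), byte(n))
	buf = append(buf, database...)
	buf = append(buf, payload...)
	return buf, nil
}

func main() {
	env, err := buildEnvelope("metrics", []byte{0x80}) // 0x80: empty msgpack map
	fmt.Printf("% x len=%d cap=%d err=%v\n", env, len(env), cap(env), err)
}
```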

func (*Writer) Close

func (w *Writer) Close() error

Close closes the WAL writer

func (*Writer) CurrentFile

func (w *Writer) CurrentFile() string

CurrentFile returns the current WAL file path

func (*Writer) CurrentSequence

func (w *Writer) CurrentSequence() uint64

CurrentSequence returns the current replication sequence number.

func (*Writer) PurgeAll

func (w *Writer) PurgeAll() (int, error)

PurgeAll deletes all WAL files in the directory. Call this after a clean shutdown where all data has been flushed to storage, so that recovery on next startup doesn't replay already-persisted data.

func (*Writer) PurgeInactive

func (w *Writer) PurgeInactive() (int, error)

PurgeInactive deletes all WAL files except the currently active one. Use this during normal operation (unlike PurgeAll which is for shutdown) to clean up rotated WAL files after their data has been flushed to storage.

func (*Writer) PurgeOlderThan

func (w *Writer) PurgeOlderThan(minAge time.Duration) (int, error)

PurgeOlderThan deletes inactive WAL files whose modification time is older than the given threshold. The active WAL file is never deleted. Use this during normal operation to safely purge rotated WAL files whose data has been flushed to parquet by the normal buffer flush cycle.
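
An age-based purge of this kind can be sketched with the standard library alone. The code below is an illustration under stated assumptions, not the Writer's implementation: the ".wal" file suffix is hypothetical, and purgeOlderThan and demo are local names.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// purgeOlderThan removes files in dir whose modification time is older
// than minAge, never touching the active file.
// ASSUMPTION: WAL files are matched by a hypothetical ".wal" suffix.
func purgeOlderThan(dir, active string, minAge time.Duration, now time.Time) (int, error) {
	paths, err := filepath.Glob(filepath.Join(dir, "*.wal"))
	if err != nil {
		return 0, err
	}
	cutoff := now.Add(-minAge)
	deleted := 0
	for _, p := range paths {
		if p == active {
			continue // the active file is never deleted
		}
		info, err := os.Stat(p)
		if err != nil {
			return deleted, err
		}
		if info.ModTime().Before(cutoff) {
			if err := os.Remove(p); err != nil {
				return deleted, err
			}
			deleted++
		}
	}
	return deleted, nil
}

// demo creates an old rotated file, a fresh rotated file, and an old
// active file, then purges with a one-hour threshold.
func demo() (int, int, error) {
	dir, err := os.MkdirTemp("", "wal-sketch")
	if err != nil {
		return 0, 0, err
	}
	defer os.RemoveAll(dir)

	now := time.Now()
	old := now.Add(-2 * time.Hour)
	files := map[string]time.Time{"old.wal": old, "fresh.wal": now, "active.wal": old}
	for name, mtime := range files {
		p := filepath.Join(dir, name)
		if err := os.WriteFile(p, []byte("x"), 0o600); err != nil {
			return 0, 0, err
		}
		if err := os.Chtimes(p, mtime, mtime); err != nil {
			return 0, 0, err
		}
	}
	deleted, err := purgeOlderThan(dir, filepath.Join(dir, "active.wal"), time.Hour, now)
	if err != nil {
		return deleted, 0, err
	}
	left, err := filepath.Glob(filepath.Join(dir, "*.wal"))
	return deleted, len(left), err
}

func main() {
	deleted, remaining, err := demo()
	fmt.Println(deleted, remaining, err)
}
```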

func (*Writer) SetReplicationHook

func (w *Writer) SetReplicationHook(hook ReplicationHook)

SetReplicationHook sets the hook function called for each WAL entry. This enables cluster replication by streaming entries to reader nodes. The hook is called synchronously before the entry is written locally.
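
Because the hook runs synchronously on the write path, a slow hook would delay every local write. One way to keep it cheap is to hand entries to a buffered channel without blocking, as sketched below. The types are local mirrors of ReplicationEntry and ReplicationHook, newForwardingHook and demo are hypothetical, and copying the payload is a defensive assumption, since the documentation does not say whether the writer reuses the buffer.

```go
package main

import "fmt"

// replicationEntry mirrors the documented ReplicationEntry fields.
type replicationEntry struct {
	Sequence    uint64
	TimestampUS uint64
	Payload     []byte
}

// replicationHook mirrors the documented ReplicationHook signature.
type replicationHook func(entry *replicationEntry)

// newForwardingHook returns a hook that forwards a copy of each entry to
// out without blocking, and reports the sequence of any entry it could
// not forward so a sender can detect the gap.
func newForwardingHook(out chan<- replicationEntry, onDrop func(seq uint64)) replicationHook {
	return func(entry *replicationEntry) {
		cp := replicationEntry{
			Sequence:    entry.Sequence,
			TimestampUS: entry.TimestampUS,
			// ASSUMPTION: copy defensively in case the caller reuses the slice.
			Payload: append([]byte(nil), entry.Payload...),
		}
		select {
		case out <- cp:
		default:
			onDrop(entry.Sequence)
		}
	}
}

// demo pushes three entries through a hook whose channel holds two.
func demo() (int, []uint64) {
	out := make(chan replicationEntry, 2)
	var dropped []uint64
	hook := newForwardingHook(out, func(seq uint64) { dropped = append(dropped, seq) })
	for seq := uint64(1); seq <= 3; seq++ {
		hook(&replicationEntry{Sequence: seq, Payload: []byte{0x80}})
	}
	return len(out), dropped
}

func main() {
	forwarded, dropped := demo()
	fmt.Println(forwarded, dropped)
}
```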

func (*Writer) Stats

func (w *Writer) Stats() map[string]interface{}

Stats returns WAL statistics

type WriterConfig

type WriterConfig struct {
	WALDir       string        // Directory for WAL files
	SyncMode     SyncMode      // Sync mode: fsync, fdatasync, async
	MaxSizeBytes int64         // Rotate WAL when it reaches this size (default: 100MB)
	MaxAge       time.Duration // Rotate WAL after this duration (default: 1 hour)
	SyncInterval time.Duration // Sync at most this often (default: 100ms, 0 = sync every write)
	SyncBytes    int64         // Sync after this many bytes written (default: 1MB, 0 = no byte threshold)
	BufferSize   int           // Size of async write buffer (default: 10000)
	Logger       zerolog.Logger
}

WriterConfig holds configuration for WAL writer
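
The field comments imply a rotation rule and a sync rule. The sketch below spells out one possible reading of them using a local mirror of the struct (the Logger field is omitted to avoid a third-party import). The helper names, the interpretation of "100MB" and "1MB" as binary megabytes, and the exact way the thresholds combine are assumptions; the documentation does not say how NewWriter applies defaults.

```go
package main

import (
	"fmt"
	"time"
)

// writerConfig mirrors the documented WriterConfig fields, minus Logger.
type writerConfig struct {
	WALDir       string
	SyncMode     string
	MaxSizeBytes int64
	MaxAge       time.Duration
	SyncInterval time.Duration
	SyncBytes    int64
	BufferSize   int
}

// documentedDefaults spells out the defaults named in the field comments.
// ASSUMPTION: "100MB" and "1MB" mean binary megabytes.
func documentedDefaults(dir string) writerConfig {
	return writerConfig{
		WALDir:       dir,
		SyncMode:     "fdatasync",
		MaxSizeBytes: 100 * 1024 * 1024,
		MaxAge:       time.Hour,
		SyncInterval: 100 * time.Millisecond,
		SyncBytes:    1024 * 1024,
		BufferSize:   10000,
	}
}

// shouldRotate reports whether either rotation threshold has been reached.
func shouldRotate(cfg writerConfig, size int64, age time.Duration) bool {
	return size >= cfg.MaxSizeBytes || age >= cfg.MaxAge
}

// shouldSync is one reading of the sync fields: SyncInterval == 0 syncs on
// every write, SyncBytes == 0 disables the byte threshold.
func shouldSync(cfg writerConfig, unsyncedBytes int64, sinceLastSync time.Duration) bool {
	if cfg.SyncInterval == 0 {
		return true
	}
	if cfg.SyncBytes > 0 && unsyncedBytes >= cfg.SyncBytes {
		return true
	}
	return sinceLastSync >= cfg.SyncInterval
}

func main() {
	cfg := documentedDefaults("/var/lib/example/wal") // hypothetical path
	fmt.Println(shouldRotate(cfg, 10<<20, 5*time.Minute))
	fmt.Println(shouldSync(cfg, 2<<20, 10*time.Millisecond))
}
```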
