Documentation
¶
Index ¶
- Constants
- Variables
- func ParseEnvelope(payload []byte, defaultDB string) (database string, msgpackData []byte)
- type ColumnarEntry
- type ColumnarRecoveryCallback
- type Entry
- type Reader
- type Recovery
- func (r *Recovery) CleanupOldWALs(maxAge time.Duration) (int, int64, error)
- func (r *Recovery) ListWALFiles() (active []string, recovered []string, err error)
- func (r *Recovery) Recover(ctx context.Context, callback RecoveryCallback) (*RecoveryStats, error)
- func (r *Recovery) RecoverWithOptions(ctx context.Context, callback RecoveryCallback, opts *RecoveryOptions) (*RecoveryStats, error)
- type RecoveryCallback
- type RecoveryOptions
- type RecoveryStats
- type ReplicationEntry
- type ReplicationHook
- type SyncMode
- type Writer
- func (w *Writer) Append(records []map[string]interface{}) error
- func (w *Writer) AppendRaw(payload []byte) error
- func (w *Writer) AppendRawWithMeta(database string, payload []byte) error
- func (w *Writer) Close() error
- func (w *Writer) CurrentFile() string
- func (w *Writer) CurrentSequence() uint64
- func (w *Writer) PurgeAll() (int, error)
- func (w *Writer) PurgeInactive() (int, error)
- func (w *Writer) PurgeOlderThan(minAge time.Duration) (int, error)
- func (w *Writer) SetReplicationHook(hook ReplicationHook)
- func (w *Writer) Stats() map[string]interface{}
- type WriterConfig
Constants ¶
const (
	WALChecksumCRC32 = 0x01 // CRC32 checksum type

	// Entry format: [Length: 4 bytes] [Timestamp: 8 bytes] [Checksum: 4 bytes] [Payload: N bytes]
	WALEntryHeaderSize = 16
	WALFileHeaderSize  = 7 // Magic(4) + Version(2) + ChecksumType(1)

	// MaxWALPayloadSize is the maximum allowed payload size for a single WAL entry.
	// This limit prevents integer overflow during buffer allocation (CWE-190) and
	// aligns with the replication protocol limit (100MB).
	MaxWALPayloadSize = 100 * 1024 * 1024 // 100MB

	// WALEnvelopeMarker is the first byte of an enveloped WAL payload.
	// Enveloped format: [0x01][2-byte db name length][db name][original msgpack]
	// Since msgpack maps/arrays always start with bytes >= 0x80, 0x01 is unambiguous.
	WALEnvelopeMarker = 0x01
)
Variables ¶
var (
	WALMagic   = []byte{'A', 'R', 'C', 'W'} // Magic bytes
	WALVersion = uint16(0x0001)             // Version 1
)
WAL file format constants.
var ErrPayloadTooLarge = errors.New("WAL payload exceeds maximum allowed size")
ErrPayloadTooLarge indicates the payload exceeds MaxWALPayloadSize.
var ErrWALDropped = errors.New("WAL entry dropped: async buffer full")
ErrWALDropped is returned by Append, AppendRaw, and AppendRawWithMeta when the async entry channel is full and the entry is dropped. Previously these methods returned nil and silently incremented DroppedEntries, so downstream callers that logged "data preserved in WAL for recovery" were reporting durability they did not actually have. Returning a sentinel lets callers (the ingestion buffer in particular) increment their own error counters and surface accurate operator-facing messages. Use errors.Is to detect it.
Functions ¶
func ParseEnvelope ¶
ParseEnvelope extracts the database name and msgpack payload from a WAL entry. If the payload uses the envelope format [0x01][2-byte dbLen][dbName][msgpack], it returns the database name and the inner msgpack bytes. Otherwise, it returns defaultDB and the original payload unchanged.
Types ¶
type ColumnarEntry ¶
type ColumnarEntry struct {
Database string // From envelope metadata (empty = "default")
Measurement string
Columns map[string][]interface{}
}
ColumnarEntry represents a columnar WAL entry written via the zero-copy AppendRaw path.
type ColumnarRecoveryCallback ¶
type ColumnarRecoveryCallback func(ctx context.Context, database, measurement string, columns map[string][]interface{}) error
ColumnarRecoveryCallback is called for each columnar WAL entry during recovery. The database argument may be empty if the WAL entry predates the envelope format; in that case the database defaults to "default".
type Entry ¶
type Entry struct {
TimestampUS uint64 // Microseconds since epoch
Records []map[string]interface{} // Row format (from Append path)
ColumnarData *ColumnarEntry // Columnar format (from AppendRaw path)
}
Entry represents a single WAL entry.
type Reader ¶
type Reader struct {
// Metrics
TotalEntries int64
TotalBytes int64
CorruptedEntries int64
// contains filtered or unexported fields
}
Reader reads WAL files for recovery operations.
type Recovery ¶
type Recovery struct {
// contains filtered or unexported fields
}
Recovery manages WAL recovery operations.
func NewRecovery ¶
NewRecovery creates a new WAL recovery manager.
func (*Recovery) CleanupOldWALs ¶
CleanupOldWALs removes legacy .recovered WAL files older than the specified age. Note: As of the current implementation, WAL files are deleted immediately after successful recovery, so this function is primarily for cleaning up legacy files from previous versions that renamed files to .recovered instead of deleting them.
func (*Recovery) ListWALFiles ¶
ListWALFiles lists all WAL files in the directory. Returns active (pending) WAL files and legacy .recovered files. Note: As of the current implementation, WAL files are deleted immediately after successful recovery, so the recovered list will typically be empty or contain only legacy files from previous versions.
func (*Recovery) Recover ¶
func (r *Recovery) Recover(ctx context.Context, callback RecoveryCallback) (*RecoveryStats, error)
Recover scans the WAL directory and replays all WAL files.
func (*Recovery) RecoverWithOptions ¶
func (r *Recovery) RecoverWithOptions(ctx context.Context, callback RecoveryCallback, opts *RecoveryOptions) (*RecoveryStats, error)
RecoverWithOptions scans the WAL directory and replays WAL files with configurable options.
type RecoveryCallback ¶
RecoveryCallback is called for each batch of row-format records during recovery.
type RecoveryOptions ¶
type RecoveryOptions struct {
// SkipActiveFile is the path to the currently active WAL file that should be skipped
// during periodic recovery (to avoid reading a file being actively written)
SkipActiveFile string
// BatchSize limits how many records are replayed per callback invocation
// This provides backpressure during mass recovery after prolonged outages
// 0 means no limit (all records in an entry replayed at once)
BatchSize int
// ColumnarCallback handles columnar WAL entries from the zero-copy write path
ColumnarCallback ColumnarRecoveryCallback
}
RecoveryOptions configures WAL recovery behavior.
type RecoveryStats ¶
type RecoveryStats struct {
RecoveredFiles int
RecoveredBatches int
RecoveredEntries int
CorruptedEntries int
SkippedFiles int
RecoveryDuration time.Duration
}
RecoveryStats holds statistics about WAL recovery.
type ReplicationEntry ¶
type ReplicationEntry struct {
// Sequence is a monotonically increasing number for ordering
Sequence uint64
// TimestampUS is the entry timestamp in microseconds since epoch
TimestampUS uint64
// Payload is the raw msgpack data
Payload []byte
}
ReplicationEntry represents a WAL entry for replication. This is passed to the replication hook for streaming to readers.
type ReplicationHook ¶
type ReplicationHook func(entry *ReplicationEntry)
ReplicationHook is called for each WAL entry before it's written locally. This enables real-time streaming of entries to reader nodes.
type Writer ¶
type Writer struct {
// Metrics (atomic for lock-free reads)
TotalEntries int64
TotalBytes int64
TotalSyncs int64
TotalRotations int64
DroppedEntries int64 // Entries dropped due to full buffer
// contains filtered or unexported fields
}
Writer is a Write-Ahead Log writer with configurable durability.
func NewWriter ¶
func NewWriter(cfg *WriterConfig) (*Writer, error)
NewWriter creates a new WAL writer.
func (*Writer) AppendRaw ¶
AppendRaw writes raw (already serialized) msgpack bytes to the WAL asynchronously. This is a zero-copy optimization: use it when you already have msgpack bytes.
func (*Writer) AppendRawWithMeta ¶
AppendRawWithMeta writes raw msgpack bytes with database metadata envelope. Format: [0x01 marker][2-byte db name length][db name][original msgpack] This preserves the database name for correct recovery routing.
Unlike calling AppendRaw with a pre-built envelope, this method builds the WAL entry in a single allocation to avoid copying the payload twice.
func (*Writer) CurrentFile ¶
CurrentFile returns the current WAL file path.
func (*Writer) CurrentSequence ¶
CurrentSequence returns the current replication sequence number.
func (*Writer) PurgeAll ¶
PurgeAll deletes all WAL files in the directory. Call this after a clean shutdown where all data has been flushed to storage, so that recovery on next startup doesn't replay already-persisted data.
func (*Writer) PurgeInactive ¶
PurgeInactive deletes all WAL files except the currently active one. Use it during normal operation (unlike PurgeAll, which is meant for shutdown) to clean up rotated WAL files after their data has been flushed to storage.
func (*Writer) PurgeOlderThan ¶
PurgeOlderThan deletes inactive WAL files whose modification time is older than the given threshold. The active WAL file is never deleted. Use this during normal operation to safely purge rotated WAL files whose data has already been flushed to Parquet by the normal buffer flush cycle.
func (*Writer) SetReplicationHook ¶
func (w *Writer) SetReplicationHook(hook ReplicationHook)
SetReplicationHook sets the hook function called for each WAL entry. This enables cluster replication by streaming entries to reader nodes. The hook is called synchronously before the entry is written locally.
type WriterConfig ¶
type WriterConfig struct {
WALDir string // Directory for WAL files
SyncMode SyncMode // Sync mode: fsync, fdatasync, async
MaxSizeBytes int64 // Rotate WAL when it reaches this size (default: 100MB)
MaxAge time.Duration // Rotate WAL after this duration (default: 1 hour)
SyncInterval time.Duration // Sync at most this often (default: 100ms, 0 = sync every write)
SyncBytes int64 // Sync after this many bytes written (default: 1MB, 0 = no byte threshold)
BufferSize int // Size of async write buffer (default: 10000)
Logger zerolog.Logger
}
WriterConfig holds configuration for the WAL writer.