storage

package
v0.69.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	WalBlockMagic byte = 101

	WalBlockHeaderLen      = 27
	WalBlockCurrentVersion = uint8(1)
)
View Source
const (
	// WalMagic is the magic number at the start of WAL files
	WalMagic uint32 = 0xFFFFFFFF
	// WalCurrentVersion is the current WAL format version.
	WalCurrentVersion = _WalVersionInitial
	// WalHeaderSize is the size of the WAL header in bytes (4 bytes magic + 1 byte version). 59 bytes are also reserved
	// due to alignment
	WalHeaderSize = 5
)
View Source
const (
	DocBlockHeaderLen = 33
)
View Source
const (
	IndexBlockHeaderSize = 33
)
View Source
const (
	// WalBlockAlignment is the alignment boundary for blocks in the new WAL format. Must be greater than
	// WalBlock header (27 bytes) to prevent header torn writes and allow faster navigation during replay
	// of corrupted WAL file
	WalBlockAlignment int64 = 64
)

Variables

This section is empty.

Functions

func IsWalBlock added in v0.69.0

func IsWalBlock(data []byte) bool

IsWalBlock checks if this data is possibly a meta block. Returns true if the data has at least WalBlockHeaderLen bytes and starts with magic byte. This doesn't check for corruption, use IsCorrect() for checksum validation.

Types

type Codec

type Codec byte
const (
	CodecNo Codec = iota
	CodecLZ4
	CodecZSTD
)

type DocBlock

type DocBlock []byte

func CompressDocBlock

func CompressDocBlock(src []byte, dst DocBlock, zstdLevel int) DocBlock

func NewBlock

func NewBlock() DocBlock

func PackDocBlock

func PackDocBlock(payload []byte, dst DocBlock) DocBlock

func PackWalBlockToDocBlock added in v0.69.0

func PackWalBlockToDocBlock(walBlock WalBlock, dst DocBlock) DocBlock

PackWalBlockToDocBlock converts WalBlock to legacy DocBlock.

func (DocBlock) CalcLen

func (b DocBlock) CalcLen()

func (DocBlock) Codec

func (b DocBlock) Codec() Codec

func (DocBlock) DecompressTo

func (b DocBlock) DecompressTo(dst []byte) ([]byte, error)

DecompressTo always put the result in `dst` regardless of whether unpacking is required or part of the DocBlock can be enough.

So DocBlock does not share the same data with `dst` and can be used safely

func (DocBlock) FullLen

func (b DocBlock) FullLen() uint64

func (DocBlock) GetExt1

func (b DocBlock) GetExt1() uint64

func (DocBlock) GetExt2

func (b DocBlock) GetExt2() uint64

func (DocBlock) Len

func (b DocBlock) Len() uint64

func (DocBlock) Payload

func (b DocBlock) Payload() []byte

func (DocBlock) RawLen

func (b DocBlock) RawLen() uint64

func (DocBlock) SetCodec

func (b DocBlock) SetCodec(codecVal Codec)

func (DocBlock) SetExt1

func (b DocBlock) SetExt1(x uint64)

func (DocBlock) SetExt2

func (b DocBlock) SetExt2(x uint64)

func (DocBlock) SetLen

func (b DocBlock) SetLen(val uint64)

func (DocBlock) SetRawLen

func (b DocBlock) SetRawLen(x uint64)

type DocBlocksReader

type DocBlocksReader struct {
	// contains filtered or unexported fields
}

func NewDocBlocksReader

func NewDocBlocksReader(limiter *ReadLimiter, reader io.ReaderAt) DocBlocksReader

func (*DocBlocksReader) ReadDocBlock

func (r *DocBlocksReader) ReadDocBlock(offset int64) ([]byte, uint64, error)

func (*DocBlocksReader) ReadDocBlockPayload

func (r *DocBlocksReader) ReadDocBlockPayload(offset int64) ([]byte, uint64, error)

type DocsReader

type DocsReader struct {
	// contains filtered or unexported fields
}

func NewDocsReader

func NewDocsReader(limiter *ReadLimiter, reader io.ReaderAt, docsCache *cache.Cache[[]byte]) DocsReader

func (*DocsReader) ReadDocs

func (r *DocsReader) ReadDocs(blockOffset uint64, docOffsets []uint64) ([][]byte, error)

func (*DocsReader) ReadDocsFunc

func (r *DocsReader) ReadDocsFunc(blockOffset uint64, docOffsets []uint64, cb func([]byte) error) error

type FileWriter added in v0.69.0

type FileWriter struct {
	// contains filtered or unexported fields
}

FileWriter optimizes sequential writing and fsync calls for concurrent writers.

The write offset is calculated strictly sequentially using atomic. After that, a request for fsync is sent. The request waits if fsync is being performed from previous requests. During this wait, other fsync requests may arrive that are also waiting for the previous one to complete. After that, a new fsync is performed, after which all requests receive a response about the successful (or unsuccessful) fsync.

This results in one fsync system call for several writers performing a write at approximately the same time.

FileWriter does not interpret block format; it only writes bytes and triggers fsync.

func NewFileWriter added in v0.69.0

func NewFileWriter(ws fileWriterSyncer, offset int64, skipSync bool) *FileWriter

NewFileWriter creates a new FileWriter. offset is the initial write position for sequential Write.

func (*FileWriter) ReserveSpace added in v0.69.0

func (fs *FileWriter) ReserveSpace(size int64) int64

func (*FileWriter) Stop added in v0.69.0

func (fs *FileWriter) Stop()

func (*FileWriter) Write added in v0.69.0

func (fs *FileWriter) Write(data []byte, sw *stopwatch.Stopwatch) (int64, error)

func (*FileWriter) WriteAt added in v0.69.0

func (fs *FileWriter) WriteAt(offset int64, data []byte, sw *stopwatch.Stopwatch) (int64, error)

type ImmutableFile

type ImmutableFile interface {
	Name() string
	Stat() (os.FileInfo, error)

	io.Reader
	io.ReaderAt

	io.Seeker
	io.Closer
}

type IndexBlockHeader

type IndexBlockHeader []byte

func NewEmptyIndexBlockHeader

func NewEmptyIndexBlockHeader() IndexBlockHeader

func NewIndexBlockHeader

func NewIndexBlockHeader(pos int64, ext1, ext2 uint64, size, rawSize uint32, codec Codec) IndexBlockHeader

func (IndexBlockHeader) Codec

func (b IndexBlockHeader) Codec() Codec

func (IndexBlockHeader) GetExt1

func (b IndexBlockHeader) GetExt1() uint64

func (IndexBlockHeader) GetExt2

func (b IndexBlockHeader) GetExt2() uint64

func (IndexBlockHeader) GetPos

func (b IndexBlockHeader) GetPos() uint64

func (IndexBlockHeader) Len

func (b IndexBlockHeader) Len() uint32

func (IndexBlockHeader) RawLen

func (b IndexBlockHeader) RawLen() uint32

func (IndexBlockHeader) SetCodec

func (b IndexBlockHeader) SetCodec(codecVal Codec)

func (IndexBlockHeader) SetExt1

func (b IndexBlockHeader) SetExt1(x uint64)

func (IndexBlockHeader) SetExt2

func (b IndexBlockHeader) SetExt2(x uint64)

func (IndexBlockHeader) SetLen

func (b IndexBlockHeader) SetLen(val uint32)

func (IndexBlockHeader) SetPos

func (b IndexBlockHeader) SetPos(x uint64)

func (IndexBlockHeader) SetRawLen

func (b IndexBlockHeader) SetRawLen(x uint32)

type IndexReader

type IndexReader struct {
	// contains filtered or unexported fields
}

func NewIndexReader

func NewIndexReader(
	limiter *ReadLimiter, readerName string,
	reader io.ReaderAt, registryCache *cache.Cache[[]byte],
) IndexReader

func (*IndexReader) GetBlockHeader

func (r *IndexReader) GetBlockHeader(index uint32) (IndexBlockHeader, error)

func (*IndexReader) ReadIndexBlock

func (r *IndexReader) ReadIndexBlock(blockIndex uint32, dst []byte) ([]byte, uint64, error)

type ReadLimiter

type ReadLimiter struct {
	// contains filtered or unexported fields
}

func NewReadLimiter

func NewReadLimiter(maxReadsNum int, counter prometheus.Counter) *ReadLimiter

func (*ReadLimiter) ReadAt

func (rl *ReadLimiter) ReadAt(r io.ReaderAt, buf []byte, offset int64) (int, error)

type Uploader

type Uploader interface {
	Upload(context.Context, ImmutableFile) error
}

type WalBlock added in v0.69.0

type WalBlock []byte

func CompressWalBlock added in v0.69.0

func CompressWalBlock(src []byte, dst WalBlock, zstdLevel int) WalBlock

func PackDocBlockToWalBlock added in v0.69.0

func PackDocBlockToWalBlock(docBlock DocBlock) WalBlock

PackDocBlockToWalBlock converts DocBlock to WalBlock in place without copying payload. docBlock will be invalid after packing

func PackWalBlock added in v0.69.0

func PackWalBlock(payload []byte, dst WalBlock) WalBlock

func (WalBlock) CalcHeaderChecksum added in v0.69.0

func (b WalBlock) CalcHeaderChecksum()

func (WalBlock) CalcLen added in v0.69.0

func (b WalBlock) CalcLen()

func (WalBlock) CalcPayloadChecksum added in v0.69.0

func (b WalBlock) CalcPayloadChecksum()

func (WalBlock) Codec added in v0.69.0

func (b WalBlock) Codec() Codec

func (WalBlock) DecompressTo added in v0.69.0

func (b WalBlock) DecompressTo(dst []byte) ([]byte, error)

DecompressTo always put the result in `dst` regardless of whether unpacking is required or part of the WalBlock can be enough.

So WalBlock does not share the same data with `dst` and can be used safely

func (WalBlock) DocsOffset added in v0.69.0

func (b WalBlock) DocsOffset() uint64

func (WalBlock) FullLen added in v0.69.0

func (b WalBlock) FullLen() uint32

func (WalBlock) HeaderChecksum added in v0.69.0

func (b WalBlock) HeaderChecksum() uint32

func (WalBlock) IsCorrect added in v0.69.0

func (b WalBlock) IsCorrect() bool

IsCorrect checks if this is a correct meta block by checking header and payload checksums

func (WalBlock) IsHeaderCorrect added in v0.69.0

func (b WalBlock) IsHeaderCorrect() bool

IsHeaderCorrect checks if header checksum is correct

func (WalBlock) IsPayloadCorrect added in v0.69.0

func (b WalBlock) IsPayloadCorrect() bool

IsPayloadCorrect checks if payload checksum is valid

func (WalBlock) Len added in v0.69.0

func (b WalBlock) Len() uint32

func (WalBlock) Magic added in v0.69.0

func (b WalBlock) Magic() byte

func (WalBlock) Payload added in v0.69.0

func (b WalBlock) Payload() []byte

func (WalBlock) PayloadChecksum added in v0.69.0

func (b WalBlock) PayloadChecksum() uint32

func (WalBlock) RawLen added in v0.69.0

func (b WalBlock) RawLen() uint32

func (WalBlock) SetCodec added in v0.69.0

func (b WalBlock) SetCodec(codecVal Codec)

func (WalBlock) SetDocsOffset added in v0.69.0

func (b WalBlock) SetDocsOffset(x uint64)

SetDocsOffset updates docs offset. It will also recalc header checksum (cheap).

func (WalBlock) SetHeaderChecksum added in v0.69.0

func (b WalBlock) SetHeaderChecksum(x uint32)

func (WalBlock) SetLen added in v0.69.0

func (b WalBlock) SetLen(val uint32)

func (WalBlock) SetPayloadChecksum added in v0.69.0

func (b WalBlock) SetPayloadChecksum(x uint32)

func (WalBlock) SetRawLen added in v0.69.0

func (b WalBlock) SetRawLen(x uint32)

func (WalBlock) SetVersion added in v0.69.0

func (b WalBlock) SetVersion(version uint8)

func (WalBlock) Version added in v0.69.0

func (b WalBlock) Version() uint8

type WalReader added in v0.69.0

type WalReader struct {
	// contains filtered or unexported fields
}

func NewWalReader added in v0.69.0

func NewWalReader(limiter *ReadLimiter, reader io.ReaderAt, baseFileName string) (*WalReader, error)

func (*WalReader) Entries added in v0.69.0

func (r *WalReader) Entries() iter.Seq[WalRecord]

Entries iterates through WAL file. Corrupted entries are skipped and never propagated to a client. Corruption ranges are logged with "from" and "to" offsets.

type WalRecord added in v0.69.0

type WalRecord struct {
	Data   WalBlock
	Offset int64
	Size   int64
	Err    error
}

type WalWriter added in v0.69.0

type WalWriter struct {
	// contains filtered or unexported fields
}

WalWriter writes WalBlock to a WAL file with header and 64-byte alignment. It works on top of FileWriter, but it also maintains header at the beginning of the file and align block offsets to WalBlockAlignment. Format: [Header 5B] [... -> align to 64] WalBlock [... -> align to 64] WalBlock ...

func NewWalWriter added in v0.69.0

func NewWalWriter(ws fileWriterSyncer, offset int64, skipSync bool) *WalWriter

func (*WalWriter) Stop added in v0.69.0

func (w *WalWriter) Stop()

func (*WalWriter) Write added in v0.69.0

func (w *WalWriter) Write(data WalBlock, sw *stopwatch.Stopwatch) (int64, error)

Write writes a WalBlock to the WAL file. The data must already be a WalBlock. Returns the offset where the WalBlock starts.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL