record

package
v2.1.3
Published: Dec 25, 2025 License: BSD-3-Clause Imports: 14 Imported by: 0

Documentation

Constants

const NoSyncIndex = -1

NoSyncIndex is the value of PendingSyncIndex when a sync is not requested.

const (

	// SyncConcurrency is the maximum number of concurrent sync operations that
	// can be performed. Note that a sync operation is initiated either by a call
	// to SyncRecord or by a call to Close. Exported as this value also limits
	// the commit concurrency in commitPipeline.
	SyncConcurrency = 1 << syncConcurrencyBits
)

Variables

var (
	// ErrNotAnIOSeeker is returned if the io.Reader underlying a Reader does not implement io.Seeker.
	ErrNotAnIOSeeker = errors.New("pebble/record: reader does not implement io.Seeker")

	// ErrNoLastRecord is returned if LastRecordOffset is called and there is no previous record.
	ErrNoLastRecord = errors.New("pebble/record: no last record exists")

	// ErrZeroedChunk is returned if a chunk is encountered that is zeroed. This
	// usually occurs due to log file preallocation.
	ErrZeroedChunk = errors.New("pebble/record: zeroed chunk")

	// ErrInvalidChunk is returned if a chunk is encountered with an invalid
	// header, length, or checksum. This usually occurs when a log is recycled,
	// but can also occur due to corruption.
	ErrInvalidChunk = errors.New("pebble/record: invalid chunk")

	// ErrUnexpectedEOF is returned if a log file ends unexpectedly. It
	// indicates the unexpected end of the log file or an in-progress record
	// envelope itself. ErrUnexpectedEOF may be returned by Reader when it
	// encounters an invalid chunk but observes no evidence that the invalid
	// chunk is caused by corruption (i.e., no future chunk indicates that
	// offset should be valid and durably synced.)
	//
	// This error is defined separately from io.ErrUnexpectedEOF to disambiguate
	// this case from the case of an unexpected end of the record's payload
	// while decoding at a higher level (e.g., version edit decoding). If a
	// higher-level decoding routine returns record.ErrUnexpectedEOF, it
	// unambiguously indicates that the log file itself ended unexpectedly. The
	// record.Reader will never return io.ErrUnexpectedEOF, just record.ErrUnexpectedEOF.
	ErrUnexpectedEOF = errors.New("pebble/record: unexpected EOF")
)

Functions

func IsInvalidRecord

func IsInvalidRecord(err error) bool

IsInvalidRecord returns true if the error matches one of the error types returned for invalid records. These are treated in a way similar to io.EOF in recovery code.

Types

type ExternalSyncQueueCallback

type ExternalSyncQueueCallback func(doneSync PendingSyncIndex, err error)

ExternalSyncQueueCallback is to be run when a PendingSync has been processed, either successfully or with an error.
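
A minimal sketch of what a callback of this shape might do, assuming (as the LogWriterConfig comments state) that a synced index implies all earlier indices are synced. The types below are local stand-ins, and `tracker` is a hypothetical consumer, not part of the package.

```go
package main

import "fmt"

// noSyncIndex and pendingSyncIndex are local stand-ins mirroring the
// package's NoSyncIndex and PendingSyncIndex.
const noSyncIndex = -1

type pendingSyncIndex struct{ Index int64 }

// syncCallback has the same shape as ExternalSyncQueueCallback.
type syncCallback func(doneSync pendingSyncIndex, err error)

// tracker is a hypothetical consumer that remembers the highest synced index.
// It has no locking: the sketch assumes callbacks are serialized and that
// nothing reads the fields concurrently.
type tracker struct {
	durable int64
	lastErr error
}

func newTracker() *tracker { return &tracker{durable: noSyncIndex} }

func (t *tracker) onSync(done pendingSyncIndex, err error) {
	if err != nil {
		t.lastErr = err
		return
	}
	if done.Index > t.durable {
		t.durable = done.Index
	}
}

// isDurable reports whether index i is covered by a completed sync.
func (t *tracker) isDurable(i int64) bool {
	return i != noSyncIndex && i <= t.durable
}

func main() {
	tr := newTracker()
	var cb syncCallback = tr.onSync
	cb(pendingSyncIndex{Index: 4}, nil)
	fmt.Println(tr.isDurable(2), tr.isDurable(4), tr.isDurable(5))
}
```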

type LogWriter

type LogWriter struct {
	// contains filtered or unexported fields
}

LogWriter writes records to an underlying io.Writer. In order to support WAL file reuse, a LogWriter's records are tagged with the WAL's file number. When reading a log file, a record from a previous incarnation of the file will return the error ErrInvalidLogNum.

func NewLogWriter

func NewLogWriter(
	w io.Writer, logNum base.DiskFileNum, logWriterConfig LogWriterConfig,
) *LogWriter

NewLogWriter returns a new LogWriter.

The io.Writer may also be used as an io.Closer and syncer. No other methods will be called on the writer.

func (*LogWriter) Close

func (w *LogWriter) Close() error

Close flushes and syncs any unwritten data and closes the writer. Where required, external synchronisation is provided by commitPipeline.mu.

func (*LogWriter) CloseWithLastQueuedRecord

func (w *LogWriter) CloseWithLastQueuedRecord(lastQueuedRecord PendingSyncIndex) error

CloseWithLastQueuedRecord is like Close, but optionally accepts a lastQueuedRecord; the caller is notified once that record has been synced.

func (*LogWriter) Metrics

func (w *LogWriter) Metrics() LogWriterMetrics

Metrics should typically be called after Close, because only then is the callee guaranteed not to modify the returned LogWriterMetrics any further. The result is also current if there is nothing left to flush in the flush loop, but that is an implementation detail that callers should not rely on.

func (*LogWriter) Size

func (w *LogWriter) Size() int64

Size returns the current size of the file. External synchronisation is provided by commitPipeline.mu.

func (*LogWriter) SyncRecord

func (w *LogWriter) SyncRecord(
	p []byte, wg *sync.WaitGroup, err *error,
) (logSize int64, err2 error)

SyncRecord writes a complete record. If wg != nil, the record is persisted asynchronously to the underlying writer and wg.Done is called upon completion. It returns the offset just past the end of the record. External synchronisation is provided by commitPipeline.mu.
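
The wait-group handshake can be sketched as follows. `fakeLog` is a hypothetical stand-in with a method shaped like SyncRecord; it ignores record framing and never fails. That the caller calls wg.Add(1) before handing the wait group over is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeLog is a hypothetical stand-in for a log writer. It only counts payload
// bytes and does not model the real on-disk format.
type fakeLog struct {
	mu   sync.Mutex
	size int64
}

// syncRecord mimics the shape of SyncRecord: it appends p, and if wg is
// non-nil it completes the "sync" asynchronously and then calls wg.Done.
func (l *fakeLog) syncRecord(p []byte, wg *sync.WaitGroup, syncErr *error) (int64, error) {
	l.mu.Lock()
	l.size += int64(len(p))
	size := l.size
	l.mu.Unlock()
	if wg != nil {
		go func() {
			// A real writer would fsync here; the sketch always succeeds.
			*syncErr = nil
			wg.Done()
		}()
	}
	return size, nil
}

// writeDurably blocks until the record has been reported as synced.
func writeDurably(l *fakeLog, p []byte) (int64, error) {
	var wg sync.WaitGroup
	var syncErr error
	wg.Add(1) // assumption: the caller adds before passing wg in
	size, err := l.syncRecord(p, &wg, &syncErr)
	if err != nil {
		return 0, err
	}
	wg.Wait() // syncErr is safe to read once Wait returns
	return size, syncErr
}

func main() {
	l := &fakeLog{}
	size, err := writeDurably(l, []byte("hello"))
	fmt.Println(size, err)
}
```

Passing the error by pointer lets the asynchronous sync report failure after the call itself has already returned.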

func (*LogWriter) SyncRecordGeneralized

func (w *LogWriter) SyncRecordGeneralized(p []byte, ps PendingSync) (logSize int64, err2 error)

SyncRecordGeneralized is a version of SyncRecord that accepts a PendingSync.

func (*LogWriter) WriteRecord

func (w *LogWriter) WriteRecord(p []byte) (int64, error)

WriteRecord writes a complete record. It returns the offset just past the end of the record. External synchronisation is provided by commitPipeline.mu.

type LogWriterConfig

type LogWriterConfig struct {
	WALMinSyncInterval durationFunc
	WALFsyncLatency    prometheus.Histogram
	// QueueSemChan is an optional channel to pop from when popping from
	// LogWriter.flusher.syncQueue. It functions as a semaphore that prevents
	// the syncQueue from overflowing (which will cause a panic). All production
	// code ensures this is non-nil.
	QueueSemChan chan struct{}

	// ExternalSyncQueueCallback is set to non-nil when the LogWriter is used
	// as part of a WAL implementation that can failover between LogWriters.
	//
	// In this case, QueueSemChan is always nil, and SyncRecordGeneralized must
	// be used with a PendingSync parameter that is implemented by
	// PendingSyncIndex. When an index is synced (which implies all earlier
	// indices are also synced), this callback is invoked. The caller must not
	// hold any mutex when invoking this callback, since the lock ordering
	// requirement in this case is that any higher layer locks (in the wal
	// package) precede the lower layer locks (in the record package). These
	// callbacks are serialized since they are invoked from the flushLoop.
	ExternalSyncQueueCallback ExternalSyncQueueCallback

	// WriteWALSyncOffsets determines whether to write WAL sync chunk offsets.
	// The format major version can change (ratchet) at runtime, so this must be
	// a function rather than a static bool to ensure we use the latest format version.
	WriteWALSyncOffsets func() bool
}

LogWriterConfig is a struct used for configuring new LogWriters.
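
To illustrate why WriteWALSyncOffsets is a func() bool rather than a static bool, here is a minimal sketch of a value that can ratchet upward at runtime. `formatVersion` and the threshold `minVersionWithSyncOffsets` are hypothetical; the real format major versions are not shown in this document.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// minVersionWithSyncOffsets is a made-up threshold for illustration only.
const minVersionWithSyncOffsets = 3

// formatVersion is a hypothetical holder of a version that only increases.
type formatVersion struct{ v atomic.Uint64 }

// ratchet raises the version to "to" if it is higher than the current value.
func (f *formatVersion) ratchet(to uint64) {
	for {
		cur := f.v.Load()
		if to <= cur || f.v.CompareAndSwap(cur, to) {
			return
		}
	}
}

// writeSyncOffsets has the func() bool shape of WriteWALSyncOffsets. It reads
// the version on every call, so it observes later ratchets.
func (f *formatVersion) writeSyncOffsets() bool {
	return f.v.Load() >= minVersionWithSyncOffsets
}

func main() {
	fv := &formatVersion{}
	fv.ratchet(2)
	check := fv.writeSyncOffsets // method value usable as a func() bool
	fmt.Println(check())
	fv.ratchet(3)
	fmt.Println(check())
}
```

A bool copied into the config at construction time would have stayed false after the second ratchet; the function value does not.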

type LogWriterMetrics

type LogWriterMetrics struct {
	WriteThroughput  base.ThroughputMetric
	PendingBufferLen base.GaugeSampleMetric
	SyncQueueLen     base.GaugeSampleMetric
}

LogWriterMetrics contains misc metrics for the log writer.

func (*LogWriterMetrics) Merge

Merge merges metrics from x. Requires that x is non-nil.

type PendingSync

type PendingSync interface {
	// contains filtered or unexported methods
}

PendingSync abstracts the sync specification for a record queued on the LogWriter. The only implementations are provided in this package since syncRequested is not exported.

type PendingSyncIndex

type PendingSyncIndex struct {
	// Index is some state meaningful to the user of LogWriter. The LogWriter
	// itself only examines whether Index is equal to NoSyncIndex.
	Index int64
}

PendingSyncIndex implements both pendingSyncsSnapshot and PendingSync.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader reads records from an underlying io.Reader.

func NewReader

func NewReader(r io.Reader, logNum base.DiskFileNum) *Reader

NewReader returns a new reader. If the file contains records encoded using the recyclable record format, then the log number in those records must match the specified logNum.

func (*Reader) Next

func (r *Reader) Next() (io.Reader, error)

Next returns a reader for the next record. It returns io.EOF if there are no more records. The reader returned becomes stale after the next Next call, and should no longer be used.
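
A typical read loop over a source with this Next contract can be sketched as below. `recordSource` and `sliceSource` are hypothetical stand-ins so the example runs without the package; only the Next signature and the io.EOF convention are taken from the description above.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// recordSource is a hypothetical interface with the same Next shape as Reader.
type recordSource interface {
	Next() (io.Reader, error)
}

// sliceSource is an in-memory stand-in that yields one reader per string.
type sliceSource struct {
	records []string
	i       int
}

func (s *sliceSource) Next() (io.Reader, error) {
	if s.i >= len(s.records) {
		return nil, io.EOF
	}
	r := strings.NewReader(s.records[s.i])
	s.i++
	return r, nil
}

// readAll drains each record fully before asking for the next one, because a
// returned reader becomes stale once Next is called again.
func readAll(src recordSource) ([][]byte, error) {
	var out [][]byte
	for {
		r, err := src.Next()
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		b, err := io.ReadAll(r)
		if err != nil {
			return out, err
		}
		out = append(out, b)
	}
}

func main() {
	recs, err := readAll(&sliceSource{records: []string{"a", "bc"}})
	fmt.Println(len(recs), err)
}
```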

func (*Reader) Offset

func (r *Reader) Offset() int64

Offset returns the current offset within the file. If called immediately before a call to Next(), Offset() will return the record offset.

type RotationHelper

type RotationHelper struct {
	// contains filtered or unexported fields
}

RotationHelper is a type used to inform the decision of rotating a record log file.

The assumption is that multiple records can be coalesced into a single record (called a snapshot). Starting a new file, where the first record is a snapshot of the current state, is referred to as "rotating" the log.

Normally we rotate files when a certain file size is reached. But in certain cases (e.g. contents become very large), this can result in too frequent rotation. This helper contains logic to impose extra conditions on the rotation.

The rotation helper uses "size" as a unit-less estimation that is correlated with the on-disk size of a record or snapshot.

func (*RotationHelper) AddRecord

func (rh *RotationHelper) AddRecord(recordSize int64)

AddRecord makes the rotation helper aware of a new record.

func (*RotationHelper) DebugInfo

func (rh *RotationHelper) DebugInfo() (lastSnapshotSize int64, sizeSinceLastSnapshot int64)

DebugInfo returns the last snapshot size and size of the edits since the last snapshot; used for testing and debugging.

func (*RotationHelper) Rotate

func (rh *RotationHelper) Rotate(snapshotSize int64)

Rotate makes the rotation helper aware that we are rotating to a new snapshot (to which we will apply the latest edit).

func (*RotationHelper) ShouldRotate

func (rh *RotationHelper) ShouldRotate(nextSnapshotSize int64) bool

ShouldRotate returns whether we should start a new log file (with a snapshot). Does not need to be called if other rotation factors (log file size) are not satisfied.
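
The bookkeeping behind these four methods might look like the following sketch. `rotationSketch` is hypothetical, and its rotation condition (rotate once the records added since the last snapshot outweigh the next snapshot) is an assumption chosen for illustration; the package's actual condition is not stated in this document.

```go
package main

import "fmt"

// rotationSketch is a hypothetical stand-in for RotationHelper. Sizes are
// unit-less estimates, as in the description above.
type rotationSketch struct {
	lastSnapshotSize      int64
	sizeSinceLastSnapshot int64
}

// addRecord accounts for a record appended after the last snapshot.
func (r *rotationSketch) addRecord(recordSize int64) {
	r.sizeSinceLastSnapshot += recordSize
}

// shouldRotate uses an assumed condition: rotating is worthwhile only when
// the accumulated records are larger than the snapshot that would replace them.
func (r *rotationSketch) shouldRotate(nextSnapshotSize int64) bool {
	return r.sizeSinceLastSnapshot > nextSnapshotSize
}

// rotate records that a new file was started with a snapshot of the given size.
func (r *rotationSketch) rotate(snapshotSize int64) {
	r.lastSnapshotSize = snapshotSize
	r.sizeSinceLastSnapshot = 0
}

func main() {
	r := &rotationSketch{}
	r.rotate(100)
	r.addRecord(40)
	fmt.Println(r.shouldRotate(100))
	r.addRecord(70)
	fmt.Println(r.shouldRotate(100))
	r.rotate(100)
	fmt.Println(r.shouldRotate(100))
}
```

Under this assumed condition, a very large snapshot is rewritten only after a comparable volume of records has accumulated, which limits how often rotation happens when contents are large.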

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer writes records to an underlying io.Writer.

func NewWriter

func NewWriter(w io.Writer) *Writer

NewWriter returns a new Writer.

func (*Writer) Close

func (w *Writer) Close() error

Close finishes the current record and closes the writer.

func (*Writer) Flush

func (w *Writer) Flush() error

Flush finishes the current record, writes to the underlying writer, and flushes it if that writer implements interface{ Flush() error }.

func (*Writer) LastRecordOffset

func (w *Writer) LastRecordOffset() (int64, error)

LastRecordOffset returns the offset in the underlying io.Writer of the last record so far (the one created by the most recent Next call). It is the offset of the first chunk header, suitable to pass to Reader.SeekRecord.

If that io.Writer also implements io.Seeker, the return value is an absolute offset, in the sense of io.SeekStart, regardless of whether the io.Writer was initially at the zero position when passed to NewWriter. Otherwise, the return value is a relative offset, being the number of bytes written between the NewWriter call and any records written prior to the last record.

If there is no last record, i.e. nothing was written, LastRecordOffset will return ErrNoLastRecord.

func (*Writer) Next

func (w *Writer) Next() (io.Writer, error)

Next returns a writer for the next record. The writer returned becomes stale after the next Close, Flush or Next call, and should no longer be used.
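
The staleness rule can be illustrated with a generation counter. `recordWriter` below is a hypothetical stand-in that keeps each record in its own buffer instead of using the real chunk format, and returning an error on stale use is a choice made for this sketch; the documentation above only says a stale writer should no longer be used.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

var errStale = errors.New("stale record writer")

// recordWriter is a hypothetical stand-in for Writer.
type recordWriter struct {
	records []*bytes.Buffer
	gen     int // incremented on every Next call
}

// singleRecord writes into one record and remembers the generation in which
// it was handed out.
type singleRecord struct {
	parent *recordWriter
	gen    int
	buf    *bytes.Buffer
}

func (s singleRecord) Write(p []byte) (int, error) {
	if s.gen != s.parent.gen {
		return 0, errStale // sketch-only behavior, see the note above
	}
	return s.buf.Write(p)
}

// Next starts a new record and invalidates the previously returned writer.
func (w *recordWriter) Next() (io.Writer, error) {
	w.gen++
	buf := &bytes.Buffer{}
	w.records = append(w.records, buf)
	return singleRecord{parent: w, gen: w.gen, buf: buf}, nil
}

func main() {
	w := &recordWriter{}
	first, _ := w.Next()
	first.Write([]byte("one"))
	second, _ := w.Next()
	second.Write([]byte("two"))
	_, err := first.Write([]byte("late")) // first is stale after the second Next
	fmt.Println(len(w.records), err)
}
```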

func (*Writer) Size

func (w *Writer) Size() int64

Size returns the current size of the file.

func (*Writer) WriteRecord

func (w *Writer) WriteRecord(p []byte) (int64, error)

WriteRecord writes a complete record. Returns the offset just past the end of the record.
