internal

package
v0.4.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2026 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const LevelTrace = slog.LevelDebug - 4

Variables

View Source
var (
	OperationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "replicate_replica_operation_total",
		Help: "The number of replica operations performed",
	}, []string{"replica_type", "operation"})

	OperationBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "replicate_replica_operation_bytes",
		Help: "The number of bytes used by replica operations",
	}, []string{"replica_type", "operation"})

	OperationDurationHistogramVec = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "replicate_replica_operation_duration_seconds",
		Help:    "Duration of replica operations by type and operation",
		Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30, 60},
	}, []string{"replica_type", "operation"})

	OperationErrorCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "replicate_replica_operation_errors_total",
		Help: "Number of replica operation errors by type, operation, and error code",
	}, []string{"replica_type", "operation", "code"})

	L0RetentionGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "replicate_l0_retention_files_total",
		Help: "Number of L0 files by status during retention enforcement",
	}, []string{"db", "status"})
)

Shared replica metrics.

Functions

func CleanSourcePath

func CleanSourcePath(src *slog.Source)

func CreateFile

func CreateFile(filename string, fi os.FileInfo) (*os.File, error)

CreateFile creates the file and matches the mode & uid/gid of fi.

func Fileinfo

func Fileinfo(fi os.FileInfo) (uid, gid int)

Fileinfo returns the uid and gid syscall fields from a FileInfo object.

func Hexdump

func Hexdump(data []byte) string

func InitLog

func InitLog(w io.Writer, level, typ string, addSource bool)

func LimitReadCloser

func LimitReadCloser(r io.ReadCloser, n int64) io.ReadCloser

LimitReadCloser is a variant of the io package's limited reader, copied from the io package so that the returned reader also implements io.Closer.

func LockFileExclusive

func LockFileExclusive(f *os.File) error

func MkdirAll

func MkdirAll(path string, fi os.FileInfo) error

MkdirAll is a copy of os.MkdirAll() except that it attempts to set the mode/uid/gid to match fi for each created directory.

func NewLZ4Reader

func NewLZ4Reader(r io.ReadCloser) io.ReadCloser

NewLZ4Reader creates an LZ4 decompressing reader that wraps the source. Closing the returned reader also closes the underlying source.

func ReplaceAttr

func ReplaceAttr(groups []string, a slog.Attr) slog.Attr

func UnlockFile

func UnlockFile(f *os.File) error

Types

type LTXFileOpener

type LTXFileOpener interface {
	OpenLTXFile(ctx context.Context, level int, minTXID, maxTXID ltx.TXID, offset, size int64) (io.ReadCloser, error)
}

type LZ4ReadCloser

type LZ4ReadCloser struct {
	*lz4.Reader
	// contains filtered or unexported fields
}

LZ4ReadCloser wraps an LZ4 reader with the underlying source for proper closing.

func (*LZ4ReadCloser) Close

func (r *LZ4ReadCloser) Close() error

type LimitedReadCloser

type LimitedReadCloser struct {
	R io.ReadCloser // underlying reader
	N int64         // max bytes remaining
}

func (*LimitedReadCloser) Close

func (l *LimitedReadCloser) Close() error

func (*LimitedReadCloser) Read

func (l *LimitedReadCloser) Read(p []byte) (n int, err error)

type LoggingConfig

type LoggingConfig struct {
	Level  string `yaml:"level"`
	Type   string `yaml:"type"`
	Stderr bool   `yaml:"stderr"`
	Source bool   `yaml:"source"`
}

LoggingConfig configures logging.

type ReadCloser

type ReadCloser struct {
	// contains filtered or unexported fields
}

ReadCloser wraps a reader to also attach a separate closer.

func NewReadCloser

func NewReadCloser(r io.Reader, c io.Closer) *ReadCloser

NewReadCloser returns a new instance of ReadCloser.

func (*ReadCloser) Close

func (r *ReadCloser) Close() error

Close closes the reader (if it implements io.ReadCloser) and the Closer.

func (*ReadCloser) Read

func (r *ReadCloser) Read(p []byte) (n int, err error)

Read reads bytes from the underlying reader into p.

type ReadCounter

type ReadCounter struct {
	// contains filtered or unexported fields
}

ReadCounter wraps an io.Reader and counts the total number of bytes read.

func NewReadCounter

func NewReadCounter(r io.Reader) *ReadCounter

NewReadCounter returns a new instance of ReadCounter that wraps r.

func (*ReadCounter) N

func (r *ReadCounter) N() int64

N returns the total number of bytes read.

func (*ReadCounter) Read

func (r *ReadCounter) Read(p []byte) (int, error)

Read reads from the underlying reader into p and adds the bytes read to the counter.

type ResumableReader

type ResumableReader struct {
	// contains filtered or unexported fields
}

ResumableReader wraps an io.ReadCloser from a remote storage backend with automatic reconnection on read errors.

During restore, the LTX compactor opens all LTX file streams upfront, then processes pages in page-number order. Incremental LTX files that only contain high-numbered pages may have their S3/storage streams sit idle for minutes while the compactor works through lower-numbered pages from the snapshot. Storage providers (S3, Tigris, etc.) may close these idle connections, causing "unexpected EOF" errors.

This reader detects two failure modes:

  1. Non-EOF errors (connection reset, timeout) - the stream broke mid-transfer.
  2. Premature EOF - the server closed the connection cleanly, but we haven't read all bytes yet (detected by comparing offset against known file size).

On failure, it closes the dead stream and reopens from the current byte offset using the storage backend's range request support (the offset parameter of OpenLTXFile). Callers like io.ReadFull see a seamless byte stream because partial reads are returned without error, prompting the caller to request remaining bytes on the next Read call.

func NewResumableReader

func NewResumableReader(ctx context.Context, client LTXFileOpener, level int, minTXID, maxTXID ltx.TXID, size int64, rc io.ReadCloser, logger *slog.Logger) *ResumableReader

NewResumableReader creates a ResumableReader. Primarily exposed for testing.

func (*ResumableReader) Close

func (r *ResumableReader) Close() error

func (*ResumableReader) Read

func (r *ResumableReader) Read(p []byte) (int, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL