Documentation ¶
Index ¶
- Constants
- Variables
- func CleanSourcePath(src *slog.Source)
- func CreateFile(filename string, fi os.FileInfo) (*os.File, error)
- func Fileinfo(fi os.FileInfo) (uid, gid int)
- func Hexdump(data []byte) string
- func InitLog(w io.Writer, level, typ string, addSource bool)
- func LimitReadCloser(r io.ReadCloser, n int64) io.ReadCloser
- func LockFileExclusive(f *os.File) error
- func MkdirAll(path string, fi os.FileInfo) error
- func NewLZ4Reader(r io.ReadCloser) io.ReadCloser
- func ReplaceAttr(groups []string, a slog.Attr) slog.Attr
- func UnlockFile(f *os.File) error
- type LTXFileOpener
- type LZ4ReadCloser
- type LimitedReadCloser
- type LoggingConfig
- type ReadCloser
- type ReadCounter
- type ResumableReader
Constants ¶
const LevelTrace = slog.LevelDebug - 4
Variables ¶
var (
	OperationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "replicate_replica_operation_total",
		Help: "The number of replica operations performed",
	}, []string{"replica_type", "operation"})

	OperationBytesCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "replicate_replica_operation_bytes",
		Help: "The number of bytes used by replica operations",
	}, []string{"replica_type", "operation"})

	OperationDurationHistogramVec = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "replicate_replica_operation_duration_seconds",
		Help:    "Duration of replica operations by type and operation",
		Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30, 60},
	}, []string{"replica_type", "operation"})

	OperationErrorCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "replicate_replica_operation_errors_total",
		Help: "Number of replica operation errors by type, operation, and error code",
	}, []string{"replica_type", "operation", "code"})

	L0RetentionGaugeVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "replicate_l0_retention_files_total",
		Help: "Number of L0 files by status during retention enforcement",
	}, []string{"db", "status"})
)
Shared replica metrics.
Functions ¶
func CleanSourcePath ¶
func CreateFile ¶
CreateFile creates the file and matches the mode & uid/gid of fi.
func LimitReadCloser ¶
func LimitReadCloser(r io.ReadCloser, n int64) io.ReadCloser
Copied from the io package's LimitReader, extended so the returned reader also implements io.Closer.
func LockFileExclusive ¶
func MkdirAll ¶
MkdirAll is a copy of os.MkdirAll() except that it attempts to set the mode/uid/gid to match fi for each created directory.
func NewLZ4Reader ¶
func NewLZ4Reader(r io.ReadCloser) io.ReadCloser
NewLZ4Reader creates an LZ4 decompressing reader that wraps the source. Closing the returned reader also closes the underlying source.
func UnlockFile ¶
Types ¶
type LTXFileOpener ¶
type LZ4ReadCloser ¶
LZ4ReadCloser wraps an LZ4 reader with the underlying source for proper closing.
func (*LZ4ReadCloser) Close ¶
func (r *LZ4ReadCloser) Close() error
type LimitedReadCloser ¶
type LimitedReadCloser struct {
R io.ReadCloser // underlying reader
N int64 // max bytes remaining
}
func (*LimitedReadCloser) Close ¶
func (l *LimitedReadCloser) Close() error
type LoggingConfig ¶
type LoggingConfig struct {
Level string `yaml:"level"`
Type string `yaml:"type"`
Stderr bool `yaml:"stderr"`
Source bool `yaml:"source"`
}
LoggingConfig configures logging.
type ReadCloser ¶
type ReadCloser struct {
// contains filtered or unexported fields
}
ReadCloser wraps a reader to also attach a separate closer.
func NewReadCloser ¶
func NewReadCloser(r io.Reader, c io.Closer) *ReadCloser
NewReadCloser returns a new instance of ReadCloser.
func (*ReadCloser) Close ¶
func (r *ReadCloser) Close() error
Close closes the wrapped reader, if it itself implements io.Closer, and then the attached Closer.
type ReadCounter ¶
type ReadCounter struct {
// contains filtered or unexported fields
}
ReadCounter wraps an io.Reader and counts the total number of bytes read.
func NewReadCounter ¶
func NewReadCounter(r io.Reader) *ReadCounter
NewReadCounter returns a new instance of ReadCounter that wraps r.
type ResumableReader ¶
type ResumableReader struct {
// contains filtered or unexported fields
}
ResumableReader wraps an io.ReadCloser from a remote storage backend with automatic reconnection on read errors.
During restore, the LTX compactor opens all LTX file streams upfront, then processes pages in page-number order. Incremental LTX files that only contain high-numbered pages may have their S3/storage streams sit idle for minutes while the compactor works through lower-numbered pages from the snapshot. Storage providers (S3, Tigris, etc.) may close these idle connections, causing "unexpected EOF" errors.
This reader detects two failure modes:
- Non-EOF errors (connection reset, timeout): the stream broke mid-transfer.
- Premature EOF: the server closed the connection cleanly before all bytes were read (detected by comparing the current offset against the known file size).
On failure, it closes the dead stream and reopens from the current byte offset using the storage backend's range request support (the offset parameter of OpenLTXFile). Callers like io.ReadFull see a seamless byte stream because partial reads are returned without error, prompting the caller to request remaining bytes on the next Read call.
func NewResumableReader ¶
func NewResumableReader(ctx context.Context, client LTXFileOpener, level int, minTXID, maxTXID ltx.TXID, size int64, rc io.ReadCloser, logger *slog.Logger) *ResumableReader
NewResumableReader creates a ResumableReader. Primarily exposed for testing.
func (*ResumableReader) Close ¶
func (r *ResumableReader) Close() error