lsvd

package
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 8, 2025 License: Apache-2.0 Imports: 54 Imported by: 0

README

LSVD - Log Structured Virtual Disk

Status: Alpha

lsvd provides an implementation of LSVD, which is an abstraction on top of object storage systems to provide block device semantics and speed.

This is done by storing data in chunks as the data is written, and caching those chunks locally as well as uploading them to an object storage system.

This implementation is very faithful to the original paper, but adds at least one additional feature: compression. As data is added to a chunk, it is first compressed with LZ4. When the data is then read from the cache, it is uncompressed before being returned. Because of the way most OSes cache block device data, this has virtually zero negative impact on read speed. In fact, because the data is compressed, in the case of cache misses the read speed is improved, as there is less data to download from the object storage system.

Usage

The virtual disk is exposed as a Network Block Device (NBD). As such, once the lsvd program is running, the standard OS tools are required to attach and mount it. Here is a brief session of creating a disk and mounting it on Linux:


# Create a config file
cat <<DONE > lsvd.hcl
cache_path = "./data/cache"

storage {
  s3 {
    bucket = "lsvdinttest"
    region = "us-east-1"
    access_key = "admin"
    secret_key = "password"
    host = "http://localhost:9000"
  }
}
DONE

# Create a 2 Terabyte disk named test.
$ lsvd volume init -c lsvd.hcl -n test -S 2T

# Start the disk
$ lsvd nbd -c lsvd.hcl -n test -p ./data/cache -a localhost:8989 &

# Attach it to an NBD device
$ nbd-client lsvd localhost 8989 nbd0 -b 4096 -name test

# Create a filesystem on it
$ mkfs.ext4 /dev/nbd0

# Mount it
$ mount /dev/nbd0 /mnt/lsvd

Documentation

Index

Constants

View Source
const (
	GCDensityThreshold = 70.0
	GCTotalThreshold   = 1024 * 1024 // 1MB
)
View Source
const (
	MaxBlocksPerSmallPack = 20_000
	SmallSegmentCutOff    = 200
	TargetDensity         = 90
)
View Source
const (
	// The size of all blocks in bytes
	BlockSize = 4 * 1024

	// How big the segment gets before we flush it to S3
	FlushThreshHold = 32 * 1024 * 1024

	// Maximum time a segment can stay open before being flushed
	MaxSegmentLifetime = 10 * time.Minute
)
View Source
const (
	MaxBlocks = math.MaxUint16
	MaxLBA    = (1 << 48) - 1
)
View Source
const (
	Uncompressed = 0
	Compressed   = 1
	Empty        = 2
)
View Source
const BufferSliceSize = 1024 * 1024
View Source
const DefaultExtentsSize = 20000
View Source
const LevelTrace = slog.LevelDebug - 1
View Source
const MaxPoolBufferSize = 8 * 1024 * 1024

MaxPoolBufferSize is the maximum size of a buffer that will be returned to the pool. Buffers larger than this are discarded to prevent memory bloat.

View Source
const MaxSegBuilderBufSize = 16 * 1024 * 1024

MaxSegBuilderBufSize is the maximum buffer size for a SegmentBuilder to be returned to the pool. Builders with larger buffers are discarded.

View Source
const SegmentIdSize = 16

Variables

View Source
var DefaultTortureConfig = TortureConfig{
	Operations:         10000,
	MaxLBA:             100000,
	MaxBlocks:          64,
	Weights:            DefaultTortureWeights,
	OverlapProbability: 0.3,
	VerifyEvery:        1000,
	PatternWeights:     [4]int{60, 10, 20, 10},
}

DefaultTortureConfig provides a sensible default configuration

View Source
var DefaultTortureWeights = TortureOpWeights{
	Write:       50,
	Read:        30,
	Zero:        10,
	Sync:        5,
	CloseReopen: 5,
}

DefaultTortureWeights provides sensible defaults for torture testing

View Source
var EnableAutoGC = func(o *opts) {
	o.autoGC = true
}
View Source
var ErrReadOnly = errors.New("disk open'd read-only")

Functions

func AlignToBlock

func AlignToBlock(b []byte) []byte

func EncodeTortureConfig

func EncodeTortureConfig(cfg TortureConfig) string

EncodeTortureConfig encodes a TortureConfig to a base64 JSON string for reproduction

func FillFromeCache

func FillFromeCache(d []byte, cps []CachePosition) error

func GenerateTortureData

func GenerateTortureData(rng *rand.Rand, pattern TortureDataPattern, blocks uint32) []byte

GenerateTortureData generates deterministic data from a seed and pattern

func LogMetrics

func LogMetrics(log *slog.Logger)

func NBDWrapper

func NBDWrapper(ctx context.Context, log *slog.Logger, d *Disk) *nbdWrapper

func ReadSegmentHeaders

func ReadSegmentHeaders(r io.Reader) (SegmentHeader, []ExtentHeader, error)

func ReadUvarint

func ReadUvarint(r io.ByteReader) (uint64, int, error)

ReadUvarint reads an encoded unsigned integer from r and returns it as a uint64. The error is io.EOF only if no bytes were read. If an io.EOF happens after reading some but not all the bytes, ReadUvarint returns io.ErrUnexpectedEOF.

func ReturnBuffers

func ReturnBuffers(buf *Buffers)

func ReturnSegmentBuilder

func ReturnSegmentBuilder(seg *SegmentBuilder)

func RoundToBlockSize

func RoundToBlockSize(sz int64) int64

func ToReader

func ToReader(ra io.ReaderAt) io.Reader

func WriteUvarint

func WriteUvarint(w io.ByteWriter, x uint64) (int, error)

WriteUvarint encodes a uint64 and writes it to w, returning the number of bytes written along with any error.

Types

type AcquireLeaseRequest added in v0.0.2

type AcquireLeaseRequest struct {
	Metadata  map[string]any `json:"metadata,omitempty"`
	ExpiresIn string         `json:"expires_in,omitempty"`
}

Lease API types

type Buffers

type Buffers struct {
	// contains filtered or unexported fields
}

func B

func B(ctx context.Context) *Buffers

func NewBuffers

func NewBuffers() *Buffers

func (*Buffers) Inject

func (b *Buffers) Inject(ctx context.Context) context.Context

func (*Buffers) Marker

func (b *Buffers) Marker() int

func (*Buffers) Reset

func (b *Buffers) Reset()

func (*Buffers) ResetTo

func (b *Buffers) ResetTo(marker int)

type CachePosition

type CachePosition struct {
	// contains filtered or unexported fields
}

type CompleteSegmentUploadResponse

type CompleteSegmentUploadResponse struct {
	SegmentID string `json:"segment_id"`
	Status    string `json:"status"`
}

type CompleteUploadRequest

type CompleteUploadRequest struct {
	MD5         string `json:"md5"`
	CRC32C      string `json:"crc32c"`
	Size        int64  `json:"size"`
	BlockLayout string `json:"block_layout,omitempty"`
	VolumeID    string `json:"volume_id,omitempty"`
	LeaseNonce  string `json:"lease_nonce,omitempty"`
}

type Config

type Config struct {
	CachePath string `hcl:"cache_path"`

	Storage struct {
		FilePath string `hcl:"file_path,optional"`
		S3       struct {
			Bucket    string `hcl:"bucket"`
			Region    string `hcl:"region"`
			AccessKey string `hcl:"access_key,optional"`
			SecretKey string `hcl:"secret_key,optional"`
			Directory string `hcl:"directory,optional"`
			URL       string `hcl:"host,optional"`
		} `hcl:"s3,block"`
	} `hcl:"storage,block"`
}

func LoadConfig

func LoadConfig(path string) (*Config, error)

type Context

type Context struct {
	context.Context
	// contains filtered or unexported fields
}

func NewContext

func NewContext(ctx context.Context) *Context

func (*Context) Allocate

func (c *Context) Allocate(sz int) []byte

func (*Context) AllocateZero

func (c *Context) AllocateZero(sz int) []byte

func (*Context) Close

func (c *Context) Close()

func (*Context) Marker

func (c *Context) Marker() int

func (*Context) Reset

func (c *Context) Reset()

func (*Context) ResetTo

func (c *Context) ResetTo(marker int)

type ControlMessage

type ControlMessage struct {
	Kind      string `json:"kind"`
	SegmentId string `json:"segment,omitempty"`
}

type Controller

type Controller struct {
	// contains filtered or unexported fields
}

func NewController

func NewController(ctx context.Context, d *Disk) (*Controller, error)

func (*Controller) EventsCh

func (c *Controller) EventsCh() chan Event

func (*Controller) Run

func (c *Controller) Run(ctx context.Context)

type CopyIterator

type CopyIterator struct {
	// contains filtered or unexported fields
}

func (*CopyIterator) Close

func (c *CopyIterator) Close(ctx context.Context) error

func (*CopyIterator) ProcessFromExtents

func (c *CopyIterator) ProcessFromExtents(ctx *Context, log *slog.Logger) error

func (*CopyIterator) Reset

func (ci *CopyIterator) Reset(ctx context.Context, seg SegmentId) error

type Cover

type Cover int
const (
	CoverSuperRange Cover = iota
	CoverExact
	CoverPartly
	CoverNone
)

func (Cover) String

func (c Cover) String() string

type CreateVolumeRequest

type CreateVolumeRequest struct {
	Name         string         `json:"name"`
	UUID         string         `json:"uuid"`
	DeclaredSize int64          `json:"declared_size"`
	DataFormat   string         `json:"data_format,omitempty"`
	AppFormat    string         `json:"app_format,omitempty"`
	Segments     []string       `json:"segments"`
	BlockMap     map[string]any `json:"block_map"`
	Metadata     map[string]any `json:"metadata,omitempty"`
}

Volume API types

type DeleteSegmentResponse

type DeleteSegmentResponse struct {
	LsvdID string `json:"lsvd_id"`
	Status string `json:"status"`
}

type Disk

type Disk struct {
	SeqGen func() ulid.ULID
	// contains filtered or unexported fields
}

func NewDisk

func NewDisk(ctx context.Context, log *slog.Logger, path string, options ...Option) (*Disk, error)

func (*Disk) Close

func (d *Disk) Close(ctx context.Context) error

func (*Disk) CloseSegment

func (d *Disk) CloseSegment(ctx context.Context) error

CloseSegment synchronously closes the current segment, as well as giving any background GC process a chance to finish up first.

func (*Disk) CopyIterator

func (d *Disk) CopyIterator(ctx context.Context, seg SegmentId) (*CopyIterator, error)

func (*Disk) Extents

func (d *Disk) Extents() int

func (*Disk) GCOnce

func (d *Disk) GCOnce(ctx *Context) (SegmentId, error)

func (*Disk) Pack

func (d *Disk) Pack(ctx context.Context) error

func (*Disk) ReadExtent

func (d *Disk) ReadExtent(ctx *Context, rng Extent) (RangeData, error)

func (*Disk) ReadExtentInto

func (d *Disk) ReadExtentInto(ctx *Context, data RangeData) (CachePosition, error)

func (*Disk) SetAfterNS

func (r *Disk) SetAfterNS(f func(SegmentId))

func (*Disk) Size

func (d *Disk) Size() int64

func (*Disk) SyncWriteCache

func (d *Disk) SyncWriteCache() error

func (*Disk) WriteExtent

func (d *Disk) WriteExtent(ctx context.Context, data RangeData) error

func (*Disk) WriteExtents

func (d *Disk) WriteExtents(ctx context.Context, ranges []RangeData) error

WriteExtents writes multiple extents without performing any segment flush checking between them, thus making sure that all of them end up in the same segment.

func (*Disk) ZeroBlocks

func (d *Disk) ZeroBlocks(ctx context.Context, rng Extent) error

type DiskAPISegmentAccess

type DiskAPISegmentAccess struct {
	// contains filtered or unexported fields
}

func NewDiskAPISegmentAccess

func NewDiskAPISegmentAccess(log *slog.Logger, baseURL string, authClient *cloudauth.AuthClient) *DiskAPISegmentAccess

func (*DiskAPISegmentAccess) AcquireLease added in v0.0.2

func (d *DiskAPISegmentAccess) AcquireLease(ctx context.Context, volumeId string, metadata map[string]any) (string, error)

AcquireLease acquires an exclusive lease on a volume. Returns the lease nonce if successful.

func (*DiskAPISegmentAccess) GetVolumeInfo

func (d *DiskAPISegmentAccess) GetVolumeInfo(ctx context.Context, vol string) (*VolumeInfo, error)

func (*DiskAPISegmentAccess) InitContainer

func (d *DiskAPISegmentAccess) InitContainer(ctx context.Context) error

func (*DiskAPISegmentAccess) InitVolume

func (d *DiskAPISegmentAccess) InitVolume(ctx context.Context, vol *VolumeInfo) error

func (*DiskAPISegmentAccess) InitVolumeWithID added in v0.0.2

func (d *DiskAPISegmentAccess) InitVolumeWithID(ctx context.Context, vol *VolumeInfo) (string, error)

InitVolumeWithID creates a volume and returns the server-generated volume ID. If vol.Name is empty, the server generates the volume ID.

func (*DiskAPISegmentAccess) ListVolumes

func (d *DiskAPISegmentAccess) ListVolumes(ctx context.Context) ([]string, error)

func (*DiskAPISegmentAccess) OpenVolume

func (d *DiskAPISegmentAccess) OpenVolume(ctx context.Context, vol string) (Volume, error)

func (*DiskAPISegmentAccess) ReleaseLease added in v0.0.2

func (d *DiskAPISegmentAccess) ReleaseLease(ctx context.Context, volumeId string, nonce string) error

ReleaseLease releases an active lease on a volume.

func (*DiskAPISegmentAccess) RemoveSegment

func (d *DiskAPISegmentAccess) RemoveSegment(ctx context.Context, seg SegmentId) error

func (*DiskAPISegmentAccess) SetLeaseNonce added in v0.0.2

func (d *DiskAPISegmentAccess) SetLeaseNonce(nonce string)

SetLeaseNonce sets the lease nonce for this segment access. The nonce will be passed to any volumes opened from this access.

type DiskAPISegmentReader

type DiskAPISegmentReader struct {
	// contains filtered or unexported fields
}

func (*DiskAPISegmentReader) Close

func (r *DiskAPISegmentReader) Close() error

func (*DiskAPISegmentReader) Layout

func (*DiskAPISegmentReader) ReadAt

func (r *DiskAPISegmentReader) ReadAt(p []byte, off int64) (int, error)

type DiskAPIVolume

type DiskAPIVolume struct {
	// contains filtered or unexported fields
}

func (*DiskAPIVolume) Info

func (v *DiskAPIVolume) Info(ctx context.Context) (*VolumeInfo, error)

func (*DiskAPIVolume) ListSegments

func (v *DiskAPIVolume) ListSegments(ctx context.Context) ([]SegmentId, error)

func (*DiskAPIVolume) NewSegment

func (v *DiskAPIVolume) NewSegment(ctx context.Context, seg SegmentId, layout *SegmentLayout, data *os.File) error

func (*DiskAPIVolume) OpenSegment

func (v *DiskAPIVolume) OpenSegment(ctx context.Context, seg SegmentId) (SegmentReader, error)

func (*DiskAPIVolume) RemoveSegment

func (v *DiskAPIVolume) RemoveSegment(ctx context.Context, seg SegmentId) error

func (*DiskAPIVolume) SetLeaseNonce added in v0.0.2

func (v *DiskAPIVolume) SetLeaseNonce(nonce string)

SetLeaseNonce sets the lease nonce for this volume. The nonce will be included in requests that update the volume.

type ErrorResponse

type ErrorResponse struct {
	Error string `json:"error"`
}

type Event

type Event struct {
	Kind      EventKind
	Value     any
	SegmentId SegmentId
	Done      chan EventResult
}

type EventKind

type EventKind int
const (
	CloseSegment EventKind = iota
	CleanupSegments
	StartGC
	SweepSmallSegments
	ImproveDensity
)

type EventResult

type EventResult struct {
	Segment SegmentId
	Error   error
}

type Extent

type Extent struct {
	LBA    LBA
	Blocks uint32
}

func ExtentFrom

func ExtentFrom(a, b LBA) (Extent, bool)

func (Extent) ByteSize

func (e Extent) ByteSize() int

func (Extent) Clamp

func (e Extent) Clamp(y Extent) (Extent, bool)

Clamp returns a new Extent that is the region of e that overlaps with y.

func (Extent) Contains

func (e Extent) Contains(lba LBA) bool

func (Extent) Cover

func (e Extent) Cover(y Extent) Cover

func (Extent) Last

func (e Extent) Last() LBA

func (Extent) Range

func (e Extent) Range() (LBA, LBA)

func (Extent) StartMask

func (e Extent) StartMask() *Mask

func (Extent) String

func (e Extent) String() string

func (Extent) Sub

func (e Extent) Sub(o Extent) ([]Extent, bool)

func (Extent) SubMany

func (e Extent) SubMany(subs []Extent) ([]Extent, bool)

func (Extent) SubSpecific

func (e Extent) SubSpecific(o Extent) (Extent, Extent, bool)

func (Extent) Valid

func (e Extent) Valid() bool

type ExtentCache

type ExtentCache struct {
	// contains filtered or unexported fields
}

func NewExtentCache

func NewExtentCache(log hclog.Logger, path string) (*ExtentCache, error)

func (*ExtentCache) Close

func (e *ExtentCache) Close() error

func (*ExtentCache) ReadExtent

func (e *ExtentCache) ReadExtent(robpb *PartialExtent, data []byte) (bool, error)

func (*ExtentCache) WriteExtent

func (e *ExtentCache) WriteExtent(robpb *PartialExtent, data []byte) error

type ExtentHeader

type ExtentHeader struct {
	Extent `json:"extent" cbor:"1,keyasint"`
	Size   uint32 `json:"size" cbor:"2,keyasint"`
	Offset uint32 `json:"offset" cbor:"3,keyasint"`

	// used when the extent is compressed
	RawSize uint32 `json:"raw_size,omitempty" cbor:"4,keyasint,omitempty"`
}

func (*ExtentHeader) Flags

func (e *ExtentHeader) Flags() byte

func (*ExtentHeader) Read

func (e *ExtentHeader) Read(r io.ByteReader) (int, error)

func (*ExtentHeader) Write

func (e *ExtentHeader) Write(w io.ByteWriter) (int, error)

type ExtentLocation

type ExtentLocation struct {
	ExtentHeader `json:"header" cbor:"1,keyasint"`
	Segment      SegmentId `json:"segment" cbor:"2,keyasint"`
	Disk         uint16    `json:"disk" cbor:"3,keyasint"`
}

type ExtentMap

type ExtentMap struct {
	// contains filtered or unexported fields
}

func NewExtentMap

func NewExtentMap() *ExtentMap

func (*ExtentMap) Iterator

func (e *ExtentMap) Iterator() *Iterator

func (*ExtentMap) Len

func (e *ExtentMap) Len() int

func (*ExtentMap) LockToPatch

func (e *ExtentMap) LockToPatch(fn func() error) error

func (*ExtentMap) LockedIterator

func (m *ExtentMap) LockedIterator() *Iterator

func (*ExtentMap) Populate

func (e *ExtentMap) Populate(log *slog.Logger, o *ExtentMap, diskId uint16) error

func (*ExtentMap) Render

func (e *ExtentMap) Render() string

func (*ExtentMap) RenderExpanded

func (e *ExtentMap) RenderExpanded() string

func (*ExtentMap) Resolve

func (e *ExtentMap) Resolve(log *slog.Logger, rng Extent, ret []PartialExtent) ([]PartialExtent, error)

func (*ExtentMap) ToPE

func (m *ExtentMap) ToPE(c compactPE) PartialExtent

func (*ExtentMap) Update

func (e *ExtentMap) Update(log *slog.Logger, pba ExtentLocation, affected []PartialExtent) ([]PartialExtent, error)

func (*ExtentMap) UpdateBatch

func (e *ExtentMap) UpdateBatch(log *slog.Logger, entries []ExtentLocation, segId SegmentId, s *Segments) error

func (*ExtentMap) Validate

func (e *ExtentMap) Validate(log *slog.Logger) error

type ExtentReader

type ExtentReader struct {
	// contains filtered or unexported fields
}

func NewExtentReader

func NewExtentReader(log *slog.Logger, path string, vol Volume) (*ExtentReader, error)

func (*ExtentReader) Close

func (d *ExtentReader) Close() error

type ExternalExtentHeader

type ExternalExtentHeader struct {
	// contains filtered or unexported fields
}

func (*ExternalExtentHeader) Blocks

func (v *ExternalExtentHeader) Blocks() uint32

func (*ExternalExtentHeader) HasBlocks

func (v *ExternalExtentHeader) HasBlocks() bool

func (*ExternalExtentHeader) HasLba

func (v *ExternalExtentHeader) HasLba() bool

func (*ExternalExtentHeader) HasOffset

func (v *ExternalExtentHeader) HasOffset() bool

func (*ExternalExtentHeader) HasRawSize

func (v *ExternalExtentHeader) HasRawSize() bool

func (*ExternalExtentHeader) HasSize

func (v *ExternalExtentHeader) HasSize() bool

func (*ExternalExtentHeader) Lba

func (v *ExternalExtentHeader) Lba() uint64

func (*ExternalExtentHeader) MarshalCBOR

func (v *ExternalExtentHeader) MarshalCBOR() ([]byte, error)

func (*ExternalExtentHeader) MarshalJSON

func (v *ExternalExtentHeader) MarshalJSON() ([]byte, error)

func (*ExternalExtentHeader) Offset

func (v *ExternalExtentHeader) Offset() uint32

func (*ExternalExtentHeader) RawSize

func (v *ExternalExtentHeader) RawSize() uint32

func (*ExternalExtentHeader) SetBlocks

func (v *ExternalExtentHeader) SetBlocks(blocks uint32)

func (*ExternalExtentHeader) SetLba

func (v *ExternalExtentHeader) SetLba(lba uint64)

func (*ExternalExtentHeader) SetOffset

func (v *ExternalExtentHeader) SetOffset(offset uint32)

func (*ExternalExtentHeader) SetRawSize

func (v *ExternalExtentHeader) SetRawSize(raw_size uint32)

func (*ExternalExtentHeader) SetSize

func (v *ExternalExtentHeader) SetSize(size uint32)

func (*ExternalExtentHeader) Size

func (v *ExternalExtentHeader) Size() uint32

func (*ExternalExtentHeader) UnmarshalCBOR

func (v *ExternalExtentHeader) UnmarshalCBOR(data []byte) error

func (*ExternalExtentHeader) UnmarshalJSON

func (v *ExternalExtentHeader) UnmarshalJSON(data []byte) error

type FlushReason

type FlushReason int
const (
	FlushNo FlushReason = iota
	FlushSize
	FlushTime
)

type Iterator

type Iterator struct {
	treemap.ForwardIterator[LBA, compactPE]
	// contains filtered or unexported fields
}

func (*Iterator) CompactValue

func (e *Iterator) CompactValue() compactPE

func (*Iterator) CompactValuePtr

func (e *Iterator) CompactValuePtr() *compactPE

func (*Iterator) Valid

func (e *Iterator) Valid() bool

func (*Iterator) Value

func (e *Iterator) Value() PartialExtent

type LBA

type LBA uint64

type LeaseResponse added in v0.0.2

type LeaseResponse struct {
	LeaseID   string         `json:"lease_id"`
	VolumeID  string         `json:"volume_id"`
	Nonce     string         `json:"nonce"`
	Metadata  map[string]any `json:"metadata,omitempty"`
	CreatedAt time.Time      `json:"created_at"`
	ExpiresAt *time.Time     `json:"expires_at,omitempty"`
}

type ListVolumesResponse

type ListVolumesResponse struct {
	Volumes    []VolumeInfoResponse `json:"volumes"`
	NextCursor string               `json:"next_cursor,omitempty"`
}

type LocalFile

type LocalFile struct {
	// contains filtered or unexported fields
}

func OpenLocalFile

func OpenLocalFile(path string, log *slog.Logger) (*LocalFile, error)

func (*LocalFile) Close

func (l *LocalFile) Close() error

func (*LocalFile) Layout

func (l *LocalFile) Layout(_ context.Context) (*SegmentLayout, error)

func (*LocalFile) ReadAt

func (l *LocalFile) ReadAt(b []byte, off int64) (int, error)

type LocalFileAccess

type LocalFileAccess struct {
	Log *slog.Logger
	Dir string
}

func (*LocalFileAccess) AppendSegment

func (l *LocalFileAccess) AppendSegment(ctx context.Context, vol string, seg SegmentId, f *os.File) error

func (*LocalFileAccess) AppendToSegments

func (l *LocalFileAccess) AppendToSegments(ctx context.Context, vol string, seg SegmentId) error

func (*LocalFileAccess) GetVolumeInfo

func (l *LocalFileAccess) GetVolumeInfo(ctx context.Context, vol string) (*VolumeInfo, error)

func (*LocalFileAccess) InitContainer

func (l *LocalFileAccess) InitContainer(ctx context.Context) error

func (*LocalFileAccess) InitVolume

func (l *LocalFileAccess) InitVolume(ctx context.Context, vol *VolumeInfo) error

func (*LocalFileAccess) ListSegments

func (l *LocalFileAccess) ListSegments(ctx context.Context, vol string) ([]SegmentId, error)

func (*LocalFileAccess) ListVolumes

func (l *LocalFileAccess) ListVolumes(ctx context.Context) ([]string, error)

func (*LocalFileAccess) OpenSegment

func (l *LocalFileAccess) OpenSegment(ctx context.Context, _ string, seg SegmentId) (SegmentReader, error)

func (*LocalFileAccess) OpenVolume

func (l *LocalFileAccess) OpenVolume(ctx context.Context, vol string) (Volume, error)

func (*LocalFileAccess) ReadMetadata

func (l *LocalFileAccess) ReadMetadata(ctx context.Context, vol, name string) (io.ReadCloser, error)

func (*LocalFileAccess) RemoveSegment

func (l *LocalFileAccess) RemoveSegment(ctx context.Context, seg SegmentId) error

func (*LocalFileAccess) RemoveSegmentFromVolume

func (l *LocalFileAccess) RemoveSegmentFromVolume(ctx context.Context, vol string, seg SegmentId) error

func (*LocalFileAccess) UploadSegment

func (l *LocalFileAccess) UploadSegment(ctx context.Context, seg SegmentId, f *os.File) error

func (*LocalFileAccess) WriteMetadata

func (l *LocalFileAccess) WriteMetadata(ctx context.Context, vol, name string) (io.WriteCloser, error)

func (*LocalFileAccess) WriteSegment

func (l *LocalFileAccess) WriteSegment(ctx context.Context, seg SegmentId) (io.WriteCloser, error)

type LocalVolume

type LocalVolume struct {
	Log  *slog.Logger
	Dir  string
	Name string
}

func (*LocalVolume) AppendSegment

func (l *LocalVolume) AppendSegment(ctx context.Context, seg SegmentId, f *os.File) error

func (*LocalVolume) Info

func (l *LocalVolume) Info(_ context.Context) (*VolumeInfo, error)

func (*LocalVolume) ListSegments

func (l *LocalVolume) ListSegments(_ context.Context) ([]SegmentId, error)

func (*LocalVolume) NewSegment

func (l *LocalVolume) NewSegment(ctx context.Context, seg SegmentId, layout *SegmentLayout, data *os.File) error

func (*LocalVolume) OpenSegment

func (l *LocalVolume) OpenSegment(_ context.Context, seg SegmentId) (SegmentReader, error)

func (*LocalVolume) RemoveSegment

func (l *LocalVolume) RemoveSegment(_ context.Context, seg SegmentId) error

type Mask

type Mask struct {
	// contains filtered or unexported fields
}

func (*Mask) Cover

func (m *Mask) Cover(x Extent) error

func (*Mask) Holes

func (h *Mask) Holes() []Extent

type MetricSnapshot

type MetricSnapshot struct {
	BlocksWritten   int64
	BlocksRead      int64
	IOPS            int64
	SegmentsWritten int64
}

func GetMetrics

func GetMetrics() MetricSnapshot

type NATSConnector

type NATSConnector struct {
	// contains filtered or unexported fields
}

func NewNATSConnector

func NewNATSConnector(log *slog.Logger, d *Disk, url, id string) (*NATSConnector, error)

func (*NATSConnector) Start

func (n *NATSConnector) Start(ctx context.Context) error

type NBDBackendOpen

type NBDBackendOpen struct {
	Ctx  context.Context
	Log  *slog.Logger
	Disk *Disk
}

func (*NBDBackendOpen) Close

func (n *NBDBackendOpen) Close(b nbd.Backend)

func (*NBDBackendOpen) Open

func (n *NBDBackendOpen) Open() nbd.Backend

type Option

type Option func(o *opts)

func AfterNewSegment

func AfterNewSegment(f func(SegmentId)) Option

func AutoCreate

func AutoCreate(ok bool) Option

func ReadOnly

func ReadOnly() Option

func WithLowerLayer

func WithLowerLayer(d *Disk) Option

func WithSegmentAccess

func WithSegmentAccess(sa SegmentAccess) Option

func WithSeqGen

func WithSeqGen(f func() ulid.ULID) Option

func WithVolumeName

func WithVolumeName(name string) Option

func WithZstd

func WithZstd() Option

type Packer

type Packer struct {
	// contains filtered or unexported fields
}

func (*Packer) Pack

func (p *Packer) Pack(gctx context.Context) error

type PartialExtent

type PartialExtent struct {
	Live           Extent `json:"live" cbor:"1,keyasint"`
	ExtentLocation `json:"location" cbor:"2,keyasint"`
}

func (*PartialExtent) String

func (r *PartialExtent) String() string

type PreviousCache

type PreviousCache struct {
	// contains filtered or unexported fields
}

PreviousCache manages holding onto a single segment creator as the previous cache. It uses reference counting to ensure the SegmentCreator is not closed while reads are still in progress.

func NewPreviousCache

func NewPreviousCache() *PreviousCache

func (*PreviousCache) Acquire added in v0.0.3

func (p *PreviousCache) Acquire() *SegmentCreator

Acquire returns the current SegmentCreator and increments the reference count. The caller MUST call Release() when done using the SegmentCreator. Returns nil if there is no previous cache.

func (*PreviousCache) Clear

func (p *PreviousCache) Clear()

Clear waits for all active readers to finish and then clears the cache. After this returns, no readers will be able to access the old SegmentCreator.

func (*PreviousCache) Load

func (p *PreviousCache) Load() *SegmentCreator

Load returns the current SegmentCreator without incrementing the reference count. This should only be used when the caller does not need to perform I/O operations on the returned SegmentCreator, as it may be closed at any time.

func (*PreviousCache) Release added in v0.0.3

func (p *PreviousCache) Release()

Release decrements the reference count after the caller is done using the SegmentCreator obtained from Acquire(). Must be called exactly once for each successful Acquire() call that returned a non-nil value.

func (*PreviousCache) SetWhenClear

func (p *PreviousCache) SetWhenClear(sc *SegmentCreator)

type RangeCache

type RangeCache struct {
	// contains filtered or unexported fields
}

func NewRangeCache

func NewRangeCache(opts RangeCacheOptions) (*RangeCache, error)

func (*RangeCache) CachePositions

func (r *RangeCache) CachePositions(ctx context.Context, seg SegmentId, total, off int64, ret []CachePosition) ([]CachePosition, error)

func (*RangeCache) Close

func (r *RangeCache) Close() error

func (*RangeCache) ReadAt

func (r *RangeCache) ReadAt(ctx context.Context, seg SegmentId, buf []byte, off int64) (int, error)

type RangeCacheOptions

type RangeCacheOptions struct {
	Path      string
	ChunkSize int64
	MaxSize   int64
	Fetch     func(ctx context.Context, seg SegmentId, data []byte, off int64) error
}

type RangeData

type RangeData struct {
	Extent
	// contains filtered or unexported fields
}

func MapRangeData

func MapRangeData(ext Extent, srcData []byte) RangeData

func NewRangeData

func NewRangeData(ctx *Context, ext Extent) RangeData

func (RangeData) Append

func (r RangeData) Append(o RangeData) RangeData

func (*RangeData) CopyTo

func (e *RangeData) CopyTo(data []byte) error

func (*RangeData) EmptyP

func (r *RangeData) EmptyP() bool

func (*RangeData) RawBlocks

func (e *RangeData) RawBlocks() RawBlocks

func (*RangeData) ReadData

func (r *RangeData) ReadData() []byte

func (RangeData) Reader

func (r RangeData) Reader() io.Reader

func (*RangeData) SubRange

func (r *RangeData) SubRange(ext Extent) (RangeDataView, bool)

func (*RangeData) View

func (r *RangeData) View() RangeDataView

func (*RangeData) WriteData

func (r *RangeData) WriteData() []byte

type RangeDataView

type RangeDataView struct {
	Extent
	// contains filtered or unexported fields
}

func (RangeDataView) ByteSize

func (v RangeDataView) ByteSize() int

func (RangeDataView) Copy

func (d RangeDataView) Copy(s RangeDataView) int

func (RangeDataView) EmptyP

func (v RangeDataView) EmptyP() bool

func (RangeDataView) ReadData

func (v RangeDataView) ReadData() []byte

func (RangeDataView) SubRange

func (r RangeDataView) SubRange(ext Extent) (RangeDataView, bool)

func (RangeDataView) WriteData

func (v RangeDataView) WriteData() []byte

type RawBlocks

type RawBlocks []byte

func BlockDataView

func BlockDataView(blk []byte) RawBlocks

func (RawBlocks) BlockView

func (e RawBlocks) BlockView(cnt int) []byte

func (RawBlocks) Blocks

func (e RawBlocks) Blocks() uint32

func (RawBlocks) MapTo

func (e RawBlocks) MapTo(lba LBA) RangeData

type ReadExtenter

type ReadExtenter interface {
	ReadExtent(ctx context.Context, ext Extent) (RangeData, error)
}

type ReaderAtAsReader

type ReaderAtAsReader struct {
	// contains filtered or unexported fields
}

func (*ReaderAtAsReader) Read

func (r *ReaderAtAsReader) Read(b []byte) (int, error)

type ReconcileResult

type ReconcileResult struct {
	TotalPrimary   int
	TotalReplica   int
	Missing        int
	Uploaded       int
	Failed         int
	FailedSegments []SegmentId
}

ReconcileResult contains the results of a reconciliation operation

type ReleaseLeaseRequest added in v0.0.2

type ReleaseLeaseRequest struct {
	Nonce string `json:"nonce,omitempty"`
}

type S3Access

type S3Access struct {
	// contains filtered or unexported fields
}

func NewS3Access

func NewS3Access(log *slog.Logger, host, bucket string, cfg aws.Config) (*S3Access, error)

func (*S3Access) AppendSegment

func (s *S3Access) AppendSegment(ctx context.Context, vol string, seg SegmentId, f *os.File) error

func (*S3Access) AppendToSegments

func (s *S3Access) AppendToSegments(ctx context.Context, vol string, seg SegmentId) error

func (*S3Access) GetVolumeInfo

func (s *S3Access) GetVolumeInfo(ctx context.Context, vol string) (*VolumeInfo, error)

func (*S3Access) InitContainer

func (s *S3Access) InitContainer(ctx context.Context) error

func (*S3Access) InitVolume

func (s *S3Access) InitVolume(ctx context.Context, vol *VolumeInfo) error

func (*S3Access) ListSegments

func (s *S3Access) ListSegments(ctx context.Context, vol string) ([]SegmentId, error)

func (*S3Access) ListVolumes

func (s *S3Access) ListVolumes(ctx context.Context) ([]string, error)

func (*S3Access) OpenSegment

func (s *S3Access) OpenSegment(ctx context.Context, _ string, seg SegmentId) (SegmentReader, error)

func (*S3Access) OpenVolume

func (s *S3Access) OpenVolume(ctx context.Context, name string) (Volume, error)

func (*S3Access) ReadMetadata

func (s *S3Access) ReadMetadata(ctx context.Context, volName, name string) (io.ReadCloser, error)

func (*S3Access) RemoveSegment

func (s *S3Access) RemoveSegment(ctx context.Context, seg SegmentId) error

func (*S3Access) RemoveSegmentFromVolume

func (s *S3Access) RemoveSegmentFromVolume(ctx context.Context, vol string, seg SegmentId) error

func (*S3Access) UploadSegment

func (s *S3Access) UploadSegment(ctx context.Context, seg SegmentId, f *os.File) error

func (*S3Access) WriteMetadata

func (s *S3Access) WriteMetadata(ctx context.Context, volName, name string) (io.WriteCloser, error)

func (*S3Access) WriteSegment

func (s *S3Access) WriteSegment(ctx context.Context, seg SegmentId) (io.WriteCloser, error)

type S3ObjectReader

type S3ObjectReader struct {
	// contains filtered or unexported fields
}

func (*S3ObjectReader) Close

func (s *S3ObjectReader) Close() error

func (*S3ObjectReader) Layout

func (s *S3ObjectReader) Layout(ctx context.Context) (*SegmentLayout, error)

func (*S3ObjectReader) ReadAt

func (s *S3ObjectReader) ReadAt(dest []byte, off int64) (int, error)

type S3Volume

type S3Volume struct {
	// contains filtered or unexported fields
}

func (*S3Volume) AppendSegment

func (s *S3Volume) AppendSegment(ctx context.Context, seg SegmentId, f *os.File) error

func (*S3Volume) Info

func (s *S3Volume) Info(ctx context.Context) (*VolumeInfo, error)

func (*S3Volume) ListSegments

func (s *S3Volume) ListSegments(ctx context.Context) ([]SegmentId, error)

func (*S3Volume) NewSegment

func (s *S3Volume) NewSegment(ctx context.Context, seg SegmentId, layout *SegmentLayout, data *os.File) error

func (*S3Volume) OpenSegment

func (s *S3Volume) OpenSegment(ctx context.Context, seg SegmentId) (SegmentReader, error)

func (*S3Volume) RemoveSegment

func (s *S3Volume) RemoveSegment(ctx context.Context, seg SegmentId) error

type Segment

type Segment struct {
	Size    uint64
	Used    uint64
	Extents int // Number of extents in this segment
	// contains filtered or unexported fields
}

func (*Segment) Density

func (s *Segment) Density() float64

type SegmentAccess

type SegmentAccess interface {
	InitContainer(ctx context.Context) error
	InitVolume(ctx context.Context, vol *VolumeInfo) error
	ListVolumes(ctx context.Context) ([]string, error)
	RemoveSegment(ctx context.Context, seg SegmentId) error

	OpenVolume(ctx context.Context, vol string) (Volume, error)
	GetVolumeInfo(ctx context.Context, vol string) (*VolumeInfo, error)
}

func ReplicaWriter

func ReplicaWriter(log *slog.Logger, primary, replica SegmentAccess) SegmentAccess

type SegmentAccessWithID added in v0.0.2

type SegmentAccessWithID interface {
	SegmentAccess
	InitVolumeWithID(ctx context.Context, vol *VolumeInfo) (string, error)
}

SegmentAccessWithID extends SegmentAccess with InitVolumeWithID for implementations that can return server-generated volume IDs.

type SegmentBuilder

type SegmentBuilder struct {
	// contains filtered or unexported fields
}

func NewSegmentBuilder

func NewSegmentBuilder() *SegmentBuilder

func (*SegmentBuilder) BodySize

func (o *SegmentBuilder) BodySize() int

func (*SegmentBuilder) Close

func (o *SegmentBuilder) Close(log *slog.Logger) error

func (*SegmentBuilder) Flush

func (o *SegmentBuilder) Flush(ctx context.Context, log *slog.Logger,
	vol Volume, seg SegmentId, volName string,
) ([]ExtentLocation, *SegmentStats, error)

func (*SegmentBuilder) OpenP

func (o *SegmentBuilder) OpenP() bool

func (*SegmentBuilder) OpenWrite

func (o *SegmentBuilder) OpenWrite(path string, log *slog.Logger) error

func (*SegmentBuilder) Reset

func (s *SegmentBuilder) Reset()

func (*SegmentBuilder) ShouldFlush

func (o *SegmentBuilder) ShouldFlush(sizeThreshold int) FlushReason

func (*SegmentBuilder) Sync

func (o *SegmentBuilder) Sync() error

func (*SegmentBuilder) WriteExtent

func (o *SegmentBuilder) WriteExtent(log *slog.Logger, ext RangeDataView) ([]byte, ExtentHeader, error)

func (*SegmentBuilder) ZeroBlocks

func (o *SegmentBuilder) ZeroBlocks(rng Extent) error

type SegmentCreator

type SegmentCreator struct {
	// contains filtered or unexported fields
}

func NewSegmentCreator

func NewSegmentCreator(log *slog.Logger, vol, path string) (*SegmentCreator, error)

func (*SegmentCreator) AvgStorageRatio

func (o *SegmentCreator) AvgStorageRatio() float64

func (*SegmentCreator) BodySize

func (o *SegmentCreator) BodySize() int

func (*SegmentCreator) Close

func (o *SegmentCreator) Close() error

func (*SegmentCreator) CompressionRate

func (o *SegmentCreator) CompressionRate() float64

func (*SegmentCreator) CompressionRateHistogram

func (o *SegmentCreator) CompressionRateHistogram() []int

func (*SegmentCreator) EmptyBlocks

func (o *SegmentCreator) EmptyBlocks() int

func (*SegmentCreator) EmptyP

func (o *SegmentCreator) EmptyP() bool

func (*SegmentCreator) Entries

func (o *SegmentCreator) Entries() int

func (*SegmentCreator) FillExtent

func (o *SegmentCreator) FillExtent(ctx *Context, data RangeDataView) ([]Extent, error)

FillExtent attempts to fill as much of +data+ as possible, returning a list of the Extents that it was unable to fill. That list is then fed to the system that reads data from segments.

func (*SegmentCreator) Flush

func (*SegmentCreator) InputBytes

func (o *SegmentCreator) InputBytes() int64

func (*SegmentCreator) ShouldFlush

func (o *SegmentCreator) ShouldFlush(sizeThreshold int) FlushReason

func (*SegmentCreator) StorageBytes

func (o *SegmentCreator) StorageBytes() int64

func (*SegmentCreator) StorageRatio

func (o *SegmentCreator) StorageRatio() float64

func (*SegmentCreator) TotalBlocks

func (o *SegmentCreator) TotalBlocks() int

func (*SegmentCreator) UseZstd

func (o *SegmentCreator) UseZstd()

func (*SegmentCreator) WriteExtent

func (o *SegmentCreator) WriteExtent(ext RangeData) error

func (*SegmentCreator) ZeroBlocks

func (o *SegmentCreator) ZeroBlocks(rng Extent) error

type SegmentDownloadResponse

type SegmentDownloadResponse struct {
	DownloadURL string    `json:"download_url"`
	ExpiresAt   time.Time `json:"expires_at"`
	Size        int64     `json:"size"`
	MD5         string    `json:"md5"`
}

type SegmentHeader

type SegmentHeader struct {
	ExtentCount uint32
	DataOffset  uint32
}

func ReadSegmentHeader

func ReadSegmentHeader(path string) (*SegmentHeader, error)

func (*SegmentHeader) Read

func (s *SegmentHeader) Read(r io.Reader) error

func (SegmentHeader) Write

func (s SegmentHeader) Write(w io.Writer) error

type SegmentId

type SegmentId ulid.ULID

func ParseSegment

func ParseSegment(str string) (SegmentId, error)

func ReadSegmentList

func ReadSegmentList(f io.Reader) ([]SegmentId, error)

func (SegmentId) String

func (s SegmentId) String() string

func (SegmentId) Valid

func (s SegmentId) Valid() bool

type SegmentLayout

type SegmentLayout struct {
	// contains filtered or unexported fields
}

func (*SegmentLayout) Extents

func (v *SegmentLayout) Extents() []ExternalExtentHeader

func (*SegmentLayout) HasExtents

func (v *SegmentLayout) HasExtents() bool

func (*SegmentLayout) MarshalCBOR

func (v *SegmentLayout) MarshalCBOR() ([]byte, error)

func (*SegmentLayout) MarshalJSON

func (v *SegmentLayout) MarshalJSON() ([]byte, error)

func (*SegmentLayout) SetExtents

func (v *SegmentLayout) SetExtents(extents []ExternalExtentHeader)

func (*SegmentLayout) UnmarshalCBOR

func (v *SegmentLayout) UnmarshalCBOR(data []byte) error

func (*SegmentLayout) UnmarshalJSON

func (v *SegmentLayout) UnmarshalJSON(data []byte) error

type SegmentReader

type SegmentReader interface {
	io.ReaderAt
	io.Closer

	Layout(ctx context.Context) (*SegmentLayout, error)
}

type SegmentReconciler

type SegmentReconciler struct {
	// contains filtered or unexported fields
}

SegmentReconciler syncs segments from primary to replica

func NewSegmentReconciler

func NewSegmentReconciler(log *slog.Logger, primary, replica Volume) *SegmentReconciler

NewSegmentReconciler creates a new segment reconciler

func (*SegmentReconciler) Reconcile

func (r *SegmentReconciler) Reconcile(ctx context.Context) (*ReconcileResult, error)

Reconcile finds segments in primary that are missing from replica and uploads them

type SegmentStats

type SegmentStats struct {
	Blocks     uint64
	TotalBytes uint64
	DataOffset uint32
}

type SegmentUploadRequest

type SegmentUploadRequest struct {
	LsvdID   string `json:"lsvd_id"`
	VolumeID string `json:"volume_id,omitempty"`
}

API request/response types based on the OpenAPI spec

type SegmentUploadResponse

type SegmentUploadResponse struct {
	SegmentID    string    `json:"segment_id"`
	UploadURL    string    `json:"upload_url"`
	CompletedURL string    `json:"completed_url"`
	ExpiresAt    time.Time `json:"expires_at"`
}

type Segments

type Segments struct {
	// contains filtered or unexported fields
}

func NewSegments

func NewSegments() *Segments

func (*Segments) AllDeadSegments

func (d *Segments) AllDeadSegments() ([]SegmentId, error)

func (*Segments) Clear

func (s *Segments) Clear()

Clear removes all segments

func (*Segments) Create

func (s *Segments) Create(segId SegmentId, stats *SegmentStats)

func (*Segments) CreateOrUpdate

func (s *Segments) CreateOrUpdate(segId SegmentId, usedBlocks uint64)

func (*Segments) CreateWithExtents

func (s *Segments) CreateWithExtents(segId SegmentId, stats *SegmentStats, extents int)

CreateWithExtents creates a segment with a specific extent count

func (*Segments) FindDeleted

func (s *Segments) FindDeleted() []SegmentId

func (*Segments) FindSmallSegments

func (s *Segments) FindSmallSegments(cutoff, max uint64) []SegmentId

func (*Segments) IncrementSegment

func (s *Segments) IncrementSegment(segId SegmentId, blocks uint64)

func (*Segments) LeastDenseSegment

func (d *Segments) LeastDenseSegment(log *slog.Logger) (SegmentId, uint64, bool, error)

func (*Segments) LiveSegments

func (s *Segments) LiveSegments() []SegmentId

func (*Segments) LogSegmentInfo

func (s *Segments) LogSegmentInfo(log hclog.Logger)

func (*Segments) PickSegmentToGC

func (d *Segments) PickSegmentToGC(log *slog.Logger, min float64, skip []SegmentId) (SegmentId, bool, error)

func (*Segments) PruneDeadSegments

func (s *Segments) PruneDeadSegments() (int, float64)

func (*Segments) SegmentBlocks

func (s *Segments) SegmentBlocks(seg SegmentId) (uint64, uint64)

func (*Segments) SegmentIds

func (s *Segments) SegmentIds() []SegmentId

func (*Segments) SegmentInfo

func (s *Segments) SegmentInfo(seg SegmentId) (uint64, uint64, int)

SegmentInfo returns size, used blocks, and extent count for a segment

func (*Segments) SetDeleted

func (s *Segments) SetDeleted(segId SegmentId, log *slog.Logger)

func (*Segments) SetSegment

func (s *Segments) SetSegment(segId SegmentId, total, used uint64)

func (*Segments) TotalBlocks

func (s *Segments) TotalBlocks() uint64

func (*Segments) TotalBytes

func (s *Segments) TotalBytes() uint64

func (*Segments) UpdateUsage

func (s *Segments) UpdateUsage(log *slog.Logger, self SegmentId, affected []PartialExtent)

func (*Segments) Usage

func (s *Segments) Usage() float64

func (*Segments) UsedBlocks

func (s *Segments) UsedBlocks() uint64

type StatsMessage

type StatsMessage struct {
	Id          string    `json:"id" cbor:"10,keyasint"`
	PublishTime time.Time `json:"published_at" cbor:"1,keyasint"`
	GCCycles    int64     `json:"gc_cycles" cbor:"2,keyasint"`
	GCTime      float64   `json:"gc_time" cbor:"3,keyasint"`
	Extents     int64     `json:"extents" cbor:"4,keyasint"`
	Density     float64   `json:"density" cbor:"5,keyasint"`
}

type TortureBlockHash

type TortureBlockHash [8]byte

TortureBlockHash is a compact representation of block contents for verification

func HashBlock

func HashBlock(data []byte) TortureBlockHash

type TortureConfig

type TortureConfig struct {
	Seed               int64            `json:"Seed"`
	Operations         int              `json:"Operations"`
	MaxLBA             LBA              `json:"MaxLBA"`
	MaxBlocks          uint32           `json:"MaxBlocks"`
	Weights            TortureOpWeights `json:"Weights"`
	OverlapProbability float64          `json:"OverlapProbability"`
	VerifyEvery        int              `json:"VerifyEvery"`
	PatternWeights     [4]int           `json:"PatternWeights"` // random, zero, compressible, sequential
}

TortureConfig contains all configuration for a torture test run

func DecodeTortureConfig

func DecodeTortureConfig(encoded string) (TortureConfig, error)

DecodeTortureConfig decodes a base64 JSON string back to a TortureConfig

type TortureDataPattern

type TortureDataPattern int

TortureDataPattern represents a data pattern for write operations

const (
	TorturePatternRandom TortureDataPattern = iota
	TorturePatternZero
	TorturePatternCompressible
	TorturePatternSequential
)

type TortureDiskModel

type TortureDiskModel struct {
	// contains filtered or unexported fields
}

TortureDiskModel is a simple reference model tracking expected disk state

func NewTortureDiskModel

func NewTortureDiskModel() *TortureDiskModel

func (*TortureDiskModel) BlockCount

func (m *TortureDiskModel) BlockCount() int

func (*TortureDiskModel) ExpectedHash

func (m *TortureDiskModel) ExpectedHash(lba LBA) (TortureBlockHash, bool)

func (*TortureDiskModel) HasBlock

func (m *TortureDiskModel) HasBlock(lba LBA) bool

func (*TortureDiskModel) WriteExtent

func (m *TortureDiskModel) WriteExtent(lba LBA, data []byte)

func (*TortureDiskModel) WrittenLBAs

func (m *TortureDiskModel) WrittenLBAs() []LBA

func (*TortureDiskModel) ZeroBlocks

func (m *TortureDiskModel) ZeroBlocks(lba LBA, blocks uint32)

type TortureGenerator

type TortureGenerator struct {
	// contains filtered or unexported fields
}

TortureGenerator generates deterministic operation sequences

func NewTortureGenerator

func NewTortureGenerator(cfg TortureConfig) *TortureGenerator

func (*TortureGenerator) Next

type TortureOpType

type TortureOpType int

TortureOpType represents a type of operation in the torture test

const (
	TortureOpWrite TortureOpType = iota
	TortureOpRead
	TortureOpZero
	TortureOpSync
	TortureOpCloseReopen
)

func (TortureOpType) String

func (o TortureOpType) String() string

type TortureOpWeights

type TortureOpWeights struct {
	Write       int `json:"Write"`
	Read        int `json:"Read"`
	Zero        int `json:"Zero"`
	Sync        int `json:"Sync"`
	CloseReopen int `json:"CloseReopen"`
}

TortureOpWeights defines the probability weights for each operation type

type TortureOperation

type TortureOperation struct {
	Type     TortureOpType
	Extent   Extent
	DataSeed int64
	Pattern  TortureDataPattern
}

TortureOperation represents a single operation in the torture test

func (TortureOperation) String

func (o TortureOperation) String() string

type TortureResult

type TortureResult struct {
	Success    bool
	Operations int
	LBAsUsed   int
	Error      error
	History    []TortureOperation
}

TortureResult contains the result of a torture test run

type TortureRunner

type TortureRunner struct {
	// contains filtered or unexported fields
}

TortureRunner runs torture tests against an LSVD disk

func NewTortureRunner

func NewTortureRunner(gctx context.Context, log *slog.Logger, tmpDir string, cfg TortureConfig) (*TortureRunner, error)

NewTortureRunner creates a new torture test runner

func (*TortureRunner) Cleanup

func (r *TortureRunner) Cleanup()

Cleanup cleans up resources used by the runner

func (*TortureRunner) DumpHistory

func (r *TortureRunner) DumpHistory(last int)

DumpHistory writes the last N operations to the output writer

func (*TortureRunner) DumpHistoryRange

func (r *TortureRunner) DumpHistoryRange(start, end int)

DumpHistoryRange prints operations in the range [start, end)

func (*TortureRunner) Run

func (r *TortureRunner) Run() TortureResult

Run executes the torture test

func (*TortureRunner) SetOutput

func (r *TortureRunner) SetOutput(w io.Writer)

SetOutput sets the output writer for progress messages

type TortureVariation

type TortureVariation struct {
	Name    string
	Weights TortureOpWeights
	Overlap float64
	MaxLBA  LBA
}

TortureVariation defines a named torture test configuration variation

func DefaultTortureVariations

func DefaultTortureVariations() []TortureVariation

DefaultTortureVariations returns the standard set of torture test variations

type UlidRecall

type UlidRecall struct {
	Past []ulid.ULID
}

func (*UlidRecall) First

func (u *UlidRecall) First() ulid.ULID

func (*UlidRecall) Gen

func (u *UlidRecall) Gen() ulid.ULID

type Volume

type Volume interface {
	Info(ctx context.Context) (*VolumeInfo, error)
	ListSegments(ctx context.Context) ([]SegmentId, error)
	OpenSegment(ctx context.Context, seg SegmentId) (SegmentReader, error)
	NewSegment(ctx context.Context, seg SegmentId, layout *SegmentLayout, data *os.File) error
	RemoveSegment(ctx context.Context, seg SegmentId) error
}

type VolumeInfo

type VolumeInfo struct {
	Name     string         `json:"name"`
	Size     units.Bytes    `json:"size"`
	Parent   string         `json:"parent"`
	UUID     string         `json:"uuid"`
	Metadata map[string]any `json:"metadata,omitempty"`
}

func (*VolumeInfo) Normalize

func (vol *VolumeInfo) Normalize() error

type VolumeInfoResponse

type VolumeInfoResponse struct {
	VolumeID     string    `json:"volume_id"`
	Name         string    `json:"name"`
	UUID         string    `json:"uuid"`
	DeclaredSize int64     `json:"declared_size"`
	DataFormat   string    `json:"data_format,omitempty"`
	AppFormat    string    `json:"app_format,omitempty"`
	CreatedAt    time.Time `json:"created_at"`
	UpdatedAt    time.Time `json:"updated_at"`
}

type VolumeResponse

type VolumeResponse struct {
	VolumeID  string `json:"volume_id"`
	VersionID string `json:"version_id"`
}

Directories

Path Synopsis
cmd
lsvd command
torture command
lve
cli
cmd/lve command
pkg
id
nbd
nbdnl
Package nbdnl controls the Linux NBD driver via netlink.
treemap
Package treemap provides a generic key-sorted map.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL