diskio

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2026 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func EnsureLbdDevices

func EnsureLbdDevices(log *slog.Logger) error

EnsureLbdDevices checks if the lbd kernel module and lbdctl are available.

func EnsureLoopDevices

func EnsureLoopDevices(log *slog.Logger) error

EnsureLoopDevices ensures /dev/loop-control and loop device nodes are available. In containers, /dev may be a fresh mount that doesn't include loop devices even when the kernel supports them. This function creates the device nodes via mknod if they're missing.

func LoopDeviceAvailable

func LoopDeviceAvailable() bool

LoopDeviceAvailable checks if loop devices can be used.

Types

type ActiveMount

type ActiveMount struct {
	Device    string
	MountPath string
}

ActiveMount describes a mount found on the running system.

type CloudDiskClient

type CloudDiskClient interface {
	AcquireLease(ctx context.Context, volumeID string) (nonce string, err error)
	ReleaseLease(ctx context.Context, volumeID string, nonce string) error
	ListLogSegments(ctx context.Context, volumeID string) ([]LogSegmentInfo, error)
	DownloadLogSegment(ctx context.Context, volumeID, segmentID string) (io.ReadCloser, error)
}

CloudDiskClient abstracts the cloud operations needed by the mount controller for volume lease management and log segment retrieval.

func NewCloudDiskClient

func NewCloudDiskClient(log *slog.Logger, baseURL string, authClient *cloudauth.AuthClient) CloudDiskClient

NewCloudDiskClient creates a new CloudDiskClient.

type CloudSegmentUploader

type CloudSegmentUploader struct {
	// contains filtered or unexported fields
}

CloudSegmentUploader implements LogSegmentUploader by uploading segments to miren.cloud using presigned URLs. It follows the same 3-step pattern as DiskAPISegmentAccess.NewSegment: request upload URL, PUT data, POST completion.

func NewCloudSegmentUploader

func NewCloudSegmentUploader(log *slog.Logger, baseURL string, authClient *cloudauth.AuthClient, state *State) *CloudSegmentUploader

func (*CloudSegmentUploader) UploadSegment

func (u *CloudSegmentUploader) UploadSegment(ctx context.Context, volumeID, segmentPath string) (string, error)

UploadSegment uploads a log segment file to the cloud and returns the cloud segment ID.

type DiskMountController

type DiskMountController struct {
	// contains filtered or unexported fields
}

DiskMountController watches disk_mount entities and manages loop-device mounts.

func NewDiskMountController

func NewDiskMountController(log *slog.Logger, dataPath, nodeId string, state *State, ops DiskMountOps) *DiskMountController

func (*DiskMountController) Index

func (c *DiskMountController) Index() entity.Attr

func (*DiskMountController) Init

func (*DiskMountController) Reconcile

func (c *DiskMountController) Reconcile(ctx context.Context, mount *storage_v1alpha.DiskMount, meta *entity.Meta) error

func (*DiskMountController) ReconcileWithEntities

func (c *DiskMountController) ReconcileWithEntities(ctx context.Context) error

ReconcileWithEntities reconciles local state with the entity server.

func (*DiskMountController) SetCloudClient

func (c *DiskMountController) SetCloudClient(client CloudDiskClient)

SetCloudClient sets the cloud client for lease management and segment replay.

func (*DiskMountController) SetEAC

func (*DiskMountController) SetKeepMounts

func (c *DiskMountController) SetKeepMounts(v bool)

SetKeepMounts tells the controller to skip unmounting during Shutdown. Used during reload so the replacement process inherits the mounts.

func (*DiskMountController) Shutdown

func (c *DiskMountController) Shutdown()

Shutdown releases cloud leases. Mount/detach cleanup is handled by DiskVolumeController.Shutdown which owns all mount lifecycle. If keepMounts is set (reload), everything is skipped.

type DiskMountOps

type DiskMountOps interface {
	CreateDir(path string, perm os.FileMode) error
	RemoveFile(path string) error
	LoopAttach(imagePath string) (devicePath string, err error)
	LoopDetach(devicePath string) error
	LbdAttach(ctx context.Context, imagePath, logDir string) (devicePath string, err error)
	LbdDetach(ctx context.Context, devicePath string) error
	LbdAvailable() bool
	Mount(device, mountPath, filesystem string, readOnly bool) error
	Unmount(path string) error
	IsMounted(path string) bool
	IsFormatted(ctx context.Context, device, filesystem string) (bool, error)
	FormatDevice(ctx context.Context, device, filesystem string) error

	// FindMounts returns all mounts whose mount path starts with the given prefix.
	FindMounts(pathPrefix string) []ActiveMount
}

DiskMountOps abstracts OS operations for disk mount management. This interface enables testing without requiring actual loop device or mount operations.

func NewRealDiskMountOps

func NewRealDiskMountOps(log *slog.Logger) DiskMountOps

type DiskVolumeController

type DiskVolumeController struct {
	// contains filtered or unexported fields
}

DiskVolumeController watches disk_volume entities and manages sparse disk images using loop devices.

func NewDiskVolumeController

func NewDiskVolumeController(log *slog.Logger, dataPath, nodeId string, state *State, ops DiskVolumeOps, mntOps DiskMountOps) *DiskVolumeController

func (*DiskVolumeController) HasPendingMigration

func (c *DiskVolumeController) HasPendingMigration(ctx context.Context) bool

HasPendingMigration checks whether any disks have LSVD data that still needs to be migrated. This is used at startup to decide whether containers must be paused before reconciliation.

func (*DiskVolumeController) Index

func (c *DiskVolumeController) Index() entity.Attr

func (*DiskVolumeController) Init

func (*DiskVolumeController) Reconcile

func (c *DiskVolumeController) Reconcile(ctx context.Context, volume *storage_v1alpha.DiskVolume, meta *entity.Meta) error

func (*DiskVolumeController) ReconcileWithEntities

func (c *DiskVolumeController) ReconcileWithEntities(ctx context.Context) error

ReconcileWithEntities reconciles local state with the entity server.

func (*DiskVolumeController) SetEAC

func (*DiskVolumeController) SetKeepMounts

func (c *DiskVolumeController) SetKeepMounts(v bool)

SetKeepMounts tells the controller to skip unmounting during Shutdown. Used during reload so the replacement process inherits the mounts.

func (*DiskVolumeController) Shutdown

func (c *DiskVolumeController) Shutdown()

Shutdown unmounts all disk volumes and detaches their backing devices. It uses the actual kernel mount table rather than trusting persisted state, finding all mounts under diskMountBasePath and tearing them down. If keepMounts is set (reload), everything is left in place for the new process.

type DiskVolumeOps

type DiskVolumeOps interface {
	CreateVolumeDir(path string) error
	RemoveVolumeDir(path string) error
	VolumePathExists(path string) bool
	CreateDiskImage(path string, sizeBytes int64) error
	RemoveDiskImage(path string) error
}

DiskVolumeOps abstracts OS operations for disk volume management. This interface enables testing without requiring actual filesystem operations.

func NewRealDiskVolumeOps

func NewRealDiskVolumeOps(log *slog.Logger) DiskVolumeOps

type LogSegmentInfo

type LogSegmentInfo struct {
	SegmentID string
	Label     string
}

LogSegmentInfo describes a remote log segment with its cloud ID and TAI64N label.

type LogSegmentUploader

type LogSegmentUploader interface {
	// UploadSegment uploads a log segment and returns the cloud segment ID.
	UploadSegment(ctx context.Context, volumeID, segmentPath string) (segmentID string, err error)
}

LogSegmentUploader uploads completed log segments to cloud storage.

type LogWatcher

type LogWatcher struct {
	// contains filtered or unexported fields
}

LogWatcher monitors accelerator volume log directories for completed segments. When an uploader is configured, segments are uploaded then removed. When no uploader is configured (cloud not available), segments are simply deleted.

func NewLogWatcher

func NewLogWatcher(log *slog.Logger, state *State, uploader LogSegmentUploader, interval time.Duration) *LogWatcher

NewLogWatcher creates a new LogWatcher that scans at the given interval. Pass nil for uploader to just delete logs without uploading.

func (*LogWatcher) Run

func (w *LogWatcher) Run(ctx context.Context) error

Run starts the log watcher loop. It blocks until the context is cancelled.

func (*LogWatcher) Wait

func (w *LogWatcher) Wait()

Wait blocks until the watcher's Run method has returned.

type MountState

type MountState struct {
	// EntityId is the ID of the disk_mount entity
	EntityId string `json:"entity_id"`

	// VolumeId is the ID of the disk_volume entity
	VolumeId string `json:"volume_id"`

	// NbdIndex is the NBD device index (legacy, kept for state file compatibility)
	NbdIndex uint32 `json:"nbd_index"`

	// DevicePath is the path to the loop/NBD device node
	DevicePath string `json:"device_path"`

	// MountPath is where the volume is mounted
	MountPath string `json:"mount_path"`

	// Mounted indicates if the volume is currently mounted
	Mounted bool `json:"mounted"`

	// ReadOnly indicates if the mount is read-only
	ReadOnly bool `json:"read_only"`

	// Mode is the disk I/O mode used for this mount (universal or accelerator)
	Mode storage_v1alpha.DiskVolumeVolumeMode `json:"mode,omitempty"`

	// LeaseNonce is the volume lease nonce from remote Disk API
	LeaseNonce string `json:"lease_nonce,omitempty"`
}

MountState represents the state of a disk mount.

type State

type State struct {
	Volumes map[string]*VolumeState `json:"volumes"`
	Mounts  map[string]*MountState  `json:"mounts"`
	// contains filtered or unexported fields
}

State represents the persisted state of disk volumes and mounts.

func LoadState

func LoadState(dataPath string) (*State, error)

LoadState loads state from the data path. It tries the current filename first, then falls back to the legacy name for backward compatibility.

func NewState

func NewState() *State

NewState creates a new, empty State.

func (*State) DeleteMount

func (s *State) DeleteMount(entityId string)

DeleteMount removes a mount state.

func (*State) DeleteMountAndSave

func (s *State) DeleteMountAndSave(entityId string) error

DeleteMountAndSave atomically removes a mount state and persists to disk.

func (*State) DeleteVolume

func (s *State) DeleteVolume(entityId string)

DeleteVolume removes a volume state.

func (*State) DeleteVolumeAndSave

func (s *State) DeleteVolumeAndSave(entityId string) error

DeleteVolumeAndSave atomically removes a volume state and persists to disk.

func (*State) GetMount

func (s *State) GetMount(entityId string) *MountState

GetMount returns a copy of the mount state for the given entity ID.

func (*State) GetVolume

func (s *State) GetVolume(entityId string) *VolumeState

GetVolume returns a copy of the volume state for the given entity ID.

func (*State) GetVolumeByVolumeId

func (s *State) GetVolumeByVolumeId(volumeId string) *VolumeState

GetVolumeByVolumeId returns a copy of the volume state for the given volume ID.

func (*State) ListMounts

func (s *State) ListMounts() []*MountState

ListMounts returns copies of all mount states.

func (*State) ListVolumes

func (s *State) ListVolumes() []*VolumeState

ListVolumes returns copies of all volume states.

func (*State) Save

func (s *State) Save() error

Save persists the state to disk atomically. Callers that need to mutate and save atomically should use the combined methods (SetVolumeAndSave, SetMountAndSave, etc.) instead.

func (*State) SetMount

func (s *State) SetMount(entityId string, mount *MountState)

SetMount sets a mount state.

func (*State) SetMountAndSave

func (s *State) SetMountAndSave(entityId string, mount *MountState) error

SetMountAndSave atomically sets a mount state and persists to disk.

func (*State) SetMountFromVolume

func (s *State) SetMountFromVolume(volumeId string, mount *MountState) (devicePath, mountPath string, err error)

SetMountFromVolume atomically reads the current volume state and, if the volume is mounted, creates a mount entry using the volume's live device and mount paths. This avoids a TOCTOU race where the volume controller could update mount fields between a GetVolume call and a SetMount call. Returns the volume's DevicePath and MountPath on success, or an error if the volume is not found or not mounted.

func (*State) SetPath

func (s *State) SetPath(dataPath string)

SetPath sets the path for the state file.

func (*State) SetVolume

func (s *State) SetVolume(entityId string, volume *VolumeState)

SetVolume sets a volume state.

func (*State) SetVolumeAndSave

func (s *State) SetVolumeAndSave(entityId string, volume *VolumeState) error

SetVolumeAndSave atomically sets a volume state and persists to disk.

type VolumeState

type VolumeState struct {
	// EntityId is the ID of the disk_volume entity
	EntityId string `json:"entity_id"`

	// VolumeId is the volume identifier
	VolumeId string `json:"volume_id"`

	// Name is the human-readable name (from parent disk)
	Name string `json:"name,omitempty"`

	// DiskPath is the path to the volume data directory
	DiskPath string `json:"disk_path"`

	// SizeBytes is the volume size
	SizeBytes int64 `json:"size_bytes"`

	// Filesystem is the filesystem type (ext4, xfs, btrfs)
	Filesystem string `json:"filesystem"`

	// RemoteOnly indicates if this uses only remote storage
	RemoteOnly bool `json:"remote_only"`

	// Mode is the disk I/O mode (universal or accelerator)
	Mode storage_v1alpha.DiskVolumeVolumeMode `json:"mode,omitempty"`

	// DevicePath is the loop device backing this volume (alwaysMount modes only)
	DevicePath string `json:"device_path,omitempty"`

	// MountPath is where the volume is mounted (alwaysMount modes only)
	MountPath string `json:"mount_path,omitempty"`

	// Mounted indicates if the volume is currently mounted (alwaysMount modes only)
	Mounted bool `json:"mounted,omitempty"`
}

VolumeState represents the state of a disk volume.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL