Documentation
¶
Index ¶
- func EnsureLbdDevices(log *slog.Logger) error
- func EnsureLoopDevices(log *slog.Logger) error
- func LoopDeviceAvailable() bool
- type ActiveMount
- type CloudDiskClient
- type CloudSegmentUploader
- type DiskMountController
- func (c *DiskMountController) Index() entity.Attr
- func (c *DiskMountController) Init(ctx context.Context) error
- func (c *DiskMountController) Reconcile(ctx context.Context, mount *storage_v1alpha.DiskMount, meta *entity.Meta) error
- func (c *DiskMountController) ReconcileWithEntities(ctx context.Context) error
- func (c *DiskMountController) SetCloudClient(client CloudDiskClient)
- func (c *DiskMountController) SetEAC(eac *entityserver_v1alpha.EntityAccessClient)
- func (c *DiskMountController) SetKeepMounts(v bool)
- func (c *DiskMountController) Shutdown()
- type DiskMountOps
- type DiskVolumeController
- func (c *DiskVolumeController) HasPendingMigration(ctx context.Context) bool
- func (c *DiskVolumeController) Index() entity.Attr
- func (c *DiskVolumeController) Init(ctx context.Context) error
- func (c *DiskVolumeController) Reconcile(ctx context.Context, volume *storage_v1alpha.DiskVolume, meta *entity.Meta) error
- func (c *DiskVolumeController) ReconcileWithEntities(ctx context.Context) error
- func (c *DiskVolumeController) SetEAC(eac *entityserver_v1alpha.EntityAccessClient)
- func (c *DiskVolumeController) SetKeepMounts(v bool)
- func (c *DiskVolumeController) Shutdown()
- type DiskVolumeOps
- type LogSegmentInfo
- type LogSegmentUploader
- type LogWatcher
- type MountState
- type State
- func (s *State) DeleteMount(entityId string)
- func (s *State) DeleteMountAndSave(entityId string) error
- func (s *State) DeleteVolume(entityId string)
- func (s *State) DeleteVolumeAndSave(entityId string) error
- func (s *State) GetMount(entityId string) *MountState
- func (s *State) GetVolume(entityId string) *VolumeState
- func (s *State) GetVolumeByVolumeId(volumeId string) *VolumeState
- func (s *State) ListMounts() []*MountState
- func (s *State) ListVolumes() []*VolumeState
- func (s *State) Save() error
- func (s *State) SetMount(entityId string, mount *MountState)
- func (s *State) SetMountAndSave(entityId string, mount *MountState) error
- func (s *State) SetMountFromVolume(volumeId string, mount *MountState) (devicePath, mountPath string, err error)
- func (s *State) SetPath(dataPath string)
- func (s *State) SetVolume(entityId string, volume *VolumeState)
- func (s *State) SetVolumeAndSave(entityId string, volume *VolumeState) error
- type VolumeState
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func EnsureLbdDevices ¶
EnsureLbdDevices checks if the lbd kernel module and lbdctl are available.
func EnsureLoopDevices ¶
EnsureLoopDevices ensures /dev/loop-control and loop device nodes are available. In containers, /dev may be a fresh mount that doesn't include loop devices even when the kernel supports them. This function creates the device nodes via mknod if they're missing.
func LoopDeviceAvailable ¶
func LoopDeviceAvailable() bool
LoopDeviceAvailable checks if loop devices can be used.
Types ¶
type ActiveMount ¶
ActiveMount describes a mount found on the running system.
type CloudDiskClient ¶
type CloudDiskClient interface {
AcquireLease(ctx context.Context, volumeID string) (nonce string, err error)
ReleaseLease(ctx context.Context, volumeID string, nonce string) error
ListLogSegments(ctx context.Context, volumeID string) ([]LogSegmentInfo, error)
DownloadLogSegment(ctx context.Context, volumeID, segmentID string) (io.ReadCloser, error)
}
CloudDiskClient abstracts the cloud operations needed by the mount controller for volume lease management and log segment retrieval.
func NewCloudDiskClient ¶
func NewCloudDiskClient(log *slog.Logger, baseURL string, authClient *cloudauth.AuthClient) CloudDiskClient
NewCloudDiskClient creates a new CloudDiskClient.
type CloudSegmentUploader ¶
type CloudSegmentUploader struct {
// contains filtered or unexported fields
}
CloudSegmentUploader implements LogSegmentUploader by uploading segments to miren.cloud using presigned URLs. It follows the same 3-step pattern as DiskAPISegmentAccess.NewSegment: request upload URL, PUT data, POST completion.
func NewCloudSegmentUploader ¶
func NewCloudSegmentUploader(log *slog.Logger, baseURL string, authClient *cloudauth.AuthClient, state *State) *CloudSegmentUploader
func (*CloudSegmentUploader) UploadSegment ¶
func (u *CloudSegmentUploader) UploadSegment(ctx context.Context, volumeID, segmentPath string) (string, error)
UploadSegment uploads a log segment file to the cloud and returns the cloud segment ID.
type DiskMountController ¶
type DiskMountController struct {
// contains filtered or unexported fields
}
DiskMountController watches disk_mount entities and manages loop-device mounts.
func NewDiskMountController ¶
func NewDiskMountController(log *slog.Logger, dataPath, nodeId string, state *State, ops DiskMountOps) *DiskMountController
func (*DiskMountController) Index ¶
func (c *DiskMountController) Index() entity.Attr
func (*DiskMountController) Reconcile ¶
func (c *DiskMountController) Reconcile(ctx context.Context, mount *storage_v1alpha.DiskMount, meta *entity.Meta) error
func (*DiskMountController) ReconcileWithEntities ¶
func (c *DiskMountController) ReconcileWithEntities(ctx context.Context) error
ReconcileWithEntities reconciles local state with entity server
func (*DiskMountController) SetCloudClient ¶
func (c *DiskMountController) SetCloudClient(client CloudDiskClient)
SetCloudClient sets the cloud client for lease management and segment replay.
func (*DiskMountController) SetEAC ¶
func (c *DiskMountController) SetEAC(eac *entityserver_v1alpha.EntityAccessClient)
func (*DiskMountController) SetKeepMounts ¶
func (c *DiskMountController) SetKeepMounts(v bool)
SetKeepMounts tells the controller to skip unmounting during Shutdown. Used during reload so the replacement process inherits the mounts.
func (*DiskMountController) Shutdown ¶
func (c *DiskMountController) Shutdown()
Shutdown releases cloud leases. Mount/detach cleanup is handled by DiskVolumeController.Shutdown which owns all mount lifecycle. If keepMounts is set (reload), everything is skipped.
type DiskMountOps ¶
type DiskMountOps interface {
CreateDir(path string, perm os.FileMode) error
RemoveFile(path string) error
LoopAttach(imagePath string) (devicePath string, err error)
LoopDetach(devicePath string) error
LbdAttach(ctx context.Context, imagePath, logDir string) (devicePath string, err error)
LbdDetach(ctx context.Context, devicePath string) error
LbdAvailable() bool
Mount(device, mountPath, filesystem string, readOnly bool) error
Unmount(path string) error
IsMounted(path string) bool
IsFormatted(ctx context.Context, device, filesystem string) (bool, error)
FormatDevice(ctx context.Context, device, filesystem string) error
// FindMounts returns all mounts whose mount path starts with the given prefix.
FindMounts(pathPrefix string) []ActiveMount
}
DiskMountOps abstracts OS operations for disk mount management. This interface enables testing without requiring actual loop device or mount operations.
func NewRealDiskMountOps ¶
func NewRealDiskMountOps(log *slog.Logger) DiskMountOps
type DiskVolumeController ¶
type DiskVolumeController struct {
// contains filtered or unexported fields
}
DiskVolumeController watches disk_volume entities and manages sparse disk images using loop devices.
func NewDiskVolumeController ¶
func NewDiskVolumeController(log *slog.Logger, dataPath, nodeId string, state *State, ops DiskVolumeOps, mntOps DiskMountOps) *DiskVolumeController
func (*DiskVolumeController) HasPendingMigration ¶
func (c *DiskVolumeController) HasPendingMigration(ctx context.Context) bool
HasPendingMigration checks whether any disks have LSVD data that still needs to be migrated. This is used at startup to decide whether containers must be paused before reconciliation.
func (*DiskVolumeController) Index ¶
func (c *DiskVolumeController) Index() entity.Attr
func (*DiskVolumeController) Reconcile ¶
func (c *DiskVolumeController) Reconcile(ctx context.Context, volume *storage_v1alpha.DiskVolume, meta *entity.Meta) error
func (*DiskVolumeController) ReconcileWithEntities ¶
func (c *DiskVolumeController) ReconcileWithEntities(ctx context.Context) error
ReconcileWithEntities reconciles local state with entity server
func (*DiskVolumeController) SetEAC ¶
func (c *DiskVolumeController) SetEAC(eac *entityserver_v1alpha.EntityAccessClient)
func (*DiskVolumeController) SetKeepMounts ¶
func (c *DiskVolumeController) SetKeepMounts(v bool)
SetKeepMounts tells the controller to skip unmounting during Shutdown. Used during reload so the replacement process inherits the mounts.
func (*DiskVolumeController) Shutdown ¶
func (c *DiskVolumeController) Shutdown()
Shutdown unmounts all disk volumes and detaches their backing devices. It uses the actual kernel mount table rather than trusting persisted state, finding all mounts under diskMountBasePath and tearing them down. If keepMounts is set (reload), everything is left in place for the new process.
type DiskVolumeOps ¶
type DiskVolumeOps interface {
CreateVolumeDir(path string) error
RemoveVolumeDir(path string) error
VolumePathExists(path string) bool
CreateDiskImage(path string, sizeBytes int64) error
RemoveDiskImage(path string) error
}
DiskVolumeOps abstracts OS operations for disk volume management. This interface enables testing without requiring actual filesystem operations.
func NewRealDiskVolumeOps ¶
func NewRealDiskVolumeOps(log *slog.Logger) DiskVolumeOps
type LogSegmentInfo ¶
LogSegmentInfo describes a remote log segment with its cloud ID and TAI64N label.
type LogSegmentUploader ¶
type LogSegmentUploader interface {
// UploadSegment uploads a log segment and returns the cloud segment ID.
UploadSegment(ctx context.Context, volumeID, segmentPath string) (segmentID string, err error)
}
LogSegmentUploader uploads completed log segments to cloud storage.
type LogWatcher ¶
type LogWatcher struct {
// contains filtered or unexported fields
}
LogWatcher monitors accelerator volume log directories for completed segments. When an uploader is configured, segments are uploaded then removed. When no uploader is configured (cloud not available), segments are simply deleted.
func NewLogWatcher ¶
func NewLogWatcher(log *slog.Logger, state *State, uploader LogSegmentUploader, interval time.Duration) *LogWatcher
NewLogWatcher creates a new LogWatcher that scans at the given interval. Pass nil for uploader to just delete logs without uploading.
func (*LogWatcher) Run ¶
func (w *LogWatcher) Run(ctx context.Context) error
Run starts the log watcher loop. It blocks until the context is cancelled.
func (*LogWatcher) Wait ¶
func (w *LogWatcher) Wait()
Wait blocks until the watcher's Run method has returned.
type MountState ¶
type MountState struct {
// EntityId is the ID of the disk_mount entity
EntityId string `json:"entity_id"`
// VolumeId is the ID of the disk_volume entity
VolumeId string `json:"volume_id"`
// NbdIndex is the NBD device index (legacy, kept for state file compatibility)
NbdIndex uint32 `json:"nbd_index"`
// DevicePath is the path to the loop/NBD device node
DevicePath string `json:"device_path"`
// MountPath is where the volume is mounted
MountPath string `json:"mount_path"`
// Mounted indicates if the volume is currently mounted
Mounted bool `json:"mounted"`
// ReadOnly indicates if the mount is read-only
ReadOnly bool `json:"read_only"`
// Mode is the disk I/O mode used for this mount (universal or accelerator)
Mode storage_v1alpha.DiskVolumeVolumeMode `json:"mode,omitempty"`
// LeaseNonce is the volume lease nonce from remote Disk API
LeaseNonce string `json:"lease_nonce,omitempty"`
}
MountState represents the state of a disk mount
type State ¶
type State struct {
Volumes map[string]*VolumeState `json:"volumes"`
Mounts map[string]*MountState `json:"mounts"`
// contains filtered or unexported fields
}
State represents the persisted state of disk volumes and mounts
func LoadState ¶
LoadState loads state from the data path. It tries the current filename first, then falls back to the legacy name for backward compatibility.
func (*State) DeleteMount ¶
DeleteMount removes a mount state
func (*State) DeleteMountAndSave ¶
DeleteMountAndSave atomically removes a mount state and persists to disk.
func (*State) DeleteVolume ¶
DeleteVolume removes a volume state
func (*State) DeleteVolumeAndSave ¶
DeleteVolumeAndSave atomically removes a volume state and persists to disk.
func (*State) GetMount ¶
func (s *State) GetMount(entityId string) *MountState
GetMount returns a copy of a mount state by entity ID
func (*State) GetVolume ¶
func (s *State) GetVolume(entityId string) *VolumeState
GetVolume returns a copy of a volume state by entity ID
func (*State) GetVolumeByVolumeId ¶
func (s *State) GetVolumeByVolumeId(volumeId string) *VolumeState
GetVolumeByVolumeId returns a copy of a volume state by volume ID
func (*State) ListMounts ¶
func (s *State) ListMounts() []*MountState
ListMounts returns copies of all mount states
func (*State) ListVolumes ¶
func (s *State) ListVolumes() []*VolumeState
ListVolumes returns copies of all volume states
func (*State) Save ¶
Save persists the state to disk atomically. Callers that need to mutate and save atomically should use the combined methods (SetVolumeAndSave, SetMountAndSave, etc.) instead.
func (*State) SetMount ¶
func (s *State) SetMount(entityId string, mount *MountState)
SetMount sets a mount state
func (*State) SetMountAndSave ¶
func (s *State) SetMountAndSave(entityId string, mount *MountState) error
SetMountAndSave atomically sets a mount state and persists to disk.
func (*State) SetMountFromVolume ¶
func (s *State) SetMountFromVolume(volumeId string, mount *MountState) (devicePath, mountPath string, err error)
SetMountFromVolume atomically reads the current volume state and, if the volume is mounted, creates a mount entry using the volume's live device and mount paths. This avoids a TOCTOU race where the volume controller could update mount fields between a GetVolume call and a SetMount call. Returns the volume's DevicePath and MountPath on success, or an error if the volume is not found or not mounted.
func (*State) SetVolume ¶
func (s *State) SetVolume(entityId string, volume *VolumeState)
SetVolume sets a volume state
func (*State) SetVolumeAndSave ¶
func (s *State) SetVolumeAndSave(entityId string, volume *VolumeState) error
SetVolumeAndSave atomically sets a volume state and persists to disk.
type VolumeState ¶
type VolumeState struct {
// EntityId is the ID of the disk_volume entity
EntityId string `json:"entity_id"`
// VolumeId is the volume identifier
VolumeId string `json:"volume_id"`
// Name is the human-readable name (from parent disk)
Name string `json:"name,omitempty"`
// DiskPath is the path to the volume data directory
DiskPath string `json:"disk_path"`
// SizeBytes is the volume size
SizeBytes int64 `json:"size_bytes"`
// Filesystem type (ext4, xfs, btrfs)
Filesystem string `json:"filesystem"`
// RemoteOnly indicates if this uses only remote storage
RemoteOnly bool `json:"remote_only"`
// Mode is the disk I/O mode (universal or accelerator)
Mode storage_v1alpha.DiskVolumeVolumeMode `json:"mode,omitempty"`
// DevicePath is the loop device backing this volume (alwaysMount modes only)
DevicePath string `json:"device_path,omitempty"`
// MountPath is where the volume is mounted (alwaysMount modes only)
MountPath string `json:"mount_path,omitempty"`
// Mounted indicates if the volume is currently mounted (alwaysMount modes only)
Mounted bool `json:"mounted,omitempty"`
}
VolumeState represents the state of a disk volume