worker

package
v1.56.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 23, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MaxProcesses  = 2048
	MaxDevices    = 16
	MaxUUIDLen    = 64
	ShmPathSuffix = "shm"
)

Constants

Variables

View Source
var (
	ErrRefCountUnderflow = &RefCountError{Type: "underflow"}
)

Functions

func CleanupEmptyParentDirectories

func CleanupEmptyParentDirectories(filePath string, stopAtPath *string) error

CleanupEmptyParentDirectories removes empty parent directories after removing a file

Types

type DeviceConfig

type DeviceConfig struct {
	DeviceIdx      uint32
	DeviceUUID     string
	UpLimit        uint32
	MemLimit       uint64
	SMCount        uint32
	MaxThreadPerSM uint32
	TotalCudaCores uint32
}

DeviceConfig contains device configuration information

type DeviceEntry

type DeviceEntry = DeviceEntryV2

DeviceEntry is a type alias for backward compatibility

type DeviceEntryV1

type DeviceEntryV1 struct {
	UUID          [MaxUUIDLen]byte
	DeviceInfo    SharedDeviceInfoV1
	IsActiveField uint32
	// contains filtered or unexported fields
}

DeviceEntryV1 is the legacy device entry

func NewDeviceEntryV1

func NewDeviceEntryV1() *DeviceEntryV1

NewDeviceEntryV1 creates a new V1 device entry

func (*DeviceEntryV1) GetUUID

func (d *DeviceEntryV1) GetUUID() string

GetUUID gets the device UUID as a string

func (*DeviceEntryV1) IsActive

func (d *DeviceEntryV1) IsActive() bool

IsActive checks if this entry is active

func (*DeviceEntryV1) SetActive

func (d *DeviceEntryV1) SetActive(active bool)

SetActive sets the active status

func (*DeviceEntryV1) SetUUID

func (d *DeviceEntryV1) SetUUID(uuid string)

SetUUID sets the device UUID

type DeviceEntryV2

type DeviceEntryV2 struct {
	UUID          [MaxUUIDLen]byte
	DeviceInfo    SharedDeviceInfoV2
	IsActiveField uint32
}

DeviceEntryV2 is the V2 device entry with ERL (Elastic Rate Limiting) support

func NewDeviceEntryV2

func NewDeviceEntryV2() *DeviceEntryV2

NewDeviceEntryV2 creates a new V2 device entry

func (*DeviceEntryV2) GetUUID

func (d *DeviceEntryV2) GetUUID() string

GetUUID gets the device UUID as a string

func (*DeviceEntryV2) IsActive

func (d *DeviceEntryV2) IsActive() bool

IsActive checks if this entry is active

func (*DeviceEntryV2) SetActive

func (d *DeviceEntryV2) SetActive(active bool)

SetActive sets the active status

func (*DeviceEntryV2) SetUUID

func (d *DeviceEntryV2) SetUUID(uuid string)

SetUUID sets the device UUID

type PIDSet

type PIDSet struct {
	// contains filtered or unexported fields
}

PIDSet is a set of process IDs with a fixed capacity

func NewPIDSet

func NewPIDSet() *PIDSet

NewPIDSet creates a new PID set

func (*PIDSet) InsertIfAbsent

func (s *PIDSet) InsertIfAbsent(pid int) bool

InsertIfAbsent inserts a value if it's not already present

func (*PIDSet) RemoveValue

func (s *PIDSet) RemoveValue(pid int) bool

RemoveValue removes a value from the set

func (*PIDSet) Values

func (s *PIDSet) Values() []int

Values returns all values in the set

type PodIdentifier

type PodIdentifier struct {
	Namespace string
	Name      string
}

PodIdentifier contains namespace and name

func FromShmFilePath

func FromShmFilePath(path string) (*PodIdentifier, error)

FromShmFilePath parses a PodIdentifier from a full shared memory path. Path format: {base_path}/{namespace}/{name}/shm

func NewPodIdentifier

func NewPodIdentifier(namespace, name string) *PodIdentifier

NewPodIdentifier creates a new PodIdentifier

func (*PodIdentifier) String

func (p *PodIdentifier) String() string

String returns the string representation

func (*PodIdentifier) ToPath

func (p *PodIdentifier) ToPath(basePath string) string

ToPath returns the path for this pod identifier

type RefCountError

type RefCountError struct {
	Type string
}

RefCountError represents errors in reference count operations

func (*RefCountError) Error

func (e *RefCountError) Error() string

Error implements the error interface.

type SharedDeviceInfo

type SharedDeviceInfo = SharedDeviceInfoV2

SharedDeviceInfo is a type alias for backward compatibility

type SharedDeviceInfoV1

type SharedDeviceInfoV1 struct {
	AvailableCudaCores int32
	UpLimit            uint32
	MemLimit           uint64
	TotalCudaCores     uint32
	PodMemoryUsed      uint64
}

SharedDeviceInfoV1 is the legacy device state (without ERL)

func NewSharedDeviceInfoV1

func NewSharedDeviceInfoV1(totalCudaCores, upLimit uint32, memLimit uint64) *SharedDeviceInfoV1

NewSharedDeviceInfoV1 creates a new V1 device info

type SharedDeviceInfoV2

type SharedDeviceInfoV2 struct {
	UpLimit        uint32
	MemLimit       uint64
	TotalCudaCores uint32
	PodMemoryUsed  uint64

	// ERL (Elastic Rate Limiting) - PID-controlled token bucket
	ERLTokenRefillRate uint64 // float64 stored as raw bits
	ERLTokenCapacity   uint64 // float64 stored as raw bits
	ERLCurrentTokens   uint64 // float64 stored as raw bits
	ERLLastTokenUpdate uint64 // float64 stored as raw bits
}

SharedDeviceInfoV2 is the V2 device state with ERL support

func NewSharedDeviceInfoV2

func NewSharedDeviceInfoV2(totalCudaCores, upLimit uint32, memLimit uint64) *SharedDeviceInfoV2

NewSharedDeviceInfoV2 creates a new V2 device info

func (*SharedDeviceInfoV2) FetchAddERLTokens

func (d *SharedDeviceInfoV2) FetchAddERLTokens(amount float64) float64

FetchAddERLTokens atomically adds tokens (capped at capacity) and returns the value before addition

func (*SharedDeviceInfoV2) FetchSubERLTokens

func (d *SharedDeviceInfoV2) FetchSubERLTokens(cost float64) float64

FetchSubERLTokens atomically subtracts tokens and returns the value before subtraction

func (*SharedDeviceInfoV2) GetERLCurrentTokens

func (d *SharedDeviceInfoV2) GetERLCurrentTokens() float64

GetERLCurrentTokens returns the current tokens

func (*SharedDeviceInfoV2) GetERLLastTokenUpdate

func (d *SharedDeviceInfoV2) GetERLLastTokenUpdate() float64

GetERLLastTokenUpdate returns the last token update timestamp

func (*SharedDeviceInfoV2) GetERLTokenCapacity

func (d *SharedDeviceInfoV2) GetERLTokenCapacity() float64

GetERLTokenCapacity returns the token capacity

func (*SharedDeviceInfoV2) GetERLTokenRefillRate

func (d *SharedDeviceInfoV2) GetERLTokenRefillRate() float64

GetERLTokenRefillRate returns the refill rate

func (*SharedDeviceInfoV2) LoadERLQuota

func (d *SharedDeviceInfoV2) LoadERLQuota() (float64, float64)

LoadERLQuota loads the quota configuration

func (*SharedDeviceInfoV2) LoadERLTokenState

func (d *SharedDeviceInfoV2) LoadERLTokenState() (float64, float64)

LoadERLTokenState loads the token state atomically

func (*SharedDeviceInfoV2) SetERLCurrentTokens

func (d *SharedDeviceInfoV2) SetERLCurrentTokens(tokens float64)

SetERLCurrentTokens sets the current tokens

func (*SharedDeviceInfoV2) SetERLLastTokenUpdate

func (d *SharedDeviceInfoV2) SetERLLastTokenUpdate(timestamp float64)

SetERLLastTokenUpdate sets the last token update timestamp

func (*SharedDeviceInfoV2) SetERLTokenCapacity

func (d *SharedDeviceInfoV2) SetERLTokenCapacity(capacity float64)

SetERLTokenCapacity sets the token capacity

func (*SharedDeviceInfoV2) SetERLTokenRefillRate

func (d *SharedDeviceInfoV2) SetERLTokenRefillRate(rate float64)

SetERLTokenRefillRate sets the refill rate

func (*SharedDeviceInfoV2) StoreERLTokenState

func (d *SharedDeviceInfoV2) StoreERLTokenState(tokens, timestamp float64)

StoreERLTokenState stores the token state atomically

type SharedDeviceState

type SharedDeviceState struct {
	V1 *SharedDeviceStateV1
	V2 *SharedDeviceStateV2
}

SharedDeviceState is a versioned wrapper that holds either a V1 or a V2 shared device state, for backward compatibility

func NewSharedDeviceState

func NewSharedDeviceState(configs []DeviceConfig) (*SharedDeviceState, error)

NewSharedDeviceState creates a new SharedDeviceState (defaults to V2)

func (*SharedDeviceState) AddPID

func (s *SharedDeviceState) AddPID(pid int)

AddPID adds a PID to the shared PID set

func (*SharedDeviceState) CleanupOrphanedLocks

func (s *SharedDeviceState) CleanupOrphanedLocks()

CleanupOrphanedLocks cleans up orphaned locks

func (*SharedDeviceState) DeviceCount

func (s *SharedDeviceState) DeviceCount() int

DeviceCount returns the number of devices

func (*SharedDeviceState) GetAllPIDs

func (s *SharedDeviceState) GetAllPIDs() []int

GetAllPIDs returns all PIDs

func (*SharedDeviceState) GetLastHeartbeat

func (s *SharedDeviceState) GetLastHeartbeat() uint64

GetLastHeartbeat returns the last heartbeat timestamp

func (*SharedDeviceState) HasDevice

func (s *SharedDeviceState) HasDevice(index int) bool

HasDevice checks if a device exists

func (*SharedDeviceState) HasERL

func (s *SharedDeviceState) HasERL() bool

HasERL checks if this state uses ERL features

func (*SharedDeviceState) IsHealthy

func (s *SharedDeviceState) IsHealthy(timeout time.Duration) bool

IsHealthy checks whether the shared memory is healthy, based on the last heartbeat and the given timeout

func (*SharedDeviceState) RemovePID

func (s *SharedDeviceState) RemovePID(pid int)

RemovePID removes a PID from the shared PID set

func (*SharedDeviceState) SetPodMemoryUsed

func (s *SharedDeviceState) SetPodMemoryUsed(index int, memory uint64) bool

SetPodMemoryUsed sets pod memory used for a device

func (*SharedDeviceState) UpdateHeartbeat

func (s *SharedDeviceState) UpdateHeartbeat(timestamp uint64)

UpdateHeartbeat updates the heartbeat timestamp

func (*SharedDeviceState) Version

func (s *SharedDeviceState) Version() uint32

Version returns the version number

type SharedDeviceStateV1

type SharedDeviceStateV1 struct {
	Devices          [MaxDevices]DeviceEntryV1
	DeviceCountField uint32
	LastHeartbeat    uint64
	PIDs             *ShmMutex[*PIDSet]
}

SharedDeviceStateV1 is the V1 shared device state

func NewSharedDeviceStateV1

func NewSharedDeviceStateV1(configs []DeviceConfig) (*SharedDeviceStateV1, error)

NewSharedDeviceStateV1 creates a new V1 state

func (*SharedDeviceStateV1) AddPID

func (s *SharedDeviceStateV1) AddPID(pid int)

AddPID adds a PID to the set

func (*SharedDeviceStateV1) CleanupOrphanedLocks

func (s *SharedDeviceStateV1) CleanupOrphanedLocks()

CleanupOrphanedLocks cleans up any orphaned locks

func (*SharedDeviceStateV1) DeviceCount

func (s *SharedDeviceStateV1) DeviceCount() int

DeviceCount returns the number of devices

func (*SharedDeviceStateV1) GetAllPIDs

func (s *SharedDeviceStateV1) GetAllPIDs() []int

GetAllPIDs returns all PIDs currently stored

func (*SharedDeviceStateV1) GetLastHeartbeat

func (s *SharedDeviceStateV1) GetLastHeartbeat() uint64

GetLastHeartbeat returns the last heartbeat timestamp

func (*SharedDeviceStateV1) HasDevice

func (s *SharedDeviceStateV1) HasDevice(index int) bool

HasDevice checks if a device exists at the given index

func (*SharedDeviceStateV1) IsHealthy

func (s *SharedDeviceStateV1) IsHealthy(timeout time.Duration) bool

IsHealthy checks if the shared memory is healthy based on heartbeat

func (*SharedDeviceStateV1) RemovePID

func (s *SharedDeviceStateV1) RemovePID(pid int)

RemovePID removes a PID from the set

func (*SharedDeviceStateV1) UpdateHeartbeat

func (s *SharedDeviceStateV1) UpdateHeartbeat(timestamp uint64)

UpdateHeartbeat updates the heartbeat timestamp

type SharedDeviceStateV2

type SharedDeviceStateV2 struct {
	Devices          [MaxDevices]DeviceEntryV2
	DeviceCountField uint32
	LastHeartbeat    uint64
	PIDs             *ShmMutex[*PIDSet]
}

SharedDeviceStateV2 is the V2 shared device state with ERL (Elastic Rate Limiting) support

func NewSharedDeviceStateV2

func NewSharedDeviceStateV2(configs []DeviceConfig) (*SharedDeviceStateV2, error)

NewSharedDeviceStateV2 creates a new V2 state

func (*SharedDeviceStateV2) AddPID

func (s *SharedDeviceStateV2) AddPID(pid int)

AddPID adds a PID to the set

func (*SharedDeviceStateV2) CleanupOrphanedLocks

func (s *SharedDeviceStateV2) CleanupOrphanedLocks()

CleanupOrphanedLocks cleans up any orphaned locks

func (*SharedDeviceStateV2) DeviceCount

func (s *SharedDeviceStateV2) DeviceCount() int

DeviceCount returns the number of devices

func (*SharedDeviceStateV2) GetAllPIDs

func (s *SharedDeviceStateV2) GetAllPIDs() []int

GetAllPIDs returns all PIDs currently stored

func (*SharedDeviceStateV2) GetLastHeartbeat

func (s *SharedDeviceStateV2) GetLastHeartbeat() uint64

GetLastHeartbeat returns the last heartbeat timestamp

func (*SharedDeviceStateV2) HasDevice

func (s *SharedDeviceStateV2) HasDevice(index int) bool

HasDevice checks if a device exists at the given index

func (*SharedDeviceStateV2) IsHealthy

func (s *SharedDeviceStateV2) IsHealthy(timeout time.Duration) bool

IsHealthy checks if the shared memory is healthy based on heartbeat

func (*SharedDeviceStateV2) RemovePID

func (s *SharedDeviceStateV2) RemovePID(pid int)

RemovePID removes a PID from the set

func (*SharedDeviceStateV2) UpdateHeartbeat

func (s *SharedDeviceStateV2) UpdateHeartbeat(timestamp uint64)

UpdateHeartbeat updates the heartbeat timestamp

type SharedMemoryHandle

type SharedMemoryHandle struct {
	// contains filtered or unexported fields
}

SharedMemoryHandle manages a shared memory mapping

func CreateSharedMemoryHandle

func CreateSharedMemoryHandle(podPath string, configs []DeviceConfig) (*SharedMemoryHandle, error)

CreateSharedMemoryHandle creates a new shared memory handle

func OpenSharedMemoryHandle

func OpenSharedMemoryHandle(podPath string) (*SharedMemoryHandle, error)

OpenSharedMemoryHandle opens an existing shared memory handle

func (*SharedMemoryHandle) Cleanup

func (h *SharedMemoryHandle) Cleanup(stopAtPath *string) error

Cleanup removes the shared memory file and cleans up empty directories

func (*SharedMemoryHandle) Close

func (h *SharedMemoryHandle) Close() error

Close closes the shared memory handle

func (*SharedMemoryHandle) GetState

func (h *SharedMemoryHandle) GetState() *SharedDeviceState

GetState returns the shared device state

type ShmMutex

type ShmMutex[T any] struct {
	Value T
	// contains filtered or unexported fields
}

ShmMutex is a shared memory mutex wrapper

func NewShmMutex

func NewShmMutex[T any](value T) *ShmMutex[T]

NewShmMutex creates a new shared memory mutex

func (*ShmMutex[T]) CleanupOrphanedLock

func (m *ShmMutex[T]) CleanupOrphanedLock()

CleanupOrphanedLock cleans up orphaned locks (placeholder for now)

func (*ShmMutex[T]) Lock

func (m *ShmMutex[T]) Lock()

Lock locks the mutex

func (*ShmMutex[T]) Unlock

func (m *ShmMutex[T]) Unlock()

Unlock unlocks the mutex

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL