Documentation ¶
Overview ¶
internal/task_scheduler/k8s_job_manager.go
internal/task_scheduler/k8s_manager.go
Index ¶
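- type K8sJobManager
- func NewK8sJobManager(namespace string, config *config.ComposeConfig, logger *logging.Logger) (*K8sJobManager, error)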
- func (jm *K8sJobManager) CancelTask(ctx context.Context, taskID string) error
- func (jm *K8sJobManager) CleanupCompletedTasks(ctx context.Context, schedulerName string, olderThan time.Duration) error
- func (jm *K8sJobManager) GetTaskLogs(ctx context.Context, taskID string) (string, error)
- func (jm *K8sJobManager) GetTaskStatistics(ctx context.Context, schedulerName string) (*TaskStatistics, error)
- func (jm *K8sJobManager) GetTaskStatus(ctx context.Context, taskID string) (*TaskStatus, error)
- func (jm *K8sJobManager) ListTasks(ctx context.Context, schedulerName string) ([]*TaskStatus, error)
- func (jm *K8sJobManager) SubmitTask(ctx context.Context, task *TaskRequest) (*TaskStatus, error)
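- type K8sManager
- func NewK8sManager(cfg *config.ComposeConfig, k8sClient client.Client, namespace string) *K8sManager
- func NewManagerFromConfig(cfg *config.ComposeConfig, k8sClient client.Client, namespace string) *K8sManager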
- func (m *K8sManager) CancelTask(taskID string) error
- func (m *K8sManager) CleanupOldTasks(olderThan time.Duration) error
- func (m *K8sManager) ExecuteTask(task *TaskRequest) (*TaskStatus, error)
- func (m *K8sManager) GetLogs() (string, error)
- func (m *K8sManager) GetStatus() (string, error)
- func (m *K8sManager) GetTaskLogs(taskID string) (string, error)
- func (m *K8sManager) GetTaskStatistics() (*TaskStatistics, error)
- func (m *K8sManager) GetTaskStatus(taskID string) (*TaskStatus, error)
- func (m *K8sManager) ListTasks() ([]*TaskStatus, error)
- func (m *K8sManager) Restart() error
- func (m *K8sManager) SetConfigFile(configFile string)
- func (m *K8sManager) Start() error
- func (m *K8sManager) Stop() error
- type TaskRequest
- type TaskResourceConfig
- type TaskRetryConfig
- type TaskStatistics
- type TaskStatus
- type TaskVolumeConfig
- type TaskVolumeConfigMap
- type TaskVolumeConfigMapItem
- type TaskVolumeEmptyDir
- type TaskVolumePVC
- type TaskVolumeSource
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type K8sJobManager ¶
// K8sJobManager manages Kubernetes Jobs for task execution.
// All of its fields are unexported; obtain an instance from NewK8sJobManager.
type K8sJobManager struct {
// contains filtered or unexported fields
}
K8sJobManager manages Kubernetes Jobs for task execution
func NewK8sJobManager ¶
func NewK8sJobManager(namespace string, config *config.ComposeConfig, logger *logging.Logger) (*K8sJobManager, error)
NewK8sJobManager creates a new Kubernetes Job manager
func (*K8sJobManager) CancelTask ¶
func (jm *K8sJobManager) CancelTask(ctx context.Context, taskID string) error
CancelTask cancels a running task by deleting its Job
func (*K8sJobManager) CleanupCompletedTasks ¶
func (jm *K8sJobManager) CleanupCompletedTasks(ctx context.Context, schedulerName string, olderThan time.Duration) error
CleanupCompletedTasks removes completed tasks older than the specified duration
func (*K8sJobManager) GetTaskLogs ¶
func (jm *K8sJobManager) GetTaskLogs(ctx context.Context, taskID string) (string, error)
GetTaskLogs retrieves logs from a task's pod
func (*K8sJobManager) GetTaskStatistics ¶
func (jm *K8sJobManager) GetTaskStatistics(ctx context.Context, schedulerName string) (*TaskStatistics, error)
GetTaskStatistics returns statistics about tasks managed by this scheduler
func (*K8sJobManager) GetTaskStatus ¶
func (jm *K8sJobManager) GetTaskStatus(ctx context.Context, taskID string) (*TaskStatus, error)
GetTaskStatus retrieves the current status of a task
func (*K8sJobManager) ListTasks ¶
func (jm *K8sJobManager) ListTasks(ctx context.Context, schedulerName string) ([]*TaskStatus, error)
ListTasks lists all tasks (jobs) created by the task scheduler
func (*K8sJobManager) SubmitTask ¶
func (jm *K8sJobManager) SubmitTask(ctx context.Context, task *TaskRequest) (*TaskStatus, error)
SubmitTask submits a task for execution as a Kubernetes Job
type K8sManager ¶
// K8sManager is a system manager for the task scheduler service.
// All of its fields are unexported; obtain an instance from NewK8sManager
// or NewManagerFromConfig.
type K8sManager struct {
// contains filtered or unexported fields
}
K8sManager is a system manager for the task scheduler service
func NewK8sManager ¶
func NewK8sManager(cfg *config.ComposeConfig, k8sClient client.Client, namespace string) *K8sManager
NewK8sManager creates a new system task scheduler manager
func NewManagerFromConfig ¶
func NewManagerFromConfig(cfg *config.ComposeConfig, k8sClient client.Client, namespace string) *K8sManager
NewManagerFromConfig is a helper function to convert the old Manager interface to the new K8sManager
func (*K8sManager) CancelTask ¶
func (m *K8sManager) CancelTask(taskID string) error
CancelTask cancels a running task
func (*K8sManager) CleanupOldTasks ¶
func (m *K8sManager) CleanupOldTasks(olderThan time.Duration) error
CleanupOldTasks cleans up old completed tasks
func (*K8sManager) ExecuteTask ¶
func (m *K8sManager) ExecuteTask(task *TaskRequest) (*TaskStatus, error)
ExecuteTask executes a task using Kubernetes Jobs
func (*K8sManager) GetLogs ¶
func (m *K8sManager) GetLogs() (string, error)
GetLogs returns logs from the task scheduler service
func (*K8sManager) GetStatus ¶
func (m *K8sManager) GetStatus() (string, error)
GetStatus returns the status of the task scheduler service
func (*K8sManager) GetTaskLogs ¶
func (m *K8sManager) GetTaskLogs(taskID string) (string, error)
GetTaskLogs gets logs from a specific task
func (*K8sManager) GetTaskStatistics ¶
func (m *K8sManager) GetTaskStatistics() (*TaskStatistics, error)
GetTaskStatistics gets task execution statistics
func (*K8sManager) GetTaskStatus ¶
func (m *K8sManager) GetTaskStatus(taskID string) (*TaskStatus, error)
GetTaskStatus gets the status of a specific task
func (*K8sManager) ListTasks ¶
func (m *K8sManager) ListTasks() ([]*TaskStatus, error)
ListTasks lists all tasks
func (*K8sManager) Restart ¶
func (m *K8sManager) Restart() error
Restart restarts the task scheduler service
func (*K8sManager) SetConfigFile ¶
func (m *K8sManager) SetConfigFile(configFile string)
SetConfigFile sets the configuration file path
func (*K8sManager) Start ¶
func (m *K8sManager) Start() error
Start starts the task scheduler service using Kubernetes resources
func (*K8sManager) Stop ¶
func (m *K8sManager) Stop() error
type TaskRequest ¶
// TaskRequest represents a task to be executed.
// It is the argument accepted by K8sJobManager.SubmitTask and
// K8sManager.ExecuteTask.
type TaskRequest struct {
// ID is the task identifier (JSON "id").
// NOTE(review): presumably the same value later passed as taskID to
// GetTaskStatus/GetTaskLogs/CancelTask — confirm.
ID string `json:"id"`
// Name is the task's name (JSON "name").
Name string `json:"name"`
// Description is free-form descriptive text (JSON "description").
Description string `json:"description"`
// Command and Args are string lists.
// NOTE(review): presumably the container command and its arguments — confirm.
Command []string `json:"command"`
Args []string `json:"args"`
// Image is presumably the container image reference to run — confirm.
Image string `json:"image"`
// Env maps variable names to values.
Env map[string]string `json:"env"`
// Timeout is a time.Duration; encoding/json encodes it as an integer
// number of nanoseconds, not as a string such as "30s".
Timeout time.Duration `json:"timeout"`
// Retry holds the retry settings (see TaskRetryConfig).
Retry TaskRetryConfig `json:"retry"`
// Resources holds the resource requirements (see TaskResourceConfig).
Resources TaskResourceConfig `json:"resources"`
// Volumes lists the volumes for the task; the key is omitted from JSON
// when the slice is empty.
Volumes []TaskVolumeConfig `json:"volumes,omitempty"`
// Metadata carries arbitrary key/value data. There is no omitempty, so a
// nil map is encoded as JSON null.
Metadata map[string]interface{} `json:"metadata"`
}
TaskRequest represents a task to be executed
type TaskResourceConfig ¶
TaskResourceConfig defines resource requirements for tasks
type TaskRetryConfig ¶
// TaskRetryConfig defines retry behavior for tasks.
type TaskRetryConfig struct {
// MaxRetries is the retry count limit (JSON "maxRetries").
// NOTE(review): whether 0 means "no retries" or "use a default" is not
// visible here — confirm.
MaxRetries int `json:"maxRetries"`
// RetryDelay is a time.Duration; encoding/json encodes it as an integer
// number of nanoseconds.
RetryDelay time.Duration `json:"retryDelay"`
// BackoffStrategy names the backoff strategy.
// TODO: document the accepted values; they are not visible in this type.
BackoffStrategy string `json:"backoffStrategy"`
}
TaskRetryConfig defines retry behavior for tasks
type TaskStatistics ¶
// TaskStatistics represents task execution statistics.
// It is the result type of the GetTaskStatistics methods.
type TaskStatistics struct {
// Task counters, one per category named by the field.
TotalTasks int64 `json:"totalTasks"`
CompletedTasks int64 `json:"completedTasks"`
FailedTasks int64 `json:"failedTasks"`
RunningTasks int64 `json:"runningTasks"`
ScheduledTasks int64 `json:"scheduledTasks"`
// LastTaskTime and AverageTaskTime are carried as pre-formatted strings.
// TODO: document the formats; they are not visible in this type.
LastTaskTime string `json:"lastTaskTime"`
AverageTaskTime string `json:"averageTaskTime"`
}
TaskStatistics represents task execution statistics
type TaskStatus ¶
// TaskStatus represents the status of a task.
type TaskStatus struct {
// ID is the task identifier (JSON "id").
ID string `json:"id"`
// Phase is the task's current phase as a string.
// TODO: document the possible values; they are not visible in this type.
Phase string `json:"phase"`
// StartTime and CompletionTime are pointers so that an unset time can be
// represented as nil; nil values are omitted from JSON.
StartTime *time.Time `json:"startTime,omitempty"`
CompletionTime *time.Time `json:"completionTime,omitempty"`
// Message and Reason are textual details about the status.
Message string `json:"message"`
Reason string `json:"reason"`
// ExitCode is a pointer so that "not set" (nil, omitted from JSON) is
// distinguishable from an exit code of 0 (encoded as 0).
ExitCode *int32 `json:"exitCode,omitempty"`
// JobName and PodName are presumably the names of the Kubernetes Job and
// Pod backing the task — confirm.
JobName string `json:"jobName"`
PodName string `json:"podName"`
// Logs holds log text; the key is omitted from JSON when empty.
Logs string `json:"logs,omitempty"`
// Metadata carries arbitrary key/value data. There is no omitempty, so a
// nil map is encoded as JSON null.
Metadata map[string]interface{} `json:"metadata"`
}
TaskStatus represents the status of a task
type TaskVolumeConfig ¶
// TaskVolumeConfig defines volume configuration for tasks.
type TaskVolumeConfig struct {
// Name of the volume
Name string `json:"name"`
// Mount path in the container
MountPath string `json:"mountPath"`
// Volume type (pvc, configmap, secret, emptyDir)
// NOTE(review): "secret" is listed here, but TaskVolumeSource declares only
// PVC, EmptyDir and ConfigMap sources — confirm how secret volumes are
// configured.
Type string `json:"type"`
// Source configuration (depends on type)
Source TaskVolumeSource `json:"source"`
// ReadOnly mount; omitted from JSON when false
ReadOnly bool `json:"readOnly,omitempty"`
}
TaskVolumeConfig defines volume configuration for tasks
type TaskVolumeConfigMap ¶
// TaskVolumeConfigMap defines a configMap volume source.
type TaskVolumeConfigMap struct {
// ConfigMap name
Name string `json:"name"`
// Items to project from the configMap; omitted from JSON when empty.
// NOTE(review): presumably all keys are projected when Items is empty — confirm.
Items []TaskVolumeConfigMapItem `json:"items,omitempty"`
}
TaskVolumeConfigMap defines configMap volume source
type TaskVolumeConfigMapItem ¶
// TaskVolumeConfigMapItem defines a configMap item projection: a single
// configMap key and the path it is mounted at.
type TaskVolumeConfigMapItem struct {
// Key in the configMap
Key string `json:"key"`
// Path to mount the key at
// NOTE(review): presumably relative to the volume's mount path — confirm.
Path string `json:"path"`
}
TaskVolumeConfigMapItem defines a configMap item projection
type TaskVolumeEmptyDir ¶
// TaskVolumeEmptyDir defines an emptyDir volume source.
// Both fields are optional and omitted from JSON when empty.
type TaskVolumeEmptyDir struct {
// Size limit for the emptyDir, given as a string.
// NOTE(review): presumably a Kubernetes quantity such as "1Gi" — confirm.
SizeLimit string `json:"sizeLimit,omitempty"`
// Medium (Memory for tmpfs)
Medium string `json:"medium,omitempty"`
}
TaskVolumeEmptyDir defines emptyDir volume source
type TaskVolumePVC ¶
// TaskVolumePVC defines a PVC (PersistentVolumeClaim) volume source.
// Every field is optional and omitted from JSON when empty.
type TaskVolumePVC struct {
// Name of an existing claim; when empty, a name is auto-generated
ClaimName string `json:"claimName,omitempty"`
// Size for an auto-created PVC, given as a string.
// NOTE(review): presumably a Kubernetes quantity such as "10Gi" — confirm.
Size string `json:"size,omitempty"`
// Storage class for an auto-created PVC
StorageClass string `json:"storageClass,omitempty"`
// Access modes for an auto-created PVC
AccessModes []string `json:"accessModes,omitempty"`
// Auto-delete the PVC after job completion
AutoDelete bool `json:"autoDelete,omitempty"`
}
TaskVolumePVC defines PVC volume source
type TaskVolumeSource ¶
// TaskVolumeSource defines the source of a volume.
// Each source is a pointer that is omitted from JSON when nil.
// NOTE(review): presumably exactly one field is set, matching
// TaskVolumeConfig.Type — confirm.
type TaskVolumeSource struct {
// PVC configuration (for type: pvc)
PVC *TaskVolumePVC `json:"pvc,omitempty"`
// EmptyDir configuration (for type: emptyDir)
EmptyDir *TaskVolumeEmptyDir `json:"emptyDir,omitempty"`
// ConfigMap configuration (for type: configmap)
// Note the JSON key is camel-case "configMap" while the type string above
// is lower-case "configmap".
ConfigMap *TaskVolumeConfigMap `json:"configMap,omitempty"`
}
TaskVolumeSource defines the source of a volume