task_scheduler

package
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 28, 2025 License: AGPL-3.0 Imports: 17 Imported by: 0

Documentation

Overview

internal/task_scheduler/k8s_job_manager.go

internal/task_scheduler/k8s_manager.go

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type K8sJobManager

type K8sJobManager struct {
	// contains filtered or unexported fields
}

K8sJobManager manages Kubernetes Jobs for task execution

func NewK8sJobManager

func NewK8sJobManager(namespace string, config *config.ComposeConfig, logger *logging.Logger) (*K8sJobManager, error)

NewK8sJobManager creates a new Kubernetes Job manager

func (*K8sJobManager) CancelTask

func (jm *K8sJobManager) CancelTask(ctx context.Context, taskID string) error

CancelTask cancels a running task by deleting its Job

func (*K8sJobManager) CleanupCompletedTasks

func (jm *K8sJobManager) CleanupCompletedTasks(ctx context.Context, schedulerName string, olderThan time.Duration) error

CleanupCompletedTasks removes completed tasks older than the specified duration

func (*K8sJobManager) GetTaskLogs

func (jm *K8sJobManager) GetTaskLogs(ctx context.Context, taskID string) (string, error)

GetTaskLogs retrieves logs from a task's pod

func (*K8sJobManager) GetTaskStatistics

func (jm *K8sJobManager) GetTaskStatistics(ctx context.Context, schedulerName string) (*TaskStatistics, error)

GetTaskStatistics returns statistics about tasks managed by this scheduler

func (*K8sJobManager) GetTaskStatus

func (jm *K8sJobManager) GetTaskStatus(ctx context.Context, taskID string) (*TaskStatus, error)

GetTaskStatus retrieves the current status of a task

func (*K8sJobManager) ListTasks

func (jm *K8sJobManager) ListTasks(ctx context.Context, schedulerName string) ([]*TaskStatus, error)

ListTasks lists all tasks (jobs) created by the task scheduler

func (*K8sJobManager) SubmitTask

func (jm *K8sJobManager) SubmitTask(ctx context.Context, task *TaskRequest) (*TaskStatus, error)

SubmitTask submits a task for execution as a Kubernetes Job

type K8sManager

type K8sManager struct {
	// contains filtered or unexported fields
}

K8sManager is a system manager for the task scheduler service

func NewK8sManager

func NewK8sManager(cfg *config.ComposeConfig, k8sClient client.Client, namespace string) *K8sManager

NewK8sManager creates a new system task scheduler manager

func NewManagerFromConfig

func NewManagerFromConfig(cfg *config.ComposeConfig, k8sClient client.Client, namespace string) *K8sManager

Helper function to convert the old Manager interface to the new K8sManager

func (*K8sManager) CancelTask

func (m *K8sManager) CancelTask(taskID string) error

CancelTask cancels a running task

func (*K8sManager) CleanupOldTasks

func (m *K8sManager) CleanupOldTasks(olderThan time.Duration) error

CleanupOldTasks cleans up old completed tasks

func (*K8sManager) ExecuteTask

func (m *K8sManager) ExecuteTask(task *TaskRequest) (*TaskStatus, error)

ExecuteTask executes a task using Kubernetes Jobs

func (*K8sManager) GetLogs

func (m *K8sManager) GetLogs() (string, error)

GetLogs returns logs from the task scheduler service

func (*K8sManager) GetStatus

func (m *K8sManager) GetStatus() (string, error)

GetStatus returns the status of the task scheduler service

func (*K8sManager) GetTaskLogs

func (m *K8sManager) GetTaskLogs(taskID string) (string, error)

GetTaskLogs gets logs from a specific task

func (*K8sManager) GetTaskStatistics

func (m *K8sManager) GetTaskStatistics() (*TaskStatistics, error)

GetTaskStatistics gets task execution statistics

func (*K8sManager) GetTaskStatus

func (m *K8sManager) GetTaskStatus(taskID string) (*TaskStatus, error)

GetTaskStatus gets the status of a specific task

func (*K8sManager) ListTasks

func (m *K8sManager) ListTasks() ([]*TaskStatus, error)

ListTasks lists all tasks

func (*K8sManager) Restart

func (m *K8sManager) Restart() error

Restart restarts the task scheduler service

func (*K8sManager) SetConfigFile

func (m *K8sManager) SetConfigFile(configFile string)

SetConfigFile sets the configuration file path

func (*K8sManager) Start

func (m *K8sManager) Start() error

Start starts the task scheduler service using Kubernetes resources

func (*K8sManager) Stop

func (m *K8sManager) Stop() error

Stop stops the task scheduler service

type TaskRequest

type TaskRequest struct {
	ID          string                 `json:"id"`
	Name        string                 `json:"name"`
	Description string                 `json:"description"`
	Command     []string               `json:"command"`
	Args        []string               `json:"args"`
	Image       string                 `json:"image"`
	Env         map[string]string      `json:"env"`
	Timeout     time.Duration          `json:"timeout"`
	Retry       TaskRetryConfig        `json:"retry"`
	Resources   TaskResourceConfig     `json:"resources"`
	Volumes     []TaskVolumeConfig     `json:"volumes,omitempty"`
	Metadata    map[string]interface{} `json:"metadata"`
}

TaskRequest represents a task to be executed

type TaskResourceConfig

type TaskResourceConfig struct {
	CPU    string `json:"cpu"`
	Memory string `json:"memory"`
}

TaskResourceConfig defines resource requirements for tasks

type TaskRetryConfig

type TaskRetryConfig struct {
	MaxRetries      int           `json:"maxRetries"`
	RetryDelay      time.Duration `json:"retryDelay"`
	BackoffStrategy string        `json:"backoffStrategy"`
}

TaskRetryConfig defines retry behavior for tasks

type TaskStatistics

type TaskStatistics struct {
	TotalTasks      int64  `json:"totalTasks"`
	CompletedTasks  int64  `json:"completedTasks"`
	FailedTasks     int64  `json:"failedTasks"`
	RunningTasks    int64  `json:"runningTasks"`
	ScheduledTasks  int64  `json:"scheduledTasks"`
	LastTaskTime    string `json:"lastTaskTime"`
	AverageTaskTime string `json:"averageTaskTime"`
}

TaskStatistics represents task execution statistics

type TaskStatus

type TaskStatus struct {
	ID             string                 `json:"id"`
	Phase          string                 `json:"phase"`
	StartTime      *time.Time             `json:"startTime,omitempty"`
	CompletionTime *time.Time             `json:"completionTime,omitempty"`
	Message        string                 `json:"message"`
	Reason         string                 `json:"reason"`
	ExitCode       *int32                 `json:"exitCode,omitempty"`
	JobName        string                 `json:"jobName"`
	PodName        string                 `json:"podName"`
	Logs           string                 `json:"logs,omitempty"`
	Metadata       map[string]interface{} `json:"metadata"`
}

TaskStatus represents the status of a task

type TaskVolumeConfig

type TaskVolumeConfig struct {
	// Name of the volume
	Name string `json:"name"`
	// Mount path in the container
	MountPath string `json:"mountPath"`
	// Volume type (pvc, configmap, secret, emptyDir)
	Type string `json:"type"`
	// Source configuration (depends on type)
	Source TaskVolumeSource `json:"source"`
	// ReadOnly mount
	ReadOnly bool `json:"readOnly,omitempty"`
}

TaskVolumeConfig defines volume configuration for tasks

type TaskVolumeConfigMap

type TaskVolumeConfigMap struct {
	// ConfigMap name
	Name string `json:"name"`
	// Items to project from configMap
	Items []TaskVolumeConfigMapItem `json:"items,omitempty"`
}

TaskVolumeConfigMap defines configMap volume source

type TaskVolumeConfigMapItem

type TaskVolumeConfigMapItem struct {
	// Key in configMap
	Key string `json:"key"`
	// Path to mount the key
	Path string `json:"path"`
}

TaskVolumeConfigMapItem defines a configMap item projection

type TaskVolumeEmptyDir

type TaskVolumeEmptyDir struct {
	// Size limit for emptyDir
	SizeLimit string `json:"sizeLimit,omitempty"`
	// Medium (Memory for tmpfs)
	Medium string `json:"medium,omitempty"`
}

TaskVolumeEmptyDir defines emptyDir volume source

type TaskVolumePVC

type TaskVolumePVC struct {
	// Claim name (if existing) or auto-generated if empty
	ClaimName string `json:"claimName,omitempty"`
	// Size for auto-created PVC
	Size string `json:"size,omitempty"`
	// Storage class for auto-created PVC
	StorageClass string `json:"storageClass,omitempty"`
	// Access modes for auto-created PVC
	AccessModes []string `json:"accessModes,omitempty"`
	// Auto-delete PVC after job completion
	AutoDelete bool `json:"autoDelete,omitempty"`
}

TaskVolumePVC defines PVC volume source

type TaskVolumeSource

type TaskVolumeSource struct {
	// PVC configuration (for type: pvc)
	PVC *TaskVolumePVC `json:"pvc,omitempty"`
	// EmptyDir configuration (for type: emptyDir)
	EmptyDir *TaskVolumeEmptyDir `json:"emptyDir,omitempty"`
	// ConfigMap configuration (for type: configmap)
	ConfigMap *TaskVolumeConfigMap `json:"configMap,omitempty"`
}

TaskVolumeSource defines the source of a volume

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL