scheduler

package
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 28, 2025 License: AGPL-3.0 Imports: 27 Imported by: 0

Documentation

Overview

internal/scheduler/cron_engine.go

internal/scheduler/k8s_workflow_executor.go

internal/scheduler/pvc_mount_manager.go

internal/scheduler/templates.go

internal/scheduler/tool_executor.go

internal/scheduler/workflow_engine.go

internal/scheduler/workflow_scheduler.go

internal/scheduler/workflow_storage.go

internal/scheduler/workflow_store.go

internal/scheduler/workspace_cleanup.go

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConvertToCronExpression

func ConvertToCronExpression(natural string) (string, error)

ConvertToCronExpression provides helper functions to convert natural language to cron

func NextScheduledTime

func NextScheduledTime(expr string, timezone *time.Location) (time.Time, error)

NextScheduledTime calculates the next time a cron expression will trigger

func ParseCronExpression

func ParseCronExpression(expr string) error

ParseCronExpression validates and parses a cron expression

Types

type CronEngine

type CronEngine struct {
	// contains filtered or unexported fields
}

func NewCronEngine

func NewCronEngine(logger logr.Logger) *CronEngine

func (*CronEngine) AddJob

func (e *CronEngine) AddJob(jobSpec *JobSpec) error

func (*CronEngine) DisableJob

func (e *CronEngine) DisableJob(jobID string) error

func (*CronEngine) EnableJob

func (e *CronEngine) EnableJob(jobID string) error

func (*CronEngine) GetJob

func (e *CronEngine) GetJob(jobID string) (*JobSpec, bool)

func (*CronEngine) ListJobs

func (e *CronEngine) ListJobs() []*JobSpec

func (*CronEngine) RemoveJob

func (e *CronEngine) RemoveJob(jobID string) error

func (*CronEngine) Start

func (e *CronEngine) Start()

func (*CronEngine) Stop

func (e *CronEngine) Stop()

type JobExecution

type JobExecution struct {
	JobID     string
	StartTime time.Time
	EndTime   *time.Time
	Success   bool
	Error     error
	Attempt   int
}

type JobHandler

type JobHandler func(ctx context.Context, jobID string) error

type JobSpec

type JobSpec struct {
	ID         string
	Name       string
	Schedule   string
	Timezone   *time.Location
	Enabled    bool
	LastRun    *time.Time
	NextRun    *time.Time
	Handler    JobHandler
	MaxRetries int
	RetryDelay time.Duration
	// contains filtered or unexported fields
}

type K8sWorkflowExecutor

type K8sWorkflowExecutor struct {
	// contains filtered or unexported fields
}

K8sWorkflowExecutor executes workflows as Kubernetes Jobs

func NewK8sWorkflowExecutor

func NewK8sWorkflowExecutor(namespace string, config *config.ComposeConfig, logger logr.Logger) (*K8sWorkflowExecutor, error)

NewK8sWorkflowExecutor creates a new Kubernetes workflow executor

func (*K8sWorkflowExecutor) CancelWorkflowJob

func (kwe *K8sWorkflowExecutor) CancelWorkflowJob(ctx context.Context, workflowName, executionID string) error

CancelWorkflowJob cancels a running workflow job

func (*K8sWorkflowExecutor) CleanupOldWorkflowJobs

func (kwe *K8sWorkflowExecutor) CleanupOldWorkflowJobs(ctx context.Context, olderThan time.Duration) error

CleanupOldWorkflowJobs removes old completed workflow jobs

func (*K8sWorkflowExecutor) ExecuteWorkflowAsJob

func (kwe *K8sWorkflowExecutor) ExecuteWorkflowAsJob(ctx context.Context, workflowDef *crd.WorkflowDefinition, workflowName, executionID string) (*task_scheduler.TaskStatus, error)

ExecuteWorkflowAsJob executes a workflow definition as a Kubernetes Job

func (*K8sWorkflowExecutor) GetWorkflowJobLogs

func (kwe *K8sWorkflowExecutor) GetWorkflowJobLogs(ctx context.Context, workflowName, executionID string) (string, error)

GetWorkflowJobLogs gets logs from a workflow job

func (*K8sWorkflowExecutor) GetWorkflowJobStatus

func (kwe *K8sWorkflowExecutor) GetWorkflowJobStatus(ctx context.Context, workflowName, executionID string) (*task_scheduler.TaskStatus, error)

GetWorkflowJobStatus gets the status of a workflow job

func (*K8sWorkflowExecutor) ListWorkflowJobs

func (kwe *K8sWorkflowExecutor) ListWorkflowJobs(ctx context.Context) ([]*task_scheduler.TaskStatus, error)

ListWorkflowJobs lists all workflow jobs

type MCPToolCall

type MCPToolCall struct {
	Tool       string                 `json:"tool"`
	Parameters map[string]interface{} `json:"parameters"`
}

type MCPToolResponse

type MCPToolResponse struct {
	Success bool                   `json:"success"`
	Result  map[string]interface{} `json:"result,omitempty"`
	Error   string                 `json:"error,omitempty"`
}

type MountInfo

type MountInfo struct {
	WorkflowName string    `json:"workflowName"`
	ExecutionID  string    `json:"executionID"`
	PVCName      string    `json:"pvcName"`
	MountPath    string    `json:"mountPath"`
	MountedAt    time.Time `json:"mountedAt"`
	LastAccess   time.Time `json:"lastAccess"`
	AccessCount  int64     `json:"accessCount"`
}

MountInfo represents information about a mounted workspace

type PVCMountManager

type PVCMountManager struct {
	// contains filtered or unexported fields
}

PVCMountManager handles mounting and unmounting of workspace PVCs for chat agent access

func NewPVCMountManager

func NewPVCMountManager(k8sClient kubernetes.Interface, namespace string, logger logr.Logger) *PVCMountManager

NewPVCMountManager creates a new PVC mount manager

func (*PVCMountManager) CleanupExpiredMounts

func (pmm *PVCMountManager) CleanupExpiredMounts()

CleanupExpiredMounts unmounts workspaces that haven't been accessed recently

func (*PVCMountManager) GetMountPath

func (pmm *PVCMountManager) GetMountPath(workflowName, executionID string) (string, bool)

GetMountPath returns the mount path for a workspace if it's mounted

func (*PVCMountManager) ListMountedWorkspaces

func (pmm *PVCMountManager) ListMountedWorkspaces() []MountInfo

ListMountedWorkspaces returns a list of currently mounted workspaces

func (*PVCMountManager) MountWorkspacePVC

func (pmm *PVCMountManager) MountWorkspacePVC(workflowName, executionID string) (string, error)

MountWorkspacePVC mounts a workspace PVC and returns the mount path

func (*PVCMountManager) SetCleanupInterval

func (pmm *PVCMountManager) SetCleanupInterval(interval time.Duration)

SetCleanupInterval configures how often to run cleanup

func (*PVCMountManager) SetMountTimeout

func (pmm *PVCMountManager) SetMountTimeout(timeout time.Duration)

SetMountTimeout configures how long to keep mounts active without access

func (*PVCMountManager) Stop

func (pmm *PVCMountManager) Stop()

Stop stops the mount manager and cleans up all mounts

func (*PVCMountManager) UnmountWorkspacePVC

func (pmm *PVCMountManager) UnmountWorkspacePVC(workflowName, executionID string) error

UnmountWorkspacePVC unmounts a workspace PVC

func (*PVCMountManager) UpdateAccessTime

func (pmm *PVCMountManager) UpdateAccessTime(workflowName, executionID string)

UpdateAccessTime updates the last access time for a mounted workspace

type StepContext

type StepContext struct {
	WorkflowName      string
	WorkflowNamespace string
	StepName          string
	Attempt           int
	PreviousOutputs   map[string]interface{}
}

StepContext provides context information for step execution

type StepExecution

type StepExecution struct {
	Name       string
	Tool       string
	Parameters map[string]interface{}
	StartTime  time.Time
	EndTime    *time.Time
	Status     StepExecutionStatus
	Output     interface{}
	Error      string
	Attempts   int
	Duration   time.Duration
	Condition  string
	Skipped    bool
	SkipReason string
}

type StepExecutionContext

type StepExecutionContext struct {
	StepName string
}

type StepExecutionResult

type StepExecutionResult struct {
	Status   string
	Message  string
	Success  bool
	Output   interface{}
	Duration time.Duration
	Error    error
}

type StepExecutionStatus

type StepExecutionStatus string
const (
	StepExecutionStatusPending   StepExecutionStatus = "Pending"
	StepExecutionStatusRunning   StepExecutionStatus = "Running"
	StepExecutionStatusSucceeded StepExecutionStatus = "Succeeded"
	StepExecutionStatusFailed    StepExecutionStatus = "Failed"
	StepExecutionStatusSkipped   StepExecutionStatus = "Skipped"
	StepExecutionStatusRetrying  StepExecutionStatus = "Retrying"
)

type TemplateEngine

type TemplateEngine struct {
	// contains filtered or unexported fields
}

TemplateEngine handles parameter templating with step outputs

func NewTemplateEngine

func NewTemplateEngine(logger logr.Logger) *TemplateEngine

func (*TemplateEngine) EvaluateCondition

func (te *TemplateEngine) EvaluateCondition(condition string) (bool, error)

func (*TemplateEngine) RenderParameters

func (te *TemplateEngine) RenderParameters(parameters map[string]interface{}) (map[string]interface{}, error)

func (*TemplateEngine) SetStepOutput

func (te *TemplateEngine) SetStepOutput(stepName string, output interface{})

type TemplateParameter

type TemplateParameter struct {
	Name        string      `json:"name"`
	Description string      `json:"description"`
	Type        string      `json:"type"`
	Required    bool        `json:"required"`
	Default     interface{} `json:"default,omitempty"`
	Options     []string    `json:"options,omitempty"`
	Validation  string      `json:"validation,omitempty"`
}

type TemplateRegistry

type TemplateRegistry struct {
	// contains filtered or unexported fields
}

func NewTemplateRegistry

func NewTemplateRegistry() *TemplateRegistry

func (*TemplateRegistry) CreateWorkflowFromTemplate

func (tr *TemplateRegistry) CreateWorkflowFromTemplate(templateName string, workflowName string, parameters map[string]interface{}) (*crd.WorkflowDefinition, error)

func (*TemplateRegistry) GetTemplate

func (tr *TemplateRegistry) GetTemplate(name string) (*WorkflowTemplate, bool)

func (*TemplateRegistry) ListTemplates

func (tr *TemplateRegistry) ListTemplates() []*WorkflowTemplate

func (*TemplateRegistry) ListTemplatesByCategory

func (tr *TemplateRegistry) ListTemplatesByCategory(category string) []*WorkflowTemplate

func (*TemplateRegistry) RegisterTemplate

func (tr *TemplateRegistry) RegisterTemplate(template *WorkflowTemplate)

type ToolExecutionRequest

type ToolExecutionRequest struct {
	Tool       string                 `json:"tool"`
	Parameters map[string]interface{} `json:"parameters"`
	Timeout    time.Duration          `json:"timeout"`
}

type ToolExecutionResult

type ToolExecutionResult struct {
	Success  bool                   `json:"success"`
	Output   map[string]interface{} `json:"output,omitempty"`
	Error    string                 `json:"error,omitempty"`
	Duration time.Duration          `json:"duration"`
}

type ToolExecutor

type ToolExecutor struct {
	// contains filtered or unexported fields
}

func NewToolExecutor

func NewToolExecutor(mcpProxyURL, mcpProxyAPIKey string, logger logr.Logger) *ToolExecutor

func (*ToolExecutor) DiscoverTools

func (te *ToolExecutor) DiscoverTools(ctx context.Context) ([]string, error)

func (*ToolExecutor) ExecuteStepWithContext

func (te *ToolExecutor) ExecuteStepWithContext(ctx context.Context, stepContext *StepExecutionContext, toolName string, args map[string]interface{}) (*StepExecutionResult, error)

ExecuteStepWithContext executes a step and returns structured result

func (*ToolExecutor) ExecuteStepWithContextAndTimeout

func (te *ToolExecutor) ExecuteStepWithContextAndTimeout(ctx context.Context, stepCtx *StepContext, tool string, parameters map[string]interface{}, timeout time.Duration) (*ToolExecutionResult, error)

ExecuteStepWithContextAndTimeout executes a single step with full context and timeout

func (*ToolExecutor) ExecuteTool

func (te *ToolExecutor) ExecuteTool(ctx context.Context, toolName string, args map[string]interface{}) (interface{}, error)

ExecuteToolByName interface method (legacy compatibility)

func (*ToolExecutor) HealthCheck

func (te *ToolExecutor) HealthCheck(ctx context.Context) error

HealthCheck verifies the MCP proxy is accessible

func (*ToolExecutor) ValidateTool

func (te *ToolExecutor) ValidateTool(ctx context.Context, toolName string) error

type ToolExecutorInterface

type ToolExecutorInterface interface {
	ExecuteTool(ctx context.Context, toolName string, args map[string]interface{}) (interface{}, error)
	ExecuteStepWithContext(ctx context.Context, stepContext *StepExecutionContext, toolName string, args map[string]interface{}) (*StepExecutionResult, error)
}

ToolExecutorInterface defines the interface for executing tools

type WorkflowEngine

type WorkflowEngine struct {
	// contains filtered or unexported fields
}

func NewWorkflowEngine

func NewWorkflowEngine(mcpProxyURL, mcpProxyAPIKey string, logger logr.Logger) *WorkflowEngine

func (*WorkflowEngine) ExecuteWorkflow

func (we *WorkflowEngine) ExecuteWorkflow(ctx context.Context, workflowDefinition *crd.WorkflowDefinition, workflowName, workflowNamespace string) (*WorkflowExecution, error)

func (*WorkflowEngine) ValidateWorkflowDefinition

func (we *WorkflowEngine) ValidateWorkflowDefinition(spec *crd.WorkflowDefinition) error

ValidateWorkflowSpec validates a workflow specification

type WorkflowExecution

type WorkflowExecution struct {
	ID                string
	WorkflowName      string
	WorkflowNamespace string
	StartTime         time.Time
	EndTime           *time.Time
	Status            WorkflowExecutionStatus
	Steps             []StepExecution
	Error             string
	Context           *WorkflowExecutionContext
}

type WorkflowExecutionContext

type WorkflowExecutionContext struct {
	WorkflowDefinition *crd.WorkflowDefinition
	ExecutionTimeout   time.Duration
	StepOutputs        map[string]interface{}
	FailureThreshold   int
	FailureCount       int
	ContinueOnFailure  bool
}

type WorkflowExecutionDBRecord

type WorkflowExecutionDBRecord struct {
	ID           uuid.UUID              `json:"id"`
	WorkflowID   uuid.UUID              `json:"workflow_id"`
	RunID        string                 `json:"run_id"`
	Status       string                 `json:"status"`
	TriggerType  string                 `json:"trigger_type"`
	StartedAt    time.Time              `json:"started_at"`
	CompletedAt  *time.Time             `json:"completed_at"`
	ErrorMessage *string                `json:"error_message"`
	Result       map[string]interface{} `json:"result"`
}

WorkflowExecutionDBRecord represents a workflow execution in the database

type WorkflowExecutionRecord

type WorkflowExecutionRecord struct {
	ID                string     `json:"id"`
	WorkflowName      string     `json:"workflowName"`
	WorkflowNamespace string     `json:"workflowNamespace"`
	StartTime         time.Time  `json:"startTime"`
	EndTime           *time.Time `json:"endTime,omitempty"`
	Status            string     `json:"status"`
	Steps             string     `json:"steps"` // JSON-encoded steps
	Error             string     `json:"error,omitempty"`
	CreatedAt         time.Time  `json:"createdAt"`
	UpdatedAt         time.Time  `json:"updatedAt"`
}

WorkflowExecutionRecord represents a stored workflow execution

type WorkflowExecutionStatus

type WorkflowExecutionStatus string
const (
	WorkflowExecutionStatusRunning   WorkflowExecutionStatus = "Running"
	WorkflowExecutionStatusSucceeded WorkflowExecutionStatus = "Succeeded"
	WorkflowExecutionStatusFailed    WorkflowExecutionStatus = "Failed"
	WorkflowExecutionStatusCancelled WorkflowExecutionStatus = "Cancelled"
)

type WorkflowRecord

type WorkflowRecord struct {
	ID                uuid.UUID                  `json:"id"`
	Name              string                     `json:"name"`
	Description       string                     `json:"description"`
	Schedule          string                     `json:"schedule"`
	Timezone          string                     `json:"timezone"`
	Enabled           bool                       `json:"enabled"`
	Status            string                     `json:"status"` // active, disabled, deleted
	ConcurrencyPolicy string                     `json:"concurrency_policy"`
	TimeoutSeconds    *int                       `json:"timeout_seconds"`
	Steps             []WorkflowStepRecord       `json:"steps"`
	RetryPolicy       *WorkflowRetryPolicyRecord `json:"retry_policy"`
	CreatedAt         time.Time                  `json:"created_at"`
	UpdatedAt         time.Time                  `json:"updated_at"`
}

WorkflowRecord represents a workflow in the database

func (*WorkflowRecord) ToCRDWorkflowDefinition

func (wr *WorkflowRecord) ToCRDWorkflowDefinition() crd.WorkflowDefinition

ToCRDWorkflowDefinition converts a WorkflowRecord to CRD WorkflowDefinition

func (*WorkflowRecord) ToWorkflowDefinition

func (wr *WorkflowRecord) ToWorkflowDefinition() crd.WorkflowDefinition

ConvertToWorkflowDefinition converts a WorkflowRecord to a crd.WorkflowDefinition

type WorkflowRetryPolicyRecord

type WorkflowRetryPolicyRecord struct {
	WorkflowID           uuid.UUID `json:"workflow_id"`
	MaxRetries           int       `json:"max_retries"`
	RetryDelaySeconds    int       `json:"retry_delay_seconds"`
	BackoffStrategy      string    `json:"backoff_strategy"`
	MaxRetryDelaySeconds int       `json:"max_retry_delay_seconds"`
	BackoffMultiplier    float64   `json:"backoff_multiplier"`
}

WorkflowRetryPolicyRecord represents retry policy in the database

type WorkflowScheduler

type WorkflowScheduler struct {
	// contains filtered or unexported fields
}

WorkflowScheduler integrates CronEngine with WorkflowEngine for scheduled workflow execution

func NewWorkflowScheduler

func NewWorkflowScheduler(cronEngine *CronEngine, workflowEngine *WorkflowEngine, k8sClient client.Client, namespace string, config *config.ComposeConfig, logger logr.Logger) *WorkflowScheduler

NewWorkflowScheduler creates a new workflow scheduler

func (*WorkflowScheduler) EnableDeletionSync

func (ws *WorkflowScheduler) EnableDeletionSync()

EnableDeletionSync enables deletion sync with safety checks to prevent accidental mass deletion

func (*WorkflowScheduler) ExecuteWorkflowManually

func (ws *WorkflowScheduler) ExecuteWorkflowManually(ctx context.Context, workflowDef *crd.WorkflowDefinition, executionID string) error

ExecuteWorkflowManually executes a workflow manually with a specific execution ID

func (*WorkflowScheduler) ListScheduledWorkflows

func (ws *WorkflowScheduler) ListScheduledWorkflows() []*JobSpec

ListScheduledWorkflows returns all currently scheduled workflows

func (*WorkflowScheduler) RemoveWorkflow

func (ws *WorkflowScheduler) RemoveWorkflow(workflowName string) error

RemoveWorkflow removes a workflow from scheduling

func (*WorkflowScheduler) SetWorkflowStore

func (ws *WorkflowScheduler) SetWorkflowStore(store *WorkflowStore)

SetWorkflowStore sets the workflow store for PostgreSQL persistence and CRD sync

func (*WorkflowScheduler) Start

func (ws *WorkflowScheduler) Start() error

Start starts the workflow scheduler

func (*WorkflowScheduler) Stop

func (ws *WorkflowScheduler) Stop()

Stop stops the workflow scheduler

func (*WorkflowScheduler) SyncWorkflow

func (ws *WorkflowScheduler) SyncWorkflow(workflow *crd.WorkflowDefinition) error

SyncWorkflow synchronizes a single workflow schedule

type WorkflowStepRecord

type WorkflowStepRecord struct {
	ID              uuid.UUID              `json:"id"`
	WorkflowID      uuid.UUID              `json:"workflow_id"`
	Name            string                 `json:"name"`
	StepOrder       int                    `json:"step_order"`
	Tool            string                 `json:"tool"`
	Parameters      map[string]interface{} `json:"parameters"`
	ConditionExpr   string                 `json:"condition_expr"`
	ContinueOnError bool                   `json:"continue_on_error"`
	TimeoutSeconds  *int                   `json:"timeout_seconds"`
	DependsOn       []string               `json:"depends_on"`
	CreatedAt       time.Time              `json:"created_at"`
}

WorkflowStepRecord represents a workflow step in the database

type WorkflowStorage

type WorkflowStorage struct {
	// contains filtered or unexported fields
}

WorkflowStorage provides PostgreSQL-backed workflow execution storage

func NewWorkflowStorage

func NewWorkflowStorage(databaseURL string, logger logr.Logger) (*WorkflowStorage, error)

NewWorkflowStorage creates a new workflow storage instance

func (*WorkflowStorage) Close

func (ws *WorkflowStorage) Close() error

func (*WorkflowStorage) GetCronJobs

func (ws *WorkflowStorage) GetCronJobs() ([]*JobSpec, error)

GetCronJobs retrieves all cron jobs

func (*WorkflowStorage) GetWorkflowExecution

func (ws *WorkflowStorage) GetWorkflowExecution(executionID string) (*WorkflowExecutionRecord, error)

GetWorkflowExecution retrieves a workflow execution by ID

func (*WorkflowStorage) GetWorkflowStats

func (ws *WorkflowStorage) GetWorkflowStats(workflowName, workflowNamespace string, days int) (map[string]interface{}, error)

GetWorkflowStats retrieves workflow execution statistics

func (*WorkflowStorage) HealthCheck

func (ws *WorkflowStorage) HealthCheck() error

HealthCheck verifies database connectivity

func (*WorkflowStorage) ListWorkflowExecutions

func (ws *WorkflowStorage) ListWorkflowExecutions(workflowName, workflowNamespace string, limit, offset int) ([]*WorkflowExecutionRecord, error)

ListWorkflowExecutions lists workflow executions with pagination

func (*WorkflowStorage) StoreCronJob

func (ws *WorkflowStorage) StoreCronJob(jobSpec *JobSpec) error

StoreCronJob stores a cron job configuration

func (*WorkflowStorage) StoreWorkflowExecution

func (ws *WorkflowStorage) StoreWorkflowExecution(execution *WorkflowExecution) error

StoreWorkflowExecution stores a workflow execution record

type WorkflowStore

type WorkflowStore struct {
	// contains filtered or unexported fields
}

WorkflowStore provides PostgreSQL-backed workflow persistence

func NewWorkflowStore

func NewWorkflowStore(databaseURL string, logger logr.Logger) (*WorkflowStore, error)

NewWorkflowStore creates a new workflow store connected to PostgreSQL

func (*WorkflowStore) Close

func (ws *WorkflowStore) Close() error

Close closes the database connection

func (*WorkflowStore) CreateWorkflow

func (ws *WorkflowStore) CreateWorkflow(workflow crd.WorkflowDefinition) (*WorkflowRecord, error)

CreateWorkflow creates a new workflow in the database

func (*WorkflowStore) DeleteWorkflow

func (ws *WorkflowStore) DeleteWorkflow(name string) error

DeleteWorkflow soft deletes a workflow by marking it as deleted

func (*WorkflowStore) GetWorkflow

func (ws *WorkflowStore) GetWorkflow(name string) (*WorkflowRecord, error)

GetWorkflow retrieves a workflow by name

func (*WorkflowStore) ListWorkflows

func (ws *WorkflowStore) ListWorkflows() ([]WorkflowRecord, error)

ListWorkflows retrieves all active workflows

func (*WorkflowStore) UpdateWorkflowEnabled

func (ws *WorkflowStore) UpdateWorkflowEnabled(name string, enabled bool) error

UpdateWorkflowEnabled updates the enabled status of a workflow

type WorkflowTemplate

type WorkflowTemplate struct {
	Name        string                 `json:"name"`
	Description string                 `json:"description"`
	Category    string                 `json:"category"`
	Tags        []string               `json:"tags"`
	Parameters  []TemplateParameter    `json:"parameters"`
	Spec        crd.WorkflowDefinition `json:"spec"`
}

type WorkspaceCleanupManager

type WorkspaceCleanupManager struct {
	// contains filtered or unexported fields
}

WorkspaceCleanupManager handles cleanup of retained workspace PVCs based on retention policies

func NewWorkspaceCleanupManager

func NewWorkspaceCleanupManager(k8sClient kubernetes.Interface, namespace string, logger logr.Logger) *WorkspaceCleanupManager

NewWorkspaceCleanupManager creates a new workspace cleanup manager

func (*WorkspaceCleanupManager) CleanupExpiredWorkspaces

func (wcm *WorkspaceCleanupManager) CleanupExpiredWorkspaces() (*WorkspaceCleanupStats, error)

CleanupExpiredWorkspaces removes workspace PVCs that have exceeded their retention period

func (*WorkspaceCleanupManager) CleanupWorkspaceByName

func (wcm *WorkspaceCleanupManager) CleanupWorkspaceByName(workflowName, executionID string) error

CleanupWorkspaceByName manually deletes a specific workspace PVC

func (*WorkspaceCleanupManager) GetWorkspaceStats

func (wcm *WorkspaceCleanupManager) GetWorkspaceStats() (map[string]interface{}, error)

GetWorkspaceStats returns information about workspace PVCs and their retention status

func (*WorkspaceCleanupManager) SetCleanupInterval

func (wcm *WorkspaceCleanupManager) SetCleanupInterval(interval time.Duration)

SetCleanupInterval configures how often to run cleanup

func (*WorkspaceCleanupManager) Stop

func (wcm *WorkspaceCleanupManager) Stop()

Stop stops the cleanup manager

type WorkspaceCleanupStats

type WorkspaceCleanupStats struct {
	TotalWorkspaces   int32     `json:"totalWorkspaces"`
	ExpiredWorkspaces int32     `json:"expiredWorkspaces"`
	DeletedCount      int32     `json:"deletedCount"`
	ErrorCount        int32     `json:"errorCount"`
	LastCleanupTime   time.Time `json:"lastCleanupTime"`
	NextCleanupTime   time.Time `json:"nextCleanupTime"`
}

WorkspaceCleanupStats represents statistics about workspace cleanup operations

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL