Documentation
¶
Overview ¶
internal/scheduler/cron_engine.go
internal/scheduler/k8s_workflow_executor.go
internal/scheduler/pvc_mount_manager.go
internal/scheduler/templates.go
internal/scheduler/tool_executor.go
internal/scheduler/workflow_engine.go
internal/scheduler/workflow_scheduler.go
internal/scheduler/workflow_storage.go
internal/scheduler/workflow_store.go
internal/scheduler/workspace_cleanup.go
Index ¶
- func ConvertToCronExpression(natural string) (string, error)
- func NextScheduledTime(expr string, timezone *time.Location) (time.Time, error)
- func ParseCronExpression(expr string) error
- type CronEngine
- func (e *CronEngine) AddJob(jobSpec *JobSpec) error
- func (e *CronEngine) DisableJob(jobID string) error
- func (e *CronEngine) EnableJob(jobID string) error
- func (e *CronEngine) GetJob(jobID string) (*JobSpec, bool)
- func (e *CronEngine) ListJobs() []*JobSpec
- func (e *CronEngine) RemoveJob(jobID string) error
- func (e *CronEngine) Start()
- func (e *CronEngine) Stop()
- type JobExecution
- type JobHandler
- type JobSpec
- type K8sWorkflowExecutor
- func (kwe *K8sWorkflowExecutor) CancelWorkflowJob(ctx context.Context, workflowName, executionID string) error
- func (kwe *K8sWorkflowExecutor) CleanupOldWorkflowJobs(ctx context.Context, olderThan time.Duration) error
- func (kwe *K8sWorkflowExecutor) ExecuteWorkflowAsJob(ctx context.Context, workflowDef *crd.WorkflowDefinition, ...) (*task_scheduler.TaskStatus, error)
- func (kwe *K8sWorkflowExecutor) GetWorkflowJobLogs(ctx context.Context, workflowName, executionID string) (string, error)
- func (kwe *K8sWorkflowExecutor) GetWorkflowJobStatus(ctx context.Context, workflowName, executionID string) (*task_scheduler.TaskStatus, error)
- func (kwe *K8sWorkflowExecutor) ListWorkflowJobs(ctx context.Context) ([]*task_scheduler.TaskStatus, error)
- type MCPToolCall
- type MCPToolResponse
- type MountInfo
- type PVCMountManager
- func (pmm *PVCMountManager) CleanupExpiredMounts()
- func (pmm *PVCMountManager) GetMountPath(workflowName, executionID string) (string, bool)
- func (pmm *PVCMountManager) ListMountedWorkspaces() []MountInfo
- func (pmm *PVCMountManager) MountWorkspacePVC(workflowName, executionID string) (string, error)
- func (pmm *PVCMountManager) SetCleanupInterval(interval time.Duration)
- func (pmm *PVCMountManager) SetMountTimeout(timeout time.Duration)
- func (pmm *PVCMountManager) Stop()
- func (pmm *PVCMountManager) UnmountWorkspacePVC(workflowName, executionID string) error
- func (pmm *PVCMountManager) UpdateAccessTime(workflowName, executionID string)
- type StepContext
- type StepExecution
- type StepExecutionContext
- type StepExecutionResult
- type StepExecutionStatus
- type TemplateEngine
- type TemplateParameter
- type TemplateRegistry
- func (tr *TemplateRegistry) CreateWorkflowFromTemplate(templateName string, workflowName string, parameters map[string]interface{}) (*crd.WorkflowDefinition, error)
- func (tr *TemplateRegistry) GetTemplate(name string) (*WorkflowTemplate, bool)
- func (tr *TemplateRegistry) ListTemplates() []*WorkflowTemplate
- func (tr *TemplateRegistry) ListTemplatesByCategory(category string) []*WorkflowTemplate
- func (tr *TemplateRegistry) RegisterTemplate(template *WorkflowTemplate)
- type ToolExecutionRequest
- type ToolExecutionResult
- type ToolExecutor
- func (te *ToolExecutor) DiscoverTools(ctx context.Context) ([]string, error)
- func (te *ToolExecutor) ExecuteStepWithContext(ctx context.Context, stepContext *StepExecutionContext, toolName string, ...) (*StepExecutionResult, error)
- func (te *ToolExecutor) ExecuteStepWithContextAndTimeout(ctx context.Context, stepCtx *StepContext, tool string, ...) (*ToolExecutionResult, error)
- func (te *ToolExecutor) ExecuteTool(ctx context.Context, toolName string, args map[string]interface{}) (interface{}, error)
- func (te *ToolExecutor) HealthCheck(ctx context.Context) error
- func (te *ToolExecutor) ValidateTool(ctx context.Context, toolName string) error
- type ToolExecutorInterface
- type WorkflowEngine
- type WorkflowExecution
- type WorkflowExecutionContext
- type WorkflowExecutionDBRecord
- type WorkflowExecutionRecord
- type WorkflowExecutionStatus
- type WorkflowRecord
- type WorkflowRetryPolicyRecord
- type WorkflowScheduler
- func (ws *WorkflowScheduler) EnableDeletionSync()
- func (ws *WorkflowScheduler) ExecuteWorkflowManually(ctx context.Context, workflowDef *crd.WorkflowDefinition, executionID string) error
- func (ws *WorkflowScheduler) ListScheduledWorkflows() []*JobSpec
- func (ws *WorkflowScheduler) RemoveWorkflow(workflowName string) error
- func (ws *WorkflowScheduler) SetWorkflowStore(store *WorkflowStore)
- func (ws *WorkflowScheduler) Start() error
- func (ws *WorkflowScheduler) Stop()
- func (ws *WorkflowScheduler) SyncWorkflow(workflow *crd.WorkflowDefinition) error
- type WorkflowStepRecord
- type WorkflowStorage
- func (ws *WorkflowStorage) Close() error
- func (ws *WorkflowStorage) GetCronJobs() ([]*JobSpec, error)
- func (ws *WorkflowStorage) GetWorkflowExecution(executionID string) (*WorkflowExecutionRecord, error)
- func (ws *WorkflowStorage) GetWorkflowStats(workflowName, workflowNamespace string, days int) (map[string]interface{}, error)
- func (ws *WorkflowStorage) HealthCheck() error
- func (ws *WorkflowStorage) ListWorkflowExecutions(workflowName, workflowNamespace string, limit, offset int) ([]*WorkflowExecutionRecord, error)
- func (ws *WorkflowStorage) StoreCronJob(jobSpec *JobSpec) error
- func (ws *WorkflowStorage) StoreWorkflowExecution(execution *WorkflowExecution) error
- type WorkflowStore
- func (ws *WorkflowStore) Close() error
- func (ws *WorkflowStore) CreateWorkflow(workflow crd.WorkflowDefinition) (*WorkflowRecord, error)
- func (ws *WorkflowStore) DeleteWorkflow(name string) error
- func (ws *WorkflowStore) GetWorkflow(name string) (*WorkflowRecord, error)
- func (ws *WorkflowStore) ListWorkflows() ([]WorkflowRecord, error)
- func (ws *WorkflowStore) UpdateWorkflowEnabled(name string, enabled bool) error
- type WorkflowTemplate
- type WorkspaceCleanupManager
- func (wcm *WorkspaceCleanupManager) CleanupExpiredWorkspaces() (*WorkspaceCleanupStats, error)
- func (wcm *WorkspaceCleanupManager) CleanupWorkspaceByName(workflowName, executionID string) error
- func (wcm *WorkspaceCleanupManager) GetWorkspaceStats() (map[string]interface{}, error)
- func (wcm *WorkspaceCleanupManager) SetCleanupInterval(interval time.Duration)
- func (wcm *WorkspaceCleanupManager) Stop()
- type WorkspaceCleanupStats
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConvertToCronExpression ¶
ConvertToCronExpression converts a natural-language schedule description to a cron expression
func NextScheduledTime ¶
NextScheduledTime calculates the next time a cron expression will trigger
func ParseCronExpression ¶
ParseCronExpression validates and parses a cron expression
Types ¶
type CronEngine ¶
type CronEngine struct {
// contains filtered or unexported fields
}
func NewCronEngine ¶
func NewCronEngine(logger logr.Logger) *CronEngine
func (*CronEngine) AddJob ¶
func (e *CronEngine) AddJob(jobSpec *JobSpec) error
func (*CronEngine) DisableJob ¶
func (e *CronEngine) DisableJob(jobID string) error
func (*CronEngine) EnableJob ¶
func (e *CronEngine) EnableJob(jobID string) error
func (*CronEngine) ListJobs ¶
func (e *CronEngine) ListJobs() []*JobSpec
func (*CronEngine) RemoveJob ¶
func (e *CronEngine) RemoveJob(jobID string) error
func (*CronEngine) Start ¶
func (e *CronEngine) Start()
func (*CronEngine) Stop ¶
func (e *CronEngine) Stop()
type JobExecution ¶
type K8sWorkflowExecutor ¶
type K8sWorkflowExecutor struct {
// contains filtered or unexported fields
}
K8sWorkflowExecutor executes workflows as Kubernetes Jobs
func NewK8sWorkflowExecutor ¶
func NewK8sWorkflowExecutor(namespace string, config *config.ComposeConfig, logger logr.Logger) (*K8sWorkflowExecutor, error)
NewK8sWorkflowExecutor creates a new Kubernetes workflow executor
func (*K8sWorkflowExecutor) CancelWorkflowJob ¶
func (kwe *K8sWorkflowExecutor) CancelWorkflowJob(ctx context.Context, workflowName, executionID string) error
CancelWorkflowJob cancels a running workflow job
func (*K8sWorkflowExecutor) CleanupOldWorkflowJobs ¶
func (kwe *K8sWorkflowExecutor) CleanupOldWorkflowJobs(ctx context.Context, olderThan time.Duration) error
CleanupOldWorkflowJobs removes old completed workflow jobs
func (*K8sWorkflowExecutor) ExecuteWorkflowAsJob ¶
func (kwe *K8sWorkflowExecutor) ExecuteWorkflowAsJob(ctx context.Context, workflowDef *crd.WorkflowDefinition, workflowName, executionID string) (*task_scheduler.TaskStatus, error)
ExecuteWorkflowAsJob executes a workflow definition as a Kubernetes Job
func (*K8sWorkflowExecutor) GetWorkflowJobLogs ¶
func (kwe *K8sWorkflowExecutor) GetWorkflowJobLogs(ctx context.Context, workflowName, executionID string) (string, error)
GetWorkflowJobLogs gets logs from a workflow job
func (*K8sWorkflowExecutor) GetWorkflowJobStatus ¶
func (kwe *K8sWorkflowExecutor) GetWorkflowJobStatus(ctx context.Context, workflowName, executionID string) (*task_scheduler.TaskStatus, error)
GetWorkflowJobStatus gets the status of a workflow job
func (*K8sWorkflowExecutor) ListWorkflowJobs ¶
func (kwe *K8sWorkflowExecutor) ListWorkflowJobs(ctx context.Context) ([]*task_scheduler.TaskStatus, error)
ListWorkflowJobs lists all workflow jobs
type MCPToolCall ¶
type MCPToolResponse ¶
type MountInfo ¶
type MountInfo struct {
WorkflowName string `json:"workflowName"`
ExecutionID string `json:"executionID"`
PVCName string `json:"pvcName"`
MountPath string `json:"mountPath"`
MountedAt time.Time `json:"mountedAt"`
LastAccess time.Time `json:"lastAccess"`
AccessCount int64 `json:"accessCount"`
}
MountInfo represents information about a mounted workspace
type PVCMountManager ¶
type PVCMountManager struct {
// contains filtered or unexported fields
}
PVCMountManager handles mounting and unmounting of workspace PVCs for chat agent access
func NewPVCMountManager ¶
func NewPVCMountManager(k8sClient kubernetes.Interface, namespace string, logger logr.Logger) *PVCMountManager
NewPVCMountManager creates a new PVC mount manager
func (*PVCMountManager) CleanupExpiredMounts ¶
func (pmm *PVCMountManager) CleanupExpiredMounts()
CleanupExpiredMounts unmounts workspaces that haven't been accessed recently
func (*PVCMountManager) GetMountPath ¶
func (pmm *PVCMountManager) GetMountPath(workflowName, executionID string) (string, bool)
GetMountPath returns the mount path for a workspace if it's mounted
func (*PVCMountManager) ListMountedWorkspaces ¶
func (pmm *PVCMountManager) ListMountedWorkspaces() []MountInfo
ListMountedWorkspaces returns a list of currently mounted workspaces
func (*PVCMountManager) MountWorkspacePVC ¶
func (pmm *PVCMountManager) MountWorkspacePVC(workflowName, executionID string) (string, error)
MountWorkspacePVC mounts a workspace PVC and returns the mount path
func (*PVCMountManager) SetCleanupInterval ¶
func (pmm *PVCMountManager) SetCleanupInterval(interval time.Duration)
SetCleanupInterval configures how often to run cleanup
func (*PVCMountManager) SetMountTimeout ¶
func (pmm *PVCMountManager) SetMountTimeout(timeout time.Duration)
SetMountTimeout configures how long to keep mounts active without access
func (*PVCMountManager) Stop ¶
func (pmm *PVCMountManager) Stop()
Stop stops the mount manager and cleans up all mounts
func (*PVCMountManager) UnmountWorkspacePVC ¶
func (pmm *PVCMountManager) UnmountWorkspacePVC(workflowName, executionID string) error
UnmountWorkspacePVC unmounts a workspace PVC
func (*PVCMountManager) UpdateAccessTime ¶
func (pmm *PVCMountManager) UpdateAccessTime(workflowName, executionID string)
UpdateAccessTime updates the last access time for a mounted workspace
type StepContext ¶
type StepContext struct {
WorkflowName string
WorkflowNamespace string
StepName string
Attempt int
PreviousOutputs map[string]interface{}
}
StepContext provides context information for step execution
type StepExecution ¶
type StepExecutionContext ¶
type StepExecutionContext struct {
StepName string
}
type StepExecutionResult ¶
type StepExecutionStatus ¶
type StepExecutionStatus string
const (
	StepExecutionStatusPending   StepExecutionStatus = "Pending"
	StepExecutionStatusRunning   StepExecutionStatus = "Running"
	StepExecutionStatusSucceeded StepExecutionStatus = "Succeeded"
	StepExecutionStatusFailed    StepExecutionStatus = "Failed"
	StepExecutionStatusSkipped   StepExecutionStatus = "Skipped"
	StepExecutionStatusRetrying  StepExecutionStatus = "Retrying"
)
type TemplateEngine ¶
type TemplateEngine struct {
// contains filtered or unexported fields
}
TemplateEngine handles parameter templating with step outputs
func NewTemplateEngine ¶
func NewTemplateEngine(logger logr.Logger) *TemplateEngine
func (*TemplateEngine) EvaluateCondition ¶
func (te *TemplateEngine) EvaluateCondition(condition string) (bool, error)
func (*TemplateEngine) RenderParameters ¶
func (te *TemplateEngine) RenderParameters(parameters map[string]interface{}) (map[string]interface{}, error)
func (*TemplateEngine) SetStepOutput ¶
func (te *TemplateEngine) SetStepOutput(stepName string, output interface{})
type TemplateParameter ¶
type TemplateRegistry ¶
type TemplateRegistry struct {
// contains filtered or unexported fields
}
func NewTemplateRegistry ¶
func NewTemplateRegistry() *TemplateRegistry
func (*TemplateRegistry) CreateWorkflowFromTemplate ¶
func (tr *TemplateRegistry) CreateWorkflowFromTemplate(templateName string, workflowName string, parameters map[string]interface{}) (*crd.WorkflowDefinition, error)
func (*TemplateRegistry) GetTemplate ¶
func (tr *TemplateRegistry) GetTemplate(name string) (*WorkflowTemplate, bool)
func (*TemplateRegistry) ListTemplates ¶
func (tr *TemplateRegistry) ListTemplates() []*WorkflowTemplate
func (*TemplateRegistry) ListTemplatesByCategory ¶
func (tr *TemplateRegistry) ListTemplatesByCategory(category string) []*WorkflowTemplate
func (*TemplateRegistry) RegisterTemplate ¶
func (tr *TemplateRegistry) RegisterTemplate(template *WorkflowTemplate)
type ToolExecutionRequest ¶
type ToolExecutionResult ¶
type ToolExecutor ¶
type ToolExecutor struct {
// contains filtered or unexported fields
}
func NewToolExecutor ¶
func NewToolExecutor(mcpProxyURL, mcpProxyAPIKey string, logger logr.Logger) *ToolExecutor
func (*ToolExecutor) DiscoverTools ¶
func (te *ToolExecutor) DiscoverTools(ctx context.Context) ([]string, error)
func (*ToolExecutor) ExecuteStepWithContext ¶
func (te *ToolExecutor) ExecuteStepWithContext(ctx context.Context, stepContext *StepExecutionContext, toolName string, args map[string]interface{}) (*StepExecutionResult, error)
ExecuteStepWithContext executes a step and returns structured result
func (*ToolExecutor) ExecuteStepWithContextAndTimeout ¶
func (te *ToolExecutor) ExecuteStepWithContextAndTimeout(ctx context.Context, stepCtx *StepContext, tool string, parameters map[string]interface{}, timeout time.Duration) (*ToolExecutionResult, error)
ExecuteStepWithContextAndTimeout executes a single step with full context and timeout
func (*ToolExecutor) ExecuteTool ¶
func (te *ToolExecutor) ExecuteTool(ctx context.Context, toolName string, args map[string]interface{}) (interface{}, error)
ExecuteTool implements the ToolExecutorInterface method (legacy compatibility)
func (*ToolExecutor) HealthCheck ¶
func (te *ToolExecutor) HealthCheck(ctx context.Context) error
HealthCheck verifies the MCP proxy is accessible
func (*ToolExecutor) ValidateTool ¶
func (te *ToolExecutor) ValidateTool(ctx context.Context, toolName string) error
type ToolExecutorInterface ¶
type ToolExecutorInterface interface {
ExecuteTool(ctx context.Context, toolName string, args map[string]interface{}) (interface{}, error)
ExecuteStepWithContext(ctx context.Context, stepContext *StepExecutionContext, toolName string, args map[string]interface{}) (*StepExecutionResult, error)
}
ToolExecutorInterface defines the interface for executing tools
type WorkflowEngine ¶
type WorkflowEngine struct {
// contains filtered or unexported fields
}
func NewWorkflowEngine ¶
func NewWorkflowEngine(mcpProxyURL, mcpProxyAPIKey string, logger logr.Logger) *WorkflowEngine
func (*WorkflowEngine) ExecuteWorkflow ¶
func (we *WorkflowEngine) ExecuteWorkflow(ctx context.Context, workflowDefinition *crd.WorkflowDefinition, workflowName, workflowNamespace string) (*WorkflowExecution, error)
func (*WorkflowEngine) ValidateWorkflowDefinition ¶
func (we *WorkflowEngine) ValidateWorkflowDefinition(spec *crd.WorkflowDefinition) error
ValidateWorkflowDefinition validates a workflow definition
type WorkflowExecution ¶
type WorkflowExecution struct {
ID string
WorkflowName string
WorkflowNamespace string
StartTime time.Time
EndTime *time.Time
Status WorkflowExecutionStatus
Steps []StepExecution
Error string
Context *WorkflowExecutionContext
}
type WorkflowExecutionDBRecord ¶
type WorkflowExecutionDBRecord struct {
ID uuid.UUID `json:"id"`
WorkflowID uuid.UUID `json:"workflow_id"`
RunID string `json:"run_id"`
Status string `json:"status"`
TriggerType string `json:"trigger_type"`
StartedAt time.Time `json:"started_at"`
CompletedAt *time.Time `json:"completed_at"`
ErrorMessage *string `json:"error_message"`
Result map[string]interface{} `json:"result"`
}
WorkflowExecutionDBRecord represents a workflow execution in the database
type WorkflowExecutionRecord ¶
type WorkflowExecutionRecord struct {
ID string `json:"id"`
WorkflowName string `json:"workflowName"`
WorkflowNamespace string `json:"workflowNamespace"`
StartTime time.Time `json:"startTime"`
EndTime *time.Time `json:"endTime,omitempty"`
Status string `json:"status"`
Steps string `json:"steps"` // JSON-encoded steps
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
WorkflowExecutionRecord represents a stored workflow execution
type WorkflowExecutionStatus ¶
type WorkflowExecutionStatus string
const (
	WorkflowExecutionStatusRunning   WorkflowExecutionStatus = "Running"
	WorkflowExecutionStatusSucceeded WorkflowExecutionStatus = "Succeeded"
	WorkflowExecutionStatusFailed    WorkflowExecutionStatus = "Failed"
	WorkflowExecutionStatusCancelled WorkflowExecutionStatus = "Cancelled"
)
type WorkflowRecord ¶
type WorkflowRecord struct {
ID uuid.UUID `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Schedule string `json:"schedule"`
Timezone string `json:"timezone"`
Enabled bool `json:"enabled"`
Status string `json:"status"` // active, disabled, deleted
ConcurrencyPolicy string `json:"concurrency_policy"`
TimeoutSeconds *int `json:"timeout_seconds"`
Steps []WorkflowStepRecord `json:"steps"`
RetryPolicy *WorkflowRetryPolicyRecord `json:"retry_policy"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
WorkflowRecord represents a workflow in the database
func (*WorkflowRecord) ToCRDWorkflowDefinition ¶
func (wr *WorkflowRecord) ToCRDWorkflowDefinition() crd.WorkflowDefinition
ToCRDWorkflowDefinition converts a WorkflowRecord to CRD WorkflowDefinition
func (*WorkflowRecord) ToWorkflowDefinition ¶
func (wr *WorkflowRecord) ToWorkflowDefinition() crd.WorkflowDefinition
ToWorkflowDefinition converts a WorkflowRecord to a crd.WorkflowDefinition
type WorkflowRetryPolicyRecord ¶
type WorkflowRetryPolicyRecord struct {
WorkflowID uuid.UUID `json:"workflow_id"`
MaxRetries int `json:"max_retries"`
RetryDelaySeconds int `json:"retry_delay_seconds"`
BackoffStrategy string `json:"backoff_strategy"`
MaxRetryDelaySeconds int `json:"max_retry_delay_seconds"`
BackoffMultiplier float64 `json:"backoff_multiplier"`
}
WorkflowRetryPolicyRecord represents retry policy in the database
type WorkflowScheduler ¶
type WorkflowScheduler struct {
// contains filtered or unexported fields
}
WorkflowScheduler integrates CronEngine with WorkflowEngine for scheduled workflow execution
func NewWorkflowScheduler ¶
func NewWorkflowScheduler(cronEngine *CronEngine, workflowEngine *WorkflowEngine, k8sClient client.Client, namespace string, config *config.ComposeConfig, logger logr.Logger) *WorkflowScheduler
NewWorkflowScheduler creates a new workflow scheduler
func (*WorkflowScheduler) EnableDeletionSync ¶
func (ws *WorkflowScheduler) EnableDeletionSync()
EnableDeletionSync enables deletion sync with safety checks to prevent accidental mass deletion
func (*WorkflowScheduler) ExecuteWorkflowManually ¶
func (ws *WorkflowScheduler) ExecuteWorkflowManually(ctx context.Context, workflowDef *crd.WorkflowDefinition, executionID string) error
ExecuteWorkflowManually executes a workflow manually with a specific execution ID
func (*WorkflowScheduler) ListScheduledWorkflows ¶
func (ws *WorkflowScheduler) ListScheduledWorkflows() []*JobSpec
ListScheduledWorkflows returns all currently scheduled workflows
func (*WorkflowScheduler) RemoveWorkflow ¶
func (ws *WorkflowScheduler) RemoveWorkflow(workflowName string) error
RemoveWorkflow removes a workflow from scheduling
func (*WorkflowScheduler) SetWorkflowStore ¶
func (ws *WorkflowScheduler) SetWorkflowStore(store *WorkflowStore)
SetWorkflowStore sets the workflow store for PostgreSQL persistence and CRD sync
func (*WorkflowScheduler) Start ¶
func (ws *WorkflowScheduler) Start() error
Start starts the workflow scheduler
func (*WorkflowScheduler) Stop ¶
func (ws *WorkflowScheduler) Stop()
Stop stops the workflow scheduler
func (*WorkflowScheduler) SyncWorkflow ¶
func (ws *WorkflowScheduler) SyncWorkflow(workflow *crd.WorkflowDefinition) error
SyncWorkflow synchronizes a single workflow schedule
type WorkflowStepRecord ¶
type WorkflowStepRecord struct {
ID uuid.UUID `json:"id"`
WorkflowID uuid.UUID `json:"workflow_id"`
Name string `json:"name"`
StepOrder int `json:"step_order"`
Tool string `json:"tool"`
Parameters map[string]interface{} `json:"parameters"`
ConditionExpr string `json:"condition_expr"`
ContinueOnError bool `json:"continue_on_error"`
TimeoutSeconds *int `json:"timeout_seconds"`
DependsOn []string `json:"depends_on"`
CreatedAt time.Time `json:"created_at"`
}
WorkflowStepRecord represents a workflow step in the database
type WorkflowStorage ¶
type WorkflowStorage struct {
// contains filtered or unexported fields
}
WorkflowStorage provides PostgreSQL-backed workflow execution storage
func NewWorkflowStorage ¶
func NewWorkflowStorage(databaseURL string, logger logr.Logger) (*WorkflowStorage, error)
NewWorkflowStorage creates a new workflow storage instance
func (*WorkflowStorage) Close ¶
func (ws *WorkflowStorage) Close() error
func (*WorkflowStorage) GetCronJobs ¶
func (ws *WorkflowStorage) GetCronJobs() ([]*JobSpec, error)
GetCronJobs retrieves all cron jobs
func (*WorkflowStorage) GetWorkflowExecution ¶
func (ws *WorkflowStorage) GetWorkflowExecution(executionID string) (*WorkflowExecutionRecord, error)
GetWorkflowExecution retrieves a workflow execution by ID
func (*WorkflowStorage) GetWorkflowStats ¶
func (ws *WorkflowStorage) GetWorkflowStats(workflowName, workflowNamespace string, days int) (map[string]interface{}, error)
GetWorkflowStats retrieves workflow execution statistics
func (*WorkflowStorage) HealthCheck ¶
func (ws *WorkflowStorage) HealthCheck() error
HealthCheck verifies database connectivity
func (*WorkflowStorage) ListWorkflowExecutions ¶
func (ws *WorkflowStorage) ListWorkflowExecutions(workflowName, workflowNamespace string, limit, offset int) ([]*WorkflowExecutionRecord, error)
ListWorkflowExecutions lists workflow executions with pagination
func (*WorkflowStorage) StoreCronJob ¶
func (ws *WorkflowStorage) StoreCronJob(jobSpec *JobSpec) error
StoreCronJob stores a cron job configuration
func (*WorkflowStorage) StoreWorkflowExecution ¶
func (ws *WorkflowStorage) StoreWorkflowExecution(execution *WorkflowExecution) error
StoreWorkflowExecution stores a workflow execution record
type WorkflowStore ¶
type WorkflowStore struct {
// contains filtered or unexported fields
}
WorkflowStore provides PostgreSQL-backed workflow persistence
func NewWorkflowStore ¶
func NewWorkflowStore(databaseURL string, logger logr.Logger) (*WorkflowStore, error)
NewWorkflowStore creates a new workflow store connected to PostgreSQL
func (*WorkflowStore) Close ¶
func (ws *WorkflowStore) Close() error
Close closes the database connection
func (*WorkflowStore) CreateWorkflow ¶
func (ws *WorkflowStore) CreateWorkflow(workflow crd.WorkflowDefinition) (*WorkflowRecord, error)
CreateWorkflow creates a new workflow in the database
func (*WorkflowStore) DeleteWorkflow ¶
func (ws *WorkflowStore) DeleteWorkflow(name string) error
DeleteWorkflow soft deletes a workflow by marking it as deleted
func (*WorkflowStore) GetWorkflow ¶
func (ws *WorkflowStore) GetWorkflow(name string) (*WorkflowRecord, error)
GetWorkflow retrieves a workflow by name
func (*WorkflowStore) ListWorkflows ¶
func (ws *WorkflowStore) ListWorkflows() ([]WorkflowRecord, error)
ListWorkflows retrieves all active workflows
func (*WorkflowStore) UpdateWorkflowEnabled ¶
func (ws *WorkflowStore) UpdateWorkflowEnabled(name string, enabled bool) error
UpdateWorkflowEnabled updates the enabled status of a workflow
type WorkflowTemplate ¶
type WorkflowTemplate struct {
Name string `json:"name"`
Description string `json:"description"`
Category string `json:"category"`
Tags []string `json:"tags"`
Parameters []TemplateParameter `json:"parameters"`
Spec crd.WorkflowDefinition `json:"spec"`
}
type WorkspaceCleanupManager ¶
type WorkspaceCleanupManager struct {
// contains filtered or unexported fields
}
WorkspaceCleanupManager handles cleanup of retained workspace PVCs based on retention policies
func NewWorkspaceCleanupManager ¶
func NewWorkspaceCleanupManager(k8sClient kubernetes.Interface, namespace string, logger logr.Logger) *WorkspaceCleanupManager
NewWorkspaceCleanupManager creates a new workspace cleanup manager
func (*WorkspaceCleanupManager) CleanupExpiredWorkspaces ¶
func (wcm *WorkspaceCleanupManager) CleanupExpiredWorkspaces() (*WorkspaceCleanupStats, error)
CleanupExpiredWorkspaces removes workspace PVCs that have exceeded their retention period
func (*WorkspaceCleanupManager) CleanupWorkspaceByName ¶
func (wcm *WorkspaceCleanupManager) CleanupWorkspaceByName(workflowName, executionID string) error
CleanupWorkspaceByName manually deletes a specific workspace PVC
func (*WorkspaceCleanupManager) GetWorkspaceStats ¶
func (wcm *WorkspaceCleanupManager) GetWorkspaceStats() (map[string]interface{}, error)
GetWorkspaceStats returns information about workspace PVCs and their retention status
func (*WorkspaceCleanupManager) SetCleanupInterval ¶
func (wcm *WorkspaceCleanupManager) SetCleanupInterval(interval time.Duration)
SetCleanupInterval configures how often to run cleanup
func (*WorkspaceCleanupManager) Stop ¶
func (wcm *WorkspaceCleanupManager) Stop()
Stop stops the cleanup manager
type WorkspaceCleanupStats ¶
type WorkspaceCleanupStats struct {
TotalWorkspaces int32 `json:"totalWorkspaces"`
ExpiredWorkspaces int32 `json:"expiredWorkspaces"`
DeletedCount int32 `json:"deletedCount"`
ErrorCount int32 `json:"errorCount"`
LastCleanupTime time.Time `json:"lastCleanupTime"`
NextCleanupTime time.Time `json:"nextCleanupTime"`
}
WorkspaceCleanupStats represents statistics about workspace cleanup operations