Documentation
¶
Index ¶
- Constants
- func ExecuteTasksInBatches(ctx context.Context, tasks []Task, batchSize int) error
- func ExecuteWithRetry(ctx context.Context, task TaskInterface, maxRetries int, backoff time.Duration) error
- type BackpressureAction
- type BackpressureConfig
- type BackpressureManager
- type BackpressureMetrics
- type BackpressureThresholds
- type BatchProcessor
- type ControllerTaskResult
- type DependencyAwareStrategy
- type DependencyGraph
- func (dg *DependencyGraph) AddTask(taskID string, dependencies, dependents []string)
- func (dg *DependencyGraph) AreDependenciesSatisfied(taskID string) bool
- func (dg *DependencyGraph) GetReadyTasks() []string
- func (dg *DependencyGraph) GetTaskStatus(taskID string) (completed, failed, exists bool)
- func (dg *DependencyGraph) MarkTaskCompleted(taskID string, success bool)
- type DependencyNode
- type IntentProcessor
- type LLMProcessor
- type LeastConnectionsLoadBalancer
- type LoadBalancedStrategy
- type LoadBalancer
- type LoadBalancingStrategy
- type ParallelProcessingConfig
- type ParallelProcessingEngine
- func (e *ParallelProcessingEngine) GetDependencyMetrics() map[string]interface{}
- func (e *ParallelProcessingEngine) GetMetrics() *ProcessingMetrics
- func (e *ParallelProcessingEngine) GetTask(taskID string) (*Task, bool)
- func (e *ParallelProcessingEngine) GetTaskStatus(taskID string) *Task
- func (e *ParallelProcessingEngine) HealthCheck() error
- func (e *ParallelProcessingEngine) ProcessIntentWorkflow(ctx context.Context, intent *v1.NetworkIntent) (*WorkflowResult, error)
- func (e *ParallelProcessingEngine) Start(ctx context.Context) error
- func (e *ParallelProcessingEngine) Stop()
- func (e *ParallelProcessingEngine) SubmitTask(task *Task) error
- type ParallelProcessor
- func (pp *ParallelProcessor) GetMetrics() ProcessorMetrics
- func (pp *ParallelProcessor) GetResult() *ControllerTaskResult
- func (pp *ParallelProcessor) Shutdown() error
- func (pp *ParallelProcessor) SubmitTask(task TaskInterface) error
- func (pp *ParallelProcessor) SubmitTasks(tasks []TaskInterface) error
- func (pp *ParallelProcessor) WaitForAllTasks()
- func (pp *ParallelProcessor) WaitForAllTasksWithTimeout(timeout time.Duration) error
- func (pp *ParallelProcessor) WaitForResult(timeout time.Duration) *ControllerTaskResult
- type ParallelReconciler
- type PoolMetrics
- type PriorityFirstStrategy
- type PriorityQueue
- func (pq *PriorityQueue) Clear()
- func (pq *PriorityQueue) Dequeue() *Task
- func (pq *PriorityQueue) DequeueWithTimeout(timeout time.Duration) *Task
- func (pq *PriorityQueue) Enqueue(task *Task) error
- func (pq *PriorityQueue) GetTasks() []*Task
- func (pq *PriorityQueue) IsEmpty() bool
- func (pq *PriorityQueue) IsFull() bool
- func (pq *PriorityQueue) Peek() *Task
- func (pq *PriorityQueue) Size() int
- type ProcessingEngineConfig
- type ProcessingMetrics
- type ProcessingTask
- type ProcessorConfig
- type ProcessorMetrics
- type ReconcileTask
- type ResourceLimiter
- type RoundRobinLoadBalancer
- type SchedulingStrategy
- type Task
- func (t *Task) Execute(ctx context.Context) error
- func (t *Task) GetCorrelationID() string
- func (t *Task) GetEffectiveRetryConfig() *TaskRetryConfig
- func (t *Task) GetEffectiveTimeout() time.Duration
- func (t *Task) GetID() string
- func (t *Task) GetPriority() int
- func (t *Task) GetTimeout() time.Duration
- func (t *Task) IsEvolved() bool
- type TaskBuilder
- func (tb *TaskBuilder) Build() *Task
- func (tb *TaskBuilder) WithCorrelationID(correlationID string) *TaskBuilder
- func (tb *TaskBuilder) WithDependencies(dependencies []string) *TaskBuilder
- func (tb *TaskBuilder) WithInputData(inputData map[string]interface{}) *TaskBuilder
- func (tb *TaskBuilder) WithMetadata(metadata map[string]string) *TaskBuilder
- func (tb *TaskBuilder) WithPriority(priority int) *TaskBuilder
- func (tb *TaskBuilder) WithRetryConfig(config *TaskRetryConfig) *TaskBuilder
- func (tb *TaskBuilder) WithTimeout(timeout time.Duration) *TaskBuilder
- func (tb *TaskBuilder) WithType(taskType TaskType) *TaskBuilder
- type TaskCompatibilityLayer
- type TaskEvolution
- type TaskInterface
- type TaskPriorityQueue
- type TaskProcessor
- type TaskResult
- type TaskRetryConfig
- type TaskScheduler
- type TaskStatus
- type TaskType
- type ValidationRule
- type ValidationTask
- type WeightedResponseTimeLoadBalancer
- type Worker
- type WorkerPool
- type WorkflowResult
Constants ¶
const ( DefaultMaxConcurrency = 10 DefaultTimeout = 30 * time.Second DefaultQueueSize = 1000 DefaultRetryAttempts = 3 DefaultRetryBackoff = 1 * time.Second // Task priorities PriorityHigh = 100 PriorityNormal = 50 PriorityLow = 10 // Task types TaskTypeReconcile = "reconcile" TaskTypeValidation = "validation" TaskTypeProcessing = "processing" TaskTypeBatch = "batch" )
Constants for parallel processing
Variables ¶
This section is empty.
Functions ¶
func ExecuteTasksInBatches ¶
ExecuteTasksInBatches executes tasks in batches with specified batch size
func ExecuteWithRetry ¶
func ExecuteWithRetry(ctx context.Context, task TaskInterface, maxRetries int, backoff time.Duration) error
ExecuteWithRetry executes a task with retry logic
Types ¶
type BackpressureAction ¶
type BackpressureAction struct {
Name string `json:"name"`
Threshold float64 `json:"threshold"`
Action string `json:"action"` // throttle, reject, shed_load, degrade
Parameters json.RawMessage `json:"parameters"`
}
BackpressureAction defines actions to take under backpressure.
type BackpressureConfig ¶
type BackpressureConfig struct {
Enabled bool `json:"enabled"`
LoadThreshold float64 `json:"loadThreshold"`
Actions map[string]BackpressureAction `json:"actions"`
EvaluationWindow time.Duration `json:"evaluationWindow"`
}
BackpressureConfig configures backpressure management.
type BackpressureManager ¶
type BackpressureManager struct {
// contains filtered or unexported fields
}
BackpressureManager handles system backpressure.
func NewBackpressureManager ¶
func NewBackpressureManager(config *BackpressureConfig, logger logr.Logger) *BackpressureManager
NewBackpressureManager creates a new BackpressureManager.
func (*BackpressureManager) GetStats ¶
func (bm *BackpressureManager) GetStats() (map[string]interface{}, error)
GetStats returns statistics for BackpressureManager (interface-compatible method).
func (*BackpressureManager) ShouldReject ¶
func (bm *BackpressureManager) ShouldReject(taskType TaskType) bool
ShouldReject performs shouldreject operation.
func (*BackpressureManager) Start ¶
func (bm *BackpressureManager) Start(ctx context.Context)
Start performs backpressure monitoring.
func (*BackpressureManager) Stop ¶
func (bm *BackpressureManager) Stop()
Stop performs stop operation.
type BackpressureMetrics ¶
type BackpressureMetrics struct {
CurrentLoad float64 `json:"currentLoad"`
ThrottledRequests int64 `json:"throttledRequests"`
RejectedRequests int64 `json:"rejectedRequests"`
ShedRequests int64 `json:"shedRequests"`
DegradedRequests int64 `json:"degradedRequests"`
LastAction string `json:"lastAction"`
LastActionTime time.Time `json:"lastActionTime"`
}
BackpressureMetrics tracks backpressure metrics.
type BackpressureThresholds ¶
type BackpressureThresholds struct {
Low float64 `json:"low"`
Medium float64 `json:"medium"`
High float64 `json:"high"`
}
BackpressureThresholds defines load thresholds for backpressure.
type BatchProcessor ¶
type BatchProcessor struct {
// contains filtered or unexported fields
}
BatchProcessor handles batch processing of related tasks
func NewBatchProcessor ¶
func NewBatchProcessor(batchSize int, flushInterval time.Duration, processor func(context.Context, []TaskInterface) error) *BatchProcessor
NewBatchProcessor creates a new batch processor
func (*BatchProcessor) AddTask ¶
func (bp *BatchProcessor) AddTask(task TaskInterface)
AddTask adds a task to the batch buffer
func (*BatchProcessor) Close ¶
func (bp *BatchProcessor) Close() error
Close gracefully shuts down the batch processor
type ControllerTaskResult ¶
type ControllerTaskResult struct {
TaskID string `json:"task_id"`
Success bool `json:"success"`
Error error `json:"error,omitempty"`
Duration time.Duration `json:"duration"`
CompletedAt time.Time `json:"completed_at"`
Metadata json.RawMessage `json:"metadata,omitempty"`
}
ControllerTaskResult represents the result of a parallel task execution
type DependencyAwareStrategy ¶
type DependencyAwareStrategy struct {
// contains filtered or unexported fields
}
func NewDependencyAwareStrategy ¶
func NewDependencyAwareStrategy() *DependencyAwareStrategy
func (*DependencyAwareStrategy) GetMetrics ¶
func (das *DependencyAwareStrategy) GetMetrics() map[string]float64
func (*DependencyAwareStrategy) GetStrategyName ¶
func (das *DependencyAwareStrategy) GetStrategyName() string
func (*DependencyAwareStrategy) ScheduleTask ¶
type DependencyGraph ¶
type DependencyGraph struct {
// contains filtered or unexported fields
}
DependencyGraph manages task dependencies and execution order
func NewDependencyGraph ¶
func NewDependencyGraph() *DependencyGraph
func (*DependencyGraph) AddTask ¶
func (dg *DependencyGraph) AddTask(taskID string, dependencies, dependents []string)
func (*DependencyGraph) AreDependenciesSatisfied ¶
func (dg *DependencyGraph) AreDependenciesSatisfied(taskID string) bool
func (*DependencyGraph) GetReadyTasks ¶
func (dg *DependencyGraph) GetReadyTasks() []string
func (*DependencyGraph) GetTaskStatus ¶
func (dg *DependencyGraph) GetTaskStatus(taskID string) (completed, failed, exists bool)
func (*DependencyGraph) MarkTaskCompleted ¶
func (dg *DependencyGraph) MarkTaskCompleted(taskID string, success bool)
type DependencyNode ¶
type DependencyNode struct {
TaskID string
Dependencies []*DependencyNode
Dependents []*DependencyNode
Completed bool
Failed bool
// contains filtered or unexported fields
}
DependencyNode represents a single node in the dependency graph
type IntentProcessor ¶
type IntentProcessor struct {
// contains filtered or unexported fields
}
func NewIntentProcessor ¶
func NewIntentProcessor(logger logr.Logger) *IntentProcessor
func (*IntentProcessor) GetMetrics ¶
func (ip *IntentProcessor) GetMetrics() map[string]interface{}
func (*IntentProcessor) GetProcessorType ¶
func (ip *IntentProcessor) GetProcessorType() TaskType
func (*IntentProcessor) HealthCheck ¶
func (ip *IntentProcessor) HealthCheck(ctx context.Context) error
func (*IntentProcessor) ProcessTask ¶
func (ip *IntentProcessor) ProcessTask(ctx context.Context, task *Task) (*TaskResult, error)
type LLMProcessor ¶
type LLMProcessor struct {
// contains filtered or unexported fields
}
func NewLLMProcessor ¶
func NewLLMProcessor(logger logr.Logger) *LLMProcessor
func (*LLMProcessor) GetMetrics ¶
func (lp *LLMProcessor) GetMetrics() map[string]interface{}
func (*LLMProcessor) GetProcessorType ¶
func (lp *LLMProcessor) GetProcessorType() TaskType
func (*LLMProcessor) HealthCheck ¶
func (lp *LLMProcessor) HealthCheck(ctx context.Context) error
func (*LLMProcessor) ProcessTask ¶
func (lp *LLMProcessor) ProcessTask(ctx context.Context, task *Task) (*TaskResult, error)
type LeastConnectionsLoadBalancer ¶
type LeastConnectionsLoadBalancer struct {
// contains filtered or unexported fields
}
func NewLeastConnectionsLoadBalancer ¶
func NewLeastConnectionsLoadBalancer() *LeastConnectionsLoadBalancer
func (*LeastConnectionsLoadBalancer) GetStrategyName ¶
func (lc *LeastConnectionsLoadBalancer) GetStrategyName() string
func (*LeastConnectionsLoadBalancer) SelectPool ¶
func (lc *LeastConnectionsLoadBalancer) SelectPool(pools map[string]*WorkerPool, task *Task) (string, error)
func (*LeastConnectionsLoadBalancer) UpdateMetrics ¶
func (lc *LeastConnectionsLoadBalancer) UpdateMetrics(poolName string, metrics *PoolMetrics)
type LoadBalancedStrategy ¶
type LoadBalancedStrategy struct {
// contains filtered or unexported fields
}
func NewLoadBalancedStrategy ¶
func NewLoadBalancedStrategy() *LoadBalancedStrategy
func (*LoadBalancedStrategy) GetMetrics ¶
func (lbs *LoadBalancedStrategy) GetMetrics() map[string]float64
func (*LoadBalancedStrategy) GetStrategyName ¶
func (lbs *LoadBalancedStrategy) GetStrategyName() string
func (*LoadBalancedStrategy) ScheduleTask ¶
type LoadBalancer ¶
type LoadBalancer struct {
// contains filtered or unexported fields
}
LoadBalancer manages load balancing across worker pools
func NewLoadBalancer ¶
func NewLoadBalancer(logger logr.Logger) *LoadBalancer
func (*LoadBalancer) SelectPool ¶
func (lb *LoadBalancer) SelectPool(pools map[string]*WorkerPool, task *Task) (string, error)
func (*LoadBalancer) UpdatePoolMetrics ¶
func (lb *LoadBalancer) UpdatePoolMetrics(poolName string, metrics *PoolMetrics)
type LoadBalancingStrategy ¶
type LoadBalancingStrategy interface {
SelectPool(pools map[string]*WorkerPool, task *Task) (string, error)
GetStrategyName() string
UpdateMetrics(poolName string, metrics *PoolMetrics)
}
LoadBalancingStrategy defines the interface for load balancing strategies
type ParallelProcessingConfig ¶
type ParallelProcessingConfig struct {
// MaxConcurrentIntents limits concurrent intent processing
MaxConcurrentIntents int
// Worker pool sizes for different stages
IntentPoolSize int
LLMPoolSize int
RAGPoolSize int
ResourcePoolSize int
ManifestPoolSize int
GitOpsPoolSize int
DeploymentPoolSize int
// Task queue size
TaskQueueSize int
// Health check interval
HealthCheckInterval time.Duration
// Processing timeouts
IntentTimeout time.Duration
LLMTimeout time.Duration
ManifestTimeout time.Duration
DeploymentTimeout time.Duration
// Retry configurations
MaxRetries int
RetryBackoff time.Duration
RetryMultiplier float64
}
ParallelProcessingConfig defines configuration for parallel processing
type ParallelProcessingEngine ¶
type ParallelProcessingEngine struct {
// contains filtered or unexported fields
}
ParallelProcessingEngine orchestrates parallel processing of NetworkIntents
func NewParallelProcessingEngine ¶
func NewParallelProcessingEngine( config *ParallelProcessingConfig, resilienceManager *resiliencecontroller.ResilienceManager, errorTracker *monitoring.ErrorTracker, logger logr.Logger, ) (*ParallelProcessingEngine, error)
NewParallelProcessingEngine creates a new parallel processing engine
func (*ParallelProcessingEngine) GetDependencyMetrics ¶
func (e *ParallelProcessingEngine) GetDependencyMetrics() map[string]interface{}
GetDependencyMetrics returns dependency-related metrics
func (*ParallelProcessingEngine) GetMetrics ¶
func (e *ParallelProcessingEngine) GetMetrics() *ProcessingMetrics
GetMetrics returns current processing metrics
func (*ParallelProcessingEngine) GetTask ¶
func (e *ParallelProcessingEngine) GetTask(taskID string) (*Task, bool)
GetTask retrieves a task by ID
func (*ParallelProcessingEngine) GetTaskStatus ¶
func (e *ParallelProcessingEngine) GetTaskStatus(taskID string) *Task
GetTaskStatus returns the status of a specific task
func (*ParallelProcessingEngine) HealthCheck ¶
func (e *ParallelProcessingEngine) HealthCheck() error
HealthCheck returns the health status of the processing engine
func (*ParallelProcessingEngine) ProcessIntentWorkflow ¶
func (e *ParallelProcessingEngine) ProcessIntentWorkflow(ctx context.Context, intent *v1.NetworkIntent) (*WorkflowResult, error)
ProcessIntentWorkflow processes a NetworkIntent through the complete workflow
func (*ParallelProcessingEngine) Start ¶
func (e *ParallelProcessingEngine) Start(ctx context.Context) error
Start initializes and starts the parallel processing engine
func (*ParallelProcessingEngine) Stop ¶
func (e *ParallelProcessingEngine) Stop()
Stop gracefully shuts down the parallel processing engine
func (*ParallelProcessingEngine) SubmitTask ¶
func (e *ParallelProcessingEngine) SubmitTask(task *Task) error
SubmitTask submits a task for processing
type ParallelProcessor ¶
type ParallelProcessor struct {
// contains filtered or unexported fields
}
ParallelProcessor manages parallel execution of controller tasks
func NewParallelProcessor ¶
func NewParallelProcessor(config *ProcessorConfig) *ParallelProcessor
NewParallelProcessor creates a new parallel processor
func (*ParallelProcessor) GetMetrics ¶
func (pp *ParallelProcessor) GetMetrics() ProcessorMetrics
GetMetrics returns current processing metrics
func (*ParallelProcessor) GetResult ¶
func (pp *ParallelProcessor) GetResult() *ControllerTaskResult
GetResult retrieves a task result (non-blocking)
func (*ParallelProcessor) Shutdown ¶
func (pp *ParallelProcessor) Shutdown() error
Shutdown gracefully shuts down the processor
func (*ParallelProcessor) SubmitTask ¶
func (pp *ParallelProcessor) SubmitTask(task TaskInterface) error
SubmitTask submits a task for parallel execution
func (*ParallelProcessor) SubmitTasks ¶
func (pp *ParallelProcessor) SubmitTasks(tasks []TaskInterface) error
SubmitTasks submits multiple tasks for parallel execution
func (*ParallelProcessor) WaitForAllTasks ¶
func (pp *ParallelProcessor) WaitForAllTasks()
WaitForAllTasks waits for all submitted tasks to complete
func (*ParallelProcessor) WaitForAllTasksWithTimeout ¶
func (pp *ParallelProcessor) WaitForAllTasksWithTimeout(timeout time.Duration) error
WaitForAllTasksWithTimeout waits for all tasks with a timeout
func (*ParallelProcessor) WaitForResult ¶
func (pp *ParallelProcessor) WaitForResult(timeout time.Duration) *ControllerTaskResult
WaitForResult waits for a task result with timeout
type ParallelReconciler ¶
type ParallelReconciler struct {
Client client.Client
Scheme *runtime.Scheme
Processor *ParallelProcessor
Logger logr.Logger
}
ParallelReconciler provides parallel processing capabilities for controllers
func NewParallelReconciler ¶
func NewParallelReconciler(processor *ParallelProcessor) *ParallelReconciler
NewParallelReconciler creates a new parallel reconciler
func (*ParallelReconciler) ReconcileParallel ¶
func (pr *ParallelReconciler) ReconcileParallel(ctx context.Context, tasks []TaskInterface) error
ReconcileParallel executes reconciliation tasks in parallel
type PoolMetrics ¶
type PoolMetrics struct {
ActiveWorkers int32
ProcessedTasks int64
FailedTasks int64
AverageLatency time.Duration
MemoryUsage int64
CPUUsage float64
QueueLength int
QueueCapacity int
Throughput float64
SuccessRate float64
LastUpdated time.Time
}
PoolMetrics contains metrics for a worker pool
type PriorityFirstStrategy ¶
type PriorityFirstStrategy struct {
// contains filtered or unexported fields
}
func NewPriorityFirstStrategy ¶
func NewPriorityFirstStrategy() *PriorityFirstStrategy
func (*PriorityFirstStrategy) GetMetrics ¶
func (pfs *PriorityFirstStrategy) GetMetrics() map[string]float64
func (*PriorityFirstStrategy) GetStrategyName ¶
func (pfs *PriorityFirstStrategy) GetStrategyName() string
func (*PriorityFirstStrategy) ScheduleTask ¶
type PriorityQueue ¶
type PriorityQueue struct {
// contains filtered or unexported fields
}
PriorityQueue implements a priority-based task queue
func NewPriorityQueue ¶
func NewPriorityQueue(maxSize int) *PriorityQueue
func (*PriorityQueue) Clear ¶
func (pq *PriorityQueue) Clear()
func (*PriorityQueue) Dequeue ¶
func (pq *PriorityQueue) Dequeue() *Task
func (*PriorityQueue) DequeueWithTimeout ¶
func (pq *PriorityQueue) DequeueWithTimeout(timeout time.Duration) *Task
func (*PriorityQueue) Enqueue ¶
func (pq *PriorityQueue) Enqueue(task *Task) error
func (*PriorityQueue) GetTasks ¶
func (pq *PriorityQueue) GetTasks() []*Task
func (*PriorityQueue) IsEmpty ¶
func (pq *PriorityQueue) IsEmpty() bool
func (*PriorityQueue) IsFull ¶
func (pq *PriorityQueue) IsFull() bool
func (*PriorityQueue) Peek ¶
func (pq *PriorityQueue) Peek() *Task
func (*PriorityQueue) Size ¶
func (pq *PriorityQueue) Size() int
type ProcessingEngineConfig ¶
type ProcessingEngineConfig struct {
// Worker pool configurations.
IntentWorkers int `json:"intentWorkers"`
LLMWorkers int `json:"llmWorkers"`
RAGWorkers int `json:"ragWorkers"`
ResourceWorkers int `json:"resourceWorkers"`
ManifestWorkers int `json:"manifestWorkers"`
GitOpsWorkers int `json:"gitopsWorkers"`
DeploymentWorkers int `json:"deploymentWorkers"`
// Queue configurations.
MaxQueueSize int `json:"maxQueueSize"`
QueueTimeout time.Duration `json:"queueTimeout"`
// Performance tuning.
MaxConcurrentIntents int `json:"maxConcurrentIntents"`
ProcessingTimeout time.Duration `json:"processingTimeout"`
HealthCheckInterval time.Duration `json:"healthCheckInterval"`
// Resource limits.
MaxMemoryPerWorker int64 `json:"maxMemoryPerWorker"`
MaxCPUPerWorker float64 `json:"maxCpuPerWorker"`
// Backpressure settings.
BackpressureEnabled bool `json:"backpressureEnabled"`
BackpressureThreshold float64 `json:"backpressureThreshold"`
// Load balancing.
LoadBalancingStrategy string `json:"loadBalancingStrategy"`
// Retry and circuit breaker.
MaxRetries int `json:"maxRetries"`
CircuitBreakerEnabled bool `json:"circuitBreakerEnabled"`
// Monitoring.
MetricsEnabled bool `json:"metricsEnabled"`
DetailedLogging bool `json:"detailedLogging"`
}
ProcessingEngineConfig holds configuration for the parallel processing engine.
type ProcessingMetrics ¶
type ProcessingMetrics struct {
// Total number of tasks processed
TotalTasks int64
// Number of successful tasks
SuccessfulTasks int64
// Number of failed tasks
FailedTasks int64
// Current number of running tasks
RunningTasks int64
// Current number of pending tasks
PendingTasks int64
// Average task processing latency
AverageLatency time.Duration
// Success rate (0.0 to 1.0)
SuccessRate float64
// Tasks per second throughput
Throughput float64
// Worker pool utilization
WorkerUtilization map[TaskType]float64
// Queue depths for different task types
QueueDepths map[TaskType]int
// Last updated timestamp
LastUpdated time.Time
}
ProcessingMetrics contains metrics about parallel processing performance
type ProcessingTask ¶
type ProcessingTask struct {
ID string
Priority int
Timeout time.Duration
Processor func(context.Context) error
Metadata map[string]interface{}
}
ProcessingTask represents a generic processing task
func (*ProcessingTask) Execute ¶
func (pt *ProcessingTask) Execute(ctx context.Context) error
Execute implements the TaskInterface for ProcessingTask
func (*ProcessingTask) GetID ¶
func (pt *ProcessingTask) GetID() string
GetID implements the TaskInterface for ProcessingTask
func (*ProcessingTask) GetPriority ¶
func (pt *ProcessingTask) GetPriority() int
GetPriority implements the TaskInterface for ProcessingTask
func (*ProcessingTask) GetTimeout ¶
func (pt *ProcessingTask) GetTimeout() time.Duration
GetTimeout implements the TaskInterface for ProcessingTask
type ProcessorConfig ¶
type ProcessorConfig struct {
MaxConcurrency int `json:"max_concurrency"`
DefaultTimeout time.Duration `json:"default_timeout"`
QueueSize int `json:"queue_size"`
EnableMetrics bool `json:"enable_metrics"`
RetryAttempts int `json:"retry_attempts"`
RetryBackoff time.Duration `json:"retry_backoff"`
}
ProcessorConfig contains configuration for parallel processing
func NewDefaultConfig ¶
func NewDefaultConfig() *ProcessorConfig
NewDefaultConfig returns a default parallel processor configuration
type ProcessorMetrics ¶
type ProcessorMetrics struct {
TotalTasks int64 `json:"total_tasks"`
CompletedTasks int64 `json:"completed_tasks"`
FailedTasks int64 `json:"failed_tasks"`
AverageTime time.Duration `json:"average_time"`
MaxTime time.Duration `json:"max_time"`
MinTime time.Duration `json:"min_time"`
// contains filtered or unexported fields
}
ProcessorMetrics tracks parallel processing metrics
func NewProcessorMetrics ¶
func NewProcessorMetrics() *ProcessorMetrics
NewProcessorMetrics creates new processor metrics
type ReconcileTask ¶
type ReconcileTask struct {
ID string
Priority int
Timeout time.Duration
Object client.Object
Req ctrl.Request
Client client.Client
Scheme *runtime.Scheme
Reconciler func(context.Context, ctrl.Request) (ctrl.Result, error)
}
ReconcileTask represents a Kubernetes reconciliation task
func (*ReconcileTask) Execute ¶
func (rt *ReconcileTask) Execute(ctx context.Context) error
Execute implements the Task interface for ReconcileTask
func (*ReconcileTask) GetID ¶
func (rt *ReconcileTask) GetID() string
GetID implements the TaskInterface for ReconcileTask
func (*ReconcileTask) GetPriority ¶
func (rt *ReconcileTask) GetPriority() int
GetPriority implements the TaskInterface for ReconcileTask
func (*ReconcileTask) GetTimeout ¶
func (rt *ReconcileTask) GetTimeout() time.Duration
GetTimeout implements the TaskInterface for ReconcileTask
type ResourceLimiter ¶
type ResourceLimiter struct {
// contains filtered or unexported fields
}
ResourceLimiter manages resource constraints.
func NewResourceLimiter ¶
func NewResourceLimiter(maxMemory, maxCPU int64, logger logr.Logger) *ResourceLimiter
NewResourceLimiter creates a new ResourceLimiter.
type RoundRobinLoadBalancer ¶
type RoundRobinLoadBalancer struct {
// contains filtered or unexported fields
}
func NewRoundRobinLoadBalancer ¶
func NewRoundRobinLoadBalancer() *RoundRobinLoadBalancer
func (*RoundRobinLoadBalancer) GetStrategyName ¶
func (rr *RoundRobinLoadBalancer) GetStrategyName() string
func (*RoundRobinLoadBalancer) SelectPool ¶
func (rr *RoundRobinLoadBalancer) SelectPool(pools map[string]*WorkerPool, task *Task) (string, error)
func (*RoundRobinLoadBalancer) UpdateMetrics ¶
func (rr *RoundRobinLoadBalancer) UpdateMetrics(poolName string, metrics *PoolMetrics)
type SchedulingStrategy ¶
type SchedulingStrategy interface {
ScheduleTask(task *Task, availableWorkers map[string]int) (string, error)
GetStrategyName() string
GetMetrics() map[string]float64
}
SchedulingStrategy defines the interface for task scheduling strategies
type Task ¶
type Task struct {
// ID is a unique identifier for the task
ID string `json:"id"`
// IntentID links the task to a NetworkIntent
IntentID string `json:"intent_id"`
// CorrelationID for tracing and error correlation (Go 1.24+ migration)
// Added for backward compatibility with existing tests
CorrelationID string `json:"correlation_id,omitempty"`
// Type specifies the type of task
Type TaskType `json:"type"`
// Priority determines processing priority
Priority int `json:"priority"`
// Status tracks the current task status
Status TaskStatus `json:"status"`
// InputData contains input parameters for the task
InputData json.RawMessage `json:"input_data,omitempty"`
// OutputData contains results from task execution
OutputData json.RawMessage `json:"output_data,omitempty"`
// Error contains error information if task failed
Error error `json:"-"`
// Context for the task execution
Context context.Context `json:"-"`
// Cancel function for task cancellation
Cancel context.CancelFunc `json:"-"`
// Timeout for task execution
Timeout time.Duration `json:"timeout"`
// RetryConfig for advanced retry configuration (Go 1.24+ evolution)
RetryConfig *TaskRetryConfig `json:"retry_config,omitempty"`
// RetryCount tracks number of retry attempts (maintained for compatibility)
RetryCount int `json:"retry_count"`
// CreatedAt timestamp
CreatedAt time.Time `json:"created_at"`
// StartedAt timestamp
StartedAt *time.Time `json:"started_at,omitempty"`
// CompletedAt timestamp
CompletedAt *time.Time `json:"completed_at,omitempty"`
// ScheduledAt timestamp when task was scheduled
ScheduledAt *time.Time `json:"scheduled_at,omitempty"`
// Dependencies are tasks that must complete before this task
Dependencies []string `json:"dependencies,omitempty"`
// Metadata for additional task information
Metadata map[string]string `json:"metadata,omitempty"`
// Intent reference for context
Intent *v1.NetworkIntent `json:"-"`
// Callback functions
OnSuccess func(*TaskResult) `json:"-"`
OnFailure func(*TaskResult) `json:"-"`
// Version field for struct evolution tracking
Version int `json:"version,omitempty"`
}
Task represents a unit of work in the parallel processing system Using Go 1.24+ struct evolution patterns for backward compatibility
func (*Task) Execute ¶
Execute implements TaskInterface - this is a placeholder Actual execution logic should be implemented by task processors
func (*Task) GetCorrelationID ¶
GetCorrelationID returns the correlation ID with fallback logic
func (*Task) GetEffectiveRetryConfig ¶
func (t *Task) GetEffectiveRetryConfig() *TaskRetryConfig
GetEffectiveRetryConfig returns retry config with backward compatibility
func (*Task) GetEffectiveTimeout ¶
GetEffectiveTimeout returns the timeout with backward compatibility
func (*Task) GetTimeout ¶
GetTimeout implements TaskInterface
type TaskBuilder ¶
type TaskBuilder struct {
// contains filtered or unexported fields
}
TaskBuilder provides Go 1.24+ struct evolution patterns for Task construction
func NewTaskBuilder ¶
func NewTaskBuilder(id, intentID string) *TaskBuilder
NewTaskBuilder creates a new task builder with Go 1.24+ evolution patterns
func (*TaskBuilder) WithCorrelationID ¶
func (tb *TaskBuilder) WithCorrelationID(correlationID string) *TaskBuilder
WithCorrelationID sets the correlation ID for tracing (Go 1.24+ migration)
func (*TaskBuilder) WithDependencies ¶
func (tb *TaskBuilder) WithDependencies(dependencies []string) *TaskBuilder
WithDependencies sets task dependencies
func (*TaskBuilder) WithInputData ¶
func (tb *TaskBuilder) WithInputData(inputData map[string]interface{}) *TaskBuilder
WithInputData sets input data
func (*TaskBuilder) WithMetadata ¶
func (tb *TaskBuilder) WithMetadata(metadata map[string]string) *TaskBuilder
WithMetadata sets task metadata
func (*TaskBuilder) WithPriority ¶
func (tb *TaskBuilder) WithPriority(priority int) *TaskBuilder
WithPriority sets the task priority
func (*TaskBuilder) WithRetryConfig ¶
func (tb *TaskBuilder) WithRetryConfig(config *TaskRetryConfig) *TaskBuilder
WithRetryConfig sets advanced retry configuration (Go 1.24+ evolution)
func (*TaskBuilder) WithTimeout ¶
func (tb *TaskBuilder) WithTimeout(timeout time.Duration) *TaskBuilder
WithTimeout sets the task timeout
func (*TaskBuilder) WithType ¶
func (tb *TaskBuilder) WithType(taskType TaskType) *TaskBuilder
WithType sets the task type
type TaskCompatibilityLayer ¶
type TaskCompatibilityLayer struct{}
TaskCompatibilityLayer provides methods for zero-downtime migration
func (*TaskCompatibilityLayer) AdaptLegacyTask ¶
func (tcl *TaskCompatibilityLayer) AdaptLegacyTask(task *Task) *Task
AdaptLegacyTask adapts a task created with legacy patterns
func (*TaskCompatibilityLayer) ValidateTaskSchema ¶
func (tcl *TaskCompatibilityLayer) ValidateTaskSchema(task *Task) error
ValidateTaskSchema validates task schema evolution
type TaskEvolution ¶
type TaskEvolution struct{}
TaskEvolution provides backward compatibility and migration helpers
func (*TaskEvolution) MigrateFromV1 ¶
func (te *TaskEvolution) MigrateFromV1(legacyTask map[string]interface{}) *Task
MigrateFromV1 migrates a legacy task to new schema (Go 1.24+ migration)
type TaskInterface ¶
type TaskInterface interface {
Execute(ctx context.Context) error
GetID() string
GetPriority() int
GetTimeout() time.Duration
}
TaskInterface represents a parallelizable task in controller processing
type TaskPriorityQueue ¶
type TaskPriorityQueue struct {
// contains filtered or unexported fields
}
TaskPriorityQueue implements a priority queue for tasks
func NewTaskPriorityQueue ¶
func NewTaskPriorityQueue() *TaskPriorityQueue
NewTaskPriorityQueue creates a new priority queue for tasks
func (*TaskPriorityQueue) IsEmpty ¶
func (pq *TaskPriorityQueue) IsEmpty() bool
IsEmpty returns true if the queue is empty
func (*TaskPriorityQueue) Len ¶
func (pq *TaskPriorityQueue) Len() int
Len returns the number of tasks in the queue
func (*TaskPriorityQueue) Pop ¶
func (pq *TaskPriorityQueue) Pop() TaskInterface
Pop removes and returns the highest priority task
func (*TaskPriorityQueue) Push ¶
func (pq *TaskPriorityQueue) Push(task TaskInterface)
Push adds a task to the priority queue
type TaskProcessor ¶
type TaskProcessor interface {
ProcessTask(ctx context.Context, task *Task) (*TaskResult, error)
GetProcessorType() TaskType
HealthCheck(ctx context.Context) error
GetMetrics() map[string]interface{}
}
TaskProcessor defines the interface for processing tasks
type TaskResult ¶
type TaskResult struct {
// Task identification
TaskID string
WorkerID int
PoolName string
// Result status
Success bool
Error error
ErrorMessage string
// Timing information
Duration time.Duration
QueueTime time.Duration
ProcessingTime time.Duration
CompletedAt time.Time
// Resource usage
MemoryUsed int64
CPUUsed float64
// Output data
OutputData map[string]interface{}
ProcessingResult *contracts.ProcessingResult
}
TaskResult contains the result of task processing
type TaskRetryConfig ¶
type TaskRetryConfig struct {
MaxAttempts int `json:"maxAttempts"`
InitialDelay time.Duration `json:"initialDelay"`
BackoffFactor float64 `json:"backoffFactor"`
MaxDelay time.Duration `json:"maxDelay"`
}
TaskRetryConfig defines retry configuration for tasks
type TaskScheduler ¶
type TaskScheduler struct {
// contains filtered or unexported fields
}
TaskScheduler manages task scheduling and dependency resolution
func NewTaskScheduler ¶
func NewTaskScheduler(engine *ParallelProcessingEngine, logger logr.Logger) *TaskScheduler
func (*TaskScheduler) Start ¶
func (ts *TaskScheduler) Start(ctx context.Context)
func (*TaskScheduler) Stop ¶
func (ts *TaskScheduler) Stop()
type TaskStatus ¶
type TaskStatus string
TaskStatus represents the status of a task
const ( TaskStatusPending TaskStatus = "pending" TaskStatusRunning TaskStatus = "running" TaskStatusCompleted TaskStatus = "completed" TaskStatusFailed TaskStatus = "failed" TaskStatusCancelled TaskStatus = "cancelled" TaskStatusRetrying TaskStatus = "retrying" )
type TaskType ¶
type TaskType string
TaskType represents different types of processing tasks
const ( TaskTypeIntentParsing TaskType = "intent_parsing" TaskTypeIntentProcessing TaskType = "intent_processing" TaskTypeLLMProcessing TaskType = "llm_processing" TaskTypeRAGQuery TaskType = "rag_query" TaskTypeRAGRetrieval TaskType = "rag_retrieval" TaskTypeResourcePlanning TaskType = "resource_planning" TaskTypeManifestGeneration TaskType = "manifest_generation" TaskTypeGitOpsCommit TaskType = "gitops_commit" TaskTypeDeployment TaskType = "deployment" TaskTypeDeploymentVerify TaskType = "deployment_verify" )
type ValidationRule ¶
type ValidationRule struct {
Name string
Description string
Validator func(context.Context, client.Object, client.Client) error
}
ValidationRule represents a validation rule
type ValidationTask ¶
type ValidationTask struct {
ID string
Priority int
Timeout time.Duration
Object client.Object
Rules []ValidationRule
Client client.Client
}
ValidationTask represents a validation task that can run in parallel
func (*ValidationTask) Execute ¶
func (vt *ValidationTask) Execute(ctx context.Context) error
Execute implements the TaskInterface for ValidationTask
func (*ValidationTask) GetID ¶
func (vt *ValidationTask) GetID() string
GetID implements the TaskInterface for ValidationTask
func (*ValidationTask) GetPriority ¶
func (vt *ValidationTask) GetPriority() int
GetPriority implements the TaskInterface for ValidationTask
func (*ValidationTask) GetTimeout ¶
func (vt *ValidationTask) GetTimeout() time.Duration
GetTimeout implements the TaskInterface for ValidationTask
type WeightedResponseTimeLoadBalancer ¶
type WeightedResponseTimeLoadBalancer struct {
// contains filtered or unexported fields
}
func NewWeightedResponseTimeLoadBalancer ¶
func NewWeightedResponseTimeLoadBalancer() *WeightedResponseTimeLoadBalancer
func (*WeightedResponseTimeLoadBalancer) GetStrategyName ¶
func (wrt *WeightedResponseTimeLoadBalancer) GetStrategyName() string
func (*WeightedResponseTimeLoadBalancer) SelectPool ¶
func (wrt *WeightedResponseTimeLoadBalancer) SelectPool(pools map[string]*WorkerPool, task *Task) (string, error)
func (*WeightedResponseTimeLoadBalancer) UpdateMetrics ¶
func (wrt *WeightedResponseTimeLoadBalancer) UpdateMetrics(poolName string, metrics *PoolMetrics)
type Worker ¶
type Worker struct {
// Basic identification
ID int
Type TaskType
Queue chan *Task
Results chan *Task
Context context.Context
Cancel context.CancelFunc
Engine *ParallelProcessingEngine
// contains filtered or unexported fields
}
Worker represents a worker goroutine for processing tasks
func (*Worker) GetMetrics ¶
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool represents a pool of workers for processing tasks
func NewWorkerPool ¶
func NewWorkerPool(name string, workerCount int, processor TaskProcessor, logger logr.Logger) *WorkerPool
func (*WorkerPool) GetHealth ¶
func (wp *WorkerPool) GetHealth() map[string]interface{}
func (*WorkerPool) GetMetrics ¶
func (wp *WorkerPool) GetMetrics() map[string]interface{}
func (*WorkerPool) Stop ¶
func (wp *WorkerPool) Stop()
func (*WorkerPool) SubmitTask ¶
func (wp *WorkerPool) SubmitTask(task *Task) error
type WorkflowResult ¶
type WorkflowResult struct {
// IntentID is the ID of the processed intent
IntentID string
// Success indicates if the workflow completed successfully
Success bool
// Error contains error information if workflow failed
Error error
// Tasks contains information about all tasks in the workflow
Tasks []*Task
// StartTime when workflow processing began
StartTime time.Time
// EndTime when workflow processing completed
EndTime time.Time
// Duration of workflow processing
Duration time.Duration
// Results contains structured output from the workflow
Results map[string]interface{}
}
WorkflowResult contains the result of processing a NetworkIntent workflow