parallel

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 4, 2025 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultMaxConcurrency = 10
	DefaultTimeout        = 30 * time.Second
	DefaultQueueSize      = 1000
	DefaultRetryAttempts  = 3
	DefaultRetryBackoff   = 1 * time.Second

	// Task priorities
	PriorityHigh   = 100
	PriorityNormal = 50
	PriorityLow    = 10

	// Task types
	TaskTypeReconcile  = "reconcile"
	TaskTypeValidation = "validation"
	TaskTypeProcessing = "processing"
	TaskTypeBatch      = "batch"
)

Constants for parallel processing

Variables

This section is empty.

Functions

func ExecuteTasksInBatches

func ExecuteTasksInBatches(ctx context.Context, tasks []Task, batchSize int) error

ExecuteTasksInBatches executes tasks in batches of the specified batch size.

func ExecuteWithRetry

func ExecuteWithRetry(ctx context.Context, task TaskInterface, maxRetries int, backoff time.Duration) error

ExecuteWithRetry executes a task with retry logic

Types

type BackpressureAction

type BackpressureAction struct {
	Name       string          `json:"name"`
	Threshold  float64         `json:"threshold"`
	Action     string          `json:"action"` // throttle, reject, shed_load, degrade
	Parameters json.RawMessage `json:"parameters"`
}

BackpressureAction defines actions to take under backpressure.

type BackpressureConfig

type BackpressureConfig struct {
	Enabled          bool                          `json:"enabled"`
	LoadThreshold    float64                       `json:"loadThreshold"`
	Actions          map[string]BackpressureAction `json:"actions"`
	EvaluationWindow time.Duration                 `json:"evaluationWindow"`
}

BackpressureConfig configures backpressure management.

type BackpressureManager

type BackpressureManager struct {
	// contains filtered or unexported fields
}

BackpressureManager handles system backpressure.

func NewBackpressureManager

func NewBackpressureManager(config *BackpressureConfig, logger logr.Logger) *BackpressureManager

NewBackpressureManager creates a new BackpressureManager.

func (*BackpressureManager) GetStats

func (bm *BackpressureManager) GetStats() (map[string]interface{}, error)

GetStats returns statistics for BackpressureManager (interface-compatible method).

func (*BackpressureManager) ShouldReject

func (bm *BackpressureManager) ShouldReject(taskType TaskType) bool

ShouldReject reports whether a task of the given type should be rejected.

func (*BackpressureManager) Start

func (bm *BackpressureManager) Start(ctx context.Context)

Start starts backpressure monitoring.

func (*BackpressureManager) Stop

func (bm *BackpressureManager) Stop()

Stop stops the BackpressureManager.

type BackpressureMetrics

type BackpressureMetrics struct {
	CurrentLoad       float64   `json:"currentLoad"`
	ThrottledRequests int64     `json:"throttledRequests"`
	RejectedRequests  int64     `json:"rejectedRequests"`
	ShedRequests      int64     `json:"shedRequests"`
	DegradedRequests  int64     `json:"degradedRequests"`
	LastAction        string    `json:"lastAction"`
	LastActionTime    time.Time `json:"lastActionTime"`
}

BackpressureMetrics tracks backpressure metrics.

type BackpressureThresholds

type BackpressureThresholds struct {
	Low    float64 `json:"low"`
	Medium float64 `json:"medium"`
	High   float64 `json:"high"`
}

BackpressureThresholds defines load thresholds for backpressure.

type BatchProcessor

type BatchProcessor struct {
	// contains filtered or unexported fields
}

BatchProcessor handles batch processing of related tasks

func NewBatchProcessor

func NewBatchProcessor(batchSize int, flushInterval time.Duration, processor func(context.Context, []TaskInterface) error) *BatchProcessor

NewBatchProcessor creates a new batch processor

func (*BatchProcessor) AddTask

func (bp *BatchProcessor) AddTask(task TaskInterface)

AddTask adds a task to the batch buffer

func (*BatchProcessor) Close

func (bp *BatchProcessor) Close() error

Close gracefully shuts down the batch processor

type ControllerTaskResult

type ControllerTaskResult struct {
	TaskID      string          `json:"task_id"`
	Success     bool            `json:"success"`
	Error       error           `json:"error,omitempty"`
	Duration    time.Duration   `json:"duration"`
	CompletedAt time.Time       `json:"completed_at"`
	Metadata    json.RawMessage `json:"metadata,omitempty"`
}

ControllerTaskResult represents the result of a parallel task execution

type DependencyAwareStrategy

type DependencyAwareStrategy struct {
	// contains filtered or unexported fields
}

func NewDependencyAwareStrategy

func NewDependencyAwareStrategy() *DependencyAwareStrategy

func (*DependencyAwareStrategy) GetMetrics

func (das *DependencyAwareStrategy) GetMetrics() map[string]float64

func (*DependencyAwareStrategy) GetStrategyName

func (das *DependencyAwareStrategy) GetStrategyName() string

func (*DependencyAwareStrategy) ScheduleTask

func (das *DependencyAwareStrategy) ScheduleTask(task *Task, availableWorkers map[string]int) (string, error)

type DependencyGraph

type DependencyGraph struct {
	// contains filtered or unexported fields
}

DependencyGraph manages task dependencies and execution order

func NewDependencyGraph

func NewDependencyGraph() *DependencyGraph

func (*DependencyGraph) AddTask

func (dg *DependencyGraph) AddTask(taskID string, dependencies, dependents []string)

func (*DependencyGraph) AreDependenciesSatisfied

func (dg *DependencyGraph) AreDependenciesSatisfied(taskID string) bool

func (*DependencyGraph) GetReadyTasks

func (dg *DependencyGraph) GetReadyTasks() []string

func (*DependencyGraph) GetTaskStatus

func (dg *DependencyGraph) GetTaskStatus(taskID string) (completed, failed, exists bool)

func (*DependencyGraph) MarkTaskCompleted

func (dg *DependencyGraph) MarkTaskCompleted(taskID string, success bool)

type DependencyNode

type DependencyNode struct {
	TaskID       string
	Dependencies []*DependencyNode
	Dependents   []*DependencyNode
	Completed    bool
	Failed       bool
	// contains filtered or unexported fields
}

DependencyNode represents a single node in the dependency graph

type IntentProcessor

type IntentProcessor struct {
	// contains filtered or unexported fields
}

func NewIntentProcessor

func NewIntentProcessor(logger logr.Logger) *IntentProcessor

func (*IntentProcessor) GetMetrics

func (ip *IntentProcessor) GetMetrics() map[string]interface{}

func (*IntentProcessor) GetProcessorType

func (ip *IntentProcessor) GetProcessorType() TaskType

func (*IntentProcessor) HealthCheck

func (ip *IntentProcessor) HealthCheck(ctx context.Context) error

func (*IntentProcessor) ProcessTask

func (ip *IntentProcessor) ProcessTask(ctx context.Context, task *Task) (*TaskResult, error)

type LLMProcessor

type LLMProcessor struct {
	// contains filtered or unexported fields
}

func NewLLMProcessor

func NewLLMProcessor(logger logr.Logger) *LLMProcessor

func (*LLMProcessor) GetMetrics

func (lp *LLMProcessor) GetMetrics() map[string]interface{}

func (*LLMProcessor) GetProcessorType

func (lp *LLMProcessor) GetProcessorType() TaskType

func (*LLMProcessor) HealthCheck

func (lp *LLMProcessor) HealthCheck(ctx context.Context) error

func (*LLMProcessor) ProcessTask

func (lp *LLMProcessor) ProcessTask(ctx context.Context, task *Task) (*TaskResult, error)

type LeastConnectionsLoadBalancer

type LeastConnectionsLoadBalancer struct {
	// contains filtered or unexported fields
}

func NewLeastConnectionsLoadBalancer

func NewLeastConnectionsLoadBalancer() *LeastConnectionsLoadBalancer

func (*LeastConnectionsLoadBalancer) GetStrategyName

func (lc *LeastConnectionsLoadBalancer) GetStrategyName() string

func (*LeastConnectionsLoadBalancer) SelectPool

func (lc *LeastConnectionsLoadBalancer) SelectPool(pools map[string]*WorkerPool, task *Task) (string, error)

func (*LeastConnectionsLoadBalancer) UpdateMetrics

func (lc *LeastConnectionsLoadBalancer) UpdateMetrics(poolName string, metrics *PoolMetrics)

type LoadBalancedStrategy

type LoadBalancedStrategy struct {
	// contains filtered or unexported fields
}

func NewLoadBalancedStrategy

func NewLoadBalancedStrategy() *LoadBalancedStrategy

func (*LoadBalancedStrategy) GetMetrics

func (lbs *LoadBalancedStrategy) GetMetrics() map[string]float64

func (*LoadBalancedStrategy) GetStrategyName

func (lbs *LoadBalancedStrategy) GetStrategyName() string

func (*LoadBalancedStrategy) ScheduleTask

func (lbs *LoadBalancedStrategy) ScheduleTask(task *Task, availableWorkers map[string]int) (string, error)

type LoadBalancer

type LoadBalancer struct {
	// contains filtered or unexported fields
}

LoadBalancer manages load balancing across worker pools

func NewLoadBalancer

func NewLoadBalancer(logger logr.Logger) *LoadBalancer

func (*LoadBalancer) SelectPool

func (lb *LoadBalancer) SelectPool(pools map[string]*WorkerPool, task *Task) (string, error)

func (*LoadBalancer) UpdatePoolMetrics

func (lb *LoadBalancer) UpdatePoolMetrics(poolName string, metrics *PoolMetrics)

type LoadBalancingStrategy

type LoadBalancingStrategy interface {
	SelectPool(pools map[string]*WorkerPool, task *Task) (string, error)
	GetStrategyName() string
	UpdateMetrics(poolName string, metrics *PoolMetrics)
}

LoadBalancingStrategy defines the interface for load balancing strategies

type ParallelProcessingConfig

type ParallelProcessingConfig struct {
	// MaxConcurrentIntents limits concurrent intent processing
	MaxConcurrentIntents int

	// Worker pool sizes for different stages
	IntentPoolSize     int
	LLMPoolSize        int
	RAGPoolSize        int
	ResourcePoolSize   int
	ManifestPoolSize   int
	GitOpsPoolSize     int
	DeploymentPoolSize int

	// Task queue size
	TaskQueueSize int

	// Health check interval
	HealthCheckInterval time.Duration

	// Processing timeouts
	IntentTimeout     time.Duration
	LLMTimeout        time.Duration
	ManifestTimeout   time.Duration
	DeploymentTimeout time.Duration

	// Retry configurations
	MaxRetries      int
	RetryBackoff    time.Duration
	RetryMultiplier float64
}

ParallelProcessingConfig defines configuration for parallel processing

type ParallelProcessingEngine

type ParallelProcessingEngine struct {
	// contains filtered or unexported fields
}

ParallelProcessingEngine orchestrates parallel processing of NetworkIntents

func NewParallelProcessingEngine

func NewParallelProcessingEngine(
	config *ParallelProcessingConfig,
	resilienceManager *resiliencecontroller.ResilienceManager,
	errorTracker *monitoring.ErrorTracker,
	logger logr.Logger,
) (*ParallelProcessingEngine, error)

NewParallelProcessingEngine creates a new parallel processing engine

func (*ParallelProcessingEngine) GetDependencyMetrics

func (e *ParallelProcessingEngine) GetDependencyMetrics() map[string]interface{}

GetDependencyMetrics returns dependency-related metrics

func (*ParallelProcessingEngine) GetMetrics

GetMetrics returns current processing metrics

func (*ParallelProcessingEngine) GetTask

func (e *ParallelProcessingEngine) GetTask(taskID string) (*Task, bool)

GetTask retrieves a task by ID

func (*ParallelProcessingEngine) GetTaskStatus

func (e *ParallelProcessingEngine) GetTaskStatus(taskID string) *Task

GetTaskStatus returns the status of a specific task

func (*ParallelProcessingEngine) HealthCheck

func (e *ParallelProcessingEngine) HealthCheck() error

HealthCheck returns the health status of the processing engine

func (*ParallelProcessingEngine) ProcessIntentWorkflow

func (e *ParallelProcessingEngine) ProcessIntentWorkflow(ctx context.Context, intent *v1.NetworkIntent) (*WorkflowResult, error)

ProcessIntentWorkflow processes a NetworkIntent through the complete workflow

func (*ParallelProcessingEngine) Start

Start initializes and starts the parallel processing engine

func (*ParallelProcessingEngine) Stop

func (e *ParallelProcessingEngine) Stop()

Stop gracefully shuts down the parallel processing engine

func (*ParallelProcessingEngine) SubmitTask

func (e *ParallelProcessingEngine) SubmitTask(task *Task) error

SubmitTask submits a task for processing

type ParallelProcessor

type ParallelProcessor struct {
	// contains filtered or unexported fields
}

ParallelProcessor manages parallel execution of controller tasks

func NewParallelProcessor

func NewParallelProcessor(config *ProcessorConfig) *ParallelProcessor

NewParallelProcessor creates a new parallel processor

func (*ParallelProcessor) GetMetrics

func (pp *ParallelProcessor) GetMetrics() ProcessorMetrics

GetMetrics returns current processing metrics

func (*ParallelProcessor) GetResult

func (pp *ParallelProcessor) GetResult() *ControllerTaskResult

GetResult retrieves a task result (non-blocking)

func (*ParallelProcessor) Shutdown

func (pp *ParallelProcessor) Shutdown() error

Shutdown gracefully shuts down the processor

func (*ParallelProcessor) SubmitTask

func (pp *ParallelProcessor) SubmitTask(task TaskInterface) error

SubmitTask submits a task for parallel execution

func (*ParallelProcessor) SubmitTasks

func (pp *ParallelProcessor) SubmitTasks(tasks []TaskInterface) error

SubmitTasks submits multiple tasks for parallel execution

func (*ParallelProcessor) WaitForAllTasks

func (pp *ParallelProcessor) WaitForAllTasks()

WaitForAllTasks waits for all submitted tasks to complete

func (*ParallelProcessor) WaitForAllTasksWithTimeout

func (pp *ParallelProcessor) WaitForAllTasksWithTimeout(timeout time.Duration) error

WaitForAllTasksWithTimeout waits for all tasks with a timeout

func (*ParallelProcessor) WaitForResult

func (pp *ParallelProcessor) WaitForResult(timeout time.Duration) *ControllerTaskResult

WaitForResult waits for a task result with timeout

type ParallelReconciler

type ParallelReconciler struct {
	Client    client.Client
	Scheme    *runtime.Scheme
	Processor *ParallelProcessor
	Logger    logr.Logger
}

ParallelReconciler provides parallel processing capabilities for controllers

func NewParallelReconciler

func NewParallelReconciler(processor *ParallelProcessor) *ParallelReconciler

NewParallelReconciler creates a new parallel reconciler

func (*ParallelReconciler) ReconcileParallel

func (pr *ParallelReconciler) ReconcileParallel(ctx context.Context, tasks []TaskInterface) error

ReconcileParallel executes reconciliation tasks in parallel

type PoolMetrics

type PoolMetrics struct {
	ActiveWorkers  int32
	ProcessedTasks int64
	FailedTasks    int64
	AverageLatency time.Duration
	MemoryUsage    int64
	CPUUsage       float64
	QueueLength    int
	QueueCapacity  int
	Throughput     float64
	SuccessRate    float64
	LastUpdated    time.Time
}

PoolMetrics contains metrics for a worker pool

type PriorityFirstStrategy

type PriorityFirstStrategy struct {
	// contains filtered or unexported fields
}

func NewPriorityFirstStrategy

func NewPriorityFirstStrategy() *PriorityFirstStrategy

func (*PriorityFirstStrategy) GetMetrics

func (pfs *PriorityFirstStrategy) GetMetrics() map[string]float64

func (*PriorityFirstStrategy) GetStrategyName

func (pfs *PriorityFirstStrategy) GetStrategyName() string

func (*PriorityFirstStrategy) ScheduleTask

func (pfs *PriorityFirstStrategy) ScheduleTask(task *Task, availableWorkers map[string]int) (string, error)

type PriorityQueue

type PriorityQueue struct {
	// contains filtered or unexported fields
}

PriorityQueue implements a priority-based task queue

func NewPriorityQueue

func NewPriorityQueue(maxSize int) *PriorityQueue

func (*PriorityQueue) Clear

func (pq *PriorityQueue) Clear()

func (*PriorityQueue) Dequeue

func (pq *PriorityQueue) Dequeue() *Task

func (*PriorityQueue) DequeueWithTimeout

func (pq *PriorityQueue) DequeueWithTimeout(timeout time.Duration) *Task

func (*PriorityQueue) Enqueue

func (pq *PriorityQueue) Enqueue(task *Task) error

func (*PriorityQueue) GetTasks

func (pq *PriorityQueue) GetTasks() []*Task

func (*PriorityQueue) IsEmpty

func (pq *PriorityQueue) IsEmpty() bool

func (*PriorityQueue) IsFull

func (pq *PriorityQueue) IsFull() bool

func (*PriorityQueue) Peek

func (pq *PriorityQueue) Peek() *Task

func (*PriorityQueue) Size

func (pq *PriorityQueue) Size() int

type ProcessingEngineConfig

type ProcessingEngineConfig struct {
	// Worker pool configurations.
	IntentWorkers     int `json:"intentWorkers"`
	LLMWorkers        int `json:"llmWorkers"`
	RAGWorkers        int `json:"ragWorkers"`
	ResourceWorkers   int `json:"resourceWorkers"`
	ManifestWorkers   int `json:"manifestWorkers"`
	GitOpsWorkers     int `json:"gitopsWorkers"`
	DeploymentWorkers int `json:"deploymentWorkers"`

	// Queue configurations.
	MaxQueueSize int           `json:"maxQueueSize"`
	QueueTimeout time.Duration `json:"queueTimeout"`

	// Performance tuning.
	MaxConcurrentIntents int           `json:"maxConcurrentIntents"`
	ProcessingTimeout    time.Duration `json:"processingTimeout"`
	HealthCheckInterval  time.Duration `json:"healthCheckInterval"`

	// Resource limits.
	MaxMemoryPerWorker int64   `json:"maxMemoryPerWorker"`
	MaxCPUPerWorker    float64 `json:"maxCpuPerWorker"`

	// Backpressure settings.
	BackpressureEnabled   bool    `json:"backpressureEnabled"`
	BackpressureThreshold float64 `json:"backpressureThreshold"`

	// Load balancing.
	LoadBalancingStrategy string `json:"loadBalancingStrategy"`

	// Retry and circuit breaker.
	MaxRetries            int  `json:"maxRetries"`
	CircuitBreakerEnabled bool `json:"circuitBreakerEnabled"`

	// Monitoring.
	MetricsEnabled  bool `json:"metricsEnabled"`
	DetailedLogging bool `json:"detailedLogging"`
}

ProcessingEngineConfig holds configuration for the parallel processing engine.

type ProcessingMetrics

type ProcessingMetrics struct {
	// Total number of tasks processed
	TotalTasks int64

	// Number of successful tasks
	SuccessfulTasks int64

	// Number of failed tasks
	FailedTasks int64

	// Current number of running tasks
	RunningTasks int64

	// Current number of pending tasks
	PendingTasks int64

	// Average task processing latency
	AverageLatency time.Duration

	// Success rate (0.0 to 1.0)
	SuccessRate float64

	// Tasks per second throughput
	Throughput float64

	// Worker pool utilization
	WorkerUtilization map[TaskType]float64

	// Queue depths for different task types
	QueueDepths map[TaskType]int

	// Last updated timestamp
	LastUpdated time.Time
}

ProcessingMetrics contains metrics about parallel processing performance

type ProcessingTask

type ProcessingTask struct {
	ID        string
	Priority  int
	Timeout   time.Duration
	Processor func(context.Context) error
	Metadata  map[string]interface{}
}

ProcessingTask represents a generic processing task

func (*ProcessingTask) Execute

func (pt *ProcessingTask) Execute(ctx context.Context) error

Execute implements the TaskInterface for ProcessingTask

func (*ProcessingTask) GetID

func (pt *ProcessingTask) GetID() string

GetID implements the TaskInterface for ProcessingTask

func (*ProcessingTask) GetPriority

func (pt *ProcessingTask) GetPriority() int

GetPriority implements the TaskInterface for ProcessingTask

func (*ProcessingTask) GetTimeout

func (pt *ProcessingTask) GetTimeout() time.Duration

GetTimeout implements the TaskInterface for ProcessingTask

type ProcessorConfig

type ProcessorConfig struct {
	MaxConcurrency int           `json:"max_concurrency"`
	DefaultTimeout time.Duration `json:"default_timeout"`
	QueueSize      int           `json:"queue_size"`
	EnableMetrics  bool          `json:"enable_metrics"`
	RetryAttempts  int           `json:"retry_attempts"`
	RetryBackoff   time.Duration `json:"retry_backoff"`
}

ProcessorConfig contains configuration for parallel processing

func NewDefaultConfig

func NewDefaultConfig() *ProcessorConfig

NewDefaultConfig returns a default parallel processor configuration

type ProcessorMetrics

type ProcessorMetrics struct {
	TotalTasks     int64         `json:"total_tasks"`
	CompletedTasks int64         `json:"completed_tasks"`
	FailedTasks    int64         `json:"failed_tasks"`
	AverageTime    time.Duration `json:"average_time"`
	MaxTime        time.Duration `json:"max_time"`
	MinTime        time.Duration `json:"min_time"`
	// contains filtered or unexported fields
}

ProcessorMetrics tracks parallel processing metrics

func NewProcessorMetrics

func NewProcessorMetrics() *ProcessorMetrics

NewProcessorMetrics creates new processor metrics

type ReconcileTask

type ReconcileTask struct {
	ID         string
	Priority   int
	Timeout    time.Duration
	Object     client.Object
	Req        ctrl.Request
	Client     client.Client
	Scheme     *runtime.Scheme
	Reconciler func(context.Context, ctrl.Request) (ctrl.Result, error)
}

ReconcileTask represents a Kubernetes reconciliation task

func (*ReconcileTask) Execute

func (rt *ReconcileTask) Execute(ctx context.Context) error

Execute implements the TaskInterface for ReconcileTask

func (*ReconcileTask) GetID

func (rt *ReconcileTask) GetID() string

GetID implements the TaskInterface for ReconcileTask

func (*ReconcileTask) GetPriority

func (rt *ReconcileTask) GetPriority() int

GetPriority implements the TaskInterface for ReconcileTask

func (*ReconcileTask) GetTimeout

func (rt *ReconcileTask) GetTimeout() time.Duration

GetTimeout implements the TaskInterface for ReconcileTask

type ResourceLimiter

type ResourceLimiter struct {
	// contains filtered or unexported fields
}

ResourceLimiter manages resource constraints.

func NewResourceLimiter

func NewResourceLimiter(maxMemory, maxCPU int64, logger logr.Logger) *ResourceLimiter

NewResourceLimiter creates a new ResourceLimiter.

type RoundRobinLoadBalancer

type RoundRobinLoadBalancer struct {
	// contains filtered or unexported fields
}

func NewRoundRobinLoadBalancer

func NewRoundRobinLoadBalancer() *RoundRobinLoadBalancer

func (*RoundRobinLoadBalancer) GetStrategyName

func (rr *RoundRobinLoadBalancer) GetStrategyName() string

func (*RoundRobinLoadBalancer) SelectPool

func (rr *RoundRobinLoadBalancer) SelectPool(pools map[string]*WorkerPool, task *Task) (string, error)

func (*RoundRobinLoadBalancer) UpdateMetrics

func (rr *RoundRobinLoadBalancer) UpdateMetrics(poolName string, metrics *PoolMetrics)

type SchedulingStrategy

type SchedulingStrategy interface {
	ScheduleTask(task *Task, availableWorkers map[string]int) (string, error)
	GetStrategyName() string
	GetMetrics() map[string]float64
}

SchedulingStrategy defines the interface for task scheduling strategies

type Task

type Task struct {
	// ID is a unique identifier for the task
	ID string `json:"id"`

	// IntentID links the task to a NetworkIntent
	IntentID string `json:"intent_id"`

	// CorrelationID for tracing and error correlation (Go 1.24+ migration)
	// Added for backward compatibility with existing tests
	CorrelationID string `json:"correlation_id,omitempty"`

	// Type specifies the type of task
	Type TaskType `json:"type"`

	// Priority determines processing priority
	Priority int `json:"priority"`

	// Status tracks the current task status
	Status TaskStatus `json:"status"`

	// InputData contains input parameters for the task
	InputData json.RawMessage `json:"input_data,omitempty"`

	// OutputData contains results from task execution
	OutputData json.RawMessage `json:"output_data,omitempty"`

	// Error contains error information if task failed
	Error error `json:"-"`

	// Context for the task execution
	Context context.Context `json:"-"`

	// Cancel function for task cancellation
	Cancel context.CancelFunc `json:"-"`

	// Timeout for task execution
	Timeout time.Duration `json:"timeout"`

	// RetryConfig for advanced retry configuration (Go 1.24+ evolution)
	RetryConfig *TaskRetryConfig `json:"retry_config,omitempty"`

	// RetryCount tracks number of retry attempts (maintained for compatibility)
	RetryCount int `json:"retry_count"`

	// CreatedAt timestamp
	CreatedAt time.Time `json:"created_at"`

	// StartedAt timestamp
	StartedAt *time.Time `json:"started_at,omitempty"`

	// CompletedAt timestamp
	CompletedAt *time.Time `json:"completed_at,omitempty"`

	// ScheduledAt timestamp when task was scheduled
	ScheduledAt *time.Time `json:"scheduled_at,omitempty"`

	// Dependencies are tasks that must complete before this task
	Dependencies []string `json:"dependencies,omitempty"`

	// Metadata for additional task information
	Metadata map[string]string `json:"metadata,omitempty"`

	// Intent reference for context
	Intent *v1.NetworkIntent `json:"-"`

	// Callback functions
	OnSuccess func(*TaskResult) `json:"-"`
	OnFailure func(*TaskResult) `json:"-"`

	// Version field for struct evolution tracking
	Version int `json:"version,omitempty"`
}

Task represents a unit of work in the parallel processing system. It uses Go 1.24+ struct evolution patterns for backward compatibility.

func (*Task) Execute

func (t *Task) Execute(ctx context.Context) error

Execute implements TaskInterface. This is a placeholder; actual execution logic should be implemented by task processors.

func (*Task) GetCorrelationID

func (t *Task) GetCorrelationID() string

GetCorrelationID returns the correlation ID with fallback logic

func (*Task) GetEffectiveRetryConfig

func (t *Task) GetEffectiveRetryConfig() *TaskRetryConfig

GetEffectiveRetryConfig returns retry config with backward compatibility

func (*Task) GetEffectiveTimeout

func (t *Task) GetEffectiveTimeout() time.Duration

GetEffectiveTimeout returns the timeout with backward compatibility

func (*Task) GetID

func (t *Task) GetID() string

GetID implements TaskInterface

func (*Task) GetPriority

func (t *Task) GetPriority() int

GetPriority implements TaskInterface

func (*Task) GetTimeout

func (t *Task) GetTimeout() time.Duration

GetTimeout implements TaskInterface

func (*Task) IsEvolved

func (t *Task) IsEvolved() bool

IsEvolved returns true if task uses the new schema

type TaskBuilder

type TaskBuilder struct {
	// contains filtered or unexported fields
}

TaskBuilder provides Go 1.24+ struct evolution patterns for Task construction

func NewTaskBuilder

func NewTaskBuilder(id, intentID string) *TaskBuilder

NewTaskBuilder creates a new task builder with Go 1.24+ evolution patterns

func (*TaskBuilder) Build

func (tb *TaskBuilder) Build() *Task

Build returns the constructed task

func (*TaskBuilder) WithCorrelationID

func (tb *TaskBuilder) WithCorrelationID(correlationID string) *TaskBuilder

WithCorrelationID sets the correlation ID for tracing (Go 1.24+ migration)

func (*TaskBuilder) WithDependencies

func (tb *TaskBuilder) WithDependencies(dependencies []string) *TaskBuilder

WithDependencies sets task dependencies

func (*TaskBuilder) WithInputData

func (tb *TaskBuilder) WithInputData(inputData map[string]interface{}) *TaskBuilder

WithInputData sets input data

func (*TaskBuilder) WithMetadata

func (tb *TaskBuilder) WithMetadata(metadata map[string]string) *TaskBuilder

WithMetadata sets task metadata

func (*TaskBuilder) WithPriority

func (tb *TaskBuilder) WithPriority(priority int) *TaskBuilder

WithPriority sets the task priority

func (*TaskBuilder) WithRetryConfig

func (tb *TaskBuilder) WithRetryConfig(config *TaskRetryConfig) *TaskBuilder

WithRetryConfig sets advanced retry configuration (Go 1.24+ evolution)

func (*TaskBuilder) WithTimeout

func (tb *TaskBuilder) WithTimeout(timeout time.Duration) *TaskBuilder

WithTimeout sets the task timeout

func (*TaskBuilder) WithType

func (tb *TaskBuilder) WithType(taskType TaskType) *TaskBuilder

WithType sets the task type

type TaskCompatibilityLayer

type TaskCompatibilityLayer struct{}

TaskCompatibilityLayer provides methods for zero-downtime migration

func (*TaskCompatibilityLayer) AdaptLegacyTask

func (tcl *TaskCompatibilityLayer) AdaptLegacyTask(task *Task) *Task

AdaptLegacyTask adapts a task created with legacy patterns

func (*TaskCompatibilityLayer) ValidateTaskSchema

func (tcl *TaskCompatibilityLayer) ValidateTaskSchema(task *Task) error

ValidateTaskSchema validates task schema evolution

type TaskEvolution

type TaskEvolution struct{}

TaskEvolution provides backward compatibility and migration helpers

func (*TaskEvolution) MigrateFromV1

func (te *TaskEvolution) MigrateFromV1(legacyTask map[string]interface{}) *Task

MigrateFromV1 migrates a legacy task to new schema (Go 1.24+ migration)

type TaskInterface

type TaskInterface interface {
	Execute(ctx context.Context) error
	GetID() string
	GetPriority() int
	GetTimeout() time.Duration
}

TaskInterface represents a parallelizable task in controller processing

type TaskPriorityQueue

type TaskPriorityQueue struct {
	// contains filtered or unexported fields
}

TaskPriorityQueue implements a priority queue for tasks

func NewTaskPriorityQueue

func NewTaskPriorityQueue() *TaskPriorityQueue

NewTaskPriorityQueue creates a new priority queue for tasks

func (*TaskPriorityQueue) IsEmpty

func (pq *TaskPriorityQueue) IsEmpty() bool

IsEmpty returns true if the queue is empty

func (*TaskPriorityQueue) Len

func (pq *TaskPriorityQueue) Len() int

Len returns the number of tasks in the queue

func (*TaskPriorityQueue) Pop

func (pq *TaskPriorityQueue) Pop() TaskInterface

Pop removes and returns the highest priority task

func (*TaskPriorityQueue) Push

func (pq *TaskPriorityQueue) Push(task TaskInterface)

Push adds a task to the priority queue

type TaskProcessor

type TaskProcessor interface {
	ProcessTask(ctx context.Context, task *Task) (*TaskResult, error)
	GetProcessorType() TaskType
	HealthCheck(ctx context.Context) error
	GetMetrics() map[string]interface{}
}

TaskProcessor defines the interface for processing tasks

type TaskResult

type TaskResult struct {
	// Task identification
	TaskID   string
	WorkerID int
	PoolName string

	// Result status
	Success      bool
	Error        error
	ErrorMessage string

	// Timing information
	Duration       time.Duration
	QueueTime      time.Duration
	ProcessingTime time.Duration
	CompletedAt    time.Time

	// Resource usage
	MemoryUsed int64
	CPUUsed    float64

	// Output data
	OutputData       map[string]interface{}
	ProcessingResult *contracts.ProcessingResult
}

TaskResult contains the result of task processing

type TaskRetryConfig

type TaskRetryConfig struct {
	MaxAttempts   int           `json:"maxAttempts"`
	InitialDelay  time.Duration `json:"initialDelay"`
	BackoffFactor float64       `json:"backoffFactor"`
	MaxDelay      time.Duration `json:"maxDelay"`
}

TaskRetryConfig defines retry configuration for tasks

type TaskScheduler

type TaskScheduler struct {
	// contains filtered or unexported fields
}

TaskScheduler manages task scheduling and dependency resolution

func NewTaskScheduler

func NewTaskScheduler(engine *ParallelProcessingEngine, logger logr.Logger) *TaskScheduler

func (*TaskScheduler) Start

func (ts *TaskScheduler) Start(ctx context.Context)

func (*TaskScheduler) Stop

func (ts *TaskScheduler) Stop()

type TaskStatus

type TaskStatus string

TaskStatus represents the status of a task

const (
	TaskStatusPending   TaskStatus = "pending"
	TaskStatusRunning   TaskStatus = "running"
	TaskStatusCompleted TaskStatus = "completed"
	TaskStatusFailed    TaskStatus = "failed"
	TaskStatusCancelled TaskStatus = "cancelled"
	TaskStatusRetrying  TaskStatus = "retrying"
)

type TaskType

type TaskType string

TaskType represents different types of processing tasks

const (
	TaskTypeIntentParsing      TaskType = "intent_parsing"
	TaskTypeIntentProcessing   TaskType = "intent_processing"
	TaskTypeLLMProcessing      TaskType = "llm_processing"
	TaskTypeRAGQuery           TaskType = "rag_query"
	TaskTypeRAGRetrieval       TaskType = "rag_retrieval"
	TaskTypeResourcePlanning   TaskType = "resource_planning"
	TaskTypeManifestGeneration TaskType = "manifest_generation"
	TaskTypeGitOpsCommit       TaskType = "gitops_commit"
	TaskTypeDeployment         TaskType = "deployment"
	TaskTypeDeploymentVerify   TaskType = "deployment_verify"
)

type ValidationRule

type ValidationRule struct {
	Name        string
	Description string
	Validator   func(context.Context, client.Object, client.Client) error
}

ValidationRule represents a validation rule

type ValidationTask

type ValidationTask struct {
	ID       string
	Priority int
	Timeout  time.Duration
	Object   client.Object
	Rules    []ValidationRule
	Client   client.Client
}

ValidationTask represents a validation task that can run in parallel

func (*ValidationTask) Execute

func (vt *ValidationTask) Execute(ctx context.Context) error

Execute implements the TaskInterface for ValidationTask

func (*ValidationTask) GetID

func (vt *ValidationTask) GetID() string

GetID implements the TaskInterface for ValidationTask

func (*ValidationTask) GetPriority

func (vt *ValidationTask) GetPriority() int

GetPriority implements the TaskInterface for ValidationTask

func (*ValidationTask) GetTimeout

func (vt *ValidationTask) GetTimeout() time.Duration

GetTimeout implements the TaskInterface for ValidationTask

type WeightedResponseTimeLoadBalancer

type WeightedResponseTimeLoadBalancer struct {
	// contains filtered or unexported fields
}

func NewWeightedResponseTimeLoadBalancer

func NewWeightedResponseTimeLoadBalancer() *WeightedResponseTimeLoadBalancer

func (*WeightedResponseTimeLoadBalancer) GetStrategyName

func (wrt *WeightedResponseTimeLoadBalancer) GetStrategyName() string

func (*WeightedResponseTimeLoadBalancer) SelectPool

func (wrt *WeightedResponseTimeLoadBalancer) SelectPool(pools map[string]*WorkerPool, task *Task) (string, error)

func (*WeightedResponseTimeLoadBalancer) UpdateMetrics

func (wrt *WeightedResponseTimeLoadBalancer) UpdateMetrics(poolName string, metrics *PoolMetrics)

type Worker

type Worker struct {
	// Basic identification
	ID      int
	Type    TaskType
	Queue   chan *Task
	Results chan *Task
	Context context.Context
	Cancel  context.CancelFunc
	Engine  *ParallelProcessingEngine
	// contains filtered or unexported fields
}

Worker represents a worker goroutine for processing tasks

func (*Worker) GetMetrics

func (w *Worker) GetMetrics() map[string]interface{}

func (*Worker) IsHealthy

func (w *Worker) IsHealthy() bool

func (*Worker) Run

func (w *Worker) Run()

Run executes the worker's main processing loop

func (*Worker) Start

func (w *Worker) Start(ctx context.Context)

func (*Worker) Stop

func (w *Worker) Stop()

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool represents a pool of workers for processing tasks

func NewWorkerPool

func NewWorkerPool(name string, workerCount int, processor TaskProcessor, logger logr.Logger) *WorkerPool

func (*WorkerPool) GetHealth

func (wp *WorkerPool) GetHealth() map[string]interface{}

func (*WorkerPool) GetMetrics

func (wp *WorkerPool) GetMetrics() map[string]interface{}

func (*WorkerPool) Start

func (wp *WorkerPool) Start(ctx context.Context) error

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop()

func (*WorkerPool) SubmitTask

func (wp *WorkerPool) SubmitTask(task *Task) error

type WorkflowResult

type WorkflowResult struct {
	// IntentID is the ID of the processed intent
	IntentID string

	// Success indicates if the workflow completed successfully
	Success bool

	// Error contains error information if workflow failed
	Error error

	// Tasks contains information about all tasks in the workflow
	Tasks []*Task

	// StartTime when workflow processing began
	StartTime time.Time

	// EndTime when workflow processing completed
	EndTime time.Time

	// Duration of workflow processing
	Duration time.Duration

	// Results contains structured output from the workflow
	Results map[string]interface{}
}

WorkflowResult contains the result of processing a NetworkIntent workflow

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL