loop

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 4, 2025 | License: Apache-2.0 | Imports: 33 | Imported by: 0

Documentation

Overview

Package loop provides file system watching and processing capabilities for the conductor.

Index

Constants

View Source
const (
	MaxJSONSize = 5 * 1024 * 1024 // 5MB max JSON size for handling larger intent files

	MaxStatusSize = 256 * 1024 // 256KB max status file size (reduced for efficiency)

	MaxMessageSize = 64 * 1024 // 64KB max error message size

	MaxFileNameLength = 255 // Max filename length

	MaxPathDepth = 10 // Max directory depth for path traversal prevention

	MaxJSONDepth = 100 // Max JSON nesting depth for JSON bomb prevention

	LatencyRingBufferSize = 1000 // Size of latency ring buffer

	MetricsUpdateInterval = 10 * time.Second // Metrics update frequency

)
View Source
const (
	StateFileName = ".conductor-state.json"
)

Variables

View Source
var ErrFileGone = errors.New("file gone")
View Source
var ErrNilWatcher = errors.New("watcher is nil")

Functions

func AlertingExample

func AlertingExample(watcher *Watcher)

func ComputeStatusFileName

func ComputeStatusFileName(srcPath string, timestamp time.Time) string

func DefaultPorchSubmit

func DefaultPorchSubmit(ctx context.Context, intent *ingest.Intent, mode string) error

func ExampleBadDeferPattern

func ExampleBadDeferPattern() error

func ExampleFixedDeferPattern

func ExampleFixedDeferPattern() error

func ExampleMetricsUsage

func ExampleMetricsUsage()

func ExampleSafeDeferPattern1

func ExampleSafeDeferPattern1() error

func ExampleSafeDeferPattern2

func ExampleSafeDeferPattern2() error

func ExampleSafeDeferPattern3

func ExampleSafeDeferPattern3() error

func ExampleSafeDeferPattern4

func ExampleSafeDeferPattern4() error

func IsIntentFile

func IsIntentFile(filename string) bool

func IsLongPathSupportEnabled

func IsLongPathSupportEnabled() bool

func IsShutdownFailure

func IsShutdownFailure(shuttingDown bool, err error) bool

func MetricsCollectorExample

func MetricsCollectorExample()

func NormalizeLongPath

func NormalizeLongPath(path string) string

func SafeClose

func SafeClose(watcher *Watcher) error

func SafeCloserFunc

func SafeCloserFunc(watcher *Watcher) func()

func SafeWatcherOperation

func SafeWatcherOperation(watcher *Watcher, operation func(*Watcher) error) error

Types

type AIContext

type AIContext struct {
	ProcessingTimeEstimate time.Duration `json:"processing_time_estimate"`

	ResourceRequirement uint8 `json:"resource_requirement"` // 1-10 scale

	EnergyCost float32 `json:"energy_cost"` // Watts

	BatchCompatible bool `json:"batch_compatible"`
}

type AccessPredictor

type AccessPredictor struct{}

type AdaptivePoolConfig

type AdaptivePoolConfig struct {
	MinWorkers int // Minimum number of workers

	MaxWorkers int // Maximum number of workers

	QueueSize int // Maximum queue size

	ScaleUpThreshold float64 // Queue utilization threshold to scale up (0.7 = 70%)

	ScaleDownThreshold float64 // Queue utilization threshold to scale down (0.3 = 30%)

	ScaleCooldown time.Duration // Minimum time between scaling operations

	MetricsWindow int // Size of metrics window for decisions
}

func DefaultAdaptiveConfig

func DefaultAdaptiveConfig() *AdaptivePoolConfig

type AdaptiveWorkerPool

type AdaptiveWorkerPool struct {
	// contains filtered or unexported fields
}

func NewAdaptiveWorkerPool

func NewAdaptiveWorkerPool(config *AdaptivePoolConfig) *AdaptiveWorkerPool

func (*AdaptiveWorkerPool) ActiveWorkers

func (p *AdaptiveWorkerPool) ActiveWorkers() int64

func (*AdaptiveWorkerPool) GetMetrics

func (p *AdaptiveWorkerPool) GetMetrics() *PoolMetrics

func (*AdaptiveWorkerPool) IsBackpressured

func (p *AdaptiveWorkerPool) IsBackpressured() bool

func (*AdaptiveWorkerPool) QueueDepth

func (p *AdaptiveWorkerPool) QueueDepth() int

func (*AdaptiveWorkerPool) Shutdown

func (p *AdaptiveWorkerPool) Shutdown(timeout time.Duration) error

func (*AdaptiveWorkerPool) Submit

func (p *AdaptiveWorkerPool) Submit(item WorkItem) error

type AggregatedMetrics

type AggregatedMetrics struct {
	AvgProcessingTime time.Duration

	P50ProcessingTime time.Duration

	P95ProcessingTime time.Duration

	P99ProcessingTime time.Duration

	AvgQueueDepth float64

	MaxQueueDepth int

	Throughput float64 // items per second

	SuccessRate float64

	LastUpdate time.Time
}

type AnomalyEvent

type AnomalyEvent struct {
	Timestamp time.Time `json:"timestamp"`

	Severity float64 `json:"severity"` // 0.0-1.0 severity score

	Features []float64 `json:"features"` // Input features that caused anomaly

	ReconstructionError float64 `json:"reconstruction_error"`

	Explanation string `json:"explanation"` // Human-readable explanation
}

type AnomalyResult

type AnomalyResult struct {
	IsAnomaly bool `json:"is_anomaly"`

	ReconstructionError float64 `json:"reconstruction_error"`

	Severity float64 `json:"severity"`

	Features []float64 `json:"features"`

	Timestamp time.Time `json:"timestamp"`

	Explanation string `json:"explanation"`
}

type AsyncIOConfig

type AsyncIOConfig struct {
	WriteWorkers int // Number of concurrent write workers

	WriteBatchSize int // Maximum writes to batch

	WriteBatchAge time.Duration // Maximum age of batched writes

	ReadCacheSize int // Number of files to cache

	ReadAheadSize int // Bytes to read ahead

	BufferSize int // I/O buffer size
}

func DefaultAsyncIOConfig

func DefaultAsyncIOConfig() *AsyncIOConfig

type AsyncIOManager

type AsyncIOManager struct {
	// contains filtered or unexported fields
}

func NewAsyncIOManager

func NewAsyncIOManager(config *AsyncIOConfig) *AsyncIOManager

func (*AsyncIOManager) GetMetrics

func (m *AsyncIOManager) GetMetrics() *IOMetrics

func (*AsyncIOManager) ReadFileAsync

func (m *AsyncIOManager) ReadFileAsync(ctx context.Context, path string) ([]byte, error)

func (*AsyncIOManager) Shutdown

func (m *AsyncIOManager) Shutdown(timeout time.Duration) error

func (*AsyncIOManager) WriteFileAsync

func (m *AsyncIOManager) WriteFileAsync(path string, data []byte, mode os.FileMode, callback func(error))

type AsyncWorkItem

type AsyncWorkItem struct {
	FilePath string

	FileSize int64

	ModTime time.Time

	Priority uint8 // 0=normal, 1=high, 2=critical

	Deadline time.Time // For O-RAN L Release SLA compliance

	AIContext *AIContext // ML prediction context
}

type AttentionMechanism

type AttentionMechanism struct {
	// contains filtered or unexported fields
}

func (*AttentionMechanism) Forward

func (am *AttentionMechanism) Forward(input *mat.Dense) (*mat.Dense, error)

type AutoEncoderDetector

type AutoEncoderDetector struct {
	// contains filtered or unexported fields
}

type BatchBuffer

type BatchBuffer struct {
	// contains filtered or unexported fields
}

func (*BatchBuffer) Add

func (b *BatchBuffer) Add(req *WriteRequest) bool

type BatchProcessor

type BatchProcessor struct {
	// contains filtered or unexported fields
}

type BoundedFileList

type BoundedFileList struct {
	// contains filtered or unexported fields
}

func NewBoundedFileList

func NewBoundedFileList(maxSize int) *BoundedFileList

func (*BoundedFileList) Add

func (l *BoundedFileList) Add(info FileInfo)

func (*BoundedFileList) GetRecent

func (l *BoundedFileList) GetRecent(n int) []FileInfo

type BoundedStats

type BoundedStats struct {
	// contains filtered or unexported fields
}

func NewBoundedStats

func NewBoundedStats(config *BoundedStatsConfig) *BoundedStats

func (*BoundedStats) GetSnapshot

func (s *BoundedStats) GetSnapshot() StatsSnapshot

func (*BoundedStats) RecordCPUUsage

func (s *BoundedStats) RecordCPUUsage(percentage float64)

func (*BoundedStats) RecordFailure

func (s *BoundedStats) RecordFailure(filename, errorType string)

func (*BoundedStats) RecordMemoryUsage

func (s *BoundedStats) RecordMemoryUsage(bytes int64)

func (*BoundedStats) RecordProcessing

func (s *BoundedStats) RecordProcessing(filename string, size int64, duration time.Duration)

func (*BoundedStats) RecordQueueDepth

func (s *BoundedStats) RecordQueueDepth(depth int)

func (*BoundedStats) RecordTimeout

func (s *BoundedStats) RecordTimeout()

type BoundedStatsConfig

type BoundedStatsConfig struct {
	ProcessingWindowSize int // Number of samples for processing times

	QueueWindowSize int // Number of samples for queue depths

	ThroughputBuckets int // Number of buckets for throughput calculation

	ThroughputInterval time.Duration // Time per bucket

	MaxRecentFiles int // Maximum recent files to track

	MaxErrorTypes int // Maximum error types to track

	AggregationInterval time.Duration // How often to compute aggregates
}

func DefaultBoundedStatsConfig

func DefaultBoundedStatsConfig() *BoundedStatsConfig

type CPUFrequency

type CPUFrequency struct{ MHz int }

type CPUPowerModel

type CPUPowerModel struct{}

func (*CPUPowerModel) PredictPower

func (cpm *CPUPowerModel) PredictPower(utilization float64) float64

type CPUVoltage

type CPUVoltage struct{ Volts float64 }

type CacheEntry

type CacheEntry struct {
	Data []byte

	AccessTime time.Time

	AccessCount int32

	Probability float64 // Predicted access probability
}

type CalibrationData

type CalibrationData struct{}

type CarbonAwareScheduler

type CarbonAwareScheduler struct{}

type CarbonModel

type CarbonModel struct{}

func NewCarbonModel

func NewCarbonModel() *CarbonModel

func (*CarbonModel) GetCurrentIntensity

func (cm *CarbonModel) GetCurrentIntensity() (float64, error)

type CarbonTracker

type CarbonTracker struct{}

func NewCarbonTracker

func NewCarbonTracker() *CarbonTracker

func (*CarbonTracker) GetCurrentCarbonIntensity

func (ct *CarbonTracker) GetCurrentCarbonIntensity() (float64, error)

type Config

type Config struct {
	PorchPath string `json:"porch_path"`

	PorchURL string `json:"porch_url"`

	Mode string `json:"mode"`

	OutDir string `json:"out_dir"`

	Once bool `json:"once"`

	DebounceDur time.Duration `json:"debounce_duration"`

	Period time.Duration `json:"period"`

	MaxWorkers int `json:"max_workers"`

	CleanupAfter time.Duration `json:"cleanup_after"`

	GracePeriod time.Duration `json:"grace_period"` // Grace period for graceful shutdown (default: 5s)

	MetricsPort int `json:"metrics_port"` // HTTP port for metrics endpoint

	MetricsAddr string `json:"metrics_addr"` // Bind address for metrics (default: localhost)

	MetricsAuth bool `json:"metrics_auth"` // Enable basic auth for metrics

	MetricsUser string `json:"metrics_user"` // Basic auth username

	MetricsPass string `json:"metrics_pass"` // Basic auth password
}

func (*Config) Validate

func (c *Config) Validate() error

type CrossPlatformSyncBarrier

type CrossPlatformSyncBarrier struct {
	// contains filtered or unexported fields
}

func NewCrossPlatformSyncBarrier

func NewCrossPlatformSyncBarrier(participants int) *CrossPlatformSyncBarrier

func (*CrossPlatformSyncBarrier) Wait

func (b *CrossPlatformSyncBarrier) Wait()

type CustomMetrics

type CustomMetrics struct {
	*WatcherMetrics

	BusinessKPIs map[string]float64
}

func (*CustomMetrics) CalculateBusinessMetrics

func (c *CustomMetrics) CalculateBusinessMetrics()

type DVFSController

type DVFSController struct {
	// contains filtered or unexported fields
}

func NewDVFSController

func NewDVFSController() (*DVFSController, error)

func (*DVFSController) ReduceFrequency

func (dvfs *DVFSController) ReduceFrequency()

func (*DVFSController) Stop

func (dvfs *DVFSController) Stop()

type DVFSPerformanceModel

type DVFSPerformanceModel struct{}

type DVFSPowerModel

type DVFSPowerModel struct{}

type DirectoryManager

type DirectoryManager struct {
	// contains filtered or unexported fields
}

type EfficiencyTracker

type EfficiencyTracker struct{}

func NewEfficiencyTracker

func NewEfficiencyTracker() *EfficiencyTracker

type EnergyAwareLoadBalancer

type EnergyAwareLoadBalancer struct{}

type EnergyAwareScheduler

type EnergyAwareScheduler struct{}

type EnergyAwareWorker

type EnergyAwareWorker struct{}

type EnergyAwareWorkloadBalancer

type EnergyAwareWorkloadBalancer struct {
	// contains filtered or unexported fields
}

func NewEnergyAwareWorkloadBalancer

func NewEnergyAwareWorkloadBalancer() *EnergyAwareWorkloadBalancer

func (*EnergyAwareWorkloadBalancer) BalanceResourceUsage

func (wlb *EnergyAwareWorkloadBalancer) BalanceResourceUsage()

func (*EnergyAwareWorkloadBalancer) DeferNonCriticalWork

func (wlb *EnergyAwareWorkloadBalancer) DeferNonCriticalWork()

func (*EnergyAwareWorkloadBalancer) OptimizeBatchSizes

func (wlb *EnergyAwareWorkloadBalancer) OptimizeBatchSizes()

func (*EnergyAwareWorkloadBalancer) OptimizeCacheUsage

func (wlb *EnergyAwareWorkloadBalancer) OptimizeCacheUsage()

func (*EnergyAwareWorkloadBalancer) OptimizeIOPatterns

func (wlb *EnergyAwareWorkloadBalancer) OptimizeIOPatterns()

func (*EnergyAwareWorkloadBalancer) ReduceActiveWorkers

func (wlb *EnergyAwareWorkloadBalancer) ReduceActiveWorkers()

func (*EnergyAwareWorkloadBalancer) Stop

func (wlb *EnergyAwareWorkloadBalancer) Stop()

type EnergyConfig

type EnergyConfig struct {
	PowerBudgetWatts float64 `json:"power_budget_watts"`

	TargetEfficiencyGbpsWatt float64 `json:"target_efficiency_gbps_watt"`

	CarbonLimitKgPerHour float64 `json:"carbon_limit_kg_per_hour"`

	RenewableEnergyTarget float64 `json:"renewable_energy_target"` // 0.0-1.0

	SleepModeEnabled bool `json:"sleep_mode_enabled"`

	DVFSEnabled bool `json:"dvfs_enabled"`

	CarbonAwareScheduling bool `json:"carbon_aware_scheduling"`

	EnergyMeasurementInterval time.Duration `json:"energy_measurement_interval"`

	OptimizationInterval time.Duration `json:"optimization_interval"`
}

type EnergyConstraints

type EnergyConstraints struct {
	MaxPowerBudget float64 `json:"max_power_budget"`

	TargetEfficiency float64 `json:"target_efficiency"`

	CarbonLimit float64 `json:"carbon_limit"`
}

type EnergyData

type EnergyData struct{}

type EnergyForecast

type EnergyForecast struct {
	EstimatedPowerConsumption float64 `json:"estimated_power_consumption"`

	EstimatedEfficiency float64 `json:"estimated_efficiency"`

	CarbonEmissions float64 `json:"carbon_emissions"`

	Duration time.Duration `json:"duration"`

	Confidence float64 `json:"confidence"`
}

type EnergyMetrics

type EnergyMetrics struct {
	CurrentPowerConsumption float64 `json:"current_power_consumption"`

	AveragePowerConsumption float64 `json:"average_power_consumption"`

	PeakPowerConsumption float64 `json:"peak_power_consumption"`

	CurrentThroughput float64 `json:"current_throughput_gbps"`

	EnergyEfficiency float64 `json:"energy_efficiency_gbps_watt"`

	CarbonIntensity float64 `json:"carbon_intensity_g_kwh"`

	CarbonEmissions float64 `json:"carbon_emissions_kg_hour"`

	RenewableEnergyPercent float64 `json:"renewable_energy_percent"`

	CPUPowerUsage float64 `json:"cpu_power_usage_watts"`

	MemoryPowerUsage float64 `json:"memory_power_usage_watts"`

	NetworkPowerUsage float64 `json:"network_power_usage_watts"`

	StoragePowerUsage float64 `json:"storage_power_usage_watts"`

	CoolingPowerUsage float64 `json:"cooling_power_usage_watts"`

	LastUpdated time.Time `json:"last_updated"`

	MeasurementStartTime time.Time `json:"measurement_start_time"`
}

type EnergyOptimizationEngine

type EnergyOptimizationEngine struct {
	// contains filtered or unexported fields
}

func NewEnergyOptimizationEngine

func NewEnergyOptimizationEngine(config *EnergyConfig) (*EnergyOptimizationEngine, error)

func (*EnergyOptimizationEngine) GetEnergyEfficiency

func (eoe *EnergyOptimizationEngine) GetEnergyEfficiency() float64

func (*EnergyOptimizationEngine) GetEnergyMetrics

func (eoe *EnergyOptimizationEngine) GetEnergyMetrics() *EnergyMetrics

func (*EnergyOptimizationEngine) IsWithinPowerBudget

func (eoe *EnergyOptimizationEngine) IsWithinPowerBudget() bool

func (*EnergyOptimizationEngine) PredictEnergyUsage

func (eoe *EnergyOptimizationEngine) PredictEnergyUsage(workload *WorkloadProfile) (*EnergyForecast, error)

func (*EnergyOptimizationEngine) Start

func (*EnergyOptimizationEngine) Stop

func (eoe *EnergyOptimizationEngine) Stop() error

type EnergyOptimizedWorkItem

type EnergyOptimizedWorkItem struct{}

type EnergyOptimizer

type EnergyOptimizer struct {
	// contains filtered or unexported fields
}

type EnergyPrediction

type EnergyPrediction struct {
	PowerConsumption float64 `json:"power_consumption"`

	CarbonEmissions float64 `json:"carbon_emissions"`

	RenewablePercent float64 `json:"renewable_percent"`

	Efficiency float64 `json:"efficiency"`

	Recommendations []string `json:"recommendations"`

	OptimalScheduling *SchedulingStrategy `json:"optimal_scheduling"`

	Timestamp time.Time `json:"timestamp"`
}

type EnergyPredictor

type EnergyPredictor struct {
	// contains filtered or unexported fields
}

type EnergyUnits

type EnergyUnits = float32

type EnhancedOnceState

type EnhancedOnceState struct {
	// contains filtered or unexported fields
}

func NewEnhancedOnceState

func NewEnhancedOnceState() *EnhancedOnceState

func (*EnhancedOnceState) GetStats

func (eos *EnhancedOnceState) GetStats() map[string]interface{}

func (*EnhancedOnceState) IncrementFailed

func (eos *EnhancedOnceState) IncrementFailed()

func (*EnhancedOnceState) IncrementProcessed

func (eos *EnhancedOnceState) IncrementProcessed()

func (*EnhancedOnceState) IsComplete

func (eos *EnhancedOnceState) IsComplete() bool

func (*EnhancedOnceState) MarkProcessingDone

func (eos *EnhancedOnceState) MarkProcessingDone()

func (*EnhancedOnceState) MarkProcessingStarted

func (eos *EnhancedOnceState) MarkProcessingStarted()

func (*EnhancedOnceState) MarkScanComplete

func (eos *EnhancedOnceState) MarkScanComplete(fileCount int)

type EnhancedOnceWatcher

type EnhancedOnceWatcher struct {
	*Watcher
	// contains filtered or unexported fields
}

func (*EnhancedOnceWatcher) StartWithTracking

func (ew *EnhancedOnceWatcher) StartWithTracking() error

func (*EnhancedOnceWatcher) WaitForProcessingComplete

func (ew *EnhancedOnceWatcher) WaitForProcessingComplete(timeout time.Duration) error

type ErrorCount

type ErrorCount struct {
	Type string

	Count int64
}

type ErrorSummary

type ErrorSummary struct {
	// contains filtered or unexported fields
}

func NewErrorSummary

func NewErrorSummary(maxKeys int) *ErrorSummary

func (*ErrorSummary) GetTop

func (e *ErrorSummary) GetTop(n int) []ErrorCount

func (*ErrorSummary) Increment

func (e *ErrorSummary) Increment(errorType string)

type Experience

type Experience struct {
	State []float64

	Action []float64

	Reward float64

	NextState []float64
}

type FeedForwardNetwork

type FeedForwardNetwork struct{}

func (*FeedForwardNetwork) Forward

func (ffn *FeedForwardNetwork) Forward(input *mat.Dense) *mat.Dense

type FileCreationSynchronizer

type FileCreationSynchronizer struct {
	// contains filtered or unexported fields
}

func NewFileCreationSynchronizer

func NewFileCreationSynchronizer(directory string, expectedFiles []string, timeout time.Duration) *FileCreationSynchronizer

func (*FileCreationSynchronizer) NotifyFileCreated

func (fcs *FileCreationSynchronizer) NotifyFileCreated(filename string)

func (*FileCreationSynchronizer) WaitForAllFiles

func (fcs *FileCreationSynchronizer) WaitForAllFiles() error

type FileInfo

type FileInfo struct {
	Name string // Just filename, not full path

	Size int64

	Timestamp time.Time

	Status string
}

type FileManager

type FileManager struct {
	// contains filtered or unexported fields
}

func NewFileManager

func NewFileManager(baseDir string) (*FileManager, error)

func (*FileManager) CleanupOldFiles

func (fm *FileManager) CleanupOldFiles(olderThan time.Duration) error

func (*FileManager) GetFailedFiles

func (fm *FileManager) GetFailedFiles() ([]string, error)

func (*FileManager) GetProcessedFiles

func (fm *FileManager) GetProcessedFiles() ([]string, error)

func (*FileManager) GetStats

func (fm *FileManager) GetStats() (ProcessingStats, error)

func (*FileManager) IsEmpty

func (fm *FileManager) IsEmpty() (bool, error)

func (*FileManager) MoveToFailed

func (fm *FileManager) MoveToFailed(filePath, errorMsg string) error

func (*FileManager) MoveToProcessed

func (fm *FileManager) MoveToProcessed(filePath string) error

type FileProcessingState

type FileProcessingState struct {
	// contains filtered or unexported fields
}

type FileState

type FileState struct {
	FilePath string `json:"file_path"`

	SHA256 string `json:"sha256"`

	Size int64 `json:"size"`

	ProcessedAt time.Time `json:"processed_at"`

	Status string `json:"status"` // "processed" or "failed"
}

type FileSystemSyncGuard

type FileSystemSyncGuard struct {
	// contains filtered or unexported fields
}

func (*FileSystemSyncGuard) EnsureFilesVisible

func (fsg *FileSystemSyncGuard) EnsureFilesVisible(expectedFiles []string) error

func (*FileSystemSyncGuard) FlushFileSystem

func (fsg *FileSystemSyncGuard) FlushFileSystem()

type IOMetrics

type IOMetrics struct {
	TotalReads atomic.Int64

	TotalWrites atomic.Int64

	CacheHits atomic.Int64

	CacheMisses atomic.Int64

	BytesRead atomic.Int64

	BytesWritten atomic.Int64

	ReadLatency *RingBuffer

	WriteLatency *RingBuffer

	BatchedWrites atomic.Int64

	FailedWrites atomic.Int64
	// contains filtered or unexported fields
}

type IPMIInterface

type IPMIInterface struct{}

type IdlenessPredictor

type IdlenessPredictor struct{}

type IntelligentSleepScheduler

type IntelligentSleepScheduler struct {
	// contains filtered or unexported fields
}

func NewIntelligentSleepScheduler

func NewIntelligentSleepScheduler() *IntelligentSleepScheduler

func (*IntelligentSleepScheduler) EnableAggressiveSleep

func (iss *IntelligentSleepScheduler) EnableAggressiveSleep()

func (*IntelligentSleepScheduler) Stop

func (iss *IntelligentSleepScheduler) Stop()

type IntentProcessor

type IntentProcessor struct {
	// contains filtered or unexported fields
}

func NewProcessor

func NewProcessor(config *ProcessorConfig, validator Validator, porchFunc PorchSubmitFunc) (*IntentProcessor, error)

func (*IntentProcessor) GetStats

func (p *IntentProcessor) GetStats() (ProcessingStats, error)

func (*IntentProcessor) IsShutdownFailure

func (p *IntentProcessor) IsShutdownFailure(err error) bool

func (*IntentProcessor) MarkGracefulShutdown

func (p *IntentProcessor) MarkGracefulShutdown()

func (*IntentProcessor) ProcessFile

func (p *IntentProcessor) ProcessFile(filename string) error

func (*IntentProcessor) StartBatchProcessor

func (p *IntentProcessor) StartBatchProcessor()

func (*IntentProcessor) Stop

func (p *IntentProcessor) Stop()

type IntentSchema

type IntentSchema struct {
	APIVersion string `json:"apiVersion"`

	Kind string `json:"kind"`

	Metadata json.RawMessage `json:"metadata"`

	Spec json.RawMessage `json:"spec"`
}

type LRUCache

type LRUCache struct {
	// contains filtered or unexported fields
}

func NewLRUCache

func NewLRUCache(capacity int) *LRUCache

func (*LRUCache) Capacity

func (c *LRUCache) Capacity() int

func (*LRUCache) Clear

func (c *LRUCache) Clear()

func (*LRUCache) Get

func (c *LRUCache) Get(key string) (interface{}, bool)

func (*LRUCache) Put

func (c *LRUCache) Put(key string, value interface{})

func (*LRUCache) Remove

func (c *LRUCache) Remove(key string) bool

func (*LRUCache) Size

func (c *LRUCache) Size() int

type LayerNormalization

type LayerNormalization struct{}

func (*LayerNormalization) Forward

func (ln *LayerNormalization) Forward(input *mat.Dense) *mat.Dense

type LockFreeQueue

type LockFreeQueue[T any] struct {
	// contains filtered or unexported fields
}

type MLConfig

type MLConfig struct {
	TrafficWindowSize int `json:"traffic_window_size"` // Historical data window

	PredictionHorizon time.Duration `json:"prediction_horizon"` // How far ahead to predict

	LearningRate float64 `json:"learning_rate"` // Model learning rate

	BatchSize int `json:"batch_size"` // Training batch size

	UpdateInterval time.Duration `json:"update_interval"` // Model update frequency

	EnergyWeight float64 `json:"energy_weight"` // Energy optimization weight

	PerformanceWeight float64 `json:"performance_weight"` // Performance optimization weight

	CarbonAware bool `json:"carbon_aware"` // Enable carbon-aware optimization
}

type MLOptimizer

type MLOptimizer struct {
	// contains filtered or unexported fields
}

func NewMLOptimizer

func NewMLOptimizer(config *MLConfig) *MLOptimizer

func (*MLOptimizer) DetectAnomalies

func (ml *MLOptimizer) DetectAnomalies(sample TrafficSample) (*AnomalyResult, error)

func (*MLOptimizer) OptimizeResources

func (ml *MLOptimizer) OptimizeResources(currentState *ResourceState) (*ResourceAction, error)

func (*MLOptimizer) PredictEnergyConsumption

func (ml *MLOptimizer) PredictEnergyConsumption(workload *WorkloadProfile) (*EnergyPrediction, error)

func (*MLOptimizer) PredictTraffic

func (ml *MLOptimizer) PredictTraffic(historical []TrafficSample, horizon time.Duration) (*TrafficPrediction, error)

func (*MLOptimizer) Train

func (ml *MLOptimizer) Train(trainingData *TrainingData) error

type MLPredictor

type MLPredictor struct{}

func NewMLPredictor

func NewMLPredictor() *MLPredictor

type MemoryPowerModel

type MemoryPowerModel struct{}

func (*MemoryPowerModel) PredictPower

func (mpm *MemoryPowerModel) PredictPower(usage float64) float64

type MetricsSample

type MetricsSample struct {
	Timestamp time.Time

	Value time.Duration
}

type MetricsServer

type MetricsServer struct {
	// contains filtered or unexported fields
}

func NewMetricsServer

func NewMetricsServer(watcher *OptimizedWatcher, addr string, port int) *MetricsServer

func (*MetricsServer) Start

func (s *MetricsServer) Start() error

func (*MetricsServer) Stop

func (s *MetricsServer) Stop() error

type MockPorchConfig

type MockPorchConfig struct {
	ExitCode int

	Stdout string

	Stderr string

	ProcessDelay time.Duration

	FailPattern string // Pattern in filename that causes failure
}

type NetworkPowerModel

type NetworkPowerModel struct{}

func (*NetworkPowerModel) PredictPower

func (npm *NetworkPowerModel) PredictPower(load float64) float64

type NeuralNetwork

type NeuralNetwork struct{}

func NewNeuralNetwork

func NewNeuralNetwork([]int) *NeuralNetwork

func (*NeuralNetwork) Forward

func (nn *NeuralNetwork) Forward([]float64) []float64

type OnceModeSynchronizer

type OnceModeSynchronizer struct {
	// contains filtered or unexported fields
}

func NewOnceModeSynchronizer

func NewOnceModeSynchronizer(watcher *Watcher, expectedFiles int) *OnceModeSynchronizer

func (*OnceModeSynchronizer) GetStats

func (oms *OnceModeSynchronizer) GetStats() (processed, failed int)

func (*OnceModeSynchronizer) NotifyFailed

func (oms *OnceModeSynchronizer) NotifyFailed()

func (*OnceModeSynchronizer) NotifyProcessed

func (oms *OnceModeSynchronizer) NotifyProcessed()

func (*OnceModeSynchronizer) StartWithCompletion

func (oms *OnceModeSynchronizer) StartWithCompletion(timeout time.Duration) error

type OptimizedMemoryPool

type OptimizedMemoryPool struct {
	// contains filtered or unexported fields
}

func (*OptimizedMemoryPool) GetBuffer

func (omp *OptimizedMemoryPool) GetBuffer(size int) []byte

type OptimizedWatcher

type OptimizedWatcher struct {
	*Watcher // Embed base watcher
	// contains filtered or unexported fields
}

func NewOptimizedWatcher

func NewOptimizedWatcher(dir string, config Config) (*OptimizedWatcher, error)

func (*OptimizedWatcher) ProcessFileOptimized

func (ow *OptimizedWatcher) ProcessFileOptimized(filePath string, fileInfo FileInfo) error

type PDUInterface

type PDUInterface struct{}

type PPOAgent

type PPOAgent struct{}

func NewPPOAgent

func NewPPOAgent(int, int) *PPOAgent

func (*PPOAgent) SelectAction

func (agent *PPOAgent) SelectAction([]float64) []float64

type PerformanceModel

type PerformanceModel struct{}

type PoolMetrics

type PoolMetrics struct {
	TotalProcessed int64

	TotalRejected int64

	AverageWaitTime time.Duration

	AverageProcessTime time.Duration

	CurrentWorkers int

	QueueDepth int

	Throughput float64 // items/second
	// contains filtered or unexported fields
}

type PorchSubmitFunc

type PorchSubmitFunc func(ctx context.Context, intent *ingest.Intent, mode string) error

type PowerMeter

type PowerMeter struct {
	// contains filtered or unexported fields
}

func NewPowerMeter

func NewPowerMeter() (*PowerMeter, error)

func (*PowerMeter) GetCPUPower

func (pm *PowerMeter) GetCPUPower() float64

func (*PowerMeter) GetMemoryPower

func (pm *PowerMeter) GetMemoryPower() float64

func (*PowerMeter) GetNetworkPower

func (pm *PowerMeter) GetNetworkPower() float64

func (*PowerMeter) GetStoragePower

func (pm *PowerMeter) GetStoragePower() float64

type PowerModel

type PowerModel struct {
	// contains filtered or unexported fields
}

type PredictiveCache

type PredictiveCache struct {
	// contains filtered or unexported fields
}

type PredictiveScaler

type PredictiveScaler struct {
	// contains filtered or unexported fields
}

type Priority

type Priority = uint8

type ProcessingCompletionWaiter

type ProcessingCompletionWaiter struct {
	// contains filtered or unexported fields
}

func NewProcessingCompletionWaiter

func NewProcessingCompletionWaiter(watcher *Watcher, expectedFiles int) *ProcessingCompletionWaiter

func (*ProcessingCompletionWaiter) WaitForCompletion

func (pcw *ProcessingCompletionWaiter) WaitForCompletion() error

type ProcessingStats

type ProcessingStats struct {
	ProcessedCount int `json:"processed_count"`

	FailedCount int `json:"failed_count"`

	ShutdownFailedCount int `json:"shutdown_failed_count"`

	RealFailedCount int `json:"real_failed_count"`

	ProcessedFiles []string `json:"processed_files,omitempty"`

	FailedFiles []string `json:"failed_files,omitempty"`

	ShutdownFailedFiles []string `json:"shutdown_failed_files,omitempty"`

	RealFailedFiles []string `json:"real_failed_files,omitempty"`
}

type ProcessingTracker

type ProcessingTracker struct {
	// contains filtered or unexported fields
}

func (*ProcessingTracker) GetProcessedCount

func (pt *ProcessingTracker) GetProcessedCount() int

func (*ProcessingTracker) MarkProcessed

func (pt *ProcessingTracker) MarkProcessed()

func (*ProcessingTracker) WaitForCompletion

func (pt *ProcessingTracker) WaitForCompletion() bool

type ProcessorConfig

type ProcessorConfig struct {
	HandoffDir string

	ErrorDir string

	PorchMode string // "direct" or "structured"

	BatchSize int

	BatchInterval time.Duration

	MaxRetries int

	SendTimeout time.Duration // Timeout for sending to batch coordinator

	WorkerCount int // Number of concurrent workers
}

func DefaultConfig

func DefaultConfig() *ProcessorConfig

type RAPLInterface

type RAPLInterface struct{}

type RLOptimizer

type RLOptimizer struct {
	// contains filtered or unexported fields
}

type RenewableEnergyMonitor

type RenewableEnergyMonitor struct{}

func NewRenewableEnergyMonitor

func NewRenewableEnergyMonitor() *RenewableEnergyMonitor

func (*RenewableEnergyMonitor) GetRenewablePercent

func (rem *RenewableEnergyMonitor) GetRenewablePercent() (float64, error)

type RenewableForecaster

type RenewableForecaster struct{}

func NewRenewableForecaster

func NewRenewableForecaster() *RenewableForecaster

func (*RenewableForecaster) GetRenewableAvailability

func (rf *RenewableForecaster) GetRenewableAvailability(time.Time) (float64, error)

type ReplayBuffer

type ReplayBuffer struct{}

func NewReplayBuffer

func NewReplayBuffer(int) *ReplayBuffer

func (*ReplayBuffer) Add

func (rb *ReplayBuffer) Add(*Experience)

type ResourceAction

type ResourceAction struct {
	WorkerCountChange int `json:"worker_count_change"`

	MemoryAllocation float64 `json:"memory_allocation"`

	CPUAllocation float64 `json:"cpu_allocation"`

	Priority float64 `json:"priority"`

	Timestamp time.Time `json:"timestamp"`

	ExpectedReward float64 `json:"expected_reward"`
}

type ResourceData

type ResourceData struct{}

type ResourceEnv

// ResourceEnv is declared as an empty struct, so its values carry no state.
// It is created with NewResourceEnv; no methods are listed for it.
type ResourceEnv struct{}

func NewResourceEnv

func NewResourceEnv() *ResourceEnv

type ResourceState

// ResourceState is a plain data struct whose fields all carry JSON tags, so it
// serializes under the snake_case keys shown in the tags.
//
// NOTE(review): units are not stated for any field (e.g. whether
// CPUUtilization is 0-1 or 0-100, or the time unit of ResponseTime) — confirm
// against the producer before relying on them.
type ResourceState struct {
	WorkerCount int `json:"worker_count"`

	CPUUtilization float64 `json:"cpu_utilization"`

	MemoryUsage float64 `json:"memory_usage"`

	QueueLength int `json:"queue_length"`

	ResponseTime float64 `json:"response_time"`

	Throughput float64 `json:"throughput"`

	ErrorRate float64 `json:"error_rate"`
}

type RingBuffer

// RingBuffer is an opaque container of float64 values: every field is
// unexported. It is created with NewRingBuffer(size) and exposes Add, Average,
// GetAll and Last.
type RingBuffer struct {
	// contains filtered or unexported fields
}

func NewRingBuffer

func NewRingBuffer(size int) *RingBuffer

func (*RingBuffer) Add

func (r *RingBuffer) Add(value float64)

func (*RingBuffer) Average

func (r *RingBuffer) Average() float64

func (*RingBuffer) GetAll

func (r *RingBuffer) GetAll() []float64

func (*RingBuffer) Last

func (r *RingBuffer) Last() float64

type SafeSet

// SafeSet is an opaque set of string keys: every field is unexported. It is
// created with NewSafeSet and exposes Add, Delete, Has, LoadFromSlice, Size and
// ToSlice.
// NOTE(review): the name suggests it is safe for concurrent use, but the
// synchronization is not visible here — confirm before sharing across goroutines.
type SafeSet struct {
	// contains filtered or unexported fields
}

func NewSafeSet

func NewSafeSet() *SafeSet

func (*SafeSet) Add

func (s *SafeSet) Add(key string)

func (*SafeSet) Delete

func (s *SafeSet) Delete(key string)

func (*SafeSet) Has

func (s *SafeSet) Has(key string) bool

func (*SafeSet) LoadFromSlice

func (s *SafeSet) LoadFromSlice(items []string)

func (*SafeSet) Size

func (s *SafeSet) Size() int

func (*SafeSet) ToSlice

func (s *SafeSet) ToSlice() []string

type ScalingModel

// ScalingModel is declared as an empty struct, so its values carry no state.
// It is created with NewScalingModel and exposes PredictOptimalWorkers, which
// returns a ScalingPrediction.
type ScalingModel struct{}

func NewScalingModel

func NewScalingModel() *ScalingModel

func (*ScalingModel) PredictOptimalWorkers

func (sm *ScalingModel) PredictOptimalWorkers(float64, int, time.Duration, float64) ScalingPrediction

type ScalingPrediction

// ScalingPrediction is the value returned by ScalingModel.PredictOptimalWorkers.
type ScalingPrediction struct {
	// OptimalWorkers is the only field carried by the prediction.
	OptimalWorkers int
}

type SchedulingStrategy

// SchedulingStrategy is declared as an empty struct, so its values carry no
// state. This documentation lists no constructor or methods for it.
type SchedulingStrategy struct{}

type SleepEvent

// SleepEvent is declared as an empty struct, so its values carry no state.
// This documentation lists no constructor or methods for it.
type SleepEvent struct{}

type SleepScheduler

// SleepScheduler is declared as an empty struct, so its values carry no state.
// It is created with NewSleepScheduler; no methods are listed for it.
type SleepScheduler struct{}

func NewSleepScheduler

func NewSleepScheduler() *SleepScheduler

type SleepState

// SleepState pairs a name with a float64 power-savings figure.
// NOTE(review): the unit of PowerSavings (fraction, percent, watts) is not
// visible here — confirm against the code that fills it in.
type SleepState struct {
	Name string

	PowerSavings float64
}

type StateManager

// StateManager is an opaque type: every field is unexported. It is created with
// NewStateManager(baseDir), which can fail, and must be released with Close.
// Its exported methods are CalculateFileSHA256, CleanupOldEntries,
// GetFailedFiles, GetProcessedFiles, IsProcessed, IsProcessedBySHA, MarkFailed
// and MarkProcessed.
// NOTE(review): presumably the state is persisted under baseDir in the file
// named by StateFileName — confirm in the implementation.
type StateManager struct {
	// contains filtered or unexported fields
}

func NewStateManager

func NewStateManager(baseDir string) (*StateManager, error)

func (*StateManager) CalculateFileSHA256

func (sm *StateManager) CalculateFileSHA256(filePath string) (string, error)

func (*StateManager) CleanupOldEntries

func (sm *StateManager) CleanupOldEntries(olderThan time.Duration) error

func (*StateManager) Close

func (sm *StateManager) Close() error

func (*StateManager) GetFailedFiles

func (sm *StateManager) GetFailedFiles() []string

func (*StateManager) GetProcessedFiles

func (sm *StateManager) GetProcessedFiles() []string

func (*StateManager) IsProcessed

func (sm *StateManager) IsProcessed(filePath string) (bool, error)

func (*StateManager) IsProcessedBySHA

func (sm *StateManager) IsProcessedBySHA(sha256Hash string) (bool, error)

func (*StateManager) MarkFailed

func (sm *StateManager) MarkFailed(filePath string) error

func (*StateManager) MarkProcessed

func (sm *StateManager) MarkProcessed(filePath string) error

type StatsSnapshot

// StatsSnapshot is a plain data struct of processing statistics: int64
// counters, time.Duration timings, float64 rates and averages, and two slices
// (RecentFiles, TopErrors) of the package's FileInfo and ErrorCount types.
//
// NOTE(review): the producer of this struct is not visible here. Whether
// TotalProcessed equals SuccessCount + FailureCount + TimeoutCount, and the
// units of Throughput, SuccessRate, AvgMemoryUsage and AvgCPUUsage, need
// confirming before use.
type StatsSnapshot struct {
	TotalProcessed int64

	SuccessCount int64

	FailureCount int64

	TimeoutCount int64

	AvgProcessingTime time.Duration

	// P95ProcessingTime is presumably the 95th-percentile processing time.
	P95ProcessingTime time.Duration

	Throughput float64

	SuccessRate float64

	CurrentQueueDepth int

	AvgMemoryUsage float64

	AvgCPUUsage float64

	RecentFiles []FileInfo

	TopErrors []ErrorCount

	// LastUpdate is presumably when the snapshot was last refreshed — TODO confirm.
	LastUpdate time.Time
}

type SynchronizationTimeoutError

// SynchronizationTimeoutError is a structured error type; an Error method with
// a pointer receiver is listed for it, so callers should match it with
// errors.As against *SynchronizationTimeoutError.
type SynchronizationTimeoutError struct {
	// Operation is a string label; presumably names the operation that timed out.
	Operation string

	// Expected and Actual are presumably the awaited and observed counts when
	// the timeout fired — TODO confirm.
	Expected int

	Actual int

	// Timeout is presumably the limit that was exceeded.
	Timeout time.Duration
}

func (*SynchronizationTimeoutError) Error

type TestSyncHelper

// TestSyncHelper is an opaque test helper: every field is unexported. It is
// created with NewTestSyncHelper, which takes a testing.TB, and exposes a
// Cleanup method.
//
// Its other exported methods create intent files and a mock porch, return the
// handoff, out and temp directories, construct watchers, sync guards and
// processing trackers, and wait on or verify processing results.
type TestSyncHelper struct {
	// contains filtered or unexported fields
}

func NewTestSyncHelper

func NewTestSyncHelper(t testing.TB) *TestSyncHelper

func (*TestSyncHelper) Cleanup

func (h *TestSyncHelper) Cleanup()

func (*TestSyncHelper) CreateIntentFile

func (h *TestSyncHelper) CreateIntentFile(filename, content string) string

func (*TestSyncHelper) CreateMockPorch

func (h *TestSyncHelper) CreateMockPorch(config MockPorchConfig) (string, *ProcessingTracker)

func (*TestSyncHelper) CreateMultipleIntentFiles

func (h *TestSyncHelper) CreateMultipleIntentFiles(count int, contentTemplate string) []string

func (*TestSyncHelper) GetHandoffDir

func (h *TestSyncHelper) GetHandoffDir() string

func (*TestSyncHelper) GetOutDir

func (h *TestSyncHelper) GetOutDir() string

func (*TestSyncHelper) GetTempDir

func (h *TestSyncHelper) GetTempDir() string

func (*TestSyncHelper) NewEnhancedOnceWatcher

func (h *TestSyncHelper) NewEnhancedOnceWatcher(config Config, expectedFiles int) (*EnhancedOnceWatcher, error)

func (*TestSyncHelper) NewFileSystemSyncGuard

func (h *TestSyncHelper) NewFileSystemSyncGuard() *FileSystemSyncGuard

func (*TestSyncHelper) NewProcessingTracker

func (h *TestSyncHelper) NewProcessingTracker(expectedFiles int) *ProcessingTracker

func (*TestSyncHelper) StartWatcherWithSync

func (h *TestSyncHelper) StartWatcherWithSync(config Config) (*Watcher, error)

func (*TestSyncHelper) VerifyProcessingResults

func (h *TestSyncHelper) VerifyProcessingResults(expectedProcessed, expectedFailed int) error

func (*TestSyncHelper) WaitForFilesCreated

func (h *TestSyncHelper) WaitForFilesCreated(timeout time.Duration) bool

func (*TestSyncHelper) WaitWithTimeout

func (h *TestSyncHelper) WaitWithTimeout(condition func() bool, timeout time.Duration, description string) error

type ThroughputWindow

// ThroughputWindow is an opaque type: every field is unexported. It exposes
// Increment and GetThroughput (float64); no exported constructor is listed.
type ThroughputWindow struct {
	// contains filtered or unexported fields
}

func (*ThroughputWindow) GetThroughput

func (t *ThroughputWindow) GetThroughput() float64

func (*ThroughputWindow) Increment

func (t *ThroughputWindow) Increment()

type TrafficPrediction

// TrafficPrediction is a plain data struct whose fields all carry JSON tags:
// three float64 forecast slices, a confidence value, a horizon and a timestamp.
//
// NOTE(review): presumably the three slices are parallel, one entry per
// forecast step — confirm against the predictor. The range of Confidence is
// not visible here.
type TrafficPrediction struct {
	RequestRateForecast []float64 `json:"request_rate_forecast"`

	ProcessingTimeForecast []float64 `json:"processing_time_forecast"`

	ResourceDemand []float64 `json:"resource_demand"`

	Confidence float64 `json:"confidence"`

	// Horizon is a time.Duration; when marshaled with encoding/json it is
	// written as an integer number of nanoseconds.
	Horizon time.Duration `json:"horizon"`

	Timestamp time.Time `json:"timestamp"`
}

type TrafficPredictor

// TrafficPredictor is declared as an empty struct, so its values carry no
// state. It is created with NewTrafficPredictor; no methods are listed for it.
type TrafficPredictor struct{}

func NewTrafficPredictor

func NewTrafficPredictor() *TrafficPredictor

type TrafficSample

// TrafficSample is a single timestamped observation of traffic and resource
// figures. All fields carry JSON tags; TrainingData holds a slice of samples.
//
// NOTE(review): the time unit of ProcessingTime and the unit of EnergyUsage are
// not stated in the field comments — confirm with the code that records them.
type TrafficSample struct {
	Timestamp time.Time `json:"timestamp"`

	RequestRate float64 `json:"request_rate"` // Requests per second

	ProcessingTime float64 `json:"processing_time"` // Average processing time

	QueueDepth int `json:"queue_depth"` // Current queue depth

	ErrorRate float64 `json:"error_rate"` // Error percentage

	ResourceUtil float64 `json:"resource_util"` // CPU/Memory utilization

	EnergyUsage float64 `json:"energy_usage"` // Current power consumption
}

type TrainingData

// TrainingData bundles three slices: traffic samples, resource data and energy
// data. It declares no JSON tags, unlike TrafficSample.
type TrainingData struct {
	TrafficSamples []TrafficSample

	// ResourceData elements are of the ResourceData type, which is declared as
	// an empty struct, so this slice currently conveys only a length.
	ResourceData []ResourceData

	EnergyData []EnergyData
}

type TransformerPredictor

// TransformerPredictor is an opaque type: every field is unexported, and this
// documentation lists no exported constructor or methods for it.
type TransformerPredictor struct {
	// contains filtered or unexported fields
}

type Validator

// Validator is the single-method interface for turning raw bytes into an
// *ingest.Intent.
type Validator interface {
	// ValidateBytes takes a byte slice and returns either an *ingest.Intent or
	// an error.
	// NOTE(review): presumably the bytes are a JSON-encoded intent — confirm
	// with the implementations.
	ValidateBytes([]byte) (*ingest.Intent, error)
}

type WakeupPredictor

// WakeupPredictor is declared as an empty struct, so its values carry no state.
// This documentation lists no constructor or methods for it.
type WakeupPredictor struct{}

type Watcher

// Watcher is an opaque type: every field is unexported.
//
// Constructors: NewWatcher, NewWatcherWithConfig and NewWatcherWithProcessor
// return (*Watcher, error); MustCreateWatcher and
// MustCreateWatcherWithProcessor return only *Watcher.
// NOTE(review): by Go convention a Must-prefixed helper panics on failure —
// confirm that is what these do.
//
// Methods: Start, ProcessExistingFiles, GetMetrics, GetStats and Close.
type Watcher struct {
	// contains filtered or unexported fields
}

func MustCreateWatcher

func MustCreateWatcher(dir string, config Config) *Watcher

func MustCreateWatcherWithProcessor

func MustCreateWatcherWithProcessor(dir string, processor *IntentProcessor) *Watcher

func NewWatcher

func NewWatcher(dir string, config Config) (*Watcher, error)

func NewWatcherWithConfig

func NewWatcherWithConfig(dir string, config Config, processor *IntentProcessor) (*Watcher, error)

func NewWatcherWithProcessor

func NewWatcherWithProcessor(dir string, processor *IntentProcessor) (*Watcher, error)

func (*Watcher) Close

func (w *Watcher) Close() error

func (*Watcher) GetMetrics

func (w *Watcher) GetMetrics() *WatcherMetrics

func (*Watcher) GetStats

func (w *Watcher) GetStats() (ProcessingStats, error)

func (*Watcher) ProcessExistingFiles

func (w *Watcher) ProcessExistingFiles() error

func (*Watcher) Start

func (w *Watcher) Start() error

type WatcherMetrics

// WatcherMetrics is the metrics structure returned by Watcher.GetMetrics. It
// holds running totals (fields ending in Total), current-value readings,
// per-type error maps, a latency sample buffer and derived rates, plus
// unexported fields not shown here.
//
// ProcessingDurationTotal is a raw int64 nanosecond count, whereas
// AverageProcessingTime is a time.Duration.
//
// NOTE(review): the slice and map fields are exported directly. Whether
// GetMetrics returns a copy or a live view, and what synchronization applies,
// is not visible here — confirm before reading them concurrently.
type WatcherMetrics struct {
	FilesProcessedTotal int64 // Total files processed successfully

	FilesFailedTotal int64 // Total files that failed processing

	ProcessingDurationTotal int64 // Total processing time in nanoseconds

	ValidationFailuresTotal int64 // Total validation failures

	RetryAttemptsTotal int64 // Total retry attempts

	QueueDepthCurrent int64 // Current queue depth

	BackpressureEventsTotal int64 // Total backpressure events

	// NOTE(review): the sample unit is presumably nanoseconds, matching
	// ProcessingDurationTotal — confirm.
	ProcessingLatencies []int64 // Ring buffer for latency samples

	ValidationErrorsByType map[string]int64 // Validation errors by type

	ProcessingErrorsByType map[string]int64 // Processing errors by type

	TimeoutCount int64 // Number of timeouts

	MemoryUsageBytes int64 // Current memory usage

	GoroutineCount int64 // Current number of goroutines

	FileDescriptorCount int64 // Current file descriptor usage

	DirectorySizeBytes int64 // Watched directory size

	ThroughputFilesPerSecond float64 // Files processed per second

	AverageProcessingTime time.Duration // Average processing time

	WorkerUtilization float64 // Worker pool utilization %

	StatusFileGenerationRate float64 // Status files generated per second

	// StartTime and LastUpdateTime are presumably when collection began and
	// when these values were last refreshed — TODO confirm.
	StartTime time.Time

	LastUpdateTime time.Time

	// MetricsEnabled presumably reports whether collection is switched on.
	MetricsEnabled bool
	// contains filtered or unexported fields
}

type WorkItem

// WorkItem describes one unit of work: a file path, an attempt counter and a
// context.
//
// NOTE(review): Ctx stores a context.Context inside a struct. The context
// package documentation advises passing a Context explicitly to each function
// instead; the field is left as is because it is part of the exported API.
type WorkItem struct {
	FilePath string

	// Attempt is an int counter; whether it starts at 0 or 1 is not visible
	// here — TODO confirm.
	Attempt int

	Ctx context.Context
}

type WorkerEnergyMetrics

// WorkerEnergyMetrics is declared as an empty struct, so its values carry no
// state. This documentation lists no constructor or methods for it.
type WorkerEnergyMetrics struct{}

type WorkerID

// WorkerID is a type alias for uint32 (declared with "="), so WorkerID and
// uint32 are the same type and are interchangeable without conversion.
type WorkerID = uint32

type WorkerPool

// WorkerPool is an opaque type: every field is unexported, and this
// documentation lists no exported constructor or methods for it.
type WorkerPool struct {
	// contains filtered or unexported fields
}

type WorkloadMigrationController

// WorkloadMigrationController is declared as an empty struct, so its values
// carry no state. This documentation lists no constructor or methods for it.
type WorkloadMigrationController struct{}

type WorkloadProfile

// WorkloadProfile is a plain data struct describing a workload; every field
// carries a JSON tag. Units given in the trailing comments are part of the
// contract (percent, MB, Mbps, Gbps).
//
// CPUUtilization and MemoryUsage are ints here, whereas the fields of the same
// names on ResourceState are float64, so the two are not directly assignable.
type WorkloadProfile struct {
	RequestsPerSecond float64 `json:"requests_per_second"`

	CPUUtilization int `json:"cpu_utilization"` // Percentage 0-100

	MemoryUsage int `json:"memory_usage"` // MB

	NetworkUtilization int `json:"network_utilization"` // Mbps

	// NOTE(review): IOOperations and NetworkPackets give no unit or window
	// (total vs. per second) — confirm with the producer.
	IOOperations int `json:"io_operations"`

	NetworkPackets int `json:"network_packets"`

	Throughput float64 `json:"throughput"` // Gbps

	// ProcessingDuration is a time.Duration; when marshaled with encoding/json
	// it is written as an integer number of nanoseconds.
	ProcessingDuration time.Duration `json:"processing_duration"`
}

type WriteRequest

// WriteRequest describes a file write: a target path, the bytes to write, a
// file mode, a callback that receives an error, and a timestamp.
//
// NOTE(review): the consumer of this struct is not visible here. Presumably
// Callback is invoked once with the write result (nil on success) and may be
// nil; Timestamp is presumably when the request was created. Confirm both.
type WriteRequest struct {
	Path string

	Data []byte

	Mode os.FileMode

	Callback func(error)

	Timestamp time.Time
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL