Documentation
¶
Index ¶
- Constants
- func GenerateFilterID() string
- func GetCurrentTime() time.Time
- func HighlightLine(line string, highlights map[string][]int, colors map[string]string) string
- type BufferStats
- type ChainMode
- type CircularBuffer
- func (cb *CircularBuffer) Append(lines []string)
- func (cb *CircularBuffer) AppendString(content string)
- func (cb *CircularBuffer) Capacity() int
- func (cb *CircularBuffer) Clear()
- func (cb *CircularBuffer) EstimateMemoryUsage() int64
- func (cb *CircularBuffer) GetLastN(n int) []string
- func (cb *CircularBuffer) GetLines() []string
- func (cb *CircularBuffer) GetStats() BufferStats
- func (cb *CircularBuffer) IsFull() bool
- func (cb *CircularBuffer) Resize(newCapacity int)
- func (cb *CircularBuffer) Size() int
- func (cb *CircularBuffer) TotalLines() int64
- type EventBus
- func (eb *EventBus) Clear()
- func (eb *EventBus) GetAllSubscriptions() map[string]int
- func (eb *EventBus) GetSubscriberCount(jobID, outputType string) int
- func (eb *EventBus) GetSubscriptionInfo() []SubscriptionInfo
- func (eb *EventBus) HasSubscribers(jobID, outputType string) bool
- func (eb *EventBus) Publish(event StreamEvent)
- func (eb *EventBus) PublishError(jobID, outputType string, err error)
- func (eb *EventBus) Subscribe(jobID, outputType string, ch chan<- StreamEvent)
- func (eb *EventBus) Unsubscribe(jobID, outputType string, ch chan<- StreamEvent)
- func (eb *EventBus) UnsubscribeAll(jobID, outputType string)
- type FileWatcher
- type FilterChain
- type FilterManager
- func (fm *FilterManager) AddFilter(filter *StreamFilter) error
- func (fm *FilterManager) ApplyActiveFilters(line string, timestamp time.Time) (bool, map[string][]int)
- func (fm *FilterManager) ClearFilters()
- func (fm *FilterManager) CreateChain(name string, mode ChainMode, filterIDs []string) (*FilterChain, error)
- func (fm *FilterManager) GetFilter(filterID string) (*StreamFilter, error)
- func (fm *FilterManager) GetFilterStats() map[string]FilterStats
- func (fm *FilterManager) GetPresets() []*FilterPreset
- func (fm *FilterManager) LoadPreset(presetID string) error
- func (fm *FilterManager) QuickFilter(pattern string, filterType FilterType) error
- func (fm *FilterManager) RemoveFilter(filterID string) error
- func (fm *FilterManager) SavePreset(name, description, category string) (*FilterPreset, error)
- func (fm *FilterManager) SetActiveChain(chainID string) error
- type FilterPreset
- type FilterStats
- type FilterType
- type FilteredStreamManager
- func (fsm *FilteredStreamManager) AddCustomFilter(filter *StreamFilter) error
- func (fsm *FilteredStreamManager) ClearFilters()
- func (fsm *FilteredStreamManager) CloseFiltered() error
- func (fsm *FilteredStreamManager) EmitFilteredEvent(event StreamEvent)
- func (fsm *FilteredStreamManager) GetActiveFilters() []*StreamFilter
- func (fsm *FilteredStreamManager) GetFilterPresets() []*FilterPreset
- func (fsm *FilteredStreamManager) GetFilterStats() map[string]FilterStats
- func (fsm *FilteredStreamManager) GetFilteredContent(jobID, outputType string, includeHighlights bool) ([]string, error)
- func (fsm *FilteredStreamManager) GetSearchHistory() []string
- func (fsm *FilteredStreamManager) LoadFilterPreset(presetID string) error
- func (fsm *FilteredStreamManager) RemoveFilter(filterID string) error
- func (fsm *FilteredStreamManager) SaveFilterPreset(name, description, category string) (*FilterPreset, error)
- func (fsm *FilteredStreamManager) Search(jobID, outputType, query string, options SearchOptions) ([]*SearchResult, error)
- func (fsm *FilteredStreamManager) SearchNext(jobID, outputType string, currentLine int) (*SearchResult, error)
- func (fsm *FilteredStreamManager) SearchPrevious(jobID, outputType string, currentLine int) (*SearchResult, error)
- func (fsm *FilteredStreamManager) SetFilterChainMode(mode ChainMode)
- func (fsm *FilteredStreamManager) SetQuickFilter(pattern string, filterType FilterType) error
- func (fsm *FilteredStreamManager) StartFilteredStream(jobID, outputType string) error
- func (fsm *FilteredStreamManager) StopFilteredStream(jobID, outputType string) error
- func (fsm *FilteredStreamManager) StreamWithContext(ctx context.Context, jobID, outputType string) (<-chan StreamEvent, error)
- type JobStream
- type LogLevel
- type Match
- type PathResolver
- func (pr *PathResolver) GetJobOutputPaths(jobID string) (stdoutPath, stderrPath string, isRemote bool, nodeID string, err error)
- func (pr *PathResolver) ResolveOutputPath(jobID, outputType string) (string, bool, string, error)
- func (pr *PathResolver) ValidateOutputPath(filePath string, isRemote bool, nodeID string) error
- type SearchHistory
- type SearchOptions
- type SearchResult
- type SlurmConfig
- type StreamConfig
- type StreamEvent
- type StreamEventType
- type StreamFilter
- type StreamInfo
- type StreamManager
- func (sm *StreamManager) Close() error
- func (sm *StreamManager) GetActiveStreams() []StreamInfo
- func (sm *StreamManager) GetBuffer(jobID, outputType string) ([]string, error)
- func (sm *StreamManager) GetStats() StreamingStats
- func (sm *StreamManager) IsStreamActive(jobID, outputType string) bool
- func (sm *StreamManager) StartStream(jobID, outputType string) error
- func (sm *StreamManager) StopStream(jobID, outputType string) error
- func (sm *StreamManager) Subscribe(jobID, outputType string) <-chan StreamEvent
- func (sm *StreamManager) Unsubscribe(jobID, outputType string, ch <-chan StreamEvent)
- type StreamSearcher
- func (ss *StreamSearcher) Clear()
- func (ss *StreamSearcher) GetHighlightedLine(line string, highlightColor string) string
- func (ss *StreamSearcher) GetStats() (totalMatches int, matchedLines int)
- func (ss *StreamSearcher) Search(query string, options SearchOptions) ([]*SearchResult, error)
- func (ss *StreamSearcher) SearchNext(startLine int) (*SearchResult, error)
- func (ss *StreamSearcher) SearchPrevious(startLine int) (*SearchResult, error)
- type StreamingStats
- type SubscriptionInfo
Constants ¶
const (
	DefaultOutputDir            = "/var/spool/slurm/slurmd/job_output"
	DefaultFilePattern          = "slurm-%s.out"
	DefaultErrorPattern         = "slurm-%s.err"
	DefaultBufferSize           = 10000
	DefaultMaxConcurrentStreams = 4
	DefaultPollInterval         = 2 * time.Second
	DefaultMaxMemoryMB          = 50
	DefaultFileCheckInterval    = 1 * time.Second
	DefaultMaxFileSize          = 100 * 1024 * 1024 // 100MB
)
Default configuration values
Variables ¶
This section is empty.
Functions ¶
func GenerateFilterID ¶
func GenerateFilterID() string
GenerateFilterID generates a unique filter ID
func GetCurrentTime ¶
func GetCurrentTime() time.Time
GetCurrentTime returns the current time; routing time lookups through it makes time mocking in tests easier
Types ¶
type BufferStats ¶
type BufferStats struct {
CurrentSize int `json:"current_size"`
Capacity int `json:"capacity"`
TotalLines int64 `json:"total_lines"`
IsFull bool `json:"is_full"`
UsagePercent float64 `json:"usage_percent"`
}
BufferStats contains statistics about a circular buffer
type CircularBuffer ¶
type CircularBuffer struct {
// contains filtered or unexported fields
}
CircularBuffer implements a thread-safe circular buffer for storing job output lines
func NewCircularBuffer ¶
func NewCircularBuffer(capacity int) *CircularBuffer
NewCircularBuffer creates a new circular buffer with the specified capacity
func (*CircularBuffer) Append ¶
func (cb *CircularBuffer) Append(lines []string)
Append adds new lines to the buffer
func (*CircularBuffer) AppendString ¶
func (cb *CircularBuffer) AppendString(content string)
AppendString adds a single string (potentially containing multiple lines) to the buffer
func (*CircularBuffer) Capacity ¶
func (cb *CircularBuffer) Capacity() int
Capacity returns the maximum capacity of the buffer
func (*CircularBuffer) Clear ¶
func (cb *CircularBuffer) Clear()
Clear removes all lines from the buffer
func (*CircularBuffer) EstimateMemoryUsage ¶
func (cb *CircularBuffer) EstimateMemoryUsage() int64
EstimateMemoryUsage estimates the memory usage of the buffer in bytes
func (*CircularBuffer) GetLastN ¶
func (cb *CircularBuffer) GetLastN(n int) []string
GetLastN returns the last N lines from the buffer
func (*CircularBuffer) GetLines ¶
func (cb *CircularBuffer) GetLines() []string
GetLines returns all current lines in the buffer
func (*CircularBuffer) GetStats ¶
func (cb *CircularBuffer) GetStats() BufferStats
GetStats returns buffer statistics
func (*CircularBuffer) IsFull ¶
func (cb *CircularBuffer) IsFull() bool
IsFull returns true if the buffer is at capacity
func (*CircularBuffer) Resize ¶
func (cb *CircularBuffer) Resize(newCapacity int)
Resize changes the buffer capacity (creates a new buffer and copies data)
func (*CircularBuffer) Size ¶
func (cb *CircularBuffer) Size() int
Size returns the current number of lines in the buffer
func (*CircularBuffer) TotalLines ¶
func (cb *CircularBuffer) TotalLines() int64
TotalLines returns the total number of lines ever added to the buffer
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus manages event distribution for streaming operations
func (*EventBus) Clear ¶
func (eb *EventBus) Clear()
Clear removes all subscribers and closes all channels
func (*EventBus) GetAllSubscriptions ¶
func (eb *EventBus) GetAllSubscriptions() map[string]int
GetAllSubscriptions returns information about all active subscriptions
func (*EventBus) GetSubscriberCount ¶
func (eb *EventBus) GetSubscriberCount(jobID, outputType string) int
GetSubscriberCount returns the number of subscribers for a specific job and output type
func (*EventBus) GetSubscriptionInfo ¶
func (eb *EventBus) GetSubscriptionInfo() []SubscriptionInfo
GetSubscriptionInfo returns detailed information about all subscriptions
func (*EventBus) HasSubscribers ¶
func (eb *EventBus) HasSubscribers(jobID, outputType string) bool
HasSubscribers returns true if there are any subscribers for a specific job and output type
func (*EventBus) Publish ¶
func (eb *EventBus) Publish(event StreamEvent)
Publish sends an event to all subscribers of a specific job and output type
func (*EventBus) PublishError ¶
func (eb *EventBus) PublishError(jobID, outputType string, err error)
PublishError sends an error event to all subscribers of a specific job and output type
func (*EventBus) Subscribe ¶
func (eb *EventBus) Subscribe(jobID, outputType string, ch chan<- StreamEvent)
Subscribe adds a subscriber for events from a specific job and output type
func (*EventBus) Unsubscribe ¶
func (eb *EventBus) Unsubscribe(jobID, outputType string, ch chan<- StreamEvent)
Unsubscribe removes a subscriber for a specific job and output type
func (*EventBus) UnsubscribeAll ¶
func (eb *EventBus) UnsubscribeAll(jobID, outputType string)
UnsubscribeAll removes all subscribers for a specific job and output type
type FileWatcher ¶
FileWatcher handles individual file watching operations
type FilterChain ¶
type FilterChain struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Filters []*StreamFilter `json:"filters"`
Mode ChainMode `json:"mode"` // AND or OR
Active bool `json:"active"`
}
FilterChain represents a collection of filters applied in sequence
type FilterManager ¶
type FilterManager struct {
// contains filtered or unexported fields
}
FilterManager manages filters and presets for streaming
func NewFilterManager ¶
func NewFilterManager(configPath string) *FilterManager
NewFilterManager creates a new filter manager
func (*FilterManager) AddFilter ¶
func (fm *FilterManager) AddFilter(filter *StreamFilter) error
AddFilter adds a new filter
func (*FilterManager) ApplyActiveFilters ¶
func (fm *FilterManager) ApplyActiveFilters(line string, timestamp time.Time) (bool, map[string][]int)
ApplyActiveFilters applies the active filter chain to a line
func (*FilterManager) ClearFilters ¶
func (fm *FilterManager) ClearFilters()
ClearFilters removes all active filters
func (*FilterManager) CreateChain ¶
func (fm *FilterManager) CreateChain(name string, mode ChainMode, filterIDs []string) (*FilterChain, error)
CreateChain creates a new filter chain
func (*FilterManager) GetFilter ¶
func (fm *FilterManager) GetFilter(filterID string) (*StreamFilter, error)
GetFilter retrieves a filter by ID
func (*FilterManager) GetFilterStats ¶
func (fm *FilterManager) GetFilterStats() map[string]FilterStats
GetFilterStats returns statistics for all active filters
func (*FilterManager) GetPresets ¶
func (fm *FilterManager) GetPresets() []*FilterPreset
GetPresets returns all available presets
func (*FilterManager) LoadPreset ¶
func (fm *FilterManager) LoadPreset(presetID string) error
LoadPreset loads and activates a filter preset
func (*FilterManager) QuickFilter ¶
func (fm *FilterManager) QuickFilter(pattern string, filterType FilterType) error
QuickFilter creates and activates a simple keyword filter
func (*FilterManager) RemoveFilter ¶
func (fm *FilterManager) RemoveFilter(filterID string) error
RemoveFilter removes a filter by ID
func (*FilterManager) SavePreset ¶
func (fm *FilterManager) SavePreset(name, description, category string) (*FilterPreset, error)
SavePreset saves the current filter configuration as a preset
func (*FilterManager) SetActiveChain ¶
func (fm *FilterManager) SetActiveChain(chainID string) error
SetActiveChain sets the active filter chain
type FilterPreset ¶
type FilterPreset struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Category string `json:"category"`
Filters []*StreamFilter `json:"filters"`
Tags []string `json:"tags"`
Created time.Time `json:"created"`
LastUsed time.Time `json:"last_used"`
UseCount int `json:"use_count"`
}
FilterPreset represents a saved filter configuration
type FilterStats ¶
type FilterStats struct {
MatchCount int64 `json:"match_count"`
ProcessedLines int64 `json:"processed_lines"`
LastMatch time.Time `json:"last_match"`
Created time.Time `json:"created"`
}
FilterStats tracks filter performance and matches
type FilterType ¶
type FilterType string
FilterType represents the type of filter to apply
const (
	FilterTypeKeyword   FilterType = "keyword"
	FilterTypeRegex     FilterType = "regex"
	FilterTypeTimeRange FilterType = "timerange"
	FilterTypeLogLevel  FilterType = "loglevel"
	FilterTypeInvert    FilterType = "invert"
)
type FilteredStreamManager ¶
type FilteredStreamManager struct {
*StreamManager
// contains filtered or unexported fields
}
FilteredStreamManager extends StreamManager with filtering capabilities
func NewFilteredStreamManager ¶
func NewFilteredStreamManager(client dao.SlurmClient, sshManager *ssh.SessionManager, config *SlurmConfig, configPath string) (*FilteredStreamManager, error)
NewFilteredStreamManager creates a new stream manager with filtering support
func (*FilteredStreamManager) AddCustomFilter ¶
func (fsm *FilteredStreamManager) AddCustomFilter(filter *StreamFilter) error
AddCustomFilter adds a custom filter
func (*FilteredStreamManager) ClearFilters ¶
func (fsm *FilteredStreamManager) ClearFilters()
ClearFilters removes all active filters
func (*FilteredStreamManager) CloseFiltered ¶
func (fsm *FilteredStreamManager) CloseFiltered() error
CloseFiltered closes the filtered stream manager
func (*FilteredStreamManager) EmitFilteredEvent ¶
func (fsm *FilteredStreamManager) EmitFilteredEvent(event StreamEvent)
EmitFilteredEvent emits an event after applying filters
func (*FilteredStreamManager) GetActiveFilters ¶
func (fsm *FilteredStreamManager) GetActiveFilters() []*StreamFilter
GetActiveFilters returns the currently active filters
func (*FilteredStreamManager) GetFilterPresets ¶
func (fsm *FilteredStreamManager) GetFilterPresets() []*FilterPreset
GetFilterPresets returns available filter presets
func (*FilteredStreamManager) GetFilterStats ¶
func (fsm *FilteredStreamManager) GetFilterStats() map[string]FilterStats
GetFilterStats returns filter statistics
func (*FilteredStreamManager) GetFilteredContent ¶
func (fsm *FilteredStreamManager) GetFilteredContent(jobID, outputType string, includeHighlights bool) ([]string, error)
GetFilteredContent returns the filtered content for a stream
func (*FilteredStreamManager) GetSearchHistory ¶
func (fsm *FilteredStreamManager) GetSearchHistory() []string
GetSearchHistory returns recent search queries
func (*FilteredStreamManager) LoadFilterPreset ¶
func (fsm *FilteredStreamManager) LoadFilterPreset(presetID string) error
LoadFilterPreset loads and activates a filter preset
func (*FilteredStreamManager) RemoveFilter ¶
func (fsm *FilteredStreamManager) RemoveFilter(filterID string) error
RemoveFilter removes a filter by ID
func (*FilteredStreamManager) SaveFilterPreset ¶
func (fsm *FilteredStreamManager) SaveFilterPreset(name, description, category string) (*FilterPreset, error)
SaveFilterPreset saves current filters as a preset
func (*FilteredStreamManager) Search ¶
func (fsm *FilteredStreamManager) Search(jobID, outputType, query string, options SearchOptions) ([]*SearchResult, error)
Search performs a search on a specific stream
func (*FilteredStreamManager) SearchNext ¶
func (fsm *FilteredStreamManager) SearchNext(jobID, outputType string, currentLine int) (*SearchResult, error)
SearchNext finds the next match in a stream
func (*FilteredStreamManager) SearchPrevious ¶
func (fsm *FilteredStreamManager) SearchPrevious(jobID, outputType string, currentLine int) (*SearchResult, error)
SearchPrevious finds the previous match in a stream
func (*FilteredStreamManager) SetFilterChainMode ¶
func (fsm *FilteredStreamManager) SetFilterChainMode(mode ChainMode)
SetFilterChainMode sets how multiple filters are combined (AND/OR)
func (*FilteredStreamManager) SetQuickFilter ¶
func (fsm *FilteredStreamManager) SetQuickFilter(pattern string, filterType FilterType) error
SetQuickFilter sets a quick filter for all streams
func (*FilteredStreamManager) StartFilteredStream ¶
func (fsm *FilteredStreamManager) StartFilteredStream(jobID, outputType string) error
StartFilteredStream starts a stream with filtering enabled
func (*FilteredStreamManager) StopFilteredStream ¶
func (fsm *FilteredStreamManager) StopFilteredStream(jobID, outputType string) error
StopFilteredStream stops a stream and cleans up resources
func (*FilteredStreamManager) StreamWithContext ¶
func (fsm *FilteredStreamManager) StreamWithContext(ctx context.Context, jobID, outputType string) (<-chan StreamEvent, error)
StreamWithContext provides a filtered event channel for a specific stream
type JobStream ¶
type JobStream struct {
JobID string // SLURM job ID
OutputType string // "stdout" or "stderr"
Buffer *CircularBuffer // Memory-efficient output storage
FilePath string // Path to output file
LastOffset int64 // File offset for tailing
IsActive bool // Whether streaming is active
IsRemote bool // Local or remote file
NodeID string // For remote files
Subscribers []chan<- StreamEvent // Event subscribers
FileWatcher *FileWatcher // Individual file watcher
LastUpdate time.Time // Last update timestamp
}
JobStream represents an active streaming session for a job's output
type PathResolver ¶
type PathResolver struct {
// contains filtered or unexported fields
}
PathResolver resolves SLURM job output file paths using SLURM API data
func NewPathResolver ¶
func NewPathResolver(client dao.SlurmClient, config *SlurmConfig) *PathResolver
NewPathResolver creates a new path resolver
func (*PathResolver) GetJobOutputPaths ¶
func (pr *PathResolver) GetJobOutputPaths(jobID string) (stdoutPath, stderrPath string, isRemote bool, nodeID string, err error)
GetJobOutputPaths returns both stdout and stderr paths for a job
func (*PathResolver) ResolveOutputPath ¶
func (pr *PathResolver) ResolveOutputPath(jobID, outputType string) (string, bool, string, error)
ResolveOutputPath determines the full path to job output file using SLURM API data
func (*PathResolver) ValidateOutputPath ¶
func (pr *PathResolver) ValidateOutputPath(filePath string, isRemote bool, nodeID string) error
ValidateOutputPath checks if an output path is accessible
type SearchHistory ¶
type SearchHistory struct {
// contains filtered or unexported fields
}
SearchHistory tracks recent searches
func NewSearchHistory ¶
func NewSearchHistory(maxSize int) *SearchHistory
NewSearchHistory creates a new search history
type SearchOptions ¶
type SearchOptions struct {
CaseSensitive bool `json:"case_sensitive"`
WholeWord bool `json:"whole_word"`
UseRegex bool `json:"use_regex"`
ContextLines int `json:"context_lines"` // Number of lines before/after to include
MaxResults int `json:"max_results"` // Limit number of results
Reverse bool `json:"reverse"` // Search from bottom to top
}
SearchOptions configures search behavior
type SearchResult ¶
type SearchResult struct {
LineNumber int `json:"line_number"`
Line string `json:"line"`
Matches []Match `json:"matches"`
Context []string `json:"context,omitempty"` // Lines before/after for context
}
SearchResult represents a search match in the stream
type SlurmConfig ¶
type SlurmConfig struct {
// File paths (fallback when SLURM API doesn't provide paths)
OutputDir string `json:"output_dir"` // Default: /var/spool/slurm/slurmd/job_output
ErrorDir string `json:"error_dir"` // Usually same as OutputDir
FilePattern string `json:"file_pattern"` // Default: "slurm-%s.out" (%s is replaced by the job ID)
ErrorPattern string `json:"error_pattern"` // Default: "slurm-%s.err" (%s is replaced by the job ID)
// Node configuration
RemoteAccess bool `json:"remote_access"` // Enable SSH for remote files
LocalNodes []string `json:"local_nodes"` // Nodes accessible via local filesystem
SSHUser string `json:"ssh_user"` // Default SSH username
SSHKeyPath string `json:"ssh_key_path"` // SSH private key path
// Performance tuning
FileCheckInterval time.Duration `json:"file_check_interval"` // 1s
MaxFileSize int64 `json:"max_file_size"` // 100MB
BufferSize int `json:"buffer_size"` // 10000 lines
}
SlurmConfig contains SLURM-specific configuration for streaming
func DefaultSlurmConfig ¶
func DefaultSlurmConfig() *SlurmConfig
DefaultSlurmConfig returns default SLURM configuration
type StreamConfig ¶
type StreamConfig struct {
MaxConcurrentStreams int `json:"max_concurrent_streams"` // Default: 4
BufferSize int `json:"buffer_size_lines"` // Default: 10000
PollInterval time.Duration `json:"poll_interval"` // Default: 2s
MaxMemoryMB int `json:"max_memory_mb"` // Default: 50
AutoScroll bool `json:"auto_scroll_default"` // Default: true
ShowTimestamps bool `json:"show_timestamps"` // Default: true
ExportFormat string `json:"export_format"` // Default: "txt"
}
StreamConfig contains user-configurable streaming settings
func DefaultStreamConfig ¶
func DefaultStreamConfig() *StreamConfig
DefaultStreamConfig returns default streaming configuration
type StreamEvent ¶
type StreamEvent struct {
JobID string `json:"job_id"`
OutputType string `json:"output_type"` // "stdout" or "stderr"
Content string `json:"content"`
NewLines []string `json:"new_lines"` // New lines added since last event
Timestamp time.Time `json:"timestamp"`
EventType StreamEventType `json:"event_type"`
FileOffset int64 `json:"file_offset"` // Current file position
Error error `json:"error,omitempty"`
}
StreamEvent represents an event in the streaming system
type StreamEventType ¶
type StreamEventType string
StreamEventType represents the type of stream event
const (
	StreamEventNewOutput   StreamEventType = "NEW_OUTPUT"
	StreamEventJobComplete StreamEventType = "JOB_COMPLETE"
	StreamEventError       StreamEventType = "ERROR"
	StreamEventFileRotated StreamEventType = "FILE_ROTATED"
	StreamEventStreamStart StreamEventType = "STREAM_START"
	StreamEventStreamStop  StreamEventType = "STREAM_STOP"
)
type StreamFilter ¶
type StreamFilter struct {
ID string `json:"id"`
Name string `json:"name"`
Type FilterType `json:"type"`
Enabled bool `json:"enabled"`
Pattern string `json:"pattern"` // For keyword/regex filters
Regex *regexp.Regexp `json:"-"` // Compiled regex
TimeStart time.Time `json:"time_start"` // For time range filters
TimeEnd time.Time `json:"time_end"`
LogLevels []LogLevel `json:"log_levels"` // For log level filters
Invert bool `json:"invert"` // Invert the filter results
CaseSensitive bool `json:"case_sensitive"`
Highlight bool `json:"highlight"` // Highlight matches
HighlightColor string `json:"highlight_color"`
Stats FilterStats `json:"stats"`
}
StreamFilter represents a filter that can be applied to streaming output
func NewStreamFilter ¶
func NewStreamFilter(filterType FilterType, pattern string) (*StreamFilter, error)
NewStreamFilter creates a new stream filter
type StreamInfo ¶
type StreamInfo struct {
JobID string `json:"job_id"`
OutputType string `json:"output_type"`
FilePath string `json:"file_path"`
IsRemote bool `json:"is_remote"`
NodeID string `json:"node_id"`
BufferSize int `json:"buffer_size"`
BufferUsage float64 `json:"buffer_usage_percent"`
LastUpdate time.Time `json:"last_update"`
Subscribers int `json:"subscribers"`
}
StreamInfo contains information about an active stream
type StreamManager ¶
type StreamManager struct {
// contains filtered or unexported fields
}
StreamManager manages real-time job output streaming
func NewStreamManager ¶
func NewStreamManager(client dao.SlurmClient, sshManager *ssh.SessionManager, config *SlurmConfig) (*StreamManager, error)
NewStreamManager creates a new stream manager
func (*StreamManager) Close ¶
func (sm *StreamManager) Close() error
Close shuts down the stream manager
func (*StreamManager) GetActiveStreams ¶
func (sm *StreamManager) GetActiveStreams() []StreamInfo
GetActiveStreams returns information about all active streams
func (*StreamManager) GetBuffer ¶
func (sm *StreamManager) GetBuffer(jobID, outputType string) ([]string, error)
GetBuffer returns the current buffer contents for a stream
func (*StreamManager) GetStats ¶
func (sm *StreamManager) GetStats() StreamingStats
GetStats returns streaming statistics
func (*StreamManager) IsStreamActive ¶
func (sm *StreamManager) IsStreamActive(jobID, outputType string) bool
IsStreamActive returns true if a stream is currently active
func (*StreamManager) StartStream ¶
func (sm *StreamManager) StartStream(jobID, outputType string) error
StartStream begins watching job output file
func (*StreamManager) StopStream ¶
func (sm *StreamManager) StopStream(jobID, outputType string) error
StopStream stops watching job output file
func (*StreamManager) Subscribe ¶
func (sm *StreamManager) Subscribe(jobID, outputType string) <-chan StreamEvent
Subscribe adds a subscriber for stream events
func (*StreamManager) Unsubscribe ¶
func (sm *StreamManager) Unsubscribe(jobID, outputType string, ch <-chan StreamEvent)
Unsubscribe removes a subscriber
type StreamSearcher ¶
type StreamSearcher struct {
// contains filtered or unexported fields
}
StreamSearcher handles searching within stream buffers
func NewStreamSearcher ¶
func NewStreamSearcher(buffer *CircularBuffer) *StreamSearcher
NewStreamSearcher creates a new stream searcher
func (*StreamSearcher) Clear ¶
func (ss *StreamSearcher) Clear()
Clear clears search results and pattern
func (*StreamSearcher) GetHighlightedLine ¶
func (ss *StreamSearcher) GetHighlightedLine(line string, highlightColor string) string
GetHighlightedLine returns a line with search matches highlighted
func (*StreamSearcher) GetStats ¶
func (ss *StreamSearcher) GetStats() (totalMatches int, matchedLines int)
GetStats returns search statistics
func (*StreamSearcher) Search ¶
func (ss *StreamSearcher) Search(query string, options SearchOptions) ([]*SearchResult, error)
Search performs a search on the buffer content
func (*StreamSearcher) SearchNext ¶
func (ss *StreamSearcher) SearchNext(startLine int) (*SearchResult, error)
SearchNext finds the next occurrence after the given line number
func (*StreamSearcher) SearchPrevious ¶
func (ss *StreamSearcher) SearchPrevious(startLine int) (*SearchResult, error)
SearchPrevious finds the previous occurrence before the given line number
type StreamingStats ¶
type StreamingStats struct {
ActiveStreams int `json:"active_streams"`
TotalStreams int `json:"total_streams"`
MemoryUsage int64 `json:"memory_usage_bytes"`
TotalEvents int64 `json:"total_events"`
ErrorCount int64 `json:"error_count"`
Uptime time.Duration `json:"uptime"`
LastError error `json:"last_error,omitempty"`
LastErrorTime time.Time `json:"last_error_time,omitempty"`
}
StreamingStats contains statistics about streaming operations
type SubscriptionInfo ¶
type SubscriptionInfo struct {
JobID string `json:"job_id"`
OutputType string `json:"output_type"`
Subscribers int `json:"subscribers"`
}
SubscriptionInfo contains information about a subscription