streaming

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 31, 2026 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package streaming provides streaming buffer and output management.

Index

Constants

View Source
// Default configuration values. The "Default:" comments on the SlurmConfig
// and StreamConfig fields quote the same numbers.
const (
	// DefaultOutputDir is the fallback directory for job output files
	// (SlurmConfig.OutputDir names this path as its default).
	DefaultOutputDir            = "/var/spool/slurm/slurmd/job_output"
	// DefaultFilePattern is the stdout file name pattern. It contains a single
	// %s verb; SlurmConfig.FilePattern describes the placeholder as the job ID.
	DefaultFilePattern          = "slurm-%s.out"
	// DefaultErrorPattern is the stderr counterpart of DefaultFilePattern.
	DefaultErrorPattern         = "slurm-%s.err"
	// DefaultBufferSize is counted in lines (see the buffer_size_lines tag on
	// StreamConfig.BufferSize).
	DefaultBufferSize           = 10000
	// DefaultMaxConcurrentStreams matches the default documented on
	// StreamConfig.MaxConcurrentStreams.
	DefaultMaxConcurrentStreams = 4
	// DefaultPollInterval matches the default documented on
	// StreamConfig.PollInterval.
	DefaultPollInterval         = 2 * time.Second
	// DefaultMaxMemoryMB is expressed in megabytes, per its name.
	DefaultMaxMemoryMB          = 50
	// DefaultFileCheckInterval matches the value documented on
	// SlurmConfig.FileCheckInterval.
	DefaultFileCheckInterval    = 1 * time.Second
	// DefaultMaxFileSize is 100 * 1024 * 1024 = 104857600.
	// NOTE(review): presumably a byte count (i.e. 100 MiB) — confirm.
	DefaultMaxFileSize          = 100 * 1024 * 1024 // 100MB
)

Default configuration values

Variables

This section is empty.

Functions

func GenerateFilterID

func GenerateFilterID() string

GenerateFilterID generates a unique filter ID

func GetCurrentTime

func GetCurrentTime() time.Time

GetCurrentTime returns the current time. Reading the clock through this function makes it easier to mock time in tests.

func HighlightLine

func HighlightLine(line string, highlights map[string][]int, colors map[string]string) string

HighlightLine applies multiple highlights to a line (for both search and filters)

Types

type BufferStats

// BufferStats is the snapshot type returned by CircularBuffer.GetStats.
type BufferStats struct {
	// CurrentSize is presumably the number of lines currently held, as
	// reported by CircularBuffer.Size — TODO confirm.
	CurrentSize  int     `json:"current_size"`
	// Capacity is presumably the maximum number of lines, as reported by
	// CircularBuffer.Capacity — TODO confirm.
	Capacity     int     `json:"capacity"`
	// TotalLines is presumably the count of lines ever added, as reported by
	// CircularBuffer.TotalLines — TODO confirm.
	TotalLines   int64   `json:"total_lines"`
	// IsFull presumably mirrors CircularBuffer.IsFull — TODO confirm.
	IsFull       bool    `json:"is_full"`
	// NOTE(review): the scale of UsagePercent (0-100 vs 0-1) is not visible
	// here — confirm against the implementation.
	UsagePercent float64 `json:"usage_percent"`
}

BufferStats contains statistics about a circular buffer

type ChainMode

type ChainMode string

ChainMode determines how multiple filters are combined

// Supported ChainMode values. The string forms are upper-case and are what
// FilterChain.Mode carries in its "mode" JSON field.
const (
	// ChainModeAND requires every filter to match.
	ChainModeAND ChainMode = "AND"
	// ChainModeOR requires at least one filter to match.
	ChainModeOR ChainMode = "OR"
)

type CircularBuffer

type CircularBuffer struct {
	// contains filtered or unexported fields
}

CircularBuffer implements a thread-safe circular buffer for storing job output lines

func NewCircularBuffer

func NewCircularBuffer(capacity int) *CircularBuffer

NewCircularBuffer creates a new circular buffer with the specified capacity

func (*CircularBuffer) Append

func (cb *CircularBuffer) Append(lines []string)

Append adds new lines to the buffer

func (*CircularBuffer) AppendString

func (cb *CircularBuffer) AppendString(content string)

AppendString adds a single string (potentially containing multiple lines) to the buffer

func (*CircularBuffer) Capacity

func (cb *CircularBuffer) Capacity() int

Capacity returns the maximum capacity of the buffer

func (*CircularBuffer) Clear

func (cb *CircularBuffer) Clear()

Clear removes all lines from the buffer

func (*CircularBuffer) EstimateMemoryUsage

func (cb *CircularBuffer) EstimateMemoryUsage() int64

EstimateMemoryUsage estimates the memory usage of the buffer in bytes

func (*CircularBuffer) GetLastN

func (cb *CircularBuffer) GetLastN(n int) []string

GetLastN returns the last N lines from the buffer

func (*CircularBuffer) GetLines

func (cb *CircularBuffer) GetLines() []string

GetLines returns all current lines in the buffer

func (*CircularBuffer) GetStats

func (cb *CircularBuffer) GetStats() BufferStats

GetStats returns buffer statistics

func (*CircularBuffer) IsFull

func (cb *CircularBuffer) IsFull() bool

IsFull returns true if the buffer is at capacity

func (*CircularBuffer) Resize

func (cb *CircularBuffer) Resize(newCapacity int)

Resize changes the buffer capacity (creates a new buffer and copies data)

func (*CircularBuffer) Size

func (cb *CircularBuffer) Size() int

Size returns the current number of lines in the buffer

func (*CircularBuffer) TotalLines

func (cb *CircularBuffer) TotalLines() int64

TotalLines returns the total number of lines ever added to the buffer

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus manages event distribution for streaming operations

func NewEventBus

func NewEventBus() *EventBus

NewEventBus creates a new event bus

func (*EventBus) Clear

func (eb *EventBus) Clear()

Clear removes all subscribers and closes all channels

func (*EventBus) GetAllSubscriptions

func (eb *EventBus) GetAllSubscriptions() map[string]int

GetAllSubscriptions returns information about all active subscriptions

func (*EventBus) GetSubscriberCount

func (eb *EventBus) GetSubscriberCount(jobID, outputType string) int

GetSubscriberCount returns the number of subscribers for a specific job and output type

func (*EventBus) GetSubscriptionInfo

func (eb *EventBus) GetSubscriptionInfo() []SubscriptionInfo

GetSubscriptionInfo returns detailed information about all subscriptions

func (*EventBus) HasSubscribers

func (eb *EventBus) HasSubscribers(jobID, outputType string) bool

HasSubscribers returns true if there are any subscribers for a specific job and output type

func (*EventBus) Publish

func (eb *EventBus) Publish(event *StreamEvent)

Publish sends an event to all subscribers of a specific job and output type

func (*EventBus) PublishError

func (eb *EventBus) PublishError(jobID, outputType string, err error)

PublishError sends an error event to all subscribers of a specific job and output type

func (*EventBus) Subscribe

func (eb *EventBus) Subscribe(jobID, outputType string, ch chan<- StreamEvent)

Subscribe adds a subscriber for events from a specific job and output type

func (*EventBus) Unsubscribe

func (eb *EventBus) Unsubscribe(jobID, outputType string, ch chan<- StreamEvent)

Unsubscribe removes a subscriber for a specific job and output type

func (*EventBus) UnsubscribeAll

func (eb *EventBus) UnsubscribeAll(jobID, outputType string)

UnsubscribeAll removes all subscribers for a specific job and output type

type FileWatcher

// FileWatcher carries the per-file watch state referenced from
// JobStream.FileWatcher. It declares no JSON tags. JobStream has identically
// named fields; the descriptions below are taken from the comments there.
type FileWatcher struct {
	// FilePath: JobStream describes its counterpart as the path to the
	// output file.
	FilePath   string
	// IsRemote: JobStream describes its counterpart as "Local or remote file".
	IsRemote   bool
	// NodeID: JobStream describes its counterpart as used for remote files.
	NodeID     string
	// LastOffset: JobStream describes its counterpart as the file offset for
	// tailing.
	// NOTE(review): presumably a byte offset — confirm.
	LastOffset int64
}

FileWatcher handles individual file watching operations

type FilterChain

// FilterChain groups filters and the mode used to combine them.
type FilterChain struct {
	ID          string          `json:"id"`
	Name        string          `json:"name"`
	Description string          `json:"description"`
	// Filters holds pointers, so the chain shares StreamFilter values with
	// whoever else holds them rather than owning copies.
	Filters     []*StreamFilter `json:"filters"`
	Mode        ChainMode       `json:"mode"` // ChainModeAND or ChainModeOR
	// NOTE(review): presumably marks the chain selected through
	// FilterManager.SetActiveChain — confirm.
	Active      bool            `json:"active"`
}

FilterChain represents a collection of filters applied in sequence

func (*FilterChain) Apply

func (fc *FilterChain) Apply(line string, timestamp time.Time) (bool, map[string][]int)

Apply runs the filter chain against a line and reports whether the chain matched, together with the match indices.

type FilterManager

type FilterManager struct {
	// contains filtered or unexported fields
}

FilterManager manages filters and presets for streaming

func NewFilterManager

func NewFilterManager(configPath string) *FilterManager

NewFilterManager creates a new filter manager

func (*FilterManager) AddFilter

func (fm *FilterManager) AddFilter(filter *StreamFilter) error

AddFilter adds a new filter

func (*FilterManager) ApplyActiveFilters

func (fm *FilterManager) ApplyActiveFilters(line string, timestamp time.Time) (bool, map[string][]int)

ApplyActiveFilters applies the active filter chain to a line

func (*FilterManager) ClearFilters

func (fm *FilterManager) ClearFilters()

ClearFilters removes all active filters

func (*FilterManager) CreateChain

func (fm *FilterManager) CreateChain(name string, mode ChainMode, filterIDs []string) (*FilterChain, error)

CreateChain creates a new filter chain

func (*FilterManager) GetFilter

func (fm *FilterManager) GetFilter(filterID string) (*StreamFilter, error)

GetFilter retrieves a filter by ID

func (*FilterManager) GetFilterStats

func (fm *FilterManager) GetFilterStats() map[string]FilterStats

GetFilterStats returns statistics for all active filters

func (*FilterManager) GetPresets

func (fm *FilterManager) GetPresets() []*FilterPreset

GetPresets returns all available presets

func (*FilterManager) LoadPreset

func (fm *FilterManager) LoadPreset(presetID string) error

LoadPreset loads and activates a filter preset

func (*FilterManager) QuickFilter

func (fm *FilterManager) QuickFilter(pattern string, filterType FilterType) error

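QuickFilter creates and activates a simple filter from pattern. The signature takes a filterType argument, so the filter is presumably not limited to keyword matching.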

func (*FilterManager) RemoveFilter

func (fm *FilterManager) RemoveFilter(filterID string) error

RemoveFilter removes a filter by ID

func (*FilterManager) SavePreset

func (fm *FilterManager) SavePreset(name, description, category string) (*FilterPreset, error)

SavePreset saves the current filter configuration as a preset

func (*FilterManager) SetActiveChain

func (fm *FilterManager) SetActiveChain(chainID string) error

SetActiveChain sets the active filter chain

type FilterPreset

// FilterPreset is a named, categorized set of filters with usage metadata.
type FilterPreset struct {
	ID          string          `json:"id"`
	Name        string          `json:"name"`
	Description string          `json:"description"`
	Category    string          `json:"category"`
	Filters     []*StreamFilter `json:"filters"`
	Tags        []string        `json:"tags"`
	Created     time.Time       `json:"created"`
	// NOTE(review): LastUsed and UseCount are presumably updated when the
	// preset is loaded (FilterManager.LoadPreset) — confirm.
	LastUsed    time.Time       `json:"last_used"`
	UseCount    int             `json:"use_count"`
}

FilterPreset represents a saved filter configuration

func GetCommonPresets

func GetCommonPresets() []*FilterPreset

GetCommonPresets returns the common filter presets.

type FilterStats

// FilterStats holds per-filter counters and timestamps. It is embedded by
// value in StreamFilter.Stats and returned by the GetFilterStats methods.
type FilterStats struct {
	// NOTE(review): presumably the number of lines that matched — confirm.
	MatchCount     int64     `json:"match_count"`
	// NOTE(review): presumably the number of lines examined — confirm.
	ProcessedLines int64     `json:"processed_lines"`
	// LastMatch has no omitempty, so a zero time is serialized as-is.
	LastMatch      time.Time `json:"last_match"`
	Created        time.Time `json:"created"`
}

FilterStats tracks filter performance and matches

type FilterType

type FilterType string

FilterType represents the type of filter to apply

// Supported FilterType values. The StreamFilter field comments name the
// fields associated with each kind of filter.
const (
	// FilterTypeKeyword is the filter type for keyword matching
	// (StreamFilter.Pattern).
	FilterTypeKeyword FilterType = "keyword"
	// FilterTypeRegex is the filter type for regex matching
	// (StreamFilter.Pattern, with the compiled form in StreamFilter.Regex).
	FilterTypeRegex FilterType = "regex"
	// FilterTypeTimeRange is the filter type for time range filtering
	// (StreamFilter.TimeStart and StreamFilter.TimeEnd).
	FilterTypeTimeRange FilterType = "timerange"
	// FilterTypeLogLevel is the filter type for log level filtering
	// (StreamFilter.LogLevels).
	FilterTypeLogLevel FilterType = "loglevel"
	// FilterTypeInvert is the filter type for inverted filtering.
	// NOTE(review): StreamFilter also has an Invert flag; how this type
	// relates to that flag is not visible here — confirm.
	FilterTypeInvert FilterType = "invert"
)

type FilteredStreamManager

type FilteredStreamManager struct {
	*StreamManager
	// contains filtered or unexported fields
}

FilteredStreamManager extends StreamManager with filtering capabilities

func NewFilteredStreamManager

func NewFilteredStreamManager(client dao.SlurmClient, sshManager *ssh.SessionManager, config *SlurmConfig, configPath string) (*FilteredStreamManager, error)

NewFilteredStreamManager creates a new stream manager with filtering support

func (*FilteredStreamManager) AddCustomFilter

func (fsm *FilteredStreamManager) AddCustomFilter(filter *StreamFilter) error

AddCustomFilter adds a custom filter

func (*FilteredStreamManager) ClearFilters

func (fsm *FilteredStreamManager) ClearFilters()

ClearFilters removes all active filters

func (*FilteredStreamManager) CloseFiltered

func (fsm *FilteredStreamManager) CloseFiltered() error

CloseFiltered closes the filtered stream manager

func (*FilteredStreamManager) EmitFilteredEvent

func (fsm *FilteredStreamManager) EmitFilteredEvent(event *StreamEvent)

EmitFilteredEvent emits an event after applying filters

func (*FilteredStreamManager) GetActiveFilters

func (fsm *FilteredStreamManager) GetActiveFilters() []*StreamFilter

GetActiveFilters returns the currently active filters

func (*FilteredStreamManager) GetFilterPresets

func (fsm *FilteredStreamManager) GetFilterPresets() []*FilterPreset

GetFilterPresets returns available filter presets

func (*FilteredStreamManager) GetFilterStats

func (fsm *FilteredStreamManager) GetFilterStats() map[string]FilterStats

GetFilterStats returns filter statistics

func (*FilteredStreamManager) GetFilteredContent

func (fsm *FilteredStreamManager) GetFilteredContent(jobID, outputType string, includeHighlights bool) ([]string, error)

GetFilteredContent returns the filtered content for a stream

func (*FilteredStreamManager) GetSearchHistory

func (fsm *FilteredStreamManager) GetSearchHistory() []string

GetSearchHistory returns recent search queries

func (*FilteredStreamManager) LoadFilterPreset

func (fsm *FilteredStreamManager) LoadFilterPreset(presetID string) error

LoadFilterPreset loads and activates a filter preset

func (*FilteredStreamManager) RemoveFilter

func (fsm *FilteredStreamManager) RemoveFilter(filterID string) error

RemoveFilter removes a filter by ID

func (*FilteredStreamManager) SaveFilterPreset

func (fsm *FilteredStreamManager) SaveFilterPreset(name, description, category string) (*FilterPreset, error)

SaveFilterPreset saves current filters as a preset

func (*FilteredStreamManager) Search

func (fsm *FilteredStreamManager) Search(jobID, outputType, query string, options SearchOptions) ([]*SearchResult, error)

Search performs a search on a specific stream

func (*FilteredStreamManager) SearchNext

func (fsm *FilteredStreamManager) SearchNext(jobID, outputType string, currentLine int) (*SearchResult, error)

SearchNext finds the next match in a stream

func (*FilteredStreamManager) SearchPrevious

func (fsm *FilteredStreamManager) SearchPrevious(jobID, outputType string, currentLine int) (*SearchResult, error)

SearchPrevious finds the previous match in a stream

func (*FilteredStreamManager) SetFilterChainMode

func (fsm *FilteredStreamManager) SetFilterChainMode(mode ChainMode)

SetFilterChainMode sets how multiple filters are combined (AND/OR)

func (*FilteredStreamManager) SetQuickFilter

func (fsm *FilteredStreamManager) SetQuickFilter(pattern string, filterType FilterType) error

SetQuickFilter sets a quick filter for all streams

func (*FilteredStreamManager) StartFilteredStream

func (fsm *FilteredStreamManager) StartFilteredStream(jobID, outputType string) error

StartFilteredStream starts a stream with filtering enabled

func (*FilteredStreamManager) StopFilteredStream

func (fsm *FilteredStreamManager) StopFilteredStream(jobID, outputType string) error

StopFilteredStream stops a stream and cleans up resources

func (*FilteredStreamManager) StreamWithContext

func (fsm *FilteredStreamManager) StreamWithContext(ctx context.Context, jobID, outputType string) (<-chan *StreamEvent, error)

StreamWithContext provides a filtered event channel for a specific stream

type JobStream

// JobStream holds the state of one streaming session, identified by a job ID
// and an output type. It declares no JSON tags.
//
// NOTE(review): FilePath, LastOffset, IsRemote and NodeID have identically
// named counterparts on FileWatcher; which copy is authoritative is not
// visible here — confirm.
type JobStream struct {
	JobID       string               // SLURM job ID
	OutputType  string               // "stdout" or "stderr"
	Buffer      *CircularBuffer      // Memory-efficient output storage
	FilePath    string               // Path to output file
	LastOffset  int64                // File offset for tailing
	IsActive    bool                 // Whether streaming is active
	IsRemote    bool                 // Local or remote file
	NodeID      string               // For remote files
	// Subscribers are send-only channels carrying StreamEvent values (not
	// pointers).
	Subscribers []chan<- StreamEvent // Event subscribers
	FileWatcher *FileWatcher         // Individual file watcher
	LastUpdate  time.Time            // Last update timestamp

}

JobStream represents an active streaming session for a job's output

type LogLevel

type LogLevel string

LogLevel represents common log levels for filtering

// Supported LogLevel values; the string forms are upper-case. Log-level
// filters list the levels they apply to in StreamFilter.LogLevels.
const (
	// LogLevelDebug is the debug log level.
	LogLevelDebug LogLevel = "DEBUG"
	// LogLevelInfo is the info log level.
	LogLevelInfo LogLevel = "INFO"
	// LogLevelWarning is the warning log level. Its value is "WARNING", not
	// the abbreviated "WARN".
	LogLevelWarning LogLevel = "WARNING"
	// LogLevelError is the error log level.
	LogLevelError LogLevel = "ERROR"
	// LogLevelFatal is the fatal log level.
	LogLevelFatal LogLevel = "FATAL"
)

type Match

// Match describes one match within a line; SearchResult.Matches holds a
// slice of these.
type Match struct {
	// NOTE(review): Start and End are presumably offsets into the line with
	// End exclusive; whether they count bytes or runes is not visible here —
	// confirm.
	Start int    `json:"start"`
	End   int    `json:"end"`
	// NOTE(review): Text is presumably the matched substring — confirm.
	Text  string `json:"text"`
}

Match represents a single match within a line

type PathResolver

type PathResolver struct {
	// contains filtered or unexported fields
}

PathResolver resolves SLURM job output file paths using SLURM API data

func NewPathResolver

func NewPathResolver(client dao.SlurmClient, config *SlurmConfig) *PathResolver

NewPathResolver creates a new path resolver

func (*PathResolver) GetJobOutputPaths

func (pr *PathResolver) GetJobOutputPaths(jobID string) (stdoutPath, stderrPath string, isRemote bool, nodeID string, err error)

GetJobOutputPaths returns the stdout and stderr paths for a job, together with whether the files are remote (isRemote) and the node ID (nodeID).

func (*PathResolver) ResolveOutputPath

func (pr *PathResolver) ResolveOutputPath(jobID, outputType string) (string, bool, string, error)

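ResolveOutputPath determines the full path to a job's output file using SLURM API data. The unnamed bool and string results presumably correspond to isRemote and nodeID as in GetJobOutputPaths.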

func (*PathResolver) ValidateOutputPath

func (pr *PathResolver) ValidateOutputPath(filePath string, _ bool, _ string) error

ValidateOutputPath checks if an output path is accessible. Only filePath is examined: the bool and string parameters are declared as _ and are ignored.

type SearchHistory

type SearchHistory struct {
	// contains filtered or unexported fields
}

SearchHistory tracks recent searches

func NewSearchHistory

func NewSearchHistory(maxSize int) *SearchHistory

NewSearchHistory creates a new search history

func (*SearchHistory) Add

func (sh *SearchHistory) Add(query string)

Add adds a query to history

func (*SearchHistory) Get

func (sh *SearchHistory) Get() []string

Get returns the search history

type SearchOptions

// SearchOptions is the option set accepted by StreamSearcher.Search and
// FilteredStreamManager.Search.
type SearchOptions struct {
	CaseSensitive bool `json:"case_sensitive"`
	WholeWord     bool `json:"whole_word"`
	// NOTE(review): UseRegex presumably makes the query a regular expression
	// rather than literal text — confirm.
	UseRegex      bool `json:"use_regex"`
	ContextLines  int  `json:"context_lines"` // Number of lines before/after to include
	// NOTE(review): whether MaxResults == 0 means "unlimited" is not visible
	// here — confirm.
	MaxResults    int  `json:"max_results"`   // Limit number of results
	Reverse       bool `json:"reverse"`       // Search from bottom to top
}

SearchOptions configures search behavior

type SearchResult

// SearchResult is the element type returned by the Search, SearchNext and
// SearchPrevious methods.
type SearchResult struct {
	// NOTE(review): whether LineNumber is 0- or 1-based, and whether it is
	// relative to the buffer or to the file, is not visible here — confirm.
	LineNumber int      `json:"line_number"`
	Line       string   `json:"line"`
	Matches    []Match  `json:"matches"`
	// Context is dropped from the JSON output when the slice is empty
	// (omitempty).
	Context    []string `json:"context,omitempty"` // Lines before/after for context
}

SearchResult represents a search match in the stream

type SlurmConfig

// SlurmConfig groups the fallback file locations, node access settings and
// tuning values used for streaming.
type SlurmConfig struct {
	// File paths (fallback when SLURM API doesn't provide paths)
	OutputDir    string `json:"output_dir"`    // Default: /var/spool/slurm/slurmd/job_output (DefaultOutputDir)
	ErrorDir     string `json:"error_dir"`     // Usually same as OutputDir
	FilePattern  string `json:"file_pattern"`  // DefaultFilePattern is "slurm-%s.out"; %s stands for the job ID
	ErrorPattern string `json:"error_pattern"` // DefaultErrorPattern is "slurm-%s.err"; %s stands for the job ID

	// Node configuration
	RemoteAccess bool     `json:"remote_access"` // Enable SSH for remote files
	LocalNodes   []string `json:"local_nodes"`   // Nodes accessible via local filesystem
	SSHUser      string   `json:"ssh_user"`      // Default SSH username
	SSHKeyPath   string   `json:"ssh_key_path"`  // SSH private key path

	// Performance tuning
	// FileCheckInterval is a time.Duration, which encoding/json encodes as an
	// integer number of nanoseconds, not as a string such as "1s".
	FileCheckInterval time.Duration `json:"file_check_interval"` // 1s (DefaultFileCheckInterval)
	// NOTE(review): MaxFileSize is presumably a byte count — confirm.
	MaxFileSize       int64         `json:"max_file_size"`       // 100MB (DefaultMaxFileSize)
	BufferSize        int           `json:"buffer_size"`         // 10000 lines (DefaultBufferSize)
}

SlurmConfig contains SLURM-specific configuration for streaming

func DefaultSlurmConfig

func DefaultSlurmConfig() *SlurmConfig

DefaultSlurmConfig returns default SLURM configuration

type Stats added in v0.3.0

// Stats is the statistics snapshot type; StreamingStats is an alias for it.
type Stats struct {
	ActiveStreams int           `json:"active_streams"`
	TotalStreams  int           `json:"total_streams"`
	// MemoryUsage is reported in bytes, per its JSON name.
	MemoryUsage   int64         `json:"memory_usage_bytes"`
	TotalEvents   int64         `json:"total_events"`
	ErrorCount    int64         `json:"error_count"`
	// Uptime is a time.Duration, which encoding/json encodes as an integer
	// number of nanoseconds.
	Uptime        time.Duration `json:"uptime"`
	// NOTE(review): encoding/json marshals an interface value by its dynamic
	// type. Errors built with errors.New or fmt.Errorf have no exported
	// fields, so they are emitted as {} and the message is lost. A non-null
	// value also cannot be unmarshaled back into an error interface. Consider
	// exposing the message as a string instead.
	LastError     error         `json:"last_error,omitempty"`
	// NOTE(review): omitempty has no effect on a struct type such as
	// time.Time, so the zero time is still emitted.
	LastErrorTime time.Time     `json:"last_error_time,omitempty"`
}

Stats contains statistics about streaming operations

type StreamConfig

// StreamConfig holds the user-configurable streaming settings. Several JSON
// names differ from the field names (BufferSize -> "buffer_size_lines",
// AutoScroll -> "auto_scroll_default"); note that SlurmConfig.BufferSize
// uses "buffer_size" instead.
type StreamConfig struct {
	MaxConcurrentStreams int           `json:"max_concurrent_streams"` // Default: 4 (DefaultMaxConcurrentStreams)
	BufferSize           int           `json:"buffer_size_lines"`      // Default: 10000 (DefaultBufferSize)
	// PollInterval is a time.Duration, which encoding/json encodes as an
	// integer number of nanoseconds.
	PollInterval         time.Duration `json:"poll_interval"`          // Default: 2s (DefaultPollInterval)
	MaxMemoryMB          int           `json:"max_memory_mb"`          // Default: 50 (DefaultMaxMemoryMB)
	// The three defaults below have no matching Default* constant in this
	// package's const block.
	AutoScroll           bool          `json:"auto_scroll_default"`    // Default: true
	ShowTimestamps       bool          `json:"show_timestamps"`        // Default: true
	ExportFormat         string        `json:"export_format"`          // Default: "txt"
}

StreamConfig contains user-configurable streaming settings

func DefaultStreamConfig

func DefaultStreamConfig() *StreamConfig

DefaultStreamConfig returns default streaming configuration

type StreamEvent

// StreamEvent is the payload delivered to subscribers and accepted by
// EventBus.Publish.
type StreamEvent struct {
	JobID      string          `json:"job_id"`
	OutputType string          `json:"output_type"` // "stdout" or "stderr"
	Content    string          `json:"content"`
	NewLines   []string        `json:"new_lines"` // New lines added since last event
	Timestamp  time.Time       `json:"timestamp"`
	EventType  StreamEventType `json:"event_type"`
	FileOffset int64           `json:"file_offset"` // Current file position
	// NOTE(review): encoding/json marshals an interface value by its dynamic
	// type. Errors built with errors.New or fmt.Errorf have no exported
	// fields, so they are emitted as {} and the message is lost. A non-null
	// value also cannot be unmarshaled back into an error interface.
	Error      error           `json:"error,omitempty"`
}

StreamEvent represents an event in the streaming system

type StreamEventType

type StreamEventType string

StreamEventType represents the type of stream event

// Supported StreamEventType values, carried in StreamEvent.EventType.
const (
	// StreamEventNewOutput is the stream event type for new output.
	StreamEventNewOutput StreamEventType = "NEW_OUTPUT"
	// StreamEventJobComplete is the stream event type for job completion.
	StreamEventJobComplete StreamEventType = "JOB_COMPLETE"
	// StreamEventError is the stream event type for errors.
	// NOTE(review): presumably the type used by EventBus.PublishError, with
	// the error in StreamEvent.Error — confirm.
	StreamEventError StreamEventType = "ERROR"
	// StreamEventFileRotated is the stream event type for file rotation.
	StreamEventFileRotated StreamEventType = "FILE_ROTATED"
	// StreamEventStreamStart is the stream event type for stream start.
	StreamEventStreamStart StreamEventType = "STREAM_START"
	// StreamEventStreamStop is the stream event type for stream stop.
	StreamEventStreamStop StreamEventType = "STREAM_STOP"
)

type StreamFilter

// StreamFilter describes one filter. Which fields are relevant depends on
// Type, as the per-field comments indicate.
type StreamFilter struct {
	ID             string         `json:"id"`
	Name           string         `json:"name"`
	Type           FilterType     `json:"type"`
	Enabled        bool           `json:"enabled"`
	Pattern        string         `json:"pattern"`    // For keyword/regex filters
	// Regex is excluded from JSON (tag "-"), so a filter decoded from JSON has
	// a nil Regex until it is compiled again.
	// NOTE(review): presumably recompiled from Pattern — confirm where.
	Regex          *regexp.Regexp `json:"-"`          // Compiled regex
	TimeStart      time.Time      `json:"time_start"` // For time range filters
	TimeEnd        time.Time      `json:"time_end"`
	LogLevels      []LogLevel     `json:"log_levels"` // For log level filters
	Invert         bool           `json:"invert"`     // Invert the filter results
	CaseSensitive  bool           `json:"case_sensitive"`
	Highlight      bool           `json:"highlight"` // Highlight matches
	HighlightColor string         `json:"highlight_color"`
	// Stats is held by value and serialized with the filter under "stats".
	Stats          FilterStats    `json:"stats"`
}

StreamFilter represents a filter that can be applied to streaming output

func NewStreamFilter

func NewStreamFilter(filterType FilterType, pattern string) (*StreamFilter, error)

NewStreamFilter creates a new stream filter

func (*StreamFilter) Match

func (f *StreamFilter) Match(line string, timestamp time.Time) (bool, []int)

Match checks if a line matches the filter criteria

type StreamInfo

// StreamInfo is the element type returned by StreamManager.GetActiveStreams.
// Several fields share names with JobStream fields.
type StreamInfo struct {
	JobID       string    `json:"job_id"`
	OutputType  string    `json:"output_type"`
	FilePath    string    `json:"file_path"`
	IsRemote    bool      `json:"is_remote"`
	NodeID      string    `json:"node_id"`
	// NOTE(review): whether BufferSize is the buffer's capacity or its
	// current line count is not visible here — confirm.
	BufferSize  int       `json:"buffer_size"`
	// BufferUsage is serialized as "buffer_usage_percent".
	// NOTE(review): scale (0-100 vs 0-1) is not visible here — confirm.
	BufferUsage float64   `json:"buffer_usage_percent"`
	LastUpdate  time.Time `json:"last_update"`
	// Subscribers is a count, unlike JobStream.Subscribers, which is a slice
	// of channels.
	Subscribers int       `json:"subscribers"`
}

StreamInfo contains information about an active stream

type StreamManager

type StreamManager struct {
	// contains filtered or unexported fields
}

StreamManager manages real-time job output streaming

func NewStreamManager

func NewStreamManager(client dao.SlurmClient, sshManager *ssh.SessionManager, config *SlurmConfig) (*StreamManager, error)

NewStreamManager creates a new stream manager

func (*StreamManager) Close

func (sm *StreamManager) Close() error

Close shuts down the stream manager

func (*StreamManager) GetActiveStreams

func (sm *StreamManager) GetActiveStreams() []StreamInfo

GetActiveStreams returns information about all active streams

func (*StreamManager) GetBuffer

func (sm *StreamManager) GetBuffer(jobID, outputType string) ([]string, error)

GetBuffer returns the current buffer contents for a stream

func (*StreamManager) GetStats

func (sm *StreamManager) GetStats() StreamingStats

GetStats returns streaming statistics

func (*StreamManager) IsStreamActive

func (sm *StreamManager) IsStreamActive(jobID, outputType string) bool

IsStreamActive returns true if a stream is currently active

func (*StreamManager) StartStream

func (sm *StreamManager) StartStream(jobID, outputType string) error

StartStream begins watching job output file

func (*StreamManager) StopStream

func (sm *StreamManager) StopStream(jobID, outputType string) error

StopStream stops watching job output file

func (*StreamManager) Subscribe

func (sm *StreamManager) Subscribe(jobID, outputType string) <-chan StreamEvent

Subscribe adds a subscriber for stream events

func (*StreamManager) Unsubscribe

func (sm *StreamManager) Unsubscribe(jobID, outputType string, _ <-chan StreamEvent)

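Unsubscribe removes a subscriber for the given job and output type. The channel parameter is declared as _ in the signature, so it is not used to identify which subscriber is removed.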

type StreamSearcher

type StreamSearcher struct {
	// contains filtered or unexported fields
}

StreamSearcher handles searching within stream buffers

func NewStreamSearcher

func NewStreamSearcher(buffer *CircularBuffer) *StreamSearcher

NewStreamSearcher creates a new stream searcher

func (*StreamSearcher) Clear

func (ss *StreamSearcher) Clear()

Clear clears search results and pattern

func (*StreamSearcher) GetHighlightedLine

func (ss *StreamSearcher) GetHighlightedLine(line, highlightColor string) string

GetHighlightedLine returns a line with search matches highlighted

func (*StreamSearcher) GetStats

func (ss *StreamSearcher) GetStats() (totalMatches, matchedLines int)

GetStats returns search statistics

func (*StreamSearcher) Search

func (ss *StreamSearcher) Search(query string, options SearchOptions) ([]*SearchResult, error)

Search performs a search on the buffer content

func (*StreamSearcher) SearchNext

func (ss *StreamSearcher) SearchNext(startLine int) (*SearchResult, error)

SearchNext finds the next occurrence after the given line number

func (*StreamSearcher) SearchPrevious

func (ss *StreamSearcher) SearchPrevious(startLine int) (*SearchResult, error)

SearchPrevious finds the previous occurrence before the given line number

type StreamingStats

// StreamingStats is an alias for Stats; the two names denote the same type.
// StreamManager.GetStats is declared to return StreamingStats.
// NOTE(review): Stats is marked "added in v0.3.0", so this alias presumably
// keeps the older name working — confirm.
type StreamingStats = Stats

type SubscriptionInfo

// SubscriptionInfo is the element type returned by
// EventBus.GetSubscriptionInfo.
type SubscriptionInfo struct {
	JobID       string `json:"job_id"`
	OutputType  string `json:"output_type"`
	// NOTE(review): Subscribers is presumably the subscriber count for this
	// job and output type, as EventBus.GetSubscriberCount reports — confirm.
	Subscribers int    `json:"subscribers"`
}

SubscriptionInfo contains information about a subscription

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL