Documentation ¶
Overview ¶
Package stream provides the core stream processing engine for StreamSQL.
This package implements the central stream processing pipeline that orchestrates data flow, window management, aggregation, filtering, and result generation. It serves as the execution engine that brings together all StreamSQL components into a cohesive streaming system.
Core Features ¶
• Real-time Stream Processing - High-throughput, low-latency data stream processing
• Window Management - Integration with all window types (tumbling, sliding, counting, session)
• Aggregation Engine - Efficient aggregation with incremental computation
• Filtering Pipeline - Multi-stage filtering with WHERE and HAVING clause support
• Performance Modes - Configurable performance profiles for different use cases
• Metrics and Monitoring - Comprehensive performance metrics and health monitoring
• Persistence Support - Optional data persistence for reliability and recovery
• Backpressure Handling - Intelligent backpressure management and overflow strategies
Stream Architecture ¶
The stream processing pipeline consists of several key components:
type Stream struct {
dataChan chan map[string]interface{} // Input data channel
filter condition.Condition // WHERE clause filter
Window window.Window // Window manager
aggregator aggregator.Aggregator // Aggregation engine
config types.Config // Stream configuration
sinks []func([]map[string]interface{}) // Result processors
resultChan chan []map[string]interface{} // Result channel
dataStrategy DataProcessingStrategy // Data processing strategy
}
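To make the relationship between these fields concrete, here is a minimal sketch of how an input channel, a filter, and a list of sinks can cooperate. It is not the package's implementation: `miniStream`, `newMiniStream`, `Close`, and `runDemo` are hypothetical names, the filter is a plain function standing in for `condition.Condition`, and windowing and aggregation are left out.

```go
package main

import "fmt"

// Record mirrors the map[string]interface{} rows used by Stream.
type Record = map[string]interface{}

// miniStream is a hypothetical, heavily reduced stand-in for Stream.
type miniStream struct {
	dataChan chan Record       // input data channel
	filter   func(Record) bool // stands in for the WHERE clause filter
	sinks    []func([]Record)  // result processors
	done     chan struct{}     // closed when the processing loop exits
}

func newMiniStream(buf int, filter func(Record) bool) *miniStream {
	return &miniStream{
		dataChan: make(chan Record, buf),
		filter:   filter,
		done:     make(chan struct{}),
	}
}

// AddSink registers a result processor; call it before Start.
func (s *miniStream) AddSink(sink func([]Record)) { s.sinks = append(s.sinks, sink) }

// Start launches the processing loop, which ends when dataChan is closed.
func (s *miniStream) Start() {
	go func() {
		defer close(s.done)
		for rec := range s.dataChan {
			if s.filter != nil && !s.filter(rec) {
				continue // rejected early by the filter
			}
			for _, sink := range s.sinks {
				sink([]Record{rec})
			}
		}
	}()
}

// Emit queues one record for processing.
func (s *miniStream) Emit(rec Record) { s.dataChan <- rec }

// Close stops accepting data and waits for queued records to drain.
func (s *miniStream) Close() {
	close(s.dataChan)
	<-s.done
}

// runDemo pushes two records through and returns those that reached the sink.
func runDemo() []Record {
	var out []Record
	s := newMiniStream(8, func(r Record) bool {
		t, ok := r["temperature"].(float64)
		return ok && t > 20
	})
	s.AddSink(func(rs []Record) { out = append(out, rs...) })
	s.Start()
	s.Emit(Record{"device_id": "sensor001", "temperature": 25.5})
	s.Emit(Record{"device_id": "sensor002", "temperature": 12.0})
	s.Close()
	return out
}

func main() { fmt.Println(runDemo()) }
```

Only the record from `sensor001` passes the filter in this sketch, so the sink receives a single result.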
Performance Modes ¶
Configurable performance profiles for different scenarios:
// High Performance Mode
// - Optimized for maximum throughput
// - Larger buffer sizes
// - Batch processing optimization
stream, err := NewStreamWithHighPerformance(config)
// Low Latency Mode
// - Optimized for minimal processing delay
// - Smaller buffer sizes
// - Immediate processing
stream, err := NewStreamWithLowLatency(config)
// Zero Data Loss Mode
// - Guaranteed data persistence
// - Synchronous processing
// - Enhanced error recovery
stream := NewStreamWithZeroDataLoss(config)
// Custom Performance Mode
// - User-defined performance parameters
customConfig := &PerformanceConfig{
BufferSize: 1000,
BatchSize: 50,
FlushInterval: time.Second,
WorkerPoolSize: 4,
}
stream, err := NewStreamWithCustomPerformance(config, *customConfig)
Data Processing Pipeline ¶
Multi-stage processing pipeline with optimized data flow:
Data Ingestion
├── Input validation and type checking
├── Timestamp extraction and normalization
└── Initial data transformation
Filtering (WHERE clause)
├── Field-based filtering
├── Expression evaluation
└── Early data rejection
Window Processing
├── Window assignment
├── Data buffering
└── Window trigger management
Aggregation
├── Group-by processing
├── Aggregate function execution
└── Incremental computation
Post-Aggregation Filtering (HAVING clause)
├── Aggregate result filtering
├── Complex condition evaluation
└── Final result validation
Result Generation
├── Field projection
├── Alias application
└── Output formatting
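The stages above can be illustrated on a single, already-closed window. The following is a minimal sketch rather than the engine's code: `processWindow`, the `device_id` and `temperature` field names, and the `avg_temp` alias are assumptions chosen for the example, and the aggregate is a plain average kept as a running sum and count.

```go
package main

import (
	"fmt"
	"sort"
)

// Record mirrors the map[string]interface{} rows used by the stream.
type Record = map[string]interface{}

// processWindow filters, groups, aggregates, applies a HAVING-style
// threshold, and projects the result for one window of records.
func processWindow(window []Record, minAvg float64) []Record {
	type acc struct {
		sum float64
		n   int
	}
	groups := map[string]*acc{}
	for _, r := range window {
		// WHERE stage: reject records lacking the required fields.
		id, okID := r["device_id"].(string)
		t, okT := r["temperature"].(float64)
		if !okID || !okT {
			continue
		}
		// Aggregation stage: incremental sum and count per group.
		a := groups[id]
		if a == nil {
			a = &acc{}
			groups[id] = a
		}
		a.sum += t
		a.n++
	}

	// Sort group keys so the output order is deterministic.
	ids := make([]string, 0, len(groups))
	for id := range groups {
		ids = append(ids, id)
	}
	sort.Strings(ids)

	var out []Record
	for _, id := range ids {
		avg := groups[id].sum / float64(groups[id].n)
		// HAVING stage: filter on the aggregate result.
		if avg < minAvg {
			continue
		}
		// Result generation: project fields and apply the alias.
		out = append(out, Record{"device_id": id, "avg_temp": avg})
	}
	return out
}

func main() {
	window := []Record{
		{"device_id": "a", "temperature": 20.0},
		{"device_id": "a", "temperature": 30.0},
		{"device_id": "b", "temperature": 10.0},
		{"device_id": "c"}, // dropped: no temperature field
	}
	fmt.Println(processWindow(window, 15))
}
```

Filtering before grouping keeps rejected records out of the aggregation state, which is the point of the early-rejection step in the pipeline.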
Window Integration ¶
Seamless integration with all window types:
// Tumbling Windows - Non-overlapping time-based windows
config.WindowConfig = WindowConfig{
Type: "tumbling",
Params: map[string]interface{}{
"size": "5s",
},
}
// Sliding Windows - Overlapping time-based windows
config.WindowConfig = WindowConfig{
Type: "sliding",
Params: map[string]interface{}{
"size": "30s",
"slide": "10s",
},
}
// Counting Windows - Count-based windows
config.WindowConfig = WindowConfig{
Type: "counting",
Params: map[string]interface{}{
"count": 100,
},
}
// Session Windows - Activity-based windows
config.WindowConfig = WindowConfig{
Type: "session",
Params: map[string]interface{}{
"timeout": "5m",
"groupBy": "user_id",
},
}
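How a record is assigned to time-based windows can be sketched with integer arithmetic. This is an illustration under stated assumptions, not the window package's algorithm: windows are taken to be aligned to the Unix epoch, timestamps are non-negative milliseconds, and `tumblingStart`, `slidingStarts`, and `mustMillis` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// tumblingStart returns the start (Unix ms) of the tumbling window that
// contains tsMillis, assuming windows are aligned to the epoch.
func tumblingStart(tsMillis, sizeMillis int64) int64 {
	return tsMillis - tsMillis%sizeMillis
}

// slidingStarts returns the start of every sliding window that contains
// tsMillis, newest first. A window [s, s+size) begins at each multiple
// of the slide interval.
func slidingStarts(tsMillis, sizeMillis, slideMillis int64) []int64 {
	var starts []int64
	for s := tumblingStart(tsMillis, slideMillis); s >= 0 && s+sizeMillis > tsMillis; s -= slideMillis {
		starts = append(starts, s)
	}
	return starts
}

// mustMillis parses a duration string such as "30s" into milliseconds.
func mustMillis(s string) int64 {
	d, err := time.ParseDuration(s)
	if err != nil {
		panic(err)
	}
	return d.Milliseconds()
}

func main() {
	// A record at t=12s falls into the 5s tumbling window starting at 10s.
	fmt.Println(tumblingStart(12_000, mustMillis("5s")))
	// A record at t=25s belongs to three 30s windows that slide every 10s.
	fmt.Println(slidingStarts(25_000, mustMillis("30s"), mustMillis("10s")))
}
```

A tumbling window assigns each record to exactly one window, while a sliding window with size 30s and slide 10s assigns it to up to three.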
Metrics and Monitoring ¶
Comprehensive performance monitoring:
type MetricsManager struct {
processedCount int64 // Total processed records
filteredCount int64 // Filtered out records
aggregatedCount int64 // Aggregated records
errorCount int64 // Processing errors
processingTime time.Duration // Average processing time
throughput float64 // Records per second
memoryUsage int64 // Memory consumption
bufferUtilization float64 // Buffer usage percentage
}
// Get basic statistics
stats := stream.GetStats()
fmt.Printf("Input: %d, Dropped: %d\n", stats[InputCount], stats[DroppedCount])
// Get detailed performance metrics
detailed := stream.GetDetailedStats()
fmt.Printf("Throughput: %.2f records/sec\n", detailed["throughput"])
fmt.Printf("Memory Usage: %d bytes\n", detailed["memory_usage"])
Backpressure Management ¶
Intelligent handling of system overload:
// Overflow strategies
const (
OverflowStrategyDrop = "drop" // Drop oldest data
OverflowStrategyBlock = "block" // Block new data
OverflowStrategySpill = "spill" // Spill to disk
OverflowStrategyCompress = "compress" // Compress data
)
// Configure backpressure handling
config.PerformanceConfig.OverflowStrategy = OverflowStrategySpill
config.PerformanceConfig.BufferSize = 10000
config.PerformanceConfig.HighWaterMark = 0.8
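The difference between dropping and blocking on a full buffer can be shown with a buffered channel and `select`. This is a minimal sketch, not the package's strategy code: `emitDrop`, `emitBlock`, `aboveHighWater`, and `demo` are hypothetical names, and the drop variant here discards the incoming record rather than the oldest buffered one.

```go
package main

import (
	"fmt"
	"time"
)

// emitDrop attempts a non-blocking send and reports whether the value
// was accepted. On a full buffer the incoming value is discarded.
func emitDrop(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// emitBlock waits for buffer space up to timeout and reports whether
// the value was accepted.
func emitBlock(ch chan<- int, v int, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case ch <- v:
		return true
	case <-timer.C:
		return false
	}
}

// aboveHighWater reports whether buffer utilization has reached mark,
// where mark is a fraction such as 0.8.
func aboveHighWater(length, capacity int, mark float64) bool {
	return capacity > 0 && float64(length)/float64(capacity) >= mark
}

// demo sends five values into a buffer of two with no consumer, then
// tries one blocking send that is expected to time out.
func demo() (accepted, dropped int, blockedOK bool) {
	ch := make(chan int, 2)
	for i := 0; i < 5; i++ {
		if emitDrop(ch, i) {
			accepted++
		} else {
			dropped++
		}
	}
	blockedOK = emitBlock(ch, 99, 10*time.Millisecond)
	return accepted, dropped, blockedOK
}

func main() {
	fmt.Println(demo())
	fmt.Println(aboveHighWater(9, 10, 0.8))
}
```

Dropping keeps producers responsive at the cost of losing data, while blocking preserves data but pushes the slowdown back onto the caller.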
Usage Examples ¶
Basic stream processing:
// Create stream with default configuration
stream, err := NewStream(config)
if err != nil {
log.Fatal(err)
}
// Register result handler
stream.AddSink(func(results []map[string]interface{}) {
fmt.Printf("Results: %v\n", results)
})
// Start processing
stream.Start()
// Send data
stream.Emit(map[string]interface{}{
"device_id": "sensor001",
"temperature": 25.5,
"timestamp": time.Now(),
})
High-performance stream processing:
// Create high-performance stream
stream, err := NewStreamWithHighPerformance(config)
if err != nil {
log.Fatal(err)
}
// Configure for maximum throughput
stream.SetBufferSize(50000)
stream.SetBatchSize(1000)
stream.SetWorkerPoolSize(8)
// Enable metrics monitoring
stream.EnableMetrics(true)
// Process data in batches
for _, batch := range dataBatches {
stream.EmitBatch(batch)
}
Synchronous processing for non-aggregation queries:
// Process single record synchronously
result, err := stream.ProcessSync(data)
if err != nil {
log.Printf("Processing error: %v", err)
} else if result != nil {
fmt.Printf("Immediate result: %v\n", result)
}
Integration ¶
Central integration point for all StreamSQL components:
• RSQL package - Configuration parsing and application
• Window package - Window lifecycle management
• Aggregator package - Aggregation execution
• Functions package - Function execution in expressions
• Condition package - Filter condition evaluation
• Types package - Data type handling and configuration
• Logger package - Comprehensive logging and debugging
Index ¶
- Constants
- func AssessPerformanceLevel(dataUsage, dropRate float64) string
- type BlockingStrategy
- type DataHandler
- type DataProcessingStrategy
- type DataProcessor
- type DropStrategy
- type ExpansionStrategy
- type ResultHandler
- type StatsCollector
- func (sc *StatsCollector) GetBasicStats(dataChanLen, dataChanCap, resultChanLen, resultChanCap, sinkPoolLen, ... int, ...) map[string]int64
- func (sc *StatsCollector) GetDetailedStats(basicStats map[string]int64) map[string]interface{}
- func (sc *StatsCollector) GetDroppedCount() int64
- func (sc *StatsCollector) GetInputCount() int64
- func (sc *StatsCollector) GetOutputCount() int64
- func (sc *StatsCollector) IncrementDropped()
- func (sc *StatsCollector) IncrementInput()
- func (sc *StatsCollector) IncrementOutput()
- func (sc *StatsCollector) Reset()
- type StatsManager
- type StrategyFactory
- func (sf *StrategyFactory) CreateStrategy(strategyName string) (DataProcessingStrategy, error)
- func (sf *StrategyFactory) GetRegisteredStrategies() []string
- func (sf *StrategyFactory) RegisterStrategy(name string, constructor func() DataProcessingStrategy)
- func (sf *StrategyFactory) UnregisterStrategy(name string)
- type Stream
- func NewStream(config types.Config) (*Stream, error)
- func NewStreamWithCustomPerformance(config types.Config, perfConfig types.PerformanceConfig) (*Stream, error)
- func NewStreamWithHighPerformance(config types.Config) (*Stream, error)
- func NewStreamWithLowLatency(config types.Config) (*Stream, error)
- func (s *Stream) AddSink(sink func([]map[string]interface{}))
- func (s *Stream) Emit(data map[string]interface{})
- func (s *Stream) GetDetailedStats() map[string]interface{}
- func (s *Stream) GetResultsChan() <-chan []map[string]interface{}
- func (s *Stream) GetStats() map[string]int64
- func (s *Stream) IsAggregationQuery() bool
- func (s *Stream) ProcessSync(data map[string]interface{}) (map[string]interface{}, error)
- func (s *Stream) RegisterFilter(conditionStr string) error
- func (s *Stream) ResetStats()
- func (s *Stream) Start()
- func (s *Stream) Stop()
- type StreamFactory
- func (sf *StreamFactory) CreateCustomPerformanceStream(config types.Config, perfConfig types.PerformanceConfig) (*Stream, error)
- func (sf *StreamFactory) CreateHighPerformanceStream(config types.Config) (*Stream, error)
- func (sf *StreamFactory) CreateLowLatencyStream(config types.Config) (*Stream, error)
- func (sf *StreamFactory) CreateStream(config types.Config) (*Stream, error)
Constants ¶
const (
InputCount = "input_count"
OutputCount = "output_count"
DroppedCount = "dropped_count"
DataChanLen = "data_chan_len"
DataChanCap = "data_chan_cap"
ResultChanLen = "result_chan_len"
ResultChanCap = "result_chan_cap"
SinkPoolLen = "sink_pool_len"
SinkPoolCap = "sink_pool_cap"
ActiveRetries = "active_retries"
Expanding = "expanding"
)
Statistics field constants
const (
BasicStats = "basic_stats"
DataChanUsage = "data_chan_usage"
ResultChanUsage = "result_chan_usage"
SinkPoolUsage = "sink_pool_usage"
ProcessRate = "process_rate"
DropRate = "drop_rate"
PerformanceLevel = "performance_level"
)
Detailed statistics field constants
const (
StrategyDrop = "drop" // Drop strategy
StrategyBlock = "block" // Blocking strategy
StrategyExpand = "expand" // Dynamic strategy
)
Overflow strategy constants
const (
WindowStartField = "window_start"
WindowEndField = "window_end"
)
Window related constants
const (
PerformanceLevelCritical = "CRITICAL"
PerformanceLevelWarning = "WARNING"
PerformanceLevelHighLoad = "HIGH_LOAD"
PerformanceLevelModerateLoad = "MODERATE_LOAD"
PerformanceLevelOptimal = "OPTIMAL"
)
Performance level constants
const (
PerformanceConfigKey = "performanceConfig"
)
const (
SQLKeywordCase = "CASE"
)
SQL keyword constants
Variables ¶
This section is empty.
Functions ¶
func AssessPerformanceLevel ¶ added in v0.10.2
func AssessPerformanceLevel(dataUsage, dropRate float64) string
AssessPerformanceLevel evaluates the current performance level. It assesses the stream's processing performance based on the data usage rate and the drop rate.
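A classification of this kind can be sketched as an ordered `switch`. The thresholds below are hypothetical and chosen only for illustration, as is the assumption that both inputs are percentages in the range 0 to 100; the package's actual cut-offs are not documented here. The function name `assessLevel` is likewise invented, and the returned strings reuse the performance level constant values listed above.

```go
package main

import "fmt"

// assessLevel maps a data usage percentage and a drop rate percentage
// to a performance level. All thresholds are illustrative assumptions.
func assessLevel(dataUsage, dropRate float64) string {
	switch {
	case dropRate > 50: // hypothetical threshold
		return "CRITICAL"
	case dropRate > 20: // hypothetical threshold
		return "WARNING"
	case dataUsage > 90: // hypothetical threshold
		return "HIGH_LOAD"
	case dataUsage > 70: // hypothetical threshold
		return "MODERATE_LOAD"
	default:
		return "OPTIMAL"
	}
}

func main() {
	fmt.Println(assessLevel(10, 0))
	fmt.Println(assessLevel(95, 0))
	fmt.Println(assessLevel(10, 60))
}
```

Checking the drop rate before buffer usage reflects the idea that lost data is a more serious signal than a busy buffer; that ordering is also an assumption of this sketch.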
Types ¶
type BlockingStrategy ¶ added in v0.10.2
type BlockingStrategy struct {
// contains filtered or unexported fields
}
BlockingStrategy is the blocking strategy implementation
func NewBlockingStrategy ¶ added in v0.10.2
func NewBlockingStrategy() *BlockingStrategy
NewBlockingStrategy creates a blocking strategy instance
func (*BlockingStrategy) GetStrategyName ¶ added in v0.10.2
func (bs *BlockingStrategy) GetStrategyName() string
GetStrategyName returns the strategy name
func (*BlockingStrategy) Init ¶ added in v0.10.2
func (bs *BlockingStrategy) Init(stream *Stream, config types.PerformanceConfig) error
Init initializes the blocking strategy
func (*BlockingStrategy) ProcessData ¶ added in v0.10.2
func (bs *BlockingStrategy) ProcessData(data map[string]interface{})
ProcessData implements blocking-mode data processing
func (*BlockingStrategy) Stop ¶ added in v0.10.2
func (bs *BlockingStrategy) Stop() error
Stop stops the blocking strategy and cleans up its resources
type DataHandler ¶ added in v0.10.2
type DataHandler struct {
// contains filtered or unexported fields
}
DataHandler handles data processing for different strategies
func NewDataHandler ¶ added in v0.10.2
func NewDataHandler(stream *Stream) *DataHandler
NewDataHandler creates a new data handler
type DataProcessingStrategy ¶ added in v0.10.2
type DataProcessingStrategy interface {
// ProcessData core method for processing data
// Parameters:
// - data: data to process, must be map[string]interface{} type
ProcessData(data map[string]interface{})
// GetStrategyName gets strategy name
GetStrategyName() string
// Init initializes strategy
// Parameters:
// - stream: Stream instance reference
// - config: performance configuration
Init(stream *Stream, config types.PerformanceConfig) error
// Stop stops and cleans up resources
Stop() error
}
DataProcessingStrategy is the data processing strategy interface. It defines a unified interface for the different overflow strategies, which makes them easier to extend and maintain.
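To show what satisfying this method set looks like, here is a minimal sketch of a custom strategy. It is self-contained, so `Stream` and `PerformanceConfig` are hypothetical local placeholders for `*Stream` and `types.PerformanceConfig`, the interface is restated locally, and `samplingStrategy` (which forwards every nth record) is an invented example rather than a strategy the package ships.

```go
package main

import (
	"errors"
	"fmt"
)

// Record mirrors the map[string]interface{} rows used by the stream.
type Record = map[string]interface{}

// Stream and PerformanceConfig are hypothetical placeholders for the
// package's Stream and types.PerformanceConfig.
type Stream struct{ accepted []Record }
type PerformanceConfig struct{}

// DataProcessingStrategy restates the method set of the interface.
type DataProcessingStrategy interface {
	ProcessData(data Record)
	GetStrategyName() string
	Init(stream *Stream, config PerformanceConfig) error
	Stop() error
}

// samplingStrategy is a hypothetical strategy that forwards every nth
// record and discards the rest. It is not safe for concurrent use.
type samplingStrategy struct {
	stream *Stream
	every  int
	seen   int
}

func (s *samplingStrategy) GetStrategyName() string { return "sample" }

func (s *samplingStrategy) Init(stream *Stream, _ PerformanceConfig) error {
	if stream == nil {
		return errors.New("sample strategy: nil stream")
	}
	if s.every <= 0 {
		s.every = 1
	}
	s.stream = stream
	return nil
}

func (s *samplingStrategy) ProcessData(data Record) {
	s.seen++
	if s.seen%s.every == 0 {
		s.stream.accepted = append(s.stream.accepted, data)
	}
}

func (s *samplingStrategy) Stop() error {
	s.stream = nil
	return nil
}

// Compile-time check that samplingStrategy satisfies the interface.
var _ DataProcessingStrategy = (*samplingStrategy)(nil)

// runSample feeds n records through the strategy and returns how many
// were forwarded, or -1 if initialization fails.
func runSample(n, every int) int {
	st := &Stream{}
	var strat DataProcessingStrategy = &samplingStrategy{every: every}
	if err := strat.Init(st, PerformanceConfig{}); err != nil {
		return -1
	}
	for i := 0; i < n; i++ {
		strat.ProcessData(Record{"seq": i})
	}
	_ = strat.Stop()
	return len(st.accepted)
}

func main() { fmt.Println(runSample(10, 3)) }
```

Because the stream only depends on the four methods, a strategy like this could be swapped in without changes to the code that emits data.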
type DataProcessor ¶ added in v0.10.2
type DataProcessor struct {
// contains filtered or unexported fields
}
DataProcessor is the data processor responsible for processing data streams
func NewDataProcessor ¶ added in v0.10.2
func NewDataProcessor(stream *Stream) *DataProcessor
NewDataProcessor creates a data processor
func (*DataProcessor) Process ¶ added in v0.10.2
func (dp *DataProcessor) Process()
Process runs the main processing loop
type DropStrategy ¶ added in v0.10.2
type DropStrategy struct {
// contains filtered or unexported fields
}
DropStrategy is the drop strategy implementation
func NewDropStrategy ¶ added in v0.10.2
func NewDropStrategy() *DropStrategy
NewDropStrategy creates a drop strategy instance
func (*DropStrategy) GetStrategyName ¶ added in v0.10.2
func (ds *DropStrategy) GetStrategyName() string
GetStrategyName returns the strategy name
func (*DropStrategy) Init ¶ added in v0.10.2
func (ds *DropStrategy) Init(stream *Stream, config types.PerformanceConfig) error
Init initializes the drop strategy
func (*DropStrategy) ProcessData ¶ added in v0.10.2
func (ds *DropStrategy) ProcessData(data map[string]interface{})
ProcessData implements drop-mode data processing
func (*DropStrategy) Stop ¶ added in v0.10.2
func (ds *DropStrategy) Stop() error
Stop stops the drop strategy and cleans up its resources
type ExpansionStrategy ¶ added in v0.10.2
type ExpansionStrategy struct {
// contains filtered or unexported fields
}
ExpansionStrategy is the expansion strategy implementation
func NewExpansionStrategy ¶ added in v0.10.2
func NewExpansionStrategy() *ExpansionStrategy
NewExpansionStrategy creates an expansion strategy instance
func (*ExpansionStrategy) GetStrategyName ¶ added in v0.10.2
func (es *ExpansionStrategy) GetStrategyName() string
GetStrategyName returns the strategy name
func (*ExpansionStrategy) Init ¶ added in v0.10.2
func (es *ExpansionStrategy) Init(stream *Stream, config types.PerformanceConfig) error
Init initializes the expansion strategy
func (*ExpansionStrategy) ProcessData ¶ added in v0.10.2
func (es *ExpansionStrategy) ProcessData(data map[string]interface{})
ProcessData implements expansion-mode data processing
func (*ExpansionStrategy) Stop ¶ added in v0.10.2
func (es *ExpansionStrategy) Stop() error
Stop stops the expansion strategy and cleans up its resources
type ResultHandler ¶ added in v0.10.2
type ResultHandler struct {
// contains filtered or unexported fields
}
ResultHandler handles result output and sink function calls
func NewResultHandler ¶ added in v0.10.2
func NewResultHandler(stream *Stream) *ResultHandler
NewResultHandler creates a new result handler
type StatsCollector ¶ added in v0.10.2
type StatsCollector struct {
// contains filtered or unexported fields
}
StatsCollector is a statistics collector. It provides thread-safe statistics collection.
func NewStatsCollector ¶ added in v0.10.2
func NewStatsCollector() *StatsCollector
NewStatsCollector creates a new statistics collector
func (*StatsCollector) GetBasicStats ¶ added in v0.10.2
func (sc *StatsCollector) GetBasicStats(dataChanLen, dataChanCap, resultChanLen, resultChanCap, sinkPoolLen, sinkPoolCap int, activeRetries, expanding int32) map[string]int64
GetBasicStats returns basic statistics
func (*StatsCollector) GetDetailedStats ¶ added in v0.10.2
func (sc *StatsCollector) GetDetailedStats(basicStats map[string]int64) map[string]interface{}
GetDetailedStats returns detailed performance statistics
func (*StatsCollector) GetDroppedCount ¶ added in v0.10.2
func (sc *StatsCollector) GetDroppedCount() int64
GetDroppedCount returns the dropped count
func (*StatsCollector) GetInputCount ¶ added in v0.10.2
func (sc *StatsCollector) GetInputCount() int64
GetInputCount returns the input count
func (*StatsCollector) GetOutputCount ¶ added in v0.10.2
func (sc *StatsCollector) GetOutputCount() int64
GetOutputCount returns the output count
func (*StatsCollector) IncrementDropped ¶ added in v0.10.2
func (sc *StatsCollector) IncrementDropped()
IncrementDropped increments the dropped count
func (*StatsCollector) IncrementInput ¶ added in v0.10.2
func (sc *StatsCollector) IncrementInput()
IncrementInput increments the input count
func (*StatsCollector) IncrementOutput ¶ added in v0.10.2
func (sc *StatsCollector) IncrementOutput()
IncrementOutput increments the output count
func (*StatsCollector) Reset ¶ added in v0.10.2
func (sc *StatsCollector) Reset()
Reset resets all statistics
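One common way to make counters like these safe for concurrent use is the `sync/atomic` package. The sketch below assumes that approach purely for illustration; how StatsCollector is actually implemented is not shown in this documentation. The `counters` type, its methods, and `countConcurrently` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counters is a hypothetical stand-in for a thread-safe statistics
// collector built on atomic operations.
type counters struct {
	input, output, dropped int64
}

func (c *counters) IncInput()      { atomic.AddInt64(&c.input, 1) }
func (c *counters) IncOutput()     { atomic.AddInt64(&c.output, 1) }
func (c *counters) IncDropped()    { atomic.AddInt64(&c.dropped, 1) }
func (c *counters) Input() int64   { return atomic.LoadInt64(&c.input) }
func (c *counters) Output() int64  { return atomic.LoadInt64(&c.output) }
func (c *counters) Dropped() int64 { return atomic.LoadInt64(&c.dropped) }

// Reset sets every counter back to zero.
func (c *counters) Reset() {
	atomic.StoreInt64(&c.input, 0)
	atomic.StoreInt64(&c.output, 0)
	atomic.StoreInt64(&c.dropped, 0)
}

// DropRate returns the share of input records that were dropped, as a
// percentage. It returns 0 when nothing has been received yet.
func (c *counters) DropRate() float64 {
	in := c.Input()
	if in == 0 {
		return 0
	}
	return float64(c.Dropped()) / float64(in) * 100
}

// countConcurrently increments the input counter from several
// goroutines and returns the final total.
func countConcurrently(workers, perWorker int) int64 {
	var c counters
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				c.IncInput()
			}
		}()
	}
	wg.Wait()
	return c.Input()
}

func main() {
	fmt.Println(countConcurrently(4, 1000))
}
```

Atomic increments avoid a mutex on the hot path, which matters when every emitted record touches a counter.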
type StatsManager ¶ added in v0.10.2
type StatsManager struct {
// contains filtered or unexported fields
}
StatsManager manages statistics information
func NewStatsManager ¶ added in v0.10.2
func NewStatsManager(stream *Stream) *StatsManager
NewStatsManager creates a new statistics manager
type StrategyFactory ¶ added in v0.10.2
type StrategyFactory struct {
// contains filtered or unexported fields
}
StrategyFactory is the strategy factory. It uses a unified registration mechanism to manage all strategies (built-in and custom).
func NewStrategyFactory ¶ added in v0.10.2
func NewStrategyFactory() *StrategyFactory
NewStrategyFactory creates a strategy factory instance and automatically registers all built-in strategies
func (*StrategyFactory) CreateStrategy ¶ added in v0.10.2
func (sf *StrategyFactory) CreateStrategy(strategyName string) (DataProcessingStrategy, error)
CreateStrategy creates the strategy instance that corresponds to the given strategy name.
Parameters:
- strategyName: strategy name
Returns:
- DataProcessingStrategy: strategy instance
- error: error information
func (*StrategyFactory) GetRegisteredStrategies ¶ added in v0.10.2
func (sf *StrategyFactory) GetRegisteredStrategies() []string
GetRegisteredStrategies returns the names of all registered strategies.
Returns:
- []string: strategy name list
func (*StrategyFactory) RegisterStrategy ¶ added in v0.10.2
func (sf *StrategyFactory) RegisterStrategy(name string, constructor func() DataProcessingStrategy)
RegisterStrategy registers a strategy with the factory.
Parameters:
- name: strategy name
- constructor: strategy constructor function
func (*StrategyFactory) UnregisterStrategy ¶ added in v0.10.2
func (sf *StrategyFactory) UnregisterStrategy(name string)
UnregisterStrategy removes a strategy from the factory.
Parameters:
- name: strategy name
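A name-to-constructor registry of this shape can be sketched with a map guarded by a read-write mutex. This is an illustration only: `registry`, `strategy`, and `named` are hypothetical stand-ins, and whether StrategyFactory uses a mutex internally, or how it orders the names it returns, is not stated in this documentation.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// strategy is a reduced stand-in for DataProcessingStrategy.
type strategy interface{ GetStrategyName() string }

// named is a trivial strategy that only carries its name.
type named string

func (n named) GetStrategyName() string { return string(n) }

// registry maps strategy names to constructor functions.
type registry struct {
	mu    sync.RWMutex
	ctors map[string]func() strategy
}

func newRegistry() *registry {
	return &registry{ctors: make(map[string]func() strategy)}
}

// Register adds or replaces the constructor stored under name.
func (r *registry) Register(name string, ctor func() strategy) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.ctors[name] = ctor
}

// Unregister removes the constructor stored under name, if any.
func (r *registry) Unregister(name string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.ctors, name)
}

// Create builds a fresh strategy instance for name.
func (r *registry) Create(name string) (strategy, error) {
	r.mu.RLock()
	ctor, ok := r.ctors[name]
	r.mu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("unknown strategy %q", name)
	}
	return ctor(), nil
}

// Names returns the registered names in sorted order.
func (r *registry) Names() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	names := make([]string, 0, len(r.ctors))
	for name := range r.ctors {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	r := newRegistry()
	for _, n := range []string{"drop", "block", "expand"} {
		n := n // capture a per-iteration copy for the closure
		r.Register(n, func() strategy { return named(n) })
	}
	fmt.Println(r.Names())
	s, err := r.Create("expand")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s.GetStrategyName())
}
```

Storing constructors rather than instances means each stream gets its own strategy object, so per-stream state is never shared by accident.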
type Stream ¶
func NewStreamWithCustomPerformance ¶
func NewStreamWithCustomPerformance(config types.Config, perfConfig types.PerformanceConfig) (*Stream, error)
NewStreamWithCustomPerformance creates a Stream with a custom performance configuration
func NewStreamWithHighPerformance ¶
func NewStreamWithHighPerformance(config types.Config) (*Stream, error)
NewStreamWithHighPerformance creates a high-performance Stream
func NewStreamWithLowLatency ¶
func NewStreamWithLowLatency(config types.Config) (*Stream, error)
NewStreamWithLowLatency creates a low-latency Stream
func (*Stream) AddSink ¶
func (s *Stream) AddSink(sink func([]map[string]interface{}))
AddSink adds a sink function.
Parameters:
- sink: result-processing function that receives result data as []map[string]interface{}
func (*Stream) Emit ¶
func (s *Stream) Emit(data map[string]interface{})
Emit adds data to the stream processing pipeline.
Parameters:
- data: data to be processed, must be of type map[string]interface{}
func (*Stream) GetDetailedStats ¶
func (s *Stream) GetDetailedStats() map[string]interface{}
GetDetailedStats returns detailed performance statistics
func (*Stream) GetResultsChan ¶
func (s *Stream) GetResultsChan() <-chan []map[string]interface{}
GetResultsChan returns the result channel
func (*Stream) IsAggregationQuery ¶
func (s *Stream) IsAggregationQuery() bool
IsAggregationQuery reports whether the current stream is an aggregation query
func (*Stream) ProcessSync ¶
func (s *Stream) ProcessSync(data map[string]interface{}) (map[string]interface{}, error)
ProcessSync synchronously processes a single record and returns the result immediately. It applies only to non-aggregation queries; aggregation queries return an error.
Parameters:
- data: data to be processed, must be of type map[string]interface{}
Returns:
- map[string]interface{}: processed result data, or nil if the data does not match the filter condition
- error: processing error; always returned for aggregation queries
func (*Stream) RegisterFilter ¶
func (s *Stream) RegisterFilter(conditionStr string) error
RegisterFilter registers a filter condition. It supports backtick identifiers, LIKE syntax, and IS NULL syntax.
type StreamFactory ¶ added in v0.10.2
type StreamFactory struct{}
StreamFactory is the Stream factory responsible for creating different types of Streams
func NewStreamFactory ¶ added in v0.10.2
func NewStreamFactory() *StreamFactory
NewStreamFactory creates a Stream factory
func (*StreamFactory) CreateCustomPerformanceStream ¶ added in v0.10.2
func (sf *StreamFactory) CreateCustomPerformanceStream(config types.Config, perfConfig types.PerformanceConfig) (*Stream, error)
CreateCustomPerformanceStream creates a Stream with a custom performance configuration
func (*StreamFactory) CreateHighPerformanceStream ¶ added in v0.10.2
func (sf *StreamFactory) CreateHighPerformanceStream(config types.Config) (*Stream, error)
CreateHighPerformanceStream creates a high-performance Stream
func (*StreamFactory) CreateLowLatencyStream ¶ added in v0.10.2
func (sf *StreamFactory) CreateLowLatencyStream(config types.Config) (*Stream, error)
CreateLowLatencyStream creates a low-latency Stream
func (*StreamFactory) CreateStream ¶ added in v0.10.2
func (sf *StreamFactory) CreateStream(config types.Config) (*Stream, error)
CreateStream creates a Stream using the unified configuration