package stream

Version: v0.10.3
Published: Aug 29, 2025 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package stream provides the core stream processing engine for StreamSQL.

This package implements the central stream processing pipeline that orchestrates data flow, window management, aggregation, filtering, and result generation. It serves as the execution engine that brings together all StreamSQL components into a cohesive streaming system.

Core Features

• Real-time Stream Processing - High-throughput, low-latency data stream processing
• Window Management - Integration with all window types (tumbling, sliding, counting, session)
• Aggregation Engine - Efficient aggregation with incremental computation
• Filtering Pipeline - Multi-stage filtering with WHERE and HAVING clause support
• Performance Modes - Configurable performance profiles for different use cases
• Metrics and Monitoring - Comprehensive performance metrics and health monitoring
• Persistence Support - Optional data persistence for reliability and recovery
• Backpressure Handling - Intelligent backpressure management and overflow strategies

Stream Architecture

The stream processing pipeline consists of several key components:

type Stream struct {
	dataChan       chan map[string]interface{}  // Input data channel
	filter         condition.Condition          // WHERE clause filter
	Window         window.Window                // Window manager
	aggregator     aggregator.Aggregator        // Aggregation engine
	config         types.Config                 // Stream configuration
	sinks          []func([]map[string]interface{}) // Result processors
	resultChan     chan []map[string]interface{} // Result channel

	dataStrategy   DataProcessingStrategy       // Data processing strategy
}

Performance Modes

Configurable performance profiles for different scenarios:

// High Performance Mode
// - Optimized for maximum throughput
// - Larger buffer sizes
// - Batch processing optimization
stream, _ := NewStreamWithHighPerformance(config)

// Low Latency Mode
// - Optimized for minimal processing delay
// - Smaller buffer sizes
// - Immediate processing
stream, _ := NewStreamWithLowLatency(config)

// Zero Data Loss Mode
// - Guaranteed data persistence
// - Synchronous processing
// - Enhanced error recovery
stream, _ := NewStreamWithZeroDataLoss(config)

// Custom Performance Mode
// - User-defined performance parameters
customConfig := &PerformanceConfig{
	BufferSize:     1000,
	BatchSize:      50,
	FlushInterval:  time.Second,
	WorkerPoolSize: 4,
}
stream, _ := NewStreamWithCustomPerformance(config, *customConfig)

Data Processing Pipeline

Multi-stage processing pipeline with optimized data flow:

  1. Data Ingestion
     ├── Input validation and type checking
     ├── Timestamp extraction and normalization
     └── Initial data transformation

  2. Filtering (WHERE clause)
     ├── Field-based filtering
     ├── Expression evaluation
     └── Early data rejection

  3. Window Processing
     ├── Window assignment
     ├── Data buffering
     └── Window trigger management

  4. Aggregation
     ├── Group-by processing
     ├── Aggregate function execution
     └── Incremental computation

  5. Post-Aggregation Filtering (HAVING clause)
     ├── Aggregate result filtering
     ├── Complex condition evaluation
     └── Final result validation

  6. Result Generation
     ├── Field projection
     ├── Alias application
     └── Output formatting
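
The WHERE stage can also be wired up explicitly through RegisterFilter, and results can be consumed from the result channel instead of (or in addition to) sink callbacks. A minimal sketch, assuming a stream has already been created; the filter expression and field name are illustrative:

// Register a WHERE-style filter before starting the stream
// (the expression is illustrative).
if err := stream.RegisterFilter("temperature > 30"); err != nil {
	log.Fatal(err)
}

// Consume windowed results from the result channel.
go func() {
	for results := range stream.GetResultsChan() {
		fmt.Printf("pipeline output: %v\n", results)
	}
}()

stream.Start()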

Window Integration

Seamless integration with all window types:

// Tumbling Windows - Non-overlapping time-based windows
config.WindowConfig = WindowConfig{
	Type: "tumbling",
	Params: map[string]interface{}{
		"size": "5s",
	},
}

// Sliding Windows - Overlapping time-based windows
config.WindowConfig = WindowConfig{
	Type: "sliding",
	Params: map[string]interface{}{
		"size": "30s",
		"slide": "10s",
	},
}

// Counting Windows - Count-based windows
config.WindowConfig = WindowConfig{
	Type: "counting",
	Params: map[string]interface{}{
		"count": 100,
	},
}

// Session Windows - Activity-based windows
config.WindowConfig = WindowConfig{
	Type: "session",
	Params: map[string]interface{}{
		"timeout": "5m",
		"groupBy": "user_id",
	},
}
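
Regardless of the window type, aggregated results can carry their window bounds under the exported WindowStartField and WindowEndField keys. A small sink sketch, assuming the engine attaches these fields to each windowed result row:

// Read window bounds from aggregated result rows
// (assumes the engine populates these keys for windowed queries).
stream.AddSink(func(results []map[string]interface{}) {
	for _, row := range results {
		fmt.Printf("window [%v - %v]: %v\n",
			row[WindowStartField], row[WindowEndField], row)
	}
})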

Metrics and Monitoring

Comprehensive performance monitoring:

type MetricsManager struct {
	processedCount    int64     // Total processed records
	filteredCount     int64     // Filtered out records
	aggregatedCount   int64     // Aggregated records
	errorCount        int64     // Processing errors
	processingTime    time.Duration // Average processing time
	throughput        float64   // Records per second
	memoryUsage       int64     // Memory consumption
	bufferUtilization float64   // Buffer usage percentage
}

// Get basic statistics
stats := stream.GetStats()
fmt.Printf("Processed: %d, Errors: %d\n", stats["processed"], stats["errors"])

// Get detailed performance metrics
detailed := stream.GetDetailedStats()
fmt.Printf("Throughput: %.2f records/sec\n", detailed["throughput"])
fmt.Printf("Memory Usage: %d bytes\n", detailed["memory_usage"])

Backpressure Management

Intelligent handling of system overload:

// Overflow strategies
const (
	OverflowStrategyDrop     = "drop"     // Drop oldest data
	OverflowStrategyBlock    = "block"    // Block new data
	OverflowStrategySpill    = "spill"    // Spill to disk
	OverflowStrategyCompress = "compress" // Compress data
)

// Configure backpressure handling
config.PerformanceConfig.OverflowStrategy = OverflowStrategySpill
config.PerformanceConfig.BufferSize = 10000
config.PerformanceConfig.HighWaterMark = 0.8

Usage Examples

Basic stream processing:

// Create stream with default configuration
stream, err := NewStream(config)
if err != nil {
	log.Fatal(err)
}

// Register result handler
stream.AddSink(func(results []map[string]interface{}) {
	fmt.Printf("Results: %v\n", results)
})

// Start processing
stream.Start()

// Send data
stream.Emit(map[string]interface{}{
	"device_id":   "sensor001",
	"temperature": 25.5,
	"timestamp":   time.Now(),
})
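
When processing is finished, stop the stream to release its resources; accumulated statistics can also be cleared:

// Shut down the stream and release processing resources.
stream.Stop()

// Optionally reset accumulated statistics.
stream.ResetStats()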

High-performance stream processing:

// Create high-performance stream
stream, err := NewStreamWithHighPerformance(config)
if err != nil {
	log.Fatal(err)
}

// Configure for maximum throughput
stream.SetBufferSize(50000)
stream.SetBatchSize(1000)
stream.SetWorkerPoolSize(8)

// Enable metrics monitoring
stream.EnableMetrics(true)

// Process data in batches
for _, batch := range dataBatches {
	stream.EmitBatch(batch)
}

Synchronous processing for non-aggregation queries:

// Process single record synchronously
result, err := stream.ProcessSync(data)
if err != nil {
	log.Printf("Processing error: %v", err)
} else if result != nil {
	fmt.Printf("Immediate result: %v\n", result)
}
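
Because ProcessSync returns an error for aggregation queries, IsAggregationQuery can be used to pick between the synchronous and the asynchronous path. A small sketch:

// Route data by query type: aggregation queries go through the
// asynchronous pipeline, others can be processed inline.
if stream.IsAggregationQuery() {
	stream.Emit(data)
} else {
	result, err := stream.ProcessSync(data)
	if err != nil {
		log.Printf("Processing error: %v", err)
	} else if result != nil {
		fmt.Printf("Immediate result: %v\n", result)
	}
}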

Integration

Central integration point for all StreamSQL components:

• RSQL package - Configuration parsing and application
• Window package - Window lifecycle management
• Aggregator package - Aggregation execution
• Functions package - Function execution in expressions
• Condition package - Filter condition evaluation
• Types package - Data type handling and configuration
• Logger package - Comprehensive logging and debugging

Index

Constants

const (
	InputCount    = "input_count"
	OutputCount   = "output_count"
	DroppedCount  = "dropped_count"
	DataChanLen   = "data_chan_len"
	DataChanCap   = "data_chan_cap"
	ResultChanLen = "result_chan_len"
	ResultChanCap = "result_chan_cap"
	SinkPoolLen   = "sink_pool_len"
	SinkPoolCap   = "sink_pool_cap"
	ActiveRetries = "active_retries"
	Expanding     = "expanding"
)

Statistics field constants

const (
	BasicStats       = "basic_stats"
	DataChanUsage    = "data_chan_usage"
	ResultChanUsage  = "result_chan_usage"
	SinkPoolUsage    = "sink_pool_usage"
	ProcessRate      = "process_rate"
	DropRate         = "drop_rate"
	PerformanceLevel = "performance_level"
)

Detailed statistics field constants

const (
	StrategyDrop   = "drop"   // Drop strategy
	StrategyBlock  = "block"  // Blocking strategy
	StrategyExpand = "expand" // Dynamic expansion strategy
)

Overflow strategy constants

const (
	WindowStartField = "window_start"
	WindowEndField   = "window_end"
)

Window related constants

const (
	PerformanceLevelCritical     = "CRITICAL"
	PerformanceLevelWarning      = "WARNING"
	PerformanceLevelHighLoad     = "HIGH_LOAD"
	PerformanceLevelModerateLoad = "MODERATE_LOAD"
	PerformanceLevelOptimal      = "OPTIMAL"
)

Performance level constants

const (
	PerformanceConfigKey = "performanceConfig"
)
const (
	SQLKeywordCase = "CASE"
)

SQL keyword constants

Variables

This section is empty.

Functions

func AssessPerformanceLevel added in v0.10.2

func AssessPerformanceLevel(dataUsage, dropRate float64) string

AssessPerformanceLevel assesses the stream processing performance level based on the data usage rate and drop rate
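
For example, a level can be derived from observed usage and drop ratios and compared against the exported performance-level constants (the ratio values below are illustrative):

// Derive a performance level from a data usage ratio and a drop rate
// (values are illustrative).
level := AssessPerformanceLevel(0.85, 0.02)
if level == PerformanceLevelCritical || level == PerformanceLevelWarning {
	log.Printf("stream under pressure: %s", level)
}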

Types

type BlockingStrategy added in v0.10.2

type BlockingStrategy struct {
	// contains filtered or unexported fields
}

BlockingStrategy implements the blocking overflow strategy

func NewBlockingStrategy added in v0.10.2

func NewBlockingStrategy() *BlockingStrategy

NewBlockingStrategy creates blocking strategy instance

func (*BlockingStrategy) GetStrategyName added in v0.10.2

func (bs *BlockingStrategy) GetStrategyName() string

GetStrategyName gets strategy name

func (*BlockingStrategy) Init added in v0.10.2

func (bs *BlockingStrategy) Init(stream *Stream, config types.PerformanceConfig) error

Init initializes blocking strategy

func (*BlockingStrategy) ProcessData added in v0.10.2

func (bs *BlockingStrategy) ProcessData(data map[string]interface{})

ProcessData implements blocking mode data processing

func (*BlockingStrategy) Stop added in v0.10.2

func (bs *BlockingStrategy) Stop() error

Stop stops and cleans up blocking strategy resources

type DataHandler added in v0.10.2

type DataHandler struct {
	// contains filtered or unexported fields
}

DataHandler handles data processing for different strategies

func NewDataHandler added in v0.10.2

func NewDataHandler(stream *Stream) *DataHandler

NewDataHandler creates a new data handler

type DataProcessingStrategy added in v0.10.2

type DataProcessingStrategy interface {
	// ProcessData core method for processing data
	// Parameters:
	//   - data: data to process, must be map[string]interface{} type
	ProcessData(data map[string]interface{})

	// GetStrategyName gets strategy name
	GetStrategyName() string

	// Init initializes strategy
	// Parameters:
	//   - stream: Stream instance reference
	//   - config: performance configuration
	Init(stream *Stream, config types.PerformanceConfig) error

	// Stop stops and cleans up resources
	Stop() error
}

DataProcessingStrategy defines a unified interface for the different overflow strategies, providing better extensibility and maintainability
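
A custom strategy only needs to satisfy this interface and can then be registered with the StrategyFactory. A hypothetical sketch (LoggingDropStrategy is not part of the package):

// LoggingDropStrategy is a hypothetical strategy that logs and drops data;
// it exists only to illustrate the interface.
type LoggingDropStrategy struct {
	stream *Stream
}

func (s *LoggingDropStrategy) Init(stream *Stream, config types.PerformanceConfig) error {
	s.stream = stream
	return nil
}

func (s *LoggingDropStrategy) ProcessData(data map[string]interface{}) {
	// A real implementation would try to enqueue first and drop only on overflow.
	log.Printf("dropping record: %v", data)
}

func (s *LoggingDropStrategy) GetStrategyName() string { return "logging_drop" }

func (s *LoggingDropStrategy) Stop() error { return nil }

// Register the custom strategy so it can be created by name.
factory := NewStrategyFactory()
factory.RegisterStrategy("logging_drop", func() DataProcessingStrategy {
	return &LoggingDropStrategy{}
})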

type DataProcessor added in v0.10.2

type DataProcessor struct {
	// contains filtered or unexported fields
}

DataProcessor is responsible for processing data streams

func NewDataProcessor added in v0.10.2

func NewDataProcessor(stream *Stream) *DataProcessor

NewDataProcessor creates a data processor

func (*DataProcessor) Process added in v0.10.2

func (dp *DataProcessor) Process()

Process runs the main processing loop

type DropStrategy added in v0.10.2

type DropStrategy struct {
	// contains filtered or unexported fields
}

DropStrategy implements the drop overflow strategy

func NewDropStrategy added in v0.10.2

func NewDropStrategy() *DropStrategy

NewDropStrategy creates drop strategy instance

func (*DropStrategy) GetStrategyName added in v0.10.2

func (ds *DropStrategy) GetStrategyName() string

GetStrategyName gets strategy name

func (*DropStrategy) Init added in v0.10.2

func (ds *DropStrategy) Init(stream *Stream, config types.PerformanceConfig) error

Init initializes drop strategy

func (*DropStrategy) ProcessData added in v0.10.2

func (ds *DropStrategy) ProcessData(data map[string]interface{})

ProcessData implements drop mode data processing

func (*DropStrategy) Stop added in v0.10.2

func (ds *DropStrategy) Stop() error

Stop stops and cleans up drop strategy resources

type ExpansionStrategy added in v0.10.2

type ExpansionStrategy struct {
	// contains filtered or unexported fields
}

ExpansionStrategy implements the expansion overflow strategy

func NewExpansionStrategy added in v0.10.2

func NewExpansionStrategy() *ExpansionStrategy

NewExpansionStrategy creates expansion strategy instance

func (*ExpansionStrategy) GetStrategyName added in v0.10.2

func (es *ExpansionStrategy) GetStrategyName() string

GetStrategyName gets strategy name

func (*ExpansionStrategy) Init added in v0.10.2

func (es *ExpansionStrategy) Init(stream *Stream, config types.PerformanceConfig) error

Init initializes expansion strategy

func (*ExpansionStrategy) ProcessData added in v0.10.2

func (es *ExpansionStrategy) ProcessData(data map[string]interface{})

ProcessData implements expansion mode data processing

func (*ExpansionStrategy) Stop added in v0.10.2

func (es *ExpansionStrategy) Stop() error

Stop stops and cleans up expansion strategy resources

type ResultHandler added in v0.10.2

type ResultHandler struct {
	// contains filtered or unexported fields
}

ResultHandler handles result output and sink function calls

func NewResultHandler added in v0.10.2

func NewResultHandler(stream *Stream) *ResultHandler

NewResultHandler creates a new result handler

type StatsCollector added in v0.10.2

type StatsCollector struct {
	// contains filtered or unexported fields
}

StatsCollector is a thread-safe statistics collector

func NewStatsCollector added in v0.10.2

func NewStatsCollector() *StatsCollector

NewStatsCollector creates a new statistics collector

func (*StatsCollector) GetBasicStats added in v0.10.2

func (sc *StatsCollector) GetBasicStats(dataChanLen, dataChanCap, resultChanLen, resultChanCap, sinkPoolLen, sinkPoolCap int, activeRetries, expanding int32) map[string]int64

GetBasicStats gets basic statistics information

func (*StatsCollector) GetDetailedStats added in v0.10.2

func (sc *StatsCollector) GetDetailedStats(basicStats map[string]int64) map[string]interface{}

GetDetailedStats gets detailed performance statistics

func (*StatsCollector) GetDroppedCount added in v0.10.2

func (sc *StatsCollector) GetDroppedCount() int64

GetDroppedCount gets dropped count

func (*StatsCollector) GetInputCount added in v0.10.2

func (sc *StatsCollector) GetInputCount() int64

GetInputCount gets input count

func (*StatsCollector) GetOutputCount added in v0.10.2

func (sc *StatsCollector) GetOutputCount() int64

GetOutputCount gets output count

func (*StatsCollector) IncrementDropped added in v0.10.2

func (sc *StatsCollector) IncrementDropped()

IncrementDropped increments dropped count

func (*StatsCollector) IncrementInput added in v0.10.2

func (sc *StatsCollector) IncrementInput()

IncrementInput increments input count

func (*StatsCollector) IncrementOutput added in v0.10.2

func (sc *StatsCollector) IncrementOutput()

IncrementOutput increments output count

func (*StatsCollector) Reset added in v0.10.2

func (sc *StatsCollector) Reset()

Reset resets statistics information

type StatsManager added in v0.10.2

type StatsManager struct {
	// contains filtered or unexported fields
}

StatsManager manages statistics information

func NewStatsManager added in v0.10.2

func NewStatsManager(stream *Stream) *StatsManager

NewStatsManager creates a new statistics manager

type StrategyFactory added in v0.10.2

type StrategyFactory struct {
	// contains filtered or unexported fields
}

StrategyFactory manages all strategies (built-in and custom) through a unified registration mechanism

func NewStrategyFactory added in v0.10.2

func NewStrategyFactory() *StrategyFactory

NewStrategyFactory creates a strategy factory instance and automatically registers all built-in strategies

func (*StrategyFactory) CreateStrategy added in v0.10.2

func (sf *StrategyFactory) CreateStrategy(strategyName string) (DataProcessingStrategy, error)

CreateStrategy creates the corresponding strategy instance based on the strategy name. Parameters:

  • strategyName: strategy name

Returns:

  • DataProcessingStrategy: strategy instance
  • error: error information
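
A minimal usage sketch, creating one of the built-in strategies by name:

// Create a built-in strategy through the factory using its exported name constant.
factory := NewStrategyFactory()
strategy, err := factory.CreateStrategy(StrategyBlock)
if err != nil {
	log.Fatal(err)
}
fmt.Println(strategy.GetStrategyName()) // expected to report the blocking strategy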

func (*StrategyFactory) GetRegisteredStrategies added in v0.10.2

func (sf *StrategyFactory) GetRegisteredStrategies() []string

GetRegisteredStrategies gets all registered strategy names. Returns:

  • []string: strategy name list

func (*StrategyFactory) RegisterStrategy added in v0.10.2

func (sf *StrategyFactory) RegisterStrategy(name string, constructor func() DataProcessingStrategy)

RegisterStrategy registers a strategy with the factory. Parameters:

  • name: strategy name
  • constructor: strategy constructor function

func (*StrategyFactory) UnregisterStrategy added in v0.10.2

func (sf *StrategyFactory) UnregisterStrategy(name string)

UnregisterStrategy unregisters a strategy. Parameters:

  • name: strategy name

type Stream

type Stream struct {
	Window window.Window
	// contains filtered or unexported fields
}

func NewStream

func NewStream(config types.Config) (*Stream, error)

NewStream creates Stream using unified configuration

func NewStreamWithCustomPerformance

func NewStreamWithCustomPerformance(config types.Config, perfConfig types.PerformanceConfig) (*Stream, error)

NewStreamWithCustomPerformance creates Stream with custom performance configuration

func NewStreamWithHighPerformance

func NewStreamWithHighPerformance(config types.Config) (*Stream, error)

NewStreamWithHighPerformance creates high-performance Stream

func NewStreamWithLowLatency

func NewStreamWithLowLatency(config types.Config) (*Stream, error)

NewStreamWithLowLatency creates low-latency Stream

func (*Stream) AddSink

func (s *Stream) AddSink(sink func([]map[string]interface{}))

AddSink adds a sink function. Parameters:

  • sink: result processing function that receives []map[string]interface{} type result data

func (*Stream) Emit

func (s *Stream) Emit(data map[string]interface{})

Emit adds data to the stream processing pipeline. Parameters:

  • data: data to be processed, must be map[string]interface{} type

func (*Stream) GetDetailedStats

func (s *Stream) GetDetailedStats() map[string]interface{}

GetDetailedStats gets detailed performance statistics

func (*Stream) GetResultsChan

func (s *Stream) GetResultsChan() <-chan []map[string]interface{}

GetResultsChan gets the result channel

func (*Stream) GetStats

func (s *Stream) GetStats() map[string]int64

GetStats gets stream processing statistics (thread-safe version)

func (*Stream) IsAggregationQuery

func (s *Stream) IsAggregationQuery() bool

IsAggregationQuery checks if current stream is an aggregation query

func (*Stream) ProcessSync

func (s *Stream) ProcessSync(data map[string]interface{}) (map[string]interface{}, error)

ProcessSync synchronously processes a single record and returns the result immediately. It applies only to non-aggregation queries; aggregation queries return an error. Parameters:

  • data: data to be processed, must be map[string]interface{} type

Returns:

  • map[string]interface{}: processed result data; nil if the data does not match the filter condition
  • error: processing error; non-nil for aggregation queries

func (*Stream) RegisterFilter

func (s *Stream) RegisterFilter(conditionStr string) error

RegisterFilter registers a filter condition, supporting backtick identifiers, LIKE syntax, and IS NULL syntax

func (*Stream) ResetStats

func (s *Stream) ResetStats()

ResetStats resets statistics information

func (*Stream) Start

func (s *Stream) Start()

func (*Stream) Stop

func (s *Stream) Stop()

Stop stops stream processing

type StreamFactory added in v0.10.2

type StreamFactory struct{}

StreamFactory is a Stream factory responsible for creating different types of Streams

func NewStreamFactory added in v0.10.2

func NewStreamFactory() *StreamFactory

NewStreamFactory creates Stream factory

func (*StreamFactory) CreateCustomPerformanceStream added in v0.10.2

func (sf *StreamFactory) CreateCustomPerformanceStream(config types.Config, perfConfig types.PerformanceConfig) (*Stream, error)

CreateCustomPerformanceStream creates Stream with custom performance configuration

func (*StreamFactory) CreateHighPerformanceStream added in v0.10.2

func (sf *StreamFactory) CreateHighPerformanceStream(config types.Config) (*Stream, error)

CreateHighPerformanceStream creates high-performance Stream

func (*StreamFactory) CreateLowLatencyStream added in v0.10.2

func (sf *StreamFactory) CreateLowLatencyStream(config types.Config) (*Stream, error)

CreateLowLatencyStream creates low-latency Stream

func (*StreamFactory) CreateStream added in v0.10.2

func (sf *StreamFactory) CreateStream(config types.Config) (*Stream, error)

CreateStream creates Stream using unified configuration
