Documentation ¶
Overview ¶
Package window provides windowing functionality for StreamSQL stream processing.
This package implements various types of windows for aggregating streaming data over time intervals or record counts. It supports tumbling, sliding, counting, and session windows with efficient memory management and concurrent processing.
Core Features ¶
• Multiple Window Types - Tumbling, Sliding, Counting, and Session windows
• Time Management - Time-based window boundaries and event time processing
• Trigger Mechanisms - Triggering based on time, count, or custom conditions
• Memory Efficiency - Optimized data structures and memory management
• Concurrent Processing - Thread-safe operations
• Late Data Handling - Configurable policies for late-arriving data
Window Types ¶
Four distinct window types for different stream processing scenarios:
• Tumbling Windows - Non-overlapping, fixed-size time windows
• Sliding Windows - Overlapping time windows with configurable slide interval
• Counting Windows - Count-based windows that trigger after N records
• Session Windows - Activity-based windows with configurable timeout
Window Interface ¶
All window types implement a unified Window interface:
type Window interface {
	Add(item interface{})                   // Add data to the window
	Reset()                                 // Reset window state
	Start()                                 // Start window processing
	Stop()                                  // Stop window operations and clean up resources
	OutputChan() <-chan []types.Row         // Get the output channel
	SetCallback(callback func([]types.Row)) // Set the callback function
	Trigger()                               // Trigger the window manually
	GetStats() map[string]int64             // Get window performance statistics
}
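To make the contract between Add, Trigger, SetCallback, and OutputChan concrete, here is a minimal, self-contained sketch of a window that fires only when triggered manually. It is not one of the package's window types: `manualWindow`, `newManualWindow`, and the local `Row` stand-in for `types.Row` are hypothetical names, and dropping a batch when the channel is full is an assumption made to keep the sketch non-blocking.

```go
package main

import (
	"fmt"
	"sync"
)

// Row is a stand-in for types.Row so the sketch compiles on its own.
type Row struct{ Data interface{} }

// manualWindow is a hypothetical window that fires only on Trigger.
type manualWindow struct {
	mu       sync.Mutex
	buf      []Row
	out      chan []Row
	callback func([]Row)
}

func newManualWindow() *manualWindow {
	return &manualWindow{out: make(chan []Row, 8)}
}

// Add buffers one item until the next trigger.
func (w *manualWindow) Add(item interface{}) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.buf = append(w.buf, Row{Data: item})
}

// SetCallback registers a function to call with each emitted batch.
func (w *manualWindow) SetCallback(cb func([]Row)) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.callback = cb
}

// OutputChan exposes emitted batches as a receive-only channel.
func (w *manualWindow) OutputChan() <-chan []Row { return w.out }

// Trigger emits the buffered rows and starts a fresh buffer.
func (w *manualWindow) Trigger() {
	w.mu.Lock()
	batch, cb := w.buf, w.callback
	w.buf = nil
	w.mu.Unlock()
	if len(batch) == 0 {
		return // nothing to emit
	}
	if cb != nil {
		cb(batch)
	}
	select {
	case w.out <- batch:
	default: // assumption: drop the batch if the channel is full
	}
}

func main() {
	w := newManualWindow()
	w.SetCallback(func(rows []Row) { fmt.Println("callback rows:", len(rows)) })
	w.Add("a")
	w.Add("b")
	w.Trigger()
	fmt.Println("channel rows:", len(<-w.OutputChan()))
}
```

The callback and the channel receive the same batch here; a consumer would normally pick one of the two delivery paths.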
Tumbling Windows ¶
Non-overlapping time-based windows:
// Create tumbling window with processing time (default)
config := types.WindowConfig{
	Type:               "tumbling",
	Params:             []interface{}{"5s"}, // 5-second windows
	TsProp:             "timestamp",
	TimeCharacteristic: types.ProcessingTime, // Uses system clock
}
window, err := NewTumblingWindow(config)

// Create tumbling window with event time
config := types.WindowConfig{
	Type:               "tumbling",
	Params:             []interface{}{"5s"}, // 5-second windows
	TsProp:             "timestamp",
	TimeCharacteristic: types.EventTime,        // Uses event timestamps
	MaxOutOfOrderness:  2 * time.Second,        // Allow 2 seconds of out-of-order data
	WatermarkInterval:  200 * time.Millisecond, // Update watermark every 200ms
	AllowedLateness:    1 * time.Second,        // Allow 1 second of late data after window closes
}
window, err := NewTumblingWindow(config)
// Window characteristics:
// - Fixed size (e.g., 5 seconds)
// - No overlap between windows
// - Triggers at regular intervals (ProcessingTime) or based on watermark (EventTime)
// - Memory efficient
// - Suitable for periodic aggregations
// ProcessingTime example timeline (based on data arrival):
// Window 1: [00:00 - 00:05) - triggers when 5s elapsed from first data
// Window 2: [00:05 - 00:10) - triggers when next 5s elapsed
// Window 3: [00:10 - 00:15) - triggers when next 5s elapsed
// EventTime example timeline (based on event timestamps):
// Window 1: [00:00 - 00:05) - triggers when watermark >= 00:05
// Window 2: [00:05 - 00:10) - triggers when watermark >= 00:10
// Window 3: [00:10 - 00:15) - triggers when watermark >= 00:15
Sliding Windows ¶
Overlapping time-based windows with configurable slide interval:
// Create sliding window with processing time (default)
config := types.WindowConfig{
	Type:               "sliding",
	Params:             []interface{}{"30s", "10s"}, // 30-second window size, 10-second slide
	TsProp:             "timestamp",
	TimeCharacteristic: types.ProcessingTime, // Uses system clock
}
window, err := NewSlidingWindow(config)

// Create sliding window with event time
config := types.WindowConfig{
	Type:               "sliding",
	Params:             []interface{}{"30s", "10s"}, // 30-second window size, 10-second slide
	TsProp:             "timestamp",
	TimeCharacteristic: types.EventTime,        // Uses event timestamps
	MaxOutOfOrderness:  2 * time.Second,        // Allow 2 seconds of out-of-order data
	WatermarkInterval:  200 * time.Millisecond, // Update watermark every 200ms
}
window, err := NewSlidingWindow(config)
// Window characteristics:
// - Fixed size with configurable slide
// - Overlapping windows
// - More frequent updates
// - Higher memory usage
// - Suitable for smooth trend analysis
// ProcessingTime example timeline (30s window, 10s slide, based on data arrival):
// Window 1: [00:00 - 00:30) - triggers when 30s elapsed from first data
// Window 2: [00:10 - 00:40) - triggers 10s after Window 1
// Window 3: [00:20 - 00:50) - triggers 10s after Window 2
// EventTime example timeline (30s window, 10s slide, based on event timestamps):
// Window 1: [00:00 - 00:30) - triggers when watermark >= 00:30
// Window 2: [00:10 - 00:40) - triggers when watermark >= 00:40
// Window 3: [00:20 - 00:50) - triggers when watermark >= 00:50
Counting Windows ¶
Count-based windows that trigger after a specified number of records:
// Create counting window
config := types.WindowConfig{
	Type: "counting",
	Params: map[string]interface{}{
		"count": 100, // Trigger every 100 records
	},
}
window, err := NewCountingWindow(config)
// Window characteristics:
// - Fixed record count
// - Time-independent
// - Predictable memory usage
// - Suitable for batch processing
// - Handles variable data rates
// Example:
// Window 1: Records 1-100
// Window 2: Records 101-200
// Window 3: Records 201-300
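The count-based behaviour can be sketched as a buffer that emits once it holds N records and then starts over. `countBatcher` below is a hypothetical type written for illustration, not the package's CountingWindow, and it uses plain ints as records to stay self-contained.

```go
package main

import "fmt"

// countBatcher buffers records and emits a batch every size records.
// Hypothetical type for illustration.
type countBatcher struct {
	size int
	buf  []int
}

// add appends one record and reports whether a full batch is ready.
func (b *countBatcher) add(record int) ([]int, bool) {
	b.buf = append(b.buf, record)
	if len(b.buf) < b.size {
		return nil, false
	}
	batch := b.buf
	b.buf = nil // start a fresh buffer for the next window
	return batch, true
}

func main() {
	b := &countBatcher{size: 100}
	for i := 1; i <= 250; i++ {
		if batch, fired := b.add(i); fired {
			fmt.Printf("window: records %d-%d\n", batch[0], batch[len(batch)-1])
		}
	}
	fmt.Println("still buffered:", len(b.buf))
}
```

Records left in the buffer at the end (50 in this run) stay there until more data arrives or the window is triggered manually.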
Session Windows ¶
Activity-based windows with configurable session timeout:
// Create session window
config := types.WindowConfig{
	Type: "session",
	Params: map[string]interface{}{
		"timeout": "5m", // 5-minute session timeout
	},
	GroupByKey: "user_id", // Group sessions by user
}
window, err := NewSessionWindow(config)
// Window characteristics:
// - Variable window size
// - Activity-based triggers
// - Per-group session tracking
// - Automatic session expiration
// - Suitable for user behavior analysis
// Example (5-minute timeout):
// User A: [10:00 - 10:15) - 15-minute session
// User B: [10:05 - 10:08) - 3-minute session
// User A: [10:20 - 10:25) - New 5-minute session
Window Factory ¶
Centralized window creation:
func CreateWindow(config types.WindowConfig) (Window, error)
Time Management ¶
Time handling for window operations:
func GetTimestamp(data interface{}, tsProp string, timeUnit time.Duration) time.Time
type TimeSlot struct {
	Start    time.Time
	End      time.Time
	Duration time.Duration
}
Time Characteristics ¶
Windows support two time characteristics:
## ProcessingTime (Default)
- Uses system clock for window operations
- Windows trigger based on when data arrives
- Cannot handle out-of-order data
- Lower latency, but results may be inconsistent
- Suitable for real-time monitoring and low-latency requirements

## EventTime
- Uses event timestamps for window operations
- Windows trigger based on event time via watermark mechanism
- Can handle out-of-order and late-arriving data
- Consistent results, but may have higher latency
- Suitable for accurate time-based analysis and historical data processing

## Watermark Mechanism
For EventTime windows, the watermark indicates that no events with a timestamp earlier than the watermark time are expected:
- Watermark = max(event_time) - max_out_of_orderness
- Windows trigger when watermark >= window_end_time
- Late data (before watermark) can be detected and handled specially
## Allowed Lateness
For EventTime windows, `allowedLateness` allows windows to accept late data after they have been triggered:
- When watermark >= window_end, window triggers and outputs result
- Window remains open until watermark >= window_end + allowedLateness
- Late data arriving within allowedLateness triggers delayed updates (window fires again)
- After allowedLateness expires, window closes and late data is ignored
- Default: 0 (no late data accepted after window closes)

Example:
- Window [00:00 - 00:05) triggers when watermark >= 00:05
- With allowedLateness = 2s, window stays open until watermark >= 00:07
- Late data with timestamp in [00:00 - 00:05) arriving before watermark >= 00:07 triggers delayed update
- After watermark >= 00:07, window closes and late data is ignored
## Idle Source Mechanism
For EventTime windows, `idleTimeout` enables watermark advancement based on processing time when the data source is idle:
- Normally: Watermark advances based on event time (Watermark = max(event_time) - maxOutOfOrderness)
- When idle: If no data arrives within idleTimeout, watermark advances based on processing time
- This ensures windows can close even when the data source stops sending data
- Prevents memory leaks from windows that never close
- Default: 0 (disabled, watermark only advances based on event time)

Example (assuming maxOutOfOrderness = 1s):
- Window [00:00 - 00:05) has data with max event time = 00:02
- Data source stops sending data at 00:03
- With idleTimeout = 5s, after 5 seconds of no data (at 00:08), watermark advances based on processing time
- Watermark = currentProcessingTime - maxOutOfOrderness = 00:08 - 1s = 00:07
- Window [00:00 - 00:05) can trigger (watermark >= 00:05) and close
Performance Features ¶
• Memory Management - Efficient buffer management and garbage collection
• Concurrency - Thread-safe operations with minimal locking
• Time Efficiency - Optimized timestamp processing and timer management
Usage Examples ¶
Basic tumbling window:
config := types.WindowConfig{
	Type:   "tumbling",
	Params: map[string]interface{}{"size": "10s"},
	TsProp: "timestamp",
}
window, err := CreateWindow(config)
if err != nil {
	log.Fatalf("create window: %v", err) // do not use a window that failed to build
}
window.SetCallback(func(results []types.Row) {
	fmt.Printf("Window results: %d records\n", len(results))
})
window.Start()
Sliding window:
config := types.WindowConfig{
	Type: "sliding",
	Params: map[string]interface{}{
		"size":  "1m",
		"slide": "10s",
	},
	TsProp: "event_time",
}
window, err := NewSlidingWindow(config)
Session window:
config := types.WindowConfig{
	Type:       "session",
	Params:     map[string]interface{}{"timeout": "30m"},
	GroupByKey: "user_id",
}
window, err := NewSessionWindow(config)
Integration ¶
Integrates with other StreamSQL components:
• Stream package - Stream processing and data flow
• RSQL package - SQL-based window definitions
• Functions package - Aggregation functions for window results
• Types package - Shared data types and configuration
Index ¶
- Constants
- Variables
- func GetTimestamp(data interface{}, tsProp string, timeUnit time.Duration) time.Time
- type CountingWindow
- func (cw *CountingWindow) Add(data interface{})
- func (cw *CountingWindow) GetStats() map[string]int64
- func (cw *CountingWindow) OutputChan() <-chan []types.Row
- func (cw *CountingWindow) Reset()
- func (cw *CountingWindow) SetCallback(callback func([]types.Row))
- func (cw *CountingWindow) Start()
- func (cw *CountingWindow) Stop()
- func (cw *CountingWindow) Trigger()
- type SessionWindow
- func (sw *SessionWindow) Add(data interface{})
- func (sw *SessionWindow) GetStats() map[string]int64
- func (sw *SessionWindow) OutputChan() <-chan []types.Row
- func (sw *SessionWindow) Reset()
- func (sw *SessionWindow) ResetStats()
- func (sw *SessionWindow) SetCallback(callback func([]types.Row))
- func (sw *SessionWindow) Start()
- func (sw *SessionWindow) Stop()
- func (sw *SessionWindow) Trigger()
- type SlidingWindow
- func (sw *SlidingWindow) Add(data interface{})
- func (sw *SlidingWindow) GetStats() map[string]int64
- func (sw *SlidingWindow) NextSlot() *types.TimeSlot
- func (sw *SlidingWindow) OutputChan() <-chan []types.Row
- func (sw *SlidingWindow) Reset()
- func (sw *SlidingWindow) ResetStats()
- func (sw *SlidingWindow) SetCallback(callback func([]types.Row))
- func (sw *SlidingWindow) Start()
- func (sw *SlidingWindow) Stop()
- func (sw *SlidingWindow) Trigger()
- type TimedData
- type TumblingWindow
- func (tw *TumblingWindow) Add(data interface{})
- func (tw *TumblingWindow) GetStats() map[string]int64
- func (tw *TumblingWindow) NextSlot() *types.TimeSlot
- func (tw *TumblingWindow) OutputChan() <-chan []types.Row
- func (tw *TumblingWindow) Reset()
- func (tw *TumblingWindow) ResetStats()
- func (tw *TumblingWindow) SetCallback(callback func([]types.Row))
- func (tw *TumblingWindow) Start()
- func (tw *TumblingWindow) Stop()
- func (tw *TumblingWindow) Trigger()
- type Watermark
- type Window
Constants ¶
const (
	TypeTumbling = "tumbling"
	TypeSliding  = "sliding"
	TypeCounting = "counting"
	TypeSession  = "session"
)
Variables ¶
var EnableDebug = false
EnableDebug enables debug logging for window operations.
Functions ¶
Types ¶
type CountingWindow ¶
type CountingWindow struct {
// contains filtered or unexported fields
}
func NewCountingWindow ¶
func NewCountingWindow(config types.WindowConfig) (*CountingWindow, error)
func (*CountingWindow) Add ¶
func (cw *CountingWindow) Add(data interface{})
func (*CountingWindow) GetStats ¶ added in v0.10.4
func (cw *CountingWindow) GetStats() map[string]int64
func (*CountingWindow) OutputChan ¶
func (cw *CountingWindow) OutputChan() <-chan []types.Row
func (*CountingWindow) Reset ¶
func (cw *CountingWindow) Reset()
func (*CountingWindow) SetCallback ¶
func (cw *CountingWindow) SetCallback(callback func([]types.Row))
func (*CountingWindow) Start ¶
func (cw *CountingWindow) Start()
func (*CountingWindow) Stop ¶ added in v0.10.4
func (cw *CountingWindow) Stop()
func (*CountingWindow) Trigger ¶
func (cw *CountingWindow) Trigger()
type SessionWindow ¶
type SessionWindow struct {
// contains filtered or unexported fields
}
SessionWindow represents a session window. A session window is an event-time based window that closes when no events arrive for a period of time.
func NewSessionWindow ¶
func NewSessionWindow(config types.WindowConfig) (*SessionWindow, error)
NewSessionWindow creates a new session window instance
func (*SessionWindow) Add ¶
func (sw *SessionWindow) Add(data interface{})
Add adds data to session window
func (*SessionWindow) GetStats ¶ added in v0.10.6
func (sw *SessionWindow) GetStats() map[string]int64
GetStats returns window performance statistics
func (*SessionWindow) OutputChan ¶
func (sw *SessionWindow) OutputChan() <-chan []types.Row
OutputChan returns a read-only channel for receiving data when window triggers
func (*SessionWindow) ResetStats ¶ added in v0.10.6
func (sw *SessionWindow) ResetStats()
ResetStats resets performance statistics
func (*SessionWindow) SetCallback ¶
func (sw *SessionWindow) SetCallback(callback func([]types.Row))
SetCallback sets the callback function when session window triggers
func (*SessionWindow) Start ¶
func (sw *SessionWindow) Start()
Start starts the session window and begins periodically checking for expired sessions. It uses lazy initialization to avoid waiting indefinitely when no data has arrived, while ensuring that subsequent data is processed normally.
func (*SessionWindow) Trigger ¶
func (sw *SessionWindow) Trigger()
Trigger manually triggers all session windows
type SlidingWindow ¶
type SlidingWindow struct {
// contains filtered or unexported fields
}
SlidingWindow represents a sliding window for processing data within time ranges
func NewSlidingWindow ¶
func NewSlidingWindow(config types.WindowConfig) (*SlidingWindow, error)
NewSlidingWindow creates a new sliding window instance. The size parameter is the total window size; the slide parameter is the sliding interval.
func (*SlidingWindow) Add ¶
func (sw *SlidingWindow) Add(data interface{})
Add adds data to the sliding window
func (*SlidingWindow) GetStats ¶
func (sw *SlidingWindow) GetStats() map[string]int64
GetStats returns window performance statistics
func (*SlidingWindow) NextSlot ¶
func (sw *SlidingWindow) NextSlot() *types.TimeSlot
func (*SlidingWindow) OutputChan ¶
func (sw *SlidingWindow) OutputChan() <-chan []types.Row
OutputChan returns the sliding window's output channel
func (*SlidingWindow) Reset ¶
func (sw *SlidingWindow) Reset()
Reset resets the sliding window and clears window data
func (*SlidingWindow) ResetStats ¶
func (sw *SlidingWindow) ResetStats()
ResetStats resets performance statistics
func (*SlidingWindow) SetCallback ¶
func (sw *SlidingWindow) SetCallback(callback func([]types.Row))
SetCallback sets the callback function to execute when sliding window triggers
func (*SlidingWindow) Start ¶
func (sw *SlidingWindow) Start()
Start starts the sliding window with periodic triggering. It uses lazy initialization to avoid waiting indefinitely when no data has arrived, while ensuring that subsequent data is processed normally. The first window triggers when it ends; subsequent windows trigger at slide intervals.
func (*SlidingWindow) Stop ¶
func (sw *SlidingWindow) Stop()
Stop stops the sliding window operations
func (*SlidingWindow) Trigger ¶
func (sw *SlidingWindow) Trigger()
Trigger processes the data currently within the sliding window. For ProcessingTime it is called by a timer; for EventTime it is called on watermark updates.
type TumblingWindow ¶
type TumblingWindow struct {
// contains filtered or unexported fields
}
TumblingWindow represents a tumbling window for collecting data and triggering processing at fixed time intervals
func NewTumblingWindow ¶
func NewTumblingWindow(config types.WindowConfig) (*TumblingWindow, error)
NewTumblingWindow creates a new tumbling window instance. The size parameter is the duration of each window.
func (*TumblingWindow) Add ¶
func (tw *TumblingWindow) Add(data interface{})
Add adds data to the tumbling window
func (*TumblingWindow) GetStats ¶
func (tw *TumblingWindow) GetStats() map[string]int64
GetStats returns window performance statistics
func (*TumblingWindow) NextSlot ¶
func (tw *TumblingWindow) NextSlot() *types.TimeSlot
func (*TumblingWindow) OutputChan ¶
func (tw *TumblingWindow) OutputChan() <-chan []types.Row
OutputChan returns a read-only channel for receiving data when window triggers
func (*TumblingWindow) ResetStats ¶
func (tw *TumblingWindow) ResetStats()
ResetStats resets performance statistics
func (*TumblingWindow) SetCallback ¶
func (tw *TumblingWindow) SetCallback(callback func([]types.Row))
SetCallback sets the callback function to execute when tumbling window triggers
func (*TumblingWindow) Start ¶
func (tw *TumblingWindow) Start()
Start starts the tumbling window's periodic trigger mechanism. It uses lazy initialization to avoid waiting indefinitely when no data has arrived, while ensuring that subsequent data is processed normally.
func (*TumblingWindow) Stop ¶
func (tw *TumblingWindow) Stop()
Stop stops tumbling window operations
func (*TumblingWindow) Trigger ¶
func (tw *TumblingWindow) Trigger()
Trigger runs the tumbling window's processing logic. For ProcessingTime it is called by a timer; for EventTime it is called on watermark updates.
type Watermark ¶ added in v0.10.4
type Watermark struct {
// contains filtered or unexported fields
}
Watermark represents a watermark for event time processing. A watermark indicates that no events with a timestamp earlier than the watermark time are expected.
func NewWatermark ¶ added in v0.10.4
func NewWatermark(maxOutOfOrderness time.Duration, updateInterval time.Duration, idleTimeout time.Duration) *Watermark
NewWatermark creates a new watermark manager
func (*Watermark) GetCurrentWatermark ¶ added in v0.10.4
GetCurrentWatermark returns the current watermark time
func (*Watermark) IsEventTimeLate ¶ added in v0.10.4
IsEventTimeLate checks if an event time is late (before current watermark)
func (*Watermark) Stop ¶ added in v0.10.4
func (wm *Watermark) Stop()
Stop stops the watermark manager
func (*Watermark) UpdateEventTime ¶ added in v0.10.4
UpdateEventTime updates the maximum event time seen
func (*Watermark) WatermarkChan ¶ added in v0.10.4
WatermarkChan returns a channel for receiving watermark updates
type Window ¶
type Window interface {
Add(item interface{})
//GetResults() []interface{}
Reset()
Start()
Stop() // Stop window operations and clean up resources
OutputChan() <-chan []types.Row
SetCallback(callback func([]types.Row))
Trigger()
GetStats() map[string]int64
}
func CreateWindow ¶
func CreateWindow(config types.WindowConfig) (Window, error)