Documentation ¶
Overview ¶
Package window provides windowing functionality for StreamSQL stream processing.
This package implements various types of windows for aggregating streaming data over time intervals or record counts. It supports tumbling, sliding, counting, and session windows with efficient memory management and concurrent processing.
Core Features ¶
• Multiple Window Types - Tumbling, Sliding, Counting, and Session windows
• Time Management - Time-based window boundaries and event time processing
• Trigger Mechanisms - Triggering based on time, count, or custom conditions
• Memory Efficiency - Optimized data structures and memory management
• Concurrent Processing - Thread-safe operations
• Late Data Handling - Configurable policies for late-arriving data
Window Types ¶
Four distinct window types for different stream processing scenarios:
• Tumbling Windows - Non-overlapping, fixed-size time windows
• Sliding Windows - Overlapping time windows with configurable slide interval
• Counting Windows - Count-based windows that trigger after N records
• Session Windows - Activity-based windows with configurable timeout
Window Interface ¶
All window types implement a unified Window interface:
    type Window interface {
        Add(data interface{})                   // Add data to window
        Reset()                                 // Reset window state
        Start()                                 // Start window processing
        OutputChan() <-chan []types.Row         // Get output channel
        SetCallback(callback func([]types.Row)) // Set callback function
        Trigger()                               // Manual trigger
    }
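To make the contract concrete, here is a minimal sketch of a type that satisfies this method set. It uses the signatures listed in the package index (no error returns). `Row` is a local stand-in for `types.Row`, and `manualWindow` is a hypothetical type that is not part of the package. Emitting to both the callback and the channel on `Trigger` is an assumption made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Row is a stand-in for types.Row, whose fields are not shown in this document.
type Row struct {
	Data interface{}
}

// Window repeats the six methods that every concrete window type exposes,
// with Row substituted for types.Row so the sketch compiles on its own.
type Window interface {
	Add(data interface{})
	Reset()
	Start()
	OutputChan() <-chan []Row
	SetCallback(callback func([]Row))
	Trigger()
}

// manualWindow is a hypothetical window that emits only when Trigger is called.
type manualWindow struct {
	mu       sync.Mutex
	rows     []Row
	callback func([]Row)
	out      chan []Row
}

func newManualWindow() *manualWindow {
	return &manualWindow{out: make(chan []Row, 16)}
}

func (w *manualWindow) Add(data interface{}) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.rows = append(w.rows, Row{Data: data})
}

func (w *manualWindow) Reset() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.rows = nil
}

func (w *manualWindow) Start() {} // no timer to start in this sketch

func (w *manualWindow) OutputChan() <-chan []Row { return w.out }

func (w *manualWindow) SetCallback(callback func([]Row)) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.callback = callback
}

// Trigger hands the buffered rows to the callback and the output channel,
// then starts an empty buffer. A full channel drops the batch instead of blocking.
func (w *manualWindow) Trigger() {
	w.mu.Lock()
	batch, callback := w.rows, w.callback
	w.rows = nil
	w.mu.Unlock()
	if len(batch) == 0 {
		return
	}
	if callback != nil {
		callback(batch)
	}
	select {
	case w.out <- batch:
	default:
	}
}

func main() {
	var w Window = newManualWindow()
	w.SetCallback(func(rows []Row) { fmt.Println("callback rows:", len(rows)) })
	w.Start()
	w.Add("a")
	w.Add("b")
	w.Trigger()
	fmt.Println("channel rows:", len(<-w.OutputChan()))
}
```

The callback runs outside the lock so that a slow consumer cannot block concurrent `Add` calls. Whether the real window types make the same choice is not stated here.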
Tumbling Windows ¶
Non-overlapping time-based windows:
    // Create tumbling window
    config := types.WindowConfig{
        Type: "tumbling",
        Params: map[string]interface{}{
            "size": "5s", // 5-second windows
        },
        TsProp: "timestamp",
    }
    window, err := NewTumblingWindow(config)

    // Window characteristics:
    // - Fixed size (e.g., 5 seconds)
    // - No overlap between windows
    // - Triggers at regular intervals
    // - Memory efficient
    // - Suitable for periodic aggregations

    // Example timeline:
    // Window 1: [00:00 - 00:05)
    // Window 2: [00:05 - 00:10)
    // Window 3: [00:10 - 00:15)
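The timeline above can be reproduced with a few lines of standalone Go. This is a sketch only: `tumblingBounds` and `clock` are hypothetical helpers, and aligning windows with `time.Truncate` (multiples of the size counted from Go's zero time) is an assumption, since the package's own alignment rule is not described here.

```go
package main

import (
	"fmt"
	"time"
)

// clock builds a UTC timestamp at minute:second past midnight on an arbitrary date (demo helper).
func clock(minute, second int) time.Time {
	return time.Date(2024, 1, 1, 0, minute, second, 0, time.UTC)
}

// tumblingBounds returns the half-open window [start, end) that contains ts.
// Assumption: windows align to multiples of size, which is what
// time.Truncate computes relative to Go's zero time.
func tumblingBounds(ts time.Time, size time.Duration) (start, end time.Time) {
	start = ts.Truncate(size)
	return start, start.Add(size)
}

func main() {
	for _, ts := range []time.Time{clock(0, 0), clock(0, 4), clock(0, 5), clock(0, 12)} {
		start, end := tumblingBounds(ts, 5*time.Second)
		fmt.Printf("%s -> [%s - %s)\n", ts.Format("04:05"), start.Format("04:05"), end.Format("04:05"))
	}
}
```

Because the intervals are half-open, a record stamped exactly 00:05 belongs to the second window, not the first.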
Sliding Windows ¶
Overlapping time-based windows with configurable slide interval:
    // Create sliding window
    config := types.WindowConfig{
        Type: "sliding",
        Params: map[string]interface{}{
            "size":  "30s", // 30-second window size
            "slide": "10s", // 10-second slide interval
        },
        TsProp: "timestamp",
    }
    window, err := NewSlidingWindow(config)

    // Window characteristics:
    // - Fixed size with configurable slide
    // - Overlapping windows
    // - More frequent updates
    // - Higher memory usage
    // - Suitable for smooth trend analysis

    // Example timeline (30s window, 10s slide):
    // Window 1: [00:00 - 00:30)
    // Window 2: [00:10 - 00:40)
    // Window 3: [00:20 - 00:50)
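Overlap means one record can belong to several windows at once, which is also why memory usage is higher. The sketch below lists every window that contains a given timestamp. `slidingWindowStarts` and `clock` are hypothetical helpers, and the assumption that window starts fall on multiples of the slide interval is mine, not something the package documents.

```go
package main

import (
	"fmt"
	"time"
)

// clock builds a UTC timestamp at minute:second past midnight on an arbitrary date (demo helper).
func clock(minute, second int) time.Time {
	return time.Date(2024, 1, 1, 0, minute, second, 0, time.UTC)
}

// slidingWindowStarts returns the start of every window [start, start+size)
// that contains ts, latest window first.
// Assumption: window starts are aligned to multiples of slide.
func slidingWindowStarts(ts time.Time, size, slide time.Duration) []time.Time {
	if size <= 0 || slide <= 0 {
		return nil
	}
	var starts []time.Time
	// Begin at the last slide boundary at or before ts and walk backwards
	// until the window would end before ts.
	for start := ts.Truncate(slide); ts.Sub(start) < size; start = start.Add(-slide) {
		starts = append(starts, start)
	}
	return starts
}

func main() {
	size, slide := 30*time.Second, 10*time.Second
	ts := clock(0, 25)
	for _, start := range slidingWindowStarts(ts, size, slide) {
		fmt.Printf("%s is in [%s - %s)\n",
			ts.Format("04:05"), start.Format("04:05"), start.Add(size).Format("04:05"))
	}
}
```

With a 30-second size and a 10-second slide, each record lands in three windows (size divided by slide), so a record at 00:25 appears in all three windows of the timeline above.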
Counting Windows ¶
Count-based windows that trigger after a specified number of records:
    // Create counting window
    config := types.WindowConfig{
        Type: "counting",
        Params: map[string]interface{}{
            "count": 100, // Trigger every 100 records
        },
    }
    window, err := NewCountingWindow(config)

    // Window characteristics:
    // - Fixed record count
    // - Time-independent
    // - Predictable memory usage
    // - Suitable for batch processing
    // - Handles variable data rates

    // Example:
    // Window 1: Records 1-100
    // Window 2: Records 101-200
    // Window 3: Records 201-300
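The count-based trigger can be sketched without any timers. The `countBatcher` type below is hypothetical and only illustrates the idea of emitting a batch once the threshold is reached. It is not the package's `CountingWindow`, and it leaves out the locking a concurrent version would need.

```go
package main

import "fmt"

// countBatcher buffers records and releases them in fixed-size batches.
type countBatcher struct {
	threshold int
	buf       []interface{}
}

// newCountBatcher returns a batcher that emits every threshold records.
// Thresholds below 1 are raised to 1 so that every record still triggers.
func newCountBatcher(threshold int) *countBatcher {
	if threshold < 1 {
		threshold = 1
	}
	return &countBatcher{threshold: threshold, buf: make([]interface{}, 0, threshold)}
}

// add appends a record and returns the full batch once the threshold is hit,
// or nil while the window is still filling.
func (b *countBatcher) add(record interface{}) []interface{} {
	b.buf = append(b.buf, record)
	if len(b.buf) < b.threshold {
		return nil
	}
	batch := b.buf
	b.buf = make([]interface{}, 0, b.threshold) // fresh buffer for the next window
	return batch
}

func main() {
	b := newCountBatcher(100)
	window := 0
	for i := 1; i <= 250; i++ {
		if batch := b.add(i); batch != nil {
			window++
			fmt.Printf("Window %d: records %v-%v\n", window, batch[0], batch[len(batch)-1])
		}
	}
	fmt.Printf("%d records still buffered\n", len(b.buf))
}
```

Records that arrive after the last full batch stay buffered until more data arrives, which is the situation a manual `Trigger` call is useful for.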
Session Windows ¶
Activity-based windows with configurable session timeout:
    // Create session window
    config := types.WindowConfig{
        Type: "session",
        Params: map[string]interface{}{
            "timeout": "5m", // 5-minute session timeout
        },
        GroupByKey: "user_id", // Group sessions by user
    }
    window, err := NewSessionWindow(config)

    // Window characteristics:
    // - Variable window size
    // - Activity-based triggers
    // - Per-group session tracking
    // - Automatic session expiration
    // - Suitable for user behavior analysis

    // Example (5-minute timeout):
    // User A: [10:00 - 10:15) - 15-minute session
    // User B: [10:05 - 10:08) - 3-minute session
    // User A: [10:20 - 10:25) - New 5-minute session
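Per-key session tracking comes down to remembering the last event time for each key and comparing the gap against the timeout. The following sketch shows that bookkeeping under two assumptions: events for a key arrive in timestamp order, and a gap equal to the timeout starts a new session. `sessionTracker`, `session`, and `clock` are hypothetical names, not package types.

```go
package main

import (
	"fmt"
	"time"
)

// clock builds a UTC timestamp at hour:minute on an arbitrary date (demo helper).
func clock(hour, minute int) time.Time {
	return time.Date(2024, 1, 1, hour, minute, 0, 0, time.UTC)
}

// session summarizes the activity seen for one key.
type session struct {
	Key        string
	Start, End time.Time // first and last event in the session
	Events     int
}

// sessionTracker keeps one open session per key.
type sessionTracker struct {
	timeout time.Duration
	open    map[string]*session
}

func newSessionTracker(timeout time.Duration) *sessionTracker {
	return &sessionTracker{timeout: timeout, open: make(map[string]*session)}
}

// observe records an event for key at ts. If the key's open session has been
// idle for timeout or longer, that session is returned as closed and a new one
// begins; otherwise the open session is extended and nil is returned.
func (st *sessionTracker) observe(key string, ts time.Time) *session {
	current, ok := st.open[key]
	if ok && ts.Sub(current.End) < st.timeout {
		current.End = ts
		current.Events++
		return nil
	}
	st.open[key] = &session{Key: key, Start: ts, End: ts, Events: 1}
	return current // nil when the key had no open session
}

func main() {
	st := newSessionTracker(5 * time.Minute)
	events := []struct {
		key string
		ts  time.Time
	}{
		{"A", clock(10, 0)}, {"A", clock(10, 4)}, {"B", clock(10, 5)},
		{"B", clock(10, 8)}, {"A", clock(10, 8)}, {"A", clock(10, 20)},
	}
	for _, ev := range events {
		if closed := st.observe(ev.key, ev.ts); closed != nil {
			fmt.Printf("closed %s: %s - %s, %d events\n", closed.Key,
				closed.Start.Format("15:04"), closed.End.Format("15:04"), closed.Events)
		}
	}
}
```

This sketch closes a session only when a later event for the same key arrives. A key that goes silent needs a periodic expiry check as well, which is the role the package describes for `SessionWindow.Start`.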
Window Factory ¶
Centralized window creation:
func CreateWindow(config types.WindowConfig) (Window, error)
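A factory usually validates the configuration before it dispatches on the window type. The sketch below shows one way that validation might look for the parameters used in this document ("size", "slide", "count", "timeout"). `windowConfig`, `durationParam`, and `validateConfig` are hypothetical; the checks that `CreateWindow` actually performs are not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// windowConfig is a local stand-in for types.WindowConfig with only the fields used here.
type windowConfig struct {
	Type   string
	Params map[string]interface{}
}

// durationParam reads params[key] as a positive duration string such as "5s".
func durationParam(params map[string]interface{}, key string) (time.Duration, error) {
	raw, ok := params[key].(string)
	if !ok {
		return 0, fmt.Errorf("param %q: want a duration string", key)
	}
	d, err := time.ParseDuration(raw)
	if err != nil {
		return 0, fmt.Errorf("param %q: %w", key, err)
	}
	if d <= 0 {
		return 0, fmt.Errorf("param %q: must be positive, got %s", key, d)
	}
	return d, nil
}

// validateConfig checks that cfg carries the parameters its window type needs.
func validateConfig(cfg windowConfig) error {
	var durations []string
	switch cfg.Type {
	case "tumbling":
		durations = []string{"size"}
	case "sliding":
		durations = []string{"size", "slide"}
	case "session":
		durations = []string{"timeout"}
	case "counting":
		if n, ok := cfg.Params["count"].(int); !ok || n <= 0 {
			return fmt.Errorf("counting window: param %q must be a positive int", "count")
		}
	default:
		return fmt.Errorf("unsupported window type %q", cfg.Type)
	}
	for _, key := range durations {
		if _, err := durationParam(cfg.Params, key); err != nil {
			return fmt.Errorf("%s window: %w", cfg.Type, err)
		}
	}
	return nil
}

func main() {
	configs := []windowConfig{
		{Type: "tumbling", Params: map[string]interface{}{"size": "5s"}},
		{Type: "sliding", Params: map[string]interface{}{"size": "30s"}}, // "slide" is missing
		{Type: "hopping"}, // not one of the four supported types
	}
	for _, cfg := range configs {
		fmt.Printf("%s: %v\n", cfg.Type, validateConfig(cfg))
	}
}
```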
Time Management ¶
Time handling for window operations:
    func GetTimestamp(data interface{}, tsProp string, timeUnit time.Duration) time.Time

    type TimeSlot struct {
        Start    time.Time
        End      time.Time
        Duration time.Duration
    }
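As an illustration of field-based timestamp extraction, the sketch below reads a named field from a map record and interprets integer values in a caller-supplied unit, mirroring the `tsProp` and `timeUnit` parameters listed for `GetTimestamp` in the package index. `timestampOf` is a hypothetical helper. How the real function treats missing fields or other value types is not described in this document.

```go
package main

import (
	"fmt"
	"time"
)

// timestampOf reads record[field] as a point in time.
// time.Time values are returned unchanged; integer values are treated as a
// count of unit since the Unix epoch. The bool reports whether the field
// was present and had a supported type.
func timestampOf(record map[string]interface{}, field string, unit time.Duration) (time.Time, bool) {
	switch v := record[field].(type) {
	case time.Time:
		return v, true
	case int64:
		return time.Unix(0, v*int64(unit)).UTC(), true
	case int:
		return time.Unix(0, int64(v)*int64(unit)).UTC(), true
	default:
		return time.Time{}, false
	}
}

func main() {
	// The record carries epoch milliseconds in its "timestamp" field.
	record := map[string]interface{}{"timestamp": int64(1700000000000)}

	if ts, ok := timestampOf(record, "timestamp", time.Millisecond); ok {
		fmt.Println("event time:", ts.Format(time.RFC3339))
	}
	if _, ok := timestampOf(record, "event_time", time.Millisecond); !ok {
		fmt.Println("event_time: field missing or unsupported type")
	}
}
```

Returning a boolean keeps the choice of fallback, such as using the processing time, with the caller.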
Performance Features ¶
• Memory Management - Efficient buffer management and garbage collection
• Concurrency - Thread-safe operations with minimal locking
• Time Efficiency - Optimized timestamp processing and timer management
Usage Examples ¶
Basic tumbling window:
    config := types.WindowConfig{
        Type:   "tumbling",
        Params: map[string]interface{}{"size": "10s"},
        TsProp: "timestamp",
    }
    window, err := CreateWindow(config)
    if err != nil {
        log.Fatalf("create window: %v", err) // do not use window after a failed create
    }
    window.SetCallback(func(results []types.Row) {
        fmt.Printf("Window results: %d records\n", len(results))
    })
    window.Start()
Sliding window:
    config := types.WindowConfig{
        Type: "sliding",
        Params: map[string]interface{}{
            "size":  "1m",
            "slide": "10s",
        },
        TsProp: "event_time",
    }
    window, err := NewSlidingWindow(config)
Session window:
    config := types.WindowConfig{
        Type:       "session",
        Params:     map[string]interface{}{"timeout": "30m"},
        GroupByKey: "user_id",
    }
    window, err := NewSessionWindow(config)
Integration ¶
Integrates with other StreamSQL components:
• Stream package - Stream processing and data flow
• RSQL package - SQL-based window definitions
• Functions package - Aggregation functions for window results
• Types package - Shared data types and configuration
Index ¶
- Constants
- func GetTimestamp(data interface{}, tsProp string, timeUnit time.Duration) time.Time
- type CountingWindow
- type SessionWindow
- func (sw *SessionWindow) Add(data interface{})
- func (sw *SessionWindow) OutputChan() <-chan []types.Row
- func (sw *SessionWindow) Reset()
- func (sw *SessionWindow) SetCallback(callback func([]types.Row))
- func (sw *SessionWindow) Start()
- func (sw *SessionWindow) Stop()
- func (sw *SessionWindow) Trigger()
- type SlidingWindow
- func (sw *SlidingWindow) Add(data interface{})
- func (sw *SlidingWindow) GetStats() map[string]int64
- func (sw *SlidingWindow) NextSlot() *types.TimeSlot
- func (sw *SlidingWindow) OutputChan() <-chan []types.Row
- func (sw *SlidingWindow) Reset()
- func (sw *SlidingWindow) ResetStats()
- func (sw *SlidingWindow) SetCallback(callback func([]types.Row))
- func (sw *SlidingWindow) Start()
- func (sw *SlidingWindow) Stop()
- func (sw *SlidingWindow) Trigger()
- type TimedData
- type TumblingWindow
- func (tw *TumblingWindow) Add(data interface{})
- func (tw *TumblingWindow) GetStats() map[string]int64
- func (sw *TumblingWindow) NextSlot() *types.TimeSlot
- func (tw *TumblingWindow) OutputChan() <-chan []types.Row
- func (tw *TumblingWindow) Reset()
- func (tw *TumblingWindow) ResetStats()
- func (tw *TumblingWindow) SetCallback(callback func([]types.Row))
- func (tw *TumblingWindow) Start()
- func (tw *TumblingWindow) Stop()
- func (tw *TumblingWindow) Trigger()
- type Window
Constants ¶
const (
    TypeTumbling = "tumbling"
    TypeSliding  = "sliding"
    TypeCounting = "counting"
    TypeSession  = "session"
)
Functions ¶
Types ¶
type CountingWindow ¶
type CountingWindow struct {
// contains filtered or unexported fields
}
func NewCountingWindow ¶
func NewCountingWindow(config types.WindowConfig) (*CountingWindow, error)
func (*CountingWindow) Add ¶
func (cw *CountingWindow) Add(data interface{})
func (*CountingWindow) OutputChan ¶
func (cw *CountingWindow) OutputChan() <-chan []types.Row
func (*CountingWindow) Reset ¶
func (cw *CountingWindow) Reset()
func (*CountingWindow) SetCallback ¶
func (cw *CountingWindow) SetCallback(callback func([]types.Row))
func (*CountingWindow) Start ¶
func (cw *CountingWindow) Start()
func (*CountingWindow) Trigger ¶
func (cw *CountingWindow) Trigger()
type SessionWindow ¶
type SessionWindow struct {
// contains filtered or unexported fields
}
SessionWindow represents a session window: an event-time based window that closes when no events arrive within the configured timeout.
func NewSessionWindow ¶
func NewSessionWindow(config types.WindowConfig) (*SessionWindow, error)
NewSessionWindow creates a new session window instance
func (*SessionWindow) Add ¶
func (sw *SessionWindow) Add(data interface{})
Add adds data to session window
func (*SessionWindow) OutputChan ¶
func (sw *SessionWindow) OutputChan() <-chan []types.Row
OutputChan returns a read-only channel that delivers the window's data each time the window triggers
func (*SessionWindow) SetCallback ¶
func (sw *SessionWindow) SetCallback(callback func([]types.Row))
SetCallback sets the callback function to execute when the session window triggers
func (*SessionWindow) Start ¶
func (sw *SessionWindow) Start()
Start starts the session window and begins periodic checking for expired sessions. It uses lazy initialization so that it does not wait indefinitely when no data has arrived, while ensuring that later data is still processed normally.
func (*SessionWindow) Trigger ¶
func (sw *SessionWindow) Trigger()
Trigger manually triggers all session windows
type SlidingWindow ¶
type SlidingWindow struct {
// contains filtered or unexported fields
}
SlidingWindow represents a sliding window for processing data within time ranges
func NewSlidingWindow ¶
func NewSlidingWindow(config types.WindowConfig) (*SlidingWindow, error)
NewSlidingWindow creates a new sliding window instance. The size parameter is the total window size, and slide is the sliding interval.
func (*SlidingWindow) Add ¶
func (sw *SlidingWindow) Add(data interface{})
Add adds data to the sliding window
func (*SlidingWindow) GetStats ¶
func (sw *SlidingWindow) GetStats() map[string]int64
GetStats returns window performance statistics
func (*SlidingWindow) NextSlot ¶
func (sw *SlidingWindow) NextSlot() *types.TimeSlot
func (*SlidingWindow) OutputChan ¶
func (sw *SlidingWindow) OutputChan() <-chan []types.Row
OutputChan returns the sliding window's output channel
func (*SlidingWindow) Reset ¶
func (sw *SlidingWindow) Reset()
Reset resets the sliding window and clears window data
func (*SlidingWindow) ResetStats ¶
func (sw *SlidingWindow) ResetStats()
ResetStats resets performance statistics
func (*SlidingWindow) SetCallback ¶
func (sw *SlidingWindow) SetCallback(callback func([]types.Row))
SetCallback sets the callback function to execute when the sliding window triggers
func (*SlidingWindow) Start ¶
func (sw *SlidingWindow) Start()
Start starts the sliding window with periodic triggering. It uses lazy initialization so that it does not wait indefinitely when no data has arrived, while ensuring that later data is still processed normally.
func (*SlidingWindow) Stop ¶
func (sw *SlidingWindow) Stop()
Stop stops the sliding window operations
func (*SlidingWindow) Trigger ¶
func (sw *SlidingWindow) Trigger()
Trigger triggers the sliding window to process data within the window
type TumblingWindow ¶
type TumblingWindow struct {
// contains filtered or unexported fields
}
TumblingWindow represents a tumbling window for collecting data and triggering processing at fixed time intervals
func NewTumblingWindow ¶
func NewTumblingWindow(config types.WindowConfig) (*TumblingWindow, error)
NewTumblingWindow creates a new tumbling window instance. The size parameter is the duration of each window.
func (*TumblingWindow) Add ¶
func (tw *TumblingWindow) Add(data interface{})
Add adds data to the tumbling window
func (*TumblingWindow) GetStats ¶
func (tw *TumblingWindow) GetStats() map[string]int64
GetStats returns window performance statistics
func (*TumblingWindow) NextSlot ¶
func (sw *TumblingWindow) NextSlot() *types.TimeSlot
func (*TumblingWindow) OutputChan ¶
func (tw *TumblingWindow) OutputChan() <-chan []types.Row
OutputChan returns a read-only channel that delivers the window's data each time the window triggers
func (*TumblingWindow) ResetStats ¶
func (tw *TumblingWindow) ResetStats()
ResetStats resets performance statistics
func (*TumblingWindow) SetCallback ¶
func (tw *TumblingWindow) SetCallback(callback func([]types.Row))
SetCallback sets the callback function to execute when the tumbling window triggers
func (*TumblingWindow) Start ¶
func (tw *TumblingWindow) Start()
Start starts the tumbling window's periodic trigger mechanism. It uses lazy initialization so that it does not wait indefinitely when no data has arrived, while ensuring that later data is still processed normally.
func (*TumblingWindow) Stop ¶
func (tw *TumblingWindow) Stop()
Stop stops tumbling window operations
func (*TumblingWindow) Trigger ¶
func (tw *TumblingWindow) Trigger()
Trigger triggers the tumbling window's processing logic