window

package
v0.10.6
Warning

This package is not in the latest version of its module.

Published: Dec 20, 2025 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package window provides windowing functionality for StreamSQL stream processing.

This package implements various types of windows for aggregating streaming data over time intervals or record counts. It supports tumbling, sliding, counting, and session windows with efficient memory management and concurrent processing.

Core Features

• Multiple Window Types - Tumbling, Sliding, Counting, and Session windows
• Time Management - Time-based window boundaries and event time processing
• Trigger Mechanisms - Triggering based on time, count, or custom conditions
• Memory Efficiency - Optimized data structures and memory management
• Concurrent Processing - Thread-safe operations
• Late Data Handling - Configurable policies for late-arriving data

Window Types

Four distinct window types for different stream processing scenarios:

• Tumbling Windows - Non-overlapping, fixed-size time windows
• Sliding Windows - Overlapping time windows with a configurable slide interval
• Counting Windows - Count-based windows that trigger after N records
• Session Windows - Activity-based windows with a configurable timeout

Window Interface

All window types implement a unified Window interface:

type Window interface {
	Add(item interface{})                   // Add data to the window
	Reset()                                 // Reset window state
	Start()                                 // Start window processing
	Stop()                                  // Stop window operations and clean up resources
	OutputChan() <-chan []types.Row         // Get output channel
	SetCallback(callback func([]types.Row)) // Set callback function
	Trigger()                               // Manually trigger the window
	GetStats() map[string]int64             // Get performance statistics
}

Tumbling Windows

Non-overlapping time-based windows:

// Create tumbling window with processing time (default)
ptConfig := types.WindowConfig{
	Type:               "tumbling",
	Params:             []interface{}{"5s"}, // 5-second windows
	TsProp:             "timestamp",
	TimeCharacteristic: types.ProcessingTime, // Uses system clock
}
ptWindow, err := NewTumblingWindow(ptConfig)
if err != nil {
	log.Fatal(err)
}

// Create tumbling window with event time
etConfig := types.WindowConfig{
	Type:               "tumbling",
	Params:             []interface{}{"5s"}, // 5-second windows
	TsProp:             "timestamp",
	TimeCharacteristic: types.EventTime,        // Uses event timestamps
	MaxOutOfOrderness:  2 * time.Second,        // Allow 2 seconds of out-of-order data
	WatermarkInterval:  200 * time.Millisecond, // Update watermark every 200ms
	AllowedLateness:    1 * time.Second,        // Accept late data for 1 second after the window fires
}
etWindow, err := NewTumblingWindow(etConfig)
if err != nil {
	log.Fatal(err)
}

// Window characteristics:
// - Fixed size (e.g., 5 seconds)
// - No overlap between windows
// - Triggers at regular intervals (ProcessingTime) or based on watermark (EventTime)
// - Memory efficient
// - Suitable for periodic aggregations

// ProcessingTime example timeline (based on data arrival):
// Window 1: [00:00 - 00:05) - triggers when 5s elapsed from first data
// Window 2: [00:05 - 00:10) - triggers when next 5s elapsed
// Window 3: [00:10 - 00:15) - triggers when next 5s elapsed

// EventTime example timeline (based on event timestamps):
// Window 1: [00:00 - 00:05) - triggers when watermark >= 00:05
// Window 2: [00:05 - 00:10) - triggers when watermark >= 00:10
// Window 3: [00:10 - 00:15) - triggers when watermark >= 00:15

Sliding Windows

Overlapping time-based windows with configurable slide interval:

// Create sliding window with processing time (default)
ptConfig := types.WindowConfig{
	Type:               "sliding",
	Params:             []interface{}{"30s", "10s"}, // 30-second window size, 10-second slide
	TsProp:             "timestamp",
	TimeCharacteristic: types.ProcessingTime, // Uses system clock
}
ptWindow, err := NewSlidingWindow(ptConfig)
if err != nil {
	log.Fatal(err)
}

// Create sliding window with event time
etConfig := types.WindowConfig{
	Type:               "sliding",
	Params:             []interface{}{"30s", "10s"}, // 30-second window size, 10-second slide
	TsProp:             "timestamp",
	TimeCharacteristic: types.EventTime,        // Uses event timestamps
	MaxOutOfOrderness:  2 * time.Second,        // Allow 2 seconds of out-of-order data
	WatermarkInterval:  200 * time.Millisecond, // Update watermark every 200ms
}
etWindow, err := NewSlidingWindow(etConfig)
if err != nil {
	log.Fatal(err)
}

// Window characteristics:
// - Fixed size with configurable slide
// - Overlapping windows
// - More frequent updates
// - Higher memory usage
// - Suitable for smooth trend analysis

// ProcessingTime example timeline (30s window, 10s slide, based on data arrival):
// Window 1: [00:00 - 00:30) - triggers when 30s elapsed from first data
// Window 2: [00:10 - 00:40) - triggers 10s after Window 1
// Window 3: [00:20 - 00:50) - triggers 10s after Window 2

// EventTime example timeline (30s window, 10s slide, based on event timestamps):
// Window 1: [00:00 - 00:30) - triggers when watermark >= 00:30
// Window 2: [00:10 - 00:40) - triggers when watermark >= 00:40
// Window 3: [00:20 - 00:50) - triggers when watermark >= 00:50

Counting Windows

Count-based windows that trigger after a specified number of records:

// Create counting window
config := types.WindowConfig{
	Type: "counting",
	Params: map[string]interface{}{
		"count": 100, // Trigger every 100 records
	},
}
window, err := NewCountingWindow(config)
if err != nil {
	log.Fatal(err)
}

// Window characteristics:
// - Fixed record count
// - Time-independent
// - Predictable memory usage
// - Suitable for batch processing
// - Handles variable data rates

// Example:
// Window 1: Records 1-100
// Window 2: Records 101-200
// Window 3: Records 201-300
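
The record-count behaviour can be sketched as a buffer that flushes every N records. `countBatcher` is a hypothetical type that collects emitted batches in a slice instead of calling a callback or writing to OutputChan. It illustrates the batching rule only and is not how CountingWindow is implemented.

```go
package main

import "fmt"

// countBatcher is a hypothetical, minimal count-based window: it buffers
// records and emits a batch each time the buffer reaches size.
type countBatcher struct {
	size    int
	buf     []interface{}
	emitted [][]interface{}
}

func (c *countBatcher) Add(record interface{}) {
	c.buf = append(c.buf, record)
	if len(c.buf) == c.size {
		c.emitted = append(c.emitted, c.buf)
		// Start a fresh buffer so the emitted batch is never overwritten.
		c.buf = make([]interface{}, 0, c.size)
	}
}

func main() {
	b := &countBatcher{size: 100}
	for i := 1; i <= 250; i++ {
		b.Add(i)
	}
	for i, batch := range b.emitted {
		fmt.Printf("Window %d: records %v-%v\n", i+1, batch[0], batch[len(batch)-1])
	}
	fmt.Printf("pending: %d\n", len(b.buf))
}
```

With 250 records, two full windows are emitted and 50 records wait for the next trigger.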

Session Windows

Activity-based windows with configurable session timeout:

// Create session window
config := types.WindowConfig{
	Type: "session",
	Params: map[string]interface{}{
		"timeout": "5m", // 5-minute session timeout
	},
	GroupByKey: "user_id", // Group sessions by user
}
window, err := NewSessionWindow(config)
if err != nil {
	log.Fatal(err)
}

// Window characteristics:
// - Variable window size
// - Activity-based triggers
// - Per-group session tracking
// - Automatic session expiration
// - Suitable for user behavior analysis

// Example (5-minute timeout):
// User A: [10:00 - 10:15) - 15-minute session
// User B: [10:05 - 10:08) - 3-minute session
// User A: [10:20 - 10:25) - New 5-minute session

Window Factory

Centralized window creation:

func CreateWindow(config types.WindowConfig) (Window, error)

Time Management

Time handling for window operations:

func GetTimestamp(data interface{}, tsProp string, timeUnit time.Duration) time.Time

type TimeSlot struct {
	Start    time.Time
	End      time.Time
	Duration time.Duration
}

Time Characteristics

Windows support two time characteristics:

ProcessingTime (Default)

- Uses the system clock for window operations
- Windows trigger based on when data arrives
- Cannot handle out-of-order data
- Lower latency, but results may be inconsistent
- Suitable for real-time monitoring and low-latency requirements

EventTime

- Uses event timestamps for window operations
- Windows trigger based on event time via the watermark mechanism
- Can handle out-of-order and late-arriving data
- Consistent results, but may have higher latency
- Suitable for accurate time-based analysis and historical data processing

Watermark Mechanism

For EventTime windows, the watermark indicates that no events with a timestamp earlier than the watermark time are expected:

- Watermark = max(event_time) - maxOutOfOrderness
- Windows trigger when watermark >= window_end_time
- Late data (event time earlier than the current watermark) can be detected and handled separately

Allowed Lateness

For EventTime windows, `allowedLateness` allows windows to accept late data after they have been triggered:

- When watermark >= window_end, the window triggers and outputs its result
- The window remains open until watermark >= window_end + allowedLateness
- Late data arriving within allowedLateness triggers delayed updates (the window fires again)
- After allowedLateness expires, the window closes and late data is ignored
- Default: 0 (no late data accepted after the window closes)

Example:

- Window [00:00 - 00:05) triggers when watermark >= 00:05
- With allowedLateness = 2s, the window stays open until watermark >= 00:07
- Late data with a timestamp in [00:00 - 00:05) that arrives before watermark >= 00:07 triggers a delayed update
- After watermark >= 00:07, the window closes and late data is ignored

Idle Source Mechanism

For EventTime windows, `idleTimeout` enables watermark advancement based on processing time when the data source is idle:

- Normally: the watermark advances based on event time (Watermark = max(event_time) - maxOutOfOrderness)
- When idle: if no data arrives within idleTimeout, the watermark advances based on processing time
- This ensures windows can close even when the data source stops sending data
- Prevents memory leaks from windows that never close
- Default: 0 (disabled; the watermark only advances based on event time)

Example:

- Window [00:00 - 00:05) has data with max event time = 00:02
- The data source stops sending data at 00:03
- With idleTimeout = 5s, after 5 seconds without data (at 00:08), the watermark advances based on processing time
- With maxOutOfOrderness = 1s: Watermark = currentProcessingTime - maxOutOfOrderness = 00:08 - 1s = 00:07
- Window [00:00 - 00:05) can now trigger (watermark >= 00:05) and close

Performance Features

• Memory Management - Efficient buffer management and garbage collection
• Concurrency - Thread-safe operations with minimal locking
• Time Efficiency - Optimized timestamp processing and timer management

Usage Examples

Basic tumbling window:

config := types.WindowConfig{
	Type: "tumbling",
	Params: map[string]interface{}{"size": "10s"},
	TsProp: "timestamp",
}
window, err := CreateWindow(config)
if err != nil {
	log.Fatal(err)
}
window.SetCallback(func(results []types.Row) {
	fmt.Printf("Window results: %d records\n", len(results))
})
window.Start()
defer window.Stop()

Sliding window:

config := types.WindowConfig{
	Type: "sliding",
	Params: map[string]interface{}{
		"size":  "1m",
		"slide": "10s",
	},
	TsProp: "event_time",
}
window, err := NewSlidingWindow(config)
if err != nil {
	log.Fatal(err)
}

Session window:

config := types.WindowConfig{
	Type: "session",
	Params: map[string]interface{}{"timeout": "30m"},
	GroupByKey: "user_id",
}
window, err := NewSessionWindow(config)
if err != nil {
	log.Fatal(err)
}

Integration

Integrates with other StreamSQL components:

• Stream package - Stream processing and data flow
• RSQL package - SQL-based window definitions
• Functions package - Aggregation functions for window results
• Types package - Shared data types and configuration

Index

Constants

const (
	TypeTumbling = "tumbling"
	TypeSliding  = "sliding"
	TypeCounting = "counting"
	TypeSession  = "session"
)

Variables

var EnableDebug = false

EnableDebug enables debug logging for window operations

Functions

func GetTimestamp

func GetTimestamp(data interface{}, tsProp string, timeUnit time.Duration) time.Time

GetTimestamp extracts timestamp from data

Types

type CountingWindow

type CountingWindow struct {
	// contains filtered or unexported fields
}

func NewCountingWindow

func NewCountingWindow(config types.WindowConfig) (*CountingWindow, error)

func (*CountingWindow) Add

func (cw *CountingWindow) Add(data interface{})

func (*CountingWindow) GetStats added in v0.10.4

func (cw *CountingWindow) GetStats() map[string]int64

func (*CountingWindow) OutputChan

func (cw *CountingWindow) OutputChan() <-chan []types.Row

func (*CountingWindow) Reset

func (cw *CountingWindow) Reset()

func (*CountingWindow) SetCallback

func (cw *CountingWindow) SetCallback(callback func([]types.Row))

func (*CountingWindow) Start

func (cw *CountingWindow) Start()

func (*CountingWindow) Stop added in v0.10.4

func (cw *CountingWindow) Stop()

func (*CountingWindow) Trigger

func (cw *CountingWindow) Trigger()

type SessionWindow

type SessionWindow struct {
	// contains filtered or unexported fields
}

SessionWindow represents a session window. A session window is an event-time based window that closes when no events arrive for a period of time.

func NewSessionWindow

func NewSessionWindow(config types.WindowConfig) (*SessionWindow, error)

NewSessionWindow creates a new session window instance

func (*SessionWindow) Add

func (sw *SessionWindow) Add(data interface{})

Add adds data to session window

func (*SessionWindow) GetStats added in v0.10.6

func (sw *SessionWindow) GetStats() map[string]int64

GetStats returns window performance statistics

func (*SessionWindow) OutputChan

func (sw *SessionWindow) OutputChan() <-chan []types.Row

OutputChan returns a read-only channel for receiving data when window triggers

func (*SessionWindow) Reset

func (sw *SessionWindow) Reset()

Reset resets session window data

func (*SessionWindow) ResetStats added in v0.10.6

func (sw *SessionWindow) ResetStats()

ResetStats resets performance statistics

func (*SessionWindow) SetCallback

func (sw *SessionWindow) SetCallback(callback func([]types.Row))

SetCallback sets the callback function when session window triggers

func (*SessionWindow) Start

func (sw *SessionWindow) Start()

Start starts the session window and begins periodic checks for expired sessions. It uses lazy initialization to avoid waiting indefinitely when no data has arrived, while ensuring that subsequent data is still processed normally.

func (*SessionWindow) Stop

func (sw *SessionWindow) Stop()

Stop stops session window operations

func (*SessionWindow) Trigger

func (sw *SessionWindow) Trigger()

Trigger manually triggers all session windows

type SlidingWindow

type SlidingWindow struct {
	// contains filtered or unexported fields
}

SlidingWindow represents a sliding window for processing data within time ranges

func NewSlidingWindow

func NewSlidingWindow(config types.WindowConfig) (*SlidingWindow, error)

NewSlidingWindow creates a new sliding window instance. The size parameter is the total window size, and slide is the sliding interval.

func (*SlidingWindow) Add

func (sw *SlidingWindow) Add(data interface{})

Add adds data to the sliding window

func (*SlidingWindow) GetStats

func (sw *SlidingWindow) GetStats() map[string]int64

GetStats returns window performance statistics

func (*SlidingWindow) NextSlot

func (sw *SlidingWindow) NextSlot() *types.TimeSlot

func (*SlidingWindow) OutputChan

func (sw *SlidingWindow) OutputChan() <-chan []types.Row

OutputChan returns the sliding window's output channel

func (*SlidingWindow) Reset

func (sw *SlidingWindow) Reset()

Reset resets the sliding window and clears window data

func (*SlidingWindow) ResetStats

func (sw *SlidingWindow) ResetStats()

ResetStats resets performance statistics

func (*SlidingWindow) SetCallback

func (sw *SlidingWindow) SetCallback(callback func([]types.Row))

SetCallback sets the callback function to execute when sliding window triggers

func (*SlidingWindow) Start

func (sw *SlidingWindow) Start()

Start starts the sliding window with periodic triggering. It uses lazy initialization to avoid waiting indefinitely when no data has arrived, while ensuring that subsequent data is processed normally. The first window triggers when it ends; subsequent windows trigger at slide intervals.

func (*SlidingWindow) Stop

func (sw *SlidingWindow) Stop()

Stop stops the sliding window operations

func (*SlidingWindow) Trigger

func (sw *SlidingWindow) Trigger()

Trigger makes the sliding window process the data within the current window. For ProcessingTime it is called by the timer; for EventTime it is called on watermark updates.

type TimedData

type TimedData struct {
	Data      interface{}
	Timestamp time.Time
}

TimedData wraps data with timestamp

type TumblingWindow

type TumblingWindow struct {
	// contains filtered or unexported fields
}

TumblingWindow represents a tumbling window for collecting data and triggering processing at fixed time intervals

func NewTumblingWindow

func NewTumblingWindow(config types.WindowConfig) (*TumblingWindow, error)

NewTumblingWindow creates a new tumbling window instance. The size parameter is the time length of the window.

func (*TumblingWindow) Add

func (tw *TumblingWindow) Add(data interface{})

Add adds data to the tumbling window

func (*TumblingWindow) GetStats

func (tw *TumblingWindow) GetStats() map[string]int64

GetStats returns window performance statistics

func (*TumblingWindow) NextSlot

func (tw *TumblingWindow) NextSlot() *types.TimeSlot

func (*TumblingWindow) OutputChan

func (tw *TumblingWindow) OutputChan() <-chan []types.Row

OutputChan returns a read-only channel for receiving data when window triggers

func (*TumblingWindow) Reset

func (tw *TumblingWindow) Reset()

Reset resets tumbling window data

func (*TumblingWindow) ResetStats

func (tw *TumblingWindow) ResetStats()

ResetStats resets performance statistics

func (*TumblingWindow) SetCallback

func (tw *TumblingWindow) SetCallback(callback func([]types.Row))

SetCallback sets the callback function to execute when tumbling window triggers

func (*TumblingWindow) Start

func (tw *TumblingWindow) Start()

Start starts the tumbling window's periodic trigger mechanism. It uses lazy initialization to avoid waiting indefinitely when no data has arrived, while ensuring that subsequent data is processed normally.

func (*TumblingWindow) Stop

func (tw *TumblingWindow) Stop()

Stop stops tumbling window operations

func (*TumblingWindow) Trigger

func (tw *TumblingWindow) Trigger()

Trigger runs the tumbling window's processing logic. For ProcessingTime it is called by the timer; for EventTime it is called on watermark updates.

type Watermark added in v0.10.4

type Watermark struct {
	// contains filtered or unexported fields
}

Watermark represents a watermark for event time processing. A watermark indicates that no events with a timestamp earlier than the watermark time are expected.

func NewWatermark added in v0.10.4

func NewWatermark(maxOutOfOrderness time.Duration, updateInterval time.Duration, idleTimeout time.Duration) *Watermark

NewWatermark creates a new watermark manager

func (*Watermark) GetCurrentWatermark added in v0.10.4

func (wm *Watermark) GetCurrentWatermark() time.Time

GetCurrentWatermark returns the current watermark time

func (*Watermark) IsEventTimeLate added in v0.10.4

func (wm *Watermark) IsEventTimeLate(eventTime time.Time) bool

IsEventTimeLate checks if an event time is late (before current watermark)

func (*Watermark) Stop added in v0.10.4

func (wm *Watermark) Stop()

Stop stops the watermark manager

func (*Watermark) UpdateEventTime added in v0.10.4

func (wm *Watermark) UpdateEventTime(eventTime time.Time)

UpdateEventTime updates the maximum event time seen

func (*Watermark) WatermarkChan added in v0.10.4

func (wm *Watermark) WatermarkChan() <-chan time.Time

WatermarkChan returns a channel for receiving watermark updates

type Window

type Window interface {
	Add(item interface{})
	//GetResults() []interface{}
	Reset()
	Start()
	Stop() // Stop window operations and clean up resources
	OutputChan() <-chan []types.Row
	SetCallback(callback func([]types.Row))
	Trigger()
	GetStats() map[string]int64
}

func CreateWindow

func CreateWindow(config types.WindowConfig) (Window, error)
