window

package
v0.10.3
Warning

This package is not in the latest version of its module.

Published: Aug 29, 2025 | License: Apache-2.0 | Imports: 8 | Imported by: 0

Documentation

Overview

Package window provides windowing functionality for StreamSQL stream processing.

This package implements various types of windows for aggregating streaming data over time intervals or record counts. It supports tumbling, sliding, counting, and session windows with efficient memory management and concurrent processing.

Core Features

• Multiple Window Types - Tumbling, Sliding, Counting, and Session windows
• Time Management - Time-based window boundaries and event time processing
• Trigger Mechanisms - Triggering based on time, count, or custom conditions
• Memory Efficiency - Optimized data structures and memory management
• Concurrent Processing - Thread-safe operations
• Late Data Handling - Configurable policies for late-arriving data

Window Types

Four distinct window types for different stream processing scenarios:

• Tumbling Windows - Non-overlapping, fixed-size time windows
• Sliding Windows - Overlapping time windows with configurable slide interval
• Counting Windows - Count-based windows that trigger after N records
• Session Windows - Activity-based windows with configurable timeout

Window Interface

All window types implement a unified Window interface:

type Window interface {
	Add(item interface{})                   // Add data to the window
	Reset()                                 // Reset window state
	Start()                                 // Start window processing
	OutputChan() <-chan []types.Row         // Get the output channel
	SetCallback(callback func([]types.Row)) // Set the callback invoked when the window triggers
	Trigger()                               // Trigger the window manually
}

Tumbling Windows

Non-overlapping time-based windows:

// Create tumbling window
config := types.WindowConfig{
	Type: "tumbling",
	Params: map[string]interface{}{
		"size": "5s",  // 5-second windows
	},
	TsProp: "timestamp",
}
window, err := NewTumblingWindow(config)

// Window characteristics:
// - Fixed size (e.g., 5 seconds)
// - No overlap between windows
// - Triggers at regular intervals
// - Memory efficient
// - Suitable for periodic aggregations

// Example timeline:
// Window 1: [00:00 - 00:05)
// Window 2: [00:05 - 00:10)
// Window 3: [00:10 - 00:15)
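
The timeline above can be reproduced with a few lines of arithmetic. The following is a minimal sketch, not the package's implementation: `tumblingBounds` and `boundsSeconds` are hypothetical helpers, and the sketch assumes windows are aligned to the Unix epoch, which the package may or may not do.

```go
package main

import (
	"fmt"
	"time"
)

// tumblingBounds returns the half-open window [start, end) containing ts.
// Assumption: windows are aligned to the Unix epoch and size is positive.
// Hypothetical helper, not part of the window package.
func tumblingBounds(ts time.Time, size time.Duration) (time.Time, time.Time) {
	n := ts.UnixNano()
	rem := n % int64(size)
	if rem < 0 { // keep alignment correct for pre-1970 timestamps
		rem += int64(size)
	}
	start := time.Unix(0, n-rem)
	return start, start.Add(size)
}

// boundsSeconds wraps tumblingBounds for inputs given in whole seconds.
func boundsSeconds(tsSec, sizeSec int64) (int64, int64) {
	s, e := tumblingBounds(time.Unix(tsSec, 0), time.Duration(sizeSec)*time.Second)
	return s.Unix(), e.Unix()
}

func main() {
	for _, sec := range []int64{0, 4, 5, 12} {
		s, e := boundsSeconds(sec, 5)
		fmt.Printf("t=%ds falls in [%ds, %ds)\n", sec, s, e)
	}
}
```

Because each timestamp maps to exactly one window, a tumbling window only needs to keep the rows of the current interval in memory.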

Sliding Windows

Overlapping time-based windows with configurable slide interval:

// Create sliding window
config := types.WindowConfig{
	Type: "sliding",
	Params: map[string]interface{}{
		"size":  "30s", // 30-second window size
		"slide": "10s", // 10-second slide interval
	},
	TsProp: "timestamp",
}
window, err := NewSlidingWindow(config)

// Window characteristics:
// - Fixed size with configurable slide
// - Overlapping windows
// - More frequent updates
// - Higher memory usage
// - Suitable for smooth trend analysis

// Example timeline (30s window, 10s slide):
// Window 1: [00:00 - 00:30)
// Window 2: [00:10 - 00:40)
// Window 3: [00:20 - 00:50)
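
Because sliding windows overlap, a single record can belong to several of them. The sketch below lists the windows that contain a given timestamp. `slidingStarts` is a hypothetical helper; it assumes non-negative timestamps in seconds and window starts on multiples of the slide interval counted from zero, which is an illustration rather than the package's actual alignment logic.

```go
package main

import "fmt"

// slidingStarts returns the start offsets (seconds) of every window
// [start, start+size) that contains ts, earliest first.
// Assumptions: ts >= 0, slide > 0, and windows start on multiples of slide.
// Hypothetical helper, not part of the window package.
func slidingStarts(ts, size, slide int64) []int64 {
	var starts []int64
	first := ts - ts%slide // latest slide boundary at or before ts
	for s := first; s > ts-size && s >= 0; s -= slide {
		starts = append(starts, s)
	}
	// Reverse so the earliest window comes first.
	for i, j := 0, len(starts)-1; i < j; i, j = i+1, j-1 {
		starts[i], starts[j] = starts[j], starts[i]
	}
	return starts
}

func main() {
	for _, ts := range []int64{5, 25, 45} {
		fmt.Printf("t=%ds is covered by windows starting at %v\n", ts, slidingStarts(ts, 30, 10))
	}
}
```

In steady state a record falls into size/slide windows (three in this configuration), which is why sliding windows use more memory than tumbling windows of the same size.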

Counting Windows

Count-based windows that trigger after a specified number of records:

// Create counting window
config := types.WindowConfig{
	Type: "counting",
	Params: map[string]interface{}{
		"count": 100, // Trigger every 100 records
	},
}
window, err := NewCountingWindow(config)

// Window characteristics:
// - Fixed record count
// - Time-independent
// - Predictable memory usage
// - Suitable for batch processing
// - Handles variable data rates

// Example:
// Window 1: Records 1-100
// Window 2: Records 101-200
// Window 3: Records 201-300
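
The count-based trigger can be illustrated with a small buffer that flushes when it reaches a threshold. This is a sketch of the general technique only; `countBatcher` and `batchSizes` are hypothetical names, and the real CountingWindow may buffer and emit differently.

```go
package main

import "fmt"

// countBatcher buffers values and emits a batch each time threshold
// values have accumulated. Hypothetical stand-in for a counting window.
type countBatcher struct {
	threshold int
	buf       []int
	emit      func([]int)
}

// add appends v and flushes the buffer once it holds threshold values.
func (b *countBatcher) add(v int) {
	b.buf = append(b.buf, v)
	if len(b.buf) >= b.threshold {
		batch := b.buf
		b.buf = nil // fresh buffer so the emitted slice is never overwritten
		b.emit(batch)
	}
}

// batchSizes feeds n records through a batcher and returns the size of
// every emitted batch. Records left below the threshold stay buffered.
func batchSizes(n, threshold int) []int {
	var sizes []int
	b := &countBatcher{
		threshold: threshold,
		emit:      func(batch []int) { sizes = append(sizes, len(batch)) },
	}
	for i := 1; i <= n; i++ {
		b.add(i)
	}
	return sizes
}

func main() {
	fmt.Println("batch sizes for 250 records:", batchSizes(250, 100))
}
```

With 250 records and a threshold of 100, two full batches are emitted and the remaining 50 records wait for more input, which is the situation where a manual trigger is useful.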

Session Windows

Activity-based windows with configurable session timeout:

// Create session window
config := types.WindowConfig{
	Type: "session",
	Params: map[string]interface{}{
		"timeout": "5m", // 5-minute session timeout
	},
	GroupByKey: "user_id", // Group sessions by user
}
window, err := NewSessionWindow(config)

// Window characteristics:
// - Variable window size
// - Activity-based triggers
// - Per-group session tracking
// - Automatic session expiration
// - Suitable for user behavior analysis

// Example (5-minute timeout):
// User A: [10:00 - 10:15) - 15-minute session
// User B: [10:05 - 10:08) - 3-minute session
// User A: [10:20 - 10:25) - New 5-minute session
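
Session boundaries follow from the gaps between consecutive events of the same key. The sketch below counts sessions per key under stated assumptions: events arrive in time order, timestamps are in seconds, and a gap equal to or longer than the timeout starts a new session. `event`, `sessionCounts`, and `demoEvents` are hypothetical names, and the package's exact boundary rule is not documented here.

```go
package main

import "fmt"

// event is a hypothetical record with a grouping key and a timestamp in seconds.
type event struct {
	key string
	ts  int64
}

// sessionCounts returns how many sessions each key produces. A new session
// starts on the first event of a key, or when the gap since that key's
// previous event is at least timeout. Events must be in time order.
func sessionCounts(events []event, timeout int64) map[string]int {
	last := make(map[string]int64) // last event time per key
	counts := make(map[string]int)
	for _, e := range events {
		prev, seen := last[e.key]
		if !seen || e.ts-prev >= timeout {
			counts[e.key]++
		}
		last[e.key] = e.ts
	}
	return counts
}

// demoEvents returns a small time-ordered sample for two users.
func demoEvents() []event {
	return []event{
		{"A", 0}, {"A", 240}, {"B", 300}, {"B", 480},
		{"A", 900}, // 660s after A's previous event: new session
		{"A", 1000},
	}
}

func main() {
	counts := sessionCounts(demoEvents(), 300)
	fmt.Printf("A: %d sessions, B: %d sessions\n", counts["A"], counts["B"])
}
```

Tracking the last event time per key is what makes the sessions independent: activity from user B never extends a session of user A.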

Window Factory

Centralized window creation:

func CreateWindow(config types.WindowConfig) (Window, error)
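
Since the factory selects an implementation from the Type string, it can help to check a configuration before passing it on. The sketch below is one possible pre-flight check, not the logic of CreateWindow: `windowConfig` is a local stand-in for types.WindowConfig, and the required keys are taken from the examples in this document rather than from the package source.

```go
package main

import "fmt"

// windowConfig is a local stand-in for types.WindowConfig that carries
// only the fields this sketch needs.
type windowConfig struct {
	Type   string
	Params map[string]interface{}
}

// requiredParams lists, per window type, the parameter keys used in the
// examples of this document. Assumption: the real package may accept more.
var requiredParams = map[string][]string{
	"tumbling": {"size"},
	"sliding":  {"size", "slide"},
	"counting": {"count"},
	"session":  {"timeout"},
}

// validateConfig reports an error for unknown types or missing parameters.
func validateConfig(cfg windowConfig) error {
	keys, ok := requiredParams[cfg.Type]
	if !ok {
		return fmt.Errorf("unsupported window type %q", cfg.Type)
	}
	for _, k := range keys {
		if _, present := cfg.Params[k]; !present {
			return fmt.Errorf("window type %q requires parameter %q", cfg.Type, k)
		}
	}
	return nil
}

// isValid builds a config containing the given parameter keys and
// reports whether it passes validateConfig.
func isValid(typ string, keys ...string) bool {
	params := make(map[string]interface{})
	for _, k := range keys {
		params[k] = "1s" // placeholder value; only key presence is checked
	}
	return validateConfig(windowConfig{Type: typ, Params: params}) == nil
}

func main() {
	fmt.Println(validateConfig(windowConfig{Type: "sliding", Params: map[string]interface{}{"size": "30s"}}))
	fmt.Println(validateConfig(windowConfig{Type: "hopping"}))
}
```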

Time Management

Time handling for window operations:

func GetTimestamp(data interface{}, tsProp string, timeUnit time.Duration) time.Time

type TimeSlot struct {
	Start    time.Time
	End      time.Time
	Duration time.Duration
}
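
To make the idea of extracting event time from a record concrete, here is a minimal sketch. It is not the package's GetTimestamp: `extractTimestamp` is a hypothetical helper that assumes records are maps, that the field holds either a time.Time or an integer epoch value, and that a unit argument says how to scale that integer.

```go
package main

import (
	"fmt"
	"time"
)

// extractTimestamp reads field from data and converts it to a time.Time.
// Integer values are treated as counts of unit since the Unix epoch.
// Hypothetical helper; the package's own extraction rules may differ.
func extractTimestamp(data map[string]interface{}, field string, unit time.Duration) (time.Time, bool) {
	switch v := data[field].(type) {
	case time.Time:
		return v, true
	case int64:
		return time.Unix(0, v*int64(unit)), true
	case int:
		return time.Unix(0, int64(v)*int64(unit)), true
	}
	return time.Time{}, false // missing field or unsupported type
}

// unixSecondsFromMillis reads field as epoch milliseconds and returns
// the corresponding whole Unix seconds.
func unixSecondsFromMillis(data map[string]interface{}, field string) (int64, bool) {
	ts, ok := extractTimestamp(data, field, time.Millisecond)
	if !ok {
		return 0, false
	}
	return ts.Unix(), true
}

func main() {
	rec := map[string]interface{}{"timestamp": int64(1500), "value": 42}
	sec, ok := unixSecondsFromMillis(rec, "timestamp")
	fmt.Println("seconds:", sec, "found:", ok)
}
```

Returning a boolean alongside the time lets the caller decide what to do with records that lack a usable timestamp, for example falling back to processing time.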

Performance Features

• Memory Management - Efficient buffer management and garbage collection
• Concurrency - Thread-safe operations with minimal locking
• Time Efficiency - Optimized timestamp processing and timer management

Usage Examples

Basic tumbling window:

config := types.WindowConfig{
	Type: "tumbling",
	Params: map[string]interface{}{"size": "10s"},
	TsProp: "timestamp",
}
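window, err := CreateWindow(config)
if err != nil {
	log.Fatalf("create window: %v", err) // do not use window if creation failed
}
// The callback receives the rows of each triggered window.
window.SetCallback(func(results []types.Row) {
	fmt.Printf("Window results: %d records\n", len(results))
})
window.Start()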

Sliding window:

config := types.WindowConfig{
	Type: "sliding",
	Params: map[string]interface{}{
		"size":  "1m",
		"slide": "10s",
	},
	TsProp: "event_time",
}
window, err := NewSlidingWindow(config)

Session window:

config := types.WindowConfig{
	Type: "session",
	Params: map[string]interface{}{"timeout": "30m"},
	GroupByKey: "user_id",
}
window, err := NewSessionWindow(config)

Integration

Integrates with other StreamSQL components:

• Stream package - Stream processing and data flow
• RSQL package - SQL-based window definitions
• Functions package - Aggregation functions for window results
• Types package - Shared data types and configuration

Index

Constants

const (
	TypeTumbling = "tumbling"
	TypeSliding  = "sliding"
	TypeCounting = "counting"
	TypeSession  = "session"
)

Variables

This section is empty.

Functions

func GetTimestamp

func GetTimestamp(data interface{}, tsProp string, timeUnit time.Duration) time.Time

GetTimestamp extracts the timestamp from data.

Types

type CountingWindow

type CountingWindow struct {
	// contains filtered or unexported fields
}

func NewCountingWindow

func NewCountingWindow(config types.WindowConfig) (*CountingWindow, error)

func (*CountingWindow) Add

func (cw *CountingWindow) Add(data interface{})

func (*CountingWindow) OutputChan

func (cw *CountingWindow) OutputChan() <-chan []types.Row

func (*CountingWindow) Reset

func (cw *CountingWindow) Reset()

func (*CountingWindow) SetCallback

func (cw *CountingWindow) SetCallback(callback func([]types.Row))

func (*CountingWindow) Start

func (cw *CountingWindow) Start()

func (*CountingWindow) Trigger

func (cw *CountingWindow) Trigger()

type SessionWindow

type SessionWindow struct {
	// contains filtered or unexported fields
}

SessionWindow represents a session window. A session window is an event-time-based window that closes when no events arrive for a period of time.

func NewSessionWindow

func NewSessionWindow(config types.WindowConfig) (*SessionWindow, error)

NewSessionWindow creates a new session window instance

func (*SessionWindow) Add

func (sw *SessionWindow) Add(data interface{})

Add adds data to session window

func (*SessionWindow) OutputChan

func (sw *SessionWindow) OutputChan() <-chan []types.Row

OutputChan returns a read-only channel for receiving data when window triggers

func (*SessionWindow) Reset

func (sw *SessionWindow) Reset()

Reset resets session window data

func (*SessionWindow) SetCallback

func (sw *SessionWindow) SetCallback(callback func([]types.Row))

SetCallback sets the callback function when session window triggers

func (*SessionWindow) Start

func (sw *SessionWindow) Start()

Start starts the session window and begins periodic checking for expired sessions. It uses lazy initialization to avoid waiting indefinitely when no data has arrived, while ensuring that subsequent data is processed normally.

func (*SessionWindow) Stop

func (sw *SessionWindow) Stop()

Stop stops session window operations

func (*SessionWindow) Trigger

func (sw *SessionWindow) Trigger()

Trigger manually triggers all session windows

type SlidingWindow

type SlidingWindow struct {
	// contains filtered or unexported fields
}

SlidingWindow represents a sliding window for processing data within time ranges

func NewSlidingWindow

func NewSlidingWindow(config types.WindowConfig) (*SlidingWindow, error)

NewSlidingWindow creates a new sliding window instance. The size parameter is the total window size, and slide is the sliding interval.

func (*SlidingWindow) Add

func (sw *SlidingWindow) Add(data interface{})

Add adds data to the sliding window

func (*SlidingWindow) GetStats

func (sw *SlidingWindow) GetStats() map[string]int64

GetStats returns window performance statistics

func (*SlidingWindow) NextSlot

func (sw *SlidingWindow) NextSlot() *types.TimeSlot

func (*SlidingWindow) OutputChan

func (sw *SlidingWindow) OutputChan() <-chan []types.Row

OutputChan returns the sliding window's output channel

func (*SlidingWindow) Reset

func (sw *SlidingWindow) Reset()

Reset resets the sliding window and clears window data

func (*SlidingWindow) ResetStats

func (sw *SlidingWindow) ResetStats()

ResetStats resets performance statistics

func (*SlidingWindow) SetCallback

func (sw *SlidingWindow) SetCallback(callback func([]types.Row))

SetCallback sets the callback function to execute when sliding window triggers

func (*SlidingWindow) Start

func (sw *SlidingWindow) Start()

Start starts the sliding window with periodic triggering. It uses lazy initialization to avoid waiting indefinitely when no data has arrived, while ensuring that subsequent data is processed normally.

func (*SlidingWindow) Stop

func (sw *SlidingWindow) Stop()

Stop stops the sliding window operations

func (*SlidingWindow) Trigger

func (sw *SlidingWindow) Trigger()

Trigger triggers the sliding window to process data within the window

type TimedData

type TimedData struct {
	Data      interface{}
	Timestamp time.Time
}

TimedData wraps data with timestamp

type TumblingWindow

type TumblingWindow struct {
	// contains filtered or unexported fields
}

TumblingWindow represents a tumbling window for collecting data and triggering processing at fixed time intervals

func NewTumblingWindow

func NewTumblingWindow(config types.WindowConfig) (*TumblingWindow, error)

NewTumblingWindow creates a new tumbling window instance. The size parameter is the time span of the window.

func (*TumblingWindow) Add

func (tw *TumblingWindow) Add(data interface{})

Add adds data to the tumbling window

func (*TumblingWindow) GetStats

func (tw *TumblingWindow) GetStats() map[string]int64

GetStats returns window performance statistics

func (*TumblingWindow) NextSlot

func (sw *TumblingWindow) NextSlot() *types.TimeSlot

func (*TumblingWindow) OutputChan

func (tw *TumblingWindow) OutputChan() <-chan []types.Row

OutputChan returns a read-only channel for receiving data when window triggers

func (*TumblingWindow) Reset

func (tw *TumblingWindow) Reset()

Reset resets tumbling window data

func (*TumblingWindow) ResetStats

func (tw *TumblingWindow) ResetStats()

ResetStats resets performance statistics

func (*TumblingWindow) SetCallback

func (tw *TumblingWindow) SetCallback(callback func([]types.Row))

SetCallback sets the callback function to execute when tumbling window triggers

func (*TumblingWindow) Start

func (tw *TumblingWindow) Start()

Start starts the tumbling window's periodic trigger mechanism. It uses lazy initialization to avoid waiting indefinitely when no data has arrived, while ensuring that subsequent data is processed normally.

func (*TumblingWindow) Stop

func (tw *TumblingWindow) Stop()

Stop stops tumbling window operations

func (*TumblingWindow) Trigger

func (tw *TumblingWindow) Trigger()

Trigger triggers the tumbling window's processing logic

type Window

type Window interface {
	Add(item interface{})
	//GetResults() []interface{}
	Reset()
	Start()
	OutputChan() <-chan []types.Row
	SetCallback(callback func([]types.Row))
	Trigger()
}
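
Code written against the Window interface listed above works with any window type. The sketch below shows the pattern with local stand-ins so it runs on its own: `row` replaces types.Row (whose definition is not shown in this document), `window` mirrors a subset of the interface, and `fakeWindow` is a toy implementation that invokes its callback synchronously, which the real windows may not do.

```go
package main

import "fmt"

// row stands in for types.Row, whose definition is not shown here.
type row struct{ Data interface{} }

// window mirrors a subset of the Window interface for this sketch.
type window interface {
	Add(item interface{})
	SetCallback(callback func([]row))
	Trigger()
}

// fakeWindow is a toy implementation: it buffers items and hands them
// to the callback when Trigger is called.
type fakeWindow struct {
	buf []row
	cb  func([]row)
}

func (f *fakeWindow) Add(item interface{})             { f.buf = append(f.buf, row{Data: item}) }
func (f *fakeWindow) SetCallback(callback func([]row)) { f.cb = callback }

func (f *fakeWindow) Trigger() {
	if f.cb != nil && len(f.buf) > 0 {
		f.cb(f.buf)
	}
	f.buf = nil // start the next window empty
}

// feedAndFlush adds items to any window, triggers it manually, and
// returns how many rows the callback received.
func feedAndFlush(w window, items ...interface{}) int {
	total := 0
	w.SetCallback(func(rows []row) { total += len(rows) })
	for _, it := range items {
		w.Add(it)
	}
	w.Trigger()
	return total
}

func main() {
	fmt.Println("rows delivered:", feedAndFlush(&fakeWindow{}, "a", "b", "c"))
}
```

Depending only on the interface keeps callers independent of whether CreateWindow returned a tumbling, sliding, counting, or session window.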

func CreateWindow

func CreateWindow(config types.WindowConfig) (Window, error)
