Documentation ¶
Overview ¶
Package streamsql is a lightweight, SQL-based IoT edge stream processing engine.
StreamSQL provides efficient unbounded data stream processing and analysis capabilities, supporting multiple window types, aggregate functions, custom functions, and seamless integration with the RuleGo ecosystem.
Core Features ¶
• Lightweight design - Pure in-memory operations, no external dependencies
• SQL syntax support - Process stream data using familiar SQL syntax
• Multiple window types - Sliding, tumbling, counting, and session windows
• Rich aggregate functions - MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE, etc.
• Plugin-based custom functions - Runtime dynamic registration, supports 8 function types
• RuleGo ecosystem integration - Extend input/output sources using RuleGo components
Getting Started ¶
Basic stream data processing:
package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/rulego/streamsql"
)

func main() {
	// Create StreamSQL instance and release its resources on exit
	ssql := streamsql.New()
	defer ssql.Stop()

	// Define SQL query - Calculate average temperature and minimum humidity
	// per device ID every 5 seconds
	sql := `SELECT deviceId,
		AVG(temperature) as avg_temp,
		MIN(humidity) as min_humidity,
		window_start() as start,
		window_end() as end
	FROM stream
	WHERE deviceId != 'device3'
	GROUP BY deviceId, TumblingWindow('5s')`

	// Execute SQL, create stream processing task
	if err := ssql.Execute(sql); err != nil {
		panic(err)
	}

	// Add result processing callback
	ssql.AddSink(func(result []map[string]interface{}) {
		fmt.Printf("Aggregation result: %v\n", result)
	})

	// Simulate sending stream data once per second
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for range ticker.C {
			// Generate random device data
			data := map[string]interface{}{
				"deviceId":    fmt.Sprintf("device%d", rand.Intn(3)+1),
				"temperature": 20.0 + rand.Float64()*10,
				"humidity":    50.0 + rand.Float64()*20,
			}
			ssql.Emit(data)
		}
	}()

	// Run for 30 seconds
	time.Sleep(30 * time.Second)
}
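To make the query's semantics concrete, the following standard-library sketch computes by hand what a single 5-second window of the query above would yield: it drops device3, groups rows by deviceId, averages temperature, and takes the minimum humidity. The `reading` and `windowResult` types and the `aggregateWindow` function are hypothetical names for illustration only; they are not part of StreamSQL, and the engine's own implementation may differ.

```go
package main

import (
	"fmt"
	"sort"
)

// reading is a hypothetical typed view of one emitted record.
type reading struct {
	DeviceID    string
	Temperature float64
	Humidity    float64
}

// windowResult is a hypothetical aggregated output row for one device.
type windowResult struct {
	DeviceID    string
	AvgTemp     float64
	MinHumidity float64
}

// aggregateWindow mimics, for the rows of one window, the query
// SELECT deviceId, AVG(temperature), MIN(humidity)
// FROM stream WHERE deviceId != 'device3' GROUP BY deviceId.
func aggregateWindow(rows []reading) []windowResult {
	type acc struct {
		sum   float64
		count int
		min   float64
	}
	groups := map[string]*acc{}
	for _, r := range rows {
		if r.DeviceID == "device3" { // WHERE deviceId != 'device3'
			continue
		}
		a, ok := groups[r.DeviceID]
		if !ok {
			groups[r.DeviceID] = &acc{sum: r.Temperature, count: 1, min: r.Humidity}
			continue
		}
		a.sum += r.Temperature
		a.count++
		if r.Humidity < a.min {
			a.min = r.Humidity
		}
	}
	out := make([]windowResult, 0, len(groups))
	for id, a := range groups {
		out = append(out, windowResult{id, a.sum / float64(a.count), a.min})
	}
	// Sort so the output order does not depend on map iteration order.
	sort.Slice(out, func(i, j int) bool { return out[i].DeviceID < out[j].DeviceID })
	return out
}

func main() {
	rows := []reading{
		{"device1", 20, 55},
		{"device1", 30, 50},
		{"device2", 22, 60},
		{"device3", 99, 10},
	}
	for _, r := range aggregateWindow(rows) {
		fmt.Printf("%s avg_temp=%.1f min_humidity=%.1f\n", r.DeviceID, r.AvgTemp, r.MinHumidity)
	}
}
```

In the real engine each window additionally carries its start and end time, which the query exposes through window_start() and window_end().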
Window Functions ¶
StreamSQL supports multiple window types:
// Tumbling window - Independent window every 5 seconds
SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')
// Sliding window - 30-second window size, slides every 10 seconds
SELECT MAX(temperature) FROM stream GROUP BY SlidingWindow('30s', '10s')
// Counting window - One window per 100 records
SELECT COUNT(*) FROM stream GROUP BY CountingWindow(100)
// Session window - Automatically closes session after 5-minute timeout
SELECT user_id, COUNT(*) FROM stream GROUP BY user_id, SessionWindow('5m')
Custom Functions ¶
StreamSQL supports plugin-based custom functions with runtime dynamic registration:
// Register temperature conversion function
functions.RegisterCustomFunction(
	"fahrenheit_to_celsius",
	functions.TypeConversion,
	"Temperature conversion",
	"Fahrenheit to Celsius",
	1, 1,
	func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
		// Report unconvertible input instead of silently converting it to 0
		f, err := functions.ConvertToFloat64(args[0])
		if err != nil {
			return nil, err
		}
		return (f - 32) * 5 / 9, nil
	},
)
// Use immediately in SQL
sql := `SELECT deviceId,
	AVG(fahrenheit_to_celsius(temperature)) as avg_celsius
FROM stream GROUP BY deviceId, TumblingWindow('5s')`
Supported custom function types:
• TypeMath - Mathematical calculation functions
• TypeString - String processing functions
• TypeConversion - Type conversion functions
• TypeDateTime - Date and time functions
• TypeAggregation - Aggregate functions
• TypeAnalytical - Analytical functions
• TypeWindow - Window functions
• TypeCustom - General custom functions
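Runtime registration of this kind can be pictured as inserting a named callback, together with its argument-count bounds, into a lookup table that is consulted at call time. The sketch below is a simplified standard-library model of that idea; `registry`, `register`, `invoke`, and the reduced callback signature are hypothetical and do not reproduce the actual functions package.

```go
package main

import (
	"errors"
	"fmt"
)

// fn is a hypothetical callback signature, simplified from the one shown above.
type fn func(args []interface{}) (interface{}, error)

// entry stores a function with the argument-count limits it was registered with.
type entry struct {
	minArgs, maxArgs int
	call             fn
}

// registry is a hypothetical stand-in for a function registry.
type registry map[string]entry

// register adds a named function and rejects duplicate names.
func (r registry) register(name string, minArgs, maxArgs int, f fn) error {
	if _, exists := r[name]; exists {
		return fmt.Errorf("function %q already registered", name)
	}
	r[name] = entry{minArgs, maxArgs, f}
	return nil
}

// invoke looks up a function, validates the argument count, and calls it.
func (r registry) invoke(name string, args ...interface{}) (interface{}, error) {
	e, ok := r[name]
	if !ok {
		return nil, fmt.Errorf("unknown function %q", name)
	}
	if len(args) < e.minArgs || len(args) > e.maxArgs {
		return nil, fmt.Errorf("%s: expected %d..%d arguments, got %d",
			name, e.minArgs, e.maxArgs, len(args))
	}
	return e.call(args)
}

// fahrenheitToCelsius converts a float64 Fahrenheit value to Celsius.
func fahrenheitToCelsius(args []interface{}) (interface{}, error) {
	f, ok := args[0].(float64)
	if !ok {
		return nil, errors.New("fahrenheit_to_celsius: argument must be float64")
	}
	return (f - 32) * 5 / 9, nil
}

func main() {
	reg := registry{}
	if err := reg.register("fahrenheit_to_celsius", 1, 1, fahrenheitToCelsius); err != nil {
		panic(err)
	}
	c, err := reg.invoke("fahrenheit_to_celsius", 212.0)
	fmt.Println(c, err)
}
```

Validating the argument count in one place, before the callback runs, lets each callback index into args without repeating bounds checks.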
Log Configuration ¶
StreamSQL provides flexible log configuration options:
// The three snippets below are alternatives; use one per instance.
// Set log level
ssql := streamsql.New(streamsql.WithLogLevel(logger.DEBUG))
// Output to file
logFile, err := os.OpenFile("app.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
	panic(err)
}
defer logFile.Close()
ssql := streamsql.New(streamsql.WithLogOutput(logFile, logger.INFO))
// Disable logging (production environment)
ssql := streamsql.New(streamsql.WithDiscardLog())
RuleGo Integration ¶
StreamSQL provides deep integration with the RuleGo rule engine through two dedicated components for stream data processing:
• streamTransform (x/streamTransform) - Stream transformer, handles non-aggregation SQL queries
• streamAggregator (x/streamAggregator) - Stream aggregator, handles aggregation SQL queries
Basic integration example:
package main

import (
	"log"

	"github.com/rulego/rulego"
	"github.com/rulego/rulego/api/types"
	// Register StreamSQL components
	_ "github.com/rulego/rulego-components/external/streamsql"
)

func main() {
	// Rule chain configuration
	ruleChainJson := `{
		"ruleChain": {"id": "rule01"},
		"metadata": {
			"nodes": [{
				"id": "transform1",
				"type": "x/streamTransform",
				"configuration": {
					"sql": "SELECT deviceId, temperature * 1.8 + 32 as temp_f FROM stream WHERE temperature > 20"
				}
			}, {
				"id": "aggregator1",
				"type": "x/streamAggregator",
				"configuration": {
					"sql": "SELECT deviceId, AVG(temperature) as avg_temp FROM stream GROUP BY deviceId, TumblingWindow('5s')"
				}
			}],
			"connections": [{
				"fromId": "transform1",
				"toId": "aggregator1",
				"type": "Success"
			}]
		}
	}`
	// Create rule engine; stop if the rule chain cannot be loaded
	ruleEngine, err := rulego.New("rule01", []byte(ruleChainJson))
	if err != nil {
		log.Fatalf("create rule engine: %v", err)
	}
	// Send data
	data := `{"deviceId":"sensor01","temperature":25.5}`
	msg := types.NewMsg(0, "TELEMETRY", types.JSON, types.NewMetadata(), data)
	ruleEngine.OnMsg(msg)
}
Index ¶
- type Option
- func WithBufferSizes(dataChannelSize, resultChannelSize, windowOutputSize int) Option
- func WithCustomPerformance(config types.PerformanceConfig) Option
- func WithDiscardLog() Option
- func WithHighPerformance() Option
- func WithLogLevel(level logger.Level) Option
- func WithLowLatency() Option
- func WithMonitoring(updateInterval time.Duration, enableDetailedStats bool) Option
- func WithOverflowStrategy(strategy string, blockTimeout time.Duration) Option
- func WithWorkerConfig(sinkPoolSize, sinkWorkerCount, maxRetryRoutines int) Option
- func WithZeroDataLoss() Option
- type Streamsql
- func (s *Streamsql) AddSink(sink func([]map[string]interface{}))
- func (s *Streamsql) Emit(data map[string]interface{})
- func (s *Streamsql) EmitSync(data map[string]interface{}) (map[string]interface{}, error)
- func (s *Streamsql) Execute(sql string) error
- func (s *Streamsql) GetDetailedStats() map[string]interface{}
- func (s *Streamsql) GetStats() map[string]int64
- func (s *Streamsql) IsAggregationQuery() bool
- func (s *Streamsql) PrintTable()
- func (s *Streamsql) Stop()
- func (s *Streamsql) Stream() *stream.Stream
- func (s *Streamsql) ToChannel() <-chan []map[string]interface{}
Types ¶
type Option ¶
type Option func(*Streamsql)
Option defines the configuration option type for StreamSQL.
func WithBufferSizes ¶
func WithBufferSizes(dataChannelSize, resultChannelSize, windowOutputSize int) Option
WithBufferSizes sets custom buffer sizes.
func WithCustomPerformance ¶
func WithCustomPerformance(config types.PerformanceConfig) Option
WithCustomPerformance uses a custom performance configuration.
func WithHighPerformance ¶
func WithHighPerformance() Option
WithHighPerformance uses the high-performance configuration. It suits scenarios that require maximum throughput.
func WithLowLatency ¶
func WithLowLatency() Option
WithLowLatency uses the low-latency configuration. It suits real-time interactive applications where latency must be minimized.
func WithMonitoring ¶
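func WithMonitoring(updateInterval time.Duration, enableDetailedStats bool) Option
WithMonitoring enables detailed monitoring.
func WithOverflowStrategy ¶
func WithOverflowStrategy(strategy string, blockTimeout time.Duration) Option
WithOverflowStrategy sets the overflow strategy.
func WithWorkerConfig ¶
func WithWorkerConfig(sinkPoolSize, sinkWorkerCount, maxRetryRoutines int) Option
WithWorkerConfig sets the worker pool configuration.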
func WithZeroDataLoss ¶
func WithZeroDataLoss() Option
WithZeroDataLoss uses the zero-data-loss configuration. It suits critical business data where no record may be dropped.
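Because Option is declared as func(*Streamsql), each With... function returns a closure that mutates the instance under construction. This is the common Go functional options pattern. The following sketch shows the mechanics with a hypothetical `engine` type and made-up defaults; none of these names or values come from StreamSQL.

```go
package main

import "fmt"

// engine is a hypothetical stand-in for a configurable type.
type engine struct {
	bufferSize int
	logEnabled bool
}

// option has the same shape as `type Option func(*Streamsql)`.
type option func(*engine)

// withBufferSize returns an option that overrides the buffer size.
func withBufferSize(n int) option {
	return func(e *engine) { e.bufferSize = n }
}

// withoutLog returns an option that switches logging off.
func withoutLog() option {
	return func(e *engine) { e.logEnabled = false }
}

// newEngine applies the options in order on top of hypothetical defaults,
// so a later option overrides an earlier one that touches the same field.
func newEngine(opts ...option) *engine {
	e := &engine{bufferSize: 1024, logEnabled: true}
	for _, opt := range opts {
		opt(e)
	}
	return e
}

func main() {
	fmt.Printf("%+v\n", *newEngine())
	fmt.Printf("%+v\n", *newEngine(withBufferSize(8), withoutLog()))
}
```

The pattern keeps the constructor signature stable as new settings are added, which is why a call with no arguments and a call with several options both compile against the same function.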
type Streamsql ¶
type Streamsql struct {
// contains filtered or unexported fields
}
Streamsql is the main interface for the StreamSQL streaming engine. It encapsulates core functionality including SQL parsing, stream processing, and window management.
Usage example:
ssql := streamsql.New()
err := ssql.Execute("SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')")
if err != nil {
	panic(err)
}
ssql.Emit(map[string]interface{}{"temperature": 25.5})
func New ¶
New creates a new StreamSQL instance. Supports configuration through optional Option parameters.
Parameters:
- options: Variable configuration options for customizing StreamSQL behavior
Returns:
- *Streamsql: Newly created StreamSQL instance
Examples:
// Create default instance
ssql := streamsql.New()
// Create high performance instance
ssql := streamsql.New(streamsql.WithHighPerformance())
// Create zero data loss instance
ssql := streamsql.New(streamsql.WithZeroDataLoss())
func (*Streamsql) AddSink ¶
func (s *Streamsql) AddSink(sink func([]map[string]interface{}))
AddSink adds a result processing callback function. It is a convenience wrapper for Stream().AddSink() that keeps API calls concise.
Parameters:
- sink: Result processing function, receives []map[string]interface{} type result data
Examples:
// Directly add result processing
ssql.AddSink(func(results []map[string]interface{}) {
	fmt.Printf("Processing results: %v\n", results)
})
// Add multiple processors
ssql.AddSink(func(results []map[string]interface{}) {
	// Save to database
	saveToDatabase(results)
})
ssql.AddSink(func(results []map[string]interface{}) {
	// Send to message queue
	sendToQueue(results)
})
func (*Streamsql) Emit ¶
func (s *Streamsql) Emit(data map[string]interface{})
Emit adds data to the stream processing pipeline. It accepts data as a map[string]interface{}.
Parameters:
- data: Data to add, must be map[string]interface{} type
Examples:
// Add device data
ssql.Emit(map[string]interface{}{
	"deviceId":    "sensor001",
	"temperature": 25.5,
	"humidity":    60.0,
	"timestamp":   time.Now(),
})
// Add user behavior data
ssql.Emit(map[string]interface{}{
	"userId": "user123",
	"action": "click",
	"page":   "/home",
})
func (*Streamsql) EmitSync ¶
func (s *Streamsql) EmitSync(data map[string]interface{}) (map[string]interface{}, error)
EmitSync processes data synchronously and returns the result immediately. It applies only to non-aggregation queries; aggregation queries return an error. It accepts data as a map[string]interface{}.
Parameters:
- data: Data to process, must be map[string]interface{} type
Returns:
- map[string]interface{}: Processed result data, returns nil if filter conditions don't match
- error: Processing error
Examples:
result, err := ssql.EmitSync(map[string]interface{}{
	"deviceId":    "sensor001",
	"temperature": 25.5,
})
if err != nil {
	log.Printf("processing error: %v", err)
} else if result != nil {
	// Use processed result immediately (result is map[string]interface{} type)
	fmt.Printf("Processing result: %v\n", result)
}
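A caller of EmitSync has three outcomes to handle: an error, a nil result when the row is filtered out, and a processed row. The sketch below wraps that handling in a helper that works against a small interface containing only the EmitSync signature from the index. The `syncEmitter` interface, the `fakeEngine` test double, and `processAll` are hypothetical; the fake merely imitates a `WHERE temperature > 20` filter so the example runs without the library.

```go
package main

import (
	"errors"
	"fmt"
)

// syncEmitter captures only the EmitSync method signature listed in the index.
type syncEmitter interface {
	EmitSync(data map[string]interface{}) (map[string]interface{}, error)
}

// fakeEngine is a hypothetical test double that behaves like
// SELECT deviceId, temperature FROM stream WHERE temperature > 20.
type fakeEngine struct{}

func (fakeEngine) EmitSync(data map[string]interface{}) (map[string]interface{}, error) {
	t, ok := data["temperature"].(float64)
	if !ok {
		return nil, errors.New("temperature must be a float64")
	}
	if t <= 20 {
		return nil, nil // filtered out by the WHERE condition
	}
	return map[string]interface{}{"deviceId": data["deviceId"], "temperature": t}, nil
}

// processAll feeds rows through e, keeps only the rows that passed the filter,
// and stops at the first error.
func processAll(e syncEmitter, rows []map[string]interface{}) ([]map[string]interface{}, error) {
	var out []map[string]interface{}
	for _, row := range rows {
		res, err := e.EmitSync(row)
		if err != nil {
			return out, err
		}
		if res == nil {
			continue // row did not match the filter
		}
		out = append(out, res)
	}
	return out, nil
}

func main() {
	rows := []map[string]interface{}{
		{"deviceId": "a", "temperature": 25.5},
		{"deviceId": "b", "temperature": 18.0},
	}
	kept, err := processAll(fakeEngine{}, rows)
	fmt.Println(len(kept), err)
}
```

Depending on an interface rather than the concrete type also makes the surrounding code testable without starting a stream.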
func (*Streamsql) Execute ¶
func (s *Streamsql) Execute(sql string) error
Execute parses and executes a SQL query, creating the corresponding stream processing pipeline. This is the core method of StreamSQL, responsible for converting SQL into actual stream processing logic.
Supported SQL syntax:
- SELECT clause: Select fields and aggregate functions
- FROM clause: Specify data source (usually 'stream')
- WHERE clause: Data filtering conditions
- GROUP BY clause: Grouping fields and window functions
- HAVING clause: Aggregate result filtering
- LIMIT clause: Limit result count
- DISTINCT: Result deduplication
Window functions:
- TumblingWindow('5s'): Tumbling window
- SlidingWindow('30s', '10s'): Sliding window
- CountingWindow(100): Counting window
- SessionWindow('5m'): Session window
Parameters:
- sql: SQL query statement to execute
Returns:
- error: Returns error if SQL parsing or execution fails
Examples:
// Basic aggregation query
err := ssql.Execute("SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5s')")
// Query with filtering conditions
err := ssql.Execute("SELECT * FROM stream WHERE temperature > 30")
// Complex window aggregation
err := ssql.Execute(`
	SELECT deviceId,
		AVG(temperature) as avg_temp,
		MAX(humidity) as max_humidity
	FROM stream
	WHERE deviceId != 'test'
	GROUP BY deviceId, SlidingWindow('1m', '30s')
	HAVING avg_temp > 25
	LIMIT 100
`)
func (*Streamsql) GetDetailedStats ¶
func (s *Streamsql) GetDetailedStats() map[string]interface{}
GetDetailedStats returns detailed performance statistics.
func (*Streamsql) IsAggregationQuery ¶
func (s *Streamsql) IsAggregationQuery() bool
IsAggregationQuery reports whether the current query is an aggregation query.
func (*Streamsql) PrintTable ¶ added in v0.10.2
func (s *Streamsql) PrintTable()
PrintTable prints results to console in table format, similar to database output. Displays column names first, then data rows.
Supported data formats:
- []map[string]interface{}: Multiple rows
- map[string]interface{}: Single row
- Other types: Direct print
Example:
// Print results in table format
ssql.PrintTable()
// Output format:
// +--------+----------+
// | device | max_temp |
// +--------+----------+
// | aa     | 30.0     |
// | bb     | 22.0     |
// +--------+----------+
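The table layout shown above can be produced by measuring the widest cell in each column and padding every cell to that width. The following standard-library sketch illustrates one way to do that; `formatTable` is a hypothetical helper and not the code behind PrintTable, and it takes the column order explicitly because Go map iteration order is not stable.

```go
package main

import (
	"fmt"
	"strings"
)

// formatTable renders rows as an ASCII table with the given column order.
// Widths are measured in bytes, so the alignment assumes ASCII cell content.
func formatTable(cols []string, rows []map[string]interface{}) string {
	// Each column is as wide as its header or its widest value.
	widths := make([]int, len(cols))
	for i, c := range cols {
		widths[i] = len(c)
		for _, r := range rows {
			if n := len(fmt.Sprint(r[c])); n > widths[i] {
				widths[i] = n
			}
		}
	}

	var b strings.Builder
	border := func() {
		for _, w := range widths {
			b.WriteString("+" + strings.Repeat("-", w+2))
		}
		b.WriteString("+\n")
	}
	line := func(cell func(i int) string) {
		for i, w := range widths {
			fmt.Fprintf(&b, "| %-*s ", w, cell(i)) // left-align, pad to column width
		}
		b.WriteString("|\n")
	}

	border()
	line(func(i int) string { return cols[i] })
	border()
	for _, r := range rows {
		line(func(i int) string { return fmt.Sprint(r[cols[i]]) })
	}
	border()
	return b.String()
}

func main() {
	rows := []map[string]interface{}{
		{"device": "aa", "max_temp": "30.0"},
		{"device": "bb", "max_temp": "22.0"},
	}
	fmt.Print(formatTable([]string{"device", "max_temp"}, rows))
}
```

The example passes the temperatures as strings so that the cells read 30.0 and 22.0; a float64 value of 30.0 would be rendered by fmt.Sprint as 30.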
func (*Streamsql) Stop ¶
func (s *Streamsql) Stop()
Stop stops the stream processor and releases related resources. After calling this method, the stream processor will stop receiving and processing new data.
Call this method for cleanup before the application exits:
defer ssql.Stop()
Note: A StreamSQL instance cannot be restarted after it is stopped; create a new instance instead.
func (*Streamsql) Stream ¶
func (s *Streamsql) Stream() *stream.Stream
Stream returns the underlying stream processor instance, which provides access to lower-level stream processing functionality.
Returns:
- *stream.Stream: Underlying stream processor instance, returns nil if SQL not executed
Common use cases:
- Add result processing callbacks
- Get result channel
- Manually control stream processing lifecycle
Examples:
// Add result processing callback
ssql.Stream().AddSink(func(results []map[string]interface{}) {
	fmt.Printf("Processing results: %v\n", results)
})
// Get result channel
resultChan := ssql.Stream().GetResultsChan()
go func() {
	for result := range resultChan {
		// Process result
		fmt.Printf("Received result: %v\n", result)
	}
}()
Directories ¶
| Path | Synopsis |
|---|---|
| aggregator | Package aggregator provides data aggregation functionality for StreamSQL. |
| condition | Package condition provides condition evaluation functionality for StreamSQL. |
| examples | |
| examples/advanced-functions (command) | |
| examples/complex-nested-access (command) | |
| examples/comprehensive-test (command) | |
| examples/custom-functions-demo (command) | |
| examples/function-integration-demo (command) | |
| examples/nested-field-examples (command) | |
| examples/non-aggregation (command) | |
| examples/null-comparison-examples (command) | |
| examples/simple-custom-functions (command) | |
| examples/table_print_demo (command) | |
| expr | Package expr provides expression parsing and evaluation capabilities for StreamSQL. |
| functions | Package functions provides a comprehensive function registry and execution framework for StreamSQL. |
| logger | Package logger provides logging functionality for StreamSQL. |
| rsql | Package rsql provides SQL parsing and analysis capabilities for StreamSQL. |
| stream | Package stream provides the core stream processing engine for StreamSQL. |
| types | Package types provides core type definitions and data structures for StreamSQL. |
| utils | |
| window | Package window provides windowing functionality for StreamSQL stream processing. |