Documentation
¶
Overview ¶
Package streamsql 是一个轻量级的、基于 SQL 的物联网边缘流处理引擎。
StreamSQL 提供了高效的无界数据流处理和分析能力,支持多种窗口类型、聚合函数、 自定义函数,以及与 RuleGo 生态的无缝集成。
核心特性 ¶
• 轻量级设计 - 纯内存操作,无外部依赖 • SQL语法支持 - 使用熟悉的SQL语法处理流数据 • 多种窗口类型 - 滑动窗口、滚动窗口、计数窗口、会话窗口 • 丰富的聚合函数 - MAX, MIN, AVG, SUM, STDDEV, MEDIAN, PERCENTILE等 • 插件式自定义函数 - 运行时动态注册,支持8种函数类型 • RuleGo生态集成 - 利用RuleGo组件扩展输入输出源
入门示例 ¶
基本的流数据处理:
package main
import (
"fmt"
"math/rand"
"time"
"github.com/rulego/streamsql"
)
func main() {
// 创建StreamSQL实例
ssql := streamsql.New()
// 定义SQL查询 - 每5秒按设备ID分组计算温度平均值
sql := `SELECT deviceId,
AVG(temperature) as avg_temp,
MIN(humidity) as min_humidity,
window_start() as start,
window_end() as end
FROM stream
WHERE deviceId != 'device3'
GROUP BY deviceId, TumblingWindow('5s')`
// 执行SQL,创建流处理任务
err := ssql.Execute(sql)
if err != nil {
panic(err)
}
// 添加结果处理回调
ssql.AddSink(func(result interface{}) {
fmt.Printf("聚合结果: %v\n", result)
})
// 模拟发送流数据
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 生成随机设备数据
data := map[string]interface{}{
"deviceId": fmt.Sprintf("device%d", rand.Intn(3)+1),
"temperature": 20.0 + rand.Float64()*10,
"humidity": 50.0 + rand.Float64()*20,
}
ssql.Emit(data)
}
}
}()
// 运行30秒
time.Sleep(30 * time.Second)
}
窗口函数 ¶
StreamSQL 支持多种窗口类型:
// 滚动窗口 - 每5秒一个独立窗口
SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')
// 滑动窗口 - 窗口大小30秒,每10秒滑动一次
SELECT MAX(temperature) FROM stream GROUP BY SlidingWindow('30s', '10s')
// 计数窗口 - 每100条记录一个窗口
SELECT COUNT(*) FROM stream GROUP BY CountingWindow(100)
// 会话窗口 - 超时5分钟自动关闭会话
SELECT user_id, COUNT(*) FROM stream GROUP BY user_id, SessionWindow('5m')
自定义函数 ¶
StreamSQL 支持插件式自定义函数,运行时动态注册:
// 注册温度转换函数
functions.RegisterCustomFunction(
"fahrenheit_to_celsius",
functions.TypeConversion,
"温度转换",
"华氏度转摄氏度",
1, 1,
func(ctx *functions.FunctionContext, args []interface{}) (interface{}, error) {
f, _ := functions.ConvertToFloat64(args[0])
return (f - 32) * 5 / 9, nil
},
)
// 立即在SQL中使用
sql := `SELECT deviceId,
AVG(fahrenheit_to_celsius(temperature)) as avg_celsius
FROM stream GROUP BY deviceId, TumblingWindow('5s')`
支持的自定义函数类型: • TypeMath - 数学计算函数 • TypeString - 字符串处理函数 • TypeConversion - 类型转换函数 • TypeDateTime - 时间日期函数 • TypeAggregation - 聚合函数 • TypeAnalytical - 分析函数 • TypeWindow - 窗口函数 • TypeCustom - 通用自定义函数
日志配置 ¶
StreamSQL 提供灵活的日志配置选项:
// 设置日志级别
ssql := streamsql.New(streamsql.WithLogLevel(logger.DEBUG))
// 输出到文件
logFile, _ := os.OpenFile("app.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
ssql := streamsql.New(streamsql.WithLogOutput(logFile, logger.INFO))
// 禁用日志(生产环境)
ssql := streamsql.New(streamsql.WithDiscardLog())
与RuleGo集成 ¶
StreamSQL提供了与RuleGo规则引擎的深度集成,通过两个专用组件实现流式数据处理:
• streamTransform (x/streamTransform) - 流转换器,处理非聚合SQL查询 • streamAggregator (x/streamAggregator) - 流聚合器,处理聚合SQL查询
基本集成示例:
package main
import (
"github.com/rulego/rulego"
"github.com/rulego/rulego/api/types"
// 注册StreamSQL组件
_ "github.com/rulego/rulego-components/external/streamsql"
)
func main() {
// 规则链配置
ruleChainJson := `{
"ruleChain": {"id": "rule01"},
"metadata": {
"nodes": [{
"id": "transform1",
"type": "x/streamTransform",
"configuration": {
"sql": "SELECT deviceId, temperature * 1.8 + 32 as temp_f FROM stream WHERE temperature > 20"
}
}, {
"id": "aggregator1",
"type": "x/streamAggregator",
"configuration": {
"sql": "SELECT deviceId, AVG(temperature) as avg_temp FROM stream GROUP BY deviceId, TumblingWindow('5s')"
}
}],
"connections": [{
"fromId": "transform1",
"toId": "aggregator1",
"type": "Success"
}]
}
}`
// 创建规则引擎
ruleEngine, _ := rulego.New("rule01", []byte(ruleChainJson))
// 发送数据
data := `{"deviceId":"sensor01","temperature":25.5}`
msg := types.NewMsg(0, "TELEMETRY", types.JSON, types.NewMetadata(), data)
ruleEngine.OnMsg(msg)
}
Index ¶
- type Option
- func WithBufferSizes(dataChannelSize, resultChannelSize, windowOutputSize int) Option
- func WithBuffers(dataBufSize, resultBufSize, sinkPoolSize int) Option
- func WithCustomPerformance(config types.PerformanceConfig) Option
- func WithCustomPersistence(dataDir string, maxFileSize int64, flushInterval time.Duration) Option
- func WithDiscardLog() Option
- func WithHighPerf() Option
- func WithHighPerformance() Option
- func WithLogLevel(level logger.Level) Option
- func WithLowLatency() Option
- func WithMonitoring(updateInterval time.Duration, enableDetailedStats bool) Option
- func WithOverflowPolicy(strategy string, timeout time.Duration) Option
- func WithOverflowStrategy(strategy string, blockTimeout time.Duration) Option
- func WithPersistence() Option
- func WithPersistenceConfig(dataDir string, maxFileSize int64, flushInterval time.Duration) Option
- func WithWorkerConfig(sinkPoolSize, sinkWorkerCount, maxRetryRoutines int) Option
- func WithZeroDataLoss() Option
- type Streamsql
- func (s *Streamsql) AddSink(sink func(interface{}))
- func (s *Streamsql) Emit(data interface{})
- func (s *Streamsql) EmitSync(data interface{}) (interface{}, error)
- func (s *Streamsql) Execute(sql string) error
- func (s *Streamsql) GetDetailedStats() map[string]interface{}
- func (s *Streamsql) GetStats() map[string]int64
- func (s *Streamsql) IsAggregationQuery() bool
- func (s *Streamsql) Print()
- func (s *Streamsql) Stop()
- func (s *Streamsql) Stream() *stream.Stream
- func (s *Streamsql) ToChannel() <-chan interface{}
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Option ¶
type Option func(*Streamsql)
Option 定义StreamSQL的配置选项类型
func WithBufferSizes ¶
WithBufferSizes 设置自定义缓冲区大小
func WithBuffers ¶
WithBuffers 设置缓冲区大小 (已弃用,使用WithBufferSizes) Deprecated: 使用WithBufferSizes替代
func WithCustomPerformance ¶
func WithCustomPerformance(config types.PerformanceConfig) Option
WithCustomPerformance 使用自定义性能配置
func WithCustomPersistence ¶
WithCustomPersistence 使用自定义持久化配置
func WithHighPerf ¶
func WithHighPerf() Option
WithHighPerf 启用高性能模式 (已弃用,使用WithHighPerformance) Deprecated: 使用WithHighPerformance替代
func WithHighPerformance ¶
func WithHighPerformance() Option
WithHighPerformance 使用高性能配置 适用于需要最大吞吐量的场景
func WithMonitoring ¶
WithMonitoring 启用详细监控
func WithOverflowPolicy ¶
WithOverflowPolicy 设置溢出策略 (已弃用,使用WithOverflowStrategy) Deprecated: 使用WithOverflowStrategy替代
func WithOverflowStrategy ¶
WithOverflowStrategy 设置溢出策略
func WithPersistenceConfig ¶
WithPersistenceConfig 设置持久化配置 (已弃用,使用WithCustomPersistence) Deprecated: 使用WithCustomPersistence替代
func WithWorkerConfig ¶
WithWorkerConfig 设置工作池配置
type Streamsql ¶
type Streamsql struct {
// contains filtered or unexported fields
}
Streamsql 是StreamSQL流处理引擎的主要接口。 它封装了SQL解析、流处理、窗口管理等核心功能。
使用示例:
ssql := streamsql.New()
err := ssql.Execute("SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('5s')")
ssql.Emit(map[string]interface{}{"temperature": 25.5})
func New ¶
New 创建一个新的StreamSQL实例。 支持通过可选的Option参数进行配置。
参数:
- options: 可变长度的配置选项,用于自定义StreamSQL行为
返回值:
- *Streamsql: 新创建的StreamSQL实例
示例:
// 创建默认实例 ssql := streamsql.New() // 创建高性能实例 ssql := streamsql.New(streamsql.WithHighPerformance()) // 创建零数据丢失实例 ssql := streamsql.New(streamsql.WithZeroDataLoss())
func (*Streamsql) AddSink ¶
func (s *Streamsql) AddSink(sink func(interface{}))
AddSink 直接添加结果处理回调函数。 这是对 Stream().AddSink() 的便捷封装,使API调用更简洁。
参数:
- sink: 结果处理函数,接收处理结果作为参数
示例:
// 直接添加结果处理
ssql.AddSink(func(result interface{}) {
fmt.Printf("处理结果: %v\n", result)
})
// 添加多个处理器
ssql.AddSink(func(result interface{}) {
// 保存到数据库
saveToDatabase(result)
})
ssql.AddSink(func(result interface{}) {
// 发送到消息队列
sendToQueue(result)
})
func (*Streamsql) Emit ¶
func (s *Streamsql) Emit(data interface{})
Emit 向流中添加一条数据记录。 数据会根据已配置的SQL查询进行处理和聚合。
支持的数据格式:
- map[string]interface{}: 最常用的键值对格式
- 结构体: 会自动转换为map格式处理
参数:
- data: 要添加的数据,通常是map[string]interface{}或结构体
示例:
// 添加设备数据
ssql.Emit(map[string]interface{}{
"deviceId": "sensor001",
"temperature": 25.5,
"humidity": 60.0,
"timestamp": time.Now(),
})
// 添加用户行为数据
ssql.Emit(map[string]interface{}{
"userId": "user123",
"action": "click",
"page": "/home",
})
func (*Streamsql) EmitSync ¶
EmitSync 同步处理数据,立即返回处理结果。 仅适用于非聚合查询(如过滤、转换等),聚合查询会返回错误。
对于非聚合查询,此方法提供同步的数据处理能力,同时: 1. 立即返回处理结果(同步) 2. 触发已注册的AddSink回调(异步)
这确保了同步和异步模式的一致性,用户可以同时获得: - 立即可用的处理结果 - 异步回调处理(用于日志、监控、持久化等)
参数:
- data: 要处理的数据
返回值:
- interface{}: 处理后的结果,如果不匹配过滤条件返回nil
- error: 处理错误,如果是聚合查询会返回错误
示例:
// 添加日志回调
ssql.AddSink(func(result interface{}) {
fmt.Printf("异步日志: %v\n", result)
})
// 同步处理并立即获取结果
result, err := ssql.EmitSync(map[string]interface{}{
"temperature": 25.5,
"humidity": 60.0,
})
if err != nil {
// 处理错误
} else if result != nil {
// 立即使用处理结果
fmt.Printf("同步结果: %v\n", result)
// 同时异步回调也会被触发
}
func (*Streamsql) Execute ¶
Execute 解析并执行SQL查询,创建对应的流处理管道。 这是StreamSQL的核心方法,负责将SQL转换为实际的流处理逻辑。
支持的SQL语法:
- SELECT 子句: 选择字段和聚合函数
- FROM 子句: 指定数据源(通常为'stream')
- WHERE 子句: 数据过滤条件
- GROUP BY 子句: 分组字段和窗口函数
- HAVING 子句: 聚合结果过滤
- LIMIT 子句: 限制结果数量
- DISTINCT: 结果去重
窗口函数:
- TumblingWindow('5s'): 滚动窗口
- SlidingWindow('30s', '10s'): 滑动窗口
- CountingWindow(100): 计数窗口
- SessionWindow('5m'): 会话窗口
参数:
- sql: 要执行的SQL查询语句
返回值:
- error: 如果SQL解析或执行失败,返回相应错误
示例:
// 基本聚合查询
err := ssql.Execute("SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId, TumblingWindow('5s')")
// 带过滤条件的查询
err := ssql.Execute("SELECT * FROM stream WHERE temperature > 30")
// 复杂的窗口聚合
err := ssql.Execute(`
SELECT deviceId,
AVG(temperature) as avg_temp,
MAX(humidity) as max_humidity
FROM stream
WHERE deviceId != 'test'
GROUP BY deviceId, SlidingWindow('1m', '30s')
HAVING avg_temp > 25
LIMIT 100
`)
func (*Streamsql) GetDetailedStats ¶
GetDetailedStats 获取详细的性能统计信息
func (*Streamsql) IsAggregationQuery ¶
IsAggregationQuery 检查当前查询是否为聚合查询
func (*Streamsql) Print ¶
func (s *Streamsql) Print()
Print 打印结果到控制台。 这是一个便捷方法,自动添加一个打印结果的sink函数。
示例:
// 简单打印结果
ssql.Print()
// 等价于:
ssql.AddSink(func(result interface{}) {
fmt.Printf("Ressult: %v\n", result)
})
func (*Streamsql) Stop ¶
func (s *Streamsql) Stop()
Stop 停止流处理器,释放相关资源。 调用此方法后,流处理器将停止接收和处理新数据。
建议在应用程序退出前调用此方法进行清理:
defer ssql.Stop()
注意: 停止后的StreamSQL实例不能重新启动,需要创建新实例。
func (*Streamsql) Stream ¶
Stream 返回底层的流处理器实例。 通过此方法可以访问更底层的流处理功能。
返回值:
- *stream.Stream: 底层流处理器实例,如果未执行SQL则返回nil
常用场景:
- 添加结果处理回调
- 获取结果通道
- 手动控制流处理生命周期
示例:
// 添加结果处理回调
ssql.Stream().AddSink(func(result interface{}) {
fmt.Printf("处理结果: %v\n", result)
})
// 获取结果通道
resultChan := ssql.Stream().GetResultsChan()
go func() {
for result := range resultChan {
// 处理结果
}
}()
func (*Streamsql) ToChannel ¶
func (s *Streamsql) ToChannel() <-chan interface{}
ToChannel 返回结果通道,用于异步获取处理结果。 通过此通道可以以非阻塞方式获取流处理结果。
返回值:
- <-chan interface{}: 只读的结果通道,如果未执行SQL则返回nil
示例:
// 获取结果通道
resultChan := ssql.ToChannel()
if resultChan != nil {
go func() {
for result := range resultChan {
fmt.Printf("异步结果: %v\n", result)
}
}()
}
注意:
- 必须有消费者持续从通道读取数据,否则可能导致流处理阻塞
Directories
¶
| Path | Synopsis |
|---|---|
|
examples
|
|
|
advanced-functions
command
|
|
|
complex-nested-access
command
|
|
|
comprehensive-test
command
|
|
|
custom-functions-demo
command
|
|
|
function-integration-demo
command
|
|
|
nested-field-examples
command
|
|
|
non-aggregation
command
|
|
|
null-comparison-examples
command
|
|
|
persistence
command
|
|
|
simple-custom-functions
command
|
|
|
unified_config
command
|
|
|
Package logger 提供StreamSQL的日志记录功能。
|
Package logger 提供StreamSQL的日志记录功能。 |
|
utils
|
|
|
Package window 提供了窗口操作的实现,包括滚动窗口(Tumbling Window)。
|
Package window 提供了窗口操作的实现,包括滚动窗口(Tumbling Window)。 |