Documentation
¶
Index ¶
- Constants
- Variables
- type StreamAggregatorNode
- func (x *StreamAggregatorNode) Destroy()
- func (x *StreamAggregatorNode) Init(ruleConfig types.Config, configuration types.Configuration) error
- func (x *StreamAggregatorNode) New() types.Node
- func (x *StreamAggregatorNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg)
- func (x *StreamAggregatorNode) Type() string
- type StreamAggregatorNodeConfiguration
- type StreamTransformNode
- func (x *StreamTransformNode) Destroy()
- func (x *StreamTransformNode) Init(ruleConfig types.Config, configuration types.Configuration) error
- func (x *StreamTransformNode) New() types.Node
- func (x *StreamTransformNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg)
- func (x *StreamTransformNode) Type() string
- type StreamTransformNodeConfiguration
Constants ¶
const RelationTypeWindowEvent = "window_event"
RelationTypeWindowEvent 表示窗口事件关系类型,用于聚合结果的链路传递
const WindowEventMsgType = "window_event"
WindowEventMsgType 表示窗口事件消息类型,用于标识聚合结果消息
Variables ¶
var ( ErrAggregatorSQLEmpty = errors.New("aggregator SQL query is required") ErrNotAggregatorQuery = errors.New("SQL does not contain aggregation functions, use x/streamTransform instead") ErrAggregatorSQLExecution = errors.New("failed to execute aggregator SQL") ErrAggregatorChainCtxNil = errors.New("chain context is nil") ErrAggregatorNodeIdEmpty = errors.New("self node id is empty") ErrAggregatorChainIdEmpty = errors.New("chain id is empty") )
错误定义
var ( ErrTransformSQLEmpty = errors.New("transform SQL query is required") ErrNotTransformQuery = errors.New("SQL contains aggregation functions, use x/streamAggregator instead") ErrTransformSQLExecution = errors.New("failed to execute transform SQL") ErrNotMatchWhereCondition = errors.New("not match WHERE condition") ErrStreamsqlInstanceNil = errors.New("streamsql instance is nil") ErrArrayProcessingFailed = errors.New("failed to process array data") ErrUnsupportedDataType = errors.New("only JSON data type is supported") ErrDataProcessingFailed = errors.New("failed to process message data") // 元数据键名,用于标识数据是否匹配转换条件 Match = "match" MatchTrue = "true" MatchFalse = "false" )
错误定义和常量
Functions ¶
This section is empty.
Types ¶
type StreamAggregatorNode ¶
type StreamAggregatorNode struct {
// 节点配置
Config StreamAggregatorNodeConfiguration
// contains filtered or unexported fields
}
StreamAggregatorNode 流聚合器节点
功能说明: - 专门处理聚合查询,如窗口聚合、分组聚合、统计计算等 - 支持单条数据和数组数据输入,数组数据会被逐条添加到聚合流中 - 聚合结果通过 `window_event` 关系链传递到下一个节点,而不是通过普通的Success链 - 原始输入数据(无论单条还是数组)都会通过 `Success` 链继续传递,保持数据流的连续性
数据流向: - 输入数据 -> 添加到聚合流 -> 原始数据通过Success链传递 - 聚合触发 -> 聚合结果通过window_event链传递
注意事项: - 聚合结果通过全局`Config.OnEnd`回调返回,而不是通过消息处理上下文的OnEnd回调返回 - 聚合计算是异步进行的,不会阻塞原始数据的流转 - 窗口触发时机由StreamSQL引擎根据时间窗口或数据量自动决定
func (*StreamAggregatorNode) Destroy ¶
func (x *StreamAggregatorNode) Destroy()
Destroy 销毁节点,释放资源 该方法在节点被卸载时调用,用于清理StreamSQL实例和相关资源
func (*StreamAggregatorNode) Init ¶
func (x *StreamAggregatorNode) Init(ruleConfig types.Config, configuration types.Configuration) error
Init 初始化节点 该方法在节点被加载时调用,用于验证配置和初始化StreamSQL实例
func (*StreamAggregatorNode) OnMsg ¶
func (x *StreamAggregatorNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg)
OnMsg 处理消息 支持单条数据和数组数据: - 单条数据:直接添加到聚合流中 - 数组数据:遍历每个元素并逐条添加到聚合流中 无论哪种情况,原始消息都会通过Success链继续传递
type StreamAggregatorNodeConfiguration ¶
type StreamAggregatorNodeConfiguration struct {
// SQL查询语句,仅支持聚合查询(包含GROUP BY、聚合函数、窗口函数等)
// 聚合查询示例:
// SELECT AVG(temperature) as avg_temp FROM stream GROUP BY TumblingWindow('5s')
// SELECT deviceId, MAX(temperature) as max_temp FROM stream GROUP BY deviceId, SlidingWindow('1m', '30s')
// SELECT COUNT(*) as count, SUM(value) as total FROM stream GROUP BY TumblingWindow('10s')
// 支持的聚合函数:COUNT, SUM, AVG, MAX, MIN, FIRST, LAST等
// 支持的窗口函数:TumblingWindow, SlidingWindow, SessionWindow等
SQL string `json:"sql"`
}
StreamAggregatorNodeConfiguration 流聚合器节点配置
type StreamTransformNode ¶
type StreamTransformNode struct {
// 节点配置
Config StreamTransformNodeConfiguration
// contains filtered or unexported fields
}
StreamTransformNode 流转换器节点
功能说明: - 专门处理非聚合查询,如数据过滤、字段转换、格式变换等 - 支持单条数据和数组数据输入:
- 单条数据:直接进行转换处理
- 数组数据:遍历转换每个元素,将成功转换的结果合并成数组输出
- 数据符合WHERE条件并转换成功,则通过`Success`链输出,否则通过`Failure`链输出 - 对于数组输入,只要有任何元素转换成功,就会通过Success链输出合并结果
数据流向: - 单条数据:输入 -> SQL转换 -> Success/Failure输出 - 数组数据:输入数组 -> 逐个转换 -> 合并成功结果 -> Success输出(如有成功项)/ Failure输出(全部失败)
注意事项: - 转换处理是同步的,会阻塞当前消息的处理 - WHERE条件不匹配的数据会被过滤掉,不包含在输出结果中 - 对于数组输入,部分元素转换失败不会影响整体结果,只影响最终数组的元素数量
func (*StreamTransformNode) Destroy ¶
func (x *StreamTransformNode) Destroy()
Destroy 销毁节点,释放资源 该方法在节点被卸载时调用,用于清理StreamSQL实例和相关资源
func (*StreamTransformNode) Init ¶
func (x *StreamTransformNode) Init(ruleConfig types.Config, configuration types.Configuration) error
Init 初始化节点 该方法在节点被加载时调用,用于验证配置和初始化StreamSQL实例
func (*StreamTransformNode) OnMsg ¶
func (x *StreamTransformNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg)
OnMsg 处理消息 支持单条数据和数组数据:
- 单条数据:直接进行SQL转换,成功则通过Success链输出,失败则通过Failure链输出
- 数组数据:遍历每个元素进行转换,将所有成功的结果合并成数组输出 如果至少有一个元素转换成功,则通过Success链输出;如果全部失败,则通过Failure链输出
type StreamTransformNodeConfiguration ¶
type StreamTransformNodeConfiguration struct {
// SQL查询语句,仅支持非聚合查询(过滤、转换、字段选择等)
// 转换查询示例:
// SELECT temperature, humidity FROM stream WHERE temperature > 20
// SELECT deviceId, temperature * 1.8 + 32 as temp_fahrenheit FROM stream
// SELECT *, CASE WHEN status = 'active' THEN 1 ELSE 0 END as status_code FROM stream
// SELECT deviceId, UPPER(deviceName) as name, temperature FROM stream WHERE deviceId LIKE 'sensor_%'
// 支持的操作:
// - 字段选择:SELECT field1, field2 FROM stream
// - 字段重命名:SELECT field1 as new_name FROM stream
// - 数学运算:SELECT field1 * 2 + 10 as calculated FROM stream
// - 条件过滤:WHERE 子句进行数据过滤
// - 字符串函数:UPPER, LOWER, SUBSTR, CONCAT等
// - 数学函数:ABS, ROUND, CEIL, FLOOR等
// - 条件表达式:CASE WHEN ... THEN ... ELSE ... END
SQL string `json:"sql"`
}
StreamTransformNodeConfiguration 流转换器节点配置