streamsql

package
v0.35.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const RelationTypeWindowEvent = "window_event"

RelationTypeWindowEvent 表示窗口事件关系类型,用于聚合结果的链路传递

View Source
const WindowEventMsgType = "window_event"

WindowEventMsgType 表示窗口事件消息类型,用于标识聚合结果消息

Variables

View Source
var (
	ErrAggregatorSQLEmpty     = errors.New("aggregator SQL query is required")
	ErrNotAggregatorQuery     = errors.New("SQL does not contain aggregation functions, use x/streamTransform instead")
	ErrAggregatorSQLExecution = errors.New("failed to execute aggregator SQL")
	ErrAggregatorChainCtxNil  = errors.New("chain context is nil")
	ErrAggregatorNodeIdEmpty  = errors.New("self node id is empty")
	ErrAggregatorChainIdEmpty = errors.New("chain id is empty")
)

错误定义

View Source
var (
	ErrTransformSQLEmpty      = errors.New("transform SQL query is required")
	ErrNotTransformQuery      = errors.New("SQL contains aggregation functions, use x/streamAggregator instead")
	ErrTransformSQLExecution  = errors.New("failed to execute transform SQL")
	ErrNotMatchWhereCondition = errors.New("not match WHERE condition")
	ErrStreamsqlInstanceNil   = errors.New("streamsql instance is nil")
	ErrArrayProcessingFailed  = errors.New("failed to process array data")
	ErrUnsupportedDataType    = errors.New("only JSON data type is supported")
	ErrDataProcessingFailed   = errors.New("failed to process message data")

	// 元数据键名,用于标识数据是否匹配转换条件
	Match      = "match"
	MatchTrue  = "true"
	MatchFalse = "false"
)

错误定义和常量

Functions

This section is empty.

Types

type StreamAggregatorNode

type StreamAggregatorNode struct {
	// 节点配置
	Config StreamAggregatorNodeConfiguration
	// contains filtered or unexported fields
}

StreamAggregatorNode 流聚合器节点

功能说明: - 专门处理聚合查询,如窗口聚合、分组聚合、统计计算等 - 支持单条数据和数组数据输入,数组数据会被逐条添加到聚合流中 - 聚合结果通过 `window_event` 关系链传递到下一个节点,而不是通过普通的Success链 - 原始输入数据(无论单条还是数组)都会通过 `Success` 链继续传递,保持数据流的连续性

数据流向: - 输入数据 -> 添加到聚合流 -> 原始数据通过Success链传递 - 聚合触发 -> 聚合结果通过window_event链传递

注意事项: - 聚合结果通过全局`Config.OnEnd`回调返回,而不是通过消息处理上下文的OnEnd回调返回 - 聚合计算是异步进行的,不会阻塞原始数据的流转 - 窗口触发时机由StreamSQL引擎根据时间窗口或数据量自动决定

func (*StreamAggregatorNode) Destroy

func (x *StreamAggregatorNode) Destroy()

Destroy 销毁节点,释放资源 该方法在节点被卸载时调用,用于清理StreamSQL实例和相关资源

func (*StreamAggregatorNode) Init

func (x *StreamAggregatorNode) Init(ruleConfig types.Config, configuration types.Configuration) error

Init 初始化节点 该方法在节点被加载时调用,用于验证配置和初始化StreamSQL实例

func (*StreamAggregatorNode) New

func (x *StreamAggregatorNode) New() types.Node

New 创建流聚合器节点实例

func (*StreamAggregatorNode) OnMsg

func (x *StreamAggregatorNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg)

OnMsg 处理消息 支持单条数据和数组数据: - 单条数据:直接添加到聚合流中 - 数组数据:遍历每个元素并逐条添加到聚合流中 无论哪种情况,原始消息都会通过Success链继续传递

func (*StreamAggregatorNode) Type

func (x *StreamAggregatorNode) Type() string

Type 返回组件类型标识

type StreamAggregatorNodeConfiguration

type StreamAggregatorNodeConfiguration struct {
	// SQL查询语句,仅支持聚合查询(包含GROUP BY、聚合函数、窗口函数等)
	// 聚合查询示例:
	//   SELECT AVG(temperature) as avg_temp FROM stream GROUP BY TumblingWindow('5s')
	//   SELECT deviceId, MAX(temperature) as max_temp FROM stream GROUP BY deviceId, SlidingWindow('1m', '30s')
	//   SELECT COUNT(*) as count, SUM(value) as total FROM stream GROUP BY TumblingWindow('10s')
	// 支持的聚合函数:COUNT, SUM, AVG, MAX, MIN, FIRST, LAST等
	// 支持的窗口函数:TumblingWindow, SlidingWindow, SessionWindow等
	SQL string `json:"sql"`
}

StreamAggregatorNodeConfiguration 流聚合器节点配置

type StreamTransformNode

type StreamTransformNode struct {
	// 节点配置
	Config StreamTransformNodeConfiguration
	// contains filtered or unexported fields
}

StreamTransformNode 流转换器节点

功能说明: - 专门处理非聚合查询,如数据过滤、字段转换、格式变换等 - 支持单条数据和数组数据输入:

  • 单条数据:直接进行转换处理
  • 数组数据:遍历转换每个元素,将成功转换的结果合并成数组输出

- 数据符合WHERE条件并转换成功,则通过`Success`链输出,否则通过`Failure`链输出 - 对于数组输入,只要有任何元素转换成功,就会通过Success链输出合并结果

数据流向: - 单条数据:输入 -> SQL转换 -> Success/Failure输出 - 数组数据:输入数组 -> 逐个转换 -> 合并成功结果 -> Success输出(如有成功项)/ Failure输出(全部失败)

注意事项: - 转换处理是同步的,会阻塞当前消息的处理 - WHERE条件不匹配的数据会被过滤掉,不包含在输出结果中 - 对于数组输入,部分元素转换失败不会影响整体结果,只影响最终数组的元素数量

func (*StreamTransformNode) Destroy

func (x *StreamTransformNode) Destroy()

Destroy 销毁节点,释放资源 该方法在节点被卸载时调用,用于清理StreamSQL实例和相关资源

func (*StreamTransformNode) Init

func (x *StreamTransformNode) Init(ruleConfig types.Config, configuration types.Configuration) error

Init 初始化节点 该方法在节点被加载时调用,用于验证配置和初始化StreamSQL实例

func (*StreamTransformNode) New

func (x *StreamTransformNode) New() types.Node

New 创建流转换器节点实例

func (*StreamTransformNode) OnMsg

func (x *StreamTransformNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg)

OnMsg 处理消息 支持单条数据和数组数据:

  • 单条数据:直接进行SQL转换,成功则通过Success链输出,失败则通过Failure链输出
  • 数组数据:遍历每个元素进行转换,将所有成功的结果合并成数组输出 如果至少有一个元素转换成功,则通过Success链输出;如果全部失败,则通过Failure链输出

func (*StreamTransformNode) Type

func (x *StreamTransformNode) Type() string

Type 返回组件类型标识

type StreamTransformNodeConfiguration

type StreamTransformNodeConfiguration struct {
	// SQL查询语句,仅支持非聚合查询(过滤、转换、字段选择等)
	// 转换查询示例:
	//   SELECT temperature, humidity FROM stream WHERE temperature > 20
	//   SELECT deviceId, temperature * 1.8 + 32 as temp_fahrenheit FROM stream
	//   SELECT *, CASE WHEN status = 'active' THEN 1 ELSE 0 END as status_code FROM stream
	//   SELECT deviceId, UPPER(deviceName) as name, temperature FROM stream WHERE deviceId LIKE 'sensor_%'
	// 支持的操作:
	// - 字段选择:SELECT field1, field2 FROM stream
	// - 字段重命名:SELECT field1 as new_name FROM stream
	// - 数学运算:SELECT field1 * 2 + 10 as calculated FROM stream
	// - 条件过滤:WHERE 子句进行数据过滤
	// - 字符串函数:UPPER, LOWER, SUBSTR, CONCAT等
	// - 数学函数:ABS, ROUND, CEIL, FLOOR等
	// - 条件表达式:CASE WHEN ... THEN ... ELSE ... END
	SQL string `json:"sql"`
}

StreamTransformNodeConfiguration 流转换器节点配置

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL