Documentation ¶
Overview ¶
Package aggregator provides data aggregation functionality for StreamSQL.
This package implements group-based aggregation operations for stream processing, supporting various aggregation functions and expression evaluation. It provides thread-safe aggregation with support for custom expressions and built-in functions.
Core Features ¶
• Group Aggregation - Group data by specified fields and apply aggregation functions
• Built-in Functions - Support for Sum, Count, Avg, Max, Min, and more
• Expression Support - Custom expression evaluation within aggregations
• Thread Safety - Concurrent aggregation operations with proper synchronization
• Type Flexibility - Automatic type conversion and validation
• Performance Optimized - Efficient memory usage and processing
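The thread-safety point can be illustrated with a minimal sketch. The `safeSum` type and the `sumConcurrently` helper below are hypothetical and not part of this package. They only show the general pattern of guarding aggregation state with a `sync.Mutex`, which is an assumption about how such synchronization is commonly done, not a description of the package internals.

```go
package main

import (
	"fmt"
	"sync"
)

// safeSum is a hypothetical aggregator used only for illustration.
type safeSum struct {
	mu    sync.Mutex
	total float64
	count int
}

// Add records one value; it is safe to call from multiple goroutines.
func (s *safeSum) Add(v float64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.total += v
	s.count++
}

// Result returns the running sum and the number of values seen.
func (s *safeSum) Result() (float64, int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.total, s.count
}

// sumConcurrently adds the value 1 from n goroutines and returns the result.
func sumConcurrently(n int) (float64, int) {
	var agg safeSum
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			agg.Add(1)
		}()
	}
	wg.Wait()
	return agg.Result()
}

func main() {
	total, count := sumConcurrently(100)
	fmt.Println(total, count)
}
```

Without the mutex, concurrent `Add` calls would race on `total` and `count`, and the final values could be lower than expected.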
Aggregation Types ¶
Supported aggregation functions (re-exported from functions package):
// Mathematical aggregations
Sum, Count, Avg, Max, Min
StdDev, StdDevS, Var, VarS
Median, Percentile
// Collection aggregations
Collect, LastValue, MergeAgg
Deduplicate
// Window aggregations
WindowStart, WindowEnd
// Analytical functions
Lag, Latest, ChangedCol, HadChanged
// Custom expressions
Expression
Core Interfaces ¶
Main aggregation interfaces:
type Aggregator interface {
Add(data interface{}) error
Put(key string, val interface{}) error
GetResults() ([]map[string]interface{}, error)
Reset()
RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))
}
type AggregatorFunction interface {
New() AggregatorFunction
Add(value interface{})
Result() interface{}
}
Aggregation Configuration ¶
Field configuration for aggregations:
type AggregationField struct {
InputField string // Source field name
AggregateType AggregateType // Aggregation function type
OutputAlias string // Result field alias
}
Usage Examples ¶
Basic group aggregation:
// Define aggregation fields
aggFields := []AggregationField{
{InputField: "temperature", AggregateType: Avg, OutputAlias: "avg_temp"},
{InputField: "humidity", AggregateType: Max, OutputAlias: "max_humidity"},
{InputField: "device_id", AggregateType: Count, OutputAlias: "device_count"},
}
// Create group aggregator
aggregator := NewGroupAggregator([]string{"location"}, aggFields)
// Add data
data := map[string]interface{}{
"location": "room1",
"temperature": 25.5,
"humidity": 60,
"device_id": "sensor001",
}
if err := aggregator.Add(data); err != nil {
// Handle the error, for example log it and skip this record
}
// Get results
results, err := aggregator.GetResults()
Expression-based aggregation:
// Register custom expression
aggregator.RegisterExpression(
"comfort_index",
"temperature * 0.7 + humidity * 0.3",
[]string{"temperature", "humidity"},
func(data interface{}) (interface{}, error) {
// Custom evaluation logic
return evaluateComfortIndex(data)
},
)
Multiple group aggregation:
// Group by multiple fields
aggregator := NewGroupAggregator(
[]string{"location", "device_type"},
aggFields,
)
// Results will be grouped by both location and device_type
results, err := aggregator.GetResults()
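To make the idea of grouping by several fields concrete, here is a minimal sketch that uses only the standard library. The helpers `groupKey` and `countByGroup`, and the `"|"` separator, are assumptions made for illustration; the package may build its group keys differently.

```go
package main

import (
	"fmt"
	"strings"
)

// groupKey joins the values of the group fields into one composite key.
// The "|" separator is an arbitrary choice for this sketch.
func groupKey(row map[string]interface{}, groupFields []string) string {
	parts := make([]string, 0, len(groupFields))
	for _, f := range groupFields {
		parts = append(parts, fmt.Sprintf("%v", row[f]))
	}
	return strings.Join(parts, "|")
}

// countByGroup counts rows per composite key.
func countByGroup(rows []map[string]interface{}, groupFields []string) map[string]int {
	counts := make(map[string]int)
	for _, r := range rows {
		counts[groupKey(r, groupFields)]++
	}
	return counts
}

func main() {
	rows := []map[string]interface{}{
		{"location": "room1", "device_type": "thermo"},
		{"location": "room1", "device_type": "thermo"},
		{"location": "room1", "device_type": "hygro"},
	}
	fmt.Println(countByGroup(rows, []string{"location", "device_type"}))
}
```

Rows that share a location but differ in device type land in separate groups, which is the behavior the multi-field example above relies on.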
Built-in Aggregators ¶
Create built-in aggregation functions:
// Create specific aggregator
sumAgg := CreateBuiltinAggregator(Sum)
avgAgg := CreateBuiltinAggregator(Avg)
countAgg := CreateBuiltinAggregator(Count)
// Use aggregator
sumAgg.Add(10)
sumAgg.Add(20)
result := sumAgg.Result() // returns 30
Custom Aggregators ¶
Register custom aggregation functions:
Register("custom_avg", func() AggregatorFunction {
return &CustomAvgAggregator{}
})
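The `CustomAvgAggregator` referenced in the snippet above is not defined in this documentation. One possible shape for it is sketched below. The `AggregatorFunction` interface is redeclared locally, mirroring the three methods listed under Core Interfaces, so that the sketch compiles on its own. The accepted numeric types and the choice to ignore non-numeric values are assumptions of this sketch.

```go
package main

import "fmt"

// AggregatorFunction mirrors the interface shown in this documentation so the
// sketch compiles on its own; real code would use the package's type.
type AggregatorFunction interface {
	New() AggregatorFunction
	Add(value interface{})
	Result() interface{}
}

// CustomAvgAggregator is a hypothetical running-average implementation.
type CustomAvgAggregator struct {
	sum   float64
	count int
}

// New returns a fresh, empty aggregator so each group gets its own state.
func (a *CustomAvgAggregator) New() AggregatorFunction {
	return &CustomAvgAggregator{}
}

// Add accepts common numeric types and silently ignores anything else.
func (a *CustomAvgAggregator) Add(value interface{}) {
	switch v := value.(type) {
	case float64:
		a.sum += v
	case float32:
		a.sum += float64(v)
	case int:
		a.sum += float64(v)
	case int64:
		a.sum += float64(v)
	default:
		return
	}
	a.count++
}

// Result returns the mean as float64, or nil when no values were added.
func (a *CustomAvgAggregator) Result() interface{} {
	if a.count == 0 {
		return nil
	}
	return a.sum / float64(a.count)
}

func main() {
	var agg AggregatorFunction = (&CustomAvgAggregator{}).New()
	agg.Add(10)
	agg.Add(20.0)
	agg.Add("not a number") // ignored by this sketch
	fmt.Println(agg.Result())
}
```

Returning a new instance from `New` matters because a constructor registered once is presumably reused for every group, and groups must not share running state.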
Integration ¶
Integrates with other StreamSQL components:
• Functions package - Built-in aggregation function implementations
• Stream package - Real-time data aggregation in streams
• Window package - Window-based aggregation operations
• Types package - Data type definitions and conversions
• RSQL package - SQL GROUP BY and aggregation parsing
Index ¶
- Constants
- func Register(name string, constructor func() AggregatorFunction)
- type AggregateType
- type AggregationField
- type AggregationFieldInfo
- type Aggregator
- type AggregatorFunction
- type ContextAggregator
- type EnhancedGroupAggregator
- type ExpressionAggregatorWrapper
- type ExpressionEvaluator
- type GroupAggregator
- func (ga *GroupAggregator) Add(data interface{}) error
- func (ga *GroupAggregator) GetResults() ([]map[string]interface{}, error)
- func (ga *GroupAggregator) Put(key string, val interface{}) error
- func (ga *GroupAggregator) RegisterExpression(field, expression string, fields []string, ...)
- func (ga *GroupAggregator) Reset()
- type PostAggregationExpression
- type PostAggregationPlaceholder
- type PostAggregationProcessor
- type WindowFunctionWrapper
Constants ¶
const (
	Sum         = functions.Sum
	Count       = functions.Count
	Avg         = functions.Avg
	Max         = functions.Max
	Min         = functions.Min
	StdDev      = functions.StdDev
	Median      = functions.Median
	Percentile  = functions.Percentile
	WindowStart = functions.WindowStart
	WindowEnd   = functions.WindowEnd
	Collect     = functions.Collect
	FirstValue  = functions.FirstValue
	LastValue   = functions.LastValue
	MergeAgg    = functions.MergeAgg
	StdDevS     = functions.StdDevS
	Deduplicate = functions.Deduplicate
	Var         = functions.Var
	VarS        = functions.VarS
	// Analytical functions
	Lag        = functions.Lag
	Latest     = functions.Latest
	ChangedCol = functions.ChangedCol
	HadChanged = functions.HadChanged
	// Expression aggregator for handling custom functions
	Expression = functions.Expression
	// Post-aggregation marker
	PostAggregation = functions.PostAggregation
)
Re-export all aggregate type constants
const (
	// PlaceholderPrefix defines the prefix for aggregation field placeholders
	PlaceholderPrefix = "__"
	// PlaceholderSuffix defines the suffix for aggregation field placeholders
	PlaceholderSuffix = "__"
	// HashMultiplier is used for generating unique hash values for function calls
	HashMultiplier = 31
	// MaxFunctionNameLength defines the maximum allowed length for function names
	MaxFunctionNameLength = 100
	// MaxExpressionDepth defines the maximum nesting depth for expression parsing
	MaxExpressionDepth = 50
)
Configuration constants for post-aggregation processing
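As a rough illustration of how these constants could be used, the sketch below builds a placeholder name and computes a simple polynomial hash. The constants are redeclared locally so the example is self-contained. The helpers `placeholderFor` and `hashCall` are hypothetical; the package's actual placeholder and hashing logic is not shown in this documentation and may differ.

```go
package main

import "fmt"

// Local copies of the documented constants, so the sketch compiles alone.
const (
	PlaceholderPrefix = "__"
	PlaceholderSuffix = "__"
	HashMultiplier    = 31
)

// placeholderFor builds a name such as "__first_value_0__".
// The "<func>_<index>" layout is an assumption based on the examples
// that appear elsewhere in this documentation.
func placeholderFor(funcName string, index int) string {
	return fmt.Sprintf("%s%s_%d%s", PlaceholderPrefix, funcName, index, PlaceholderSuffix)
}

// hashCall is a hypothetical polynomial string hash: h = h*31 + byte.
// Arithmetic wraps around on uint32 overflow.
func hashCall(call string) uint32 {
	var h uint32
	for i := 0; i < len(call); i++ {
		h = h*HashMultiplier + uint32(call[i])
	}
	return h
}

func main() {
	fmt.Println(placeholderFor("first_value", 0))
	fmt.Println(hashCall("SUM(price)"))
}
```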
Variables ¶
This section is empty.
Functions ¶
func Register ¶
func Register(name string, constructor func() AggregatorFunction)
Register adds a custom aggregator to the global registry. It re-exports functions.RegisterLegacyAggregator.
Types ¶
type AggregateType ¶
type AggregateType = functions.AggregateType
AggregateType is the aggregate type. It re-exports functions.AggregateType.
type AggregationField ¶
type AggregationField struct {
InputField string // Input field name (e.g., "temperature")
AggregateType AggregateType // Aggregation type (e.g., Sum, Avg)
OutputAlias string // Output alias (e.g., "temp_sum")
}
AggregationField defines configuration for a single aggregation field
type AggregationFieldInfo ¶ added in v0.10.3
type AggregationFieldInfo struct {
FuncName string // Function name, e.g. "first_value"
InputField string // Input field, e.g. "displayNum"
Placeholder string // Placeholder, e.g. "__first_value_0__"
AggType AggregateType // Aggregation type
FullCall string // Full function call, e.g. "NTH_VALUE(value, 2)"
}
AggregationFieldInfo holds information about an aggregation function in an expression
func ParseComplexAggregationExpression ¶ added in v0.10.3
func ParseComplexAggregationExpression(expr string) (aggFields []AggregationFieldInfo, exprTemplate string, err error)
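ParseComplexAggregationExpression parses expressions containing multiple aggregation functions and returns the list of required aggregation fields and the expression template. It decomposes a complex expression containing aggregation functions into:
1. A post-aggregation expression template (aggregation functions replaced by placeholders)
2. A list of the aggregation fields that must be computed beforehand
3. An error (if parsing fails)
Example:
Input: "SUM(price) + AVG(quantity) * 2"
Output: expression template "__SUM_123__ + __AVG_456__ * 2"
aggregation fields [{FieldName: "__SUM_123__", FunctionName: "SUM", Arguments: ["price"]}, ...]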
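The decomposition described above can be sketched with a regular expression. This is not the package's parser: `splitExpression`, `aggInfo`, and the fixed list of function names are hypothetical, nested calls are not handled, and placeholders are numbered sequentially rather than with the hash-like suffixes shown in the documented example.

```go
package main

import (
	"fmt"
	"regexp"
)

// aggCall matches simple calls like SUM(price); nested calls are not handled.
var aggCall = regexp.MustCompile(`\b(SUM|AVG|COUNT|MIN|MAX)\(\s*([A-Za-z_][A-Za-z0-9_]*)\s*\)`)

// aggInfo is a reduced, hypothetical version of AggregationFieldInfo.
type aggInfo struct {
	FuncName    string
	InputField  string
	Placeholder string
}

// splitExpression replaces each aggregate call with a placeholder and
// returns the resulting template plus the list of calls it found.
func splitExpression(expr string) (string, []aggInfo) {
	var infos []aggInfo
	template := aggCall.ReplaceAllStringFunc(expr, func(call string) string {
		m := aggCall.FindStringSubmatch(call)
		ph := fmt.Sprintf("__%s_%d__", m[1], len(infos))
		infos = append(infos, aggInfo{FuncName: m[1], InputField: m[2], Placeholder: ph})
		return ph
	})
	return template, infos
}

func main() {
	tmpl, infos := splitExpression("SUM(price) + AVG(quantity) * 2")
	fmt.Println(tmpl)
	for _, in := range infos {
		fmt.Printf("%s(%s) -> %s\n", in.FuncName, in.InputField, in.Placeholder)
	}
}
```

The aggregates in `infos` would be computed during aggregation, and the template would then be evaluated against those values once per group.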
type Aggregator ¶
type Aggregator interface {
Add(data interface{}) error
Put(key string, val interface{}) error
GetResults() ([]map[string]interface{}, error)
Reset()
// RegisterExpression registers expression evaluator
RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))
}
Aggregator is the aggregator interface.
type AggregatorFunction ¶
type AggregatorFunction = functions.LegacyAggregatorFunction
AggregatorFunction is the aggregator function interface. It re-exports functions.LegacyAggregatorFunction.
func CreateBuiltinAggregator ¶
func CreateBuiltinAggregator(aggType AggregateType) AggregatorFunction
CreateBuiltinAggregator creates a built-in aggregator. It re-exports functions.CreateLegacyAggregator.
type ContextAggregator ¶
type ContextAggregator = functions.ContextAggregator
ContextAggregator is an aggregator interface that supports the context mechanism. It re-exports functions.ContextAggregator.
type EnhancedGroupAggregator ¶ added in v0.10.3
type EnhancedGroupAggregator struct {
*GroupAggregator
// contains filtered or unexported fields
}
EnhancedGroupAggregator is a GroupAggregator with post-aggregation support
func NewEnhancedGroupAggregator ¶ added in v0.10.3
func NewEnhancedGroupAggregator(groupFields []string, aggregationFields []AggregationField) *EnhancedGroupAggregator
NewEnhancedGroupAggregator creates a new enhanced group aggregator with post-aggregation support
func (*EnhancedGroupAggregator) AddPostAggregationExpression ¶ added in v0.10.3
func (ega *EnhancedGroupAggregator) AddPostAggregationExpression(outputField, originalExpr string, requiredFields []AggregationFieldInfo) error
AddPostAggregationExpression adds an expression that needs post-aggregation processing
func (*EnhancedGroupAggregator) GetResults ¶ added in v0.10.3
func (ega *EnhancedGroupAggregator) GetResults() ([]map[string]interface{}, error)
GetResults returns results with post-aggregation expressions evaluated
type ExpressionAggregatorWrapper ¶
type ExpressionAggregatorWrapper struct {
// contains filtered or unexported fields
}
ExpressionAggregatorWrapper wraps expression aggregator to make it compatible with LegacyAggregatorFunction interface
func (*ExpressionAggregatorWrapper) Add ¶
func (w *ExpressionAggregatorWrapper) Add(value interface{})
func (*ExpressionAggregatorWrapper) New ¶
func (w *ExpressionAggregatorWrapper) New() AggregatorFunction
func (*ExpressionAggregatorWrapper) Result ¶
func (w *ExpressionAggregatorWrapper) Result() interface{}
type ExpressionEvaluator ¶
type ExpressionEvaluator struct {
Expression string // Complete expression
Field string // Primary field name
Fields []string // All fields referenced in expression
// contains filtered or unexported fields
}
ExpressionEvaluator wraps expression evaluation functionality
type GroupAggregator ¶
type GroupAggregator struct {
// contains filtered or unexported fields
}
func NewGroupAggregator ¶
func NewGroupAggregator(groupFields []string, aggregationFields []AggregationField) *GroupAggregator
NewGroupAggregator creates a new group aggregator
func (*GroupAggregator) Add ¶
func (ga *GroupAggregator) Add(data interface{}) error
func (*GroupAggregator) GetResults ¶
func (ga *GroupAggregator) GetResults() ([]map[string]interface{}, error)
func (*GroupAggregator) Put ¶
func (ga *GroupAggregator) Put(key string, val interface{}) error
func (*GroupAggregator) RegisterExpression ¶
func (ga *GroupAggregator) RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))
RegisterExpression registers expression evaluator
func (*GroupAggregator) Reset ¶
func (ga *GroupAggregator) Reset()
type PostAggregationExpression ¶ added in v0.10.3
type PostAggregationExpression struct {
OutputField string // Output field name
Expression string // Expression template, e.g. "__first_value_0__ - __last_value_1__"
RequiredAggFields []string // Aggregation fields it depends on, e.g. ["__first_value_0__", "__last_value_1__"]
OriginalExpr string // Original expression, kept for debugging
// contains filtered or unexported fields
}
PostAggregationExpression represents an expression that needs to be evaluated after aggregation
func (*PostAggregationExpression) Evaluate ¶ added in v0.10.3
func (pae *PostAggregationExpression) Evaluate(data map[string]interface{}) (interface{}, error)
Evaluate evaluates the post-aggregation expression
type PostAggregationPlaceholder ¶ added in v0.10.3
type PostAggregationPlaceholder struct{}
PostAggregationPlaceholder is a placeholder aggregator for post-aggregation fields
func (*PostAggregationPlaceholder) Add ¶ added in v0.10.3
func (p *PostAggregationPlaceholder) Add(value interface{})
func (*PostAggregationPlaceholder) New ¶ added in v0.10.3
func (p *PostAggregationPlaceholder) New() AggregatorFunction
func (*PostAggregationPlaceholder) Result ¶ added in v0.10.3
func (p *PostAggregationPlaceholder) Result() interface{}
type PostAggregationProcessor ¶ added in v0.10.3
type PostAggregationProcessor struct {
// contains filtered or unexported fields
}
PostAggregationProcessor handles expressions that contain aggregation functions
func NewPostAggregationProcessor ¶ added in v0.10.3
func NewPostAggregationProcessor() *PostAggregationProcessor
NewPostAggregationProcessor creates a new post-aggregation processor
func (*PostAggregationProcessor) AddExpression ¶ added in v0.10.3
func (p *PostAggregationProcessor) AddExpression(outputField, originalExpr string, aggFields []string, exprTemplate string)
AddExpression adds a post-aggregation expression
func (*PostAggregationProcessor) ProcessResults ¶ added in v0.10.3
func (p *PostAggregationProcessor) ProcessResults(results []map[string]interface{}) ([]map[string]interface{}, error)
ProcessResults processes aggregation results and evaluates post-aggregation expressions
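The post-aggregation step can be pictured as a second pass over the aggregated rows. The sketch below is a simplified model, not the package's implementation: `postExpr`, `processResults`, and `deltaExpr` are hypothetical, a Go closure stands in for the parsed expression template, and removing the placeholder fields from the output is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// postExpr is a hypothetical stand-in for a post-aggregation expression.
// Eval plays the role of a compiled expression template.
type postExpr struct {
	OutputField string
	Required    []string // placeholder fields the expression depends on
	Eval        func(row map[string]interface{}) (interface{}, error)
}

// processResults evaluates every expression against each result row, then
// removes the intermediate placeholder fields (an assumption of this sketch).
func processResults(results []map[string]interface{}, exprs []postExpr) ([]map[string]interface{}, error) {
	for _, row := range results {
		for _, e := range exprs {
			v, err := e.Eval(row)
			if err != nil {
				return nil, fmt.Errorf("evaluating %s: %w", e.OutputField, err)
			}
			row[e.OutputField] = v
		}
		// Drop placeholders only after all expressions have run.
		for _, e := range exprs {
			for _, ph := range e.Required {
				delete(row, ph)
			}
		}
	}
	return results, nil
}

// deltaExpr models the template "__first_value_0__ - __last_value_1__".
func deltaExpr() postExpr {
	return postExpr{
		OutputField: "delta",
		Required:    []string{"__first_value_0__", "__last_value_1__"},
		Eval: func(row map[string]interface{}) (interface{}, error) {
			first, ok1 := row["__first_value_0__"].(float64)
			last, ok2 := row["__last_value_1__"].(float64)
			if !ok1 || !ok2 {
				return nil, errors.New("missing or non-numeric aggregate")
			}
			return first - last, nil
		},
	}
}

func main() {
	rows := []map[string]interface{}{
		{"location": "room1", "__first_value_0__": 10.0, "__last_value_1__": 4.0},
	}
	out, err := processResults(rows, []postExpr{deltaExpr()})
	fmt.Println(out, err)
}
```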
type WindowFunctionWrapper ¶ added in v0.10.3
type WindowFunctionWrapper struct {
// contains filtered or unexported fields
}
WindowFunctionWrapper wraps window functions to make them compatible with LegacyAggregatorFunction
func (*WindowFunctionWrapper) Add ¶ added in v0.10.3
func (w *WindowFunctionWrapper) Add(value interface{})
func (*WindowFunctionWrapper) Clone ¶ added in v0.10.3
func (w *WindowFunctionWrapper) Clone() AggregatorFunction
func (*WindowFunctionWrapper) New ¶ added in v0.10.3
func (w *WindowFunctionWrapper) New() AggregatorFunction
func (*WindowFunctionWrapper) Reset ¶ added in v0.10.3
func (w *WindowFunctionWrapper) Reset()
func (*WindowFunctionWrapper) Result ¶ added in v0.10.3
func (w *WindowFunctionWrapper) Result() interface{}