aggregator

package
v0.10.3

Published: Aug 29, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package aggregator provides data aggregation functionality for StreamSQL.

This package implements group-based aggregation operations for stream processing, supporting various aggregation functions and expression evaluation. It provides thread-safe aggregation with support for custom expressions and built-in functions.

Core Features

• Group Aggregation - Group data by specified fields and apply aggregation functions
• Built-in Functions - Support for Sum, Count, Avg, Max, Min, and more
• Expression Support - Custom expression evaluation within aggregations
• Thread Safety - Concurrent aggregation operations with proper synchronization
• Type Flexibility - Automatic type conversion and validation
• Performance Optimized - Efficient memory usage and processing

Aggregation Types

Supported aggregation functions (re-exported from functions package):

// Mathematical aggregations
Sum, Count, Avg, Max, Min
StdDev, StdDevS, Var, VarS
Median, Percentile

// Collection aggregations
Collect, FirstValue, LastValue, MergeAgg
Deduplicate

// Window aggregations
WindowStart, WindowEnd

// Analytical functions
Lag, Latest, ChangedCol, HadChanged

// Custom expressions
Expression

Core Interfaces

Main aggregation interfaces:

type Aggregator interface {
	Add(data interface{}) error
	Put(key string, val interface{}) error
	GetResults() ([]map[string]interface{}, error)
	Reset()
	RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))
}

type AggregatorFunction interface {
	New() AggregatorFunction
	Add(value interface{})
	Result() interface{}
}

Aggregation Configuration

Field configuration for aggregations:

type AggregationField struct {
	InputField    string        // Source field name
	AggregateType AggregateType // Aggregation function type
	OutputAlias   string        // Result field alias
}

Usage Examples

Basic group aggregation:

// Define aggregation fields
aggFields := []AggregationField{
	{InputField: "temperature", AggregateType: Avg, OutputAlias: "avg_temp"},
	{InputField: "humidity", AggregateType: Max, OutputAlias: "max_humidity"},
	{InputField: "device_id", AggregateType: Count, OutputAlias: "device_count"},
}

// Create group aggregator
aggregator := NewGroupAggregator([]string{"location"}, aggFields)

// Add data
data := map[string]interface{}{
	"location": "room1",
	"temperature": 25.5,
	"humidity": 60,
	"device_id": "sensor001",
}
if err := aggregator.Add(data); err != nil {
	// Handle the error, e.g. log it and skip the record
}

// Get results
results, err := aggregator.GetResults()

Expression-based aggregation:

// Register custom expression
aggregator.RegisterExpression(
	"comfort_index",
	"temperature * 0.7 + humidity * 0.3",
	[]string{"temperature", "humidity"},
	func(data interface{}) (interface{}, error) {
		// Custom evaluation logic
		return evaluateComfortIndex(data)
	},
)
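
The snippet above calls evaluateComfortIndex without defining it. A minimal sketch of such an evaluator follows; the function name, the toFloat64 helper, and the assumption that each record arrives as a map[string]interface{} are illustrative and not confirmed by the package documentation.

```go
package main

import "fmt"

// toFloat64 converts the numeric types that commonly appear in decoded
// records to float64. It is a helper for this sketch, not a package API.
func toFloat64(v interface{}) (float64, error) {
	switch n := v.(type) {
	case float64:
		return n, nil
	case float32:
		return float64(n), nil
	case int:
		return float64(n), nil
	case int64:
		return float64(n), nil
	default:
		return 0, fmt.Errorf("unsupported numeric type %T", v)
	}
}

// evaluateComfortIndex is a hypothetical evaluator with the
// func(data interface{}) (interface{}, error) signature that
// RegisterExpression expects. It assumes data is a map of field values.
func evaluateComfortIndex(data interface{}) (interface{}, error) {
	row, ok := data.(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("expected map[string]interface{}, got %T", data)
	}
	temp, err := toFloat64(row["temperature"])
	if err != nil {
		return nil, fmt.Errorf("temperature: %w", err)
	}
	hum, err := toFloat64(row["humidity"])
	if err != nil {
		return nil, fmt.Errorf("humidity: %w", err)
	}
	// Same formula as the registered expression string.
	return temp*0.7 + hum*0.3, nil
}

func main() {
	v, err := evaluateComfortIndex(map[string]interface{}{
		"temperature": 25.5,
		"humidity":    60,
	})
	fmt.Println(v, err)
}
```

Returning an error for a missing or non-numeric field, rather than defaulting to zero, keeps bad records from silently skewing the aggregate.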

Multiple group aggregation:

// Group by multiple fields
aggregator := NewGroupAggregator(
	[]string{"location", "device_type"},
	aggFields,
)

// Results will be grouped by both location and device_type
results, err := aggregator.GetResults()
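
To illustrate what grouping by several fields means, here is a standard-library-only sketch that builds a composite key from the group-field values and keeps a running average per key behind a mutex. The groupAvg type and its methods are hypothetical stand-ins written for this example; they do not describe how GroupAggregator is implemented.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// groupAvg is a hypothetical stand-in for a group aggregator: it averages
// one numeric field per distinct combination of group-field values.
type groupAvg struct {
	mu          sync.Mutex
	groupFields []string
	valueField  string
	sum         map[string]float64
	count       map[string]int
}

func newGroupAvg(groupFields []string, valueField string) *groupAvg {
	return &groupAvg{
		groupFields: groupFields,
		valueField:  valueField,
		sum:         make(map[string]float64),
		count:       make(map[string]int),
	}
}

// key joins the group-field values with the ASCII unit separator, which is
// unlikely to occur in data, so ("a", "b") and ("ab", "") stay distinct.
func (g *groupAvg) key(row map[string]interface{}) string {
	parts := make([]string, len(g.groupFields))
	for i, f := range g.groupFields {
		parts[i] = fmt.Sprint(row[f])
	}
	return strings.Join(parts, "\x1f")
}

// Add folds one row into its group; the mutex makes concurrent calls safe.
func (g *groupAvg) Add(row map[string]interface{}) error {
	v, ok := row[g.valueField].(float64)
	if !ok {
		return fmt.Errorf("field %q is not a float64", g.valueField)
	}
	k := g.key(row)
	g.mu.Lock()
	defer g.mu.Unlock()
	g.sum[k] += v
	g.count[k]++
	return nil
}

// Avg returns the average for the group identified by the given values.
func (g *groupAvg) Avg(groupValues ...string) (float64, bool) {
	k := strings.Join(groupValues, "\x1f")
	g.mu.Lock()
	defer g.mu.Unlock()
	n := g.count[k]
	if n == 0 {
		return 0, false
	}
	return g.sum[k] / float64(n), true
}

func main() {
	g := newGroupAvg([]string{"location", "device_type"}, "temperature")
	rows := []map[string]interface{}{
		{"location": "room1", "device_type": "thermo", "temperature": 20.0},
		{"location": "room1", "device_type": "thermo", "temperature": 30.0},
		{"location": "room1", "device_type": "hygro", "temperature": 10.0},
	}
	for _, r := range rows {
		if err := g.Add(r); err != nil {
			fmt.Println("add:", err)
		}
	}
	avg, ok := g.Avg("room1", "thermo")
	fmt.Println(avg, ok)
}
```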

Built-in Aggregators

Create built-in aggregation functions:

// Create specific aggregator
sumAgg := CreateBuiltinAggregator(Sum)
avgAgg := CreateBuiltinAggregator(Avg)
countAgg := CreateBuiltinAggregator(Count)

// Use aggregator
sumAgg.Add(10)
sumAgg.Add(20)
result := sumAgg.Result() // returns 30

Custom Aggregators

Register custom aggregation functions:

Register("custom_avg", func() AggregatorFunction {
	return &CustomAvgAggregator{}
})
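
The Register call above refers to a CustomAvgAggregator type that is not defined on this page. One way it could look is sketched below. The AggregatorFunction interface is redeclared locally, copied from the Core Interfaces section, so the example compiles on its own; the CustomAvgAggregator fields and its policy of skipping non-numeric values are assumptions.

```go
package main

import "fmt"

// AggregatorFunction mirrors the interface shown under Core Interfaces so
// this sketch is self-contained; real code would use the package's type.
type AggregatorFunction interface {
	New() AggregatorFunction
	Add(value interface{})
	Result() interface{}
}

// CustomAvgAggregator is a hypothetical running-average implementation.
type CustomAvgAggregator struct {
	sum   float64
	count int
}

// New returns a fresh, zeroed instance so that state is not shared
// between the values produced from one prototype.
func (a *CustomAvgAggregator) New() AggregatorFunction {
	return &CustomAvgAggregator{}
}

// Add accumulates numeric values and skips anything it cannot interpret.
func (a *CustomAvgAggregator) Add(value interface{}) {
	switch v := value.(type) {
	case float64:
		a.sum += v
	case int:
		a.sum += float64(v)
	case int64:
		a.sum += float64(v)
	default:
		return // non-numeric input is ignored in this sketch
	}
	a.count++
}

// Result returns the average as a float64, or 0 when nothing was added.
func (a *CustomAvgAggregator) Result() interface{} {
	if a.count == 0 {
		return 0.0
	}
	return a.sum / float64(a.count)
}

func main() {
	var proto AggregatorFunction = &CustomAvgAggregator{}
	agg := proto.New()
	agg.Add(10)
	agg.Add(20.0)
	agg.Add("ignored")
	fmt.Println(agg.Result())
}
```

Because Add has no error return in this interface, an implementation has to decide for itself how to treat bad input; skipping it is only one option.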

Integration

Integrates with other StreamSQL components:

• Functions package - Built-in aggregation function implementations
• Stream package - Real-time data aggregation in streams
• Window package - Window-based aggregation operations
• Types package - Data type definitions and conversions
• RSQL package - SQL GROUP BY and aggregation parsing

Index

Constants

const (
	Sum         = functions.Sum
	Count       = functions.Count
	Avg         = functions.Avg
	Max         = functions.Max
	Min         = functions.Min
	StdDev      = functions.StdDev
	Median      = functions.Median
	Percentile  = functions.Percentile
	WindowStart = functions.WindowStart
	WindowEnd   = functions.WindowEnd
	Collect     = functions.Collect
	FirstValue  = functions.FirstValue
	LastValue   = functions.LastValue
	MergeAgg    = functions.MergeAgg
	StdDevS     = functions.StdDevS
	Deduplicate = functions.Deduplicate
	Var         = functions.Var
	VarS        = functions.VarS
	// Analytical functions
	Lag        = functions.Lag
	Latest     = functions.Latest
	ChangedCol = functions.ChangedCol
	HadChanged = functions.HadChanged
	// Expression aggregator for handling custom functions
	Expression = functions.Expression
	// Post-aggregation marker
	PostAggregation = functions.PostAggregation
)

Re-exports all aggregate type constants from the functions package.

const (
	// PlaceholderPrefix defines the prefix for aggregation field placeholders
	PlaceholderPrefix = "__"
	// PlaceholderSuffix defines the suffix for aggregation field placeholders
	PlaceholderSuffix = "__"
	// HashMultiplier is used for generating unique hash values for function calls
	HashMultiplier = 31
	// MaxFunctionNameLength defines the maximum allowed length for function names
	MaxFunctionNameLength = 100
	// MaxExpressionDepth defines the maximum nesting depth for expression parsing
	MaxExpressionDepth = 50
)

Configuration constants for post-aggregation processing
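
The documentation does not say how HashMultiplier is applied. A multiplier of 31 is typical of a polynomial string hash, so the sketch below shows that technique and how its result might be wrapped in the placeholder prefix and suffix. The polyHash and placeholderFor functions, the lowercase constant names, and the placeholder layout are all assumptions made for illustration.

```go
package main

import "fmt"

// Local copies of the documented constant values, so the sketch is
// self-contained.
const (
	placeholderPrefix = "__"
	placeholderSuffix = "__"
	hashMultiplier    = 31
)

// polyHash computes h = h*31 + b over the bytes of s, wrapping on
// uint32 overflow. This is the classic polynomial string hash, shown
// here as an assumption about what the multiplier is for.
func polyHash(s string) uint32 {
	var h uint32
	for i := 0; i < len(s); i++ {
		h = h*hashMultiplier + uint32(s[i])
	}
	return h
}

// placeholderFor builds a hypothetical placeholder name from a function
// name and the hash of the full call text.
func placeholderFor(funcName, call string) string {
	return fmt.Sprintf("%s%s_%d%s", placeholderPrefix, funcName, polyHash(call), placeholderSuffix)
}

func main() {
	fmt.Println(polyHash("ab"))
	fmt.Println(placeholderFor("SUM", "SUM(price)"))
}
```

A hash of this kind is deterministic but not collision-free, so any real use would still need a way to tell apart two calls that happen to hash to the same value.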

Variables

This section is empty.

Functions

func Register

func Register(name string, constructor func() AggregatorFunction)

Register adds a custom aggregator to the global registry. It re-exports functions.RegisterLegacyAggregator.

Types

type AggregateType

type AggregateType = functions.AggregateType

AggregateType identifies an aggregation function type. It re-exports functions.AggregateType.

type AggregationField

type AggregationField struct {
	InputField    string        // Input field name (e.g., "temperature")
	AggregateType AggregateType // Aggregation type (e.g., Sum, Avg)
	OutputAlias   string        // Output alias (e.g., "temp_sum")
}

AggregationField defines configuration for a single aggregation field

type AggregationFieldInfo added in v0.10.3

type AggregationFieldInfo struct {
	FuncName    string        // Function name, e.g. "first_value"
	InputField  string        // Input field, e.g. "displayNum"
	Placeholder string        // Placeholder, e.g. "__first_value_0__"
	AggType     AggregateType // Aggregation type
	FullCall    string        // Full function call, e.g. "NTH_VALUE(value, 2)"
}

AggregationFieldInfo holds information about an aggregation function in an expression

func ParseComplexAggregationExpression added in v0.10.3

func ParseComplexAggregationExpression(expr string) (aggFields []AggregationFieldInfo, exprTemplate string, err error)

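ParseComplexAggregationExpression parses expressions containing multiple aggregation functions. It returns the list of required aggregation fields and the expression template. The function breaks a complex expression containing aggregation functions into:

1. A post-aggregation expression template (aggregation functions replaced by placeholders)
2. A list of the aggregation fields that must be computed beforehand
3. An error (if parsing fails)

Example:

Input:  "SUM(price) + AVG(quantity) * 2"
Output: expression template "__SUM_123__ + __AVG_456__ * 2"
        aggregation fields [{FieldName: "__SUM_123__", FunctionName: "SUM", Arguments: ["price"]}, ...]

The field names in this example are illustrative; the returned values are of type AggregationFieldInfo, whose fields are FuncName, InputField, Placeholder, AggType, and FullCall.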
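
The decomposition described above can be approximated with a regular expression. The sketch below replaces each non-nested aggregate call with a numbered placeholder shaped like the "__first_value_0__" example in the AggregationFieldInfo comments. The aggCall type, splitAggregates function, and the short list of recognized function names are hypothetical; the package's own parser may handle nesting, extra arguments, and naming differently.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// aggCall is a hypothetical, trimmed-down analogue of AggregationFieldInfo.
type aggCall struct {
	FuncName    string // lowercased function name, e.g. "sum"
	InputField  string // argument text, e.g. "price"
	Placeholder string // e.g. "__sum_0__"
	FullCall    string // original call text, e.g. "SUM(price)"
}

// callPattern matches a non-nested call such as SUM(price).
var callPattern = regexp.MustCompile(`([A-Za-z_][A-Za-z0-9_]*)\(\s*([^()]*?)\s*\)`)

// knownAggregates lists the names this sketch treats as aggregates.
var knownAggregates = map[string]bool{
	"sum": true, "avg": true, "count": true, "max": true, "min": true,
}

// splitAggregates returns the aggregate calls found in expr and a template
// in which each of those calls is replaced by its placeholder.
func splitAggregates(expr string) ([]aggCall, string) {
	var calls []aggCall
	template := callPattern.ReplaceAllStringFunc(expr, func(m string) string {
		sub := callPattern.FindStringSubmatch(m)
		name := strings.ToLower(sub[1])
		if !knownAggregates[name] {
			return m // leave non-aggregate calls untouched
		}
		ph := fmt.Sprintf("__%s_%d__", name, len(calls))
		calls = append(calls, aggCall{
			FuncName:    name,
			InputField:  sub[2],
			Placeholder: ph,
			FullCall:    m,
		})
		return ph
	})
	return calls, template
}

func main() {
	calls, template := splitAggregates("SUM(price) + AVG(quantity) * 2")
	fmt.Println(template)
	for _, c := range calls {
		fmt.Printf("%+v\n", c)
	}
}
```

Numbering placeholders by position, as done here, keeps two calls to the same function distinct without relying on a hash.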

type Aggregator

type Aggregator interface {
	Add(data interface{}) error
	Put(key string, val interface{}) error
	GetResults() ([]map[string]interface{}, error)
	Reset()
	// RegisterExpression registers expression evaluator
	RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))
}

Aggregator defines the aggregator interface.

type AggregatorFunction

type AggregatorFunction = functions.LegacyAggregatorFunction

AggregatorFunction is the aggregator function interface. It re-exports functions.LegacyAggregatorFunction.

func CreateBuiltinAggregator

func CreateBuiltinAggregator(aggType AggregateType) AggregatorFunction

CreateBuiltinAggregator creates a built-in aggregator. It re-exports functions.CreateLegacyAggregator.

type ContextAggregator

type ContextAggregator = functions.ContextAggregator

ContextAggregator is an aggregator interface that supports the context mechanism. It re-exports functions.ContextAggregator.

type EnhancedGroupAggregator added in v0.10.3

type EnhancedGroupAggregator struct {
	*GroupAggregator
	// contains filtered or unexported fields
}

EnhancedGroupAggregator extends GroupAggregator with post-aggregation support

func NewEnhancedGroupAggregator added in v0.10.3

func NewEnhancedGroupAggregator(groupFields []string, aggregationFields []AggregationField) *EnhancedGroupAggregator

NewEnhancedGroupAggregator creates a new enhanced group aggregator with post-aggregation support

func (*EnhancedGroupAggregator) AddPostAggregationExpression added in v0.10.3

func (ega *EnhancedGroupAggregator) AddPostAggregationExpression(outputField, originalExpr string, requiredFields []AggregationFieldInfo) error

AddPostAggregationExpression adds an expression that needs post-aggregation processing

func (*EnhancedGroupAggregator) GetResults added in v0.10.3

func (ega *EnhancedGroupAggregator) GetResults() ([]map[string]interface{}, error)

GetResults returns results with post-aggregation expressions evaluated

type ExpressionAggregatorWrapper

type ExpressionAggregatorWrapper struct {
	// contains filtered or unexported fields
}

ExpressionAggregatorWrapper wraps expression aggregator to make it compatible with LegacyAggregatorFunction interface

func (*ExpressionAggregatorWrapper) Add

func (w *ExpressionAggregatorWrapper) Add(value interface{})

func (*ExpressionAggregatorWrapper) New

func (*ExpressionAggregatorWrapper) Result

func (w *ExpressionAggregatorWrapper) Result() interface{}

type ExpressionEvaluator

type ExpressionEvaluator struct {
	Expression string   // Complete expression
	Field      string   // Primary field name
	Fields     []string // All fields referenced in expression
	// contains filtered or unexported fields
}

ExpressionEvaluator wraps expression evaluation functionality

type GroupAggregator

type GroupAggregator struct {
	// contains filtered or unexported fields
}

func NewGroupAggregator

func NewGroupAggregator(groupFields []string, aggregationFields []AggregationField) *GroupAggregator

NewGroupAggregator creates a new group aggregator

func (*GroupAggregator) Add

func (ga *GroupAggregator) Add(data interface{}) error

func (*GroupAggregator) GetResults

func (ga *GroupAggregator) GetResults() ([]map[string]interface{}, error)

func (*GroupAggregator) Put

func (ga *GroupAggregator) Put(key string, val interface{}) error

func (*GroupAggregator) RegisterExpression

func (ga *GroupAggregator) RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))

RegisterExpression registers expression evaluator

func (*GroupAggregator) Reset

func (ga *GroupAggregator) Reset()

type PostAggregationExpression added in v0.10.3

type PostAggregationExpression struct {
	OutputField       string   // Output field name
	Expression        string   // Expression template, e.g. "__first_value_0__ - __last_value_1__"
	RequiredAggFields []string // Aggregation fields it depends on, e.g. ["__first_value_0__", "__last_value_1__"]
	OriginalExpr      string   // Original expression, kept for debugging
	// contains filtered or unexported fields
}

PostAggregationExpression represents an expression that needs to be evaluated after aggregation

func (*PostAggregationExpression) Evaluate added in v0.10.3

func (pae *PostAggregationExpression) Evaluate(data map[string]interface{}) (interface{}, error)

Evaluate evaluates the post-aggregation expression
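
As an illustration of evaluating an expression template once the aggregate values are known, the sketch below relies on the fact that placeholders such as __first_value_0__ are valid Go identifiers: it parses the template with go/parser and walks the tree. The evalTemplate and evalNode functions are hypothetical, support only basic arithmetic on float64 values, and are not the evaluator used by the package.

```go
package main

import (
	"fmt"
	"go/ast"
	"go/parser"
	"go/token"
	"strconv"
)

// evalTemplate evaluates an arithmetic template whose identifiers are
// placeholders looked up in values.
func evalTemplate(template string, values map[string]float64) (float64, error) {
	expr, err := parser.ParseExpr(template)
	if err != nil {
		return 0, fmt.Errorf("parse %q: %w", template, err)
	}
	return evalNode(expr, values)
}

// evalNode recursively evaluates literals, placeholders, parentheses,
// unary minus, and the four basic binary operators.
func evalNode(n ast.Expr, values map[string]float64) (float64, error) {
	switch e := n.(type) {
	case *ast.BasicLit:
		if e.Kind != token.INT && e.Kind != token.FLOAT {
			return 0, fmt.Errorf("unsupported literal %s", e.Value)
		}
		return strconv.ParseFloat(e.Value, 64)
	case *ast.Ident:
		v, ok := values[e.Name]
		if !ok {
			return 0, fmt.Errorf("no value for placeholder %s", e.Name)
		}
		return v, nil
	case *ast.ParenExpr:
		return evalNode(e.X, values)
	case *ast.UnaryExpr:
		if e.Op != token.SUB {
			return 0, fmt.Errorf("unsupported unary operator %s", e.Op)
		}
		v, err := evalNode(e.X, values)
		return -v, err
	case *ast.BinaryExpr:
		l, err := evalNode(e.X, values)
		if err != nil {
			return 0, err
		}
		r, err := evalNode(e.Y, values)
		if err != nil {
			return 0, err
		}
		switch e.Op {
		case token.ADD:
			return l + r, nil
		case token.SUB:
			return l - r, nil
		case token.MUL:
			return l * r, nil
		case token.QUO:
			if r == 0 {
				return 0, fmt.Errorf("division by zero")
			}
			return l / r, nil
		}
		return 0, fmt.Errorf("unsupported operator %s", e.Op)
	}
	return 0, fmt.Errorf("unsupported expression %T", n)
}

func main() {
	v, err := evalTemplate("__first_value_0__ - __last_value_1__", map[string]float64{
		"__first_value_0__": 10,
		"__last_value_1__":  4,
	})
	fmt.Println(v, err)
}
```

Reporting a missing placeholder as an error, instead of treating it as zero, makes it visible when a required aggregate was never computed.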

type PostAggregationPlaceholder added in v0.10.3

type PostAggregationPlaceholder struct{}

PostAggregationPlaceholder is a placeholder aggregator for post-aggregation fields

func (*PostAggregationPlaceholder) Add added in v0.10.3

func (p *PostAggregationPlaceholder) Add(value interface{})

func (*PostAggregationPlaceholder) New added in v0.10.3

func (*PostAggregationPlaceholder) Result added in v0.10.3

func (p *PostAggregationPlaceholder) Result() interface{}

type PostAggregationProcessor added in v0.10.3

type PostAggregationProcessor struct {
	// contains filtered or unexported fields
}

PostAggregationProcessor handles expressions that contain aggregation functions

func NewPostAggregationProcessor added in v0.10.3

func NewPostAggregationProcessor() *PostAggregationProcessor

NewPostAggregationProcessor creates a new post-aggregation processor

func (*PostAggregationProcessor) AddExpression added in v0.10.3

func (p *PostAggregationProcessor) AddExpression(outputField, originalExpr string, aggFields []string, exprTemplate string)

AddExpression adds a post-aggregation expression

func (*PostAggregationProcessor) ProcessResults added in v0.10.3

func (p *PostAggregationProcessor) ProcessResults(results []map[string]interface{}) ([]map[string]interface{}, error)

ProcessResults processes aggregation results and evaluates post-aggregation expressions

type WindowFunctionWrapper added in v0.10.3

type WindowFunctionWrapper struct {
	// contains filtered or unexported fields
}

WindowFunctionWrapper wraps window functions to make them compatible with LegacyAggregatorFunction

func (*WindowFunctionWrapper) Add added in v0.10.3

func (w *WindowFunctionWrapper) Add(value interface{})

func (*WindowFunctionWrapper) Clone added in v0.10.3

func (*WindowFunctionWrapper) New added in v0.10.3

func (*WindowFunctionWrapper) Reset added in v0.10.3

func (w *WindowFunctionWrapper) Reset()

func (*WindowFunctionWrapper) Result added in v0.10.3

func (w *WindowFunctionWrapper) Result() interface{}
