Documentation
¶
Overview ¶
Package aggregator provides data aggregation functionality for StreamSQL.
This package implements group-based aggregation operations for stream processing, supporting various aggregation functions and expression evaluation. It provides thread-safe aggregation with support for custom expressions and built-in functions.
Core Features ¶
• Group Aggregation - Group data by specified fields and apply aggregation functions • Built-in Functions - Support for Sum, Count, Avg, Max, Min, and more • Expression Support - Custom expression evaluation within aggregations • Thread Safety - Concurrent aggregation operations with proper synchronization • Type Flexibility - Automatic type conversion and validation • Performance Optimized - Efficient memory usage and processing
Aggregation Types ¶
Supported aggregation functions (re-exported from functions package):
// Mathematical aggregations Sum, Count, Avg, Max, Min StdDev, StdDevS, Var, VarS Median, Percentile // Collection aggregations Collect, LastValue, MergeAgg Deduplicate // Window aggregations WindowStart, WindowEnd // Analytical functions Lag, Latest, ChangedCol, HadChanged // Custom expressions Expression
Core Interfaces ¶
Main aggregation interfaces:
type Aggregator interface {
Add(data interface{}) error
Put(key string, val interface{}) error
GetResults() ([]map[string]interface{}, error)
Reset()
RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))
}
type AggregatorFunction interface {
New() AggregatorFunction
Add(value interface{})
Result() interface{}
}
Aggregation Configuration ¶
Field configuration for aggregations:
type AggregationField struct {
InputField string // Source field name
AggregateType AggregateType // Aggregation function type
OutputAlias string // Result field alias
}
Usage Examples ¶
Basic group aggregation:
// Define aggregation fields
aggFields := []AggregationField{
{InputField: "temperature", AggregateType: Avg, OutputAlias: "avg_temp"},
{InputField: "humidity", AggregateType: Max, OutputAlias: "max_humidity"},
{InputField: "device_id", AggregateType: Count, OutputAlias: "device_count"},
}
// Create group aggregator
aggregator := NewGroupAggregator([]string{"location"}, aggFields)
// Add data
data := map[string]interface{}{
"location": "room1",
"temperature": 25.5,
"humidity": 60,
"device_id": "sensor001",
}
aggregator.Add(data)
// Get results
results, err := aggregator.GetResults()
Expression-based aggregation:
// Register custom expression
aggregator.RegisterExpression(
"comfort_index",
"temperature * 0.7 + humidity * 0.3",
[]string{"temperature", "humidity"},
func(data interface{}) (interface{}, error) {
// Custom evaluation logic
return evaluateComfortIndex(data)
},
)
Multiple group aggregation:
// Group by multiple fields
aggregator := NewGroupAggregator(
[]string{"location", "device_type"},
aggFields,
)
// Results will be grouped by both location and device_type
results, err := aggregator.GetResults()
Built-in Aggregators ¶
Create built-in aggregation functions:
// Create specific aggregator sumAgg := CreateBuiltinAggregator(Sum) avgAgg := CreateBuiltinAggregator(Avg) countAgg := CreateBuiltinAggregator(Count) // Use aggregator sumAgg.Add(10) sumAgg.Add(20) result := sumAgg.Result() // returns 30
Custom Aggregators ¶
Register custom aggregation functions:
Register("custom_avg", func() AggregatorFunction {
return &CustomAvgAggregator{}
})
Integration ¶
Integrates with other StreamSQL components:
• Functions package - Built-in aggregation function implementations • Stream package - Real-time data aggregation in streams • Window package - Window-based aggregation operations • Types package - Data type definitions and conversions • RSQL package - SQL GROUP BY and aggregation parsing
Index ¶
- Constants
- func Register(name string, constructor func() AggregatorFunction)
- type AggregateType
- type AggregationField
- type Aggregator
- type AggregatorFunction
- type ContextAggregator
- type ExpressionAggregatorWrapper
- type ExpressionEvaluator
- type GroupAggregator
- func (ga *GroupAggregator) Add(data interface{}) error
- func (ga *GroupAggregator) GetResults() ([]map[string]interface{}, error)
- func (ga *GroupAggregator) Put(key string, val interface{}) error
- func (ga *GroupAggregator) RegisterExpression(field, expression string, fields []string, ...)
- func (ga *GroupAggregator) Reset()
Constants ¶
const ( Sum = functions.Sum Count = functions.Count Avg = functions.Avg Max = functions.Max Min = functions.Min StdDev = functions.StdDev Median = functions.Median Percentile = functions.Percentile WindowStart = functions.WindowStart WindowEnd = functions.WindowEnd Collect = functions.Collect LastValue = functions.LastValue MergeAgg = functions.MergeAgg StdDevS = functions.StdDevS Deduplicate = functions.Deduplicate Var = functions.Var VarS = functions.VarS // Analytical functions Lag = functions.Lag Latest = functions.Latest ChangedCol = functions.ChangedCol HadChanged = functions.HadChanged // Expression aggregator for handling custom functions Expression = functions.Expression )
Re-export all aggregate type constants
Variables ¶
This section is empty.
Functions ¶
func Register ¶
func Register(name string, constructor func() AggregatorFunction)
Register adds custom aggregator to global registry, re-exports functions.RegisterLegacyAggregator
Types ¶
type AggregateType ¶
type AggregateType = functions.AggregateType
AggregateType aggregate type, re-exports functions.AggregateType
type AggregationField ¶
type AggregationField struct {
InputField string // Input field name (e.g., "temperature")
AggregateType AggregateType // Aggregation type (e.g., Sum, Avg)
OutputAlias string // Output alias (e.g., "temp_sum")
}
AggregationField defines configuration for a single aggregation field
type Aggregator ¶
type Aggregator interface {
Add(data interface{}) error
Put(key string, val interface{}) error
GetResults() ([]map[string]interface{}, error)
Reset()
// RegisterExpression registers expression evaluator
RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))
}
Aggregator aggregator interface
type AggregatorFunction ¶
type AggregatorFunction = functions.LegacyAggregatorFunction
AggregatorFunction aggregator function interface, re-exports functions.LegacyAggregatorFunction
func CreateBuiltinAggregator ¶
func CreateBuiltinAggregator(aggType AggregateType) AggregatorFunction
CreateBuiltinAggregator creates built-in aggregator, re-exports functions.CreateLegacyAggregator
type ContextAggregator ¶
type ContextAggregator = functions.ContextAggregator
ContextAggregator aggregator interface supporting context mechanism, re-exports functions.ContextAggregator
type ExpressionAggregatorWrapper ¶
type ExpressionAggregatorWrapper struct {
// contains filtered or unexported fields
}
ExpressionAggregatorWrapper wraps expression aggregator to make it compatible with LegacyAggregatorFunction interface
func (*ExpressionAggregatorWrapper) Add ¶
func (w *ExpressionAggregatorWrapper) Add(value interface{})
func (*ExpressionAggregatorWrapper) New ¶
func (w *ExpressionAggregatorWrapper) New() AggregatorFunction
func (*ExpressionAggregatorWrapper) Result ¶
func (w *ExpressionAggregatorWrapper) Result() interface{}
type ExpressionEvaluator ¶
type ExpressionEvaluator struct {
Expression string // Complete expression
Field string // Primary field name
Fields []string // All fields referenced in expression
// contains filtered or unexported fields
}
ExpressionEvaluator wraps expression evaluation functionality
type GroupAggregator ¶
type GroupAggregator struct {
// contains filtered or unexported fields
}
func NewGroupAggregator ¶
func NewGroupAggregator(groupFields []string, aggregationFields []AggregationField) *GroupAggregator
NewGroupAggregator creates a new group aggregator
func (*GroupAggregator) Add ¶
func (ga *GroupAggregator) Add(data interface{}) error
func (*GroupAggregator) GetResults ¶
func (ga *GroupAggregator) GetResults() ([]map[string]interface{}, error)
func (*GroupAggregator) Put ¶
func (ga *GroupAggregator) Put(key string, val interface{}) error
func (*GroupAggregator) RegisterExpression ¶
func (ga *GroupAggregator) RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))
RegisterExpression registers expression evaluator
func (*GroupAggregator) Reset ¶
func (ga *GroupAggregator) Reset()