aggregator

package
v0.10.2
Published: Aug 8, 2025 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package aggregator provides data aggregation functionality for StreamSQL.

This package implements group-based aggregation operations for stream processing, supporting various aggregation functions and expression evaluation. It provides thread-safe aggregation with support for custom expressions and built-in functions.

Core Features

• Group Aggregation - Group data by specified fields and apply aggregation functions
• Built-in Functions - Support for Sum, Count, Avg, Max, Min, and more
• Expression Support - Custom expression evaluation within aggregations
• Thread Safety - Concurrent aggregation operations with proper synchronization
• Type Flexibility - Automatic type conversion and validation
• Performance Optimized - Efficient memory usage and processing

Aggregation Types

Supported aggregation functions (re-exported from the functions package):

// Mathematical aggregations
Sum, Count, Avg, Max, Min
StdDev, StdDevS, Var, VarS
Median, Percentile

// Collection aggregations
Collect, LastValue, MergeAgg
Deduplicate

// Window aggregations
WindowStart, WindowEnd

// Analytical functions
Lag, Latest, ChangedCol, HadChanged

// Custom expressions
Expression
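
The list above pairs Var with VarS and StdDev with StdDevS. A minimal sketch of the usual distinction follows, assuming the S-suffixed variants are the sample statistics (dividing by n-1) and the unsuffixed ones are the population statistics (dividing by n). That reading is an assumption and is not confirmed by this page; the `variance` helper is hypothetical and not part of the package.

```go
package main

import (
	"fmt"
	"math"
)

// variance is a hypothetical helper, not part of the aggregator package.
// With sample=true it divides by n-1 (Bessel's correction); otherwise by n.
// It returns NaN when there are too few values to compute a result.
func variance(xs []float64, sample bool) float64 {
	n := float64(len(xs))
	if n == 0 || (sample && n < 2) {
		return math.NaN()
	}
	mean := 0.0
	for _, x := range xs {
		mean += x
	}
	mean /= n

	// Sum of squared deviations from the mean.
	ss := 0.0
	for _, x := range xs {
		d := x - mean
		ss += d * d
	}
	if sample {
		return ss / (n - 1)
	}
	return ss / n
}

func main() {
	xs := []float64{2, 4, 4, 4, 5, 5, 7, 9}
	fmt.Println("population variance:", variance(xs, false))
	fmt.Println("population stddev:  ", math.Sqrt(variance(xs, false)))
	fmt.Println("sample variance:    ", variance(xs, true))
}
```

For this input the mean is 5 and the squared deviations sum to 32, so the population variance is 32/8 and the sample variance is 32/7.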

Core Interfaces

Main aggregation interfaces:

type Aggregator interface {
	Add(data interface{}) error
	Put(key string, val interface{}) error
	GetResults() ([]map[string]interface{}, error)
	Reset()
	RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))
}

type AggregatorFunction interface {
	New() AggregatorFunction
	Add(value interface{})
	Result() interface{}
}

Aggregation Configuration

Field configuration for aggregations:

type AggregationField struct {
	InputField    string        // Source field name
	AggregateType AggregateType // Aggregation function type
	OutputAlias   string        // Result field alias
}

Usage Examples

Basic group aggregation:

// Define aggregation fields
aggFields := []AggregationField{
	{InputField: "temperature", AggregateType: Avg, OutputAlias: "avg_temp"},
	{InputField: "humidity", AggregateType: Max, OutputAlias: "max_humidity"},
	{InputField: "device_id", AggregateType: Count, OutputAlias: "device_count"},
}

// Create group aggregator
aggregator := NewGroupAggregator([]string{"location"}, aggFields)

// Add data; Add reports invalid input through its error return
data := map[string]interface{}{
	"location":    "room1",
	"temperature": 25.5,
	"humidity":    60,
	"device_id":   "sensor001",
}
if err := aggregator.Add(data); err != nil {
	log.Fatal(err)
}

// Get results: one map per group
results, err := aggregator.GetResults()
if err != nil {
	log.Fatal(err)
}
fmt.Println(results)

Expression-based aggregation:

// Register custom expression
aggregator.RegisterExpression(
	"comfort_index",
	"temperature * 0.7 + humidity * 0.3",
	[]string{"temperature", "humidity"},
	func(data interface{}) (interface{}, error) {
		// Custom evaluation logic
		return evaluateComfortIndex(data)
	},
)
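
The example above calls `evaluateComfortIndex` without defining it. One possible shape for such an evaluator is sketched below. The function and the `toFloat` helper are hypothetical, and the sketch assumes the record arrives as a `map[string]interface{}`, as in the basic example; the package may pass data in another form.

```go
package main

import (
	"errors"
	"fmt"
)

// toFloat is a hypothetical helper that converts a few common numeric
// types to float64 and reports whether the conversion succeeded.
func toFloat(v interface{}) (float64, bool) {
	switch n := v.(type) {
	case float64:
		return n, true
	case float32:
		return float64(n), true
	case int:
		return float64(n), true
	case int64:
		return float64(n), true
	}
	return 0, false
}

// evaluateComfortIndex is a hypothetical evaluator matching the signature
// expected by RegisterExpression. It computes
// temperature*0.7 + humidity*0.3 from a map-shaped record.
func evaluateComfortIndex(data interface{}) (interface{}, error) {
	row, ok := data.(map[string]interface{})
	if !ok {
		return nil, errors.New("comfort_index: expected map[string]interface{}")
	}
	temp, ok := toFloat(row["temperature"])
	if !ok {
		return nil, errors.New("comfort_index: temperature missing or not numeric")
	}
	hum, ok := toFloat(row["humidity"])
	if !ok {
		return nil, errors.New("comfort_index: humidity missing or not numeric")
	}
	return temp*0.7 + hum*0.3, nil
}

func main() {
	v, err := evaluateComfortIndex(map[string]interface{}{
		"temperature": 25.5,
		"humidity":    60,
	})
	fmt.Println(v, err)
}
```

Returning an error for missing or non-numeric fields, rather than a zero value, keeps bad records from silently skewing the aggregate.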

Multiple group aggregation:

// Group by multiple fields
aggregator := NewGroupAggregator(
	[]string{"location", "device_type"},
	aggFields,
)

// Results will be grouped by both location and device_type
results, err := aggregator.GetResults()
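
To illustrate how grouping by several fields with concurrent writers can work in principle, here is a minimal standalone sketch. It is not the package's implementation: the `groupSum` type, its methods, and the `|` key separator are all hypothetical, and it handles only one float64 field summed per group.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// groupSum is a hypothetical toy aggregator: it sums one numeric field
// per distinct combination of group-field values.
type groupSum struct {
	mu          sync.Mutex
	groupFields []string
	valueField  string
	sums        map[string]float64
}

func newGroupSum(groupFields []string, valueField string) *groupSum {
	return &groupSum{
		groupFields: groupFields,
		valueField:  valueField,
		sums:        make(map[string]float64),
	}
}

// key joins the group-field values into one composite key. A real
// implementation would need a separator that cannot occur in the values.
func (g *groupSum) key(row map[string]interface{}) string {
	parts := make([]string, len(g.groupFields))
	for i, f := range g.groupFields {
		parts[i] = fmt.Sprint(row[f])
	}
	return strings.Join(parts, "|")
}

// Add folds one row into its group. The mutex makes concurrent calls safe.
func (g *groupSum) Add(row map[string]interface{}) error {
	v, ok := row[g.valueField].(float64)
	if !ok {
		return fmt.Errorf("field %q is missing or not a float64", g.valueField)
	}
	g.mu.Lock()
	defer g.mu.Unlock()
	g.sums[g.key(row)] += v
	return nil
}

// Sum returns the running total for a composite key (0 if unseen).
func (g *groupSum) Sum(key string) float64 {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.sums[key]
}

func main() {
	g := newGroupSum([]string{"location", "device_type"}, "temperature")
	rows := []map[string]interface{}{
		{"location": "room1", "device_type": "thermo", "temperature": 20.0},
		{"location": "room1", "device_type": "thermo", "temperature": 22.0},
		{"location": "room2", "device_type": "thermo", "temperature": 18.0},
	}
	var wg sync.WaitGroup
	for _, r := range rows {
		wg.Add(1)
		go func(r map[string]interface{}) {
			defer wg.Done()
			_ = g.Add(r)
		}(r)
	}
	wg.Wait()
	fmt.Println(g.Sum("room1|thermo"), g.Sum("room2|thermo")) // 42 18
}
```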

Built-in Aggregators

Create built-in aggregation functions:

// Create specific aggregator
sumAgg := CreateBuiltinAggregator(Sum)
avgAgg := CreateBuiltinAggregator(Avg)
countAgg := CreateBuiltinAggregator(Count)

// Use aggregator
sumAgg.Add(10)
sumAgg.Add(20)
result := sumAgg.Result() // returns 30

Custom Aggregators

Register custom aggregation functions:

Register("custom_avg", func() AggregatorFunction {
	return &CustomAvgAggregator{}
})
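
The snippet above refers to a `CustomAvgAggregator` type that this page does not define. A minimal sketch of what such a type could look like follows. The `AggregatorFunction` interface is redeclared locally, mirroring the one under Core Interfaces, so that the example compiles on its own; the struct and its handling of non-numeric values are assumptions.

```go
package main

import "fmt"

// AggregatorFunction mirrors the interface shown in this documentation.
// It is redeclared here only so the sketch is self-contained.
type AggregatorFunction interface {
	New() AggregatorFunction
	Add(value interface{})
	Result() interface{}
}

// CustomAvgAggregator is a hypothetical running-average aggregator.
type CustomAvgAggregator struct {
	sum   float64
	count int
}

// New returns a fresh, zeroed instance so that each group can keep its
// own independent state.
func (a *CustomAvgAggregator) New() AggregatorFunction {
	return &CustomAvgAggregator{}
}

// Add accumulates numeric values and skips anything else.
func (a *CustomAvgAggregator) Add(value interface{}) {
	switch v := value.(type) {
	case float64:
		a.sum += v
	case float32:
		a.sum += float64(v)
	case int:
		a.sum += float64(v)
	case int64:
		a.sum += float64(v)
	default:
		return // non-numeric input does not count toward the average
	}
	a.count++
}

// Result returns the mean as a float64, or nil if nothing was added.
func (a *CustomAvgAggregator) Result() interface{} {
	if a.count == 0 {
		return nil
	}
	return a.sum / float64(a.count)
}

func main() {
	agg := (&CustomAvgAggregator{}).New()
	agg.Add(10)
	agg.Add(20.0)
	agg.Add("not a number") // ignored
	fmt.Println(agg.Result()) // 15
}
```

Because `New` returns a new value rather than the receiver, a constructor passed to `Register` can hand out one instance per group without sharing state.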

Integration

Integrates with other StreamSQL components:

• Functions package - Built-in aggregation function implementations
• Stream package - Real-time data aggregation in streams
• Window package - Window-based aggregation operations
• Types package - Data type definitions and conversions
• RSQL package - SQL GROUP BY and aggregation parsing

Index

Constants

const (
	Sum         = functions.Sum
	Count       = functions.Count
	Avg         = functions.Avg
	Max         = functions.Max
	Min         = functions.Min
	StdDev      = functions.StdDev
	Median      = functions.Median
	Percentile  = functions.Percentile
	WindowStart = functions.WindowStart
	WindowEnd   = functions.WindowEnd
	Collect     = functions.Collect
	LastValue   = functions.LastValue
	MergeAgg    = functions.MergeAgg
	StdDevS     = functions.StdDevS
	Deduplicate = functions.Deduplicate
	Var         = functions.Var
	VarS        = functions.VarS
	// Analytical functions
	Lag        = functions.Lag
	Latest     = functions.Latest
	ChangedCol = functions.ChangedCol
	HadChanged = functions.HadChanged
	// Expression aggregator for handling custom functions
	Expression = functions.Expression
)

These constants re-export all aggregate type constants from the functions package.

Variables

This section is empty.

Functions

func Register

func Register(name string, constructor func() AggregatorFunction)

Register adds a custom aggregator to the global registry. It re-exports functions.RegisterLegacyAggregator.
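
As a rough illustration of the constructor-registry pattern that a function with this signature suggests, here is a standalone sketch. It is not the package's code: the lowercase `register` and `create` functions, the `countAgg` type, and the use of a `sync.RWMutex` are assumptions, and the `AggregatorFunction` interface is redeclared locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// AggregatorFunction mirrors the interface shown in this documentation.
type AggregatorFunction interface {
	New() AggregatorFunction
	Add(value interface{})
	Result() interface{}
}

// countAgg is a hypothetical aggregator that counts the values it sees.
type countAgg struct{ n int }

func (c *countAgg) New() AggregatorFunction { return &countAgg{} }
func (c *countAgg) Add(interface{})         { c.n++ }
func (c *countAgg) Result() interface{}     { return c.n }

// A hypothetical global registry mapping names to constructors.
var (
	registryMu sync.RWMutex
	registry   = map[string]func() AggregatorFunction{}
)

// register stores a constructor under a name, replacing any previous entry.
func register(name string, constructor func() AggregatorFunction) {
	registryMu.Lock()
	defer registryMu.Unlock()
	registry[name] = constructor
}

// create builds a new aggregator by name and reports whether the name
// was registered.
func create(name string) (AggregatorFunction, bool) {
	registryMu.RLock()
	constructor, ok := registry[name]
	registryMu.RUnlock()
	if !ok {
		return nil, false
	}
	return constructor(), true
}

func main() {
	register("custom_count", func() AggregatorFunction { return &countAgg{} })

	agg, ok := create("custom_count")
	if !ok {
		fmt.Println("custom_count is not registered")
		return
	}
	agg.Add("a")
	agg.Add("b")
	fmt.Println(agg.Result()) // 2
}
```

Storing constructors rather than instances means every lookup yields a fresh aggregator, so state never leaks between callers.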

Types

type AggregateType

type AggregateType = functions.AggregateType

AggregateType identifies an aggregation function. It re-exports functions.AggregateType.

type AggregationField

type AggregationField struct {
	InputField    string        // Input field name (e.g., "temperature")
	AggregateType AggregateType // Aggregation type (e.g., Sum, Avg)
	OutputAlias   string        // Output alias (e.g., "temp_sum")
}

AggregationField defines configuration for a single aggregation field

type Aggregator

type Aggregator interface {
	Add(data interface{}) error
	Put(key string, val interface{}) error
	GetResults() ([]map[string]interface{}, error)
	Reset()
	// RegisterExpression registers expression evaluator
	RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))
}

Aggregator is the interface implemented by group aggregators.

type AggregatorFunction

type AggregatorFunction = functions.LegacyAggregatorFunction

AggregatorFunction is the aggregator function interface. It re-exports functions.LegacyAggregatorFunction.

func CreateBuiltinAggregator

func CreateBuiltinAggregator(aggType AggregateType) AggregatorFunction

CreateBuiltinAggregator creates a built-in aggregator. It re-exports functions.CreateLegacyAggregator.

type ContextAggregator

type ContextAggregator = functions.ContextAggregator

ContextAggregator is an aggregator interface that supports the context mechanism. It re-exports functions.ContextAggregator.

type ExpressionAggregatorWrapper

type ExpressionAggregatorWrapper struct {
	// contains filtered or unexported fields
}

ExpressionAggregatorWrapper wraps an expression aggregator to make it compatible with the LegacyAggregatorFunction interface.

func (*ExpressionAggregatorWrapper) Add

func (w *ExpressionAggregatorWrapper) Add(value interface{})

func (*ExpressionAggregatorWrapper) New

func (*ExpressionAggregatorWrapper) Result

func (w *ExpressionAggregatorWrapper) Result() interface{}

type ExpressionEvaluator

type ExpressionEvaluator struct {
	Expression string   // Complete expression
	Field      string   // Primary field name
	Fields     []string // All fields referenced in expression
	// contains filtered or unexported fields
}

ExpressionEvaluator wraps expression evaluation functionality

type GroupAggregator

type GroupAggregator struct {
	// contains filtered or unexported fields
}

func NewGroupAggregator

func NewGroupAggregator(groupFields []string, aggregationFields []AggregationField) *GroupAggregator

NewGroupAggregator creates a new group aggregator

func (*GroupAggregator) Add

func (ga *GroupAggregator) Add(data interface{}) error

func (*GroupAggregator) GetResults

func (ga *GroupAggregator) GetResults() ([]map[string]interface{}, error)

func (*GroupAggregator) Put

func (ga *GroupAggregator) Put(key string, val interface{}) error

func (*GroupAggregator) RegisterExpression

func (ga *GroupAggregator) RegisterExpression(field, expression string, fields []string, evaluator func(data interface{}) (interface{}, error))

RegisterExpression registers expression evaluator

func (*GroupAggregator) Reset

func (ga *GroupAggregator) Reset()
