functions

package
v0.10.3 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 29, 2025 License: Apache-2.0 Imports: 21 Imported by: 0

README

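StreamSQL Functions Module Extension

Overview

This extension implements unified management of aggregate and analytical functions and simplifies the process of adding custom functions. A function now only needs to be implemented in the functions module to be usable automatically in the aggregator module.

Key improvements

1. Unified function interfaces
  • AggregatorFunction: interface for aggregate functions that support incremental computation
  • AnalyticalFunction: interface for analytical functions that support state management
  • Function: base function interface
2. Automatic adapters
  • AggregatorAdapter: adapts aggregate functions from the functions module to the aggregator module
  • AnalyticalAdapter: adapts analytical functions from the functions module to the aggregator module
3. Simplified extension workflow

Adding a custom function now only involves:

  1. Implementing the function in the functions module
  2. Registering the function and its adapter
  3. Leaving the aggregator module unchanged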

Usage

Creating a custom aggregate function
// 1. Define the function struct
type CustomSumFunction struct {
    *BaseFunction
    sum float64
}

// 2. Implement the base interface
func (f *CustomSumFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
    // Implement the function logic
}

// 3. Implement the AggregatorFunction interface
func (f *CustomSumFunction) New() AggregatorFunction {
    return &CustomSumFunction{BaseFunction: f.BaseFunction}
}

func (f *CustomSumFunction) Add(value interface{}) {
    // Incremental computation logic
}

func (f *CustomSumFunction) Result() interface{} {
    return f.sum
}

func (f *CustomSumFunction) Reset() {
    f.sum = 0
}

func (f *CustomSumFunction) Clone() AggregatorFunction {
    return &CustomSumFunction{BaseFunction: f.BaseFunction, sum: f.sum}
}

// 4. Register the function
func init() {
    Register(NewCustomSumFunction())
    RegisterAggregatorAdapter("custom_sum")
}
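
The lifecycle of the incremental methods can be sketched in a self-contained program. This is a minimal illustration under stated assumptions, not the package's code: `Aggregator`, `sumAggregator`, and `sumWindow` are hypothetical local stand-ins that copy only the New/Add/Result/Reset/Clone method shapes shown above.

```go
package main

import "fmt"

// Aggregator is a local stand-in that mirrors the incremental methods of
// AggregatorFunction; it is not the package's interface.
type Aggregator interface {
	New() Aggregator
	Add(value interface{})
	Result() interface{}
	Reset()
	Clone() Aggregator
}

// sumAggregator keeps a running total of float64 inputs.
type sumAggregator struct {
	sum float64
}

// New returns a fresh instance with zeroed state.
func (s *sumAggregator) New() Aggregator { return &sumAggregator{} }

// Add folds one value into the running total; non-float64 values are skipped.
func (s *sumAggregator) Add(value interface{}) {
	if v, ok := value.(float64); ok {
		s.sum += v
	}
}

func (s *sumAggregator) Result() interface{} { return s.sum }
func (s *sumAggregator) Reset()              { s.sum = 0 }

// Clone copies the current state into an independent instance.
func (s *sumAggregator) Clone() Aggregator { return &sumAggregator{sum: s.sum} }

// sumWindow runs one window of values through a fresh instance of proto.
func sumWindow(proto Aggregator, values []interface{}) interface{} {
	agg := proto.New()
	for _, v := range values {
		agg.Add(v)
	}
	return agg.Result()
}

func main() {
	proto := &sumAggregator{}
	fmt.Println(sumWindow(proto, []interface{}{1.5, 2.5, "skip", 4.0})) // prints 8
}
```

Creating a fresh instance per window through New keeps the registered prototype stateless, while Clone copies in-flight state when a snapshot is needed.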
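Creating a custom analytical function
// 1. Define the function struct
type CustomAnalyticalFunction struct {
    *BaseFunction
    state interface{}
}

// 2. Implement the base interface
func (f *CustomAnalyticalFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
    // Implement the analysis logic
}

// 3. Implement the AnalyticalFunction interface.
// AnalyticalFunction embeds AggregatorFunction, so New, Add, and Result are
// required as well, and Clone must return AggregatorFunction.
func (f *CustomAnalyticalFunction) New() AggregatorFunction {
    return &CustomAnalyticalFunction{BaseFunction: f.BaseFunction}
}

func (f *CustomAnalyticalFunction) Add(value interface{}) {
    // Incremental state update logic
}

func (f *CustomAnalyticalFunction) Result() interface{} {
    return f.state
}

func (f *CustomAnalyticalFunction) Reset() {
    f.state = nil
}

func (f *CustomAnalyticalFunction) Clone() AggregatorFunction {
    return &CustomAnalyticalFunction{BaseFunction: f.BaseFunction, state: f.state}
}

// 4. Register the function
func init() {
    Register(NewCustomAnalyticalFunction())
    RegisterAnalyticalAdapter("custom_analytical")
}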
Using the simplified registration API
// Register a simple custom function
RegisterCustomFunction("double", TypeAggregation, "math functions", "Multiplies the value by 2", 1, 1,
    func(ctx *FunctionContext, args []interface{}) (interface{}, error) {
        val, err := cast.ToFloat64E(args[0])
        if err != nil {
            return nil, err
        }
        return val * 2, nil
    })
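
The executor above leans on a conversion helper so that it accepts any numeric input. A rough standard-library sketch of the same idea follows; `toFloat64` and `double` are hypothetical names introduced for illustration, and the set of accepted types is an assumption rather than the behavior of the cast library.

```go
package main

import (
	"fmt"
	"strconv"
)

// toFloat64 is a hypothetical helper that converts common numeric inputs
// (and numeric strings) to float64, returning an error for anything else.
func toFloat64(v interface{}) (float64, error) {
	switch n := v.(type) {
	case float64:
		return n, nil
	case float32:
		return float64(n), nil
	case int:
		return float64(n), nil
	case int64:
		return float64(n), nil
	case string:
		return strconv.ParseFloat(n, 64)
	default:
		return 0, fmt.Errorf("cannot convert %T to float64", v)
	}
}

// double mirrors the executor body: coerce the single argument, then multiply by 2.
func double(args []interface{}) (interface{}, error) {
	if len(args) != 1 {
		return nil, fmt.Errorf("double expects 1 argument, got %d", len(args))
	}
	val, err := toFloat64(args[0])
	if err != nil {
		return nil, err
	}
	return val * 2, nil
}

func main() {
	for _, in := range []interface{}{21, "1.5", true} {
		out, err := double([]interface{}{in})
		fmt.Println(out, err)
	}
}
```

Returning the conversion error, rather than asserting a concrete type, lets a bad input surface as a query error instead of a panic.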

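Built-in functions

Aggregate functions
  • sum: sum
  • avg: average
  • min: minimum
  • max: maximum
  • count: count
  • stddev: standard deviation
  • median: median
  • percentile: percentile
  • collect: collects all values
  • last_value: last value
  • merge_agg: merged aggregation
  • stddevs: sample standard deviation
  • deduplicate: deduplication
  • var: population variance
  • vars: sample variance
Analytical functions
  • lag: lag function
  • latest: latest value
  • changed_col: changed columns
  • had_changed: whether a value has changed

Custom function examples

See the examples in custom_example.go:

  • CustomProductFunction: product aggregate function
  • CustomGeometricMeanFunction: geometric mean aggregate function
  • CustomMovingAverageFunction: moving average analytical function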

Documentation

Overview

Package functions provides a comprehensive function registry and execution framework for StreamSQL.

This package implements a unified function management system that supports built-in functions, custom user-defined functions, and specialized aggregation and analytical functions. It serves as the central hub for all function-related operations in SQL expressions and stream processing.

Core Features

  • Unified Function Registry - Centralized registration and management of all function types
  • Plugin Architecture - Runtime registration of custom functions without code modification
  • Type System - Comprehensive function categorization and type validation
  • Aggregation Support - Specialized interfaces for incremental aggregation functions
  • Analytical Functions - Advanced analytical functions with state management
  • Performance Optimization - Efficient function dispatch and execution
  • Automatic Adaptation - Seamless integration between function types and aggregator modules

Function Types

The package supports eight distinct function categories:

TypeMath        - Mathematical functions (SIN, COS, SQRT, ABS, etc.)
TypeString      - String manipulation functions (UPPER, LOWER, SUBSTRING, etc.)
TypeConversion  - Type conversion functions (CAST, CONVERT, TO_NUMBER, etc.)
TypeDateTime    - Date and time functions (NOW, DATE_FORMAT, EXTRACT, etc.)
TypeAggregation - Aggregate functions (SUM, AVG, COUNT, MAX, MIN, etc.)
TypeAnalytical  - Analytical functions (ROW_NUMBER, RANK, LAG, LEAD, etc.)
TypeWindow      - Window functions (TUMBLING_WINDOW, SLIDING_WINDOW, etc.)
TypeCustom      - User-defined custom functions

Built-in Functions

Extensive collection of built-in functions across all categories:

// Mathematical functions
ABS(x)          - Absolute value
SQRT(x)         - Square root
POWER(x, y)     - Power operation
ROUND(x, d)     - Round to decimal places

// String functions
UPPER(str)      - Convert to uppercase
LOWER(str)      - Convert to lowercase
LENGTH(str)     - String length
SUBSTRING(str, start, len) - Extract substring

// Aggregation functions
SUM(field)      - Sum of values
AVG(field)      - Average of values
COUNT(*)        - Count of records
MAX(field)      - Maximum value
MIN(field)      - Minimum value

Custom Function Registration

Simple API for registering custom functions:

// Register a simple custom function
RegisterCustomFunction(
	"fahrenheit_to_celsius",
	TypeConversion,
	"Temperature conversion",
	"Convert Fahrenheit to Celsius",
	1, 1, // min and max arguments
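	func(ctx *FunctionContext, args []interface{}) (interface{}, error) {
		// Check the assertion so a non-float64 argument returns an error instead of panicking.
		f, ok := args[0].(float64)
		if !ok {
			return nil, fmt.Errorf("fahrenheit_to_celsius: expected float64, got %T", args[0])
		}
		return (f - 32) * 5 / 9, nil
	},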
)

// Register an aggregation function
type CustomSumFunction struct {
	*BaseFunction
	sum float64
}

func (f *CustomSumFunction) Add(value interface{}) {
	if v, ok := value.(float64); ok {
		f.sum += v
	}
}

func (f *CustomSumFunction) Result() interface{} {
	return f.sum
}

Function Interfaces

The package defines several interfaces for different function types:

// Basic function interface
type Function interface {
	GetName() string
	GetType() FunctionType
	Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)
}

// Aggregation function interface
type AggregatorFunction interface {
	Function
	New() AggregatorFunction
	Add(value interface{})
	Result() interface{}
	Reset()
	Clone() AggregatorFunction
}

// Analytical function interface
type AnalyticalFunction interface {
	AggregatorFunction
}
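
Because AnalyticalFunction embeds AggregatorFunction, which in turn embeds Function, an analytical function has to supply every method in the chain. The sketch below shows this with local stand-in copies of the three interfaces and a compile-time assertion. `lastSeen` is a hypothetical function, and `FunctionType` and `FunctionContext` are simplified placeholders, not the package's real types.

```go
package main

import "fmt"

// Simplified placeholders for the package's types.
type FunctionType string
type FunctionContext struct{}

// Local copies of the method sets listed above.
type Function interface {
	GetName() string
	GetType() FunctionType
	Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)
}

type AggregatorFunction interface {
	Function
	New() AggregatorFunction
	Add(value interface{})
	Result() interface{}
	Reset()
	Clone() AggregatorFunction
}

type AnalyticalFunction interface {
	AggregatorFunction
}

// lastSeen is a hypothetical analytical function that remembers the latest value.
type lastSeen struct {
	value interface{}
}

func (l *lastSeen) GetName() string       { return "last_seen" }
func (l *lastSeen) GetType() FunctionType { return "analytical" }

// Execute records the single argument and returns the current state.
func (l *lastSeen) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
	if len(args) != 1 {
		return nil, fmt.Errorf("last_seen expects 1 argument, got %d", len(args))
	}
	l.Add(args[0])
	return l.Result(), nil
}

func (l *lastSeen) New() AggregatorFunction   { return &lastSeen{} }
func (l *lastSeen) Add(value interface{})     { l.value = value }
func (l *lastSeen) Result() interface{}       { return l.value }
func (l *lastSeen) Reset()                    { l.value = nil }
func (l *lastSeen) Clone() AggregatorFunction { return &lastSeen{value: l.value} }

// Compile-time check: the build fails if lastSeen lacks any required method.
var _ AnalyticalFunction = (*lastSeen)(nil)

func main() {
	var fn AnalyticalFunction = &lastSeen{}
	out, err := fn.Execute(&FunctionContext{}, []interface{}{42})
	fmt.Println(out, err) // prints 42 <nil>
}
```

The blank-identifier assertion is a common Go idiom for catching a missing or mistyped method, such as a Clone that returns the wrong type, at build time rather than at registration.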

Adapter System

Automatic adaptation between function types and aggregator modules:

// AggregatorAdapter - Adapts functions to aggregator interface
type AggregatorAdapter struct {
	function AggregatorFunction
}

// AnalyticalAdapter - Adapts analytical functions
type AnalyticalAdapter struct {
	function AnalyticalFunction
}
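
The adapter idea can be sketched as a thin wrapper that forwards calls to the wrapped function. This is an illustration under assumptions: `aggregatorFunction` keeps only the incremental methods, `legacyAggregator` is a hypothetical shape for what the aggregator module expects (based on the New, Add, and Result methods listed for AggregatorAdapter), and `counter` is a made-up function.

```go
package main

import "fmt"

// aggregatorFunction is a stand-in for the incremental part of AggregatorFunction.
type aggregatorFunction interface {
	New() aggregatorFunction
	Add(value interface{})
	Result() interface{}
}

// legacyAggregator is a hypothetical shape for the interface the aggregator
// module consumes.
type legacyAggregator interface {
	New() interface{}
	Add(value interface{})
	Result() interface{}
}

// counter is a made-up function that counts the values it receives.
type counter struct{ n int }

func (c *counter) New() aggregatorFunction { return &counter{} }
func (c *counter) Add(value interface{})   { c.n++ }
func (c *counter) Result() interface{}     { return c.n }

// adapter wraps an aggregatorFunction and forwards every call to it.
type adapter struct {
	function aggregatorFunction
}

// New wraps a fresh function instance, so adapters never share state.
func (a *adapter) New() interface{}      { return &adapter{function: a.function.New()} }
func (a *adapter) Add(value interface{}) { a.function.Add(value) }
func (a *adapter) Result() interface{}   { return a.function.Result() }

// Compile-time check that adapter satisfies the legacy shape.
var _ legacyAggregator = (*adapter)(nil)

func main() {
	proto := &adapter{function: &counter{}}
	inst := proto.New().(legacyAggregator)
	inst.Add("a")
	inst.Add("b")
	fmt.Println(inst.Result(), proto.Result()) // prints 2 0
}
```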

Performance Features

  • Function Caching - Efficient function lookup and caching
  • Lazy Initialization - Functions are initialized only when needed
  • Batch Processing - Optimized batch execution for aggregation functions
  • Memory Management - Automatic cleanup and resource management
  • Type Optimization - Specialized execution paths for common data types

Usage Examples

Basic function usage in SQL:

SELECT UPPER(device_name), ROUND(temperature, 2)
FROM stream
WHERE ABS(temperature - 25) > 5

Aggregation functions with windows:

SELECT device_id,
       AVG(temperature) as avg_temp,
       STDDEV(temperature) as temp_stddev
FROM stream
GROUP BY device_id, TumblingWindow('5s')
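
How the package computes STDDEV internally is not shown here. As an illustration of what incremental computation means for a windowed standard deviation, the sketch below uses Welford's online algorithm, a common technique swapped in for this example. `runningStdDev` is a hypothetical type. It reports both the population form and the sample form, matching the README's distinction between stddev and stddevs.

```go
package main

import (
	"fmt"
	"math"
)

// runningStdDev tracks the count, the mean, and the sum of squared
// deviations from the mean (m2), updated one observation at a time.
type runningStdDev struct {
	n    int
	mean float64
	m2   float64
}

// Add applies Welford's update for one observation.
func (r *runningStdDev) Add(x float64) {
	r.n++
	delta := x - r.mean
	r.mean += delta / float64(r.n)
	r.m2 += delta * (x - r.mean)
}

// Population returns sqrt(m2 / n), or 0 when no values were added.
func (r *runningStdDev) Population() float64 {
	if r.n == 0 {
		return 0
	}
	return math.Sqrt(r.m2 / float64(r.n))
}

// Sample returns sqrt(m2 / (n - 1)), or 0 with fewer than two values.
func (r *runningStdDev) Sample() float64 {
	if r.n < 2 {
		return 0
	}
	return math.Sqrt(r.m2 / float64(r.n-1))
}

func main() {
	var r runningStdDev
	for _, t := range []float64{2, 4, 4, 4, 5, 5, 7, 9} {
		r.Add(t)
	}
	fmt.Printf("%.4f %.4f\n", r.Population(), r.Sample()) // prints 2.0000 2.1381
}
```

Each Add call does constant work and stores three numbers, so the aggregate never needs to keep the whole window in memory.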

Custom function in expressions:

SELECT device_id,
       fahrenheit_to_celsius(temperature) as temp_celsius
FROM stream
WHERE fahrenheit_to_celsius(temperature) > 30

Integration

Seamless integration with other StreamSQL components:

  • Expr package - Function execution in expressions
  • Aggregator package - Automatic function adaptation
  • RSQL package - Function parsing and validation
  • Stream package - Real-time function execution
  • Types package - Function context and data type support

Index

Constants

const (
	SumStr         = string(Sum)
	CountStr       = string(Count)
	AvgStr         = string(Avg)
	MaxStr         = string(Max)
	MinStr         = string(Min)
	MedianStr      = string(Median)
	PercentileStr  = string(Percentile)
	WindowStartStr = string(WindowStart)
	WindowEndStr   = string(WindowEnd)
	CollectStr     = string(Collect)
	FirstValueStr  = string(FirstValue)
	LastValueStr   = string(LastValue)
	MergeAggStr    = string(MergeAgg)
	StdStr         = "std"
	StdDevStr      = string(StdDev)
	StdDevSStr     = string(StdDevS)
	DeduplicateStr = string(Deduplicate)
	VarStr         = string(Var)
	VarSStr        = string(VarS)
	// Analytical functions
	LagStr        = string(Lag)
	LatestStr     = string(Latest)
	ChangedColStr = string(ChangedCol)
	HadChangedStr = string(HadChanged)
	// Expression aggregator
	ExpressionStr = string(Expression)
	// Post-aggregation marker
	PostAggregationStr = string(PostAggregation)
)

String constant versions for convenience

Variables

This section is empty.

Functions

func CreateAnalyticalAggregatorFromFunctions

func CreateAnalyticalAggregatorFromFunctions(funcType string) interface{}

CreateAnalyticalAggregatorFromFunctions creates analytical function aggregator from functions module

func CreateBuiltinAggregatorFromFunctions

func CreateBuiltinAggregatorFromFunctions(aggType string) interface{}

CreateBuiltinAggregatorFromFunctions creates aggregator from functions module

func EvaluateWithBridge

func EvaluateWithBridge(expression string, data map[string]interface{}) (interface{}, error)

EvaluateWithBridge is a convenience function that evaluates an expression directly

func Execute

func Execute(name string, ctx *FunctionContext, args []interface{}) (interface{}, error)

Execute executes a function

func GetAggregatorAdapter

func GetAggregatorAdapter(name string) (func() interface{}, bool)

GetAggregatorAdapter gets aggregator adapter

func GetAllAvailableFunctions

func GetAllAvailableFunctions() map[string]interface{}

GetAllAvailableFunctions is a convenience function that returns information about all available functions

func GetAnalyticalAdapter

func GetAnalyticalAdapter(name string) (func() *AnalyticalAdapter, bool)

GetAnalyticalAdapter gets analytical function adapter

func IsAggregatorFunction added in v0.10.3

func IsAggregatorFunction(name string) bool

IsAggregatorFunction checks if a function name is an aggregator function

func IsUnnestResult

func IsUnnestResult(value interface{}) bool

IsUnnestResult checks whether a value is an unnest result

func ListAll

func ListAll() map[string]Function

func ProcessUnnestResult

func ProcessUnnestResult(value interface{}) []map[string]interface{}

ProcessUnnestResult processes an unnest result, converting it into multiple rows

func Register

func Register(fn Function) error

Register registers a function in the global registry

func RegisterAggregatorAdapter

func RegisterAggregatorAdapter(name string) error

RegisterAggregatorAdapter registers an aggregator adapter

func RegisterAnalyticalAdapter

func RegisterAnalyticalAdapter(name string) error

RegisterAnalyticalAdapter registers analytical function adapter

func RegisterCustomFunction

func RegisterCustomFunction(name string, fnType FunctionType, category, description string,
	minArgs, maxArgs int, executor func(ctx *FunctionContext, args []interface{}) (interface{}, error)) error

RegisterCustomFunction registers a custom function

func RegisterLegacyAggregator

func RegisterLegacyAggregator(name string, constructor func() LegacyAggregatorFunction)

RegisterLegacyAggregator registers legacy aggregator to global registry

func Unregister

func Unregister(name string) bool

func Validate added in v0.10.3

func Validate(name string) error

Validate validates if a function exists in the registry
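
The relationship between Register, Unregister, Validate, and Execute can be pictured as operations on a shared name-to-function map. The sketch below is a simplified model, not the package's implementation: `registry` and `executor` are hypothetical, and lower-casing names and rejecting duplicate registrations are assumptions made for the example.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// executor is a simplified stand-in for a registered function body.
type executor func(args []interface{}) (interface{}, error)

// registry is a hypothetical, mutex-guarded name-to-function map.
type registry struct {
	mu  sync.RWMutex
	fns map[string]executor
}

func newRegistry() *registry { return &registry{fns: make(map[string]executor)} }

// Register stores fn under a lower-cased name and rejects duplicates
// (both behaviors are assumptions of this sketch).
func (r *registry) Register(name string, fn executor) error {
	key := strings.ToLower(name)
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.fns[key]; exists {
		return fmt.Errorf("function %q already registered", key)
	}
	r.fns[key] = fn
	return nil
}

// Unregister removes a function and reports whether it was present.
func (r *registry) Unregister(name string) bool {
	key := strings.ToLower(name)
	r.mu.Lock()
	defer r.mu.Unlock()
	_, exists := r.fns[key]
	delete(r.fns, key)
	return exists
}

// Validate returns an error when no function is registered under name.
func (r *registry) Validate(name string) error {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if _, ok := r.fns[strings.ToLower(name)]; !ok {
		return fmt.Errorf("function %q not found", name)
	}
	return nil
}

// Execute looks the function up under a read lock, then calls it with args.
func (r *registry) Execute(name string, args []interface{}) (interface{}, error) {
	r.mu.RLock()
	fn, ok := r.fns[strings.ToLower(name)]
	r.mu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("function %q not found", name)
	}
	return fn(args)
}

func main() {
	reg := newRegistry()
	_ = reg.Register("answer", func(args []interface{}) (interface{}, error) { return 42, nil })
	fmt.Println(reg.Execute("ANSWER", nil))
	fmt.Println(reg.Unregister("answer"), reg.Validate("answer"))
}
```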

Types

type AbsFunction

type AbsFunction struct {
	*BaseFunction
}

AbsFunction calculates absolute value

func NewAbsFunction

func NewAbsFunction() *AbsFunction

func (*AbsFunction) Execute

func (f *AbsFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*AbsFunction) Validate

func (f *AbsFunction) Validate(args []interface{}) error

type AcosFunction

type AcosFunction struct {
	*BaseFunction
}

AcosFunction calculates arccosine

func NewAcosFunction

func NewAcosFunction() *AcosFunction

func (*AcosFunction) Execute

func (f *AcosFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*AcosFunction) Validate

func (f *AcosFunction) Validate(args []interface{}) error

type AggregateType

type AggregateType string

AggregateType defines aggregate types, migrated from aggregator.AggregateType

const (
	Sum         AggregateType = "sum"
	Count       AggregateType = "count"
	Avg         AggregateType = "avg"
	Max         AggregateType = "max"
	Min         AggregateType = "min"
	Median      AggregateType = "median"
	Percentile  AggregateType = "percentile"
	WindowStart AggregateType = "window_start"
	WindowEnd   AggregateType = "window_end"
	Collect     AggregateType = "collect"
	FirstValue  AggregateType = "first_value"
	LastValue   AggregateType = "last_value"
	MergeAgg    AggregateType = "merge_agg"
	StdDev      AggregateType = "stddev"
	StdDevS     AggregateType = "stddevs"
	Deduplicate AggregateType = "deduplicate"
	Var         AggregateType = "var"
	VarS        AggregateType = "vars"
	// Analytical functions
	Lag        AggregateType = "lag"
	Latest     AggregateType = "latest"
	ChangedCol AggregateType = "changed_col"
	HadChanged AggregateType = "had_changed"
	// Expression aggregator for handling custom functions
	Expression AggregateType = "expression"
	// Post-aggregation marker for fields that need post-processing
	PostAggregation AggregateType = "post_aggregation"
)
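
The pairing of typed constants with plain-string versions follows a common Go pattern, sketched below. Only a few of the constants are reproduced, and `isAnalytical` is a hypothetical helper added for illustration.

```go
package main

import "fmt"

// AggregateType mirrors the string-backed type shown above.
type AggregateType string

const (
	Sum   AggregateType = "sum"
	Count AggregateType = "count"
	Lag   AggregateType = "lag"
)

// SumStr is the plain-string form, for call sites that take a string.
const SumStr = string(Sum)

// isAnalytical is a hypothetical helper: it converts a plain name to the
// typed form so the switch can use the typed constants.
func isAnalytical(name string) bool {
	switch AggregateType(name) {
	case Lag:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(SumStr, isAnalytical(SumStr), isAnalytical("lag")) // prints sum false true
}
```

The typed form prevents an arbitrary string from being passed where an aggregate type is expected, while the Str constants save a conversion at call sites that work with raw names.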

type AggregatorAdapter

type AggregatorAdapter struct {
	// contains filtered or unexported fields
}

AggregatorAdapter provides adapter for aggregator functions, compatible with legacy aggregator interface

func NewAggregatorAdapter

func NewAggregatorAdapter(name string) (*AggregatorAdapter, error)

NewAggregatorAdapter creates an aggregator adapter

func (*AggregatorAdapter) Add

func (a *AggregatorAdapter) Add(value interface{})

Add adds a value

func (*AggregatorAdapter) GetFunctionName

func (a *AggregatorAdapter) GetFunctionName() string

GetFunctionName returns the underlying function name for context mechanism support

func (*AggregatorAdapter) New

func (a *AggregatorAdapter) New() interface{}

New creates a new aggregator instance

func (*AggregatorAdapter) Result

func (a *AggregatorAdapter) Result() interface{}

Result returns the result

type AggregatorFunction

type AggregatorFunction interface {
	Function
	// New creates a new aggregator instance
	New() AggregatorFunction
	// Add adds a value for incremental computation
	Add(value interface{})
	// Result returns the aggregation result
	Result() interface{}
	// Reset resets the aggregator state
	Reset()
	// Clone clones the aggregator (used for window functions and similar scenarios)
	Clone() AggregatorFunction
}

AggregatorFunction defines the interface for aggregator functions that support incremental computation

func CreateAggregator

func CreateAggregator(name string) (AggregatorFunction, error)

CreateAggregator creates an aggregator instance

func CreateParameterizedAggregator added in v0.10.3

func CreateParameterizedAggregator(name string, args []interface{}) (AggregatorFunction, error)

CreateParameterizedAggregator creates a parameterized aggregator instance with initialization

type AnalyticalAdapter

type AnalyticalAdapter struct {
	// contains filtered or unexported fields
}

AnalyticalAdapter provides adapter for analytical functions

func CreateAnalyticalFromFunctions

func CreateAnalyticalFromFunctions(funcType string) *AnalyticalAdapter

CreateAnalyticalFromFunctions creates analytical function from functions module

func NewAnalyticalAdapter

func NewAnalyticalAdapter(name string) (*AnalyticalAdapter, error)

NewAnalyticalAdapter creates an analytical function adapter

func (*AnalyticalAdapter) Clone

Clone clones the instance

func (*AnalyticalAdapter) Execute

func (a *AnalyticalAdapter) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

Execute executes the analytical function

func (*AnalyticalAdapter) Reset

func (a *AnalyticalAdapter) Reset()

Reset resets the state

type AnalyticalAggregatorAdapter

type AnalyticalAggregatorAdapter struct {
	// contains filtered or unexported fields
}

AnalyticalAggregatorAdapter provides adapter from analytical functions to aggregators

func NewAnalyticalAggregatorAdapter

func NewAnalyticalAggregatorAdapter(name string) (*AnalyticalAggregatorAdapter, error)

NewAnalyticalAggregatorAdapter creates an analytical function aggregator adapter

func (*AnalyticalAggregatorAdapter) Add

func (a *AnalyticalAggregatorAdapter) Add(value interface{})

Add adds a value

func (*AnalyticalAggregatorAdapter) New

func (a *AnalyticalAggregatorAdapter) New() interface{}

New creates a new adapter instance

func (*AnalyticalAggregatorAdapter) Result

func (a *AnalyticalAggregatorAdapter) Result() interface{}

Result returns the result

type AnalyticalAggregatorWrapper

type AnalyticalAggregatorWrapper struct {
	// contains filtered or unexported fields
}

AnalyticalAggregatorWrapper wraps functions module analytical function aggregator to make it compatible with original interface

func (*AnalyticalAggregatorWrapper) Add

func (w *AnalyticalAggregatorWrapper) Add(value interface{})

func (*AnalyticalAggregatorWrapper) New

func (*AnalyticalAggregatorWrapper) Result

func (w *AnalyticalAggregatorWrapper) Result() interface{}

type AnalyticalFunction

type AnalyticalFunction interface {
	AggregatorFunction
}

AnalyticalFunction defines the interface for analytical functions with state management. It now embeds AggregatorFunction to support incremental computation.

func CreateAnalytical

func CreateAnalytical(name string) (AnalyticalFunction, error)

CreateAnalytical creates an analytical function instance

type ArrayContainsFunction

type ArrayContainsFunction struct {
	*BaseFunction
}

ArrayContainsFunction checks if array contains specified value

func NewArrayContainsFunction

func NewArrayContainsFunction() *ArrayContainsFunction

func (*ArrayContainsFunction) Execute

func (f *ArrayContainsFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ArrayContainsFunction) Validate

func (f *ArrayContainsFunction) Validate(args []interface{}) error

type ArrayDistinctFunction

type ArrayDistinctFunction struct {
	*BaseFunction
}

ArrayDistinctFunction removes duplicate elements from an array

func NewArrayDistinctFunction

func NewArrayDistinctFunction() *ArrayDistinctFunction

func (*ArrayDistinctFunction) Execute

func (f *ArrayDistinctFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ArrayDistinctFunction) Validate

func (f *ArrayDistinctFunction) Validate(args []interface{}) error

type ArrayExceptFunction

type ArrayExceptFunction struct {
	*BaseFunction
}

ArrayExceptFunction returns the set difference of arrays

func NewArrayExceptFunction

func NewArrayExceptFunction() *ArrayExceptFunction

func (*ArrayExceptFunction) Execute

func (f *ArrayExceptFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ArrayExceptFunction) Validate

func (f *ArrayExceptFunction) Validate(args []interface{}) error

type ArrayIntersectFunction

type ArrayIntersectFunction struct {
	*BaseFunction
}

ArrayIntersectFunction returns the intersection of arrays

func NewArrayIntersectFunction

func NewArrayIntersectFunction() *ArrayIntersectFunction

func (*ArrayIntersectFunction) Execute

func (f *ArrayIntersectFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ArrayIntersectFunction) Validate

func (f *ArrayIntersectFunction) Validate(args []interface{}) error

type ArrayLengthFunction

type ArrayLengthFunction struct {
	*BaseFunction
}

ArrayLengthFunction returns array length

func NewArrayLengthFunction

func NewArrayLengthFunction() *ArrayLengthFunction

func (*ArrayLengthFunction) Execute

func (f *ArrayLengthFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ArrayLengthFunction) Validate

func (f *ArrayLengthFunction) Validate(args []interface{}) error

type ArrayPositionFunction

type ArrayPositionFunction struct {
	*BaseFunction
}

ArrayPositionFunction returns position of value in array

func NewArrayPositionFunction

func NewArrayPositionFunction() *ArrayPositionFunction

func (*ArrayPositionFunction) Execute

func (f *ArrayPositionFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ArrayPositionFunction) Validate

func (f *ArrayPositionFunction) Validate(args []interface{}) error

type ArrayRemoveFunction

type ArrayRemoveFunction struct {
	*BaseFunction
}

ArrayRemoveFunction removes specified value from array

func NewArrayRemoveFunction

func NewArrayRemoveFunction() *ArrayRemoveFunction

func (*ArrayRemoveFunction) Execute

func (f *ArrayRemoveFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ArrayRemoveFunction) Validate

func (f *ArrayRemoveFunction) Validate(args []interface{}) error

type ArrayUnionFunction

type ArrayUnionFunction struct {
	*BaseFunction
}

ArrayUnionFunction returns the union of arrays

func NewArrayUnionFunction

func NewArrayUnionFunction() *ArrayUnionFunction

func (*ArrayUnionFunction) Execute

func (f *ArrayUnionFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ArrayUnionFunction) Validate

func (f *ArrayUnionFunction) Validate(args []interface{}) error

type AsinFunction

type AsinFunction struct {
	*BaseFunction
}

AsinFunction calculates arcsine

func NewAsinFunction

func NewAsinFunction() *AsinFunction

func (*AsinFunction) Execute

func (f *AsinFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*AsinFunction) Validate

func (f *AsinFunction) Validate(args []interface{}) error

type Atan2Function

type Atan2Function struct {
	*BaseFunction
}

Atan2Function calculates the two-argument arctangent

func NewAtan2Function

func NewAtan2Function() *Atan2Function

func (*Atan2Function) Execute

func (f *Atan2Function) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*Atan2Function) Validate

func (f *Atan2Function) Validate(args []interface{}) error

type AtanFunction

type AtanFunction struct {
	*BaseFunction
}

AtanFunction calculates arctangent

func NewAtanFunction

func NewAtanFunction() *AtanFunction

func (*AtanFunction) Execute

func (f *AtanFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*AtanFunction) Validate

func (f *AtanFunction) Validate(args []interface{}) error

type AvgFunction

type AvgFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

AvgFunction calculates the average of numeric values

func NewAvgFunction

func NewAvgFunction() *AvgFunction

func (*AvgFunction) Add

func (f *AvgFunction) Add(value interface{})

func (*AvgFunction) Clone

func (f *AvgFunction) Clone() AggregatorFunction

func (*AvgFunction) Execute

func (f *AvgFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*AvgFunction) New

New implements the AggregatorFunction interface

func (*AvgFunction) Reset

func (f *AvgFunction) Reset()

func (*AvgFunction) Result

func (f *AvgFunction) Result() interface{}

func (*AvgFunction) Validate

func (f *AvgFunction) Validate(args []interface{}) error

type BaseFunction

type BaseFunction struct {
	// contains filtered or unexported fields
}

BaseFunction provides basic function implementation with common functionality

func NewBaseFunction

func NewBaseFunction(name string, fnType FunctionType, category, description string, minArgs, maxArgs int) *BaseFunction

NewBaseFunction creates a new base function

func NewBaseFunctionWithAliases added in v0.10.2

func NewBaseFunctionWithAliases(name string, fnType FunctionType, category, description string, minArgs, maxArgs int, aliases []string) *BaseFunction

NewBaseFunctionWithAliases creates base function with aliases

func (*BaseFunction) GetAliases added in v0.10.2

func (bf *BaseFunction) GetAliases() []string

GetAliases returns function alias list

func (*BaseFunction) GetCategory

func (bf *BaseFunction) GetCategory() string

func (*BaseFunction) GetDescription

func (bf *BaseFunction) GetDescription() string

func (*BaseFunction) GetMaxArgs added in v0.10.3

func (bf *BaseFunction) GetMaxArgs() int

GetMaxArgs returns the maximum number of arguments (-1 means unlimited)

func (*BaseFunction) GetMinArgs added in v0.10.3

func (bf *BaseFunction) GetMinArgs() int

GetMinArgs returns the minimum number of arguments

func (*BaseFunction) GetName

func (bf *BaseFunction) GetName() string

func (*BaseFunction) GetType

func (bf *BaseFunction) GetType() FunctionType

func (*BaseFunction) ValidateArgCount

func (bf *BaseFunction) ValidateArgCount(args []interface{}) error

ValidateArgCount validates the number of arguments
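
A minimal sketch of an argument-count check that honors the "-1 means unlimited" convention documented for GetMaxArgs. The `argSpec` type, the error wording, and the two example specs are hypothetical; only the minimum, the maximum, and the -1 convention come from the documentation above.

```go
package main

import "fmt"

// argSpec is a stand-in for the argument bounds held by a base function.
type argSpec struct {
	name    string
	minArgs int
	maxArgs int // -1 means unlimited
}

// validateArgCount checks len(args) against the bounds, skipping the
// upper-bound check when maxArgs is -1.
func (s argSpec) validateArgCount(args []interface{}) error {
	n := len(args)
	if n < s.minArgs {
		return fmt.Errorf("%s requires at least %d argument(s), got %d", s.name, s.minArgs, n)
	}
	if s.maxArgs != -1 && n > s.maxArgs {
		return fmt.Errorf("%s accepts at most %d argument(s), got %d", s.name, s.maxArgs, n)
	}
	return nil
}

func main() {
	variadic := argSpec{name: "variadic_fn", minArgs: 1, maxArgs: -1}
	unary := argSpec{name: "unary_fn", minArgs: 1, maxArgs: 1}
	fmt.Println(variadic.validateArgCount([]interface{}{"a", "b", "c"}))
	fmt.Println(unary.validateArgCount([]interface{}{1, 2}))
}
```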

type BitAndFunction

type BitAndFunction struct {
	*BaseFunction
}

BitAndFunction performs bitwise AND

func NewBitAndFunction

func NewBitAndFunction() *BitAndFunction

func (*BitAndFunction) Execute

func (f *BitAndFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*BitAndFunction) Validate

func (f *BitAndFunction) Validate(args []interface{}) error

type BitNotFunction

type BitNotFunction struct {
	*BaseFunction
}

BitNotFunction performs bitwise NOT

func NewBitNotFunction

func NewBitNotFunction() *BitNotFunction

func (*BitNotFunction) Execute

func (f *BitNotFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*BitNotFunction) Validate

func (f *BitNotFunction) Validate(args []interface{}) error

type BitOrFunction

type BitOrFunction struct {
	*BaseFunction
}

BitOrFunction performs bitwise OR

func NewBitOrFunction

func NewBitOrFunction() *BitOrFunction

func (*BitOrFunction) Execute

func (f *BitOrFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*BitOrFunction) Validate

func (f *BitOrFunction) Validate(args []interface{}) error

type BitXorFunction

type BitXorFunction struct {
	*BaseFunction
}

BitXorFunction performs bitwise XOR

func NewBitXorFunction

func NewBitXorFunction() *BitXorFunction

func (*BitXorFunction) Execute

func (f *BitXorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*BitXorFunction) Validate

func (f *BitXorFunction) Validate(args []interface{}) error

type CaseWhenFunction

type CaseWhenFunction struct {
	*BaseFunction
}

CaseWhenFunction implements the CASE WHEN expression

func NewCaseWhenFunction

func NewCaseWhenFunction() *CaseWhenFunction

func (*CaseWhenFunction) Execute

func (f *CaseWhenFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CaseWhenFunction) Validate

func (f *CaseWhenFunction) Validate(args []interface{}) error

type CastFunction

type CastFunction struct {
	*BaseFunction
}

CastFunction performs type conversion

func NewCastFunction

func NewCastFunction() *CastFunction

func (*CastFunction) Execute

func (f *CastFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CastFunction) Validate

func (f *CastFunction) Validate(args []interface{}) error

type CeilingFunction

type CeilingFunction struct {
	*BaseFunction
}

CeilingFunction rounds a value up to the nearest integer

func NewCeilingFunction

func NewCeilingFunction() *CeilingFunction

func (*CeilingFunction) Execute

func (f *CeilingFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CeilingFunction) Validate

func (f *CeilingFunction) Validate(args []interface{}) error

type ChangedColFunction

type ChangedColFunction struct {
	*BaseFunction
	PreviousValues map[string]interface{}
}

ChangedColFunction is the changed-column function: it returns the names of the columns whose values have changed

func NewChangedColFunction

func NewChangedColFunction() *ChangedColFunction

func (*ChangedColFunction) Add

func (f *ChangedColFunction) Add(value interface{})

func (*ChangedColFunction) Clone

func (*ChangedColFunction) Execute

func (f *ChangedColFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ChangedColFunction) New

New implements the AggregatorFunction interface, providing incremental computation support

func (*ChangedColFunction) Reset

func (f *ChangedColFunction) Reset()

func (*ChangedColFunction) Result

func (f *ChangedColFunction) Result() interface{}

func (*ChangedColFunction) Validate

func (f *ChangedColFunction) Validate(args []interface{}) error

type ChrFunction

type ChrFunction struct {
	*BaseFunction
}

ChrFunction returns the character for the given ASCII code

func NewChrFunction

func NewChrFunction() *ChrFunction

func (*ChrFunction) Execute

func (f *ChrFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ChrFunction) Validate

func (f *ChrFunction) Validate(args []interface{}) error

type CoalesceFunction

type CoalesceFunction struct {
	*BaseFunction
}

CoalesceFunction returns first non-NULL value

func NewCoalesceFunction

func NewCoalesceFunction() *CoalesceFunction

func (*CoalesceFunction) Execute

func (f *CoalesceFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CoalesceFunction) Validate

func (f *CoalesceFunction) Validate(args []interface{}) error

type CollectAggregatorFunction

type CollectAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

CollectAggregatorFunction adds an AggregatorFunction interface implementation for CollectFunction

func NewCollectAggregatorFunction

func NewCollectAggregatorFunction() *CollectAggregatorFunction

func (*CollectAggregatorFunction) Add

func (f *CollectAggregatorFunction) Add(value interface{})

func (*CollectAggregatorFunction) Clone

func (*CollectAggregatorFunction) Execute

func (f *CollectAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CollectAggregatorFunction) New

func (*CollectAggregatorFunction) Reset

func (f *CollectAggregatorFunction) Reset()

func (*CollectAggregatorFunction) Result

func (f *CollectAggregatorFunction) Result() interface{}

func (*CollectAggregatorFunction) Validate

func (f *CollectAggregatorFunction) Validate(args []interface{}) error

type CollectFunction

type CollectFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

CollectFunction is the collect function: it returns an array of the column values from all messages in the current window

func NewCollectFunction

func NewCollectFunction() *CollectFunction

func (*CollectFunction) Add

func (f *CollectFunction) Add(value interface{})

func (*CollectFunction) Clone

func (*CollectFunction) Execute

func (f *CollectFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CollectFunction) New

New implements the AggregatorFunction interface

func (*CollectFunction) Reset

func (f *CollectFunction) Reset()

func (*CollectFunction) Result

func (f *CollectFunction) Result() interface{}

func (*CollectFunction) Validate

func (f *CollectFunction) Validate(args []interface{}) error

type ConcatFunction

type ConcatFunction struct {
	*BaseFunction
}

ConcatFunction concatenates multiple strings

func NewConcatFunction

func NewConcatFunction() *ConcatFunction

func (*ConcatFunction) Execute

func (f *ConcatFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ConcatFunction) Validate

func (f *ConcatFunction) Validate(args []interface{}) error

type ContextAggregator

type ContextAggregator interface {
	GetContextKey() string
}

ContextAggregator defines aggregator interface that supports context mechanism

type ConvertTzFunction

type ConvertTzFunction struct {
	*BaseFunction
}

ConvertTzFunction performs time zone conversion

func NewConvertTzFunction

func NewConvertTzFunction() *ConvertTzFunction

func (*ConvertTzFunction) Execute

func (f *ConvertTzFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ConvertTzFunction) Validate

func (f *ConvertTzFunction) Validate(args []interface{}) error

type CosFunction

type CosFunction struct {
	*BaseFunction
}

CosFunction calculates cosine

func NewCosFunction

func NewCosFunction() *CosFunction

func (*CosFunction) Execute

func (f *CosFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CosFunction) Validate

func (f *CosFunction) Validate(args []interface{}) error

type CoshFunction

type CoshFunction struct {
	*BaseFunction
}

CoshFunction is the hyperbolic cosine function.

func NewCoshFunction

func NewCoshFunction() *CoshFunction

func (*CoshFunction) Execute

func (f *CoshFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CoshFunction) Validate

func (f *CoshFunction) Validate(args []interface{}) error

type CountFunction

type CountFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

CountFunction is the count function.

func NewCountFunction

func NewCountFunction() *CountFunction

func (*CountFunction) Add

func (f *CountFunction) Add(value interface{})

func (*CountFunction) Clone

func (f *CountFunction) Clone() AggregatorFunction

func (*CountFunction) Execute

func (f *CountFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CountFunction) New

Implements the AggregatorFunction interface.

func (*CountFunction) Reset

func (f *CountFunction) Reset()

func (*CountFunction) Result

func (f *CountFunction) Result() interface{}

func (*CountFunction) Validate

func (f *CountFunction) Validate(args []interface{}) error

type CurrentDateFunction

type CurrentDateFunction struct {
	*BaseFunction
}

CurrentDateFunction returns current date

func NewCurrentDateFunction

func NewCurrentDateFunction() *CurrentDateFunction

func (*CurrentDateFunction) Execute

func (f *CurrentDateFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CurrentDateFunction) Validate

func (f *CurrentDateFunction) Validate(args []interface{}) error

type CurrentTimeFunction

type CurrentTimeFunction struct {
	*BaseFunction
}

CurrentTimeFunction returns current time

func NewCurrentTimeFunction

func NewCurrentTimeFunction() *CurrentTimeFunction

func (*CurrentTimeFunction) Execute

func (f *CurrentTimeFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CurrentTimeFunction) Validate

func (f *CurrentTimeFunction) Validate(args []interface{}) error

type CustomFunction

type CustomFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

CustomFunction implements custom function

func (*CustomFunction) Execute

func (f *CustomFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*CustomFunction) Validate

func (f *CustomFunction) Validate(args []interface{}) error

type DateAddFunction

type DateAddFunction struct {
	*BaseFunction
}

DateAddFunction performs date addition

func NewDateAddFunction

func NewDateAddFunction() *DateAddFunction

func (*DateAddFunction) Execute

func (f *DateAddFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DateAddFunction) Validate

func (f *DateAddFunction) Validate(args []interface{}) error

type DateDiffFunction

type DateDiffFunction struct {
	*BaseFunction
}

DateDiffFunction computes the difference between two dates.

func NewDateDiffFunction

func NewDateDiffFunction() *DateDiffFunction

func (*DateDiffFunction) Execute

func (f *DateDiffFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DateDiffFunction) Validate

func (f *DateDiffFunction) Validate(args []interface{}) error

type DateFormatFunction

type DateFormatFunction struct {
	*BaseFunction
}

DateFormatFunction formats a date.

func NewDateFormatFunction

func NewDateFormatFunction() *DateFormatFunction

func (*DateFormatFunction) Execute

func (f *DateFormatFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DateFormatFunction) Validate

func (f *DateFormatFunction) Validate(args []interface{}) error

type DateParseFunction

type DateParseFunction struct {
	*BaseFunction
}

DateParseFunction parses a date.

func NewDateParseFunction

func NewDateParseFunction() *DateParseFunction

func (*DateParseFunction) Execute

func (f *DateParseFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DateParseFunction) Validate

func (f *DateParseFunction) Validate(args []interface{}) error

type DateSubFunction

type DateSubFunction struct {
	*BaseFunction
}

DateSubFunction performs date subtraction.

func NewDateSubFunction

func NewDateSubFunction() *DateSubFunction

func (*DateSubFunction) Execute

func (f *DateSubFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DateSubFunction) Validate

func (f *DateSubFunction) Validate(args []interface{}) error

type DayFunction

type DayFunction struct {
	*BaseFunction
}

DayFunction extracts the day from a date.

func NewDayFunction

func NewDayFunction() *DayFunction

func (*DayFunction) Execute

func (f *DayFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DayFunction) Validate

func (f *DayFunction) Validate(args []interface{}) error

type DayOfWeekFunction

type DayOfWeekFunction struct {
	*BaseFunction
}

DayOfWeekFunction returns the day of the week.

func NewDayOfWeekFunction

func NewDayOfWeekFunction() *DayOfWeekFunction

func (*DayOfWeekFunction) Execute

func (f *DayOfWeekFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DayOfWeekFunction) Validate

func (f *DayOfWeekFunction) Validate(args []interface{}) error

type DayOfYearFunction

type DayOfYearFunction struct {
	*BaseFunction
}

DayOfYearFunction returns the day of the year.

func NewDayOfYearFunction

func NewDayOfYearFunction() *DayOfYearFunction

func (*DayOfYearFunction) Execute

func (f *DayOfYearFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DayOfYearFunction) Validate

func (f *DayOfYearFunction) Validate(args []interface{}) error

type Dec2HexFunction

type Dec2HexFunction struct {
	*BaseFunction
}

Dec2HexFunction converts decimal to hexadecimal

func NewDec2HexFunction

func NewDec2HexFunction() *Dec2HexFunction

func (*Dec2HexFunction) Execute

func (f *Dec2HexFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*Dec2HexFunction) Validate

func (f *Dec2HexFunction) Validate(args []interface{}) error

type DecodeFunction

type DecodeFunction struct {
	*BaseFunction
}

DecodeFunction decodes an encoded string back into the original data.

func NewDecodeFunction

func NewDecodeFunction() *DecodeFunction

func (*DecodeFunction) Execute

func (f *DecodeFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DecodeFunction) Validate

func (f *DecodeFunction) Validate(args []interface{}) error

type DeduplicateAggregatorFunction

type DeduplicateAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

DeduplicateAggregatorFunction adds an AggregatorFunction interface implementation for DeduplicateFunction.

func NewDeduplicateAggregatorFunction

func NewDeduplicateAggregatorFunction() *DeduplicateAggregatorFunction

func (*DeduplicateAggregatorFunction) Add

func (f *DeduplicateAggregatorFunction) Add(value interface{})

func (*DeduplicateAggregatorFunction) Clone

func (*DeduplicateAggregatorFunction) Execute

func (f *DeduplicateAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DeduplicateAggregatorFunction) New

func (*DeduplicateAggregatorFunction) Reset

func (f *DeduplicateAggregatorFunction) Reset()

func (*DeduplicateAggregatorFunction) Result

func (f *DeduplicateAggregatorFunction) Result() interface{}

func (*DeduplicateAggregatorFunction) Validate

func (f *DeduplicateAggregatorFunction) Validate(args []interface{}) error

type DeduplicateFunction

type DeduplicateFunction struct {
	*BaseFunction
}

DeduplicateFunction is the deduplication function.

func NewDeduplicateFunction

func NewDeduplicateFunction() *DeduplicateFunction

func (*DeduplicateFunction) Execute

func (f *DeduplicateFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*DeduplicateFunction) Validate

func (f *DeduplicateFunction) Validate(args []interface{}) error

type EncodeFunction

type EncodeFunction struct {
	*BaseFunction
}

EncodeFunction encodes the input value as a string in the specified format.

func NewEncodeFunction

func NewEncodeFunction() *EncodeFunction

func (*EncodeFunction) Execute

func (f *EncodeFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*EncodeFunction) Validate

func (f *EncodeFunction) Validate(args []interface{}) error

type EndswithFunction

type EndswithFunction struct {
	*BaseFunction
}

EndswithFunction checks whether a string ends with the specified suffix.

func NewEndswithFunction

func NewEndswithFunction() *EndswithFunction

func (*EndswithFunction) Execute

func (f *EndswithFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*EndswithFunction) Validate

func (f *EndswithFunction) Validate(args []interface{}) error

type ExpFunction

type ExpFunction struct {
	*BaseFunction
}

ExpFunction is the exponential function.

func NewExpFunction

func NewExpFunction() *ExpFunction

func (*ExpFunction) Execute

func (f *ExpFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ExpFunction) Validate

func (f *ExpFunction) Validate(args []interface{}) error

type ExprBridge

type ExprBridge struct {
	// contains filtered or unexported fields
}

ExprBridge bridges the StreamSQL function system with expr-lang/expr.

func GetExprBridge

func GetExprBridge() *ExprBridge

GetExprBridge returns the global bridge instance.

func NewExprBridge

func NewExprBridge() *ExprBridge

NewExprBridge creates a new expression bridge.

func (*ExprBridge) CompileExpressionWithStreamSQLFunctions

func (bridge *ExprBridge) CompileExpressionWithStreamSQLFunctions(expression string, dataType interface{}) (*vm.Program, error)

CompileExpressionWithStreamSQLFunctions compiles an expression with the StreamSQL functions included.

func (*ExprBridge) ContainsBacktickIdentifiers added in v0.10.2

func (bridge *ExprBridge) ContainsBacktickIdentifiers(expression string) bool

ContainsBacktickIdentifiers checks whether the expression contains backtick-quoted identifiers.

func (*ExprBridge) ContainsIsNullOperator

func (bridge *ExprBridge) ContainsIsNullOperator(expression string) bool

ContainsIsNullOperator checks whether the expression contains an IS NULL or IS NOT NULL operator.

func (*ExprBridge) ContainsLikeOperator

func (bridge *ExprBridge) ContainsLikeOperator(expression string) bool

ContainsLikeOperator checks whether the expression contains the LIKE operator.

func (*ExprBridge) CreateEnhancedExprEnvironment

func (bridge *ExprBridge) CreateEnhancedExprEnvironment(data map[string]interface{}) map[string]interface{}

CreateEnhancedExprEnvironment creates an enhanced expr execution environment.

func (*ExprBridge) EvaluateExpression

func (bridge *ExprBridge) EvaluateExpression(expression string, data map[string]interface{}) (interface{}, error)

EvaluateExpression evaluates an expression, automatically selecting the most suitable engine.

func (*ExprBridge) GetFunctionInfo

func (bridge *ExprBridge) GetFunctionInfo() map[string]interface{}

GetFunctionInfo returns function information, unifying the functions of both systems.

func (*ExprBridge) IsExprLangFunction

func (bridge *ExprBridge) IsExprLangFunction(name string) bool

IsExprLangFunction checks whether the function name is an expr-lang built-in function.

func (*ExprBridge) PreprocessBacktickIdentifiers added in v0.10.2

func (bridge *ExprBridge) PreprocessBacktickIdentifiers(expression string) (string, error)

PreprocessBacktickIdentifiers preprocesses backtick-quoted identifiers by removing the backticks.

func (*ExprBridge) PreprocessIsNullExpression

func (bridge *ExprBridge) PreprocessIsNullExpression(expression string) (string, error)

PreprocessIsNullExpression preprocesses IS NULL and IS NOT NULL expressions, converting them into expressions that expr-lang can understand.
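To make the idea of this preprocessing step concrete, here is a minimal sketch that rewrites SQL null checks into `nil` comparisons with regular expressions. The helper `rewriteIsNull`, both patterns, and the target form `x == nil` are assumptions made for illustration; the actual rewrite performed by `PreprocessIsNullExpression` is not shown in this documentation and may differ.

```go
package main

import (
	"fmt"
	"regexp"
)

// These patterns are assumptions for illustration. They only handle a bare
// (optionally dotted) identifier on the left-hand side and do not skip
// string literals.
var (
	isNotNullRe = regexp.MustCompile(`(?i)\b([A-Za-z_][\w.]*)\s+IS\s+NOT\s+NULL\b`)
	isNullRe    = regexp.MustCompile(`(?i)\b([A-Za-z_][\w.]*)\s+IS\s+NULL\b`)
)

// rewriteIsNull is a hypothetical helper that turns SQL null checks into
// nil comparisons. IS NOT NULL is rewritten first so that the remaining
// text only contains plain IS NULL checks.
func rewriteIsNull(expression string) string {
	out := isNotNullRe.ReplaceAllString(expression, "${1} != nil")
	return isNullRe.ReplaceAllString(out, "${1} == nil")
}

func main() {
	fmt.Println(rewriteIsNull("temperature IS NOT NULL AND device is null"))
}
```

A production implementation would also need to leave quoted string literals untouched, which this sketch does not attempt.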

func (*ExprBridge) PreprocessLikeExpression

func (bridge *ExprBridge) PreprocessLikeExpression(expression string) (string, error)

PreprocessLikeExpression preprocesses LIKE expressions, converting them into function calls that expr-lang can understand.

func (*ExprBridge) RegisterStreamSQLFunctionsToExpr

func (bridge *ExprBridge) RegisterStreamSQLFunctionsToExpr() []expr.Option

RegisterStreamSQLFunctionsToExpr registers StreamSQL functions in the expr environment.

func (*ExprBridge) ResolveFunction

func (bridge *ExprBridge) ResolveFunction(name string) (interface{}, bool, string)

ResolveFunction resolves a function call, giving priority to StreamSQL functions.

type ExprFunction

type ExprFunction struct {
	*BaseFunction
}

ExprFunction is the expr function, used to evaluate expressions in SQL.

func NewExprFunction

func NewExprFunction() *ExprFunction

func (*ExprFunction) Execute

func (f *ExprFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ExprFunction) Validate

func (f *ExprFunction) Validate(args []interface{}) error

type ExpressionAggregatorFunction

type ExpressionAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

ExpressionAggregatorFunction is an expression aggregator function, used to handle non-aggregate functions that appear in aggregate queries.

func NewExpressionAggregatorFunction

func NewExpressionAggregatorFunction() *ExpressionAggregatorFunction

func (*ExpressionAggregatorFunction) Add

func (f *ExpressionAggregatorFunction) Add(value interface{})

func (*ExpressionAggregatorFunction) Clone

func (*ExpressionAggregatorFunction) Execute

func (f *ExpressionAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ExpressionAggregatorFunction) New

Implements the AggregatorFunction interface.

func (*ExpressionAggregatorFunction) Reset

func (f *ExpressionAggregatorFunction) Reset()

func (*ExpressionAggregatorFunction) Result

func (f *ExpressionAggregatorFunction) Result() interface{}

func (*ExpressionAggregatorFunction) Validate

func (f *ExpressionAggregatorFunction) Validate(args []interface{}) error

type ExpressionFunction

type ExpressionFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

ExpressionFunction is the expression function, used to handle custom expressions.

func NewExpressionFunction

func NewExpressionFunction() *ExpressionFunction

func (*ExpressionFunction) Add

func (f *ExpressionFunction) Add(value interface{})

func (*ExpressionFunction) Clone

func (*ExpressionFunction) Execute

func (f *ExpressionFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ExpressionFunction) New

Implements the AggregatorFunction interface.

func (*ExpressionFunction) Reset

func (f *ExpressionFunction) Reset()

func (*ExpressionFunction) Result

func (f *ExpressionFunction) Result() interface{}

func (*ExpressionFunction) Validate

func (f *ExpressionFunction) Validate(args []interface{}) error

type ExtractFunction

type ExtractFunction struct {
	*BaseFunction
}

ExtractFunction extracts a part of a date.

func NewExtractFunction

func NewExtractFunction() *ExtractFunction

func (*ExtractFunction) Execute

func (f *ExtractFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ExtractFunction) Validate

func (f *ExtractFunction) Validate(args []interface{}) error

type FirstValueFunction

type FirstValueFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

FirstValueFunction is the first-value function: it returns the value of the first row in the group.

func NewFirstValueFunction

func NewFirstValueFunction() *FirstValueFunction

func (*FirstValueFunction) Add

func (f *FirstValueFunction) Add(value interface{})

func (*FirstValueFunction) Clone

func (*FirstValueFunction) Execute

func (f *FirstValueFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*FirstValueFunction) New

Implements the AggregatorFunction interface.

func (*FirstValueFunction) Reset

func (f *FirstValueFunction) Reset()

func (*FirstValueFunction) Result

func (f *FirstValueFunction) Result() interface{}

func (*FirstValueFunction) Validate

func (f *FirstValueFunction) Validate(args []interface{}) error

type FloorFunction

type FloorFunction struct {
	*BaseFunction
}

FloorFunction is the floor function (rounds down).

func NewFloorFunction

func NewFloorFunction() *FloorFunction

func (*FloorFunction) Execute

func (f *FloorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*FloorFunction) Validate

func (f *FloorFunction) Validate(args []interface{}) error

type FormatFunction

type FormatFunction struct {
	*BaseFunction
}

FormatFunction is the formatting function.

func NewFormatFunction

func NewFormatFunction() *FormatFunction

func (*FormatFunction) Execute

func (f *FormatFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*FormatFunction) Validate

func (f *FormatFunction) Validate(args []interface{}) error

type FromJsonFunction

type FromJsonFunction struct {
	*BaseFunction
}

FromJsonFunction parses value from JSON string

func NewFromJsonFunction

func NewFromJsonFunction() *FromJsonFunction

func (*FromJsonFunction) Execute

func (f *FromJsonFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*FromJsonFunction) Validate

func (f *FromJsonFunction) Validate(args []interface{}) error

type FromUnixtimeFunction

type FromUnixtimeFunction struct {
	*BaseFunction
}

FromUnixtimeFunction converts from a Unix timestamp.

func NewFromUnixtimeFunction

func NewFromUnixtimeFunction() *FromUnixtimeFunction

func (*FromUnixtimeFunction) Execute

func (f *FromUnixtimeFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*FromUnixtimeFunction) Validate

func (f *FromUnixtimeFunction) Validate(args []interface{}) error

type Function

type Function interface {
	// GetName returns the function name
	GetName() string
	// GetType returns the function type
	GetType() FunctionType
	// GetCategory returns the function category
	GetCategory() string
	// GetAliases returns the function aliases
	GetAliases() []string
	// Validate validates the arguments
	Validate(args []interface{}) error
	// Execute executes the function
	Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)
	// GetDescription returns the function description
	GetDescription() string

	// GetMinArgs returns the minimum number of arguments
	GetMinArgs() int
	// GetMaxArgs returns the maximum number of arguments (-1 means unlimited)
	GetMaxArgs() int
}

Function defines the interface for all functions
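The following sketch shows what a minimal implementation of this interface might look like. To stay self-contained it redeclares `Function`, `FunctionType`, and a trimmed `FunctionContext` locally (the `WindowInfo` field is omitted); real code would use the types exported by this package. The `shoutFunction` type and its `yell` alias are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Local copies of the documented types so the sketch compiles on its own.
type FunctionType string

const TypeString FunctionType = "string"

// FunctionContext is trimmed here: the WindowInfo field is omitted.
type FunctionContext struct {
	Data  map[string]interface{}
	Extra map[string]interface{}
}

type Function interface {
	GetName() string
	GetType() FunctionType
	GetCategory() string
	GetAliases() []string
	Validate(args []interface{}) error
	Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)
	GetDescription() string
	GetMinArgs() int
	GetMaxArgs() int
}

// shoutFunction is a hypothetical function that upper-cases its argument.
type shoutFunction struct{}

func (shoutFunction) GetName() string        { return "shout" }
func (shoutFunction) GetType() FunctionType  { return TypeString }
func (shoutFunction) GetCategory() string    { return "string" }
func (shoutFunction) GetAliases() []string   { return []string{"yell"} }
func (shoutFunction) GetDescription() string { return "upper-cases a string" }
func (shoutFunction) GetMinArgs() int        { return 1 }
func (shoutFunction) GetMaxArgs() int        { return 1 }

// Validate checks the argument count against GetMinArgs and GetMaxArgs.
func (f shoutFunction) Validate(args []interface{}) error {
	if len(args) < f.GetMinArgs() || len(args) > f.GetMaxArgs() {
		return fmt.Errorf("shout expects exactly 1 argument, got %d", len(args))
	}
	return nil
}

// Execute validates the arguments, then upper-cases the single string argument.
func (f shoutFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error) {
	if err := f.Validate(args); err != nil {
		return nil, err
	}
	s, ok := args[0].(string)
	if !ok {
		return nil, errors.New("shout expects a string argument")
	}
	return strings.ToUpper(s), nil
}

// Compile-time check that shoutFunction satisfies the interface.
var _ Function = shoutFunction{}

func main() {
	var fn Function = shoutFunction{}
	out, err := fn.Execute(&FunctionContext{}, []interface{}{"hello"})
	fmt.Println(out, err)
}
```

In the package itself, embedding `*BaseFunction` (as the built-in types in this listing do) presumably supplies the getter methods, leaving only `Validate` and `Execute` to write.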

func Get

func Get(name string) (Function, bool)

func GetByType

func GetByType(fnType FunctionType) []Function

type FunctionAggregatorWrapper

type FunctionAggregatorWrapper struct {
	// contains filtered or unexported fields
}

FunctionAggregatorWrapper wraps an aggregator from the functions module to make it compatible with the original interface.

func (*FunctionAggregatorWrapper) Add

func (w *FunctionAggregatorWrapper) Add(value interface{})

func (*FunctionAggregatorWrapper) GetContextKey

func (w *FunctionAggregatorWrapper) GetContextKey() string

GetContextKey implements the ContextAggregator interface and supports the context mechanism for window functions.

func (*FunctionAggregatorWrapper) New

func (*FunctionAggregatorWrapper) Result

func (w *FunctionAggregatorWrapper) Result() interface{}

type FunctionContext

type FunctionContext struct {
	// Current data row
	Data map[string]interface{}
	// Window information (if applicable)
	WindowInfo *WindowInfo
	// Additional context information
	Extra map[string]interface{}
}

FunctionContext represents the execution context for functions

type FunctionRegistry

type FunctionRegistry struct {
	// contains filtered or unexported fields
}

FunctionRegistry manages function registration and retrieval

func NewFunctionRegistry

func NewFunctionRegistry() *FunctionRegistry

NewFunctionRegistry creates a new function registry

func (*FunctionRegistry) Get

func (r *FunctionRegistry) Get(name string) (Function, bool)

Get retrieves a function

func (*FunctionRegistry) GetByType

func (r *FunctionRegistry) GetByType(fnType FunctionType) []Function

GetByType retrieves functions by type

func (*FunctionRegistry) ListAll

func (r *FunctionRegistry) ListAll() map[string]Function

ListAll lists all registered functions

func (*FunctionRegistry) Register

func (r *FunctionRegistry) Register(fn Function) error

Register registers a function and its aliases in the registry.

func (*FunctionRegistry) Unregister

func (r *FunctionRegistry) Unregister(name string) bool

Unregister removes a function and all of its aliases from the registry.
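The alias handling described for `Register` and `Unregister` can be pictured with a small map-based registry. This is only a sketch under stated assumptions: `registry` and `namedFunc` are hypothetical types, and the case-insensitive lookup and the error on duplicate names are choices made here, not documented behavior of `FunctionRegistry`.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// namedFunc is a hypothetical, trimmed-down stand-in for Function.
type namedFunc struct {
	name    string
	aliases []string
}

// registry is an illustrative alias-aware registry, not FunctionRegistry itself.
type registry struct {
	mu    sync.RWMutex
	funcs map[string]*namedFunc
}

func newRegistry() *registry {
	return &registry{funcs: make(map[string]*namedFunc)}
}

// Register stores fn under its name and every alias; it fails on conflicts.
func (r *registry) Register(fn *namedFunc) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	keys := append([]string{fn.name}, fn.aliases...)
	for _, k := range keys {
		if _, exists := r.funcs[strings.ToLower(k)]; exists {
			return fmt.Errorf("function %q is already registered", k)
		}
	}
	for _, k := range keys {
		r.funcs[strings.ToLower(k)] = fn
	}
	return nil
}

// Get looks a function up by name or alias, ignoring case.
func (r *registry) Get(name string) (*namedFunc, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	fn, ok := r.funcs[strings.ToLower(name)]
	return fn, ok
}

// Unregister removes the function and all of its aliases.
func (r *registry) Unregister(name string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	fn, ok := r.funcs[strings.ToLower(name)]
	if !ok {
		return false
	}
	delete(r.funcs, strings.ToLower(fn.name))
	for _, a := range fn.aliases {
		delete(r.funcs, strings.ToLower(a))
	}
	return true
}

func main() {
	r := newRegistry()
	_ = r.Register(&namedFunc{name: "avg", aliases: []string{"mean"}})
	_, found := r.Get("MEAN")
	fmt.Println("found by alias:", found)
	fmt.Println("removed:", r.Unregister("mean"))
	_, found = r.Get("avg")
	fmt.Println("still registered:", found)
}
```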

type FunctionType

type FunctionType string

FunctionType defines the enumeration of function types

const (
	// Aggregation functions
	TypeAggregation FunctionType = "aggregation"
	// Window functions
	TypeWindow FunctionType = "window"
	// Date and time functions
	TypeDateTime FunctionType = "datetime"
	// Conversion functions
	TypeConversion FunctionType = "conversion"
	// Math functions
	TypeMath FunctionType = "math"
	// String functions
	TypeString FunctionType = "string"
	// Analytical functions
	TypeAnalytical FunctionType = "analytical"
	// User-defined functions
	TypeCustom FunctionType = "custom"
)

type GreatestFunction

type GreatestFunction struct {
	*BaseFunction
}

GreatestFunction returns maximum value

func NewGreatestFunction

func NewGreatestFunction() *GreatestFunction

func (*GreatestFunction) Execute

func (f *GreatestFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*GreatestFunction) Validate

func (f *GreatestFunction) Validate(args []interface{}) error

type HadChangedFunction

type HadChangedFunction struct {
	*BaseFunction
	PreviousValue interface{}
	IsSet         bool // marks whether PreviousValue has been set
}

HadChangedFunction is the had-changed function: it determines whether the value of the specified column has changed.

func NewHadChangedFunction

func NewHadChangedFunction() *HadChangedFunction

func (*HadChangedFunction) Add

func (f *HadChangedFunction) Add(value interface{})

func (*HadChangedFunction) Clone

func (*HadChangedFunction) Execute

func (f *HadChangedFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*HadChangedFunction) New

Implements the AggregatorFunction interface, with support for incremental computation.

func (*HadChangedFunction) Reset

func (f *HadChangedFunction) Reset()

func (*HadChangedFunction) Result

func (f *HadChangedFunction) Result() interface{}

func (*HadChangedFunction) Validate

func (f *HadChangedFunction) Validate(args []interface{}) error

type Hex2DecFunction

type Hex2DecFunction struct {
	*BaseFunction
}

Hex2DecFunction converts hexadecimal to decimal

func NewHex2DecFunction

func NewHex2DecFunction() *Hex2DecFunction

func (*Hex2DecFunction) Execute

func (f *Hex2DecFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*Hex2DecFunction) Validate

func (f *Hex2DecFunction) Validate(args []interface{}) error

type HourFunction

type HourFunction struct {
	*BaseFunction
}

HourFunction 提取小时函数

func NewHourFunction

func NewHourFunction() *HourFunction

func (*HourFunction) Execute

func (f *HourFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*HourFunction) Validate

func (f *HourFunction) Validate(args []interface{}) error

type IfNullFunction

type IfNullFunction struct {
	*BaseFunction
}

IfNullFunction returns second argument if first argument is NULL

func NewIfNullFunction

func NewIfNullFunction() *IfNullFunction

func (*IfNullFunction) Execute

func (f *IfNullFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*IfNullFunction) Validate

func (f *IfNullFunction) Validate(args []interface{}) error

type IndexofFunction

type IndexofFunction struct {
	*BaseFunction
}

IndexofFunction returns the position of a substring within a string.

func NewIndexofFunction

func NewIndexofFunction() *IndexofFunction

func (*IndexofFunction) Execute

func (f *IndexofFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*IndexofFunction) Validate

func (f *IndexofFunction) Validate(args []interface{}) error

type IsArrayFunction

type IsArrayFunction struct {
	*BaseFunction
}

IsArrayFunction checks whether a value is of array type.

func NewIsArrayFunction

func NewIsArrayFunction() *IsArrayFunction

func (*IsArrayFunction) Execute

func (f *IsArrayFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*IsArrayFunction) Validate

func (f *IsArrayFunction) Validate(args []interface{}) error

type IsBoolFunction

type IsBoolFunction struct {
	*BaseFunction
}

IsBoolFunction checks if value is boolean type

func NewIsBoolFunction

func NewIsBoolFunction() *IsBoolFunction

func (*IsBoolFunction) Execute

func (f *IsBoolFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*IsBoolFunction) Validate

func (f *IsBoolFunction) Validate(args []interface{}) error

type IsNotNullFunction

type IsNotNullFunction struct {
	*BaseFunction
}

IsNotNullFunction checks if value is not NULL

func NewIsNotNullFunction

func NewIsNotNullFunction() *IsNotNullFunction

func (*IsNotNullFunction) Execute

func (f *IsNotNullFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*IsNotNullFunction) Validate

func (f *IsNotNullFunction) Validate(args []interface{}) error

type IsNullFunction

type IsNullFunction struct {
	*BaseFunction
}

IsNullFunction checks if value is NULL

func NewIsNullFunction

func NewIsNullFunction() *IsNullFunction

func (*IsNullFunction) Execute

func (f *IsNullFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*IsNullFunction) Validate

func (f *IsNullFunction) Validate(args []interface{}) error

type IsNumericFunction

type IsNumericFunction struct {
	*BaseFunction
}

IsNumericFunction checks if value is numeric type

func NewIsNumericFunction

func NewIsNumericFunction() *IsNumericFunction

func (*IsNumericFunction) Execute

func (f *IsNumericFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*IsNumericFunction) Validate

func (f *IsNumericFunction) Validate(args []interface{}) error

type IsObjectFunction

type IsObjectFunction struct {
	*BaseFunction
}

IsObjectFunction checks whether a value is of object type.

func NewIsObjectFunction

func NewIsObjectFunction() *IsObjectFunction

func (*IsObjectFunction) Execute

func (f *IsObjectFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*IsObjectFunction) Validate

func (f *IsObjectFunction) Validate(args []interface{}) error

type IsStringFunction

type IsStringFunction struct {
	*BaseFunction
}

IsStringFunction checks if value is string type

func NewIsStringFunction

func NewIsStringFunction() *IsStringFunction

func (*IsStringFunction) Execute

func (f *IsStringFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*IsStringFunction) Validate

func (f *IsStringFunction) Validate(args []interface{}) error

type JsonExtractFunction

type JsonExtractFunction struct {
	*BaseFunction
}

JsonExtractFunction extracts JSON field value

func NewJsonExtractFunction

func NewJsonExtractFunction() *JsonExtractFunction

func (*JsonExtractFunction) Execute

func (f *JsonExtractFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*JsonExtractFunction) Validate

func (f *JsonExtractFunction) Validate(args []interface{}) error

type JsonLengthFunction

type JsonLengthFunction struct {
	*BaseFunction
}

JsonLengthFunction returns the length of a JSON array or object.

func NewJsonLengthFunction

func NewJsonLengthFunction() *JsonLengthFunction

func (*JsonLengthFunction) Execute

func (f *JsonLengthFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*JsonLengthFunction) Validate

func (f *JsonLengthFunction) Validate(args []interface{}) error

type JsonTypeFunction

type JsonTypeFunction struct {
	*BaseFunction
}

JsonTypeFunction returns the type of a JSON value.

func NewJsonTypeFunction

func NewJsonTypeFunction() *JsonTypeFunction

func (*JsonTypeFunction) Execute

func (f *JsonTypeFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*JsonTypeFunction) Validate

func (f *JsonTypeFunction) Validate(args []interface{}) error

type JsonValidFunction

type JsonValidFunction struct {
	*BaseFunction
}

JsonValidFunction checks whether a JSON string is validly formatted.

func NewJsonValidFunction

func NewJsonValidFunction() *JsonValidFunction

func (*JsonValidFunction) Execute

func (f *JsonValidFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*JsonValidFunction) Validate

func (f *JsonValidFunction) Validate(args []interface{}) error

type LagFunction

type LagFunction struct {
	*BaseFunction
	PreviousValues []interface{}
	DefaultValue   interface{}
	Offset         int
}

LagFunction is the LAG function: it returns the value of the Nth row before the current row.
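The LAG semantics can be sketched with a small buffer that remembers the last `offset` values. The `lagBuffer` type and its `Push` method are hypothetical; the field names echo the struct above, but the logic is an illustration of LAG in general and is not taken from the package.

```go
package main

import "fmt"

// lagBuffer is a hypothetical illustration of LAG semantics.
type lagBuffer struct {
	previous     []interface{}
	defaultValue interface{}
	offset       int
}

// Push records the current row's value and returns the value seen offset
// rows earlier, or defaultValue when not enough rows have arrived yet.
func (l *lagBuffer) Push(value interface{}) interface{} {
	if l.offset <= 0 {
		// An offset of zero refers to the current row itself.
		return value
	}
	result := l.defaultValue
	if len(l.previous) >= l.offset {
		result = l.previous[len(l.previous)-l.offset]
	}
	l.previous = append(l.previous, value)
	// Keep only the last offset values; older ones can never be returned.
	if len(l.previous) > l.offset {
		l.previous = l.previous[len(l.previous)-l.offset:]
	}
	return result
}

func main() {
	lag := &lagBuffer{offset: 2, defaultValue: "n/a"}
	for _, v := range []interface{}{10, 20, 30, 40} {
		fmt.Printf("row=%v lag=%v\n", v, lag.Push(v))
	}
}
```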

func NewLagFunction

func NewLagFunction() *LagFunction

func (*LagFunction) Add

func (f *LagFunction) Add(value interface{})

func (*LagFunction) Clone

func (f *LagFunction) Clone() AggregatorFunction

func (*LagFunction) Execute

func (f *LagFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LagFunction) New

Implements the AggregatorFunction interface, with support for incremental computation.

func (*LagFunction) Reset

func (f *LagFunction) Reset()

func (*LagFunction) Result

func (f *LagFunction) Result() interface{}

func (*LagFunction) Validate

func (f *LagFunction) Validate(args []interface{}) error

type LastValueAggregatorFunction

type LastValueAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

LastValueAggregatorFunction adds an AggregatorFunction interface implementation for LastValueFunction.

func NewLastValueAggregatorFunction

func NewLastValueAggregatorFunction() *LastValueAggregatorFunction

func (*LastValueAggregatorFunction) Add

func (f *LastValueAggregatorFunction) Add(value interface{})

func (*LastValueAggregatorFunction) Clone

func (*LastValueAggregatorFunction) Execute

func (f *LastValueAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LastValueAggregatorFunction) New

func (*LastValueAggregatorFunction) Reset

func (f *LastValueAggregatorFunction) Reset()

func (*LastValueAggregatorFunction) Result

func (f *LastValueAggregatorFunction) Result() interface{}

func (*LastValueAggregatorFunction) Validate

func (f *LastValueAggregatorFunction) Validate(args []interface{}) error

type LastValueFunction

type LastValueFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

LastValueFunction is the last-value function: it returns the value of the last row in the group.

func NewLastValueFunction

func NewLastValueFunction() *LastValueFunction

func (*LastValueFunction) Add

func (f *LastValueFunction) Add(value interface{})

func (*LastValueFunction) Clone

func (*LastValueFunction) Execute

func (f *LastValueFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LastValueFunction) New

Implements the AggregatorFunction interface.

func (*LastValueFunction) Reset

func (f *LastValueFunction) Reset()

func (*LastValueFunction) Result

func (f *LastValueFunction) Result() interface{}

func (*LastValueFunction) Validate

func (f *LastValueFunction) Validate(args []interface{}) error

type LatestFunction

type LatestFunction struct {
	*BaseFunction
	LatestValue interface{}
}

LatestFunction is the latest-value function: it returns the latest value of the specified column.

func NewLatestFunction

func NewLatestFunction() *LatestFunction

func (*LatestFunction) Add

func (f *LatestFunction) Add(value interface{})

func (*LatestFunction) Clone

func (*LatestFunction) Execute

func (f *LatestFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LatestFunction) New

Implements the AggregatorFunction interface, with support for incremental computation.

func (*LatestFunction) Reset

func (f *LatestFunction) Reset()

func (*LatestFunction) Result

func (f *LatestFunction) Result() interface{}

func (*LatestFunction) Validate

func (f *LatestFunction) Validate(args []interface{}) error

type LeadFunction

type LeadFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

LeadFunction returns the value of the Nth row after the current row.

func NewLeadFunction

func NewLeadFunction() *LeadFunction

func (*LeadFunction) Add

func (f *LeadFunction) Add(value interface{})

func (*LeadFunction) Clone

func (f *LeadFunction) Clone() AggregatorFunction

func (*LeadFunction) Execute

func (f *LeadFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LeadFunction) Init added in v0.10.3

func (f *LeadFunction) Init(args []interface{}) error

Init implements the ParameterizedFunction interface.

func (*LeadFunction) New

Implements the AggregatorFunction interface.

func (*LeadFunction) Reset

func (f *LeadFunction) Reset()

func (*LeadFunction) Result

func (f *LeadFunction) Result() interface{}

func (*LeadFunction) Validate

func (f *LeadFunction) Validate(args []interface{}) error

type LeastFunction

type LeastFunction struct {
	*BaseFunction
}

LeastFunction returns the minimum value.

func NewLeastFunction

func NewLeastFunction() *LeastFunction

func (*LeastFunction) Execute

func (f *LeastFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LeastFunction) Validate

func (f *LeastFunction) Validate(args []interface{}) error

type LegacyAggregatorFunction

type LegacyAggregatorFunction interface {
	New() LegacyAggregatorFunction
	Add(value interface{})
	Result() interface{}
}

LegacyAggregatorFunction defines an aggregator function interface that matches the legacy aggregator interface. It is kept for backward compatibility with the original interface.
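The three-method interface above is small enough to implement by hand. The sketch below redeclares the interface locally so that it compiles on its own, and `countAggregator` is a hypothetical type invented for this example; real code would use the interface exported by the package.

```go
package main

import "fmt"

// LegacyAggregatorFunction is a local copy of the interface shown above,
// redeclared only so this sketch is self-contained.
type LegacyAggregatorFunction interface {
	New() LegacyAggregatorFunction
	Add(value interface{})
	Result() interface{}
}

// countAggregator is a hypothetical aggregator that counts non-nil values.
type countAggregator struct {
	n int
}

// New returns a fresh, zeroed instance so each group or window keeps its own state.
func (c *countAggregator) New() LegacyAggregatorFunction { return &countAggregator{} }

// Add folds one value into the running count, skipping nil.
func (c *countAggregator) Add(value interface{}) {
	if value != nil {
		c.n++
	}
}

// Result returns the number of non-nil values seen so far.
func (c *countAggregator) Result() interface{} { return c.n }

func main() {
	var proto LegacyAggregatorFunction = &countAggregator{}
	agg := proto.New()
	for _, v := range []interface{}{1, "a", nil, 2.5} {
		agg.Add(v)
	}
	fmt.Println(agg.Result()) // 3
}
```

The prototype-plus-`New` pattern means the registered value is never mutated; every aggregation works on its own copy.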

func CreateLegacyAggregator

func CreateLegacyAggregator(aggType AggregateType) LegacyAggregatorFunction

CreateLegacyAggregator creates a legacy aggregator, preferring an implementation from the functions module when one is available.

type LengthFunction

type LengthFunction struct {
	*BaseFunction
}

LengthFunction returns the length of a string or array.

func NewLengthFunction

func NewLengthFunction() *LengthFunction

func (*LengthFunction) Execute

func (f *LengthFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

Execute calculates the length of a string or array. Supports strings, arrays, slices, etc., using Go's standard len() function.
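As a rough illustration of a `len()`-based length over dynamically typed values, the sketch below uses reflection. The helper name `length` and its error behaviour are assumptions; the package's actual Execute may handle more types or report errors differently.

```go
package main

import (
	"fmt"
	"reflect"
)

// length is a hypothetical helper that returns len(v) for strings,
// arrays, and slices, and an error for any other kind.
func length(v interface{}) (int, error) {
	if s, ok := v.(string); ok {
		return len(s), nil // len on a string counts bytes, not runes
	}
	rv := reflect.ValueOf(v)
	switch rv.Kind() {
	case reflect.Array, reflect.Slice:
		return rv.Len(), nil
	}
	return 0, fmt.Errorf("length: unsupported type %T", v)
}

func main() {
	n, _ := length("héllo")
	fmt.Println(n) // 6, because é occupies two bytes in UTF-8
	m, _ := length([]int{1, 2, 3})
	fmt.Println(m) // 3
	_, err := length(42)
	fmt.Println(err)
}
```

Because Go's `len()` on a string counts bytes, callers who need a character count for non-ASCII text would have to count runes instead.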

func (*LengthFunction) Validate

func (f *LengthFunction) Validate(args []interface{}) error

type LnFunction

type LnFunction struct {
	*BaseFunction
}

LnFunction is the natural logarithm function.

func NewLnFunction

func NewLnFunction() *LnFunction

func (*LnFunction) Execute

func (f *LnFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LnFunction) Validate

func (f *LnFunction) Validate(args []interface{}) error

type Log10Function

type Log10Function struct {
	*BaseFunction
}

Log10Function is the base-10 logarithm function.

func NewLog10Function

func NewLog10Function() *Log10Function

func (*Log10Function) Execute

func (f *Log10Function) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*Log10Function) Validate

func (f *Log10Function) Validate(args []interface{}) error

type Log2Function

type Log2Function struct {
	*BaseFunction
}

Log2Function is the base-2 logarithm function.

func NewLog2Function

func NewLog2Function() *Log2Function

func (*Log2Function) Execute

func (f *Log2Function) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*Log2Function) Validate

func (f *Log2Function) Validate(args []interface{}) error

type LogFunction

type LogFunction struct {
	*BaseFunction
}

LogFunction is the base-10 logarithm function (the log alias).

func NewLogFunction

func NewLogFunction() *LogFunction

func (*LogFunction) Execute

func (f *LogFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LogFunction) Validate

func (f *LogFunction) Validate(args []interface{}) error

type LowerFunction

type LowerFunction struct {
	*BaseFunction
}

LowerFunction converts string to lowercase

func NewLowerFunction

func NewLowerFunction() *LowerFunction

func (*LowerFunction) Execute

func (f *LowerFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LowerFunction) Validate

func (f *LowerFunction) Validate(args []interface{}) error

type LpadFunction

type LpadFunction struct {
	*BaseFunction
}

LpadFunction left-pads a string.
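The following sketch shows one plausible left-padding behaviour. The helper `lpad`, its argument order, the rune-based width, and the decision not to truncate longer inputs are all assumptions made for illustration; the package's LpadFunction may differ on each point.

```go
package main

import (
	"fmt"
	"strings"
)

// lpad is a hypothetical helper that left-pads s with pad (repeated and cut
// as needed) until the result is width runes long. Strings that are already
// long enough are returned unchanged.
func lpad(s string, width int, pad string) string {
	missing := width - len([]rune(s))
	if missing <= 0 || pad == "" {
		return s
	}
	padRunes := []rune(pad)
	var b strings.Builder
	for i := 0; i < missing; i++ {
		b.WriteRune(padRunes[i%len(padRunes)])
	}
	b.WriteString(s)
	return b.String()
}

func main() {
	fmt.Println(lpad("42", 5, "0"))    // 00042
	fmt.Println(lpad("abc", 7, "xy"))  // xyxyabc
	fmt.Println(lpad("hello", 3, "*")) // hello
}
```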

func NewLpadFunction

func NewLpadFunction() *LpadFunction

func (*LpadFunction) Execute

func (f *LpadFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LpadFunction) Validate

func (f *LpadFunction) Validate(args []interface{}) error

type LtrimFunction

type LtrimFunction struct {
	*BaseFunction
}

LtrimFunction removes leading whitespace.

func NewLtrimFunction

func NewLtrimFunction() *LtrimFunction

func (*LtrimFunction) Execute

func (f *LtrimFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*LtrimFunction) Validate

func (f *LtrimFunction) Validate(args []interface{}) error

type MaxFunction

type MaxFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

MaxFunction calculates the maximum value

func NewMaxFunction

func NewMaxFunction() *MaxFunction

func (*MaxFunction) Add

func (f *MaxFunction) Add(value interface{})

func (*MaxFunction) Clone

func (f *MaxFunction) Clone() AggregatorFunction

func (*MaxFunction) Execute

func (f *MaxFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*MaxFunction) New

Implements the AggregatorFunction interface.

func (*MaxFunction) Reset

func (f *MaxFunction) Reset()

func (*MaxFunction) Result

func (f *MaxFunction) Result() interface{}

func (*MaxFunction) Validate

func (f *MaxFunction) Validate(args []interface{}) error

type Md5Function

type Md5Function struct {
	*BaseFunction
}

Md5Function calculates MD5 hash value
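For reference, an MD5 digest can be computed with the standard library as sketched below. The helper name `md5Hex` and the lowercase hexadecimal output format are assumptions; the source does not state how Md5Function encodes its result.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// md5Hex is a hypothetical helper that returns the MD5 digest of s
// as a lowercase hexadecimal string.
func md5Hex(s string) string {
	sum := md5.Sum([]byte(s)) // [16]byte digest
	return hex.EncodeToString(sum[:])
}

func main() {
	fmt.Println(md5Hex("hello")) // 5d41402abc4b2a76b9719d911017c592
}
```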

func NewMd5Function

func NewMd5Function() *Md5Function

func (*Md5Function) Execute

func (f *Md5Function) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*Md5Function) Validate

func (f *Md5Function) Validate(args []interface{}) error

type MedianAggregatorFunction

type MedianAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

MedianAggregatorFunction adds an AggregatorFunction interface implementation for MedianFunction.

func NewMedianAggregatorFunction

func NewMedianAggregatorFunction() *MedianAggregatorFunction

func (*MedianAggregatorFunction) Add

func (f *MedianAggregatorFunction) Add(value interface{})

func (*MedianAggregatorFunction) Clone

func (*MedianAggregatorFunction) Execute

func (f *MedianAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*MedianAggregatorFunction) New

func (*MedianAggregatorFunction) Reset

func (f *MedianAggregatorFunction) Reset()

func (*MedianAggregatorFunction) Result

func (f *MedianAggregatorFunction) Result() interface{}

func (*MedianAggregatorFunction) Validate

func (f *MedianAggregatorFunction) Validate(args []interface{}) error

type MedianFunction

type MedianFunction struct {
	*BaseFunction
}

MedianFunction is the median function.
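A median is usually computed by sorting the values and taking the middle element, or the mean of the two middle elements for an even count. The sketch below follows that textbook definition; the helper `median` and its boolean "ok" result are assumptions, not the package's code.

```go
package main

import (
	"fmt"
	"sort"
)

// median is a hypothetical helper that returns the median of values.
// The second result is false when the input is empty.
func median(values []float64) (float64, bool) {
	if len(values) == 0 {
		return 0, false
	}
	// Sort a copy so the caller's slice is left untouched.
	sorted := append([]float64(nil), values...)
	sort.Float64s(sorted)
	mid := len(sorted) / 2
	if len(sorted)%2 == 1 {
		return sorted[mid], true
	}
	return (sorted[mid-1] + sorted[mid]) / 2, true
}

func main() {
	odd, _ := median([]float64{3, 1, 2})
	fmt.Println(odd) // 2
	even, _ := median([]float64{4, 1, 3, 2})
	fmt.Println(even) // 2.5
}
```

Unlike a sum or a count, an exact median needs every value from the window, so an incremental aggregator has to buffer its inputs until the result is requested.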

func NewMedianFunction

func NewMedianFunction() *MedianFunction

func (*MedianFunction) Execute

func (f *MedianFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*MedianFunction) Validate

func (f *MedianFunction) Validate(args []interface{}) error

type MergeAggAggregatorFunction

type MergeAggAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

MergeAggAggregatorFunction adds an AggregatorFunction interface implementation for MergeAggFunction.

func NewMergeAggAggregatorFunction

func NewMergeAggAggregatorFunction() *MergeAggAggregatorFunction

func (*MergeAggAggregatorFunction) Add

func (f *MergeAggAggregatorFunction) Add(value interface{})

func (*MergeAggAggregatorFunction) Clone

func (*MergeAggAggregatorFunction) Execute

func (f *MergeAggAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*MergeAggAggregatorFunction) New

func (*MergeAggAggregatorFunction) Reset

func (f *MergeAggAggregatorFunction) Reset()

func (*MergeAggAggregatorFunction) Result

func (f *MergeAggAggregatorFunction) Result() interface{}

func (*MergeAggAggregatorFunction) Validate

func (f *MergeAggAggregatorFunction) Validate(args []interface{}) error

type MergeAggFunction

type MergeAggFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

MergeAggFunction is the merge aggregate function; it merges the values in a group into a single value.

func NewMergeAggFunction

func NewMergeAggFunction() *MergeAggFunction

func (*MergeAggFunction) Add

func (f *MergeAggFunction) Add(value interface{})

func (*MergeAggFunction) Clone

func (*MergeAggFunction) Execute

func (f *MergeAggFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*MergeAggFunction) New

Implements the AggregatorFunction interface.

func (*MergeAggFunction) Reset

func (f *MergeAggFunction) Reset()

func (*MergeAggFunction) Result

func (f *MergeAggFunction) Result() interface{}

func (*MergeAggFunction) Validate

func (f *MergeAggFunction) Validate(args []interface{}) error

type MinFunction

type MinFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

MinFunction calculates the minimum value

func NewMinFunction

func NewMinFunction() *MinFunction

func (*MinFunction) Add

func (f *MinFunction) Add(value interface{})

func (*MinFunction) Clone

func (f *MinFunction) Clone() AggregatorFunction

func (*MinFunction) Execute

func (f *MinFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*MinFunction) New

Implements the AggregatorFunction interface.

func (*MinFunction) Reset

func (f *MinFunction) Reset()

func (*MinFunction) Result

func (f *MinFunction) Result() interface{}

func (*MinFunction) Validate

func (f *MinFunction) Validate(args []interface{}) error

type MinuteFunction

type MinuteFunction struct {
	*BaseFunction
}

MinuteFunction extracts the minute.

func NewMinuteFunction

func NewMinuteFunction() *MinuteFunction

func (*MinuteFunction) Execute

func (f *MinuteFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*MinuteFunction) Validate

func (f *MinuteFunction) Validate(args []interface{}) error

type ModFunction

type ModFunction struct {
	*BaseFunction
}

ModFunction is the modulo function.

func NewModFunction

func NewModFunction() *ModFunction

func (*ModFunction) Execute

func (f *ModFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ModFunction) Validate

func (f *ModFunction) Validate(args []interface{}) error

type MonthFunction

type MonthFunction struct {
	*BaseFunction
}

MonthFunction extracts the month.

func NewMonthFunction

func NewMonthFunction() *MonthFunction

func (*MonthFunction) Execute

func (f *MonthFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*MonthFunction) Validate

func (f *MonthFunction) Validate(args []interface{}) error

type NowFunction

type NowFunction struct {
	*BaseFunction
}

NowFunction returns the current timestamp.

func NewNowFunction

func NewNowFunction() *NowFunction

func (*NowFunction) Execute

func (f *NowFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*NowFunction) Validate

func (f *NowFunction) Validate(args []interface{}) error

type NthValueFunction

type NthValueFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

NthValueFunction returns the Nth value in the window.

func NewNthValueFunction

func NewNthValueFunction() *NthValueFunction

func (*NthValueFunction) Add

func (f *NthValueFunction) Add(value interface{})

func (*NthValueFunction) Clone

func (*NthValueFunction) Execute

func (f *NthValueFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*NthValueFunction) Init added in v0.10.3

func (f *NthValueFunction) Init(args []interface{}) error

Init implements the ParameterizedFunction interface.

func (*NthValueFunction) New

Implements the AggregatorFunction interface.

func (*NthValueFunction) Reset

func (f *NthValueFunction) Reset()

func (*NthValueFunction) Result

func (f *NthValueFunction) Result() interface{}

func (*NthValueFunction) Validate

func (f *NthValueFunction) Validate(args []interface{}) error

type NullIfFunction

type NullIfFunction struct {
	*BaseFunction
}

NullIfFunction returns NULL if the two values are equal.

func NewNullIfFunction

func NewNullIfFunction() *NullIfFunction

func (*NullIfFunction) Execute

func (f *NullIfFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*NullIfFunction) Validate

func (f *NullIfFunction) Validate(args []interface{}) error

type ParameterizedFunction added in v0.10.3

type ParameterizedFunction interface {
	AggregatorFunction
	// Init initializes the function with parsed arguments
	Init(args []interface{}) error
}

ParameterizedFunction defines the interface for functions that need parameter initialization
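The sketch below shows how an aggregator might use an Init step to capture a call parameter before any values arrive. It is deliberately self-contained: `aggregator` is a reduced stand-in for the package's AggregatorFunction (which has more methods), and `parameterized` and `nthValue` are hypothetical names. The assumption that args[0] is the column and args[1] is N is also illustrative.

```go
package main

import (
	"errors"
	"fmt"
)

// aggregator is a reduced, hypothetical stand-in for AggregatorFunction.
type aggregator interface {
	Add(value interface{})
	Result() interface{}
	Reset()
}

// parameterized mirrors the shape of ParameterizedFunction: an aggregator
// plus an Init step that receives the parsed call arguments.
type parameterized interface {
	aggregator
	Init(args []interface{}) error
}

// nthValue is a hypothetical aggregator returning the n-th value added (1-based).
type nthValue struct {
	n      int
	values []interface{}
}

// Init reads n from args[1]; args[0] is assumed to be the column expression.
func (f *nthValue) Init(args []interface{}) error {
	if len(args) < 2 {
		return errors.New("nth_value requires 2 arguments")
	}
	n, ok := args[1].(int)
	if !ok || n < 1 {
		return errors.New("n must be a positive integer")
	}
	f.n = n
	return nil
}

// Add buffers each incoming value in arrival order.
func (f *nthValue) Add(value interface{}) { f.values = append(f.values, value) }

// Result returns the n-th buffered value, or nil if fewer than n were added.
func (f *nthValue) Result() interface{} {
	if f.n < 1 || f.n > len(f.values) {
		return nil
	}
	return f.values[f.n-1]
}

// Reset clears buffered values but keeps the configured n.
func (f *nthValue) Reset() { f.values = nil }

func main() {
	var f parameterized = &nthValue{}
	if err := f.Init([]interface{}{"temperature", 2}); err != nil {
		fmt.Println("init failed:", err)
		return
	}
	for _, v := range []interface{}{"a", "b", "c"} {
		f.Add(v)
	}
	fmt.Println(f.Result()) // b
}
```

Separating Init from Add keeps argument parsing out of the per-row path: parameters are validated once, and Reset can clear window state without losing them.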

type PercentileAggregatorFunction

type PercentileAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

PercentileAggregatorFunction adds an AggregatorFunction interface implementation for PercentileFunction.

func NewPercentileAggregatorFunction

func NewPercentileAggregatorFunction() *PercentileAggregatorFunction

func (*PercentileAggregatorFunction) Add

func (f *PercentileAggregatorFunction) Add(value interface{})

func (*PercentileAggregatorFunction) Clone

func (*PercentileAggregatorFunction) Execute

func (f *PercentileAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*PercentileAggregatorFunction) New

func (*PercentileAggregatorFunction) Reset

func (f *PercentileAggregatorFunction) Reset()

func (*PercentileAggregatorFunction) Result

func (f *PercentileAggregatorFunction) Result() interface{}

func (*PercentileAggregatorFunction) Validate

func (f *PercentileAggregatorFunction) Validate(args []interface{}) error

type PercentileFunction

type PercentileFunction struct {
	*BaseFunction
}

PercentileFunction is the percentile function.

func NewPercentileFunction

func NewPercentileFunction() *PercentileFunction

func (*PercentileFunction) Execute

func (f *PercentileFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*PercentileFunction) Validate

func (f *PercentileFunction) Validate(args []interface{}) error

type PowerFunction

type PowerFunction struct {
	*BaseFunction
}

PowerFunction is the power (exponentiation) function.

func NewPowerFunction

func NewPowerFunction() *PowerFunction

func (*PowerFunction) Execute

func (f *PowerFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*PowerFunction) Validate

func (f *PowerFunction) Validate(args []interface{}) error

type RandFunction

type RandFunction struct {
	*BaseFunction
}

RandFunction is the random number function.

func NewRandFunction

func NewRandFunction() *RandFunction

func (*RandFunction) Execute

func (f *RandFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*RandFunction) Validate

func (f *RandFunction) Validate(args []interface{}) error

type RegexpMatchesFunction

type RegexpMatchesFunction struct {
	*BaseFunction
}

RegexpMatchesFunction performs regular expression matching.

func NewRegexpMatchesFunction

func NewRegexpMatchesFunction() *RegexpMatchesFunction

func (*RegexpMatchesFunction) Execute

func (f *RegexpMatchesFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*RegexpMatchesFunction) Validate

func (f *RegexpMatchesFunction) Validate(args []interface{}) error

type RegexpReplaceFunction

type RegexpReplaceFunction struct {
	*BaseFunction
}

RegexpReplaceFunction performs regular expression replacement.

func NewRegexpReplaceFunction

func NewRegexpReplaceFunction() *RegexpReplaceFunction

func (*RegexpReplaceFunction) Execute

func (f *RegexpReplaceFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*RegexpReplaceFunction) Validate

func (f *RegexpReplaceFunction) Validate(args []interface{}) error

type RegexpSubstringFunction

type RegexpSubstringFunction struct {
	*BaseFunction
}

RegexpSubstringFunction extracts a substring using a regular expression.

func NewRegexpSubstringFunction

func NewRegexpSubstringFunction() *RegexpSubstringFunction

func (*RegexpSubstringFunction) Execute

func (f *RegexpSubstringFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*RegexpSubstringFunction) Validate

func (f *RegexpSubstringFunction) Validate(args []interface{}) error

type ReplaceFunction

type ReplaceFunction struct {
	*BaseFunction
}

ReplaceFunction replaces content within a string.

func NewReplaceFunction

func NewReplaceFunction() *ReplaceFunction

func (*ReplaceFunction) Execute

func (f *ReplaceFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ReplaceFunction) Validate

func (f *ReplaceFunction) Validate(args []interface{}) error

type RoundFunction

type RoundFunction struct {
	*BaseFunction
}

RoundFunction is the rounding function.

func NewRoundFunction

func NewRoundFunction() *RoundFunction

func (*RoundFunction) Execute

func (f *RoundFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*RoundFunction) Validate

func (f *RoundFunction) Validate(args []interface{}) error

type RowNumberFunction

type RowNumberFunction struct {
	*BaseFunction
	CurrentRowNumber int64
}

RowNumberFunction returns the row number.

func NewRowNumberFunction

func NewRowNumberFunction() *RowNumberFunction

func (*RowNumberFunction) Execute

func (f *RowNumberFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*RowNumberFunction) Reset

func (f *RowNumberFunction) Reset()

func (*RowNumberFunction) Validate

func (f *RowNumberFunction) Validate(args []interface{}) error

type RpadFunction

type RpadFunction struct {
	*BaseFunction
}

RpadFunction right-pads a string.

func NewRpadFunction

func NewRpadFunction() *RpadFunction

func (*RpadFunction) Execute

func (f *RpadFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*RpadFunction) Validate

func (f *RpadFunction) Validate(args []interface{}) error

type RtrimFunction

type RtrimFunction struct {
	*BaseFunction
}

RtrimFunction removes trailing whitespace.

func NewRtrimFunction

func NewRtrimFunction() *RtrimFunction

func (*RtrimFunction) Execute

func (f *RtrimFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*RtrimFunction) Validate

func (f *RtrimFunction) Validate(args []interface{}) error

type SecondFunction

type SecondFunction struct {
	*BaseFunction
}

SecondFunction extracts the seconds.

func NewSecondFunction

func NewSecondFunction() *SecondFunction

func (*SecondFunction) Execute

func (f *SecondFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*SecondFunction) Validate

func (f *SecondFunction) Validate(args []interface{}) error

type Sha1Function

type Sha1Function struct {
	*BaseFunction
}

Sha1Function calculates SHA1 hash value

func NewSha1Function

func NewSha1Function() *Sha1Function

func (*Sha1Function) Execute

func (f *Sha1Function) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*Sha1Function) Validate

func (f *Sha1Function) Validate(args []interface{}) error

type Sha256Function

type Sha256Function struct {
	*BaseFunction
}

Sha256Function calculates SHA256 hash value

func NewSha256Function

func NewSha256Function() *Sha256Function

func (*Sha256Function) Execute

func (f *Sha256Function) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*Sha256Function) Validate

func (f *Sha256Function) Validate(args []interface{}) error

type Sha512Function

type Sha512Function struct {
	*BaseFunction
}

Sha512Function calculates SHA512 hash value

func NewSha512Function

func NewSha512Function() *Sha512Function

func (*Sha512Function) Execute

func (f *Sha512Function) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*Sha512Function) Validate

func (f *Sha512Function) Validate(args []interface{}) error

type SignFunction

type SignFunction struct {
	*BaseFunction
}

SignFunction is the sign function.

func NewSignFunction

func NewSignFunction() *SignFunction

func (*SignFunction) Execute

func (f *SignFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*SignFunction) Validate

func (f *SignFunction) Validate(args []interface{}) error

type SinFunction

type SinFunction struct {
	*BaseFunction
}

SinFunction is the sine function.

func NewSinFunction

func NewSinFunction() *SinFunction

func (*SinFunction) Execute

func (f *SinFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*SinFunction) Validate

func (f *SinFunction) Validate(args []interface{}) error

type SinhFunction

type SinhFunction struct {
	*BaseFunction
}

SinhFunction is the hyperbolic sine function.

func NewSinhFunction

func NewSinhFunction() *SinhFunction

func (*SinhFunction) Execute

func (f *SinhFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*SinhFunction) Validate

func (f *SinhFunction) Validate(args []interface{}) error

type SplitFunction

type SplitFunction struct {
	*BaseFunction
}

SplitFunction splits a string by a delimiter.

func NewSplitFunction

func NewSplitFunction() *SplitFunction

func (*SplitFunction) Execute

func (f *SplitFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*SplitFunction) Validate

func (f *SplitFunction) Validate(args []interface{}) error

type SqrtFunction

type SqrtFunction struct {
	*BaseFunction
}

SqrtFunction calculates square root

func NewSqrtFunction

func NewSqrtFunction() *SqrtFunction

func (*SqrtFunction) Execute

func (f *SqrtFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*SqrtFunction) Validate

func (f *SqrtFunction) Validate(args []interface{}) error

type StartswithFunction

type StartswithFunction struct {
	*BaseFunction
}

StartswithFunction checks whether a string starts with the specified prefix.

func NewStartswithFunction

func NewStartswithFunction() *StartswithFunction

func (*StartswithFunction) Execute

func (f *StartswithFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*StartswithFunction) Validate

func (f *StartswithFunction) Validate(args []interface{}) error

type StdDevAggregatorFunction

type StdDevAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

StdDevAggregatorFunction adds an AggregatorFunction interface implementation for StdDevFunction.

func NewStdDevAggregatorFunction

func NewStdDevAggregatorFunction() *StdDevAggregatorFunction

func (*StdDevAggregatorFunction) Add

func (f *StdDevAggregatorFunction) Add(value interface{})

func (*StdDevAggregatorFunction) Clone

func (*StdDevAggregatorFunction) Execute

func (f *StdDevAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*StdDevAggregatorFunction) New

func (*StdDevAggregatorFunction) Reset

func (f *StdDevAggregatorFunction) Reset()

func (*StdDevAggregatorFunction) Result

func (f *StdDevAggregatorFunction) Result() interface{}

func (*StdDevAggregatorFunction) Validate

func (f *StdDevAggregatorFunction) Validate(args []interface{}) error

type StdDevFunction

type StdDevFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

StdDevFunction is the standard deviation function (implemented with Welford's algorithm).

func NewStdDevFunction

func NewStdDevFunction() *StdDevFunction

func (*StdDevFunction) Add added in v0.10.2

func (f *StdDevFunction) Add(value interface{})

func (*StdDevFunction) Clone added in v0.10.2

func (*StdDevFunction) Execute

func (f *StdDevFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*StdDevFunction) New added in v0.10.2

Implements the AggregatorFunction interface using Welford's algorithm.

func (*StdDevFunction) Reset added in v0.10.2

func (f *StdDevFunction) Reset()

func (*StdDevFunction) Result added in v0.10.2

func (f *StdDevFunction) Result() interface{}

func (*StdDevFunction) Validate

func (f *StdDevFunction) Validate(args []interface{}) error

type StdDevSAggregatorFunction

type StdDevSAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

StdDevSAggregatorFunction adds an AggregatorFunction interface implementation for StdDevSFunction.

func NewStdDevSAggregatorFunction

func NewStdDevSAggregatorFunction() *StdDevSAggregatorFunction

func (*StdDevSAggregatorFunction) Add

func (f *StdDevSAggregatorFunction) Add(value interface{})

func (*StdDevSAggregatorFunction) Clone

func (*StdDevSAggregatorFunction) Execute

func (f *StdDevSAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*StdDevSAggregatorFunction) New

func (*StdDevSAggregatorFunction) Reset

func (f *StdDevSAggregatorFunction) Reset()

func (*StdDevSAggregatorFunction) Result

func (f *StdDevSAggregatorFunction) Result() interface{}

func (*StdDevSAggregatorFunction) Validate

func (f *StdDevSAggregatorFunction) Validate(args []interface{}) error

type StdDevSFunction

type StdDevSFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

StdDevSFunction is the sample standard deviation function (implemented with Welford's algorithm).

func NewStdDevSFunction

func NewStdDevSFunction() *StdDevSFunction

func (*StdDevSFunction) Add added in v0.10.2

func (f *StdDevSFunction) Add(value interface{})

func (*StdDevSFunction) Clone added in v0.10.2

func (*StdDevSFunction) Execute

func (f *StdDevSFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*StdDevSFunction) New added in v0.10.2

Implements the AggregatorFunction interface using Welford's algorithm.

func (*StdDevSFunction) Reset added in v0.10.2

func (f *StdDevSFunction) Reset()

func (*StdDevSFunction) Result added in v0.10.2

func (f *StdDevSFunction) Result() interface{}

func (*StdDevSFunction) Validate

func (f *StdDevSFunction) Validate(args []interface{}) error

type SubstringFunction

type SubstringFunction struct {
	*BaseFunction
}

SubstringFunction extracts a substring.

func NewSubstringFunction

func NewSubstringFunction() *SubstringFunction

func (*SubstringFunction) Execute

func (f *SubstringFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*SubstringFunction) Validate

func (f *SubstringFunction) Validate(args []interface{}) error

type SumFunction

type SumFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

SumFunction calculates the sum of numeric values

func NewSumFunction

func NewSumFunction() *SumFunction

func (*SumFunction) Add

func (f *SumFunction) Add(value interface{})

func (*SumFunction) Clone

func (f *SumFunction) Clone() AggregatorFunction

func (*SumFunction) Execute

func (f *SumFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*SumFunction) New

Implements the AggregatorFunction interface.

func (*SumFunction) Reset

func (f *SumFunction) Reset()

func (*SumFunction) Result

func (f *SumFunction) Result() interface{}

func (*SumFunction) Validate

func (f *SumFunction) Validate(args []interface{}) error

type TanFunction

type TanFunction struct {
	*BaseFunction
}

TanFunction is the tangent function.

func NewTanFunction

func NewTanFunction() *TanFunction

func (*TanFunction) Execute

func (f *TanFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*TanFunction) Validate

func (f *TanFunction) Validate(args []interface{}) error

type TanhFunction

type TanhFunction struct {
	*BaseFunction
}

TanhFunction is the hyperbolic tangent function.

func NewTanhFunction

func NewTanhFunction() *TanhFunction

func (*TanhFunction) Execute

func (f *TanhFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*TanhFunction) Validate

func (f *TanhFunction) Validate(args []interface{}) error

type ToJsonFunction

type ToJsonFunction struct {
	*BaseFunction
}

ToJsonFunction converts value to JSON string

func NewToJsonFunction

func NewToJsonFunction() *ToJsonFunction

func (*ToJsonFunction) Execute

func (f *ToJsonFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ToJsonFunction) Validate

func (f *ToJsonFunction) Validate(args []interface{}) error

type ToSecondsFunction

type ToSecondsFunction struct {
	*BaseFunction
}

ToSecondsFunction converts a value to a Unix timestamp (in seconds).

func NewToSecondsFunction

func NewToSecondsFunction() *ToSecondsFunction

func (*ToSecondsFunction) Execute

func (f *ToSecondsFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*ToSecondsFunction) Validate

func (f *ToSecondsFunction) Validate(args []interface{}) error

type TrimFunction

type TrimFunction struct {
	*BaseFunction
}

TrimFunction removes leading and trailing spaces.

func NewTrimFunction

func NewTrimFunction() *TrimFunction

func (*TrimFunction) Execute

func (f *TrimFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*TrimFunction) Validate

func (f *TrimFunction) Validate(args []interface{}) error

type TruncFunction

type TruncFunction struct {
	*BaseFunction
}

TruncFunction truncates a number to a given number of decimal places.

func NewTruncFunction

func NewTruncFunction() *TruncFunction

NewTruncFunction creates a new trunc function.

func (*TruncFunction) Execute

func (f *TruncFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

Execute runs the function.

func (*TruncFunction) Validate

func (f *TruncFunction) Validate(args []interface{}) error

Validate validates the arguments.

type UnixTimestampFunction

type UnixTimestampFunction struct {
	*BaseFunction
}

UnixTimestampFunction is the Unix timestamp function.

func NewUnixTimestampFunction

func NewUnixTimestampFunction() *UnixTimestampFunction

func (*UnixTimestampFunction) Execute

func (f *UnixTimestampFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*UnixTimestampFunction) Validate

func (f *UnixTimestampFunction) Validate(args []interface{}) error

type UnnestFunction

type UnnestFunction struct {
	*BaseFunction
}

UnnestFunction expands an array into multiple rows.

func NewUnnestFunction

func NewUnnestFunction() *UnnestFunction

func (*UnnestFunction) Execute

func (f *UnnestFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*UnnestFunction) Validate

func (f *UnnestFunction) Validate(args []interface{}) error

type UnnestResult

type UnnestResult struct {
	Rows []map[string]interface{}
}

UnnestResult represents the result of the unnest function.
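To illustrate how an array might map onto the Rows field, the sketch below turns each element into its own row. The struct is redeclared locally so the example compiles on its own, and the helper `unnest` and the choice to key each row by a column name are assumptions; the package may shape its rows differently.

```go
package main

import "fmt"

// UnnestResult is a local copy of the struct shown above, redeclared
// only so this sketch is self-contained.
type UnnestResult struct {
	Rows []map[string]interface{}
}

// unnest is a hypothetical helper that emits one row per array element,
// storing the element under the given column name.
func unnest(column string, items []interface{}) *UnnestResult {
	res := &UnnestResult{Rows: make([]map[string]interface{}, 0, len(items))}
	for _, item := range items {
		res.Rows = append(res.Rows, map[string]interface{}{column: item})
	}
	return res
}

func main() {
	res := unnest("tag", []interface{}{"a", "b", "c"})
	for _, row := range res.Rows {
		fmt.Println(row["tag"])
	}
}
```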

type UpperFunction

type UpperFunction struct {
	*BaseFunction
}

UpperFunction converts string to uppercase

func NewUpperFunction

func NewUpperFunction() *UpperFunction

func (*UpperFunction) Execute

func (f *UpperFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*UpperFunction) Validate

func (f *UpperFunction) Validate(args []interface{}) error

type UrlDecodeFunction

type UrlDecodeFunction struct {
	*BaseFunction
}

UrlDecodeFunction is the URL decoding function.

func NewUrlDecodeFunction

func NewUrlDecodeFunction() *UrlDecodeFunction

func (*UrlDecodeFunction) Execute

func (f *UrlDecodeFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*UrlDecodeFunction) Validate

func (f *UrlDecodeFunction) Validate(args []interface{}) error

type UrlEncodeFunction

type UrlEncodeFunction struct {
	*BaseFunction
}

UrlEncodeFunction is the URL encoding function.

func NewUrlEncodeFunction

func NewUrlEncodeFunction() *UrlEncodeFunction

func (*UrlEncodeFunction) Execute

func (f *UrlEncodeFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*UrlEncodeFunction) Validate

func (f *UrlEncodeFunction) Validate(args []interface{}) error

type VarAggregatorFunction

type VarAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

VarAggregatorFunction adds an AggregatorFunction interface implementation for VarFunction.

func NewVarAggregatorFunction

func NewVarAggregatorFunction() *VarAggregatorFunction

func (*VarAggregatorFunction) Add

func (f *VarAggregatorFunction) Add(value interface{})

func (*VarAggregatorFunction) Clone

func (*VarAggregatorFunction) Execute

func (f *VarAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*VarAggregatorFunction) New

func (*VarAggregatorFunction) Reset

func (f *VarAggregatorFunction) Reset()

func (*VarAggregatorFunction) Result

func (f *VarAggregatorFunction) Result() interface{}

func (*VarAggregatorFunction) Validate

func (f *VarAggregatorFunction) Validate(args []interface{}) error

type VarFunction

type VarFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

VarFunction is the population variance function (implemented with Welford's algorithm).

func NewVarFunction

func NewVarFunction() *VarFunction

func (*VarFunction) Add added in v0.10.2

func (f *VarFunction) Add(value interface{})

func (*VarFunction) Clone added in v0.10.2

func (f *VarFunction) Clone() AggregatorFunction

func (*VarFunction) Execute

func (f *VarFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*VarFunction) New added in v0.10.2

Implements the AggregatorFunction interface using Welford's algorithm.

func (*VarFunction) Reset added in v0.10.2

func (f *VarFunction) Reset()

func (*VarFunction) Result added in v0.10.2

func (f *VarFunction) Result() interface{}

func (*VarFunction) Validate

func (f *VarFunction) Validate(args []interface{}) error

type VarSAggregatorFunction

type VarSAggregatorFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

VarSAggregatorFunction adds an AggregatorFunction interface implementation for VarSFunction.

func NewVarSAggregatorFunction

func NewVarSAggregatorFunction() *VarSAggregatorFunction

func (*VarSAggregatorFunction) Add

func (f *VarSAggregatorFunction) Add(value interface{})

func (*VarSAggregatorFunction) Clone

func (*VarSAggregatorFunction) Execute

func (f *VarSAggregatorFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*VarSAggregatorFunction) New

func (*VarSAggregatorFunction) Reset

func (f *VarSAggregatorFunction) Reset()

func (*VarSAggregatorFunction) Result

func (f *VarSAggregatorFunction) Result() interface{}

func (*VarSAggregatorFunction) Validate

func (f *VarSAggregatorFunction) Validate(args []interface{}) error

type VarSFunction

type VarSFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

VarSFunction is the sample variance function (implemented with Welford's algorithm).
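The sample variance differs from the population variance only in the final divisor: n - 1 instead of n. The sketch below shows this with a hypothetical `sampleVariance` type whose `Add`, `Result`, and `Reset` methods loosely mirror the method names listed for VarSFunction. It is not the package's implementation, and returning 0 for fewer than two values is an assumption.

```go
package main

import "fmt"

// sampleVariance is a hypothetical accumulator for illustration only.
type sampleVariance struct {
	n    int
	mean float64
	m2   float64 // sum of squared deviations from the running mean
}

// Add applies Welford's incremental update for one observation.
func (s *sampleVariance) Add(x float64) {
	s.n++
	delta := x - s.mean
	s.mean += delta / float64(s.n)
	s.m2 += delta * (x - s.mean)
}

// Result returns m2 / (n - 1), the unbiased sample variance.
// With fewer than two values the divisor would be zero, so this
// sketch returns 0 (an assumption, not documented package behavior).
func (s *sampleVariance) Result() float64 {
	if s.n < 2 {
		return 0
	}
	return s.m2 / float64(s.n-1)
}

// Reset clears all state so the accumulator can be reused for a new window.
func (s *sampleVariance) Reset() {
	*s = sampleVariance{}
}

func main() {
	var acc sampleVariance
	for _, x := range []float64{2, 4, 4, 4, 5, 5, 7, 9} {
		acc.Add(x)
	}
	// Squared deviations sum to 32 over 8 values, so the result is 32 / 7.
	fmt.Printf("%.4f\n", acc.Result())

	acc.Reset()
	fmt.Printf("%.4f\n", acc.Result())
}
```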

func NewVarSFunction

func NewVarSFunction() *VarSFunction

func (*VarSFunction) Add added in v0.10.2

func (f *VarSFunction) Add(value interface{})

func (*VarSFunction) Clone added in v0.10.2

func (f *VarSFunction) Clone() AggregatorFunction

func (*VarSFunction) Execute

func (f *VarSFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*VarSFunction) New added in v0.10.2

Implements the AggregatorFunction interface using Welford's algorithm.

func (*VarSFunction) Reset added in v0.10.2

func (f *VarSFunction) Reset()

func (*VarSFunction) Result added in v0.10.2

func (f *VarSFunction) Result() interface{}

func (*VarSFunction) Validate

func (f *VarSFunction) Validate(args []interface{}) error

type WeekOfYearFunction

type WeekOfYearFunction struct {
	*BaseFunction
}

WeekOfYearFunction returns the week of the year for a given date.

func NewWeekOfYearFunction

func NewWeekOfYearFunction() *WeekOfYearFunction

func (*WeekOfYearFunction) Execute

func (f *WeekOfYearFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*WeekOfYearFunction) Validate

func (f *WeekOfYearFunction) Validate(args []interface{}) error

type WindowEndFunction

type WindowEndFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

WindowEndFunction returns window end time

func NewWindowEndFunction

func NewWindowEndFunction() *WindowEndFunction

func (*WindowEndFunction) Add

func (f *WindowEndFunction) Add(value interface{})

func (*WindowEndFunction) Clone

func (*WindowEndFunction) Execute

func (f *WindowEndFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*WindowEndFunction) New

Implements the AggregatorFunction interface.

func (*WindowEndFunction) Reset

func (f *WindowEndFunction) Reset()

func (*WindowEndFunction) Result

func (f *WindowEndFunction) Result() interface{}

func (*WindowEndFunction) Validate

func (f *WindowEndFunction) Validate(args []interface{}) error

type WindowInfo

type WindowInfo struct {
	WindowStart int64
	WindowEnd   int64
	RowCount    int
}

WindowInfo contains window-related information

type WindowStartFunction

type WindowStartFunction struct {
	*BaseFunction
	// contains filtered or unexported fields
}

WindowStartFunction returns window start time

func NewWindowStartFunction

func NewWindowStartFunction() *WindowStartFunction

func (*WindowStartFunction) Add

func (f *WindowStartFunction) Add(value interface{})

func (*WindowStartFunction) Clone

func (*WindowStartFunction) Execute

func (f *WindowStartFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*WindowStartFunction) New

Implements the AggregatorFunction interface.

func (*WindowStartFunction) Reset

func (f *WindowStartFunction) Reset()

func (*WindowStartFunction) Result

func (f *WindowStartFunction) Result() interface{}

func (*WindowStartFunction) Validate

func (f *WindowStartFunction) Validate(args []interface{}) error

type YearFunction

type YearFunction struct {
	*BaseFunction
}

YearFunction extracts the year from a date.

func NewYearFunction

func NewYearFunction() *YearFunction

func (*YearFunction) Execute

func (f *YearFunction) Execute(ctx *FunctionContext, args []interface{}) (interface{}, error)

func (*YearFunction) Validate

func (f *YearFunction) Validate(args []interface{}) error
