types

package
v0.10.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 29, 2025 License: Apache-2.0 Imports: 2 Imported by: 0

Documentation

Overview

Package types provides core type definitions and data structures for StreamSQL.

This package defines fundamental data types, configuration structures, and interfaces used throughout the StreamSQL stream processing pipeline. It ensures type safety and provides a unified API for data manipulation across components.

Core Features

• Data Types - Core data structures for stream processing
• Configuration Management - Centralized configuration structures
• Type Safety - Strong typing with validation
• Serialization Support - JSON serialization support
• Cross-Component Compatibility - Shared types across packages

Configuration Structures

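Core configuration types (abridged; the full Config declaration under Types also includes the PostAggExpressions, FieldOrder, and Projections fields):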

// Config gathers the settings for one stream-processing query: windowing,
// grouping, selected aggregations, aliases, filter clauses, result limits,
// and performance options.
type Config struct {
	WindowConfig     WindowConfig                        // Window settings
	GroupFields      []string                            // GROUP BY fields
	SelectFields     map[string]aggregator.AggregateType // SELECT aggregations, keyed by field name (e.g. "temperature")
	FieldAlias       map[string]string                   // Field aliases
	SimpleFields     []string                            // Non-aggregated fields
	FieldExpressions map[string]FieldExpression          // Computed expressions
	Where            string                              // WHERE clause
	Having           string                              // HAVING clause
	NeedWindow       bool                                // Whether the query requires a window
	Distinct         bool                                // DISTINCT flag
	Limit            int                                 // LIMIT clause
	PerformanceConfig PerformanceConfig                  // Performance settings
}

Window Configuration

Unified configuration for all window types:

// WindowConfig is the one configuration shape shared by every window type.
// The examples in this section use the Type values "tumbling", "sliding",
// "counting", and "session".
type WindowConfig struct {
	Type       string                 // Window type, e.g. "tumbling"
	Params     map[string]interface{} // Type-specific parameters, e.g. "size", "slide", "count", "timeout"
	TsProp     string                 // Timestamp property
	TimeUnit   time.Duration          // Time unit
	GroupByKey string                 // Grouping key (set in the session-window example)
}

// Example configurations. Each snippet is independent; they are not meant to
// be pasted into one scope, since each declares windowConfig with :=.

// Tumbling window: Params carries a single "size" entry.
windowConfig := WindowConfig{
	Type: "tumbling",
	Params: map[string]interface{}{
		"size": "5s",
	},
	TsProp: "timestamp",
}

// Sliding window: Params carries both "size" and "slide".
windowConfig := WindowConfig{
	Type: "sliding",
	Params: map[string]interface{}{
		"size": "30s",
		"slide": "10s",
	},
	TsProp: "timestamp",
}

// Counting window: Params carries an integer "count"; no TsProp is set.
windowConfig := WindowConfig{
	Type: "counting",
	Params: map[string]interface{}{
		"count": 100,
	},
}

// Session window: Params carries "timeout"; GroupByKey names the grouping field.
windowConfig := WindowConfig{
	Type: "session",
	Params: map[string]interface{}{
		"timeout": "5m",
	},
	GroupByKey: "user_id",
}

Performance Configuration

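Comprehensive performance tuning options (illustrative layout; the exported PerformanceConfig declared under Types groups its settings into nested BufferConfig, OverflowConfig, WorkerConfig, and MonitoringConfig structs):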

// PerformanceConfig lists performance tuning options, grouped by concern:
// buffering, worker pool, overflow handling, memory, monitoring, and
// persistence.
type PerformanceConfig struct {
	// Buffer management
	BufferSize       int           // Input buffer size
	BatchSize        int           // Processing batch size
	FlushInterval    time.Duration // Automatic flush interval
	HighWaterMark    float64       // Buffer high water mark (0.0-1.0)
	LowWaterMark     float64       // Buffer low water mark (0.0-1.0)

	// Worker pool configuration
	WorkerPoolSize   int           // Number of worker goroutines
	MaxWorkers       int           // Maximum worker limit
	WorkerIdleTime   time.Duration // Worker idle timeout

	// Overflow handling
	OverflowStrategy string        // Overflow strategy: "drop", "block", "spill", or "compress"
	SpillDirectory   string        // Directory for spill files
	CompressionLevel int           // Compression level (1-9)

	// Memory management
	MaxMemoryUsage   int64         // Maximum memory usage in bytes
	GCInterval       time.Duration // Garbage collection interval
	MemoryThreshold  float64       // Memory usage threshold

	// Monitoring
	MetricsEnabled   bool          // Enable metrics collection
	MetricsInterval  time.Duration // Metrics collection interval
	HealthCheckPort  int           // Health check HTTP port

	// Persistence
	PersistenceEnabled bool        // Enable data persistence
	PersistenceType    string      // Persistence type: "memory", "file", or "database"
	PersistencePath    string      // Persistence storage path
	RecoveryEnabled    bool        // Enable automatic recovery
}

Field Management

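Advanced field handling and expression support (illustrative; under Types, Projection is declared with the fields OutputName, SourceType, and InputName, and ProjectionSourceType is an int enumeration with the values SourceGroupKey, SourceAggregateResult, and SourceWindowProperty rather than a string):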

// FieldExpression describes a computed field: the field it belongs to, the
// expression text, and the fields that expression references.
type FieldExpression struct {
	Field      string   // Original field name
	Expression string   // Complete expression
	Fields     []string // All fields referenced in the expression
}

// Projection describes one output column: where its value comes from, the
// source identifier, the alias it is emitted under, and its expected type.
type Projection struct {
	SourceType ProjectionSourceType // Source type (field, expression, aggregate)
	Source     string               // Source identifier
	Alias      string               // Output alias
	DataType   string               // Expected data type
}

// ProjectionSourceType is a string tag naming the kind of source a
// Projection reads from.
type ProjectionSourceType string

// Source kinds, each identified by its lowercase string value.
const (
	ProjectionSourceField      ProjectionSourceType = "field"      // Direct field reference
	ProjectionSourceExpression ProjectionSourceType = "expression" // Computed expression
	ProjectionSourceAggregate  ProjectionSourceType = "aggregate"  // Aggregate function
	ProjectionSourceConstant   ProjectionSourceType = "constant"   // Constant value
)

Data Row Representation

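Type-safe data row structures for stream processing (illustrative; the exported Row declared under Types has only the fields Timestamp, Data, and Slot, and the single method GetTimestamp):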

// Row is one record flowing through the pipeline: its field data, its
// timestamp, free-form metadata, and the group and window it is assigned to.
type Row struct {
	Data      map[string]interface{} // Row data, keyed by field name
	Timestamp time.Time              // Row timestamp
	Metadata  map[string]interface{} // Additional metadata
	GroupKey  string                 // Grouping key for aggregation
	WindowID  string                 // Window identifier
}

// Row creation and manipulation
func NewRow(data map[string]interface{}) *Row
func (r *Row) GetValue(field string) interface{}
func (r *Row) SetValue(field string, value interface{})
func (r *Row) HasField(field string) bool
func (r *Row) Clone() *Row

Time Management

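Time-based data structures for window processing (illustrative; the exported TimeSlot declared under Types has only Start and End fields of type *time.Time, is built with NewTimeSlot(start, end *time.Time), and has the methods Contains, GetStartTime, GetEndTime, Hash, WindowStart, and WindowEnd):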

// TimeSlot is a time interval described by its start, end, and duration,
// with a unique identifier.
type TimeSlot struct {
	Start    time.Time // Slot start time
	End      time.Time // Slot end time
	Duration time.Duration // Slot duration
	ID       string    // Unique slot identifier
}

// Time slot operations
func NewTimeSlot(start time.Time, duration time.Duration) *TimeSlot
func (ts *TimeSlot) Contains(timestamp time.Time) bool
func (ts *TimeSlot) Overlaps(other *TimeSlot) bool
func (ts *TimeSlot) String() string

Configuration Presets

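Pre-defined configuration templates for common use cases (illustrative; the preset functions listed under Types are DefaultPerformanceConfig, HighPerformanceConfig, and LowLatencyConfig, each returning a PerformanceConfig value):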

// NewHighPerformanceConfig returns the high-performance preset: BufferSize
// 50000, BatchSize 1000, WorkerPoolSize 8, FlushInterval 100ms,
// OverflowStrategy "spill", and metrics enabled. All other fields keep
// their zero values.
func NewHighPerformanceConfig() *PerformanceConfig {
	preset := PerformanceConfig{
		// Buffering and flushing.
		BufferSize:    50000,
		BatchSize:     1000,
		FlushInterval: 100 * time.Millisecond,

		// Workers, overflow handling, and monitoring.
		WorkerPoolSize:   8,
		OverflowStrategy: "spill",
		MetricsEnabled:   true,
	}
	return &preset
}

// NewLowLatencyConfig returns the low-latency preset: BufferSize 1000,
// BatchSize 10, WorkerPoolSize 2, FlushInterval 10ms, OverflowStrategy
// "drop", and metrics disabled. All other fields keep their zero values.
func NewLowLatencyConfig() *PerformanceConfig {
	preset := PerformanceConfig{
		// Buffering and flushing.
		BufferSize:    1000,
		BatchSize:     10,
		FlushInterval: 10 * time.Millisecond,

		// Workers, overflow handling, and monitoring.
		WorkerPoolSize:   2,
		OverflowStrategy: "drop",
		MetricsEnabled:   false,
	}
	return &preset
}

// NewZeroDataLossConfig returns the zero-data-loss preset: BufferSize 10000,
// BatchSize 100, WorkerPoolSize 4, FlushInterval one second,
// OverflowStrategy "block", with persistence, recovery, and metrics all
// enabled. All other fields keep their zero values.
func NewZeroDataLossConfig() *PerformanceConfig {
	preset := PerformanceConfig{
		// Buffering and flushing.
		BufferSize:    10000,
		BatchSize:     100,
		FlushInterval: time.Second,

		// Workers and overflow handling.
		WorkerPoolSize:   4,
		OverflowStrategy: "block",

		// Durability and monitoring.
		PersistenceEnabled: true,
		RecoveryEnabled:    true,
		MetricsEnabled:     true,
	}
	return &preset
}

Usage Examples

Basic configuration:

// Average "temperature" per "device_id" over 5-second tumbling windows.
config := &Config{
	WindowConfig: WindowConfig{
		Type: "tumbling",
		Params: map[string]interface{}{"size": "5s"},
	},
	// GROUP BY device_id.
	GroupFields: []string{"device_id"},
	// Field name -> aggregate applied to it.
	SelectFields: map[string]aggregator.AggregateType{
		"temperature": aggregator.AggregateTypeAvg,
	},
	NeedWindow: true,
}

Data row operations:

// Build a row from a field map.
row := NewRow(map[string]interface{}{
	"device_id":   "sensor001",
	"temperature": 25.5,
})

// GetValue returns interface{}, so the caller asserts the concrete type.
// This single-value assertion panics if the stored value is not a string;
// use the two-value form (v, ok := x.(string)) when the type is not guaranteed.
deviceID := row.GetValue("device_id").(string)
// Set the "processed" field on the row.
row.SetValue("processed", true)

Integration

Integrates with other StreamSQL components:

• Stream Package - Core data types for stream processing
• Window Package - WindowConfig for window configurations
• Aggregator Package - AggregateType definitions
• Condition Package - Data structures for clause evaluation
• Functions Package - Type definitions for functions
• RSQL Package - Config structures for query execution

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AggregationFieldInfo added in v0.10.3

// AggregationFieldInfo holds information about one aggregation function call
// that appears inside an expression.
type AggregationFieldInfo struct {
	FuncName    string                   `json:"funcName"`    // Function name, e.g. "first_value"
	InputField  string                   `json:"inputField"`  // Input field, e.g. "displayNum"
	Placeholder string                   `json:"placeholder"` // Placeholder, e.g. "__first_value_0__"
	AggType     aggregator.AggregateType `json:"aggType"`     // Aggregation type
	FullCall    string                   `json:"fullCall"`    // Full function call, e.g. "NTH_VALUE(value, 2)"
}

AggregationFieldInfo holds information about an aggregation function in an expression

type BufferConfig

// BufferConfig holds the buffer sizes for the data, result, and window-output
// buffers, plus the settings for dynamic resizing.
type BufferConfig struct {
	DataChannelSize     int     `json:"dataChannelSize"`     // Data input buffer size
	ResultChannelSize   int     `json:"resultChannelSize"`   // Result output buffer size
	WindowOutputSize    int     `json:"windowOutputSize"`    // Window output buffer size
	EnableDynamicResize bool    `json:"enableDynamicResize"` // Enable dynamic buffer resizing
	MaxBufferSize       int     `json:"maxBufferSize"`       // Maximum buffer size
	UsageThreshold      float64 `json:"usageThreshold"`      // Buffer usage threshold
}

BufferConfig holds the buffer configuration.

type Config

// Config is the stream processing configuration. It carries the parsed SQL
// settings, feature switches, result controls, and performance options.
type Config struct {
	// SQL processing related configuration
	WindowConfig       WindowConfig                        `json:"windowConfig"` // Window settings
	GroupFields        []string                            `json:"groupFields"` // GROUP BY fields
	SelectFields       map[string]aggregator.AggregateType `json:"selectFields"` // SELECT aggregations
	FieldAlias         map[string]string                   `json:"fieldAlias"` // Field aliases
	SimpleFields       []string                            `json:"simpleFields"` // Non-aggregated fields
	FieldExpressions   map[string]FieldExpression          `json:"fieldExpressions"` // Computed expressions
	PostAggExpressions []PostAggregationExpression         `json:"postAggExpressions"` // Post-aggregation expressions
	FieldOrder         []string                            `json:"fieldOrder"`         // Original order of fields in SELECT statement
	Where              string                              `json:"where"` // WHERE clause
	Having             string                              `json:"having"` // HAVING clause

	// Feature switches
	NeedWindow bool `json:"needWindow"` // Whether the query requires a window
	Distinct   bool `json:"distinct"` // DISTINCT flag

	// Result control
	Limit       int          `json:"limit"` // LIMIT clause
	Projections []Projection `json:"projections"` // Projections in the SELECT list

	// Performance configuration
	PerformanceConfig PerformanceConfig `json:"performanceConfig"`
}

Config is the stream processing configuration.

func NewConfig

func NewConfig() Config

NewConfig creates default configuration

func NewConfigWithPerformance

func NewConfigWithPerformance(perfConfig PerformanceConfig) Config

NewConfigWithPerformance creates Config with performance configuration

type ExpansionConfig

// ExpansionConfig holds the expansion settings: growth factor, minimum
// increment, trigger threshold, and timeout.
type ExpansionConfig struct {
	GrowthFactor     float64       `json:"growthFactor"`     // Growth factor
	MinIncrement     int           `json:"minIncrement"`     // Minimum expansion increment
	TriggerThreshold float64       `json:"triggerThreshold"` // Expansion trigger threshold
	ExpansionTimeout time.Duration `json:"expansionTimeout"` // Expansion timeout duration
}

ExpansionConfig holds the expansion configuration.

type FieldExpression

// FieldExpression is the configuration of one field expression: the field it
// belongs to, the expression text, and the fields the expression references.
type FieldExpression struct {
	Field      string   `json:"field"`      // Original field name
	Expression string   `json:"expression"` // Complete expression
	Fields     []string `json:"fields"`     // All fields referenced in the expression
}

FieldExpression is the field expression configuration.

type MonitoringConfig

// MonitoringConfig holds the monitoring settings: on/off switches, the
// statistics update interval, and the warning thresholds.
type MonitoringConfig struct {
	EnableMonitoring    bool              `json:"enableMonitoring"`    // Enable performance monitoring
	StatsUpdateInterval time.Duration     `json:"statsUpdateInterval"` // Statistics update interval
	EnableDetailedStats bool              `json:"enableDetailedStats"` // Enable detailed statistics
	WarningThresholds   WarningThresholds `json:"warningThresholds"`   // Performance warning thresholds
}

MonitoringConfig holds the monitoring configuration.

type OverflowConfig

// OverflowConfig holds the overflow strategy settings, including the nested
// ExpansionConfig.
type OverflowConfig struct {
	Strategy        string          `json:"strategy"`        // Overflow strategy: "drop", "block", or "expand"
	BlockTimeout    time.Duration   `json:"blockTimeout"`    // Block timeout duration
	AllowDataLoss   bool            `json:"allowDataLoss"`   // Allow data loss
	ExpansionConfig ExpansionConfig `json:"expansionConfig"` // Expansion configuration
}

OverflowConfig holds the overflow strategy configuration.

type PerformanceConfig

// PerformanceConfig is the performance configuration, composed of four nested
// sections: buffers, overflow strategy, worker pool, and monitoring.
type PerformanceConfig struct {
	BufferConfig     BufferConfig     `json:"bufferConfig"`     // Buffer configuration
	OverflowConfig   OverflowConfig   `json:"overflowConfig"`   // Overflow strategy configuration
	WorkerConfig     WorkerConfig     `json:"workerConfig"`     // Worker pool configuration
	MonitoringConfig MonitoringConfig `json:"monitoringConfig"` // Monitoring configuration
}

PerformanceConfig holds the performance configuration.

func DefaultPerformanceConfig

func DefaultPerformanceConfig() PerformanceConfig

DefaultPerformanceConfig returns the default performance configuration. It provides balanced performance settings suitable for most scenarios.

func HighPerformanceConfig

func HighPerformanceConfig() PerformanceConfig

HighPerformanceConfig returns the high performance configuration preset. It optimizes throughput with large buffers and an expansion strategy.

func LowLatencyConfig

func LowLatencyConfig() PerformanceConfig

LowLatencyConfig returns the low latency configuration preset. It optimizes latency with smaller buffers and a fast response strategy.

type PostAggregationExpression added in v0.10.3

// PostAggregationExpression represents an expression that needs to be
// evaluated after aggregation.
type PostAggregationExpression struct {
	OutputField        string                 `json:"outputField"`        // Output field name
	OriginalExpr       string                 `json:"originalExpr"`       // Original expression
	ExpressionTemplate string                 `json:"expressionTemplate"` // Expression template
	RequiredFields     []AggregationFieldInfo `json:"requiredFields"`     // Aggregation fields this expression depends on
}

PostAggregationExpression represents an expression that needs to be evaluated after aggregation

type Projection

// Projection is the configuration of one projection in the SELECT list.
type Projection struct {
	OutputName string               `json:"outputName"` // Output field name
	SourceType ProjectionSourceType `json:"sourceType"` // Data source type
	InputName  string               `json:"inputName"`  // Input field name
}

Projection is the configuration of one projection in the SELECT list.

type ProjectionSourceType

type ProjectionSourceType int

ProjectionSourceType is the source type of a projection.

// Projection source kinds, numbered from 0 by iota in declaration order.
const (
	// SourceGroupKey is 0. NOTE(review): presumably the projected value is
	// taken from a GROUP BY key — confirm against the implementation.
	SourceGroupKey ProjectionSourceType = iota
	// SourceAggregateResult is 1. NOTE(review): presumably the projected
	// value is taken from an aggregate result — confirm.
	SourceAggregateResult
	// SourceWindowProperty is 2.
	SourceWindowProperty // For window_start, window_end
)

type Row

// Row pairs a timestamp with an untyped payload and an optional time slot.
type Row struct {
	Timestamp time.Time // Row timestamp
	Data      interface{} // Row payload; the concrete type is not fixed by this declaration
	Slot      *TimeSlot // Associated time slot (pointer, so it may be nil) — presumably the window slot; confirm
}

func (*Row) GetTimestamp

func (r *Row) GetTimestamp() time.Time

GetTimestamp returns the row's timestamp.

type RowEvent

// RowEvent is any value that exposes a timestamp through GetTimestamp.
// *Row satisfies it via its pointer-receiver GetTimestamp method.
type RowEvent interface {
	GetTimestamp() time.Time
}

type TimeSlot

// TimeSlot is a time range held as two *time.Time bounds. Both fields are
// pointers, so either may be nil.
type TimeSlot struct {
	Start *time.Time // Slot start
	End   *time.Time // Slot end
}

func NewTimeSlot

func NewTimeSlot(start, end *time.Time) *TimeSlot

func (TimeSlot) Contains

func (ts TimeSlot) Contains(t time.Time) bool

Contains reports whether the given time is within the slot's range.

func (*TimeSlot) GetEndTime

func (ts *TimeSlot) GetEndTime() *time.Time

func (*TimeSlot) GetStartTime

func (ts *TimeSlot) GetStartTime() *time.Time

func (TimeSlot) Hash

func (ts TimeSlot) Hash() uint64

Hash returns a hash value for the slot.

func (*TimeSlot) WindowEnd

func (ts *TimeSlot) WindowEnd() int64

func (*TimeSlot) WindowStart

func (ts *TimeSlot) WindowStart() int64

type WarningThresholds

// WarningThresholds holds the performance warning thresholds: a warning and
// a critical level each for drop rate and buffer usage.
type WarningThresholds struct {
	DropRateWarning     float64 `json:"dropRateWarning"`     // Drop rate warning threshold
	DropRateCritical    float64 `json:"dropRateCritical"`    // Drop rate critical threshold
	BufferUsageWarning  float64 `json:"bufferUsageWarning"`  // Buffer usage warning threshold
	BufferUsageCritical float64 `json:"bufferUsageCritical"` // Buffer usage critical threshold
}

WarningThresholds holds the performance warning thresholds.

type WindowConfig

// WindowConfig is the window configuration.
type WindowConfig struct {
	Type       string                 `json:"type"` // Window type
	Params     map[string]interface{} `json:"params"` // Window parameters
	TsProp     string                 `json:"tsProp"` // Timestamp property
	TimeUnit   time.Duration          `json:"timeUnit"` // Time unit
	GroupByKey string                 `json:"groupByKey"` // Session window grouping key
}

WindowConfig holds the window configuration.

type WorkerConfig

// WorkerConfig holds the worker pool settings: sink pool size, sink worker
// count, and the maximum number of retry routines.
type WorkerConfig struct {
	SinkPoolSize     int `json:"sinkPoolSize"`     // Sink pool size
	SinkWorkerCount  int `json:"sinkWorkerCount"`  // Sink worker count
	MaxRetryRoutines int `json:"maxRetryRoutines"` // Maximum retry routines
}

WorkerConfig holds the worker pool configuration.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL