Documentation
¶
Overview ¶
Package types provides core type definitions and data structures for StreamSQL.
This package defines fundamental data types, configuration structures, and interfaces used throughout the StreamSQL stream processing pipeline. It ensures type safety and provides a unified API for data manipulation across components.
Core Features ¶
• Data Types - Core data structures for stream processing
• Configuration Management - Centralized configuration structures
• Type Safety - Strong typing with validation
• Serialization Support - JSON serialization support
• Cross-Component Compatibility - Shared types across packages
Configuration Structures ¶
Core configuration types:
type Config struct {
WindowConfig WindowConfig // Window settings
GroupFields []string // GROUP BY fields
SelectFields map[string]aggregator.AggregateType // SELECT aggregations
FieldAlias map[string]string // Field aliases
SimpleFields []string // Non-aggregated fields
FieldExpressions map[string]FieldExpression // Computed expressions
Where string // WHERE clause
Having string // HAVING clause
NeedWindow bool // Window requirement
Distinct bool // DISTINCT flag
Limit int // LIMIT clause
PerformanceConfig PerformanceConfig // Performance settings
}
Window Configuration ¶
Unified configuration for all window types:
type WindowConfig struct {
Type string // Window type
Params map[string]interface{} // Parameters
TsProp string // Timestamp property
TimeUnit time.Duration // Time unit
GroupByKey string // Grouping key
}
// Example configurations
// Tumbling window
windowConfig := WindowConfig{
Type: "tumbling",
Params: map[string]interface{}{
"size": "5s",
},
TsProp: "timestamp",
}
// Sliding window
windowConfig := WindowConfig{
Type: "sliding",
Params: map[string]interface{}{
"size": "30s",
"slide": "10s",
},
TsProp: "timestamp",
}
// Counting window
windowConfig := WindowConfig{
Type: "counting",
Params: map[string]interface{}{
"count": 100,
},
}
// Session window
windowConfig := WindowConfig{
Type: "session",
Params: map[string]interface{}{
"timeout": "5m",
},
GroupByKey: "user_id",
}
Performance Configuration ¶
Comprehensive performance tuning options:
type PerformanceConfig struct {
// Buffer management
BufferSize int // Input buffer size
BatchSize int // Processing batch size
FlushInterval time.Duration // Automatic flush interval
HighWaterMark float64 // Buffer high water mark (0.0-1.0)
LowWaterMark float64 // Buffer low water mark (0.0-1.0)
// Worker pool configuration
WorkerPoolSize int // Number of worker goroutines
MaxWorkers int // Maximum worker limit
WorkerIdleTime time.Duration // Worker idle timeout
// Overflow handling
OverflowStrategy string // "drop", "block", "spill", "compress"
SpillDirectory string // Directory for spill files
CompressionLevel int // Compression level (1-9)
// Memory management
MaxMemoryUsage int64 // Maximum memory usage in bytes
GCInterval time.Duration // Garbage collection interval
MemoryThreshold float64 // Memory usage threshold
// Monitoring
MetricsEnabled bool // Enable metrics collection
MetricsInterval time.Duration // Metrics collection interval
HealthCheckPort int // Health check HTTP port
// Persistence
PersistenceEnabled bool // Enable data persistence
PersistenceType string // "memory", "file", "database"
PersistencePath string // Persistence storage path
RecoveryEnabled bool // Enable automatic recovery
}
Field Management ¶
Advanced field handling and expression support:
type FieldExpression struct {
Field string // Field name
Expression string // Expression
Fields []string // Referenced fields
}
type Projection struct {
SourceType ProjectionSourceType // Source type (field, expression, aggregate)
Source string // Source identifier
Alias string // Output alias
DataType string // Expected data type
}
type ProjectionSourceType string
const (
ProjectionSourceField ProjectionSourceType = "field" // Direct field reference
ProjectionSourceExpression ProjectionSourceType = "expression" // Computed expression
ProjectionSourceAggregate ProjectionSourceType = "aggregate" // Aggregate function
ProjectionSourceConstant ProjectionSourceType = "constant" // Constant value
)
Data Row Representation ¶
Type-safe data row structures for stream processing:
type Row struct {
Data map[string]interface{} // Row data
Timestamp time.Time // Row timestamp
Metadata map[string]interface{} // Additional metadata
GroupKey string // Grouping key for aggregation
WindowID string // Window identifier
}
// Row creation and manipulation
func NewRow(data map[string]interface{}) *Row
func (r *Row) GetValue(field string) interface{}
func (r *Row) SetValue(field string, value interface{})
func (r *Row) HasField(field string) bool
func (r *Row) Clone() *Row
Time Management ¶
Time-based data structures for window processing:
type TimeSlot struct {
Start time.Time // Slot start time
End time.Time // Slot end time
Duration time.Duration // Slot duration
ID string // Unique slot identifier
}
// Time slot operations
func NewTimeSlot(start time.Time, duration time.Duration) *TimeSlot
func (ts *TimeSlot) Contains(timestamp time.Time) bool
func (ts *TimeSlot) Overlaps(other *TimeSlot) bool
func (ts *TimeSlot) String() string
Configuration Presets ¶
Pre-defined configuration templates for common use cases:
// High Performance Configuration
func NewHighPerformanceConfig() *PerformanceConfig {
return &PerformanceConfig{
BufferSize: 50000,
BatchSize: 1000,
WorkerPoolSize: 8,
FlushInterval: 100 * time.Millisecond,
OverflowStrategy: "spill",
MetricsEnabled: true,
}
}
// Low Latency Configuration
func NewLowLatencyConfig() *PerformanceConfig {
return &PerformanceConfig{
BufferSize: 1000,
BatchSize: 10,
WorkerPoolSize: 2,
FlushInterval: 10 * time.Millisecond,
OverflowStrategy: "drop",
MetricsEnabled: false,
}
}
// Zero Data Loss Configuration
func NewZeroDataLossConfig() *PerformanceConfig {
return &PerformanceConfig{
BufferSize: 10000,
BatchSize: 100,
WorkerPoolSize: 4,
FlushInterval: time.Second,
OverflowStrategy: "block",
PersistenceEnabled: true,
RecoveryEnabled: true,
MetricsEnabled: true,
}
}
Usage Examples ¶
Basic configuration:
config := &Config{
WindowConfig: WindowConfig{
Type: "tumbling",
Params: map[string]interface{}{"size": "5s"},
},
GroupFields: []string{"device_id"},
SelectFields: map[string]aggregator.AggregateType{
"temperature": aggregator.AggregateTypeAvg,
},
NeedWindow: true,
}
Data row operations:
row := NewRow(map[string]interface{}{
"device_id": "sensor001",
"temperature": 25.5,
})
deviceID := row.GetValue("device_id").(string)
row.SetValue("processed", true)
Integration ¶
Integrates with other StreamSQL components:
• Stream Package - Core data types for stream processing
• Window Package - WindowConfig for window configurations
• Aggregator Package - AggregateType definitions
• Condition Package - Data structures for clause evaluation
• Functions Package - Type definitions for functions
• RSQL Package - Config structures for query execution
Index ¶
- type AggregationFieldInfo
- type BufferConfig
- type Config
- type ExpansionConfig
- type FieldExpression
- type MonitoringConfig
- type OverflowConfig
- type PerformanceConfig
- type PostAggregationExpression
- type Projection
- type ProjectionSourceType
- type Row
- type RowEvent
- type TimeSlot
- type WarningThresholds
- type WindowConfig
- type WorkerConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AggregationFieldInfo ¶ added in v0.10.3
type AggregationFieldInfo struct {
FuncName string `json:"funcName"` // Function name, e.g. "first_value"
InputField string `json:"inputField"` // Input field, e.g. "displayNum"
Placeholder string `json:"placeholder"` // Placeholder, e.g. "__first_value_0__"
AggType aggregator.AggregateType `json:"aggType"` // Aggregation type
FullCall string `json:"fullCall"` // Full function call, e.g. "NTH_VALUE(value, 2)"
}
AggregationFieldInfo holds information about an aggregation function in an expression
type BufferConfig ¶
type BufferConfig struct {
DataChannelSize int `json:"dataChannelSize"` // Data input buffer size
ResultChannelSize int `json:"resultChannelSize"` // Result output buffer size
WindowOutputSize int `json:"windowOutputSize"` // Window output buffer size
EnableDynamicResize bool `json:"enableDynamicResize"` // Enable dynamic buffer resizing
MaxBufferSize int `json:"maxBufferSize"` // Maximum buffer size
UsageThreshold float64 `json:"usageThreshold"` // Buffer usage threshold
}
BufferConfig is the buffer configuration.
type Config ¶
type Config struct {
// SQL processing related configuration
WindowConfig WindowConfig `json:"windowConfig"`
GroupFields []string `json:"groupFields"`
SelectFields map[string]aggregator.AggregateType `json:"selectFields"`
FieldAlias map[string]string `json:"fieldAlias"`
SimpleFields []string `json:"simpleFields"`
FieldExpressions map[string]FieldExpression `json:"fieldExpressions"`
PostAggExpressions []PostAggregationExpression `json:"postAggExpressions"` // Post-aggregation expressions
FieldOrder []string `json:"fieldOrder"` // Original order of fields in SELECT statement
Where string `json:"where"`
Having string `json:"having"`
// Feature switches
NeedWindow bool `json:"needWindow"`
Distinct bool `json:"distinct"`
// Result control
Limit int `json:"limit"`
Projections []Projection `json:"projections"`
// Performance configuration
PerformanceConfig PerformanceConfig `json:"performanceConfig"`
}
Config is the stream processing configuration.
func NewConfigWithPerformance ¶
func NewConfigWithPerformance(perfConfig PerformanceConfig) Config
NewConfigWithPerformance creates Config with performance configuration
type ExpansionConfig ¶
type ExpansionConfig struct {
GrowthFactor float64 `json:"growthFactor"` // Growth factor
MinIncrement int `json:"minIncrement"` // Minimum expansion increment
TriggerThreshold float64 `json:"triggerThreshold"` // Expansion trigger threshold
ExpansionTimeout time.Duration `json:"expansionTimeout"` // Expansion timeout duration
}
ExpansionConfig is the buffer expansion configuration.
type FieldExpression ¶
type FieldExpression struct {
Field string `json:"field"` // original field name
Expression string `json:"expression"` // complete expression
Fields []string `json:"fields"` // all fields referenced in expression
}
FieldExpression is the field expression configuration.
type MonitoringConfig ¶
type MonitoringConfig struct {
EnableMonitoring bool `json:"enableMonitoring"` // Enable performance monitoring
StatsUpdateInterval time.Duration `json:"statsUpdateInterval"` // Statistics update interval
EnableDetailedStats bool `json:"enableDetailedStats"` // Enable detailed statistics
WarningThresholds WarningThresholds `json:"warningThresholds"` // Performance warning thresholds
}
MonitoringConfig is the monitoring configuration.
type OverflowConfig ¶
type OverflowConfig struct {
Strategy string `json:"strategy"` // Overflow strategy: "drop", "block", "expand"
BlockTimeout time.Duration `json:"blockTimeout"` // Block timeout duration
AllowDataLoss bool `json:"allowDataLoss"` // Allow data loss
ExpansionConfig ExpansionConfig `json:"expansionConfig"` // Expansion configuration
}
OverflowConfig is the overflow strategy configuration.
type PerformanceConfig ¶
type PerformanceConfig struct {
BufferConfig BufferConfig `json:"bufferConfig"` // buffer configuration
OverflowConfig OverflowConfig `json:"overflowConfig"` // overflow strategy configuration
WorkerConfig WorkerConfig `json:"workerConfig"` // worker pool configuration
MonitoringConfig MonitoringConfig `json:"monitoringConfig"` // monitoring configuration
}
PerformanceConfig is the performance configuration.
func DefaultPerformanceConfig ¶
func DefaultPerformanceConfig() PerformanceConfig
DefaultPerformanceConfig returns the default performance configuration. It provides balanced performance settings suitable for most scenarios.
func HighPerformanceConfig ¶
func HighPerformanceConfig() PerformanceConfig
HighPerformanceConfig returns the high performance configuration preset. It optimizes throughput with large buffers and an expansion strategy.
func LowLatencyConfig ¶
func LowLatencyConfig() PerformanceConfig
LowLatencyConfig returns the low latency configuration preset. It optimizes latency with smaller buffers and a fast response strategy.
type PostAggregationExpression ¶ added in v0.10.3
type PostAggregationExpression struct {
OutputField string `json:"outputField"` // Output field name
OriginalExpr string `json:"originalExpr"` // Original expression
ExpressionTemplate string `json:"expressionTemplate"` // Expression template
RequiredFields []AggregationFieldInfo `json:"requiredFields"` // Aggregation fields the expression depends on
}
PostAggregationExpression represents an expression that needs to be evaluated after aggregation
type Projection ¶
type Projection struct {
OutputName string `json:"outputName"` // output field name
SourceType ProjectionSourceType `json:"sourceType"` // data source type
InputName string `json:"inputName"` // input field name
}
Projection is the configuration of a single projection in the SELECT list.
type ProjectionSourceType ¶
type ProjectionSourceType int
ProjectionSourceType is the source type of a projection.
const (
SourceGroupKey ProjectionSourceType = iota
SourceAggregateResult
SourceWindowProperty // For window_start, window_end
)
type TimeSlot ¶
func NewTimeSlot ¶
func (*TimeSlot) GetEndTime ¶
func (*TimeSlot) GetStartTime ¶
func (*TimeSlot) WindowStart ¶
type WarningThresholds ¶
type WarningThresholds struct {
DropRateWarning float64 `json:"dropRateWarning"` // Drop rate warning threshold
DropRateCritical float64 `json:"dropRateCritical"` // Drop rate critical threshold
BufferUsageWarning float64 `json:"bufferUsageWarning"` // Buffer usage warning threshold
BufferUsageCritical float64 `json:"bufferUsageCritical"` // Buffer usage critical threshold
}
WarningThresholds defines the performance warning thresholds.
type WindowConfig ¶
type WindowConfig struct {
Type string `json:"type"`
Params map[string]interface{} `json:"params"`
TsProp string `json:"tsProp"`
TimeUnit time.Duration `json:"timeUnit"`
GroupByKey string `json:"groupByKey"` // Session window grouping key
}
WindowConfig is the window configuration.
type WorkerConfig ¶
type WorkerConfig struct {
SinkPoolSize int `json:"sinkPoolSize"` // Sink pool size
SinkWorkerCount int `json:"sinkWorkerCount"` // Sink worker count
MaxRetryRoutines int `json:"maxRetryRoutines"` // Maximum retry routines
}
WorkerConfig is the worker pool configuration.