Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BufferConfig ¶
type BufferConfig struct {
DataChannelSize int `json:"dataChannelSize"` // 数据输入缓冲区大小
ResultChannelSize int `json:"resultChannelSize"` // 结果输出缓冲区大小
WindowOutputSize int `json:"windowOutputSize"` // 窗口输出缓冲区大小
EnableDynamicResize bool `json:"enableDynamicResize"` // 是否启用动态缓冲区调整
MaxBufferSize int `json:"maxBufferSize"` // 最大缓冲区大小
UsageThreshold float64 `json:"usageThreshold"` // 缓冲区使用率阈值
}
BufferConfig 缓冲区配置
type Config ¶
type Config struct {
// SQL 处理相关配置
WindowConfig WindowConfig `json:"windowConfig"`
GroupFields []string `json:"groupFields"`
SelectFields map[string]aggregator.AggregateType `json:"selectFields"`
FieldAlias map[string]string `json:"fieldAlias"`
SimpleFields []string `json:"simpleFields"`
FieldExpressions map[string]FieldExpression `json:"fieldExpressions"`
Where string `json:"where"`
Having string `json:"having"`
// 功能开关
NeedWindow bool `json:"needWindow"`
Distinct bool `json:"distinct"`
// 结果控制
Limit int `json:"limit"`
Projections []Projection `json:"projections"`
// 性能配置
PerformanceConfig PerformanceConfig `json:"performanceConfig"`
}
Config 流处理配置
func NewConfigWithPerformance ¶
func NewConfigWithPerformance(perfConfig PerformanceConfig) Config
NewConfigWithPerformance 创建带性能配置的Config
type ExpansionConfig ¶
type ExpansionConfig struct {
GrowthFactor float64 `json:"growthFactor"` // 扩容因子
MinIncrement int `json:"minIncrement"` // 最小扩容增量
TriggerThreshold float64 `json:"triggerThreshold"` // 扩容触发阈值
ExpansionTimeout time.Duration `json:"expansionTimeout"` // 扩容超时时间
}
ExpansionConfig 扩容配置
type FieldExpression ¶
type FieldExpression struct {
Field string `json:"field"` // 原始字段名
Expression string `json:"expression"` // 完整表达式
Fields []string `json:"fields"` // 表达式中引用的所有字段
}
FieldExpression 字段表达式配置
type MonitoringConfig ¶
type MonitoringConfig struct {
EnableMonitoring bool `json:"enableMonitoring"` // 是否启用性能监控
StatsUpdateInterval time.Duration `json:"statsUpdateInterval"` // 统计信息更新间隔
EnableDetailedStats bool `json:"enableDetailedStats"` // 是否启用详细统计
WarningThresholds WarningThresholds `json:"warningThresholds"` // 性能警告阈值
}
MonitoringConfig 监控配置
type OverflowConfig ¶
type OverflowConfig struct {
Strategy string `json:"strategy"` // 溢出策略: "drop", "block", "expand", "persist"
BlockTimeout time.Duration `json:"blockTimeout"` // 阻塞超时时间
AllowDataLoss bool `json:"allowDataLoss"` // 是否允许数据丢失
PersistenceConfig *PersistenceConfig `json:"persistenceConfig"` // 持久化配置
ExpansionConfig ExpansionConfig `json:"expansionConfig"` // 扩容配置
}
OverflowConfig 溢出策略配置
type PerformanceConfig ¶
type PerformanceConfig struct {
BufferConfig BufferConfig `json:"bufferConfig"` // 缓冲区配置
OverflowConfig OverflowConfig `json:"overflowConfig"` // 溢出策略配置
WorkerConfig WorkerConfig `json:"workerConfig"` // 工作池配置
MonitoringConfig MonitoringConfig `json:"monitoringConfig"` // 监控配置
}
PerformanceConfig 性能配置
func DefaultPerformanceConfig ¶
func DefaultPerformanceConfig() PerformanceConfig
DefaultPerformanceConfig 默认性能配置
func HighPerformanceConfig ¶
func HighPerformanceConfig() PerformanceConfig
HighPerformanceConfig 高性能配置预设
func PersistencePerformanceConfig ¶
func PersistencePerformanceConfig() PerformanceConfig
PersistencePerformanceConfig 持久化配置预设
type PersistenceConfig ¶
type PersistenceConfig struct {
DataDir string `json:"dataDir"` // 持久化数据目录
MaxFileSize int64 `json:"maxFileSize"` // 最大文件大小
FlushInterval time.Duration `json:"flushInterval"` // 刷新间隔
MaxRetries int `json:"maxRetries"` // 最大重试次数
RetryInterval time.Duration `json:"retryInterval"` // 重试间隔
}
PersistenceConfig 持久化配置
type Projection ¶
type Projection struct {
OutputName string `json:"outputName"` // 输出字段名
SourceType ProjectionSourceType `json:"sourceType"` // 数据来源类型
InputName string `json:"inputName"` // 输入字段名
}
Projection SELECT列表中的投影配置
type ProjectionSourceType ¶
type ProjectionSourceType int
ProjectionSourceType 投影来源类型
const ( SourceGroupKey ProjectionSourceType = iota SourceAggregateResult SourceWindowProperty // For window_start, window_end )
type TimeSlot ¶
func NewTimeSlot ¶
func (*TimeSlot) GetEndTime ¶
func (*TimeSlot) GetStartTime ¶
func (*TimeSlot) WindowStart ¶
type WarningThresholds ¶
type WarningThresholds struct {
DropRateWarning float64 `json:"dropRateWarning"` // 丢弃率警告阈值
DropRateCritical float64 `json:"dropRateCritical"` // 丢弃率严重阈值
BufferUsageWarning float64 `json:"bufferUsageWarning"` // 缓冲区使用率警告阈值
BufferUsageCritical float64 `json:"bufferUsageCritical"` // 缓冲区使用率严重阈值
}
WarningThresholds 性能警告阈值
type WindowConfig ¶
type WindowConfig struct {
Type string `json:"type"`
Params map[string]interface{} `json:"params"`
TsProp string `json:"tsProp"`
TimeUnit time.Duration `json:"timeUnit"`
GroupByKey string `json:"groupByKey"` // 会话窗口分组键
}
WindowConfig 窗口配置
type WorkerConfig ¶
type WorkerConfig struct {
SinkPoolSize int `json:"sinkPoolSize"` // Sink工作池大小
SinkWorkerCount int `json:"sinkWorkerCount"` // Sink工作线程数
MaxRetryRoutines int `json:"maxRetryRoutines"` // 最大重试协程数
}
WorkerConfig 工作池配置
Click to show internal directories.
Click to hide internal directories.