types

package
v0.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 27, 2025 License: Apache-2.0 Imports: 2 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BufferConfig

// BufferConfig is the buffer configuration.
type BufferConfig struct {
	DataChannelSize     int     `json:"dataChannelSize"`     // Size of the data input buffer.
	ResultChannelSize   int     `json:"resultChannelSize"`   // Size of the result output buffer.
	WindowOutputSize    int     `json:"windowOutputSize"`    // Size of the window output buffer.
	EnableDynamicResize bool    `json:"enableDynamicResize"` // Whether dynamic buffer resizing is enabled.
	MaxBufferSize       int     `json:"maxBufferSize"`       // Maximum buffer size.
	UsageThreshold      float64 `json:"usageThreshold"`      // Buffer usage-rate threshold.
}

BufferConfig 缓冲区配置

type Config

// Config is the stream processing configuration.
type Config struct {
	// SQL processing related configuration.
	WindowConfig     WindowConfig                        `json:"windowConfig"`
	GroupFields      []string                            `json:"groupFields"`
	SelectFields     map[string]aggregator.AggregateType `json:"selectFields"`
	FieldAlias       map[string]string                   `json:"fieldAlias"`
	SimpleFields     []string                            `json:"simpleFields"`
	FieldExpressions map[string]FieldExpression          `json:"fieldExpressions"`
	Where            string                              `json:"where"`
	Having           string                              `json:"having"`

	// Feature switches.
	NeedWindow bool `json:"needWindow"`
	Distinct   bool `json:"distinct"`

	// Result control.
	Limit       int          `json:"limit"`
	Projections []Projection `json:"projections"`

	// Performance configuration.
	PerformanceConfig PerformanceConfig `json:"performanceConfig"`
}

Config 流处理配置

func NewConfig

func NewConfig() Config

NewConfig 创建默认配置

func NewConfigWithPerformance

func NewConfigWithPerformance(perfConfig PerformanceConfig) Config

NewConfigWithPerformance 创建带性能配置的Config

type ExpansionConfig

// ExpansionConfig is the buffer expansion configuration.
type ExpansionConfig struct {
	GrowthFactor     float64       `json:"growthFactor"`     // Expansion (growth) factor.
	MinIncrement     int           `json:"minIncrement"`     // Minimum expansion increment.
	TriggerThreshold float64       `json:"triggerThreshold"` // Threshold that triggers an expansion.
	ExpansionTimeout time.Duration `json:"expansionTimeout"` // Expansion timeout.
}

ExpansionConfig 扩容配置

type FieldExpression

// FieldExpression is the configuration of a field expression.
type FieldExpression struct {
	Field      string   `json:"field"`      // Original field name.
	Expression string   `json:"expression"` // Complete expression.
	Fields     []string `json:"fields"`     // All fields referenced in the expression.
}

FieldExpression 字段表达式配置

type MonitoringConfig

// MonitoringConfig is the monitoring configuration.
type MonitoringConfig struct {
	EnableMonitoring    bool              `json:"enableMonitoring"`    // Whether performance monitoring is enabled.
	StatsUpdateInterval time.Duration     `json:"statsUpdateInterval"` // Interval between statistics updates.
	EnableDetailedStats bool              `json:"enableDetailedStats"` // Whether detailed statistics are enabled.
	WarningThresholds   WarningThresholds `json:"warningThresholds"`   // Performance warning thresholds.
}

MonitoringConfig 监控配置

type OverflowConfig

// OverflowConfig is the overflow strategy configuration.
type OverflowConfig struct {
	Strategy          string             `json:"strategy"`          // Overflow strategy: "drop", "block", "expand", "persist".
	BlockTimeout      time.Duration      `json:"blockTimeout"`      // Blocking timeout.
	AllowDataLoss     bool               `json:"allowDataLoss"`     // Whether data loss is allowed.
	PersistenceConfig *PersistenceConfig `json:"persistenceConfig"` // Persistence configuration (pointer, so it may be nil).
	ExpansionConfig   ExpansionConfig    `json:"expansionConfig"`   // Expansion configuration.
}

OverflowConfig 溢出策略配置

type PerformanceConfig

// PerformanceConfig is the performance configuration; it groups the buffer,
// overflow, worker and monitoring settings.
type PerformanceConfig struct {
	BufferConfig     BufferConfig     `json:"bufferConfig"`     // Buffer configuration.
	OverflowConfig   OverflowConfig   `json:"overflowConfig"`   // Overflow strategy configuration.
	WorkerConfig     WorkerConfig     `json:"workerConfig"`     // Worker pool configuration.
	MonitoringConfig MonitoringConfig `json:"monitoringConfig"` // Monitoring configuration.
}

PerformanceConfig 性能配置

func DefaultPerformanceConfig

func DefaultPerformanceConfig() PerformanceConfig

DefaultPerformanceConfig 默认性能配置

func HighPerformanceConfig

func HighPerformanceConfig() PerformanceConfig

HighPerformanceConfig 高性能配置预设

func LowLatencyConfig

func LowLatencyConfig() PerformanceConfig

LowLatencyConfig 低延迟配置预设

func PersistencePerformanceConfig

func PersistencePerformanceConfig() PerformanceConfig

PersistencePerformanceConfig 持久化配置预设

func ZeroDataLossConfig

func ZeroDataLossConfig() PerformanceConfig

ZeroDataLossConfig 零数据丢失配置预设

type PersistenceConfig

// PersistenceConfig is the persistence configuration.
type PersistenceConfig struct {
	DataDir       string        `json:"dataDir"`       // Directory for persisted data.
	MaxFileSize   int64         `json:"maxFileSize"`   // Maximum file size (unit not stated here — TODO confirm, presumably bytes).
	FlushInterval time.Duration `json:"flushInterval"` // Flush interval.
	MaxRetries    int           `json:"maxRetries"`    // Maximum number of retries.
	RetryInterval time.Duration `json:"retryInterval"` // Interval between retries.
}

PersistenceConfig 持久化配置

type Projection

// Projection is the configuration of one projection in the SELECT list.
type Projection struct {
	OutputName string               `json:"outputName"` // Output field name.
	SourceType ProjectionSourceType `json:"sourceType"` // Type of the data source.
	InputName  string               `json:"inputName"`  // Input field name.
}

Projection SELECT列表中的投影配置

type ProjectionSourceType

// ProjectionSourceType is the source type of a projection, represented as an int.
type ProjectionSourceType int

ProjectionSourceType 投影来源类型

// Enumerated ProjectionSourceType values, numbered from 0 via iota.
// NOTE(review): the per-value meanings below are inferred from the names
// only — confirm against the code that consumes Projection.SourceType.
const (
	SourceGroupKey ProjectionSourceType = iota // 0; presumably the value comes from a group-by key.
	SourceAggregateResult // 1; presumably the value comes from an aggregate result.
	SourceWindowProperty // For window_start, window_end
)

type Row

// Row bundles a timestamp, an untyped payload and a pointer to a TimeSlot.
// None of the fields carry JSON tags.
type Row struct {
	Timestamp time.Time   // Timestamp of the row; presumably what GetTimestamp returns — verify.
	Data      interface{} // Payload of any type; the concrete type is not constrained here.
	Slot      *TimeSlot   // Associated time slot; a pointer, so it may be nil.
}

func (*Row) GetTimestamp

func (r *Row) GetTimestamp() time.Time

GetTimestamp 获取时间戳

type RowEvent

// RowEvent is implemented by any value that can report a timestamp.
type RowEvent interface {
	// GetTimestamp returns the timestamp of the event.
	GetTimestamp() time.Time
}

type TimeSlot

// TimeSlot describes a slot by its start and end times. Both bounds are
// pointers, so either may be nil.
// NOTE(review): whether the bounds are inclusive or exclusive is not visible
// here — confirm against Contains.
type TimeSlot struct {
	Start *time.Time // Start of the slot.
	End   *time.Time // End of the slot.
}

func NewTimeSlot

func NewTimeSlot(start, end *time.Time) *TimeSlot

func (TimeSlot) Contains

func (ts TimeSlot) Contains(t time.Time) bool

Contains 检查给定时间是否在槽位范围内

func (*TimeSlot) GetEndTime

func (ts *TimeSlot) GetEndTime() *time.Time

func (*TimeSlot) GetStartTime

func (ts *TimeSlot) GetStartTime() *time.Time

func (TimeSlot) Hash

func (ts TimeSlot) Hash() uint64

Hash 生成槽位的哈希值

func (*TimeSlot) WindowEnd

func (ts *TimeSlot) WindowEnd() int64

func (*TimeSlot) WindowStart

func (ts *TimeSlot) WindowStart() int64

type WarningThresholds

// WarningThresholds holds the performance warning thresholds.
type WarningThresholds struct {
	DropRateWarning     float64 `json:"dropRateWarning"`     // Drop-rate warning threshold.
	DropRateCritical    float64 `json:"dropRateCritical"`    // Drop-rate critical threshold.
	BufferUsageWarning  float64 `json:"bufferUsageWarning"`  // Buffer usage-rate warning threshold.
	BufferUsageCritical float64 `json:"bufferUsageCritical"` // Buffer usage-rate critical threshold.
}

WarningThresholds 性能警告阈值

type WindowConfig

// WindowConfig is the window configuration.
// NOTE(review): the accepted values of Type and the keys of Params are not
// visible here — confirm against the window implementation.
type WindowConfig struct {
	Type       string                 `json:"type"`
	Params     map[string]interface{} `json:"params"`
	TsProp     string                 `json:"tsProp"`
	TimeUnit   time.Duration          `json:"timeUnit"`
	GroupByKey string                 `json:"groupByKey"` // Grouping key for session windows.
}

WindowConfig 窗口配置

type WorkerConfig

// WorkerConfig is the worker pool configuration.
type WorkerConfig struct {
	SinkPoolSize     int `json:"sinkPoolSize"`     // Size of the sink worker pool.
	SinkWorkerCount  int `json:"sinkWorkerCount"`  // Number of sink worker threads.
	MaxRetryRoutines int `json:"maxRetryRoutines"` // Maximum number of retry goroutines.
}

WorkerConfig 工作池配置

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL