Documentation
¶
Index ¶
- Constants
- type PersistenceManager
- type Stream
- func NewHighPerformanceStream(config types.Config) (*Stream, error)
- func NewStream(config types.Config) (*Stream, error)
- func NewStreamProcessor() (*Stream, error)
- func NewStreamWithBuffers(config types.Config, dataBufSize, resultBufSize, sinkPoolSize int) (*Stream, error)
- func NewStreamWithCustomPerformance(config types.Config, perfConfig types.PerformanceConfig) (*Stream, error)
- func NewStreamWithHighPerformance(config types.Config) (*Stream, error)
- func NewStreamWithLossPolicy(config types.Config, dataBufSize, resultBufSize, sinkPoolSize int, ...) (*Stream, error)
- func NewStreamWithLossPolicyAndPersistence(config types.Config, dataBufSize, resultBufSize, sinkPoolSize int, ...) (*Stream, error)
- func NewStreamWithLowLatency(config types.Config) (*Stream, error)
- func NewStreamWithZeroDataLoss(config types.Config) (*Stream, error)
- func NewStreamWithoutDataLoss(config types.Config, strategy string) (*Stream, error)
- func (s *Stream) AddSink(sink func(interface{}))
- func (s *Stream) Emit(data interface{})
- func (s *Stream) GetDetailedStats() map[string]interface{}
- func (s *Stream) GetPersistenceStats() map[string]interface{}
- func (s *Stream) GetResultsChan() <-chan interface{}
- func (s *Stream) GetStats() map[string]int64
- func (s *Stream) IsAggregationQuery() bool
- func (s *Stream) LoadAndReprocessPersistedData() error
- func (s *Stream) ProcessSync(data interface{}) (interface{}, error)
- func (s *Stream) RegisterFilter(conditionStr string) error
- func (s *Stream) ResetStats()
- func (s *Stream) Start()
- func (s *Stream) Stop()
Constants ¶
const ( WindowStartField = "window_start" WindowEndField = "window_end" )
窗口相关常量
const ( StrategyDrop = "drop" StrategyBlock = "block" StrategyExpand = "expand" StrategyPersist = "persist" )
溢出策略常量
const ( StatsInputCount = "input_count" StatsOutputCount = "output_count" StatsDroppedCount = "dropped_count" StatsDataChanLen = "data_chan_len" StatsDataChanCap = "data_chan_cap" StatsResultChanLen = "result_chan_len" StatsResultChanCap = "result_chan_cap" StatsSinkPoolLen = "sink_pool_len" StatsSinkPoolCap = "sink_pool_cap" StatsActiveRetries = "active_retries" StatsExpanding = "expanding" )
统计信息字段常量
const ( StatsBasicStats = "basic_stats" StatsDataChanUsage = "data_chan_usage" StatsResultChanUsage = "result_chan_usage" StatsSinkPoolUsage = "sink_pool_usage" StatsProcessRate = "process_rate" StatsDropRate = "drop_rate" StatsPerformanceLevel = "performance_level" )
详细统计信息字段常量
const ( PerformanceLevelCritical = "CRITICAL" PerformanceLevelWarning = "WARNING" PerformanceLevelHighLoad = "HIGH_LOAD" PerformanceLevelModerateLoad = "MODERATE_LOAD" PerformanceLevelOptimal = "OPTIMAL" )
性能级别常量
const ( PersistenceEnabled = "enabled" PersistenceMessage = "message" PersistenceNotEnabledMsg = "persistence not enabled" PerformanceConfigKey = "performanceConfig" )
持久化相关常量
const (
SQLKeywordCase = "CASE"
)
SQL关键字常量
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type PersistenceManager ¶
type PersistenceManager struct {
// contains filtered or unexported fields
}
PersistenceManager 数据持久化管理器
func NewPersistenceManager ¶
func NewPersistenceManager(dataDir string) *PersistenceManager
NewPersistenceManager 创建默认配置的持久化管理器
func NewPersistenceManagerWithConfig ¶
func NewPersistenceManagerWithConfig(dataDir string, maxFileSize int64, flushInterval time.Duration) *PersistenceManager
NewPersistenceManagerWithConfig 创建自定义配置的持久化管理器
func (*PersistenceManager) GetStats ¶
func (pm *PersistenceManager) GetStats() map[string]interface{}
GetStats 获取持久化统计信息
func (*PersistenceManager) LoadPersistedData ¶
func (pm *PersistenceManager) LoadPersistedData() ([]interface{}, error)
LoadPersistedData 加载并删除持久化数据
func (*PersistenceManager) PersistData ¶
func (pm *PersistenceManager) PersistData(data interface{}) error
PersistData 持久化单条数据
type Stream ¶
func NewHighPerformanceStream ¶
NewHighPerformanceStream 创建高性能配置的Stream (已弃用,使用NewStreamWithHighPerformance) Deprecated: 使用NewStreamWithHighPerformance替代
func NewStreamProcessor ¶
func NewStreamWithBuffers ¶
func NewStreamWithBuffers(config types.Config, dataBufSize, resultBufSize, sinkPoolSize int) (*Stream, error)
NewStreamWithBuffers 创建带自定义缓冲区大小的Stream (已弃用,使用NewStreamWithCustomPerformance) Deprecated: 使用NewStreamWithCustomPerformance替代
func NewStreamWithCustomPerformance ¶
func NewStreamWithCustomPerformance(config types.Config, perfConfig types.PerformanceConfig) (*Stream, error)
NewStreamWithCustomPerformance 创建自定义性能配置的Stream
func NewStreamWithHighPerformance ¶
NewStreamWithHighPerformance 创建高性能Stream
func NewStreamWithLossPolicy ¶
func NewStreamWithLossPolicy(config types.Config, dataBufSize, resultBufSize, sinkPoolSize int, overflowStrategy string, timeout time.Duration) (*Stream, error)
NewStreamWithLossPolicy 创建带数据丢失策略的流处理器 (已弃用,使用NewStreamWithCustomPerformance) Deprecated: 使用NewStreamWithCustomPerformance替代
func NewStreamWithLossPolicyAndPersistence ¶
func NewStreamWithLossPolicyAndPersistence(config types.Config, dataBufSize, resultBufSize, sinkPoolSize int, overflowStrategy string, timeout time.Duration, persistDataDir string, persistMaxFileSize int64, persistFlushInterval time.Duration) (*Stream, error)
NewStreamWithLossPolicyAndPersistence 创建带数据丢失策略和持久化配置的流处理器 (已弃用,使用NewStreamWithCustomPerformance) Deprecated: 使用NewStreamWithCustomPerformance替代
func NewStreamWithLowLatency ¶
NewStreamWithLowLatency 创建低延迟Stream
func NewStreamWithZeroDataLoss ¶
NewStreamWithZeroDataLoss 创建零数据丢失Stream
func NewStreamWithoutDataLoss ¶
NewStreamWithoutDataLoss 创建零数据丢失的流处理器 (已弃用,使用NewStreamWithZeroDataLoss) Deprecated: 使用NewStreamWithZeroDataLoss替代
func (*Stream) GetDetailedStats ¶
GetDetailedStats 获取详细的性能统计信息
func (*Stream) GetPersistenceStats ¶
GetPersistenceStats 获取持久化统计信息
func (*Stream) GetResultsChan ¶
func (s *Stream) GetResultsChan() <-chan interface{}
func (*Stream) IsAggregationQuery ¶
IsAggregationQuery 检查当前流是否为聚合查询
func (*Stream) LoadAndReprocessPersistedData ¶
LoadAndReprocessPersistedData 加载并重新处理持久化数据
func (*Stream) ProcessSync ¶
ProcessSync 同步处理单条数据,立即返回结果 仅适用于非聚合查询,聚合查询会返回错误