stream

package
v0.10.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 27, 2025 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
// Window-related constants.
const (
	// WindowStartField is the field name "window_start".
	WindowStartField = "window_start"
	// WindowEndField is the field name "window_end".
	WindowEndField = "window_end"
)

窗口相关常量

View Source
// Overflow strategy constants.
const (
	// StrategyDrop is the strategy name "drop".
	StrategyDrop = "drop"
	// StrategyBlock is the strategy name "block".
	StrategyBlock = "block"
	// StrategyExpand is the strategy name "expand".
	StrategyExpand = "expand"
	// StrategyPersist is the strategy name "persist".
	StrategyPersist = "persist"
)

溢出策略常量

View Source
// Statistics field constants.
const (
	// StatsInputCount is the statistics field "input_count".
	StatsInputCount = "input_count"
	// StatsOutputCount is the statistics field "output_count".
	StatsOutputCount = "output_count"
	// StatsDroppedCount is the statistics field "dropped_count".
	StatsDroppedCount = "dropped_count"
	// StatsDataChanLen is the statistics field "data_chan_len".
	StatsDataChanLen = "data_chan_len"
	// StatsDataChanCap is the statistics field "data_chan_cap".
	StatsDataChanCap = "data_chan_cap"
	// StatsResultChanLen is the statistics field "result_chan_len".
	StatsResultChanLen = "result_chan_len"
	// StatsResultChanCap is the statistics field "result_chan_cap".
	StatsResultChanCap = "result_chan_cap"
	// StatsSinkPoolLen is the statistics field "sink_pool_len".
	StatsSinkPoolLen = "sink_pool_len"
	// StatsSinkPoolCap is the statistics field "sink_pool_cap".
	StatsSinkPoolCap = "sink_pool_cap"
	// StatsActiveRetries is the statistics field "active_retries".
	StatsActiveRetries = "active_retries"
	// StatsExpanding is the statistics field "expanding".
	StatsExpanding = "expanding"
)

统计信息字段常量

View Source
// Detailed statistics field constants.
const (
	// StatsBasicStats is the detailed statistics field "basic_stats".
	StatsBasicStats = "basic_stats"
	// StatsDataChanUsage is the detailed statistics field "data_chan_usage".
	StatsDataChanUsage = "data_chan_usage"
	// StatsResultChanUsage is the detailed statistics field "result_chan_usage".
	StatsResultChanUsage = "result_chan_usage"
	// StatsSinkPoolUsage is the detailed statistics field "sink_pool_usage".
	StatsSinkPoolUsage = "sink_pool_usage"
	// StatsProcessRate is the detailed statistics field "process_rate".
	StatsProcessRate = "process_rate"
	// StatsDropRate is the detailed statistics field "drop_rate".
	StatsDropRate = "drop_rate"
	// StatsPerformanceLevel is the detailed statistics field "performance_level".
	StatsPerformanceLevel = "performance_level"
)

详细统计信息字段常量

View Source
// Performance level constants.
const (
	// PerformanceLevelCritical is the level string "CRITICAL".
	PerformanceLevelCritical = "CRITICAL"
	// PerformanceLevelWarning is the level string "WARNING".
	PerformanceLevelWarning = "WARNING"
	// PerformanceLevelHighLoad is the level string "HIGH_LOAD".
	PerformanceLevelHighLoad = "HIGH_LOAD"
	// PerformanceLevelModerateLoad is the level string "MODERATE_LOAD".
	PerformanceLevelModerateLoad = "MODERATE_LOAD"
	// PerformanceLevelOptimal is the level string "OPTIMAL".
	PerformanceLevelOptimal = "OPTIMAL"
)

性能级别常量

View Source
// Persistence-related constants.
const (
	// PersistenceEnabled is the string "enabled".
	PersistenceEnabled = "enabled"
	// PersistenceMessage is the string "message".
	PersistenceMessage = "message"
	// PersistenceNotEnabledMsg is the message text "persistence not enabled".
	PersistenceNotEnabledMsg = "persistence not enabled"
	// PerformanceConfigKey is the key "performanceConfig".
	PerformanceConfigKey = "performanceConfig"
)

持久化相关常量

View Source
// SQL keyword constants.
const (
	// SQLKeywordCase is the SQL keyword "CASE".
	SQLKeywordCase = "CASE"
)

SQL关键字常量

Variables

This section is empty.

Functions

This section is empty.

Types

type PersistenceManager

// PersistenceManager is the data persistence manager.
type PersistenceManager struct {
	// contains filtered or unexported fields
}

PersistenceManager 数据持久化管理器

func NewPersistenceManager

func NewPersistenceManager(dataDir string) *PersistenceManager

NewPersistenceManager 创建默认配置的持久化管理器

func NewPersistenceManagerWithConfig

func NewPersistenceManagerWithConfig(dataDir string, maxFileSize int64, flushInterval time.Duration) *PersistenceManager

NewPersistenceManagerWithConfig 创建自定义配置的持久化管理器

func (*PersistenceManager) GetStats

func (pm *PersistenceManager) GetStats() map[string]interface{}

GetStats 获取持久化统计信息

func (*PersistenceManager) LoadPersistedData

func (pm *PersistenceManager) LoadPersistedData() ([]interface{}, error)

LoadPersistedData 加载并删除持久化数据

func (*PersistenceManager) PersistData

func (pm *PersistenceManager) PersistData(data interface{}) error

PersistData 持久化单条数据

func (*PersistenceManager) Start

func (pm *PersistenceManager) Start() error

Start 启动持久化管理器

func (*PersistenceManager) Stop

func (pm *PersistenceManager) Stop() error

Stop 停止持久化管理器

type Stream

// Stream is the stream type returned by the NewStream* constructors in this
// package.
type Stream struct {
	// Window is the window.Window value exposed by the stream.
	// NOTE(review): its role is not described in this listing — confirm
	// against the implementation.
	Window window.Window
	// contains filtered or unexported fields
}

func NewHighPerformanceStream

func NewHighPerformanceStream(config types.Config) (*Stream, error)

NewHighPerformanceStream 创建高性能配置的Stream (已弃用,使用NewStreamWithHighPerformance)。

Deprecated: 使用NewStreamWithHighPerformance替代。

func NewStream

func NewStream(config types.Config) (*Stream, error)

NewStream 使用统一配置创建Stream

func NewStreamProcessor

func NewStreamProcessor() (*Stream, error)

func NewStreamWithBuffers

func NewStreamWithBuffers(config types.Config, dataBufSize, resultBufSize, sinkPoolSize int) (*Stream, error)

NewStreamWithBuffers 创建带自定义缓冲区大小的Stream (已弃用,使用NewStreamWithCustomPerformance)。

Deprecated: 使用NewStreamWithCustomPerformance替代。

func NewStreamWithCustomPerformance

func NewStreamWithCustomPerformance(config types.Config, perfConfig types.PerformanceConfig) (*Stream, error)

NewStreamWithCustomPerformance 创建自定义性能配置的Stream

func NewStreamWithHighPerformance

func NewStreamWithHighPerformance(config types.Config) (*Stream, error)

NewStreamWithHighPerformance 创建高性能Stream

func NewStreamWithLossPolicy

func NewStreamWithLossPolicy(config types.Config, dataBufSize, resultBufSize, sinkPoolSize int,
	overflowStrategy string, timeout time.Duration) (*Stream, error)

NewStreamWithLossPolicy 创建带数据丢失策略的流处理器 (已弃用,使用NewStreamWithCustomPerformance)。

Deprecated: 使用NewStreamWithCustomPerformance替代。

func NewStreamWithLossPolicyAndPersistence

func NewStreamWithLossPolicyAndPersistence(config types.Config, dataBufSize, resultBufSize, sinkPoolSize int,
	overflowStrategy string, timeout time.Duration, persistDataDir string, persistMaxFileSize int64, persistFlushInterval time.Duration) (*Stream, error)

NewStreamWithLossPolicyAndPersistence 创建带数据丢失策略和持久化配置的流处理器 (已弃用,使用NewStreamWithCustomPerformance)。

Deprecated: 使用NewStreamWithCustomPerformance替代。

func NewStreamWithLowLatency

func NewStreamWithLowLatency(config types.Config) (*Stream, error)

NewStreamWithLowLatency 创建低延迟Stream

func NewStreamWithZeroDataLoss

func NewStreamWithZeroDataLoss(config types.Config) (*Stream, error)

NewStreamWithZeroDataLoss 创建零数据丢失Stream

func NewStreamWithoutDataLoss

func NewStreamWithoutDataLoss(config types.Config, strategy string) (*Stream, error)

NewStreamWithoutDataLoss 创建零数据丢失的流处理器 (已弃用,使用NewStreamWithZeroDataLoss)。

Deprecated: 使用NewStreamWithZeroDataLoss替代。

func (*Stream) AddSink

func (s *Stream) AddSink(sink func(interface{}))

func (*Stream) Emit

func (s *Stream) Emit(data interface{})

func (*Stream) GetDetailedStats

func (s *Stream) GetDetailedStats() map[string]interface{}

GetDetailedStats 获取详细的性能统计信息

func (*Stream) GetPersistenceStats

func (s *Stream) GetPersistenceStats() map[string]interface{}

GetPersistenceStats 获取持久化统计信息

func (*Stream) GetResultsChan

func (s *Stream) GetResultsChan() <-chan interface{}

func (*Stream) GetStats

func (s *Stream) GetStats() map[string]int64

GetStats 获取流处理统计信息 (线程安全版本)

func (*Stream) IsAggregationQuery

func (s *Stream) IsAggregationQuery() bool

IsAggregationQuery 检查当前流是否为聚合查询

func (*Stream) LoadAndReprocessPersistedData

func (s *Stream) LoadAndReprocessPersistedData() error

LoadAndReprocessPersistedData 加载并重新处理持久化数据

func (*Stream) ProcessSync

func (s *Stream) ProcessSync(data interface{}) (interface{}, error)

ProcessSync 同步处理单条数据,立即返回结果。仅适用于非聚合查询,聚合查询会返回错误。

func (*Stream) RegisterFilter

func (s *Stream) RegisterFilter(conditionStr string) error

func (*Stream) ResetStats

func (s *Stream) ResetStats()

ResetStats 重置统计信息

func (*Stream) Start

func (s *Stream) Start()

func (*Stream) Stop

func (s *Stream) Stop()

Stop 停止流处理

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL