window

package
v0.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 27, 2025 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package window 提供了窗口操作的实现,包括滚动窗口(Tumbling Window)。

Index

Constants

View Source
const (
	TypeTumbling = "tumbling"
	TypeSliding  = "sliding"
	TypeCounting = "counting"
	TypeSession  = "session"
)

Variables

This section is empty.

Functions

func GetTimestamp

func GetTimestamp(data interface{}, tsProp string, timeUnit time.Duration) time.Time

GetTimestamp 从数据中获取时间戳。

Types

type CountingWindow

type CountingWindow struct {
	// contains filtered or unexported fields
}

func NewCountingWindow

func NewCountingWindow(config types.WindowConfig) (*CountingWindow, error)

func (*CountingWindow) Add

func (cw *CountingWindow) Add(data interface{})

func (*CountingWindow) OutputChan

func (cw *CountingWindow) OutputChan() <-chan []types.Row

func (*CountingWindow) Reset

func (cw *CountingWindow) Reset()

func (*CountingWindow) SetCallback

func (cw *CountingWindow) SetCallback(callback func([]types.Row))

func (*CountingWindow) Start

func (cw *CountingWindow) Start()

func (*CountingWindow) Trigger

func (cw *CountingWindow) Trigger()

type SessionWindow

type SessionWindow struct {
	// contains filtered or unexported fields
}

SessionWindow 表示一个会话窗口 会话窗口是基于事件时间的窗口,当一段时间内没有事件到达时,会话窗口就会关闭

func NewSessionWindow

func NewSessionWindow(config types.WindowConfig) (*SessionWindow, error)

NewSessionWindow 创建一个新的会话窗口实例

func (*SessionWindow) Add

func (sw *SessionWindow) Add(data interface{})

Add 向会话窗口添加数据

func (*SessionWindow) OutputChan

func (sw *SessionWindow) OutputChan() <-chan []types.Row

OutputChan 返回一个只读通道,用于接收窗口触发时的数据

func (*SessionWindow) Reset

func (sw *SessionWindow) Reset()

Reset 重置会话窗口的数据

func (*SessionWindow) SetCallback

func (sw *SessionWindow) SetCallback(callback func([]types.Row))

SetCallback 设置会话窗口触发时的回调函数

func (*SessionWindow) Start

func (sw *SessionWindow) Start()

Start 启动会话窗口的定时检查机制

func (*SessionWindow) Stop

func (sw *SessionWindow) Stop()

Stop 停止会话窗口的操作

func (*SessionWindow) Trigger

func (sw *SessionWindow) Trigger()

Trigger 手动触发所有会话窗口

type SlidingWindow

type SlidingWindow struct {
	// contains filtered or unexported fields
}

SlidingWindow 表示一个滑动窗口,用于按时间范围处理数据

func NewSlidingWindow

func NewSlidingWindow(config types.WindowConfig) (*SlidingWindow, error)

NewSlidingWindow 创建一个新的滑动窗口实例 参数 size 表示窗口的总大小,slide 表示窗口每次滑动的时间间隔

func (*SlidingWindow) Add

func (sw *SlidingWindow) Add(data interface{})

Add 向滑动窗口中添加数据 参数 data 表示要添加的数据

func (*SlidingWindow) GetStats

func (sw *SlidingWindow) GetStats() map[string]int64

GetStats 获取窗口性能统计信息

func (*SlidingWindow) NextSlot

func (sw *SlidingWindow) NextSlot() *types.TimeSlot

func (*SlidingWindow) OutputChan

func (sw *SlidingWindow) OutputChan() <-chan []types.Row

OutputChan 返回滑动窗口的输出通道

func (*SlidingWindow) Reset

func (sw *SlidingWindow) Reset()

Reset 重置滑动窗口,清空窗口内的数据

func (*SlidingWindow) ResetStats

func (sw *SlidingWindow) ResetStats()

ResetStats 重置性能统计

func (*SlidingWindow) SetCallback

func (sw *SlidingWindow) SetCallback(callback func([]types.Row))

SetCallback 设置滑动窗口触发时执行的回调函数 参数 callback 表示要设置的回调函数

func (*SlidingWindow) Start

func (sw *SlidingWindow) Start()

Start 启动滑动窗口,开始定时触发窗口

func (*SlidingWindow) Stop

func (sw *SlidingWindow) Stop()

Stop 停止滑动窗口的操作

func (*SlidingWindow) Trigger

func (sw *SlidingWindow) Trigger()

Trigger 触发滑动窗口,处理窗口内的数据

type TimedData

type TimedData struct {
	Data      interface{}
	Timestamp time.Time
}

TimedData 用于包装数据和时间戳

type TumblingWindow

type TumblingWindow struct {
	// contains filtered or unexported fields
}

TumblingWindow 表示一个滚动窗口,用于在固定时间间隔内收集数据并触发处理。

func NewTumblingWindow

func NewTumblingWindow(config types.WindowConfig) (*TumblingWindow, error)

NewTumblingWindow 创建一个新的滚动窗口实例。 参数 size 是窗口的时间大小。

func (*TumblingWindow) Add

func (tw *TumblingWindow) Add(data interface{})

Add 向滚动窗口添加数据。 参数 data 是要添加的数据。

func (*TumblingWindow) GetStats

func (tw *TumblingWindow) GetStats() map[string]int64

GetStats 获取窗口性能统计信息

func (*TumblingWindow) NextSlot

func (sw *TumblingWindow) NextSlot() *types.TimeSlot

func (*TumblingWindow) OutputChan

func (tw *TumblingWindow) OutputChan() <-chan []types.Row

OutputChan 返回一个只读通道,用于接收窗口触发时的数据。

func (*TumblingWindow) Reset

func (tw *TumblingWindow) Reset()

Reset 重置滚动窗口的数据。

func (*TumblingWindow) ResetStats

func (tw *TumblingWindow) ResetStats()

ResetStats 重置性能统计

func (*TumblingWindow) SetCallback

func (tw *TumblingWindow) SetCallback(callback func([]types.Row))

SetCallback 设置滚动窗口触发时的回调函数。 参数 callback 是要设置的回调函数。

func (*TumblingWindow) Start

func (tw *TumblingWindow) Start()

Start 启动滚动窗口的定时触发机制。

func (*TumblingWindow) Stop

func (tw *TumblingWindow) Stop()

Stop 停止滚动窗口的操作。

func (*TumblingWindow) Trigger

func (tw *TumblingWindow) Trigger()

Trigger 触发滚动窗口的处理逻辑。

type Window

type Window interface {
	Add(item interface{})
	//GetResults() []interface{}
	Reset()
	Start()
	OutputChan() <-chan []types.Row
	SetCallback(callback func([]types.Row))
	Trigger()
}

func CreateWindow

func CreateWindow(config types.WindowConfig) (Window, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL