node

package
v2.4.0-beta.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 30, 2026 License: Apache-2.0 Imports: 48 Imported by: 0

Documentation

Index

Constants

View Source
const (
	WatermarkKey  = "$$wartermark"
	EventInputKey = "$$eventinputs"
	StreamWMKey   = "$$streamwms"
)
View Source
const (
	WindowInputsKey = "$$windowInputs"
	TriggerTimeKey  = "$$triggerTime"
	MsgCountKey     = "$$msgCount"
)
View Source
const BatchKey = "$$batchInputs"
View Source
const (
	OffsetKey = "$$offset"
)
View Source
const (
	V2WindowInputsKey = "$$v2windowInputs"
)

Variables

View Source
var EnableAlignWindow bool
View Source
var InfTime = time.Unix(1<<63-62135596801, 999999999)

Functions

func LookupPing

func LookupPing(lookupType string, config map[string]any) error

func SinkPing

func SinkPing(sinkType string, config map[string]any) error

func SourcePing

func SourcePing(sourceType string, config map[string]any) error

Types

type BatchMergerOp added in v2.3.0

type BatchMergerOp struct {
	// contains filtered or unexported fields
}

func NewBatchMergerOp added in v2.3.0

func NewBatchMergerOp(name string, rOpt *def.RuleOption) (*BatchMergerOp, error)

func (BatchMergerOp) AddInputCount added in v2.3.0

func (o BatchMergerOp) AddInputCount()

func (*BatchMergerOp) Exec added in v2.3.0

func (o *BatchMergerOp) Exec(ctx api.StreamContext, errCh chan<- error)

Exec decode op receives map/[]map and converts it to []map.

func (BatchMergerOp) GetInput added in v2.3.0

func (o BatchMergerOp) GetInput() (chan any, string)

func (BatchMergerOp) GetInputCount added in v2.3.0

func (o BatchMergerOp) GetInputCount() int

func (BatchMergerOp) SetBarrierHandler added in v2.3.0

func (o BatchMergerOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

type BatchOp

type BatchOp struct {
	// contains filtered or unexported fields
}

func NewBatchOp

func NewBatchOp(name string, rOpt *def.RuleOption, batchSize int, lingerInterval time.Duration) (*BatchOp, error)

func (BatchOp) AddInputCount

func (o BatchOp) AddInputCount()

func (*BatchOp) Exec

func (b *BatchOp) Exec(ctx api.StreamContext, errCh chan<- error)

func (BatchOp) GetInput

func (o BatchOp) GetInput() (chan any, string)

func (BatchOp) GetInputCount

func (o BatchOp) GetInputCount() int

func (BatchOp) SetBarrierHandler

func (o BatchOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

type BatchWriterOp added in v2.2.0

type BatchWriterOp struct {
	// contains filtered or unexported fields
}

BatchWriterOp is a streaming writer that converts batch data into bytes in a streaming way. Immutable: false. Input: any (mostly MessageTuple/SinkTupleList, may receive RawTuple after transformOp). Batch EOF is a signal to flush the buffer. Output: RawTuple.

func NewBatchWriterOp added in v2.2.0

func NewBatchWriterOp(ctx api.StreamContext, name string, rOpt *def.RuleOption, schema map[string]*ast.JsonStreamField, sc *SinkConf) (*BatchWriterOp, error)

func (BatchWriterOp) AddInputCount added in v2.2.0

func (o BatchWriterOp) AddInputCount()

func (*BatchWriterOp) Exec added in v2.2.0

func (o *BatchWriterOp) Exec(ctx api.StreamContext, errCh chan<- error)

Exec decode op receives map/[]map and converts it to bytes. If receiving bytes, just return it.

func (BatchWriterOp) GetInput added in v2.2.0

func (o BatchWriterOp) GetInput() (chan any, string)

func (BatchWriterOp) GetInputCount added in v2.2.0

func (o BatchWriterOp) GetInputCount() int

func (BatchWriterOp) SetBarrierHandler added in v2.2.0

func (o BatchWriterOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

type CacheOp

type CacheOp struct {
	// contains filtered or unexported fields
}

CacheOp receives tuples and decides whether to send them through or save them to disk. It runs right before the sink. Immutable: true. Input: any (mostly MessageTuple/MessageTupleList, may receive RawTuple after transformOp). Special validation: one output only.

func NewCacheOp

func NewCacheOp(ctx api.StreamContext, name string, rOpt *def.RuleOption, sc *model.SinkConf) (*CacheOp, error)

func (CacheOp) AddInputCount

func (o CacheOp) AddInputCount()

func (*CacheOp) Exec

func (s *CacheOp) Exec(ctx api.StreamContext, errCh chan<- error)

Exec ingests data and sends it through. If the channel is full, it saves the data to the disk cache and starts the send timer. Once all cached data has been sent, it stops the send timer.

func (CacheOp) GetInput

func (o CacheOp) GetInput() (chan any, string)

func (CacheOp) GetInputCount

func (o CacheOp) GetInputCount() int

func (CacheOp) SetBarrierHandler

func (o CacheOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

type Collector

type Collector interface {
	GetInput() (chan any, string)
}

type CompNode

type CompNode interface {
	TopNode
	Nodes() []TopNode
}

CompNode is a composite node, used for implicitly split nodes. For example, a sink node or source node may be implemented internally as a collection of connected nodes.

type CompressOp

type CompressOp struct {
	// contains filtered or unexported fields
}

func NewCompressOp

func NewCompressOp(name string, rOpt *def.RuleOption, compressMethod string, compressProps map[string]any) (*CompressOp, error)

func (CompressOp) AddInputCount

func (o CompressOp) AddInputCount()

func (*CompressOp) Exec

func (o *CompressOp) Exec(ctx api.StreamContext, errCh chan<- error)

func (CompressOp) GetInput

func (o CompressOp) GetInput() (chan any, string)

func (CompressOp) GetInputCount

func (o CompressOp) GetInputCount() int

func (CompressOp) SetBarrierHandler

func (o CompressOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

func (*CompressOp) Worker

func (o *CompressOp) Worker(_ api.StreamContext, item any) []any

type CountWindowIncAggEventOp added in v2.1.0

type CountWindowIncAggEventOp struct {
	CountWindowIncAggEventOpState
	// contains filtered or unexported fields
}

func NewCountWindowIncAggEventOp added in v2.1.0

func NewCountWindowIncAggEventOp(o *WindowIncAggOperator) *CountWindowIncAggEventOp

func (*CountWindowIncAggEventOp) PutState added in v2.1.0

func (co *CountWindowIncAggEventOp) PutState(ctx api.StreamContext)

func (*CountWindowIncAggEventOp) RestoreFromState added in v2.1.0

func (co *CountWindowIncAggEventOp) RestoreFromState(ctx api.StreamContext) error

type CountWindowIncAggEventOpState added in v2.1.0

type CountWindowIncAggEventOpState struct {
	CurrWindow     *IncAggWindow
	CurrWindowSize int
	EmitList       []*IncAggWindow
}

type CountWindowIncAggOp added in v2.1.0

type CountWindowIncAggOp struct {
	*WindowIncAggOperator

	CountWindowIncAggOpState
	// contains filtered or unexported fields
}

func (CountWindowIncAggOp) AddInputCount added in v2.1.0

func (o CountWindowIncAggOp) AddInputCount()

func (CountWindowIncAggOp) GetInput added in v2.1.0

func (o CountWindowIncAggOp) GetInput() (chan any, string)

func (CountWindowIncAggOp) GetInputCount added in v2.1.0

func (o CountWindowIncAggOp) GetInputCount() int

func (*CountWindowIncAggOp) PutState added in v2.1.0

func (co *CountWindowIncAggOp) PutState(ctx api.StreamContext)

func (*CountWindowIncAggOp) RestoreFromState added in v2.1.0

func (co *CountWindowIncAggOp) RestoreFromState(ctx api.StreamContext) error

func (CountWindowIncAggOp) SetBarrierHandler added in v2.1.0

func (o CountWindowIncAggOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

type CountWindowIncAggOpState added in v2.1.0

type CountWindowIncAggOpState struct {
	CurrWindow     *IncAggWindow
	CurrWindowSize int
}

type DataSinkNode

type DataSinkNode interface {
	TopNode
	MetricNode
	Collector
	Exec(api.StreamContext, chan<- error)
	GetStreamContext() api.StreamContext
	GetInputCount() int
	AddInputCount()
	SetQos(def.Qos)
	SetBarrierHandler(checkpoint.BarrierHandler)
}

type DataSourceNode

type DataSourceNode interface {
	TopNode
	MetricNode
	Emitter
	Open(ctx api.StreamContext, errCh chan<- error)
}

type DecodeOp

type DecodeOp struct {
	// contains filtered or unexported fields
}

DecodeOp manages the format decoding (employing the schema) and the sending frequency (for batch decode, like a JSON array).

func NewDecodeOp

func NewDecodeOp(ctx api.StreamContext, forPayload bool, name string, rOpt *def.RuleOption, schema map[string]*ast.JsonStreamField, props map[string]any) (*DecodeOp, error)

func (DecodeOp) AddInputCount

func (o DecodeOp) AddInputCount()

func (*DecodeOp) Exec

func (o *DecodeOp) Exec(ctx api.StreamContext, errCh chan<- error)

Exec decode op receives raw data and converts it to message

func (DecodeOp) GetInput

func (o DecodeOp) GetInput() (chan any, string)

func (DecodeOp) GetInputCount

func (o DecodeOp) GetInputCount() int

func (*DecodeOp) PayloadBatchDecodeWorker

func (o *DecodeOp) PayloadBatchDecodeWorker(ctx api.StreamContext, item any) []any

PayloadBatchDecodeWorker deals with payload like

{
	"ts": 123456,
	"batchField": [
		{"payloadField":"data","otherField":1},
		{"payloadField":"data2","otherField":2}
	]
}

It merges all payload results into one

{
	"ts": 123456,
	// parsed fields are merged
	"parsedField": 1,
	"parsedField": 2,
	// other fields also merged and keep the latest
	"otherField": 2
}

If parse result is a list, it will also merge them in

func (*DecodeOp) PayloadDecodeWorker

func (o *DecodeOp) PayloadDecodeWorker(ctx api.StreamContext, item any) []any

PayloadDecodeWorker: each input has one message with the payload field to decode

{
	"payloadField":"data","otherField":1
}

{
	// parsed fields
	"parsedField": 1,
	"parsedField2": 2,
	// keep the original field if in schema
	"payloadField":"data",
	"otherField":1
}

If parse result is a list, it will also output a list

func (*DecodeOp) ResetSchema added in v2.3.0

func (o *DecodeOp) ResetSchema(ctx api.StreamContext, schema map[string]*ast.JsonStreamField)

func (DecodeOp) SetBarrierHandler

func (o DecodeOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

func (*DecodeOp) Worker

func (o *DecodeOp) Worker(ctx api.StreamContext, item any) []any

type DecompressOp

type DecompressOp struct {
	// contains filtered or unexported fields
}

func NewDecompressOp

func NewDecompressOp(name string, rOpt *def.RuleOption, compressMethod string) (*DecompressOp, error)

func (DecompressOp) AddInputCount

func (o DecompressOp) AddInputCount()

func (*DecompressOp) Exec

func (o *DecompressOp) Exec(ctx api.StreamContext, errCh chan<- error)

func (DecompressOp) GetInput

func (o DecompressOp) GetInput() (chan any, string)

func (DecompressOp) GetInputCount

func (o DecompressOp) GetInputCount() int

func (DecompressOp) SetBarrierHandler

func (o DecompressOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

func (*DecompressOp) Worker

func (o *DecompressOp) Worker(_ api.StreamContext, item any) []any

type DedupTriggerNode

type DedupTriggerNode struct {
	// contains filtered or unexported fields
}

func NewDedupTriggerNode

func NewDedupTriggerNode(name string, options *def.RuleOption, aliasName string, startField string, endField string, nowField string, expire int64) *DedupTriggerNode

func (DedupTriggerNode) AddInputCount

func (o DedupTriggerNode) AddInputCount()

func (*DedupTriggerNode) Exec

func (w *DedupTriggerNode) Exec(ctx api.StreamContext, errCh chan<- error)

func (DedupTriggerNode) GetInput

func (o DedupTriggerNode) GetInput() (chan any, string)

func (DedupTriggerNode) GetInputCount

func (o DedupTriggerNode) GetInputCount() int

func (DedupTriggerNode) SetBarrierHandler

func (o DedupTriggerNode) SetBarrierHandler(bh checkpoint.BarrierHandler)

type Emitter

type Emitter interface {
	AddOutput(chan any, string) error
	RemoveOutput(string) error
}

type EncodeOp

type EncodeOp struct {
	// contains filtered or unexported fields
}

EncodeOp converts tuples to raw bytes according to the FORMAT property. Immutable: false. Input: any (mostly MessageTuple/SinkTupleList, may receive RawTuple after transformOp). Output: RawTuple.

func NewEncodeOp

func NewEncodeOp(ctx api.StreamContext, name string, rOpt *def.RuleOption, schema map[string]*ast.JsonStreamField, sc *SinkConf) (*EncodeOp, error)

func (EncodeOp) AddInputCount

func (o EncodeOp) AddInputCount()

func (*EncodeOp) Exec

func (o *EncodeOp) Exec(ctx api.StreamContext, errCh chan<- error)

Exec decode op receives map/[]map and converts it to bytes. If receiving bytes, just return it.

func (EncodeOp) GetInput

func (o EncodeOp) GetInput() (chan any, string)

func (EncodeOp) GetInputCount

func (o EncodeOp) GetInputCount() int

func (EncodeOp) SetBarrierHandler

func (o EncodeOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

func (*EncodeOp) Worker

func (o *EncodeOp) Worker(ctx api.StreamContext, item any) []any

type EncryptNode

type EncryptNode struct {
	// contains filtered or unexported fields
}

EncryptNode encrypts raw bytes. Immutable: false. Input: RawTuple. Output: RawTuple.

func NewEncryptOp

func NewEncryptOp(name string, rOpt *def.RuleOption, encryptMethod string, encProps map[string]any) (*EncryptNode, error)

func (EncryptNode) AddInputCount

func (o EncryptNode) AddInputCount()

func (*EncryptNode) Exec

func (o *EncryptNode) Exec(ctx api.StreamContext, errCh chan<- error)

func (EncryptNode) GetInput

func (o EncryptNode) GetInput() (chan any, string)

func (EncryptNode) GetInputCount

func (o EncryptNode) GetInputCount() int

func (EncryptNode) SetBarrierHandler

func (o EncryptNode) SetBarrierHandler(bh checkpoint.BarrierHandler)

func (*EncryptNode) Worker

func (o *EncryptNode) Worker(_ api.StreamContext, item any) []any

type EventSlidingWindowOp added in v2.3.0

type EventSlidingWindowOp struct {
	*WindowV2Operator
	Delay  time.Duration
	Length time.Duration
	// contains filtered or unexported fields
}

func NewEventSlidingWindowOp added in v2.3.0

func NewEventSlidingWindowOp(o *WindowV2Operator) *EventSlidingWindowOp

func (EventSlidingWindowOp) AddInputCount added in v2.3.0

func (o EventSlidingWindowOp) AddInputCount()

func (EventSlidingWindowOp) GetInput added in v2.3.0

func (o EventSlidingWindowOp) GetInput() (chan any, string)

func (EventSlidingWindowOp) GetInputCount added in v2.3.0

func (o EventSlidingWindowOp) GetInputCount() int

func (EventSlidingWindowOp) SetBarrierHandler added in v2.3.0

func (o EventSlidingWindowOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

type EventTimeTrigger

type EventTimeTrigger struct {
	// contains filtered or unexported fields
}

EventTimeTrigger scans the input tuples and finds the tuples in the current window. The inputs are sorted by the watermark op.

func NewEventTimeTrigger

func NewEventTimeTrigger(window *WindowConfig) (*EventTimeTrigger, error)

type HoppingWindowIncAggEventOp added in v2.1.0

type HoppingWindowIncAggEventOp struct {
	HoppingWindowIncAggEventOpState
	// contains filtered or unexported fields
}

func NewHoppingWindowIncAggEventOp added in v2.1.0

func NewHoppingWindowIncAggEventOp(o *WindowIncAggOperator) *HoppingWindowIncAggEventOp

func (*HoppingWindowIncAggEventOp) PutState added in v2.1.0

func (ho *HoppingWindowIncAggEventOp) PutState(ctx api.StreamContext)

func (*HoppingWindowIncAggEventOp) RestoreFromState added in v2.1.0

func (ho *HoppingWindowIncAggEventOp) RestoreFromState(ctx api.StreamContext) error

type HoppingWindowIncAggEventOpState added in v2.1.0

type HoppingWindowIncAggEventOpState struct {
	CurrWindowList        []*IncAggWindow
	NextTriggerWindowTime time.Time
}

type HoppingWindowIncAggOp added in v2.1.0

type HoppingWindowIncAggOp struct {
	*WindowIncAggOperator
	FirstTimer *clock.Timer

	Length   time.Duration
	Interval time.Duration

	HoppingWindowIncAggOpState
	// contains filtered or unexported fields
}

func NewHoppingWindowIncAggOp added in v2.1.0

func NewHoppingWindowIncAggOp(o *WindowIncAggOperator) *HoppingWindowIncAggOp

func (HoppingWindowIncAggOp) AddInputCount added in v2.1.0

func (o HoppingWindowIncAggOp) AddInputCount()

func (HoppingWindowIncAggOp) GetInput added in v2.1.0

func (o HoppingWindowIncAggOp) GetInput() (chan any, string)

func (HoppingWindowIncAggOp) GetInputCount added in v2.1.0

func (o HoppingWindowIncAggOp) GetInputCount() int

func (*HoppingWindowIncAggOp) PutState added in v2.1.0

func (ho *HoppingWindowIncAggOp) PutState(ctx api.StreamContext)

func (*HoppingWindowIncAggOp) RestoreFromState added in v2.1.0

func (ho *HoppingWindowIncAggOp) RestoreFromState(ctx api.StreamContext) error

func (HoppingWindowIncAggOp) SetBarrierHandler added in v2.1.0

func (o HoppingWindowIncAggOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

type HoppingWindowIncAggOpState added in v2.1.0

type HoppingWindowIncAggOpState struct {
	CurrWindowList []*IncAggWindow
}

type IncAggOpTask added in v2.1.0

type IncAggOpTask struct {
	// contains filtered or unexported fields
}

type IncAggRange added in v2.1.0

type IncAggRange struct {
	FunctionState map[string]interface{}
	LastRow       *xsql.Tuple
	Fields        map[string]interface{}
	// contains filtered or unexported fields
}

func (*IncAggRange) Clone added in v2.1.0

func (r *IncAggRange) Clone(ctx api.StreamContext) *IncAggRange

type IncAggWindow added in v2.1.0

type IncAggWindow struct {
	StartTime             time.Time
	EventTime             time.Time
	DimensionsIncAggRange map[string]*IncAggRange
}

func (*IncAggWindow) Clone added in v2.1.0

func (w *IncAggWindow) Clone(ctx api.StreamContext) *IncAggWindow

func (*IncAggWindow) GenerateAllFunctionState added in v2.1.0

func (w *IncAggWindow) GenerateAllFunctionState()

type JoinAlignNode

type JoinAlignNode struct {
	// contains filtered or unexported fields
}

JoinAlignNode will block the stream and buffer all the table tuples. Once buffered, it will combine later inputs with the buffer. The input for a batch table MUST be *WindowTuples.

func NewJoinAlignNode

func NewJoinAlignNode(name string, emitters []string, sizes []int, options *def.RuleOption) (*JoinAlignNode, error)

func (JoinAlignNode) AddInputCount

func (o JoinAlignNode) AddInputCount()

func (*JoinAlignNode) Exec

func (n *JoinAlignNode) Exec(ctx api.StreamContext, errCh chan<- error)

func (JoinAlignNode) GetInput

func (o JoinAlignNode) GetInput() (chan any, string)

func (JoinAlignNode) GetInputCount

func (o JoinAlignNode) GetInputCount() int

func (JoinAlignNode) SetBarrierHandler

func (o JoinAlignNode) SetBarrierHandler(bh checkpoint.BarrierHandler)

type LookupConf

type LookupConf struct {
	Cache           bool              `json:"cache"`
	CacheTTL        cast.DurationConf `json:"cacheTtl"`
	CacheMissingKey bool              `json:"cacheMissingKey"`
}

type LookupNode

type LookupNode struct {
	// contains filtered or unexported fields
}

LookupNode will look up the data from the external source when receiving an event

func NewLookupNode

func NewLookupNode(ctx api.StreamContext, name string, isBytesLookup bool, fields []string, keys []string, joinType ast.JoinType, vals []ast.Expr, srcOptions *ast.Options, options *def.RuleOption, props map[string]any) (*LookupNode, error)

func (LookupNode) AddInputCount

func (o LookupNode) AddInputCount()

func (*LookupNode) Exec

func (n *LookupNode) Exec(ctx api.StreamContext, errCh chan<- error)

func (LookupNode) GetInput

func (o LookupNode) GetInput() (chan any, string)

func (LookupNode) GetInputCount

func (o LookupNode) GetInputCount() int

func (LookupNode) SetBarrierHandler

func (o LookupNode) SetBarrierHandler(bh checkpoint.BarrierHandler)

type MergeableTopo

type MergeableTopo interface {
	GetSource() DataSourceNode
	// MergeSrc Add child topo as the source with following operators
	MergeSrc(parentTopo *def.PrintableTopo)
	// LinkTopo Add printable topo link from the parent topo to the child topo
	LinkTopo(parentTopo *def.PrintableTopo, parentJointName string)
	// SubMetrics return the metrics of the sub nodes
	SubMetrics() ([]string, []any)
	// Close notifies subtopo to deref
	Close(ctx api.StreamContext)
}

type MetricNode

type MetricNode interface {
	GetMetrics() []any
	RemoveMetrics(ruleId string)
}

type OperatorNode

type OperatorNode interface {
	DataSinkNode
	Emitter
	Broadcast(data interface{})
}

type PriorityQueue

type PriorityQueue []*TriggerRequest

func (*PriorityQueue) Peek

func (pq *PriorityQueue) Peek() *TriggerRequest

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() *TriggerRequest

Pop removes and returns the item with the highest priority from the priority queue

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x *TriggerRequest)

Push adds an item to the priority queue

type RateLimitOp

type RateLimitOp struct {
	// contains filtered or unexported fields
}

RateLimitOp handles messages at a regular rate. By default (no strategy set), it ignores messages that arrive too quickly and only keeps the most recent message, which is sent at trigger time. If a strategy is set, it sends through all messages as well as the trigger signal and lets the strategy node handle the merge. Input: Raw. Output: Raw, as is. Concurrency: false.

func NewRateLimitOp

func NewRateLimitOp(ctx api.StreamContext, name string, rOpt *def.RuleOption, schema map[string]*ast.JsonStreamField, props map[string]any) (*RateLimitOp, error)

func (RateLimitOp) AddInputCount

func (o RateLimitOp) AddInputCount()

func (*RateLimitOp) Exec

func (o *RateLimitOp) Exec(ctx api.StreamContext, errCh chan<- error)

Exec: the rate limit op deals with 3 merge strategies: (1) latest; (2) merge by mergeField (when format and mergeField are set and there is no payload format); (3) merge by merger (when format, payloadFormat and merger are set).

func (RateLimitOp) GetInput

func (o RateLimitOp) GetInput() (chan any, string)

func (RateLimitOp) GetInputCount

func (o RateLimitOp) GetInputCount() int

func (*RateLimitOp) ResetSchema added in v2.3.0

func (o *RateLimitOp) ResetSchema(ctx api.StreamContext, schema map[string]*ast.JsonStreamField)

func (RateLimitOp) SetBarrierHandler

func (o RateLimitOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

type SchemaNode

type SchemaNode interface {
	ResetSchema(api.StreamContext, map[string]*ast.JsonStreamField)
}

type SinkConf

type SinkConf struct {
	Concurrency      int               `json:"concurrency"`
	Omitempty        bool              `json:"omitIfEmpty"`
	SendSingle       bool              `json:"sendSingle"`
	DataTemplate     string            `json:"dataTemplate"`
	Format           string            `json:"format"`
	SchemaId         string            `json:"schemaId"`
	Delimiter        string            `json:"delimiter"`
	BufferLength     int               `json:"bufferLength"`
	Fields           []string          `json:"fields"`
	ExcludeFields    []string          `json:"excludeFields"`
	DataField        string            `json:"dataField"`
	BatchSize        int               `json:"batchSize"`
	LingerInterval   cast.DurationConf `json:"lingerInterval"`
	Compression      string            `json:"compression"`
	CompressionProps map[string]any    `json:"compressionProps"`
	Encryption       string            `json:"encryption"`
	EncProps         map[string]any    `json:"encProps"`
	HasHeader        bool              `json:"hasHeader"`
	model.SinkConf
}

func ParseConf

func ParseConf(logger api.Logger, props map[string]any) (*SinkConf, error)

type SinkNode

type SinkNode struct {
	// contains filtered or unexported fields
}

SinkNode represents a sink node that collects data from the stream. It typically only connects and sends; it does not do any processing. This node is the skeleton: it refers to a sink instance to do the real work.

func NewBytesSinkNode

func NewBytesSinkNode(ctx api.StreamContext, name string, sink api.BytesCollector, rOpt def.RuleOption, eoflimit int, sc *SinkConf, isRetry bool) (*SinkNode, error)

NewBytesSinkNode creates a sink node that collects data from the stream. It performs some static validation.

func NewTupleSinkNode

func NewTupleSinkNode(ctx api.StreamContext, name string, sink api.TupleCollector, rOpt def.RuleOption, eoflimit int, sc *SinkConf, isRetry bool) (*SinkNode, error)

NewTupleSinkNode creates a sink node that collects data from the stream. It performs some static validation.

func (SinkNode) AddInputCount

func (o SinkNode) AddInputCount()

func (*SinkNode) Exec

func (s *SinkNode) Exec(ctx api.StreamContext, errCh chan<- error)

func (SinkNode) GetInput

func (o SinkNode) GetInput() (chan any, string)

func (SinkNode) GetInputCount

func (o SinkNode) GetInputCount() int

func (SinkNode) SetBarrierHandler

func (o SinkNode) SetBarrierHandler(bh checkpoint.BarrierHandler)

func (*SinkNode) SetResendOutput

func (s *SinkNode) SetResendOutput(output chan<- any)

type SlidingWindowIncAggEventOp added in v2.1.0

type SlidingWindowIncAggEventOp struct {
	SlidingWindowIncAggEventOpState
	// contains filtered or unexported fields
}

func NewSlidingWindowIncAggEventOp added in v2.1.0

func NewSlidingWindowIncAggEventOp(o *WindowIncAggOperator) *SlidingWindowIncAggEventOp

func (*SlidingWindowIncAggEventOp) PutState added in v2.1.0

func (so *SlidingWindowIncAggEventOp) PutState(ctx api.StreamContext)

func (*SlidingWindowIncAggEventOp) RestoreFromState added in v2.1.0

func (so *SlidingWindowIncAggEventOp) RestoreFromState(ctx api.StreamContext) error

type SlidingWindowIncAggEventOpState added in v2.1.0

type SlidingWindowIncAggEventOpState struct {
	SlidingWindowIncAggOpState
	EmitList []*IncAggWindow
}

type SlidingWindowIncAggOp added in v2.1.0

type SlidingWindowIncAggOp struct {
	*WindowIncAggOperator

	Length time.Duration
	Delay  time.Duration

	SlidingWindowIncAggOpState
	// contains filtered or unexported fields
}

func NewSlidingWindowIncAggOp added in v2.1.0

func NewSlidingWindowIncAggOp(o *WindowIncAggOperator) *SlidingWindowIncAggOp

func (SlidingWindowIncAggOp) AddInputCount added in v2.1.0

func (o SlidingWindowIncAggOp) AddInputCount()

func (SlidingWindowIncAggOp) GetInput added in v2.1.0

func (o SlidingWindowIncAggOp) GetInput() (chan any, string)

func (SlidingWindowIncAggOp) GetInputCount added in v2.1.0

func (o SlidingWindowIncAggOp) GetInputCount() int

func (*SlidingWindowIncAggOp) PutState added in v2.1.0

func (so *SlidingWindowIncAggOp) PutState(ctx api.StreamContext)

func (*SlidingWindowIncAggOp) RestoreFromState added in v2.1.0

func (so *SlidingWindowIncAggOp) RestoreFromState(ctx api.StreamContext) error

func (SlidingWindowIncAggOp) SetBarrierHandler added in v2.1.0

func (o SlidingWindowIncAggOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

type SlidingWindowIncAggOpState added in v2.1.0

type SlidingWindowIncAggOpState struct {
	CurrWindowList []*IncAggWindow
}

type SlidingWindowOp added in v2.3.0

type SlidingWindowOp struct {
	*WindowV2Operator
	Delay  time.Duration
	Length time.Duration
	// contains filtered or unexported fields
}

func NewSlidingWindowOp added in v2.3.0

func NewSlidingWindowOp(o *WindowV2Operator) *SlidingWindowOp

func (SlidingWindowOp) AddInputCount added in v2.3.0

func (o SlidingWindowOp) AddInputCount()

func (SlidingWindowOp) GetInput added in v2.3.0

func (o SlidingWindowOp) GetInput() (chan any, string)

func (SlidingWindowOp) GetInputCount added in v2.3.0

func (o SlidingWindowOp) GetInputCount() int

func (SlidingWindowOp) SetBarrierHandler added in v2.3.0

func (o SlidingWindowOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

type SourceInstanceNode

type SourceInstanceNode interface {
	GetSource() api.Source
}

type SourceNode

type SourceNode struct {
	// contains filtered or unexported fields
}

SourceNode is a node that connects to an external source. The SourceNode is an all-in-one source node that supports connecting, decoding and more. The SourceConnectorNode is a node that only connects to an external source and does not decode.

func NewSourceNode

func NewSourceNode(ctx api.StreamContext, name string, ss api.Source, props map[string]any, rOpt *def.RuleOption) (*SourceNode, error)

NewSourceNode creates a SourceNode

func (SourceNode) AddOutput

func (o SourceNode) AddOutput(output chan any, name string) error

func (SourceNode) Broadcast

func (o SourceNode) Broadcast(val any)

func (SourceNode) BroadcastCustomized

func (o SourceNode) BroadcastCustomized(val any, broadcastFunc func(val any))

func (SourceNode) Close

func (o SourceNode) Close()

func (SourceNode) GetMetrics

func (o SourceNode) GetMetrics() []any

func (SourceNode) GetName

func (o SourceNode) GetName() string

func (*SourceNode) GetSource added in v2.0.7

func (m *SourceNode) GetSource() api.Source

GetSource is only used for tests

func (SourceNode) GetStreamContext

func (o SourceNode) GetStreamContext() api.StreamContext

func (*SourceNode) Open

func (m *SourceNode) Open(ctx api.StreamContext, ctrlCh chan<- error)

Open will be invoked by topo. It starts reading data.

func (SourceNode) RemoveMetrics

func (o SourceNode) RemoveMetrics(ruleId string)

func (SourceNode) RemoveOutput

func (o SourceNode) RemoveOutput(name string) error

func (*SourceNode) Rewind

func (m *SourceNode) Rewind(ctx api.StreamContext) error

func (*SourceNode) Run

func (m *SourceNode) Run(ctx api.StreamContext, ctrlCh chan<- error)

Run Subscribe could be a long-running function

func (SourceNode) SetQos

func (o SourceNode) SetQos(qos def.Qos)

type StateWindowOp added in v2.3.0

type StateWindowOp struct {
	*WindowV2Operator

	PartitionExpr   *ast.PartitionExpr
	SingleCondition ast.Expr
	BeginCondition  ast.Expr
	EmitCondition   ast.Expr
	// contains filtered or unexported fields
}

func NewStateWindowOp added in v2.3.0

func NewStateWindowOp(o *WindowV2Operator) *StateWindowOp

func (StateWindowOp) AddInputCount added in v2.3.0

func (o StateWindowOp) AddInputCount()

func (StateWindowOp) GetInput added in v2.3.0

func (o StateWindowOp) GetInput() (chan any, string)

func (StateWindowOp) GetInputCount added in v2.3.0

func (o StateWindowOp) GetInputCount() int

func (StateWindowOp) SetBarrierHandler added in v2.3.0

func (o StateWindowOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

type StateWindowStatus added in v2.3.0

type StateWindowStatus struct {
	StartTime time.Time
	EndTime   time.Time
	OnBegin   bool
	Scanner   *WindowScanner
}

type SwitchConfig

type SwitchConfig struct {
	Cases            []ast.Expr
	StopAtFirstMatch bool
}

type SwitchNode

type SwitchNode struct {
	// contains filtered or unexported fields
}

func NewSwitchNode

func NewSwitchNode(name string, conf *SwitchConfig, options *def.RuleOption) (*SwitchNode, error)

func (SwitchNode) AddInputCount

func (o SwitchNode) AddInputCount()

func (*SwitchNode) AddOutput

func (n *SwitchNode) AddOutput(output chan interface{}, name string) error

AddOutput: SwitchNode overrides the defaultSinkNode's AddOutput to add the output to the outputNodes. SwitchNode itself has multiple outlets defined by the outputNodes. This default function adds the output to the first outlet.

func (*SwitchNode) Exec

func (n *SwitchNode) Exec(ctx api.StreamContext, errCh chan<- error)

func (*SwitchNode) GetEmitter

func (n *SwitchNode) GetEmitter(outputIndex int) Emitter

GetEmitter returns the nth emitter of the node. SwitchNode is the only node that has multiple emitters. In the planner graph, fromNodes is a multi-dimensional array, and the switch node is the only node that could be in the second dimension. The dim is the index.

func (SwitchNode) GetInput

func (o SwitchNode) GetInput() (chan any, string)

func (SwitchNode) GetInputCount

func (o SwitchNode) GetInputCount() int

func (SwitchNode) SetBarrierHandler

func (o SwitchNode) SetBarrierHandler(bh checkpoint.BarrierHandler)

type TopNode

type TopNode interface {
	GetName() string
}

type TransformOp

type TransformOp struct {
	// contains filtered or unexported fields
}

TransformOp transforms the row/collection to sink tuples. Immutable: false. Change trigger frequency: true, by the sendSingle property. Input: Row/Collection. Output: MessageTuple, SinkTupleList, RawTuple.

func NewTransformOp

func NewTransformOp(name string, rOpt *def.RuleOption, sc *SinkConf, templates []string) (*TransformOp, error)

NewTransformOp creates a transform node. The sink conf should have been validated before.

func (TransformOp) AddInputCount

func (o TransformOp) AddInputCount()

func (*TransformOp) Exec

func (t *TransformOp) Exec(ctx api.StreamContext, errCh chan<- error)

func (TransformOp) GetInput

func (o TransformOp) GetInput() (chan any, string)

func (TransformOp) GetInputCount

func (o TransformOp) GetInputCount() int

func (TransformOp) SetBarrierHandler

func (o TransformOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

func (*TransformOp) Worker

func (t *TransformOp) Worker(ctx api.StreamContext, item any) []any

Worker does not need to process error and control messages.

type TriggerRequest

type TriggerRequest struct {
	// contains filtered or unexported fields
}

type TumblingWindowIncAggEventOp added in v2.1.0

type TumblingWindowIncAggEventOp struct {
	*HoppingWindowIncAggEventOp
}

func NewTumblingWindowIncAggEventOp added in v2.1.0

func NewTumblingWindowIncAggEventOp(o *WindowIncAggOperator) *TumblingWindowIncAggEventOp

type TumblingWindowIncAggOp added in v2.1.0

type TumblingWindowIncAggOp struct {
	*WindowIncAggOperator

	FirstTimer *clock.Timer
	Interval   time.Duration
	TumblingWindowIncAggOpState
	// contains filtered or unexported fields
}

func NewTumblingWindowIncAggOp added in v2.1.0

func NewTumblingWindowIncAggOp(o *WindowIncAggOperator) *TumblingWindowIncAggOp

func (TumblingWindowIncAggOp) AddInputCount added in v2.1.0

func (o TumblingWindowIncAggOp) AddInputCount()

func (TumblingWindowIncAggOp) GetInput added in v2.1.0

func (o TumblingWindowIncAggOp) GetInput() (chan any, string)

func (TumblingWindowIncAggOp) GetInputCount added in v2.1.0

func (o TumblingWindowIncAggOp) GetInputCount() int

func (*TumblingWindowIncAggOp) PutState added in v2.1.0

func (to *TumblingWindowIncAggOp) PutState(ctx api.StreamContext)

func (*TumblingWindowIncAggOp) RestoreFromState added in v2.1.0

func (to *TumblingWindowIncAggOp) RestoreFromState(ctx api.StreamContext) error

func (TumblingWindowIncAggOp) SetBarrierHandler added in v2.1.0

func (o TumblingWindowIncAggOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

type TumblingWindowIncAggOpState added in v2.1.0

type TumblingWindowIncAggOpState struct {
	CurrWindow *IncAggWindow
}

type TupleList

type TupleList struct {
	// contains filtered or unexported fields
}

func NewTupleList

func NewTupleList(tuples []xsql.EventRow, windowSize int) (TupleList, error)

type UnFunc

type UnFunc func(api.StreamContext, interface{}) interface{}

UnFunc implements UnOperation as the function type func(api.StreamContext, interface{}) interface{}.

func (UnFunc) Apply

func (f UnFunc) Apply(ctx api.StreamContext, data interface{}) interface{}

Apply implements UnOperation.Apply method

type UnOperation

type UnOperation interface {
	Apply(ctx api.StreamContext, data interface{}, fv *xsql.FunctionValuer, afv *xsql.AggregateFunctionValuer) interface{}
}

UnOperation interface represents unary operations (i.e. Map, Filter, etc)

type UnaryOperator

type UnaryOperator struct {
	// contains filtered or unexported fields
}

func New

func New(name string, options *def.RuleOption) *UnaryOperator

New creates a *UnaryOperator value.

func (UnaryOperator) AddInputCount

func (o UnaryOperator) AddInputCount()

func (*UnaryOperator) Exec

func (o *UnaryOperator) Exec(ctx api.StreamContext, errCh chan<- error)

Exec is the entry point for the executor

func (UnaryOperator) GetInput

func (o UnaryOperator) GetInput() (chan any, string)

func (UnaryOperator) GetInputCount

func (o UnaryOperator) GetInputCount() int

func (UnaryOperator) SetBarrierHandler

func (o UnaryOperator) SetBarrierHandler(bh checkpoint.BarrierHandler)

func (*UnaryOperator) SetOperation

func (o *UnaryOperator) SetOperation(op UnOperation)

SetOperation sets the executor operation

type WatermarkOp

type WatermarkOp struct {
	// contains filtered or unexported fields
}

WatermarkOp is used when event time is enabled. It is used to align the event time of the input streams. It sends out the data in time order with watermark.

func NewWatermarkOp

func NewWatermarkOp(name string, sendWatermark bool, streams []string, options *def.RuleOption) *WatermarkOp

func (WatermarkOp) AddInputCount

func (o WatermarkOp) AddInputCount()

func (*WatermarkOp) Exec

func (w *WatermarkOp) Exec(ctx api.StreamContext, errCh chan<- error)

func (WatermarkOp) GetInput

func (o WatermarkOp) GetInput() (chan any, string)

func (WatermarkOp) GetInputCount

func (o WatermarkOp) GetInputCount() int

func (WatermarkOp) SetBarrierHandler

func (o WatermarkOp) SetBarrierHandler(bh checkpoint.BarrierHandler)

type WindowConfig

type WindowConfig struct {
	TriggerCondition ast.Expr
	StateFuncs       []*ast.Call
	Type             ast.WindowType
	// For time window
	Length   time.Duration
	Interval time.Duration // If the interval is not set, it is equal to Length
	Delay    time.Duration
	// For count window
	CountLength   int
	CountInterval int
	RawInterval   int
	TimeUnit      ast.Token

	// For state window
	SingleCondition ast.Expr
	BeginCondition  ast.Expr
	EmitCondition   ast.Expr

	PartitionExpr *ast.PartitionExpr
	// contains filtered or unexported fields
}

type WindowIncAggOperator added in v2.1.0

type WindowIncAggOperator struct {
	Dimensions ast.Dimensions

	WindowExec windowIncAggExec
	// contains filtered or unexported fields
}

func NewWindowIncAggOp added in v2.1.0

func NewWindowIncAggOp(name string, w *WindowConfig, dimensions ast.Dimensions, aggFields []*ast.Field, options *def.RuleOption) (*WindowIncAggOperator, error)

func (WindowIncAggOperator) AddInputCount added in v2.1.0

func (o WindowIncAggOperator) AddInputCount()

func (*WindowIncAggOperator) Close added in v2.1.0

func (o *WindowIncAggOperator) Close()

func (*WindowIncAggOperator) Exec added in v2.1.0

func (o *WindowIncAggOperator) Exec(ctx api.StreamContext, errCh chan<- error)

Exec is the entry point for the executor. Input: *xsql.Tuple from the preprocessor. Output: xsql.WindowTuplesSet.

func (WindowIncAggOperator) GetInput added in v2.1.0

func (o WindowIncAggOperator) GetInput() (chan any, string)

func (WindowIncAggOperator) GetInputCount added in v2.1.0

func (o WindowIncAggOperator) GetInputCount() int

func (WindowIncAggOperator) SetBarrierHandler added in v2.1.0

func (o WindowIncAggOperator) SetBarrierHandler(bh checkpoint.BarrierHandler)

type WindowOperator

type WindowOperator struct {
	// contains filtered or unexported fields
}

func NewWindowOp

func NewWindowOp(name string, w WindowConfig, options *def.RuleOption) (*WindowOperator, error)

func (WindowOperator) AddInputCount

func (o WindowOperator) AddInputCount()

func (*WindowOperator) Close

func (o *WindowOperator) Close()

func (*WindowOperator) Exec

func (o *WindowOperator) Exec(ctx api.StreamContext, errCh chan<- error)

Exec is the entry point for the executor. Input: xsql.EventRow from the preprocessor. Output: xsql.WindowTuplesSet.

func (WindowOperator) GetInput

func (o WindowOperator) GetInput() (chan any, string)

func (WindowOperator) GetInputCount

func (o WindowOperator) GetInputCount() int

func (WindowOperator) SetBarrierHandler

func (o WindowOperator) SetBarrierHandler(bh checkpoint.BarrierHandler)

type WindowScanner added in v2.3.0

type WindowScanner struct {
	Tuples []*xsql.Tuple
}

type WindowV2Exec added in v2.3.0

type WindowV2Exec interface {
	// contains filtered or unexported methods
}

type WindowV2Operator added in v2.3.0

type WindowV2Operator struct {
	// contains filtered or unexported fields
}

func NewWindowV2Op added in v2.3.0

func NewWindowV2Op(name string, w WindowConfig, options *def.RuleOption) (*WindowV2Operator, error)

func (WindowV2Operator) AddInputCount added in v2.3.0

func (o WindowV2Operator) AddInputCount()

func (*WindowV2Operator) Close added in v2.3.0

func (o *WindowV2Operator) Close()

func (*WindowV2Operator) Exec added in v2.3.0

func (o *WindowV2Operator) Exec(ctx api.StreamContext, errCh chan<- error)

func (WindowV2Operator) GetInput added in v2.3.0

func (o WindowV2Operator) GetInput() (chan any, string)

func (WindowV2Operator) GetInputCount added in v2.3.0

func (o WindowV2Operator) GetInputCount() int

func (WindowV2Operator) SetBarrierHandler added in v2.3.0

func (o WindowV2Operator) SetBarrierHandler(bh checkpoint.BarrierHandler)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL